From 2b88ef9bbf6b119d22e78940a8a41e0d9e39b704 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 22 Apr 2026 14:47:29 +0500 Subject: [PATCH 01/16] Create placeholder instances --- .../pipeline_tasks/instances/__init__.py | 7 + .../pipeline_tasks/jobs_submitted.py | 254 ++++++++++++++++-- .../pipeline_tasks/jobs_terminating.py | 14 + ...0_add_instancemodel_provisioning_job_id.py | 32 +++ src/dstack/_internal/server/models.py | 7 + .../_internal/server/services/fleets.py | 2 + src/dstack/_internal/server/testing/common.py | 2 + 7 files changed, 298 insertions(+), 20 deletions(-) create mode 100644 src/dstack/_internal/server/migrations/versions/2026/04_22_1200_add_instancemodel_provisioning_job_id.py diff --git a/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py b/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py index a070eb800..5b1ce5a26 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py @@ -179,6 +179,13 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]: InstanceModel.compute_group_id.is_not(None), ) ), + # Skip placeholder instances managed by JobSubmittedPipeline. + not_( + and_( + InstanceModel.status == InstanceStatus.PENDING, + InstanceModel.provisioning_job_id.is_not(None), + ) + ), InstanceModel.deleted == False, or_( # Process fast-moving instances (pending, provisioning, terminating) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index d29d68db8..d958a8f5a 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -2,7 +2,7 @@ import copy import uuid from contextlib import AsyncExitStack -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import timedelta from typing import Optional, Sequence, Union @@ -85,6 +85,7 @@ from dstack._internal.server.services.backends import get_project_backend_by_type_or_error from dstack._internal.server.services.docker import apply_server_docker_defaults from dstack._internal.server.services.fleets import ( + can_create_new_cloud_instance_in_fleet, check_can_create_new_cloud_instance_in_fleet, get_fleet_master_instance_provisioning_data, get_fleet_spec, @@ -392,6 +393,7 @@ class _TerminateSubmittedJobResult: message: Optional[str] = None locked_fleet_id: Optional[uuid.UUID] = None placement_group_cleanup: Optional[_PlacementGroupCleanup] = None + placeholder_instance_ids: list[uuid.UUID] = field(default_factory=list) @dataclass @@ -559,6 +561,48 @@ async def _apply_assignment_result( return if isinstance(assignment, _NewCapacityAssignment): + # For single-instance jobs, create a placeholder instance under fleet lock + # so that instance_num is unique and nodes.max is enforced as a hard limit. + # Compute groups still use the old path (placeholder created after provisioning). + if len(context.jobs_to_provision) == 1: + async with AsyncExitStack() as exit_stack: + fleet_model = await _lock_fleet_for_placeholder( + exit_stack=exit_stack, + session=session, + fleet_id=assignment.fleet_id, + ) + if fleet_model is None: + logger.debug( + "%s: failed to lock fleet for placeholder creation", + fmt(context.job_model), + ) + await _reset_job_lock_for_retry(session=session, item=item) + return + fleet_spec = get_fleet_spec(fleet_model) + if not can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec): + logger.debug( + "%s: fleet %s is full, retrying assignment", + fmt(context.job_model), + fleet_model.name, + ) + await _reset_job_lock_for_retry(session=session, item=item) + return + instance_model = _create_placeholder_instance( + fleet_model=fleet_model, + project=context.project, + job_model=job_model, + ) + session.add(instance_model) + job_model.used_instance_id = instance_model.id + events.emit( + session, + f"Instance created for job. Instance status: {instance_model.status.upper()}", + actor=events.SystemActor(), + targets=[ + events.Target.from_model(instance_model), + events.Target.from_model(job_model), + ], + ) job_model.fleet_id = assignment.fleet_id job_model.instance_assigned = True await _mark_job_processed(session=session, job_model=job_model) @@ -922,6 +966,104 @@ async def _lock_assignment_fleet_for_existing_instance_assignment( return fleets_with_locked_instances[0] +async def _lock_fleet_for_placeholder( + exit_stack: AsyncExitStack, + session: AsyncSession, + fleet_id: uuid.UUID, +) -> Optional[FleetModel]: + """Lock a fleet and load its non-deleted instances for placeholder creation. + + Returns the fleet model with instances loaded, or None if the fleet + cannot be locked (e.g. it is gone, deleted, or already locked by another pipeline). + """ + res = await session.execute( + select(FleetModel) + .where( + FleetModel.id == fleet_id, + FleetModel.deleted == False, + ) + .options(selectinload(FleetModel.instances.and_(InstanceModel.deleted == False))) + .execution_options(populate_existing=True) + .with_for_update(skip_locked=True, key_share=True) + ) + fleet_model = res.scalars().unique().one_or_none() + if fleet_model is None: + return None + + if not is_db_sqlite(): + return fleet_model + + await sqlite_commit(session) + await exit_stack.enter_async_context( + get_locker(get_db().dialect_name).lock_ctx(FleetModel.__tablename__, [fleet_id]) + ) + # Re-query under in-memory lock to see committed changes. + res = await session.execute( + select(FleetModel) + .where( + FleetModel.id == fleet_id, + FleetModel.deleted == False, + ) + .options(selectinload(FleetModel.instances.and_(InstanceModel.deleted == False))) + .execution_options(populate_existing=True) + ) + return res.scalars().unique().one_or_none() + + +def _create_placeholder_instance( + fleet_model: FleetModel, + project: ProjectModel, + job_model: JobModel, +) -> InstanceModel: + taken_instance_nums = {i.instance_num for i in fleet_model.instances} + instance_num = get_next_instance_num(taken_instance_nums) + return InstanceModel( + id=uuid.uuid4(), + name=f"{fleet_model.name}-{instance_num}", + instance_num=instance_num, + project=project, + fleet=fleet_model, + status=InstanceStatus.PENDING, + unreachable=False, + provisioning_job_id=job_model.id, + ) + + +def _is_placeholder_instance(instance: InstanceModel) -> bool: + """A PENDING instance with provisioning_job_id set is a placeholder + created during assignment, waiting for cloud provisioning.""" + return instance.status == InstanceStatus.PENDING and instance.provisioning_job_id is not None + + +def _get_placeholder_instance_ids(context: _SubmittedJobContext) -> list[uuid.UUID]: + instance = context.job_model.instance + if instance is not None and instance.provisioning_job_id is not None: + return [instance.id] + return [] + + +async def _cleanup_placeholder_instances( + session: AsyncSession, + instance_ids: list[uuid.UUID], +) -> None: + if not instance_ids: + return + now = get_current_datetime() + await session.execute( + update(InstanceModel) + .where( + InstanceModel.id.in_(instance_ids), + InstanceModel.provisioning_job_id.is_not(None), + ) + .values( + deleted=True, + deleted_at=now, + finished_at=now, + status=InstanceStatus.TERMINATED, + ) + ) + + def _get_current_reusable_instance_offers( context: _SubmittedJobContext, assignment: _ExistingInstanceAssignment, @@ -1000,6 +1142,13 @@ async def _process_provisioning( return preconditions if context.job_model.instance is not None: + if _is_placeholder_instance(context.job_model.instance): + # Placeholder instance created during assignment — proceed to cloud provisioning. + return await _process_new_capacity_provisioning( + item=item, + context=context, + preconditions=preconditions, + ) return await _process_existing_instance_provisioning( item=item, context=context, @@ -1044,6 +1193,10 @@ async def _apply_provisioning_result( item=item, fleet_id=_get_locked_fleet_id_from_provisioning(provisioning), ) + await _cleanup_placeholder_instances( + session=session, + instance_ids=_get_placeholder_instance_ids_from_provisioning(provisioning), + ) log_lock_token_changed_after_processing(logger, item) return @@ -1072,6 +1225,10 @@ async def _apply_provisioning_result( item=item, fleet_id=provisioning.locked_fleet_id, ) + await _cleanup_placeholder_instances( + session=session, + instance_ids=provisioning.placeholder_instance_ids, + ) await _terminate_submitted_job( session=session, job_model=job_model, @@ -1216,6 +1373,7 @@ async def _process_new_capacity_provisioning( reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY, locked_fleet_id=locked_fleet_id, placement_group_cleanup=provision_new_capacity_result.placement_group_cleanup, + placeholder_instance_ids=_get_placeholder_instance_ids(context), ) volume_attachment_result = None @@ -1315,7 +1473,7 @@ async def _materialize_newly_provisioned_capacity( if compute_group_model is not None: session.add(compute_group_model) - instance_models = await _create_instance_models_for_provisioned_jobs( + instance_models = await _promote_or_create_instance_models_for_provisioned_jobs( session=session, context=context, fleet_model=fleet_model, @@ -1355,7 +1513,7 @@ def _resolve_provisioned_jobs_and_data( return [context.job], [provisioning_data], None -async def _create_instance_models_for_provisioned_jobs( +async def _promote_or_create_instance_models_for_provisioned_jobs( session: AsyncSession, context: _SubmittedJobContext, fleet_model: FleetModel, @@ -1367,9 +1525,9 @@ async def _create_instance_models_for_provisioned_jobs( ) -> list[InstanceModel]: provisioned_job_models = _get_job_models_for_jobs(context.run_model.jobs, provisioned_jobs) instance_models: list[InstanceModel] = [] - # FIXME: Fleet is not locked here, which may lead to duplicate `instance_num`. - # This likely needs a separate reservation step so instance rows are created - # before provisioning and `instance_num` is allocated under fleet serialization. + # FIXME: For compute groups, the fleet is not locked here, which may lead to + # duplicate `instance_num`. Single-instance jobs use placeholder instances + # created under fleet lock during assignment, so they are not affected. taken_instance_nums = await _get_taken_instance_nums(session, fleet_model) for provisioned_job_model, job_provisioning_data in zip( provisioned_job_models, job_provisioning_datas @@ -1377,23 +1535,42 @@ async def _create_instance_models_for_provisioned_jobs( provisioned_job_model.fleet_id = fleet_model.id provisioned_job_model.job_provisioning_data = job_provisioning_data.json() switch_job_status(session, provisioned_job_model, JobStatus.PROVISIONING) - instance_num = get_next_instance_num(taken_instance_nums) - instance_model = _create_instance_model_for_job( - project=context.project, - fleet_model=fleet_model, - compute_group_model=compute_group_model, - job_model=provisioned_job_model, - job_provisioning_data=job_provisioning_data, - offer=offer, - instance_num=instance_num, - profile=effective_profile, - ) + + # If a placeholder instance was created during assignment, promote it + # with the actual provisioning data. Otherwise create a new instance + # (compute group path that has not been migrated to placeholders yet). + # Safe to update the placeholder without FOR UPDATE: the instance pipeline + # skips placeholders (fetcher filter), fleet consolidation does not modify + # them, and the API refuses to delete them while the job is provisioning. + instance_model = provisioned_job_model.instance + if instance_model is not None and instance_model.provisioning_job_id is not None: + _promote_placeholder_instance( + instance_model=instance_model, + compute_group_model=compute_group_model, + job_provisioning_data=job_provisioning_data, + offer=offer, + profile=effective_profile, + ) + else: + instance_num = get_next_instance_num(taken_instance_nums) + instance_model = _create_instance_model_for_job( + project=context.project, + fleet_model=fleet_model, + compute_group_model=compute_group_model, + job_model=provisioned_job_model, + job_provisioning_data=job_provisioning_data, + offer=offer, + instance_num=instance_num, + profile=effective_profile, + ) + taken_instance_nums.add(instance_num) + session.add(instance_model) + provisioned_job_model.used_instance_id = instance_model.id + instance_models.append(instance_model) - taken_instance_nums.add(instance_num) provisioned_job_model.job_runtime_data = _prepare_job_runtime_data( offer, context.multinode ).json() - session.add(instance_model) events.emit( session, f"Instance created for job. Instance status: {instance_model.status.upper()}", @@ -1403,7 +1580,6 @@ async def _create_instance_models_for_provisioned_jobs( events.Target.from_model(provisioned_job_model), ], ) - provisioned_job_model.used_instance_id = instance_model.id provisioned_job_model.last_processed_at = get_current_datetime() return instance_models @@ -1460,6 +1636,36 @@ def _create_instance_model_for_job( ) +def _promote_placeholder_instance( + instance_model: InstanceModel, + compute_group_model: Optional[ComputeGroupModel], + job_provisioning_data: JobProvisioningData, + offer: InstanceOfferWithAvailability, + profile: Profile, +) -> None: + """Promote a placeholder instance to a real provisioning instance + by filling in the fields that were unknown at placeholder creation time.""" + if not job_provisioning_data.dockerized: + termination_policy = TerminationPolicy.DESTROY_AFTER_IDLE + termination_idle_time = 0 + else: + termination_policy, termination_idle_time = get_termination( + profile, DEFAULT_RUN_TERMINATION_IDLE_TIME + ) + instance_model.status = InstanceStatus.PROVISIONING + instance_model.started_at = get_current_datetime() + instance_model.compute_group = compute_group_model + instance_model.job_provisioning_data = job_provisioning_data.json() + instance_model.offer = offer.json() + instance_model.backend = offer.backend + instance_model.price = offer.price + instance_model.region = offer.region + instance_model.termination_policy = termination_policy + instance_model.termination_idle_time = termination_idle_time + instance_model.total_blocks = 1 + instance_model.busy_blocks = 1 + + async def _process_volume_attachments( item: JobSubmittedPipelineItem, project: ProjectModel, @@ -1671,6 +1877,14 @@ def _get_locked_fleet_id_from_provisioning( return None +def _get_placeholder_instance_ids_from_provisioning( + provisioning: _ProvisioningResult, +) -> list[uuid.UUID]: + if isinstance(provisioning, _TerminateSubmittedJobResult): + return provisioning.placeholder_instance_ids + return [] + + def _get_related_volume_lock_owner(job_id: uuid.UUID) -> str: return f"{JobSubmittedPipeline.__name__}:{job_id}" diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index 29e8ccecc..5ce388222 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -617,6 +617,20 @@ async def _process_terminating_job( result.job_update_map["status"] = _get_job_termination_status(job_model) return result + # Placeholder instance (PENDING + provisioning_job_id set) has no VM. + # Just mark it for deletion and finish the job. + if ( + instance_model.status == InstanceStatus.PENDING + and instance_model.provisioning_job_id is not None + ): + instance_update_map = get_or_error(result.instance_update_map) + instance_update_map["status"] = InstanceStatus.TERMINATING + instance_update_map["termination_reason"] = InstanceTerminationReason.JOB_FINISHED + result.job_update_map["instance_id"] = None + await _unregister_replica_and_update_result(result=result, job_model=job_model) + result.job_update_map["status"] = _get_job_termination_status(job_model) + return result + if job_model.graceful_termination_attempts == 0 and job_model.remove_at is None: result.job_update_map = await _stop_job_gracefully(job_model, instance_model) return result diff --git a/src/dstack/_internal/server/migrations/versions/2026/04_22_1200_add_instancemodel_provisioning_job_id.py b/src/dstack/_internal/server/migrations/versions/2026/04_22_1200_add_instancemodel_provisioning_job_id.py new file mode 100644 index 000000000..80c1fad93 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/2026/04_22_1200_add_instancemodel_provisioning_job_id.py @@ -0,0 +1,32 @@ +"""Add InstanceModel.provisioning_job_id for placeholder instances. + +Revision ID: a1b2c3d4e5f6 +Revises: 94fcd7e38b7e +Create Date: 2026-04-22 12:00:00.000000+00:00 + +""" + +import sqlalchemy_utils +from alembic import op +from sqlalchemy import Column + +# revision identifiers, used by Alembic. +revision = "a1b2c3d4e5f6" +down_revision = "94fcd7e38b7e" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "instances", + Column( + "provisioning_job_id", + sqlalchemy_utils.types.uuid.UUIDType(binary=False), + nullable=True, + ), + ) + + +def downgrade() -> None: + op.drop_column("instances", "provisioning_job_id") diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index 64990e96a..25032478a 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -823,6 +823,13 @@ class InstanceModel(PipelineModelMixin, BaseModel): job_provisioning_data: Mapped[Optional[str]] = mapped_column(Text) + provisioning_job_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUIDType(binary=False), default=None + ) + """When set, records the job that triggered this instance's creation. + A PENDING instance with this field set is a placeholder managed by + `JobSubmittedPipeline` and is not touched by `InstancePipeline`. + """ remote_connection_info: Mapped[Optional[str]] = mapped_column(Text) total_blocks: Mapped[Optional[int]] = mapped_column(Integer) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 3129bcbb0..b91169471 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -1440,6 +1440,8 @@ def _terminate_fleet_instances( for instance in fleet_model.instances: if instance_nums is not None and instance.instance_num not in instance_nums: continue + if instance.status == InstanceStatus.PENDING and instance.provisioning_job_id is not None: + raise ServerClientError("Failed to delete instance while the job is provisioning.") if instance.status == InstanceStatus.TERMINATED: instance.deleted = True else: diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index c955f5ae1..8775c043c 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -810,6 +810,7 @@ async def create_instance( volumes: Optional[List[VolumeModel]] = None, price: float = 1.0, last_processed_at: datetime = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc), + provisioning_job_id: Optional[UUID] = None, ) -> InstanceModel: if instance_id is None: instance_id = uuid.uuid4() @@ -871,6 +872,7 @@ async def create_instance( volume_attachments=volume_attachments, total_blocks=total_blocks, busy_blocks=busy_blocks, + provisioning_job_id=provisioning_job_id, ) if job: im.jobs.append(job) From b7f9a2ce6b439d35db07c4b6bcc6db79284aa6ce Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 22 Apr 2026 15:54:19 +0500 Subject: [PATCH 02/16] Add tests --- .../pipeline_tasks/jobs_submitted.py | 26 ++++- .../test_instances/test_pipeline.py | 31 ++++-- .../pipeline_tasks/test_submitted_jobs.py | 103 ++++++++++++++++-- .../pipeline_tasks/test_terminating_jobs.py | 34 ++++++ .../_internal/server/routers/test_fleets.py | 29 +++++ 5 files changed, 203 insertions(+), 20 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index d958a8f5a..580bffde8 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -1529,6 +1529,7 @@ async def _promote_or_create_instance_models_for_provisioned_jobs( # duplicate `instance_num`. Single-instance jobs use placeholder instances # created under fleet lock during assignment, so they are not affected. taken_instance_nums = await _get_taken_instance_nums(session, fleet_model) + for provisioned_job_model, job_provisioning_data in zip( provisioned_job_models, job_provisioning_datas ): @@ -1536,14 +1537,13 @@ async def _promote_or_create_instance_models_for_provisioned_jobs( provisioned_job_model.job_provisioning_data = job_provisioning_data.json() switch_job_status(session, provisioned_job_model, JobStatus.PROVISIONING) - # If a placeholder instance was created during assignment, promote it - # with the actual provisioning data. Otherwise create a new instance - # (compute group path that has not been migrated to placeholders yet). + # If a placeholder instance exists, promote it instead of creating a new one. # Safe to update the placeholder without FOR UPDATE: the instance pipeline # skips placeholders (fetcher filter), fleet consolidation does not modify # them, and the API refuses to delete them while the job is provisioning. - instance_model = provisioned_job_model.instance - if instance_model is not None and instance_model.provisioning_job_id is not None: + placeholder_instance = _get_job_placeholder_instance(context, provisioned_job_model) + if placeholder_instance is not None: + instance_model = placeholder_instance _promote_placeholder_instance( instance_model=instance_model, compute_group_model=compute_group_model, @@ -1594,6 +1594,22 @@ async def _get_taken_instance_nums(session: AsyncSession, fleet_model: FleetMode return set(res.scalars().all()) +def _get_job_placeholder_instance( + context: _SubmittedJobContext, + job_model: JobModel, +) -> Optional[InstanceModel]: + """Return the placeholder instance for a job, or None. + Only context.job_model has the instance relationship eagerly loaded, + so we match by id and return its instance. + """ + if job_model.id != context.job_model.id: + return None + instance = context.job_model.instance + if instance is not None and instance.provisioning_job_id is not None: + return instance + return None + + def _create_instance_model_for_job( project: ProjectModel, fleet_model: FleetModel, diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_instances/test_pipeline.py b/src/tests/_internal/server/background/pipeline_tasks/test_instances/test_pipeline.py index 2e31567ae..1c0ef2bca 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_instances/test_pipeline.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_instances/test_pipeline.py @@ -112,6 +112,28 @@ async def test_fetch_selects_eligible_instances_and_sets_lock_fields( locked.lock_token = uuid.uuid4() locked.lock_owner = "OtherPipeline" + # Placeholder instance managed by JobSubmittedPipeline — should be skipped + placeholder = await create_instance( + session=session, + project=project, + status=InstanceStatus.PENDING, + name="placeholder", + last_processed_at=stale + dt.timedelta(seconds=3), + provisioning_job_id=uuid.uuid4(), + offer=None, + job_provisioning_data=None, + ) + + # Promoted placeholder (PROVISIONING + provisioning_job_id) — should be fetched + promoted = await create_instance( + session=session, + project=project, + status=InstanceStatus.PROVISIONING, + name="promoted", + last_processed_at=stale + dt.timedelta(seconds=4), + provisioning_job_id=uuid.uuid4(), + ) + await session.commit() items = await fetcher.fetch(limit=10) @@ -122,14 +144,9 @@ async def test_fetch_selects_eligible_instances_and_sets_lock_fields( busy.id, idle.id, terminating.id, + promoted.id, } - assert {item.status for item in items} == { - InstanceStatus.PENDING, - InstanceStatus.PROVISIONING, - InstanceStatus.BUSY, - InstanceStatus.IDLE, - InstanceStatus.TERMINATING, - } + assert placeholder.id not in {item.id for item in items} for instance in [ pending, diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py index 4ef56ffc3..8c9601d4f 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py @@ -387,13 +387,15 @@ async def test_provisions_assigned_job_on_existing_instance( assert job.lock_token is None assert job.lock_expires_at is None - async def test_provisions_new_capacity_for_assigned_job( + async def test_provisions_new_capacity_for_assigned_job_with_placeholder( self, test_db, session: AsyncSession, worker: JobSubmittedWorker ): project = await create_project(session=session) user = await create_user(session=session) repo = await create_repo(session=session, project_id=project.id) - fleet = await create_fleet(session=session, project=project) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None) + fleet = await create_fleet(session=session, project=project, spec=fleet_spec) run = await create_run( session=session, project=project, @@ -401,7 +403,21 @@ async def test_provisions_new_capacity_for_assigned_job( user=user, fleet=fleet, ) - job = await create_job(session=session, run=run, instance_assigned=True) + # Simulate the placeholder instance created during assignment + placeholder = await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.PENDING, + offer=None, + job_provisioning_data=None, + backend=BackendType.AWS, + ) + job = await create_job( + session=session, run=run, instance=placeholder, instance_assigned=True + ) + placeholder.provisioning_job_id = job.id + await session.commit() offer = get_instance_offer_with_availability(backend=BackendType.AWS) with patch("dstack._internal.server.services.backends.get_project_backends") as m: @@ -418,11 +434,12 @@ async def test_provisions_new_capacity_for_assigned_job( job = await _get_job(session, job.id) assert job.status == JobStatus.PROVISIONING - assert job.instance is not None - assert job.instance.fleet_id == fleet.id - assert job.lock_owner is None - assert job.lock_token is None - assert job.lock_expires_at is None + assert job.used_instance_id == placeholder.id + await session.refresh(placeholder) + assert placeholder.status == InstanceStatus.PROVISIONING + assert placeholder.fleet_id == fleet.id + assert placeholder.offer is not None + assert placeholder.provisioning_job_id == job.id # never cleared async def test_provisioning_master_job_respects_cluster_placement_in_non_empty_fleet( self, test_db, session: AsyncSession, worker: JobSubmittedWorker @@ -1050,6 +1067,76 @@ async def test_assigns_job_to_specific_fleet( assert job.instance is not None and job.instance.id == instance_2.id assert job.fleet_id == fleet_2.id + async def test_assignment_creates_placeholder_instance_for_new_capacity( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet = await create_fleet(session=session, project=project) + run = await create_run(session=session, project=project, repo=repo, user=user) + job = await create_job(session=session, run=run) + + offer = get_instance_offer_with_availability(backend=BackendType.AWS) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + backend_mock = Mock() + m.return_value = [backend_mock] + backend_mock.TYPE = BackendType.AWS + backend_mock.compute.return_value.get_offers.return_value = [offer] + + await _process_job(session=session, worker=worker, job_model=job) + + job = await _get_job(session, job.id) + assert job.status == JobStatus.SUBMITTED + assert job.instance_assigned + assert job.fleet_id == fleet.id + assert job.used_instance_id is not None + # Query the placeholder instance directly to avoid stale session cache + res = await session.execute( + select(InstanceModel) + .where(InstanceModel.id == job.used_instance_id) + .execution_options(populate_existing=True) + ) + placeholder = res.scalar_one() + assert placeholder.status == InstanceStatus.PENDING + assert placeholder.provisioning_job_id == job.id + assert placeholder.fleet_id == fleet.id + assert placeholder.offer is None + assert placeholder.instance_num == 0 + + async def test_assignment_retries_when_fleet_is_full( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1) + fleet = await create_fleet(session=session, project=project, spec=fleet_spec) + await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.BUSY, + ) + run = await create_run(session=session, project=project, repo=repo, user=user) + job = await create_job(session=session, run=run) + + offer = get_instance_offer_with_availability(backend=BackendType.AWS) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + backend_mock = Mock() + m.return_value = [backend_mock] + backend_mock.TYPE = BackendType.AWS + backend_mock.compute.return_value.get_offers.return_value = [offer] + + await _process_job(session=session, worker=worker, job_model=job) + + job = await _get_job(session, job.id) + # Assignment retried — job not committed as assigned + assert job.status == JobStatus.SUBMITTED + assert not job.instance_assigned + assert job.instance is None + async def test_provisions_compute_group( self, test_db, session: AsyncSession, worker: JobSubmittedWorker ): diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py index ba4eabb66..1d003dce0 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py @@ -758,6 +758,40 @@ async def test_finishes_job_when_used_instance_is_not_set( assert job.lock_token is None assert job.lock_expires_at is None + async def test_terminates_job_with_placeholder_instance( + self, test_db, session: AsyncSession, worker: JobTerminatingWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + run = await create_run(session=session, project=project, repo=repo, user=user) + placeholder = await create_instance( + session=session, + project=project, + status=InstanceStatus.PENDING, + provisioning_job_id=uuid.uuid4(), + offer=None, + job_provisioning_data=None, + ) + job = await create_job( + session=session, + run=run, + status=JobStatus.TERMINATING, + termination_reason=JobTerminationReason.TERMINATED_BY_USER, + instance=placeholder, + ) + _lock_job(job) + await session.commit() + + # No mocks needed — placeholder has no VM, no SSH, no container + await worker.process(_job_to_pipeline_item(job)) + + await session.refresh(job) + await session.refresh(placeholder) + assert job.status == JobStatus.TERMINATED + assert job.instance_id is None + assert placeholder.status == InstanceStatus.TERMINATING + async def test_retries_detaching_when_used_instance_is_missing( self, test_db, session: AsyncSession, worker: JobTerminatingWorker ): diff --git a/src/tests/_internal/server/routers/test_fleets.py b/src/tests/_internal/server/routers/test_fleets.py index 196f263a6..d5f9ffd95 100644 --- a/src/tests/_internal/server/routers/test_fleets.py +++ b/src/tests/_internal/server/routers/test_fleets.py @@ -2071,6 +2071,35 @@ async def test_importer_member_cannot_delete_imported_fleet_instances( ) assert response.status_code == 403 + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_rejects_deleting_placeholder_instance( + self, test_db, session: AsyncSession, client: AsyncClient + ): + user = await create_user(session, global_role=GlobalRole.USER) + project = await create_project(session) + await add_project_member( + session=session, project=project, user=user, project_role=ProjectRole.USER + ) + fleet = await create_fleet(session=session, project=project) + await create_instance( + session=session, + project=project, + fleet=fleet, + instance_num=0, + status=InstanceStatus.PENDING, + provisioning_job_id=uuid4(), + offer=None, + job_provisioning_data=None, + ) + response = await client.post( + f"/api/project/{project.name}/fleets/delete_instances", + headers=get_auth_headers(user.token), + json={"name": fleet.name, "instance_nums": [0]}, + ) + assert response.status_code == 400 + assert "provisioning" in response.text + class TestGetPlan: @pytest.mark.asyncio From 0c417a24dca844b5a460cc3dd88bc026e678d119 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 22 Apr 2026 16:11:53 +0500 Subject: [PATCH 03/16] Update terminating comment --- .../server/background/pipeline_tasks/jobs_terminating.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index 5ce388222..20f53be61 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -617,12 +617,12 @@ async def _process_terminating_job( result.job_update_map["status"] = _get_job_termination_status(job_model) return result - # Placeholder instance (PENDING + provisioning_job_id set) has no VM. - # Just mark it for deletion and finish the job. if ( instance_model.status == InstanceStatus.PENDING and instance_model.provisioning_job_id is not None ): + # Placeholder instance (PENDING with provisioning_job_id) has no VM and no + # provisioning data. Skip graceful stop, container stop, and volume detach. instance_update_map = get_or_error(result.instance_update_map) instance_update_map["status"] = InstanceStatus.TERMINATING instance_update_map["termination_reason"] = InstanceTerminationReason.JOB_FINISHED From cdcbeb8247610afe305a2cecd1c858c35a07e64d Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 22 Apr 2026 16:38:48 +0500 Subject: [PATCH 04/16] Fix tests --- ..._add_instancemodel_provisioning_job_id.py} | 4 +- .../pipeline_tasks/test_submitted_jobs.py | 58 +++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) rename src/dstack/_internal/server/migrations/versions/2026/{04_22_1200_add_instancemodel_provisioning_job_id.py => 04_22_1200_d5addc51d0c3_add_instancemodel_provisioning_job_id.py} (92%) diff --git a/src/dstack/_internal/server/migrations/versions/2026/04_22_1200_add_instancemodel_provisioning_job_id.py b/src/dstack/_internal/server/migrations/versions/2026/04_22_1200_d5addc51d0c3_add_instancemodel_provisioning_job_id.py similarity index 92% rename from src/dstack/_internal/server/migrations/versions/2026/04_22_1200_add_instancemodel_provisioning_job_id.py rename to src/dstack/_internal/server/migrations/versions/2026/04_22_1200_d5addc51d0c3_add_instancemodel_provisioning_job_id.py index 80c1fad93..6053dbf0a 100644 --- a/src/dstack/_internal/server/migrations/versions/2026/04_22_1200_add_instancemodel_provisioning_job_id.py +++ b/src/dstack/_internal/server/migrations/versions/2026/04_22_1200_d5addc51d0c3_add_instancemodel_provisioning_job_id.py @@ -1,6 +1,6 @@ """Add InstanceModel.provisioning_job_id for placeholder instances. -Revision ID: a1b2c3d4e5f6 +Revision ID: d5addc51d0c3 Revises: 94fcd7e38b7e Create Date: 2026-04-22 12:00:00.000000+00:00 @@ -11,7 +11,7 @@ from sqlalchemy import Column # revision identifiers, used by Alembic. -revision = "a1b2c3d4e5f6" +revision = "d5addc51d0c3" down_revision = "94fcd7e38b7e" branch_labels = None depends_on = None diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py index 8c9601d4f..1fe3f606f 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py @@ -1136,6 +1136,64 @@ async def test_assignment_retries_when_fleet_is_full( assert job.status == JobStatus.SUBMITTED assert not job.instance_assigned assert job.instance is None + # No placeholder must be committed when the fleet is full. + res = await session.execute( + select(InstanceModel).where( + InstanceModel.fleet_id == fleet.id, + InstanceModel.deleted == False, + ) + ) + assert len(res.scalars().all()) == 1 + + async def test_cleans_up_placeholder_on_failed_new_capacity_provisioning( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None) + fleet = await create_fleet(session=session, project=project, spec=fleet_spec) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + fleet=fleet, + ) + placeholder = await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.PENDING, + offer=None, + job_provisioning_data=None, + backend=BackendType.AWS, + ) + job = await create_job( + session=session, run=run, instance=placeholder, instance_assigned=True + ) + placeholder.provisioning_job_id = job.id + await session.commit() + + offer = get_instance_offer_with_availability(backend=BackendType.AWS) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + backend_mock = Mock() + compute_mock = Mock(spec=ComputeMockSpec) + backend_mock.TYPE = BackendType.AWS + backend_mock.compute.return_value = compute_mock + m.return_value = [backend_mock] + compute_mock.get_offers.return_value = [offer] + compute_mock.run_job.side_effect = BackendError("boom") + + await _process_job(session=session, worker=worker, job_model=job) + + job = await _get_job(session, job.id) + assert job.status == JobStatus.TERMINATING + assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + await session.refresh(placeholder) + assert placeholder.deleted + assert placeholder.status == InstanceStatus.TERMINATED async def test_provisions_compute_group( self, test_db, session: AsyncSession, worker: JobSubmittedWorker From 4995793f5458540d8fe00648940f7fdb0423b9bb Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 22 Apr 2026 16:43:30 +0500 Subject: [PATCH 05/16] Fix job_model.instance not set to placeholder --- .../pipeline_tasks/jobs_submitted.py | 4 +- .../pipeline_tasks/test_submitted_jobs.py | 56 +++++++++---------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 580bffde8..659ad0701 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -593,6 +593,7 @@ async def _apply_assignment_result( job_model=job_model, ) session.add(instance_model) + job_model.instance = instance_model job_model.used_instance_id = instance_model.id events.emit( session, @@ -2354,7 +2355,8 @@ def _get_effective_profile_and_requirements( fleet_spec = get_fleet_spec(fleet_model) try: - check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec) + if job_model.instance is None or not _is_placeholder_instance(job_model.instance): + check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec) effective_profile, requirements = get_run_profile_and_requirements_in_fleet( job=job, run_spec=run.run_spec, diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py index 1fe3f606f..a61aef80e 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py @@ -394,30 +394,10 @@ async def test_provisions_new_capacity_for_assigned_job_with_placeholder( user = await create_user(session=session) repo = await create_repo(session=session, project_id=project.id) fleet_spec = get_fleet_spec() - fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None) + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1) fleet = await create_fleet(session=session, project=project, spec=fleet_spec) - run = await create_run( - session=session, - project=project, - repo=repo, - user=user, - fleet=fleet, - ) - # Simulate the placeholder instance created during assignment - placeholder = await create_instance( - session=session, - project=project, - fleet=fleet, - status=InstanceStatus.PENDING, - offer=None, - job_provisioning_data=None, - backend=BackendType.AWS, - ) - job = await create_job( - session=session, run=run, instance=placeholder, instance_assigned=True - ) - placeholder.provisioning_job_id = job.id - await session.commit() + run = await create_run(session=session, project=project, repo=repo, user=user) + job = await create_job(session=session, run=run) offer = get_instance_offer_with_availability(backend=BackendType.AWS) with patch("dstack._internal.server.services.backends.get_project_backends") as m: @@ -432,14 +412,32 @@ async def test_provisions_new_capacity_for_assigned_job_with_placeholder( await _process_job(session=session, worker=worker, job_model=job) + job = await _get_job(session, job.id) + assert job.status == JobStatus.SUBMITTED + assert job.instance_assigned + assert job.instance is not None + placeholder_id = job.instance.id + assert job.used_instance_id == placeholder_id + assert job.instance.status == InstanceStatus.PENDING + + await _process_job(session=session, worker=worker, job_model=job) + job = await _get_job(session, job.id) assert job.status == JobStatus.PROVISIONING - assert job.used_instance_id == placeholder.id - await session.refresh(placeholder) - assert placeholder.status == InstanceStatus.PROVISIONING - assert placeholder.fleet_id == fleet.id - assert placeholder.offer is not None - assert placeholder.provisioning_job_id == job.id # never cleared + assert job.instance is not None + assert job.instance.id == placeholder_id + assert job.used_instance_id == placeholder_id + assert job.instance.status == InstanceStatus.PROVISIONING + assert job.instance.fleet_id == fleet.id + assert job.instance.offer is not None + assert job.instance.provisioning_job_id == job.id # never cleared + res = await session.execute( + select(InstanceModel).where( + InstanceModel.fleet_id == fleet.id, + InstanceModel.deleted == False, + ) + ) + assert len(res.scalars().all()) == 1 async def test_provisioning_master_job_respects_cluster_placement_in_non_empty_fleet( self, test_db, session: AsyncSession, worker: JobSubmittedWorker From fd027bbc789d10f85337ac246a4571825c657334 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 22 Apr 2026 17:34:12 +0500 Subject: [PATCH 06/16] Fix placeholders not used for multinode tasks --- .../pipeline_tasks/jobs_submitted.py | 117 +++++++++++------- .../pipeline_tasks/test_submitted_jobs.py | 96 ++++++++++++++ 2 files changed, 165 insertions(+), 48 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 659ad0701..147cf87b2 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -86,7 +86,6 @@ from dstack._internal.server.services.docker import apply_server_docker_defaults from dstack._internal.server.services.fleets import ( can_create_new_cloud_instance_in_fleet, - check_can_create_new_cloud_instance_in_fleet, get_fleet_master_instance_provisioning_data, get_fleet_spec, get_next_instance_num, @@ -561,49 +560,50 @@ async def _apply_assignment_result( return if isinstance(assignment, _NewCapacityAssignment): - # For single-instance jobs, create a placeholder instance under fleet lock - # so that instance_num is unique and nodes.max is enforced as a hard limit. - # Compute groups still use the old path (placeholder created after provisioning). - if len(context.jobs_to_provision) == 1: - async with AsyncExitStack() as exit_stack: - fleet_model = await _lock_fleet_for_placeholder( - exit_stack=exit_stack, - session=session, - fleet_id=assignment.fleet_id, - ) - if fleet_model is None: - logger.debug( - "%s: failed to lock fleet for placeholder creation", - fmt(context.job_model), - ) - await _reset_job_lock_for_retry(session=session, item=item) - return - fleet_spec = get_fleet_spec(fleet_model) - if not can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec): - logger.debug( - "%s: fleet %s is full, retrying assignment", - fmt(context.job_model), - fleet_model.name, - ) - await _reset_job_lock_for_retry(session=session, item=item) - return - instance_model = _create_placeholder_instance( - fleet_model=fleet_model, - project=context.project, - job_model=job_model, + # Always reserve one placeholder instance under fleet lock for the current + # submitted job. This keeps instance_num unique and makes nodes.max a hard + # limit for the single-instance provisioning path, including multinode + # masters that later fall back to run_job(). Compute groups still use the + # old partial path: one placeholder does not reserve the full group. + async with AsyncExitStack() as exit_stack: + fleet_model = await _lock_fleet_for_placeholder( + exit_stack=exit_stack, + session=session, + fleet_id=assignment.fleet_id, + ) + if fleet_model is None: + logger.debug( + "%s: failed to lock fleet for placeholder creation", + fmt(context.job_model), ) - session.add(instance_model) - job_model.instance = instance_model - job_model.used_instance_id = instance_model.id - events.emit( - session, - f"Instance created for job. Instance status: {instance_model.status.upper()}", - actor=events.SystemActor(), - targets=[ - events.Target.from_model(instance_model), - events.Target.from_model(job_model), - ], + await _reset_job_lock_for_retry(session=session, item=item) + return + fleet_spec = get_fleet_spec(fleet_model) + if not can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec): + logger.debug( + "%s: fleet %s is full, retrying assignment", + fmt(context.job_model), + fleet_model.name, ) + await _reset_job_lock_for_retry(session=session, item=item) + return + instance_model = _create_placeholder_instance( + fleet_model=fleet_model, + project=context.project, + job_model=job_model, + ) + session.add(instance_model) + job_model.instance = instance_model + job_model.used_instance_id = instance_model.id + events.emit( + session, + f"Instance created for job. Instance status: {instance_model.status.upper()}", + actor=events.SystemActor(), + targets=[ + events.Target.from_model(instance_model), + events.Target.from_model(job_model), + ], + ) job_model.fleet_id = assignment.fleet_id job_model.instance_assigned = True await _mark_job_processed(session=session, job_model=job_model) @@ -1036,6 +1036,12 @@ def _is_placeholder_instance(instance: InstanceModel) -> bool: return instance.status == InstanceStatus.PENDING and instance.provisioning_job_id is not None +def _get_non_placeholder_fleet_instances(fleet_model: FleetModel) -> list[InstanceModel]: + return [ + instance for instance in fleet_model.instances if not _is_placeholder_instance(instance) + ] + + def _get_placeholder_instance_ids(context: _SubmittedJobContext) -> list[uuid.UUID]: instance = context.job_model.instance if instance is not None and instance.provisioning_job_id is not None: @@ -1350,7 +1356,9 @@ async def _process_new_capacity_provisioning( is_master_job(context.job) and fleet_model is not None and _get_cluster_fleet_spec(fleet_model) is not None - and any(not instance.deleted for instance in fleet_model.instances) + # Wait only for a real in-flight/provisioned instance that can anchor + # cluster placement. Placeholder reservations never become fleet masters. + and _get_non_placeholder_fleet_instances(fleet_model) and master_provisioning_data is None ): return _DeferSubmittedJobResult( @@ -1995,7 +2003,9 @@ async def _resolve_related_cluster_master_fleet( fleet_model = res.unique().scalar_one_or_none() if fleet_model is None: return None - if len(fleet_model.instances) != 0: + # Placeholder reservations should not make an empty cluster fleet look + # non-empty; only real instances mean placement is already anchored. + if _get_non_placeholder_fleet_instances(fleet_model): return _ResolvedRelatedClusterMasterFleet( fleet_model=fleet_model, locked_fleet_id=None, @@ -2107,6 +2117,10 @@ async def _provision_new_capacity( job = jobs[0] if volumes is None: volumes = [] + # New-capacity provisioning is reached only for fleet-backed jobs. During + # the transition, legacy in-flight jobs may still have no attached + # instance; otherwise any attached instance here is expected to be the + # placeholder created during assignment. effective_profile_and_requirements = _get_effective_profile_and_requirements( job_model=job_model, run=run, @@ -2163,7 +2177,10 @@ async def _provision_new_capacity( ) if ( fleet_model is not None - and len(fleet_model.instances) == 0 + # The first real instance in an empty cluster fleet is responsible + # for creating/selecting the placement group. A placeholder alone + # must not suppress that path. + and not _get_non_placeholder_fleet_instances(fleet_model) and is_cloud_cluster(fleet_model) and offer.backend in BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT and isinstance(compute, ComputeWithPlacementGroupSupport) @@ -2294,7 +2311,13 @@ def _build_placement_group_cleanup( selected_placement_group_id: Optional[uuid.UUID], new_placement_group_models: list[PlacementGroupModel], ) -> Optional[_PlacementGroupCleanup]: - if fleet_model is None or len(fleet_model.instances) != 0 or offers_tried == 0: + if ( + fleet_model is None + # Treat placeholder-only fleets as empty so a failed first-instance attempt + # still cleans up placement groups created for that attempt. + or _get_non_placeholder_fleet_instances(fleet_model) + or offers_tried == 0 + ): return None return _PlacementGroupCleanup( fleet_id=fleet_model.id, @@ -2355,8 +2378,6 @@ def _get_effective_profile_and_requirements( fleet_spec = get_fleet_spec(fleet_model) try: - if job_model.instance is None or not _is_placeholder_instance(job_model.instance): - check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec) effective_profile, requirements = get_run_profile_and_requirements_in_fleet( job=job, run_spec=run.run_spec, diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py index a61aef80e..024dbdc24 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py @@ -439,6 +439,102 @@ async def test_provisions_new_capacity_for_assigned_job_with_placeholder( ) assert len(res.scalars().all()) == 1 + async def test_multinode_master_reuses_placeholder_when_provisioning_falls_back_to_run_job( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1) + fleet = await create_fleet(session=session, project=project, spec=fleet_spec) + run_spec = get_run_spec( + run_name="run", + repo_id=repo.name, + configuration=TaskConfiguration(image="debian", nodes=2), + ) + run = await create_run( + session=session, + run_name="run", + project=project, + repo=repo, + user=user, + run_spec=run_spec, + fleet=fleet, + ) + master_job = await create_job( + session=session, + run=run, + job_num=0, + waiting_master_job=False, + ) + worker_job = await create_job( + session=session, + run=run, + job_num=1, + waiting_master_job=True, + ) + + offer = get_instance_offer_with_availability(backend=BackendType.AWS) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + backend_mock = Mock() + compute_mock = Mock(spec=ComputeMockSpec) + backend_mock.TYPE = BackendType.AWS + backend_mock.compute.return_value = compute_mock + m.return_value = [backend_mock] + compute_mock.get_offers.return_value = [offer] + compute_mock.run_job.return_value = get_job_provisioning_data( + dockerized=True, + backend=BackendType.AWS, + ) + + await _process_job(session=session, worker=worker, job_model=master_job) + + master_job = await _get_job(session, master_job.id) + worker_job = await _get_job(session, worker_job.id) + assert master_job.status == JobStatus.SUBMITTED + assert master_job.instance_assigned + assert master_job.instance is not None + placeholder_id = master_job.instance.id + assert master_job.instance.status == InstanceStatus.PENDING + assert master_job.used_instance_id == placeholder_id + assert master_job.fleet_id == fleet.id + assert worker_job.waiting_master_job + compute_mock.run_job.assert_not_called() + compute_mock.run_jobs.assert_not_called() + + competing_instance = await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.BUSY, + backend=BackendType.AWS, + job_provisioning_data=get_job_provisioning_data(backend=BackendType.AWS), + ) + + await _process_job(session=session, worker=worker, job_model=master_job) + + master_job = await _get_job(session, master_job.id) + worker_job = await _get_job(session, worker_job.id) + assert master_job.status == JobStatus.PROVISIONING + assert master_job.instance is not None + assert master_job.instance.id == placeholder_id + assert master_job.instance.status == InstanceStatus.PROVISIONING + assert master_job.used_instance_id == placeholder_id + assert worker_job.waiting_master_job is False + compute_mock.run_job.assert_called_once() + compute_mock.run_jobs.assert_not_called() + res = await session.execute( + select(InstanceModel).where( + InstanceModel.fleet_id == fleet.id, + InstanceModel.deleted == False, + ) + ) + assert {instance.id for instance in res.scalars().all()} == { + placeholder_id, + competing_instance.id, + } + async def test_provisioning_master_job_respects_cluster_placement_in_non_empty_fleet( self, test_db, session: AsyncSession, worker: JobSubmittedWorker ): From ea86d516bcfdf8b77d70d8f5257a442b6d5f1e2e Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Wed, 22 Apr 2026 17:39:16 +0500 Subject: [PATCH 07/16] Drop optional fleet_model handling --- .../pipeline_tasks/jobs_submitted.py | 37 +++++-------------- 1 file changed, 9 insertions(+), 28 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 147cf87b2..09aa85f37 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -1334,7 +1334,6 @@ async def _process_new_capacity_provisioning( ) locked_fleet_id = None if _should_refresh_related_cluster_master_fleet(context=context): - assert fleet_model is not None related_cluster_master_fleet = await _resolve_related_cluster_master_fleet( item=item, fleet_id=fleet_model.id, @@ -1354,7 +1353,6 @@ async def _process_new_capacity_provisioning( ) if ( is_master_job(context.job) - and fleet_model is not None and _get_cluster_fleet_spec(fleet_model) is not None # Wait only for a real in-flight/provisioned instance that can anchor # cluster placement. Placeholder reservations never become fleet masters. @@ -2100,6 +2098,7 @@ def _release_replica_jobs_from_master_wait( async def _provision_new_capacity( project: ProjectModel, + fleet_model: FleetModel, job_model: JobModel, run: Run, jobs: list[Job], @@ -2107,7 +2106,6 @@ async def _provision_new_capacity( project_ssh_private_key: str, master_job_provisioning_data: Optional[JobProvisioningData] = None, volumes: Optional[list[list[Volume]]] = None, - fleet_model: Optional[FleetModel] = None, ) -> Union[_FailedNewCapacityProvisioning, _ProvisionNewCapacityResult]: jobs = copy.deepcopy(jobs) for job in jobs: @@ -2131,9 +2129,7 @@ async def _provision_new_capacity( return _FailedNewCapacityProvisioning(placement_group_cleanup=None) profile, requirements = effective_profile_and_requirements - placement_group_models = await _load_fleet_placement_group_models( - fleet_id=fleet_model.id if fleet_model else None, - ) + placement_group_models = await _load_fleet_placement_group_models(fleet_model.id) new_placement_group_models: list[PlacementGroupModel] = [] known_placement_group_ids = { placement_group_model.id for placement_group_model in placement_group_models @@ -2176,11 +2172,10 @@ async def _provision_new_capacity( master_job_provisioning_data=master_job_provisioning_data, ) if ( - fleet_model is not None # The first real instance in an empty cluster fleet is responsible # for creating/selecting the placement group. A placeholder alone # must not suppress that path. - and not _get_non_placeholder_fleet_instances(fleet_model) + not _get_non_placeholder_fleet_instances(fleet_model) and is_cloud_cluster(fleet_model) and offer.backend in BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT and isinstance(compute, ComputeWithPlacementGroupSupport) @@ -2279,12 +2274,7 @@ async def _provision_new_capacity( ) -async def _load_fleet_placement_group_models( - fleet_id: Optional[uuid.UUID], -) -> list["PlacementGroupModel"]: - if fleet_id is None: - return [] - +async def _load_fleet_placement_group_models(fleet_id: uuid.UUID) -> list["PlacementGroupModel"]: async with get_session_ctx() as session: res = await session.execute( select(PlacementGroupModel) @@ -2306,18 +2296,14 @@ async def _load_fleet_placement_group_models( def _build_placement_group_cleanup( - fleet_model: Optional[FleetModel], + fleet_model: FleetModel, offers_tried: int, selected_placement_group_id: Optional[uuid.UUID], new_placement_group_models: list[PlacementGroupModel], ) -> Optional[_PlacementGroupCleanup]: - if ( - fleet_model is None - # Treat placeholder-only fleets as empty so a failed first-instance attempt - # still cleans up placement groups created for that attempt. - or _get_non_placeholder_fleet_instances(fleet_model) - or offers_tried == 0 - ): + # Treat placeholder-only fleets as empty so a failed first-instance attempt + # still cleans up placement groups created for that attempt. + if _get_non_placeholder_fleet_instances(fleet_model) or offers_tried == 0: return None return _PlacementGroupCleanup( fleet_id=fleet_model.id, @@ -2369,13 +2355,8 @@ def _get_effective_profile_and_requirements( job_model: JobModel, run: Run, job: Job, - fleet_model: Optional[FleetModel], + fleet_model: FleetModel, ) -> Optional[tuple[Profile, Requirements]]: - effective_profile = run.run_spec.merged_profile - requirements = job.job_spec.requirements - if fleet_model is None: - return effective_profile, requirements - fleet_spec = get_fleet_spec(fleet_model) try: effective_profile, requirements = get_run_profile_and_requirements_in_fleet( From a297d6946a7588ac541fcb7c056dd04a8d21f7cf Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 23 Apr 2026 10:47:44 +0500 Subject: [PATCH 08/16] Fix placeholder cleanup on stale lock --- .../pipeline_tasks/jobs_submitted.py | 21 +++---------------- 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 09aa85f37..80d8eb476 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -1044,7 +1044,7 @@ def _get_non_placeholder_fleet_instances(fleet_model: FleetModel) -> list[Instan def _get_placeholder_instance_ids(context: _SubmittedJobContext) -> list[uuid.UUID]: instance = context.job_model.instance - if instance is not None and instance.provisioning_job_id is not None: + if instance is not None and _is_placeholder_instance(instance): return [instance.id] return [] @@ -1058,10 +1058,7 @@ async def _cleanup_placeholder_instances( now = get_current_datetime() await session.execute( update(InstanceModel) - .where( - InstanceModel.id.in_(instance_ids), - InstanceModel.provisioning_job_id.is_not(None), - ) + .where(InstanceModel.id.in_(instance_ids)) .values( deleted=True, deleted_at=now, @@ -1200,10 +1197,6 @@ async def _apply_provisioning_result( item=item, fleet_id=_get_locked_fleet_id_from_provisioning(provisioning), ) - await _cleanup_placeholder_instances( - session=session, - instance_ids=_get_placeholder_instance_ids_from_provisioning(provisioning), - ) log_lock_token_changed_after_processing(logger, item) return @@ -1612,7 +1605,7 @@ def _get_job_placeholder_instance( if job_model.id != context.job_model.id: return None instance = context.job_model.instance - if instance is not None and instance.provisioning_job_id is not None: + if instance is not None and _is_placeholder_instance(instance): return instance return None @@ -1900,14 +1893,6 @@ def _get_locked_fleet_id_from_provisioning( return None -def _get_placeholder_instance_ids_from_provisioning( - provisioning: _ProvisioningResult, -) -> list[uuid.UUID]: - if isinstance(provisioning, _TerminateSubmittedJobResult): - return provisioning.placeholder_instance_ids - return [] - - def _get_related_volume_lock_owner(job_id: uuid.UUID) -> str: return f"{JobSubmittedPipeline.__name__}:{job_id}" From 37e9e34787bc68a16d24d8675a8818129ad5e28d Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 23 Apr 2026 11:12:25 +0500 Subject: [PATCH 09/16] Drop placeholder cleanup on non-stale path --- .../pipeline_tasks/jobs_submitted.py | 40 ++----------------- .../pipeline_tasks/test_submitted_jobs.py | 6 +-- 2 files changed, 7 insertions(+), 39 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 80d8eb476..3439e4cac 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -2,7 +2,7 @@ import copy import uuid from contextlib import AsyncExitStack -from dataclasses import dataclass, field +from dataclasses import dataclass from datetime import timedelta from typing import Optional, Sequence, Union @@ -392,7 +392,6 @@ class _TerminateSubmittedJobResult: message: Optional[str] = None locked_fleet_id: Optional[uuid.UUID] = None placement_group_cleanup: Optional[_PlacementGroupCleanup] = None - placeholder_instance_ids: list[uuid.UUID] = field(default_factory=list) @dataclass @@ -1042,32 +1041,6 @@ def _get_non_placeholder_fleet_instances(fleet_model: FleetModel) -> list[Instan ] -def _get_placeholder_instance_ids(context: _SubmittedJobContext) -> list[uuid.UUID]: - instance = context.job_model.instance - if instance is not None and _is_placeholder_instance(instance): - return [instance.id] - return [] - - -async def _cleanup_placeholder_instances( - session: AsyncSession, - instance_ids: list[uuid.UUID], -) -> None: - if not instance_ids: - return - now = get_current_datetime() - await session.execute( - update(InstanceModel) - .where(InstanceModel.id.in_(instance_ids)) - .values( - deleted=True, - deleted_at=now, - finished_at=now, - status=InstanceStatus.TERMINATED, - ) - ) - - def _get_current_reusable_instance_offers( context: _SubmittedJobContext, assignment: _ExistingInstanceAssignment, @@ -1225,10 +1198,8 @@ async def _apply_provisioning_result( item=item, fleet_id=provisioning.locked_fleet_id, ) - await _cleanup_placeholder_instances( - session=session, - instance_ids=provisioning.placeholder_instance_ids, - ) + # Keep the placeholder live here: JobTerminatingPipeline will unassign it + # from the job, and InstancePipeline will finish deleting it. await _terminate_submitted_job( session=session, job_model=job_model, @@ -1373,7 +1344,6 @@ async def _process_new_capacity_provisioning( reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY, locked_fleet_id=locked_fleet_id, placement_group_cleanup=provision_new_capacity_result.placement_group_cleanup, - placeholder_instance_ids=_get_placeholder_instance_ids(context), ) volume_attachment_result = None @@ -1538,9 +1508,7 @@ async def _promote_or_create_instance_models_for_provisioned_jobs( switch_job_status(session, provisioned_job_model, JobStatus.PROVISIONING) # If a placeholder instance exists, promote it instead of creating a new one. - # Safe to update the placeholder without FOR UPDATE: the instance pipeline - # skips placeholders (fetcher filter), fleet consolidation does not modify - # them, and the API refuses to delete them while the job is provisioning. + # Safe to update the placeholder without locking: nobody else should update the placeholder. placeholder_instance = _get_job_placeholder_instance(context, provisioned_job_model) if placeholder_instance is not None: instance_model = placeholder_instance diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py index 024dbdc24..39c6c8c0f 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py @@ -1239,7 +1239,7 @@ async def test_assignment_retries_when_fleet_is_full( ) assert len(res.scalars().all()) == 1 - async def test_cleans_up_placeholder_on_failed_new_capacity_provisioning( + async def test_leaves_placeholder_for_terminating_pipeline_on_failed_new_capacity_provisioning( self, test_db, session: AsyncSession, worker: JobSubmittedWorker ): project = await create_project(session=session) @@ -1286,8 +1286,8 @@ async def test_cleans_up_placeholder_on_failed_new_capacity_provisioning( assert job.status == JobStatus.TERMINATING assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY await session.refresh(placeholder) - assert placeholder.deleted - assert placeholder.status == InstanceStatus.TERMINATED + assert not placeholder.deleted + assert placeholder.status == InstanceStatus.PENDING async def test_provisions_compute_group( self, test_db, session: AsyncSession, worker: JobSubmittedWorker From 4f6557289e155c08f854fb0f0ab6e559b06c94f7 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 23 Apr 2026 13:53:03 +0500 Subject: [PATCH 10/16] Fix sqlite commite after unlock --- .../server/background/pipeline_tasks/jobs_submitted.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 3439e4cac..f694d0dbc 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -603,9 +603,9 @@ async def _apply_assignment_result( events.Target.from_model(job_model), ], ) - job_model.fleet_id = assignment.fleet_id - job_model.instance_assigned = True - await _mark_job_processed(session=session, job_model=job_model) + job_model.fleet_id = assignment.fleet_id + job_model.instance_assigned = True + await _mark_job_processed(session=session, job_model=job_model) return async with AsyncExitStack() as exit_stack: @@ -1318,8 +1318,7 @@ async def _process_new_capacity_provisioning( if ( is_master_job(context.job) and _get_cluster_fleet_spec(fleet_model) is not None - # Wait only for a real in-flight/provisioned instance that can anchor - # cluster placement. Placeholder reservations never become fleet masters. + # Placeholder reservations never become fleet masters. and _get_non_placeholder_fleet_instances(fleet_model) and master_provisioning_data is None ): From ba28b50876e5531aea844aeae57df5efa707186a Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 23 Apr 2026 13:53:33 +0500 Subject: [PATCH 11/16] Do not elect placeholder instances as masters --- .../_internal/server/background/pipeline_tasks/fleets.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/fleets.py b/src/dstack/_internal/server/background/pipeline_tasks/fleets.py index e34d1137b..d8a416110 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/fleets.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/fleets.py @@ -935,8 +935,15 @@ def _select_current_master_instance_id( return instance_model.id # Prefer existing surviving instances over freshly planned replacements to - # avoid election churn during min-nodes backfill. + # avoid election churn during min-nodes backfill. Skip placeholder instances + # (PENDING + provisioning_job_id) — they have no JPD and cannot anchor cluster + # placement, so electing one just defers the real master decision. for instance_model in surviving_instance_models: + if ( + instance_model.status == InstanceStatus.PENDING + and instance_model.provisioning_job_id is not None + ): + continue if ( _get_effective_instance_status( instance_model, From abfd094ce1daab9aa1e45f9c73ecd3d7ac8fe5a1 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 23 Apr 2026 13:54:01 +0500 Subject: [PATCH 12/16] Count placeholders in _run_can_fit_into_fleet --- .../_internal/server/services/runs/plan.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 18845b585..1569e4905 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -552,8 +552,8 @@ def _run_can_fit_into_fleet( and fleet_spec.configuration.blocks == 1 and fleet_spec.configuration.nodes.max is not None ): - busy_instances = [i for i in fleet_model.instances if i.busy_blocks > 0] - fleet_available_capacity = fleet_spec.configuration.nodes.max - len(busy_instances) + occupied_instances = _get_occupied_instances(fleet_model.instances) + fleet_available_capacity = fleet_spec.configuration.nodes.max - len(occupied_instances) if fleet_available_capacity < nodes_required_num: return False elif fleet_spec.configuration.ssh_config is not None: @@ -568,6 +568,18 @@ def _run_can_fit_into_fleet( return True +def _get_occupied_instances(instance_models: list[InstanceModel]) -> list[InstanceModel]: + # A placeholder has busy_blocks == 0 but reserves a `nodes.max` slot + # (unlike an IDLE instance, which can be reused by this run), so count + # it here the same as a busy instance. + return [ + i + for i in instance_models + if i.busy_blocks > 0 + or (i.status == InstanceStatus.PENDING and i.provisioning_job_id is not None) + ] + + async def _get_backend_offers_in_fleet( project: ProjectModel, fleet_model: FleetModel, From a0f35fcb967bdecbb1ef4556aee65fd03f7e297f Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 23 Apr 2026 13:56:50 +0500 Subject: [PATCH 13/16] Update contributing/RUNS-AND-JOBS.md --- contributing/RUNS-AND-JOBS.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/contributing/RUNS-AND-JOBS.md b/contributing/RUNS-AND-JOBS.md index 79734a7f5..2f00e751b 100644 --- a/contributing/RUNS-AND-JOBS.md +++ b/contributing/RUNS-AND-JOBS.md @@ -61,7 +61,9 @@ Services' run lifecycle has some modifications: ## Job's Lifecycle - STEP 1: A newly submitted job has status `SUBMITTED`. It is not assigned to any instance yet. -- STEP 2: `JobSubmittedPipeline` tries to assign an existing instance or provision new capacity. +- STEP 2: `JobSubmittedPipeline` assigns the job in two phases: + - Assignment: claim an existing instance or reserve a *placeholder* `InstanceModel`. Placeholders are `PENDING` instances that reserve an `instance_num` and a `nodes.max` slot. `InstancePipeline` ignores them. + - Provisioning: reuse the existing instance, or cloud-provision and promote the placeholder to `PROVISIONING`. - On success, the job becomes `PROVISIONING`. - On failure, the job becomes `TERMINATING`. `JobTerminatingPipeline` later assigns the final failed status. - STEP 3: `JobRunningPipeline` processes `PROVISIONING`, `PULLING`, and `RUNNING` jobs. From 1fd2c079bbcbc5d3d06c37fa70795d933506b1df Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 23 Apr 2026 14:08:14 +0500 Subject: [PATCH 14/16] Regenerate migration --- ...3_add_instancemodel_provisioning_job_id.py | 32 --------------- ..._add_instancemodel_provisioning_job_id_.py | 39 +++++++++++++++++++ 2 files changed, 39 insertions(+), 32 deletions(-) delete mode 100644 src/dstack/_internal/server/migrations/versions/2026/04_22_1200_d5addc51d0c3_add_instancemodel_provisioning_job_id.py create mode 100644 src/dstack/_internal/server/migrations/versions/2026/04_23_0907_13ae4ee1df6a_add_instancemodel_provisioning_job_id_.py diff --git a/src/dstack/_internal/server/migrations/versions/2026/04_22_1200_d5addc51d0c3_add_instancemodel_provisioning_job_id.py b/src/dstack/_internal/server/migrations/versions/2026/04_22_1200_d5addc51d0c3_add_instancemodel_provisioning_job_id.py deleted file mode 100644 index 6053dbf0a..000000000 --- a/src/dstack/_internal/server/migrations/versions/2026/04_22_1200_d5addc51d0c3_add_instancemodel_provisioning_job_id.py +++ /dev/null @@ -1,32 +0,0 @@ -"""Add InstanceModel.provisioning_job_id for placeholder instances. - -Revision ID: d5addc51d0c3 -Revises: 94fcd7e38b7e -Create Date: 2026-04-22 12:00:00.000000+00:00 - -""" - -import sqlalchemy_utils -from alembic import op -from sqlalchemy import Column - -# revision identifiers, used by Alembic. -revision = "d5addc51d0c3" -down_revision = "94fcd7e38b7e" -branch_labels = None -depends_on = None - - -def upgrade() -> None: - op.add_column( - "instances", - Column( - "provisioning_job_id", - sqlalchemy_utils.types.uuid.UUIDType(binary=False), - nullable=True, - ), - ) - - -def downgrade() -> None: - op.drop_column("instances", "provisioning_job_id") diff --git a/src/dstack/_internal/server/migrations/versions/2026/04_23_0907_13ae4ee1df6a_add_instancemodel_provisioning_job_id_.py b/src/dstack/_internal/server/migrations/versions/2026/04_23_0907_13ae4ee1df6a_add_instancemodel_provisioning_job_id_.py new file mode 100644 index 000000000..d959970f5 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/2026/04_23_0907_13ae4ee1df6a_add_instancemodel_provisioning_job_id_.py @@ -0,0 +1,39 @@ +"""Add InstanceModel.provisioning_job_id for placeholder instances + +Revision ID: 13ae4ee1df6a +Revises: 94fcd7e38b7e +Create Date: 2026-04-23 09:07:24.552837+00:00 + +""" + +import sqlalchemy as sa +import sqlalchemy_utils +from alembic import op + +# revision identifiers, used by Alembic. +revision = "13ae4ee1df6a" +down_revision = "94fcd7e38b7e" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "provisioning_job_id", + sqlalchemy_utils.types.uuid.UUIDType(binary=False), + nullable=True, + ) + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.drop_column("provisioning_job_id") + + # ### end Alembic commands ### From 5c04cac5229ede365550a77099f3541b460d2fe5 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 23 Apr 2026 14:55:56 +0500 Subject: [PATCH 15/16] Ignore placeholders in get_placement_group_model_for_job --- .../background/pipeline_tasks/fleets.py | 16 +++++------ .../pipeline_tasks/jobs_submitted.py | 28 ++++++------------- .../pipeline_tasks/jobs_terminating.py | 10 +++---- .../_internal/server/services/fleets.py | 3 +- .../_internal/server/services/instances.py | 18 ++++++++++++ .../_internal/server/services/placement.py | 7 ++++- .../_internal/server/services/runs/plan.py | 8 ++---- 7 files changed, 49 insertions(+), 41 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/fleets.py b/src/dstack/_internal/server/background/pipeline_tasks/fleets.py index d8a416110..1e033865f 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/fleets.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/fleets.py @@ -49,7 +49,10 @@ is_fleet_empty, is_fleet_in_use, ) -from dstack._internal.server.services.instances import instance_matches_constraints +from dstack._internal.server.services.instances import ( + instance_matches_constraints, + is_placeholder_instance, +) from dstack._internal.server.services.locking import get_locker from dstack._internal.server.services.pipelines import PipelineHinterProtocol from dstack._internal.server.utils import sentry_utils @@ -935,14 +938,11 @@ def _select_current_master_instance_id( return instance_model.id # Prefer existing surviving instances over freshly planned replacements to - # avoid election churn during min-nodes backfill. Skip placeholder instances - # (PENDING + provisioning_job_id) — they have no JPD and cannot anchor cluster - # placement, so electing one just defers the real master decision. + # avoid election churn during min-nodes backfill. Skip placeholders — + # they have no JPD and cannot anchor cluster placement, so electing one + # just defers the real master decision. for instance_model in surviving_instance_models: - if ( - instance_model.status == InstanceStatus.PENDING - and instance_model.provisioning_job_id is not None - ): + if is_placeholder_instance(instance_model): continue if ( _get_effective_instance_status( diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index f694d0dbc..5aedd2be2 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -92,9 +92,11 @@ is_cloud_cluster, ) from dstack._internal.server.services.instances import ( + filter_non_placeholder_instances, format_instance_blocks_for_event, get_instance_offer, get_instance_provisioning_data, + is_placeholder_instance, switch_instance_status, ) from dstack._internal.server.services.jobs import ( @@ -1029,18 +1031,6 @@ def _create_placeholder_instance( ) -def _is_placeholder_instance(instance: InstanceModel) -> bool: - """A PENDING instance with provisioning_job_id set is a placeholder - created during assignment, waiting for cloud provisioning.""" - return instance.status == InstanceStatus.PENDING and instance.provisioning_job_id is not None - - -def _get_non_placeholder_fleet_instances(fleet_model: FleetModel) -> list[InstanceModel]: - return [ - instance for instance in fleet_model.instances if not _is_placeholder_instance(instance) - ] - - def _get_current_reusable_instance_offers( context: _SubmittedJobContext, assignment: _ExistingInstanceAssignment, @@ -1119,7 +1109,7 @@ async def _process_provisioning( return preconditions if context.job_model.instance is not None: - if _is_placeholder_instance(context.job_model.instance): + if is_placeholder_instance(context.job_model.instance): # Placeholder instance created during assignment — proceed to cloud provisioning. return await _process_new_capacity_provisioning( item=item, @@ -1319,7 +1309,7 @@ async def _process_new_capacity_provisioning( is_master_job(context.job) and _get_cluster_fleet_spec(fleet_model) is not None # Placeholder reservations never become fleet masters. - and _get_non_placeholder_fleet_instances(fleet_model) + and filter_non_placeholder_instances(fleet_model.instances) and master_provisioning_data is None ): return _DeferSubmittedJobResult( @@ -1540,7 +1530,7 @@ async def _promote_or_create_instance_models_for_provisioned_jobs( ).json() events.emit( session, - f"Instance created for job. Instance status: {instance_model.status.upper()}", + f"Instance provisioned for job. Instance status: {instance_model.status.upper()}", actor=events.SystemActor(), targets=[ events.Target.from_model(instance_model), @@ -1572,7 +1562,7 @@ def _get_job_placeholder_instance( if job_model.id != context.job_model.id: return None instance = context.job_model.instance - if instance is not None and _is_placeholder_instance(instance): + if instance is not None and is_placeholder_instance(instance): return instance return None @@ -1955,7 +1945,7 @@ async def _resolve_related_cluster_master_fleet( return None # Placeholder reservations should not make an empty cluster fleet look # non-empty; only real instances mean placement is already anchored. - if _get_non_placeholder_fleet_instances(fleet_model): + if filter_non_placeholder_instances(fleet_model.instances): return _ResolvedRelatedClusterMasterFleet( fleet_model=fleet_model, locked_fleet_id=None, @@ -2127,7 +2117,7 @@ async def _provision_new_capacity( # The first real instance in an empty cluster fleet is responsible # for creating/selecting the placement group. A placeholder alone # must not suppress that path. - not _get_non_placeholder_fleet_instances(fleet_model) + not filter_non_placeholder_instances(fleet_model.instances) and is_cloud_cluster(fleet_model) and offer.backend in BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT and isinstance(compute, ComputeWithPlacementGroupSupport) @@ -2255,7 +2245,7 @@ def _build_placement_group_cleanup( ) -> Optional[_PlacementGroupCleanup]: # Treat placeholder-only fleets as empty so a failed first-instance attempt # still cleans up placement groups created for that attempt. - if _get_non_placeholder_fleet_instances(fleet_model) or offers_tried == 0: + if filter_non_placeholder_instances(fleet_model.instances) or offers_tried == 0: return None return _PlacementGroupCleanup( fleet_id=fleet_model.id, diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index 20f53be61..097c835e5 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -54,6 +54,7 @@ from dstack._internal.server.services.instances import ( emit_instance_status_change_event, get_instance_ssh_private_keys, + is_placeholder_instance, ) from dstack._internal.server.services.jobs import ( emit_job_status_change_event, @@ -617,12 +618,9 @@ async def _process_terminating_job( result.job_update_map["status"] = _get_job_termination_status(job_model) return result - if ( - instance_model.status == InstanceStatus.PENDING - and instance_model.provisioning_job_id is not None - ): - # Placeholder instance (PENDING with provisioning_job_id) has no VM and no - # provisioning data. Skip graceful stop, container stop, and volume detach. + if is_placeholder_instance(instance_model): + # Placeholder has no VM and no provisioning data. Skip graceful stop, + # container stop, and volume detach. instance_update_map = get_or_error(result.instance_update_map) instance_update_map["status"] = InstanceStatus.TERMINATING instance_update_map["termination_reason"] = InstanceTerminationReason.JOB_FINISHED diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index b91169471..f6d9c99d6 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -69,6 +69,7 @@ from dstack._internal.server.services import offers as offers_services from dstack._internal.server.services.instances import ( get_instance_remote_connection_info, + is_placeholder_instance, list_active_remote_instances, switch_instance_status, ) @@ -1440,7 +1441,7 @@ def _terminate_fleet_instances( for instance in fleet_model.instances: if instance_nums is not None and instance.instance_num not in instance_nums: continue - if instance.status == InstanceStatus.PENDING and instance.provisioning_job_id is not None: + if is_placeholder_instance(instance): raise ServerClientError("Failed to delete instance while the job is provisioning.") if instance.status == InstanceStatus.TERMINATED: instance.deleted = True diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index a311d7d95..ad48ff1f5 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -328,6 +328,24 @@ def is_ssh_instance(instance_model: InstanceModel) -> bool: return instance_model.remote_connection_info is not None +def is_placeholder_instance(instance_model: InstanceModel) -> bool: + """A PENDING instance with `provisioning_job_id` set is a placeholder + reserved by `JobSubmittedPipeline` during assignment and awaiting cloud + provisioning. It reserves an `instance_num` and a `nodes.max` slot but + has no backend, offer, or provisioning data until it is promoted. + `InstancePipeline` ignores placeholders; only `JobSubmittedPipeline` and + `JobTerminatingPipeline` act on them. + """ + return ( + instance_model.status == InstanceStatus.PENDING + and instance_model.provisioning_job_id is not None + ) + + +def filter_non_placeholder_instances(instance_models: list[InstanceModel]) -> list[InstanceModel]: + return [i for i in instance_models if not is_placeholder_instance(i)] + + def get_instance_remote_connection_info( instance_model: InstanceModel, ) -> Optional[RemoteConnectionInfo]: diff --git a/src/dstack/_internal/server/services/placement.py b/src/dstack/_internal/server/services/placement.py index 14365f09e..3292b7029 100644 --- a/src/dstack/_internal/server/services/placement.py +++ b/src/dstack/_internal/server/services/placement.py @@ -19,6 +19,7 @@ PlacementStrategy, ) from dstack._internal.server.models import FleetModel, PlacementGroupModel +from dstack._internal.server.services.instances import is_placeholder_instance from dstack._internal.utils.common import run_async from dstack._internal.utils.logging import get_logger @@ -103,11 +104,15 @@ def get_placement_group_model_for_job( Returns any fleet placement group for jobs that provision in non-empty fleets and `None` for empty fleets. This is so that only the first job creates placement groups. + Placeholder reservations are excluded: a placeholder-only fleet is treated + as empty here so offer selection is not pinned to a stale PG's region. """ placement_group_model = None active_instances = [] if fleet_model is not None: - active_instances = [i for i in fleet_model.instances if not i.deleted] + active_instances = [ + i for i in fleet_model.instances if not i.deleted and not is_placeholder_instance(i) + ] if len(active_instances) > 0 and len(placement_group_models) > 0: placement_group_model = placement_group_models[0] return placement_group_model diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 1569e4905..1fcf3e7bd 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -43,6 +43,7 @@ get_instance_offer, get_pool_instances, get_shared_instances_with_offers, + is_placeholder_instance, ) from dstack._internal.server.services.jobs import ( get_instances_ids_with_detaching_volumes, @@ -572,12 +573,7 @@ def _get_occupied_instances(instance_models: list[InstanceModel]) -> list[Instan # A placeholder has busy_blocks == 0 but reserves a `nodes.max` slot # (unlike an IDLE instance, which can be reused by this run), so count # it here the same as a busy instance. - return [ - i - for i in instance_models - if i.busy_blocks > 0 - or (i.status == InstanceStatus.PENDING and i.provisioning_job_id is not None) - ] + return [i for i in instance_models if i.busy_blocks > 0 or is_placeholder_instance(i)] async def _get_backend_offers_in_fleet( From b375648a46c17b944f4b4ffe2b0a470ac859126a Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 24 Apr 2026 10:42:34 +0500 Subject: [PATCH 16/16] Rebase migration --- ...71d9c5ab_add_instancemodel_provisioning_job_id_.py} | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) rename src/dstack/_internal/server/migrations/versions/2026/{04_23_0907_13ae4ee1df6a_add_instancemodel_provisioning_job_id_.py => 04_24_0542_82b671d9c5ab_add_instancemodel_provisioning_job_id_.py} (85%) diff --git a/src/dstack/_internal/server/migrations/versions/2026/04_23_0907_13ae4ee1df6a_add_instancemodel_provisioning_job_id_.py b/src/dstack/_internal/server/migrations/versions/2026/04_24_0542_82b671d9c5ab_add_instancemodel_provisioning_job_id_.py similarity index 85% rename from src/dstack/_internal/server/migrations/versions/2026/04_23_0907_13ae4ee1df6a_add_instancemodel_provisioning_job_id_.py rename to src/dstack/_internal/server/migrations/versions/2026/04_24_0542_82b671d9c5ab_add_instancemodel_provisioning_job_id_.py index d959970f5..38b5b78d4 100644 --- a/src/dstack/_internal/server/migrations/versions/2026/04_23_0907_13ae4ee1df6a_add_instancemodel_provisioning_job_id_.py +++ b/src/dstack/_internal/server/migrations/versions/2026/04_24_0542_82b671d9c5ab_add_instancemodel_provisioning_job_id_.py @@ -1,8 +1,8 @@ """Add InstanceModel.provisioning_job_id for placeholder instances -Revision ID: 13ae4ee1df6a -Revises: 94fcd7e38b7e -Create Date: 2026-04-23 09:07:24.552837+00:00 +Revision ID: 82b671d9c5ab +Revises: f48b23790053 +Create Date: 2026-04-24 05:42:14.856254+00:00 """ @@ -11,8 +11,8 @@ from alembic import op # revision identifiers, used by Alembic. -revision = "13ae4ee1df6a" -down_revision = "94fcd7e38b7e" +revision = "82b671d9c5ab" +down_revision = "f48b23790053" branch_labels = None depends_on = None