diff --git a/contributing/RUNS-AND-JOBS.md b/contributing/RUNS-AND-JOBS.md index 79734a7f5..2f00e751b 100644 --- a/contributing/RUNS-AND-JOBS.md +++ b/contributing/RUNS-AND-JOBS.md @@ -61,7 +61,9 @@ Services' run lifecycle has some modifications: ## Job's Lifecycle - STEP 1: A newly submitted job has status `SUBMITTED`. It is not assigned to any instance yet. -- STEP 2: `JobSubmittedPipeline` tries to assign an existing instance or provision new capacity. +- STEP 2: `JobSubmittedPipeline` assigns the job in two phases: + - Assignment: claim an existing instance or reserve a *placeholder* `InstanceModel`. Placeholders are `PENDING` instances that reserve an `instance_num` and a `nodes.max` slot. `InstancePipeline` ignores them. + - Provisioning: reuse the existing instance, or cloud-provision and promote the placeholder to `PROVISIONING`. - On success, the job becomes `PROVISIONING`. - On failure, the job becomes `TERMINATING`. `JobTerminatingPipeline` later assigns the final failed status. - STEP 3: `JobRunningPipeline` processes `PROVISIONING`, `PULLING`, and `RUNNING` jobs. diff --git a/src/dstack/_internal/server/background/pipeline_tasks/fleets.py b/src/dstack/_internal/server/background/pipeline_tasks/fleets.py index e34d1137b..1e033865f 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/fleets.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/fleets.py @@ -49,7 +49,10 @@ is_fleet_empty, is_fleet_in_use, ) -from dstack._internal.server.services.instances import instance_matches_constraints +from dstack._internal.server.services.instances import ( + instance_matches_constraints, + is_placeholder_instance, +) from dstack._internal.server.services.locking import get_locker from dstack._internal.server.services.pipelines import PipelineHinterProtocol from dstack._internal.server.utils import sentry_utils @@ -935,8 +938,12 @@ def _select_current_master_instance_id( return instance_model.id # Prefer existing surviving instances over freshly planned replacements to - # avoid election churn during min-nodes backfill. + # avoid election churn during min-nodes backfill. Skip placeholders — + # they have no JPD and cannot anchor cluster placement, so electing one + # just defers the real master decision. for instance_model in surviving_instance_models: + if is_placeholder_instance(instance_model): + continue if ( _get_effective_instance_status( instance_model, diff --git a/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py b/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py index a070eb800..5b1ce5a26 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py @@ -179,6 +179,13 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]: InstanceModel.compute_group_id.is_not(None), ) ), + # Skip placeholder instances managed by JobSubmittedPipeline. 
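+            # These are PENDING rows with provisioning_job_id set, the same
+            # predicate that is_placeholder_instance() checks in Python.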
+ not_( + and_( + InstanceModel.status == InstanceStatus.PENDING, + InstanceModel.provisioning_job_id.is_not(None), + ) + ), InstanceModel.deleted == False, or_( # Process fast-moving instances (pending, provisioning, terminating) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 5a2ec3e0e..549855391 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -85,16 +85,18 @@ from dstack._internal.server.services.backends import get_project_backend_by_type_or_error from dstack._internal.server.services.docker import apply_server_docker_defaults from dstack._internal.server.services.fleets import ( - check_can_create_new_cloud_instance_in_fleet, + can_create_new_cloud_instance_in_fleet, get_fleet_master_instance_provisioning_data, get_fleet_spec, get_next_instance_num, is_cloud_cluster, ) from dstack._internal.server.services.instances import ( + filter_non_placeholder_instances, format_instance_blocks_for_event, get_instance_offer, get_instance_provisioning_data, + is_placeholder_instance, switch_instance_status, ) from dstack._internal.server.services.jobs import ( @@ -559,9 +561,53 @@ async def _apply_assignment_result( return if isinstance(assignment, _NewCapacityAssignment): - job_model.fleet_id = assignment.fleet_id - job_model.instance_assigned = True - await _mark_job_processed(session=session, job_model=job_model) + # Always reserve one placeholder instance under fleet lock for the current + # submitted job. This keeps instance_num unique and makes nodes.max a hard + # limit for the single-instance provisioning path, including multinode + # masters that later fall back to run_job(). Compute groups still use the + # old partial path: one placeholder does not reserve the full group. + async with AsyncExitStack() as exit_stack: + fleet_model = await _lock_fleet_for_placeholder( + exit_stack=exit_stack, + session=session, + fleet_id=assignment.fleet_id, + ) + if fleet_model is None: + logger.debug( + "%s: failed to lock fleet for placeholder creation", + fmt(context.job_model), + ) + await _reset_job_lock_for_retry(session=session, item=item) + return + fleet_spec = get_fleet_spec(fleet_model) + if not can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec): + logger.debug( + "%s: fleet %s is full, retrying assignment", + fmt(context.job_model), + fleet_model.name, + ) + await _reset_job_lock_for_retry(session=session, item=item) + return + instance_model = _create_placeholder_instance( + fleet_model=fleet_model, + project=context.project, + job_model=job_model, + ) + session.add(instance_model) + job_model.instance = instance_model + job_model.used_instance_id = instance_model.id + events.emit( + session, + f"Instance created for job. 
Instance status: {instance_model.status.upper()}", + actor=events.SystemActor(), + targets=[ + events.Target.from_model(instance_model), + events.Target.from_model(job_model), + ], + ) + job_model.fleet_id = assignment.fleet_id + job_model.instance_assigned = True + await _mark_job_processed(session=session, job_model=job_model) return async with AsyncExitStack() as exit_stack: @@ -922,6 +968,69 @@ async def _lock_assignment_fleet_for_existing_instance_assignment( return fleets_with_locked_instances[0] +async def _lock_fleet_for_placeholder( + exit_stack: AsyncExitStack, + session: AsyncSession, + fleet_id: uuid.UUID, +) -> Optional[FleetModel]: + """Lock a fleet and load its non-deleted instances for placeholder creation. + + Returns the fleet model with instances loaded, or None if the fleet + cannot be locked (e.g. it is gone, deleted, or already locked by another pipeline). + """ + res = await session.execute( + select(FleetModel) + .where( + FleetModel.id == fleet_id, + FleetModel.deleted == False, + ) + .options(selectinload(FleetModel.instances.and_(InstanceModel.deleted == False))) + .execution_options(populate_existing=True) + .with_for_update(skip_locked=True, key_share=True) + ) + fleet_model = res.scalars().unique().one_or_none() + if fleet_model is None: + return None + + if not is_db_sqlite(): + return fleet_model + + await sqlite_commit(session) + await exit_stack.enter_async_context( + get_locker(get_db().dialect_name).lock_ctx(FleetModel.__tablename__, [fleet_id]) + ) + # Re-query under in-memory lock to see committed changes. + res = await session.execute( + select(FleetModel) + .where( + FleetModel.id == fleet_id, + FleetModel.deleted == False, + ) + .options(selectinload(FleetModel.instances.and_(InstanceModel.deleted == False))) + .execution_options(populate_existing=True) + ) + return res.scalars().unique().one_or_none() + + +def _create_placeholder_instance( + fleet_model: FleetModel, + project: ProjectModel, + job_model: JobModel, +) -> InstanceModel: + taken_instance_nums = {i.instance_num for i in fleet_model.instances} + instance_num = get_next_instance_num(taken_instance_nums) + return InstanceModel( + id=uuid.uuid4(), + name=f"{fleet_model.name}-{instance_num}", + instance_num=instance_num, + project=project, + fleet=fleet_model, + status=InstanceStatus.PENDING, + unreachable=False, + provisioning_job_id=job_model.id, + ) + + def _get_current_reusable_instance_offers( context: _SubmittedJobContext, assignment: _ExistingInstanceAssignment, @@ -1000,6 +1109,13 @@ async def _process_provisioning( return preconditions if context.job_model.instance is not None: + if is_placeholder_instance(context.job_model.instance): + # Placeholder instance created during assignment — proceed to cloud provisioning. + return await _process_new_capacity_provisioning( + item=item, + context=context, + preconditions=preconditions, + ) return await _process_existing_instance_provisioning( item=item, context=context, @@ -1072,6 +1188,8 @@ async def _apply_provisioning_result( item=item, fleet_id=provisioning.locked_fleet_id, ) + # Keep the placeholder live here: JobTerminatingPipeline will unassign it + # from the job, and InstancePipeline will finish deleting it. 
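+    # Once JobTerminatingPipeline flips it to TERMINATING it no longer matches
+    # is_placeholder_instance(), so the InstancePipeline fetch filter picks it
+    # up again and completes the deletion.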
await _terminate_submitted_job( session=session, job_model=job_model, @@ -1170,7 +1288,6 @@ async def _process_new_capacity_provisioning( ) locked_fleet_id = None if _should_refresh_related_cluster_master_fleet(context=context): - assert fleet_model is not None related_cluster_master_fleet = await _resolve_related_cluster_master_fleet( item=item, fleet_id=fleet_model.id, @@ -1190,9 +1307,9 @@ async def _process_new_capacity_provisioning( ) if ( is_master_job(context.job) - and fleet_model is not None and _get_cluster_fleet_spec(fleet_model) is not None - and any(not instance.deleted for instance in fleet_model.instances) + # Placeholder reservations never become fleet masters. + and filter_non_placeholder_instances(fleet_model.instances) and master_provisioning_data is None ): return _DeferSubmittedJobResult( @@ -1315,7 +1432,7 @@ async def _materialize_newly_provisioned_capacity( if compute_group_model is not None: session.add(compute_group_model) - instance_models = await _create_instance_models_for_provisioned_jobs( + instance_models = await _promote_or_create_instance_models_for_provisioned_jobs( session=session, context=context, fleet_model=fleet_model, @@ -1355,7 +1472,7 @@ def _resolve_provisioned_jobs_and_data( return [context.job], [provisioning_data], None -async def _create_instance_models_for_provisioned_jobs( +async def _promote_or_create_instance_models_for_provisioned_jobs( session: AsyncSession, context: _SubmittedJobContext, fleet_model: FleetModel, @@ -1367,43 +1484,59 @@ async def _create_instance_models_for_provisioned_jobs( ) -> list[InstanceModel]: provisioned_job_models = _get_job_models_for_jobs(context.run_model.jobs, provisioned_jobs) instance_models: list[InstanceModel] = [] - # FIXME: Fleet is not locked here, which may lead to duplicate `instance_num`. - # This likely needs a separate reservation step so instance rows are created - # before provisioning and `instance_num` is allocated under fleet serialization. + # FIXME: For compute groups, the fleet is not locked here, which may lead to + # duplicate `instance_num`. Single-instance jobs use placeholder instances + # created under fleet lock during assignment, so they are not affected. taken_instance_nums = await _get_taken_instance_nums(session, fleet_model) + for provisioned_job_model, job_provisioning_data in zip( provisioned_job_models, job_provisioning_datas ): provisioned_job_model.fleet_id = fleet_model.id provisioned_job_model.job_provisioning_data = job_provisioning_data.json() switch_job_status(session, provisioned_job_model, JobStatus.PROVISIONING) - instance_num = get_next_instance_num(taken_instance_nums) - instance_model = _create_instance_model_for_job( - project=context.project, - fleet_model=fleet_model, - compute_group_model=compute_group_model, - job_model=provisioned_job_model, - job_provisioning_data=job_provisioning_data, - offer=offer, - instance_num=instance_num, - profile=effective_profile, - ) + + # If a placeholder instance exists, promote it instead of creating a new one. + # Safe to update the placeholder without locking: nobody else should update the placeholder. 
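+        # InstancePipeline never fetches it, fleet instance deletion rejects it,
+        # and the job lock ensures only this pipeline run promotes it.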
+ placeholder_instance = _get_job_placeholder_instance(context, provisioned_job_model) + if placeholder_instance is not None: + instance_model = placeholder_instance + _promote_placeholder_instance( + instance_model=instance_model, + compute_group_model=compute_group_model, + job_provisioning_data=job_provisioning_data, + offer=offer, + profile=effective_profile, + ) + else: + instance_num = get_next_instance_num(taken_instance_nums) + instance_model = _create_instance_model_for_job( + project=context.project, + fleet_model=fleet_model, + compute_group_model=compute_group_model, + job_model=provisioned_job_model, + job_provisioning_data=job_provisioning_data, + offer=offer, + instance_num=instance_num, + profile=effective_profile, + ) + taken_instance_nums.add(instance_num) + session.add(instance_model) + provisioned_job_model.used_instance_id = instance_model.id + instance_models.append(instance_model) - taken_instance_nums.add(instance_num) provisioned_job_model.job_runtime_data = _prepare_job_runtime_data( offer, context.multinode ).json() - session.add(instance_model) events.emit( session, - f"Instance created for job. Instance status: {instance_model.status.upper()}", + f"Instance provisioned for job. Instance status: {instance_model.status.upper()}", actor=events.SystemActor(), targets=[ events.Target.from_model(instance_model), events.Target.from_model(provisioned_job_model), ], ) - provisioned_job_model.used_instance_id = instance_model.id provisioned_job_model.last_processed_at = get_current_datetime() return instance_models @@ -1418,6 +1551,22 @@ async def _get_taken_instance_nums(session: AsyncSession, fleet_model: FleetMode return set(res.scalars().all()) +def _get_job_placeholder_instance( + context: _SubmittedJobContext, + job_model: JobModel, +) -> Optional[InstanceModel]: + """Return the placeholder instance for a job, or None. + Only context.job_model has the instance relationship eagerly loaded, + so we match by id and return its instance. 
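+    For compute-group jobs other than the context job this returns None, so
+    _promote_or_create_instance_models_for_provisioned_jobs falls back to
+    creating a fresh InstanceModel for them.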
+ """ + if job_model.id != context.job_model.id: + return None + instance = context.job_model.instance + if instance is not None and is_placeholder_instance(instance): + return instance + return None + + def _create_instance_model_for_job( project: ProjectModel, fleet_model: FleetModel, @@ -1460,6 +1609,36 @@ def _create_instance_model_for_job( ) +def _promote_placeholder_instance( + instance_model: InstanceModel, + compute_group_model: Optional[ComputeGroupModel], + job_provisioning_data: JobProvisioningData, + offer: InstanceOfferWithAvailability, + profile: Profile, +) -> None: + """Promote a placeholder instance to a real provisioning instance + by filling in the fields that were unknown at placeholder creation time.""" + if not job_provisioning_data.dockerized: + termination_policy = TerminationPolicy.DESTROY_AFTER_IDLE + termination_idle_time = 0 + else: + termination_policy, termination_idle_time = get_termination( + profile, DEFAULT_RUN_TERMINATION_IDLE_TIME + ) + instance_model.status = InstanceStatus.PROVISIONING + instance_model.started_at = get_current_datetime() + instance_model.compute_group = compute_group_model + instance_model.job_provisioning_data = job_provisioning_data.json() + instance_model.offer = offer.json() + instance_model.backend = offer.backend + instance_model.price = offer.price + instance_model.region = offer.region + instance_model.termination_policy = termination_policy + instance_model.termination_idle_time = termination_idle_time + instance_model.total_blocks = 1 + instance_model.busy_blocks = 1 + + async def _process_volume_attachments( item: JobSubmittedPipelineItem, project: ProjectModel, @@ -1767,7 +1946,9 @@ async def _resolve_related_cluster_master_fleet( fleet_model = res.unique().scalar_one_or_none() if fleet_model is None: return None - if len(fleet_model.instances) != 0: + # Placeholder reservations should not make an empty cluster fleet look + # non-empty; only real instances mean placement is already anchored. + if filter_non_placeholder_instances(fleet_model.instances): return _ResolvedRelatedClusterMasterFleet( fleet_model=fleet_model, locked_fleet_id=None, @@ -1862,6 +2043,7 @@ def _release_replica_jobs_from_master_wait( async def _provision_new_capacity( project: ProjectModel, + fleet_model: FleetModel, job_model: JobModel, run: Run, jobs: list[Job], @@ -1869,7 +2051,6 @@ async def _provision_new_capacity( project_ssh_private_key: str, master_job_provisioning_data: Optional[JobProvisioningData] = None, volumes: Optional[list[list[Volume]]] = None, - fleet_model: Optional[FleetModel] = None, ) -> Union[_FailedNewCapacityProvisioning, _ProvisionNewCapacityResult]: jobs = copy.deepcopy(jobs) for job in jobs: @@ -1879,6 +2060,10 @@ async def _provision_new_capacity( job = jobs[0] if volumes is None: volumes = [] + # New-capacity provisioning is reached only for fleet-backed jobs. During + # the transition, legacy in-flight jobs may still have no attached + # instance; otherwise any attached instance here is expected to be the + # placeholder created during assignment. 
effective_profile_and_requirements = _get_effective_profile_and_requirements( job_model=job_model, run=run, @@ -1889,9 +2074,7 @@ async def _provision_new_capacity( return _FailedNewCapacityProvisioning(placement_group_cleanup=None) profile, requirements = effective_profile_and_requirements - placement_group_models = await _load_fleet_placement_group_models( - fleet_id=fleet_model.id if fleet_model else None, - ) + placement_group_models = await _load_fleet_placement_group_models(fleet_model.id) new_placement_group_models: list[PlacementGroupModel] = [] known_placement_group_ids = { placement_group_model.id for placement_group_model in placement_group_models @@ -1934,8 +2117,10 @@ async def _provision_new_capacity( master_job_provisioning_data=master_job_provisioning_data, ) if ( - fleet_model is not None - and len(fleet_model.instances) == 0 + # The first real instance in an empty cluster fleet is responsible + # for creating/selecting the placement group. A placeholder alone + # must not suppress that path. + not filter_non_placeholder_instances(fleet_model.instances) and is_cloud_cluster(fleet_model) and offer.backend in BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT and isinstance(compute, ComputeWithPlacementGroupSupport) @@ -2034,12 +2219,7 @@ async def _provision_new_capacity( ) -async def _load_fleet_placement_group_models( - fleet_id: Optional[uuid.UUID], -) -> list["PlacementGroupModel"]: - if fleet_id is None: - return [] - +async def _load_fleet_placement_group_models(fleet_id: uuid.UUID) -> list["PlacementGroupModel"]: async with get_session_ctx() as session: res = await session.execute( select(PlacementGroupModel) @@ -2061,12 +2241,14 @@ async def _load_fleet_placement_group_models( def _build_placement_group_cleanup( - fleet_model: Optional[FleetModel], + fleet_model: FleetModel, offers_tried: int, selected_placement_group_id: Optional[uuid.UUID], new_placement_group_models: list[PlacementGroupModel], ) -> Optional[_PlacementGroupCleanup]: - if fleet_model is None or len(fleet_model.instances) != 0 or offers_tried == 0: + # Treat placeholder-only fleets as empty so a failed first-instance attempt + # still cleans up placement groups created for that attempt. 
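+    # Mirrors the first-instance check in _provision_new_capacity, which also
+    # ignores placeholders when deciding who creates placement groups.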
+ if filter_non_placeholder_instances(fleet_model.instances) or offers_tried == 0: return None return _PlacementGroupCleanup( fleet_id=fleet_model.id, @@ -2118,16 +2300,10 @@ def _get_effective_profile_and_requirements( job_model: JobModel, run: Run, job: Job, - fleet_model: Optional[FleetModel], + fleet_model: FleetModel, ) -> Optional[tuple[Profile, Requirements]]: - effective_profile = run.run_spec.merged_profile - requirements = job.job_spec.requirements - if fleet_model is None: - return effective_profile, requirements - fleet_spec = get_fleet_spec(fleet_model) try: - check_can_create_new_cloud_instance_in_fleet(fleet_model, fleet_spec) effective_profile, requirements = get_run_profile_and_requirements_in_fleet( job=job, run_spec=run.run_spec, diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index 29e8ccecc..097c835e5 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -54,6 +54,7 @@ from dstack._internal.server.services.instances import ( emit_instance_status_change_event, get_instance_ssh_private_keys, + is_placeholder_instance, ) from dstack._internal.server.services.jobs import ( emit_job_status_change_event, @@ -617,6 +618,17 @@ async def _process_terminating_job( result.job_update_map["status"] = _get_job_termination_status(job_model) return result + if is_placeholder_instance(instance_model): + # Placeholder has no VM and no provisioning data. Skip graceful stop, + # container stop, and volume detach. + instance_update_map = get_or_error(result.instance_update_map) + instance_update_map["status"] = InstanceStatus.TERMINATING + instance_update_map["termination_reason"] = InstanceTerminationReason.JOB_FINISHED + result.job_update_map["instance_id"] = None + await _unregister_replica_and_update_result(result=result, job_model=job_model) + result.job_update_map["status"] = _get_job_termination_status(job_model) + return result + if job_model.graceful_termination_attempts == 0 and job_model.remove_at is None: result.job_update_map = await _stop_job_gracefully(job_model, instance_model) return result diff --git a/src/dstack/_internal/server/migrations/versions/2026/04_24_0542_82b671d9c5ab_add_instancemodel_provisioning_job_id_.py b/src/dstack/_internal/server/migrations/versions/2026/04_24_0542_82b671d9c5ab_add_instancemodel_provisioning_job_id_.py new file mode 100644 index 000000000..38b5b78d4 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/2026/04_24_0542_82b671d9c5ab_add_instancemodel_provisioning_job_id_.py @@ -0,0 +1,39 @@ +"""Add InstanceModel.provisioning_job_id for placeholder instances + +Revision ID: 82b671d9c5ab +Revises: f48b23790053 +Create Date: 2026-04-24 05:42:14.856254+00:00 + +""" + +import sqlalchemy as sa +import sqlalchemy_utils +from alembic import op + +# revision identifiers, used by Alembic. +revision = "82b671d9c5ab" +down_revision = "f48b23790053" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "provisioning_job_id", + sqlalchemy_utils.types.uuid.UUIDType(binary=False), + nullable=True, + ) + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.drop_column("provisioning_job_id") + + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index 0b7595629..f8725a970 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -824,6 +824,13 @@ class InstanceModel(PipelineModelMixin, BaseModel): job_provisioning_data: Mapped[Optional[str]] = mapped_column(Text) + provisioning_job_id: Mapped[Optional[uuid.UUID]] = mapped_column( + UUIDType(binary=False), default=None + ) + """When set, records the job that triggered this instance's creation. + A PENDING instance with this field set is a placeholder managed by + `JobSubmittedPipeline` and is not touched by `InstancePipeline`. + """ remote_connection_info: Mapped[Optional[str]] = mapped_column(Text) total_blocks: Mapped[Optional[int]] = mapped_column(Integer) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 3129bcbb0..f6d9c99d6 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -69,6 +69,7 @@ from dstack._internal.server.services import offers as offers_services from dstack._internal.server.services.instances import ( get_instance_remote_connection_info, + is_placeholder_instance, list_active_remote_instances, switch_instance_status, ) @@ -1440,6 +1441,8 @@ def _terminate_fleet_instances( for instance in fleet_model.instances: if instance_nums is not None and instance.instance_num not in instance_nums: continue + if is_placeholder_instance(instance): + raise ServerClientError("Failed to delete instance while the job is provisioning.") if instance.status == InstanceStatus.TERMINATED: instance.deleted = True else: diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index a311d7d95..ad48ff1f5 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -328,6 +328,24 @@ def is_ssh_instance(instance_model: InstanceModel) -> bool: return instance_model.remote_connection_info is not None +def is_placeholder_instance(instance_model: InstanceModel) -> bool: + """A PENDING instance with `provisioning_job_id` set is a placeholder + reserved by `JobSubmittedPipeline` during assignment and awaiting cloud + provisioning. It reserves an `instance_num` and a `nodes.max` slot but + has no backend, offer, or provisioning data until it is promoted. + `InstancePipeline` ignores placeholders; only `JobSubmittedPipeline` and + `JobTerminatingPipeline` act on them. 
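+
+    Typical guard in pipeline code::
+
+        if is_placeholder_instance(instance_model):
+            return  # owned by JobSubmittedPipeline, do not touch it here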
+ """ + return ( + instance_model.status == InstanceStatus.PENDING + and instance_model.provisioning_job_id is not None + ) + + +def filter_non_placeholder_instances(instance_models: list[InstanceModel]) -> list[InstanceModel]: + return [i for i in instance_models if not is_placeholder_instance(i)] + + def get_instance_remote_connection_info( instance_model: InstanceModel, ) -> Optional[RemoteConnectionInfo]: diff --git a/src/dstack/_internal/server/services/placement.py b/src/dstack/_internal/server/services/placement.py index 14365f09e..3292b7029 100644 --- a/src/dstack/_internal/server/services/placement.py +++ b/src/dstack/_internal/server/services/placement.py @@ -19,6 +19,7 @@ PlacementStrategy, ) from dstack._internal.server.models import FleetModel, PlacementGroupModel +from dstack._internal.server.services.instances import is_placeholder_instance from dstack._internal.utils.common import run_async from dstack._internal.utils.logging import get_logger @@ -103,11 +104,15 @@ def get_placement_group_model_for_job( Returns any fleet placement group for jobs that provision in non-empty fleets and `None` for empty fleets. This is so that only the first job creates placement groups. + Placeholder reservations are excluded: a placeholder-only fleet is treated + as empty here so offer selection is not pinned to a stale PG's region. """ placement_group_model = None active_instances = [] if fleet_model is not None: - active_instances = [i for i in fleet_model.instances if not i.deleted] + active_instances = [ + i for i in fleet_model.instances if not i.deleted and not is_placeholder_instance(i) + ] if len(active_instances) > 0 and len(placement_group_models) > 0: placement_group_model = placement_group_models[0] return placement_group_model diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 18845b585..1fcf3e7bd 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -43,6 +43,7 @@ get_instance_offer, get_pool_instances, get_shared_instances_with_offers, + is_placeholder_instance, ) from dstack._internal.server.services.jobs import ( get_instances_ids_with_detaching_volumes, @@ -552,8 +553,8 @@ def _run_can_fit_into_fleet( and fleet_spec.configuration.blocks == 1 and fleet_spec.configuration.nodes.max is not None ): - busy_instances = [i for i in fleet_model.instances if i.busy_blocks > 0] - fleet_available_capacity = fleet_spec.configuration.nodes.max - len(busy_instances) + occupied_instances = _get_occupied_instances(fleet_model.instances) + fleet_available_capacity = fleet_spec.configuration.nodes.max - len(occupied_instances) if fleet_available_capacity < nodes_required_num: return False elif fleet_spec.configuration.ssh_config is not None: @@ -568,6 +569,13 @@ def _run_can_fit_into_fleet( return True +def _get_occupied_instances(instance_models: list[InstanceModel]) -> list[InstanceModel]: + # A placeholder has busy_blocks == 0 but reserves a `nodes.max` slot + # (unlike an IDLE instance, which can be reused by this run), so count + # it here the same as a busy instance. 
+ return [i for i in instance_models if i.busy_blocks > 0 or is_placeholder_instance(i)] + + async def _get_backend_offers_in_fleet( project: ProjectModel, fleet_model: FleetModel, diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index c955f5ae1..8775c043c 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -810,6 +810,7 @@ async def create_instance( volumes: Optional[List[VolumeModel]] = None, price: float = 1.0, last_processed_at: datetime = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc), + provisioning_job_id: Optional[UUID] = None, ) -> InstanceModel: if instance_id is None: instance_id = uuid.uuid4() @@ -871,6 +872,7 @@ async def create_instance( volume_attachments=volume_attachments, total_blocks=total_blocks, busy_blocks=busy_blocks, + provisioning_job_id=provisioning_job_id, ) if job: im.jobs.append(job) diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_instances/test_pipeline.py b/src/tests/_internal/server/background/pipeline_tasks/test_instances/test_pipeline.py index 2e31567ae..1c0ef2bca 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_instances/test_pipeline.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_instances/test_pipeline.py @@ -112,6 +112,28 @@ async def test_fetch_selects_eligible_instances_and_sets_lock_fields( locked.lock_token = uuid.uuid4() locked.lock_owner = "OtherPipeline" + # Placeholder instance managed by JobSubmittedPipeline — should be skipped + placeholder = await create_instance( + session=session, + project=project, + status=InstanceStatus.PENDING, + name="placeholder", + last_processed_at=stale + dt.timedelta(seconds=3), + provisioning_job_id=uuid.uuid4(), + offer=None, + job_provisioning_data=None, + ) + + # Promoted placeholder (PROVISIONING + provisioning_job_id) — should be fetched + promoted = await create_instance( + session=session, + project=project, + status=InstanceStatus.PROVISIONING, + name="promoted", + last_processed_at=stale + dt.timedelta(seconds=4), + provisioning_job_id=uuid.uuid4(), + ) + await session.commit() items = await fetcher.fetch(limit=10) @@ -122,14 +144,9 @@ async def test_fetch_selects_eligible_instances_and_sets_lock_fields( busy.id, idle.id, terminating.id, + promoted.id, } - assert {item.status for item in items} == { - InstanceStatus.PENDING, - InstanceStatus.PROVISIONING, - InstanceStatus.BUSY, - InstanceStatus.IDLE, - InstanceStatus.TERMINATING, - } + assert placeholder.id not in {item.id for item in items} for instance in [ pending, diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py index 4ef56ffc3..39c6c8c0f 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py @@ -387,21 +387,17 @@ async def test_provisions_assigned_job_on_existing_instance( assert job.lock_token is None assert job.lock_expires_at is None - async def test_provisions_new_capacity_for_assigned_job( + async def test_provisions_new_capacity_for_assigned_job_with_placeholder( self, test_db, session: AsyncSession, worker: JobSubmittedWorker ): project = await create_project(session=session) user = await create_user(session=session) repo = await create_repo(session=session, project_id=project.id) - fleet = await create_fleet(session=session, 
project=project) - run = await create_run( - session=session, - project=project, - repo=repo, - user=user, - fleet=fleet, - ) - job = await create_job(session=session, run=run, instance_assigned=True) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1) + fleet = await create_fleet(session=session, project=project, spec=fleet_spec) + run = await create_run(session=session, project=project, repo=repo, user=user) + job = await create_job(session=session, run=run) offer = get_instance_offer_with_availability(backend=BackendType.AWS) with patch("dstack._internal.server.services.backends.get_project_backends") as m: @@ -416,13 +412,128 @@ async def test_provisions_new_capacity_for_assigned_job( await _process_job(session=session, worker=worker, job_model=job) + job = await _get_job(session, job.id) + assert job.status == JobStatus.SUBMITTED + assert job.instance_assigned + assert job.instance is not None + placeholder_id = job.instance.id + assert job.used_instance_id == placeholder_id + assert job.instance.status == InstanceStatus.PENDING + + await _process_job(session=session, worker=worker, job_model=job) + job = await _get_job(session, job.id) assert job.status == JobStatus.PROVISIONING assert job.instance is not None + assert job.instance.id == placeholder_id + assert job.used_instance_id == placeholder_id + assert job.instance.status == InstanceStatus.PROVISIONING assert job.instance.fleet_id == fleet.id - assert job.lock_owner is None - assert job.lock_token is None - assert job.lock_expires_at is None + assert job.instance.offer is not None + assert job.instance.provisioning_job_id == job.id # never cleared + res = await session.execute( + select(InstanceModel).where( + InstanceModel.fleet_id == fleet.id, + InstanceModel.deleted == False, + ) + ) + assert len(res.scalars().all()) == 1 + + async def test_multinode_master_reuses_placeholder_when_provisioning_falls_back_to_run_job( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1) + fleet = await create_fleet(session=session, project=project, spec=fleet_spec) + run_spec = get_run_spec( + run_name="run", + repo_id=repo.name, + configuration=TaskConfiguration(image="debian", nodes=2), + ) + run = await create_run( + session=session, + run_name="run", + project=project, + repo=repo, + user=user, + run_spec=run_spec, + fleet=fleet, + ) + master_job = await create_job( + session=session, + run=run, + job_num=0, + waiting_master_job=False, + ) + worker_job = await create_job( + session=session, + run=run, + job_num=1, + waiting_master_job=True, + ) + + offer = get_instance_offer_with_availability(backend=BackendType.AWS) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + backend_mock = Mock() + compute_mock = Mock(spec=ComputeMockSpec) + backend_mock.TYPE = BackendType.AWS + backend_mock.compute.return_value = compute_mock + m.return_value = [backend_mock] + compute_mock.get_offers.return_value = [offer] + compute_mock.run_job.return_value = get_job_provisioning_data( + dockerized=True, + backend=BackendType.AWS, + ) + + await _process_job(session=session, worker=worker, job_model=master_job) + + master_job = await _get_job(session, master_job.id) + worker_job = await 
_get_job(session, worker_job.id) + assert master_job.status == JobStatus.SUBMITTED + assert master_job.instance_assigned + assert master_job.instance is not None + placeholder_id = master_job.instance.id + assert master_job.instance.status == InstanceStatus.PENDING + assert master_job.used_instance_id == placeholder_id + assert master_job.fleet_id == fleet.id + assert worker_job.waiting_master_job + compute_mock.run_job.assert_not_called() + compute_mock.run_jobs.assert_not_called() + + competing_instance = await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.BUSY, + backend=BackendType.AWS, + job_provisioning_data=get_job_provisioning_data(backend=BackendType.AWS), + ) + + await _process_job(session=session, worker=worker, job_model=master_job) + + master_job = await _get_job(session, master_job.id) + worker_job = await _get_job(session, worker_job.id) + assert master_job.status == JobStatus.PROVISIONING + assert master_job.instance is not None + assert master_job.instance.id == placeholder_id + assert master_job.instance.status == InstanceStatus.PROVISIONING + assert master_job.used_instance_id == placeholder_id + assert worker_job.waiting_master_job is False + compute_mock.run_job.assert_called_once() + compute_mock.run_jobs.assert_not_called() + res = await session.execute( + select(InstanceModel).where( + InstanceModel.fleet_id == fleet.id, + InstanceModel.deleted == False, + ) + ) + assert {instance.id for instance in res.scalars().all()} == { + placeholder_id, + competing_instance.id, + } async def test_provisioning_master_job_respects_cluster_placement_in_non_empty_fleet( self, test_db, session: AsyncSession, worker: JobSubmittedWorker @@ -1050,6 +1161,134 @@ async def test_assigns_job_to_specific_fleet( assert job.instance is not None and job.instance.id == instance_2.id assert job.fleet_id == fleet_2.id + async def test_assignment_creates_placeholder_instance_for_new_capacity( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet = await create_fleet(session=session, project=project) + run = await create_run(session=session, project=project, repo=repo, user=user) + job = await create_job(session=session, run=run) + + offer = get_instance_offer_with_availability(backend=BackendType.AWS) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + backend_mock = Mock() + m.return_value = [backend_mock] + backend_mock.TYPE = BackendType.AWS + backend_mock.compute.return_value.get_offers.return_value = [offer] + + await _process_job(session=session, worker=worker, job_model=job) + + job = await _get_job(session, job.id) + assert job.status == JobStatus.SUBMITTED + assert job.instance_assigned + assert job.fleet_id == fleet.id + assert job.used_instance_id is not None + # Query the placeholder instance directly to avoid stale session cache + res = await session.execute( + select(InstanceModel) + .where(InstanceModel.id == job.used_instance_id) + .execution_options(populate_existing=True) + ) + placeholder = res.scalar_one() + assert placeholder.status == InstanceStatus.PENDING + assert placeholder.provisioning_job_id == job.id + assert placeholder.fleet_id == fleet.id + assert placeholder.offer is None + assert placeholder.instance_num == 0 + + async def test_assignment_retries_when_fleet_is_full( + self, test_db, session: 
AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1) + fleet = await create_fleet(session=session, project=project, spec=fleet_spec) + await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.BUSY, + ) + run = await create_run(session=session, project=project, repo=repo, user=user) + job = await create_job(session=session, run=run) + + offer = get_instance_offer_with_availability(backend=BackendType.AWS) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + backend_mock = Mock() + m.return_value = [backend_mock] + backend_mock.TYPE = BackendType.AWS + backend_mock.compute.return_value.get_offers.return_value = [offer] + + await _process_job(session=session, worker=worker, job_model=job) + + job = await _get_job(session, job.id) + # Assignment retried — job not committed as assigned + assert job.status == JobStatus.SUBMITTED + assert not job.instance_assigned + assert job.instance is None + # No placeholder must be committed when the fleet is full. + res = await session.execute( + select(InstanceModel).where( + InstanceModel.fleet_id == fleet.id, + InstanceModel.deleted == False, + ) + ) + assert len(res.scalars().all()) == 1 + + async def test_leaves_placeholder_for_terminating_pipeline_on_failed_new_capacity_provisioning( + self, test_db, session: AsyncSession, worker: JobSubmittedWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None) + fleet = await create_fleet(session=session, project=project, spec=fleet_spec) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + fleet=fleet, + ) + placeholder = await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.PENDING, + offer=None, + job_provisioning_data=None, + backend=BackendType.AWS, + ) + job = await create_job( + session=session, run=run, instance=placeholder, instance_assigned=True + ) + placeholder.provisioning_job_id = job.id + await session.commit() + + offer = get_instance_offer_with_availability(backend=BackendType.AWS) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + backend_mock = Mock() + compute_mock = Mock(spec=ComputeMockSpec) + backend_mock.TYPE = BackendType.AWS + backend_mock.compute.return_value = compute_mock + m.return_value = [backend_mock] + compute_mock.get_offers.return_value = [offer] + compute_mock.run_job.side_effect = BackendError("boom") + + await _process_job(session=session, worker=worker, job_model=job) + + job = await _get_job(session, job.id) + assert job.status == JobStatus.TERMINATING + assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + await session.refresh(placeholder) + assert not placeholder.deleted + assert placeholder.status == InstanceStatus.PENDING + async def test_provisions_compute_group( self, test_db, session: AsyncSession, worker: JobSubmittedWorker ): diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py 
b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py index ba4eabb66..1d003dce0 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_terminating_jobs.py @@ -758,6 +758,40 @@ async def test_finishes_job_when_used_instance_is_not_set( assert job.lock_token is None assert job.lock_expires_at is None + async def test_terminates_job_with_placeholder_instance( + self, test_db, session: AsyncSession, worker: JobTerminatingWorker + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + run = await create_run(session=session, project=project, repo=repo, user=user) + placeholder = await create_instance( + session=session, + project=project, + status=InstanceStatus.PENDING, + provisioning_job_id=uuid.uuid4(), + offer=None, + job_provisioning_data=None, + ) + job = await create_job( + session=session, + run=run, + status=JobStatus.TERMINATING, + termination_reason=JobTerminationReason.TERMINATED_BY_USER, + instance=placeholder, + ) + _lock_job(job) + await session.commit() + + # No mocks needed — placeholder has no VM, no SSH, no container + await worker.process(_job_to_pipeline_item(job)) + + await session.refresh(job) + await session.refresh(placeholder) + assert job.status == JobStatus.TERMINATED + assert job.instance_id is None + assert placeholder.status == InstanceStatus.TERMINATING + async def test_retries_detaching_when_used_instance_is_missing( self, test_db, session: AsyncSession, worker: JobTerminatingWorker ): diff --git a/src/tests/_internal/server/routers/test_fleets.py b/src/tests/_internal/server/routers/test_fleets.py index 196f263a6..d5f9ffd95 100644 --- a/src/tests/_internal/server/routers/test_fleets.py +++ b/src/tests/_internal/server/routers/test_fleets.py @@ -2071,6 +2071,35 @@ async def test_importer_member_cannot_delete_imported_fleet_instances( ) assert response.status_code == 403 + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_rejects_deleting_placeholder_instance( + self, test_db, session: AsyncSession, client: AsyncClient + ): + user = await create_user(session, global_role=GlobalRole.USER) + project = await create_project(session) + await add_project_member( + session=session, project=project, user=user, project_role=ProjectRole.USER + ) + fleet = await create_fleet(session=session, project=project) + await create_instance( + session=session, + project=project, + fleet=fleet, + instance_num=0, + status=InstanceStatus.PENDING, + provisioning_job_id=uuid4(), + offer=None, + job_provisioning_data=None, + ) + response = await client.post( + f"/api/project/{project.name}/fleets/delete_instances", + headers=get_auth_headers(user.token), + json={"name": fleet.name, "instance_nums": [0]}, + ) + assert response.status_code == 400 + assert "provisioning" in response.text + class TestGetPlan: @pytest.mark.asyncio