diff --git a/frontend/src/types/fleet.d.ts b/frontend/src/types/fleet.d.ts
index b5050167b..b72216158 100644
--- a/frontend/src/types/fleet.d.ts
+++ b/frontend/src/types/fleet.d.ts
@@ -85,7 +85,6 @@ declare interface IProfileRequest {
 }
 
 declare interface IFleetSpec {
-  autocreated?: boolean;
   configuration: IFleetConfigurationRequest;
   configuration_path?: string;
   profile: IProfileRequest;
diff --git a/src/dstack/_internal/core/compatibility/fleets.py b/src/dstack/_internal/core/compatibility/fleets.py
index 04a92ed77..3c76c73e7 100644
--- a/src/dstack/_internal/core/compatibility/fleets.py
+++ b/src/dstack/_internal/core/compatibility/fleets.py
@@ -46,12 +46,7 @@ def get_fleet_spec_excludes(fleet_spec: FleetSpec) -> Optional[IncludeExcludeDic
     configuration_excludes: IncludeExcludeDictType = {}
     profile_excludes: IncludeExcludeSetType = set()
 
-    # Add excludes like this:
-    #
-    # if fleet_spec.configuration.tags is None:
-    #     configuration_excludes["tags"] = True
-    # if fleet_spec.profile.tags is None:
-    #     profile_excludes.add("tags")
+    spec_excludes["autocreated"] = True
 
     if configuration_excludes:
         spec_excludes["configuration"] = configuration_excludes
diff --git a/src/dstack/_internal/core/models/fleets.py b/src/dstack/_internal/core/models/fleets.py
index dbf78ca25..af8f04566 100644
--- a/src/dstack/_internal/core/models/fleets.py
+++ b/src/dstack/_internal/core/models/fleets.py
@@ -371,7 +371,11 @@ class FleetSpec(generate_dual_core_model(FleetSpecConfig)):
     configuration: FleetConfiguration
     configuration_path: Optional[str] = None
     profile: Profile
+    # TODO: Drop `autocreated` once the last client version sending it (0.20.16)
+    # and existing autocreated fleets are no longer supported.
     autocreated: bool = False
+    """Deprecated. Kept for deserialization of old client requests and existing DB records.
+    """
     # TODO: make `merged_profile` a computed field after migrating to Pydantic v2.
     merged_profile: Annotated[Profile, Field(exclude=True)] = None
     """`merged_profile` stores profile parameters merged from `profile` and `configuration`.
diff --git a/src/dstack/_internal/server/background/pipeline_tasks/fleets.py b/src/dstack/_internal/server/background/pipeline_tasks/fleets.py
index 1c4758889..e34d1137b 100644
--- a/src/dstack/_internal/server/background/pipeline_tasks/fleets.py
+++ b/src/dstack/_internal/server/background/pipeline_tasks/fleets.py
@@ -354,6 +354,7 @@ def _get_fleet_spec_if_ready_for_consolidation(fleet_model: FleetModel) -> Optio
     if fleet_model.status == FleetStatus.TERMINATING:
         return None
     consolidation_fleet_spec = get_fleet_spec(fleet_model)
+    # TODO: Drop the fleet_spec.autocreated check after existing autocreated fleets are no longer supported
     if (
         consolidation_fleet_spec.configuration.nodes is None
         or consolidation_fleet_spec.autocreated
@@ -718,18 +719,17 @@ def _should_delete_fleet(fleet_model: FleetModel) -> bool:
     if is_fleet_in_use(fleet_model) or not is_fleet_empty(fleet_model):
         return False
 
-    # TODO: Drop non-terminating fleets auto-deletion after dropping fleets auto-creation.
     fleet_spec = get_fleet_spec(fleet_model)
-    if (
-        fleet_model.status != FleetStatus.TERMINATING
-        and fleet_spec.configuration.nodes is not None
-        and fleet_spec.configuration.nodes.min == 0
-    ):
-        # Empty fleets that allow 0 nodes should not be auto-deleted
-        return False
+    if fleet_model.status == FleetStatus.TERMINATING:
+        logger.info("Automatic cleanup of terminating empty fleet %s", fleet_model.name)
+        return True
+
+    # TODO: Drop autocreated fleet auto-deletion after existing autocreated fleets are no longer supported.
+    if fleet_spec.autocreated:
+        logger.info("Automatic cleanup of empty autocreated fleet %s", fleet_model.name)
+        return True
 
-    logger.info("Automatic cleanup of an empty fleet %s", fleet_model.name)
-    return True
+    return False
 
 
 def _build_instance_update_rows(
diff --git a/src/dstack/_internal/server/background/pipeline_tasks/instances/common.py b/src/dstack/_internal/server/background/pipeline_tasks/instances/common.py
index a896791b4..a38696047 100644
--- a/src/dstack/_internal/server/background/pipeline_tasks/instances/common.py
+++ b/src/dstack/_internal/server/background/pipeline_tasks/instances/common.py
@@ -77,6 +77,7 @@ async def can_terminate_fleet_instances_on_idle_duration(
     fleet_model: FleetModel,
 ) -> bool:
     fleet_spec = get_fleet_spec(fleet_model)
+    # TODO: Drop the fleet_spec.autocreated check after existing autocreated fleets are no longer supported
     if fleet_spec.configuration.nodes is None or fleet_spec.autocreated:
         return True
     res = await session.execute(
diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py
index 0e4358d6f..d2bf039e6 100644
--- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py
+++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py
@@ -27,10 +27,7 @@
     ComputeGroupStatus,
 )
 from dstack._internal.core.models.fleets import (
-    FleetConfiguration,
-    FleetNodesSpec,
     FleetSpec,
-    FleetStatus,
     InstanceGroupPlacement,
 )
 from dstack._internal.core.models.instances import (
@@ -89,7 +86,6 @@
 from dstack._internal.server.services.docker import apply_server_docker_defaults
 from dstack._internal.server.services.fleets import (
     check_can_create_new_cloud_instance_in_fleet,
-    generate_fleet_name,
     get_fleet_master_instance_provisioning_data,
     get_fleet_spec,
     get_next_instance_num,
@@ -136,11 +132,9 @@
 )
 from dstack._internal.server.services.runs.spec import (
     check_run_spec_requires_instance_mounts,
-    get_nodes_required_num,
 )
 from dstack._internal.server.services.volumes import volume_model_to_volume
 from dstack._internal.server.utils import sentry_utils
-from dstack._internal.settings import FeatureFlags
 from dstack._internal.utils.common import get_current_datetime, get_or_error, run_async
 from dstack._internal.utils.logging import get_logger
 
@@ -463,7 +457,6 @@ class _NewCapacityProvisioning:
     provisioning_data: Union[JobProvisioningData, ComputeGroupProvisioningData]
     offer: InstanceOfferWithAvailability
     effective_profile: Profile
-    created_fleet_model: Optional[FleetModel]
     placement_group_cleanup: Optional[_PlacementGroupCleanup]
     volume_attachment_result: Optional[_VolumeAttachmentResult]
     locked_fleet_id: Optional[uuid.UUID]
@@ -866,21 +859,16 @@ async def _apply_no_fleet_selection(
         )
         return
 
-    if not FeatureFlags.AUTOCREATED_FLEETS_ENABLED:
-        logger.debug("%s: no fleet found", fmt(job_model))
-        await _terminate_submitted_job(
-            session=session,
-            job_model=job_model,
-            reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
-            message=(
-                "No matching fleet found. Possible reasons: "
-                "https://dstack.ai/docs/guides/troubleshooting/#no-fleets"
-            ),
-        )
-        return
-
-    job_model.instance_assigned = True
-    await _mark_job_processed(session=session, job_model=job_model)
+    logger.debug("%s: no fleet found", fmt(job_model))
+    await _terminate_submitted_job(
+        session=session,
+        job_model=job_model,
+        reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
+        message=(
+            "No matching fleet found. Possible reasons: "
+            "https://dstack.ai/docs/guides/troubleshooting/#no-fleets"
+        ),
+    )
 
 
 async def _lock_assignment_fleet_for_existing_instance_assignment(
@@ -1170,6 +1158,16 @@ async def _process_new_capacity_provisioning(
     preconditions: _ProcessedPreconditions,
 ) -> _ProvisioningResult:
     fleet_model = context.fleet_model
+    if fleet_model is None:
+        # Legacy in-flight job from autocreated fleets path (instance_assigned=True, no fleet).
+        # Autocreated fleets are no longer supported; terminate the job.
+        return _TerminateSubmittedJobResult(
+            reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
+            message=(
+                "No matching fleet found. Possible reasons: "
+                "https://dstack.ai/docs/guides/troubleshooting/#no-fleets"
+            ),
+        )
     locked_fleet_id = None
     if _should_refresh_related_cluster_master_fleet(context=context):
         assert fleet_model is not None
@@ -1220,14 +1218,6 @@ async def _process_new_capacity_provisioning(
             placement_group_cleanup=provision_new_capacity_result.placement_group_cleanup,
         )
 
-    created_fleet_model = None
-    if context.fleet_model is None:
-        # TODO: Drop once autocreated fleets are dropped.
-        created_fleet_model = await _create_fleet_model_for_job(
-            project=context.project,
-            run=context.run,
-        )
-
     volume_attachment_result = None
     # TODO: Volume attachment for compute groups is not yet supported since
     # currently supported compute groups don't require explicit volume attachment.
@@ -1244,7 +1234,6 @@
         provisioning_data=provision_new_capacity_result.provisioning_data,
         offer=provision_new_capacity_result.offer,
         effective_profile=provision_new_capacity_result.effective_profile,
-        created_fleet_model=created_fleet_model,
         placement_group_cleanup=provision_new_capacity_result.placement_group_cleanup,
         volume_attachment_result=volume_attachment_result,
         locked_fleet_id=locked_fleet_id,
@@ -1259,23 +1248,6 @@ async def _apply_new_capacity_provisioning(
 ) -> None:
     fresh_context = await _load_submitted_job_context(session=session, job_model=job_model)
     fleet_model = fresh_context.fleet_model
-    if provisioning.created_fleet_model is not None:
-        fleet_model = provisioning.created_fleet_model
-        # Replace the project loaded in the processing session with the one
-        # bound to this apply session to avoid a duplicate-identity conflict.
-        fleet_model.project = fresh_context.project
-        session.add(fleet_model)
-        fresh_context.job_model.fleet = fleet_model
-        events.emit(
-            session,
-            f"Fleet created for job. Fleet status: {fleet_model.status.upper()}",
-            actor=events.SystemActor(),
-            targets=[
-                events.Target.from_model(fleet_model),
-                events.Target.from_model(fresh_context.job_model),
-            ],
-        )
-
     assert fleet_model is not None
     await _persist_placement_group_cleanup(
         session=session,
@@ -2165,42 +2137,6 @@ def _get_effective_profile_and_requirements(
     return effective_profile, requirements
 
 
-async def _create_fleet_model_for_job(
-    project: ProjectModel,
-    run: Run,
-) -> FleetModel:
-    placement = InstanceGroupPlacement.ANY
-    if run.run_spec.configuration.type == "task" and run.run_spec.configuration.nodes > 1:
-        placement = InstanceGroupPlacement.CLUSTER
-    nodes = get_nodes_required_num(run.run_spec)
-    async with get_session_ctx() as session:
-        # Duplicate fleet names are possible because of the missing fleet lock.
-        # Unfixed since autocreated are to be dropped anyway.
-        fleet_name = await generate_fleet_name(session=session, project=project)
-        spec = FleetSpec(
-            configuration=FleetConfiguration(
-                name=fleet_name,
-                placement=placement,
-                reservation=run.run_spec.configuration.reservation,
-                nodes=FleetNodesSpec(
-                    min=nodes,
-                    target=nodes,
-                    max=None,
-                ),
-            ),
-            profile=run.run_spec.merged_profile,
-            autocreated=True,
-        )
-        return FleetModel(
-            id=uuid.uuid4(),
-            name=fleet_name,
-            project=project,
-            status=FleetStatus.ACTIVE,
-            spec=spec.json(),
-            instances=[],
-        )
-
-
 def _get_offer_volumes(
     volumes: list[list[Volume]],
     offer: InstanceOfferWithAvailability,
diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py
index 5d05fcaa6..2c1744504 100644
--- a/src/dstack/_internal/server/services/runs/plan.py
+++ b/src/dstack/_internal/server/services/runs/plan.py
@@ -61,7 +61,6 @@
     get_nodes_required_num,
 )
 from dstack._internal.server.services.secrets import get_project_secrets_mapping
-from dstack._internal.settings import FeatureFlags
 from dstack._internal.utils import common as common_utils
 from dstack._internal.utils.logging import get_logger
 
@@ -139,22 +138,6 @@ async def get_job_plans(
             job=jobs[0],
             volumes=volumes,
         )
-    elif (
-        FeatureFlags.AUTOCREATED_FLEETS_ENABLED
-        and profile.fleets is None
-        and fleet_model is None
-    ):
-        # Keep the old behavior returning all offers irrespective of fleets
-        # when no fleets are explicitly specified. Needed for supporting
-        # offers with autocreated fleets flow.
-        instance_offers, backend_offers = await _get_non_fleet_offers(
-            session=session,
-            project=project,
-            profile=profile,
-            run_spec=run_spec,
-            job=jobs[0],
-            volumes=volumes,
-        )
 
     for job in jobs:
         job_plan = _get_job_plan(
@@ -211,22 +194,6 @@
             job=jobs[0],
             volumes=volumes,
         )
-    elif (
-        FeatureFlags.AUTOCREATED_FLEETS_ENABLED
-        and profile.fleets is None
-        and fleet_model is None
-    ):
-        # Keep the old behavior returning all offers irrespective of fleets
-        # when no fleets are explicitly specified. Needed for supporting
-        # offers with autocreated fleets flow.
-        instance_offers, backend_offers = await _get_non_fleet_offers(
-            session=session,
-            project=project,
-            profile=profile,
-            run_spec=run_spec,
-            job=jobs[0],
-            volumes=volumes,
-        )
 
     for job in jobs:
         job_plan = _get_job_plan(
@@ -461,17 +428,6 @@ async def find_optimal_fleet_with_offers(
     if len(candidate_fleets_with_offers) == 0:
         return None, [], []
 
-    if (
-        FeatureFlags.AUTOCREATED_FLEETS_ENABLED
-        and run_spec.merged_profile.fleets is None
-        and all(t[3] == 0 and t[4] == 0 for t in candidate_fleets_with_offers)
-    ):
-        # If fleets are not specified and no fleets have available pool
-        # or backend offers, create a new fleet.
-        # This is for compatibility with non-fleet-first UX when runs created new fleets
-        # if there are no instances to reuse.
-        return None, [], []
-
     candidate_fleets_with_offers.sort(key=lambda t: t[-1])
     optimal_fleet_model, instance_offers = candidate_fleets_with_offers[0][:2]
     # Refetch backend offers without limit to return all offers for the optimal fleet.
diff --git a/src/dstack/_internal/settings.py b/src/dstack/_internal/settings.py
index b6567810d..18926632b 100644
--- a/src/dstack/_internal/settings.py
+++ b/src/dstack/_internal/settings.py
@@ -44,11 +44,6 @@ class FeatureFlags:
     development. Feature flags are environment variables of the form DSTACK_FF_*
     """
 
-    AUTOCREATED_FLEETS_ENABLED = os.getenv("DSTACK_FF_AUTOCREATED_FLEETS_ENABLED") is not None
-    """DSTACK_FF_AUTOCREATED_FLEETS_ENABLED enables legacy autocreated fleets:
-    If there are no fleet suitable for the run, a new fleet is created automatically instead of an error.
-    """
-
     CLI_PRINT_JOB_CONNECTION_INFO = (
         os.getenv("DSTACK_FF_CLI_PRINT_JOB_CONNECTION_INFO") is not None
     )
diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_fleets.py b/src/tests/_internal/server/background/pipeline_tasks/test_fleets.py
index 4726dfb9d..1a47d6ca5 100644
--- a/src/tests/_internal/server/background/pipeline_tasks/test_fleets.py
+++ b/src/tests/_internal/server/background/pipeline_tasks/test_fleets.py
@@ -894,8 +894,6 @@ async def test_deletes_terminating_user_fleet(
         self, test_db, session: AsyncSession, worker: FleetWorker
     ):
         project = await create_project(session)
-        spec = get_fleet_spec()
-        spec.autocreated = False
         fleet = await create_fleet(
             session=session,
             project=project,
@@ -911,6 +909,24 @@
         await session.refresh(fleet)
         assert fleet.deleted
 
+    async def test_does_not_delete_empty_active_user_fleet(
+        self, test_db, session: AsyncSession, worker: FleetWorker
+    ):
+        project = await create_project(session)
+        fleet = await create_fleet(
+            session=session,
+            project=project,
+        )
+
+        fleet.lock_token = uuid.uuid4()
+        fleet.lock_expires_at = datetime(2025, 1, 2, 3, 4, tzinfo=timezone.utc)
+        await session.commit()
+
+        await worker.process(_fleet_to_pipeline_item(fleet))
+
+        await session.refresh(fleet)
+        assert not fleet.deleted
+
     async def test_does_not_delete_fleet_with_active_run(
         self, test_db, session: AsyncSession, worker: FleetWorker
     ):
diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py
index 631297522..4ef56ffc3 100644
--- a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py
+++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py
@@ -61,7 +61,6 @@
     get_ssh_fleet_configuration,
     get_volume_provisioning_data,
 )
-from dstack._internal.settings import FeatureFlags
 from dstack._internal.utils.common import get_current_datetime
 
 pytestmark = pytest.mark.usefixtures("image_config_mock")
@@ -1253,7 +1252,7 @@ async def test_terminates_job_when_specified_fleets_cannot_be_used(
         assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
         assert job.termination_reason_message == "Failed to use specified fleets"
 
-    async def test_terminates_job_when_no_matching_fleet_and_autocreated_disabled(
+    async def test_terminates_job_when_no_matching_fleet(
         self, test_db, session: AsyncSession, worker: JobSubmittedWorker
     ):
         project = await create_project(session=session)
@@ -1270,30 +1269,23 @@ async def test_terminates_job_when_no_matching_fleet(
         assert job.termination_reason_message is not None
         assert "No matching fleet found" in job.termination_reason_message
 
-    async def test_marks_job_assigned_without_fleet_when_autocreated_enabled(
-        self,
-        test_db,
-        session: AsyncSession,
-        worker: JobSubmittedWorker,
-        monkeypatch: pytest.MonkeyPatch,
+    async def test_terminates_legacy_autocreated_job_with_no_fleet(
+        self, test_db, session: AsyncSession, worker: JobSubmittedWorker
     ):
-        monkeypatch.setattr(FeatureFlags, "AUTOCREATED_FLEETS_ENABLED", True)
         project = await create_project(session=session)
         user = await create_user(session=session)
         repo = await create_repo(session=session, project_id=project.id)
         run = await create_run(session=session, project=project, repo=repo, user=user)
-        job = await create_job(session=session, run=run)
+        # Simulate legacy in-flight state: instance_assigned=True but no fleet
+        job = await create_job(session=session, run=run, instance_assigned=True)
 
         await _process_job(session=session, worker=worker, job_model=job)
 
         await session.refresh(job)
-        assert job.status == JobStatus.SUBMITTED
-        assert job.instance_assigned
-        assert job.fleet_id is None
-        assert job.instance_id is None
-        assert job.lock_owner is None
-        assert job.lock_token is None
-        assert job.lock_expires_at is None
+        assert job.status == JobStatus.TERMINATING
+        assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
+        assert job.termination_reason_message is not None
+        assert "No matching fleet found" in job.termination_reason_message
 
     async def test_resets_lock_for_retry_when_existing_instance_offer_cannot_be_locked(
         self, test_db, session: AsyncSession, worker: JobSubmittedWorker
@@ -1519,50 +1511,6 @@ async def test_reclaims_stale_related_volume_lock(
         assert volume.lock_expires_at is None
         backend_mock.compute.return_value.attach_volume.assert_called_once()
 
-    async def test_provisions_new_capacity_with_autocreated_fleet(
-        self,
-        test_db,
-        session: AsyncSession,
-        worker: JobSubmittedWorker,
-        monkeypatch: pytest.MonkeyPatch,
-    ):
-        monkeypatch.setattr(FeatureFlags, "AUTOCREATED_FLEETS_ENABLED", True)
-        project = await create_project(session=session)
-        user = await create_user(session=session)
-        repo = await create_repo(session=session, project_id=project.id)
-        run = await create_run(session=session, project=project, repo=repo, user=user)
-        job = await create_job(session=session, run=run)
-
-        # First pass: no fleet found, mark instance_assigned=True with no fleet
-        await _process_job(session=session, worker=worker, job_model=job)
-
-        job = await _get_job(session, job.id)
-        assert job.status == JobStatus.SUBMITTED
-        assert job.instance_assigned
-        assert job.fleet_id is None
-
-        # Second pass: provision new capacity and autocreate fleet
-        offer = get_instance_offer_with_availability(backend=BackendType.AWS)
- with patch("dstack._internal.server.services.backends.get_project_backends") as m: - backend_mock = Mock() - m.return_value = [backend_mock] - backend_mock.TYPE = BackendType.AWS - backend_mock.compute.return_value.get_offers.return_value = [offer] - backend_mock.compute.return_value.run_job.return_value = get_job_provisioning_data( - dockerized=True, - backend=BackendType.AWS, - ) - - await _process_job(session=session, worker=worker, job_model=job) - - job = await _get_job(session, job.id) - assert job.status == JobStatus.PROVISIONING - assert job.instance is not None - assert job.fleet_id is not None - assert job.lock_owner is None - assert job.lock_token is None - assert job.lock_expires_at is None - async def test_run_job_uses_server_default_registry( self, monkeypatch: pytest.MonkeyPatch,