Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion frontend/src/types/fleet.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ declare interface IProfileRequest {
}

declare interface IFleetSpec {
autocreated?: boolean;
configuration: IFleetConfigurationRequest;
configuration_path?: string;
profile: IProfileRequest;
Expand Down
7 changes: 1 addition & 6 deletions src/dstack/_internal/core/compatibility/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,7 @@ def get_fleet_spec_excludes(fleet_spec: FleetSpec) -> Optional[IncludeExcludeDic
configuration_excludes: IncludeExcludeDictType = {}
profile_excludes: IncludeExcludeSetType = set()

# Add excludes like this:
#
# if fleet_spec.configuration.tags is None:
# configuration_excludes["tags"] = True
# if fleet_spec.profile.tags is None:
# profile_excludes.add("tags")
spec_excludes["autocreated"] = True

if configuration_excludes:
spec_excludes["configuration"] = configuration_excludes
Expand Down
4 changes: 4 additions & 0 deletions src/dstack/_internal/core/models/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,11 @@ class FleetSpec(generate_dual_core_model(FleetSpecConfig)):
configuration: FleetConfiguration
configuration_path: Optional[str] = None
profile: Profile
# TODO: Drop `autocreated` once the last client version that sends it (0.20.16)
# and existing autocreated fleets are no longer supported.
autocreated: bool = False
"""Deprecated. Kept for deserialization of old client requests and existing DB records.
"""
# TODO: make `merged_profile` a computed field after migrating to Pydantic v2.
merged_profile: Annotated[Profile, Field(exclude=True)] = None
"""`merged_profile` stores profile parameters merged from `profile` and `configuration`.
Expand Down
20 changes: 10 additions & 10 deletions src/dstack/_internal/server/background/pipeline_tasks/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ def _get_fleet_spec_if_ready_for_consolidation(fleet_model: FleetModel) -> Optio
if fleet_model.status == FleetStatus.TERMINATING:
return None
consolidation_fleet_spec = get_fleet_spec(fleet_model)
# TODO: Drop the fleet_spec.autocreated check once existing autocreated fleets are no longer supported
if (
consolidation_fleet_spec.configuration.nodes is None
or consolidation_fleet_spec.autocreated
Expand Down Expand Up @@ -718,18 +719,17 @@ def _should_delete_fleet(fleet_model: FleetModel) -> bool:
if is_fleet_in_use(fleet_model) or not is_fleet_empty(fleet_model):
return False

# TODO: Drop non-terminating fleets auto-deletion after dropping fleets auto-creation.
fleet_spec = get_fleet_spec(fleet_model)
if (
fleet_model.status != FleetStatus.TERMINATING
and fleet_spec.configuration.nodes is not None
and fleet_spec.configuration.nodes.min == 0
):
# Empty fleets that allow 0 nodes should not be auto-deleted
return False
if fleet_model.status == FleetStatus.TERMINATING:
logger.info("Automatic cleanup of terminating empty fleet %s", fleet_model.name)
return True

# TODO: Drop autocreated fleet auto-deletion once existing autocreated fleets are no longer supported.
if fleet_spec.autocreated:
logger.info("Automatic cleanup of empty autocreated fleet %s", fleet_model.name)
return True

logger.info("Automatic cleanup of an empty fleet %s", fleet_model.name)
return True
return False


def _build_instance_update_rows(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ async def can_terminate_fleet_instances_on_idle_duration(
fleet_model: FleetModel,
) -> bool:
fleet_spec = get_fleet_spec(fleet_model)
# TODO: Drop the fleet_spec.autocreated check once existing autocreated fleets are no longer supported
if fleet_spec.configuration.nodes is None or fleet_spec.autocreated:
return True
res = await session.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@
ComputeGroupStatus,
)
from dstack._internal.core.models.fleets import (
FleetConfiguration,
FleetNodesSpec,
FleetSpec,
FleetStatus,
InstanceGroupPlacement,
)
from dstack._internal.core.models.instances import (
Expand Down Expand Up @@ -89,7 +86,6 @@
from dstack._internal.server.services.docker import apply_server_docker_defaults
from dstack._internal.server.services.fleets import (
check_can_create_new_cloud_instance_in_fleet,
generate_fleet_name,
get_fleet_master_instance_provisioning_data,
get_fleet_spec,
get_next_instance_num,
Expand Down Expand Up @@ -136,11 +132,9 @@
)
from dstack._internal.server.services.runs.spec import (
check_run_spec_requires_instance_mounts,
get_nodes_required_num,
)
from dstack._internal.server.services.volumes import volume_model_to_volume
from dstack._internal.server.utils import sentry_utils
from dstack._internal.settings import FeatureFlags
from dstack._internal.utils.common import get_current_datetime, get_or_error, run_async
from dstack._internal.utils.logging import get_logger

Expand Down Expand Up @@ -463,7 +457,6 @@ class _NewCapacityProvisioning:
provisioning_data: Union[JobProvisioningData, ComputeGroupProvisioningData]
offer: InstanceOfferWithAvailability
effective_profile: Profile
created_fleet_model: Optional[FleetModel]
placement_group_cleanup: Optional[_PlacementGroupCleanup]
volume_attachment_result: Optional[_VolumeAttachmentResult]
locked_fleet_id: Optional[uuid.UUID]
Expand Down Expand Up @@ -866,21 +859,16 @@ async def _apply_no_fleet_selection(
)
return

if not FeatureFlags.AUTOCREATED_FLEETS_ENABLED:
logger.debug("%s: no fleet found", fmt(job_model))
await _terminate_submitted_job(
session=session,
job_model=job_model,
reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
message=(
"No matching fleet found. Possible reasons: "
"https://dstack.ai/docs/guides/troubleshooting/#no-fleets"
),
)
return

job_model.instance_assigned = True
await _mark_job_processed(session=session, job_model=job_model)
logger.debug("%s: no fleet found", fmt(job_model))
await _terminate_submitted_job(
session=session,
job_model=job_model,
reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
message=(
"No matching fleet found. Possible reasons: "
"https://dstack.ai/docs/guides/troubleshooting/#no-fleets"
),
)


async def _lock_assignment_fleet_for_existing_instance_assignment(
Expand Down Expand Up @@ -1170,6 +1158,16 @@ async def _process_new_capacity_provisioning(
preconditions: _ProcessedPreconditions,
) -> _ProvisioningResult:
fleet_model = context.fleet_model
if fleet_model is None:
# Legacy in-flight job from autocreated fleets path (instance_assigned=True, no fleet).
# Autocreated fleets are no longer supported; terminate the job.
return _TerminateSubmittedJobResult(
reason=JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY,
message=(
"No matching fleet found. Possible reasons: "
"https://dstack.ai/docs/guides/troubleshooting/#no-fleets"
),
)
locked_fleet_id = None
if _should_refresh_related_cluster_master_fleet(context=context):
assert fleet_model is not None
Expand Down Expand Up @@ -1220,14 +1218,6 @@ async def _process_new_capacity_provisioning(
placement_group_cleanup=provision_new_capacity_result.placement_group_cleanup,
)

created_fleet_model = None
if context.fleet_model is None:
# TODO: Drop once autocreated fleets are dropped.
created_fleet_model = await _create_fleet_model_for_job(
project=context.project,
run=context.run,
)

volume_attachment_result = None
# TODO: Volume attachment for compute groups is not yet supported since
# currently supported compute groups don't require explicit volume attachment.
Expand All @@ -1244,7 +1234,6 @@ async def _process_new_capacity_provisioning(
provisioning_data=provision_new_capacity_result.provisioning_data,
offer=provision_new_capacity_result.offer,
effective_profile=provision_new_capacity_result.effective_profile,
created_fleet_model=created_fleet_model,
placement_group_cleanup=provision_new_capacity_result.placement_group_cleanup,
volume_attachment_result=volume_attachment_result,
locked_fleet_id=locked_fleet_id,
Expand All @@ -1259,23 +1248,6 @@ async def _apply_new_capacity_provisioning(
) -> None:
fresh_context = await _load_submitted_job_context(session=session, job_model=job_model)
fleet_model = fresh_context.fleet_model
if provisioning.created_fleet_model is not None:
fleet_model = provisioning.created_fleet_model
# Replace the project loaded in the processing session with the one
# bound to this apply session to avoid a duplicate-identity conflict.
fleet_model.project = fresh_context.project
session.add(fleet_model)
fresh_context.job_model.fleet = fleet_model
events.emit(
session,
f"Fleet created for job. Fleet status: {fleet_model.status.upper()}",
actor=events.SystemActor(),
targets=[
events.Target.from_model(fleet_model),
events.Target.from_model(fresh_context.job_model),
],
)

assert fleet_model is not None
await _persist_placement_group_cleanup(
session=session,
Expand Down Expand Up @@ -2165,42 +2137,6 @@ def _get_effective_profile_and_requirements(
return effective_profile, requirements


async def _create_fleet_model_for_job(
project: ProjectModel,
run: Run,
) -> FleetModel:
placement = InstanceGroupPlacement.ANY
if run.run_spec.configuration.type == "task" and run.run_spec.configuration.nodes > 1:
placement = InstanceGroupPlacement.CLUSTER
nodes = get_nodes_required_num(run.run_spec)
async with get_session_ctx() as session:
# Duplicate fleet names are possible because of the missing fleet lock.
# Unfixed since autocreated fleets are to be dropped anyway.
fleet_name = await generate_fleet_name(session=session, project=project)
spec = FleetSpec(
configuration=FleetConfiguration(
name=fleet_name,
placement=placement,
reservation=run.run_spec.configuration.reservation,
nodes=FleetNodesSpec(
min=nodes,
target=nodes,
max=None,
),
),
profile=run.run_spec.merged_profile,
autocreated=True,
)
return FleetModel(
id=uuid.uuid4(),
name=fleet_name,
project=project,
status=FleetStatus.ACTIVE,
spec=spec.json(),
instances=[],
)


def _get_offer_volumes(
volumes: list[list[Volume]],
offer: InstanceOfferWithAvailability,
Expand Down
44 changes: 0 additions & 44 deletions src/dstack/_internal/server/services/runs/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
get_nodes_required_num,
)
from dstack._internal.server.services.secrets import get_project_secrets_mapping
from dstack._internal.settings import FeatureFlags
from dstack._internal.utils import common as common_utils
from dstack._internal.utils.logging import get_logger

Expand Down Expand Up @@ -139,22 +138,6 @@ async def get_job_plans(
job=jobs[0],
volumes=volumes,
)
elif (
FeatureFlags.AUTOCREATED_FLEETS_ENABLED
and profile.fleets is None
and fleet_model is None
):
# Keep the old behavior returning all offers irrespective of fleets
# when no fleets are explicitly specified. Needed for supporting
# offers with autocreated fleets flow.
instance_offers, backend_offers = await _get_non_fleet_offers(
session=session,
project=project,
profile=profile,
run_spec=run_spec,
job=jobs[0],
volumes=volumes,
)

for job in jobs:
job_plan = _get_job_plan(
Expand Down Expand Up @@ -211,22 +194,6 @@ async def get_job_plans(
job=jobs[0],
volumes=volumes,
)
elif (
FeatureFlags.AUTOCREATED_FLEETS_ENABLED
and profile.fleets is None
and fleet_model is None
):
# Keep the old behavior returning all offers irrespective of fleets
# when no fleets are explicitly specified. Needed for supporting
# offers with autocreated fleets flow.
instance_offers, backend_offers = await _get_non_fleet_offers(
session=session,
project=project,
profile=profile,
run_spec=run_spec,
job=jobs[0],
volumes=volumes,
)

for job in jobs:
job_plan = _get_job_plan(
Expand Down Expand Up @@ -461,17 +428,6 @@ async def find_optimal_fleet_with_offers(
if len(candidate_fleets_with_offers) == 0:
return None, [], []

if (
FeatureFlags.AUTOCREATED_FLEETS_ENABLED
and run_spec.merged_profile.fleets is None
and all(t[3] == 0 and t[4] == 0 for t in candidate_fleets_with_offers)
):
# If fleets are not specified and no fleets have available pool
# or backend offers, create a new fleet.
# This is for compatibility with non-fleet-first UX when runs created new fleets
# if there are no instances to reuse.
return None, [], []

candidate_fleets_with_offers.sort(key=lambda t: t[-1])
optimal_fleet_model, instance_offers = candidate_fleets_with_offers[0][:2]
# Refetch backend offers without limit to return all offers for the optimal fleet.
Expand Down
5 changes: 0 additions & 5 deletions src/dstack/_internal/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@ class FeatureFlags:
development. Feature flags are environment variables of the form DSTACK_FF_*
"""

AUTOCREATED_FLEETS_ENABLED = os.getenv("DSTACK_FF_AUTOCREATED_FLEETS_ENABLED") is not None
"""DSTACK_FF_AUTOCREATED_FLEETS_ENABLED enables legacy autocreated fleets:
if no fleet suitable for the run exists, a new fleet is created automatically instead of raising an error.
"""

CLI_PRINT_JOB_CONNECTION_INFO = (
os.getenv("DSTACK_FF_CLI_PRINT_JOB_CONNECTION_INFO") is not None
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,8 +894,6 @@ async def test_deletes_terminating_user_fleet(
self, test_db, session: AsyncSession, worker: FleetWorker
):
project = await create_project(session)
spec = get_fleet_spec()
spec.autocreated = False
fleet = await create_fleet(
session=session,
project=project,
Expand All @@ -911,6 +909,24 @@ async def test_deletes_terminating_user_fleet(
await session.refresh(fleet)
assert fleet.deleted

async def test_does_not_delete_empty_active_user_fleet(
self, test_db, session: AsyncSession, worker: FleetWorker
):
project = await create_project(session)
fleet = await create_fleet(
session=session,
project=project,
)

fleet.lock_token = uuid.uuid4()
fleet.lock_expires_at = datetime(2025, 1, 2, 3, 4, tzinfo=timezone.utc)
await session.commit()

await worker.process(_fleet_to_pipeline_item(fleet))

await session.refresh(fleet)
assert not fleet.deleted

async def test_does_not_delete_fleet_with_active_run(
self, test_db, session: AsyncSession, worker: FleetWorker
):
Expand Down
Loading
Loading