diff --git a/src/dstack/_internal/cli/services/configurators/run.py b/src/dstack/_internal/cli/services/configurators/run.py index 63738c968..578c4a9eb 100644 --- a/src/dstack/_internal/cli/services/configurators/run.py +++ b/src/dstack/_internal/cli/services/configurators/run.py @@ -28,7 +28,12 @@ from dstack._internal.cli.services.resources import apply_resources_args, register_resources_args from dstack._internal.cli.utils.common import confirm_ask, console from dstack._internal.cli.utils.rich import MultiItemStatus -from dstack._internal.cli.utils.run import get_runs_table, print_run_plan +from dstack._internal.cli.utils.run import ( + RunWaitStatus, + get_run_wait_status, + get_runs_table, + print_run_plan, +) from dstack._internal.core.errors import ( CLIError, ConfigurationError, @@ -192,10 +197,14 @@ def apply_configuration( try: # We can attach to run multiple times if it goes from running to pending (retried). while True: - with MultiItemStatus(f"Launching [code]{run.name}[/]...", console=console) as live: + with MultiItemStatus(_get_apply_status(run), console=console) as live: while not _is_ready_to_attach(run): table = get_runs_table([run]) - live.update(table) + live.update( + table, + *_get_apply_wait_renderables(run), + status=_get_apply_status(run), + ) time.sleep(5) run.refresh() @@ -793,7 +802,7 @@ def _detect_windsurf_version(exe: str = "windsurf") -> Optional[str]: def _print_service_urls(run: Run) -> None: if run._run.run_spec.configuration.type != RunConfigurationType.SERVICE.value: return - console.print(f"Service is published at:\n [link={run.service_url}]{run.service_url}[/]") + console.print(_get_service_url_renderable(run)) if model := run.service_model: console.print( f"Model [code]{model.name}[/] is published at:\n [link={model.url}]{model.url}[/]" @@ -801,6 +810,30 @@ def _print_service_urls(run: Run) -> None: console.print() +def _get_apply_status(run: Run) -> str: + wait_status = get_run_wait_status(run._run) + if wait_status is None: + return f"Launching [code]{run.name}[/]..." + return f"[code]{run.name}[/] is {wait_status.value}..." + + +def _get_apply_wait_renderables(run: Run) -> list[str]: + wait_status = get_run_wait_status(run._run) + if wait_status is RunWaitStatus.WAITING_FOR_REQUESTS and run._run.service is not None: + return [_get_service_url_renderable(run)] + if ( + wait_status is RunWaitStatus.WAITING_FOR_SCHEDULE + and run._run.next_triggered_at is not None + ): + next_run = run._run.next_triggered_at.astimezone().strftime("%Y-%m-%d %H:%M %Z") + return [f"Next run: {next_run}"] + return [] + + +def _get_service_url_renderable(run: Run) -> str: + return f"Service is published at:\n [link={run.service_url}]{run.service_url}[/]" + + def _print_dev_environment_connection_info(run: Run) -> None: if not FeatureFlags.CLI_PRINT_JOB_CONNECTION_INFO: return diff --git a/src/dstack/_internal/cli/utils/rich.py b/src/dstack/_internal/cli/utils/rich.py index fb2895ab3..1a8905920 100644 --- a/src/dstack/_internal/cli/utils/rich.py +++ b/src/dstack/_internal/cli/utils/rich.py @@ -140,7 +140,11 @@ def __init__(self, status: "RenderableType", *, console: Optional["Console"] = N transient=True, ) - def update(self, *renderables: "RenderableType") -> None: + def update( + self, *renderables: "RenderableType", status: Optional["RenderableType"] = None + ) -> None: + if status is not None: + self._spinner.update(text=status) self._live.update(renderable=Group(self._spinner, *renderables)) def __enter__(self) -> "MultiItemStatus": diff --git a/src/dstack/_internal/cli/utils/run.py b/src/dstack/_internal/cli/utils/run.py index 2e76c5569..35f9c8848 100644 --- a/src/dstack/_internal/cli/utils/run.py +++ b/src/dstack/_internal/cli/utils/run.py @@ -1,4 +1,5 @@ import shutil +from enum import Enum from typing import Any, Dict, List, Optional from rich.markup import escape @@ -49,6 +50,11 @@ from dstack.api import Run +class RunWaitStatus(str, Enum): + WAITING_FOR_REQUESTS = "waiting for requests" + WAITING_FOR_SCHEDULE = "waiting for schedule" + + def print_offers_json(run_plan: RunPlan, run_spec): """Print offers information in JSON format.""" job_plan = run_plan.job_plans[0] @@ -200,6 +206,40 @@ def th(s: str) -> str: console.print(NO_FLEETS_WARNING if no_fleets else NO_OFFERS_WARNING) +def get_run_wait_status(run: CoreRun) -> Optional[RunWaitStatus]: + # Only synthesize a CLI-specific waiting state when the server did not provide + # a more specific run-level message such as "retrying". + if run.status_message not in ("", run.status.value): + return None + + if run.status == RunStatus.PENDING and run.next_triggered_at is not None: + return RunWaitStatus.WAITING_FOR_SCHEDULE + + if _is_waiting_for_requests(run): + return RunWaitStatus.WAITING_FOR_REQUESTS + + return None + + +def _is_waiting_for_requests(run: CoreRun) -> bool: + if run.run_spec.configuration.type != "service": + return False + if run.service is None or run.next_triggered_at is not None: + return False + if run.status not in (RunStatus.SUBMITTED, RunStatus.PENDING): + return False + return not any(_is_job_active(job.job_submissions[-1].status) for job in run.jobs) + + +def _is_job_active(status: JobStatus) -> bool: + return status in ( + JobStatus.SUBMITTED, + JobStatus.PROVISIONING, + JobStatus.PULLING, + JobStatus.RUNNING, + ) + + def _format_run_status(run) -> str: status_text = ( run.latest_job_submission.status_message