From 333bc18d92ee9ba55ebcbda05ca55b6ab807e28a Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Mon, 2 Dec 2024 19:00:41 -0800 Subject: [PATCH 01/10] [Tests] Move tests to uv to speed up the dependency installation by >10x (#4424) * correct cache for pypi * Add doc cache and test cache * Add examples folder * fix policy path * use uv for pylint * Fix azure cli * disable cache * use venv * set venv * source instead * rename doc build * Move to uv * Fix azure cli * Add -e * Update .github/workflows/format.yml Co-authored-by: Christopher Cooper * Update .github/workflows/mypy.yml Co-authored-by: Christopher Cooper * Update .github/workflows/pylint.yml Co-authored-by: Christopher Cooper * Update .github/workflows/pytest.yml Co-authored-by: Christopher Cooper * Update .github/workflows/test-doc-build.yml Co-authored-by: Christopher Cooper * fix pytest yml * Add merge group --------- Co-authored-by: Christopher Cooper --- .github/workflows/format.yml | 20 +++++++++++------ .github/workflows/mypy-generic.yml | 22 ------------------- .github/workflows/mypy.yml | 15 ++++++++----- .github/workflows/pylint.yml | 16 ++++++++------ .github/workflows/pytest.yml | 31 +++++++++++---------------- .github/workflows/test-doc-build.yml | 16 ++++++++------ tests/unit_tests/test_admin_policy.py | 4 ++++ 7 files changed, 60 insertions(+), 64 deletions(-) delete mode 100644 .github/workflows/mypy-generic.yml diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml index a19bdcd020d..d9664287a41 100644 --- a/.github/workflows/format.yml +++ b/.github/workflows/format.yml @@ -21,29 +21,35 @@ jobs: python-version: ["3.8"] steps: - uses: actions/checkout@v3 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + - name: Install the latest version of uv + uses: astral-sh/setup-uv@v4 with: + version: "latest" python-version: ${{ matrix.python-version }} - name: Install dependencies run: | - python -m pip install --upgrade pip - pip install 
yapf==0.32.0 - pip install toml==0.10.2 - pip install black==22.10.0 - pip install isort==5.12.0 + uv venv --seed ~/test-env + source ~/test-env/bin/activate + uv pip install yapf==0.32.0 + uv pip install toml==0.10.2 + uv pip install black==22.10.0 + uv pip install isort==5.12.0 - name: Running yapf run: | + source ~/test-env/bin/activate yapf --diff --recursive ./ --exclude 'sky/skylet/ray_patches/**' \ --exclude 'sky/skylet/providers/ibm/**' - name: Running black run: | + source ~/test-env/bin/activate black --diff --check sky/skylet/providers/ibm/ - name: Running isort for black formatted files run: | + source ~/test-env/bin/activate isort --diff --check --profile black -l 88 -m 3 \ sky/skylet/providers/ibm/ - name: Running isort for yapf formatted files run: | + source ~/test-env/bin/activate isort --diff --check ./ --sg 'sky/skylet/ray_patches/**' \ --sg 'sky/skylet/providers/ibm/**' diff --git a/.github/workflows/mypy-generic.yml b/.github/workflows/mypy-generic.yml deleted file mode 100644 index c28ffad9bb7..00000000000 --- a/.github/workflows/mypy-generic.yml +++ /dev/null @@ -1,22 +0,0 @@ -# This is needed for GitHub Actions for the "Waiting for status to be reported" problem, -# according to https://docs.github.com/en/repositories/configuring-branches-and-merges-in-your-repository/defining-the-mergeability-of-pull-requests/troubleshooting-required-status-checks -name: mypy - -on: - # Trigger the workflow on push or pull request, - # but only for the main branch - push: - branches: - - master - - 'releases/**' - pull_request: - branches: - - master - - 'releases/**' - merge_group: - -jobs: - mypy: - runs-on: ubuntu-latest - steps: - - run: 'echo "No mypy to run"' diff --git a/.github/workflows/mypy.yml b/.github/workflows/mypy.yml index d59e90a9e99..6df98401fcb 100644 --- a/.github/workflows/mypy.yml +++ b/.github/workflows/mypy.yml @@ -11,6 +11,8 @@ on: branches: - master - 'releases/**' + merge_group: + jobs: mypy: runs-on: ubuntu-latest @@ -19,15 
+21,18 @@ jobs: python-version: ["3.8"] steps: - uses: actions/checkout@v3 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + - name: Install the latest version of uv + uses: astral-sh/setup-uv@v4 with: + version: "latest" python-version: ${{ matrix.python-version }} - name: Install dependencies run: | - python -m pip install --upgrade pip - pip install mypy==$(grep mypy requirements-dev.txt | cut -d'=' -f3) - pip install $(grep types- requirements-dev.txt | tr '\n' ' ') + uv venv --seed ~/test-env + source ~/test-env/bin/activate + uv pip install mypy==$(grep mypy requirements-dev.txt | cut -d'=' -f3) + uv pip install $(grep types- requirements-dev.txt | tr '\n' ' ') - name: Running mypy run: | + source ~/test-env/bin/activate mypy $(cat tests/mypy_files.txt) diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml index 0555fb934d0..f5cf40a31ad 100644 --- a/.github/workflows/pylint.yml +++ b/.github/workflows/pylint.yml @@ -21,16 +21,20 @@ jobs: python-version: ["3.8"] steps: - uses: actions/checkout@v3 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + - name: Install the latest version of uv + uses: astral-sh/setup-uv@v4 with: + version: "latest" python-version: ${{ matrix.python-version }} - name: Install dependencies run: | - python -m pip install --upgrade pip - pip install ".[all]" - pip install pylint==2.14.5 - pip install pylint-quotes==0.2.3 + uv venv --seed ~/test-env + source ~/test-env/bin/activate + uv pip install --prerelease=allow "azure-cli>=2.65.0" + uv pip install ".[all]" + uv pip install pylint==2.14.5 + uv pip install pylint-quotes==0.2.3 - name: Analysing the code with pylint run: | + source ~/test-env/bin/activate pylint --load-plugins pylint_quotes sky diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 757bfec36d2..bface9232cf 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -35,26 +35,21 @@ 
jobs: steps: - name: Checkout repository uses: actions/checkout@v3 - - - name: Install Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + - name: Install the latest version of uv + uses: astral-sh/setup-uv@v4 with: + version: "latest" python-version: ${{ matrix.python-version }} - - - name: Cache dependencies - uses: actions/cache@v3 - if: startsWith(runner.os, 'Linux') - with: - path: ~/.cache/pip - key: ${{ runner.os }}-pip-pytest-${{ matrix.python-version }} - restore-keys: | - ${{ runner.os }}-pip-pytest-${{ matrix.python-version }} - - name: Install dependencies run: | - python -m pip install --upgrade pip - pip install -e ".[all]" - pip install pytest pytest-xdist pytest-env>=0.6 memory-profiler==0.61.0 - + uv venv --seed ~/test-env + source ~/test-env/bin/activate + uv pip install --prerelease=allow "azure-cli>=2.65.0" + # Use -e to include examples and tests folder in the path for unit + # tests to access them. + uv pip install -e ".[all]" + uv pip install pytest pytest-xdist pytest-env>=0.6 memory-profiler==0.61.0 - name: Run tests with pytest - run: SKYPILOT_DISABLE_USAGE_COLLECTION=1 SKYPILOT_SKIP_CLOUD_IDENTITY_CHECK=1 pytest -n 0 --dist no ${{ matrix.test-path }} + run: | + source ~/test-env/bin/activate + SKYPILOT_DISABLE_USAGE_COLLECTION=1 SKYPILOT_SKIP_CLOUD_IDENTITY_CHECK=1 pytest -n 0 --dist no ${{ matrix.test-path }} diff --git a/.github/workflows/test-doc-build.yml b/.github/workflows/test-doc-build.yml index 706aa071706..4a55e4fef89 100644 --- a/.github/workflows/test-doc-build.yml +++ b/.github/workflows/test-doc-build.yml @@ -14,24 +14,28 @@ on: merge_group: jobs: - format: + doc-build: runs-on: ubuntu-latest strategy: matrix: python-version: ["3.10"] steps: - uses: actions/checkout@v3 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v4 + - name: Install the latest version of uv + uses: astral-sh/setup-uv@v4 with: + version: "latest" python-version: ${{ matrix.python-version }} - name: 
Install dependencies run: | - python -m pip install --upgrade pip - pip install . + uv venv --seed ~/test-env + source ~/test-env/bin/activate + uv pip install --prerelease=allow "azure-cli>=2.65.0" + uv pip install ".[all]" cd docs - pip install -r ./requirements-docs.txt + uv pip install -r ./requirements-docs.txt - name: Build documentation run: | + source ~/test-env/bin/activate cd ./docs ./build.sh diff --git a/tests/unit_tests/test_admin_policy.py b/tests/unit_tests/test_admin_policy.py index be40cc55723..c9e7ad35af2 100644 --- a/tests/unit_tests/test_admin_policy.py +++ b/tests/unit_tests/test_admin_policy.py @@ -16,6 +16,10 @@ POLICY_PATH = os.path.join(os.path.dirname(os.path.dirname(sky.__file__)), 'examples', 'admin_policy') +if not os.path.exists(POLICY_PATH): + # This is used for GitHub Actions, as we copy the examples to the package. + POLICY_PATH = os.path.join(os.path.dirname(__file__), 'examples', + 'admin_policy') @pytest.fixture From 5c1cb9cf919ade8991915d4c81a943243e801296 Mon Sep 17 00:00:00 2001 From: zpoint Date: Tue, 3 Dec 2024 14:08:43 +0800 Subject: [PATCH 02/10] update readme for test kubernetes example (#4426) * update readme * fetch version from gcloud * rename var to GKE_VERSION * subnetwork also use REGION --- tests/kubernetes/README.md | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/tests/kubernetes/README.md b/tests/kubernetes/README.md index 7c5ed7586ff..e15f593e006 100644 --- a/tests/kubernetes/README.md +++ b/tests/kubernetes/README.md @@ -1,10 +1,10 @@ # SkyPilot Kubernetes Development Scripts -This directory contains useful scripts and notes for developing SkyPilot on Kubernetes. +This directory contains useful scripts and notes for developing SkyPilot on Kubernetes. ## Building and pushing SkyPilot image -We maintain a container image that has all basic SkyPilot dependencies installed. +We maintain a container image that has all basic SkyPilot dependencies installed. 
This image is hosted at `us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/skypilot:latest`. To build this image locally and optionally push to the SkyPilot registry, run: @@ -18,10 +18,10 @@ To build this image locally and optionally push to the SkyPilot registry, run: ``` ## Running a local development cluster -We use (kind)[https://kind.sigs.k8s.io/] to run a local Kubernetes cluster +We use (kind)[https://kind.sigs.k8s.io/] to run a local Kubernetes cluster for development. To create a local development cluster, run: -```bash +```bash sky local up ``` @@ -50,7 +50,13 @@ curl --header "Content-Type: application/json-patch+json" \ ```bash PROJECT_ID=$(gcloud config get-value project) CLUSTER_NAME=testclusterromil - gcloud beta container --project "${PROJECT_ID}" clusters create "${CLUSTER_NAME}" --zone "us-central1-c" --no-enable-basic-auth --cluster-version "1.29.1-gke.1589020" --release-channel "regular" --machine-type "n1-standard-8" --accelerator "type=nvidia-tesla-t4,count=1" --image-type "COS_CONTAINERD" --disk-type "pd-balanced" --disk-size "100" --metadata disable-legacy-endpoints=true --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --num-nodes "2" --logging=SYSTEM,WORKLOAD --monitoring=SYSTEM --enable-ip-alias --network "projects/${PROJECT_ID}/global/networks/default" --subnetwork "projects/${PROJECT_ID}/regions/us-central1/subnetworks/default" --no-enable-intra-node-visibility --default-max-pods-per-node "110" --security-posture=standard --workload-vulnerability-scanning=disabled --no-enable-master-authorized-networks --addons HorizontalPodAutoscaling,HttpLoadBalancing,GcePersistentDiskCsiDriver --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 
--enable-managed-prometheus --enable-shielded-nodes --node-locations "us-central1-c" && gcloud beta container --project "${PROJECT_ID}" node-pools create "v100" --cluster "${CLUSTER_NAME}" --zone "us-central1-c" --machine-type "n1-standard-8" --accelerator "type=nvidia-tesla-v100,count=1" --image-type "COS_CONTAINERD" --disk-type "pd-balanced" --disk-size "100" --metadata disable-legacy-endpoints=true --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --num-nodes "2" --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 --node-locations "us-central1-c" && gcloud beta container --project "${PROJECT_ID}" node-pools create "largecpu" --cluster "${CLUSTER_NAME}" --zone "us-central1-c" --machine-type "n1-standard-16" --image-type "COS_CONTAINERD" --disk-type "pd-balanced" --disk-size "100" --metadata disable-legacy-endpoints=true --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --num-nodes "2" --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 --node-locations "us-central1-c" && gcloud beta container --project "${PROJECT_ID}" node-pools create "l4" --cluster "${CLUSTER_NAME}" --zone "us-central1-c" --machine-type "g2-standard-4" --accelerator "type=nvidia-l4,count=1" --image-type "COS_CONTAINERD" --disk-type "pd-balanced" --disk-size "100" --metadata disable-legacy-endpoints=true --scopes 
"https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --num-nodes "2" --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 --node-locations "us-central1-c" + REGION=us-central1-c + GKE_VERSION=$(gcloud container get-server-config \ + --region=${REGION} \ + --flatten=channels \ + --filter="channels.channel=REGULAR" \ + --format="value(channels.defaultVersion)") + gcloud beta container --project "${PROJECT_ID}" clusters create "${CLUSTER_NAME}" --zone "${REGION}" --no-enable-basic-auth --cluster-version "${GKE_VERSION}" --release-channel "regular" --machine-type "n1-standard-8" --accelerator "type=nvidia-tesla-t4,count=1" --image-type "COS_CONTAINERD" --disk-type "pd-balanced" --disk-size "100" --metadata disable-legacy-endpoints=true --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --num-nodes "2" --logging=SYSTEM,WORKLOAD --monitoring=SYSTEM --enable-ip-alias --network "projects/${PROJECT_ID}/global/networks/default" --subnetwork "projects/${PROJECT_ID}/regions/${REGION%-*}/subnetworks/default" --no-enable-intra-node-visibility --default-max-pods-per-node "110" --security-posture=standard --workload-vulnerability-scanning=disabled --no-enable-master-authorized-networks --addons HorizontalPodAutoscaling,HttpLoadBalancing,GcePersistentDiskCsiDriver --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 --enable-managed-prometheus --enable-shielded-nodes --node-locations "${REGION}" && 
gcloud beta container --project "${PROJECT_ID}" node-pools create "v100" --cluster "${CLUSTER_NAME}" --zone "${REGION}" --machine-type "n1-standard-8" --accelerator "type=nvidia-tesla-v100,count=1" --image-type "COS_CONTAINERD" --disk-type "pd-balanced" --disk-size "100" --metadata disable-legacy-endpoints=true --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --num-nodes "2" --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 --node-locations "${REGION}" && gcloud beta container --project "${PROJECT_ID}" node-pools create "largecpu" --cluster "${CLUSTER_NAME}" --zone "${REGION}" --machine-type "n1-standard-16" --image-type "COS_CONTAINERD" --disk-type "pd-balanced" --disk-size "100" --metadata disable-legacy-endpoints=true --scopes "https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --num-nodes "2" --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 --node-locations "${REGION}" && gcloud beta container --project "${PROJECT_ID}" node-pools create "l4" --cluster "${CLUSTER_NAME}" --zone "${REGION}" --machine-type "g2-standard-4" --accelerator "type=nvidia-l4,count=1" --image-type "COS_CONTAINERD" --disk-type "pd-balanced" --disk-size "100" --metadata disable-legacy-endpoints=true --scopes 
"https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" --num-nodes "2" --enable-autoupgrade --enable-autorepair --max-surge-upgrade 1 --max-unavailable-upgrade 0 --node-locations "${REGION}" ``` 2. Get the kubeconfig for your cluster and place it in `~/.kube/config`: ```bash @@ -65,7 +71,7 @@ curl --header "Content-Type: application/json-patch+json" \ kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded.yaml kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded-latest.yaml - + # If using Ubuntu based nodes: kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/ubuntu/daemonset-preloaded.yaml @@ -123,6 +129,6 @@ NOTE - If are using nodeport networking, make sure port 32100 is open in your no NOTE - If are using nodeport networking, make sure port 32100 is open in your EKS cluster's default security group. ## Other useful scripts -`scripts` directory contains other useful scripts for development, including -Kubernetes dashboard, ray yaml for testing the SkyPilot Kubernetes node provider +`scripts` directory contains other useful scripts for development, including +Kubernetes dashboard, ray yaml for testing the SkyPilot Kubernetes node provider and more. 
From 747382a7cf75c691c91846efe708ce25b2a3aeb8 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Tue, 3 Dec 2024 15:11:55 +0530 Subject: [PATCH 03/10] [k8s] Fix `show-gpus` availability map when nvidia drivers are not installed (#4429) * Fix availability map * Fix availability map --- sky/clouds/service_catalog/kubernetes_catalog.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sky/clouds/service_catalog/kubernetes_catalog.py b/sky/clouds/service_catalog/kubernetes_catalog.py index 1d0c97c0442..2c7eafc20e5 100644 --- a/sky/clouds/service_catalog/kubernetes_catalog.py +++ b/sky/clouds/service_catalog/kubernetes_catalog.py @@ -239,13 +239,12 @@ def _list_accelerators( accelerators_available = accelerator_count - allocated_qty - if accelerator_name not in total_accelerators_available: - total_accelerators_available[accelerator_name] = 0 if accelerators_available >= min_quantity_filter: quantized_availability = min_quantity_filter * ( accelerators_available // min_quantity_filter) - total_accelerators_available[ - accelerator_name] += quantized_availability + total_accelerators_available[accelerator_name] = ( + total_accelerators_available.get(accelerator_name, 0) + + quantized_availability) result = [] From 6f96e7a451714dd9180156df393b2cf8a10e5c1f Mon Sep 17 00:00:00 2001 From: Christopher Cooper Date: Tue, 3 Dec 2024 11:01:10 -0800 Subject: [PATCH 04/10] avoid catching ValueError during failover (#4432) * avoid catching ValueError during failover If the cloud api raises ValueError or a subclass of ValueError during instance termination, we will assume the cluster was downed. Fix this by introducing a new exception ClusterDoesNotExist that we can catch instead of the more general ValueError. 
* add unit test * lint --- sky/backends/backend_utils.py | 9 ++-- sky/core.py | 43 +++++++++++-------- sky/exceptions.py | 7 ++++ sky/execution.py | 5 ++- sky/jobs/recovery_strategy.py | 3 +- tests/unit_tests/test_recovery_strategy.py | 48 ++++++++++++++++++++++ 6 files changed, 90 insertions(+), 25 deletions(-) create mode 100644 tests/unit_tests/test_recovery_strategy.py diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index a116681da1b..9c56546234a 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -1612,14 +1612,14 @@ def check_can_clone_disk_and_override_task( The task to use and the resource handle of the source cluster. Raises: - ValueError: If the source cluster does not exist. + exceptions.ClusterDoesNotExist: If the source cluster does not exist. exceptions.NotSupportedError: If the source cluster is not valid or the task is not compatible to clone disk from the source cluster. """ source_cluster_status, handle = refresh_cluster_status_handle(cluster_name) if source_cluster_status is None: with ux_utils.print_exception_no_traceback(): - raise ValueError( + raise exceptions.ClusterDoesNotExist( f'Cannot find cluster {cluster_name!r} to clone disk from.') if not isinstance(handle, backends.CloudVmRayResourceHandle): @@ -2136,7 +2136,7 @@ def check_cluster_available( """Check if the cluster is available. Raises: - ValueError: if the cluster does not exist. + exceptions.ClusterDoesNotExist: if the cluster does not exist. exceptions.ClusterNotUpError: if the cluster is not UP. exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. 
@@ -2201,7 +2201,8 @@ def check_cluster_available( error_msg += message with ux_utils.print_exception_no_traceback(): - raise ValueError(f'{colorama.Fore.YELLOW}{error_msg}{reset}') + raise exceptions.ClusterDoesNotExist( + f'{colorama.Fore.YELLOW}{error_msg}{reset}') assert cluster_status is not None, 'handle is not None but status is None' backend = get_backend_from_handle(handle) if check_cloud_vm_ray_backend and not isinstance( diff --git a/sky/core.py b/sky/core.py index 4bb12f4a21a..9f1288d7fb6 100644 --- a/sky/core.py +++ b/sky/core.py @@ -268,7 +268,8 @@ def _start( cluster_status, handle = backend_utils.refresh_cluster_status_handle( cluster_name) if handle is None: - raise ValueError(f'Cluster {cluster_name!r} does not exist.') + raise exceptions.ClusterDoesNotExist( + f'Cluster {cluster_name!r} does not exist.') if not force and cluster_status == status_lib.ClusterStatus.UP: sky_logging.print(f'Cluster {cluster_name!r} is already up.') return handle @@ -359,12 +360,13 @@ def start( Useful for upgrading SkyPilot runtime. Raises: - ValueError: argument values are invalid: (1) the specified cluster does - not exist; (2) if ``down`` is set to True but - ``idle_minutes_to_autostop`` is None; (3) if the specified cluster is - the managed jobs controller, and either ``idle_minutes_to_autostop`` - is not None or ``down`` is True (omit them to use the default - autostop settings). + ValueError: argument values are invalid: (1) if ``down`` is set to True + but ``idle_minutes_to_autostop`` is None; (2) if the specified + cluster is the managed jobs controller, and either + ``idle_minutes_to_autostop`` is not None or ``down`` is True (omit + them to use the default autostop settings). + sky.exceptions.ClusterDoesNotExist: the specified cluster does not + exist. sky.exceptions.NotSupportedError: if the cluster to restart was launched using a non-default backend that does not support this operation. 
@@ -412,7 +414,8 @@ def stop(cluster_name: str, purge: bool = False) -> None: related resources. Raises: - ValueError: the specified cluster does not exist. + sky.exceptions.ClusterDoesNotExist: the specified cluster does not + exist. RuntimeError: failed to stop the cluster. sky.exceptions.NotSupportedError: if the specified cluster is a spot cluster, or a TPU VM Pod cluster, or the managed jobs controller. @@ -423,7 +426,8 @@ def stop(cluster_name: str, purge: bool = False) -> None: f'is not supported.') handle = global_user_state.get_handle_from_cluster_name(cluster_name) if handle is None: - raise ValueError(f'Cluster {cluster_name!r} does not exist.') + raise exceptions.ClusterDoesNotExist( + f'Cluster {cluster_name!r} does not exist.') backend = backend_utils.get_backend_from_handle(handle) @@ -467,14 +471,16 @@ def down(cluster_name: str, purge: bool = False) -> None: resources. Raises: - ValueError: the specified cluster does not exist. + sky.exceptions.ClusterDoesNotExist: the specified cluster does not + exist. RuntimeError: failed to tear down the cluster. sky.exceptions.NotSupportedError: the specified cluster is the managed jobs controller. """ handle = global_user_state.get_handle_from_cluster_name(cluster_name) if handle is None: - raise ValueError(f'Cluster {cluster_name!r} does not exist.') + raise exceptions.ClusterDoesNotExist( + f'Cluster {cluster_name!r} does not exist.') usage_lib.record_cluster_name_for_current_operation(cluster_name) backend = backend_utils.get_backend_from_handle(handle) @@ -521,7 +527,7 @@ def autostop( rather than autostop (restartable). Raises: - ValueError: if the cluster does not exist. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend or the cluster is TPU VM Pod. 
@@ -615,7 +621,7 @@ def queue(cluster_name: str, } ] raises: - ValueError: if the cluster does not exist. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. @@ -674,7 +680,8 @@ def cancel( worker node is preempted in the spot cluster. Raises: - ValueError: if arguments are invalid, or the cluster does not exist. + ValueError: if arguments are invalid. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the specified cluster is a controller that does not support this operation. @@ -750,8 +757,8 @@ def tail_logs(cluster_name: str, Please refer to the sky.cli.tail_logs for the document. Raises: - ValueError: arguments are invalid or the cluster is not supported or - the cluster does not exist. + ValueError: if arguments are invalid or the cluster is not supported. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. @@ -793,7 +800,7 @@ def download_logs( Returns: Dict[str, str]: a mapping of job_id to local log path. Raises: - ValueError: if the cluster does not exist. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. @@ -838,7 +845,7 @@ def job_status(cluster_name: str, If job_ids is None and there is no job on the cluster, it will return {None: None}. Raises: - ValueError: if the cluster does not exist. + sky.exceptions.ClusterDoesNotExist: if the cluster does not exist. sky.exceptions.ClusterNotUpError: if the cluster is not UP. 
sky.exceptions.NotSupportedError: if the cluster is not based on CloudVmRayBackend. diff --git a/sky/exceptions.py b/sky/exceptions.py index c1ade2eb02a..40d2b4d867b 100644 --- a/sky/exceptions.py +++ b/sky/exceptions.py @@ -132,6 +132,13 @@ class ClusterSetUpError(Exception): pass +class ClusterDoesNotExist(ValueError): + """Raise when trying to operate on a cluster that does not exist.""" + # This extends ValueError for compatibility reasons - we used to throw + # ValueError instead of this. + pass + + class NotSupportedError(Exception): """Raised when a feature is not supported.""" pass diff --git a/sky/execution.py b/sky/execution.py index 963e0356753..103dcf5ee83 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -581,8 +581,9 @@ def exec( # pylint: disable=redefined-builtin submitted. Raises: - ValueError: if the specified cluster does not exist or is not in UP - status. + ValueError: if the specified cluster is not in UP status. + sky.exceptions.ClusterDoesNotExist: if the specified cluster does not + exist. sky.exceptions.NotSupportedError: if the specified cluster is a controller that does not support this operation. diff --git a/sky/jobs/recovery_strategy.py b/sky/jobs/recovery_strategy.py index 09e4bd8ed6e..4fda1a07e08 100644 --- a/sky/jobs/recovery_strategy.py +++ b/sky/jobs/recovery_strategy.py @@ -50,8 +50,9 @@ def terminate_cluster(cluster_name: str, max_retry: int = 3) -> None: usage_lib.messages.usage.set_internal() sky.down(cluster_name) return - except ValueError: + except exceptions.ClusterDoesNotExist: # The cluster is already down. 
+ logger.debug(f'The cluster {cluster_name} is already down.') return except Exception as e: # pylint: disable=broad-except retry_cnt += 1 diff --git a/tests/unit_tests/test_recovery_strategy.py b/tests/unit_tests/test_recovery_strategy.py new file mode 100644 index 00000000000..da8e8142da0 --- /dev/null +++ b/tests/unit_tests/test_recovery_strategy.py @@ -0,0 +1,48 @@ +from unittest import mock + +from sky.exceptions import ClusterDoesNotExist +from sky.jobs import recovery_strategy + + +@mock.patch('sky.down') +@mock.patch('sky.usage.usage_lib.messages.usage.set_internal') +def test_terminate_cluster_retry_on_value_error(mock_set_internal, + mock_sky_down) -> None: + # Set up mock to fail twice with ValueError, then succeed + mock_sky_down.side_effect = [ + ValueError('Mock error 1'), + ValueError('Mock error 2'), + None, + ] + + # Call should succeed after retries + recovery_strategy.terminate_cluster('test-cluster') + + # Verify sky.down was called 3 times + assert mock_sky_down.call_count == 3 + mock_sky_down.assert_has_calls([ + mock.call('test-cluster'), + mock.call('test-cluster'), + mock.call('test-cluster'), + ]) + + # Verify usage.set_internal was called before each sky.down + assert mock_set_internal.call_count == 3 + + +@mock.patch('sky.down') +@mock.patch('sky.usage.usage_lib.messages.usage.set_internal') +def test_terminate_cluster_handles_nonexistent_cluster(mock_set_internal, + mock_sky_down) -> None: + # Set up mock to raise ClusterDoesNotExist + mock_sky_down.side_effect = ClusterDoesNotExist('test-cluster') + + # Call should succeed silently + recovery_strategy.terminate_cluster('test-cluster') + + # Verify sky.down was called once + assert mock_sky_down.call_count == 1 + mock_sky_down.assert_called_once_with('test-cluster') + + # Verify usage.set_internal was called once + assert mock_set_internal.call_count == 1 From c3c1fde7172258265951392aa962b310a02c1a78 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Tue, 3 Dec 2024 11:48:04 -0800 
Subject: [PATCH 05/10] [Core] Execute setup when `--detach-setup` and no `run` section (#4430) * Execute setup when --detach-setup and no run section * Update sky/backends/cloud_vm_ray_backend.py Co-authored-by: Tian Xia * add comments * Fix types * format * minor * Add test for detach setup only --------- Co-authored-by: Tian Xia --- sky/__init__.py | 2 + sky/backends/cloud_vm_ray_backend.py | 34 ++++-- tests/test_smoke.py | 150 ++++++++++++++------------ tests/test_yamls/test_only_setup.yaml | 2 + 4 files changed, 112 insertions(+), 76 deletions(-) create mode 100644 tests/test_yamls/test_only_setup.yaml diff --git a/sky/__init__.py b/sky/__init__.py index b851775dabf..4e720d63ce0 100644 --- a/sky/__init__.py +++ b/sky/__init__.py @@ -105,6 +105,7 @@ def set_proxy_env_var(proxy_var: str, urllib_var: Optional[str]): from sky.data import StoreType from sky.execution import exec # pylint: disable=redefined-builtin from sky.execution import launch +from sky.jobs import ManagedJobStatus # TODO (zhwu): These imports are for backward compatibility, and spot APIs # should be called with `sky.spot.xxx` instead. Remove in release 0.8.0 from sky.jobs.core import spot_cancel @@ -163,6 +164,7 @@ def set_proxy_env_var(proxy_var: str, urllib_var: Optional[str]): 'StoreType', 'ClusterStatus', 'JobStatus', + 'ManagedJobStatus', # APIs 'Dag', 'Task', diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 5682cf24586..d73b7f54b8d 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -301,6 +301,8 @@ def add_prologue(self, job_id: int) -> None: ) def get_or_fail(futures, pg) -> List[int]: \"\"\"Wait for tasks, if any fails, cancel all unready.\"\"\" + if not futures: + return [] returncodes = [1] * len(futures) # Wait for 1 task to be ready. ready = [] @@ -3460,15 +3462,33 @@ def _execute( Returns: Job id if the task is submitted to the cluster, None otherwise. 
""" - if task.run is None: + if task.run is None and self._setup_cmd is None: + # This message is fine without mentioning setup, as there are three + # cases when run section is empty: + # 1. setup specified, no --detach-setup: setup is executed and this + # message is fine for saying no run command specified. + # 2. setup specified, with --detach-setup: setup is executed in + # detached mode and this message will not be shown. + # 3. no setup specified: this message is fine as a user is likely + # creating a cluster only, and ok with the empty run command. logger.info('Run commands not specified or empty.') return None - # Check the task resources vs the cluster resources. Since `sky exec` - # will not run the provision and _check_existing_cluster - # We need to check ports here since sky.exec shouldn't change resources - valid_resource = self.check_resources_fit_cluster(handle, - task, - check_ports=True) + if task.run is None: + # If the task has no run command, we still need to execute the + # generated ray driver program to run the setup command in detached + # mode. + # In this case, we reset the resources for the task, so that the + # detached setup does not need to wait for the task resources to be + # ready (which is not used for setup anyway). + valid_resource = sky.Resources() + else: + # Check the task resources vs the cluster resources. Since + # `sky exec` will not run the provision and _check_existing_cluster + # We need to check ports here since sky.exec shouldn't change + # resources. + valid_resource = self.check_resources_fit_cluster(handle, + task, + check_ports=True) task_copy = copy.copy(task) # Handle multiple resources exec case. 
task_copy.set_resources(valid_resource) diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 574dae21ea0..26f7ba73ef9 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -59,11 +59,8 @@ from sky.data import data_utils from sky.data import storage as storage_lib from sky.data.data_utils import Rclone -from sky.jobs.state import ManagedJobStatus from sky.skylet import constants from sky.skylet import events -from sky.skylet.job_lib import JobStatus -from sky.status_lib import ClusterStatus from sky.utils import common_utils from sky.utils import resources_utils from sky.utils import subprocess_utils @@ -100,10 +97,10 @@ 'echo "Waiting for job to stop RUNNING"; echo "$s"; done') # Cluster functions -_ALL_JOB_STATUSES = "|".join([status.value for status in JobStatus]) -_ALL_CLUSTER_STATUSES = "|".join([status.value for status in ClusterStatus]) +_ALL_JOB_STATUSES = "|".join([status.value for status in sky.JobStatus]) +_ALL_CLUSTER_STATUSES = "|".join([status.value for status in sky.ClusterStatus]) _ALL_MANAGED_JOB_STATUSES = "|".join( - [status.value for status in ManagedJobStatus]) + [status.value for status in sky.ManagedJobStatus]) def _statuses_to_str(statuses: List[enum.Enum]): @@ -135,7 +132,8 @@ def _statuses_to_str(statuses: List[enum.Enum]): def _get_cmd_wait_until_cluster_status_contains( - cluster_name: str, cluster_status: List[ClusterStatus], timeout: int): + cluster_name: str, cluster_status: List[sky.ClusterStatus], + timeout: int): return _WAIT_UNTIL_CLUSTER_STATUS_CONTAINS.format( cluster_name=cluster_name, cluster_status=_statuses_to_str(cluster_status), @@ -143,7 +141,7 @@ def _get_cmd_wait_until_cluster_status_contains( def _get_cmd_wait_until_cluster_status_contains_wildcard( - cluster_name_wildcard: str, cluster_status: List[ClusterStatus], + cluster_name_wildcard: str, cluster_status: List[sky.ClusterStatus], timeout: int): wait_cmd = _WAIT_UNTIL_CLUSTER_STATUS_CONTAINS.replace( 'sky status {cluster_name}', @@ -209,7 +207,7 @@ 
def _get_cmd_wait_until_cluster_is_not_found(cluster_name: str, timeout: int): def _get_cmd_wait_until_job_status_contains_matching_job_id( - cluster_name: str, job_id: str, job_status: List[JobStatus], + cluster_name: str, job_id: str, job_status: List[sky.JobStatus], timeout: int): return _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_ID.format( cluster_name=cluster_name, @@ -219,7 +217,7 @@ def _get_cmd_wait_until_job_status_contains_matching_job_id( def _get_cmd_wait_until_job_status_contains_without_matching_job( - cluster_name: str, job_status: List[JobStatus], timeout: int): + cluster_name: str, job_status: List[sky.JobStatus], timeout: int): return _WAIT_UNTIL_JOB_STATUS_CONTAINS_WITHOUT_MATCHING_JOB.format( cluster_name=cluster_name, job_status=_statuses_to_str(job_status), @@ -227,7 +225,7 @@ def _get_cmd_wait_until_job_status_contains_without_matching_job( def _get_cmd_wait_until_job_status_contains_matching_job_name( - cluster_name: str, job_name: str, job_status: List[JobStatus], + cluster_name: str, job_name: str, job_status: List[sky.JobStatus], timeout: int): return _WAIT_UNTIL_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME.format( cluster_name=cluster_name, @@ -246,7 +244,7 @@ def _get_cmd_wait_until_job_status_contains_matching_job_name( def _get_cmd_wait_until_managed_job_status_contains_matching_job_name( - job_name: str, job_status: List[JobStatus], timeout: int): + job_name: str, job_status: List[sky.JobStatus], timeout: int): return _WAIT_UNTIL_MANAGED_JOB_STATUS_CONTAINS_MATCHING_JOB_NAME.format( job_name=job_name, job_status=_statuses_to_str(job_status), @@ -573,7 +571,7 @@ def test_launch_fast_with_autostop(generic_cloud: str): # Ensure cluster is stopped _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED], + cluster_status=[sky.ClusterStatus.STOPPED], timeout=autostop_timeout), # Even the cluster is stopped, cloud platform may take a while to # delete the VM. 
@@ -638,14 +636,15 @@ def test_aws_with_ssh_proxy_command(): # the job controller is not launched with proxy command. _get_cmd_wait_until_cluster_status_contains_wildcard( cluster_name_wildcard='sky-jobs-controller-*', - cluster_status=[ClusterStatus.UP], + cluster_status=[sky.ClusterStatus.UP], timeout=300), f'export SKYPILOT_CONFIG={f.name}; sky jobs launch -n {name} --cpus 2 --cloud aws --region us-east-1 -yd echo hi', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, job_status=[ - ManagedJobStatus.SUCCEEDED, ManagedJobStatus.RUNNING, - ManagedJobStatus.STARTING + sky.ManagedJobStatus.SUCCEEDED, + sky.ManagedJobStatus.RUNNING, + sky.ManagedJobStatus.STARTING ], timeout=300), ], @@ -1019,7 +1018,7 @@ def test_clone_disk_aws(): f'sky stop {name} -y', _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED], + cluster_status=[sky.ClusterStatus.STOPPED], timeout=60), # Wait for EC2 instance to be in stopped state. # TODO: event based wait. 
@@ -1139,7 +1138,7 @@ def test_custom_default_conda_env(generic_cloud: str): f'sky autostop -y -i 0 {name}', _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED], + cluster_status=[sky.ClusterStatus.STOPPED], timeout=80), f'sky start -y {name}', f'sky logs {name} 2 --no-follow | grep -E "myenv\\s+\\*"', @@ -1163,7 +1162,7 @@ def test_stale_job(generic_cloud: str): f'sky stop {name} -y', _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED], + cluster_status=[sky.ClusterStatus.STOPPED], timeout=100), f'sky start {name} -y', f'sky logs {name} 1 --status', @@ -1194,7 +1193,7 @@ def test_aws_stale_job_manual_restart(): '--instance-ids $id', _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED], + cluster_status=[sky.ClusterStatus.STOPPED], timeout=40), f'sky launch -c {name} -y "echo hi"', f'sky logs {name} 1 --status', @@ -1202,7 +1201,7 @@ def test_aws_stale_job_manual_restart(): # Ensure the skylet updated the stale job status. _get_cmd_wait_until_job_status_contains_without_matching_job( cluster_name=name, - job_status=[JobStatus.FAILED_DRIVER], + job_status=[sky.JobStatus.FAILED_DRIVER], timeout=events.JobSchedulerEvent.EVENT_INTERVAL_SECONDS), ], f'sky down -y {name}', @@ -1235,7 +1234,7 @@ def test_gcp_stale_job_manual_restart(): # Ensure the skylet updated the stale job status. _get_cmd_wait_until_job_status_contains_without_matching_job( cluster_name=name, - job_status=[JobStatus.FAILED_DRIVER], + job_status=[sky.JobStatus.FAILED_DRIVER], timeout=events.JobSchedulerEvent.EVENT_INTERVAL_SECONDS) ], f'sky down -y {name}', @@ -1254,6 +1253,10 @@ def test_env_check(generic_cloud: str): [ f'sky launch -y -c {name} --cloud {generic_cloud} --detach-setup examples/env_check.yaml', f'sky logs {name} 1 --status', # Ensure the job succeeded. + # Test --detach-setup with only setup. 
+ f'sky launch -y -c {name} --detach-setup tests/test_yamls/test_only_setup.yaml', + f'sky logs {name} 2 --status', + f'sky logs {name} 2 | grep "hello world"', ], f'sky down -y {name}', timeout=total_timeout_minutes * 60, @@ -2059,7 +2062,7 @@ def test_multi_echo(generic_cloud: str): _get_cmd_wait_until_job_status_contains_matching_job_id( cluster_name=name, job_id=i + 1, - job_status=[JobStatus.SUCCEEDED], + job_status=[sky.JobStatus.SUCCEEDED], timeout=120) for i in range(32) ] + # Ensure monitor/autoscaler didn't crash on the 'assert not @@ -2635,14 +2638,16 @@ def test_gcp_start_stop(): f'sky stop -y {name}', _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED], + cluster_status=[sky.ClusterStatus.STOPPED], timeout=40), f'sky start -y {name} -i 1', f'sky exec {name} examples/gcp_start_stop.yaml', f'sky logs {name} 4 --status', # Ensure the job succeeded. _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED, ClusterStatus.INIT], + cluster_status=[ + sky.ClusterStatus.STOPPED, sky.ClusterStatus.INIT + ], timeout=200), ], f'sky down -y {name}', @@ -2668,7 +2673,9 @@ def test_azure_start_stop(): f'sky logs {name} 3 --status', # Ensure the job succeeded. _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED, ClusterStatus.INIT], + cluster_status=[ + sky.ClusterStatus.STOPPED, sky.ClusterStatus.INIT + ], timeout=280) + f'|| {{ ssh {name} "cat ~/.sky/skylet.log"; exit 1; }}', ], @@ -2708,7 +2715,7 @@ def test_autostop(generic_cloud: str): # Ensure the cluster is STOPPED. _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED], + cluster_status=[sky.ClusterStatus.STOPPED], timeout=autostop_timeout), # Ensure the cluster is UP and the autostop setting is reset ('-'). 
@@ -2727,7 +2734,7 @@ def test_autostop(generic_cloud: str): f's=$(sky status {name} --refresh); echo "$s"; echo; echo; echo "$s" | grep {name} | grep UP', _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED], + cluster_status=[sky.ClusterStatus.STOPPED], timeout=autostop_timeout), # Test restarting the idleness timer via exec: @@ -2739,7 +2746,7 @@ def test_autostop(generic_cloud: str): 'sleep 45', _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED], + cluster_status=[sky.ClusterStatus.STOPPED], timeout=autostop_timeout + _BUMP_UP_SECONDS), ], f'sky down -y {name}', @@ -2959,7 +2966,7 @@ def test_stop_gcp_spot(): f'sky autostop {name} -i0 -y', _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED], + cluster_status=[sky.ClusterStatus.STOPPED], timeout=90), f'sky start {name} -y', f'sky exec {name} -- ls myfile', @@ -2968,7 +2975,7 @@ def test_stop_gcp_spot(): f'sky launch -c {name} -i0 -y', _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED], + cluster_status=[sky.ClusterStatus.STOPPED], timeout=120), ], f'sky down -y {name}', @@ -2992,21 +2999,23 @@ def test_managed_jobs(generic_cloud: str): _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=f'{name}-1', job_status=[ - ManagedJobStatus.PENDING, ManagedJobStatus.SUBMITTED, - ManagedJobStatus.STARTING, ManagedJobStatus.RUNNING + sky.ManagedJobStatus.PENDING, + sky.ManagedJobStatus.SUBMITTED, + sky.ManagedJobStatus.STARTING, sky.ManagedJobStatus.RUNNING ], timeout=60), _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=f'{name}-2', job_status=[ - ManagedJobStatus.PENDING, ManagedJobStatus.SUBMITTED, - ManagedJobStatus.STARTING, ManagedJobStatus.RUNNING + sky.ManagedJobStatus.PENDING, + sky.ManagedJobStatus.SUBMITTED, + sky.ManagedJobStatus.STARTING, 
sky.ManagedJobStatus.RUNNING ], timeout=60), f'sky jobs cancel -y -n {name}-1', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=f'{name}-1', - job_status=[ManagedJobStatus.CANCELLED], + job_status=[sky.ManagedJobStatus.CANCELLED], timeout=230), # Test the functionality for logging. f's=$(sky jobs logs -n {name}-2 --no-follow); echo "$s"; echo "$s" | grep "start counting"', @@ -3080,7 +3089,7 @@ def test_managed_jobs_failed_setup(generic_cloud: str): # Make sure the job failed quickly. _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.FAILED_SETUP], + job_status=[sky.ManagedJobStatus.FAILED_SETUP], timeout=330 + _BUMP_UP_SECONDS), ], f'sky jobs cancel -y -n {name}', @@ -3106,7 +3115,7 @@ def test_managed_jobs_pipeline_failed_setup(generic_cloud: str): f'sky jobs launch -n {name} -y -d tests/test_yamls/failed_setup_pipeline.yaml', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.FAILED_SETUP], + job_status=[sky.ManagedJobStatus.FAILED_SETUP], timeout=600), # Make sure the job failed quickly. f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "FAILED_SETUP"', @@ -3143,7 +3152,7 @@ def test_managed_jobs_recovery_aws(aws_config_region): f'sky jobs launch --cloud aws --region {region} --use-spot -n {name} "echo SKYPILOT_TASK_ID: \$SKYPILOT_TASK_ID; sleep 1800" -y -d', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=600), f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id', # Terminate the cluster manually. 
@@ -3156,7 +3165,7 @@ def test_managed_jobs_recovery_aws(aws_config_region): f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=200), f'RUN_ID=$(cat /tmp/{name}-run-id); echo "$RUN_ID"; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | grep "$RUN_ID"', ], @@ -3187,7 +3196,7 @@ def test_managed_jobs_recovery_gcp(): f'sky jobs launch --cloud gcp --zone {zone} -n {name} --use-spot --cpus 2 "echo SKYPILOT_TASK_ID: \$SKYPILOT_TASK_ID; sleep 1800" -y -d', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=300), f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id', # Terminate the cluster manually. 
@@ -3196,7 +3205,7 @@ def test_managed_jobs_recovery_gcp(): f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=200), f'RUN_ID=$(cat /tmp/{name}-run-id); echo "$RUN_ID"; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | grep "$RUN_ID"', ], @@ -3222,7 +3231,7 @@ def test_managed_jobs_pipeline_recovery_aws(aws_config_region): f'sky jobs launch -n {name} tests/test_yamls/pipeline_aws.yaml -y -d', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=400), f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id', f'RUN_IDS=$(sky jobs logs -n {name} --no-follow | grep -A 4 SKYPILOT_TASK_IDS | cut -d")" -f2); echo "$RUN_IDS" | tee /tmp/{name}-run-ids', @@ -3244,7 +3253,7 @@ def test_managed_jobs_pipeline_recovery_aws(aws_config_region): f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=200), f'RUN_ID=$(cat /tmp/{name}-run-id); echo $RUN_ID; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | grep "$RUN_ID"', f'RUN_IDS=$(sky jobs logs -n {name} --no-follow | grep -A 4 SKYPILOT_TASK_IDS | cut -d")" -f2); echo "$RUN_IDS" | tee /tmp/{name}-run-ids-new', @@ -3277,7 +3286,7 @@ def test_managed_jobs_pipeline_recovery_gcp(): f'sky jobs launch -n {name} tests/test_yamls/pipeline_gcp.yaml -y -d', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=400), f'RUN_ID=$(sky 
jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id', f'RUN_IDS=$(sky jobs logs -n {name} --no-follow | grep -A 4 SKYPILOT_TASK_IDS | cut -d")" -f2); echo "$RUN_IDS" | tee /tmp/{name}-run-ids', @@ -3291,7 +3300,7 @@ def test_managed_jobs_pipeline_recovery_gcp(): f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=200), f'RUN_ID=$(cat /tmp/{name}-run-id); echo $RUN_ID; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID: | grep "$RUN_ID"', f'RUN_IDS=$(sky jobs logs -n {name} --no-follow | grep -A 4 SKYPILOT_TASK_IDS | cut -d")" -f2); echo "$RUN_IDS" | tee /tmp/{name}-run-ids-new', @@ -3321,7 +3330,8 @@ def test_managed_jobs_recovery_default_resources(generic_cloud: str): _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, job_status=[ - ManagedJobStatus.RUNNING, ManagedJobStatus.RECOVERING + sky.ManagedJobStatus.RUNNING, + sky.ManagedJobStatus.RECOVERING ], timeout=360), ], @@ -3345,7 +3355,7 @@ def test_managed_jobs_recovery_multi_node_aws(aws_config_region): f'sky jobs launch --cloud aws --region {region} -n {name} --use-spot --num-nodes 2 "echo SKYPILOT_TASK_ID: \$SKYPILOT_TASK_ID; sleep 1800" -y -d', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=450), f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id', # Terminate the worker manually. 
@@ -3359,7 +3369,7 @@ def test_managed_jobs_recovery_multi_node_aws(aws_config_region): f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=560), f'RUN_ID=$(cat /tmp/{name}-run-id); echo $RUN_ID; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2 | grep "$RUN_ID"', ], @@ -3390,7 +3400,7 @@ def test_managed_jobs_recovery_multi_node_gcp(): f'sky jobs launch --cloud gcp --zone {zone} -n {name} --use-spot --num-nodes 2 "echo SKYPILOT_TASK_ID: \$SKYPILOT_TASK_ID; sleep 1800" -y -d', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=400), f'RUN_ID=$(sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2); echo "$RUN_ID" | tee /tmp/{name}-run-id', # Terminate the worker manually. 
@@ -3399,7 +3409,7 @@ def test_managed_jobs_recovery_multi_node_gcp(): f'{_GET_JOB_QUEUE} | grep {name} | head -n1 | grep "RECOVERING"', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=560), f'RUN_ID=$(cat /tmp/{name}-run-id); echo $RUN_ID; sky jobs logs -n {name} --no-follow | grep SKYPILOT_TASK_ID | cut -d: -f2 | grep "$RUN_ID"', ], @@ -3428,13 +3438,13 @@ def test_managed_jobs_cancellation_aws(aws_config_region): _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, job_status=[ - ManagedJobStatus.STARTING, ManagedJobStatus.RUNNING + sky.ManagedJobStatus.STARTING, sky.ManagedJobStatus.RUNNING ], timeout=60 + _BUMP_UP_SECONDS), f'sky jobs cancel -y -n {name}', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.CANCELLED], + job_status=[sky.ManagedJobStatus.CANCELLED], timeout=120 + _BUMP_UP_SECONDS), (f's=$(aws ec2 describe-instances --region {region} ' f'--filters Name=tag:ray-cluster-name,Values={name_on_cloud}-* ' @@ -3446,12 +3456,12 @@ def test_managed_jobs_cancellation_aws(aws_config_region): # The job is set up in the cluster, will shown as RUNNING. 
_get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=f'{name}-2', - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=300 + _BUMP_UP_SECONDS), f'sky jobs cancel -y -n {name}-2', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=f'{name}-2', - job_status=[ManagedJobStatus.CANCELLED], + job_status=[sky.ManagedJobStatus.CANCELLED], timeout=120 + _BUMP_UP_SECONDS), (f's=$(aws ec2 describe-instances --region {region} ' f'--filters Name=tag:ray-cluster-name,Values={name_2_on_cloud}-* ' @@ -3463,7 +3473,7 @@ def test_managed_jobs_cancellation_aws(aws_config_region): # The job is running in the cluster, will shown as RUNNING. _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=f'{name}-3', - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=300 + _BUMP_UP_SECONDS), # Terminate the cluster manually. (f'aws ec2 terminate-instances --region {region} --instance-ids $(' @@ -3476,7 +3486,7 @@ def test_managed_jobs_cancellation_aws(aws_config_region): f'sky jobs cancel -y -n {name}-3', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=f'{name}-3', - job_status=[ManagedJobStatus.CANCELLED], + job_status=[sky.ManagedJobStatus.CANCELLED], timeout=120 + _BUMP_UP_SECONDS), # The cluster should be terminated (shutting-down) after cancellation. We don't use the `=` operator here because # there can be multiple VM with the same name due to the recovery. 
@@ -3514,30 +3524,30 @@ def test_managed_jobs_cancellation_gcp(): f'sky jobs launch --cloud gcp --zone {zone} -n {name} --use-spot "sleep 1000" -y -d', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.STARTING], + job_status=[sky.ManagedJobStatus.STARTING], timeout=60 + _BUMP_UP_SECONDS), f'sky jobs cancel -y -n {name}', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.CANCELLED], + job_status=[sky.ManagedJobStatus.CANCELLED], timeout=120 + _BUMP_UP_SECONDS), # Test cancelling the spot cluster during spot job being setup. f'sky jobs launch --cloud gcp --zone {zone} -n {name}-2 --use-spot tests/test_yamls/test_long_setup.yaml -y -d', # The job is set up in the cluster, will shown as RUNNING. _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=f'{name}-2', - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=300 + _BUMP_UP_SECONDS), f'sky jobs cancel -y -n {name}-2', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=f'{name}-2', - job_status=[ManagedJobStatus.CANCELLED], + job_status=[sky.ManagedJobStatus.CANCELLED], timeout=120 + _BUMP_UP_SECONDS), # Test cancellation during spot job is recovering. f'sky jobs launch --cloud gcp --zone {zone} -n {name}-3 --use-spot "sleep 1000" -y -d', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=f'{name}-3', - job_status=[ManagedJobStatus.RUNNING], + job_status=[sky.ManagedJobStatus.RUNNING], timeout=300 + _BUMP_UP_SECONDS), # Terminate the cluster manually. 
terminate_cmd, @@ -3546,7 +3556,7 @@ def test_managed_jobs_cancellation_gcp(): f'sky jobs cancel -y -n {name}-3', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=f'{name}-3', - job_status=[ManagedJobStatus.CANCELLED], + job_status=[sky.ManagedJobStatus.CANCELLED], timeout=120 + _BUMP_UP_SECONDS), # The cluster should be terminated (STOPPING) after cancellation. We don't use the `=` operator here because # there can be multiple VM with the same name due to the recovery. @@ -3639,7 +3649,7 @@ def test_managed_jobs_storage(generic_cloud: str): region_validation_cmd, # Check if the bucket is created in the correct region _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.SUCCEEDED], + job_status=[sky.ManagedJobStatus.SUCCEEDED], timeout=60 + _BUMP_UP_SECONDS), f'[ $(aws s3api list-buckets --query "Buckets[?contains(Name, \'{storage_name}\')].Name" --output text | wc -l) -eq 0 ]', # Check if file was written to the mounted output bucket @@ -3666,13 +3676,13 @@ def test_managed_jobs_tpu(): f'sky jobs launch -n {name} --use-spot examples/tpu/tpuvm_mnist.yaml -y -d', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.STARTING], + job_status=[sky.ManagedJobStatus.STARTING], timeout=60 + _BUMP_UP_SECONDS), # TPU takes a while to launch _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, job_status=[ - ManagedJobStatus.RUNNING, ManagedJobStatus.SUCCEEDED + sky.ManagedJobStatus.RUNNING, sky.ManagedJobStatus.SUCCEEDED ], timeout=900 + _BUMP_UP_SECONDS), ], @@ -3694,7 +3704,7 @@ def test_managed_jobs_inline_env(generic_cloud: str): f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! 
-z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, - job_status=[ManagedJobStatus.SUCCEEDED], + job_status=[sky.ManagedJobStatus.SUCCEEDED], timeout=20 + _BUMP_UP_SECONDS), ], f'sky jobs cancel -y -n {name}', @@ -3804,7 +3814,9 @@ def test_azure_start_stop_two_nodes(): f'sky logs {name} 2 --status', # Ensure the job succeeded. _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.INIT, ClusterStatus.STOPPED], + cluster_status=[ + sky.ClusterStatus.INIT, sky.ClusterStatus.STOPPED + ], timeout=200 + _BUMP_UP_SECONDS) + f'|| {{ ssh {name} "cat ~/.sky/skylet.log"; exit 1; }}' ], @@ -4818,7 +4830,7 @@ def test_core_api_sky_launch_fast(generic_cloud: str): # Sleep to let the cluster autostop _get_cmd_wait_until_cluster_status_contains( cluster_name=name, - cluster_status=[ClusterStatus.STOPPED], + cluster_status=[sky.ClusterStatus.STOPPED], timeout=120) # Run it again - should work with fast=True sky.launch(task, diff --git a/tests/test_yamls/test_only_setup.yaml b/tests/test_yamls/test_only_setup.yaml new file mode 100644 index 00000000000..245d2b1de69 --- /dev/null +++ b/tests/test_yamls/test_only_setup.yaml @@ -0,0 +1,2 @@ +setup: | + echo "hello world" From 2157f01b9afc6578deffdddbce36688a9c92d0f3 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Tue, 3 Dec 2024 13:44:32 -0800 Subject: [PATCH 06/10] [Jobs] Allow logs for finished jobs and add `sky jobs logs --refresh` for restartin jobs controller (#4380) * Stream logs for finished jobs * Allow stream logs for finished jobs * Read files after the indicator lines * Add refresh for `sky jobs logs` * fix log message * address comments * Add smoke test * fix smoke * fix jobs queue smoke test * fix storage --- sky/cli.py | 13 ++++- sky/jobs/controller.py | 36 ++++++++++--- sky/jobs/core.py | 96 ++++++++++++++++++++++------------- sky/jobs/state.py | 34 ++++++++++++- sky/jobs/utils.py | 18 
++++++- sky/skylet/log_lib.py | 4 +- sky/skylet/log_lib.pyi | 3 ++ sky/utils/controller_utils.py | 13 ++++- tests/test_smoke.py | 12 ++++- 9 files changed, 177 insertions(+), 52 deletions(-) diff --git a/sky/cli.py b/sky/cli.py index 94474b30b6c..1faf0003ff9 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3914,16 +3914,25 @@ def jobs_cancel(name: Optional[str], job_ids: Tuple[int], all: bool, yes: bool): default=False, help=('Show the controller logs of this job; useful for debugging ' 'launching/recoveries, etc.')) +@click.option( + '--refresh', + '-r', + default=False, + is_flag=True, + required=False, + help='Query the latest job logs, restarting the jobs controller if stopped.' +) @click.argument('job_id', required=False, type=int) @usage_lib.entrypoint def jobs_logs(name: Optional[str], job_id: Optional[int], follow: bool, - controller: bool): + controller: bool, refresh: bool): """Tail the log of a managed job.""" try: managed_jobs.tail_logs(name=name, job_id=job_id, follow=follow, - controller=controller) + controller=controller, + refresh=refresh) except exceptions.ClusterNotUpError: with ux_utils.print_exception_no_traceback(): raise diff --git a/sky/jobs/controller.py b/sky/jobs/controller.py index 5219c564500..72dce3e50d7 100644 --- a/sky/jobs/controller.py +++ b/sky/jobs/controller.py @@ -6,7 +6,7 @@ import time import traceback import typing -from typing import Tuple +from typing import Optional, Tuple import filelock @@ -87,18 +87,28 @@ def __init__(self, job_id: int, dag_yaml: str, task.update_envs(task_envs) def _download_log_and_stream( - self, - handle: cloud_vm_ray_backend.CloudVmRayResourceHandle) -> None: - """Downloads and streams the logs of the latest job. + self, task_id: Optional[int], + handle: Optional[cloud_vm_ray_backend.CloudVmRayResourceHandle] + ) -> None: + """Downloads and streams the logs of the current job with given task ID. 
We do not stream the logs from the cluster directly, as the donwload and stream should be faster, and more robust against preemptions or ssh disconnection during the streaming. """ + if handle is None: + logger.info(f'Cluster for job {self._job_id} is not found. ' + 'Skipping downloading and streaming the logs.') + return managed_job_logs_dir = os.path.join(constants.SKY_LOGS_DIRECTORY, 'managed_jobs') - controller_utils.download_and_stream_latest_job_log( + log_file = controller_utils.download_and_stream_latest_job_log( self._backend, handle, managed_job_logs_dir) + if log_file is not None: + # Set the path of the log file for the current task, so it can be + # accessed even after the job is finished + managed_job_state.set_local_log_file(self._job_id, task_id, + log_file) logger.info(f'\n== End of logs (ID: {self._job_id}) ==') def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: @@ -213,7 +223,8 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: if job_status == job_lib.JobStatus.SUCCEEDED: end_time = managed_job_utils.get_job_timestamp( self._backend, cluster_name, get_end_time=True) - # The job is done. + # The job is done. Set the job to SUCCEEDED first before start + # downloading and streaming the logs to make it more responsive. managed_job_state.set_succeeded(self._job_id, task_id, end_time=end_time, @@ -221,12 +232,21 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: logger.info( f'Managed job {self._job_id} (task: {task_id}) SUCCEEDED. ' f'Cleaning up the cluster {cluster_name}.') + clusters = backend_utils.get_clusters( + cluster_names=[cluster_name], + refresh=False, + include_controller=False) + if clusters: + assert len(clusters) == 1, (clusters, cluster_name) + handle = clusters[0].get('handle') + # Best effort to download and stream the logs. + self._download_log_and_stream(task_id, handle) # Only clean up the cluster, not the storages, because tasks may # share storages. 
recovery_strategy.terminate_cluster(cluster_name=cluster_name) return True - # For single-node jobs, nonterminated job_status indicates a + # For single-node jobs, non-terminated job_status indicates a # healthy cluster. We can safely continue monitoring. # For multi-node jobs, since the job may not be set to FAILED # immediately (depending on user program) when only some of the @@ -278,7 +298,7 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: 'The user job failed. Please check the logs below.\n' f'== Logs of the user job (ID: {self._job_id}) ==\n') - self._download_log_and_stream(handle) + self._download_log_and_stream(task_id, handle) managed_job_status = ( managed_job_state.ManagedJobStatus.FAILED) if job_status == job_lib.JobStatus.FAILED_SETUP: diff --git a/sky/jobs/core.py b/sky/jobs/core.py index f11a556f2d4..9cde3443816 100644 --- a/sky/jobs/core.py +++ b/sky/jobs/core.py @@ -1,6 +1,7 @@ """SDK functions for managed jobs.""" import os import tempfile +import typing from typing import Any, Dict, List, Optional, Union import uuid @@ -29,6 +30,9 @@ from sky.utils import timeline from sky.utils import ux_utils +if typing.TYPE_CHECKING: + from sky.backends import cloud_vm_ray_backend + @timeline.event @usage_lib.entrypoint @@ -225,6 +229,40 @@ def queue_from_kubernetes_pod( return jobs +def _maybe_restart_controller( + refresh: bool, stopped_message: str, spinner_message: str +) -> 'cloud_vm_ray_backend.CloudVmRayResourceHandle': + """Restart controller if refresh is True and it is stopped.""" + jobs_controller_type = controller_utils.Controllers.JOBS_CONTROLLER + if refresh: + stopped_message = '' + try: + handle = backend_utils.is_controller_accessible( + controller=jobs_controller_type, stopped_message=stopped_message) + except exceptions.ClusterNotUpError as e: + if not refresh: + raise + handle = None + controller_status = e.cluster_status + + if handle is not None: + return handle + + sky_logging.print(f'{colorama.Fore.YELLOW}' + 
f'Restarting {jobs_controller_type.value.name}...' + f'{colorama.Style.RESET_ALL}') + + rich_utils.force_update_status( + ux_utils.spinner_message(f'{spinner_message} - restarting ' + 'controller')) + handle = sky.start(jobs_controller_type.value.cluster_name) + controller_status = status_lib.ClusterStatus.UP + rich_utils.force_update_status(ux_utils.spinner_message(spinner_message)) + + assert handle is not None, (controller_status, refresh) + return handle + + @usage_lib.entrypoint def queue(refresh: bool, skip_finished: bool = False) -> List[Dict[str, Any]]: # NOTE(dev): Keep the docstring consistent between the Python API and CLI. @@ -252,34 +290,11 @@ def queue(refresh: bool, skip_finished: bool = False) -> List[Dict[str, Any]]: does not exist. RuntimeError: if failed to get the managed jobs with ssh. """ - jobs_controller_type = controller_utils.Controllers.JOBS_CONTROLLER - stopped_message = '' - if not refresh: - stopped_message = 'No in-progress managed jobs.' - try: - handle = backend_utils.is_controller_accessible( - controller=jobs_controller_type, stopped_message=stopped_message) - except exceptions.ClusterNotUpError as e: - if not refresh: - raise - handle = None - controller_status = e.cluster_status - - if refresh and handle is None: - sky_logging.print(f'{colorama.Fore.YELLOW}' - 'Restarting controller for latest status...' 
- f'{colorama.Style.RESET_ALL}') - - rich_utils.force_update_status( - ux_utils.spinner_message('Checking managed jobs - restarting ' - 'controller')) - handle = sky.start(jobs_controller_type.value.cluster_name) - controller_status = status_lib.ClusterStatus.UP - rich_utils.force_update_status( - ux_utils.spinner_message('Checking managed jobs')) - - assert handle is not None, (controller_status, refresh) - + handle = _maybe_restart_controller(refresh, + stopped_message='No in-progress ' + 'managed jobs.', + spinner_message='Checking ' + 'managed jobs') backend = backend_utils.get_backend_from_handle(handle) assert isinstance(backend, backends.CloudVmRayBackend) @@ -371,7 +386,7 @@ def cancel(name: Optional[str] = None, @usage_lib.entrypoint def tail_logs(name: Optional[str], job_id: Optional[int], follow: bool, - controller: bool) -> None: + controller: bool, refresh: bool) -> None: # NOTE(dev): Keep the docstring consistent between the Python API and CLI. """Tail logs of managed jobs. @@ -382,15 +397,26 @@ def tail_logs(name: Optional[str], job_id: Optional[int], follow: bool, sky.exceptions.ClusterNotUpError: the jobs controller is not up. """ # TODO(zhwu): Automatically restart the jobs controller + if name is not None and job_id is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Cannot specify both name and job_id.') + jobs_controller_type = controller_utils.Controllers.JOBS_CONTROLLER - handle = backend_utils.is_controller_accessible( - controller=jobs_controller_type, + job_name_or_id_str = '' + if job_id is not None: + job_name_or_id_str = str(job_id) + elif name is not None: + job_name_or_id_str = f'-n {name}' + else: + job_name_or_id_str = '' + handle = _maybe_restart_controller( + refresh, stopped_message=( - 'Please restart the jobs controller with ' - f'`sky start {jobs_controller_type.value.cluster_name}`.')) + f'{jobs_controller_type.value.name.capitalize()} is stopped. 
To ' + f'get the logs, run: {colorama.Style.BRIGHT}sky jobs logs ' + f'-r {job_name_or_id_str}{colorama.Style.RESET_ALL}'), + spinner_message='Retrieving job logs') - if name is not None and job_id is not None: - raise ValueError('Cannot specify both name and job_id.') backend = backend_utils.get_backend_from_handle(handle) assert isinstance(backend, backends.CloudVmRayBackend), backend diff --git a/sky/jobs/state.py b/sky/jobs/state.py index 6a0e3caeda3..9a5ab4b3cad 100644 --- a/sky/jobs/state.py +++ b/sky/jobs/state.py @@ -66,7 +66,8 @@ def create_table(cursor, conn): spot_job_id INTEGER, task_id INTEGER DEFAULT 0, task_name TEXT, - specs TEXT)""") + specs TEXT, + local_log_file TEXT DEFAULT NULL)""") conn.commit() db_utils.add_column_to_table(cursor, conn, 'spot', 'failure_reason', 'TEXT') @@ -103,6 +104,8 @@ def create_table(cursor, conn): value_to_replace_existing_entries=json.dumps({ 'max_restarts_on_errors': 0, })) + db_utils.add_column_to_table(cursor, conn, 'spot', 'local_log_file', + 'TEXT DEFAULT NULL') # `job_info` contains the mapping from job_id to the job_name. # In the future, it may contain more information about each job. @@ -157,6 +160,7 @@ def _get_db_path() -> str: 'task_id', 'task_name', 'specs', + 'local_log_file', # columns from the job_info table '_job_info_job_id', # This should be the same as job_id 'job_name', @@ -512,6 +516,20 @@ def set_cancelled(job_id: int, callback_func: CallbackType): callback_func('CANCELLED') +def set_local_log_file(job_id: int, task_id: Optional[int], + local_log_file: str): + """Set the local log file for a job.""" + filter_str = 'spot_job_id=(?)' + filter_args = [local_log_file, job_id] + if task_id is not None: + filter_str += ' AND task_id=(?)' + filter_args.append(task_id) + with db_utils.safe_cursor(_DB_PATH) as cursor: + cursor.execute( + 'UPDATE spot SET local_log_file=(?) 
' + f'WHERE {filter_str}', filter_args) + + # ======== utility functions ======== def get_nonterminal_job_ids_by_name(name: Optional[str]) -> List[int]: """Get non-terminal job ids by name.""" @@ -662,3 +680,17 @@ def get_task_specs(job_id: int, task_id: int) -> Dict[str, Any]: WHERE spot_job_id=(?) AND task_id=(?)""", (job_id, task_id)).fetchone() return json.loads(task_specs[0]) + + +def get_local_log_file(job_id: int, task_id: Optional[int]) -> Optional[str]: + """Get the local log directory for a job.""" + filter_str = 'spot_job_id=(?)' + filter_args = [job_id] + if task_id is not None: + filter_str += ' AND task_id=(?)' + filter_args.append(task_id) + with db_utils.safe_cursor(_DB_PATH) as cursor: + local_log_file = cursor.execute( + f'SELECT local_log_file FROM spot ' + f'WHERE {filter_str}', filter_args).fetchone() + return local_log_file[-1] if local_log_file else None diff --git a/sky/jobs/utils.py b/sky/jobs/utils.py index f82e1132678..267c205285b 100644 --- a/sky/jobs/utils.py +++ b/sky/jobs/utils.py @@ -327,10 +327,24 @@ def stream_logs_by_id(job_id: int, follow: bool = True) -> str: if managed_job_status.is_failed(): job_msg = ('\nFailure reason: ' f'{managed_job_state.get_failure_reason(job_id)}') + log_file = managed_job_state.get_local_log_file(job_id, None) + if log_file is not None: + with open(log_file, 'r', encoding='utf-8') as f: + # Stream the logs to the console without reading the whole + # file into memory. + start_streaming = False + for line in f: + if log_lib.LOG_FILE_START_STREAMING_AT in line: + start_streaming = True + if start_streaming: + print(line, end='', flush=True) + return '' return (f'{colorama.Fore.YELLOW}' f'Job {job_id} is already in terminal state ' - f'{managed_job_status.value}. Logs will not be shown.' - f'{colorama.Style.RESET_ALL}{job_msg}') + f'{managed_job_status.value}. 
For more details, run: ' + f'sky jobs logs --controller {job_id}' + f'{colorama.Style.RESET_ALL}' + f'{job_msg}') backend = backends.CloudVmRayBackend() task_id, managed_job_status = ( managed_job_state.get_latest_task_id_status(job_id)) diff --git a/sky/skylet/log_lib.py b/sky/skylet/log_lib.py index fa3f7f9f3fc..8a40982972a 100644 --- a/sky/skylet/log_lib.py +++ b/sky/skylet/log_lib.py @@ -34,6 +34,8 @@ logger = sky_logging.init_logger(__name__) +LOG_FILE_START_STREAMING_AT = 'Waiting for task resources on ' + class _ProcessingArgs: """Arguments for processing logs.""" @@ -435,7 +437,7 @@ def tail_logs(job_id: Optional[int], time.sleep(_SKY_LOG_WAITING_GAP_SECONDS) status = job_lib.update_job_status([job_id], silent=True)[0] - start_stream_at = 'Waiting for task resources on ' + start_stream_at = LOG_FILE_START_STREAMING_AT # Explicitly declare the type to avoid mypy warning. lines: Iterable[str] = [] if follow and status in [ diff --git a/sky/skylet/log_lib.pyi b/sky/skylet/log_lib.pyi index 01b08b6444f..89d1628ec11 100644 --- a/sky/skylet/log_lib.pyi +++ b/sky/skylet/log_lib.pyi @@ -13,6 +13,9 @@ from sky.skylet import constants as constants from sky.skylet import job_lib as job_lib from sky.utils import log_utils as log_utils +LOG_FILE_START_STREAMING_AT: str = ... + + class _ProcessingArgs: log_path: str stream_logs: bool diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index a6657df960d..e157b6416a5 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -26,6 +26,7 @@ from sky.serve import constants as serve_constants from sky.serve import serve_utils from sky.skylet import constants +from sky.skylet import log_lib from sky.utils import common_utils from sky.utils import env_options from sky.utils import rich_utils @@ -380,11 +381,19 @@ def download_and_stream_latest_job_log( else: log_dir = list(log_dirs.values())[0] log_file = os.path.join(log_dir, 'run.log') - # Print the logs to the console. 
+ # TODO(zhwu): refactor this into log_utils, along with the + # refactoring for the log_lib.tail_logs. try: with open(log_file, 'r', encoding='utf-8') as f: - print(f.read()) + # Stream the logs to the console without reading the whole + # file into memory. + start_streaming = False + for line in f: + if log_lib.LOG_FILE_START_STREAMING_AT in line: + start_streaming = True + if start_streaming: + print(line, end='', flush=True) except FileNotFoundError: logger.error('Failed to find the logs for the user ' f'program at {log_file}.') diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 26f7ba73ef9..78c87e0e60e 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -3651,6 +3651,8 @@ def test_managed_jobs_storage(generic_cloud: str): job_name=name, job_status=[sky.ManagedJobStatus.SUCCEEDED], timeout=60 + _BUMP_UP_SECONDS), + # Wait for the job to be cleaned up. + 'sleep 20', f'[ $(aws s3api list-buckets --query "Buckets[?contains(Name, \'{storage_name}\')].Name" --output text | wc -l) -eq 0 ]', # Check if file was written to the mounted output bucket output_check_cmd @@ -3701,11 +3703,19 @@ def test_managed_jobs_inline_env(generic_cloud: str): test = Test( 'test-managed-jobs-inline-env', [ - f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"', + f'sky jobs launch -n {name} -y --cloud {generic_cloud} --env TEST_ENV="hello world" -- "echo "\\$TEST_ENV"; ([[ ! -z \\"\$TEST_ENV\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_IPS}\\" ]] && [[ ! -z \\"\${constants.SKYPILOT_NODE_RANK}\\" ]] && [[ ! 
-z \\"\${constants.SKYPILOT_NUM_NODES}\\" ]]) || exit 1"', _get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, job_status=[sky.ManagedJobStatus.SUCCEEDED], timeout=20 + _BUMP_UP_SECONDS), + f'JOB_ROW=$(sky jobs queue | grep {name} | head -n1) && ' + f'echo "$JOB_ROW" && echo "$JOB_ROW" | grep "SUCCEEDED" && ' + f'JOB_ID=$(echo "$JOB_ROW" | awk \'{{print $1}}\') && ' + f'echo "JOB_ID=$JOB_ID" && ' + # Test that logs are still available after the job finishes. + 'unset SKYPILOT_DEBUG; s=$(sky jobs logs $JOB_ID --refresh) && echo "$s" && echo "$s" | grep "hello world" && ' + # Make sure we skip the unnecessary logs. + 'echo "$s" | head -n1 | grep "Waiting for"', ], f'sky jobs cancel -y -n {name}', # Increase timeout since sky jobs queue -r can be blocked by other spot tests. From de561c25b07032fe3f2ddc88d32d930c8f78780a Mon Sep 17 00:00:00 2001 From: Christopher Cooper Date: Tue, 3 Dec 2024 15:46:28 -0800 Subject: [PATCH 07/10] [perf] use uv for venv creation and pip install (#4414) * Revert "remove `uv` from runtime setup due to azure installation issue (#4401)" This reverts commit 0b20d568ee1af454bfec3e50ff62d239f976e52d. 
* on azure, use --prerelease=allow to install azure-cli * use uv venv --seed * fix backwards compatibility * really fix backwards compatibility * use uv to set up controller dependencies * fix python 3.8 * lint * add missing file * update comment * split out azure-cli dep * fix lint for dependencies * use runpy.run_path rather than modifying sys.path * fix cloud dependency installation commands * lint * Update sky/utils/controller_utils.py Co-authored-by: Zhanghao Wu --------- Co-authored-by: Zhanghao Wu --- sky/setup_files/dependencies.py | 141 +++++++++++++++++++++++++++ sky/setup_files/setup.py | 136 +++----------------------- sky/skylet/constants.py | 47 ++++++--- sky/templates/kubernetes-ray.yml.j2 | 2 +- sky/utils/controller_utils.py | 145 ++++++++++------------------ 5 files changed, 239 insertions(+), 232 deletions(-) create mode 100644 sky/setup_files/dependencies.py diff --git a/sky/setup_files/dependencies.py b/sky/setup_files/dependencies.py new file mode 100644 index 00000000000..18d2f5cdc08 --- /dev/null +++ b/sky/setup_files/dependencies.py @@ -0,0 +1,141 @@ +"""Dependencies for SkyPilot. + +This file is imported by setup.py, so: +- It may not be able to import other skypilot modules, since sys.path may not be + correct. +- It should not import any dependencies, as they may not be installed yet. +""" +from typing import Dict, List + +install_requires = [ + 'wheel', + 'cachetools', + # NOTE: ray requires click>=7.0. + 'click >= 7.0', + 'colorama', + 'cryptography', + # Jinja has a bug in older versions because of the lack of pinning + # the version of the underlying markupsafe package. See: + # https://github.com/pallets/jinja/issues/1585 + 'jinja2 >= 3.0', + 'jsonschema', + 'networkx', + 'pandas>=1.3.0', + 'pendulum', + # PrettyTable with version >=2.0.0 is required for the support of + # `add_rows` method. 
+ 'PrettyTable >= 2.0.0', + 'python-dotenv', + 'rich', + 'tabulate', + # Light weight requirement, can be replaced with "typing" once + # we deprecate Python 3.7 (this will take a while). + 'typing_extensions', + 'filelock >= 3.6.0', + 'packaging', + 'psutil', + 'pulp', + # Cython 3.0 release breaks PyYAML 5.4.* + # (https://github.com/yaml/pyyaml/issues/601) + # <= 3.13 may encounter https://github.com/ultralytics/yolov5/issues/414 + 'pyyaml > 3.13, != 5.4.*', + 'requests', +] + +local_ray = [ + # Lower version of ray will cause dependency conflict for + # click/grpcio/protobuf. + # Excluded 2.6.0 as it has a bug in the cluster launcher: + # https://github.com/ray-project/ray/releases/tag/ray-2.6.1 + 'ray[default] >= 2.2.0, != 2.6.0', +] + +remote = [ + # Adopted from ray's setup.py: + # https://github.com/ray-project/ray/blob/ray-2.4.0/python/setup.py + # SkyPilot: != 1.48.0 is required to avoid the error where ray dashboard + # fails to start when ray start is called (#2054). + # Tracking issue: https://github.com/ray-project/ray/issues/30984 + 'grpcio >= 1.32.0, <= 1.49.1, != 1.48.0; python_version < \'3.10\' and sys_platform == \'darwin\'', # noqa:E501 pylint: disable=line-too-long + 'grpcio >= 1.42.0, <= 1.49.1, != 1.48.0; python_version >= \'3.10\' and sys_platform == \'darwin\'', # noqa:E501 pylint: disable=line-too-long + # Original issue: https://github.com/ray-project/ray/issues/33833 + 'grpcio >= 1.32.0, <= 1.51.3, != 1.48.0; python_version < \'3.10\' and sys_platform != \'darwin\'', # noqa:E501 pylint: disable=line-too-long + 'grpcio >= 1.42.0, <= 1.51.3, != 1.48.0; python_version >= \'3.10\' and sys_platform != \'darwin\'', # noqa:E501 pylint: disable=line-too-long + # Adopted from ray's setup.py: + # https://github.com/ray-project/ray/blob/ray-2.9.3/python/setup.py#L343 + 'protobuf >= 3.15.3, != 3.19.5', + # Some pydantic versions are not compatible with ray. 
Adopted from ray's + # setup.py: + # https://github.com/ray-project/ray/blob/ray-2.9.3/python/setup.py#L254 + 'pydantic!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,<3', +] + +# NOTE: Change the templates/jobs-controller.yaml.j2 file if any of the +# following packages dependencies are changed. +aws_dependencies = [ + # botocore does not work with urllib3>=2.0.0, according to + # https://github.com/boto/botocore/issues/2926 + # We have to explicitly pin the version to optimize the time for + # poetry install. See https://github.com/orgs/python-poetry/discussions/7937 + 'urllib3<2', + # NOTE: this installs CLI V1. To use AWS SSO (e.g., `aws sso login`), users + # should instead use CLI V2 which is not pip-installable. See + # https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html. + 'awscli>=1.27.10', + 'botocore>=1.29.10', + 'boto3>=1.26.1', + # NOTE: required by awscli. To avoid ray automatically installing + # the latest version. + 'colorama < 0.4.5', +] + +# azure-cli cannot be installed normally by uv, so we need to work around it in +# a few places. +AZURE_CLI = 'azure-cli>=2.65.0' + +extras_require: Dict[str, List[str]] = { + 'aws': aws_dependencies, + # TODO(zongheng): azure-cli is huge and takes a long time to install. + # Tracked in: https://github.com/Azure/azure-cli/issues/7387 + # azure-identity is needed in node_provider. + # We need azure-identity>=1.13.0 to enable the customization of the + # timeout of AzureCliCredential. + 'azure': [ + AZURE_CLI, + 'azure-core>=1.31.0', + 'azure-identity>=1.19.0', + 'azure-mgmt-network>=27.0.0', + 'azure-mgmt-compute>=33.0.0', + 'azure-storage-blob>=12.23.1', + 'msgraph-sdk', + ] + local_ray, + # We need google-api-python-client>=2.69.0 to enable 'discardLocalSsd' + # parameter for stopping instances. 
Reference: + # https://github.com/googleapis/google-api-python-client/commit/f6e9d3869ed605b06f7cbf2e8cf2db25108506e6 + 'gcp': ['google-api-python-client>=2.69.0', 'google-cloud-storage'], + 'ibm': [ + 'ibm-cloud-sdk-core', 'ibm-vpc', 'ibm-platform-services', 'ibm-cos-sdk' + ] + local_ray, + 'docker': ['docker'] + local_ray, + 'lambda': local_ray, + 'cloudflare': aws_dependencies, + 'scp': local_ray, + 'oci': ['oci'] + local_ray, + 'kubernetes': ['kubernetes>=20.0.0'], + 'remote': remote, + 'runpod': ['runpod>=1.5.1'], + 'fluidstack': [], # No dependencies needed for fluidstack + 'cudo': ['cudo-compute>=0.1.10'], + 'paperspace': [], # No dependencies needed for paperspace + 'vsphere': [ + 'pyvmomi==8.0.1.0.2', + # vsphere-automation-sdk is also required, but it does not have + # pypi release, which cause failure of our pypi release. + # https://peps.python.org/pep-0440/#direct-references + # We have the instruction for its installation in our + # docs instead. + # 'vsphere-automation-sdk @ git+https://github.com/vmware/vsphere-automation-sdk-python.git@v8.0.1.0' pylint: disable=line-too-long + ], +} + +extras_require['all'] = sum(extras_require.values(), []) diff --git a/sky/setup_files/setup.py b/sky/setup_files/setup.py index 0fd6978ec03..121f96d8e8b 100644 --- a/sky/setup_files/setup.py +++ b/sky/setup_files/setup.py @@ -18,19 +18,28 @@ import os import platform import re +import runpy import subprocess import sys -from typing import Dict, List import setuptools +# __file__ is setup.py at the root of the repo. We shouldn't assume it's a +# symlink - e.g. in the sdist it's resolved to a normal file. 
ROOT_DIR = os.path.dirname(__file__) +DEPENDENCIES_FILE_PATH = os.path.join(ROOT_DIR, 'sky', 'setup_files', + 'dependencies.py') INIT_FILE_PATH = os.path.join(ROOT_DIR, 'sky', '__init__.py') _COMMIT_FAILURE_MESSAGE = ( 'WARNING: SkyPilot fail to {verb} the commit hash in ' f'{INIT_FILE_PATH!r} (SkyPilot can still be normally used): ' '{error}') +# setuptools does not include the script dir on the search path, so we can't +# just do `import dependencies`. Instead, use runpy to manually load it. Note: +# dependencies here is a dict, not a module, so we access it by subscripting. +dependencies = runpy.run_path(DEPENDENCIES_FILE_PATH) + original_init_content = None system = platform.system() @@ -130,127 +139,6 @@ def parse_readme(readme: str) -> str: return readme -install_requires = [ - 'wheel', - 'cachetools', - # NOTE: ray requires click>=7.0. - 'click >= 7.0', - 'colorama', - 'cryptography', - # Jinja has a bug in older versions because of the lack of pinning - # the version of the underlying markupsafe package. See: - # https://github.com/pallets/jinja/issues/1585 - 'jinja2 >= 3.0', - 'jsonschema', - 'networkx', - 'pandas>=1.3.0', - 'pendulum', - # PrettyTable with version >=2.0.0 is required for the support of - # `add_rows` method. - 'PrettyTable >= 2.0.0', - 'python-dotenv', - 'rich', - 'tabulate', - # Light weight requirement, can be replaced with "typing" once - # we deprecate Python 3.7 (this will take a while). - 'typing_extensions', - 'filelock >= 3.6.0', - 'packaging', - 'psutil', - 'pulp', - # Cython 3.0 release breaks PyYAML 5.4.* (https://github.com/yaml/pyyaml/issues/601) - # <= 3.13 may encounter https://github.com/ultralytics/yolov5/issues/414 - 'pyyaml > 3.13, != 5.4.*', - 'requests', -] - -local_ray = [ - # Lower version of ray will cause dependency conflict for - # click/grpcio/protobuf. 
- # Excluded 2.6.0 as it has a bug in the cluster launcher: - # https://github.com/ray-project/ray/releases/tag/ray-2.6.1 - 'ray[default] >= 2.2.0, != 2.6.0', -] - -remote = [ - # Adopted from ray's setup.py: https://github.com/ray-project/ray/blob/ray-2.4.0/python/setup.py - # SkyPilot: != 1.48.0 is required to avoid the error where ray dashboard fails to start when - # ray start is called (#2054). - # Tracking issue: https://github.com/ray-project/ray/issues/30984 - "grpcio >= 1.32.0, <= 1.49.1, != 1.48.0; python_version < '3.10' and sys_platform == 'darwin'", # noqa:E501 - "grpcio >= 1.42.0, <= 1.49.1, != 1.48.0; python_version >= '3.10' and sys_platform == 'darwin'", # noqa:E501 - # Original issue: https://github.com/ray-project/ray/issues/33833 - "grpcio >= 1.32.0, <= 1.51.3, != 1.48.0; python_version < '3.10' and sys_platform != 'darwin'", # noqa:E501 - "grpcio >= 1.42.0, <= 1.51.3, != 1.48.0; python_version >= '3.10' and sys_platform != 'darwin'", # noqa:E501 - # Adopted from ray's setup.py: - # https://github.com/ray-project/ray/blob/ray-2.9.3/python/setup.py#L343 - 'protobuf >= 3.15.3, != 3.19.5', - # Some pydantic versions are not compatible with ray. Adopted from ray's - # setup.py: https://github.com/ray-project/ray/blob/ray-2.9.3/python/setup.py#L254 - 'pydantic!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,<3', -] - -# NOTE: Change the templates/jobs-controller.yaml.j2 file if any of the -# following packages dependencies are changed. -aws_dependencies = [ - # botocore does not work with urllib3>=2.0.0, according to https://github.com/boto/botocore/issues/2926 - # We have to explicitly pin the version to optimize the time for - # poetry install. See https://github.com/orgs/python-poetry/discussions/7937 - 'urllib3<2', - # NOTE: this installs CLI V1. To use AWS SSO (e.g., `aws sso login`), users - # should instead use CLI V2 which is not pip-installable. See - # https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html. 
- 'awscli>=1.27.10', - 'botocore>=1.29.10', - 'boto3>=1.26.1', - # NOTE: required by awscli. To avoid ray automatically installing - # the latest version. - 'colorama < 0.4.5', -] - -extras_require: Dict[str, List[str]] = { - 'aws': aws_dependencies, - # TODO(zongheng): azure-cli is huge and takes a long time to install. - # Tracked in: https://github.com/Azure/azure-cli/issues/7387 - # azure-identity is needed in node_provider. - # We need azure-identity>=1.13.0 to enable the customization of the - # timeout of AzureCliCredential. - 'azure': [ - 'azure-cli>=2.65.0', 'azure-core>=1.31.0', 'azure-identity>=1.19.0', - 'azure-mgmt-network>=27.0.0', 'azure-mgmt-compute>=33.0.0', - 'azure-storage-blob>=12.23.1', 'msgraph-sdk' - ] + local_ray, - # We need google-api-python-client>=2.69.0 to enable 'discardLocalSsd' - # parameter for stopping instances. - # Reference: https://github.com/googleapis/google-api-python-client/commit/f6e9d3869ed605b06f7cbf2e8cf2db25108506e6 - 'gcp': ['google-api-python-client>=2.69.0', 'google-cloud-storage'], - 'ibm': [ - 'ibm-cloud-sdk-core', 'ibm-vpc', 'ibm-platform-services', 'ibm-cos-sdk' - ] + local_ray, - 'docker': ['docker'] + local_ray, - 'lambda': local_ray, - 'cloudflare': aws_dependencies, - 'scp': local_ray, - 'oci': ['oci'] + local_ray, - 'kubernetes': ['kubernetes>=20.0.0'], - 'remote': remote, - 'runpod': ['runpod>=1.5.1'], - 'fluidstack': [], # No dependencies needed for fluidstack - 'cudo': ['cudo-compute>=0.1.10'], - 'paperspace': [], # No dependencies needed for paperspace - 'vsphere': [ - 'pyvmomi==8.0.1.0.2', - # vsphere-automation-sdk is also required, but it does not have - # pypi release, which cause failure of our pypi release. - # https://peps.python.org/pep-0440/#direct-references - # We have the instruction for its installation in our - # docs instead. 
- # 'vsphere-automation-sdk @ git+https://github.com/vmware/vsphere-automation-sdk-python.git@v8.0.1.0' - ], -} - -extras_require['all'] = sum(extras_require.values(), []) - long_description = '' readme_filepath = 'README.md' # When sky/backends/wheel_utils.py builds wheels, it will not contain the @@ -277,8 +165,8 @@ def parse_readme(readme: str) -> str: long_description_content_type='text/markdown', setup_requires=['wheel'], requires_python='>=3.7', - install_requires=install_requires, - extras_require=extras_require, + install_requires=dependencies['install_requires'], + extras_require=dependencies['extras_require'], entry_points={ 'console_scripts': ['sky = sky.cli:cli'], }, diff --git a/sky/skylet/constants.py b/sky/skylet/constants.py index 97d745b2e26..0b2a5b08e1b 100644 --- a/sky/skylet/constants.py +++ b/sky/skylet/constants.py @@ -4,6 +4,7 @@ from packaging import version import sky +from sky.setup_files import dependencies SKY_LOGS_DIRECTORY = '~/sky_logs' SKY_REMOTE_WORKDIR = '~/sky_workdir' @@ -39,6 +40,8 @@ 'which python3') # Python executable, e.g., /opt/conda/bin/python3 SKY_PYTHON_CMD = f'$({SKY_GET_PYTHON_PATH_CMD})' +# Prefer SKY_UV_PIP_CMD, which is faster. +# TODO(cooperc): remove remaining usage (GCP TPU setup). SKY_PIP_CMD = f'{SKY_PYTHON_CMD} -m pip' # Ray executable, e.g., /opt/conda/bin/ray # We need to add SKY_PYTHON_CMD before ray executable because: @@ -50,6 +53,14 @@ SKY_REMOTE_PYTHON_ENV_NAME = 'skypilot-runtime' SKY_REMOTE_PYTHON_ENV = f'~/{SKY_REMOTE_PYTHON_ENV_NAME}' ACTIVATE_SKY_REMOTE_PYTHON_ENV = f'source {SKY_REMOTE_PYTHON_ENV}/bin/activate' +# uv is used for venv and pip, much faster than python implementations. +SKY_UV_INSTALL_DIR = '"$HOME/.local/bin"' +SKY_UV_CMD = f'{SKY_UV_INSTALL_DIR}/uv' +# This won't reinstall uv if it's already installed, so it's safe to re-run. 
+SKY_UV_INSTALL_CMD = (f'{SKY_UV_CMD} -V >/dev/null 2>&1 || ' + 'curl -LsSf https://astral.sh/uv/install.sh ' + f'| UV_INSTALL_DIR={SKY_UV_INSTALL_DIR} sh') +SKY_UV_PIP_CMD = f'VIRTUAL_ENV={SKY_REMOTE_PYTHON_ENV} {SKY_UV_CMD} pip' # Deleting the SKY_REMOTE_PYTHON_ENV_NAME from the PATH to deactivate the # environment. `deactivate` command does not work when conda is used. DEACTIVATE_SKY_REMOTE_PYTHON_ENV = ( @@ -148,28 +159,30 @@ 'echo "Creating conda env with Python 3.10" && ' f'conda create -y -n {SKY_REMOTE_PYTHON_ENV_NAME} python=3.10 && ' f'conda activate {SKY_REMOTE_PYTHON_ENV_NAME};' + # Install uv for venv management and pip installation. + f'{SKY_UV_INSTALL_CMD};' # Create a separate conda environment for SkyPilot dependencies. f'[ -d {SKY_REMOTE_PYTHON_ENV} ] || ' # Do NOT use --system-site-packages here, because if users upgrade any # packages in the base env, they interfere with skypilot dependencies. # Reference: https://github.com/skypilot-org/skypilot/issues/4097 - f'{SKY_PYTHON_CMD} -m venv {SKY_REMOTE_PYTHON_ENV};' + # --seed will include pip and setuptools, which are present in venvs created + # with python -m venv. + f'{SKY_UV_CMD} venv --seed {SKY_REMOTE_PYTHON_ENV};' f'echo "$(echo {SKY_REMOTE_PYTHON_ENV})/bin/python" > {SKY_PYTHON_PATH_FILE};' ) _sky_version = str(version.parse(sky.__version__)) RAY_STATUS = f'RAY_ADDRESS=127.0.0.1:{SKY_REMOTE_RAY_PORT} {SKY_RAY_CMD} status' RAY_INSTALLATION_COMMANDS = ( + f'{SKY_UV_INSTALL_CMD};' 'mkdir -p ~/sky_workdir && mkdir -p ~/.sky/sky_app;' - # Disable the pip version check to avoid the warning message, which makes - # the output hard to read. - 'export PIP_DISABLE_PIP_VERSION_CHECK=1;' # Print the PATH in provision.log to help debug PATH issues. 
'echo PATH=$PATH; ' # Install setuptools<=69.5.1 to avoid the issue with the latest setuptools # causing the error: # ImportError: cannot import name 'packaging' from 'pkg_resources'" - f'{SKY_PIP_CMD} install "setuptools<70"; ' + f'{SKY_UV_PIP_CMD} install "setuptools<70"; ' # Backward compatibility for ray upgrade (#3248): do not upgrade ray if the # ray cluster is already running, to avoid the ray cluster being restarted. # @@ -183,10 +196,10 @@ # latest ray port 6380, but those existing cluster launched before #1790 # that has ray cluster on the default port 6379 will be upgraded and # restarted. - f'{SKY_PIP_CMD} list | grep "ray " | ' + f'{SKY_UV_PIP_CMD} list | grep "ray " | ' f'grep {SKY_REMOTE_RAY_VERSION} 2>&1 > /dev/null ' f'|| {RAY_STATUS} || ' - f'{SKY_PIP_CMD} install --exists-action w -U ray[default]=={SKY_REMOTE_RAY_VERSION}; ' # pylint: disable=line-too-long + f'{SKY_UV_PIP_CMD} install -U ray[default]=={SKY_REMOTE_RAY_VERSION}; ' # pylint: disable=line-too-long # In some envs, e.g. pip does not have permission to write under /opt/conda # ray package will be installed under ~/.local/bin. If the user's PATH does # not include ~/.local/bin (the pip install will have the output: `WARNING: @@ -202,10 +215,22 @@ f'which ray > {SKY_RAY_PATH_FILE} || exit 1; }}; ') SKYPILOT_WHEEL_INSTALLATION_COMMANDS = ( - f'{{ {SKY_PIP_CMD} list | grep "skypilot " && ' + f'{SKY_UV_INSTALL_CMD};' + f'{{ {SKY_UV_PIP_CMD} list | grep "skypilot " && ' '[ "$(cat ~/.sky/wheels/current_sky_wheel_hash)" == "{sky_wheel_hash}" ]; } || ' # pylint: disable=line-too-long - f'{{ {SKY_PIP_CMD} uninstall skypilot -y; ' - f'{SKY_PIP_CMD} install "$(echo ~/.sky/wheels/{{sky_wheel_hash}}/' + f'{{ {SKY_UV_PIP_CMD} uninstall skypilot; ' + # uv cannot install azure-cli normally, since it depends on pre-release + # packages. Manually install azure-cli with the --prerelease=allow flag + # first. This will allow skypilot to successfully install. 
See + # https://docs.astral.sh/uv/pip/compatibility/#pre-release-compatibility. + # We don't want to use --prerelease=allow for all packages, because it will + # cause uv to use pre-releases for some other packages that have sufficient + # stable releases. + 'if [ "{cloud}" = "azure" ]; then ' + f'{SKY_UV_PIP_CMD} install --prerelease=allow "{dependencies.AZURE_CLI}";' + 'fi;' + # Install skypilot from wheel + f'{SKY_UV_PIP_CMD} install "$(echo ~/.sky/wheels/{{sky_wheel_hash}}/' f'skypilot-{_sky_version}*.whl)[{{cloud}}, remote]" && ' 'echo "{sky_wheel_hash}" > ~/.sky/wheels/current_sky_wheel_hash || ' 'exit 1; }; ') @@ -220,7 +245,7 @@ # The ray installation above can be skipped due to the existing ray cluster # for backward compatibility. In this case, we should not patch the ray # files. - f'{SKY_PIP_CMD} list | grep "ray " | ' + f'{SKY_UV_PIP_CMD} list | grep "ray " | ' f'grep {SKY_REMOTE_RAY_VERSION} 2>&1 > /dev/null && ' f'{{ {SKY_PYTHON_CMD} -c ' '"from sky.skylet.ray_patches import patch; patch()" || exit 1; }; ') diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2 index e572b263924..535e6f0b1ae 100644 --- a/sky/templates/kubernetes-ray.yml.j2 +++ b/sky/templates/kubernetes-ray.yml.j2 @@ -414,7 +414,7 @@ available_node_types: done {{ conda_installation_commands }} {{ ray_installation_commands }} - ~/skypilot-runtime/bin/python -m pip install skypilot[kubernetes,remote] + VIRTUAL_ENV=~/skypilot-runtime ~/.local/bin/uv pip install skypilot[kubernetes,remote] touch /tmp/ray_skypilot_installation_complete echo "=== Ray and skypilot installation completed ===" diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index e157b6416a5..3f0bd5c5ed7 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -25,6 +25,7 @@ from sky.jobs import utils as managed_job_utils from sky.serve import constants as serve_constants from sky.serve import serve_utils +from sky.setup_files import 
dependencies from sky.skylet import constants from sky.skylet import log_lib from sky.utils import common_utils @@ -188,79 +189,49 @@ def from_type(cls, controller_type: str) -> Optional['Controllers']: # Install cli dependencies. Not using SkyPilot wheels because the wheel # can be cleaned up by another process. -# TODO(zhwu): Keep the dependencies align with the ones in setup.py def _get_cloud_dependencies_installation_commands( controller: Controllers) -> List[str]: - # TODO(tian): Make dependency installation command a method of cloud - # class and get all installation command for enabled clouds. - commands = [] # We use / instead of strong formatting, as we need to update # the at the end of the for loop, and python does not support # partial string formatting. prefix_str = ('[/] Check & install cloud dependencies ' 'on controller: ') + commands: List[str] = [] # This is to make sure the shorter checking message does not have junk # characters from the previous message. - empty_str = ' ' * 10 - aws_dependencies_installation = ( - 'pip list | grep boto3 > /dev/null 2>&1 || pip install ' - 'botocore>=1.29.10 boto3>=1.26.1; ' - # Need to separate the installation of awscli from above because some - # other clouds will install boto3 but not awscli. - 'pip list | grep awscli> /dev/null 2>&1 || pip install "urllib3<2" ' - 'awscli>=1.27.10 "colorama<0.4.5" > /dev/null 2>&1') - setup_clouds: List[str] = [] + empty_str = ' ' * 20 + + # All python dependencies will be accumulated and then installed in one + # command at the end. This is very fast if the packages are already + # installed, so we don't check that. 
+ python_packages: Set[str] = set() + + step_prefix = prefix_str.replace('', str(len(commands) + 1)) + commands.append(f'echo -en "\\r{step_prefix}uv{empty_str}" &&' + f'{constants.SKY_UV_INSTALL_CMD} >/dev/null 2>&1') + for cloud in sky_check.get_cached_enabled_clouds_or_refresh(): - if isinstance( - clouds, - (clouds.Lambda, clouds.SCP, clouds.Fluidstack, clouds.Paperspace)): - # no need to install any cloud dependencies for lambda, scp, - # fluidstack and paperspace - continue - if isinstance(cloud, clouds.AWS): - step_prefix = prefix_str.replace('', - str(len(setup_clouds) + 1)) - commands.append(f'echo -en "\\r{step_prefix}AWS{empty_str}" && ' + - aws_dependencies_installation) - setup_clouds.append(str(cloud)) - elif isinstance(cloud, clouds.Azure): - step_prefix = prefix_str.replace('', - str(len(setup_clouds) + 1)) - commands.append( - f'echo -en "\\r{step_prefix}Azure{empty_str}" && ' - 'pip list | grep azure-cli > /dev/null 2>&1 || ' - 'pip install "azure-cli>=2.31.0" azure-core ' - '"azure-identity>=1.13.0" azure-mgmt-network > /dev/null 2>&1') - # Have to separate this installation of az blob storage from above - # because this is newly-introduced and not part of azure-cli. We - # need a separate installed check for this. + cloud_python_dependencies: List[str] = dependencies.extras_require[ + cloud.canonical_name()] + + if isinstance(cloud, clouds.Azure): + # azure-cli cannot be normally installed by uv. + # See comments in sky/skylet/constants.py. 
+ cloud_python_dependencies.remove(dependencies.AZURE_CLI) + + step_prefix = prefix_str.replace('', str(len(commands) + 1)) commands.append( - 'pip list | grep azure-storage-blob > /dev/null 2>&1 || ' - 'pip install azure-storage-blob msgraph-sdk > /dev/null 2>&1') - setup_clouds.append(str(cloud)) + f'echo -en "\\r{step_prefix}azure-cli{empty_str}" &&' + f'{constants.SKY_UV_PIP_CMD} install --prerelease=allow ' + f'"{dependencies.AZURE_CLI}" > /dev/null 2>&1') elif isinstance(cloud, clouds.GCP): - step_prefix = prefix_str.replace('', - str(len(setup_clouds) + 1)) - commands.append( - f'echo -en "\\r{step_prefix}GCP{empty_str}" && ' - 'pip list | grep google-api-python-client > /dev/null 2>&1 || ' - 'pip install "google-api-python-client>=2.69.0" ' - '> /dev/null 2>&1') - # Have to separate the installation of google-cloud-storage from - # above because for a VM launched on GCP, the VM may have - # google-api-python-client installed alone. - commands.append( - 'pip list | grep google-cloud-storage > /dev/null 2>&1 || ' - 'pip install google-cloud-storage > /dev/null 2>&1') - commands.append(f'{gcp.GOOGLE_SDK_INSTALLATION_COMMAND}') - setup_clouds.append(str(cloud)) + step_prefix = prefix_str.replace('', str(len(commands) + 1)) + commands.append(f'echo -en "\\r{step_prefix}GCP SDK{empty_str}" &&' + f'{gcp.GOOGLE_SDK_INSTALLATION_COMMAND}') elif isinstance(cloud, clouds.Kubernetes): - step_prefix = prefix_str.replace('', - str(len(setup_clouds) + 1)) + step_prefix = prefix_str.replace('', str(len(commands) + 1)) commands.append( f'echo -en "\\r{step_prefix}Kubernetes{empty_str}" && ' - 'pip list | grep kubernetes > /dev/null 2>&1 || ' - 'pip install "kubernetes>=20.0.0" > /dev/null 2>&1 &&' # Install k8s + skypilot dependencies 'sudo bash -c "if ' '! 
command -v curl &> /dev/null || ' @@ -276,54 +247,36 @@ def _get_cloud_dependencies_installation_commands( '/bin/linux/amd64/kubectl" && ' 'sudo install -o root -g root -m 0755 ' 'kubectl /usr/local/bin/kubectl))') - setup_clouds.append(str(cloud)) elif isinstance(cloud, clouds.Cudo): - step_prefix = prefix_str.replace('', - str(len(setup_clouds) + 1)) + step_prefix = prefix_str.replace('', str(len(commands) + 1)) commands.append( - f'echo -en "\\r{step_prefix}Cudo{empty_str}" && ' - 'pip list | grep cudo-compute > /dev/null 2>&1 || ' - 'pip install "cudo-compute>=0.1.10" > /dev/null 2>&1 && ' + f'echo -en "\\r{step_prefix}cudoctl{empty_str}" && ' 'wget https://download.cudo.org/compute/cudoctl-0.3.2-amd64.deb -O ~/cudoctl.deb > /dev/null 2>&1 && ' # pylint: disable=line-too-long 'sudo dpkg -i ~/cudoctl.deb > /dev/null 2>&1') - setup_clouds.append(str(cloud)) - elif isinstance(cloud, clouds.RunPod): - step_prefix = prefix_str.replace('', - str(len(setup_clouds) + 1)) - commands.append(f'echo -en "\\r{step_prefix}RunPod{empty_str}" && ' - 'pip list | grep runpod > /dev/null 2>&1 || ' - 'pip install "runpod>=1.5.1" > /dev/null 2>&1') - setup_clouds.append(str(cloud)) - elif isinstance(cloud, clouds.OCI): - step_prefix = prefix_str.replace('', - str(len(setup_clouds) + 1)) - commands.append(f'echo -en "\\r{prefix_str}OCI{empty_str}" && ' - 'pip list | grep oci > /dev/null 2>&1 || ' - 'pip install oci > /dev/null 2>&1') - setup_clouds.append(str(cloud)) - if controller == Controllers.JOBS_CONTROLLER: - if isinstance(cloud, clouds.IBM): - step_prefix = prefix_str.replace('', - str(len(setup_clouds) + 1)) - commands.append( - f'echo -en "\\r{step_prefix}IBM{empty_str}" ' - '&& pip list | grep ibm-cloud-sdk-core > /dev/null 2>&1 || ' - 'pip install ibm-cloud-sdk-core ibm-vpc ' - 'ibm-platform-services ibm-cos-sdk > /dev/null 2>&1') - setup_clouds.append(str(cloud)) + elif isinstance(cloud, clouds.IBM): + if controller != Controllers.JOBS_CONTROLLER: + # We only need IBM 
deps on the jobs controller. + cloud_python_dependencies = [] + + python_packages.update(cloud_python_dependencies) + if (cloudflare.NAME in storage_lib.get_cached_enabled_storage_clouds_or_refresh()): - step_prefix = prefix_str.replace('', str(len(setup_clouds) + 1)) - commands.append( - f'echo -en "\\r{step_prefix}Cloudflare{empty_str}" && ' + - aws_dependencies_installation) - setup_clouds.append(cloudflare.NAME) + python_packages.update(dependencies.extras_require['cloudflare']) + + packages_string = ' '.join([f'"{package}"' for package in python_packages]) + step_prefix = prefix_str.replace('', str(len(commands) + 1)) + commands.append( + f'echo -en "\\r{step_prefix}cloud python packages{empty_str}" && ' + f'{constants.SKY_UV_PIP_CMD} install {packages_string} > /dev/null 2>&1' + ) + total_commands = len(commands) finish_prefix = prefix_str.replace('[/] ', ' ') commands.append(f'echo -e "\\r{finish_prefix}done.{empty_str}"') + commands = [ - command.replace('', str(len(setup_clouds))) - for command in commands + command.replace('', str(total_commands)) for command in commands ] return commands From 51a7e177d99fdfe73a89c04dddc385940a97a37d Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Tue, 3 Dec 2024 16:32:31 -0800 Subject: [PATCH 08/10] [Minor] README updates. (#4436) * [Minor] README touches. * update * update --- README.md | 3 ++- docs/source/docs/index.rst | 4 ++++ tests/test_smoke.py | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 40a41e18782..f29b57be9ca 100644 --- a/README.md +++ b/README.md @@ -155,10 +155,11 @@ SkyPilot then performs the heavy-lifting for you, including: Refer to [Quickstart](https://skypilot.readthedocs.io/en/latest/getting-started/quickstart.html) to get started with SkyPilot. ## More Information -To learn more, see our [documentation](https://skypilot.readthedocs.io/en/latest/), [blog](https://blog.skypilot.co/), and [community integrations](https://blog.skypilot.co/community/). 
+To learn more, see [Concept: Sky Computing](https://docs.skypilot.co/en/latest/sky-computing.html), [SkyPilot docs](https://skypilot.readthedocs.io/en/latest/), and [SkyPilot blog](https://blog.skypilot.co/). Runnable examples: +- [**AI Gallery**](https://docs.skypilot.co/en/latest/gallery/index.html) - LLMs on SkyPilot - [Llama 3.2: lightweight and vision models](./llm/llama-3_2/) - [Pixtral](./llm/pixtral/) diff --git a/docs/source/docs/index.rst b/docs/source/docs/index.rst index c5c44c63f47..17f8d545fa6 100644 --- a/docs/source/docs/index.rst +++ b/docs/source/docs/index.rst @@ -74,10 +74,14 @@ You can chat with the SkyPilot team and community on the `SkyPilot Slack ` and `SkyPilot blog `_. + Runnable examples: .. Keep this section in sync with README.md in SkyPilot repo +* :ref:`AI Gallery ` + * **LLMs on SkyPilot** * `Llama 3.2: lightweight and vision models `_ diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 78c87e0e60e..f37467417fa 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -195,7 +195,7 @@ def _get_cmd_wait_until_cluster_is_not_found(cluster_name: str, timeout: int): ' fi; ' 'done <<< "$current_status"; ' 'if [ "$found" -eq 1 ]; then break; fi; ' # Break outer loop if match found - 'echo "Waiting for job status to contains {job_status}, current status: $current_status"; ' + 'echo "Waiting for job status to contain {job_status}, current status: $current_status"; ' 'sleep 10; ' 'done') From 3009204b07474c7ea906d6906d4c82449c68f1c8 Mon Sep 17 00:00:00 2001 From: Christopher Cooper Date: Wed, 4 Dec 2024 13:52:51 -0800 Subject: [PATCH 09/10] make --fast robust against credential or wheel updates (#4289) * add config_dict['config_hash'] output to write_cluster_config * fix docstring for write_cluster_config This used to be true, but since #2943, 'ray' is the only provisioner. Add other keys that are now present instead. 
* when using --fast, check if config_hash matches, and if not, provision * mock hashing method in unit test This is needed since some files in the fake file mounts don't actually exist, like the wheel path. * check config hash within provision with lock held * address other PR review comments * rename to skip_if_no_cluster_updates Co-authored-by: Zhanghao Wu * add assert details Co-authored-by: Zhanghao Wu * address PR comments and update docstrings * fix test * update docstrings Co-authored-by: Zhanghao Wu * address PR comments * fix lint and tests * Update sky/backends/cloud_vm_ray_backend.py Co-authored-by: Zhanghao Wu * refactor skip_if_no_cluster_update var * clarify comment * format exception --------- Co-authored-by: Zhanghao Wu --- sky/backends/backend.py | 57 +++++++--- sky/backends/backend_utils.py | 143 ++++++++++++++++++++++++- sky/backends/cloud_vm_ray_backend.py | 94 ++++++++++++---- sky/backends/local_docker_backend.py | 18 ++-- sky/clouds/service_catalog/common.py | 4 +- sky/execution.py | 36 +++++-- sky/global_user_state.py | 33 ++++-- sky/utils/common_utils.py | 19 ++++ tests/unit_tests/test_backend_utils.py | 2 + 9 files changed, 340 insertions(+), 66 deletions(-) diff --git a/sky/backends/backend.py b/sky/backends/backend.py index 10b51b06038..d5fd6f19925 100644 --- a/sky/backends/backend.py +++ b/sky/backends/backend.py @@ -45,20 +45,45 @@ def check_resources_fit_cluster(self, handle: _ResourceHandleType, @timeline.event @usage_lib.messages.usage.update_runtime('provision') def provision( - self, - task: 'task_lib.Task', - to_provision: Optional['resources.Resources'], - dryrun: bool, - stream_logs: bool, - cluster_name: Optional[str] = None, - retry_until_up: bool = False) -> Optional[_ResourceHandleType]: + self, + task: 'task_lib.Task', + to_provision: Optional['resources.Resources'], + dryrun: bool, + stream_logs: bool, + cluster_name: Optional[str] = None, + retry_until_up: bool = False, + skip_unnecessary_provisioning: bool = False, + ) 
-> Optional[_ResourceHandleType]: + """Provisions resources for the given task. + + Args: + task: The task to provision resources for. + to_provision: Resource config to provision. Should only be None if + cluster_name refers to an existing cluster, whose resources will + be used. + dryrun: If True, don't actually provision anything. + stream_logs: If True, stream additional logs to console. + cluster_name: Name of the cluster to provision. If None, a name will + be auto-generated. If the name refers to an existing cluster, + the existing cluster will be reused and re-provisioned. + retry_until_up: If True, retry provisioning until resources are + successfully launched. + skip_unnecessary_provisioning: If True, compare the cluster config to + the existing cluster_name's config. Skip provisioning if no + updates are needed for the existing cluster. + + Returns: + A ResourceHandle object for the provisioned resources, or None if + dryrun is True. + """ if cluster_name is None: cluster_name = sky.backends.backend_utils.generate_cluster_name() usage_lib.record_cluster_name_for_current_operation(cluster_name) usage_lib.messages.usage.update_actual_task(task) with rich_utils.safe_status(ux_utils.spinner_message('Launching')): return self._provision(task, to_provision, dryrun, stream_logs, - cluster_name, retry_until_up) + cluster_name, retry_until_up, + skip_unnecessary_provisioning) @@ -126,13 +151,15 @@ def register_info(self, **kwargs) -> None: # --- Implementations of the APIs --- def _provision( - self, - task: 'task_lib.Task', - to_provision: Optional['resources.Resources'], - dryrun: bool, - stream_logs: bool, - cluster_name: str, - retry_until_up: bool = False) -> Optional[_ResourceHandleType]: + self, + task: 'task_lib.Task', + to_provision: Optional['resources.Resources'], + dryrun: bool, + stream_logs: bool, + cluster_name: str, + retry_until_up: bool = False, + 
skip_unnecessary_provisioning: bool = False, + ) -> Optional[_ResourceHandleType]: raise NotImplementedError def _sync_workdir(self, handle: _ResourceHandleType, workdir: Path) -> None: diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 9c56546234a..7292001cc09 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -3,6 +3,7 @@ import enum import fnmatch import functools +import hashlib import os import pathlib import pprint @@ -644,11 +645,17 @@ def write_cluster_config( keep_launch_fields_in_existing_config: bool = True) -> Dict[str, str]: """Fills in cluster configuration templates and writes them out. - Returns: {provisioner: path to yaml, the provisioning spec}. - 'provisioner' can be - - 'ray' - - 'tpu-create-script' (if TPU is requested) - - 'tpu-delete-script' (if TPU is requested) + Returns: + Dict with the following keys: + - 'ray': Path to the generated Ray yaml config file + - 'cluster_name': Name of the cluster + - 'cluster_name_on_cloud': Name of the cluster as it appears in the + cloud provider + - 'config_hash': Hash of the cluster config and file mounts contents. + Can be missing if we unexpectedly failed to calculate the hash for + some reason. In that case we will continue without the optimization to + skip provisioning. + Raises: exceptions.ResourcesUnavailableError: if the region/zones requested does not appear in the catalog, or an ssh_proxy_command is specified but @@ -903,6 +910,12 @@ def write_cluster_config( if dryrun: # If dryrun, return the unfinished tmp yaml path. 
config_dict['ray'] = tmp_yaml_path + try: + config_dict['config_hash'] = _deterministic_cluster_yaml_hash( + tmp_yaml_path) + except Exception as e: # pylint: disable=broad-except + logger.warning(f'Failed to calculate config_hash: {e}') + logger.debug('Full exception:', exc_info=e) return config_dict _add_auth_to_cluster_config(cloud, tmp_yaml_path) @@ -925,6 +938,17 @@ def write_cluster_config( yaml_config = common_utils.read_yaml(tmp_yaml_path) config_dict['cluster_name_on_cloud'] = yaml_config['cluster_name'] + # Make sure to do this before we optimize file mounts. Optimization is + # non-deterministic, but everything else before this point should be + # deterministic. + try: + config_dict['config_hash'] = _deterministic_cluster_yaml_hash( + tmp_yaml_path) + except Exception as e: # pylint: disable=broad-except + logger.warning('Failed to calculate config_hash: ' + f'{common_utils.format_exception(e)}') + logger.debug('Full exception:', exc_info=e) + # Optimization: copy the contents of source files in file_mounts to a # special dir, and upload that as the only file_mount instead. Delay # calling this optimization until now, when all source files have been @@ -1033,6 +1057,115 @@ def get_ready_nodes_counts(pattern, output): return ready_head, ready_workers +@timeline.event +def _deterministic_cluster_yaml_hash(yaml_path: str) -> str: + """Hash the cluster yaml and contents of file mounts to a unique string. + + Two invocations of this function should return the same string if and only + if the contents of the yaml are the same and the file contents of all the + file_mounts specified in the yaml are the same. + + Limitations: + - This function can be expensive if the file mounts are large. (E.g. a few + seconds for ~1GB.) This should be okay since we expect that the + file_mounts in the cluster yaml (the wheel and cloud credentials) will be + small. + - Symbolic links are not explicitly handled. Some symbolic link changes may + not be detected. 
+ + Implementation: We create a byte sequence that captures the state of the + yaml file and all the files in the file mounts, then hash the byte sequence. + + The format of the byte sequence is: + 32 bytes - sha256 hash of the yaml file + for each file mount: + file mount remote destination (UTF-8), \0 + if the file mount source is a file: + 'file' encoded to UTF-8 + 32 bytes - sha256 hash of the file contents + if the file mount source is a directory: + 'dir' encoded to UTF-8 + for each directory and subdirectory within the file mount (starting from + the root and descending recursively): + name of the directory (UTF-8), \0 + name of each subdirectory within the directory (UTF-8) terminated by \0 + \0 + for each file in the directory: + name of the file (UTF-8), \0 + 32 bytes - sha256 hash of the file contents + \0 + if the file mount source is something else or does not exist, nothing + \0\0 + + Rather than constructing the whole byte sequence, which may be quite large, + we construct it incrementally by using hash.update() to add new bytes. + """ + + def _hash_file(path: str) -> bytes: + return common_utils.hash_file(path, 'sha256').digest() + + config_hash = hashlib.sha256() + + config_hash.update(_hash_file(yaml_path)) + + yaml_config = common_utils.read_yaml(yaml_path) + file_mounts = yaml_config.get('file_mounts', {}) + # Remove the file mounts added by the newline. + if '' in file_mounts: + assert file_mounts[''] == '', file_mounts[''] + file_mounts.pop('') + + for dst, src in sorted(file_mounts.items()): + expanded_src = os.path.expanduser(src) + config_hash.update(dst.encode('utf-8') + b'\0') + + # If the file mount source is a symlink, this should be true. In that + # case we hash the contents of the symlink destination. + if os.path.isfile(expanded_src): + config_hash.update('file'.encode('utf-8')) + config_hash.update(_hash_file(expanded_src)) + + # This can also be a symlink to a directory.
os.walk will treat it as a + # normal directory and list the contents of the symlink destination. + elif os.path.isdir(expanded_src): + config_hash.update('dir'.encode('utf-8')) + + # Aside from expanded_src, os.walk will list symlinks to directories + # but will not recurse into them. + for (dirpath, dirnames, filenames) in os.walk(expanded_src): + config_hash.update(dirpath.encode('utf-8') + b'\0') + + # Note: inplace sort will also affect the traversal order of + # os.walk. We need it so that the os.walk order is + # deterministic. + dirnames.sort() + # This includes symlinks to directories. os.walk will recurse + # into all the directories but not the symlinks. We don't hash + # the link destination, so if a symlink to a directory changes, + # we won't notice. + for dirname in dirnames: + config_hash.update(dirname.encode('utf-8') + b'\0') + config_hash.update(b'\0') + + filenames.sort() + # This includes symlinks to files. We could hash the symlink + # destination itself but instead just hash the destination + # contents. 
+ for filename in filenames: + config_hash.update(filename.encode('utf-8') + b'\0') + config_hash.update( + _hash_file(os.path.join(dirpath, filename))) + config_hash.update(b'\0') + + else: + logger.debug( + f'Unexpected file_mount that is not a file or dir: {src}') + + config_hash.update(b'\0\0') + + return config_hash.hexdigest() + + def get_docker_user(ip: str, cluster_config_file: str) -> str: """Find docker container username.""" ssh_credentials = ssh_credential_from_yaml(cluster_config_file) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index d73b7f54b8d..b7ef8850132 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -1155,6 +1155,7 @@ def __init__( prev_cluster_status: Optional[status_lib.ClusterStatus], prev_handle: Optional['CloudVmRayResourceHandle'], prev_cluster_ever_up: bool, + prev_config_hash: Optional[str], ) -> None: assert cluster_name is not None, 'cluster_name must be specified.' self.cluster_name = cluster_name @@ -1163,6 +1164,7 @@ def __init__( self.prev_cluster_status = prev_cluster_status self.prev_handle = prev_handle self.prev_cluster_ever_up = prev_cluster_ever_up + self.prev_config_hash = prev_config_hash def __init__(self, log_dir: str, @@ -1324,8 +1326,21 @@ def _retry_zones( prev_cluster_status: Optional[status_lib.ClusterStatus], prev_handle: Optional['CloudVmRayResourceHandle'], prev_cluster_ever_up: bool, + skip_if_config_hash_matches: Optional[str], ) -> Dict[str, Any]: - """The provision retry loop.""" + """The provision retry loop. + + Returns a config_dict with the following fields: + All fields from backend_utils.write_cluster_config(). See its + docstring. + - 'provisioning_skipped': True if provisioning was short-circuited + by skip_if_config_hash_matches, False otherwise. + - 'handle': The provisioned cluster handle. 
+ - 'provision_record': (Only if using the new skypilot provisioner) The + record returned by provisioner.bulk_provision(). + - 'resources_vars': (Only if using the new skypilot provisioner) The + resources variables given by make_deploy_resources_variables(). + """ # Get log_path name log_path = os.path.join(self.log_dir, 'provision.log') log_abs_path = os.path.abspath(log_path) @@ -1434,8 +1449,18 @@ def _retry_zones( raise exceptions.ResourcesUnavailableError( f'Failed to provision on cloud {to_provision.cloud} due to ' f'invalid cloud config: {common_utils.format_exception(e)}') + + if ('config_hash' in config_dict and + skip_if_config_hash_matches == config_dict['config_hash']): + logger.debug('Skipping provisioning of cluster with matching ' + 'config hash.') + config_dict['provisioning_skipped'] = True + return config_dict + config_dict['provisioning_skipped'] = False + if dryrun: return config_dict + cluster_config_file = config_dict['ray'] launched_resources = to_provision.copy(region=region.name) @@ -1947,8 +1972,13 @@ def provision_with_retries( to_provision_config: ToProvisionConfig, dryrun: bool, stream_logs: bool, + skip_unnecessary_provisioning: bool, ) -> Dict[str, Any]: - """Provision with retries for all launchable resources.""" + """Provision with retries for all launchable resources. + + Returns the config_dict from _retry_zones() - see its docstring for + details. 
+ """ cluster_name = to_provision_config.cluster_name to_provision = to_provision_config.resources num_nodes = to_provision_config.num_nodes @@ -1957,6 +1987,8 @@ def provision_with_retries( prev_cluster_ever_up = to_provision_config.prev_cluster_ever_up launchable_retries_disabled = (self._dag is None or self._optimize_target is None) + skip_if_config_hash_matches = (to_provision_config.prev_config_hash if + skip_unnecessary_provisioning else None) failover_history: List[Exception] = list() @@ -1996,7 +2028,8 @@ def provision_with_retries( cloud_user_identity=cloud_user, prev_cluster_status=prev_cluster_status, prev_handle=prev_handle, - prev_cluster_ever_up=prev_cluster_ever_up) + prev_cluster_ever_up=prev_cluster_ever_up, + skip_if_config_hash_matches=skip_if_config_hash_matches) if dryrun: return config_dict except (exceptions.InvalidClusterNameError, @@ -2697,14 +2730,21 @@ def check_resources_fit_cluster( return valid_resource def _provision( - self, - task: task_lib.Task, - to_provision: Optional[resources_lib.Resources], - dryrun: bool, - stream_logs: bool, - cluster_name: str, - retry_until_up: bool = False) -> Optional[CloudVmRayResourceHandle]: - """Provisions using 'ray up'. + self, + task: task_lib.Task, + to_provision: Optional[resources_lib.Resources], + dryrun: bool, + stream_logs: bool, + cluster_name: str, + retry_until_up: bool = False, + skip_unnecessary_provisioning: bool = False, + ) -> Optional[CloudVmRayResourceHandle]: + """Provisions the cluster, or re-provisions an existing cluster. + + Use the SKYPILOT provisioner if it's supported by the cloud, otherwise + use 'ray up'. + + See also docstring for Backend.provision(). 
Raises: exceptions.ClusterOwnerIdentityMismatchError: if the cluster @@ -2789,7 +2829,8 @@ def _provision( rich_utils.force_update_status( ux_utils.spinner_message('Launching', log_path)) config_dict = retry_provisioner.provision_with_retries( - task, to_provision_config, dryrun, stream_logs) + task, to_provision_config, dryrun, stream_logs, + skip_unnecessary_provisioning) break except exceptions.ResourcesUnavailableError as e: # Do not remove the stopped cluster from the global state @@ -2839,11 +2880,23 @@ def _provision( record = global_user_state.get_cluster_from_name(cluster_name) return record['handle'] if record is not None else None + if config_dict['provisioning_skipped']: + # Skip further provisioning. + # In this case, we won't have certain fields in the config_dict + # ('handle', 'provision_record', 'resources_vars') + # We need to return the handle - but it should be the existing + # handle for the cluster. + record = global_user_state.get_cluster_from_name(cluster_name) + assert record is not None and record['handle'] is not None, ( + cluster_name, record) + return record['handle'] + if 'provision_record' in config_dict: # New provisioner is used here. handle = config_dict['handle'] provision_record = config_dict['provision_record'] resources_vars = config_dict['resources_vars'] + config_hash = config_dict.get('config_hash', None) # Setup SkyPilot runtime after the cluster is provisioned # 1. Wait for SSH to be ready. 
@@ -2878,7 +2931,7 @@ def _provision( self._update_after_cluster_provisioned( handle, to_provision_config.prev_handle, task, prev_cluster_status, handle.external_ips(), - handle.external_ssh_ports(), lock_path) + handle.external_ssh_ports(), lock_path, config_hash) return handle cluster_config_file = config_dict['ray'] @@ -2950,7 +3003,8 @@ def _get_zone(runner): self._update_after_cluster_provisioned( handle, to_provision_config.prev_handle, task, - prev_cluster_status, ip_list, ssh_port_list, lock_path) + prev_cluster_status, ip_list, ssh_port_list, lock_path, + config_hash) return handle def _open_ports(self, handle: CloudVmRayResourceHandle) -> None: @@ -2968,8 +3022,8 @@ def _update_after_cluster_provisioned( prev_handle: Optional[CloudVmRayResourceHandle], task: task_lib.Task, prev_cluster_status: Optional[status_lib.ClusterStatus], - ip_list: List[str], ssh_port_list: List[int], - lock_path: str) -> None: + ip_list: List[str], ssh_port_list: List[int], lock_path: str, + config_hash: str) -> None: usage_lib.messages.usage.update_cluster_resources( handle.launched_nodes, handle.launched_resources) usage_lib.messages.usage.update_final_cluster_status( @@ -3029,6 +3083,7 @@ def _update_after_cluster_provisioned( handle, set(task.resources), ready=True, + config_hash=config_hash, ) usage_lib.messages.usage.update_final_cluster_status( status_lib.ClusterStatus.UP) @@ -4348,6 +4403,7 @@ def _check_existing_cluster( # cluster is terminated (through console or auto-dwon), the record will # become None and the cluster_ever_up should be considered as False. 
cluster_ever_up = record is not None and record['cluster_ever_up'] + prev_config_hash = record['config_hash'] if record is not None else None logger.debug(f'cluster_ever_up: {cluster_ever_up}') logger.debug(f'record: {record}') @@ -4386,7 +4442,8 @@ def _check_existing_cluster( handle.launched_nodes, prev_cluster_status=prev_cluster_status, prev_handle=handle, - prev_cluster_ever_up=cluster_ever_up) + prev_cluster_ever_up=cluster_ever_up, + prev_config_hash=prev_config_hash) usage_lib.messages.usage.set_new_cluster() # Use the task_cloud, because the cloud in `to_provision` can be changed # later during the retry. @@ -4427,7 +4484,8 @@ def _check_existing_cluster( task.num_nodes, prev_cluster_status=None, prev_handle=None, - prev_cluster_ever_up=False) + prev_cluster_ever_up=False, + prev_config_hash=prev_config_hash) def _execute_file_mounts(self, handle: CloudVmRayResourceHandle, file_mounts: Optional[Dict[Path, Path]]): diff --git a/sky/backends/local_docker_backend.py b/sky/backends/local_docker_backend.py index 2cc3f3347a5..c10e51e7975 100644 --- a/sky/backends/local_docker_backend.py +++ b/sky/backends/local_docker_backend.py @@ -131,13 +131,14 @@ def check_resources_fit_cluster(self, handle: 'LocalDockerResourceHandle', pass def _provision( - self, - task: 'task_lib.Task', - to_provision: Optional['resources.Resources'], - dryrun: bool, - stream_logs: bool, - cluster_name: str, - retry_until_up: bool = False + self, + task: 'task_lib.Task', + to_provision: Optional['resources.Resources'], + dryrun: bool, + stream_logs: bool, + cluster_name: str, + retry_until_up: bool = False, + skip_unnecessary_provisioning: bool = False, ) -> Optional[LocalDockerResourceHandle]: """Builds docker image for the task and returns cluster name as handle. @@ -153,6 +154,9 @@ def _provision( logger.warning( f'Retrying until up is not supported in backend: {self.NAME}. 
' 'Ignored the flag.') + if skip_unnecessary_provisioning: + logger.warning(f'skip_unnecessary_provisioning is not supported in ' + f'backend: {self.NAME}. Ignored the flag.') if stream_logs: logger.info( 'Streaming build logs is not supported in LocalDockerBackend. ' diff --git a/sky/clouds/service_catalog/common.py b/sky/clouds/service_catalog/common.py index 387d695d637..67c6e09b27e 100644 --- a/sky/clouds/service_catalog/common.py +++ b/sky/clouds/service_catalog/common.py @@ -15,6 +15,7 @@ from sky.clouds import cloud as cloud_lib from sky.clouds import cloud_registry from sky.clouds.service_catalog import constants +from sky.utils import common_utils from sky.utils import rich_utils from sky.utils import ux_utils @@ -69,8 +70,7 @@ def is_catalog_modified(filename: str) -> bool: meta_path = os.path.join(_ABSOLUTE_VERSIONED_CATALOG_DIR, '.meta', filename) md5_filepath = meta_path + '.md5' if os.path.exists(md5_filepath): - with open(catalog_path, 'rb') as f: - file_md5 = hashlib.md5(f.read()).hexdigest() + file_md5 = common_utils.hash_file(catalog_path, 'md5').hexdigest() with open(md5_filepath, 'r', encoding='utf-8') as f: last_md5 = f.read() return file_md5 != last_md5 diff --git a/sky/execution.py b/sky/execution.py index 103dcf5ee83..7392d510b17 100644 --- a/sky/execution.py +++ b/sky/execution.py @@ -108,6 +108,7 @@ def _execute( idle_minutes_to_autostop: Optional[int] = None, no_setup: bool = False, clone_disk_from: Optional[str] = None, + skip_unnecessary_provisioning: bool = False, # Internal only: # pylint: disable=invalid-name _is_launched_by_jobs_controller: bool = False, @@ -128,8 +129,9 @@ def _execute( Note that if errors occur during provisioning/data syncing/setting up, the cluster will not be torn down for debugging purposes. stream_logs: bool; whether to stream all tasks' outputs to the client. - handle: Optional[backends.ResourceHandle]; if provided, execution will use - an existing backend cluster handle instead of provisioning a new one. 
+ handle: Optional[backends.ResourceHandle]; if provided, execution will + attempt to use an existing backend cluster handle instead of + provisioning a new one. backend: Backend; backend to use for executing the tasks. Defaults to CloudVmRayBackend() retry_until_up: bool; whether to retry the provisioning until the cluster @@ -150,6 +152,11 @@ def _execute( idle_minutes_to_autostop: int; if provided, the cluster will be set to autostop after this many minutes of idleness. no_setup: bool; whether to skip setup commands or not when (re-)launching. + clone_disk_from: Optional[str]; if set, clone the disk from the specified + cluster. + skip_unnecessary_provisioning: bool; if True, compare the calculated + cluster config to the current cluster's config. If they match, shortcut + provisioning even if we have Stage.PROVISION. Returns: job_id: Optional[int]; the job ID of the submitted job. None if the @@ -288,13 +295,18 @@ def _execute( try: if Stage.PROVISION in stages: - if handle is None: - handle = backend.provision(task, - task.best_resources, - dryrun=dryrun, - stream_logs=stream_logs, - cluster_name=cluster_name, - retry_until_up=retry_until_up) + assert handle is None or skip_unnecessary_provisioning, ( + 'Provisioning requested, but handle is already set. PROVISION ' + 'should be excluded from stages or ' + 'skip_unnecessary_provisioning should be set. 
') + handle = backend.provision( + task, + task.best_resources, + dryrun=dryrun, + stream_logs=stream_logs, + cluster_name=cluster_name, + retry_until_up=retry_until_up, + skip_unnecessary_provisioning=skip_unnecessary_provisioning) if handle is None: assert dryrun, ('If not dryrun, handle must be set or ' @@ -469,6 +481,7 @@ def launch( handle = None stages = None + skip_unnecessary_provisioning = False # Check if cluster exists and we are doing fast provisioning if fast and cluster_name is not None: cluster_status, maybe_handle = ( @@ -502,12 +515,16 @@ def launch( if cluster_status == status_lib.ClusterStatus.UP: handle = maybe_handle stages = [ + # Provisioning will be short-circuited if the existing + # cluster config hash matches the calculated one. + Stage.PROVISION, Stage.SYNC_WORKDIR, Stage.SYNC_FILE_MOUNTS, Stage.PRE_EXEC, Stage.EXEC, Stage.DOWN, ] + skip_unnecessary_provisioning = True return _execute( entrypoint=entrypoint, @@ -525,6 +542,7 @@ def launch( idle_minutes_to_autostop=idle_minutes_to_autostop, no_setup=no_setup, clone_disk_from=clone_disk_from, + skip_unnecessary_provisioning=skip_unnecessary_provisioning, _is_launched_by_jobs_controller=_is_launched_by_jobs_controller, _is_launched_by_sky_serve_controller= _is_launched_by_sky_serve_controller, diff --git a/sky/global_user_state.py b/sky/global_user_state.py index e9f15df4f52..2a5cbc7eb3f 100644 --- a/sky/global_user_state.py +++ b/sky/global_user_state.py @@ -61,7 +61,8 @@ def create_table(cursor, conn): cluster_hash TEXT DEFAULT null, storage_mounts_metadata BLOB DEFAULT null, cluster_ever_up INTEGER DEFAULT 0, - status_updated_at INTEGER DEFAULT null)""") + status_updated_at INTEGER DEFAULT null, + config_hash TEXT DEFAULT null)""") # Table for Cluster History # usage_intervals: List[Tuple[int, int]] @@ -135,6 +136,9 @@ def create_table(cursor, conn): db_utils.add_column_to_table(cursor, conn, 'clusters', 'status_updated_at', 'INTEGER DEFAULT null') + db_utils.add_column_to_table(cursor, 
conn, 'clusters', 'config_hash', + 'TEXT DEFAULT null') + conn.commit() @@ -145,7 +149,8 @@ def add_or_update_cluster(cluster_name: str, cluster_handle: 'backends.ResourceHandle', requested_resources: Optional[Set[Any]], ready: bool, - is_launch: bool = True): + is_launch: bool = True, + config_hash: Optional[str] = None): """Adds or updates cluster_name -> cluster_handle mapping. Args: @@ -197,7 +202,8 @@ def add_or_update_cluster(cluster_name: str, # specified. '(name, launched_at, handle, last_use, status, ' 'autostop, to_down, metadata, owner, cluster_hash, ' - 'storage_mounts_metadata, cluster_ever_up, status_updated_at) ' + 'storage_mounts_metadata, cluster_ever_up, status_updated_at, ' + 'config_hash) ' 'VALUES (' # name '?, ' @@ -236,7 +242,9 @@ def add_or_update_cluster(cluster_name: str, # cluster_ever_up '((SELECT cluster_ever_up FROM clusters WHERE name=?) OR ?),' # status_updated_at - '?' + '?,' + # config_hash + 'COALESCE(?, (SELECT config_hash FROM clusters WHERE name=?))' ')', ( # name @@ -270,6 +278,9 @@ def add_or_update_cluster(cluster_name: str, int(ready), # status_updated_at status_updated_at, + # config_hash + config_hash, + cluster_name, )) launched_nodes = getattr(cluster_handle, 'launched_nodes', None) @@ -585,15 +596,15 @@ def get_cluster_from_name( rows = _DB.cursor.execute( 'SELECT name, launched_at, handle, last_use, status, autostop, ' 'metadata, to_down, owner, cluster_hash, storage_mounts_metadata, ' - 'cluster_ever_up, status_updated_at FROM clusters WHERE name=(?)', - (cluster_name,)).fetchall() + 'cluster_ever_up, status_updated_at, config_hash ' + 'FROM clusters WHERE name=(?)', (cluster_name,)).fetchall() for row in rows: # Explicitly specify the number of fields to unpack, so that # we can add new fields to the database in the future without # breaking the previous code. 
(name, launched_at, handle, last_use, status, autostop, metadata, to_down, owner, cluster_hash, storage_mounts_metadata, cluster_ever_up, - status_updated_at) = row[:13] + status_updated_at, config_hash) = row[:14] # TODO: use namedtuple instead of dict record = { 'name': name, @@ -610,6 +621,7 @@ def get_cluster_from_name( _load_storage_mounts_metadata(storage_mounts_metadata), 'cluster_ever_up': bool(cluster_ever_up), 'status_updated_at': status_updated_at, + 'config_hash': config_hash, } return record return None @@ -619,13 +631,13 @@ def get_clusters() -> List[Dict[str, Any]]: rows = _DB.cursor.execute( 'select name, launched_at, handle, last_use, status, autostop, ' 'metadata, to_down, owner, cluster_hash, storage_mounts_metadata, ' - 'cluster_ever_up, status_updated_at from clusters ' - 'order by launched_at desc').fetchall() + 'cluster_ever_up, status_updated_at, config_hash ' + 'from clusters order by launched_at desc').fetchall() records = [] for row in rows: (name, launched_at, handle, last_use, status, autostop, metadata, to_down, owner, cluster_hash, storage_mounts_metadata, cluster_ever_up, - status_updated_at) = row[:13] + status_updated_at, config_hash) = row[:14] # TODO: use namedtuple instead of dict record = { 'name': name, @@ -642,6 +654,7 @@ def get_clusters() -> List[Dict[str, Any]]: _load_storage_mounts_metadata(storage_mounts_metadata), 'cluster_ever_up': bool(cluster_ever_up), 'status_updated_at': status_updated_at, + 'config_hash': config_hash, } records.append(record) diff --git a/sky/utils/common_utils.py b/sky/utils/common_utils.py index 5fce435b770..3fcdd24e505 100644 --- a/sky/utils/common_utils.py +++ b/sky/utils/common_utils.py @@ -697,3 +697,22 @@ def truncate_long_string(s: str, max_length: int = 35) -> str: if len(prefix) < max_length: prefix += s[len(prefix):max_length] return prefix + '...' 
+ + +def hash_file(path: str, hash_alg: str) -> 'hashlib._Hash': + # In python 3.11, hashlib.file_digest is available, but for <3.11 we have to + # do it manually. + # This implementation is simplified from the implementation in CPython. + # TODO(cooperc): Use hashlib.file_digest once we move to 3.11+. + # Beware of f.read() as some files may be larger than memory. + with open(path, 'rb') as f: + file_hash = hashlib.new(hash_alg) + buf = bytearray(2**18) + view = memoryview(buf) + while True: + size = f.readinto(buf) + if size == 0: + # EOF + break + file_hash.update(view[:size]) + return file_hash diff --git a/tests/unit_tests/test_backend_utils.py b/tests/unit_tests/test_backend_utils.py index 5da4410abb9..c9aa21567c2 100644 --- a/tests/unit_tests/test_backend_utils.py +++ b/tests/unit_tests/test_backend_utils.py @@ -22,6 +22,8 @@ return_value='~/.aws/credentials') @mock.patch('sky.backends.backend_utils._get_yaml_path_from_cluster_name', return_value='/tmp/fake/path') +@mock.patch('sky.backends.backend_utils._deterministic_cluster_yaml_hash', + return_value='fake-hash') @mock.patch('sky.utils.common_utils.fill_template') def test_write_cluster_config_w_remote_identity(mock_fill_template, *mocks) -> None: From 6e5083293f0d9a9d069d51274c57f0e59e47e5ce Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Thu, 5 Dec 2024 06:59:34 +0530 Subject: [PATCH 10/10] [k8s] Add resource limits only if they exist (#4440) Add limits only if they exist --- sky/templates/kubernetes-ray.yml.j2 | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2 index 535e6f0b1ae..2087d9c6e9d 100644 --- a/sky/templates/kubernetes-ray.yml.j2 +++ b/sky/templates/kubernetes-ray.yml.j2 @@ -560,6 +560,7 @@ available_node_types: # https://gitlab.com/arm-research/smarter/smarter-device-manager smarter-devices/fuse: "1" {% endif %} + {% if k8s_resource_key is not none or k8s_fuse_device_required %} limits: # Limits 
need to be defined for GPU/TPU requests {% if k8s_resource_key is not none %} @@ -568,7 +569,8 @@ available_node_types: {% if k8s_fuse_device_required %} smarter-devices/fuse: "1" {% endif %} - + {% endif %} + setup_commands: # Disable `unattended-upgrades` to prevent apt-get from hanging. It should be called at the beginning before the process started to avoid being blocked. (This is a temporary fix.) # Create ~/.ssh/config file in case the file does not exist in the image.