diff --git a/README.md b/README.md index 61085db93c6..1a9ba0e7780 100644 --- a/README.md +++ b/README.md @@ -38,30 +38,32 @@ This repository is a fork of the [original Skypilot](https://github.com/skypilot ---- :fire: *News* :fire: -- [Sep, 2024] Point, Launch and Serve **Llama 3.2** on Kubernetes or Any Cloud: [**example**](./llm/llama-3_2/) -- [Sep, 2024] Run and deploy [**Pixtral**](./llm/pixtral), the first open-source multimodal model from Mistral AI. -- [Jul, 2024] [**Finetune**](./llm/llama-3_1-finetuning/) and [**serve**](./llm/llama-3_1/) **Llama 3.1** on your infra -- [Jun, 2024] Reproduce **GPT** with [llm.c](https://github.com/karpathy/llm.c/discussions/481) on any cloud: [**guide**](./llm/gpt-2/) -- [Apr, 2024] Serve **Qwen-110B** on your infra: [**example**](./llm/qwen/) -- [Apr, 2024] Using **Ollama** to deploy quantized LLMs on CPUs and GPUs: [**example**](./llm/ollama/) -- [Feb, 2024] Deploying and scaling **Gemma** with SkyServe: [**example**](./llm/gemma/) -- [Feb, 2024] Serving **Code Llama 70B** with vLLM and SkyServe: [**example**](./llm/codellama/) -- [Dec, 2023] **Mixtral 8x7B**, a high quality sparse mixture-of-experts model, was released by Mistral AI! Deploy via SkyPilot on any cloud: [**example**](./llm/mixtral/) -- [Nov, 2023] Using **Axolotl** to finetune Mistral 7B on the cloud (on-demand and spot): [**example**](./llm/axolotl/) +- [Oct 2024] :tada: **SkyPilot crossed 1M+ downloads** :tada:: Thank you to our community! [**Twitter/X**](https://x.com/skypilot_org/status/1844770841718067638) +- [Sep 2024] Point, Launch and Serve **Llama 3.2** on Kubernetes or Any Cloud: [**example**](./llm/llama-3_2/) +- [Sep 2024] Run and deploy [**Pixtral**](./llm/pixtral), the first open-source multimodal model from Mistral AI. +- [Jun 2024] Reproduce **GPT** with [llm.c](https://github.com/karpathy/llm.c/discussions/481) on any cloud: [**guide**](./llm/gpt-2/) +- [Apr 2024] Serve [**Qwen-110B**](https://qwenlm.github.io/blog/qwen1.5-110b/) on your infra: [**example**](./llm/qwen/) +- [Apr 2024] Using [**Ollama**](https://github.com/ollama/ollama) to deploy quantized LLMs on CPUs and GPUs: [**example**](./llm/ollama/) +- [Feb 2024] Deploying and scaling [**Gemma**](https://blog.google/technology/developers/gemma-open-models/) with SkyServe: [**example**](./llm/gemma/) +- [Feb 2024] Serving [**Code Llama 70B**](https://ai.meta.com/blog/code-llama-large-language-model-coding/) with vLLM and SkyServe: [**example**](./llm/codellama/) +- [Dec 2023] [**Mixtral 8x7B**](https://mistral.ai/news/mixtral-of-experts/), a high quality sparse mixture-of-experts model, was released by Mistral AI! Deploy via SkyPilot on any cloud: [**example**](./llm/mixtral/) +- [Nov 2023] Using [**Axolotl**](https://github.com/OpenAccess-AI-Collective/axolotl) to finetune Mistral 7B on the cloud (on-demand and spot): [**example**](./llm/axolotl/) + +**LLM Finetuning Cookbooks**: Finetuning Llama 2 / Llama 3.1 in your own cloud environment, privately: Llama 2 [**example**](./llm/vicuna-llama-2/) and [**blog**](https://blog.skypilot.co/finetuning-llama2-operational-guide/); Llama 3.1 [**example**](./llm/llama-3_1-finetuning/) and [**blog**](https://blog.skypilot.co/finetune-llama-3_1-on-your-infra/)
Archived -- [Apr, 2024] Serve and finetune [**Llama 3**](https://skypilot.readthedocs.io/en/latest/gallery/llms/llama-3.html) on any cloud or Kubernetes: [**example**](./llm/llama-3/) -- [Mar, 2024] Serve and deploy [**Databricks DBRX**](https://www.databricks.com/blog/introducing-dbrx-new-state-art-open-llm) on your infra: [**example**](./llm/dbrx/) -- [Feb, 2024] Speed up your LLM deployments with [**SGLang**](https://github.com/sgl-project/sglang) for 5x throughput on SkyServe: [**example**](./llm/sglang/) -- [Dec, 2023] Using [**LoRAX**](https://github.com/predibase/lorax) to serve 1000s of finetuned LLMs on a single instance in the cloud: [**example**](./llm/lorax/) -- [Sep, 2023] [**Mistral 7B**](https://mistral.ai/news/announcing-mistral-7b/), a high-quality open LLM, was released! Deploy via SkyPilot on any cloud: [**Mistral docs**](https://docs.mistral.ai/self-deployment/skypilot) -- [Sep, 2023] Case study: [**Covariant**](https://covariant.ai/) transformed AI development on the cloud using SkyPilot, delivering models 4x faster cost-effectively: [**read the case study**](https://blog.skypilot.co/covariant/) -- [Aug, 2023] **Finetuning Cookbook**: Finetuning Llama 2 in your own cloud environment, privately: [**example**](./llm/vicuna-llama-2/), [**blog post**](https://blog.skypilot.co/finetuning-llama2-operational-guide/) -- [July, 2023] Self-Hosted **Llama-2 Chatbot** on Any Cloud: [**example**](./llm/llama-2/) -- [June, 2023] Serving LLM 24x Faster On the Cloud [**with vLLM**](https://vllm.ai/) and SkyPilot: [**example**](./llm/vllm/), [**blog post**](https://blog.skypilot.co/serving-llm-24x-faster-on-the-cloud-with-vllm-and-skypilot/) -- [April, 2023] [SkyPilot YAMLs](./llm/vicuna/) for finetuning & serving the [Vicuna LLM](https://lmsys.org/blog/2023-03-30-vicuna/) with a single command! +- [Jul 2024] [**Finetune**](./llm/llama-3_1-finetuning/) and [**serve**](./llm/llama-3_1/) **Llama 3.1** on your infra +- [Apr 2024] Serve and finetune [**Llama 3**](https://skypilot.readthedocs.io/en/latest/gallery/llms/llama-3.html) on any cloud or Kubernetes: [**example**](./llm/llama-3/) +- [Mar 2024] Serve and deploy [**Databricks DBRX**](https://www.databricks.com/blog/introducing-dbrx-new-state-art-open-llm) on your infra: [**example**](./llm/dbrx/) +- [Feb 2024] Speed up your LLM deployments with [**SGLang**](https://github.com/sgl-project/sglang) for 5x throughput on SkyServe: [**example**](./llm/sglang/) +- [Dec 2023] Using [**LoRAX**](https://github.com/predibase/lorax) to serve 1000s of finetuned LLMs on a single instance in the cloud: [**example**](./llm/lorax/) +- [Sep 2023] [**Mistral 7B**](https://mistral.ai/news/announcing-mistral-7b/), a high-quality open LLM, was released! Deploy via SkyPilot on any cloud: [**Mistral docs**](https://docs.mistral.ai/self-deployment/skypilot) +- [Sep 2023] Case study: [**Covariant**](https://covariant.ai/) transformed AI development on the cloud using SkyPilot, delivering models 4x faster cost-effectively: [**read the case study**](https://blog.skypilot.co/covariant/) +- [Jul 2023] Self-Hosted **Llama-2 Chatbot** on Any Cloud: [**example**](./llm/llama-2/) +- [Jun 2023] Serving LLM 24x Faster On the Cloud [**with vLLM**](https://vllm.ai/) and SkyPilot: [**example**](./llm/vllm/), [**blog post**](https://blog.skypilot.co/serving-llm-24x-faster-on-the-cloud-with-vllm-and-skypilot/) +- [Apr 2023] [SkyPilot YAMLs](./llm/vicuna/) for finetuning & serving the [Vicuna LLM](https://lmsys.org/blog/2023-03-30-vicuna/) with a single command!
diff --git a/docs/source/examples/syncing-code-artifacts.rst b/docs/source/examples/syncing-code-artifacts.rst index ded8d03f739..1b05c68b84f 100644 --- a/docs/source/examples/syncing-code-artifacts.rst +++ b/docs/source/examples/syncing-code-artifacts.rst @@ -46,31 +46,7 @@ VMs. The task is invoked under that working directory (so that it can call scripts, access checkpoints, etc.). .. note:: - - **Exclude files from syncing** - - For large, multi-gigabyte workdirs, uploading may be slow because they - are synced to the remote VM(s). To exclude large files in - your workdir from being uploaded, add them to a :code:`.skyignore` file - under your workdir. :code:`.skyignore` follows RSYNC filter rules. - - Example :code:`.skyignore` file: - - .. code-block:: - - # Files that match pattern under ONLY CURRENT directory - /hello.py - /*.txt - /dir - - # Files that match pattern under ALL directories - *.txt - hello.py - - # Files that match pattern under a directory ./dir/ - /dir/*.txt - - Do NOT use ``.`` to indicate local directory (e.g. ``./hello.py``). + To exclude large files from being uploaded, see :ref:`exclude-uploading-files`. .. note:: @@ -140,6 +116,33 @@ file_mount may be slow because they are processed by ``rsync``. Use :ref:`SkyPilot bucket mounting ` to efficiently handle large files. +.. _exclude-uploading-files: + +Exclude uploading files +-------------------------------------- +By default, SkyPilot uses your existing :code:`.gitignore` and :code:`.git/info/exclude` to exclude files from syncing. + +Alternatively, you can use :code:`.skyignore` if you want to separate SkyPilot's syncing behavior from Git's. +If you use a :code:`.skyignore` file, SkyPilot will only exclude files based on that file without using the default Git files. + +Any :code:`.skyignore` file under either your workdir or source paths of file_mounts is respected. + +:code:`.skyignore` follows RSYNC filter rules, e.g. + +.. code-block:: + + # Files that match pattern under CURRENT directory + /file.txt + /dir + /*.jar + /dir/*.jar + + # Files that match pattern under ALL directories + *.jar + file.txt + +Do _not_ use ``.`` to indicate local directory (e.g., instead of ``./file``, write ``/file``). + .. _downloading-files-and-artifacts: Downloading files and artifacts diff --git a/docs/source/reference/config.rst b/docs/source/reference/config.rst index 5c52e7487b9..b8255b46402 100644 --- a/docs/source/reference/config.rst +++ b/docs/source/reference/config.rst @@ -419,6 +419,15 @@ Available fields and semantics: # Default: 'LOCAL_CREDENTIALS'. remote_identity: LOCAL_CREDENTIALS + # Enable gVNIC (optional). + # + # Set to true to use gVNIC on GCP instances. gVNIC offers higher performance + # for multi-node clusters, but costs more. + # Reference: https://cloud.google.com/compute/docs/networking/using-gvnic + # + # Default: false. + enable_gvnic: false + # Advanced Azure configurations (optional). # Apply to all new instances but not existing ones. azure: diff --git a/docs/source/reference/kubernetes/kubernetes-deployment.rst b/docs/source/reference/kubernetes/kubernetes-deployment.rst index d7e7127f6e7..e9489e9149e 100644 --- a/docs/source/reference/kubernetes/kubernetes-deployment.rst +++ b/docs/source/reference/kubernetes/kubernetes-deployment.rst @@ -114,9 +114,9 @@ Deploying on Google Cloud GKE # Example: # gcloud container clusters get-credentials testcluster --region us-central1-c -3. [If using GPUs] If your GKE nodes have GPUs, you may need to to - `manually install `_ - nvidia drivers. You can do so by deploying the daemonset +3. [If using GPUs] For GKE versions newer than 1.30.1-gke.115600, NVIDIA drivers are pre-installed and no additional setup is required. If you are using an older GKE version, you may need to + `manually install `_ + NVIDIA drivers for GPU support. You can do so by deploying the daemonset depending on the GPU and OS on your nodes: .. code-block:: console @@ -133,7 +133,8 @@ Deploying on Google Cloud GKE # For Ubuntu based nodes with L4 GPUs: $ kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/ubuntu/daemonset-preloaded-R525.yaml - To verify if GPU drivers are set up, run ``kubectl describe nodes`` and verify that ``nvidia.com/gpu`` is listed under the ``Capacity`` section. + .. tip:: + To verify if GPU drivers are set up, run ``kubectl describe nodes`` and verify that ``nvidia.com/gpu`` resource is listed under the ``Capacity`` section. 4. Verify your kubernetes cluster is correctly set up for SkyPilot by running :code:`sky check`: diff --git a/docs/source/reference/kubernetes/kubernetes-getting-started.rst b/docs/source/reference/kubernetes/kubernetes-getting-started.rst index 4f87c8a6ee7..d7313fba3e2 100644 --- a/docs/source/reference/kubernetes/kubernetes-getting-started.rst +++ b/docs/source/reference/kubernetes/kubernetes-getting-started.rst @@ -119,6 +119,57 @@ Once your cluster administrator has :ref:`setup a Kubernetes cluster `_ for easily viewing and managing -SkyPilot tasks running on your cluster. +Below, we provide tips on how to monitor SkyPilot resources on your Kubernetes cluster. + +.. _kubernetes-observability-skystatus: + +List SkyPilot resources across all users +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +We provide a convenience command, :code:`sky status --k8s`, to view the status of all SkyPilot resources in the cluster. + +Unlike :code:`sky status` which lists only the SkyPilot resources launched by the current user, +:code:`sky status --k8s` lists all SkyPilot resources in the cluster across all users. + +.. code-block:: console + + $ sky status --k8s + Kubernetes cluster state (context: mycluster) + SkyPilot clusters + USER NAME LAUNCHED RESOURCES STATUS + alice infer-svc-1 23 hrs ago 1x Kubernetes(cpus=1, mem=1, {'L4': 1}) UP + alice sky-jobs-controller-80b50983 2 days ago 1x Kubernetes(cpus=4, mem=4) UP + alice sky-serve-controller-80b50983 23 hrs ago 1x Kubernetes(cpus=4, mem=4) UP + bob dev 1 day ago 1x Kubernetes(cpus=2, mem=8, {'H100': 1}) UP + bob multinode-dev 1 day ago 2x Kubernetes(cpus=2, mem=2) UP + bob sky-jobs-controller-2ea485ea 2 days ago 1x Kubernetes(cpus=4, mem=4) UP + + Managed jobs + In progress tasks: 1 STARTING + USER ID TASK NAME RESOURCES SUBMITTED TOT. DURATION JOB DURATION #RECOVERIES STATUS + alice 1 - eval 1x[CPU:1+] 2 days ago 49s 8s 0 SUCCEEDED + bob 4 - pretrain 1x[H100:4] 1 day ago 1h 1m 11s 1h 14s 0 SUCCEEDED + bob 3 - bigjob 1x[CPU:16] 1 day ago 1d 21h 11m 4s - 0 STARTING + bob 2 - failjob 1x[CPU:1+] 1 day ago 54s 9s 0 FAILED + bob 1 - shortjob 1x[CPU:1+] 2 days ago 1h 1m 19s 1h 16s 0 SUCCEEDED + + +.. _kubernetes-observability-dashboard: + +Kubernetes Dashboard +^^^^^^^^^^^^^^^^^^^^ +You can deploy tools such as the `Kubernetes dashboard `_ to easily view and manage +SkyPilot resources on your cluster. .. image:: ../../images/screenshots/kubernetes/kubernetes-dashboard.png :width: 80% diff --git a/docs/source/reference/yaml-spec.rst b/docs/source/reference/yaml-spec.rst index c5339bcc184..f874b4d37b4 100644 --- a/docs/source/reference/yaml-spec.rst +++ b/docs/source/reference/yaml-spec.rst @@ -22,8 +22,8 @@ Available fields: # If a relative path is used, it's evaluated relative to the location from # which `sky` is called. # - # To exclude files from syncing, add them to a .skyignore file under your working directory. - # Details: https://skypilot.readthedocs.io/en/latest/examples/syncing-code-artifacts.html#uploading-code-and-project-files + # To exclude files from syncing, see + # https://skypilot.readthedocs.io/en/latest/examples/syncing-code-artifacts.html#exclude-uploading-files workdir: ~/my-task-code # Number of nodes (optional; defaults to 1) to launch including the head node. diff --git a/examples/deepspeed-multinode/sky.yaml b/examples/deepspeed-multinode/sky.yaml index 37d7445a2a1..07bd3746894 100644 --- a/examples/deepspeed-multinode/sky.yaml +++ b/examples/deepspeed-multinode/sky.yaml @@ -2,10 +2,16 @@ # # This takes care constructing a "hostfile" to pass to DeepSpeed. # +# If running on Kubernetes, use the nvidia/cuda:12.1.1-devel-ubuntu20.04 image +# because DeepSpeed requires nvcc. +# # Usage: # # $ sky launch sky.yaml -r --down -c ds # +# If running on Kubernetes: +# $ sky launch sky.yaml -r --down -c ds --cloud kubernetes --image nvidia/cuda:12.1.1-devel-ubuntu20.04 +# # # Optional: After the job starts running, you can log into the two nodes and # # check gpustat: # $ ssh ds @@ -18,6 +24,7 @@ resources: # accelerators: A100-80GB:1 # Azure, GCP, SCP # accelerators: A10G:1 # AWS. Will OOM for (1) single_node/run_1.3b_lora.sh (2) multi_node/run_66b.sh. # accelerators: T4:1 # AWS, Azure, GCP. Will OOM for (1) single_node/run_1.3b_lora.sh (2) multi_node/run_66b.sh. + # image_id: docker:nvidia/cuda:12.1.1-devel-ubuntu20.04 # Use this image if running on Kubernetes num_nodes: 2 @@ -28,6 +35,13 @@ envs: DEEPSPEED_ENVS: "MY_VAR_1,MY_VAR_2,SKYPILOT_NODE_RANK" setup: | + if ! command -v git &> /dev/null + then + echo "git is not installed. Installing git..." + sudo apt-get update + sudo apt-get install -y git + fi + git clone https://github.com/microsoft/DeepSpeedExamples.git || true cd DeepSpeedExamples git checkout d7c42b4f34df91035e7ed3e0c51500bb53d0bc71 @@ -39,16 +53,19 @@ setup: | conda create -n deepspeed python=3.8 -y conda activate deepspeed - pip install deepspeed + pip install deepspeed==0.14.4 cd applications/DeepSpeed-Chat pip install -r requirements.txt + + pip install transformers==4.44.0 # Required by DeepSpeed in multi-node settings. # # NOTE(skypilot): DeepSpeed uses `pdsh` to log into each node and calls # `ninja --version`; so it has to be installed system-wide rather than in # the above 'deepspeed' conda env. + sudo apt-get update sudo apt-get -y install pdsh ninja-build fi diff --git a/sky/authentication.py b/sky/authentication.py index eb51aad02ad..41a7d02dfb7 100644 --- a/sky/authentication.py +++ b/sky/authentication.py @@ -43,9 +43,9 @@ from sky.adaptors import ibm from sky.adaptors import kubernetes from sky.adaptors import runpod -from sky.clouds.utils import lambda_utils from sky.provision.fluidstack import fluidstack_utils from sky.provision.kubernetes import utils as kubernetes_utils +from sky.provision.lambda_cloud import lambda_utils from sky.utils import common_utils from sky.utils import kubernetes_enums from sky.utils import subprocess_utils diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 1f213f5c614..caa6c9292d5 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -56,7 +56,7 @@ from sky.utils import ux_utils if typing.TYPE_CHECKING: - from sky import resources + from sky import resources as resources_lib from sky import task as task_lib from sky.backends import cloud_vm_ray_backend from sky.backends import local_docker_backend @@ -751,7 +751,7 @@ def _restore_block(new_block: Dict[str, Any], old_block: Dict[str, Any]): # TODO: too many things happening here - leaky abstraction. Refactor. @timeline.event def write_cluster_config( - to_provision: 'resources.Resources', + to_provision: 'resources_lib.Resources', num_nodes: int, cluster_config_template: str, cluster_name: str, @@ -2772,6 +2772,10 @@ def get_endpoints(cluster: str, cluster_records = get_clusters(include_controller=True, refresh=False, cluster_names=[cluster]) + if not cluster_records: + with ux_utils.print_exception_no_traceback(): + raise exceptions.ClusterNotUpError( + f'Cluster {cluster!r} not found.', cluster_status=None) assert len(cluster_records) == 1, cluster_records cluster_record = cluster_records[0] if (not skip_status_check and diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index d0ba3b2bc8f..b4ab1644401 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2849,9 +2849,9 @@ def _provision( time.sleep(gap_seconds) continue logger.error( - f'{colorama.Fore.RED}⨯{colorama.Style.RESET_ALL} ' - 'Failed to provision resources. ' - f'{ux_utils.log_path_hint(log_path)}') + ux_utils.error_message( + 'Failed to provision resources. ' + f'{ux_utils.log_path_hint(log_path)}')) error_message += ( '\nTo keep retrying until the cluster is up, use ' 'the `--retry-until-up` flag.') diff --git a/sky/cli.py b/sky/cli.py index 87d35f58d1c..fb5a38bba7b 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -1464,54 +1464,8 @@ def _status_kubernetes(show_all: bool): Args: show_all (bool): Show all job information (e.g., start time, failures). """ - context = kubernetes_utils.get_current_kube_config_context_name() - try: - pods = kubernetes_utils.get_skypilot_pods(context) - except exceptions.ResourcesUnavailableError as e: - with ux_utils.print_exception_no_traceback(): - raise ValueError('Failed to get SkyPilot pods from ' - f'Kubernetes: {str(e)}') from e - all_clusters, jobs_controllers, serve_controllers = ( - status_utils.process_skypilot_pods(pods, context)) - all_jobs = [] - with rich_utils.safe_status( - '[bold cyan]Checking in-progress managed jobs[/]') as spinner: - for i, (_, job_controller_info) in enumerate(jobs_controllers.items()): - user = job_controller_info['user'] - pod = job_controller_info['pods'][0] - status_message = ('[bold cyan]Checking managed jobs controller') - if len(jobs_controllers) > 1: - status_message += f's ({i+1}/{len(jobs_controllers)})' - spinner.update(f'{status_message}[/]') - try: - job_list = managed_jobs.queue_from_kubernetes_pod( - pod.metadata.name) - except RuntimeError as e: - logger.warning('Failed to get managed jobs from controller ' - f'{pod.metadata.name}: {str(e)}') - job_list = [] - # Add user field to jobs - for job in job_list: - job['user'] = user - all_jobs.extend(job_list) - # Reconcile cluster state between managed jobs and clusters: - # To maintain a clear separation between regular SkyPilot clusters - # and those from managed jobs, we need to exclude the latter from - # the main cluster list. - # We do this by reconstructing managed job cluster names from each - # job's name and ID. We then use this set to filter out managed - # clusters from the main cluster list. This is necessary because there - # are no identifiers distinguishing clusters from managed jobs from - # regular clusters. - managed_job_cluster_names = set() - for job in all_jobs: - # Managed job cluster name is - - managed_cluster_name = f'{job["job_name"]}-{job["job_id"]}' - managed_job_cluster_names.add(managed_cluster_name) - unmanaged_clusters = [ - c for c in all_clusters - if c['cluster_name'] not in managed_job_cluster_names - ] + all_clusters, unmanaged_clusters, all_jobs, context = ( + core.status_kubernetes()) click.echo(f'{colorama.Fore.CYAN}{colorama.Style.BRIGHT}' f'Kubernetes cluster state (context: {context})' f'{colorama.Style.RESET_ALL}') @@ -1523,7 +1477,7 @@ def _status_kubernetes(show_all: bool): f'{colorama.Style.RESET_ALL}') msg = managed_jobs.format_job_table(all_jobs, show_all=show_all) click.echo(msg) - if serve_controllers: + if any(['sky-serve-controller' in c.cluster_name for c in all_clusters]): # TODO: Parse serve controllers and show services separately. # Currently we show a hint that services are shown as clusters. click.echo(f'\n{colorama.Style.DIM}Hint: SkyServe replica pods are ' @@ -4426,9 +4380,14 @@ def serve_status(all: bool, endpoint: bool, service_names: List[str]): default=False, required=False, help='Skip confirmation prompt.') +@click.option('--replica-id', + default=None, + type=int, + help='Tear down a given replica') # pylint: disable=redefined-builtin -def serve_down(service_names: List[str], all: bool, purge: bool, yes: bool): - """Teardown service(s). +def serve_down(service_names: List[str], all: bool, purge: bool, yes: bool, + replica_id: Optional[int]): + """Teardown service(s) or a replica. SERVICE_NAMES is the name of the service (or glob pattern) to tear down. If both SERVICE_NAMES and ``--all`` are supplied, the latter takes precedence. @@ -4454,6 +4413,12 @@ def serve_down(service_names: List[str], all: bool, purge: bool, yes: bool): \b # Forcefully tear down a service in failed status. sky serve down failed-service --purge + \b + # Tear down a specific replica + sky serve down my-service --replica-id 1 + \b + # Forcefully tear down a specific replica, even in failed status. + sky serve down my-service --replica-id 1 --purge """ if sum([len(service_names) > 0, all]) != 1: argument_str = f'SERVICE_NAMES={",".join(service_names)}' if len( @@ -4463,22 +4428,45 @@ def serve_down(service_names: List[str], all: bool, purge: bool, yes: bool): 'Can only specify one of SERVICE_NAMES or --all. ' f'Provided {argument_str!r}.') + replica_id_is_defined = replica_id is not None + if replica_id_is_defined: + if len(service_names) != 1: + service_names_str = ', '.join(service_names) + raise click.UsageError(f'The --replica-id option can only be used ' + f'with a single service name. Got: ' + f'{service_names_str}.') + if all: + raise click.UsageError('The --replica-id option cannot be used ' + 'with the --all option.') + backend_utils.is_controller_accessible( controller=controller_utils.Controllers.SKY_SERVE_CONTROLLER, stopped_message='All services should have been terminated.', exit_if_not_accessible=True) if not yes: - quoted_service_names = [f'{name!r}' for name in service_names] - service_identity_str = f'service(s) {", ".join(quoted_service_names)}' - if all: - service_identity_str = 'all services' - click.confirm(f'Terminating {service_identity_str}. Proceed?', - default=True, - abort=True, - show_default=True) - - serve_lib.down(service_names=service_names, all=all, purge=purge) + if replica_id_is_defined: + click.confirm( + f'Terminating replica ID {replica_id} in ' + f'{service_names[0]!r}. Proceed?', + default=True, + abort=True, + show_default=True) + else: + quoted_service_names = [f'{name!r}' for name in service_names] + service_identity_str = (f'service(s) ' + f'{", ".join(quoted_service_names)}') + if all: + service_identity_str = 'all services' + click.confirm(f'Terminating {service_identity_str}. Proceed?', + default=True, + abort=True, + show_default=True) + + if replica_id_is_defined: + serve_lib.terminate_replica(service_names[0], replica_id, purge) + else: + serve_lib.down(service_names=service_names, all=all, purge=purge) @serve.command('logs', cls=_DocumentedCodeCommand) diff --git a/sky/clouds/aws.py b/sky/clouds/aws.py index 2207a977f25..a0962b17cac 100644 --- a/sky/clouds/aws.py +++ b/sky/clouds/aws.py @@ -32,6 +32,14 @@ logger = sky_logging.init_logger(__name__) +# Image ID tags +_DEFAULT_CPU_IMAGE_ID = 'skypilot:custom-cpu-ubuntu' +# For GPU-related package version, +# see sky/clouds/service_catalog/images/provisioners/cuda.sh +_DEFAULT_GPU_IMAGE_ID = 'skypilot:custom-gpu-ubuntu' +_DEFAULT_GPU_K80_IMAGE_ID = 'skypilot:k80-ubuntu-2004' +_DEFAULT_NEURON_IMAGE_ID = 'skypilot:neuron-ubuntu-2204' + # This local file (under ~/.aws/) will be uploaded to remote nodes (any # cloud), if all of the following conditions hold: # - the current user identity is not using AWS SSO @@ -217,17 +225,20 @@ def zones_provision_loop( @classmethod def _get_default_ami(cls, region_name: str, instance_type: str) -> str: acc = cls.get_accelerators_from_instance_type(instance_type) - image_id = service_catalog.get_image_id_from_tag( - 'skypilot:gpu-ubuntu-2004', region_name, clouds='aws') + image_id = service_catalog.get_image_id_from_tag(_DEFAULT_CPU_IMAGE_ID, + region_name, + clouds='aws') if acc is not None: + image_id = service_catalog.get_image_id_from_tag( + _DEFAULT_GPU_IMAGE_ID, region_name, clouds='aws') assert len(acc) == 1, acc acc_name = list(acc.keys())[0] if acc_name == 'K80': image_id = service_catalog.get_image_id_from_tag( - 'skypilot:k80-ubuntu-2004', region_name, clouds='aws') + _DEFAULT_GPU_K80_IMAGE_ID, region_name, clouds='aws') if acc_name in ['Trainium', 'Inferentia']: image_id = service_catalog.get_image_id_from_tag( - 'skypilot:neuron-ubuntu-2204', region_name, clouds='aws') + _DEFAULT_NEURON_IMAGE_ID, region_name, clouds='aws') if image_id is not None: return image_id # Raise ResourcesUnavailableError to make sure the failover in diff --git a/sky/clouds/azure.py b/sky/clouds/azure.py index afa85f48fa5..adffd32ad88 100644 --- a/sky/clouds/azure.py +++ b/sky/clouds/azure.py @@ -329,7 +329,6 @@ def make_deploy_resources_variables( runcmd: - sed -i 's/#Banner none/Banner none/' /etc/ssh/sshd_config - echo '\\nif [ ! -f "/tmp/__restarted" ]; then\\n sudo systemctl restart ssh\\n sleep 2\\n touch /tmp/__restarted\\nfi' >> /home/skypilot:ssh_user/.bashrc - - usermod -aG docker skypilot:ssh_user write_files: - path: /etc/apt/apt.conf.d/20auto-upgrades content: | diff --git a/sky/clouds/gcp.py b/sky/clouds/gcp.py index b1015c92979..1b70abf914d 100644 --- a/sky/clouds/gcp.py +++ b/sky/clouds/gcp.py @@ -94,6 +94,12 @@ f'\nTo query common AI images: {colorama.Style.BRIGHT}gcloud compute images list --project deeplearning-platform-release | less{colorama.Style.RESET_ALL}' ) +# Image ID tags +_DEFAULT_CPU_IMAGE_ID = 'skypilot:custom-cpu-ubuntu-2204' +# For GPU-related package version, see sky/clouds/service_catalog/images/provisioners/cuda.sh +_DEFAULT_GPU_IMAGE_ID = 'skypilot:custom-gpu-ubuntu-2204' +_DEFAULT_GPU_K80_IMAGE_ID = 'skypilot:k80-debian-10' + def _run_output(cmd): proc = subprocess.run(cmd, @@ -422,7 +428,7 @@ def make_deploy_resources_variables( # --no-standard-images # We use the debian image, as the ubuntu image has some connectivity # issue when first booted. - image_id = 'skypilot:cpu-debian-11' + image_id = _DEFAULT_CPU_IMAGE_ID def _failover_disk_tier() -> Optional[resources_utils.DiskTier]: if (r.disk_tier is not None and @@ -471,13 +477,16 @@ def _failover_disk_tier() -> Optional[resources_utils.DiskTier]: 'runtime_version'] resources_vars['tpu_node_name'] = r.accelerator_args.get( 'tpu_name') + # TPU VMs require privileged mode for docker containers to + # access TPU devices. + resources_vars['docker_run_options'] = ['--privileged'] else: # Convert to GCP names: # https://cloud.google.com/compute/docs/gpus if acc in ('A100-80GB', 'L4'): # A100-80GB and L4 have a different name pattern. resources_vars['gpu'] = f'nvidia-{acc.lower()}' - elif acc == 'H100': + elif acc in ('H100', 'H100-MEGA'): resources_vars['gpu'] = f'nvidia-{acc.lower()}-80gb' else: resources_vars['gpu'] = 'nvidia-tesla-{}'.format( @@ -487,10 +496,10 @@ def _failover_disk_tier() -> Optional[resources_utils.DiskTier]: # Though the image is called cu113, it actually has later # versions of CUDA as noted below. # CUDA driver version 470.57.02, CUDA Library 11.4 - image_id = 'skypilot:k80-debian-10' + image_id = _DEFAULT_GPU_K80_IMAGE_ID else: # CUDA driver version 535.86.10, CUDA Library 12.2 - image_id = 'skypilot:gpu-debian-11' + image_id = _DEFAULT_GPU_IMAGE_ID if (resources.image_id is not None and resources.extract_docker_image() is None): @@ -540,6 +549,11 @@ def _failover_disk_tier() -> Optional[resources_utils.DiskTier]: resources_vars[ 'force_enable_external_ips'] = skypilot_config.get_nested( ('gcp', 'force_enable_external_ips'), False) + + # Add gVNIC from config + resources_vars['enable_gvnic'] = skypilot_config.get_nested( + ('gcp', 'enable_gvnic'), False) + return resources_vars def _get_feasible_launchable_resources( diff --git a/sky/clouds/lambda_cloud.py b/sky/clouds/lambda_cloud.py index d3d20fbd41a..0201f4f76ad 100644 --- a/sky/clouds/lambda_cloud.py +++ b/sky/clouds/lambda_cloud.py @@ -8,7 +8,7 @@ from sky import clouds from sky import status_lib from sky.clouds import service_catalog -from sky.clouds.utils import lambda_utils +from sky.provision.lambda_cloud import lambda_utils from sky.utils import resources_utils if typing.TYPE_CHECKING: @@ -37,10 +37,6 @@ class Lambda(clouds.Cloud): _CLOUD_UNSUPPORTED_FEATURES = { clouds.CloudImplementationFeatures.STOP: 'Lambda cloud does not support stopping VMs.', clouds.CloudImplementationFeatures.CLONE_DISK_FROM_CLUSTER: f'Migrating disk is currently not supported on {_REPR}.', - clouds.CloudImplementationFeatures.DOCKER_IMAGE: ( - f'Docker image is currently not supported on {_REPR}. ' - 'You can try running docker command inside the `run` section in task.yaml.' - ), clouds.CloudImplementationFeatures.SPOT_INSTANCE: f'Spot instances are not supported in {_REPR}.', clouds.CloudImplementationFeatures.IMAGE_ID: f'Specifying image ID is not supported in {_REPR}.', clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER: f'Custom disk tiers are not supported in {_REPR}.', @@ -48,6 +44,9 @@ class Lambda(clouds.Cloud): clouds.CloudImplementationFeatures.HOST_CONTROLLERS: f'Host controllers are not supported in {_REPR}.', } + PROVISIONER_VERSION = clouds.ProvisionerVersion.SKYPILOT + STATUS_VERSION = clouds.StatusVersion.SKYPILOT + @classmethod def _unsupported_features_for_resources( cls, resources: 'resources_lib.Resources' @@ -170,12 +169,20 @@ def make_deploy_resources_variables( else: custom_resources = None - return { + resources_vars = { 'instance_type': resources.instance_type, 'custom_resources': custom_resources, 'region': region.name, } + if acc_dict is not None: + # Lambda cloud's docker runtime information does not contain + # 'nvidia-container-runtime', causing no GPU option is added to + # the docker run command. We patch this by adding it here. + resources_vars['docker_run_options'] = ['--gpus all'] + + return resources_vars + def _get_feasible_launchable_resources( self, resources: 'resources_lib.Resources' ) -> 'resources_utils.FeasibleResources': diff --git a/sky/clouds/oci.py b/sky/clouds/oci.py index f4ac4d577e3..810e43fe3b5 100644 --- a/sky/clouds/oci.py +++ b/sky/clouds/oci.py @@ -17,6 +17,8 @@ make_deploy_resources_variables(): Bug fix for specify the image_id as the ocid of the image in the task.yaml file, in this case the image_id for the node config should be set to the ocid instead of a dict. + - Hysun He (hysun.he@oracle.com) @ Oct 13, 2024: + Support more OS types additional to ubuntu for OCI resources. """ import json import logging @@ -295,10 +297,21 @@ def make_deploy_resources_variables( cpus=None if cpus is None else float(cpus), disk_tier=resources.disk_tier) + image_str = self._get_image_str(image_id=resources.image_id, + instance_type=resources.instance_type, + region=region.name) + + # pylint: disable=import-outside-toplevel + from sky.clouds.service_catalog import oci_catalog + os_type = oci_catalog.get_image_os_from_tag(tag=image_str, + region=region.name) + logger.debug(f'OS type for the image {image_str} is {os_type}') + return { 'instance_type': instance_type, 'custom_resources': custom_resources, 'region': region.name, + 'os_type': os_type, 'cpus': str(cpus), 'memory': resources.memory, 'disk_size': resources.disk_size, @@ -501,59 +514,45 @@ def _get_image_id( region_name: str, instance_type: str, ) -> str: - if image_id is None: - return self._get_default_image(region_name=region_name, - instance_type=instance_type) - if None in image_id: - image_id_str = image_id[None] - else: - assert region_name in image_id, image_id - image_id_str = image_id[region_name] + image_id_str = self._get_image_str(image_id=image_id, + instance_type=instance_type, + region=region_name) + if image_id_str.startswith('skypilot:'): image_id_str = service_catalog.get_image_id_from_tag(image_id_str, region_name, clouds='oci') - if image_id_str is None: - logger.critical( - '! Real image_id not found! - {region_name}:{image_id}') - # Raise ResourcesUnavailableError to make sure the failover - # in CloudVMRayBackend will be correctly triggered. - # TODO(zhwu): This is a information leakage to the cloud - # implementor, we need to find a better way to handle this. - raise exceptions.ResourcesUnavailableError( - '! ERR: No image found in catalog for region ' - f'{region_name}. Try setting a valid image_id.') + + # Image_id should be impossible be None, except for the case when + # user specify an image tag which does not exist in the image.csv + # catalog file which only possible in "test" / "evaluation" phase. + # Therefore, we use assert here. + assert image_id_str is not None logger.debug(f'Got real image_id {image_id_str}') return image_id_str - def _get_default_image(self, region_name: str, instance_type: str) -> str: + def _get_image_str(self, image_id: Optional[Dict[Optional[str], str]], + instance_type: str, region: str): + if image_id is None: + image_str = self._get_default_image_tag(instance_type) + elif None in image_id: + image_str = image_id[None] + else: + assert region in image_id, image_id + image_str = image_id[region] + return image_str + + def _get_default_image_tag(self, instance_type: str) -> str: acc = self.get_accelerators_from_instance_type(instance_type) if acc is None: image_tag = oci_utils.oci_config.get_default_image_tag() - image_id_str = service_catalog.get_image_id_from_tag(image_tag, - region_name, - clouds='oci') else: assert len(acc) == 1, acc image_tag = oci_utils.oci_config.get_default_gpu_image_tag() - image_id_str = service_catalog.get_image_id_from_tag(image_tag, - region_name, - clouds='oci') - if image_id_str is not None: - logger.debug( - f'Got default image_id {image_id_str} from tag {image_tag}') - return image_id_str - - # Raise ResourcesUnavailableError to make sure the failover in - # CloudVMRayBackend will be correctly triggered. - # TODO(zhwu): This is a information leakage to the cloud implementor, - # we need to find a better way to handle this. - raise exceptions.ResourcesUnavailableError( - 'ERR: No image found in catalog for region ' - f'{region_name}. Try update your default image_id settings.') + return image_tag def get_vpu_from_disktier( self, cpus: Optional[float], diff --git a/sky/clouds/service_catalog/aws_catalog.py b/sky/clouds/service_catalog/aws_catalog.py index a44750c4ec4..d156135047b 100644 --- a/sky/clouds/service_catalog/aws_catalog.py +++ b/sky/clouds/service_catalog/aws_catalog.py @@ -308,7 +308,17 @@ def list_accelerators( def get_image_id_from_tag(tag: str, region: Optional[str]) -> Optional[str]: """Returns the image id from the tag.""" - return common.get_image_id_from_tag_impl(_image_df, tag, region) + global _image_df + + image_id = common.get_image_id_from_tag_impl(_image_df, tag, region) + if image_id is None: + # Refresh the image catalog and try again, if the image tag is not + # found. + logger.debug('Refreshing the image catalog and trying again.') + _image_df = common.read_catalog('aws/images.csv', + pull_frequency_hours=0) + image_id = common.get_image_id_from_tag_impl(_image_df, tag, region) + return image_id def is_image_tag_valid(tag: str, region: Optional[str]) -> bool: diff --git a/sky/clouds/service_catalog/data_fetchers/fetch_aws.py b/sky/clouds/service_catalog/data_fetchers/fetch_aws.py index e0e5ffa21a1..b630123648e 100644 --- a/sky/clouds/service_catalog/data_fetchers/fetch_aws.py +++ b/sky/clouds/service_catalog/data_fetchers/fetch_aws.py @@ -538,11 +538,13 @@ def _check_regions_integrity(df: 'pd.DataFrame', name: str): instance_df.to_csv('aws/vms.csv', index=False) print('AWS Service Catalog saved to aws/vms.csv') - image_df = get_all_regions_images_df(user_regions) - _check_regions_integrity(image_df, 'images') + # Disable refreshing images.csv as we are using skypilot custom AMIs + # See sky/clouds/service_catalog/images/README.md for more details. + # image_df = get_all_regions_images_df(user_regions) + # _check_regions_integrity(image_df, 'images') - image_df.to_csv('aws/images.csv', index=False) - print('AWS Images saved to aws/images.csv') + # image_df.to_csv('aws/images.csv', index=False) + # print('AWS Images saved to aws/images.csv') if args.az_mappings: az_mappings_df = fetch_availability_zone_mappings() diff --git a/sky/clouds/service_catalog/data_fetchers/fetch_gcp.py b/sky/clouds/service_catalog/data_fetchers/fetch_gcp.py index eb69695aa55..097efe74deb 100644 --- a/sky/clouds/service_catalog/data_fetchers/fetch_gcp.py +++ b/sky/clouds/service_catalog/data_fetchers/fetch_gcp.py @@ -419,6 +419,11 @@ def _get_gpus_for_zone(zone: str) -> 'pd.DataFrame': if count != 8: # H100 only has 8 cards. continue + if 'H100-MEGA-80GB' in gpu_name: + gpu_name = 'H100-MEGA' + if count != 8: + # H100-MEGA only has 8 cards. + continue if 'VWS' in gpu_name: continue if gpu_name.startswith('TPU-'): @@ -447,6 +452,7 @@ def _gpu_info_from_name(name: str) -> Optional[Dict[str, List[Dict[str, Any]]]]: 'A100-80GB': 80 * 1024, 'A100': 40 * 1024, 'H100': 80 * 1024, + 'H100-MEGA': 80 * 1024, 'P4': 8 * 1024, 'T4': 16 * 1024, 'V100': 16 * 1024, @@ -491,12 +497,17 @@ def get_gpu_price(row: pd.Series, spot: bool) -> Optional[float]: if sku['category']['usageType'] != ondemand_or_spot: continue - gpu_name = row['AcceleratorName'] - if gpu_name == 'A100-80GB': - gpu_name = 'A100 80GB' - if gpu_name == 'H100': - gpu_name = 'H100 80GB' - if f'{gpu_name} GPU' not in sku['description']: + gpu_names = [row['AcceleratorName']] + if gpu_names[0] == 'A100-80GB': + gpu_names = ['A100 80GB'] + if gpu_names[0] == 'H100': + gpu_names = ['H100 80GB'] + if gpu_names[0] == 'H100-MEGA': + # Seems that H100-MEGA has two different descriptions in SKUs in + # different regions: 'H100 80GB Mega' and 'H100 80GB Plus'. + gpu_names = ['H100 80GB Mega', 'H100 80GB Plus'] + if not any(f'{gpu_name} GPU' in sku['description'] + for gpu_name in gpu_names): continue unit_price = _get_unit_price(sku) diff --git a/sky/clouds/service_catalog/gcp_catalog.py b/sky/clouds/service_catalog/gcp_catalog.py index f861b51920e..c9e15f602dc 100644 --- a/sky/clouds/service_catalog/gcp_catalog.py +++ b/sky/clouds/service_catalog/gcp_catalog.py @@ -98,6 +98,9 @@ }, 'H100': { 8: ['a3-highgpu-8g'], + }, + 'H100-MEGA': { + 8: ['a3-megagpu-8g'], } } diff --git a/sky/clouds/service_catalog/images/README.md b/sky/clouds/service_catalog/images/README.md new file mode 100644 index 00000000000..31ce7c6d9ce --- /dev/null +++ b/sky/clouds/service_catalog/images/README.md @@ -0,0 +1,72 @@ +# SkyPilot OS Image Generation Guide + +## Prerequisites +You only need to do this once. +1. Install [Packer](https://developer.hashicorp.com/packer/tutorials/aws-get-started/get-started-install-cli) +2. Download plugins used by Packer +```bash +packer init plugins.pkr.hcl +``` +3. Setup cloud credentials + +## Generate Images +```bash +export CLOUD=gcp # Update this +export TYPE=gpu # Update this +export IMAGE=skypilot-${CLOUD}-${TYPE}-ubuntu +packer build ${IMAGE}.pkr.hcl +``` +You will see the image ID after the build is complete. + +FYI time to packer build an image: + +| Cloud | Type | Approx. Time | +|-------|------|------------------------| +| AWS | GPU | 15 min | +| AWS | CPU | 10 min | +| GCP | GPU | 16 min | +| GCP | CPU | 5 min | + +### GCP +```bash +export IMAGE_NAME=skypilot-gcp-cpu-ubuntu-20241011003407 # Update this + +# Make image public +export IMAGE_ID=projects/sky-dev-465/global/images/${IMAGE_NAME} +gcloud compute images add-iam-policy-binding ${IMAGE_NAME} --member='allAuthenticatedUsers' --role='roles/compute.imageUser' +``` + +### AWS +1. Generate images for all regions +```bash +export IMAGE_ID=ami-0b31b24524afa8e47 # Update this + +python aws_utils/image_gen.py --image-id ${IMAGE_ID} --processor ${TYPE} +``` +2. Add fallback images if any region failed \ +Look for "NEED_FALLBACK" in the output `images.csv` and edit. (You can use public [ubuntu images](https://cloud-images.ubuntu.com/locator/ec2/) as fallback.) + +## Test Images +1. Minimal GPU test: `sky launch --image ${IMAGE_ID} --gpus=L4:1 --cloud ${CLOUD}` then run `nvidia-smi` in the launched instance. +2. Update the image ID in `sky/clouds/gcp.py` and run the test: +```bash +pytest tests/test_smoke.py::test_minimal --gcp +pytest tests/test_smoke.py::test_huggingface --gcp +pytest tests/test_smoke.py::test_job_queue_with_docker --gcp +pytest tests/test_smoke.py::test_cancel_gcp +``` + +## Ship Images & Cleanup +Submit a PR to update [`SkyPilot Catalog`](https://github.com/skypilot-org/skypilot-catalog/tree/master/catalogs) then clean up the old images to avoid extra iamge storage fees. + +### GCP +1. Example PR: [#86](https://github.com/skypilot-org/skypilot-catalog/pull/86) +2. Go to console and delete old images. + +### AWS +1. Copy the old custom image rows from Catalog's existing `images.csv` to a local `images.csv` in this folder. +2. Update Catalog with new images. Example PR: [#89](https://github.com/skypilot-org/skypilot-catalog/pull/89) +3. Delete AMIs across regions by running +```bash +python aws_utils/image_delete.py --tag ${TAG} +``` diff --git a/sky/clouds/service_catalog/images/aws_utils/image_delete.py b/sky/clouds/service_catalog/images/aws_utils/image_delete.py new file mode 100644 index 00000000000..52cbb5b2382 --- /dev/null +++ b/sky/clouds/service_catalog/images/aws_utils/image_delete.py @@ -0,0 +1,63 @@ +"""Delete all images with a given tag and their associated snapshots from images.csv + +Example Usage: put images.csv in the same folder as this script and run + python image_delete.py --tag skypilot:custom-gpu-ubuntu-2204 +""" + +import argparse +import csv +import json +import subprocess + +parser = argparse.ArgumentParser( + description='Delete AWS images and their snapshots across regions.') +parser.add_argument('--tag', + required=True, + help='Tag of the image to delete, see tags in images.csv') +args = parser.parse_args() + + +def get_snapshots(image_id, region): + cmd = f'aws ec2 describe-images --image-ids {image_id} --region {region} --query "Images[*].BlockDeviceMappings[*].Ebs.SnapshotId" --output json' + result = subprocess.run(cmd, + shell=True, + check=True, + capture_output=True, + text=True) + snapshots = json.loads(result.stdout) + return [ + snapshot for sublist in snapshots for snapshot in sublist if snapshot + ] + + +def delete_image_and_snapshots(image_id, region): + # Must get snapshots before deleting the image + snapshots = get_snapshots(image_id, region) + + # Deregister the image + cmd = f'aws ec2 deregister-image --image-id {image_id} --region {region}' + subprocess.run(cmd, shell=True, check=True) + print(f"Deregistered image {image_id} in region {region}") + + # Delete snapshots + for snapshot in snapshots: + cmd = f'aws ec2 delete-snapshot --snapshot-id {snapshot} --region {region}' + subprocess.run(cmd, shell=True, check=True) + print(f'Deleted snapshot {snapshot} in region {region}') + + +def main(): + with open('images.csv', 'r') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + if row['Tag'] == args.tag: + try: + delete_image_and_snapshots(row['ImageId'], row['Region']) + except subprocess.CalledProcessError as e: + print( + f'Failed to delete image {row["ImageId"]} or its snapshots in region {row["Region"]}: {e}' + ) + + +if __name__ == "__main__": + main() diff --git a/sky/clouds/service_catalog/images/aws_utils/image_gen.py b/sky/clouds/service_catalog/images/aws_utils/image_gen.py new file mode 100644 index 00000000000..cb39355ad2c --- /dev/null +++ b/sky/clouds/service_catalog/images/aws_utils/image_gen.py @@ -0,0 +1,151 @@ +"""Copy SkyPilot AMI to multiple regions, make them public, and generate images.csv + +Example Usage: + python aws_image_gen.py --source-image-id ami-00000 --processor gpu +""" + +import argparse +import concurrent.futures +import csv +import json +import os +import subprocess +import threading +import time + +parser = argparse.ArgumentParser( + description='Generate AWS images across regions.') +parser.add_argument('--image-id', + required=True, + help='The source AMI ID to copy from') +parser.add_argument('--processor', required=True, help='e.g. gpu, cpu, etc.') +parser.add_argument('--region', + default='us-east-1', + help='Region of the source AMI') +parser.add_argument('--base-image-id', + default='ami-005fc0f236362e99f', + help='The base AMI of the source AMI.') +parser.add_argument('--os-type', default='ubuntu', help='The OS type') +parser.add_argument('--os-version', default='22.04', help='The OS version') +parser.add_argument('--output-csv', + default='images.csv', + help='The output CSV file name') +args = parser.parse_args() + +# 25 regions +ALL_REGIONS = [ + # 'us-east-1', # Source AMI is already in this region + 'us-east-2', + 'us-west-1', + 'us-west-2', + 'ca-central-1', + 'eu-central-1', # need for smoke test + 'eu-central-2', + 'eu-west-1', + 'eu-west-2', + 'eu-south-1', + 'eu-south-2', + 'eu-west-3', + 'eu-north-1', + 'me-south-1', + 'me-central-1', + 'af-south-1', + 'ap-east-1', + 'ap-south-1', + 'ap-south-2', + 'ap-northeast-3', + 'ap-northeast-2', + 'ap-southeast-1', + 'ap-southeast-2', + 'ap-southeast-3', + 'ap-northeast-1', +] + + +def make_image_public(image_id, region): + unblock_command = f"aws ec2 disable-image-block-public-access --region {region}" + subprocess.run(unblock_command, shell=True, check=True) + public_command = ( + f'aws ec2 modify-image-attribute --image-id {image_id} ' + f'--launch-permission "{{\\\"Add\\\": [{{\\\"Group\\\":\\\"all\\\"}}]}}" --region {region}' + ) + subprocess.run(public_command, shell=True, check=True) + print(f"Made {image_id} public") + + +def copy_image_and_make_public(target_region): + # Copy the AMI to the target region + copy_command = ( + f"aws ec2 copy-image --source-region {args.region} " + f"--source-image-id {args.image_id} --region {target_region} " + f"--name 'skypilot-aws-{args.processor}-{args.os_type}-{time.time()}' --output json" + ) + print(copy_command) + result = subprocess.run(copy_command, + shell=True, + check=True, + capture_output=True, + text=True) + print(result.stdout) + new_image_id = json.loads(result.stdout)['ImageId'] + print(f"Copied image to {target_region} with new image ID: {new_image_id}") + + # Wait for the image to be available + print(f"Waiting for {new_image_id} to be available...") + wait_command = f"aws ec2 wait image-available --image-ids {new_image_id} --region {target_region}" + subprocess.run(wait_command, shell=True, check=True) + + make_image_public(new_image_id, target_region) + + return new_image_id + + +def write_image_to_csv(image_id, region): + with open(args.output_csv, 'a', newline='', encoding='utf-8') as csvfile: + writer = csv.writer(csvfile) + row = [ + f'skypilot:custom-{args.processor}-{args.os_type}', region, + args.os_type, args.os_version, image_id, + time.strftime('%Y%m%d'), args.base_image_id + ] + writer.writerow(row) + print(f"Wrote to CSV: {row}") + + +def main(): + make_image_public(args.image_id, args.region) + if not os.path.exists(args.output_csv): + with open(args.output_csv, 'w', newline='') as csvfile: + writer = csv.writer(csvfile) + writer.writerow([ + 'Tag', 'Region', 'OS', 'OSVersion', 'ImageId', 'CreationDate', + 'BaseImageId' + ]) # Header + print(f"No existing {args.output_csv} so created it.") + + # Process other regions + image_cache = [(args.image_id, args.region)] + + def process_region(copy_to_region): + print(f"Start copying image to {copy_to_region}...") + try: + new_image_id = copy_image_and_make_public(copy_to_region) + except Exception as e: + print(f"Error generating image to {copy_to_region}: {str(e)}") + new_image_id = 'NEED_FALLBACK' + image_cache.append((new_image_id, copy_to_region)) + + with concurrent.futures.ThreadPoolExecutor() as executor: + executor.map(process_region, ALL_REGIONS) + executor.shutdown(wait=True) + + # Sort the images by it's region and write to CSV + sorted_image_cache = sorted(image_cache, key=lambda x: x[1]) + for new_image_id, copy_to_region in sorted_image_cache: + write_image_to_csv(new_image_id, copy_to_region) + + print("All done!") + + +if __name__ == "__main__": + main() diff --git a/sky/clouds/service_catalog/images/plugins.pkr.hcl b/sky/clouds/service_catalog/images/plugins.pkr.hcl new file mode 100644 index 00000000000..e007c1723bf --- /dev/null +++ b/sky/clouds/service_catalog/images/plugins.pkr.hcl @@ -0,0 +1,17 @@ +packer { + required_plugins { + amazon = { + version = ">= 1.2.8" + source = "github.com/hashicorp/amazon" + } + } +} + +packer { + required_plugins { + googlecompute = { + version = ">= 1.1.1" + source = "github.com/hashicorp/googlecompute" + } + } +} diff --git a/sky/clouds/service_catalog/images/provisioners/cloud.sh b/sky/clouds/service_catalog/images/provisioners/cloud.sh new file mode 100644 index 00000000000..b326c9fde51 --- /dev/null +++ b/sky/clouds/service_catalog/images/provisioners/cloud.sh @@ -0,0 +1,50 @@ +#!/bin/bash + +PYTHON_EXEC=$(echo ~/skypilot-runtime)/bin/python + +# TODO: keep this dependency installation align with utils/controller_utils.py and setup.py +install_azure() { + echo "Install cloud dependencies on controller: Azure" + $PYTHON_EXEC -m pip install "azure-cli>=2.31.0" azure-core "azure-identity>=1.13.0" azure-mgmt-network + $PYTHON_EXEC -m pip install azure-storage-blob msgraph-sdk +} + +install_gcp() { + echo "Install cloud dependencies on controller: GCP" + $PYTHON_EXEC -m pip install "google-api-python-client>=2.69.0" + $PYTHON_EXEC -m pip install google-cloud-storage + if ! gcloud --help > /dev/null 2>&1; then + pushd /tmp &>/dev/null + mkdir -p ~/.sky/logs + wget --quiet https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-424.0.0-linux-x86_64.tar.gz > ~/.sky/logs/gcloud_installation.log + tar xzf google-cloud-sdk-424.0.0-linux-x86_64.tar.gz >> ~/.sky/logs/gcloud_installation.log + rm -rf ~/google-cloud-sdk >> ~/.sky/logs/gcloud_installation.log + mv google-cloud-sdk ~/ + ~/google-cloud-sdk/install.sh -q >> ~/.sky/logs/gcloud_installation.log 2>&1 + echo "source ~/google-cloud-sdk/path.bash.inc > /dev/null 2>&1" >> ~/.bashrc + source ~/google-cloud-sdk/path.bash.inc >> ~/.sky/logs/gcloud_installation.log 2>&1 + popd &>/dev/null + fi +} + +install_aws() { + echo "Install cloud dependencies on controller: AWS" + $PYTHON_EXEC -m pip install botocore>=1.29.10 boto3>=1.26.1 + $PYTHON_EXEC -m pip install "urllib3<2" awscli>=1.27.10 "colorama<0.4.5" +} + +if [ "$CLOUD" = "azure" ]; then + install_azure +elif [ "$CLOUD" = "gcp" ]; then + install_gcp +elif [ "$CLOUD" = "aws" ]; then + install_aws +else + echo "Error: Unknown cloud $CLOUD so not installing any cloud dependencies." +fi + +if [ $? -eq 0 ]; then + echo "Successfully installed cloud dependencies on controller: $CLOUD" +else + echo "Error: Failed to install cloud dependencies on controller: $CLOUD" +fi diff --git a/sky/clouds/service_catalog/images/provisioners/cuda.sh b/sky/clouds/service_catalog/images/provisioners/cuda.sh new file mode 100644 index 00000000000..1b2b4ec977e --- /dev/null +++ b/sky/clouds/service_catalog/images/provisioners/cuda.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +# This script installs the latest CUDA driver and toolkit version that is compatible with all GPU types. +# For CUDA driver version, choose the latest version that works for ALL GPU types. +# GCP: https://cloud.google.com/compute/docs/gpus/install-drivers-gpu#minimum-driver +# AWS: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/install-nvidia-driver.html +export DEBIAN_FRONTEND=noninteractive + +wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64/cuda-keyring_1.1-1_all.deb +sudo dpkg -i cuda-keyring_1.1-1_all.deb +sudo apt-get update + +# Make sure CUDA toolkit and driver versions are compatible: https://docs.nvidia.com/deploy/cuda-compatibility/index.html +# Current State: Driver Version 535.183.06 and CUDA Version 12.2 +sudo apt-get install -y cuda-drivers-535 +sudo apt-get install -y cuda-toolkit-12-4 + +# Install cuDNN +# https://docs.nvidia.com/deeplearning/cudnn/latest/installation/linux.html#installing-on-linux +sudo apt-get install libcudnn8 +sudo apt-get install libcudnn8-dev + +# Cleanup +rm cuda-keyring_1.1-1_all.deb diff --git a/sky/clouds/service_catalog/images/provisioners/docker.sh b/sky/clouds/service_catalog/images/provisioners/docker.sh new file mode 100644 index 00000000000..da2366408ab --- /dev/null +++ b/sky/clouds/service_catalog/images/provisioners/docker.sh @@ -0,0 +1,22 @@ +#!/bin/bash + +# Add Docker's official GPG key: +sudo apt-get update +sudo apt-get install ca-certificates curl +sudo install -m 0755 -d /etc/apt/keyrings +sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc +sudo chmod a+r /etc/apt/keyrings/docker.asc + +# Add the repository to Apt sources: +echo \ + "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \ + $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \ + sudo tee /etc/apt/sources.list.d/docker.list > /dev/null +sudo apt-get update + +# Install Docker +sudo apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin + +# Add user to Docker group so that user does not need to use sudo to run Docker commands +sudo usermod -aG docker $USER +newgrp docker diff --git a/sky/clouds/service_catalog/images/provisioners/nvidia-container-toolkit.sh b/sky/clouds/service_catalog/images/provisioners/nvidia-container-toolkit.sh new file mode 100644 index 00000000000..b6b3625176b --- /dev/null +++ b/sky/clouds/service_catalog/images/provisioners/nvidia-container-toolkit.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +set -e + +curl -fsSL https://nvidia.github.io/libnvidia-container/gpgkey | sudo gpg --dearmor -o /usr/share/keyrings/nvidia-container-toolkit-keyring.gpg && + curl -s -L https://nvidia.github.io/libnvidia-container/stable/deb/nvidia-container-toolkit.list | + sed 's#deb https://#deb [signed-by=/usr/share/keyrings/nvidia-container-toolkit-keyring.gpg] https://#g' | + sudo tee /etc/apt/sources.list.d/nvidia-container-toolkit.list + +sudo apt-get update +sudo apt-get install -y nvidia-container-toolkit + +# if there's an empty /etc/docker/daemon.json, `nvidia-ctk runtime configure --runtime=docker` will fail +if [ -f /etc/docker/daemon.json ] && [ ! -s /etc/docker/daemon.json ]; then + sudo rm /etc/docker/daemon.json +fi + +sudo nvidia-ctk runtime configure --runtime=docker +sudo systemctl restart docker + +# Validate +if sudo docker info -f "{{.Runtimes}}" | grep "nvidia-container-runtime"; then + echo "Successfully installed NVIDIA container runtime" +else + echo "Failed to install NVIDIA container runtime" +fi diff --git a/sky/clouds/service_catalog/images/provisioners/skypilot.sh b/sky/clouds/service_catalog/images/provisioners/skypilot.sh new file mode 100644 index 00000000000..ff2aa06b2b6 --- /dev/null +++ b/sky/clouds/service_catalog/images/provisioners/skypilot.sh @@ -0,0 +1,69 @@ +#!/bin/bash + +# Stop and disable unattended-upgrades +sudo systemctl stop unattended-upgrades || true +sudo systemctl disable unattended-upgrades || true +sudo sed -i 's/Unattended-Upgrade "1"/Unattended-Upgrade "0"/g' /etc/apt/apt.conf.d/20auto-upgrades || true + +# Configure dpkg +sudo dpkg --configure --force-overwrite -a + +# Apt-get installs +sudo apt-get install jq -y + +# Create necessary directories +mkdir -p ~/sky_workdir +mkdir -p ~/.sky/ +mkdir -p ~/.sky/sky_app +mkdir -p ~/.ssh +touch ~/.ssh/config + +# Install Miniconda +curl -o Miniconda3-Linux-x86_64.sh https://repo.anaconda.com/miniconda/Miniconda3-py310_23.11.0-2-Linux-x86_64.sh +bash Miniconda3-Linux-x86_64.sh -b +eval "$(~/miniconda3/bin/conda shell.bash hook)" +rm Miniconda3-Linux-x86_64.sh +conda init +conda config --set auto_activate_base true +conda activate base + +# Conda, Python +echo "Creating conda env with Python 3.10" +conda create -y -n skypilot-runtime python=3.10 +conda activate skypilot-runtime +export PIP_DISABLE_PIP_VERSION_CHECK=1 +echo PATH=$PATH +python3 -m venv ~/skypilot-runtime +PYTHON_EXEC=$(echo ~/skypilot-runtime)/bin/python + +# Pip installs +$PYTHON_EXEC -m pip install "setuptools<70" +$PYTHON_EXEC -m pip install "grpcio!=1.48.0,<=1.51.3,>=1.42.0" +$PYTHON_EXEC -m pip install "skypilot-nightly" + +# Install ray +RAY_ADDRESS=127.0.0.1:6380 +$PYTHON_EXEC -m pip install --exists-action w -U ray[default]==2.9.3 +export PATH=$PATH:$HOME/.local/bin +source ~/skypilot-runtime/bin/activate +which ray > ~/.sky/ray_path || exit 1 +$PYTHON_EXEC -m pip list | grep "ray " | grep 2.9.3 2>&1 > /dev/null && { + $PYTHON_EXEC -c "from sky.skylet.ray_patches import patch; patch()" || exit 1 +} + +# System configurations +sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 1048576" >> /etc/security/limits.conf; echo "* hard nofile 1048576" >> /etc/security/limits.conf' +sudo grep -e '^DefaultTasksMax' /etc/systemd/system.conf || sudo bash -c 'echo "DefaultTasksMax=infinity" >> /etc/systemd/system.conf' +sudo systemctl set-property user-$(id -u $(whoami)).slice TasksMax=infinity +sudo systemctl daemon-reload + +# Stop and disable Jupyter service +sudo systemctl stop jupyter > /dev/null 2>&1 || true +sudo systemctl disable jupyter > /dev/null 2>&1 || true + +# Configure fuse +[ -f /etc/fuse.conf ] && sudo sed -i 's/#user_allow_other/user_allow_other/g' /etc/fuse.conf || sudo sh -c 'echo "user_allow_other" > /etc/fuse.conf' + +# Cleanup +# Remove SkyPilot in OS image because when user sky launch we will install whatever version of SkyPilot user has on their local machine. +$PYTHON_EXEC -m pip uninstall "skypilot-nightly" -y diff --git a/sky/clouds/service_catalog/images/skypilot-aws-cpu-ubuntu.pkr.hcl b/sky/clouds/service_catalog/images/skypilot-aws-cpu-ubuntu.pkr.hcl new file mode 100644 index 00000000000..c21fbf51b20 --- /dev/null +++ b/sky/clouds/service_catalog/images/skypilot-aws-cpu-ubuntu.pkr.hcl @@ -0,0 +1,47 @@ +variable "region" { + type = string + default = "us-east-1" +} + +locals { + timestamp = regex_replace(timestamp(), "[- TZ:]", "") +} + +source "amazon-ebs" "cpu-ubuntu" { + ami_name = "skypilot-aws-cpu-ubuntu-${local.timestamp}" + instance_type = "t2.micro" + region = var.region + ssh_username = "ubuntu" + source_ami_filter { + filters = { + name = "ubuntu/images/*ubuntu-jammy-22.04-amd64-server-*" + root-device-type = "ebs" + virtualization-type = "hvm" + } + most_recent = true + owners = ["099720109477"] + } + launch_block_device_mappings { + device_name = "/dev/sda1" + volume_size = 8 + volume_type = "gp2" + delete_on_termination = true + } +} + +build { + name = "aws-cpu-ubuntu-build" + sources = ["sources.amazon-ebs.cpu-ubuntu"] + provisioner "shell" { + script = "./provisioners/docker.sh" + } + provisioner "shell" { + script = "./provisioners/skypilot.sh" + } + provisioner "shell" { + environment_vars = [ + "CLOUD=aws", + ] + script = "./provisioners/cloud.sh" + } +} diff --git a/sky/clouds/service_catalog/images/skypilot-aws-gpu-ubuntu.pkr.hcl b/sky/clouds/service_catalog/images/skypilot-aws-gpu-ubuntu.pkr.hcl new file mode 100644 index 00000000000..c4a8efac4dc --- /dev/null +++ b/sky/clouds/service_catalog/images/skypilot-aws-gpu-ubuntu.pkr.hcl @@ -0,0 +1,55 @@ +variable "region" { + type = string + default = "us-east-1" +} + +locals { + timestamp = regex_replace(timestamp(), "[- TZ:]", "") +} + +source "amazon-ebs" "gpu-ubuntu" { + ami_name = "skypilot-aws-gpu-ubuntu-${local.timestamp}" + instance_type = "g6.xlarge" + region = var.region + ssh_username = "ubuntu" + source_ami_filter { + filters = { + name = "ubuntu/images/*ubuntu-jammy-22.04-amd64-server-*" + root-device-type = "ebs" + virtualization-type = "hvm" + } + most_recent = true + owners = ["099720109477"] + } + launch_block_device_mappings { + device_name = "/dev/sda1" + volume_size = 30 + volume_type = "gp2" + delete_on_termination = true + } +} + +build { + name = "aws-gpu-ubuntu-build" + sources = [ + "source.amazon-ebs.gpu-ubuntu" + ] + provisioner "shell" { + script = "./provisioners/docker.sh" + } + provisioner "shell" { + script = "./provisioners/cuda.sh" + } + provisioner "shell" { + script = "./provisioners/nvidia-container-toolkit.sh" + } + provisioner "shell" { + script = "./provisioners/skypilot.sh" + } + provisioner "shell" { + environment_vars = [ + "CLOUD=aws", + ] + script = "./provisioners/cloud.sh" + } +} diff --git a/sky/clouds/service_catalog/images/skypilot-gcp-cpu-ubuntu.pkr.hcl b/sky/clouds/service_catalog/images/skypilot-gcp-cpu-ubuntu.pkr.hcl new file mode 100644 index 00000000000..bf3af0519e4 --- /dev/null +++ b/sky/clouds/service_catalog/images/skypilot-gcp-cpu-ubuntu.pkr.hcl @@ -0,0 +1,33 @@ + +locals { + timestamp = regex_replace(timestamp(), "[- TZ:]", "") +} + +source "googlecompute" "cpu-ubuntu" { + project_id = "sky-dev-465" + image_name = "skypilot-gcp-cpu-ubuntu-${local.timestamp}" + source_image_family = "ubuntu-2204-lts" + zone = "us-west1-a" + image_description = "SkyPilot custom image for launching GCP CPU instances." + tags = ["packer"] + disk_size = 10 + machine_type = "e2-medium" + ssh_username = "gcpuser" +} + +build { + name = "gcp-cpu-ubuntu-build" + sources = ["sources.googlecompute.cpu-ubuntu"] + provisioner "shell" { + script = "./provisioners/docker.sh" + } + provisioner "shell" { + script = "./provisioners/skypilot.sh" + } + provisioner "shell" { + environment_vars = [ + "CLOUD=gcp", + ] + script = "./provisioners/cloud.sh" + } +} diff --git a/sky/clouds/service_catalog/images/skypilot-gcp-gpu-ubuntu.pkr.hcl b/sky/clouds/service_catalog/images/skypilot-gcp-gpu-ubuntu.pkr.hcl new file mode 100644 index 00000000000..f46d414493b --- /dev/null +++ b/sky/clouds/service_catalog/images/skypilot-gcp-gpu-ubuntu.pkr.hcl @@ -0,0 +1,46 @@ +variable "zone" { + type = string + default = "us-west1-a" +} + +locals { + timestamp = regex_replace(timestamp(), "[- TZ:]", "") +} + +source "googlecompute" "gpu-ubuntu" { + image_name = "skypilot-gcp-gpu-ubuntu-${local.timestamp}" + project_id = "sky-dev-465" + source_image_family = "ubuntu-2204-lts" + zone = var.zone + image_description = "SkyPilot custom image for launching GCP GPU instances." + tags = ["packer", "gpu", "ubuntu"] + disk_size = 50 + machine_type = "g2-standard-4" + accelerator_type = "projects/sky-dev-465/zones/${var.zone}/acceleratorTypes/nvidia-l4" + accelerator_count = 1 + on_host_maintenance = "TERMINATE" + ssh_username = "gcpuser" +} + +build { + name = "gcp-gpu-ubuntu-build" + sources = ["sources.googlecompute.gpu-ubuntu"] + provisioner "shell" { + script = "./provisioners/docker.sh" + } + provisioner "shell" { + script = "./provisioners/cuda.sh" + } + provisioner "shell" { + script = "./provisioners/nvidia-container-toolkit.sh" + } + provisioner "shell" { + script = "./provisioners/skypilot.sh" + } + provisioner "shell" { + environment_vars = [ + "CLOUD=gcp", + ] + script = "./provisioners/cloud.sh" + } +} diff --git a/sky/clouds/service_catalog/oci_catalog.py b/sky/clouds/service_catalog/oci_catalog.py index a18dee79be5..47d0489f6ab 100644 --- a/sky/clouds/service_catalog/oci_catalog.py +++ b/sky/clouds/service_catalog/oci_catalog.py @@ -7,6 +7,8 @@ - Hysun He (hysun.he@oracle.com) @ Apr, 2023: Initial implementation - Hysun He (hysun.he@oracle.com) @ Jun, 2023: Reduce retry times by excluding those unsubscribed regions. + - Hysun He (hysun.he@oracle.com) @ Oct 14, 2024: Bug fix for validation + of the Marketplace images """ import logging @@ -206,4 +208,24 @@ def get_image_id_from_tag(tag: str, region: Optional[str]) -> Optional[str]: def is_image_tag_valid(tag: str, region: Optional[str]) -> bool: """Returns whether the image tag is valid.""" + # Oct.14, 2024 by Hysun He: Marketplace images are region neutral, so don't + # check with region for the Marketplace images. + df = _image_df[_image_df['Tag'].str.fullmatch(tag)] + if df.empty: + return False + app_catalog_listing_id = df['AppCatalogListingId'].iloc[0] + if app_catalog_listing_id: + return True return common.is_image_tag_valid_impl(_image_df, tag, region) + + +def get_image_os_from_tag(tag: str, region: Optional[str]) -> Optional[str]: + del region + df = _image_df[_image_df['Tag'].str.fullmatch(tag)] + if df.empty: + os_type = oci_utils.oci_config.get_default_image_os() + else: + os_type = df['OS'].iloc[0] + + logger.debug(f'Operation system for the image {tag} is {os_type}') + return os_type diff --git a/sky/clouds/utils/oci_utils.py b/sky/clouds/utils/oci_utils.py index 3d11bab24da..86647071f3e 100644 --- a/sky/clouds/utils/oci_utils.py +++ b/sky/clouds/utils/oci_utils.py @@ -1,7 +1,9 @@ """OCI Configuration. History: - - Zhanghao Wu @ Oct 2023: Formatting and refactoring - Hysun He (hysun.he@oracle.com) @ Apr, 2023: Initial implementation + - Zhanghao Wu @ Oct 2023: Formatting and refactoring + - Hysun He (hysun.he@oracle.com) @ Oct, 2024: Add default image OS + configuration. """ import logging import os @@ -121,5 +123,13 @@ def get_profile(cls) -> str: return skypilot_config.get_nested( ('oci', 'default', 'oci_config_profile'), 'DEFAULT') + @classmethod + def get_default_image_os(cls) -> str: + # Get the default image OS. Instead of hardcoding, we give a choice to + # set the default image OS type in the sky's user-config file. (if not + # specified, use the hardcode one at last) + return skypilot_config.get_nested(('oci', 'default', 'image_os_type'), + 'ubuntu') + oci_config = OCIConfig() diff --git a/sky/core.py b/sky/core.py index fa695bda687..496b8b8ad5e 100644 --- a/sky/core.py +++ b/sky/core.py @@ -1,7 +1,7 @@ """SDK functions for cluster/job management.""" import getpass import typing -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Tuple, Union import colorama @@ -11,10 +11,12 @@ from sky import data from sky import exceptions from sky import global_user_state +from sky import jobs as managed_jobs from sky import sky_logging from sky import status_lib from sky import task from sky.backends import backend_utils +from sky.provision.kubernetes import utils as kubernetes_utils from sky.skylet import constants from sky.skylet import job_lib from sky.usage import usage_lib @@ -111,6 +113,79 @@ def status(cluster_names: Optional[Union[str, List[str]]] = None, cluster_names=cluster_names) +def status_kubernetes( +) -> Tuple[List['kubernetes_utils.KubernetesSkyPilotClusterInfo'], + List['kubernetes_utils.KubernetesSkyPilotClusterInfo'], List[Dict[ + str, Any]], Optional[str]]: + """Get all SkyPilot clusters and jobs in the Kubernetes cluster. + + Managed jobs and services are also included in the clusters returned. + The caller must parse the controllers to identify which clusters are run + as managed jobs or services. +all_clusters, unmanaged_clusters, all_jobs, context + Returns: + A tuple containing: + - all_clusters: List of KubernetesSkyPilotClusterInfo with info for + all clusters, including managed jobs, services and controllers. + - unmanaged_clusters: List of KubernetesSkyPilotClusterInfo with info + for all clusters excluding managed jobs and services. Controllers + are included. + - all_jobs: List of managed jobs from all controllers. Each entry is a + dictionary job info, see jobs.queue_from_kubernetes_pod for details. + - context: Kubernetes context used to fetch the cluster information. + """ + context = kubernetes_utils.get_current_kube_config_context_name() + try: + pods = kubernetes_utils.get_skypilot_pods(context) + except exceptions.ResourcesUnavailableError as e: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Failed to get SkyPilot pods from ' + f'Kubernetes: {str(e)}') from e + all_clusters, jobs_controllers, _ = (kubernetes_utils.process_skypilot_pods( + pods, context)) + all_jobs = [] + with rich_utils.safe_status( + ux_utils.spinner_message( + '[bold cyan]Checking in-progress managed jobs[/]')) as spinner: + for i, job_controller_info in enumerate(jobs_controllers): + user = job_controller_info.user + pod = job_controller_info.pods[0] + status_message = '[bold cyan]Checking managed jobs controller' + if len(jobs_controllers) > 1: + status_message += f's ({i + 1}/{len(jobs_controllers)})' + spinner.update(f'{status_message}[/]') + try: + job_list = managed_jobs.queue_from_kubernetes_pod( + pod.metadata.name) + except RuntimeError as e: + logger.warning('Failed to get managed jobs from controller ' + f'{pod.metadata.name}: {str(e)}') + job_list = [] + # Add user field to jobs + for job in job_list: + job['user'] = user + all_jobs.extend(job_list) + # Reconcile cluster state between managed jobs and clusters: + # To maintain a clear separation between regular SkyPilot clusters + # and those from managed jobs, we need to exclude the latter from + # the main cluster list. + # We do this by reconstructing managed job cluster names from each + # job's name and ID. We then use this set to filter out managed + # clusters from the main cluster list. This is necessary because there + # are no identifiers distinguishing clusters from managed jobs from + # regular clusters. + managed_job_cluster_names = set() + for job in all_jobs: + # Managed job cluster name is - + managed_cluster_name = f'{job["job_name"]}-{job["job_id"]}' + managed_job_cluster_names.add(managed_cluster_name) + unmanaged_clusters = [ + c for c in all_clusters + if c.cluster_name not in managed_job_cluster_names + ] + return all_clusters, unmanaged_clusters, all_jobs, context + + def endpoints(cluster: str, port: Optional[Union[int, str]] = None) -> Dict[int, str]: """Gets the endpoint for a given cluster and port number (endpoint). diff --git a/sky/provision/__init__.py b/sky/provision/__init__.py index 41d985ade41..bbe92b68c3a 100644 --- a/sky/provision/__init__.py +++ b/sky/provision/__init__.py @@ -19,6 +19,7 @@ from sky.provision import fluidstack from sky.provision import gcp from sky.provision import kubernetes +from sky.provision import lambda_cloud from sky.provision import runpod from sky.provision import vsphere from sky.utils import command_runner @@ -39,6 +40,8 @@ def _wrapper(*args, **kwargs): provider_name = kwargs.pop('provider_name') module_name = provider_name.lower() + if module_name == 'lambda': + module_name = 'lambda_cloud' module = globals().get(module_name) assert module is not None, f'Unknown provider: {module_name}' diff --git a/sky/provision/docker_utils.py b/sky/provision/docker_utils.py index 7bfa1724b83..3ee5d4dfc0c 100644 --- a/sky/provision/docker_utils.py +++ b/sky/provision/docker_utils.py @@ -253,12 +253,13 @@ def initialize(self) -> str: # issue with nvidia container toolkit: # https://github.com/NVIDIA/nvidia-container-toolkit/issues/48 self._run( - '[ -f /etc/docker/daemon.json ] || ' + '{ which jq || sudo apt update && sudo apt install -y jq; } && ' + '{ [ -f /etc/docker/daemon.json ] || ' 'echo "{}" | sudo tee /etc/docker/daemon.json;' 'sudo jq \'.["exec-opts"] = ["native.cgroupdriver=cgroupfs"]\' ' '/etc/docker/daemon.json > /tmp/daemon.json;' 'sudo mv /tmp/daemon.json /etc/docker/daemon.json;' - 'sudo systemctl restart docker') + 'sudo systemctl restart docker; } || true') user_docker_run_options = self.docker_config.get('run_options', []) start_command = docker_start_cmds( specific_image, @@ -335,7 +336,11 @@ def initialize(self) -> str: def _check_docker_installed(self): no_exist = 'NoExist' + # SkyPilot: Add the current user to the docker group first (if needed), + # before checking if docker is installed to avoid permission issues. cleaned_output = self._run( + 'id -nG $USER | grep -qw docker || ' + 'sudo usermod -aG docker $USER > /dev/null 2>&1;' f'command -v {self.docker_cmd} || echo {no_exist!r}') if no_exist in cleaned_output or 'docker' not in cleaned_output: logger.error( @@ -424,8 +429,8 @@ def _auto_configure_shm(self, run_options: List[str]) -> List[str]: def _check_container_exited(self) -> bool: if self.initialized: return True - output = (self._run(check_docker_running_cmd(self.container_name, - self.docker_cmd), - wait_for_docker_daemon=True)) - return 'false' in output.lower( - ) and 'no such object' not in output.lower() + output = self._run(check_docker_running_cmd(self.container_name, + self.docker_cmd), + wait_for_docker_daemon=True) + return ('false' in output.lower() and + 'no such object' not in output.lower()) diff --git a/sky/provision/gcp/config.py b/sky/provision/gcp/config.py index 416f0c1a694..a8292669a7c 100644 --- a/sky/provision/gcp/config.py +++ b/sky/provision/gcp/config.py @@ -670,8 +670,12 @@ def _configure_subnet(region: str, cluster_name: str, 'accessConfigs': [{ 'name': 'External NAT', 'type': 'ONE_TO_ONE_NAT', - }], + }] }] + # Add gVNIC if specified in config + enable_gvnic = config.provider_config.get('enable_gvnic', False) + if enable_gvnic: + default_interfaces[0]['nicType'] = 'gVNIC' enable_external_ips = _enable_external_ips(config) if not enable_external_ips: # Removing this key means the VM will not be assigned an external IP. diff --git a/sky/provision/kubernetes/utils.py b/sky/provision/kubernetes/utils.py index 3924074838e..0156c4d1091 100644 --- a/sky/provision/kubernetes/utils.py +++ b/sky/provision/kubernetes/utils.py @@ -15,9 +15,11 @@ import yaml import sky +from sky import clouds from sky import exceptions from sky import sky_logging from sky import skypilot_config +from sky import status_lib from sky.adaptors import kubernetes from sky.provision import constants as provision_constants from sky.provision.kubernetes import network_utils @@ -30,6 +32,7 @@ if typing.TYPE_CHECKING: from sky import backends + from sky import resources as resources_lib # TODO(romilb): Move constants to constants.py DEFAULT_NAMESPACE = 'default' @@ -2023,3 +2026,113 @@ def get_skypilot_pods(context: Optional[str] = None) -> List[Any]: 'kubectl get pods --selector=skypilot-cluster --all-namespaces' ) from None return pods + + +@dataclasses.dataclass +class KubernetesSkyPilotClusterInfo: + cluster_name_on_cloud: str + cluster_name: str + user: str + status: status_lib.ClusterStatus + pods: List[Any] + launched_at: float + resources: 'resources_lib.Resources' + resources_str: str + + +def process_skypilot_pods( + pods: List[Any], + context: Optional[str] = None +) -> Tuple[List[KubernetesSkyPilotClusterInfo], + List[KubernetesSkyPilotClusterInfo], + List[KubernetesSkyPilotClusterInfo]]: + """Process SkyPilot pods on k8s to extract cluster and controller info. + + Args: + pods: List of Kubernetes pod objects. + context: Kubernetes context name, used to detect GPU label formatter. + + Returns: + A tuple containing: + - List of KubernetesSkyPilotClusterInfo with all cluster info. + - List of KubernetesSkyPilotClusterInfo with job controller info. + - List of KubernetesSkyPilotClusterInfo with serve controller info. + """ + # pylint: disable=import-outside-toplevel + from sky import resources as resources_lib + clusters: Dict[str, KubernetesSkyPilotClusterInfo] = {} + jobs_controllers: List[KubernetesSkyPilotClusterInfo] = [] + serve_controllers: List[KubernetesSkyPilotClusterInfo] = [] + + for pod in pods: + cluster_name_on_cloud = pod.metadata.labels.get('skypilot-cluster') + cluster_name = cluster_name_on_cloud.rsplit( + '-', 1 + )[0] # Remove the user hash to get cluster name (e.g., mycluster-2ea4) + if cluster_name_on_cloud not in clusters: + # Parse the start time for the cluster + start_time = pod.status.start_time + if start_time is not None: + start_time = pod.status.start_time.timestamp() + + # Parse resources + cpu_request = parse_cpu_or_gpu_resource( + pod.spec.containers[0].resources.requests.get('cpu', '0')) + memory_request = parse_memory_resource( + pod.spec.containers[0].resources.requests.get('memory', '0'), + unit='G') + gpu_count = parse_cpu_or_gpu_resource( + pod.spec.containers[0].resources.requests.get( + 'nvidia.com/gpu', '0')) + gpu_name = None + if gpu_count > 0: + label_formatter, _ = (detect_gpu_label_formatter(context)) + assert label_formatter is not None, ( + 'GPU label formatter cannot be None if there are pods ' + f'requesting GPUs: {pod.metadata.name}') + gpu_label = label_formatter.get_label_key() + # Get GPU name from pod node selector + if pod.spec.node_selector is not None: + gpu_name = label_formatter.get_accelerator_from_label_value( + pod.spec.node_selector.get(gpu_label)) + + resources = resources_lib.Resources( + cloud=clouds.Kubernetes(), + cpus=int(cpu_request), + memory=int(memory_request), + accelerators=(f'{gpu_name}:{gpu_count}' + if gpu_count > 0 else None)) + if pod.status.phase == 'Pending': + # If pod is pending, do not show it in the status + continue + + cluster_info = KubernetesSkyPilotClusterInfo( + cluster_name_on_cloud=cluster_name_on_cloud, + cluster_name=cluster_name, + user=pod.metadata.labels.get('skypilot-user'), + status=status_lib.ClusterStatus.UP, + pods=[], + launched_at=start_time, + resources=resources, + resources_str='') + clusters[cluster_name_on_cloud] = cluster_info + # Check if cluster name is name of a controller + # Can't use controller_utils.Controllers.from_name(cluster_name) + # because hash is different across users + if 'sky-jobs-controller' in cluster_name_on_cloud: + jobs_controllers.append(cluster_info) + elif 'sky-serve-controller' in cluster_name_on_cloud: + serve_controllers.append(cluster_info) + else: + # Update start_time if this pod started earlier + pod_start_time = pod.status.start_time + if pod_start_time is not None: + pod_start_time = pod_start_time.timestamp() + if pod_start_time < clusters[cluster_name_on_cloud].launched_at: + clusters[cluster_name_on_cloud].launched_at = pod_start_time + clusters[cluster_name_on_cloud].pods.append(pod) + # Update resources_str in clusters: + for cluster in clusters.values(): + num_pods = len(cluster.pods) + cluster.resources_str = f'{num_pods}x {cluster.resources}' + return list(clusters.values()), jobs_controllers, serve_controllers diff --git a/sky/provision/lambda_cloud/__init__.py b/sky/provision/lambda_cloud/__init__.py new file mode 100644 index 00000000000..4992df4531b --- /dev/null +++ b/sky/provision/lambda_cloud/__init__.py @@ -0,0 +1,11 @@ +"""Lambda provisioner for SkyPilot.""" + +from sky.provision.lambda_cloud.config import bootstrap_instances +from sky.provision.lambda_cloud.instance import cleanup_ports +from sky.provision.lambda_cloud.instance import get_cluster_info +from sky.provision.lambda_cloud.instance import open_ports +from sky.provision.lambda_cloud.instance import query_instances +from sky.provision.lambda_cloud.instance import run_instances +from sky.provision.lambda_cloud.instance import stop_instances +from sky.provision.lambda_cloud.instance import terminate_instances +from sky.provision.lambda_cloud.instance import wait_instances diff --git a/sky/provision/lambda_cloud/config.py b/sky/provision/lambda_cloud/config.py new file mode 100644 index 00000000000..3066e7747fd --- /dev/null +++ b/sky/provision/lambda_cloud/config.py @@ -0,0 +1,10 @@ +"""Lambda Cloud configuration bootstrapping""" + +from sky.provision import common + + +def bootstrap_instances( + region: str, cluster_name: str, + config: common.ProvisionConfig) -> common.ProvisionConfig: + del region, cluster_name # unused + return config diff --git a/sky/provision/lambda_cloud/instance.py b/sky/provision/lambda_cloud/instance.py new file mode 100644 index 00000000000..d10c36496ab --- /dev/null +++ b/sky/provision/lambda_cloud/instance.py @@ -0,0 +1,261 @@ +"""Lambda instance provisioning.""" + +import time +from typing import Any, Dict, List, Optional + +from sky import authentication as auth +from sky import sky_logging +from sky import status_lib +from sky.provision import common +import sky.provision.lambda_cloud.lambda_utils as lambda_utils +from sky.utils import common_utils +from sky.utils import ux_utils + +POLL_INTERVAL = 1 + +logger = sky_logging.init_logger(__name__) +_lambda_client = None + + +def _get_lambda_client(): + global _lambda_client + if _lambda_client is None: + _lambda_client = lambda_utils.LambdaCloudClient() + return _lambda_client + + +def _filter_instances( + cluster_name_on_cloud: str, + status_filters: Optional[List[str]]) -> Dict[str, Dict[str, Any]]: + lambda_client = _get_lambda_client() + instances = lambda_client.list_instances() + possible_names = [ + f'{cluster_name_on_cloud}-head', + f'{cluster_name_on_cloud}-worker', + ] + + filtered_instances = {} + for instance in instances: + if (status_filters is not None and + instance['status'] not in status_filters): + continue + if instance.get('name') in possible_names: + filtered_instances[instance['id']] = instance + return filtered_instances + + +def _get_head_instance_id(instances: Dict[str, Any]) -> Optional[str]: + head_instance_id = None + for instance_id, instance in instances.items(): + if instance['name'].endswith('-head'): + head_instance_id = instance_id + break + return head_instance_id + + +def _get_ssh_key_name(prefix: str = '') -> str: + lambda_client = _get_lambda_client() + _, public_key_path = auth.get_or_generate_keys() + with open(public_key_path, 'r', encoding='utf-8') as f: + public_key = f.read() + name, exists = lambda_client.get_unique_ssh_key_name(prefix, public_key) + if not exists: + raise lambda_utils.LambdaCloudError('SSH key not found') + return name + + +def run_instances(region: str, cluster_name_on_cloud: str, + config: common.ProvisionConfig) -> common.ProvisionRecord: + """Runs instances for the given cluster""" + lambda_client = _get_lambda_client() + pending_status = ['booting'] + while True: + instances = _filter_instances(cluster_name_on_cloud, pending_status) + if not instances: + break + logger.info(f'Waiting for {len(instances)} instances to be ready.') + time.sleep(POLL_INTERVAL) + exist_instances = _filter_instances(cluster_name_on_cloud, ['active']) + head_instance_id = _get_head_instance_id(exist_instances) + + to_start_count = config.count - len(exist_instances) + if to_start_count < 0: + raise RuntimeError( + f'Cluster {cluster_name_on_cloud} already has ' + f'{len(exist_instances)} nodes, but {config.count} are required.') + if to_start_count == 0: + if head_instance_id is None: + raise RuntimeError( + f'Cluster {cluster_name_on_cloud} has no head node.') + logger.info(f'Cluster {cluster_name_on_cloud} already has ' + f'{len(exist_instances)} nodes, no need to start more.') + return common.ProvisionRecord( + provider_name='lambda', + cluster_name=cluster_name_on_cloud, + region=region, + zone=None, + head_instance_id=head_instance_id, + resumed_instance_ids=[], + created_instance_ids=[], + ) + + created_instance_ids = [] + ssh_key_name = _get_ssh_key_name() + + def launch_nodes(node_type: str, quantity: int) -> List[str]: + try: + instance_ids = lambda_client.create_instances( + instance_type=config.node_config['InstanceType'], + region=region, + name=f'{cluster_name_on_cloud}-{node_type}', + quantity=quantity, + ssh_key_name=ssh_key_name, + ) + logger.info(f'Launched {len(instance_ids)} {node_type} node(s), ' + f'instance_ids: {instance_ids}') + return instance_ids + except Exception as e: + logger.warning(f'run_instances error: {e}') + raise + + if head_instance_id is None: + instance_ids = launch_nodes('head', 1) + assert len(instance_ids) == 1 + created_instance_ids.append(instance_ids[0]) + head_instance_id = instance_ids[0] + + assert head_instance_id is not None, 'head_instance_id should not be None' + + worker_node_count = to_start_count - 1 + if worker_node_count > 0: + instance_ids = launch_nodes('worker', worker_node_count) + created_instance_ids.extend(instance_ids) + + while True: + instances = _filter_instances(cluster_name_on_cloud, ['active']) + if len(instances) == config.count: + break + + time.sleep(POLL_INTERVAL) + + return common.ProvisionRecord( + provider_name='lambda', + cluster_name=cluster_name_on_cloud, + region=region, + zone=None, + head_instance_id=head_instance_id, + resumed_instance_ids=[], + created_instance_ids=created_instance_ids, + ) + + +def wait_instances(region: str, cluster_name_on_cloud: str, + state: Optional[status_lib.ClusterStatus]) -> None: + del region, cluster_name_on_cloud, state # Unused. + + +def stop_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + worker_only: bool = False, +) -> None: + raise NotImplementedError( + 'stop_instances is not supported for Lambda Cloud') + + +def terminate_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + worker_only: bool = False, +) -> None: + """See sky/provision/__init__.py""" + del provider_config + lambda_client = _get_lambda_client() + instances = _filter_instances(cluster_name_on_cloud, None) + + instance_ids_to_terminate = [] + for instance_id, instance in instances.items(): + if worker_only and not instance['name'].endswith('-worker'): + continue + instance_ids_to_terminate.append(instance_id) + + try: + logger.debug( + f'Terminating instances {", ".join(instance_ids_to_terminate)}') + lambda_client.remove_instances(instance_ids_to_terminate) + except Exception as e: # pylint: disable=broad-except + with ux_utils.print_exception_no_traceback(): + raise RuntimeError( + f'Failed to terminate instances {instance_ids_to_terminate}: ' + f'{common_utils.format_exception(e, use_bracket=False)}') from e + + +def get_cluster_info( + region: str, + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, +) -> common.ClusterInfo: + del region # unused + running_instances = _filter_instances(cluster_name_on_cloud, ['active']) + instances: Dict[str, List[common.InstanceInfo]] = {} + head_instance_id = None + for instance_id, instance_info in running_instances.items(): + instances[instance_id] = [ + common.InstanceInfo( + instance_id=instance_id, + internal_ip=instance_info['private_ip'], + external_ip=instance_info['ip'], + ssh_port=22, + tags={}, + ) + ] + if instance_info['name'].endswith('-head'): + head_instance_id = instance_id + + return common.ClusterInfo( + instances=instances, + head_instance_id=head_instance_id, + provider_name='lambda', + provider_config=provider_config, + ) + + +def query_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + non_terminated_only: bool = True, +) -> Dict[str, Optional[status_lib.ClusterStatus]]: + """See sky/provision/__init__.py""" + assert provider_config is not None, (cluster_name_on_cloud, provider_config) + instances = _filter_instances(cluster_name_on_cloud, None) + + status_map = { + 'booting': status_lib.ClusterStatus.INIT, + 'active': status_lib.ClusterStatus.UP, + 'unhealthy': status_lib.ClusterStatus.INIT, + 'terminating': status_lib.ClusterStatus.INIT, + } + statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {} + for instance_id, instance in instances.items(): + status = status_map.get(instance['status']) + if non_terminated_only and status is None: + continue + statuses[instance_id] = status + return statuses + + +def open_ports( + cluster_name_on_cloud: str, + ports: List[str], + provider_config: Optional[Dict[str, Any]] = None, +) -> None: + raise NotImplementedError('open_ports is not supported for Lambda Cloud') + + +def cleanup_ports( + cluster_name_on_cloud: str, + ports: List[str], + provider_config: Optional[Dict[str, Any]] = None, +) -> None: + """See sky/provision/__init__.py""" + del cluster_name_on_cloud, ports, provider_config # Unused. diff --git a/sky/clouds/utils/lambda_utils.py b/sky/provision/lambda_cloud/lambda_utils.py similarity index 92% rename from sky/clouds/utils/lambda_utils.py rename to sky/provision/lambda_cloud/lambda_utils.py index 61c4b33ebe9..339919e80e7 100644 --- a/sky/clouds/utils/lambda_utils.py +++ b/sky/provision/lambda_cloud/lambda_utils.py @@ -1,4 +1,5 @@ """Lambda Cloud helper functions.""" + import json import os import time @@ -76,7 +77,7 @@ def refresh(self, instance_ids: List[str]) -> None: def raise_lambda_error(response: requests.Response) -> None: - """Raise LambdaCloudError if appropriate. """ + """Raise LambdaCloudError if appropriate.""" status_code = response.status_code if status_code == 200: return @@ -131,20 +132,22 @@ def __init__(self) -> None: self.api_key = self._credentials['api_key'] self.headers = {'Authorization': f'Bearer {self.api_key}'} - def create_instances(self, - instance_type: str = 'gpu_1x_a100_sxm4', - region: str = 'us-east-1', - quantity: int = 1, - name: str = '', - ssh_key_name: str = '') -> List[str]: + def create_instances( + self, + instance_type: str = 'gpu_1x_a100_sxm4', + region: str = 'us-east-1', + quantity: int = 1, + name: str = '', + ssh_key_name: str = '', + ) -> List[str]: """Launch new instances.""" # Optimization: # Most API requests are rate limited at ~1 request every second but # launch requests are rate limited at ~1 request every 10 seconds. # So don't use launch requests to check availability. # See https://docs.lambdalabs.com/cloud/rate-limiting/ for more. - available_regions = self.list_catalog()[instance_type]\ - ['regions_with_capacity_available'] + available_regions = (self.list_catalog()[instance_type] + ['regions_with_capacity_available']) available_regions = [reg['name'] for reg in available_regions] if region not in available_regions: if len(available_regions) > 0: @@ -163,27 +166,25 @@ def create_instances(self, 'instance_type_name': instance_type, 'ssh_key_names': [ssh_key_name], 'quantity': quantity, - 'name': name + 'name': name, }) response = _try_request_with_backoff( 'post', f'{API_ENDPOINT}/instance-operations/launch', data=data, - headers=self.headers) + headers=self.headers, + ) return response.json().get('data', []).get('instance_ids', []) - def remove_instances(self, *instance_ids: str) -> Dict[str, Any]: + def remove_instances(self, instance_ids: List[str]) -> Dict[str, Any]: """Terminate instances.""" - data = json.dumps({ - 'instance_ids': [ - instance_ids[0] # TODO(ewzeng) don't hardcode - ] - }) + data = json.dumps({'instance_ids': instance_ids}) response = _try_request_with_backoff( 'post', f'{API_ENDPOINT}/instance-operations/terminate', data=data, - headers=self.headers) + headers=self.headers, + ) return response.json().get('data', []).get('terminated_instances', []) def list_instances(self) -> List[Dict[str, Any]]: diff --git a/sky/provision/paperspace/utils.py b/sky/provision/paperspace/utils.py index db2da7b4610..d9eceefba19 100644 --- a/sky/provision/paperspace/utils.py +++ b/sky/provision/paperspace/utils.py @@ -132,6 +132,8 @@ def set_sky_key_script(self, public_key: str) -> None: 'apt-get update \n' 'apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin \n' # pylint: disable=line-too-long 'fi \n' + # TODO(tian): Maybe remove this as well since we are now adding + # users to docker group in the DockerInitializer. Need to test. 'usermod -aG docker paperspace \n' f'echo "{public_key}" >> /home/paperspace/.ssh/authorized_keys \n') try: diff --git a/sky/provision/provisioner.py b/sky/provision/provisioner.py index b2ac6d6660f..7706a3d489b 100644 --- a/sky/provision/provisioner.py +++ b/sky/provision/provisioner.py @@ -571,7 +571,10 @@ def post_provision_runtime_setup( provision_record=provision_record, custom_resource=custom_resource) except Exception: # pylint: disable=broad-except - logger.error('*** Failed setting up cluster. ***') + logger.error( + ux_utils.error_message( + 'Failed to set up SkyPilot runtime on cluster.', + provision_logging.config.log_path)) logger.debug(f'Stacktrace:\n{traceback.format_exc()}') with ux_utils.print_exception_no_traceback(): raise diff --git a/sky/resources.py b/sky/resources.py index e9a522cef48..384f2b6a548 100644 --- a/sky/resources.py +++ b/sky/resources.py @@ -842,12 +842,6 @@ def _try_validate_image_id(self) -> None: if self.extract_docker_image() is not None: # TODO(tian): validate the docker image exists / of reasonable size - if self.accelerators is not None: - for acc in self.accelerators.keys(): - if acc.lower().startswith('tpu'): - with ux_utils.print_exception_no_traceback(): - raise ValueError( - 'Docker image is not supported for TPU VM.') if self.cloud is not None: self.cloud.check_features_are_supported( self, {clouds.CloudImplementationFeatures.DOCKER_IMAGE}) @@ -1032,6 +1026,12 @@ def make_deploy_variables(self, cluster_name: resources_utils.ClusterName, self.accelerators is not None): initial_setup_commands = [constants.DISABLE_GPU_ECC_COMMAND] + docker_image = self.extract_docker_image() + + # Cloud specific variables + cloud_specific_variables = self.cloud.make_deploy_resources_variables( + self, cluster_name, region, zones, dryrun) + # Docker run options docker_run_options = skypilot_config.get_nested( ('docker', 'run_options'), @@ -1039,18 +1039,17 @@ def make_deploy_variables(self, cluster_name: resources_utils.ClusterName, override_configs=self.cluster_config_overrides) if isinstance(docker_run_options, str): docker_run_options = [docker_run_options] + # Special accelerator runtime might require additional docker run + # options. e.g., for TPU, we need --privileged. + if 'docker_run_options' in cloud_specific_variables: + docker_run_options.extend( + cloud_specific_variables['docker_run_options']) if docker_run_options and isinstance(self.cloud, clouds.Kubernetes): logger.warning( f'{colorama.Style.DIM}Docker run options are specified, ' 'but ignored for Kubernetes: ' f'{" ".join(docker_run_options)}' f'{colorama.Style.RESET_ALL}') - - docker_image = self.extract_docker_image() - - # Cloud specific variables - cloud_specific_variables = self.cloud.make_deploy_resources_variables( - self, cluster_name, region, zones, dryrun) return dict( cloud_specific_variables, **{ diff --git a/sky/serve/__init__.py b/sky/serve/__init__.py index d85b6e9311e..f93495809c3 100644 --- a/sky/serve/__init__.py +++ b/sky/serve/__init__.py @@ -8,6 +8,7 @@ from sky.serve.core import down from sky.serve.core import status from sky.serve.core import tail_logs +from sky.serve.core import terminate_replica from sky.serve.core import up from sky.serve.core import update from sky.serve.serve_state import ReplicaStatus @@ -42,6 +43,7 @@ 'SKY_SERVE_CONTROLLER_NAME', 'SKYSERVE_METADATA_DIR', 'status', + 'terminate_replica', 'tail_logs', 'up', 'update', diff --git a/sky/serve/constants.py b/sky/serve/constants.py index 7775c3f8a6e..3974293190e 100644 --- a/sky/serve/constants.py +++ b/sky/serve/constants.py @@ -92,4 +92,11 @@ # change for the serve_utils.ServeCodeGen, we need to bump this version, so that # the user can be notified to update their SkyPilot serve version on the remote # cluster. -SERVE_VERSION = 1 +# Changelog: +# v1.0 - Introduce rolling update. +# v2.0 - Added template-replica feature. +SERVE_VERSION = 2 + +TERMINATE_REPLICA_VERSION_MISMATCH_ERROR = ( + 'The version of service is outdated and does not support manually ' + 'terminating replicas. Please terminate the service and spin up again.') diff --git a/sky/serve/controller.py b/sky/serve/controller.py index 580964273ef..75d14b76079 100644 --- a/sky/serve/controller.py +++ b/sky/serve/controller.py @@ -9,6 +9,7 @@ import traceback from typing import Any, Dict, List +import colorama import fastapi from fastapi import responses import uvicorn @@ -157,6 +158,75 @@ async def update_service(request: fastapi.Request) -> fastapi.Response: return responses.JSONResponse(content={'message': 'Error'}, status_code=500) + @self._app.post('/controller/terminate_replica') + async def terminate_replica( + request: fastapi.Request) -> fastapi.Response: + request_data = await request.json() + replica_id = request_data['replica_id'] + assert isinstance(replica_id, + int), 'Error: replica ID must be an integer.' + purge = request_data['purge'] + assert isinstance(purge, bool), 'Error: purge must be a boolean.' + replica_info = serve_state.get_replica_info_from_id( + self._service_name, replica_id) + assert replica_info is not None, (f'Error: replica ' + f'{replica_id} does not exist.') + replica_status = replica_info.status + + if replica_status == serve_state.ReplicaStatus.SHUTTING_DOWN: + return responses.JSONResponse( + status_code=409, + content={ + 'message': + f'Replica {replica_id} of service ' + f'{self._service_name!r} is already in the process ' + f'of terminating. Skip terminating now.' + }) + + if (replica_status in serve_state.ReplicaStatus.failed_statuses() + and not purge): + return responses.JSONResponse( + status_code=409, + content={ + 'message': f'{colorama.Fore.YELLOW}Replica ' + f'{replica_id} of service ' + f'{self._service_name!r} is in failed ' + f'status ({replica_info.status}). ' + f'Skipping its termination as it could ' + f'lead to a resource leak. ' + f'(Use `sky serve down ' + f'{self._service_name!r} --replica-id ' + f'{replica_id} --purge` to ' + 'forcefully terminate the replica.)' + f'{colorama.Style.RESET_ALL}' + }) + + self._replica_manager.scale_down(replica_id, purge=purge) + + action = 'terminated' if not purge else 'purged' + message = (f'{colorama.Fore.GREEN}Replica {replica_id} of service ' + f'{self._service_name!r} is scheduled to be ' + f'{action}.{colorama.Style.RESET_ALL}\n' + f'Please use {ux_utils.BOLD}sky serve status ' + f'{self._service_name}{ux_utils.RESET_BOLD} ' + f'to check the latest status.') + return responses.JSONResponse(status_code=200, + content={'message': message}) + + @self._app.exception_handler(Exception) + async def validation_exception_handler( + request: fastapi.Request, exc: Exception) -> fastapi.Response: + with ux_utils.enable_traceback(): + logger.error(f'Error in controller: {exc!r}') + return responses.JSONResponse( + status_code=500, + content={ + 'message': + (f'Failed method {request.method} at URL {request.url}.' + f' Exception message is {exc!r}.') + }, + ) + threading.Thread(target=self._run_autoscaler).start() logger.info('SkyServe Controller started on ' diff --git a/sky/serve/core.py b/sky/serve/core.py index 3ad260213f1..691a3edea0b 100644 --- a/sky/serve/core.py +++ b/sky/serve/core.py @@ -503,6 +503,53 @@ def down( sky_logging.print(stdout) +@usage_lib.entrypoint +def terminate_replica(service_name: str, replica_id: int, purge: bool) -> None: + """Tear down a specific replica for the given service. + + Args: + service_name: Name of the service. + replica_id: ID of replica to terminate. + purge: Whether to terminate replicas in a failed status. These replicas + may lead to resource leaks, so we require the user to explicitly + specify this flag to make sure they are aware of this potential + resource leak. + + Raises: + sky.exceptions.ClusterNotUpError: if the sky sere controller is not up. + RuntimeError: if failed to terminate the replica. + """ + handle = backend_utils.is_controller_accessible( + controller=controller_utils.Controllers.SKY_SERVE_CONTROLLER, + stopped_message= + 'No service is running now. Please spin up a service first.', + non_existent_message='No service is running now. ' + 'Please spin up a service first.', + ) + + backend = backend_utils.get_backend_from_handle(handle) + assert isinstance(backend, backends.CloudVmRayBackend) + + code = serve_utils.ServeCodeGen.terminate_replica(service_name, replica_id, + purge) + returncode, stdout, stderr = backend.run_on_head(handle, + code, + require_outputs=True, + stream_logs=False, + separate_stderr=True) + + try: + subprocess_utils.handle_returncode(returncode, + code, + 'Failed to terminate the replica', + stderr, + stream_logs=True) + except exceptions.CommandError as e: + raise RuntimeError(e.error_msg) from e + + sky_logging.print(stdout) + + @usage_lib.entrypoint def status( service_names: Optional[Union[str, diff --git a/sky/serve/replica_managers.py b/sky/serve/replica_managers.py index 337b28ba61b..c0e5220e779 100644 --- a/sky/serve/replica_managers.py +++ b/sky/serve/replica_managers.py @@ -247,6 +247,8 @@ class ReplicaStatusProperty: is_scale_down: bool = False # The replica's spot instance was preempted. preempted: bool = False + # Whether the replica is purged. + purged: bool = False def remove_terminated_replica(self) -> bool: """Whether to remove the replica record from the replica table. @@ -307,6 +309,8 @@ def should_track_service_status(self) -> bool: return False if self.preempted: return False + if self.purged: + return False return True def to_replica_status(self) -> serve_state.ReplicaStatus: @@ -590,7 +594,7 @@ def scale_up(self, """ raise NotImplementedError - def scale_down(self, replica_id: int) -> None: + def scale_down(self, replica_id: int, purge: bool = False) -> None: """Scale down replica with replica_id.""" raise NotImplementedError @@ -679,7 +683,8 @@ def _terminate_replica(self, replica_id: int, sync_down_logs: bool, replica_drain_delay_seconds: int, - is_scale_down: bool = False) -> None: + is_scale_down: bool = False, + purge: bool = False) -> None: if replica_id in self._launch_process_pool: info = serve_state.get_replica_info_from_id(self._service_name, @@ -763,16 +768,18 @@ def _download_and_stream_logs(info: ReplicaInfo): ) info.status_property.sky_down_status = ProcessStatus.RUNNING info.status_property.is_scale_down = is_scale_down + info.status_property.purged = purge serve_state.add_or_update_replica(self._service_name, replica_id, info) p.start() self._down_process_pool[replica_id] = p - def scale_down(self, replica_id: int) -> None: + def scale_down(self, replica_id: int, purge: bool = False) -> None: self._terminate_replica( replica_id, sync_down_logs=False, replica_drain_delay_seconds=_DEFAULT_DRAIN_SECONDS, - is_scale_down=True) + is_scale_down=True, + purge=purge) def _handle_preemption(self, info: ReplicaInfo) -> bool: """Handle preemption of the replica if any error happened. @@ -911,6 +918,8 @@ def _refresh_process_pool(self) -> None: # since user should fixed the error before update. elif info.version != self.latest_version: removal_reason = 'for version outdated' + elif info.status_property.purged: + removal_reason = 'for purge' else: logger.info(f'Termination of replica {replica_id} ' 'finished. Replica info is kept since some ' diff --git a/sky/serve/serve_utils.py b/sky/serve/serve_utils.py index 0ecf34135a7..3a416dd2932 100644 --- a/sky/serve/serve_utils.py +++ b/sky/serve/serve_utils.py @@ -246,9 +246,11 @@ def set_service_status_and_active_versions_from_replica( update_mode: UpdateMode) -> None: record = serve_state.get_service_from_name(service_name) if record is None: - raise ValueError('The service is up-ed in an old version and does not ' - 'support update. Please `sky serve down` ' - 'it first and relaunch the service.') + with ux_utils.print_exception_no_traceback(): + raise ValueError( + 'The service is up-ed in an old version and does not ' + 'support update. Please `sky serve down` ' + 'it first and relaunch the service.') if record['status'] == serve_state.ServiceStatus.SHUTTING_DOWN: # When the service is shutting down, there is a period of time which the # controller still responds to the request, and the replica is not @@ -289,7 +291,8 @@ def update_service_status() -> None: def update_service_encoded(service_name: str, version: int, mode: str) -> str: service_status = _get_service_status(service_name) if service_status is None: - raise ValueError(f'Service {service_name!r} does not exist.') + with ux_utils.print_exception_no_traceback(): + raise ValueError(f'Service {service_name!r} does not exist.') controller_port = service_status['controller_port'] resp = requests.post( _CONTROLLER_URL.format(CONTROLLER_PORT=controller_port) + @@ -299,20 +302,56 @@ def update_service_encoded(service_name: str, version: int, mode: str) -> str: 'mode': mode, }) if resp.status_code == 404: - raise ValueError('The service is up-ed in an old version and does not ' - 'support update. Please `sky serve down` ' - 'it first and relaunch the service. ') + with ux_utils.print_exception_no_traceback(): + raise ValueError( + 'The service is up-ed in an old version and does not ' + 'support update. Please `sky serve down` ' + 'it first and relaunch the service. ') elif resp.status_code == 400: - raise ValueError(f'Client error during service update: {resp.text}') + with ux_utils.print_exception_no_traceback(): + raise ValueError(f'Client error during service update: {resp.text}') elif resp.status_code == 500: - raise RuntimeError(f'Server error during service update: {resp.text}') + with ux_utils.print_exception_no_traceback(): + raise RuntimeError( + f'Server error during service update: {resp.text}') elif resp.status_code != 200: - raise ValueError(f'Failed to update service: {resp.text}') + with ux_utils.print_exception_no_traceback(): + raise ValueError(f'Failed to update service: {resp.text}') service_msg = resp.json()['message'] return common_utils.encode_payload(service_msg) +def terminate_replica(service_name: str, replica_id: int, purge: bool) -> str: + service_status = _get_service_status(service_name) + if service_status is None: + with ux_utils.print_exception_no_traceback(): + raise ValueError(f'Service {service_name!r} does not exist.') + replica_info = serve_state.get_replica_info_from_id(service_name, + replica_id) + if replica_info is None: + with ux_utils.print_exception_no_traceback(): + raise ValueError( + f'Replica {replica_id} for service {service_name} does not ' + 'exist.') + + controller_port = service_status['controller_port'] + resp = requests.post( + _CONTROLLER_URL.format(CONTROLLER_PORT=controller_port) + + '/controller/terminate_replica', + json={ + 'replica_id': replica_id, + 'purge': purge, + }) + + message: str = resp.json()['message'] + if resp.status_code != 200: + with ux_utils.print_exception_no_traceback(): + raise ValueError(f'Failed to terminate replica {replica_id} ' + f'in {service_name}. Reason:\n{message}') + return message + + def _get_service_status( service_name: str, with_replica_info: bool = True) -> Optional[Dict[str, Any]]: @@ -644,8 +683,9 @@ def _get_replica_status() -> serve_state.ReplicaStatus: for info in replica_info: if info.replica_id == replica_id: return info.status - raise ValueError( - _FAILED_TO_FIND_REPLICA_MSG.format(replica_id=replica_id)) + with ux_utils.print_exception_no_traceback(): + raise ValueError( + _FAILED_TO_FIND_REPLICA_MSG.format(replica_id=replica_id)) finish_stream = ( lambda: _get_replica_status() != serve_state.ReplicaStatus.PROVISIONING) @@ -735,7 +775,7 @@ def _get_replicas(service_record: Dict[str, Any]) -> str: def get_endpoint(service_record: Dict[str, Any]) -> str: - # Don't use backend_utils.is_controller_up since it is too slow. + # Don't use backend_utils.is_controller_accessible since it is too slow. handle = global_user_state.get_handle_from_cluster_name( SKY_SERVE_CONTROLLER_NAME) assert isinstance(handle, backends.CloudVmRayResourceHandle) @@ -915,6 +955,18 @@ def terminate_services(cls, service_names: Optional[List[str]], ] return cls._build(code) + @classmethod + def terminate_replica(cls, service_name: str, replica_id: int, + purge: bool) -> str: + code = [ + f'(lambda: print(serve_utils.terminate_replica({service_name!r}, ' + f'{replica_id}, {purge}), end="", flush=True) ' + 'if getattr(constants, "SERVE_VERSION", 0) >= 2 else ' + f'exec("raise RuntimeError(' + f'{constants.TERMINATE_REPLICA_VERSION_MISMATCH_ERROR!r})"))()' + ] + return cls._build(code) + @classmethod def wait_service_registration(cls, service_name: str, job_id: int) -> str: code = [ diff --git a/sky/setup_files/MANIFEST.in b/sky/setup_files/MANIFEST.in index 54ab3b55a32..0cd93f485e0 100644 --- a/sky/setup_files/MANIFEST.in +++ b/sky/setup_files/MANIFEST.in @@ -6,7 +6,6 @@ include sky/setup_files/* include sky/skylet/*.sh include sky/skylet/LICENSE include sky/skylet/providers/ibm/* -include sky/skylet/providers/lambda_cloud/* include sky/skylet/providers/oci/* include sky/skylet/providers/scp/* include sky/skylet/providers/*.py diff --git a/sky/skylet/constants.py b/sky/skylet/constants.py index 5729d75c968..032ad5d25b1 100644 --- a/sky/skylet/constants.py +++ b/sky/skylet/constants.py @@ -155,8 +155,8 @@ # We use --system-site-packages to reuse the system site packages to avoid # the overhead of installing the same packages in the new environment. f'[ -d {SKY_REMOTE_PYTHON_ENV} ] || ' - f'{{ {SKY_PYTHON_CMD} -m venv {SKY_REMOTE_PYTHON_ENV} --system-site-packages && ' - f'echo "$(echo {SKY_REMOTE_PYTHON_ENV})/bin/python" > {SKY_PYTHON_PATH_FILE}; }};' + f'{SKY_PYTHON_CMD} -m venv {SKY_REMOTE_PYTHON_ENV} --system-site-packages;' + f'echo "$(echo {SKY_REMOTE_PYTHON_ENV})/bin/python" > {SKY_PYTHON_PATH_FILE};' ) _sky_version = str(version.parse(sky.__version__)) diff --git a/sky/skylet/providers/lambda_cloud/__init__.py b/sky/skylet/providers/lambda_cloud/__init__.py deleted file mode 100644 index 64dac295eb5..00000000000 --- a/sky/skylet/providers/lambda_cloud/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -"""Lambda Cloud node provider""" -from sky.skylet.providers.lambda_cloud.node_provider import LambdaNodeProvider diff --git a/sky/skylet/providers/lambda_cloud/node_provider.py b/sky/skylet/providers/lambda_cloud/node_provider.py deleted file mode 100644 index 557afe75568..00000000000 --- a/sky/skylet/providers/lambda_cloud/node_provider.py +++ /dev/null @@ -1,320 +0,0 @@ -import logging -import os -from threading import RLock -import time -from typing import Any, Dict, List, Optional - -from ray.autoscaler.node_provider import NodeProvider -from ray.autoscaler.tags import NODE_KIND_HEAD -from ray.autoscaler.tags import NODE_KIND_WORKER -from ray.autoscaler.tags import STATUS_UP_TO_DATE -from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME -from ray.autoscaler.tags import TAG_RAY_NODE_KIND -from ray.autoscaler.tags import TAG_RAY_NODE_NAME -from ray.autoscaler.tags import TAG_RAY_NODE_STATUS -from ray.autoscaler.tags import TAG_RAY_USER_NODE_TYPE - -from sky import authentication as auth -from sky.clouds.utils import lambda_utils -from sky.utils import command_runner -from sky.utils import common_utils -from sky.utils import subprocess_utils -from sky.utils import ux_utils - -_TAG_PATH_PREFIX = '~/.sky/generated/lambda_cloud/metadata' -_REMOTE_SSH_KEY_NAME = '~/.lambda_cloud/ssh_key_name' -_REMOTE_RAY_SSH_KEY = '~/ray_bootstrap_key.pem' -_REMOTE_RAY_YAML = '~/ray_bootstrap_config.yaml' -_GET_INTERNAL_IP_CMD = 's=$(ip -4 -br addr show | grep UP); echo "$s"; echo "$s" | grep -Eo "(10\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)|172\.(1[6-9]|2[0-9]|3[0-1])|104\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)"' - -logger = logging.getLogger(__name__) - - -def synchronized(f): - - def wrapper(self, *args, **kwargs): - self.lock.acquire() - try: - return f(self, *args, **kwargs) - finally: - self.lock.release() - - return wrapper - - -class LambdaNodeProvider(NodeProvider): - """Node Provider for Lambda Cloud. - - This provider assumes Lambda Cloud credentials are set. - """ - - def __init__(self, provider_config: Dict[str, Any], - cluster_name: str) -> None: - NodeProvider.__init__(self, provider_config, cluster_name) - self.lock = RLock() - self.lambda_client = lambda_utils.LambdaCloudClient() - self.cached_nodes: Dict[str, Dict[str, Any]] = {} - self.metadata = lambda_utils.Metadata(_TAG_PATH_PREFIX, cluster_name) - self.ssh_key_path = os.path.expanduser(auth.PRIVATE_SSH_KEY_PATH) - - def _get_ssh_key_name(prefix: str) -> str: - public_key_path = os.path.expanduser(auth.PUBLIC_SSH_KEY_PATH) - with open(public_key_path, 'r') as f: - public_key = f.read() - name, exists = self.lambda_client.get_unique_ssh_key_name( - prefix, public_key) - if not exists: - raise lambda_utils.LambdaCloudError('SSH key not found') - return name - - ray_yaml_path = os.path.expanduser(_REMOTE_RAY_YAML) - self.on_head = (os.path.exists(ray_yaml_path) and - common_utils.read_yaml(ray_yaml_path)['cluster_name'] - == cluster_name) - - if self.on_head: - self.ssh_key_path = os.path.expanduser(_REMOTE_RAY_SSH_KEY) - ssh_key_name_path = os.path.expanduser(_REMOTE_SSH_KEY_NAME) - if os.path.exists(ssh_key_name_path): - with open(ssh_key_name_path, 'r') as f: - self.ssh_key_name = f.read() - else: - # At this point, `~/.ssh/sky-key.pub` contains the public - # key used to launch this cluster. Use it to determine - # ssh key name and store the name in _REMOTE_SSH_KEY_NAME. - # Note: this case only runs during cluster launch, so it is - # not possible for ~/.ssh/sky-key.pub to already be regenerated - # by the user. - self.ssh_key_name = _get_ssh_key_name('') - with open(ssh_key_name_path, 'w', encoding='utf-8') as f: - f.write(self.ssh_key_name) - else: - # On local - self.ssh_key_name = _get_ssh_key_name( - f'sky-key-{common_utils.get_user_hash()}') - - def _guess_and_add_missing_tags(self, vms: List[Dict[str, Any]]) -> None: - """Adds missing vms to local tag file and guesses their tags.""" - for node in vms: - if self.metadata.get(node['id']) is not None: - pass - elif node['name'] == f'{self.cluster_name}-head': - self.metadata.set( - node['id'], { - 'tags': { - TAG_RAY_CLUSTER_NAME: self.cluster_name, - TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, - TAG_RAY_NODE_KIND: NODE_KIND_HEAD, - TAG_RAY_USER_NODE_TYPE: 'ray_head_default', - TAG_RAY_NODE_NAME: f'ray-{self.cluster_name}-head', - } - }) - elif node['name'] == f'{self.cluster_name}-worker': - self.metadata.set( - node['id'], { - 'tags': { - TAG_RAY_CLUSTER_NAME: self.cluster_name, - TAG_RAY_NODE_STATUS: STATUS_UP_TO_DATE, - TAG_RAY_NODE_KIND: NODE_KIND_WORKER, - TAG_RAY_USER_NODE_TYPE: 'ray_worker_default', - TAG_RAY_NODE_NAME: f'ray-{self.cluster_name}-worker', - } - }) - - def _list_instances_in_cluster(self) -> List[Dict[str, Any]]: - """List running instances in cluster.""" - vms = self.lambda_client.list_instances() - possible_names = [ - f'{self.cluster_name}-head', f'{self.cluster_name}-worker' - ] - return [node for node in vms if node.get('name') in possible_names] - - @synchronized - def _get_filtered_nodes(self, tag_filters: Dict[str, - str]) -> Dict[str, Any]: - - def _extract_metadata(vm: Dict[str, Any]) -> Dict[str, Any]: - metadata = {'id': vm['id'], 'status': vm['status'], 'tags': {}} - instance_info = self.metadata.get(vm['id']) - if instance_info is not None: - metadata['tags'] = instance_info['tags'] - metadata['external_ip'] = vm.get('ip') - return metadata - - def _match_tags(vm: Dict[str, Any]): - vm_info = self.metadata.get(vm['id']) - tags = {} if vm_info is None else vm_info['tags'] - for k, v in tag_filters.items(): - if tags.get(k) != v: - return False - return True - - def _get_internal_ip(node: Dict[str, Any]): - # TODO(ewzeng): cache internal ips in metadata file to reduce - # ssh overhead. - if node['external_ip'] is None or node['status'] != 'active': - node['internal_ip'] = None - return - runner = command_runner.SSHCommandRunner( - node=(node['external_ip'], 22), - ssh_user='ubuntu', - ssh_private_key=self.ssh_key_path) - rc, stdout, stderr = runner.run(_GET_INTERNAL_IP_CMD, - require_outputs=True, - stream_logs=False) - subprocess_utils.handle_returncode( - rc, - _GET_INTERNAL_IP_CMD, - 'Failed get obtain private IP from node', - stderr=stdout + stderr) - node['internal_ip'] = stdout.strip() - - vms = self._list_instances_in_cluster() - self.metadata.refresh([node['id'] for node in vms]) - self._guess_and_add_missing_tags(vms) - nodes = [_extract_metadata(vm) for vm in filter(_match_tags, vms)] - nodes = [ - node for node in nodes - if node['status'] not in ['terminating', 'terminated'] - ] - subprocess_utils.run_in_parallel(_get_internal_ip, nodes) - self.cached_nodes = {node['id']: node for node in nodes} - return self.cached_nodes - - def non_terminated_nodes(self, tag_filters: Dict[str, str]) -> List[str]: - """Return a list of node ids filtered by the specified tags dict. - - This list must not include terminated nodes. For performance reasons, - providers are allowed to cache the result of a call to - non_terminated_nodes() to serve single-node queries - (e.g. is_running(node_id)). This means that non_terminated_nodes() must - be called again to refresh results. - - Examples: - >>> provider.non_terminated_nodes({TAG_RAY_NODE_KIND: "worker"}) - ["node-1", "node-2"] - """ - nodes = self._get_filtered_nodes(tag_filters=tag_filters) - return [k for k, _ in nodes.items()] - - def is_running(self, node_id: str) -> bool: - """Return whether the specified node is running.""" - return self._get_cached_node(node_id=node_id) is not None - - def is_terminated(self, node_id: str) -> bool: - """Return whether the specified node is terminated.""" - return self._get_cached_node(node_id=node_id) is None - - def node_tags(self, node_id: str) -> Dict[str, str]: - """Returns the tags of the given node (string dict).""" - node = self._get_cached_node(node_id=node_id) - if node is None: - return {} - return node['tags'] - - def external_ip(self, node_id: str) -> Optional[str]: - """Returns the external ip of the given node.""" - node = self._get_cached_node(node_id=node_id) - if node is None: - return None - ip = node.get('external_ip') - with ux_utils.print_exception_no_traceback(): - if ip is None: - raise lambda_utils.LambdaCloudError( - 'A node ip address was not found. Either ' - '(1) Lambda Cloud has internally errored, or ' - '(2) the cluster is still booting. ' - 'You can manually terminate the cluster on the ' - 'Lambda Cloud console or (in case 2) wait for ' - 'booting to finish (~2 minutes).') - return ip - - def internal_ip(self, node_id: str) -> Optional[str]: - """Returns the internal ip (Ray ip) of the given node.""" - node = self._get_cached_node(node_id=node_id) - if node is None: - return None - ip = node.get('internal_ip') - with ux_utils.print_exception_no_traceback(): - if ip is None: - raise lambda_utils.LambdaCloudError( - 'A node ip address was not found. Either ' - '(1) Lambda Cloud has internally errored, or ' - '(2) the cluster is still booting. ' - 'You can manually terminate the cluster on the ' - 'Lambda Cloud console or (in case 2) wait for ' - 'booting to finish (~2 minutes).') - return ip - - def create_node(self, node_config: Dict[str, Any], tags: Dict[str, str], - count: int) -> None: - """Creates a number of nodes within the namespace.""" - # Get tags - config_tags = node_config.get('tags', {}).copy() - config_tags.update(tags) - config_tags[TAG_RAY_CLUSTER_NAME] = self.cluster_name - - # Create nodes - instance_type = node_config['InstanceType'] - region = self.provider_config['region'] - - if config_tags[TAG_RAY_NODE_KIND] == NODE_KIND_HEAD: - name = f'{self.cluster_name}-head' - # Occasionally, the head node will continue running for a short - # period after termination. This can lead to the following bug: - # 1. Head node autodowns but continues running. - # 2. The next autodown event is triggered, which executes ray up. - # 3. Head node stops running. - # In this case, a new head node is created after the cluster has - # terminated. We avoid this with the following check: - if self.on_head: - raise lambda_utils.LambdaCloudError('Head already exists.') - else: - name = f'{self.cluster_name}-worker' - - # Lambda launch api only supports launching one node at a time, - # so we do a loop. Remove loop when launch api allows quantity > 1 - booting_list = [] - for _ in range(count): - vm_id = self.lambda_client.create_instances( - instance_type=instance_type, - region=region, - quantity=1, - name=name, - ssh_key_name=self.ssh_key_name)[0] - self.metadata.set(vm_id, {'tags': config_tags}) - booting_list.append(vm_id) - time.sleep(10) # Avoid api rate limits - - # Wait for nodes to finish booting - while True: - vms = self._list_instances_in_cluster() - for vm_id in booting_list.copy(): - for vm in vms: - if vm['id'] == vm_id and vm['status'] == 'active': - booting_list.remove(vm_id) - if len(booting_list) == 0: - return - time.sleep(10) - - @synchronized - def set_node_tags(self, node_id: str, tags: Dict[str, str]) -> None: - """Sets the tag values (string dict) for the specified node.""" - node = self._get_node(node_id) - assert node is not None, node_id - node['tags'].update(tags) - self.metadata.set(node_id, {'tags': node['tags']}) - - def terminate_node(self, node_id: str) -> None: - """Terminates the specified node.""" - self.lambda_client.remove_instances(node_id) - self.metadata.set(node_id, None) - - def _get_node(self, node_id: str) -> Optional[Dict[str, Any]]: - self._get_filtered_nodes({}) # Side effect: updates cache - return self.cached_nodes.get(node_id, None) - - def _get_cached_node(self, node_id: str) -> Optional[Dict[str, Any]]: - if node_id in self.cached_nodes: - return self.cached_nodes[node_id] - return self._get_node(node_id=node_id) diff --git a/sky/templates/gcp-ray.yml.j2 b/sky/templates/gcp-ray.yml.j2 index 5f06eef05c7..f3e6232d5d8 100644 --- a/sky/templates/gcp-ray.yml.j2 +++ b/sky/templates/gcp-ray.yml.j2 @@ -64,6 +64,9 @@ provider: # leakage. disable_launch_config_check: true use_managed_instance_group: {{ gcp_use_managed_instance_group }} +{%- if enable_gvnic %} + enable_gvnic: {{ enable_gvnic }} +{%- endif %} auth: ssh_user: gcpuser diff --git a/sky/templates/lambda-ray.yml.j2 b/sky/templates/lambda-ray.yml.j2 index 6b6d94cfb3c..5df3655c566 100644 --- a/sky/templates/lambda-ray.yml.j2 +++ b/sky/templates/lambda-ray.yml.j2 @@ -5,9 +5,29 @@ max_workers: {{num_nodes - 1}} upscaling_speed: {{num_nodes - 1}} idle_timeout_minutes: 60 +{%- if docker_image is not none %} +docker: + image: {{docker_image}} + container_name: {{docker_container_name}} + run_options: + - --ulimit nofile=1048576:1048576 + {%- for run_option in docker_run_options %} + - {{run_option}} + {%- endfor %} + {%- if docker_login_config is not none %} + docker_login_config: + username: |- + {{docker_login_config.username}} + password: |- + {{docker_login_config.password}} + server: |- + {{docker_login_config.server}} + {%- endif %} +{%- endif %} + provider: type: external - module: sky.skylet.providers.lambda_cloud.LambdaNodeProvider + module: sky.provision.lambda region: {{region}} # Disable launch config check for worker nodes as it can cause resource # leakage. @@ -25,14 +45,6 @@ available_node_types: resources: {} node_config: InstanceType: {{instance_type}} -{% if num_nodes > 1 %} - ray_worker_default: - min_workers: {{num_nodes - 1}} - max_workers: {{num_nodes - 1}} - resources: {} - node_config: - InstanceType: {{instance_type}} -{%- endif %} head_node_type: ray_head_default @@ -64,7 +76,10 @@ setup_commands: # Line 'sudo grep ..': set the number of threads per process to unlimited to avoid ray job submit stucking issue when the number of running ray jobs increase. # Line 'mkdir -p ..': disable host key check # Line 'python3 -c ..': patch the buggy ray files and enable `-o allow_other` option for `goofys` - - sudo systemctl stop unattended-upgrades || true; + - {%- for initial_setup_command in initial_setup_commands %} + {{ initial_setup_command }} + {%- endfor %} + sudo systemctl stop unattended-upgrades || true; sudo systemctl disable unattended-upgrades || true; sudo sed -i 's/Unattended-Upgrade "1"/Unattended-Upgrade "0"/g' /etc/apt/apt.conf.d/20auto-upgrades || true; sudo kill -9 `sudo lsof /var/lib/dpkg/lock-frontend | awk '{print $2}' | tail -n 1` || true; @@ -81,31 +96,5 @@ setup_commands: mkdir -p ~/.ssh; (grep -Pzo -q "Host \*\n StrictHostKeyChecking no" ~/.ssh/config) || printf "Host *\n StrictHostKeyChecking no\n" >> ~/.ssh/config; [ -f /etc/fuse.conf ] && sudo sed -i 's/#user_allow_other/user_allow_other/g' /etc/fuse.conf || (sudo sh -c 'echo "user_allow_other" > /etc/fuse.conf'); -# Command to start ray on the head node. You don't need to change this. -# NOTE: these are very performance-sensitive. Each new item opens/closes an SSH -# connection, which is expensive. Try your best to co-locate commands into fewer -# items! The same comment applies for worker_start_ray_commands. -# -# Increment the following for catching performance bugs easier: -# current num items (num SSH connections): 2 -head_start_ray_commands: - - {{ sky_activate_python_env }}; {{ sky_ray_cmd }} stop; RAY_SCHEDULER_EVENTS=0 RAY_DEDUP_LOGS=0 {{ sky_ray_cmd }} start --disable-usage-stats --head --port={{ray_port}} --min-worker-port 11002 --dashboard-port={{ray_dashboard_port}} --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml {{"--resources='%s'" % custom_resources if custom_resources}} --temp-dir {{ray_temp_dir}} || exit 1; - which prlimit && for id in $(pgrep -f raylet/raylet); do sudo prlimit --nofile=1048576:1048576 --pid=$id || true; done; - {{dump_port_command}}; {{ray_head_wait_initialized_command}} - -{%- if num_nodes > 1 %} -worker_start_ray_commands: - - {{ sky_activate_python_env }}; {{ sky_ray_cmd }} stop; RAY_SCHEDULER_EVENTS=0 RAY_DEDUP_LOGS=0 {{ sky_ray_cmd }} start --disable-usage-stats --address=$RAY_HEAD_IP:{{ray_port}} --min-worker-port 11002 --object-manager-port=8076 {{"--resources='%s'" % custom_resources if custom_resources}} --temp-dir {{ray_temp_dir}} || exit 1; - which prlimit && for id in $(pgrep -f raylet/raylet); do sudo prlimit --nofile=1048576:1048576 --pid=$id || true; done; -{%- else %} -worker_start_ray_commands: [] -{%- endif %} - -head_node: {} -worker_nodes: {} - -# These fields are required for external cloud providers. -head_setup_commands: [] -worker_setup_commands: [] -cluster_synced_files: [] -file_mounts_sync_continuously: False +# Command to start ray clusters are now placed in `sky.provision.instance_setup`. +# We do not need to list it here anymore. diff --git a/sky/templates/oci-ray.yml.j2 b/sky/templates/oci-ray.yml.j2 index 32bd6326ee2..64fa4e745c7 100644 --- a/sky/templates/oci-ray.yml.j2 +++ b/sky/templates/oci-ray.yml.j2 @@ -16,7 +16,11 @@ provider: disable_launch_config_check: true auth: +{% if os_type == "ubuntu" %} ssh_user: ubuntu +{% else %} + ssh_user: opc +{% endif %} ssh_private_key: {{ssh_private_key}} available_node_types: @@ -85,14 +89,20 @@ setup_commands: # Line 'sudo grep ..': set the number of threads per process to unlimited to avoid ray job submit stucking issue when the number of running ray jobs increase. # Line 'mkdir -p ..': disable host key check # Line 'python3 -c ..': patch the buggy ray files and enable `-o allow_other` option for `goofys` - - sudo systemctl stop unattended-upgrades || true; + - echo "setup commands runs at $(date)" > /tmp/provision.tmp.out || true; + {%- if os_type == "ubuntu" %} + sudo systemctl stop unattended-upgrades || true; sudo systemctl disable unattended-upgrades || true; sudo sed -i 's/Unattended-Upgrade "1"/Unattended-Upgrade "0"/g' /etc/apt/apt.conf.d/20auto-upgrades || true; sudo kill -9 `sudo lsof /var/lib/dpkg/lock-frontend | awk '{print $2}' | tail -n 1` || true; sudo pkill -9 apt-get; sudo pkill -9 dpkg; sudo dpkg --configure -a; - ([ `sudo lshw -class display | grep "NVIDIA Corporation" | wc -l` -gt 0 ]) && (sudo which nvidia-smi > /dev/null || ( sudo apt-get install nvidia-driver-530-open -y && sudo apt-get install nvidia-driver-525-server -y ) || true); + {%- else %} + sudo /usr/libexec/oci-growfs -y || true; + sudo systemctl stop firewalld || true; + sudo systemctl disable firewalld || true; + {%- endif %} mkdir -p ~/.ssh; touch ~/.ssh/config; {{ conda_installation_commands }} {{ ray_skypilot_installation_commands }} diff --git a/sky/utils/cli_utils/status_utils.py b/sky/utils/cli_utils/status_utils.py index 09172f24814..96f9b5e9946 100644 --- a/sky/utils/cli_utils/status_utils.py +++ b/sky/utils/cli_utils/status_utils.py @@ -1,19 +1,20 @@ """Utilities for sky status.""" -from typing import Any, Callable, Dict, List, Optional, Tuple +import typing +from typing import Any, Callable, Dict, List, Optional import click import colorama from sky import backends -from sky import clouds as sky_clouds -from sky import resources as resources_lib from sky import status_lib -from sky.provision.kubernetes import utils as kubernetes_utils from sky.skylet import constants from sky.utils import common_utils from sky.utils import log_utils from sky.utils import resources_utils +if typing.TYPE_CHECKING: + from sky.provision.kubernetes import utils as kubernetes_utils + COMMAND_TRUNC_LENGTH = 25 NUM_COST_REPORT_LINES = 5 @@ -303,19 +304,19 @@ def _get_estimated_cost_for_cost_report( return f'$ {cost:.2f}' -def show_kubernetes_cluster_status_table(clusters: List[Any], - show_all: bool) -> None: +def show_kubernetes_cluster_status_table( + clusters: List['kubernetes_utils.KubernetesSkyPilotClusterInfo'], + show_all: bool) -> None: """Compute cluster table values and display for Kubernetes clusters.""" status_columns = [ - StatusColumn('USER', lambda c: c['user']), - StatusColumn('NAME', lambda c: c['cluster_name']), - StatusColumn( - 'LAUNCHED', - lambda c: log_utils.readable_time_duration(c['launched_at'])), + StatusColumn('USER', lambda c: c.user), + StatusColumn('NAME', lambda c: c.cluster_name), + StatusColumn('LAUNCHED', + lambda c: log_utils.readable_time_duration(c.launched_at)), StatusColumn('RESOURCES', - lambda c: c['resources_str'], + lambda c: c.resources_str, trunc_length=70 if not show_all else 0), - StatusColumn('STATUS', lambda c: c['status'].colored_str()), + StatusColumn('STATUS', lambda c: c.status.colored_str()), # TODO(romilb): We should consider adding POD_NAME field here when --all # is passed to help users fetch pod name programmatically. ] @@ -326,8 +327,7 @@ def show_kubernetes_cluster_status_table(clusters: List[Any], cluster_table = log_utils.create_table(columns) # Sort table by user, then by cluster name - sorted_clusters = sorted(clusters, - key=lambda c: (c['user'], c['cluster_name'])) + sorted_clusters = sorted(clusters, key=lambda c: (c.user, c.cluster_name)) for cluster in sorted_clusters: row = [] @@ -344,122 +344,3 @@ def show_kubernetes_cluster_status_table(clusters: List[Any], else: click.echo('No SkyPilot resources found in the ' 'active Kubernetes context.') - - -def process_skypilot_pods( - pods: List[Any], - context: Optional[str] = None -) -> Tuple[List[Dict[Any, Any]], Dict[str, Any], Dict[str, Any]]: - """Process SkyPilot pods on k8s to extract cluster and controller info. - - Args: - pods: List of Kubernetes pod objects. - context: Kubernetes context name, used to detect GPU label formatter. - - Returns: - A tuple containing: - - List of dictionaries with cluster information. - - Dictionary of job controller information. - - Dictionary of serve controller information. - - Each dictionary contains the following keys: - 'cluster_name_on_cloud': The cluster_name_on_cloud used by SkyPilot - 'cluster_name': The cluster name without the user hash - 'user': The user who created the cluster. Fetched from pod label - 'status': The cluster status (assumed UP if pod exists) - 'pods': List of pod objects in the cluster - 'launched_at': Timestamp of when the cluster was launched - 'resources': sky.Resources object for the cluster - """ - clusters: Dict[str, Dict] = {} - jobs_controllers: Dict[str, Dict] = {} - serve_controllers: Dict[str, Dict] = {} - - for pod in pods: - cluster_name_on_cloud = pod.metadata.labels.get('skypilot-cluster') - cluster_name = cluster_name_on_cloud.rsplit( - '-', 1 - )[0] # Remove the user hash to get cluster name (e.g., mycluster-2ea4) - - # Check if cluster name is name of a controller - # Can't use controller_utils.Controllers.from_name(cluster_name) - # because hash is different across users - if 'controller' in cluster_name_on_cloud: - start_time = pod.status.start_time.timestamp() - controller_info = { - 'cluster_name_on_cloud': cluster_name_on_cloud, - 'cluster_name': cluster_name, - 'user': pod.metadata.labels.get('skypilot-user'), - 'status': status_lib.ClusterStatus.UP, - # Assuming UP if pod exists - 'pods': [pod], - 'launched_at': start_time - } - if 'sky-jobs-controller' in cluster_name_on_cloud: - jobs_controllers[cluster_name_on_cloud] = controller_info - elif 'sky-serve-controller' in cluster_name_on_cloud: - serve_controllers[cluster_name_on_cloud] = controller_info - - if cluster_name_on_cloud not in clusters: - # Parse the start time for the cluster - start_time = pod.status.start_time - if start_time is not None: - start_time = pod.status.start_time.timestamp() - - # Parse resources - cpu_request = kubernetes_utils.parse_cpu_or_gpu_resource( - pod.spec.containers[0].resources.requests.get('cpu', '0')) - memory_request = kubernetes_utils.parse_memory_resource( - pod.spec.containers[0].resources.requests.get('memory', '0'), - unit='G') - gpu_count = kubernetes_utils.parse_cpu_or_gpu_resource( - pod.spec.containers[0].resources.requests.get( - 'nvidia.com/gpu', '0')) - if gpu_count > 0: - label_formatter, _ = ( - kubernetes_utils.detect_gpu_label_formatter(context)) - assert label_formatter is not None, ( - 'GPU label formatter cannot be None if there are pods ' - f'requesting GPUs: {pod.metadata.name}') - gpu_label = label_formatter.get_label_key() - # Get GPU name from pod node selector - if pod.spec.node_selector is not None: - gpu_name = label_formatter.get_accelerator_from_label_value( - pod.spec.node_selector.get(gpu_label)) - - resources = resources_lib.Resources( - cloud=sky_clouds.Kubernetes(), - cpus=int(cpu_request), - memory=int(memory_request), - accelerators=(f'{gpu_name}:{gpu_count}' - if gpu_count > 0 else None)) - if pod.status.phase == 'Pending': - # If pod is pending, do not show it in the status - continue - - clusters[cluster_name_on_cloud] = { - 'cluster_name_on_cloud': cluster_name_on_cloud, - 'cluster_name': cluster_name, - 'user': pod.metadata.labels.get('skypilot-user'), - 'status': status_lib.ClusterStatus.UP, - 'pods': [], - 'launched_at': start_time, - 'resources': resources, - } - else: - # Update start_time if this pod started earlier - pod_start_time = pod.status.start_time - if pod_start_time is not None: - pod_start_time = pod_start_time.timestamp() - if pod_start_time < clusters[cluster_name_on_cloud][ - 'launched_at']: - clusters[cluster_name_on_cloud][ - 'launched_at'] = pod_start_time - clusters[cluster_name_on_cloud]['pods'].append(pod) - # Update resources_str in clusters: - for cluster_name, cluster in clusters.items(): - resources = cluster['resources'] - num_pods = len(cluster['pods']) - resources_str = f'{num_pods}x {resources}' - cluster['resources_str'] = resources_str - return list(clusters.values()), jobs_controllers, serve_controllers diff --git a/sky/utils/command_runner.py b/sky/utils/command_runner.py index be6e8346e3d..bbe287d9f79 100644 --- a/sky/utils/command_runner.py +++ b/sky/utils/command_runner.py @@ -502,8 +502,10 @@ def close_cached_connection(self) -> None: if self.ssh_control_name is not None: control_path = _ssh_control_path(self.ssh_control_name) if control_path is not None: + # Suppress the `Exit request sent.` output for this comamnd + # which would interrupt the CLI spinner. cmd = (f'ssh -O exit -S {control_path}/%C ' - f'{self.ssh_user}@{self.ip}') + f'{self.ssh_user}@{self.ip} > /dev/null 2>&1') logger.debug(f'Closing cached connection {control_path!r} with ' f'cmd: {cmd}') log_lib.run_with_log(cmd, diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py index 6e752f73ebc..94a6ed690e1 100644 --- a/sky/utils/schemas.py +++ b/sky/utils/schemas.py @@ -755,6 +755,9 @@ def get_config_schema(): 'force_enable_external_ips': { 'type': 'boolean' }, + 'enable_gvnic': { + 'type': 'boolean' + }, **_LABELS_SCHEMA, **_NETWORK_CONFIG_SCHEMA, }, diff --git a/sky/utils/ux_utils.py b/sky/utils/ux_utils.py index f6699f355f8..2fffa8a9df9 100644 --- a/sky/utils/ux_utils.py +++ b/sky/utils/ux_utils.py @@ -121,11 +121,6 @@ def run(self, *args, **kwargs): raise -def starting_message(message: str) -> str: - """Gets the starting message for the given message.""" - return f'⚙︎ {message}' - - def log_path_hint(log_path: Union[str, 'pathlib.Path']) -> str: """Gets the log path hint for the given log path.""" log_path = str(log_path) @@ -135,21 +130,50 @@ def log_path_hint(log_path: Union[str, 'pathlib.Path']) -> str: return _LOG_PATH_HINT.format(log_path=log_path) +def starting_message(message: str) -> str: + """Gets the starting message for the given message.""" + # We have to reset the color before the message, because sometimes if a + # previous spinner with dimmed color overflows in a narrow terminal, the + # color might be messed up. + return f'{colorama.Style.RESET_ALL}⚙︎ {message}' + + def finishing_message( message: str, log_path: Optional[Union[str, 'pathlib.Path']] = None) -> str: """Gets the finishing message for the given message.""" - success_prefix = (f'{colorama.Fore.GREEN}✓ {message}' - f'{colorama.Style.RESET_ALL}') + # We have to reset the color before the message, because sometimes if a + # previous spinner with dimmed color overflows in a narrow terminal, the + # color might be messed up. + success_prefix = (f'{colorama.Style.RESET_ALL}{colorama.Fore.GREEN}✓ ' + f'{message}{colorama.Style.RESET_ALL}') if log_path is None: return success_prefix path_hint = log_path_hint(log_path) return f'{success_prefix} {path_hint}' +def error_message(message: str, + log_path: Optional[Union[str, 'pathlib.Path']] = None) -> str: + """Gets the error message for the given message.""" + # We have to reset the color before the message, because sometimes if a + # previous spinner with dimmed color overflows in a narrow terminal, the + # color might be messed up. + error_prefix = (f'{colorama.Style.RESET_ALL}{colorama.Fore.RED}⨯' + f'{colorama.Style.RESET_ALL} {message}') + if log_path is None: + return error_prefix + path_hint = log_path_hint(log_path) + return f'{error_prefix} {path_hint}' + + def retry_message(message: str) -> str: """Gets the retry message for the given message.""" - return f'{colorama.Fore.YELLOW}↺{colorama.Style.RESET_ALL} {message}' + # We have to reset the color before the message, because sometimes if a + # previous spinner with dimmed color overflows in a narrow terminal, the + # color might be messed up. + return (f'{colorama.Style.RESET_ALL}{colorama.Fore.YELLOW}↺' + f'{colorama.Style.RESET_ALL} {message}') def spinner_message( diff --git a/tests/test_smoke.py b/tests/test_smoke.py index cb14b82eb99..8b1737db121 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -383,7 +383,7 @@ def test_aws_region(): f'sky exec {name} \'echo $SKYPILOT_CLUSTER_INFO | jq .region | grep us-east-2\'', f'sky logs {name} 2 --status', # Ensure the job succeeded. # A user program should not access SkyPilot runtime env python by default. - f'sky exec {name} \'which python | grep {constants.SKY_REMOTE_PYTHON_ENV_NAME} || exit 1\'', + f'sky exec {name} \'which python | grep {constants.SKY_REMOTE_PYTHON_ENV_NAME} && exit 1 || true\'', f'sky logs {name} 3 --status', # Ensure the job succeeded. ], f'sky down -y {name}', @@ -406,7 +406,7 @@ def test_gcp_region_and_service_account(): f'sky exec {name} \'echo $SKYPILOT_CLUSTER_INFO | jq .region | grep us-central1\'', f'sky logs {name} 3 --status', # Ensure the job succeeded. # A user program should not access SkyPilot runtime env python by default. - f'sky exec {name} \'which python | grep {constants.SKY_REMOTE_PYTHON_ENV_NAME} || exit 1\'', + f'sky exec {name} \'which python | grep {constants.SKY_REMOTE_PYTHON_ENV_NAME} && exit 1 || true\'', f'sky logs {name} 4 --status', # Ensure the job succeeded. ], f'sky down -y {name}', @@ -446,7 +446,7 @@ def test_azure_region(): f'sky exec {name} \'echo $SKYPILOT_CLUSTER_INFO | jq .zone | grep null\'', f'sky logs {name} 3 --status', # Ensure the job succeeded. # A user program should not access SkyPilot runtime env python by default. - f'sky exec {name} \'which python | grep {constants.SKY_REMOTE_PYTHON_ENV_NAME} || exit 1\'', + f'sky exec {name} \'which python | grep {constants.SKY_REMOTE_PYTHON_ENV_NAME} && exit 1 || true\'', f'sky logs {name} 4 --status', # Ensure the job succeeded. ], f'sky down -y {name}', @@ -864,14 +864,14 @@ def test_custom_default_conda_env(generic_cloud: str): f'sky launch -c {name} -y --cloud {generic_cloud} tests/test_yamls/test_custom_default_conda_env.yaml', f'sky status -r {name} | grep "UP"', f'sky logs {name} 1 --status', - f'sky logs {name} 1 --no-follow | grep -P "myenv\\s+\\*"', + f'sky logs {name} 1 --no-follow | grep -E "myenv\\s+\\*"', f'sky exec {name} tests/test_yamls/test_custom_default_conda_env.yaml', f'sky logs {name} 2 --status', f'sky autostop -y -i 0 {name}', 'sleep 60', f'sky status -r {name} | grep "STOPPED"', f'sky start -y {name}', - f'sky logs {name} 2 --no-follow | grep -P "myenv\\s+\\*"', + f'sky logs {name} 2 --no-follow | grep -E "myenv\\s+\\*"', f'sky exec {name} tests/test_yamls/test_custom_default_conda_env.yaml', f'sky logs {name} 3 --status', ], f'sky down -y {name}')