From 839db19606ae46e75e354f127faa9d5ceb96e7f0 Mon Sep 17 00:00:00 2001 From: Zhanghao Wu Date: Wed, 11 Dec 2024 21:37:46 -0800 Subject: [PATCH] Merge branch 'master' of github.com:skypilot-org/skypilot into restapi (#62) * [perf] use uv for venv creation and pip install (#4414) * Revert "remove `uv` from runtime setup due to azure installation issue (#4401)" This reverts commit 0b20d568ee1af454bfec3e50ff62d239f976e52d. * on azure, use --prerelease=allow to install azure-cli * use uv venv --seed * fix backwards compatibility * really fix backwards compatibility * use uv to set up controller dependencies * fix python 3.8 * lint * add missing file * update comment * split out azure-cli dep * fix lint for dependencies * use runpy.run_path rather than modifying sys.path * fix cloud dependency installation commands * lint * Update sky/utils/controller_utils.py Co-authored-by: Zhanghao Wu --------- Co-authored-by: Zhanghao Wu * [Minor] README updates. (#4436) * [Minor] README touches. * update * update * make --fast robust against credential or wheel updates (#4289) * add config_dict['config_hash'] output to write_cluster_config * fix docstring for write_cluster_config This used to be true, but since #2943, 'ray' is the only provisioner. Add other keys that are now present instead. * when using --fast, check if config_hash matches, and if not, provision * mock hashing method in unit test This is needed since some files in the fake file mounts don't actually exist, like the wheel path. * check config hash within provision with lock held * address other PR review comments * rename to skip_if_no_cluster_updates Co-authored-by: Zhanghao Wu * add assert details Co-authored-by: Zhanghao Wu * address PR comments and update docstrings * fix test * update docstrings Co-authored-by: Zhanghao Wu * address PR comments * fix lint and tests * Update sky/backends/cloud_vm_ray_backend.py Co-authored-by: Zhanghao Wu * refactor skip_if_no_cluster_update var * clarify comment * format exception --------- Co-authored-by: Zhanghao Wu * [k8s] Add resource limits only if they exist (#4440) Add limits only if they exist * [robustness] cover some potential resource leakage cases (#4443) * if a newly-created cluster is missing from the cloud, wait before deleting Addresses #4431. * confirm cluster actually terminates before deleting from the db * avoid deleting cluster data outside the primary provision loop * tweaks * Apply suggestions from code review Co-authored-by: Zhanghao Wu * use usage_intervals for new cluster detection get_cluster_duration will include the total duration of the cluster since its initial launch, while launched_at may be reset by sky launch on an existing cluster. So this is a more accurate method to check. * fix terminating/stopping state for Lambda and Paperspace * Revert "use usage_intervals for new cluster detection" This reverts commit aa6d2e9f8462c4e68196e9a6420c6781c9ff116b. * check cloud.STATUS_VERSION before calling query_instances * avoid try/catch when querying instances * update comments --------- Co-authored-by: Zhanghao Wu * smoke tests support storage mount only (#4446) * smoke tests support storage mount only * fix verify command * rename to only_mount * [Feature] support spot pod on RunPod (#4447) * wip * wip * wip * wip * wip * wip * resolve comments * wip * wip * wip * wip * wip * wip --------- Co-authored-by: hwei * use lazy import for runpod (#4451) Fixes runpod import issues introduced in #4447. 
* [k8s] Fix show-gpus when running with incluster auth (#4452) * Add limits only if they exist * Fix incluster auth handling * Merge branch 'master' of github.com:skypilot-org/skypilot into restapi * Add comment * Not mutate azure dep list at runtime (#4457) * format --------- Co-authored-by: Christopher Cooper Co-authored-by: Zongheng Yang Co-authored-by: Romil Bhardwaj Co-authored-by: zpoint Co-authored-by: Hong Co-authored-by: hwei Co-authored-by: Yika --- sky/backends/backend_utils.py | 49 ++++-- sky/backends/cloud_vm_ray_backend.py | 69 +++++++- sky/clouds/runpod.py | 19 ++- .../service_catalog/kubernetes_catalog.py | 4 + sky/global_user_state.py | 4 + sky/provision/azure/instance.py | 2 +- sky/provision/gcp/instance.py | 2 + sky/provision/lambda_cloud/instance.py | 2 +- sky/provision/paperspace/instance.py | 3 +- sky/provision/runpod/api/__init__.py | 3 + sky/provision/runpod/api/commands.py | 119 ++++++++++++++ sky/provision/runpod/api/pods.py | 142 +++++++++++++++++ sky/provision/runpod/instance.py | 5 +- sky/provision/runpod/utils.py | 50 +++--- sky/templates/kubernetes-ray.yml.j2 | 4 +- sky/templates/runpod-ray.yml.j2 | 2 + tests/test_smoke.py | 147 +++++++++--------- .../test_yamls/test_storage_mounting.yaml.j2 | 10 +- 18 files changed, 511 insertions(+), 125 deletions(-) create mode 100644 sky/provision/runpod/api/__init__.py create mode 100644 sky/provision/runpod/api/commands.py create mode 100644 sky/provision/runpod/api/pods.py diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 79c72adb365..dfadf70e6c0 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -114,6 +114,16 @@ _ENDPOINTS_RETRY_MESSAGE = ('If the cluster was recently started, ' 'please retry after a while.') +# If a cluster is less than LAUNCH_DOUBLE_CHECK_WINDOW seconds old, and we don't +# see any instances in the cloud, the instances might be in the process of +# being created. We will wait LAUNCH_DOUBLE_CHECK_DELAY seconds and then double +# check to make sure there are still no instances. LAUNCH_DOUBLE_CHECK_DELAY +# should be set longer than the delay between (sending the create instance +# request) and (the instances appearing on the cloud). +# See https://github.com/skypilot-org/skypilot/issues/4431. +_LAUNCH_DOUBLE_CHECK_WINDOW = 60 +_LAUNCH_DOUBLE_CHECK_DELAY = 1 + # Include the fields that will be used for generating tags that distinguish # the cluster in ray, to avoid the stopped cluster being discarded due to # updates in the yaml template. @@ -1761,13 +1771,12 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: logger.debug( f'Refreshing status ({cluster_name!r}) failed to get IPs.') except RuntimeError as e: - logger.debug(str(e)) + logger.debug(common_utils.format_exception(e)) except Exception as e: # pylint: disable=broad-except # This can be raised by `external_ssh_ports()`, due to the # underlying call to kubernetes API. - logger.debug( - f'Refreshing status ({cluster_name!r}) failed: ' - f'{common_utils.format_exception(e, use_bracket=True)}') + logger.debug(f'Refreshing status ({cluster_name!r}) failed: ', + exc_info=e) return False # Determining if the cluster is healthy (UP): @@ -1794,6 +1803,24 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: return global_user_state.get_cluster_from_name(cluster_name) # All cases below are transitioning the cluster to non-UP states.
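The hunk that follows adds the leak-prevention double check described by the comment above. In essence (a minimal sketch, assuming a hypothetical `query_cloud()` callable standing in for `_query_cluster_status_via_cloud_api` and a cluster DB `record` dict):

```python
# Minimal sketch of the double-check window; not the actual backend_utils code.
import time

_LAUNCH_DOUBLE_CHECK_WINDOW = 60  # clusters younger than this may still be provisioning
_LAUNCH_DOUBLE_CHECK_DELAY = 1  # seconds to wait before re-querying the cloud


def double_checked_statuses(query_cloud, record):
    node_statuses = query_cloud()
    recently_launched = (time.time() - record['launched_at'] <
                         _LAUNCH_DOUBLE_CHECK_WINDOW)
    if not node_statuses and record['status'] == 'INIT' and recently_launched:
        # The create-instance request may not be visible in the cloud API yet;
        # wait briefly and query once more before concluding anything leaked.
        time.sleep(_LAUNCH_DOUBLE_CHECK_DELAY)
        node_statuses = query_cloud()
        # Note: even if nodes appear now, the cluster is still treated as
        # abnormal and left in INIT, matching the comment in the hunk below.
    return node_statuses
```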
+ + if (not node_statuses and handle.launched_resources.cloud.STATUS_VERSION >= + clouds.StatusVersion.SKYPILOT): + # Note: launched_at is set during sky launch, even on an existing + # cluster. This will catch the case where the cluster was terminated on + # the cloud and restarted by sky launch. + time_since_launch = time.time() - record['launched_at'] + if (record['status'] == status_lib.ClusterStatus.INIT and + time_since_launch < _LAUNCH_DOUBLE_CHECK_WINDOW): + # It's possible the instances for this cluster were just created, + # and haven't appeared yet in the cloud API/console. Wait for a bit + # and check again. This is a best-effort leak prevention check. + # See https://github.com/skypilot-org/skypilot/issues/4431. + time.sleep(_LAUNCH_DOUBLE_CHECK_DELAY) + node_statuses = _query_cluster_status_via_cloud_api(handle) + # Note: even if all the node_statuses are UP now, we will still + # consider this cluster abnormal, and its status will be INIT. + if len(node_statuses) > handle.launched_nodes: # Unexpected: in the queried region more than 1 cluster with the same # constructed name tag returned. This will typically not happen unless @@ -1822,13 +1849,15 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: f'{colorama.Style.RESET_ALL}') assert len(node_statuses) <= handle.launched_nodes - # If the node_statuses is empty, all the nodes are terminated. We can - # safely set the cluster status to TERMINATED. This handles the edge case - # where the cluster is terminated by the user manually through the UI. + # If the node_statuses is empty, it should mean that all the nodes are + # terminated and we can set the cluster status to TERMINATED. This handles + # the edge case where the cluster is terminated by the user manually through + # the UI. to_terminate = not node_statuses - # A cluster is considered "abnormal", if not all nodes are TERMINATED or - # not all nodes are STOPPED. We check that with the following logic: + # A cluster is considered "abnormal", if some (but not all) nodes are + # TERMINATED, or not all nodes are STOPPED. We check that with the following + # logic: # * Not all nodes are terminated and there's at least one node # terminated; or # * Any of the non-TERMINATED nodes is in a non-STOPPED status. @@ -1840,6 +1869,8 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: # cluster is probably down. # * The cluster is partially terminated or stopped should be considered # abnormal. + # * The cluster is partially or completely in the INIT state, which means + # that provisioning was interrupted. This is considered abnormal. # # An abnormal cluster will transition to INIT and have any autostop setting # reset (unless it's autostopping/autodowning). diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index b6fe8f9cb4d..a7779c133b8 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -101,6 +101,11 @@ # The maximum retry count for fetching IP address. _FETCH_IP_MAX_ATTEMPTS = 3 +# How many times to query the cloud provider to make sure instances are +# stopping/terminating, and how long to wait between each query. +_TEARDOWN_WAIT_MAX_ATTEMPTS = 10 +_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS = 1 + _TEARDOWN_FAILURE_MESSAGE = ( f'\n{colorama.Fore.RED}Failed to terminate ' '{cluster_name}. 
{extra_reason}' @@ -2411,15 +2416,17 @@ def get_command_runners(self, zip(ip_list, port_list), **ssh_credentials) return runners if self.cached_cluster_info is None: - # We have `or self.cached_external_ips is None` here, because + # We have `and self.cached_external_ips is None` here, because # when a cluster's cloud is just upgraded to the new provsioner, # although it has the cached_external_ips, the cached_cluster_info # can be None. We need to update it here, even when force_cached is # set to True. # TODO: We can remove `self.cached_external_ips is None` after # version 0.8.0. - assert not force_cached or self.cached_external_ips is not None, ( - force_cached, self.cached_external_ips) + if force_cached and self.cached_external_ips is None: + raise RuntimeError( + 'Tried to use cached cluster info, but it\'s missing for ' + f'cluster "{self.cluster_name}"') self._update_cluster_info() assert self.cached_cluster_info is not None, self runners = provision_lib.get_command_runners( @@ -4009,7 +4016,6 @@ def teardown_no_lock(self, limit=1000).get_result()['items'] vpc_id = None try: - # pylint: disable=line-too-long vpc_id = vpcs_filtered_by_tags_and_region[0]['crn'].rsplit( ':', 1)[-1] vpc_found = True @@ -4018,7 +4024,6 @@ def teardown_no_lock(self, returncode = -1 if vpc_found: - # pylint: disable=line-too-long E1136 # Delete VPC and it's associated resources vpc_provider = IBMVPCProvider( config_provider['resource_group_id'], region, @@ -4121,6 +4126,7 @@ def post_teardown_cleanup(self, * Removing the terminated cluster's scripts and ray yaml files. """ cluster_name_on_cloud = handle.cluster_name_on_cloud + cloud = handle.launched_resources.cloud if (terminate and handle.launched_resources.is_image_managed is True): # Delete the image when terminating a "cloned" cluster, i.e., @@ -4173,6 +4179,59 @@ def post_teardown_cleanup(self, sky.utils.cluster_utils.SSHConfigHelper.remove_cluster( handle.cluster_name) + + def _detect_abnormal_non_terminated_nodes( + handle: CloudVmRayResourceHandle) -> None: + # Confirm that instances have actually transitioned state before + # updating the state database. We do this immediately before + # removing the state from the database, so that we can guarantee + # that this is always called before the state is removed. We + # considered running this check as part of + # provisioner.teardown_cluster or provision.terminate_instances, but + # it would open the door to code paths that successfully call this + # function but do not first call teardown_cluster or + # terminate_instances. See + # https://github.com/skypilot-org/skypilot/pull/4443#discussion_r1872798032 + attempts = 0 + while True: + config = common_utils.read_yaml(handle.cluster_yaml) + + logger.debug(f'instance statuses attempt {attempts + 1}') + node_status_dict = provision_lib.query_instances( + repr(cloud), + cluster_name_on_cloud, + config['provider'], + non_terminated_only=False) + + unexpected_node_state: Optional[Tuple[str, str]] = None + for node_id, node_status in node_status_dict.items(): + logger.debug(f'{node_id} status: {node_status}') + # FIXME(cooperc): Some clouds (e.g. GCP) do not distinguish + # between "stopping/stopped" and "terminating/terminated", + # so we allow for either status instead of casing on + # `terminate`. 
+ if node_status not in [ + None, status_lib.ClusterStatus.STOPPED + ]: + unexpected_node_state = (node_id, node_status) + break + + if unexpected_node_state is None: + break + + attempts += 1 + if attempts < _TEARDOWN_WAIT_MAX_ATTEMPTS: + time.sleep(_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS) + else: + (node_id, node_status) = unexpected_node_state + raise RuntimeError(f'Instance {node_id} in unexpected ' + f'state {node_status}.') + + # If cluster_yaml is None, the cluster should already be terminated, + # so we don't need to do the double check. + if handle.cluster_yaml is not None: + _detect_abnormal_non_terminated_nodes(handle) + if not terminate or remove_from_db: global_user_state.remove_cluster(handle.cluster_name, terminate=terminate) diff --git a/sky/clouds/runpod.py b/sky/clouds/runpod.py index 7dff076d94e..77de4fa9dfb 100644 --- a/sky/clouds/runpod.py +++ b/sky/clouds/runpod.py @@ -25,8 +25,6 @@ class RunPod(clouds.Cloud): _REPR = 'RunPod' _CLOUD_UNSUPPORTED_FEATURES = { clouds.CloudImplementationFeatures.STOP: 'Stopping not supported.', - clouds.CloudImplementationFeatures.SPOT_INSTANCE: - ('Spot is not supported, as runpod API does not implement spot.'), clouds.CloudImplementationFeatures.MULTI_NODE: ('Multi-node not supported yet, as the interconnection among nodes ' 'are non-trivial on RunPod.'), @@ -71,11 +69,8 @@ def regions_with_offering(cls, instance_type: str, zone: Optional[str]) -> List[clouds.Region]: assert zone is None, 'RunPod does not support zones.' del accelerators, zone # unused - if use_spot: - return [] - else: - regions = service_catalog.get_region_zones_for_instance_type( - instance_type, use_spot, 'runpod') + regions = service_catalog.get_region_zones_for_instance_type( + instance_type, use_spot, 'runpod') if region is not None: regions = [r for r in regions if r.name == region] @@ -177,11 +172,19 @@ def make_deploy_resources_variables( else: image_id = r.image_id[r.region] + instance_type = resources.instance_type + use_spot = resources.use_spot + + hourly_cost = self.instance_type_to_hourly_cost( + instance_type=instance_type, use_spot=use_spot) + return { - 'instance_type': resources.instance_type, + 'instance_type': instance_type, 'custom_resources': custom_resources, 'region': region.name, 'image_id': image_id, + 'use_spot': use_spot, + 'bid_per_gpu': str(hourly_cost), } def _get_feasible_launchable_resources( diff --git a/sky/clouds/service_catalog/kubernetes_catalog.py b/sky/clouds/service_catalog/kubernetes_catalog.py index 2c7eafc20e5..655b3b54a66 100644 --- a/sky/clouds/service_catalog/kubernetes_catalog.py +++ b/sky/clouds/service_catalog/kubernetes_catalog.py @@ -123,6 +123,10 @@ def _list_accelerators( # clusters defined by allowed_contexts. if region_filter is None: context = kubernetes_utils.get_current_kube_config_context_name() + if context is None and kubernetes_utils.is_incluster_config_available(): + # If context is None and we are running in a kubernetes pod, use the + # in-cluster context as the current context.
+ context = kubernetes.in_cluster_context_name() else: context = region_filter if context is None: diff --git a/sky/global_user_state.py b/sky/global_user_state.py index 63a8341f3d6..f8ca8086106 100644 --- a/sky/global_user_state.py +++ b/sky/global_user_state.py @@ -155,6 +155,10 @@ def create_table(cursor, conn): value_to_replace_existing_entries=common_utils.get_user_hash()) db_utils.add_column_to_table(cursor, conn, 'clusters', 'config_hash', 'TEXT DEFAULT null') + + db_utils.add_column_to_table(cursor, conn, 'clusters', 'config_hash', + 'TEXT DEFAULT null') + db_utils.add_column_to_table(cursor, conn, 'cluster_history', 'user_hash', 'TEXT DEFAULT null') conn.commit() diff --git a/sky/provision/azure/instance.py b/sky/provision/azure/instance.py index ec1b7d2ae15..80986a6f35b 100644 --- a/sky/provision/azure/instance.py +++ b/sky/provision/azure/instance.py @@ -101,8 +101,8 @@ def cluster_status_map( ) -> Dict['AzureInstanceStatus', Optional[status_lib.ClusterStatus]]: return { cls.PENDING: status_lib.ClusterStatus.INIT, - cls.STOPPING: status_lib.ClusterStatus.INIT, cls.RUNNING: status_lib.ClusterStatus.UP, + cls.STOPPING: status_lib.ClusterStatus.STOPPED, cls.STOPPED: status_lib.ClusterStatus.STOPPED, cls.DELETING: None, } diff --git a/sky/provision/gcp/instance.py b/sky/provision/gcp/instance.py index 3aac3ad4193..8244bac5633 100644 --- a/sky/provision/gcp/instance.py +++ b/sky/provision/gcp/instance.py @@ -52,6 +52,8 @@ def _filter_instances( # non_terminated_only=True? # Will there be callers who would want this to be False? # stop() and terminate() for example already implicitly assume non-terminated. +# Currently, even with non_terminated_only=False, we may not have a dict entry +# for terminated instances, if they have already been fully deleted. 
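The teardown double check added to cloud_vm_ray_backend.py earlier in this patch consumes exactly the contract these comments describe: a node that maps to `None` is treated as terminated, and STOPPED is the only other state accepted after teardown. A condensed sketch of that polling loop (with `query_fn` standing in for `provision_lib.query_instances(..., non_terminated_only=False)` and statuses simplified to strings):

```python
# Condensed sketch of the teardown wait loop; not the actual backend code.
import time
from typing import Callable, Dict, Optional

_TEARDOWN_WAIT_MAX_ATTEMPTS = 10
_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS = 1  # constant name mirrors the patch


def wait_for_instances_gone(
        query_fn: Callable[[], Dict[str, Optional[str]]]) -> None:
    unexpected = None
    for _ in range(_TEARDOWN_WAIT_MAX_ATTEMPTS):
        statuses = query_fn()
        # None means terminated; STOPPED is also acceptable, since some clouds
        # do not distinguish stopping/stopped from terminating/terminated.
        unexpected = next(((node_id, s)
                           for node_id, s in statuses.items()
                           if s not in (None, 'STOPPED')), None)
        if unexpected is None:
            return
        time.sleep(_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS)
    node_id, status = unexpected
    raise RuntimeError(f'Instance {node_id} in unexpected state {status}.')
```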
@common_utils.retry def query_instances( cluster_name_on_cloud: str, diff --git a/sky/provision/lambda_cloud/instance.py b/sky/provision/lambda_cloud/instance.py index ef2e52d5675..ae8090a3e8c 100644 --- a/sky/provision/lambda_cloud/instance.py +++ b/sky/provision/lambda_cloud/instance.py @@ -221,7 +221,7 @@ def query_instances( 'booting': status_lib.ClusterStatus.INIT, 'active': status_lib.ClusterStatus.UP, 'unhealthy': status_lib.ClusterStatus.INIT, - 'terminating': status_lib.ClusterStatus.INIT, + 'terminating': None, } statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {} for instance_id, instance in instances.items(): diff --git a/sky/provision/paperspace/instance.py b/sky/provision/paperspace/instance.py index 44c69e052bb..2f7a083bd14 100644 --- a/sky/provision/paperspace/instance.py +++ b/sky/provision/paperspace/instance.py @@ -286,12 +286,13 @@ def query_instances( assert provider_config is not None, (cluster_name_on_cloud, provider_config) instances = _filter_instances(cluster_name_on_cloud, None) + # https://docs.digitalocean.com/reference/paperspace/core/commands/machines/#show status_map = { 'starting': status_lib.ClusterStatus.INIT, 'restarting': status_lib.ClusterStatus.INIT, 'upgrading': status_lib.ClusterStatus.INIT, 'provisioning': status_lib.ClusterStatus.INIT, - 'stopping': status_lib.ClusterStatus.INIT, + 'stopping': status_lib.ClusterStatus.STOPPED, 'serviceready': status_lib.ClusterStatus.INIT, 'ready': status_lib.ClusterStatus.UP, 'off': status_lib.ClusterStatus.STOPPED, diff --git a/sky/provision/runpod/api/__init__.py b/sky/provision/runpod/api/__init__.py new file mode 100644 index 00000000000..e6826dcaa98 --- /dev/null +++ b/sky/provision/runpod/api/__init__.py @@ -0,0 +1,3 @@ +"""RunPod low level API support for spot pod.""" + +from sky.provision.runpod.api.commands import create_spot_pod diff --git a/sky/provision/runpod/api/commands.py b/sky/provision/runpod/api/commands.py new file mode 100644 index 00000000000..82983e01308 --- /dev/null +++ b/sky/provision/runpod/api/commands.py @@ -0,0 +1,119 @@ +"""This module provides functions to generate GraphQL mutations for deploying +spot instance Pods on RunPod. + +Reference: + https://github.com/runpod/runpod-python/blob/main/runpod/api/ctl_commands.py + +Functions: + generate_spot_pod_deployment_mutation: Generates a GraphQL mutation string + for deploying a spot instance Pod on RunPod. 
+ +Example: + >>> mutation = generate_spot_pod_deployment_mutation( name='test', image_name='runpod/stack', gpu_type_id='NVIDIA GeForce RTX 3070', bid_per_gpu=0.3 ) +""" +from typing import List, Optional + +from sky.adaptors import runpod +from sky.provision.runpod.api.pods import generate_spot_pod_deployment_mutation + +_INTERRUPTABLE_POD_FIELD: str = 'podRentInterruptable' +_RESPONSE_DATA_FIELD: str = 'data' + + +def create_spot_pod( + name: str, + image_name: str, + gpu_type_id: str, + bid_per_gpu: float, + cloud_type: str = 'ALL', + volume_mount_path: str = '/runpod-volume', + gpu_count: Optional[int] = 1, + min_memory_in_gb: Optional[int] = 1, + min_vcpu_count: Optional[int] = 1, + container_disk_in_gb: Optional[int] = None, + volume_in_gb: Optional[int] = 0, + ports: Optional[str] = None, + start_ssh: Optional[bool] = True, + start_jupyter: Optional[bool] = False, + env: Optional[dict] = None, + docker_args: Optional[str] = '', + support_public_ip: Optional[bool] = True, + terminate_after: Optional[str] = None, + stop_after: Optional[str] = None, + data_center_id: Optional[str] = None, + country_code: Optional[str] = None, + network_volume_id: Optional[str] = None, + allowed_cuda_versions: Optional[List[str]] = None, + min_download: Optional[int] = None, + min_upload: Optional[int] = None, + cuda_version: Optional[str] = None, + template_id: Optional[str] = None, + volume_key: Optional[str] = None, +) -> dict: + """Create a spot instance Pod on RunPod via the GraphQL API. + + Validates the requested GPU and cloud type, resolves the data center from + the given network volume if one is specified, builds the + podRentInterruptable mutation via generate_spot_pod_deployment_mutation, + and submits it to the RunPod GraphQL endpoint. + + Example: + >>> pod = create_spot_pod( + name='test', + image_name='runpod/stack', + gpu_type_id='NVIDIA GeForce RTX 3070', + bid_per_gpu=0.3 + ) + """ + runpod.runpod.get_gpu(gpu_type_id) + # refer to https://graphql-spec.runpod.io/#definition-CloudTypeEnum + if cloud_type not in ['ALL', 'COMMUNITY', 'SECURE']: + raise ValueError('cloud_type must be one of ALL, COMMUNITY or SECURE') + + if network_volume_id and data_center_id is None: + user_info = runpod.runpod.get_user() + for network_volume in user_info['networkVolumes']: + if network_volume['id'] == network_volume_id: + data_center_id = network_volume['dataCenterId'] + break + + if container_disk_in_gb is None and template_id is None: + container_disk_in_gb = 10 + + mutation = generate_spot_pod_deployment_mutation( + name=name, + image_name=image_name, + gpu_type_id=gpu_type_id, + bid_per_gpu=bid_per_gpu, + cloud_type=cloud_type, + gpu_count=gpu_count, + min_memory_in_gb=min_memory_in_gb, + min_vcpu_count=min_vcpu_count, + container_disk_in_gb=container_disk_in_gb, + volume_in_gb=volume_in_gb, + volume_mount_path=volume_mount_path, + ports=ports, + start_ssh=start_ssh, + start_jupyter=start_jupyter, + env=env, + docker_args=docker_args, + support_public_ip=support_public_ip, + terminate_after=terminate_after, + stop_after=stop_after, + data_center_id=data_center_id, + country_code=country_code, + network_volume_id=network_volume_id, + allowed_cuda_versions=allowed_cuda_versions, + min_download=min_download, + min_upload=min_upload, + cuda_version=cuda_version, + template_id=template_id, + volume_key=volume_key, + ) + response = runpod.runpod.api.graphql.run_graphql_query(mutation) + return response[_RESPONSE_DATA_FIELD][_INTERRUPTABLE_POD_FIELD] diff --git a/sky/provision/runpod/api/pods.py b/sky/provision/runpod/api/pods.py new file
mode 100644 index 00000000000..8a232cfbdcc --- /dev/null +++ b/sky/provision/runpod/api/pods.py @@ -0,0 +1,142 @@ +"""This module provides functions to generate GraphQL mutations for deploying +spot instance Pods on RunPod. + +Reference: + https://github.com/runpod/runpod-python/blob/main/runpod/api/mutations/pods.py + +Functions: + generate_spot_pod_deployment_mutation: Generates a GraphQL mutation string + for deploying a spot instance Pod on RunPod. +Example: + >>> mutation = generate_spot_pod_deployment_mutation( + name='test', + image_name='runpod/stack', + gpu_type_id='NVIDIA GeForce RTX 3070', + bid_per_gpu=0.3 + ) +""" + +from typing import List, Optional + + +# refer to https://graphql-spec.runpod.io/#definition-PodRentInterruptableInput +def generate_spot_pod_deployment_mutation( + name: str, + image_name: str, + gpu_type_id: str, + bid_per_gpu: float, + volume_mount_path: str, + cloud_type: str = 'ALL', + gpu_count: Optional[int] = None, + min_memory_in_gb: Optional[int] = None, + min_vcpu_count: Optional[int] = None, + container_disk_in_gb: Optional[int] = None, + volume_in_gb: Optional[int] = None, + ports: Optional[str] = None, + start_ssh: Optional[bool] = True, + start_jupyter: Optional[bool] = False, + env: Optional[dict] = None, + docker_args: Optional[str] = None, + support_public_ip: Optional[bool] = True, + terminate_after: Optional[str] = None, + stop_after: Optional[str] = None, + data_center_id: Optional[str] = None, + country_code: Optional[str] = None, + network_volume_id: Optional[str] = None, + allowed_cuda_versions: Optional[List[str]] = None, + min_download: Optional[int] = None, + min_upload: Optional[int] = None, + cuda_version: Optional[str] = None, + template_id: Optional[str] = None, + volume_key: Optional[str] = None, +) -> str: + input_fields = [] + + # Required Fields + input_fields.append(f'name: "{name}"') + input_fields.append(f'imageName: "{image_name}"') + input_fields.append(f'gpuTypeId: "{gpu_type_id}"') + input_fields.append(f'bidPerGpu: {bid_per_gpu}') + input_fields.append(f'volumeMountPath: "{volume_mount_path}"') + + # Default Fields + input_fields.append(f'cloudType: {cloud_type}') + + if start_ssh: + input_fields.append('startSsh: true') + if start_jupyter: + input_fields.append('startJupyter: true') + if support_public_ip: + input_fields.append('supportPublicIp: true') + else: + input_fields.append('supportPublicIp: false') + + # Optional Fields + if gpu_count is not None: + input_fields.append(f'gpuCount: {gpu_count}') + if min_memory_in_gb is not None: + input_fields.append(f'minMemoryInGb: {min_memory_in_gb}') + if min_vcpu_count is not None: + input_fields.append(f'minVcpuCount: {min_vcpu_count}') + if container_disk_in_gb is not None: + input_fields.append(f'containerDiskInGb: {container_disk_in_gb}') + if volume_in_gb is not None: + input_fields.append(f'volumeInGb: {volume_in_gb}') + if ports is not None: + ports = ports.replace(' ', '') + input_fields.append(f'ports: "{ports}"') + if docker_args is not None: + input_fields.append(f'dockerArgs: "{docker_args}"') + if terminate_after is not None: + input_fields.append(f'terminateAfter: "{terminate_after}"') + if stop_after is not None: + input_fields.append(f'stopAfter: "{stop_after}"') + if data_center_id is not None: + input_fields.append(f'dataCenterId: "{data_center_id}"') + if country_code is not None: + input_fields.append(f'countryCode: "{country_code}"') + if network_volume_id is not None: + input_fields.append(f'networkVolumeId: "{network_volume_id}"') + if 
allowed_cuda_versions is not None: + allowed_cuda_versions_string = ', '.join( + [f'"{version}"' for version in allowed_cuda_versions]) + input_fields.append( + f'allowedCudaVersions: [{allowed_cuda_versions_string}]') + if min_download is not None: + input_fields.append(f'minDownload: {min_download}') + if min_upload is not None: + input_fields.append(f'minUpload: {min_upload}') + if cuda_version is not None: + input_fields.append(f'cudaVersion: "{cuda_version}"') + if template_id is not None: + input_fields.append(f'templateId: "{template_id}"') + if volume_key is not None: + input_fields.append(f'volumeKey: "{volume_key}"') + + if env is not None: + env_string = ', '.join([ + f'{{ key: "{key}", value: "{value}" }}' + for key, value in env.items() + ]) + input_fields.append(f'env: [{env_string}]') + + # Format input fields + input_string = ', '.join(input_fields) + return f""" + mutation {{ + podRentInterruptable( + input: {{ + {input_string} + }} + ) {{ + id + desiredStatus + imageName + env + machineId + machine {{ + podHostId + }} + }} + }} + """ diff --git a/sky/provision/runpod/instance.py b/sky/provision/runpod/instance.py index e5adfa9fba7..fbbedb53e9c 100644 --- a/sky/provision/runpod/instance.py +++ b/sky/provision/runpod/instance.py @@ -89,7 +89,10 @@ def run_instances(region: str, cluster_name_on_cloud: str, disk_size=config.node_config['DiskSize'], image_name=config.node_config['ImageId'], ports=config.ports_to_open_on_launch, - public_key=config.node_config['PublicKey']) + public_key=config.node_config['PublicKey'], + preemptible=config.node_config['Preemptible'], + bid_per_gpu=config.node_config['BidPerGPU'], + ) except Exception as e: # pylint: disable=broad-except logger.warning(f'run_instances error: {e}') raise diff --git a/sky/provision/runpod/utils.py b/sky/provision/runpod/utils.py index f1587463e84..d0a06b026b3 100644 --- a/sky/provision/runpod/utils.py +++ b/sky/provision/runpod/utils.py @@ -6,6 +6,7 @@ from sky import sky_logging from sky.adaptors import runpod +import sky.provision.runpod.api.commands as runpod_commands from sky.skylet import constants from sky.utils import common_utils @@ -100,7 +101,8 @@ def list_instances() -> Dict[str, Dict[str, Any]]: def launch(name: str, instance_type: str, region: str, disk_size: int, - image_name: str, ports: Optional[List[int]], public_key: str) -> str: + image_name: str, ports: Optional[List[int]], public_key: str, + preemptible: Optional[bool], bid_per_gpu: float) -> str: """Launches an instance with the given parameters. 
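For reference, calling the builder above with the module docstring's sample arguments (plus the required `volume_mount_path`) produces a mutation containing only the required and default fields; the comment below sketches the output:

```python
# Example invocation of generate_spot_pod_deployment_mutation; the import path
# follows the new files added in this patch.
from sky.provision.runpod.api.pods import generate_spot_pod_deployment_mutation

mutation = generate_spot_pod_deployment_mutation(
    name='test',
    image_name='runpod/stack',
    gpu_type_id='NVIDIA GeForce RTX 3070',
    bid_per_gpu=0.3,
    volume_mount_path='/runpod-volume',
)
# The generated GraphQL contains, among other fields:
#   podRentInterruptable(input: {
#       name: "test", imageName: "runpod/stack",
#       gpuTypeId: "NVIDIA GeForce RTX 3070", bidPerGpu: 0.3,
#       volumeMountPath: "/runpod-volume", cloudType: ALL,
#       startSsh: true, supportPublicIp: true }) { id desiredStatus ... }
print(mutation)
```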
Converts the instance_type to the RunPod GPU name, finds the specs for the @@ -142,23 +144,35 @@ def launch(name: str, instance_type: str, region: str, disk_size: int, if ports is not None: custom_ports_str = ''.join([f'{p}/tcp,' for p in ports]) - new_instance = runpod.runpod.create_pod( - name=name, - image_name=image_name, - gpu_type_id=gpu_type, - cloud_type=cloud_type, - container_disk_in_gb=disk_size, - min_vcpu_count=4 * gpu_quantity, - min_memory_in_gb=gpu_specs['memoryInGb'] * gpu_quantity, - gpu_count=gpu_quantity, - country_code=region, - ports=(f'22/tcp,' - f'{custom_ports_str}' - f'{constants.SKY_REMOTE_RAY_DASHBOARD_PORT}/http,' - f'{constants.SKY_REMOTE_RAY_PORT}/http'), - support_public_ip=True, - docker_args= - f'bash -c \'echo {encoded} | base64 --decode > init.sh; bash init.sh\'') + docker_args = (f'bash -c \'echo {encoded} | base64 --decode > init.sh; ' + f'bash init.sh\'') + ports = (f'22/tcp,' + f'{custom_ports_str}' + f'{constants.SKY_REMOTE_RAY_DASHBOARD_PORT}/http,' + f'{constants.SKY_REMOTE_RAY_PORT}/http') + + params = { + 'name': name, + 'image_name': image_name, + 'gpu_type_id': gpu_type, + 'cloud_type': cloud_type, + 'container_disk_in_gb': disk_size, + 'min_vcpu_count': 4 * gpu_quantity, + 'min_memory_in_gb': gpu_specs['memoryInGb'] * gpu_quantity, + 'gpu_count': gpu_quantity, + 'country_code': region, + 'ports': ports, + 'support_public_ip': True, + 'docker_args': docker_args, + } + + if preemptible is None or not preemptible: + new_instance = runpod.runpod.create_pod(**params) + else: + new_instance = runpod_commands.create_spot_pod( + bid_per_gpu=bid_per_gpu, + **params, + ) return new_instance['id'] diff --git a/sky/templates/kubernetes-ray.yml.j2 b/sky/templates/kubernetes-ray.yml.j2 index b587cb467ee..c5c45d6c646 100644 --- a/sky/templates/kubernetes-ray.yml.j2 +++ b/sky/templates/kubernetes-ray.yml.j2 @@ -560,6 +560,7 @@ available_node_types: # https://gitlab.com/arm-research/smarter/smarter-device-manager smarter-devices/fuse: "1" {% endif %} + {% if k8s_resource_key is not none or k8s_fuse_device_required %} limits: # Limits need to be defined for GPU/TPU requests {% if k8s_resource_key is not none %} @@ -568,7 +569,8 @@ available_node_types: {% if k8s_fuse_device_required %} smarter-devices/fuse: "1" {% endif %} - + {% endif %} + setup_commands: # Disable `unattended-upgrades` to prevent apt-get from hanging. It should be called at the beginning before the process started to avoid being blocked. (This is a temporary fix.) 
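Stepping back to the kubernetes-ray.yml.j2 hunk above: it wraps `limits` in a guard so that an empty limits block is no longer emitted when neither a GPU/TPU resource nor the FUSE device is requested. A standalone jinja2 sketch of the guarded fragment (`acc_count` stands in for the template's count variable):

```python
# Standalone sketch mirroring the guarded limits block in the template change.
import jinja2

fragment = jinja2.Template(
    '{% if k8s_resource_key is not none or k8s_fuse_device_required %}'
    'limits:\n'
    '{% if k8s_resource_key is not none %}'
    '  {{ k8s_resource_key }}: {{ acc_count }}\n'
    '{% endif %}'
    '{% if k8s_fuse_device_required %}'
    '  smarter-devices/fuse: "1"\n'
    '{% endif %}'
    '{% endif %}')

# No accelerator and no FUSE device: the block disappears entirely.
assert fragment.render(k8s_resource_key=None,
                       k8s_fuse_device_required=False) == ''

# A GPU request emits limits with the resource key.
print(fragment.render(k8s_resource_key='nvidia.com/gpu', acc_count=1,
                      k8s_fuse_device_required=False))
# limits:
#   nvidia.com/gpu: 1
```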
# Add ~/.ssh/sky-cluster-key to SSH config to allow nodes within a cluster to connect to each other diff --git a/sky/templates/runpod-ray.yml.j2 b/sky/templates/runpod-ray.yml.j2 index 2dab4ad8c13..3cb0f6fa8ea 100644 --- a/sky/templates/runpod-ray.yml.j2 +++ b/sky/templates/runpod-ray.yml.j2 @@ -24,6 +24,8 @@ available_node_types: ImageId: {{image_id}} PublicKey: |- skypilot:ssh_public_key_content + Preemptible: {{use_spot}} + BidPerGPU: {{bid_per_gpu}} head_node_type: ray_head_default diff --git a/tests/test_smoke.py b/tests/test_smoke.py index a5163c1cc52..16a7103fe47 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -37,7 +37,7 @@ import tempfile import textwrap import time -from typing import Dict, List, NamedTuple, Optional, Tuple +from typing import Dict, List, NamedTuple, Optional, TextIO, Tuple import urllib.parse import uuid @@ -1427,34 +1427,68 @@ def test_using_file_mounts_with_env_vars(generic_cloud: str): # ---------- storage ---------- + + +def _storage_mounts_commands_generator(f: TextIO, cluster_name: str, + storage_name: str, ls_hello_command: str, + cloud: str, only_mount: bool): + template_str = pathlib.Path( + 'tests/test_yamls/test_storage_mounting.yaml.j2').read_text() + template = jinja2.Template(template_str) + content = template.render( + storage_name=storage_name, + cloud=cloud, + only_mount=only_mount, + ) + f.write(content) + f.flush() + file_path = f.name + test_commands = [ + *STORAGE_SETUP_COMMANDS, + f'sky launch -y -c {cluster_name} --cloud {cloud} {file_path}', + f'sky logs {cluster_name} 1 --status', # Ensure job succeeded. + ls_hello_command, + f'sky stop -y {cluster_name}', + f'sky start -y {cluster_name}', + # Check if hello.txt from mounting bucket exists after restart in + # the mounted directory + f'sky exec {cluster_name} -- "set -ex; ls /mount_private_mount/hello.txt"', + ] + clean_command = f'sky down -y {cluster_name}; sky storage delete -y {storage_name}' + return test_commands, clean_command + + @pytest.mark.aws def test_aws_storage_mounts_with_stop(): name = _get_cluster_name() cloud = 'aws' storage_name = f'sky-test-{int(time.time())}' - template_str = pathlib.Path( - 'tests/test_yamls/test_storage_mounting.yaml.j2').read_text() - template = jinja2.Template(template_str) - content = template.render(storage_name=storage_name, cloud=cloud) + ls_hello_command = f'aws s3 ls {storage_name}/hello.txt' with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: - f.write(content) - f.flush() - file_path = f.name - test_commands = [ - *STORAGE_SETUP_COMMANDS, - f'sky launch -y -c {name} --cloud {cloud} {file_path}', - f'sky logs {name} 1 --status', # Ensure job succeeded. 
- f'aws s3 ls {storage_name}/hello.txt', - f'sky stop -y {name}', - f'sky start -y {name}', - # Check if hello.txt from mounting bucket exists after restart in - # the mounted directory - f'sky exec {name} -- "set -ex; ls /mount_private_mount/hello.txt"' - ] + test_commands, clean_command = _storage_mounts_commands_generator( + f, name, storage_name, ls_hello_command, cloud, False) test = Test( 'aws_storage_mounts', test_commands, - f'sky down -y {name}; sky storage delete -y {storage_name}', + clean_command, timeout=20 * 60, # 20 mins ) run_one_test(test) + + +@pytest.mark.aws +def test_aws_storage_mounts_with_stop_only_mount(): + name = _get_cluster_name() + cloud = 'aws' + storage_name = f'sky-test-{int(time.time())}' + ls_hello_command = f'aws s3 ls {storage_name}/hello.txt' + with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: + test_commands, clean_command = _storage_mounts_commands_generator( + f, name, storage_name, ls_hello_command, cloud, True) + test = Test( + 'aws_storage_mounts_only_mount', + test_commands, + clean_command, timeout=20 * 60, # 20 mins ) run_one_test(test) @@ -1465,29 +1499,14 @@ def test_gcp_storage_mounts_with_stop(): name = _get_cluster_name() cloud = 'gcp' storage_name = f'sky-test-{int(time.time())}' - template_str = pathlib.Path( - 'tests/test_yamls/test_storage_mounting.yaml.j2').read_text() - template = jinja2.Template(template_str) - content = template.render(storage_name=storage_name, cloud=cloud) + ls_hello_command = f'gsutil ls gs://{storage_name}/hello.txt' with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: - f.write(content) - f.flush() - file_path = f.name - test_commands = [ - *STORAGE_SETUP_COMMANDS, - f'sky launch -y -c {name} --cloud {cloud} {file_path}', - f'sky logs {name} 1 --status', # Ensure job succeeded. - f'gsutil ls gs://{storage_name}/hello.txt', - f'sky stop -y {name}', - f'sky start -y {name}', - # Check if hello.txt from mounting bucket exists after restart in - # the mounted directory - f'sky exec {name} -- "set -ex; ls /mount_private_mount/hello.txt"' - ] + test_commands, clean_command = _storage_mounts_commands_generator( + f, name, storage_name, ls_hello_command, cloud, False) test = Test( 'gcp_storage_mounts', test_commands, - f'sky down -y {name}; sky storage delete -y {storage_name}', + clean_command, timeout=20 * 60, # 20 mins ) run_one_test(test) @@ -1503,31 +1522,19 @@ def test_azure_storage_mounts_with_stop(): get_default_storage_account_name(default_region)) storage_account_key = data_utils.get_az_storage_account_key( storage_account_name) - template_str = pathlib.Path( - 'tests/test_yamls/test_storage_mounting.yaml.j2').read_text() - template = jinja2.Template(template_str) - content = template.render(storage_name=storage_name, cloud=cloud) + # if the file does not exist, az storage blob list returns '[]' + ls_hello_command = (f'output=$(az storage blob list -c {storage_name} ' + f'--account-name {storage_account_name} ' + f'--account-key {storage_account_key} ' + f'--prefix hello.txt); ' + f'[ "$output" = "[]" ] && exit 1 || exit 0') with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: - f.write(content) - f.flush() - file_path = f.name - test_commands = [ - *STORAGE_SETUP_COMMANDS, - f'sky launch -y -c {name} --cloud {cloud} {file_path}', - f'sky logs {name} 1 --status', # Ensure job succeeded.
- f'output=$(az storage blob list -c {storage_name} --account-name {storage_account_name} --account-key {storage_account_key} --prefix hello.txt)' - # if the file does not exist, az storage blob list returns '[]' - f'[ "$output" = "[]" ] && exit 1;' - f'sky stop -y {name}', - f'sky start -y {name}', - # Check if hello.txt from mounting bucket exists after restart in - # the mounted directory - f'sky exec {name} -- "set -ex; ls /mount_private_mount/hello.txt"' - ] + test_commands, clean_command = _storage_mounts_commands_generator( + f, name, storage_name, ls_hello_command, cloud, False) test = Test( 'azure_storage_mounts', test_commands, - f'sky down -y {name}; sky storage delete -y {storage_name}', + clean_command, timeout=20 * 60, # 20 mins ) run_one_test(test) @@ -1540,25 +1547,15 @@ def test_kubernetes_storage_mounts(): # built for x86_64 only. name = _get_cluster_name() storage_name = f'sky-test-{int(time.time())}' - template_str = pathlib.Path( - 'tests/test_yamls/test_storage_mounting.yaml.j2').read_text() - template = jinja2.Template(template_str) - content = template.render(storage_name=storage_name) + ls_hello_command = (f'aws s3 ls {storage_name}/hello.txt || ' + f'gsutil ls gs://{storage_name}/hello.txt') with tempfile.NamedTemporaryFile(suffix='.yaml', mode='w') as f: - f.write(content) - f.flush() - file_path = f.name - test_commands = [ - *STORAGE_SETUP_COMMANDS, - f'sky launch -y -c {name} --cloud kubernetes {file_path}', - f'sky logs {name} 1 --status', # Ensure job succeeded. - f'aws s3 ls {storage_name}/hello.txt || ' - f'gsutil ls gs://{storage_name}/hello.txt', - ] + test_commands, clean_command = _storage_mounts_commands_generator( + f, name, storage_name, ls_hello_command, 'kubernetes', False) test = Test( 'kubernetes_storage_mounts', test_commands, - f'sky down -y {name}; sky storage delete -y {storage_name}', + clean_command, timeout=20 * 60, # 20 mins ) run_one_test(test) diff --git a/tests/test_yamls/test_storage_mounting.yaml.j2 b/tests/test_yamls/test_storage_mounting.yaml.j2 index 4241c63409e..651fb190012 100644 --- a/tests/test_yamls/test_storage_mounting.yaml.j2 +++ b/tests/test_yamls/test_storage_mounting.yaml.j2 @@ -20,13 +20,13 @@ file_mounts: /mount_private_copy: name: {{storage_name}} source: ~/tmp-workdir - mode: COPY + mode: {% if only_mount | default(false) %}MOUNT{% else %}COPY{% endif %} # Mounting private buckets in COPY mode with a list of files as source /mount_private_copy_lof: name: {{storage_name}} source: ['~/tmp-workdir/tmp file', '~/tmp-workdir/tmp file2'] - mode: COPY + mode: {% if only_mount | default(false) %}MOUNT{% else %}COPY{% endif %} {% if include_private_mount | default(True) %} # Mounting private buckets in MOUNT mode @@ -38,7 +38,7 @@ file_mounts: run: | set -ex - + # Check public bucket contents ls -ltr /mount_public_s3/corpora ls -ltr /mount_public_gcp/tiles @@ -55,12 +55,12 @@ run: | ls -ltr /mount_private_mount/foo ls -ltr /mount_private_mount/tmp\ file {% endif %} - + # Symlinks are not copied to buckets ! ls /mount_private_copy/circle-link {% if include_private_mount | default(True) %} ! ls /mount_private_mount/circle-link - + # Write to private bucket in MOUNT mode should pass echo "hello" > /mount_private_mount/hello.txt {% endif %}
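One more note on the template change above: the new `only_mount` flag flips the two private COPY mounts to MOUNT mode while defaulting to the old behavior when unset, which is what lets `_storage_mounts_commands_generator` drive both test variants from a single YAML. A minimal render check:

```python
# Minimal check of the only_mount switch added to test_storage_mounting.yaml.j2.
import jinja2

mode_line = jinja2.Template(
    'mode: {% if only_mount | default(false) %}MOUNT{% else %}COPY{% endif %}')

assert mode_line.render() == 'mode: COPY'  # flag omitted: old COPY behavior
assert mode_line.render(only_mount=False) == 'mode: COPY'
assert mode_line.render(only_mount=True) == 'mode: MOUNT'
```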