From 4adda4158cca5599537677283c6ebc746ec5c989 Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Thu, 24 Oct 2024 15:16:59 -0700 Subject: [PATCH 01/17] Fix type checking issues in jobs/ and serve/ directories (#4161) * fix some linter errors * refactor: use `Sequence` for more accurate typing and for covariance --- sky/exceptions.py | 8 ++++---- sky/jobs/recovery_strategy.py | 6 +++--- sky/usage/usage_lib.py | 5 +++-- sky/utils/common_utils.py | 5 +++-- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/sky/exceptions.py b/sky/exceptions.py index 066d36c3cf3..f78c6605261 100644 --- a/sky/exceptions.py +++ b/sky/exceptions.py @@ -1,7 +1,7 @@ """Exceptions.""" import enum import typing -from typing import List, Optional +from typing import List, Optional, Sequence if typing.TYPE_CHECKING: from sky import status_lib @@ -61,12 +61,12 @@ class ProvisionPrechecksError(Exception): the error will be raised. Args: - reasons: (List[Exception]) The reasons why the prechecks failed. + reasons: (Sequence[Exception]) The reasons why the prechecks failed. """ - def __init__(self, reasons: List[Exception]) -> None: + def __init__(self, reasons: Sequence[Exception]) -> None: super().__init__() - self.reasons = list(reasons) + self.reasons = reasons class ManagedJobReachedMaxRetriesError(Exception): diff --git a/sky/jobs/recovery_strategy.py b/sky/jobs/recovery_strategy.py index 2a32aa3b24e..6a931240646 100644 --- a/sky/jobs/recovery_strategy.py +++ b/sky/jobs/recovery_strategy.py @@ -24,6 +24,7 @@ from sky.utils import ux_utils if typing.TYPE_CHECKING: + from sky import resources from sky import task as task_lib logger = sky_logging.init_logger(__name__) @@ -327,8 +328,7 @@ def _launch(self, 'Failure happened before provisioning. Failover ' f'reasons: {reasons_str}') if raise_on_failure: - raise exceptions.ProvisionPrechecksError( - reasons=reasons) + raise exceptions.ProvisionPrechecksError(reasons) return None logger.info('Failed to launch a cluster with error: ' f'{common_utils.format_exception(e)})') @@ -382,7 +382,7 @@ def __init__(self, cluster_name: str, backend: 'backends.Backend', # first retry in the same cloud/region. (Inside recover() we may not # rely on cluster handle, as it can be None if the cluster is # preempted.) 
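A minimal standalone sketch (illustrative code, not part of the patch) of why the `sky/exceptions.py` change above uses `Sequence[Exception]` instead of `List[Exception]`; the commit message cites covariance, which is what this shows:

```python
from typing import List, Sequence


def handle_reasons_list(reasons: List[Exception]) -> None:
    # A List parameter invites mutation and is invariant in its element type.
    reasons.append(Exception('callers cannot safely pass List[ValueError]'))


def handle_reasons_sequence(reasons: Sequence[Exception]) -> None:
    # Sequence is read-only and covariant: a Sequence[ValueError] is accepted
    # wherever a Sequence[Exception] is expected.
    for reason in reasons:
        print(type(reason).__name__, reason)


precheck_failures: List[ValueError] = [
    ValueError('quota check failed'),
    ValueError('image not found'),
]

# handle_reasons_list(precheck_failures)   # rejected by mypy: List is invariant
handle_reasons_sequence(precheck_failures)  # accepted: Sequence is covariant
```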
- self._launched_resources: Optional['sky.resources.Resources'] = None + self._launched_resources: Optional['resources.Resources'] = None def _launch(self, max_retry: Optional[int] = 3, diff --git a/sky/usage/usage_lib.py b/sky/usage/usage_lib.py index a6c10da5c7a..07867939ee5 100644 --- a/sky/usage/usage_lib.py +++ b/sky/usage/usage_lib.py @@ -432,8 +432,9 @@ def entrypoint_context(name: str, fallback: bool = False): with ux_utils.enable_traceback(): trace = traceback.format_exc() messages.usage.stacktrace = trace - if hasattr(e, 'detailed_reason') and e.detailed_reason is not None: - messages.usage.stacktrace += '\nDetails: ' + e.detailed_reason + detailed_reason = getattr(e, 'detailed_reason', None) + if detailed_reason is not None: + messages.usage.stacktrace += '\nDetails: ' + detailed_reason messages.usage.exception = common_utils.remove_color( common_utils.format_exception(e)) raise diff --git a/sky/utils/common_utils.py b/sky/utils/common_utils.py index 6383ee8af0d..5fce435b770 100644 --- a/sky/utils/common_utils.py +++ b/sky/utils/common_utils.py @@ -362,7 +362,6 @@ def _wrapper(f): @functools.wraps(f) def _record(*args, **kwargs): - nonlocal name_or_fn with cls(name_or_fn, **ctx_kwargs): return f(*args, **kwargs) @@ -376,7 +375,6 @@ def _record(*args, **kwargs): @functools.wraps(name_or_fn) def _record(*args, **kwargs): - nonlocal name_or_fn f = name_or_fn func_name = getattr(f, '__qualname__', f.__name__) module_name = getattr(f, '__module__', '') @@ -579,7 +577,10 @@ def validate_schema(obj, schema, err_msg_prefix='', skip_none=True): e.message) else: err_msg = err_msg_prefix + assert isinstance(e.schema, dict), 'Schema must be a dictionary' known_fields = set(e.schema.get('properties', {}).keys()) + assert isinstance(e.instance, + dict), 'Instance must be a dictionary' for field in e.instance: if field not in known_fields: most_similar_field = difflib.get_close_matches( From e832dde2c5a7f9ba9e141afad874054deb15732c Mon Sep 17 00:00:00 2001 From: Wenjie Ma <55629401+euclidgame@users.noreply.github.com> Date: Thu, 24 Oct 2024 16:47:58 -0700 Subject: [PATCH 02/17] [Serve] Make controller regions/ choose from replica resources (#4053) * Add controller regions. * Consider regions and zones` * Revert cloud in task.yaml * Change the format * Remove one for loop and change the placeholder * Add unit test for get_controller_resources * Correct the number of tests * Use explicit loop for return value * Add some resources to test * Change the early return logic in get_controller_resources * Change some comments * Change default value of region * Some nits * Add types for parameters and ret values in test. --------- Co-authored-by: Wenjie Ma --- sky/utils/controller_utils.py | 87 +++++++++++++---- tests/unit_tests/test_controller_utils.py | 114 ++++++++++++++++++---- 2 files changed, 165 insertions(+), 36 deletions(-) diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index 0c71357c856..0ab2fd7e117 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -505,20 +505,17 @@ def get_controller_resources( if handle is not None: controller_resources_to_use = handle.launched_resources - if controller_resources_to_use.cloud is not None: - return {controller_resources_to_use} + # If the controller and replicas are from the same cloud (and region/zone), + # it should provide better connectivity. 
We will let the controller choose + # from the clouds (and regions/zones) of the resources if the user does not + # specify the cloud (and region/zone) for the controller. - # If the controller and replicas are from the same cloud, it should - # provide better connectivity. We will let the controller choose from - # the clouds of the resources if the controller does not exist. - # TODO(tian): Consider respecting the regions/zones specified for the - # resources as well. - requested_clouds: Set['clouds.Cloud'] = set() + requested_clouds_with_region_zone: Dict[str, Dict[Optional[str], + Set[Optional[str]]]] = {} for resource in task_resources: - # cloud is an object and will not be able to be distinguished by set. - # Here we manually check if the cloud is in the set. if resource.cloud is not None: - if not clouds.cloud_in_iterable(resource.cloud, requested_clouds): + cloud_name = str(resource.cloud) + if cloud_name not in requested_clouds_with_region_zone: try: resource.cloud.check_features_are_supported( resources.Resources(), @@ -526,7 +523,26 @@ def get_controller_resources( except exceptions.NotSupportedError: # Skip the cloud if it does not support hosting controllers. continue - requested_clouds.add(resource.cloud) + requested_clouds_with_region_zone[cloud_name] = {} + if resource.region is None: + # If one of the resource.region is None, this could represent + # that the user is unsure about which region the resource is + # hosted in. In this case, we allow any region for this cloud. + requested_clouds_with_region_zone[cloud_name] = {None: {None}} + elif None not in requested_clouds_with_region_zone[cloud_name]: + if resource.region not in requested_clouds_with_region_zone[ + cloud_name]: + requested_clouds_with_region_zone[cloud_name][ + resource.region] = set() + # If one of the resource.zone is None, allow any zone in the + # region. + if resource.zone is None: + requested_clouds_with_region_zone[cloud_name][ + resource.region] = {None} + elif None not in requested_clouds_with_region_zone[cloud_name][ + resource.region]: + requested_clouds_with_region_zone[cloud_name][ + resource.region].add(resource.zone) else: # if one of the resource.cloud is None, this could represent user # does not know which cloud is best for the specified resources. @@ -536,14 +552,49 @@ def get_controller_resources( # - cloud: runpod # accelerators: A40 # In this case, we allow the controller to be launched on any cloud. - requested_clouds.clear() + requested_clouds_with_region_zone.clear() break - if not requested_clouds: + + # Extract filtering criteria from the controller resources specified by the + # user. + controller_cloud = str( + controller_resources_to_use.cloud + ) if controller_resources_to_use.cloud is not None else None + controller_region = controller_resources_to_use.region + controller_zone = controller_resources_to_use.zone + + # Filter clouds if controller_resources_to_use.cloud is specified. + filtered_clouds = ({controller_cloud} if controller_cloud is not None else + requested_clouds_with_region_zone.keys()) + + # Filter regions and zones and construct the result. + result: Set[resources.Resources] = set() + for cloud_name in filtered_clouds: + regions = requested_clouds_with_region_zone.get(cloud_name, + {None: {None}}) + + # Filter regions if controller_resources_to_use.region is specified. 
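A simplified, self-contained sketch (illustrative clouds, regions, and zones only) of the cloud -> region -> zone mapping built above, where `None` stands for "any region" or "any zone", and of the (cloud, region, zone) combinations it expands into:

```python
from typing import Dict, Optional, Set, Tuple

# None means "any region" / "any zone" at that level.
requested: Dict[str, Dict[Optional[str], Set[Optional[str]]]] = {
    'AWS': {
        'us-east-1': {'us-east-1a'},   # cloud, region and zone all pinned
        'us-west-2': {None},           # region pinned, any zone
    },
    'GCP': {None: {None}},             # cloud pinned, any region/zone
}

combinations: Set[Tuple[str, Optional[str], Optional[str]]] = set()
for cloud, regions in requested.items():
    for region, zones in regions.items():
        for zone in zones:
            combinations.add((cloud, region, zone))

# combinations == {('AWS', 'us-east-1', 'us-east-1a'),
#                  ('AWS', 'us-west-2', None),
#                  ('GCP', None, None)}
```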
+ filtered_regions = ({controller_region} if controller_region is not None + else regions.keys()) + + for region in filtered_regions: + zones = regions.get(region, {None}) + + # Filter zones if controller_resources_to_use.zone is specified. + filtered_zones = ({controller_zone} + if controller_zone is not None else zones) + + # Create combinations of cloud, region, and zone. + for zone in filtered_zones: + resource_copy = controller_resources_to_use.copy( + cloud=clouds.CLOUD_REGISTRY.from_str(cloud_name), + region=region, + zone=zone) + result.add(resource_copy) + + if not result: return {controller_resources_to_use} - return { - controller_resources_to_use.copy(cloud=controller_cloud) - for controller_cloud in requested_clouds - } + return result def _setup_proxy_command_on_controller( diff --git a/tests/unit_tests/test_controller_utils.py b/tests/unit_tests/test_controller_utils.py index 7465f648385..f41c7413bc1 100644 --- a/tests/unit_tests/test_controller_utils.py +++ b/tests/unit_tests/test_controller_utils.py @@ -1,5 +1,5 @@ """Test the controller_utils module.""" -from typing import Any, Dict +from typing import Any, Dict, Optional, Set, Tuple import pytest @@ -65,6 +65,24 @@ def get_custom_controller_resources(keys, default): controller_resources_config, k, v) +def _check_controller_resources( + controller_resources: Set[sky.Resources], + expected_combinations: Set[Tuple[Optional[str], Optional[str], + Optional[str]]], + default_controller_resources: Dict[str, Any]) -> None: + """Helper function to check that the controller resources match the + expected combinations.""" + for r in controller_resources: + config = r.to_yaml_config() + cloud = config.pop('cloud') + region = config.pop('region', None) + zone = config.pop('zone', None) + assert (cloud, region, zone) in expected_combinations + expected_combinations.remove((cloud, region, zone)) + assert config == default_controller_resources, config + assert not expected_combinations + + @pytest.mark.parametrize(('controller_type', 'default_controller_resources'), [ ('jobs', managed_job_constants.CONTROLLER_RESOURCES), ('serve', serve_constants.CONTROLLER_RESOURCES), @@ -79,17 +97,12 @@ def test_get_controller_resources_with_task_resources( # could host controllers. Return a set, each item has # one cloud specified plus the default resources. all_clouds = {sky.AWS(), sky.GCP(), sky.Azure()} - all_cloud_names = {str(c) for c in all_clouds} + expected_combinations = {(str(c), None, None) for c in all_clouds} controller_resources = controller_utils.get_controller_resources( controller=controller_utils.Controllers.from_type(controller_type), task_resources=[sky.Resources(cloud=c) for c in all_clouds]) - for r in controller_resources: - config = r.to_yaml_config() - cloud = config.pop('cloud') - assert cloud in all_cloud_names - all_cloud_names.remove(cloud) - assert config == default_controller_resources, config - assert not all_cloud_names + _check_controller_resources(controller_resources, expected_combinations, + default_controller_resources) # 2. All resources has cloud specified. Some of them # could NOT host controllers. 
Return a set, only @@ -113,19 +126,14 @@ def _could_host_controllers(cloud: sky.clouds.Cloud) -> bool: return False return True - all_cloud_names_expected = { - str(c) for c in all_clouds if _could_host_controllers(c) + expected_combinations = { + (str(c), None, None) for c in all_clouds if _could_host_controllers(c) } controller_resources = controller_utils.get_controller_resources( controller=controller_utils.Controllers.from_type(controller_type), task_resources=[sky.Resources(cloud=c) for c in all_clouds]) - for r in controller_resources: - config = r.to_yaml_config() - cloud = config.pop('cloud') - assert cloud in all_cloud_names_expected - all_cloud_names_expected.remove(cloud) - assert config == default_controller_resources, config - assert not all_cloud_names_expected + _check_controller_resources(controller_resources, expected_combinations, + default_controller_resources) # 3. Some resources does not have cloud specified. # Return the default resources. @@ -138,3 +146,73 @@ def _could_host_controllers(cloud: sky.clouds.Cloud) -> bool: assert len(controller_resources) == 1 config = list(controller_resources)[0].to_yaml_config() assert config == default_controller_resources, config + + # 4. All resources have clouds, regions, and zones specified. + # Return a set of controller resources for all combinations of clouds, + # regions, and zones. Each combination should contain the default resources + # along with the cloud, region, and zone. + all_cloud_regions_zones = [ + sky.Resources(cloud=sky.AWS(), region='us-east-1', zone='us-east-1a'), + sky.Resources(cloud=sky.AWS(), region='ap-south-1', zone='ap-south-1b'), + sky.Resources(cloud=sky.GCP(), + region='us-central1', + zone='us-central1-a'), + sky.Resources(cloud=sky.GCP(), + region='europe-west1', + zone='europe-west1-b') + ] + expected_combinations = {('AWS', 'us-east-1', 'us-east-1a'), + ('AWS', 'ap-south-1', 'ap-south-1b'), + ('GCP', 'us-central1', 'us-central1-a'), + ('GCP', 'europe-west1', 'europe-west1-b')} + controller_resources = controller_utils.get_controller_resources( + controller=controller_utils.Controllers.from_type(controller_type), + task_resources=all_cloud_regions_zones) + _check_controller_resources(controller_resources, expected_combinations, + default_controller_resources) + + # 5. Clouds and regions are specified, but zones are partially specified. + # Return a set containing combinations where the zone is None when not all + # zones are specified in the input for the given region. The default + # resources should be returned along with the cloud and region, and the + # zone (if specified). + controller_resources = controller_utils.get_controller_resources( + controller=controller_utils.Controllers.from_type(controller_type), + task_resources=[ + sky.Resources(cloud=sky.AWS(), region='us-west-2'), + sky.Resources(cloud=sky.AWS(), + region='us-west-2', + zone='us-west-2b'), + sky.Resources(cloud=sky.GCP(), + region='us-central1', + zone='us-central1-a') + ]) + expected_combinations = {('AWS', 'us-west-2', None), + ('GCP', 'us-central1', 'us-central1-a')} + _check_controller_resources(controller_resources, expected_combinations, + default_controller_resources) + + # 6. Mixed case: Some resources have clouds and regions or zones, others do + # not. For clouds where regions or zones are not specified in the input, + # return None for those fields. The default resources should be returned + # along with the cloud, region (if specified), and zone (if specified). 
+ controller_resources = controller_utils.get_controller_resources( + controller=controller_utils.Controllers.from_type(controller_type), + task_resources=[ + sky.Resources(cloud=sky.GCP(), region='europe-west1'), + sky.Resources(cloud=sky.GCP()), + sky.Resources(cloud=sky.AWS(), + region='eu-north-1', + zone='eu-north-1a'), + sky.Resources(cloud=sky.AWS(), region='eu-north-1'), + sky.Resources(cloud=sky.AWS(), region='ap-south-1'), + sky.Resources(cloud=sky.Azure()), + ]) + expected_combinations = { + ('AWS', 'eu-north-1', None), + ('AWS', 'ap-south-1', None), + ('GCP', None, None), + ('Azure', None, None), + } + _check_controller_resources(controller_resources, expected_combinations, + default_controller_resources) From 13ad9169bd81afa5aab610d7a308a39ddf81d074 Mon Sep 17 00:00:00 2001 From: Yika Date: Thu, 24 Oct 2024 18:48:57 -0700 Subject: [PATCH 03/17] Upload all cloud credentials to sky cluster regardless of sky check (#4165) * Upload all cloud credentials to sky cluster regardless of sky check * address comments * nit Co-authored-by: Zhanghao Wu --------- Co-authored-by: Zhanghao Wu --- sky/check.py | 15 +++++++++++---- sky/clouds/oci.py | 2 +- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/sky/check.py b/sky/check.py index 9ac2848733c..dcaa349d234 100644 --- a/sky/check.py +++ b/sky/check.py @@ -1,4 +1,5 @@ """Credential checks: check cloud credentials and enable clouds.""" +import os import traceback from types import ModuleType from typing import Dict, Iterable, List, Optional, Tuple, Union @@ -194,19 +195,25 @@ def get_cached_enabled_clouds_or_refresh( def get_cloud_credential_file_mounts( excluded_clouds: Optional[Iterable[sky_clouds.Cloud]] ) -> Dict[str, str]: - """Returns the files necessary to access all enabled clouds. + """Returns the files necessary to access all clouds. Returns a dictionary that will be added to a task's file mounts and a list of patterns that will be excluded (used as rsync_exclude). """ - enabled_clouds = get_cached_enabled_clouds_or_refresh() + # Uploading credentials for all clouds instead of only sky check + # enabled clouds because users may have partial credentials for some + # clouds to access their specific resources (e.g. cloud storage) but + # not have the complete credentials to pass sky check. + clouds = sky_clouds.CLOUD_REGISTRY.values() file_mounts = {} - for cloud in enabled_clouds: + for cloud in clouds: if (excluded_clouds is not None and sky_clouds.cloud_in_iterable(cloud, excluded_clouds)): continue cloud_file_mounts = cloud.get_credential_file_mounts() - file_mounts.update(cloud_file_mounts) + for remote_path, local_path in cloud_file_mounts.items(): + if os.path.exists(os.path.expanduser(local_path)): + file_mounts[remote_path] = local_path # Currently, get_cached_enabled_clouds_or_refresh() does not support r2 as # only clouds with computing instances are marked as enabled by skypilot. # This will be removed when cloudflare/r2 is added as a 'cloud'. 
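A self-contained sketch of the filtering behavior the `sky/check.py` change above introduces: consider credentials for every registered cloud, but only mount local files that actually exist. The candidate paths below are illustrative, not SkyPilot's real per-cloud mounts:

```python
import os
from typing import Dict


def keep_existing_credentials(candidates: Dict[str, str]) -> Dict[str, str]:
    """Keep only remote->local mounts whose local credential path exists."""
    return {
        remote: local
        for remote, local in candidates.items()
        if os.path.exists(os.path.expanduser(local))
    }


# Illustrative candidate mounts; in SkyPilot the mapping comes from each
# cloud's get_credential_file_mounts().
candidates = {
    '~/.aws/credentials': '~/.aws/credentials',
    '~/.config/gcloud': '~/.config/gcloud',
    '~/.azure': '~/.azure',
}
print(keep_existing_credentials(candidates))
```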
diff --git a/sky/clouds/oci.py b/sky/clouds/oci.py index 810e43fe3b5..c6451a73a1f 100644 --- a/sky/clouds/oci.py +++ b/sky/clouds/oci.py @@ -468,7 +468,7 @@ def get_credential_file_mounts(self) -> Dict[str, str]: api_key_file = oci_cfg[ 'key_file'] if 'key_file' in oci_cfg else 'BadConf' sky_cfg_file = oci_utils.oci_config.get_sky_user_config_file() - except ImportError: + except (ImportError, oci_adaptor.oci.exceptions.ConfigFileNotFound): return {} # OCI config and API key file are mandatory From 149713e9acde77df6d40ee17cd2cd6bd349d5edc Mon Sep 17 00:00:00 2001 From: Yika Date: Thu, 24 Oct 2024 20:06:02 -0700 Subject: [PATCH 04/17] [Performance] Allow users to pass in Azure community images at --image-id (#4145) * Allow users to pass in community image as image-id * Add image fallback * Address comments * address comments * Resolve region failover * address comments --- sky/clouds/azure.py | 136 +++++++++++++------- sky/clouds/service_catalog/azure_catalog.py | 15 +++ sky/clouds/utils/azure_utils.py | 91 +++++++++++++ sky/resources.py | 1 + tests/unit_tests/test_azure_utils.py | 21 +++ 5 files changed, 214 insertions(+), 50 deletions(-) create mode 100644 sky/clouds/utils/azure_utils.py create mode 100644 tests/unit_tests/test_azure_utils.py diff --git a/sky/clouds/azure.py b/sky/clouds/azure.py index adffd32ad88..d91f589ca8f 100644 --- a/sky/clouds/azure.py +++ b/sky/clouds/azure.py @@ -15,6 +15,7 @@ from sky import sky_logging from sky.adaptors import azure from sky.clouds import service_catalog +from sky.clouds.utils import azure_utils from sky.utils import common_utils from sky.utils import resources_utils from sky.utils import ux_utils @@ -36,6 +37,15 @@ _DEFAULT_AZURE_UBUNTU_HPC_IMAGE_GB = 30 _DEFAULT_AZURE_UBUNTU_2004_IMAGE_GB = 150 +_DEFAULT_SKYPILOT_IMAGE_GB = 30 + +_DEFAULT_CPU_IMAGE_ID = 'skypilot:gpu-ubuntu-2204' +_DEFAULT_GPU_IMAGE_ID = 'skypilot:gpu-ubuntu-2204' +_DEFAULT_V1_IMAGE_ID = 'skypilot:v1-ubuntu-2004' +_DEFAULT_GPU_K80_IMAGE_ID = 'skypilot:k80-ubuntu-2004' +_FALLBACK_IMAGE_ID = 'skypilot:gpu-ubuntu-2204' + +_COMMUNITY_IMAGE_PREFIX = '/CommunityGalleries' def _run_output(cmd): @@ -132,29 +142,56 @@ def get_egress_cost(self, num_gigabytes: float): cost += 0.0 return cost + @classmethod + def get_default_instance_type( + cls, + cpus: Optional[str] = None, + memory: Optional[str] = None, + disk_tier: Optional[resources_utils.DiskTier] = None + ) -> Optional[str]: + return service_catalog.get_default_instance_type(cpus=cpus, + memory=memory, + disk_tier=disk_tier, + clouds='azure') + @classmethod def get_image_size(cls, image_id: str, region: Optional[str]) -> float: - if region is None: - # The region used here is only for where to send the query, - # not the image location. Azure's image is globally available. - region = 'eastus' - is_skypilot_image_tag = False + # Process skypilot images. if image_id.startswith('skypilot:'): - is_skypilot_image_tag = True image_id = service_catalog.get_image_id_from_tag(image_id, clouds='azure') - image_id_splitted = image_id.split(':') - if len(image_id_splitted) != 4: - with ux_utils.print_exception_no_traceback(): - raise ValueError(f'Invalid image id: {image_id}. Expected ' - 'format: :::') - publisher, offer, sku, version = image_id_splitted - if is_skypilot_image_tag: - if offer == 'ubuntu-hpc': - return _DEFAULT_AZURE_UBUNTU_HPC_IMAGE_GB + if image_id.startswith(_COMMUNITY_IMAGE_PREFIX): + # Avoid querying the image size from Azure as + # all skypilot custom images have the same size. 
+ return _DEFAULT_SKYPILOT_IMAGE_GB else: - return _DEFAULT_AZURE_UBUNTU_2004_IMAGE_GB + publisher, offer, sku, version = image_id.split(':') + if offer == 'ubuntu-hpc': + return _DEFAULT_AZURE_UBUNTU_HPC_IMAGE_GB + else: + return _DEFAULT_AZURE_UBUNTU_2004_IMAGE_GB + + # Process user-specified images. + azure_utils.validate_image_id(image_id) compute_client = azure.get_client('compute', cls.get_project_id()) + + # Community gallery image. + if image_id.startswith(_COMMUNITY_IMAGE_PREFIX): + if region is None: + return 0.0 + _, _, gallery_name, _, image_name = image_id.split('/') + try: + return azure_utils.get_community_image_size( + compute_client, gallery_name, image_name, region) + except exceptions.ResourcesUnavailableError: + return 0.0 + + # Marketplace image + if region is None: + # The region used here is only for where to send the query, + # not the image location. Marketplace image is globally available. + region = 'eastus' + publisher, offer, sku, version = image_id.split(':') try: image = compute_client.virtual_machine_images.get( region, publisher, offer, sku, version) @@ -176,40 +213,23 @@ def get_image_size(cls, image_id: str, region: Optional[str]) -> float: size_in_gb = size_in_bytes / (1024**3) return size_in_gb - @classmethod - def get_default_instance_type( - cls, - cpus: Optional[str] = None, - memory: Optional[str] = None, - disk_tier: Optional[resources_utils.DiskTier] = None - ) -> Optional[str]: - return service_catalog.get_default_instance_type(cpus=cpus, - memory=memory, - disk_tier=disk_tier, - clouds='azure') - def _get_default_image_tag(self, gen_version, instance_type) -> str: # ubuntu-2004 v21.08.30, K80 requires image with old NVIDIA driver version acc = self.get_accelerators_from_instance_type(instance_type) if acc is not None: acc_name = list(acc.keys())[0] if acc_name == 'K80': - return 'skypilot:k80-ubuntu-2004' - - # ubuntu-2004 v21.11.04, the previous image we used in the past for - # V1 HyperV instance before we change default image to ubuntu-hpc. + return _DEFAULT_GPU_K80_IMAGE_ID + # About Gen V1 vs V2: # In Azure, all instances with K80 (Standard_NC series), some # instances with M60 (Standard_NV series) and some cpu instances - # (Basic_A, Standard_D, ...) are V1 instance. For these instances, - # we use the previous image. + # (Basic_A, Standard_D, ...) are V1 instance. + # All A100 instances are V2. if gen_version == 'V1': - return 'skypilot:v1-ubuntu-2004' - - # nvidia-driver: 535.54.03, cuda: 12.2 - # see: https://github.com/Azure/azhpc-images/releases/tag/ubuntu-hpc-20230803 - # All A100 instances is of gen2, so it will always use - # the latest ubuntu-hpc:2204 image. - return 'skypilot:gpu-ubuntu-2204' + return _DEFAULT_V1_IMAGE_ID + if acc is None: + return _DEFAULT_CPU_IMAGE_ID + return _DEFAULT_GPU_IMAGE_ID @classmethod def regions_with_offering(cls, instance_type: str, @@ -302,17 +322,34 @@ def make_deploy_resources_variables( else: assert region_name in resources.image_id, resources.image_id image_id = resources.image_id[region_name] + + # Checked basic image syntax in resources.py if image_id.startswith('skypilot:'): image_id = service_catalog.get_image_id_from_tag(image_id, clouds='azure') - # Already checked in resources.py - publisher, offer, sku, version = image_id.split(':') - image_config = { - 'image_publisher': publisher, - 'image_offer': offer, - 'image_sku': sku, - 'image_version': version, - } + # Fallback if image does not exist in the specified region. 
+ # Putting fallback here instead of at image validation + # when creating the resource because community images are + # regional so we need the correct region when we check whether + # the image exists. + if image_id.startswith( + _COMMUNITY_IMAGE_PREFIX + ) and region_name not in azure_catalog.COMMUNITY_IMAGE_AVAILABLE_REGIONS: + logger.info(f'Azure image {image_id} does not exist in region ' + f'{region_name} so use the fallback image instead.') + image_id = service_catalog.get_image_id_from_tag( + _FALLBACK_IMAGE_ID, clouds='azure') + + if image_id.startswith(_COMMUNITY_IMAGE_PREFIX): + image_config = {'community_gallery_image_id': image_id} + else: + publisher, offer, sku, version = image_id.split(':') + image_config = { + 'image_publisher': publisher, + 'image_offer': offer, + 'image_sku': sku, + 'image_version': version, + } # Setup the A10 nvidia driver. need_nvidia_driver_extension = (acc_dict is not None and @@ -380,7 +417,6 @@ def _failover_disk_tier() -> Optional[resources_utils.DiskTier]: # Setting disk performance tier for high disk tier. if disk_tier == resources_utils.DiskTier.HIGH: resources_vars['disk_performance_tier'] = 'P50' - return resources_vars def _get_feasible_launchable_resources( diff --git a/sky/clouds/service_catalog/azure_catalog.py b/sky/clouds/service_catalog/azure_catalog.py index 2d323cbac5f..c71285fe9a3 100644 --- a/sky/clouds/service_catalog/azure_catalog.py +++ b/sky/clouds/service_catalog/azure_catalog.py @@ -12,6 +12,21 @@ from sky.utils import resources_utils from sky.utils import ux_utils +# This list should match the list of regions in +# skypilot image generation Packer script's replication_regions +# sky/clouds/service_catalog/images/skypilot-azure-cpu-ubuntu.pkr.hcl +COMMUNITY_IMAGE_AVAILABLE_REGIONS = { + 'centralus', + 'eastus', + 'eastus2', + 'northcentralus', + 'southcentralus', + 'westcentralus', + 'westus', + 'westus2', + 'westus3', +} + # The frequency of pulling the latest catalog from the cloud provider. # Though the catalog update is manual in our skypilot-catalog repo, we # still want to pull the latest catalog periodically to make sure the diff --git a/sky/clouds/utils/azure_utils.py b/sky/clouds/utils/azure_utils.py new file mode 100644 index 00000000000..83b86f4d54f --- /dev/null +++ b/sky/clouds/utils/azure_utils.py @@ -0,0 +1,91 @@ +"""Utilies for Azure""" + +import typing + +from sky import exceptions +from sky.adaptors import azure +from sky.utils import ux_utils + +if typing.TYPE_CHECKING: + from azure.mgmt import compute as azure_compute + from azure.mgmt.compute import models as azure_compute_models + + +def validate_image_id(image_id: str): + """Check if the image ID has a valid format. + + Raises: + ValueError: If the image ID is invalid. + """ + image_id_colon_splitted = image_id.split(':') + image_id_slash_splitted = image_id.split('/') + if len(image_id_slash_splitted) != 5 and len(image_id_colon_splitted) != 4: + with ux_utils.print_exception_no_traceback(): + raise ValueError( + f'Invalid image id for Azure: {image_id}. 
Expected format: \n' + '* Marketplace image ID: :::\n' + '* Community image ID: ' + '/CommunityGalleries//Images/') + if len(image_id_slash_splitted) == 5: + _, gallery_type, _, image_type, _ = image_id.split('/') + if gallery_type != 'CommunityGalleries' or image_type != 'Images': + with ux_utils.print_exception_no_traceback(): + raise ValueError( + f'Invalid community image id for Azure: {image_id}.\n' + 'Expected format: ' + '/CommunityGalleries//Images/') + + +def get_community_image( + compute_client: 'azure_compute.ComputeManagementClient', image_id: str, + region: str) -> 'azure_compute_models.CommunityGalleryImage': + """Get community image from cloud. + + Args: + image_id: /CommunityGalleries//Images/ + Raises: + ResourcesUnavailableError + """ + try: + _, _, gallery_name, _, image_name = image_id.split('/') + return compute_client.community_gallery_images.get( + location=region, + public_gallery_name=gallery_name, + gallery_image_name=image_name) + except azure.exceptions().AzureError as e: + raise exceptions.ResourcesUnavailableError( + f'Community image {image_id} does not exist in region {region}.' + ) from e + + +def get_community_image_size( + compute_client: 'azure_compute.ComputeManagementClient', + gallery_name: str, image_name: str, region: str) -> float: + """Get the size of the community image from cloud. + + Args: + image_id: /CommunityGalleries//Images/ + Raises: + ResourcesUnavailableError + """ + try: + image_versions = compute_client.community_gallery_image_versions.list( + location=region, + public_gallery_name=gallery_name, + gallery_image_name=image_name, + ) + image_versions = list(image_versions) + if not image_versions: + raise exceptions.ResourcesUnavailableError( + f'No versions available for Azure community image {image_name}') + latest_version = image_versions[-1].name + + image_details = compute_client.community_gallery_image_versions.get( + location=region, + public_gallery_name=gallery_name, + gallery_image_name=image_name, + gallery_image_version_name=latest_version) + return image_details.storage_profile.os_disk_image.disk_size_gb + except azure.exceptions().AzureError as e: + raise exceptions.ResourcesUnavailableError( + f'Failed to get community image size: {e}.') from e diff --git a/sky/resources.py b/sky/resources.py index 384f2b6a548..540cbfb703c 100644 --- a/sky/resources.py +++ b/sky/resources.py @@ -225,6 +225,7 @@ def __init__( self._set_memory(memory) self._set_accelerators(accelerators, accelerator_args) + # TODO: move these out of init to prevent repeated calls. 
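For reference, a small sketch of the two Azure image-ID shapes the new `sky/clouds/utils/azure_utils.py` validator accepts; the concrete IDs below are assembled from values used elsewhere in this patch and are illustrative only:

```python
# Marketplace form: <publisher>:<offer>:<sku>:<version> (4 colon-separated parts).
marketplace_id = 'Canonical:0001-com-ubuntu-server-jammy:22_04-lts-gen2:latest'
publisher, offer, sku, version = marketplace_id.split(':')

# Community-gallery form:
# /CommunityGalleries/<gallery-name>/Images/<image-name>
# (5 slash-separated parts; the first is empty because of the leading slash).
community_id = ('/CommunityGalleries/skypilot_image_gallery'
                '/Images/skypilot-gpu-gen2')
_, gallery_type, gallery_name, image_type, image_name = community_id.split('/')
assert gallery_type == 'CommunityGalleries' and image_type == 'Images'

print(publisher, offer, sku, version)
print(gallery_name, image_name)
```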
self._try_validate_instance_type() self._try_validate_cpus_mem() self._try_validate_managed_job_attributes() diff --git a/tests/unit_tests/test_azure_utils.py b/tests/unit_tests/test_azure_utils.py new file mode 100644 index 00000000000..93ef5caadb0 --- /dev/null +++ b/tests/unit_tests/test_azure_utils.py @@ -0,0 +1,21 @@ +import pytest + +from sky.clouds.utils import azure_utils + + +def test_validate_image_id(): + # Valid marketplace image ID + azure_utils.validate_image_id("publisher:offer:sku:version") + + # Valid community image ID + azure_utils.validate_image_id( + "/CommunityGalleries/gallery-name/Images/image-name") + + # Invalid format (neither marketplace nor community) + with pytest.raises(ValueError): + azure_utils.validate_image_id( + "CommunityGalleries/gallery-name/Images/image-name") + + # Invalid marketplace image ID (too few parts) + with pytest.raises(ValueError): + azure_utils.validate_image_id("publisher:offer:sku") From 7c5b7e0baf593080c526387e200407c89f4459e4 Mon Sep 17 00:00:00 2001 From: Yika Date: Thu, 24 Oct 2024 20:11:11 -0700 Subject: [PATCH 05/17] [Performance] Add Azure packer scripts for custom images (#4142) * [Performance] Add Azure packer scripts for custom images * address comment --- sky/clouds/service_catalog/images/README.md | 55 ++++++++----- .../images/skypilot-aws-cpu-ubuntu.pkr.hcl | 6 +- .../images/skypilot-aws-gpu-ubuntu.pkr.hcl | 6 +- .../images/skypilot-azure-cpu-ubuntu.pkr.hcl | 72 +++++++++++++++++ .../images/skypilot-azure-gpu-ubuntu.pkr.hcl | 78 +++++++++++++++++++ 5 files changed, 193 insertions(+), 24 deletions(-) create mode 100644 sky/clouds/service_catalog/images/skypilot-azure-cpu-ubuntu.pkr.hcl create mode 100644 sky/clouds/service_catalog/images/skypilot-azure-gpu-ubuntu.pkr.hcl diff --git a/sky/clouds/service_catalog/images/README.md b/sky/clouds/service_catalog/images/README.md index 31ce7c6d9ce..3bdcbf86560 100644 --- a/sky/clouds/service_catalog/images/README.md +++ b/sky/clouds/service_catalog/images/README.md @@ -10,42 +10,58 @@ packer init plugins.pkr.hcl 3. Setup cloud credentials ## Generate Images -```bash -export CLOUD=gcp # Update this -export TYPE=gpu # Update this -export IMAGE=skypilot-${CLOUD}-${TYPE}-ubuntu -packer build ${IMAGE}.pkr.hcl -``` -You will see the image ID after the build is complete. - -FYI time to packer build an image: - +FYI time to packer build images: | Cloud | Type | Approx. Time | |-------|------|------------------------| | AWS | GPU | 15 min | | AWS | CPU | 10 min | | GCP | GPU | 16 min | | GCP | CPU | 5 min | +| Azure | GPU | 35 min | +| Azure | CPU | 25 min | ### GCP +1. Build a single global image. +```bash +export TYPE=gpu # Update this +export IMAGE=skypilot-gcp-${TYPE}-ubuntu +packer build ${IMAGE}.pkr.hcl +``` +2. Make the image public ```bash -export IMAGE_NAME=skypilot-gcp-cpu-ubuntu-20241011003407 # Update this - # Make image public +export IMAGE_NAME=skypilot-gcp-cpu-ubuntu-xxx # Update this export IMAGE_ID=projects/sky-dev-465/global/images/${IMAGE_NAME} gcloud compute images add-iam-policy-binding ${IMAGE_NAME} --member='allAuthenticatedUsers' --role='roles/compute.imageUser' ``` ### AWS -1. Generate images for all regions +1. Generate the source image for a single region. +```bash +export TYPE=gpu # Update this +export IMAGE=skypilot-aws-${TYPE}-ubuntu +packer build ${IMAGE}.pkr.hcl +``` +2. Copy images to all regions ```bash export IMAGE_ID=ami-0b31b24524afa8e47 # Update this - python aws_utils/image_gen.py --image-id ${IMAGE_ID} --processor ${TYPE} ``` -2. 
Add fallback images if any region failed \ +3. Add fallback images if any region failed \ Look for "NEED_FALLBACK" in the output `images.csv` and edit. (You can use public [ubuntu images](https://cloud-images.ubuntu.com/locator/ec2/) as fallback.) +### Azure +1. Generate a client secret for packer [here](https://portal.azure.com/?feature.msaljs=true#view/Microsoft_AAD_RegisteredApps/ApplicationMenuBlade/~/Credentials/appId/1d249f23-c22e-4d02-b62b-a6827bd113fe/isMSAApp~/false). +```bash +export SECRET=xxxxxx # Update this +``` +2. Build and copy images for all regions and both VM generations (1 and 2). +```bash +export VM_GENERATION=2 # Update this +packer build -force --var vm_generation=${VM_GENERATION} --var client_secret=${SECRET} skypilot-azure-cpu-ubuntu.pkr.hcl +packer build --var client_secret=${SECRET} skypilot-azure-gpu-ubuntu.pkr.hcl +``` + ## Test Images 1. Minimal GPU test: `sky launch --image ${IMAGE_ID} --gpus=L4:1 --cloud ${CLOUD}` then run `nvidia-smi` in the launched instance. 2. Update the image ID in `sky/clouds/gcp.py` and run the test: @@ -60,13 +76,16 @@ pytest tests/test_smoke.py::test_cancel_gcp Submit a PR to update [`SkyPilot Catalog`](https://github.com/skypilot-org/skypilot-catalog/tree/master/catalogs) then clean up the old images to avoid extra iamge storage fees. ### GCP -1. Example PR: [#86](https://github.com/skypilot-org/skypilot-catalog/pull/86) -2. Go to console and delete old images. +1. Update Catalog with new images: [example PR](https://github.com/skypilot-org/skypilot-catalog/pull/86) +2. Go to [GCP console](https://console.cloud.google.com/compute/images?tab=images&project=sky-dev-465) and delete old images. ### AWS 1. Copy the old custom image rows from Catalog's existing `images.csv` to a local `images.csv` in this folder. -2. Update Catalog with new images. Example PR: [#89](https://github.com/skypilot-org/skypilot-catalog/pull/89) +2. Update Catalog with new images: [example PR](https://github.com/skypilot-org/skypilot-catalog/pull/89) 3. Delete AMIs across regions by running ```bash python aws_utils/image_delete.py --tag ${TAG} ``` + +### Azure +1. 
Update Catalog with new images: [example PR](https://github.com/skypilot-org/skypilot-catalog/pull/92) diff --git a/sky/clouds/service_catalog/images/skypilot-aws-cpu-ubuntu.pkr.hcl b/sky/clouds/service_catalog/images/skypilot-aws-cpu-ubuntu.pkr.hcl index c21fbf51b20..5b049cf35ec 100644 --- a/sky/clouds/service_catalog/images/skypilot-aws-cpu-ubuntu.pkr.hcl +++ b/sky/clouds/service_catalog/images/skypilot-aws-cpu-ubuntu.pkr.hcl @@ -22,9 +22,9 @@ source "amazon-ebs" "cpu-ubuntu" { owners = ["099720109477"] } launch_block_device_mappings { - device_name = "/dev/sda1" - volume_size = 8 - volume_type = "gp2" + device_name = "/dev/sda1" + volume_size = 8 + volume_type = "gp2" delete_on_termination = true } } diff --git a/sky/clouds/service_catalog/images/skypilot-aws-gpu-ubuntu.pkr.hcl b/sky/clouds/service_catalog/images/skypilot-aws-gpu-ubuntu.pkr.hcl index c4a8efac4dc..4579987768a 100644 --- a/sky/clouds/service_catalog/images/skypilot-aws-gpu-ubuntu.pkr.hcl +++ b/sky/clouds/service_catalog/images/skypilot-aws-gpu-ubuntu.pkr.hcl @@ -22,9 +22,9 @@ source "amazon-ebs" "gpu-ubuntu" { owners = ["099720109477"] } launch_block_device_mappings { - device_name = "/dev/sda1" - volume_size = 30 - volume_type = "gp2" + device_name = "/dev/sda1" + volume_size = 30 + volume_type = "gp2" delete_on_termination = true } } diff --git a/sky/clouds/service_catalog/images/skypilot-azure-cpu-ubuntu.pkr.hcl b/sky/clouds/service_catalog/images/skypilot-azure-cpu-ubuntu.pkr.hcl new file mode 100644 index 00000000000..2a07c41d136 --- /dev/null +++ b/sky/clouds/service_catalog/images/skypilot-azure-cpu-ubuntu.pkr.hcl @@ -0,0 +1,72 @@ +variable "client_secret" { + type = string + description = "The client secret for the packer client registered in Azure (see Azure app registration)" +} + +variable "vm_generation" { + type = number + description = "Azure's VM generation, currently support 1 or 2" +} + +locals { + timestamp = regex_replace(timestamp(), "[- TZ:]", "") + version = formatdate("YY.MM.DD", timestamp()) +} + +source "azure-arm" "cpu-ubuntu" { + managed_image_resource_group_name = "skypilot-images" + managed_image_name = "skypilot-azure-cpu-ubuntu-${local.timestamp}" + + subscription_id = "59d8c23c-7ef5-42c7-b2f3-a919ad8026a7" + tenant_id = "7c81f068-46f8-4b26-9a46-2fbec2287e3d" + client_id = "1d249f23-c22e-4d02-b62b-a6827bd113fe" + client_secret = var.client_secret + + os_type = "Linux" + image_publisher = "Canonical" + image_offer = "0001-com-ubuntu-server-jammy" + image_sku = var.vm_generation == 1 ? "22_04-lts" : "22_04-lts-gen2" + location = "centralus" + vm_size = var.vm_generation == 1 ? 
"Standard_D1_v2" : "Standard_B2s" + ssh_username = "azureuser" + azure_tags = { + Created_by = "packer" + Purpose = "skypilot" + } + + shared_image_gallery_destination { + subscription = "59d8c23c-7ef5-42c7-b2f3-a919ad8026a7" + resource_group = "skypilot-images" + gallery_name = "skypilot_image_gallery" + image_name = "skypilot-cpu-gen${var.vm_generation}" + image_version = "${local.version}" + replication_regions = [ + "centralus", + "eastus", + "eastus2", + "northcentralus", + "southcentralus", + "westcentralus", + "westus", + "westus2", + "westus3" + ] + } +} + +build { + name = "azure-cpu-ubuntu-build" + sources = ["sources.azure-arm.cpu-ubuntu"] + provisioner "shell" { + script = "./provisioners/docker.sh" + } + provisioner "shell" { + script = "./provisioners/skypilot.sh" + } + provisioner "shell" { + environment_vars = [ + "CLOUD=azure", + ] + script = "./provisioners/cloud.sh" + } +} diff --git a/sky/clouds/service_catalog/images/skypilot-azure-gpu-ubuntu.pkr.hcl b/sky/clouds/service_catalog/images/skypilot-azure-gpu-ubuntu.pkr.hcl new file mode 100644 index 00000000000..97c99b2431e --- /dev/null +++ b/sky/clouds/service_catalog/images/skypilot-azure-gpu-ubuntu.pkr.hcl @@ -0,0 +1,78 @@ +variable "client_secret" { + type = string + description = "The client secret for the packer client registered in Azure (see Azure app registration)" +} + +variable "vm_generation" { + type = number + description = "Azure's VM generation, currently support 1 or 2" +} + +locals { + timestamp = regex_replace(timestamp(), "[- TZ:]", "") + version = formatdate("YY.MM.DD", timestamp()) +} + +source "azure-arm" "gpu-ubuntu" { + managed_image_resource_group_name = "skypilot-images" + managed_image_name = "skypilot-azure-gpu-ubuntu-${local.timestamp}" + + subscription_id = "59d8c23c-7ef5-42c7-b2f3-a919ad8026a7" + tenant_id = "7c81f068-46f8-4b26-9a46-2fbec2287e3d" + client_id = "1d249f23-c22e-4d02-b62b-a6827bd113fe" + client_secret = var.client_secret + + os_type = "Linux" + image_publisher = "Canonical" + image_offer = "0001-com-ubuntu-server-jammy" + image_sku = var.vm_generation == 1 ? "22_04-lts" : "22_04-lts-gen2" + location = var.vm_generation == 1 ? "eastus" : "centralus" + vm_size = var.vm_generation == 1 ? "Standard_NC4as_T4_v3" : "Standard_NC24ads_A100_v4" + ssh_username = "azureuser" + azure_tags = { + Created_by = "packer" + Purpose = "skypilot" + } + + shared_image_gallery_destination { + subscription = "59d8c23c-7ef5-42c7-b2f3-a919ad8026a7" + resource_group = "skypilot-images" + gallery_name = var.vm_generation == 1 ? 
"skypilot_images": "skypilot_image_gallery" + image_name = "skypilot-gpu-gen${var.vm_generation}" + image_version = "${local.version}" + replication_regions = [ + "centralus", + "eastus", + "eastus2", + "northcentralus", + "southcentralus", + "westcentralus", + "westus", + "westus2", + "westus3" + ] + } +} + +build { + name = "azure-gpu-ubuntu-build" + sources = ["sources.azure-arm.gpu-ubuntu"] + provisioner "shell" { + script = "./provisioners/docker.sh" + } + provisioner "shell" { + script = "./provisioners/cuda.sh" + } + provisioner "shell" { + script = "./provisioners/nvidia-container-toolkit.sh" + } + provisioner "shell" { + script = "./provisioners/skypilot.sh" + } + provisioner "shell" { + environment_vars = [ + "CLOUD=azure", + ] + script = "./provisioners/cloud.sh" + } +} From 057bc4b44755ac1e9dadc680e022c369e8ddff52 Mon Sep 17 00:00:00 2001 From: landscapepainter <34902420+landscapepainter@users.noreply.github.com> Date: Thu, 24 Oct 2024 21:58:31 -0700 Subject: [PATCH 06/17] [Azure] Fix to sync NSG status while opening ports (#3844) * fix to update NSG status while opening ports * nit * format * refactor check for nsg creation * format * nit * format * Update sky/provision/azure/config.py Co-authored-by: Zhanghao Wu * Update sky/provision/azure/instance.py Co-authored-by: Zhanghao Wu * Update sky/provision/azure/config.py Co-authored-by: Zhanghao Wu * Update sky/provision/azure/config.py Co-authored-by: Zhanghao Wu * format * additional TODO comments --------- Co-authored-by: Zhanghao Wu --- .../azure/azure-config-template.json | 8 +- sky/provision/azure/config.py | 31 +++- sky/provision/azure/instance.py | 141 +++++++++++------- 3 files changed, 121 insertions(+), 59 deletions(-) diff --git a/sky/provision/azure/azure-config-template.json b/sky/provision/azure/azure-config-template.json index 489783faf98..c743dd40215 100644 --- a/sky/provision/azure/azure-config-template.json +++ b/sky/provision/azure/azure-config-template.json @@ -13,6 +13,12 @@ "metadata": { "description": "Subnet parameters." } + }, + "nsgName": { + "type": "string", + "metadata": { + "description": "Name of the Network Security Group associated with the SkyPilot cluster." 
+ } } }, "variables": { @@ -20,7 +26,7 @@ "location": "[resourceGroup().location]", "msiName": "[concat('sky-', parameters('clusterId'), '-msi')]", "roleAssignmentName": "[concat('sky-', parameters('clusterId'), '-ra')]", - "nsgName": "[concat('sky-', parameters('clusterId'), '-nsg')]", + "nsgName": "[parameters('nsgName')]", "nsg": "[resourceId('Microsoft.Network/networkSecurityGroups', variables('nsgName'))]", "vnetName": "[concat('sky-', parameters('clusterId'), '-vnet')]", "subnetName": "[concat('sky-', parameters('clusterId'), '-subnet')]" diff --git a/sky/provision/azure/config.py b/sky/provision/azure/config.py index 22982a99075..afa94b4adbe 100644 --- a/sky/provision/azure/config.py +++ b/sky/provision/azure/config.py @@ -8,7 +8,7 @@ from pathlib import Path import random import time -from typing import Any, Callable +from typing import Any, Callable, Tuple from sky import exceptions from sky import sky_logging @@ -22,6 +22,7 @@ _DEPLOYMENT_NAME = 'skypilot-config' _LEGACY_DEPLOYMENT_NAME = 'ray-config' _RESOURCE_GROUP_WAIT_FOR_DELETION_TIMEOUT = 480 # 8 minutes +_CLUSTER_ID = '{cluster_name_on_cloud}-{unique_id}' def get_azure_sdk_function(client: Any, function_name: str) -> Callable: @@ -41,6 +42,19 @@ def get_azure_sdk_function(client: Any, function_name: str) -> Callable: return func +def get_cluster_id_and_nsg_name(resource_group: str, + cluster_name_on_cloud: str) -> Tuple[str, str]: + hasher = hashlib.md5(resource_group.encode('utf-8')) + unique_id = hasher.hexdigest()[:UNIQUE_ID_LEN] + # We use the cluster name + resource group hash as the + # unique ID for the cluster, as we need to make sure that + # the deployments have unique names during failover. + cluster_id = _CLUSTER_ID.format(cluster_name_on_cloud=cluster_name_on_cloud, + unique_id=unique_id) + nsg_name = f'sky-{cluster_id}-nsg' + return cluster_id, nsg_name + + @common.log_function_start_end def bootstrap_instances( region: str, cluster_name_on_cloud: str, @@ -117,12 +131,13 @@ def bootstrap_instances( logger.info(f'Using cluster name: {cluster_name_on_cloud}') - hasher = hashlib.md5(provider_config['resource_group'].encode('utf-8')) - unique_id = hasher.hexdigest()[:UNIQUE_ID_LEN] + cluster_id, nsg_name = get_cluster_id_and_nsg_name( + resource_group=provider_config['resource_group'], + cluster_name_on_cloud=cluster_name_on_cloud) subnet_mask = provider_config.get('subnet_mask') if subnet_mask is None: # choose a random subnet, skipping most common value of 0 - random.seed(unique_id) + random.seed(cluster_id) subnet_mask = f'10.{random.randint(1, 254)}.0.0/16' logger.info(f'Using subnet mask: {subnet_mask}') @@ -135,10 +150,10 @@ def bootstrap_instances( 'value': subnet_mask }, 'clusterId': { - # We use the cluster name + resource group hash as the - # unique ID for the cluster, as we need to make sure that - # the deployments have unique names during failover. 
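A standalone sketch of the deterministic naming scheme `get_cluster_id_and_nsg_name` introduces above; the `UNIQUE_ID_LEN` value and the input names here are assumptions for illustration:

```python
import hashlib
from typing import Tuple

UNIQUE_ID_LEN = 4  # assumed length; SkyPilot defines its own constant


def cluster_id_and_nsg_name(resource_group: str,
                            cluster_name_on_cloud: str) -> Tuple[str, str]:
    # Hash the resource group so the same cluster name in different resource
    # groups still gets distinct, yet reproducible, deployment and NSG names.
    unique_id = hashlib.md5(
        resource_group.encode('utf-8')).hexdigest()[:UNIQUE_ID_LEN]
    cluster_id = f'{cluster_name_on_cloud}-{unique_id}'
    return cluster_id, f'sky-{cluster_id}-nsg'


print(cluster_id_and_nsg_name('my-resource-group', 'sky-train-abcd'))
```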
- 'value': f'{cluster_name_on_cloud}-{unique_id}' + 'value': cluster_id + }, + 'nsgName': { + 'value': nsg_name }, }, } diff --git a/sky/provision/azure/instance.py b/sky/provision/azure/instance.py index f6c865e29c8..cc2dc692dec 100644 --- a/sky/provision/azure/instance.py +++ b/sky/provision/azure/instance.py @@ -15,6 +15,7 @@ from sky.adaptors import azure from sky.provision import common from sky.provision import constants +from sky.provision.azure import config as config_lib from sky.utils import common_utils from sky.utils import subprocess_utils from sky.utils import ux_utils @@ -31,6 +32,8 @@ # https://github.com/Azure/azure-sdk-for-python/issues/9422 azure_logger = logging.getLogger('azure') azure_logger.setLevel(logging.WARNING) +Client = Any +NetworkSecurityGroup = Any _RESUME_INSTANCE_TIMEOUT = 480 # 8 minutes _RESUME_PER_INSTANCE_TIMEOUT = 120 # 2 minutes @@ -40,6 +43,10 @@ _RESOURCE_GROUP_NOT_FOUND_ERROR_MESSAGE = 'ResourceGroupNotFound' _POLL_INTERVAL = 1 +# TODO(Doyoung): _LEGACY_NSG_NAME can be remove this after 0.8.0 to ignore +# legacy nsg names. +_LEGACY_NSG_NAME = 'ray-{cluster_name_on_cloud}-nsg' +_SECOND_LEGACY_NSG_NAME = 'sky-{cluster_name_on_cloud}-nsg' class AzureInstanceStatus(enum.Enum): @@ -795,6 +802,32 @@ def _fetch_and_map_status(node, resource_group: str) -> None: return statuses +# TODO(Doyoung): _get_cluster_nsg can be remove this after 0.8.0 to ignore +# legacy nsg names. +def _get_cluster_nsg(network_client: Client, resource_group: str, + cluster_name_on_cloud: str) -> NetworkSecurityGroup: + """Retrieve the NSG associated with the given name of the cluster.""" + list_network_security_groups = _get_azure_sdk_function( + client=network_client.network_security_groups, function_name='list') + legacy_nsg_name = _LEGACY_NSG_NAME.format( + cluster_name_on_cloud=cluster_name_on_cloud) + second_legacy_nsg_name = _SECOND_LEGACY_NSG_NAME.format( + cluster_name_on_cloud=cluster_name_on_cloud) + _, nsg_name = config_lib.get_cluster_id_and_nsg_name( + resource_group=resource_group, + cluster_name_on_cloud=cluster_name_on_cloud) + possible_nsg_names = [nsg_name, legacy_nsg_name, second_legacy_nsg_name] + for nsg in list_network_security_groups(resource_group): + if nsg.name in possible_nsg_names: + return nsg + + # Raise an error if no matching NSG is found + raise ValueError('Failed to find a matching NSG for cluster ' + f'{cluster_name_on_cloud!r} in resource group ' + f'{resource_group!r}. Expected NSG names were: ' + f'{possible_nsg_names}.') + + def open_ports( cluster_name_on_cloud: str, ports: List[str], @@ -809,58 +842,66 @@ def open_ports( update_network_security_groups = _get_azure_sdk_function( client=network_client.network_security_groups, function_name='create_or_update') - list_network_security_groups = _get_azure_sdk_function( - client=network_client.network_security_groups, function_name='list') - for nsg in list_network_security_groups(resource_group): - try: - # Wait the NSG creation to be finished before opening a port. The - # cluster provisioning triggers the NSG creation, but it may not be - # finished yet. - backoff = common_utils.Backoff(max_backoff_factor=1) - start_time = time.time() - while True: - if nsg.provisioning_state not in ['Creating', 'Updating']: - break - if time.time() - start_time > _WAIT_CREATION_TIMEOUT_SECONDS: - logger.warning( - f'Fails to wait for the creation of NSG {nsg.name} in ' - f'{resource_group} within ' - f'{_WAIT_CREATION_TIMEOUT_SECONDS} seconds. 
' - 'Skip this NSG.') - backoff_time = backoff.current_backoff() - logger.info(f'NSG {nsg.name} is not created yet. Waiting for ' - f'{backoff_time} seconds before checking again.') - time.sleep(backoff_time) - - # Azure NSG rules have a priority field that determines the order - # in which they are applied. The priority must be unique across - # all inbound rules in one NSG. - priority = max(rule.priority - for rule in nsg.security_rules - if rule.direction == 'Inbound') + 1 - nsg.security_rules.append( - azure.create_security_rule( - name=f'sky-ports-{cluster_name_on_cloud}-{priority}', - priority=priority, - protocol='Tcp', - access='Allow', - direction='Inbound', - source_address_prefix='*', - source_port_range='*', - destination_address_prefix='*', - destination_port_ranges=ports, - )) - poller = update_network_security_groups(resource_group, nsg.name, - nsg) - poller.wait() - if poller.status() != 'Succeeded': + + try: + # Wait for the NSG creation to be finished before opening a port. The + # cluster provisioning triggers the NSG creation, but it may not be + # finished yet. + backoff = common_utils.Backoff(max_backoff_factor=1) + start_time = time.time() + while True: + nsg = _get_cluster_nsg(network_client, resource_group, + cluster_name_on_cloud) + if nsg.provisioning_state not in ['Creating', 'Updating']: + break + if time.time() - start_time > _WAIT_CREATION_TIMEOUT_SECONDS: with ux_utils.print_exception_no_traceback(): - raise ValueError(f'Failed to open ports {ports} in NSG ' - f'{nsg.name}: {poller.status()}') - except azure.exceptions().HttpResponseError as e: + raise TimeoutError( + f'Timed out while waiting for the Network ' + f'Security Group {nsg.name!r} to be ready for ' + f'cluster {cluster_name_on_cloud!r} in ' + f'resource group {resource_group!r}. The NSG ' + f'did not reach a stable state ' + '(Creating/Updating) within the allocated ' + f'{_WAIT_CREATION_TIMEOUT_SECONDS} seconds. ' + 'Consequently, the operation to open ports ' + f'{ports} failed.') + + backoff_time = backoff.current_backoff() + logger.info(f'NSG {nsg.name} is not created yet. Waiting for ' + f'{backoff_time} seconds before checking again.') + time.sleep(backoff_time) + + # Azure NSG rules have a priority field that determines the order + # in which they are applied. The priority must be unique across + # all inbound rules in one NSG. 
+ priority = max(rule.priority + for rule in nsg.security_rules + if rule.direction == 'Inbound') + 1 + nsg.security_rules.append( + azure.create_security_rule( + name=f'sky-ports-{cluster_name_on_cloud}-{priority}', + priority=priority, + protocol='Tcp', + access='Allow', + direction='Inbound', + source_address_prefix='*', + source_port_range='*', + destination_address_prefix='*', + destination_port_ranges=ports, + )) + poller = update_network_security_groups(resource_group, nsg.name, nsg) + poller.wait() + if poller.status() != 'Succeeded': with ux_utils.print_exception_no_traceback(): - raise ValueError( - f'Failed to open ports {ports} in NSG {nsg.name}.') from e + raise ValueError(f'Failed to open ports {ports} in NSG ' + f'{nsg.name}: {poller.status()}') + + except azure.exceptions().HttpResponseError as e: + with ux_utils.print_exception_no_traceback(): + raise ValueError(f'Failed to open ports {ports} in NSG for cluster ' + f'{cluster_name_on_cloud!r} within resource group ' + f'{resource_group!r}.') from e def cleanup_ports( From ee708e7aad508fab1b48b1f6f297ebe6fb5dbe63 Mon Sep 17 00:00:00 2001 From: Yika Date: Fri, 25 Oct 2024 09:51:45 -0700 Subject: [PATCH 07/17] [Performance] Use new Azure custom images (#4167) * use sky images for azure * auto refresh local images.csv * format --- sky/clouds/azure.py | 6 +++--- sky/clouds/service_catalog/azure_catalog.py | 16 +++++++++++++--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/sky/clouds/azure.py b/sky/clouds/azure.py index d91f589ca8f..0852c993ed3 100644 --- a/sky/clouds/azure.py +++ b/sky/clouds/azure.py @@ -39,9 +39,9 @@ _DEFAULT_AZURE_UBUNTU_2004_IMAGE_GB = 150 _DEFAULT_SKYPILOT_IMAGE_GB = 30 -_DEFAULT_CPU_IMAGE_ID = 'skypilot:gpu-ubuntu-2204' -_DEFAULT_GPU_IMAGE_ID = 'skypilot:gpu-ubuntu-2204' -_DEFAULT_V1_IMAGE_ID = 'skypilot:v1-ubuntu-2004' +_DEFAULT_CPU_IMAGE_ID = 'skypilot:custom-cpu-ubuntu-v2' +_DEFAULT_GPU_IMAGE_ID = 'skypilot:custom-gpu-ubuntu-v2' +_DEFAULT_V1_IMAGE_ID = 'skypilot:custom-gpu-ubuntu-v1' _DEFAULT_GPU_K80_IMAGE_ID = 'skypilot:k80-ubuntu-2004' _FALLBACK_IMAGE_ID = 'skypilot:gpu-ubuntu-2204' diff --git a/sky/clouds/service_catalog/azure_catalog.py b/sky/clouds/service_catalog/azure_catalog.py index c71285fe9a3..867141f7899 100644 --- a/sky/clouds/service_catalog/azure_catalog.py +++ b/sky/clouds/service_catalog/azure_catalog.py @@ -7,11 +7,14 @@ from typing import Dict, List, Optional, Tuple from sky import clouds as cloud_lib +from sky import sky_logging from sky.clouds import Azure from sky.clouds.service_catalog import common from sky.utils import resources_utils from sky.utils import ux_utils +logger = sky_logging.init_logger(__name__) + # This list should match the list of regions in # skypilot image generation Packer script's replication_regions # sky/clouds/service_catalog/images/skypilot-azure-cpu-ubuntu.pkr.hcl @@ -191,9 +194,16 @@ def list_accelerators( def get_image_id_from_tag(tag: str, region: Optional[str]) -> Optional[str]: """Returns the image id from the tag.""" - # Azure images are not region-specific. - del region # Unused. - return common.get_image_id_from_tag_impl(_image_df, tag, None) + global _image_df + image_id = common.get_image_id_from_tag_impl(_image_df, tag, region) + if image_id is None: + # Refresh the image catalog and try again, if the image tag is not + # found. 
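The `get_image_id_from_tag` change here follows a look-up, force-refresh, retry pattern; a dependency-free sketch of that pattern with toy catalog values (the tag string is from this patch, the returned ID is made up):

```python
from typing import Callable, Dict, Optional


def image_id_with_refresh(tag: str, catalog: Dict[str, str],
                          refresh: Callable[[], Dict[str, str]]
                         ) -> Optional[str]:
    image_id = catalog.get(tag)
    if image_id is None:
        # Equivalent of re-reading images.csv with pull_frequency_hours=0.
        catalog = refresh()
        image_id = catalog.get(tag)
    return image_id


stale_catalog: Dict[str, str] = {}
fresh_catalog = {'skypilot:custom-gpu-ubuntu-v2': 'illustrative-image-id'}
print(image_id_with_refresh('skypilot:custom-gpu-ubuntu-v2', stale_catalog,
                            lambda: fresh_catalog))
```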
+ logger.debug('Refreshing the image catalog and trying again.') + _image_df = common.read_catalog('azure/images.csv', + pull_frequency_hours=0) + image_id = common.get_image_id_from_tag_impl(_image_df, tag, region) + return image_id def is_image_tag_valid(tag: str, region: Optional[str]) -> bool: From b8a9a5716e746e88e65af397d8a2d84522445cb3 Mon Sep 17 00:00:00 2001 From: Yika Date: Fri, 25 Oct 2024 14:08:06 -0700 Subject: [PATCH 08/17] Fix OCI import issue (#4178) * Fix OCI import issue * Update sky/clouds/oci.py Co-authored-by: Zhanghao Wu * edit comments --------- Co-authored-by: Zhanghao Wu --- sky/clouds/oci.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sky/clouds/oci.py b/sky/clouds/oci.py index c6451a73a1f..0feda467bbf 100644 --- a/sky/clouds/oci.py +++ b/sky/clouds/oci.py @@ -468,7 +468,11 @@ def get_credential_file_mounts(self) -> Dict[str, str]: api_key_file = oci_cfg[ 'key_file'] if 'key_file' in oci_cfg else 'BadConf' sky_cfg_file = oci_utils.oci_config.get_sky_user_config_file() - except (ImportError, oci_adaptor.oci.exceptions.ConfigFileNotFound): + # Must catch ImportError before any oci_adaptor.oci.exceptions + # because oci_adaptor.oci.exceptions can throw ImportError. + except ImportError: + return {} + except oci_adaptor.oci.exceptions.ConfigFileNotFound: return {} # OCI config and API key file are mandatory From bbf14d5a52b21387cb2bed086a96f7363bc5750b Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Fri, 25 Oct 2024 15:08:27 -0700 Subject: [PATCH 09/17] [k8s] Add retry for apparmor failures (#4176) * Add retry for apparmor failures * add comment --- sky/provision/kubernetes/instance.py | 68 +++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 2 deletions(-) diff --git a/sky/provision/kubernetes/instance.py b/sky/provision/kubernetes/instance.py index 6ce7b74d18e..26ed5f51a43 100644 --- a/sky/provision/kubernetes/instance.py +++ b/sky/provision/kubernetes/instance.py @@ -1,5 +1,6 @@ """Kubernetes instance provisioning.""" import copy +import json import time from typing import Any, Dict, List, Optional import uuid @@ -425,6 +426,70 @@ def _label_pod(namespace: str, context: Optional[str], pod_name: str, _request_timeout=kubernetes.API_TIMEOUT) +def _create_namespaced_pod_with_retries(namespace: str, pod_spec: dict, + context: Optional[str]) -> Any: + """Attempts to create a Kubernetes Pod and handle any errors. + + Currently, we handle errors due to the AppArmor annotation and retry if + it fails due to the `FieldValueForbidden` error. + See https://github.com/skypilot-org/skypilot/issues/4174 for details. + + Returns: The created Pod object. + """ + try: + # Attempt to create the Pod with the AppArmor annotation + pod = kubernetes.core_api(context).create_namespaced_pod( + namespace, pod_spec) + return pod + except kubernetes.api_exception() as e: + try: + error_body = json.loads(e.body) + error_message = error_body.get('message', '') + except json.JSONDecodeError: + error_message = str(e.body) + # Check if the error is due to the AppArmor annotation and retry. + # We add an AppArmor annotation to set it as unconfined in our + # base template in kubernetes-ray.yml.j2. This is required for + # FUSE to work in the pod on most Kubernetes distributions. + # However, some distributions do not support the AppArmor annotation + # and will fail to create the pod. In this case, we retry without + # the annotation. 
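To make the AppArmor retry concrete, a simplified sketch using a toy pod spec (the annotation key and the 'unconfined' value come from this patch; the pod name, image, and helper function are illustrative):

```python
import copy

APPARMOR_ANNOTATION = 'container.apparmor.security.beta.kubernetes.io/ray-node'

# Toy pod spec; the real spec is rendered from kubernetes-ray.yml.j2.
pod_spec = {
    'metadata': {
        'name': 'ray-node-0',
        'annotations': {APPARMOR_ANNOTATION: 'unconfined'},
    },
    'spec': {
        'containers': [{'name': 'ray-node', 'image': 'ubuntu:22.04'}],
    },
}


def strip_apparmor_annotation(spec: dict) -> dict:
    """Return a copy of the spec without the AppArmor annotation."""
    stripped = copy.deepcopy(spec)
    stripped.get('metadata', {}).get('annotations', {}).pop(
        APPARMOR_ANNOTATION, None)
    return stripped


# On a 422 'FieldValueForbidden ... AppArmorProfile' error, retry with:
retry_spec = strip_apparmor_annotation(pod_spec)
```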
+ if (e.status == 422 and 'FieldValueForbidden' in error_message and + 'AppArmorProfile: nil' in error_message): + logger.warning('AppArmor annotation caused pod creation to fail. ' + 'Retrying without the annotation. ' + 'Note: this may cause bucket mounting to fail.') + + # Remove the AppArmor annotation + annotations = pod_spec.get('metadata', {}).get('annotations', {}) + if ('container.apparmor.security.beta.kubernetes.io/ray-node' + in annotations): + del annotations[ + 'container.apparmor.security.beta.kubernetes.io/ray-node'] + pod_spec['metadata']['annotations'] = annotations + logger.info('AppArmor annotation removed from Pod spec.') + else: + logger.warning('AppArmor annotation not found in pod spec, ' + 'retrying will not help. ' + f'Current annotations: {annotations}') + raise e + + # Retry Pod creation without the AppArmor annotation + try: + pod = kubernetes.core_api(context).create_namespaced_pod( + namespace, pod_spec) + logger.info(f'Pod {pod.metadata.name} created successfully ' + 'without AppArmor annotation.') + return pod + except kubernetes.api_exception() as retry_exception: + logger.info('Failed to create Pod without AppArmor annotation: ' + f'{retry_exception}') + raise retry_exception + else: + # Re-raise the exception if it's a different error + raise e + + def _create_pods(region: str, cluster_name_on_cloud: str, config: common.ProvisionConfig) -> common.ProvisionRecord: """Create pods based on the config.""" @@ -546,8 +611,7 @@ def _create_pods(region: str, cluster_name_on_cloud: str, } } - pod = kubernetes.core_api(context).create_namespaced_pod( - namespace, pod_spec) + pod = _create_namespaced_pod_with_retries(namespace, pod_spec, context) created_pods[pod.metadata.name] = pod if head_pod_name is None: head_pod_name = pod.metadata.name From c8ceaf6d7af98def987c98d41fcb502341cf1ed4 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Fri, 25 Oct 2024 16:51:33 -0700 Subject: [PATCH 10/17] [Docs] Update Managed Jobs page. (#4177) * [Docs] Update Managed Jobs page. * Lint * Updates --- docs/source/examples/managed-jobs.rst | 89 +++++++++++++++------------ 1 file changed, 51 insertions(+), 38 deletions(-) diff --git a/docs/source/examples/managed-jobs.rst b/docs/source/examples/managed-jobs.rst index a47b4345b9f..d85356c936a 100644 --- a/docs/source/examples/managed-jobs.rst +++ b/docs/source/examples/managed-jobs.rst @@ -5,14 +5,20 @@ Managed Jobs .. tip:: - This feature is great for scaling out: running a single job for long durations, or running many jobs (pipelines). + This feature is great for scaling out: running a single job for long durations, or running many jobs in parallel. -SkyPilot supports **managed jobs** (:code:`sky jobs`), which can automatically recover from any spot preemptions or hardware failures. -It can be used in three modes: +SkyPilot supports **managed jobs** (:code:`sky jobs`), where "managed" means +if a job's underlying compute experienced any spot preemptions or hardware failures, +SkyPilot will automatically recover the job. -#. :ref:`Managed Spot Jobs `: Jobs run on auto-recovering spot instances. This can **save significant costs** (e.g., up to 70\% for GPU VMs) by making preemptible spot instances useful for long-running jobs. -#. :ref:`On-demand `: Jobs run on auto-recovering on-demand instances. This is useful for jobs that require guaranteed resources. -#. :ref:`Pipelines `: Run pipelines that contain multiple tasks (which can have different resource requirements and ``setup``/``run`` commands). 
This is useful for running a sequence of tasks that depend on each other, e.g., data processing, training a model, and then running inference on it. +Managed jobs can be used in three modes: + +#. :ref:`Managed spot jobs `: Jobs run on auto-recovering spot instances. This **saves significant costs** (e.g., ~70\% for GPU VMs) by making preemptible spot instances useful for long-running jobs. +#. :ref:`Managed on-demand/reserved jobs `: Jobs run on auto-recovering on-demand or reserved instances. Useful for jobs that require guaranteed resources. +#. :ref:`Managed pipelines `: Run pipelines that contain multiple tasks (which + can have different resource requirements and ``setup``/``run`` commands). + Useful for running a sequence of tasks that depend on each other, e.g., data + processing, training a model, and then running inference on it. .. _spot-jobs: @@ -20,28 +26,12 @@ It can be used in three modes: Managed Spot Jobs ----------------- -In this mode, :code:`sky jobs launch --use-spot` is used to launch a managed spot job. SkyPilot automatically finds available spot resources across regions and clouds to maximize availability. -Any spot preemptions are automatically handled by SkyPilot without user intervention. - +In this mode, jobs run on spot instances, and preemptions are auto-recovered by SkyPilot. -Quick comparison between *unmanaged spot clusters* vs. *managed spot jobs*: - -.. list-table:: - :widths: 30 18 12 35 - :header-rows: 1 +To launch a managed spot job, use :code:`sky jobs launch --use-spot`. +SkyPilot automatically finds available spot instances across regions and clouds to maximize availability. +Any spot preemptions are automatically handled by SkyPilot without user intervention. - * - Command - - Managed? - - SSH-able? - - Best for - * - :code:`sky launch --use-spot` - - Unmanaged spot cluster - - Yes - - Interactive dev on spot instances (especially for hardware with low preemption rates) - * - :code:`sky jobs launch --use-spot` - - Managed spot job (auto-recovery) - - No - - Scaling out long-running jobs (e.g., data processing, training, batch inference) Here is an example of a BERT training job failing over different regions across AWS and GCP. @@ -59,6 +49,25 @@ To use managed spot jobs, there are two requirements: #. :ref:`Checkpointing ` (optional): For job recovery due to preemptions, the user application code can checkpoint its progress periodically to a :ref:`mounted cloud bucket `. The program can reload the latest checkpoint when restarted. +Quick comparison between *managed spot jobs* vs. *launching spot clusters*: + +.. list-table:: + :widths: 30 18 12 35 + :header-rows: 1 + + * - Command + - Managed? + - SSH-able? + - Best for + * - :code:`sky jobs launch --use-spot` + - Yes, preemptions are auto-recovered + - No + - Scaling out long-running jobs (e.g., data processing, training, batch inference) + * - :code:`sky launch --use-spot` + - No, preemptions are not handled + - Yes + - Interactive dev on spot instances (especially for hardware with low preemption rates) + .. _job-yaml: Job YAML @@ -245,11 +254,11 @@ Real-World Examples .. _on-demand: -Using On-Demand Instances --------------------------------- +Managed On-Demand/Reserved Jobs +------------------------------- The same ``sky jobs launch`` and YAML interfaces can run jobs on auto-recovering -on-demand instances. This is useful to have SkyPilot monitor any underlying +on-demand or reserved instances. 
This is useful to have SkyPilot monitor any underlying machine failures and transparently recover the job. To do so, simply set :code:`use_spot: false` in the :code:`resources` section, or override it with :code:`--use-spot false` in the CLI. @@ -264,10 +273,10 @@ To do so, simply set :code:`use_spot: false` in the :code:`resources` section, o interface, while ``sky launch`` is a cluster interface (that you can launch tasks on, albeit not managed). -Either Spot Or On-Demand -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Either Spot or On-Demand/Reserved +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -You can use ``any_of`` to specify either spot or on-demand instances as +You can use ``any_of`` to specify either spot or on-demand/reserved instances as candidate resources for a job. See documentation :ref:`here ` for more details. @@ -280,12 +289,17 @@ candidate resources for a job. See documentation :ref:`here - use_spot: false In this example, SkyPilot will perform cost optimizations to select the resource to use, which almost certainly -will be spot instances. If spot instances are not available, SkyPilot will fall back to launch on-demand instances. +will be spot instances. If spot instances are not available, SkyPilot will fall back to launch on-demand/reserved instances. More advanced policies for resource selection, such as the `Can't Be Late `__ (NSDI'24) paper, may be supported in the future. +Running Many Parallel Jobs +-------------------------- + +For batch jobs such as **data processing** or **hyperparameter sweeps**, you can launch many jobs in parallel. See :ref:`many-jobs`. + Useful CLIs ----------- @@ -323,11 +337,10 @@ Cancel a managed job: If any failure happens for a managed job, you can check :code:`sky jobs queue -a` for the brief reason of the failure. For more details, it would be helpful to check :code:`sky jobs logs --controller `. - .. _pipeline: -Job Pipelines -------------- +Managed Pipelines +----------------- A pipeline is a managed job that contains a sequence of tasks running one after another. @@ -414,8 +427,8 @@ To submit the pipeline, the same command :code:`sky jobs launch` is used. The pi -Dashboard ---------- +Job Dashboard +------------- Use ``sky jobs dashboard`` to open a dashboard to see all jobs: From df80daedea2eefd52c2e681b8da370305fcda262 Mon Sep 17 00:00:00 2001 From: Zongheng Yang Date: Fri, 25 Oct 2024 17:32:25 -0700 Subject: [PATCH 11/17] Minor: Jobs docs fix. (#4183) * [Docs] Update Managed Jobs page. * Lint * Updates * reword --- docs/source/examples/managed-jobs.rst | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/docs/source/examples/managed-jobs.rst b/docs/source/examples/managed-jobs.rst index d85356c936a..993ad361d66 100644 --- a/docs/source/examples/managed-jobs.rst +++ b/docs/source/examples/managed-jobs.rst @@ -7,10 +7,7 @@ Managed Jobs This feature is great for scaling out: running a single job for long durations, or running many jobs in parallel. -SkyPilot supports **managed jobs** (:code:`sky jobs`), where "managed" means -if a job's underlying compute experienced any spot preemptions or hardware failures, -SkyPilot will automatically recover the job. - +SkyPilot supports **managed jobs** (:code:`sky jobs`), which can automatically recover from any underlying spot preemptions or hardware failures. Managed jobs can be used in three modes: #. :ref:`Managed spot jobs `: Jobs run on auto-recovering spot instances. 
This **saves significant costs** (e.g., ~70\% for GPU VMs) by making preemptible spot instances useful for long-running jobs. From 0e915d3430d8027aa40b766605bb13c889ffc62f Mon Sep 17 00:00:00 2001 From: Christopher Cooper Date: Fri, 25 Oct 2024 18:54:11 -0700 Subject: [PATCH 12/17] [UX] remove all uses of deprecated `sky jobs` (#4173) * [UX] remove all uses of deprecated `sky jobs` * Apply suggestions from code review Co-authored-by: Romil Bhardwaj * fix other mentions of "spot jobs" --------- Co-authored-by: Romil Bhardwaj --- docs/source/examples/managed-jobs.rst | 2 +- docs/source/reference/faq.rst | 2 +- examples/managed_job_with_storage.yaml | 2 +- llm/axolotl/axolotl-spot.yaml | 2 +- llm/axolotl/readme.md | 2 +- llm/falcon/README.md | 12 ++++++------ llm/vicuna-llama-2/README.md | 6 +++--- llm/vicuna/README.md | 4 ++-- sky/cli.py | 2 +- sky/jobs/controller.py | 2 +- tests/backward_compatibility_tests.sh | 4 ++-- 11 files changed, 20 insertions(+), 20 deletions(-) diff --git a/docs/source/examples/managed-jobs.rst b/docs/source/examples/managed-jobs.rst index 993ad361d66..8e329adaa81 100644 --- a/docs/source/examples/managed-jobs.rst +++ b/docs/source/examples/managed-jobs.rst @@ -99,7 +99,7 @@ We can launch it with the following: setup: | # Fill in your wandb key: copy from https://wandb.ai/authorize # Alternatively, you can use `--env WANDB_API_KEY=$WANDB_API_KEY` - # to pass the key in the command line, during `sky spot launch`. + # to pass the key in the command line, during `sky jobs launch`. echo export WANDB_API_KEY=[YOUR-WANDB-API-KEY] >> ~/.bashrc pip install -e . diff --git a/docs/source/reference/faq.rst b/docs/source/reference/faq.rst index 5a966a0014f..1ade656b44b 100644 --- a/docs/source/reference/faq.rst +++ b/docs/source/reference/faq.rst @@ -38,7 +38,7 @@ How to ensure my workdir's ``.git`` is synced up for managed spot jobs? Currently, there is a difference in whether ``.git`` is synced up depending on the command used: - For regular ``sky launch``, the workdir's ``.git`` is synced up by default. -- For managed spot jobs ``sky spot launch``, the workdir's ``.git`` is excluded by default. +- For managed jobs ``sky jobs launch``, the workdir's ``.git`` is excluded by default. In the second case, to ensure the workdir's ``.git`` is synced up for managed spot jobs, you can explicitly add a file mount to sync it up: diff --git a/examples/managed_job_with_storage.yaml b/examples/managed_job_with_storage.yaml index 61244c16ba0..677e2c8ed6d 100644 --- a/examples/managed_job_with_storage.yaml +++ b/examples/managed_job_with_storage.yaml @@ -3,7 +3,7 @@ # Runs a task that uses cloud buckets for uploading and accessing files. 
# # Usage: -# sky spot launch -c spot-storage examples/managed_job_with_storage.yaml +# sky jobs launch -c spot-storage examples/managed_job_with_storage.yaml # sky down spot-storage resources: diff --git a/llm/axolotl/axolotl-spot.yaml b/llm/axolotl/axolotl-spot.yaml index b22a8ae3fce..0e04ba11992 100644 --- a/llm/axolotl/axolotl-spot.yaml +++ b/llm/axolotl/axolotl-spot.yaml @@ -4,7 +4,7 @@ # HF_TOKEN=abc BUCKET= sky launch -c axolotl-spot axolotl-spot.yaml --env HF_TOKEN --env BUCKET -i30 --down # # Managed spot (auto-recovery; for full runs): -# HF_TOKEN=abc BUCKET= sky spot launch -n axolotl-spot axolotl-spot.yaml --env HF_TOKEN --env BUCKET +# HF_TOKEN=abc BUCKET= sky jobs launch -n axolotl-spot axolotl-spot.yaml --env HF_TOKEN --env BUCKET name: axolotl diff --git a/llm/axolotl/readme.md b/llm/axolotl/readme.md index 0cc06b98723..eb80231aa93 100644 --- a/llm/axolotl/readme.md +++ b/llm/axolotl/readme.md @@ -22,5 +22,5 @@ ssh -L 8888:localhost:8888 axolotl-spot Launch managed spot instances (auto-recovery; for full runs): ``` -HF_TOKEN=abc BUCKET= sky spot launch -n axolotl-spot axolotl-spot.yaml --env HF_TOKEN --env BUCKET +HF_TOKEN=abc BUCKET= sky jobs launch -n axolotl-spot axolotl-spot.yaml --env HF_TOKEN --env BUCKET ``` diff --git a/llm/falcon/README.md b/llm/falcon/README.md index 6eb480d9ea8..1f40dc9f524 100644 --- a/llm/falcon/README.md +++ b/llm/falcon/README.md @@ -1,6 +1,6 @@ # Finetuning Falcon with SkyPilot -This README contains instructions on how to use SkyPilot to finetune Falcon-7B and Falcon-40B, an open-source LLM that rivals many current closed-source models, including ChatGPT. +This README contains instructions on how to use SkyPilot to finetune Falcon-7B and Falcon-40B, an open-source LLM that rivals many current closed-source models, including ChatGPT. * [Blog post](https://huggingface.co/blog/falcon) * [Repo](https://huggingface.co/tiiuae/falcon-40b) @@ -16,10 +16,10 @@ sky check See the Falcon SkyPilot YAML for [training](train.yaml). Serving is currently a work in progress and a YAML will be provided for that soon! We are also working on adding an evaluation step to evaluate the model you finetuned compared to the base model. ## Running Falcon on SkyPilot -Finetuning `Falcon-7B` and `Falcon-40B` require GPUs with 80GB memory, +Finetuning `Falcon-7B` and `Falcon-40B` require GPUs with 80GB memory, but `Falcon-7b-sharded` requires only 40GB memory. Thus, * If your GPU has 40 GB memory or less (e.g., Nvidia A100): use `ybelkada/falcon-7b-sharded-bf16`. -* If your GPU has 80 GB memory (e.g., Nvidia A100-80GB): you can also use `tiiuae/falcon-7b` and `tiiuae/falcon-40b`. +* If your GPU has 80 GB memory (e.g., Nvidia A100-80GB): you can also use `tiiuae/falcon-7b` and `tiiuae/falcon-40b`. Try `sky show-gpus --all` for supported GPUs. @@ -32,13 +32,13 @@ Steps for training on your cloud(s): 1. In [train.yaml](train.yaml), set the following variables in `envs`: - Replace the `OUTPUT_BUCKET_NAME` with a unique name. SkyPilot will create this bucket for you to store the model weights. - - Replace the `WANDB_API_KEY` to your own key. - - Replace the `MODEL_NAME` with your desired base model. + - Replace the `WANDB_API_KEY` to your own key. + - Replace the `MODEL_NAME` with your desired base model. 2. **Training the Falcon model using spot instances**: ```bash -sky spot launch -n falcon falcon.yaml +sky jobs launch --use-spot -n falcon falcon.yaml ``` Currently, such `A100-80GB:1` spot instances are only available on AWS and GCP. 
diff --git a/llm/vicuna-llama-2/README.md b/llm/vicuna-llama-2/README.md index 24caa525a56..e392b231e64 100644 --- a/llm/vicuna-llama-2/README.md +++ b/llm/vicuna-llama-2/README.md @@ -120,12 +120,12 @@ sky launch --no-use-spot ... ### Reducing costs by 3x with spot instances -[SkyPilot Managed Spot](https://skypilot.readthedocs.io/en/latest/examples/spot-jobs.html) is a library built on top of SkyPilot that helps users run jobs on spot instances without worrying about interruptions. That is the tool used by the LMSYS organization to train the first version of Vicuna (more details can be found in their [launch blog post](https://lmsys.org/blog/2023-03-30-vicuna/) and [example](https://github.com/skypilot-org/skypilot/tree/master/llm/vicuna)). With this, the training cost can be reduced from $1000 to **\$300**. +[SkyPilot Managed Jobs](https://skypilot.readthedocs.io/en/latest/examples/managed-jobs.html) is a library built on top of SkyPilot that helps users run jobs on spot instances without worrying about interruptions. That is the tool used by the LMSYS organization to train the first version of Vicuna (more details can be found in their [launch blog post](https://lmsys.org/blog/2023-03-30-vicuna/) and [example](https://github.com/skypilot-org/skypilot/tree/master/llm/vicuna)). With this, the training cost can be reduced from $1000 to **\$300**. -To use SkyPilot Managed Spot, you can simply replace `sky launch` with `sky spot launch` in the above command: +To use SkyPilot Managed Spot Jobs, you can simply replace `sky launch` with `sky jobs launch` in the above command: ```bash -sky spot launch -n vicuna train.yaml \ +sky jobs launch -n vicuna train.yaml \ --env ARTIFACT_BUCKET_NAME= \ --env WANDB_API_KEY= ``` diff --git a/llm/vicuna/README.md b/llm/vicuna/README.md index b511eb7f4b0..6d9f46127d4 100644 --- a/llm/vicuna/README.md +++ b/llm/vicuna/README.md @@ -63,14 +63,14 @@ Steps for training on your cloud(s): 2. **Training the Vicuna-7B model on 8 A100 GPUs (80GB memory) using spot instances**: ```bash # Launch it on managed spot to save 3x cost -sky spot launch -n vicuna train.yaml +sky jobs launch -n vicuna train.yaml ``` Note: if you would like to see the training curve on W&B, you can add `--env WANDB_API_KEY` to the above command, which will propagate your local W&B API key in the environment variable to the job. [Optional] Train a larger 13B model ``` # Train a 13B model instead of the default 7B -sky spot launch -n vicuna-7b train.yaml --env MODEL_SIZE=13 +sky jobs launch -n vicuna-7b train.yaml --env MODEL_SIZE=13 # Use *unmanaged* spot instances (i.e., preemptions won't get auto-recovered). # Unmanaged spot provides a better interactive development experience but is vulnerable to spot preemptions. diff --git a/sky/cli.py b/sky/cli.py index 6e0587cc117..db1befb04a3 100644 --- a/sky/cli.py +++ b/sky/cli.py @@ -3519,7 +3519,7 @@ def jobs(): default=None, type=str, hidden=True, - help=('Alias for --name, the name of the spot job.')) + help=('Alias for --name, the name of the managed job.')) @click.option('--job-recovery', default=None, type=str, diff --git a/sky/jobs/controller.py b/sky/jobs/controller.py index f3cd81576e2..1faa5dfbe31 100644 --- a/sky/jobs/controller.py +++ b/sky/jobs/controller.py @@ -215,7 +215,7 @@ def _run_one_task(self, task_id: int, task: 'sky.Task') -> bool: end_time=end_time, callback_func=callback_func) logger.info( - f'Spot job {self._job_id} (task: {task_id}) SUCCEEDED. ' + f'Managed job {self._job_id} (task: {task_id}) SUCCEEDED. 
' f'Cleaning up the cluster {cluster_name}.') # Only clean up the cluster, not the storages, because tasks may # share storages. diff --git a/tests/backward_compatibility_tests.sh b/tests/backward_compatibility_tests.sh index 4f83c379ccf..276fda899dd 100644 --- a/tests/backward_compatibility_tests.sh +++ b/tests/backward_compatibility_tests.sh @@ -167,8 +167,8 @@ MANAGED_JOB_JOB_NAME=${CLUSTER_NAME}-${uuid:0:4} if [ "$start_from" -le 7 ]; then conda activate sky-back-compat-master rm -r ~/.sky/wheels || true -sky spot launch -d --cloud ${CLOUD} -y --cpus 2 --num-nodes 2 -n ${MANAGED_JOB_JOB_NAME}-7-0 "echo hi; sleep 1000" -sky spot launch -d --cloud ${CLOUD} -y --cpus 2 --num-nodes 2 -n ${MANAGED_JOB_JOB_NAME}-7-1 "echo hi; sleep 400" +sky jobs launch -d --cloud ${CLOUD} -y --cpus 2 --num-nodes 2 -n ${MANAGED_JOB_JOB_NAME}-7-0 "echo hi; sleep 1000" +sky jobs launch -d --cloud ${CLOUD} -y --cpus 2 --num-nodes 2 -n ${MANAGED_JOB_JOB_NAME}-7-1 "echo hi; sleep 400" conda activate sky-back-compat-current rm -r ~/.sky/wheels || true s=$(sky jobs queue | grep ${MANAGED_JOB_JOB_NAME}-7 | grep "RUNNING" | wc -l) From 647fcea335dec9f180421342d6c41cd67a3c8674 Mon Sep 17 00:00:00 2001 From: Tian Xia Date: Sat, 26 Oct 2024 13:34:01 -0700 Subject: [PATCH 13/17] [Azure] Support fractional A10 instance types (#3877) * fix * change catalog to float gpu num * support print float point gpu in sky launch. TODO: test if the ray deployment group works for fractional one * fix unittest * format * patch ray resources to ceil value * support launch from --gpus A10 * only allow strictly match fractional gpu counts * address comment * change back condition * fix * apply suggestions from code review * fix * Update sky/backends/cloud_vm_ray_backend.py Co-authored-by: Zhanghao Wu * format * fix display of fuzzy candidates * fix precision issue * fix num gpu required * refactor in check_resources_fit_cluster * change type annotation of acc_count * enable fuzzy fp acc count * fix k8s * Update sky/clouds/service_catalog/common.py Co-authored-by: Zhanghao Wu * fix integer gpus * format --------- Co-authored-by: Zhanghao Wu --- sky/backends/cloud_vm_ray_backend.py | 15 +++++++++ sky/clouds/aws.py | 11 +++---- sky/clouds/azure.py | 10 +++--- sky/clouds/cloud.py | 18 +++++++---- sky/clouds/cudo.py | 11 +++---- sky/clouds/fluidstack.py | 11 +++---- sky/clouds/gcp.py | 4 +-- sky/clouds/ibm.py | 11 +++---- sky/clouds/kubernetes.py | 11 +++---- sky/clouds/lambda_cloud.py | 11 +++---- sky/clouds/oci.py | 11 +++---- sky/clouds/paperspace.py | 11 +++---- sky/clouds/runpod.py | 11 +++---- sky/clouds/scp.py | 11 +++---- sky/clouds/service_catalog/__init__.py | 2 +- sky/clouds/service_catalog/aws_catalog.py | 4 +-- sky/clouds/service_catalog/azure_catalog.py | 5 +-- sky/clouds/service_catalog/common.py | 21 ++++++++---- sky/clouds/service_catalog/cudo_catalog.py | 4 +-- .../data_fetchers/fetch_azure.py | 32 ++++++++++++------- .../service_catalog/fluidstack_catalog.py | 4 +-- sky/clouds/service_catalog/ibm_catalog.py | 4 +-- sky/clouds/service_catalog/lambda_catalog.py | 4 +-- sky/clouds/service_catalog/oci_catalog.py | 4 +-- .../service_catalog/paperspace_catalog.py | 4 +-- sky/clouds/service_catalog/runpod_catalog.py | 4 +-- sky/clouds/service_catalog/scp_catalog.py | 4 +-- sky/clouds/service_catalog/vsphere_catalog.py | 4 +-- sky/clouds/vsphere.py | 11 +++---- sky/resources.py | 2 +- sky/utils/resources_utils.py | 14 +++++++- 31 files changed, 150 insertions(+), 134 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py 
b/sky/backends/cloud_vm_ray_backend.py index f0fb4d97ba1..918848b045b 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2713,6 +2713,21 @@ def check_resources_fit_cluster( f' Existing:\t{handle.launched_nodes}x ' f'{handle.launched_resources}\n' f'{mismatch_str}') + else: + # For fractional acc count clusters, we round up the number of accs + # to 1 (sky/utils/resources_utils.py::make_ray_custom_resources_str) + # Here we scale the required acc count to (required / launched) * 1 + # so the total number of accs is the same as the requested number. + launched_accs = launched_resources.accelerators + if (launched_accs is not None and + valid_resource.accelerators is not None): + for _, count in launched_accs.items(): + if isinstance(count, float) and not count.is_integer(): + valid_resource = valid_resource.copy( + accelerators={ + k: v / count + for k, v in valid_resource.accelerators.items() + }) return valid_resource def _provision( diff --git a/sky/clouds/aws.py b/sky/clouds/aws.py index a0962b17cac..43062ebf393 100644 --- a/sky/clouds/aws.py +++ b/sky/clouds/aws.py @@ -2,13 +2,12 @@ import enum import fnmatch import functools -import json import os import re import subprocess import time import typing -from typing import Any, Dict, Iterator, List, Optional, Set, Tuple +from typing import Any, Dict, Iterator, List, Optional, Set, Tuple, Union from sky import clouds from sky import exceptions @@ -383,7 +382,7 @@ def get_default_instance_type( def get_accelerators_from_instance_type( cls, instance_type: str, - ) -> Optional[Dict[str, int]]: + ) -> Optional[Dict[str, Union[int, float]]]: return service_catalog.get_accelerators_from_instance_type( instance_type, clouds='aws') @@ -411,10 +410,8 @@ def make_deploy_resources_variables( r = resources # r.accelerators is cleared but .instance_type encodes the info. 
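         # e.g., for a p3.2xlarge this yields {'V100': 1}.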
acc_dict = self.get_accelerators_from_instance_type(r.instance_type) - if acc_dict is not None: - custom_resources = json.dumps(acc_dict, separators=(',', ':')) - else: - custom_resources = None + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) if r.extract_docker_image() is not None: image_id_to_use = None diff --git a/sky/clouds/azure.py b/sky/clouds/azure.py index 0852c993ed3..fc9579d17c0 100644 --- a/sky/clouds/azure.py +++ b/sky/clouds/azure.py @@ -1,12 +1,11 @@ """Azure.""" import functools -import json import os import re import subprocess import textwrap import typing -from typing import Any, Dict, Iterator, List, Optional, Tuple +from typing import Any, Dict, Iterator, List, Optional, Tuple, Union import colorama @@ -272,7 +271,7 @@ def zones_provision_loop( def get_accelerators_from_instance_type( cls, instance_type: str, - ) -> Optional[Dict[str, int]]: + ) -> Optional[Dict[str, Union[int, float]]]: return service_catalog.get_accelerators_from_instance_type( instance_type, clouds='azure') @@ -304,10 +303,9 @@ def make_deploy_resources_variables( acc_dict = self.get_accelerators_from_instance_type(r.instance_type) acc_count = None if acc_dict is not None: - custom_resources = json.dumps(acc_dict, separators=(',', ':')) acc_count = str(sum(acc_dict.values())) - else: - custom_resources = None + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) if (resources.image_id is None or resources.extract_docker_image() is not None): diff --git a/sky/clouds/cloud.py b/sky/clouds/cloud.py index 3e21204f0a3..4028c1fef59 100644 --- a/sky/clouds/cloud.py +++ b/sky/clouds/cloud.py @@ -9,8 +9,9 @@ """ import collections import enum +import math import typing -from typing import Dict, Iterable, Iterator, List, Optional, Set, Tuple +from typing import Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union from sky import exceptions from sky import skypilot_config @@ -306,7 +307,7 @@ def get_vcpus_mem_from_instance_type( def get_accelerators_from_instance_type( cls, instance_type: str, - ) -> Optional[Dict[str, int]]: + ) -> Optional[Dict[str, Union[int, float]]]: """Returns {acc: acc_count} held by 'instance_type', if any.""" raise NotImplementedError @@ -673,8 +674,9 @@ def _check_instance_type_accelerators_combination( assert resources.is_launchable(), resources def _equal_accelerators( - acc_requested: Optional[Dict[str, int]], - acc_from_instance_type: Optional[Dict[str, int]]) -> bool: + acc_requested: Optional[Dict[str, Union[int, float]]], + acc_from_instance_type: Optional[Dict[str, Union[int, + float]]]) -> bool: """Check the requested accelerators equals to the instance type Check the requested accelerators equals to the accelerators @@ -689,12 +691,14 @@ def _equal_accelerators( for acc in acc_requested: if acc not in acc_from_instance_type: return False - if acc_requested[acc] != acc_from_instance_type[acc]: + # Avoid float point precision issue. 
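+                # For example, a fractional A10 request of 0.167 should still
+                # match a catalog value that differs only by float rounding.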
+ if not math.isclose(acc_requested[acc], + acc_from_instance_type[acc]): return False return True - acc_from_instance_type = (cls.get_accelerators_from_instance_type( - resources.instance_type)) + acc_from_instance_type = cls.get_accelerators_from_instance_type( + resources.instance_type) if not _equal_accelerators(resources.accelerators, acc_from_instance_type): with ux_utils.print_exception_no_traceback(): diff --git a/sky/clouds/cudo.py b/sky/clouds/cudo.py index 4dca442fa01..6f02e007049 100644 --- a/sky/clouds/cudo.py +++ b/sky/clouds/cudo.py @@ -1,8 +1,7 @@ """Cudo Compute""" -import json import subprocess import typing -from typing import Dict, Iterator, List, Optional, Tuple +from typing import Dict, Iterator, List, Optional, Tuple, Union from sky import clouds from sky.clouds import service_catalog @@ -183,7 +182,7 @@ def get_default_instance_type( def get_accelerators_from_instance_type( cls, instance_type: str, - ) -> Optional[Dict[str, int]]: + ) -> Optional[Dict[str, Union[int, float]]]: return service_catalog.get_accelerators_from_instance_type( instance_type, clouds='cudo') @@ -202,10 +201,8 @@ def make_deploy_resources_variables( del zones, cluster_name # unused r = resources acc_dict = self.get_accelerators_from_instance_type(r.instance_type) - if acc_dict is not None: - custom_resources = json.dumps(acc_dict, separators=(',', ':')) - else: - custom_resources = None + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) return { 'instance_type': resources.instance_type, diff --git a/sky/clouds/fluidstack.py b/sky/clouds/fluidstack.py index 473fceabbe3..31e2112f8f7 100644 --- a/sky/clouds/fluidstack.py +++ b/sky/clouds/fluidstack.py @@ -1,8 +1,7 @@ """Fluidstack Cloud.""" -import json import os import typing -from typing import Dict, Iterator, List, Optional, Tuple +from typing import Dict, Iterator, List, Optional, Tuple, Union import requests @@ -155,7 +154,7 @@ def get_default_instance_type( def get_accelerators_from_instance_type( cls, instance_type: str, - ) -> Optional[Dict[str, int]]: + ) -> Optional[Dict[str, Union[int, float]]]: return service_catalog.get_accelerators_from_instance_type( instance_type, clouds='fluidstack') @@ -184,10 +183,8 @@ def make_deploy_resources_variables( r = resources acc_dict = self.get_accelerators_from_instance_type(r.instance_type) - if acc_dict is not None: - custom_resources = json.dumps(acc_dict, separators=(',', ':')) - else: - custom_resources = None + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) return { 'instance_type': resources.instance_type, diff --git a/sky/clouds/gcp.py b/sky/clouds/gcp.py index 1b70abf914d..0e20fdc9789 100644 --- a/sky/clouds/gcp.py +++ b/sky/clouds/gcp.py @@ -7,7 +7,7 @@ import subprocess import time import typing -from typing import Any, Dict, Iterator, List, Optional, Set, Tuple +from typing import Any, Dict, Iterator, List, Optional, Set, Tuple, Union import colorama @@ -669,7 +669,7 @@ def _get_feasible_launchable_resources( def get_accelerators_from_instance_type( cls, instance_type: str, - ) -> Optional[Dict[str, int]]: + ) -> Optional[Dict[str, Union[int, float]]]: # GCP handles accelerators separately from regular instance types, # hence return none here. 
return None diff --git a/sky/clouds/ibm.py b/sky/clouds/ibm.py index b78cc4287c0..0ac3c36cc48 100644 --- a/sky/clouds/ibm.py +++ b/sky/clouds/ibm.py @@ -1,8 +1,7 @@ """IBM Web Services.""" -import json import os import typing -from typing import Any, Dict, Iterator, List, Optional, Tuple +from typing import Any, Dict, Iterator, List, Optional, Tuple, Union import colorama @@ -206,10 +205,8 @@ def _get_profile_resources(instance_profile): 'IBM does not currently support spot instances in this framework' acc_dict = self.get_accelerators_from_instance_type(r.instance_type) - if acc_dict is not None: - custom_resources = json.dumps(acc_dict, separators=(',', ':')) - else: - custom_resources = None + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) instance_resources = _get_profile_resources(r.instance_type) @@ -247,7 +244,7 @@ def get_vcpus_mem_from_instance_type( def get_accelerators_from_instance_type( cls, instance_type: str, - ) -> Optional[Dict[str, int]]: + ) -> Optional[Dict[str, Union[int, float]]]: """Returns {acc: acc_count} held by 'instance_type', if any.""" return service_catalog.get_accelerators_from_instance_type( instance_type, clouds='ibm') diff --git a/sky/clouds/kubernetes.py b/sky/clouds/kubernetes.py index 8ff4172a5b1..39ddbe30577 100644 --- a/sky/clouds/kubernetes.py +++ b/sky/clouds/kubernetes.py @@ -1,10 +1,9 @@ """Kubernetes.""" import functools -import json import os import re import typing -from typing import Dict, Iterator, List, Optional, Tuple +from typing import Dict, Iterator, List, Optional, Tuple, Union from sky import clouds from sky import sky_logging @@ -271,7 +270,7 @@ def get_default_instance_type( def get_accelerators_from_instance_type( cls, instance_type: str, - ) -> Optional[Dict[str, int]]: + ) -> Optional[Dict[str, Union[int, float]]]: inst = kubernetes_utils.KubernetesInstanceType.from_instance_type( instance_type) return { @@ -328,10 +327,8 @@ def make_deploy_resources_variables( r = resources acc_dict = self.get_accelerators_from_instance_type(r.instance_type) - if acc_dict is not None: - custom_resources = json.dumps(acc_dict, separators=(',', ':')) - else: - custom_resources = None + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) # resources.memory and cpus are None if they are not explicitly set. # We fetch the default values for the instance type in that case. 
diff --git a/sky/clouds/lambda_cloud.py b/sky/clouds/lambda_cloud.py index 0201f4f76ad..055a5338750 100644 --- a/sky/clouds/lambda_cloud.py +++ b/sky/clouds/lambda_cloud.py @@ -1,7 +1,6 @@ """Lambda Cloud.""" -import json import typing -from typing import Dict, Iterator, List, Optional, Tuple +from typing import Dict, Iterator, List, Optional, Tuple, Union import requests @@ -136,7 +135,7 @@ def get_default_instance_type( def get_accelerators_from_instance_type( cls, instance_type: str, - ) -> Optional[Dict[str, int]]: + ) -> Optional[Dict[str, Union[int, float]]]: return service_catalog.get_accelerators_from_instance_type( instance_type, clouds='lambda') @@ -164,10 +163,8 @@ def make_deploy_resources_variables( r = resources acc_dict = self.get_accelerators_from_instance_type(r.instance_type) - if acc_dict is not None: - custom_resources = json.dumps(acc_dict, separators=(',', ':')) - else: - custom_resources = None + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) resources_vars = { 'instance_type': resources.instance_type, diff --git a/sky/clouds/oci.py b/sky/clouds/oci.py index 0feda467bbf..93a70c5ac37 100644 --- a/sky/clouds/oci.py +++ b/sky/clouds/oci.py @@ -20,11 +20,10 @@ - Hysun He (hysun.he@oracle.com) @ Oct 13, 2024: Support more OS types additional to ubuntu for OCI resources. """ -import json import logging import os import typing -from typing import Dict, Iterator, List, Optional, Tuple +from typing import Dict, Iterator, List, Optional, Tuple, Union from sky import clouds from sky import exceptions @@ -193,7 +192,7 @@ def get_default_instance_type( def get_accelerators_from_instance_type( cls, instance_type: str, - ) -> Optional[Dict[str, int]]: + ) -> Optional[Dict[str, Union[int, float]]]: return service_catalog.get_accelerators_from_instance_type( instance_type, clouds='oci') @@ -213,10 +212,8 @@ def make_deploy_resources_variables( acc_dict = self.get_accelerators_from_instance_type( resources.instance_type) - if acc_dict is not None: - custom_resources = json.dumps(acc_dict, separators=(',', ':')) - else: - custom_resources = None + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) image_str = self._get_image_id(resources.image_id, region.name, resources.instance_type) diff --git a/sky/clouds/paperspace.py b/sky/clouds/paperspace.py index 4c4fa1d695a..4047a2f5926 100644 --- a/sky/clouds/paperspace.py +++ b/sky/clouds/paperspace.py @@ -1,8 +1,7 @@ """ Paperspace Cloud. 
""" -import json import typing -from typing import Dict, Iterator, List, Optional, Tuple +from typing import Dict, Iterator, List, Optional, Tuple, Union import requests @@ -162,7 +161,7 @@ def get_default_instance_type( @classmethod def get_accelerators_from_instance_type( - cls, instance_type: str) -> Optional[Dict[str, int]]: + cls, instance_type: str) -> Optional[Dict[str, Union[int, float]]]: return service_catalog.get_accelerators_from_instance_type( instance_type, clouds='paperspace') @@ -181,10 +180,8 @@ def make_deploy_resources_variables( r = resources acc_dict = self.get_accelerators_from_instance_type(r.instance_type) - if acc_dict is not None: - custom_resources = json.dumps(acc_dict, separators=(',', ':')) - else: - custom_resources = None + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) return { 'instance_type': resources.instance_type, diff --git a/sky/clouds/runpod.py b/sky/clouds/runpod.py index 6cfdf11c6b4..0d693fd9f60 100644 --- a/sky/clouds/runpod.py +++ b/sky/clouds/runpod.py @@ -1,8 +1,7 @@ """ RunPod Cloud. """ -import json import typing -from typing import Dict, Iterator, List, Optional, Tuple +from typing import Dict, Iterator, List, Optional, Tuple, Union from sky import clouds from sky.clouds import service_catalog @@ -147,7 +146,7 @@ def get_default_instance_type( @classmethod def get_accelerators_from_instance_type( - cls, instance_type: str) -> Optional[Dict[str, int]]: + cls, instance_type: str) -> Optional[Dict[str, Union[int, float]]]: return service_catalog.get_accelerators_from_instance_type( instance_type, clouds='runpod') @@ -166,10 +165,8 @@ def make_deploy_resources_variables( r = resources acc_dict = self.get_accelerators_from_instance_type(r.instance_type) - if acc_dict is not None: - custom_resources = json.dumps(acc_dict, separators=(',', ':')) - else: - custom_resources = None + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) if r.image_id is None: image_id = 'runpod/base:0.0.2' diff --git a/sky/clouds/scp.py b/sky/clouds/scp.py index 17a54ce1607..d0ad611bf0c 100644 --- a/sky/clouds/scp.py +++ b/sky/clouds/scp.py @@ -4,9 +4,8 @@ to access the SCP catalog and check credentials for the SCP access. 
""" -import json import typing -from typing import Dict, Iterator, List, Optional, Tuple +from typing import Dict, Iterator, List, Optional, Tuple, Union from sky import clouds from sky import exceptions @@ -160,7 +159,7 @@ def get_default_instance_type( def get_accelerators_from_instance_type( cls, instance_type: str, - ) -> Optional[Dict[str, int]]: + ) -> Optional[Dict[str, Union[int, float]]]: return service_catalog.get_accelerators_from_instance_type( instance_type, clouds='scp') @@ -188,11 +187,9 @@ def make_deploy_resources_variables( r = resources acc_dict = self.get_accelerators_from_instance_type(r.instance_type) + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) - if acc_dict is not None: - custom_resources = json.dumps(acc_dict, separators=(',', ':')) - else: - custom_resources = None image_id = self._get_image_id(r.image_id, region.name, r.instance_type) return { 'instance_type': resources.instance_type, diff --git a/sky/clouds/service_catalog/__init__.py b/sky/clouds/service_catalog/__init__.py index f2301bac466..4deab8ac204 100644 --- a/sky/clouds/service_catalog/__init__.py +++ b/sky/clouds/service_catalog/__init__.py @@ -238,7 +238,7 @@ def get_default_instance_type(cpus: Optional[str] = None, def get_accelerators_from_instance_type( instance_type: str, - clouds: CloudFilter = None) -> Optional[Dict[str, int]]: + clouds: CloudFilter = None) -> Optional[Dict[str, Union[int, float]]]: """Returns the accelerators from a instance type.""" return _map_clouds_catalog(clouds, 'get_accelerators_from_instance_type', instance_type) diff --git a/sky/clouds/service_catalog/aws_catalog.py b/sky/clouds/service_catalog/aws_catalog.py index d156135047b..918a4070414 100644 --- a/sky/clouds/service_catalog/aws_catalog.py +++ b/sky/clouds/service_catalog/aws_catalog.py @@ -8,7 +8,7 @@ import os import threading import typing -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union from sky import exceptions from sky import sky_logging @@ -243,7 +243,7 @@ def get_default_instance_type( def get_accelerators_from_instance_type( - instance_type: str) -> Optional[Dict[str, int]]: + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: return common.get_accelerators_from_instance_type_impl( _get_df(), instance_type) diff --git a/sky/clouds/service_catalog/azure_catalog.py b/sky/clouds/service_catalog/azure_catalog.py index 867141f7899..62cb422bf83 100644 --- a/sky/clouds/service_catalog/azure_catalog.py +++ b/sky/clouds/service_catalog/azure_catalog.py @@ -4,7 +4,7 @@ instance types and pricing information for Azure. 
""" import re -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union from sky import clouds as cloud_lib from sky import sky_logging @@ -137,7 +137,7 @@ def _filter_disk_type(instance_type: str) -> bool: def get_accelerators_from_instance_type( - instance_type: str) -> Optional[Dict[str, int]]: + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: return common.get_accelerators_from_instance_type_impl(_df, instance_type) @@ -157,6 +157,7 @@ def get_instance_type_for_accelerator( if zone is not None: with ux_utils.print_exception_no_traceback(): raise ValueError('Azure does not support zones.') + return common.get_instance_type_for_accelerator_impl(df=_df, acc_name=acc_name, acc_count=acc_count, diff --git a/sky/clouds/service_catalog/common.py b/sky/clouds/service_catalog/common.py index 4df72824027..1082b4e9efd 100644 --- a/sky/clouds/service_catalog/common.py +++ b/sky/clouds/service_catalog/common.py @@ -5,7 +5,7 @@ import os import time import typing -from typing import Callable, Dict, List, NamedTuple, Optional, Tuple +from typing import Callable, Dict, List, NamedTuple, Optional, Tuple, Union import filelock import requests @@ -481,7 +481,7 @@ def get_instance_type_for_cpus_mem_impl( def get_accelerators_from_instance_type_impl( df: 'pd.DataFrame', instance_type: str, -) -> Optional[Dict[str, int]]: +) -> Optional[Dict[str, Union[int, float]]]: df = _get_instance_type(df, instance_type, None) if len(df) == 0: with ux_utils.print_exception_no_traceback(): @@ -490,13 +490,19 @@ def get_accelerators_from_instance_type_impl( acc_name, acc_count = row['AcceleratorName'], row['AcceleratorCount'] if pd.isnull(acc_name): return None - return {acc_name: int(acc_count)} + + def _convert(value): + if int(value) == value: + return int(value) + return float(value) + + return {acc_name: _convert(acc_count)} def get_instance_type_for_accelerator_impl( df: 'pd.DataFrame', acc_name: str, - acc_count: int, + acc_count: Union[int, float], cpus: Optional[str] = None, memory: Optional[str] = None, use_spot: bool = False, @@ -509,7 +515,7 @@ def get_instance_type_for_accelerator_impl( accelerators with sorted prices and a list of candidates with fuzzy search. 
""" result = df[(df['AcceleratorName'].str.fullmatch(acc_name, case=False)) & - (df['AcceleratorCount'] == acc_count)] + (abs(df['AcceleratorCount'] - acc_count) <= 0.01)] result = _filter_region_zone(result, region, zone) if len(result) == 0: fuzzy_result = df[ @@ -522,8 +528,11 @@ def get_instance_type_for_accelerator_impl( fuzzy_candidate_list = [] if len(fuzzy_result) > 0: for _, row in fuzzy_result.iterrows(): + acc_cnt = float(row['AcceleratorCount']) + acc_count_display = (int(acc_cnt) if acc_cnt.is_integer() else + f'{acc_cnt:.2f}') fuzzy_candidate_list.append(f'{row["AcceleratorName"]}:' - f'{int(row["AcceleratorCount"])}') + f'{acc_count_display}') return (None, fuzzy_candidate_list) result = _filter_with_cpus(result, cpus) diff --git a/sky/clouds/service_catalog/cudo_catalog.py b/sky/clouds/service_catalog/cudo_catalog.py index 62832cba5bf..d4adc5baea5 100644 --- a/sky/clouds/service_catalog/cudo_catalog.py +++ b/sky/clouds/service_catalog/cudo_catalog.py @@ -1,7 +1,7 @@ """Cudo Compute Offerings Catalog.""" import typing -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union from sky.clouds.service_catalog import common import sky.provision.cudo.cudo_machine_type as cudo_mt @@ -66,7 +66,7 @@ def get_default_instance_type(cpus: Optional[str] = None, def get_accelerators_from_instance_type( - instance_type: str) -> Optional[Dict[str, int]]: + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: return common.get_accelerators_from_instance_type_impl(_df, instance_type) diff --git a/sky/clouds/service_catalog/data_fetchers/fetch_azure.py b/sky/clouds/service_catalog/data_fetchers/fetch_azure.py index bbd337e23aa..f646cac339a 100644 --- a/sky/clouds/service_catalog/data_fetchers/fetch_azure.py +++ b/sky/clouds/service_catalog/data_fetchers/fetch_azure.py @@ -93,14 +93,15 @@ def get_regions() -> List[str]: # We have to manually remove it. DEPRECATED_FAMILIES = ['standardNVSv2Family'] -# Some A10 instance types only contains a fractional of GPU. We temporarily -# filter them out here to avoid using it as a whole A10 GPU. -# TODO(zhwu,tian): support fractional GPUs, which can be done on -# kubernetes as well. +# Azure has those fractional A10 instance types, which still shows has 1 A10 GPU +# in the API response. We manually changing the number of GPUs to a float here. # Ref: https://learn.microsoft.com/en-us/azure/virtual-machines/nva10v5-series -FILTERED_A10_INSTANCE_TYPES = [ - f'Standard_NV{vcpu}ads_A10_v5' for vcpu in [6, 12, 18] -] +# TODO(zhwu,tian): Support fractional GPUs on k8s as well. +# TODO(tian): Maybe we should support literally fractional count, i.e. A10:1/6 +# instead of float point count (A10:0.167). +AZURE_FRACTIONAL_A10_INS_TYPE_TO_NUM_GPUS = { + f'Standard_NV{vcpu}ads_A10_v5': round(vcpu / 36, 3) for vcpu in [6, 12, 18] +} USEFUL_COLUMNS = [ 'InstanceType', 'AcceleratorName', 'AcceleratorCount', 'vCPUs', 'MemoryGiB', @@ -274,6 +275,19 @@ def get_additional_columns(row): axis='columns', ) + def _upd_a10_gpu_count(row): + new_gpu_cnt = AZURE_FRACTIONAL_A10_INS_TYPE_TO_NUM_GPUS.get( + row['InstanceType']) + if new_gpu_cnt is not None: + return new_gpu_cnt + return row['AcceleratorCount'] + + # Manually update the GPU count for fractional A10 instance types. + # Those instance types have fractional GPU count, but Azure API returns + # 1 GPU count for them. We manually update the GPU count here. 
+ df_ret['AcceleratorCount'] = df_ret.apply(_upd_a10_gpu_count, + axis='columns') + # As of Dec 2023, a few H100 instance types fetched from Azure APIs do not # have pricing: # @@ -299,10 +313,6 @@ def get_additional_columns(row): after_drop_len = len(df_ret) print(f'Dropped {before_drop_len - after_drop_len} duplicated rows') - # Filter out instance types that only contain a fractional of GPU. - df_ret = df_ret.loc[~df_ret['InstanceType'].isin(FILTERED_A10_INSTANCE_TYPES - )] - # Filter out deprecated families df_ret = df_ret.loc[~df_ret['family'].isin(DEPRECATED_FAMILIES)] df_ret = df_ret[USEFUL_COLUMNS] diff --git a/sky/clouds/service_catalog/fluidstack_catalog.py b/sky/clouds/service_catalog/fluidstack_catalog.py index 2f47a38df43..7a28ac8174a 100644 --- a/sky/clouds/service_catalog/fluidstack_catalog.py +++ b/sky/clouds/service_catalog/fluidstack_catalog.py @@ -4,7 +4,7 @@ instance types and pricing information for FluidStack. """ import typing -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union from sky.clouds.service_catalog import common from sky.utils import ux_utils @@ -65,7 +65,7 @@ def get_default_instance_type(cpus: Optional[str] = None, def get_accelerators_from_instance_type( - instance_type: str) -> Optional[Dict[str, int]]: + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: return common.get_accelerators_from_instance_type_impl(_df, instance_type) diff --git a/sky/clouds/service_catalog/ibm_catalog.py b/sky/clouds/service_catalog/ibm_catalog.py index 51b4e14f569..5cec86fbb65 100644 --- a/sky/clouds/service_catalog/ibm_catalog.py +++ b/sky/clouds/service_catalog/ibm_catalog.py @@ -4,7 +4,7 @@ instance types and pricing information for IBM. """ -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union from sky import sky_logging from sky.adaptors import ibm @@ -43,7 +43,7 @@ def get_vcpus_mem_from_instance_type( def get_accelerators_from_instance_type( - instance_type: str) -> Optional[Dict[str, int]]: + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: return common.get_accelerators_from_instance_type_impl(_df, instance_type) diff --git a/sky/clouds/service_catalog/lambda_catalog.py b/sky/clouds/service_catalog/lambda_catalog.py index e843ab72cc0..24cb4064d54 100644 --- a/sky/clouds/service_catalog/lambda_catalog.py +++ b/sky/clouds/service_catalog/lambda_catalog.py @@ -4,7 +4,7 @@ instance types and pricing information for Lambda. 
""" import typing -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union from sky.clouds.service_catalog import common from sky.utils import resources_utils @@ -72,7 +72,7 @@ def get_default_instance_type( def get_accelerators_from_instance_type( - instance_type: str) -> Optional[Dict[str, int]]: + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: return common.get_accelerators_from_instance_type_impl(_df, instance_type) diff --git a/sky/clouds/service_catalog/oci_catalog.py b/sky/clouds/service_catalog/oci_catalog.py index 47d0489f6ab..c8e475df871 100644 --- a/sky/clouds/service_catalog/oci_catalog.py +++ b/sky/clouds/service_catalog/oci_catalog.py @@ -14,7 +14,7 @@ import logging import threading import typing -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union from sky.adaptors import oci as oci_adaptor from sky.clouds import OCI @@ -131,7 +131,7 @@ def _filter_disk_type(instance_type: str) -> bool: def get_accelerators_from_instance_type( - instance_type: str) -> Optional[Dict[str, int]]: + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: return common.get_accelerators_from_instance_type_impl( _get_df(), instance_type) diff --git a/sky/clouds/service_catalog/paperspace_catalog.py b/sky/clouds/service_catalog/paperspace_catalog.py index 1eb635c93e5..49948b219a1 100644 --- a/sky/clouds/service_catalog/paperspace_catalog.py +++ b/sky/clouds/service_catalog/paperspace_catalog.py @@ -5,7 +5,7 @@ """ import typing -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union from sky.clouds.service_catalog import common from sky.utils import ux_utils @@ -60,7 +60,7 @@ def get_default_instance_type( def get_accelerators_from_instance_type( - instance_type: str) -> Optional[Dict[str, int]]: + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: return common.get_accelerators_from_instance_type_impl(_df, instance_type) diff --git a/sky/clouds/service_catalog/runpod_catalog.py b/sky/clouds/service_catalog/runpod_catalog.py index 2d3ed44307b..7fbc46206ed 100644 --- a/sky/clouds/service_catalog/runpod_catalog.py +++ b/sky/clouds/service_catalog/runpod_catalog.py @@ -5,7 +5,7 @@ """ import typing -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union from sky.clouds.service_catalog import common from sky.utils import ux_utils @@ -56,7 +56,7 @@ def get_default_instance_type(cpus: Optional[str] = None, def get_accelerators_from_instance_type( - instance_type: str) -> Optional[Dict[str, int]]: + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: return common.get_accelerators_from_instance_type_impl(_df, instance_type) diff --git a/sky/clouds/service_catalog/scp_catalog.py b/sky/clouds/service_catalog/scp_catalog.py index 209bb4cf631..e4773ab3250 100644 --- a/sky/clouds/service_catalog/scp_catalog.py +++ b/sky/clouds/service_catalog/scp_catalog.py @@ -5,7 +5,7 @@ """ import typing -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union from sky.clouds.service_catalog import common from sky.utils import resources_utils @@ -67,7 +67,7 @@ def get_default_instance_type( def get_accelerators_from_instance_type( - instance_type: str) -> Optional[Dict[str, int]]: + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: return common.get_accelerators_from_instance_type_impl(_df, instance_type) diff --git 
a/sky/clouds/service_catalog/vsphere_catalog.py b/sky/clouds/service_catalog/vsphere_catalog.py index e1199d3d266..74fb2fbe60d 100644 --- a/sky/clouds/service_catalog/vsphere_catalog.py +++ b/sky/clouds/service_catalog/vsphere_catalog.py @@ -2,7 +2,7 @@ import io import os import typing -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple, Union from sky.adaptors import common as adaptors_common from sky.clouds.service_catalog import common @@ -85,7 +85,7 @@ def get_default_instance_type( def get_accelerators_from_instance_type( - instance_type: str) -> Optional[Dict[str, int]]: + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: return common.get_accelerators_from_instance_type_impl( _get_df(), instance_type) diff --git a/sky/clouds/vsphere.py b/sky/clouds/vsphere.py index 7cf56b46a8d..88d5df3232a 100644 --- a/sky/clouds/vsphere.py +++ b/sky/clouds/vsphere.py @@ -1,8 +1,7 @@ """Vsphere cloud implementation.""" -import json import subprocess import typing -from typing import Dict, Iterator, List, Optional, Tuple +from typing import Dict, Iterator, List, Optional, Tuple, Union import requests @@ -152,7 +151,7 @@ def get_default_instance_type( def get_accelerators_from_instance_type( cls, instance_type: str, - ) -> Optional[Dict[str, int]]: + ) -> Optional[Dict[str, Union[int, float]]]: return service_catalog.get_accelerators_from_instance_type( instance_type, clouds=_CLOUD_VSPHERE) @@ -182,10 +181,8 @@ def make_deploy_resources_variables( zone_names = [zone.name for zone in zones] r = resources acc_dict = self.get_accelerators_from_instance_type(r.instance_type) - if acc_dict is not None: - custom_resources = json.dumps(acc_dict, separators=(',', ':')) - else: - custom_resources = None + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) return { 'instance_type': resources.instance_type, diff --git a/sky/resources.py b/sky/resources.py index 540cbfb703c..164ef312ba1 100644 --- a/sky/resources.py +++ b/sky/resources.py @@ -392,7 +392,7 @@ def memory(self) -> Optional[str]: @property @functools.lru_cache(maxsize=1) - def accelerators(self) -> Optional[Dict[str, int]]: + def accelerators(self) -> Optional[Dict[str, Union[int, float]]]: """Returns the accelerators field directly or by inferring. For example, Resources(AWS, 'p3.2xlarge') has its accelerators field diff --git a/sky/utils/resources_utils.py b/sky/utils/resources_utils.py index 72aa5ac05d3..653bb109ac0 100644 --- a/sky/utils/resources_utils.py +++ b/sky/utils/resources_utils.py @@ -2,9 +2,11 @@ import dataclasses import enum import itertools +import json +import math import re import typing -from typing import List, Optional, Set +from typing import Dict, List, Optional, Set, Union from sky import skypilot_config from sky.clouds import cloud_registry @@ -163,6 +165,16 @@ def get_readable_resources_repr(handle: 'backends.CloudVmRayResourceHandle', return _DEFAULT_MESSAGE_HANDLE_INITIALIZING +def make_ray_custom_resources_str( + resource_dict: Optional[Dict[str, Union[int, float]]]) -> Optional[str]: + """Convert resources to Ray custom resources format.""" + if resource_dict is None: + return None + # Ray does not allow fractional resources, so we need to ceil the values. + ceiled_dict = {k: math.ceil(v) for k, v in resource_dict.items()} + return json.dumps(ceiled_dict, separators=(',', ':')) + + @dataclasses.dataclass class FeasibleResources: """Feasible resources returned by cloud. 
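For reference, a minimal usage sketch of the `make_ray_custom_resources_str` helper added in `sky/utils/resources_utils.py` above (values are illustrative; the expected outputs in the comments follow directly from the ceil-then-`json.dumps` logic in the patch):

```python
from sky.utils import resources_utils

# Fractional accelerator counts (e.g. Azure's Standard_NV6ads_A10_v5 exposing
# A10:0.167) are ceiled, since Ray only accepts integral custom resources.
print(resources_utils.make_ray_custom_resources_str({'A10': 0.167}))  # {"A10":1}
print(resources_utils.make_ray_custom_resources_str({'V100': 1}))     # {"V100":1}
print(resources_utils.make_ray_custom_resources_str(None))            # None
```

Ceiling (rather than truncating) keeps the per-node Ray custom resource at 1 for fractional GPUs, which is why `check_resources_fit_cluster` above rescales the requested accelerator count by the launched fractional count when matching a task to an existing cluster.
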
From c0c17483d1f692ad639144050f5f6fa0966e47a5 Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Sat, 26 Oct 2024 16:30:52 -0700 Subject: [PATCH 14/17] [Jobs] Refactor: Extract task failure state update helper (#4185) refactor: a unified exception handling utility --- sky/jobs/controller.py | 61 +++++++++++++++++++----------------------- 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/sky/jobs/controller.py b/sky/jobs/controller.py index 1faa5dfbe31..73d509be9ef 100644 --- a/sky/jobs/controller.py +++ b/sky/jobs/controller.py @@ -340,48 +340,28 @@ def run(self): common_utils.format_exception(reason, use_bracket=True) for reason in e.reasons)) logger.error(failure_reason) - managed_job_state.set_failed( - self._job_id, - task_id=task_id, - failure_type=managed_job_state.ManagedJobStatus. - FAILED_PRECHECKS, - failure_reason=failure_reason, - callback_func=managed_job_utils.event_callback_func( - job_id=self._job_id, - task_id=task_id, - task=self._dag.tasks[task_id])) + self._update_failed_task_state( + task_id, managed_job_state.ManagedJobStatus.FAILED_PRECHECKS, + failure_reason) except exceptions.ManagedJobReachedMaxRetriesError as e: # Please refer to the docstring of self._run for the cases when # this exception can occur. - logger.error(common_utils.format_exception(e)) + failure_reason = common_utils.format_exception(e) + logger.error(failure_reason) # The managed job should be marked as FAILED_NO_RESOURCE, as the # managed job may be able to launch next time. - managed_job_state.set_failed( - self._job_id, - task_id=task_id, - failure_type=managed_job_state.ManagedJobStatus. - FAILED_NO_RESOURCE, - failure_reason=common_utils.format_exception(e), - callback_func=managed_job_utils.event_callback_func( - job_id=self._job_id, - task_id=task_id, - task=self._dag.tasks[task_id])) + self._update_failed_task_state( + task_id, managed_job_state.ManagedJobStatus.FAILED_NO_RESOURCE, + failure_reason) except (Exception, SystemExit) as e: # pylint: disable=broad-except with ux_utils.enable_traceback(): logger.error(traceback.format_exc()) - msg = ('Unexpected error occurred: ' - f'{common_utils.format_exception(e, use_bracket=True)}') + msg = ('Unexpected error occurred: ' + + common_utils.format_exception(e, use_bracket=True)) logger.error(msg) - managed_job_state.set_failed( - self._job_id, - task_id=task_id, - failure_type=managed_job_state.ManagedJobStatus. - FAILED_CONTROLLER, - failure_reason=msg, - callback_func=managed_job_utils.event_callback_func( - job_id=self._job_id, - task_id=task_id, - task=self._dag.tasks[task_id])) + self._update_failed_task_state( + task_id, managed_job_state.ManagedJobStatus.FAILED_CONTROLLER, + msg) finally: # This will set all unfinished tasks to CANCELLING, and will not # affect the jobs in terminal states. 
@@ -396,6 +376,21 @@ def run(self): managed_job_state.set_cancelled(job_id=self._job_id, callback_func=callback_func) + def _update_failed_task_state( + self, task_id: int, + failure_type: managed_job_state.ManagedJobStatus, + failure_reason: str): + """Update the state of the failed task.""" + managed_job_state.set_failed( + self._job_id, + task_id=task_id, + failure_type=failure_type, + failure_reason=failure_reason, + callback_func=managed_job_utils.event_callback_func( + job_id=self._job_id, + task_id=task_id, + task=self._dag.tasks[task_id])) + def _run_controller(job_id: int, dag_yaml: str, retry_until_up: bool): """Runs the controller in a remote process for interruption.""" From 546cb17e870e571f984631313be23f197fc4b7fd Mon Sep 17 00:00:00 2001 From: Tian Xia Date: Mon, 28 Oct 2024 13:06:31 -0700 Subject: [PATCH 15/17] [Core] Remove backward compatibility code for 0.6.0 & 0.7.0 (#4175) * [Core] Remove backward compatibility code for 0.6.0 * remove backwards compatibility for 0.7.0 release * Update sky/serve/serve_state.py Co-authored-by: Romil Bhardwaj * remove more * Revert "remove more" This reverts commit 34c28e93a083c7f4d02e9d8a8685a6d61d358e84. * remove more but not instance tags --------- Co-authored-by: Christopher Cooper Co-authored-by: Romil Bhardwaj --- sky/backends/backend_utils.py | 143 ++------------------------- sky/backends/cloud_vm_ray_backend.py | 103 +------------------ sky/provision/gcp/instance.py | 7 -- sky/serve/core.py | 2 - sky/serve/serve_state.py | 10 +- sky/serve/serve_utils.py | 16 +-- sky/serve/service_spec.py | 28 ------ sky/skylet/job_lib.py | 14 +-- sky/skylet/log_lib.py | 19 +--- sky/utils/schemas.py | 15 +-- tests/test_jobs_and_serve.py | 1 - 11 files changed, 28 insertions(+), 330 deletions(-) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index caa6c9292d5..da1e68991a4 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -401,6 +401,8 @@ class SSHConfigHelper(object): ssh_conf_path = '~/.ssh/config' ssh_conf_lock_path = os.path.expanduser('~/.sky/ssh_config.lock') + ssh_conf_per_cluster_lock_path = os.path.expanduser( + '~/.sky/ssh_config_{}.lock') ssh_cluster_path = SKY_USER_FILE_PATH + '/ssh/{}' @classmethod @@ -486,12 +488,6 @@ def add_cluster( config_path = os.path.expanduser(cls.ssh_conf_path) - # For backward compatibility: before #2706, we wrote the config of SkyPilot clusters - # directly in ~/.ssh/config. For these clusters, we remove the config in ~/.ssh/config - # and write/overwrite the config in ~/.sky/ssh/ instead. - cls._remove_stale_cluster_config_for_backward_compatibility( - cluster_name, ip, auth_config, docker_user) - if not os.path.exists(config_path): config = ['\n'] with open(config_path, @@ -560,139 +556,20 @@ def add_cluster( f.write(codegen) @classmethod - def _remove_stale_cluster_config_for_backward_compatibility( - cls, - cluster_name: str, - ip: str, - auth_config: Dict[str, str], - docker_user: Optional[str] = None, - ): - """Remove authentication information for cluster from local SSH config. - - If no existing host matching the provided specification is found, then - nothing is removed. - - Args: - ip: Head node's IP address. 
- auth_config: read_yaml(handle.cluster_yaml)['auth'] - docker_user: If not None, use this user to ssh into the docker - """ - username = auth_config['ssh_user'] - config_path = os.path.expanduser(cls.ssh_conf_path) - cluster_config_path = os.path.expanduser( - cls.ssh_cluster_path.format(cluster_name)) - if not os.path.exists(config_path): - return - - with open(config_path, 'r', encoding='utf-8') as f: - config = f.readlines() - - start_line_idx = None - - # Scan the config for the cluster name. - for i, line in enumerate(config): - next_line = config[i + 1] if i + 1 < len(config) else '' - if docker_user is None: - found = (line.strip() == f'HostName {ip}' and - next_line.strip() == f'User {username}') - else: - found = (line.strip() == 'HostName localhost' and - next_line.strip() == f'User {docker_user}') - if found: - # Find the line starting with ProxyCommand and contains the ip - found = False - for idx in range(i, len(config)): - # Stop if we reach an empty line, which means a new host - if not config[idx].strip(): - break - if config[idx].strip().startswith('ProxyCommand'): - proxy_command_line = config[idx].strip() - if proxy_command_line.endswith(f'@{ip}'): - found = True - break - if found: - start_line_idx = i - 1 - break - - if start_line_idx is not None: - # Scan for end of previous config. - cursor = start_line_idx - while cursor > 0 and len(config[cursor].strip()) > 0: - cursor -= 1 - prev_end_line_idx = cursor - - # Scan for end of the cluster config. - end_line_idx = None - cursor = start_line_idx + 1 - start_line_idx -= 1 # remove auto-generated comment - while cursor < len(config): - if config[cursor].strip().startswith( - '# ') or config[cursor].strip().startswith('Host '): - end_line_idx = cursor - break - cursor += 1 - - # Remove sky-generated config and update the file. - config[prev_end_line_idx:end_line_idx] = [ - '\n' - ] if end_line_idx is not None else [] - with open(config_path, 'w', encoding='utf-8') as f: - f.write(''.join(config).strip()) - f.write('\n' * 2) - - # Delete include statement if it exists in the config. - sky_autogen_comment = ('# Added by sky (use `sky stop/down ' - f'{cluster_name}` to remove)') - with open(config_path, 'r', encoding='utf-8') as f: - config = f.readlines() - - for i, line in enumerate(config): - config_str = line.strip() - if f'Include {cluster_config_path}' in config_str: - with open(config_path, 'w', encoding='utf-8') as f: - if i < len(config) - 1 and config[i + 1] == '\n': - del config[i + 1] - # Delete Include string - del config[i] - # Delete Sky Autogen Comment - if i > 0 and sky_autogen_comment in config[i - 1].strip(): - del config[i - 1] - f.write(''.join(config)) - break - if 'Host' in config_str: - break - - @classmethod - # TODO: We can remove this after 0.6.0 and have a lock only per cluster. - @timeline.FileLockEvent(ssh_conf_lock_path) - def remove_cluster( - cls, - cluster_name: str, - ip: str, - auth_config: Dict[str, str], - docker_user: Optional[str] = None, - ): + def remove_cluster(cls, cluster_name: str): """Remove authentication information for cluster from ~/.sky/ssh/. - For backward compatibility also remove the config from ~/.ssh/config if it exists. - If no existing host matching the provided specification is found, then nothing is removed. Args: - ip: Head node's IP address. - auth_config: read_yaml(handle.cluster_yaml)['auth'] - docker_user: If not None, use this user to ssh into the docker + cluster_name: Cluster name. 
""" - cluster_config_path = os.path.expanduser( - cls.ssh_cluster_path.format(cluster_name)) - common_utils.remove_file_if_exists(cluster_config_path) - - # Ensures backward compatibility: before #2706, we wrote the config of SkyPilot clusters - # directly in ~/.ssh/config. For these clusters, we should clean up the config. - # TODO: Remove this after 0.6.0 - cls._remove_stale_cluster_config_for_backward_compatibility( - cluster_name, ip, auth_config, docker_user) + with timeline.FileLockEvent( + cls.ssh_conf_per_cluster_lock_path.format(cluster_name)): + cluster_config_path = os.path.expanduser( + cls.ssh_cluster_path.format(cluster_name)) + common_utils.remove_file_if_exists(cluster_config_path) def _replace_yaml_dicts( @@ -867,7 +744,7 @@ def write_cluster_config( labels = skypilot_config.get_nested((str(cloud).lower(), 'labels'), {}) # Deprecated: instance_tags have been replaced by labels. For backward # compatibility, we support them and the schema allows them only if - # `labels` are not specified. This should be removed after 0.7.0. + # `labels` are not specified. This should be removed after 0.8.0. labels = skypilot_config.get_nested((str(cloud).lower(), 'instance_tags'), labels) # labels is a dict, which is guaranteed by the type check in diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index 918848b045b..d0a642d4d80 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -2118,13 +2118,8 @@ def __init__( stable_internal_external_ips: Optional[List[Tuple[str, str]]] = None, stable_ssh_ports: Optional[List[int]] = None, - cluster_info: Optional[provision_common.ClusterInfo] = None, - # The following 2 fields are deprecated. SkyPilot new provisioner - # API handles the TPU node creation/deletion. - # Backward compatibility for TPU nodes created before #2943. - # TODO (zhwu): Remove this after 0.6.0. - tpu_create_script: Optional[str] = None, - tpu_delete_script: Optional[str] = None) -> None: + cluster_info: Optional[provision_common.ClusterInfo] = None + ) -> None: self._version = self._VERSION self.cluster_name = cluster_name self.cluster_name_on_cloud = cluster_name_on_cloud @@ -2139,12 +2134,6 @@ def __init__( self.launched_nodes = launched_nodes self.launched_resources = launched_resources self.docker_user: Optional[str] = None - # Deprecated. SkyPilot new provisioner API handles the TPU node - # creation/deletion. - # Backward compatibility for TPU nodes created before #2943. - # TODO (zhwu): Remove this after 0.6.0. - self.tpu_create_script = tpu_create_script - self.tpu_delete_script = tpu_delete_script def __repr__(self): return (f'ResourceHandle(' @@ -2160,10 +2149,7 @@ def __repr__(self): f'\n\tlaunched_resources={self.launched_nodes}x ' f'{self.launched_resources}, ' f'\n\tdocker_user={self.docker_user},' - f'\n\tssh_user={self.ssh_user},' - # TODO (zhwu): Remove this after 0.6.0. - f'\n\ttpu_create_script={self.tpu_create_script}, ' - f'\n\ttpu_delete_script={self.tpu_delete_script})') + f'\n\tssh_user={self.ssh_user}') def get_cluster_name(self): return self.cluster_name @@ -2176,26 +2162,6 @@ def _use_internal_ips(self): return common_utils.read_yaml(self.cluster_yaml).get( 'provider', {}).get('use_internal_ips', False) - def _update_cluster_region(self): - """Update the region in handle.launched_resources. - - This is for backward compatibility to handle the clusters launched - long before. We should remove this after 0.6.0. 
- """ - if self.launched_resources.region is not None: - return - - config = common_utils.read_yaml(self.cluster_yaml) - provider = config['provider'] - cloud = self.launched_resources.cloud - if cloud.is_same_cloud(clouds.Azure()): - region = provider['location'] - elif cloud.is_same_cloud(clouds.GCP()) or cloud.is_same_cloud( - clouds.AWS()): - region = provider['region'] - - self.launched_resources = self.launched_resources.copy(region=region) - def update_ssh_ports(self, max_attempts: int = 1) -> None: """Fetches and sets the SSH ports for the cluster nodes. @@ -2567,8 +2533,6 @@ def __setstate__(self, state): if version < 4: self.update_ssh_ports() - self._update_cluster_region() - if version < 8: try: self._update_cluster_info() @@ -2649,8 +2613,6 @@ def check_resources_fit_cluster( if record is not None: usage_lib.messages.usage.update_cluster_status(record['status']) - # Backward compatibility: the old launched_resources without region info - # was handled by ResourceHandle._update_cluster_region. assert launched_resources.region is not None, handle mismatch_str = (f'To fix: specify a new cluster name, or down the ' @@ -4096,55 +4058,9 @@ def post_teardown_cleanup(self, * Removing ssh configs for the cluster; * Updating the local state of the cluster; * Removing the terminated cluster's scripts and ray yaml files. - - Raises: - RuntimeError: If it fails to delete the TPU. """ - log_path = os.path.join(os.path.expanduser(self.log_dir), - 'teardown.log') - log_abs_path = os.path.abspath(log_path) cluster_name_on_cloud = handle.cluster_name_on_cloud - # Backward compatibility for TPU nodes created before #2943. Any TPU - # node launched before that PR have the delete script generated (and do - # not have the tpu_node config set in its cluster yaml), so we have to - # call the deletion script to clean up the TPU node. - # For TPU nodes launched after the PR, deletion is done in SkyPilot's - # new GCP provisioner API. - # TODO (zhwu): Remove this after 0.6.0. - if (handle.tpu_delete_script is not None and - os.path.exists(handle.tpu_delete_script)): - # Only call the deletion script if the cluster config does not - # contain TPU node config. Otherwise, the deletion should - # already be handled by the new provisioner. - config = common_utils.read_yaml(handle.cluster_yaml) - tpu_node_config = config['provider'].get('tpu_node') - if tpu_node_config is None: - with rich_utils.safe_status( - ux_utils.spinner_message('Terminating TPU')): - tpu_rc, tpu_stdout, tpu_stderr = log_lib.run_with_log( - ['bash', handle.tpu_delete_script], - log_abs_path, - stream_logs=False, - require_outputs=True) - if tpu_rc != 0: - if _TPU_NOT_FOUND_ERROR in tpu_stderr: - logger.info('TPU not found. ' - 'It should have been deleted already.') - elif purge: - logger.warning( - _TEARDOWN_PURGE_WARNING.format( - reason='stopping/terminating TPU', - details=tpu_stderr)) - else: - raise RuntimeError( - _TEARDOWN_FAILURE_MESSAGE.format( - extra_reason='It is caused by TPU failure.', - cluster_name=common_utils.cluster_name_in_hint( - handle.cluster_name, cluster_name_on_cloud), - stdout=tpu_stdout, - stderr=tpu_stderr)) - if (terminate and handle.launched_resources.is_image_managed is True): # Delete the image when terminating a "cloned" cluster, i.e., # whose image is created by SkyPilot (--clone-disk-from) @@ -4189,11 +4105,7 @@ def post_teardown_cleanup(self, # The cluster file must exist because the cluster_yaml will only # be removed after the cluster entry in the database is removed. 
config = common_utils.read_yaml(handle.cluster_yaml) - auth_config = config['auth'] - backend_utils.SSHConfigHelper.remove_cluster(handle.cluster_name, - handle.head_ip, - auth_config, - handle.docker_user) + backend_utils.SSHConfigHelper.remove_cluster(handle.cluster_name) global_user_state.remove_cluster(handle.cluster_name, terminate=terminate) @@ -4202,13 +4114,6 @@ def post_teardown_cleanup(self, # This function could be directly called from status refresh, # where we need to cleanup the cluster profile. metadata_utils.remove_cluster_metadata(handle.cluster_name) - # Clean up TPU creation/deletion scripts - # Backward compatibility for TPU nodes created before #2943. - # TODO (zhwu): Remove this after 0.6.0. - if handle.tpu_delete_script is not None: - assert handle.tpu_create_script is not None - common_utils.remove_file_if_exists(handle.tpu_create_script) - common_utils.remove_file_if_exists(handle.tpu_delete_script) # Clean up generated config # No try-except is needed since Ray will fail to teardown the diff --git a/sky/provision/gcp/instance.py b/sky/provision/gcp/instance.py index 21d04075f59..9872ad73dc7 100644 --- a/sky/provision/gcp/instance.py +++ b/sky/provision/gcp/instance.py @@ -632,13 +632,6 @@ def cleanup_ports( del ports # Unused. assert provider_config is not None, cluster_name_on_cloud project_id = provider_config['project_id'] - if 'ports' in provider_config: - # Backward compatibility for old provider config. - # TODO(tian): remove this after 2 minor releases, 0.6.0. - for port in provider_config['ports']: - firewall_rule_name = f'user-ports-{cluster_name_on_cloud}-{port}' - instance_utils.GCPComputeInstance.delete_firewall_rule( - project_id, firewall_rule_name) if 'firewall_rule' in provider_config: firewall_rule_name = provider_config['firewall_rule'] instance_utils.GCPComputeInstance.delete_firewall_rule( diff --git a/sky/serve/core.py b/sky/serve/core.py index 691a3edea0b..ea8f380a2e7 100644 --- a/sky/serve/core.py +++ b/sky/serve/core.py @@ -572,8 +572,6 @@ def status( 'controller_port': (Optional[int]) controller port, 'load_balancer_port': (Optional[int]) load balancer port, 'policy': (Optional[str]) load balancer policy description, - 'requested_resources': (sky.Resources) requested resources - for replica (deprecated), 'requested_resources_str': (str) str representation of requested resources, 'replica_info': (List[Dict[str, Any]]) replica information, diff --git a/sky/serve/serve_state.py b/sky/serve/serve_state.py index cbc8ef3d8cc..333e0138fb4 100644 --- a/sky/serve/serve_state.py +++ b/sky/serve/serve_state.py @@ -34,7 +34,7 @@ def _get_db_path() -> str: def create_table(cursor: 'sqlite3.Cursor', conn: 'sqlite3.Connection') -> None: """Creates the service and replica tables if they do not exist.""" - # auto_restart column is deprecated. + # auto_restart and requested_resources column is deprecated. 
cursor.execute("""\ CREATE TABLE IF NOT EXISTS services ( name TEXT PRIMARY KEY, @@ -323,8 +323,8 @@ def set_service_load_balancer_port(service_name: str, def _get_service_from_row(row) -> Dict[str, Any]: (current_version, name, controller_job_id, controller_port, - load_balancer_port, status, uptime, policy, _, requested_resources, - requested_resources_str, _, active_versions) = row[:13] + load_balancer_port, status, uptime, policy, _, _, requested_resources_str, + _, active_versions) = row[:13] return { 'name': name, 'controller_job_id': controller_job_id, @@ -340,10 +340,6 @@ def _get_service_from_row(row) -> Dict[str, Any]: # The versions that is active for the load balancer. This is a list of # integers in json format. This is mainly for display purpose. 'active_versions': json.loads(active_versions), - # TODO(tian): Backward compatibility. - # Remove after 2 minor release, 0.6.0. - 'requested_resources': pickle.loads(requested_resources) - if requested_resources is not None else None, 'requested_resources_str': requested_resources_str, } diff --git a/sky/serve/serve_utils.py b/sky/serve/serve_utils.py index 3a416dd2932..6e7b6f6eb4a 100644 --- a/sky/serve/serve_utils.py +++ b/sky/serve/serve_utils.py @@ -825,12 +825,7 @@ def format_service_table(service_records: List[Dict[str, Any]], replicas = _get_replicas(record) endpoint = get_endpoint(record) policy = record['policy'] - # TODO(tian): Backward compatibility. - # Remove `requested_resources` field after 2 minor release, 0.6.0. - if record.get('requested_resources_str') is None: - requested_resources_str = str(record['requested_resources']) - else: - requested_resources_str = record['requested_resources_str'] + requested_resources_str = record['requested_resources_str'] service_values = [ service_name, @@ -1004,15 +999,8 @@ def _build(cls, code: List[str]) -> str: @classmethod def update_service(cls, service_name: str, version: int, mode: str) -> str: code = [ - # Backward compatibility for old serve version on the remote - # machine. The `mode` argument was added in #3249, and if the remote - # machine has an old SkyPilot version before that, we need to avoid - # passing the `mode` argument to the job_lib functions. - # TODO(zhwu): Remove this in 0.7.0 release. - f'mode_kwargs = {{"mode": {mode!r}}} ' - 'if getattr(constants, "SERVE_VERSION", 0) >= 1 else {}', f'msg = serve_utils.update_service_encoded({service_name!r}, ' - f'{version}, **mode_kwargs)', + f'{version}, mode={mode!r})', 'print(msg, end="", flush=True)', ] return cls._build(code) diff --git a/sky/serve/service_spec.py b/sky/serve/service_spec.py index 3a97a6f8521..2eff6f40a9d 100644 --- a/sky/serve/service_spec.py +++ b/sky/serve/service_spec.py @@ -29,13 +29,6 @@ def __init__( base_ondemand_fallback_replicas: Optional[int] = None, upscale_delay_seconds: Optional[int] = None, downscale_delay_seconds: Optional[int] = None, - # The following arguments are deprecated. - # TODO(ziming): remove this after 2 minor release, i.e. 0.6.0. - # Deprecated: Always be True - auto_restart: Optional[bool] = None, - # Deprecated: replaced by the target_qps_per_replica. - qps_upper_threshold: Optional[float] = None, - qps_lower_threshold: Optional[float] = None, ) -> None: if max_replicas is not None and max_replicas < min_replicas: with ux_utils.print_exception_no_traceback(): @@ -62,21 +55,6 @@ def __init__( raise ValueError('readiness_path must start with a slash (/). ' f'Got: {readiness_path}') - # TODO(tian): Following field are deprecated. Remove after 2 minor - # release, i.e. 
0.6.0. - if qps_upper_threshold is not None or qps_lower_threshold is not None: - with ux_utils.print_exception_no_traceback(): - raise ValueError( - 'Field `qps_upper_threshold` and `qps_lower_threshold`' - 'under `replica_policy` are deprecated. ' - 'Please use target_qps_per_replica instead.') - if auto_restart is not None: - with ux_utils.print_exception_no_traceback(): - raise ValueError( - 'Field `auto_restart` under `replica_policy` is deprecated.' - 'Currently, SkyServe will cleanup failed replicas' - 'and auto restart it to keep the service running.') - self._readiness_path: str = readiness_path self._initial_delay_seconds: int = initial_delay_seconds self._readiness_timeout_seconds: int = readiness_timeout_seconds @@ -160,14 +138,8 @@ def from_yaml_config(config: Dict[str, Any]) -> 'SkyServiceSpec': service_config['min_replicas'] = policy_section['min_replicas'] service_config['max_replicas'] = policy_section.get( 'max_replicas', None) - service_config['qps_upper_threshold'] = policy_section.get( - 'qps_upper_threshold', None) - service_config['qps_lower_threshold'] = policy_section.get( - 'qps_lower_threshold', None) service_config['target_qps_per_replica'] = policy_section.get( 'target_qps_per_replica', None) - service_config['auto_restart'] = policy_section.get( - 'auto_restart', None) service_config['upscale_delay_seconds'] = policy_section.get( 'upscale_delay_seconds', None) service_config['downscale_delay_seconds'] = policy_section.get( diff --git a/sky/skylet/job_lib.py b/sky/skylet/job_lib.py index 5e7008e55d8..17908a1aec5 100644 --- a/sky/skylet/job_lib.py +++ b/sky/skylet/job_lib.py @@ -827,14 +827,6 @@ class JobLibCodeGen: 'import os', 'import getpass', 'from sky.skylet import job_lib, log_lib, constants', - # Backward compatibility for old skylet lib version on the remote - # machine. The `job_owner` argument was removed in #3037, and if the - # remote machine has an old SkyPilot version before that, we need to - # pass the `job_owner` argument to the job_lib functions. - # TODO(zhwu): Remove this in 0.7.0 release. - 'job_owner_kwargs = {} ' - 'if getattr(constants, "SKYLET_LIB_VERSION", 0) >= 1 ' - 'else {"job_owner": getpass.getuser()}', ] @classmethod @@ -861,7 +853,7 @@ def queue_job(cls, job_id: int, cmd: str) -> str: @classmethod def update_status(cls) -> str: - code = ['job_lib.update_status(**job_owner_kwargs)'] + code = ['job_lib.update_status()'] return cls._build(code) @classmethod @@ -879,7 +871,7 @@ def cancel_jobs(cls, """See job_lib.cancel_jobs().""" code = [ (f'cancelled = job_lib.cancel_jobs_encoded_results(' - f' {job_ids!r}, {cancel_all}, **job_owner_kwargs)'), + f' {job_ids!r}, {cancel_all})'), # Print cancelled IDs. Caller should parse by decoding. 
'print(cancelled, flush=True)', ] @@ -902,7 +894,7 @@ def tail_logs(cls, 'run_timestamp = job_lib.get_run_timestamp(job_id)', f'log_dir = None if run_timestamp is None else os.path.join({constants.SKY_LOGS_DIRECTORY!r}, run_timestamp)', f'log_lib.tail_logs(job_id=job_id, log_dir=log_dir, ' - f'managed_job_id={managed_job_id!r}, follow={follow}, **job_owner_kwargs)', + f'managed_job_id={managed_job_id!r}, follow={follow})', ] return cls._build(code) diff --git a/sky/skylet/log_lib.py b/sky/skylet/log_lib.py index 9f1483b2b48..eb64440077e 100644 --- a/sky/skylet/log_lib.py +++ b/sky/skylet/log_lib.py @@ -186,20 +186,11 @@ def run_with_log( daemon_script = os.path.join( os.path.dirname(os.path.abspath(job_lib.__file__)), 'subprocess_daemon.py') - if not hasattr(constants, 'SKY_GET_PYTHON_PATH_CMD'): - # Backward compatibility: for cluster started before #3326, this - # constant does not exist. Since we generate the job script - # in backends.cloud_vm_ray_backend with inspect, so the - # the lates `run_with_log` will be used, but the `constants` is - # not updated. We fallback to `python3` in this case. - # TODO(zhwu): remove this after 0.7.0. - python_path = 'python3' - else: - python_path = subprocess.check_output( - constants.SKY_GET_PYTHON_PATH_CMD, - shell=True, - stderr=subprocess.DEVNULL, - encoding='utf-8').strip() + python_path = subprocess.check_output( + constants.SKY_GET_PYTHON_PATH_CMD, + shell=True, + stderr=subprocess.DEVNULL, + encoding='utf-8').strip() daemon_cmd = [ python_path, daemon_script, diff --git a/sky/utils/schemas.py b/sky/utils/schemas.py index 94a6ed690e1..d9f105db8b0 100644 --- a/sky/utils/schemas.py +++ b/sky/utils/schemas.py @@ -357,19 +357,6 @@ def get_service_schema(): 'downscale_delay_seconds': { 'type': 'number', }, - # TODO(MaoZiming): Fields `qps_upper_threshold`, - # `qps_lower_threshold` and `auto_restart` are deprecated. - # Temporarily keep these fields for backward compatibility. - # Remove after 2 minor release, i.e., 0.6.0. - 'auto_restart': { - 'type': 'boolean', - }, - 'qps_upper_threshold': { - 'type': 'number', - }, - 'qps_lower_threshold': { - 'type': 'number', - }, } }, 'replicas': { @@ -595,7 +582,7 @@ def get_cluster_schema(): _LABELS_SCHEMA = { # Deprecated: 'instance_tags' is replaced by 'labels'. Keeping for backward - # compatibility. Will be removed after 0.7.0. + # compatibility. Will be removed after 0.8.0. 
'instance_tags': { 'type': 'object', 'required': [], diff --git a/tests/test_jobs_and_serve.py b/tests/test_jobs_and_serve.py index a599fb7ba88..237ffd440da 100644 --- a/tests/test_jobs_and_serve.py +++ b/tests/test_jobs_and_serve.py @@ -307,7 +307,6 @@ def mock_get_services_one_service( 'controller_port': 30001, 'load_balancer_port': 30000, 'policy': None, - 'requested_resources': sky.Resources(), 'requested_resources_str': '', 'replica_info': [], } From 6b2b552e7ed98fab3f7ab6469ddcb1292798e264 Mon Sep 17 00:00:00 2001 From: Andy Lee Date: Mon, 28 Oct 2024 14:02:25 -0700 Subject: [PATCH 16/17] Remove outdated pylint disabling comments (#4196) Update cloud_vm_ray_backend.py --- sky/backends/cloud_vm_ray_backend.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index d0a642d4d80..4087dab1240 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -3547,9 +3547,6 @@ def _teardown(self, backend_utils.CLUSTER_STATUS_LOCK_PATH.format(cluster_name)) try: - # TODO(mraheja): remove pylint disabling when filelock - # version updated - # pylint: disable=abstract-class-instantiated with filelock.FileLock( lock_path, backend_utils.CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS): From d3be8ed56adfb5efb9789790bec28616d446c239 Mon Sep 17 00:00:00 2001 From: Christopher Cooper Date: Mon, 28 Oct 2024 14:02:30 -0700 Subject: [PATCH 17/17] [test] update default clouds for smoke tests (#4182) --- tests/conftest.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index b4e025a8f2d..bb79abfe61e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,7 +8,7 @@ # to mark a test as slow and to skip by default. # https://docs.pytest.org/en/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option -# By default, only run generic tests and cloud-specific tests for GCP and Azure, +# By default, only run generic tests and cloud-specific tests for AWS and Azure, # due to the cloud credit limit for the development account. # # A "generic test" tests a generic functionality (e.g., autostop) that @@ -24,7 +24,7 @@ 'aws', 'gcp', 'azure', 'lambda', 'cloudflare', 'ibm', 'scp', 'oci', 'kubernetes', 'vsphere', 'cudo', 'fluidstack', 'paperspace' ] -default_clouds_to_run = ['gcp', 'azure'] +default_clouds_to_run = ['aws', 'azure'] # Translate cloud name to pytest keyword. We need this because # @pytest.mark.lambda is not allowed, so we use @pytest.mark.lambda_cloud @@ -72,7 +72,7 @@ def pytest_addoption(parser): parser.addoption( '--generic-cloud', type=str, - default='gcp', + default='aws', choices=all_clouds_in_smoke_tests, help='Cloud to use for generic tests. If the generic cloud is ' 'not within the clouds to be run, it will be reset to the first ' @@ -138,8 +138,8 @@ def pytest_collection_modifyitems(config, items): for cloud in all_clouds_in_smoke_tests: cloud_keyword = cloud_to_pytest_keyword[cloud] if (cloud_keyword in item.keywords and cloud not in cloud_to_run): - # Need to check both conditions as 'gcp' is added to cloud_to_run - # when tested for cloudflare + # Need to check both conditions as the first default cloud is + # added to cloud_to_run when tested for cloudflare if config.getoption('--cloudflare') and cloud == 'cloudflare': continue item.add_marker(skip_marks[cloud])
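
Note: the conftest change above only swaps the default clouds; the fallback
rule for `--generic-cloud` is unchanged. A minimal sketch of that rule under
the new defaults follows; `resolve_generic_cloud` is a hypothetical helper
written here only to illustrate the behavior described in the option's help
text (reset to the first selected cloud when the requested one is not run).

    from typing import List

    default_clouds_to_run = ['aws', 'azure']


    def resolve_generic_cloud(generic_cloud: str,
                              cloud_to_run: List[str]) -> str:
        # If the requested generic cloud is not among the clouds selected to
        # run, fall back to the first cloud in the list.
        return (generic_cloud
                if generic_cloud in cloud_to_run else cloud_to_run[0])


    assert resolve_generic_cloud('aws', default_clouds_to_run) == 'aws'
    assert resolve_generic_cloud('gcp', default_clouds_to_run) == 'aws'
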