diff --git a/docs/source/getting-started/installation.rst b/docs/source/getting-started/installation.rst index 93c730ef651..b88bb0b69f8 100644 --- a/docs/source/getting-started/installation.rst +++ b/docs/source/getting-started/installation.rst @@ -297,6 +297,16 @@ Paperspace mkdir -p ~/.paperspace echo "{'api_key' : }" > ~/.paperspace/config.json +Vast +~~~~~~~~~~ + +`Vast `__ is a cloud provider that offers low-cost GPUs. To configure Vast access, go to the `Account `_ page on your Vast console to get your **API key**. Then, run: + +.. code-block:: shell + + pip install "vastai-sdk>=0.1.3" + echo "" > ~/.vast_api_key + RunPod ~~~~~~~~~~ diff --git a/sky/__init__.py b/sky/__init__.py index 4e720d63ce0..9e052ca62f2 100644 --- a/sky/__init__.py +++ b/sky/__init__.py @@ -133,6 +133,7 @@ def set_proxy_env_var(proxy_var: str, urllib_var: Optional[str]): OCI = clouds.OCI Paperspace = clouds.Paperspace RunPod = clouds.RunPod +Vast = clouds.Vast Vsphere = clouds.Vsphere Fluidstack = clouds.Fluidstack optimize = Optimizer.optimize @@ -150,6 +151,7 @@ def set_proxy_env_var(proxy_var: str, urllib_var: Optional[str]): 'OCI', 'Paperspace', 'RunPod', + 'Vast', 'SCP', 'Vsphere', 'Fluidstack', diff --git a/sky/adaptors/vast.py b/sky/adaptors/vast.py new file mode 100644 index 00000000000..20ba72035f1 --- /dev/null +++ b/sky/adaptors/vast.py @@ -0,0 +1,29 @@ +"""Vast cloud adaptor.""" + +import functools + +_vast_sdk = None + + +def import_package(func): + + @functools.wraps(func) + def wrapper(*args, **kwargs): + global _vast_sdk + + if _vast_sdk is None: + try: + import vastai_sdk as _vast # pylint: disable=import-outside-toplevel + _vast_sdk = _vast.VastAI() + except ImportError: + raise ImportError('Fail to import dependencies for vast.' + 'Try pip install "skypilot[vast]"') from None + return func(*args, **kwargs) + + return wrapper + + +@import_package +def vast(): + """Return the vast package.""" + return _vast_sdk diff --git a/sky/authentication.py b/sky/authentication.py index 6108073494f..b281de6c1b3 100644 --- a/sky/authentication.py +++ b/sky/authentication.py @@ -43,6 +43,7 @@ from sky.adaptors import ibm from sky.adaptors import kubernetes from sky.adaptors import runpod +from sky.adaptors import vast from sky.provision.fluidstack import fluidstack_utils from sky.provision.kubernetes import utils as kubernetes_utils from sky.provision.lambda_cloud import lambda_utils @@ -485,6 +486,23 @@ def setup_runpod_authentication(config: Dict[str, Any]) -> Dict[str, Any]: return configure_ssh_info(config) +def setup_vast_authentication(config: Dict[str, Any]) -> Dict[str, Any]: + """Sets up SSH authentication for Vast. + - Generates a new SSH key pair if one does not exist. + - Adds the public SSH key to the user's Vast account. 
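+    - Returns the SSH config with the public key path filled in.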
+ """ + _, public_key_path = get_or_generate_keys() + with open(public_key_path, 'r', encoding='UTF-8') as pub_key_file: + public_key = pub_key_file.read().strip() + current_key_list = vast.vast().show_ssh_keys() # pylint: disable=assignment-from-no-return + # Only add an ssh key if it hasn't already been added + if not any(x['public_key'] == public_key for x in current_key_list): + vast.vast().create_ssh_key(ssh_key=public_key) + + config['auth']['ssh_public_key'] = PUBLIC_SSH_KEY_PATH + return configure_ssh_info(config) + + def setup_fluidstack_authentication(config: Dict[str, Any]) -> Dict[str, Any]: get_or_generate_keys() diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index bf92f442d2f..7385c7cb9fc 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -1056,6 +1056,8 @@ def _add_auth_to_cluster_config(cloud: clouds.Cloud, cluster_config_file: str): config = auth.setup_ibm_authentication(config) elif isinstance(cloud, clouds.RunPod): config = auth.setup_runpod_authentication(config) + elif isinstance(cloud, clouds.Vast): + config = auth.setup_vast_authentication(config) elif isinstance(cloud, clouds.Fluidstack): config = auth.setup_fluidstack_authentication(config) else: @@ -2135,7 +2137,8 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: except exceptions.CommandError as e: success = False if e.returncode == 255: - logger.debug(f'The cluster is likely {noun}ed.') + word = 'autostopped' if noun == 'autostop' else 'autodowned' + logger.debug(f'The cluster is likely {word}.') reset_local_autostop = False except (Exception, SystemExit) as e: # pylint: disable=broad-except success = False diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index e2f7c997b09..84d8c1c4073 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -182,6 +182,7 @@ def _get_cluster_config_template(cloud): clouds.RunPod: 'runpod-ray.yml.j2', clouds.Kubernetes: 'kubernetes-ray.yml.j2', clouds.Vsphere: 'vsphere-ray.yml.j2', + clouds.Vast: 'vast-ray.yml.j2', clouds.Fluidstack: 'fluidstack-ray.yml.j2' } return cloud_to_template[type(cloud)] diff --git a/sky/clouds/__init__.py b/sky/clouds/__init__.py index 24b805fe8bc..53b770d4531 100644 --- a/sky/clouds/__init__.py +++ b/sky/clouds/__init__.py @@ -25,6 +25,7 @@ from sky.clouds.paperspace import Paperspace from sky.clouds.runpod import RunPod from sky.clouds.scp import SCP +from sky.clouds.vast import Vast from sky.clouds.vsphere import Vsphere __all__ = [ @@ -39,6 +40,7 @@ 'Paperspace', 'SCP', 'RunPod', + 'Vast', 'OCI', 'Vsphere', 'Kubernetes', diff --git a/sky/clouds/service_catalog/constants.py b/sky/clouds/service_catalog/constants.py index 945152582f6..c511158e397 100644 --- a/sky/clouds/service_catalog/constants.py +++ b/sky/clouds/service_catalog/constants.py @@ -3,5 +3,5 @@ CATALOG_SCHEMA_VERSION = 'v6' CATALOG_DIR = '~/.sky/catalogs' ALL_CLOUDS = ('aws', 'azure', 'gcp', 'ibm', 'lambda', 'scp', 'oci', - 'kubernetes', 'runpod', 'vsphere', 'cudo', 'fluidstack', + 'kubernetes', 'runpod', 'vast', 'vsphere', 'cudo', 'fluidstack', 'paperspace', 'do') diff --git a/sky/clouds/service_catalog/data_fetchers/fetch_vast.py b/sky/clouds/service_catalog/data_fetchers/fetch_vast.py new file mode 100755 index 00000000000..7d452d5d913 --- /dev/null +++ b/sky/clouds/service_catalog/data_fetchers/fetch_vast.py @@ -0,0 +1,108 @@ +"""A script that generates the Vast Cloud catalog. 
""" + +# +# Due to the design of the sdk, pylint has a false +# positive for the fnctions. +# +# pylint: disable=assignment-from-no-return +import collections +import csv +import json +import math +import re +import sys +from typing import Any, Dict, List + +from sky.adaptors import vast + +_map = { + 'TeslaV100': 'V100', + 'TeslaT4': 'T4', + 'TeslaP100': 'P100', + 'QRTX6000': 'RTX6000', + 'QRTX8000': 'RTX8000' +} + + +def create_instance_type(obj: Dict[str, Any]) -> str: + stubify = lambda x: re.sub(r'\s', '_', x) + return '{}x-{}-{}-{}'.format(obj['num_gpus'], stubify(obj['gpu_name']), + obj['cpu_cores'], obj['cpu_ram']) + + +def dot_get(d: dict, key: str) -> Any: + for k in key.split('.'): + d = d[k] + return d + + +if __name__ == '__main__': + # InstanceType and gpuInfo are basically just stubs + # so that the dictwriter is happy without weird + # code. + mapped_keys = (('gpu_name', 'InstanceType'), ('gpu_name', + 'AcceleratorName'), + ('num_gpus', 'AcceleratorCount'), ('cpu_cores', 'vCPUs'), + ('cpu_ram', 'MemoryGiB'), ('gpu_name', 'GpuInfo'), + ('search.totalHour', 'Price'), ('min_bid', 'SpotPrice'), + ('geolocation', 'Region')) + writer = csv.DictWriter(sys.stdout, fieldnames=[x[1] for x in mapped_keys]) + writer.writeheader() + + offerList = vast.vast().search_offers(limit=10000) + priceMap: Dict[str, List] = collections.defaultdict(list) + for offer in offerList: + entry = {} + for ours, theirs in mapped_keys: + field = dot_get(offer, ours) + entry[theirs] = field + + instance_type = create_instance_type(offer) + entry['InstanceType'] = instance_type + + # the documentation says + # "{'gpus': [{ + # 'name': 'v100', + # 'manufacturer': 'nvidia', + # 'count': 8.0, + # 'memoryinfo': {'sizeinmib': 16384} + # }], + # 'totalgpumemoryinmib': 16384}", + # we can do that. + entry['MemoryGiB'] /= 1024 + + gpu = re.sub('Ada', '-Ada', re.sub(r'\s', '', offer['gpu_name'])) + gpu = re.sub(r'(Ti|PCIE|SXM4|SXM|NVL)$', '', gpu) + gpu = re.sub(r'(RTX\d0\d0)(S|D)$', r'\1', gpu) + + if gpu in _map: + gpu = _map[gpu] + + entry['AcceleratorName'] = gpu + entry['GpuInfo'] = json.dumps({ + 'Gpus': [{ + 'Name': gpu, + 'Count': offer['num_gpus'], + 'MemoryInfo': { + 'SizeInMiB': offer['gpu_total_ram'] + } + }], + 'TotalGpuMemoryInMiB': offer['gpu_total_ram'] + }).replace('"', '\'') + + priceMap[instance_type].append(entry) + + for instanceList in priceMap.values(): + priceList = sorted([x['Price'] for x in instanceList]) + index = math.ceil(0.8 * len(priceList)) - 1 + priceTarget = priceList[index] + toList: List = [] + for instance in instanceList: + if instance['Price'] <= priceTarget: + instance['Price'] = '{:.2f}'.format(priceTarget) + toList.append(instance) + + maxBid = max([x.get('SpotPrice') for x in toList]) + for instance in toList: + instance['SpotPrice'] = '{:.2f}'.format(maxBid) + writer.writerow(instance) diff --git a/sky/clouds/service_catalog/vast_catalog.py b/sky/clouds/service_catalog/vast_catalog.py new file mode 100644 index 00000000000..14f4b2fedf2 --- /dev/null +++ b/sky/clouds/service_catalog/vast_catalog.py @@ -0,0 +1,104 @@ +""" Vast | Catalog + +This module loads the service catalog file and can be used to +query instance types and pricing information for Vast.ai. 
+""" + +import typing +from typing import Dict, List, Optional, Tuple, Union + +from sky.clouds.service_catalog import common +from sky.utils import ux_utils + +if typing.TYPE_CHECKING: + from sky.clouds import cloud + +_df = common.read_catalog('vast/vms.csv') + + +def instance_type_exists(instance_type: str) -> bool: + return common.instance_type_exists_impl(_df, instance_type) + + +def validate_region_zone( + region: Optional[str], + zone: Optional[str]) -> Tuple[Optional[str], Optional[str]]: + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Vast does not support zones.') + return common.validate_region_zone_impl('vast', _df, region, zone) + + +def get_hourly_cost(instance_type: str, + use_spot: bool = False, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + """Returns the cost, or the cheapest cost among all zones for spot.""" + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Vast does not support zones.') + return common.get_hourly_cost_impl(_df, instance_type, use_spot, region, + zone) + + +def get_vcpus_mem_from_instance_type( + instance_type: str) -> Tuple[Optional[float], Optional[float]]: + return common.get_vcpus_mem_from_instance_type_impl(_df, instance_type) + + +def get_default_instance_type(cpus: Optional[str] = None, + memory: Optional[str] = None, + disk_tier: Optional[str] = None) -> Optional[str]: + del disk_tier + # NOTE: After expanding catalog to multiple entries, you may + # want to specify a default instance type or family. + return common.get_instance_type_for_cpus_mem_impl(_df, cpus, memory) + + +def get_accelerators_from_instance_type( + instance_type: str) -> Optional[Dict[str, Union[int, float]]]: + return common.get_accelerators_from_instance_type_impl(_df, instance_type) + + +def get_instance_type_for_accelerator( + acc_name: str, + acc_count: int, + cpus: Optional[str] = None, + memory: Optional[str] = None, + use_spot: bool = False, + region: Optional[str] = None, + zone: Optional[str] = None) -> Tuple[Optional[List[str]], List[str]]: + """Returns a list of instance types that have the given accelerator.""" + if zone is not None: + with ux_utils.print_exception_no_traceback(): + raise ValueError('Vast does not support zones.') + return common.get_instance_type_for_accelerator_impl(df=_df, + acc_name=acc_name, + acc_count=acc_count, + cpus=cpus, + memory=memory, + use_spot=use_spot, + region=region, + zone=zone) + + +def get_region_zones_for_instance_type(instance_type: str, + use_spot: bool) -> List['cloud.Region']: + df = _df[_df['InstanceType'] == instance_type] + return common.get_region_zones(df, use_spot) + + +# TODO: this differs from the fluffy catalog version +def list_accelerators( + gpus_only: bool, + name_filter: Optional[str], + region_filter: Optional[str], + quantity_filter: Optional[int], + case_sensitive: bool = True, + all_regions: bool = False, + require_price: bool = True) -> Dict[str, List[common.InstanceTypeInfo]]: + """Returns all instance types in Vast offering GPUs.""" + del require_price # Unused. + return common.list_accelerators_impl('Vast', _df, gpus_only, name_filter, + region_filter, quantity_filter, + case_sensitive, all_regions) diff --git a/sky/clouds/vast.py b/sky/clouds/vast.py new file mode 100644 index 00000000000..f133b653b3d --- /dev/null +++ b/sky/clouds/vast.py @@ -0,0 +1,278 @@ +""" Vast Cloud. 
""" + +import typing +from typing import Dict, Iterator, List, Optional, Tuple, Union + +from sky import clouds +from sky.clouds import service_catalog +from sky.utils import resources_utils + +if typing.TYPE_CHECKING: + from sky import resources as resources_lib + + +@clouds.CLOUD_REGISTRY.register +class Vast(clouds.Cloud): + """ Vast GPU Cloud + + _REPR | The string representation for the Vast GPU cloud object. + """ + _REPR = 'Vast' + _CLOUD_UNSUPPORTED_FEATURES = { + clouds.CloudImplementationFeatures.MULTI_NODE: + ('Multi-node not supported yet, as the interconnection among nodes ' + 'are non-trivial on Vast.'), + clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER: + ('Customizing disk tier is not supported yet on Vast.'), + clouds.CloudImplementationFeatures.OPEN_PORTS: + ('Opening ports is currently not supported on Vast.'), + clouds.CloudImplementationFeatures.STORAGE_MOUNTING: + ('Mounting object stores is not supported on Vast.'), + } + # + # Vast doesn't have a max cluster name limit. This number + # is reasonably large and exists to play nicely with the + # other providers + # + _MAX_CLUSTER_NAME_LEN_LIMIT = 120 + _regions: List[clouds.Region] = [] + + PROVISIONER_VERSION = clouds.ProvisionerVersion.SKYPILOT + STATUS_VERSION = clouds.StatusVersion.SKYPILOT + + @classmethod + def _unsupported_features_for_resources( + cls, resources: 'resources_lib.Resources' + ) -> Dict[clouds.CloudImplementationFeatures, str]: + """The features not supported based on the resources provided. + + This method is used by check_features_are_supported() to check if the + cloud implementation supports all the requested features. + + Returns: + A dict of {feature: reason} for the features not supported by the + cloud implementation. + """ + del resources # unused + return cls._CLOUD_UNSUPPORTED_FEATURES + + @classmethod + def _max_cluster_name_length(cls) -> Optional[int]: + return cls._MAX_CLUSTER_NAME_LEN_LIMIT + + @classmethod + def regions_with_offering(cls, instance_type: str, + accelerators: Optional[Dict[str, int]], + use_spot: bool, region: Optional[str], + zone: Optional[str]) -> List[clouds.Region]: + assert zone is None, 'Vast does not support zones.' 
+ del accelerators, zone # unused + regions = service_catalog.get_region_zones_for_instance_type( + instance_type, use_spot, 'vast') + + if region is not None: + regions = [r for r in regions if r.name == region] + return regions + + @classmethod + def get_vcpus_mem_from_instance_type( + cls, + instance_type: str, + ) -> Tuple[Optional[float], Optional[float]]: + return service_catalog.get_vcpus_mem_from_instance_type(instance_type, + clouds='vast') + + @classmethod + def zones_provision_loop( + cls, + *, + region: str, + num_nodes: int, + instance_type: str, + accelerators: Optional[Dict[str, int]] = None, + use_spot: bool = False, + ) -> Iterator[None]: + del num_nodes # unused + regions = cls.regions_with_offering(instance_type, + accelerators, + use_spot, + region=region, + zone=None) + for r in regions: + assert r.zones is None, r + yield r.zones + + def instance_type_to_hourly_cost(self, + instance_type: str, + use_spot: bool, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + return service_catalog.get_hourly_cost(instance_type, + use_spot=use_spot, + region=region, + zone=zone, + clouds='vast') + + def accelerators_to_hourly_cost(self, + accelerators: Dict[str, int], + use_spot: bool, + region: Optional[str] = None, + zone: Optional[str] = None) -> float: + """Returns the hourly cost of the accelerators, in dollars/hour.""" + del accelerators, use_spot, region, zone # unused + return 0.0 # Vast includes accelerators in the hourly cost. + + def get_egress_cost(self, num_gigabytes: float) -> float: + return 0.0 + + @classmethod + def get_default_instance_type( + cls, + cpus: Optional[str] = None, + memory: Optional[str] = None, + disk_tier: Optional[resources_utils.DiskTier] = None + ) -> Optional[str]: + """Returns the default instance type for Vast.""" + return service_catalog.get_default_instance_type(cpus=cpus, + memory=memory, + disk_tier=disk_tier, + clouds='vast') + + @classmethod + def get_accelerators_from_instance_type( + cls, instance_type: str) -> Optional[Dict[str, Union[int, float]]]: + return service_catalog.get_accelerators_from_instance_type( + instance_type, clouds='vast') + + @classmethod + def get_zone_shell_cmd(cls) -> Optional[str]: + return None + + def make_deploy_resources_variables( + self, + resources: 'resources_lib.Resources', + cluster_name: resources_utils.ClusterName, + region: 'clouds.Region', + zones: Optional[List['clouds.Zone']], + num_nodes: int, + dryrun: bool = False) -> Dict[str, Optional[str]]: + del zones, dryrun, cluster_name, num_nodes # unused + + r = resources + acc_dict = self.get_accelerators_from_instance_type(r.instance_type) + custom_resources = resources_utils.make_ray_custom_resources_str( + acc_dict) + + if r.image_id is None: + image_id = 'vastai/base:0.0.2' + elif r.extract_docker_image() is not None: + image_id = r.extract_docker_image() + else: + image_id = r.image_id[r.region] + + return { + 'instance_type': resources.instance_type, + 'custom_resources': custom_resources, + 'region': region.name, + 'image_id': image_id, + } + + def _get_feasible_launchable_resources( + self, resources: 'resources_lib.Resources' + ) -> 'resources_utils.FeasibleResources': + """Returns a list of feasible resources for the given resources.""" + if resources.instance_type is not None: + assert resources.is_launchable(), resources + resources = resources.copy(accelerators=None) + return resources_utils.FeasibleResources([resources], [], None) + + def _make(instance_list): + resource_list = [] + for instance_type in 
instance_list: + r = resources.copy( + cloud=Vast(), + instance_type=instance_type, + accelerators=None, + cpus=None, + ) + resource_list.append(r) + return resource_list + + # Currently, handle a filter on accelerators only. + accelerators = resources.accelerators + if accelerators is None: + # Return a default instance type + default_instance_type = Vast.get_default_instance_type( + cpus=resources.cpus, + memory=resources.memory, + disk_tier=resources.disk_tier) + if default_instance_type is None: + # TODO: Add hints to all return values in this method to help + # users understand why the resources are not launchable. + return resources_utils.FeasibleResources([], [], None) + else: + return resources_utils.FeasibleResources( + _make([default_instance_type]), [], None) + + assert len(accelerators) == 1, resources + acc, acc_count = list(accelerators.items())[0] + (instance_list, fuzzy_candidate_list + ) = service_catalog.get_instance_type_for_accelerator( + acc, + acc_count, + use_spot=resources.use_spot, + cpus=resources.cpus, + region=resources.region, + zone=resources.zone, + clouds='vast') + if instance_list is None: + return resources_utils.FeasibleResources([], fuzzy_candidate_list, + None) + return resources_utils.FeasibleResources(_make(instance_list), + fuzzy_candidate_list, None) + + @classmethod + def check_credentials(cls) -> Tuple[bool, Optional[str]]: + """ Verify that the user has valid credentials for Vast. """ + try: + import vastai_sdk as _vast # pylint: disable=import-outside-toplevel + vast = _vast.VastAI() + + # We only support file pased credential passing + if vast.creds_source != 'FILE': + return False, ( + 'error \n' # First line is indented by 4 spaces + ' Credentials can be set up by running: \n' + ' $ pip install vastai\n' + ' $ echo [key] > ~/.vast_api_key\n' + ' For more information, see https://skypilot.readthedocs.io/en/latest/getting-started/installation.html#vast' # pylint: disable=line-too-long + ) + + return True, None + + except ImportError: + return False, ('Failed to import vast. ' + 'To install, run: pip install skypilot[vast]') + + def get_credential_file_mounts(self) -> Dict[str, str]: + return { + '~/.config/vastai/vast_api_key': '~/.config/vastai/vast_api_key' + } + + @classmethod + def get_user_identities(cls) -> Optional[List[List[str]]]: + # NOTE: used for very advanced SkyPilot functionality + # Can implement later if desired + return None + + def instance_type_exists(self, instance_type: str) -> bool: + return service_catalog.instance_type_exists(instance_type, 'vast') + + def validate_region_zone(self, region: Optional[str], zone: Optional[str]): + return service_catalog.validate_region_zone(region, zone, clouds='vast') + + @classmethod + def get_image_size(cls, image_id: str, region: Optional[str]) -> float: + # TODO: use 0.0 for now to allow all images. We should change this to + # return the docker image size. 
+ return 0.0 diff --git a/sky/provision/__init__.py b/sky/provision/__init__.py index 02a627b08a3..83452a0077b 100644 --- a/sky/provision/__init__.py +++ b/sky/provision/__init__.py @@ -22,6 +22,7 @@ from sky.provision import lambda_cloud from sky.provision import oci from sky.provision import runpod +from sky.provision import vast from sky.provision import vsphere from sky.utils import command_runner from sky.utils import timeline diff --git a/sky/provision/vast/__init__.py b/sky/provision/vast/__init__.py new file mode 100644 index 00000000000..2843fd65f20 --- /dev/null +++ b/sky/provision/vast/__init__.py @@ -0,0 +1,10 @@ +"""Vast provisioner for SkyPilot.""" + +from sky.provision.vast.config import bootstrap_instances +from sky.provision.vast.instance import cleanup_ports +from sky.provision.vast.instance import get_cluster_info +from sky.provision.vast.instance import query_instances +from sky.provision.vast.instance import run_instances +from sky.provision.vast.instance import stop_instances +from sky.provision.vast.instance import terminate_instances +from sky.provision.vast.instance import wait_instances diff --git a/sky/provision/vast/config.py b/sky/provision/vast/config.py new file mode 100644 index 00000000000..9cb337eb91d --- /dev/null +++ b/sky/provision/vast/config.py @@ -0,0 +1,11 @@ +"""Vast configuration bootstrapping.""" + +from sky.provision import common + + +def bootstrap_instances( + region: str, cluster_name: str, + config: common.ProvisionConfig) -> common.ProvisionConfig: + """Bootstraps instances for the given cluster.""" + del region, cluster_name # unused + return config diff --git a/sky/provision/vast/instance.py b/sky/provision/vast/instance.py new file mode 100644 index 00000000000..3dae257d6f0 --- /dev/null +++ b/sky/provision/vast/instance.py @@ -0,0 +1,247 @@ +"""Vast instance provisioning.""" +import time +from typing import Any, Dict, List, Optional + +from sky import sky_logging +from sky import status_lib +from sky.provision import common +from sky.provision.vast import utils +from sky.utils import common_utils +from sky.utils import ux_utils + +POLL_INTERVAL = 10 + +logger = sky_logging.init_logger(__name__) +# a much more convenient method +status_filter = lambda machine_dict, stat_list: { + k: v for k, v in machine_dict.items() if v['status'] in stat_list +} + + +def _filter_instances(cluster_name_on_cloud: str, + status_filters: Optional[List[str]], + head_only: bool = False) -> Dict[str, Any]: + + instances = utils.list_instances() + possible_names = [f'{cluster_name_on_cloud}-head'] + if not head_only: + possible_names.append(f'{cluster_name_on_cloud}-worker') + + filtered_instances = {} + for instance_id, instance in instances.items(): + if (status_filters is not None and + instance['status'] not in status_filters): + continue + if instance.get('name') in possible_names: + filtered_instances[instance_id] = instance + return filtered_instances + + +def _get_head_instance_id(instances: Dict[str, Any]) -> Optional[str]: + for inst_id, inst in instances.items(): + if inst['name'].endswith('-head'): + return inst_id + return None + + +def run_instances(region: str, cluster_name_on_cloud: str, + config: common.ProvisionConfig) -> common.ProvisionRecord: + """Runs instances for the given cluster.""" + pending_status = ['CREATED', 'RESTARTING'] + + created_instance_ids = [] + instances: Dict[str, Any] = {} + + while True: + instances = _filter_instances(cluster_name_on_cloud, None) + if not status_filter(instances, pending_status): + break + 
logger.info(f'Waiting for {len(instances)} instances to be ready.') + time.sleep(POLL_INTERVAL) + + running_instances = status_filter(instances, ['RUNNING']) + head_instance_id = _get_head_instance_id(running_instances) + stopped_instances = status_filter(instances, ['EXITED', 'STOPPED']) + + if config.resume_stopped_nodes and stopped_instances: + for instance in stopped_instances.values(): + utils.start(instance['id']) + else: + to_start_count = config.count - (len(running_instances) + + len(stopped_instances)) + if to_start_count < 0: + raise RuntimeError(f'Cluster {cluster_name_on_cloud} already has ' + f'{len(running_instances)} nodes,' + f'but {config.count} are required.') + if to_start_count == 0: + if head_instance_id is None: + raise RuntimeError( + f'Cluster {cluster_name_on_cloud} has no head node.') + logger.info( + f'Cluster {cluster_name_on_cloud} already has ' + f'{len(running_instances)} nodes, no need to start more.') + return common.ProvisionRecord(provider_name='vast', + cluster_name=cluster_name_on_cloud, + region=region, + zone=None, + head_instance_id=head_instance_id, + resumed_instance_ids=[], + created_instance_ids=[]) + + for _ in range(to_start_count): + node_type = 'head' if head_instance_id is None else 'worker' + try: + instance_id = utils.launch( + name=f'{cluster_name_on_cloud}-{node_type}', + instance_type=config.node_config['InstanceType'], + region=region, + disk_size=config.node_config['DiskSize'], + preemptible=config.node_config['Preemptible'], + image_name=config.node_config['ImageId']) + except Exception as e: # pylint: disable=broad-except + logger.warning(f'run_instances error: {e}') + raise + logger.info(f'Launched instance {instance_id}.') + created_instance_ids.append(instance_id) + if head_instance_id is None: + head_instance_id = instance_id + + # Wait for instances to be ready. 
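+    # An instance is considered ready once Vast reports an SSH port for it.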
+ while True: + instances = _filter_instances(cluster_name_on_cloud, ['RUNNING']) + ready_instance_cnt = 0 + for instance_id, instance in instances.items(): + if instance.get('ssh_port') is not None: + ready_instance_cnt += 1 + logger.info('Waiting for instances to be ready: ' + f'({ready_instance_cnt}/{config.count}).') + if ready_instance_cnt == config.count: + break + + time.sleep(POLL_INTERVAL) + + head_instance_id = _get_head_instance_id(utils.list_instances()) + assert head_instance_id is not None, 'head_instance_id should not be None' + return common.ProvisionRecord(provider_name='vast', + cluster_name=cluster_name_on_cloud, + region=region, + zone=None, + head_instance_id=head_instance_id, + resumed_instance_ids=[], + created_instance_ids=created_instance_ids) + + +def wait_instances(region: str, cluster_name_on_cloud: str, + state: Optional[status_lib.ClusterStatus]) -> None: + del region, cluster_name_on_cloud, state + + +def stop_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + worker_only: bool = False, +) -> None: + return action_instances('stop', cluster_name_on_cloud, provider_config, + worker_only) + + +def terminate_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + worker_only: bool = False, +) -> None: + return action_instances('remove', cluster_name_on_cloud, provider_config, + worker_only) + + +def action_instances( + fn: str, + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + worker_only: bool = False, +) -> None: + """See sky/provision/__init__.py""" + del provider_config # unused + instances = _filter_instances(cluster_name_on_cloud, None) + for inst_id, inst in instances.items(): + logger.debug(f'Instance {fn} {inst_id}: {inst}') + if worker_only and inst['name'].endswith('-head'): + continue + try: + getattr(utils, fn)(inst_id) + except Exception as e: # pylint: disable=broad-except + with ux_utils.print_exception_no_traceback(): + raise RuntimeError( + f'Failed to {fn} instance {inst_id}: ' + f'{common_utils.format_exception(e, use_bracket=False)}' + ) from e + + +def get_cluster_info( + region: str, + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None) -> common.ClusterInfo: + del region # unused + running_instances = _filter_instances(cluster_name_on_cloud, ['RUNNING']) + instances: Dict[str, List[common.InstanceInfo]] = {} + head_instance_id = None + for instance_id, instance_info in running_instances.items(): + instances[instance_id] = [ + common.InstanceInfo( + instance_id=instance_id, + internal_ip=instance_info['local_ipaddrs'].strip(), + external_ip=instance_info['public_ipaddr'], + ssh_port=instance_info['ports']['22/tcp'][0]['HostPort'], + tags={}, + ) + ] + if instance_info['name'].endswith('-head'): + head_instance_id = instance_id + + return common.ClusterInfo( + instances=instances, + head_instance_id=head_instance_id, + provider_name='vast', + provider_config=provider_config, + ) + + +def open_ports( + cluster_name_on_cloud: str, + ports: List[str], + provider_config: Optional[Dict[str, Any]] = None, +) -> None: + raise NotImplementedError('open_ports is not supported for Vast') + + +def query_instances( + cluster_name_on_cloud: str, + provider_config: Optional[Dict[str, Any]] = None, + non_terminated_only: bool = True, +) -> Dict[str, Optional[status_lib.ClusterStatus]]: + """See sky/provision/__init__.py""" + + assert provider_config is not None, (cluster_name_on_cloud, provider_config) + instances 
= _filter_instances(cluster_name_on_cloud, None) + # "running", "frozen", "stopped", "unknown", "loading" + status_map = { + 'LOADING': status_lib.ClusterStatus.INIT, + 'EXITED': status_lib.ClusterStatus.STOPPED, + 'STOPPED': status_lib.ClusterStatus.STOPPED, + 'RUNNING': status_lib.ClusterStatus.UP, + } + statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {} + for inst_id, inst in instances.items(): + status = status_map[inst['status']] + if non_terminated_only and status is None: + continue + statuses[inst_id] = status + return statuses + + +def cleanup_ports( + cluster_name_on_cloud: str, + ports: List[str], + provider_config: Optional[Dict[str, Any]] = None, +) -> None: + del cluster_name_on_cloud, ports, provider_config # Unused. diff --git a/sky/provision/vast/utils.py b/sky/provision/vast/utils.py new file mode 100644 index 00000000000..e9fa445e567 --- /dev/null +++ b/sky/provision/vast/utils.py @@ -0,0 +1,153 @@ +# pylint: disable=assignment-from-no-return +# +# The pylint exception above is an accomodation for +# false positives generated by pylint for the Vast +# python sdk. +# +"""Vast library wrapper for SkyPilot.""" +from typing import Any, Dict, List + +from sky import sky_logging +from sky.adaptors import vast + +logger = sky_logging.init_logger(__name__) + + +def list_instances() -> Dict[str, Dict[str, Any]]: + """Lists instances associated with API key.""" + instances = vast.vast().show_instances() + + instance_dict: Dict[str, Dict[str, Any]] = {} + for instance in instances: + instance['id'] = str(instance['id']) + info = instance + + if isinstance(instance['actual_status'], str): + info['status'] = instance['actual_status'].upper() + else: + info['status'] = 'UNKNOWN' + info['name'] = instance['label'] + + instance_dict[instance['id']] = info + + return instance_dict + + +def launch(name: str, instance_type: str, region: str, disk_size: int, + image_name: str, preemptible: bool) -> str: + """Launches an instance with the given parameters. + + Converts the instance_type to the Vast GPU name, finds the specs for the + GPU, and launches the instance. + + Notes: + + * `disk_size`: we look for instances that are of the requested + size or greater than it. For instance, `disk_size=100` might + return something with `disk_size` at 102 or even 1000. + + The disk size {xx} GB is not exactly matched the requested + size {yy} GB. It is possible to charge extra cost on disk. + + * `geolocation`: Geolocation on Vast can be as specific as the + host chooses to be. They can say, for instance, "Yutakachō, + Shinagawa District, Tokyo, JP." Such a specific geolocation + as ours would fail to return this host in a simple string + comparison if a user searched for "JP". + + Since regardless of specificity, all our geolocations end + in two-letter country codes we just snip that to conform + to how many providers state their geolocation. + + * Since the catalog is cached, we can't gaurantee availability + of any machine at the point of inquiry. As a consequence we + search for the machine again and potentially return a failure + if there is no availability. + + * Vast instance types are an invention for skypilot. Refer to + service_catalog/vast_catalog.py for the current construction + of the type. 
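+      They follow '<num_gpus>x-<gpu_name>-<cpu_cores>-<cpu_ram>', as built
+      by create_instance_type() in data_fetchers/fetch_vast.py, e.g.
+      '1x-RTX_4090-6-16' (an illustrative value, not a real offer).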
+ + """ + gpu_name = instance_type.split('-')[1].replace('_', ' ') + num_gpus = int(instance_type.split('-')[0].replace('x', '')) + + query = ' '.join([ + f'geolocation="{region[-2:]}"', + f'disk_space>={disk_size}', + f'num_gpus={num_gpus}', + f'gpu_name="{gpu_name}"', + ]) + + instance_list = vast.vast().search_offers(query=query) + + if isinstance(instance_list, int) or len(instance_list) == 0: + return '' + + instance_touse = instance_list[0] + + launch_params = { + 'id': instance_touse['id'], + 'direct': True, + 'ssh': True, + 'env': '-e __SOURCE=skypilot', + 'onstart_cmd': ';'.join([ + 'touch ~/.no_auto_tmux', + f'echo "{vast.vast().api_key_access}" > ~/.vast_api_key', + ]), + 'label': name, + 'image': image_name + } + + if preemptible: + launch_params['min_bid'] = instance_touse['min_bid'] + + new_instance_contract = vast.vast().create_instance(**launch_params) + + new_instance = vast.vast().show_instance( + id=new_instance_contract['new_contract']) + + return new_instance['id'] + + +def start(instance_id: str) -> None: + """Starts the given instance.""" + vast.vast().start_instance(id=instance_id) + + +def stop(instance_id: str) -> None: + """Stops the given instance.""" + vast.vast().stop_instance(id=instance_id) + + +def remove(instance_id: str) -> None: + """Terminates the given instance.""" + vast.vast().destroy_instance(id=instance_id) + + +def get_ssh_ports(cluster_name) -> List[int]: + """Gets the SSH ports for the given cluster.""" + logger.debug(f'Getting SSH ports for cluster {cluster_name}.') + + instances = list_instances() + possible_names = [f'{cluster_name}-head', f'{cluster_name}-worker'] + + ssh_ports = [] + + for instance in instances.values(): + if instance['name'] in possible_names: + ssh_ports.append((instance['name'], instance['ssh_port'])) + assert ssh_ports, ( + f'Could not find any instances for cluster {cluster_name}.') + + # So now we have + # [(name, port) ... ] + # + # We want to put head first and otherwise sort numerically + # and then extract the ports. + ssh_ports = list( + x[1] + for x in sorted(ssh_ports, + key=lambda x: -1 + if x[0].endswith('head') else int(x[0].split('-')[-1]))) + return ssh_ports diff --git a/sky/setup_files/dependencies.py b/sky/setup_files/dependencies.py index 13b99770e5b..d0a554c75b0 100644 --- a/sky/setup_files/dependencies.py +++ b/sky/setup_files/dependencies.py @@ -130,6 +130,7 @@ 'cudo': ['cudo-compute>=0.1.10'], 'paperspace': [], # No dependencies needed for paperspace 'do': ['pydo>=0.3.0', 'azure-core>=1.24.0', 'azure-common'], + 'vast': ['vastai-sdk>=0.1.7'], 'vsphere': [ 'pyvmomi==8.0.1.0.2', # vsphere-automation-sdk is also required, but it does not have diff --git a/sky/templates/vast-ray.yml.j2 b/sky/templates/vast-ray.yml.j2 new file mode 100644 index 00000000000..cc5a63a9f55 --- /dev/null +++ b/sky/templates/vast-ray.yml.j2 @@ -0,0 +1,70 @@ +cluster_name: {{cluster_name_on_cloud}} + +# The maximum number of workers nodes to launch in addition to the head node. 
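+# Vast currently supports single-node clusters only, so this is effectively 0.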
+max_workers: {{num_nodes - 1}} +upscaling_speed: {{num_nodes - 1}} +idle_timeout_minutes: 60 + +provider: + type: external + module: sky.provision.vast + region: "{{region}}" + disable_launch_config_check: true + +auth: + ssh_user: root + ssh_private_key: {{ssh_private_key}} + +available_node_types: + ray_head_default: + resources: {} + node_config: + InstanceType: {{instance_type}} + DiskSize: {{disk_size}} + ImageId: {{image_id}} + Preemptible: {{use_spot}} + PublicKey: |- + skypilot:ssh_public_key_content + +head_node_type: ray_head_default + +# Format: `REMOTE_PATH : LOCAL_PATH` +file_mounts: { + "{{sky_ray_yaml_remote_path}}": "{{sky_ray_yaml_local_path}}", + "{{sky_remote_path}}/{{sky_wheel_hash}}": "{{sky_local_path}}", +{%- for remote_path, local_path in credentials.items() %} + "{{remote_path}}": "{{local_path}}", +{%- endfor %} +} + +rsync_exclude: [] + +initialization_commands: [] + +# List of shell commands to run to set up nodes. +# NOTE: these are very performance-sensitive. Each new item opens/closes an SSH +# connection, which is expensive. Try your best to co-locate commands into fewer +# items! +# +# Increment the following for catching performance bugs easier: +# current num items (num SSH connections): 1 +setup_commands: + # Create ~/.ssh/config file in case the file does not exist in the image. + # Line 'rm ..': there is another installation of pip. + # Line 'sudo bash ..': set the ulimit as suggested by ray docs for performance. https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html#system-configuration + # Line 'sudo grep ..': set the number of threads per process to unlimited to avoid ray job submit stucking issue when the number of running ray jobs increase. + # Line 'mkdir -p ..': disable host key check + # Line 'python3 -c ..': patch the buggy ray files and enable `-o allow_other` option for `goofys` + - {%- for initial_setup_command in initial_setup_commands %} + {{ initial_setup_command }} + {%- endfor %} + mkdir -p ~/.ssh; touch ~/.ssh/config; which patch > /dev/null || sudo apt install -y patch; + {{ conda_installation_commands }} + {{ ray_skypilot_installation_commands }} + sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 1048576" >> /etc/security/limits.conf; echo "* hard nofile 1048576" >> /etc/security/limits.conf'; + sudo grep -e '^DefaultTasksMax' /etc/systemd/system.conf || (sudo bash -c 'echo "DefaultTasksMax=infinity" >> /etc/systemd/system.conf'); sudo systemctl set-property user-$(id -u $(whoami)).slice TasksMax=infinity; sudo systemctl daemon-reload; + (grep -Pzo -q "Host \*\n StrictHostKeyChecking no" ~/.ssh/config) || printf "Host *\n StrictHostKeyChecking no\n" >> ~/.ssh/config; + + +# Command to start ray clusters are now placed in `sky.provision.instance_setup`. +# We do not need to list it here anymore. diff --git a/sky/utils/controller_utils.py b/sky/utils/controller_utils.py index acb636893a5..23754f4af99 100644 --- a/sky/utils/controller_utils.py +++ b/sky/utils/controller_utils.py @@ -260,6 +260,11 @@ def _get_cloud_dependencies_installation_commands( if controller != Controllers.JOBS_CONTROLLER: # We only need IBM deps on the jobs controller. 
cloud_python_dependencies = [] + elif isinstance(cloud, clouds.Vast): + step_prefix = prefix_str.replace('', str(len(commands) + 1)) + commands.append(f'echo -en "\\r{step_prefix}Vast{empty_str}" && ' + 'pip list | grep vastai_sdk > /dev/null 2>&1 || ' + 'pip install "vastai_sdk>=0.1.2" > /dev/null 2>&1') python_packages.update(cloud_python_dependencies) diff --git a/tests/conftest.py b/tests/conftest.py index af6367fdac6..d6fcf74e93f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,7 +22,8 @@ # --managed-jobs. all_clouds_in_smoke_tests = [ 'aws', 'gcp', 'azure', 'lambda', 'cloudflare', 'ibm', 'scp', 'oci', 'do', - 'kubernetes', 'vsphere', 'cudo', 'fluidstack', 'paperspace', 'runpod' + 'kubernetes', 'vsphere', 'cudo', 'fluidstack', 'paperspace', 'runpod', + 'vast' ] default_clouds_to_run = ['aws', 'azure'] @@ -44,6 +45,7 @@ 'cudo': 'cudo', 'paperspace': 'paperspace', 'do': 'do', + 'vast': 'vast', 'runpod': 'runpod' } diff --git a/tests/smoke_tests/smoke_tests_utils.py b/tests/smoke_tests/smoke_tests_utils.py index 14f2b94a5d4..59625a439e0 100644 --- a/tests/smoke_tests/smoke_tests_utils.py +++ b/tests/smoke_tests/smoke_tests_utils.py @@ -276,7 +276,7 @@ def run_one_test(test: Test) -> Tuple[int, str, str]: write = log_file.write flush = log_file.flush subprocess_out = log_file - test.echo(f'Test started. Log: less {log_file.name}') + test.echo(f'Test started. Log: less -r {log_file.name}') env_dict = os.environ.copy() if test.env: @@ -310,7 +310,7 @@ def run_one_test(test: Test) -> Tuple[int, str, str]: style = colorama.Style fore = colorama.Fore - outcome = (f'{fore.RED}Failed{style.RESET_ALL}' + outcome = (f'{fore.RED}Failed{style.RESET_ALL} (returned {proc.returncode})' if proc.returncode else f'{fore.GREEN}Passed{style.RESET_ALL}') reason = f'\nReason: {command}' if proc.returncode else '' msg = (f'{outcome}.' @@ -318,7 +318,7 @@ def run_one_test(test: Test) -> Tuple[int, str, str]: if log_to_stdout: test.echo(msg) else: - msg += f'\nLog: less {log_file.name}\n' + msg += f'\nLog: less -r {log_file.name}\n' test.echo(msg) write(msg) @@ -336,7 +336,7 @@ def run_one_test(test: Test) -> Tuple[int, str, str]: if log_to_stdout: raise Exception(f'test failed') else: - raise Exception(f'test failed: less {log_file.name}') + raise Exception(f'test failed: less -r {log_file.name}') def get_aws_region_for_quota_failover() -> Optional[str]: @@ -423,7 +423,7 @@ def get_gcp_region_for_quota_failover() -> Optional[str]: # └── To teardown the cluster: sky down test 'echo "$s" && echo "==Validating launching==" && ' 'echo "$s" | grep -A 1 "Launching on" | grep "is up." && ' - 'echo "$s" && echo "==Validating setup output==" && ' + 'echo "==Validating setup output==" && ' 'echo "$s" | grep -A 1 "Running setup on" | grep "running setup" && ' 'echo "==Validating running output hints==" && echo "$s" | ' 'grep -A 1 "Job submitted, ID:" | ' diff --git a/tests/smoke_tests/test_basic.py b/tests/smoke_tests/test_basic.py index 30576d3272f..82fef12e9a8 100644 --- a/tests/smoke_tests/test_basic.py +++ b/tests/smoke_tests/test_basic.py @@ -34,6 +34,7 @@ # ---------- Dry run: 2 Tasks in a chain. 
---------- +@pytest.mark.no_vast #requires GCP and AWS set up @pytest.mark.no_fluidstack #requires GCP and AWS set up def test_example_app(): test = smoke_tests_utils.Test( @@ -118,6 +119,7 @@ def test_launch_fast(generic_cloud: str): @pytest.mark.no_fluidstack @pytest.mark.no_lambda_cloud @pytest.mark.no_ibm +@pytest.mark.no_vast # Dynamic ports are needed @pytest.mark.no_kubernetes def test_launch_fast_with_autostop(generic_cloud: str): name = smoke_tests_utils.get_cluster_name() @@ -155,6 +157,7 @@ def test_launch_fast_with_autostop(generic_cloud: str): @pytest.mark.no_fluidstack # FluidStack does not support stopping instances in SkyPilot implementation @pytest.mark.no_lambda_cloud # Lambda Cloud does not support stopping instances @pytest.mark.no_kubernetes # Kubernetes does not support stopping instances +@pytest.mark.no_vast # This requires port opening def test_stale_job(generic_cloud: str): name = smoke_tests_utils.get_cluster_name() test = smoke_tests_utils.Test( @@ -176,6 +179,7 @@ def test_stale_job(generic_cloud: str): smoke_tests_utils.run_one_test(test) +@pytest.mark.no_vast @pytest.mark.aws def test_aws_stale_job_manual_restart(): name = smoke_tests_utils.get_cluster_name() @@ -213,6 +217,7 @@ def test_aws_stale_job_manual_restart(): smoke_tests_utils.run_one_test(test) +@pytest.mark.no_vast @pytest.mark.gcp def test_gcp_stale_job_manual_restart(): name = smoke_tests_utils.get_cluster_name() @@ -250,6 +255,7 @@ def test_gcp_stale_job_manual_restart(): # ---------- Check Sky's environment variables; workdir. ---------- @pytest.mark.no_fluidstack # Requires amazon S3 @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet +@pytest.mark.no_vast # Vast does not support num_nodes > 1 yet def test_env_check(generic_cloud: str): name = smoke_tests_utils.get_cluster_name() total_timeout_minutes = 25 if generic_cloud == 'azure' else 15 @@ -267,6 +273,7 @@ def test_env_check(generic_cloud: str): # ---------- CLI logs ---------- @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet. Run test_scp_logs instead. +@pytest.mark.no_vast # Vast does not support num_nodes > 1 yet. 
def test_cli_logs(generic_cloud: str): name = smoke_tests_utils.get_cluster_name() num_nodes = 2 @@ -420,6 +427,7 @@ def test_load_dump_yaml_config_equivalent(self): # ---------- Testing Multiple Accelerators ---------- +@pytest.mark.no_vast # Vast has low availability for K80 GPUs @pytest.mark.no_fluidstack # Fluidstack does not support K80 gpus for now @pytest.mark.no_paperspace # Paperspace does not support K80 gpus @pytest.mark.no_do # DO does not support K80s @@ -437,6 +445,7 @@ def test_multiple_accelerators_ordered(): smoke_tests_utils.run_one_test(test) +@pytest.mark.no_vast # Vast has low availability for T4 GPUs @pytest.mark.no_fluidstack # Fluidstack has low availability for T4 GPUs @pytest.mark.no_paperspace # Paperspace does not support T4 GPUs @pytest.mark.no_do # DO does not have multiple accelerators @@ -454,6 +463,7 @@ def test_multiple_accelerators_ordered_with_default(): smoke_tests_utils.run_one_test(test) +@pytest.mark.no_vast # Vast has low availability for T4 GPUs @pytest.mark.no_fluidstack # Fluidstack has low availability for T4 GPUs @pytest.mark.no_paperspace # Paperspace does not support T4 GPUs @pytest.mark.no_do # DO does not have multiple accelerators @@ -470,6 +480,7 @@ def test_multiple_accelerators_unordered(): smoke_tests_utils.run_one_test(test) +@pytest.mark.no_vast # Vast has low availability for T4 GPUs @pytest.mark.no_fluidstack # Fluidstack has low availability for T4 GPUs @pytest.mark.no_paperspace # Paperspace does not support T4 GPUs @pytest.mark.no_do # DO does not support multiple accelerators @@ -487,6 +498,7 @@ def test_multiple_accelerators_unordered_with_default(): smoke_tests_utils.run_one_test(test) +@pytest.mark.no_vast # Requires other clouds to be enabled @pytest.mark.no_fluidstack # Requires other clouds to be enabled def test_multiple_resources(): name = smoke_tests_utils.get_cluster_name() @@ -503,6 +515,7 @@ def test_multiple_resources(): # ---------- Sky Benchmark ---------- @pytest.mark.no_fluidstack # Requires other clouds to be enabled +@pytest.mark.no_vast # Requires other clouds to be enabled @pytest.mark.no_paperspace # Requires other clouds to be enabled @pytest.mark.no_kubernetes @pytest.mark.aws # SkyBenchmark requires S3 access diff --git a/tests/smoke_tests/test_cluster_job.py b/tests/smoke_tests/test_cluster_job.py index 1fbb1b3d875..8acf519c777 100644 --- a/tests/smoke_tests/test_cluster_job.py +++ b/tests/smoke_tests/test_cluster_job.py @@ -38,6 +38,7 @@ # ---------- Job Queue. ---------- +@pytest.mark.no_vast # Vast has low availability of T4 GPUs @pytest.mark.no_fluidstack # FluidStack DC has low availability of T4 GPUs @pytest.mark.no_lambda_cloud # Lambda Cloud does not have T4 gpus @pytest.mark.no_ibm # IBM Cloud does not have T4 gpus. 
run test_ibm_job_queue instead @@ -76,6 +77,7 @@ def test_job_queue(generic_cloud: str, accelerator: Dict[str, str]): @pytest.mark.no_fluidstack # FluidStack does not support docker for now @pytest.mark.no_lambda_cloud # Doesn't support Lambda Cloud for now @pytest.mark.no_ibm # Doesn't support IBM Cloud for now +@pytest.mark.no_vast # Vast has low availability of T4 GPUs @pytest.mark.no_paperspace # Paperspace doesn't have T4 GPUs @pytest.mark.no_scp # Doesn't support SCP for now @pytest.mark.no_oci # Doesn't support OCI for now @@ -213,12 +215,14 @@ def test_scp_job_queue(): smoke_tests_utils.run_one_test(test) +@pytest.mark.no_vast # Vast has low availability of T4 GPUs @pytest.mark.no_fluidstack # FluidStack DC has low availability of T4 GPUs @pytest.mark.no_lambda_cloud # Lambda Cloud does not have T4 gpus @pytest.mark.no_ibm # IBM Cloud does not have T4 gpus. run test_ibm_job_queue_multinode instead @pytest.mark.no_paperspace # Paperspace does not have T4 gpus. @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet @pytest.mark.no_oci # OCI Cloud does not have T4 gpus. +@pytest.mark.no_vast # Vast does not support num_nodes > 1 yet @pytest.mark.no_kubernetes # Kubernetes not support num_nodes > 1 yet @pytest.mark.parametrize('accelerator', [{'do': 'H100'}]) def test_job_queue_multinode(generic_cloud: str, accelerator: Dict[str, str]): @@ -262,6 +266,7 @@ def test_job_queue_multinode(generic_cloud: str, accelerator: Dict[str, str]): @pytest.mark.no_fluidstack # No FluidStack VM has 8 CPUs @pytest.mark.no_lambda_cloud # No Lambda Cloud VM has 8 CPUs +@pytest.mark.no_vast # Vast doesn't guarantee exactly 8 CPUs, only at least. def test_large_job_queue(generic_cloud: str): name = smoke_tests_utils.get_cluster_name() test = smoke_tests_utils.Test( @@ -387,6 +392,7 @@ def test_docker_preinstalled_package(generic_cloud: str): # ---------- Submitting multiple tasks to the same cluster. ---------- +@pytest.mark.no_vast # Vast has low availability of T4 GPUs @pytest.mark.no_fluidstack # FluidStack DC has low availability of T4 GPUs @pytest.mark.no_lambda_cloud # Lambda Cloud does not have T4 gpus @pytest.mark.no_paperspace # Paperspace does not have T4 gpus @@ -432,6 +438,7 @@ def test_multi_echo(generic_cloud: str): # ---------- Task: 1 node training. ---------- +@pytest.mark.no_vast # Vast has low availability of T4 GPUs @pytest.mark.no_fluidstack # Fluidstack does not have T4 gpus for now @pytest.mark.no_lambda_cloud # Lambda Cloud does not have V100 gpus @pytest.mark.no_ibm # IBM cloud currently doesn't provide public image with CUDA @@ -587,6 +594,7 @@ def test_tpu_pod_slice_gke(): # ---------- Simple apps. 
---------- +@pytest.mark.no_vast # Vast does not support num_nodes > 1 yet @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet def test_multi_hostname(generic_cloud: str): name = smoke_tests_utils.get_cluster_name() @@ -607,6 +615,7 @@ def test_multi_hostname(generic_cloud: str): smoke_tests_utils.run_one_test(test) +@pytest.mark.no_vast # Vast does not support num_nodes > 1 yet @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet def test_multi_node_failure(generic_cloud: str): name = smoke_tests_utils.get_cluster_name() @@ -1054,6 +1063,7 @@ def test_azure_start_stop(): @pytest.mark.no_ibm # FIX(IBM) sporadically fails, as restarted workers stay uninitialized indefinitely @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet @pytest.mark.no_kubernetes # Kubernetes does not autostop yet +@pytest.mark.no_vast # Vast does not support num_nodes > 1 yet def test_autostop(generic_cloud: str): name = smoke_tests_utils.get_cluster_name() # Azure takes ~ 7m15s (435s) to autostop a VM, so here we use 600 to ensure @@ -1121,6 +1131,7 @@ def test_autostop(generic_cloud: str): # ---------- Testing Autodowning ---------- @pytest.mark.no_fluidstack # FluidStack does not support stopping in SkyPilot implementation @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet. Run test_scp_autodown instead. +@pytest.mark.no_vast # Vast does not support num_nodes > 1 yet def test_autodown(generic_cloud: str): name = smoke_tests_utils.get_cluster_name() # Azure takes ~ 13m30s (810s) to autodown a VM, so here we use 900 to ensure @@ -1242,6 +1253,7 @@ def test_cancel_azure(): @pytest.mark.no_ibm # IBM cloud currently doesn't provide public image with CUDA @pytest.mark.no_paperspace # Paperspace has `gnome-shell` on nvidia-smi @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet +@pytest.mark.no_vast # Vast does not support num_nodes > 1 yet @pytest.mark.parametrize('accelerator', [{'do': 'H100'}]) def test_cancel_pytorch(generic_cloud: str, accelerator: Dict[str, str]): accelerator = accelerator.get(generic_cloud, 'T4') diff --git a/tests/smoke_tests/test_managed_job.py b/tests/smoke_tests/test_managed_job.py index 1656e136398..6dcec938a02 100644 --- a/tests/smoke_tests/test_managed_job.py +++ b/tests/smoke_tests/test_managed_job.py @@ -96,6 +96,7 @@ def test_managed_jobs(generic_cloud: str): @pytest.mark.no_paperspace # Paperspace does not support spot instances @pytest.mark.no_kubernetes # Kubernetes does not have a notion of spot instances @pytest.mark.no_do # DO does not support spot instances +@pytest.mark.no_vast # The pipeline.yaml uses other clouds @pytest.mark.managed_jobs def test_job_pipeline(generic_cloud: str): """Test a job pipeline.""" @@ -103,7 +104,7 @@ def test_job_pipeline(generic_cloud: str): test = smoke_tests_utils.Test( 'spot-pipeline', [ - f'sky jobs launch -n {name} tests/test_yamls/pipeline.yaml -y -d', + f'sky jobs launch -n {name} --cloud {generic_cloud} tests/test_yamls/pipeline.yaml -y -d', 'sleep 5', f'{smoke_tests_utils.GET_JOB_QUEUE} | grep {name} | head -n1 | grep "STARTING\|RUNNING"', # `grep -A 4 {name}` finds the job with {name} and the 4 lines @@ -173,7 +174,7 @@ def test_managed_jobs_pipeline_failed_setup(generic_cloud: str): test = smoke_tests_utils.Test( 'managed_jobs_pipeline_failed_setup', [ - f'sky jobs launch -n {name} -y -d tests/test_yamls/failed_setup_pipeline.yaml', + f'sky jobs launch -n {name} --cloud {generic_cloud} -y -d tests/test_yamls/failed_setup_pipeline.yaml', smoke_tests_utils. 
get_cmd_wait_until_managed_job_status_contains_matching_job_name( job_name=name, @@ -689,6 +690,7 @@ def test_managed_jobs_retry_logs(): @pytest.mark.no_paperspace # Paperspace does not support spot instances @pytest.mark.no_scp # SCP does not support spot instances @pytest.mark.no_do # DO does not support spot instances +@pytest.mark.no_vast # Uses other clouds @pytest.mark.managed_jobs def test_managed_jobs_storage(generic_cloud: str): """Test storage with managed job""" @@ -900,6 +902,7 @@ def test_managed_jobs_inline_env(generic_cloud: str): smoke_tests_utils.run_one_test(test) +@pytest.mark.no_vast # The test uses other clouds @pytest.mark.managed_jobs def test_managed_jobs_logs_sync_down(): name = smoke_tests_utils.get_cluster_name() diff --git a/tests/smoke_tests/test_mount_and_storage.py b/tests/smoke_tests/test_mount_and_storage.py index 3f2ddb16c57..881d17bce58 100644 --- a/tests/smoke_tests/test_mount_and_storage.py +++ b/tests/smoke_tests/test_mount_and_storage.py @@ -47,6 +47,7 @@ # ---------- file_mounts ---------- +@pytest.mark.no_vast # VAST does not support num_nodes > 1 yet @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet. Run test_scp_file_mounts instead. def test_file_mounts(generic_cloud: str): name = smoke_tests_utils.get_cluster_name() @@ -104,6 +105,7 @@ def test_oci_mounts(): smoke_tests_utils.run_one_test(test) +@pytest.mark.no_vast # Requires GCP @pytest.mark.no_fluidstack # Requires GCP to be enabled def test_using_file_mounts_with_env_vars(generic_cloud: str): name = smoke_tests_utils.get_cluster_name() @@ -319,6 +321,7 @@ def test_kubernetes_context_switch(): smoke_tests_utils.run_one_test(test) +@pytest.mark.no_vast # Requires AWS @pytest.mark.parametrize( 'image_id', [ @@ -1058,6 +1061,7 @@ def tmp_public_storage_obj(self, request): # This does not require any deletion logic because it is a public bucket # and should not get added to global_user_state. 
+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('store_type', [
         storage_lib.StoreType.S3, storage_lib.StoreType.GCS,
@@ -1083,6 +1087,7 @@ def test_new_bucket_creation_and_deletion(self, tmp_local_storage_obj,
         out = subprocess.check_output(['sky', 'storage', 'ls'])
         assert tmp_local_storage_obj.name not in out.decode('utf-8')

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('store_type', [
         pytest.param(storage_lib.StoreType.S3, marks=pytest.mark.aws),
@@ -1136,6 +1141,7 @@ def test_bucket_sub_path(self, tmp_local_storage_obj_with_sub_path,
         assert tmp_local_storage_obj_with_sub_path.name not in out.decode(
             'utf-8')

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.xdist_group('multiple_bucket_deletion')
     @pytest.mark.parametrize('store_type', [
@@ -1178,6 +1184,7 @@ def test_multiple_buckets_creation_and_deletion(
         ]
         assert all([item not in out for item in storage_obj_name])

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('store_type', [
         storage_lib.StoreType.S3, storage_lib.StoreType.GCS,
@@ -1204,6 +1211,7 @@ def test_upload_source_with_spaces(self, store_type,
         ]
         assert all([item in out for item in storage_obj_names])

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('store_type', [
         storage_lib.StoreType.S3, storage_lib.StoreType.GCS,
@@ -1235,6 +1243,7 @@ def test_bucket_external_deletion(self, tmp_scratch_storage_obj,
         out = subprocess.check_output(['sky', 'storage', 'ls'])
         assert tmp_scratch_storage_obj.name not in out.decode('utf-8')

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('store_type', [
         storage_lib.StoreType.S3, storage_lib.StoreType.GCS,
@@ -1254,6 +1263,7 @@ def test_bucket_bulk_deletion(self, store_type, tmp_bulk_del_storage_obj):
         output = subprocess.check_output(['sky', 'storage', 'ls'])
         assert tmp_bulk_del_storage_obj.name not in output.decode('utf-8')

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize(
         'tmp_public_storage_obj, store_type',
@@ -1274,6 +1284,7 @@ def test_public_bucket(self, tmp_public_storage_obj, store_type):
         out = subprocess.check_output(['sky', 'storage', 'ls'])
         assert tmp_public_storage_obj.name not in out.decode('utf-8')

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize(
         'nonexist_bucket_url',
@@ -1357,6 +1368,7 @@ def test_nonexistent_bucket(self, nonexist_bucket_url):
                 source=nonexist_bucket_url.format(
                     random_name=nonexist_bucket_name))

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize(
         'private_bucket',
@@ -1383,6 +1395,7 @@ def test_private_bucket(self, private_bucket):
                     name=private_bucket_name)):
             storage_obj = storage_lib.Storage(source=private_bucket)

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('ext_bucket_fixture, store_type',
                              [('tmp_awscli_bucket', storage_lib.StoreType.S3),
@@ -1424,6 +1437,7 @@ def test_upload_to_existing_bucket(self, ext_bucket_fixture, request,
         out = subprocess.check_output(['sky', 'storage', 'ls'])
         assert storage_obj.name not in out.decode('utf-8')

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     def test_copy_mount_existing_storage(self,
                                          tmp_copy_mnt_existing_storage_obj):
@@ -1436,6 +1450,7 @@ def test_copy_mount_existing_storage(self,
         out = subprocess.check_output(['sky', 'storage', 'ls']).decode('utf-8')
         assert storage_name in out, f'Storage {storage_name} not found in sky storage ls.'

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('store_type', [
         storage_lib.StoreType.S3, storage_lib.StoreType.GCS,
@@ -1464,6 +1479,7 @@ def test_list_source(self, tmp_local_list_storage_obj, store_type):
             'File not found in bucket - output was : {}'.format(out.decode
                                                                  ('utf-8'))

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('invalid_name_list, store_type',
                              [(AWS_INVALID_NAMES, storage_lib.StoreType.S3),
@@ -1485,6 +1501,7 @@ def test_invalid_names(self, invalid_name_list, store_type):
                 storage_obj = storage_lib.Storage(name=name)
                 storage_obj.add_store(store_type)

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize(
         'gitignore_structure, store_type',
@@ -1563,6 +1580,7 @@ def test_externally_created_bucket_mount_without_source(
             if handle:
                 storage_obj.delete()

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('region', [
         'ap-northeast-1', 'ap-northeast-2', 'ap-northeast-3', 'ap-south-1',
@@ -1595,6 +1613,7 @@ def test_aws_regions(self, tmp_local_storage_obj, region):
         assert 'tmp-file' in output, (
             f'tmp-file not found in bucket - output of {ls_cmd} was: {output}')

+    @pytest.mark.no_vast  # Requires AWS or S3
     @pytest.mark.no_fluidstack
     @pytest.mark.parametrize('region', [
         'northamerica-northeast1', 'northamerica-northeast2', 'us-central1',
diff --git a/tests/smoke_tests/test_sky_serve.py b/tests/smoke_tests/test_sky_serve.py
index 3ba36d8a092..cc8bc03d61c 100644
--- a/tests/smoke_tests/test_sky_serve.py
+++ b/tests/smoke_tests/test_sky_serve.py
@@ -198,6 +198,7 @@ def test_skyserve_oci_http():


 @pytest.mark.no_fluidstack  # Fluidstack does not support T4 gpus for now
+@pytest.mark.no_vast  # Vast has low availability of T4 GPUs
 @pytest.mark.parametrize('accelerator', [{'do': 'H100'}])
 @pytest.mark.serve
 def test_skyserve_llm(generic_cloud: str, accelerator: Dict[str, str]):
@@ -258,6 +259,7 @@ def test_skyserve_spot_recovery():


 @pytest.mark.no_fluidstack  # Fluidstack does not support spot instances
+@pytest.mark.no_vast  # Vast doesn't support opening ports
 @pytest.mark.serve
 @pytest.mark.no_kubernetes
 @pytest.mark.no_do
@@ -327,6 +329,7 @@ def test_skyserve_dynamic_ondemand_fallback():
 @pytest.mark.no_fluidstack
 @pytest.mark.no_do  # DO does not support `--cpus 2`
 @pytest.mark.serve
+@pytest.mark.no_vast  # Vast doesn't support opening ports
 def test_skyserve_user_bug_restart(generic_cloud: str):
     """Tests that we restart the service after user bug."""
     # TODO(zhwu): this behavior needs some rethinking.
@@ -359,6 +362,7 @@ def test_skyserve_user_bug_restart(generic_cloud: str):
     smoke_tests_utils.run_one_test(test)


+@pytest.mark.no_vast  # Vast doesn't support opening ports
 @pytest.mark.serve
 @pytest.mark.no_kubernetes  # Replicas on k8s may be running on the same node and have the same public IP
 def test_skyserve_load_balancer(generic_cloud: str):
@@ -425,6 +429,7 @@ def test_skyserve_auto_restart():
     smoke_tests_utils.run_one_test(test)


+@pytest.mark.no_vast  # Vast doesn't support opening ports
 @pytest.mark.serve
 def test_skyserve_cancel(generic_cloud: str):
     """Test skyserve with cancel"""
@@ -450,6 +455,7 @@ def test_skyserve_cancel(generic_cloud: str):
     smoke_tests_utils.run_one_test(test)


+@pytest.mark.no_vast  # Vast doesn't support opening ports
 @pytest.mark.serve
 def test_skyserve_streaming(generic_cloud: str):
     """Test skyserve with streaming"""
@@ -469,6 +475,7 @@ def test_skyserve_streaming(generic_cloud: str):
     smoke_tests_utils.run_one_test(test)


+@pytest.mark.no_vast  # Vast doesn't support opening ports
 @pytest.mark.serve
 def test_skyserve_readiness_timeout_fail(generic_cloud: str):
     """Test skyserve with large readiness probe latency, expected to fail"""
@@ -492,6 +499,7 @@ def test_skyserve_readiness_timeout_fail(generic_cloud: str):
     smoke_tests_utils.run_one_test(test)


+@pytest.mark.no_vast  # Vast doesn't support opening ports
 @pytest.mark.serve
 def test_skyserve_large_readiness_timeout(generic_cloud: str):
     """Test skyserve with customized large readiness timeout"""
@@ -513,6 +521,7 @@ def test_skyserve_large_readiness_timeout(generic_cloud: str):
 # TODO: fluidstack does not support `--cpus 2`, but the check for services in this test is based on CPUs
 @pytest.mark.no_fluidstack
 @pytest.mark.no_do  # DO does not support `--cpus 2`
+@pytest.mark.no_vast  # Vast doesn't support opening ports
 @pytest.mark.serve
 def test_skyserve_update(generic_cloud: str):
     """Test skyserve with update"""
@@ -544,6 +553,7 @@ def test_skyserve_update(generic_cloud: str):
 # TODO: fluidstack does not support `--cpus 2`, but the check for services in this test is based on CPUs
 @pytest.mark.no_fluidstack
 @pytest.mark.no_do  # DO does not support `--cpus 2`
+@pytest.mark.no_vast  # Vast doesn't support opening ports
 @pytest.mark.serve
 def test_skyserve_rolling_update(generic_cloud: str):
     """Test skyserve with rolling update"""
@@ -581,6 +591,7 @@ def test_skyserve_rolling_update(generic_cloud: str):


 @pytest.mark.no_fluidstack
+@pytest.mark.no_vast  # Vast doesn't support opening ports
 @pytest.mark.serve
 def test_skyserve_fast_update(generic_cloud: str):
     """Test skyserve with fast update (Increment version of old replicas)"""
@@ -622,6 +633,7 @@ def test_skyserve_fast_update(generic_cloud: str):
     smoke_tests_utils.run_one_test(test)


+@pytest.mark.no_vast  # Vast doesn't support opening ports
 @pytest.mark.serve
 def test_skyserve_update_autoscale(generic_cloud: str):
     """Test skyserve update with autoscale"""
@@ -662,6 +674,7 @@ def test_skyserve_update_autoscale(generic_cloud: str):
 @pytest.mark.serve
 @pytest.mark.no_kubernetes  # Spot instances are not supported in Kubernetes
 @pytest.mark.no_do  # Spot instances not on DO
+@pytest.mark.no_vast  # Vast doesn't support opening ports
 @pytest.mark.parametrize('mode', ['rolling', 'blue_green'])
 def test_skyserve_new_autoscaler_update(mode: str, generic_cloud: str):
     """Test skyserve with update that changes autoscaler"""
@@ -726,6 +739,7 @@ def test_skyserve_new_autoscaler_update(mode: str, generic_cloud: str):
 # TODO: fluidstack does not support `--cpus 2`, but the check for services in this test is based on CPUs
 @pytest.mark.no_fluidstack
 @pytest.mark.no_do  # DO does not support `--cpus 2`
+@pytest.mark.no_vast  # Vast doesn't support opening ports
 @pytest.mark.serve
 def test_skyserve_failures(generic_cloud: str):
     """Test replica failure statuses"""
@@ -776,6 +790,7 @@ def test_skyserve_failures(generic_cloud: str):


 # ------- Testing user dependencies --------
+@pytest.mark.no_vast  # Requires GCS
 def test_user_dependencies(generic_cloud: str):
     name = smoke_tests_utils.get_cluster_name()
     test = smoke_tests_utils.Test(