diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 9c56546234a..87651a83cd4 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -115,6 +115,16 @@ _ENDPOINTS_RETRY_MESSAGE = ('If the cluster was recently started, ' 'please retry after a while.') +# If a cluster is less than LAUNCH_DOUBLE_CHECK_WINDOW seconds old, and we don't +# see any instances in the cloud, the instances might be in the proccess of +# being created. We will wait LAUNCH_DOUBLE_CHECK_DELAY seconds and then double +# check to make sure there are still no instances. LAUNCH_DOUBLE_CHECK_DELAY +# should be set longer than the delay between (sending the create instance +# request) and (the instances appearing on the cloud). +# See https://github.com/skypilot-org/skypilot/issues/4431. +_LAUNCH_DOUBLE_CHECK_WINDOW = 60 +_LAUNCH_DOUBLE_CHECK_DELAY = 1 + # Include the fields that will be used for generating tags that distinguishes # the cluster in ray, to avoid the stopped cluster being discarded due to # updates in the yaml template. @@ -1793,13 +1803,12 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: logger.debug( f'Refreshing status ({cluster_name!r}) failed to get IPs.') except RuntimeError as e: - logger.debug(str(e)) + logger.debug(common_utils.format_exception(e)) except Exception as e: # pylint: disable=broad-except # This can be raised by `external_ssh_ports()`, due to the # underlying call to kubernetes API. - logger.debug( - f'Refreshing status ({cluster_name!r}) failed: ' - f'{common_utils.format_exception(e, use_bracket=True)}') + logger.debug(f'Refreshing status ({cluster_name!r}) failed: ', + exc_info=e) return False # Determining if the cluster is healthy (UP): @@ -1826,6 +1835,24 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: return record # All cases below are transitioning the cluster to non-UP states. + + if (not node_statuses and handle.launched_resources.cloud.STATUS_VERSION >= + clouds.StatusVersion.SKYPILOT): + # Note: launched_at is set during sky launch, even on an existing + # cluster. This will catch the case where the cluster was terminated on + # the cloud and restarted by sky launch. + time_since_launch = time.time() - record['launched_at'] + if (record['status'] == status_lib.ClusterStatus.INIT and + time_since_launch < _LAUNCH_DOUBLE_CHECK_WINDOW): + # It's possible the instances for this cluster were just created, + # and haven't appeared yet in the cloud API/console. Wait for a bit + # and check again. This is a best-effort leak prevention check. + # See https://github.com/skypilot-org/skypilot/issues/4431. + time.sleep(_LAUNCH_DOUBLE_CHECK_DELAY) + node_statuses = _query_cluster_status_via_cloud_api(handle) + # Note: even if all the node_statuses are UP now, we will still + # consider this cluster abnormal, and its status will be INIT. + if len(node_statuses) > handle.launched_nodes: # Unexpected: in the queried region more than 1 cluster with the same # constructed name tag returned. This will typically not happen unless @@ -1854,13 +1881,15 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: f'{colorama.Style.RESET_ALL}') assert len(node_statuses) <= handle.launched_nodes - # If the node_statuses is empty, all the nodes are terminated. We can - # safely set the cluster status to TERMINATED. This handles the edge case - # where the cluster is terminated by the user manually through the UI. + # If the node_statuses is empty, it should mean that all the nodes are + # terminated and we can set the cluster status to TERMINATED. This handles + # the edge case where the cluster is terminated by the user manually through + # the UI. to_terminate = not node_statuses - # A cluster is considered "abnormal", if not all nodes are TERMINATED or - # not all nodes are STOPPED. We check that with the following logic: + # A cluster is considered "abnormal", if some (but not all) nodes are + # TERMINATED, or not all nodes are STOPPED. We check that with the following + # logic: # * Not all nodes are terminated and there's at least one node # terminated; or # * Any of the non-TERMINATED nodes is in a non-STOPPED status. @@ -1872,6 +1901,8 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: # cluster is probably down. # * The cluster is partially terminated or stopped should be considered # abnormal. + # * The cluster is partially or completely in the INIT state, which means + # that provisioning was interrupted. This is considered abnormal. # # An abnormal cluster will transition to INIT and have any autostop setting # reset (unless it's autostopping/autodowning). diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index d73b7f54b8d..d2af79c9db8 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -98,6 +98,11 @@ # The maximum retry count for fetching IP address. _FETCH_IP_MAX_ATTEMPTS = 3 +# How many times to query the cloud provider to make sure instances are +# stopping/terminating, and how long to wait between each query. +_TEARDOWN_WAIT_MAX_ATTEMPTS = 10 +_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS = 1 + _TEARDOWN_FAILURE_MESSAGE = ( f'\n{colorama.Fore.RED}Failed to terminate ' '{cluster_name}. {extra_reason}' @@ -2370,15 +2375,17 @@ def get_command_runners(self, zip(ip_list, port_list), **ssh_credentials) return runners if self.cached_cluster_info is None: - # We have `or self.cached_external_ips is None` here, because + # We have `and self.cached_external_ips is None` here, because # when a cluster's cloud is just upgraded to the new provsioner, # although it has the cached_external_ips, the cached_cluster_info # can be None. We need to update it here, even when force_cached is # set to True. # TODO: We can remove `self.cached_external_ips is None` after # version 0.8.0. - assert not force_cached or self.cached_external_ips is not None, ( - force_cached, self.cached_external_ips) + if force_cached and self.cached_external_ips is None: + raise RuntimeError( + 'Tried to use cached cluster info, but it\'s missing for ' + f'cluster "{self.cluster_name}"') self._update_cluster_info() assert self.cached_cluster_info is not None, self runners = provision_lib.get_command_runners( @@ -2797,9 +2804,6 @@ def _provision( if e.no_failover: error_message = str(e) else: - # Clean up the cluster's entry in `sky status`. - global_user_state.remove_cluster(cluster_name, - terminate=True) usage_lib.messages.usage.update_final_cluster_status( None) error_message = ( @@ -3972,7 +3976,6 @@ def teardown_no_lock(self, limit=1000).get_result()['items'] vpc_id = None try: - # pylint: disable=line-too-long vpc_id = vpcs_filtered_by_tags_and_region[0]['crn'].rsplit( ':', 1)[-1] vpc_found = True @@ -3981,7 +3984,6 @@ def teardown_no_lock(self, returncode = -1 if vpc_found: - # pylint: disable=line-too-long E1136 # Delete VPC and it's associated resources vpc_provider = IBMVPCProvider( config_provider['resource_group_id'], region, @@ -4083,6 +4085,7 @@ def post_teardown_cleanup(self, * Removing the terminated cluster's scripts and ray yaml files. """ cluster_name_on_cloud = handle.cluster_name_on_cloud + cloud = handle.launched_resources.cloud if (terminate and handle.launched_resources.is_image_managed is True): # Delete the image when terminating a "cloned" cluster, i.e., @@ -4103,7 +4106,6 @@ def post_teardown_cleanup(self, 'remove it manually to avoid image leakage. Details: ' f'{common_utils.format_exception(e, use_bracket=True)}') if terminate: - cloud = handle.launched_resources.cloud config = common_utils.read_yaml(handle.cluster_yaml) try: cloud.check_features_are_supported( @@ -4130,6 +4132,44 @@ def post_teardown_cleanup(self, config = common_utils.read_yaml(handle.cluster_yaml) backend_utils.SSHConfigHelper.remove_cluster(handle.cluster_name) + # Confirm that instances have actually transitioned state before + # updating the state database. We do this immediately before removing + # the state from the database, so that we can guarantee that this is + # always called before the state is removed. We considered running this + # check as part of provisioner.teardown_cluster or + # provision.terminate_instances, but it would open the door code paths + # that successfully call this function but do not first call + # teardown_cluster or terminate_instances. See + # https://github.com/skypilot-org/skypilot/pull/4443#discussion_r1872798032 + attempts = 0 + while True: + logger.debug(f'instance statuses attempt {attempts + 1}') + node_status_dict = provision_lib.query_instances( + repr(cloud), + cluster_name_on_cloud, + config['provider'], + non_terminated_only=False) + + unexpected_node_state: Optional[Tuple[str, str]] = None + for node_id, node_status in node_status_dict.items(): + logger.debug(f'{node_id} status: {node_status}') + # FIXME(cooperc): Some clouds (e.g. GCP) do not distinguish + # between "stopping/stopped" and "terminating/terminated", so we + # allow for either status instead of casing on `terminate`. + if node_status not in [None, status_lib.ClusterStatus.STOPPED]: + unexpected_node_state = (node_id, node_status) + + if unexpected_node_state is None: + break + + attempts += 1 + if attempts < _TEARDOWN_WAIT_MAX_ATTEMPTS: + time.sleep(_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS) + else: + (node_id, node_status) = unexpected_node_state + raise RuntimeError(f'Instance {node_id} in unexpected state ' + f'{node_status}.') + global_user_state.remove_cluster(handle.cluster_name, terminate=terminate) diff --git a/sky/provision/azure/instance.py b/sky/provision/azure/instance.py index 700d31c597f..229d7361e22 100644 --- a/sky/provision/azure/instance.py +++ b/sky/provision/azure/instance.py @@ -101,8 +101,8 @@ def cluster_status_map( ) -> Dict['AzureInstanceStatus', Optional[status_lib.ClusterStatus]]: return { cls.PENDING: status_lib.ClusterStatus.INIT, - cls.STOPPING: status_lib.ClusterStatus.INIT, cls.RUNNING: status_lib.ClusterStatus.UP, + cls.STOPPING: status_lib.ClusterStatus.STOPPED, cls.STOPPED: status_lib.ClusterStatus.STOPPED, cls.DELETING: None, } diff --git a/sky/provision/gcp/instance.py b/sky/provision/gcp/instance.py index 9872ad73dc7..6c09dbc6816 100644 --- a/sky/provision/gcp/instance.py +++ b/sky/provision/gcp/instance.py @@ -52,6 +52,8 @@ def _filter_instances( # non_terminated_only=True? # Will there be callers who would want this to be False? # stop() and terminate() for example already implicitly assume non-terminated. +# Currently, even with non_terminated_only=False, we may not have a dict entry +# for terminated instances, if they have already been fully deleted. @common_utils.retry def query_instances( cluster_name_on_cloud: str, diff --git a/sky/provision/lambda_cloud/instance.py b/sky/provision/lambda_cloud/instance.py index d10c36496ab..d33c97df95c 100644 --- a/sky/provision/lambda_cloud/instance.py +++ b/sky/provision/lambda_cloud/instance.py @@ -233,7 +233,7 @@ def query_instances( 'booting': status_lib.ClusterStatus.INIT, 'active': status_lib.ClusterStatus.UP, 'unhealthy': status_lib.ClusterStatus.INIT, - 'terminating': status_lib.ClusterStatus.INIT, + 'terminating': None, } statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {} for instance_id, instance in instances.items(): diff --git a/sky/provision/paperspace/instance.py b/sky/provision/paperspace/instance.py index 5804362d102..6775304c8f7 100644 --- a/sky/provision/paperspace/instance.py +++ b/sky/provision/paperspace/instance.py @@ -286,12 +286,13 @@ def query_instances( assert provider_config is not None, (cluster_name_on_cloud, provider_config) instances = _filter_instances(cluster_name_on_cloud, None) + # https://docs.digitalocean.com/reference/paperspace/core/commands/machines/#show status_map = { 'starting': status_lib.ClusterStatus.INIT, 'restarting': status_lib.ClusterStatus.INIT, 'upgrading': status_lib.ClusterStatus.INIT, 'provisioning': status_lib.ClusterStatus.INIT, - 'stopping': status_lib.ClusterStatus.INIT, + 'stopping': status_lib.ClusterStatus.STOPPED, 'serviceready': status_lib.ClusterStatus.INIT, 'ready': status_lib.ClusterStatus.UP, 'off': status_lib.ClusterStatus.STOPPED,