Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[robustness] cover some potential resource leakage cases #4443

Merged
merged 11 commits into from
Dec 8, 2024
49 changes: 40 additions & 9 deletions sky/backends/backend_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@
_ENDPOINTS_RETRY_MESSAGE = ('If the cluster was recently started, '
'please retry after a while.')

# If a cluster is less than LAUNCH_DOUBLE_CHECK_WINDOW seconds old, and we don't
# see any instances in the cloud, the instances might be in the process of
# being created. We will wait LAUNCH_DOUBLE_CHECK_DELAY seconds and then double
# check to make sure there are still no instances. LAUNCH_DOUBLE_CHECK_DELAY
# should be set longer than the delay between (sending the create instance
# request) and (the instances appearing on the cloud).
# See https://github.com/skypilot-org/skypilot/issues/4431.
_LAUNCH_DOUBLE_CHECK_WINDOW = 60
_LAUNCH_DOUBLE_CHECK_DELAY = 1

# Include the fields that will be used for generating tags that distinguishes
# the cluster in ray, to avoid the stopped cluster being discarded due to
# updates in the yaml template.
Expand Down Expand Up @@ -1793,13 +1803,12 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
logger.debug(
f'Refreshing status ({cluster_name!r}) failed to get IPs.')
except RuntimeError as e:
logger.debug(str(e))
logger.debug(common_utils.format_exception(e))
except Exception as e: # pylint: disable=broad-except
# This can be raised by `external_ssh_ports()`, due to the
# underlying call to kubernetes API.
logger.debug(
f'Refreshing status ({cluster_name!r}) failed: '
f'{common_utils.format_exception(e, use_bracket=True)}')
logger.debug(f'Refreshing status ({cluster_name!r}) failed: ',
exc_info=e)
return False

# Determining if the cluster is healthy (UP):
Expand All @@ -1826,6 +1835,24 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
return record

# All cases below are transitioning the cluster to non-UP states.

if (not node_statuses and handle.launched_resources.cloud.STATUS_VERSION >=
clouds.StatusVersion.SKYPILOT):
# Note: launched_at is set during sky launch, even on an existing
# cluster. This will catch the case where the cluster was terminated on
# the cloud and restarted by sky launch.
time_since_launch = time.time() - record['launched_at']
cg505 marked this conversation as resolved.
Show resolved Hide resolved
if (record['status'] == status_lib.ClusterStatus.INIT and
time_since_launch < _LAUNCH_DOUBLE_CHECK_WINDOW):
# It's possible the instances for this cluster were just created,
# and haven't appeared yet in the cloud API/console. Wait for a bit
# and check again. This is a best-effort leak prevention check.
# See https://github.com/skypilot-org/skypilot/issues/4431.
time.sleep(_LAUNCH_DOUBLE_CHECK_DELAY)
node_statuses = _query_cluster_status_via_cloud_api(handle)
# Note: even if all the node_statuses are UP now, we will still
# consider this cluster abnormal, and its status will be INIT.

if len(node_statuses) > handle.launched_nodes:
# Unexpected: in the queried region more than 1 cluster with the same
# constructed name tag returned. This will typically not happen unless
Expand Down Expand Up @@ -1854,13 +1881,15 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
f'{colorama.Style.RESET_ALL}')
assert len(node_statuses) <= handle.launched_nodes

# If the node_statuses is empty, all the nodes are terminated. We can
# safely set the cluster status to TERMINATED. This handles the edge case
# where the cluster is terminated by the user manually through the UI.
# If the node_statuses is empty, it should mean that all the nodes are
# terminated and we can set the cluster status to TERMINATED. This handles
# the edge case where the cluster is terminated by the user manually through
# the UI.
to_terminate = not node_statuses

# A cluster is considered "abnormal", if not all nodes are TERMINATED or
# not all nodes are STOPPED. We check that with the following logic:
# A cluster is considered "abnormal", if some (but not all) nodes are
# TERMINATED, or not all nodes are STOPPED. We check that with the following
# logic:
# * Not all nodes are terminated and there's at least one node
# terminated; or
# * Any of the non-TERMINATED nodes is in a non-STOPPED status.
Expand All @@ -1872,6 +1901,8 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
# cluster is probably down.
# * A cluster that is partially terminated or stopped should be considered
# abnormal.
# * The cluster is partially or completely in the INIT state, which means
# that provisioning was interrupted. This is considered abnormal.
#
# An abnormal cluster will transition to INIT and have any autostop setting
# reset (unless it's autostopping/autodowning).
Expand Down
58 changes: 49 additions & 9 deletions sky/backends/cloud_vm_ray_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@
# The maximum retry count for fetching IP address.
_FETCH_IP_MAX_ATTEMPTS = 3

# How many times to query the cloud provider to make sure instances are
# stopping/terminating, and how long to wait between each query.
_TEARDOWN_WAIT_MAX_ATTEMPTS = 10
_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS = 1

_TEARDOWN_FAILURE_MESSAGE = (
f'\n{colorama.Fore.RED}Failed to terminate '
'{cluster_name}. {extra_reason}'
Expand Down Expand Up @@ -2370,15 +2375,17 @@ def get_command_runners(self,
zip(ip_list, port_list), **ssh_credentials)
return runners
if self.cached_cluster_info is None:
# We have `or self.cached_external_ips is None` here, because
# We have `and self.cached_external_ips is None` here, because
# when a cluster's cloud is just upgraded to the new provisioner,
# although it has the cached_external_ips, the cached_cluster_info
# can be None. We need to update it here, even when force_cached is
# set to True.
# TODO: We can remove `self.cached_external_ips is None` after
# version 0.8.0.
assert not force_cached or self.cached_external_ips is not None, (
force_cached, self.cached_external_ips)
if force_cached and self.cached_external_ips is None:
raise RuntimeError(
'Tried to use cached cluster info, but it\'s missing for '
f'cluster "{self.cluster_name}"')
self._update_cluster_info()
assert self.cached_cluster_info is not None, self
runners = provision_lib.get_command_runners(
Expand Down Expand Up @@ -2797,9 +2804,6 @@ def _provision(
if e.no_failover:
error_message = str(e)
else:
# Clean up the cluster's entry in `sky status`.
global_user_state.remove_cluster(cluster_name,
terminate=True)
usage_lib.messages.usage.update_final_cluster_status(
None)
error_message = (
Expand Down Expand Up @@ -3972,7 +3976,6 @@ def teardown_no_lock(self,
limit=1000).get_result()['items']
vpc_id = None
try:
# pylint: disable=line-too-long
vpc_id = vpcs_filtered_by_tags_and_region[0]['crn'].rsplit(
':', 1)[-1]
vpc_found = True
Expand All @@ -3981,7 +3984,6 @@ def teardown_no_lock(self,
returncode = -1

if vpc_found:
# pylint: disable=line-too-long E1136
# Delete VPC and its associated resources
vpc_provider = IBMVPCProvider(
config_provider['resource_group_id'], region,
Expand Down Expand Up @@ -4083,6 +4085,7 @@ def post_teardown_cleanup(self,
* Removing the terminated cluster's scripts and ray yaml files.
"""
cluster_name_on_cloud = handle.cluster_name_on_cloud
cloud = handle.launched_resources.cloud

if (terminate and handle.launched_resources.is_image_managed is True):
# Delete the image when terminating a "cloned" cluster, i.e.,
Expand All @@ -4103,7 +4106,6 @@ def post_teardown_cleanup(self,
'remove it manually to avoid image leakage. Details: '
f'{common_utils.format_exception(e, use_bracket=True)}')
if terminate:
cloud = handle.launched_resources.cloud
config = common_utils.read_yaml(handle.cluster_yaml)
try:
cloud.check_features_are_supported(
Expand All @@ -4130,6 +4132,44 @@ def post_teardown_cleanup(self,
config = common_utils.read_yaml(handle.cluster_yaml)
backend_utils.SSHConfigHelper.remove_cluster(handle.cluster_name)

# Confirm that instances have actually transitioned state before
# updating the state database. We do this immediately before removing
# the state from the database, so that we can guarantee that this is
# always called before the state is removed. We considered running this
# check as part of provisioner.teardown_cluster or
# provision.terminate_instances, but it would open the door to code paths
# that successfully call this function but do not first call
# teardown_cluster or terminate_instances. See
# https://github.com/skypilot-org/skypilot/pull/4443#discussion_r1872798032
attempts = 0
while True:
logger.debug(f'instance statuses attempt {attempts + 1}')
node_status_dict = provision_lib.query_instances(
repr(cloud),
cluster_name_on_cloud,
config['provider'],
non_terminated_only=False)

unexpected_node_state: Optional[Tuple[str, str]] = None
for node_id, node_status in node_status_dict.items():
logger.debug(f'{node_id} status: {node_status}')
# FIXME(cooperc): Some clouds (e.g. GCP) do not distinguish
# between "stopping/stopped" and "terminating/terminated", so we
# allow for either status instead of casing on `terminate`.
if node_status not in [None, status_lib.ClusterStatus.STOPPED]:
unexpected_node_state = (node_id, node_status)

if unexpected_node_state is None:
break

attempts += 1
if attempts < _TEARDOWN_WAIT_MAX_ATTEMPTS:
time.sleep(_TEARDOWN_WAIT_BETWEEN_ATTEMPS_SECONDS)
else:
(node_id, node_status) = unexpected_node_state
raise RuntimeError(f'Instance {node_id} in unexpected state '
f'{node_status}.')

global_user_state.remove_cluster(handle.cluster_name,
terminate=terminate)

Expand Down
2 changes: 1 addition & 1 deletion sky/provision/azure/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ def cluster_status_map(
) -> Dict['AzureInstanceStatus', Optional[status_lib.ClusterStatus]]:
return {
cls.PENDING: status_lib.ClusterStatus.INIT,
cls.STOPPING: status_lib.ClusterStatus.INIT,
cls.RUNNING: status_lib.ClusterStatus.UP,
cls.STOPPING: status_lib.ClusterStatus.STOPPED,
cg505 marked this conversation as resolved.
Show resolved Hide resolved
cls.STOPPED: status_lib.ClusterStatus.STOPPED,
cls.DELETING: None,
}
Expand Down
2 changes: 2 additions & 0 deletions sky/provision/gcp/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def _filter_instances(
# non_terminated_only=True?
# Will there be callers who would want this to be False?
# stop() and terminate() for example already implicitly assume non-terminated.
# Currently, even with non_terminated_only=False, we may not have a dict entry
# for terminated instances, if they have already been fully deleted.
@common_utils.retry
def query_instances(
cluster_name_on_cloud: str,
Expand Down
2 changes: 1 addition & 1 deletion sky/provision/lambda_cloud/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def query_instances(
'booting': status_lib.ClusterStatus.INIT,
'active': status_lib.ClusterStatus.UP,
'unhealthy': status_lib.ClusterStatus.INIT,
'terminating': status_lib.ClusterStatus.INIT,
'terminating': None,
}
statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {}
for instance_id, instance in instances.items():
Expand Down
3 changes: 2 additions & 1 deletion sky/provision/paperspace/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,13 @@ def query_instances(
assert provider_config is not None, (cluster_name_on_cloud, provider_config)
instances = _filter_instances(cluster_name_on_cloud, None)

# https://docs.digitalocean.com/reference/paperspace/core/commands/machines/#show
status_map = {
'starting': status_lib.ClusterStatus.INIT,
'restarting': status_lib.ClusterStatus.INIT,
'upgrading': status_lib.ClusterStatus.INIT,
'provisioning': status_lib.ClusterStatus.INIT,
'stopping': status_lib.ClusterStatus.INIT,
'stopping': status_lib.ClusterStatus.STOPPED,
'serviceready': status_lib.ClusterStatus.INIT,
'ready': status_lib.ClusterStatus.UP,
'off': status_lib.ClusterStatus.STOPPED,
Expand Down
Loading