From c023bd1a14f7e4e11d998b5b54f6b6c62d045dac Mon Sep 17 00:00:00 2001 From: Christopher Cooper Date: Wed, 13 Nov 2024 15:23:59 -0800 Subject: [PATCH] update locking mechanism for status check to early exit --- sky/backends/backend_utils.py | 179 +++++++++++++--------- sky/backends/cloud_vm_ray_backend.py | 2 +- sky/clouds/service_catalog/aws_catalog.py | 2 + sky/utils/timeline.py | 2 - 4 files changed, 111 insertions(+), 74 deletions(-) diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py index 62998f3f0db..8310caac8e0 100644 --- a/sky/backends/backend_utils.py +++ b/sky/backends/backend_utils.py @@ -1672,11 +1672,36 @@ def check_can_clone_disk_and_override_task( def _update_cluster_status_no_lock( cluster_name: str) -> Optional[Dict[str, Any]]: - """Updates the status of the cluster. + """Update the cluster status. + + The cluster status is updated by checking ray cluster and real status from + cloud. + + The function will update the cached cluster status in the global state. For + the design of the cluster status and transition, please refer to the + sky/design_docs/cluster_status.md + + Args: + cluster_name: The name of the cluster. + acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock + before updating the status. + cluster_status_lock_timeout: The timeout to acquire the per-cluster + lock. + force: Refresh the cluster status from the cloud even if the status is + not stale. + + Returns: + If the cluster is terminated or does not exist, return None. Otherwise + returns the input record with status and handle potentially updated. Raises: + exceptions.ClusterOwnerIdentityMismatchError: if the current user is + not the same as the user who created the cluster. + exceptions.CloudUserIdentityError: if we fail to get the current user + identity. exceptions.ClusterStatusFetchingError: the cluster status cannot be - fetched from the cloud provider.
+ fetched from the cloud provider or there are leaked nodes causing + the node number larger than expected. """ record = global_user_state.get_cluster_from_name(cluster_name) if record is None: return None @@ -1896,52 +1921,22 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool: return global_user_state.get_cluster_from_name(cluster_name) -def _update_cluster_status( - cluster_name: str, - acquire_per_cluster_status_lock: bool, - cluster_status_lock_timeout: int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS -) -> Optional[Dict[str, Any]]: - """Update the cluster status. - - The cluster status is updated by checking ray cluster and real status from - cloud. - - The function will update the cached cluster status in the global state. For - the design of the cluster status and transition, please refer to the - sky/design_docs/cluster_status.md - - Args: - cluster_name: The name of the cluster. - acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock - before updating the status. - cluster_status_lock_timeout: The timeout to acquire the per-cluster - lock. - - Returns: - If the cluster is terminated or does not exist, return None. Otherwise - returns the input record with status and handle potentially updated. +def _must_refresh_cluster_status( + record: Dict[str, Any], + force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]] +) -> bool: + force_refresh_for_cluster = (force_refresh_statuses is not None and + record['status'] in force_refresh_statuses) - Raises: - exceptions.ClusterOwnerIdentityMismatchError: if the current user is - not the same as the user who created the cluster. - exceptions.CloudUserIdentityError: if we fail to get the current user - identity. - exceptions.ClusterStatusFetchingError: the cluster status cannot be - fetched from the cloud provider or there are leaked nodes causing - the node number larger than expected.
- """ - if not acquire_per_cluster_status_lock: - return _update_cluster_status_no_lock(cluster_name) + use_spot = record['handle'].launched_resources.use_spot + has_autostop = (record['status'] != status_lib.ClusterStatus.STOPPED and + record['autostop'] >= 0) + recently_refreshed = (record['status_updated_at'] is not None and + time.time() - record['status_updated_at'] < + _CLUSTER_STATUS_CACHE_DURATION_SECONDS) + is_stale = (use_spot or has_autostop) and not recently_refreshed - try: - with filelock.FileLock(CLUSTER_STATUS_LOCK_PATH.format(cluster_name), - timeout=cluster_status_lock_timeout): - return _update_cluster_status_no_lock(cluster_name) - except filelock.Timeout: - logger.debug('Refreshing status: Failed get the lock for cluster ' - f'{cluster_name!r}. Using the cached status.') - record = global_user_state.get_cluster_from_name(cluster_name) - return record + return force_refresh_for_cluster or is_stale def refresh_cluster_record( @@ -1959,16 +1954,20 @@ def refresh_cluster_record( Args: cluster_name: The name of the cluster. - force_refresh_statuses: if specified, refresh the cluster if it has one of - the specified statuses. Additionally, clusters satisfying the - following conditions will always be refreshed no matter the - argument is specified or not: - 1. is a spot cluster, or - 2. is a non-spot cluster, is not STOPPED, and autostop is set. + force_refresh_statuses: if specified, refresh the cluster if it has one + of the specified statuses. Additionally, clusters satisfying the + following conditions will be refreshed no matter the argument is + specified or not: + - the most latest available status update is more than + _CLUSTER_STATUS_CACHE_DURATION_SECONDS old, and one of: + 1. the cluster is a spot cluster, or + 2. cluster autostop is set and the cluster is not STOPPED. acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock - before updating the status. + before updating the status. 
Even if this is True, the lock may not be + acquired if the status does not need to be refreshed. cluster_status_lock_timeout: The timeout to acquire the per-cluster - lock. If timeout, the function will use the cached status. + lock. If timeout, the function will use the cached status. If the + value is <0, do not timeout (wait for the lock indefinitely). Returns: If the cluster is terminated or does not exist, return None. @@ -1989,23 +1988,61 @@ return None check_owner_identity(cluster_name) - handle = record['handle'] - if isinstance(handle, backends.CloudVmRayResourceHandle): - use_spot = handle.launched_resources.use_spot - has_autostop = (record['status'] != status_lib.ClusterStatus.STOPPED and - record['autostop'] >= 0) - recently_refreshed = (record['status_updated_at'] is not None and - time.time() - record['status_updated_at'] < - _CLUSTER_STATUS_CACHE_DURATION_SECONDS) - should_refresh = (has_autostop or use_spot) and not recently_refreshed - force_refresh_for_cluster = (force_refresh_statuses is not None and - record['status'] in force_refresh_statuses) - if force_refresh_for_cluster or should_refresh: - record = _update_cluster_status( - cluster_name, - acquire_per_cluster_status_lock=acquire_per_cluster_status_lock, - cluster_status_lock_timeout=cluster_status_lock_timeout) - return record + if not isinstance(record['handle'], backends.CloudVmRayResourceHandle): + return record + + # The loop logic allows us to notice if the status was updated in the + # global_user_state by another process and stop trying to get the lock. + # The core loop logic is adapted from FileLock's implementation. + start_time = time.perf_counter() + lock = filelock.FileLock(CLUSTER_STATUS_LOCK_PATH.format(cluster_name)) + + try: + # Loop until we have an up-to-date status or until we acquire the lock. + while True: + # Check to see if we can return the cached status.
+ if not _must_refresh_cluster_status(record, force_refresh_statuses): + return record + + if not acquire_per_cluster_status_lock: + return _update_cluster_status_no_lock(cluster_name) + + # Try to acquire the lock so we can fetch the status. + try: + lock.acquire(blocking=False) + # Lock acquired. + + # Check the cluster status again, since it could have been + # updated between our last check and acquiring the lock. + record = global_user_state.get_cluster_from_name(cluster_name) + if record is None or not _must_refresh_cluster_status( + record, force_refresh_statuses): + return record + + # Update and return the cluster status. + return _update_cluster_status_no_lock(cluster_name) + + except filelock.Timeout: + # lock.acquire() will throw a Timeout exception if the lock + # is not available and we have blocking=False. + pass + + if 0 <= cluster_status_lock_timeout < time.perf_counter( + ) - start_time: + logger.debug( + 'Refreshing status: Failed get the lock for cluster ' + f'{cluster_name!r}.
Using the cached status.') + return record + + time.sleep(0.05) + + record = global_user_state.get_cluster_from_name(cluster_name) + if record is None: + return None + + finally: + if lock.is_locked: + lock.release() @timeline.event diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py index e338eecb744..b4a1268f174 100644 --- a/sky/backends/cloud_vm_ray_backend.py +++ b/sky/backends/cloud_vm_ray_backend.py @@ -3554,7 +3554,7 @@ def _teardown(self, backend_utils.CLUSTER_STATUS_LOCK_PATH.format(cluster_name)) try: - with filelock.FileLock( + with timeline.FileLockEvent( lock_path, backend_utils.CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS): self.teardown_no_lock( diff --git a/sky/clouds/service_catalog/aws_catalog.py b/sky/clouds/service_catalog/aws_catalog.py index 918a4070414..bbd48863755 100644 --- a/sky/clouds/service_catalog/aws_catalog.py +++ b/sky/clouds/service_catalog/aws_catalog.py @@ -20,6 +20,7 @@ from sky.utils import common_utils from sky.utils import resources_utils from sky.utils import rich_utils +from sky.utils import timeline from sky.utils import ux_utils if typing.TYPE_CHECKING: @@ -100,6 +101,7 @@ def _get_az_mappings(aws_user_hash: str) -> Optional['pd.DataFrame']: return az_mappings +@timeline.event def _fetch_and_apply_az_mapping(df: common.LazyDataFrame) -> 'pd.DataFrame': """Maps zone IDs (use1-az1) to zone names (us-east-1x). diff --git a/sky/utils/timeline.py b/sky/utils/timeline.py index 29c6c3d94ee..502e5b38f83 100644 --- a/sky/utils/timeline.py +++ b/sky/utils/timeline.py @@ -79,8 +79,6 @@ class FileLockEvent: def __init__(self, lockfile: Union[str, os.PathLike]): self._lockfile = lockfile - # TODO(mraheja): remove pylint disabling when filelock version updated - # pylint: disable=abstract-class-instantiated self._lock = filelock.FileLock(self._lockfile) self._hold_lock_event = Event(f'[FileLock.hold]:{self._lockfile}')