update locking mechanism for status check to early exit
cg505 committed Nov 13, 2024
1 parent 6cc553f commit c023bd1
Showing 4 changed files with 111 additions and 74 deletions.
179 changes: 108 additions & 71 deletions sky/backends/backend_utils.py
@@ -1672,11 +1672,36 @@ def check_can_clone_disk_and_override_task(

def _update_cluster_status_no_lock(
cluster_name: str) -> Optional[Dict[str, Any]]:
"""Updates the status of the cluster.
"""Update the cluster status.
The cluster status is updated by checking ray cluster and real status from
cloud.
The function will update the cached cluster status in the global state. For
the design of the cluster status and transition, please refer to the
sky/design_docs/cluster_status.md
Args:
cluster_name: The name of the cluster.
acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock
before updating the status.
cluster_status_lock_timeout: The timeout to acquire the per-cluster
lock.
force: Refresh the cluster status from the cloud even if the status is
not stale.
Returns:
If the cluster is terminated or does not exist, return None. Otherwise
returns the input record with status and handle potentially updated.
Raises:
exceptions.ClusterOwnerIdentityMismatchError: if the current user is
not the same as the user who created the cluster.
exceptions.CloudUserIdentityError: if we fail to get the current user
identity.
        exceptions.ClusterStatusFetchingError: the cluster status cannot be
            fetched from the cloud provider, or there are leaked nodes causing
            the node number to be larger than expected.
"""
record = global_user_state.get_cluster_from_name(cluster_name)
if record is None:
@@ -1896,52 +1921,22 @@ def run_ray_status_to_check_ray_cluster_healthy() -> bool:
return global_user_state.get_cluster_from_name(cluster_name)


def _update_cluster_status(
        cluster_name: str,
        acquire_per_cluster_status_lock: bool,
        cluster_status_lock_timeout: int = CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS
) -> Optional[Dict[str, Any]]:
    """Update the cluster status.

    The cluster status is updated by checking ray cluster and real status from
    cloud.

    The function will update the cached cluster status in the global state. For
    the design of the cluster status and transition, please refer to the
    sky/design_docs/cluster_status.md

    Args:
        cluster_name: The name of the cluster.
        acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock
            before updating the status.
        cluster_status_lock_timeout: The timeout to acquire the per-cluster
            lock.

    Returns:
        If the cluster is terminated or does not exist, return None. Otherwise
        returns the input record with status and handle potentially updated.

    Raises:
        exceptions.ClusterOwnerIdentityMismatchError: if the current user is
            not the same as the user who created the cluster.
        exceptions.CloudUserIdentityError: if we fail to get the current user
            identity.
        exceptions.ClusterStatusFetchingError: the cluster status cannot be
            fetched from the cloud provider or there are leaked nodes causing
            the node number larger than expected.
    """
    if not acquire_per_cluster_status_lock:
        return _update_cluster_status_no_lock(cluster_name)

    try:
        with filelock.FileLock(CLUSTER_STATUS_LOCK_PATH.format(cluster_name),
                               timeout=cluster_status_lock_timeout):
            return _update_cluster_status_no_lock(cluster_name)
    except filelock.Timeout:
        logger.debug('Refreshing status: Failed get the lock for cluster '
                     f'{cluster_name!r}. Using the cached status.')
        record = global_user_state.get_cluster_from_name(cluster_name)
        return record


def _must_refresh_cluster_status(
        record: Dict[str, Any],
        force_refresh_statuses: Optional[Set[status_lib.ClusterStatus]]
) -> bool:
    force_refresh_for_cluster = (force_refresh_statuses is not None and
                                 record['status'] in force_refresh_statuses)

    use_spot = record['handle'].launched_resources.use_spot
    has_autostop = (record['status'] != status_lib.ClusterStatus.STOPPED and
                    record['autostop'] >= 0)
    recently_refreshed = (record['status_updated_at'] is not None and
                          time.time() - record['status_updated_at'] <
                          _CLUSTER_STATUS_CACHE_DURATION_SECONDS)
    is_stale = (use_spot or has_autostop) and not recently_refreshed

    return force_refresh_for_cluster or is_stale
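
For reference, the staleness rule in _must_refresh_cluster_status can be exercised on its own. The snippet below is an illustrative sketch, not part of this commit: the cache-duration value and the flat use_spot field are assumptions made so it runs without any SkyPilot imports.

import time

# Assumed value for illustration; the real constant lives in
# sky/backends/backend_utils.py.
_CLUSTER_STATUS_CACHE_DURATION_SECONDS = 2


def _is_stale(record: dict) -> bool:
    # Mirrors the is_stale expression above, with use_spot inlined into the
    # record instead of read from the resource handle.
    has_autostop = record['status'] != 'STOPPED' and record['autostop'] >= 0
    recently_refreshed = (record['status_updated_at'] is not None and
                          time.time() - record['status_updated_at'] <
                          _CLUSTER_STATUS_CACHE_DURATION_SECONDS)
    return (record['use_spot'] or has_autostop) and not recently_refreshed


# A spot cluster whose cached status is 10 seconds old must be refreshed.
assert _is_stale({'status': 'UP', 'use_spot': True, 'autostop': -1,
                  'status_updated_at': time.time() - 10})

# A non-spot cluster with no autostop can keep using its cached status.
assert not _is_stale({'status': 'UP', 'use_spot': False, 'autostop': -1,
                      'status_updated_at': time.time() - 10})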


def refresh_cluster_record(
@@ -1959,16 +1954,20 @@ def refresh_cluster_record(
Args:
cluster_name: The name of the cluster.
        force_refresh_statuses: if specified, refresh the cluster if it has
            one of the specified statuses. Additionally, clusters satisfying
            the following conditions will be refreshed whether or not this
            argument is specified:
            - the latest available status update is more than
              _CLUSTER_STATUS_CACHE_DURATION_SECONDS old, and one of:
              1. the cluster is a spot cluster, or
              2. cluster autostop is set and the cluster is not STOPPED.
acquire_per_cluster_status_lock: Whether to acquire the per-cluster lock
            before updating the status. Even if this is True, the lock may
            not be acquired if the status does not need to be refreshed.
cluster_status_lock_timeout: The timeout to acquire the per-cluster
            lock. If the timeout is reached, the function will use the cached
            status. If the value is <0, do not time out (wait for the lock
            indefinitely).
Returns:
If the cluster is terminated or does not exist, return None.
@@ -1989,23 +1988,61 @@
return None
check_owner_identity(cluster_name)

handle = record['handle']
if isinstance(handle, backends.CloudVmRayResourceHandle):
use_spot = handle.launched_resources.use_spot
has_autostop = (record['status'] != status_lib.ClusterStatus.STOPPED and
record['autostop'] >= 0)
recently_refreshed = (record['status_updated_at'] is not None and
time.time() - record['status_updated_at'] <
_CLUSTER_STATUS_CACHE_DURATION_SECONDS)
should_refresh = (has_autostop or use_spot) and not recently_refreshed
force_refresh_for_cluster = (force_refresh_statuses is not None and
record['status'] in force_refresh_statuses)
if force_refresh_for_cluster or should_refresh:
record = _update_cluster_status(
cluster_name,
acquire_per_cluster_status_lock=acquire_per_cluster_status_lock,
cluster_status_lock_timeout=cluster_status_lock_timeout)
return record
if not isinstance(record['handle'], backends.CloudVmRayResourceHandle):
return record

    # The loop below lets us notice if another process has updated the status
    # in global_user_state and stop trying to get the lock in that case. The
    # core loop logic is adapted from FileLock's implementation.
start_time = time.perf_counter()
lock = filelock.FileLock(CLUSTER_STATUS_LOCK_PATH.format(cluster_name))

try:
# Loop until we have an up-to-date status or until we acquire the lock.
while True:
# Check to see if we can return the cached status.
if not _must_refresh_cluster_status(record, force_refresh_statuses):
return record

if not acquire_per_cluster_status_lock:
return _update_cluster_status_no_lock(cluster_name)

# Try to acquire the lock so we can fetch the status.
try:
lock.acquire(blocking=False)
# Lock acquired.

# Check the cluster status again, since it could have been
# updated between our last check and acquiring the lock.
record = global_user_state.get_cluster_from_name(cluster_name)
if record is None or not _must_refresh_cluster_status(
record, force_refresh_statuses):
return record

# Update and return the cluster status.
return _update_cluster_status_no_lock(cluster_name)

except filelock.Timeout:
# lock.acquire() will throw a Timeout exception if the lock
# is not available and we have blocking=False.
pass

if 0 <= cluster_status_lock_timeout < time.perf_counter(
) - start_time:
logger.debug(
                    'Refreshing status: Failed to get the lock for cluster '
f'{cluster_name!r}. Using the cached status.')
return record

time.sleep(0.05)

record = global_user_state.get_cluster_from_name(cluster_name)
if record is None:
return None

finally:
if lock.is_locked:
lock.release()


@timeline.event
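
Taken together, the new refresh_cluster_record path above implements an early-exit pattern: return the cached record while it is fresh, try a non-blocking lock acquisition, and re-check the cache both before and after taking the lock so that a refresh completed by another process is reused rather than repeated. A generic sketch of the same pattern follows; the function and parameter names here are illustrative and not taken from this diff.

import time

import filelock


def refresh_with_early_exit(cache_get, is_stale, do_refresh, lock_path,
                            timeout_seconds):
    # Sketch of the early-exit refresh-under-lock pattern, assuming
    # cache_get() returns None when nothing is cached.
    start = time.perf_counter()
    lock = filelock.FileLock(lock_path)
    try:
        while True:
            value = cache_get()
            # Early exit: if the cached value is fresh (possibly because
            # another process refreshed it while we waited), use it.
            if value is not None and not is_stale(value):
                return value
            try:
                lock.acquire(blocking=False)
                # Lock acquired: re-check the cache, then refresh if needed.
                value = cache_get()
                if value is not None and not is_stale(value):
                    return value
                return do_refresh()
            except filelock.Timeout:
                # acquire(blocking=False) raises Timeout when another process
                # holds the lock; fall through and retry.
                pass
            if 0 <= timeout_seconds < time.perf_counter() - start:
                # Waited long enough: fall back to whatever is cached.
                return value
            time.sleep(0.05)
    finally:
        if lock.is_locked:
            lock.release()

Because acquire(blocking=False) returns immediately instead of sleeping inside FileLock, every waiter keeps polling the cache and can exit as soon as any one process writes a fresh status.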
2 changes: 1 addition & 1 deletion sky/backends/cloud_vm_ray_backend.py
@@ -3554,7 +3554,7 @@ def _teardown(self,
backend_utils.CLUSTER_STATUS_LOCK_PATH.format(cluster_name))

try:
with filelock.FileLock(
with timeline.FileLockEvent(
lock_path,
backend_utils.CLUSTER_STATUS_LOCK_TIMEOUT_SECONDS):
self.teardown_no_lock(
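
The one-line change above swaps the raw filelock.FileLock used by _teardown for timeline.FileLockEvent, so time spent waiting on the per-cluster status lock is captured in SkyPilot's timeline. A minimal wrapper in the same spirit is sketched below; the class name and attributes are assumptions for illustration, not timeline.FileLockEvent's actual interface.

import time

import filelock


class TimedFileLock:
    """Context manager that takes a file lock and records how long it waited
    for the lock and how long it held it."""

    def __init__(self, lockfile, timeout=-1):
        self._lock = filelock.FileLock(lockfile, timeout=timeout)
        self.wait_seconds = 0.0
        self.hold_seconds = 0.0
        self._acquired_at = None

    def __enter__(self):
        start = time.perf_counter()
        # Raises filelock.Timeout if the wait exceeds the configured timeout.
        self._lock.acquire()
        self._acquired_at = time.perf_counter()
        self.wait_seconds = self._acquired_at - start
        return self

    def __exit__(self, exc_type, exc, tb):
        self.hold_seconds = time.perf_counter() - self._acquired_at
        self._lock.release()
        return False

Using it as a context manager around teardown-style work would make the wait and hold durations available for reporting alongside other timed events.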
2 changes: 2 additions & 0 deletions sky/clouds/service_catalog/aws_catalog.py
@@ -20,6 +20,7 @@
from sky.utils import common_utils
from sky.utils import resources_utils
from sky.utils import rich_utils
from sky.utils import timeline
from sky.utils import ux_utils

if typing.TYPE_CHECKING:
@@ -100,6 +101,7 @@ def _get_az_mappings(aws_user_hash: str) -> Optional['pd.DataFrame']:
return az_mappings


@timeline.event
def _fetch_and_apply_az_mapping(df: common.LazyDataFrame) -> 'pd.DataFrame':
"""Maps zone IDs (use1-az1) to zone names (us-east-1x).
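
The aws_catalog change simply decorates _fetch_and_apply_az_mapping with @timeline.event so the availability-zone mapping fetch is timed like other events. As a rough illustration of the pattern (not sky's implementation, which records to the timeline rather than printing), a generic timing decorator looks like this:

import functools
import time


def event(fn):
    # Illustrative timing decorator: measures how long the wrapped call takes.
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            print(f'[event] {fn.__qualname__} took {elapsed:.3f}s')
    return wrapper


@event
def fetch_az_mapping():
    time.sleep(0.1)  # Stand-in for the real catalog fetch.
    return {'use1-az1': 'us-east-1a'}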
2 changes: 0 additions & 2 deletions sky/utils/timeline.py
@@ -79,8 +79,6 @@ class FileLockEvent:

def __init__(self, lockfile: Union[str, os.PathLike]):
self._lockfile = lockfile
# TODO(mraheja): remove pylint disabling when filelock version updated
# pylint: disable=abstract-class-instantiated
self._lock = filelock.FileLock(self._lockfile)
self._hold_lock_event = Event(f'[FileLock.hold]:{self._lockfile}')

