Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continue storage deletion when some fail #4454

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3525,11 +3525,10 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r
if sum([len(names) > 0, all]) != 1:
raise click.UsageError('Either --all or a name must be specified.')
if all:
storages = sky.storage_ls()
if not storages:
names = global_user_state.get_glob_storage_name('*')
if not names:
click.echo('No storage(s) to delete.')
return
names = [s['name'] for s in storages]
else:
names = _get_glob_storages(names)
if names:
Expand All @@ -3543,7 +3542,18 @@ def storage_delete(names: List[str], all: bool, yes: bool): # pylint: disable=r
abort=True,
show_default=True)

subprocess_utils.run_in_parallel(sky.storage_delete, names)
def delete_storage(name: str) -> None:
try:
sky.storage_delete(name)
except (exceptions.StorageBucketDeleteError, PermissionError) as e:
click.secho(f'Error deleting storage {name}: {e}', fg='red')
except ValueError as e:
click.secho(f'Error deleting storage {name}: {e}', fg='red')
except Exception as e: # pylint: disable=broad-except
with ux_utils.print_exception_no_traceback():
raise e

subprocess_utils.run_in_parallel(delete_storage, names)


@cli.group(cls=_NaturalOrderGroup)
Expand Down
13 changes: 8 additions & 5 deletions sky/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,8 +915,11 @@ def storage_delete(name: str) -> None:
handle = global_user_state.get_handle_from_storage_name(name)
if handle is None:
raise ValueError(f'Storage name {name!r} not found.')
else:
storage_object = data.Storage(name=handle.storage_name,
source=handle.source,
sync_on_reconstruction=False)
storage_object.delete()

assert handle.storage_name == name, (
f'In global_user_state, storage name {name!r} does not match '
f'handle.storage_name {handle.storage_name!r}')
storage_object = data.Storage(name=handle.storage_name,
source=handle.source,
sync_on_reconstruction=False)
storage_object.delete()
21 changes: 15 additions & 6 deletions sky/data/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,18 +950,16 @@ def delete(self, store_type: Optional[StoreType] = None) -> None:
if not self.stores:
logger.info('No backing stores found. Deleting storage.')
global_user_state.remove_storage(self.name)
if store_type:
if store_type is not None:
store = self.stores[store_type]
is_sky_managed = store.is_sky_managed
# We delete a store from the cloud if it's sky managed. Else just
# remove handle and return
if is_sky_managed:
if store.is_sky_managed:
self.handle.remove_store(store)
store.delete()
# Check remaining stores - if none is sky managed, remove
# the storage from global_user_state.
delete = all(
s.is_sky_managed is False for s in self.stores.values())
delete = all(not s.is_sky_managed for s in self.stores.values())
if delete:
global_user_state.remove_storage(self.name)
else:
Expand Down Expand Up @@ -1491,6 +1489,9 @@ def _delete_s3_bucket(self, bucket_name: str) -> bool:

Returns:
bool; True if bucket was deleted, False if it was deleted externally.

Raises:
StorageBucketDeleteError: If deleting the bucket fails.
"""
# Deleting objects is very slow programatically
# (i.e. bucket.objects.all().delete() is slow).
Expand Down Expand Up @@ -1934,6 +1935,11 @@ def _delete_gcs_bucket(self, bucket_name: str) -> bool:

Returns:
bool; True if bucket was deleted, False if it was deleted externally.

Raises:
StorageBucketDeleteError: If deleting the bucket fails.
PermissionError: If the bucket is external and the user is not
allowed to delete it.
"""

with rich_utils.safe_status(
Expand Down Expand Up @@ -3096,6 +3102,9 @@ def _delete_r2_bucket(self, bucket_name: str) -> bool:

Returns:
bool; True if bucket was deleted, False if it was deleted externally.

Raises:
StorageBucketDeleteError: If deleting the bucket fails.
"""
# Deleting objects is very slow programatically
# (i.e. bucket.objects.all().delete() is slow).
Expand Down Expand Up @@ -3532,7 +3541,7 @@ def _create_cos_bucket(self,

return self.bucket

def _delete_cos_bucket(self):
def _delete_cos_bucket(self) -> None:
bucket = self.s3_resource.Bucket(self.name)
try:
bucket_versioning = self.s3_resource.BucketVersioning(self.name)
Expand Down
2 changes: 1 addition & 1 deletion sky/global_user_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ def get_storage_names_start_with(starts_with: str) -> List[str]:


def get_storage() -> List[Dict[str, Any]]:
rows = _DB.cursor.execute('select * from storage')
rows = _DB.cursor.execute('SELECT * FROM storage')
records = []
for name, launched_at, handle, last_use, status in rows:
# TODO: use namedtuple instead of dict
Expand Down
16 changes: 8 additions & 8 deletions sky/utils/subprocess_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ def run_in_parallel(func: Callable,
num_threads: Optional[int] = None) -> List[Any]:
"""Run a function in parallel on a list of arguments.

The function 'func' should raise a CommandError if the command fails.

Args:
func: The function to run in parallel
args: Iterable of arguments to pass to func
Expand All @@ -111,14 +109,16 @@ def run_in_parallel(func: Callable,

Returns:
A list of the return values of the function func, in the same order as the
arguments.
arguments.

Raises:
Exception: The first exception encountered.
"""
# Reference: https://stackoverflow.com/questions/25790279/python-multiprocessing-early-termination # pylint: disable=line-too-long
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not removing this reference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean why remove the reference? I think it's trivial now, and the reference doesn't add much value to the code.

processes = num_threads if num_threads is not None else get_parallel_threads(
)
processes = (num_threads
if num_threads is not None else get_parallel_threads())
with pool.ThreadPool(processes=processes) as p:
# Run the function in parallel on the arguments, keeping the order.
return list(p.imap(func, args))
ordered_iterators = p.imap(func, args)
return list(ordered_iterators)


def handle_returncode(returncode: int,
Expand Down
Loading