Skip to content

Commit

Permalink
revert: run_in_parallel continue_on_errors
Browse files Browse the repository at this point in the history
  • Loading branch information
andylizf committed Dec 11, 2024
1 parent 8b7b075 commit fa6037f
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 26 deletions.
4 changes: 1 addition & 3 deletions sky/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3553,9 +3553,7 @@ def delete_storage(name: str) -> None:
with ux_utils.print_exception_no_traceback():
raise e

subprocess_utils.run_in_parallel(delete_storage,
names,
continue_on_error=True)
subprocess_utils.run_in_parallel(delete_storage, names)


@cli.group(cls=_NaturalOrderGroup)
Expand Down
29 changes: 6 additions & 23 deletions sky/utils/subprocess_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,44 +99,27 @@ def get_parallel_threads(cloud_str: Optional[str] = None) -> int:
# TODO(andyl): Why this function returns a list of results? Why not yielding?
def run_in_parallel(func: Callable,
args: Iterable[Any],
num_threads: Optional[int] = None,
continue_on_error: bool = False) -> List[Any]:
num_threads: Optional[int] = None) -> List[Any]:
"""Run a function in parallel on a list of arguments.
Args:
func: The function to run in parallel
args: Iterable of arguments to pass to func
num_threads: Number of threads to use. If None, uses
get_parallel_threads()
continue_on_error: If True, continues execution when errors occur
If False (default), raises the first error immediately
Returns:
A list of the return values of the function func, in the same order as the
arguments. If continue_on_error=True, failed operations will have
their exceptions in the result list.
arguments.
Raises:
Exception: The first exception encountered.
"""
processes = (num_threads
if num_threads is not None else get_parallel_threads())
with pool.ThreadPool(processes=processes) as p:
ordered_iterators = p.imap(func, args)

# TODO(andyl): Is this list(ordered_iterators) clear? Maybe we should
# merge two cases, and move this logic deeper to
# `except e: results.append(e) if continue_on_error else raise e`
if not continue_on_error:
return list(ordered_iterators)
else:
results: List[Union[Any, Exception]] = []
while True:
try:
result = next(ordered_iterators)
results.append(result)
except StopIteration:
break
except Exception as e: # pylint: disable=broad-except
results.append(e)
return results
return list(ordered_iterators)


def handle_returncode(returncode: int,
Expand Down

0 comments on commit fa6037f

Please sign in to comment.