Commit

Merge pull request #627 from ungarj/retry_executor_exceptions
also include executor_getter() call within try ... except block in or…
ungarj authored May 2, 2024
2 parents 5f5963b + 8dc67b5 · commit 9c4659e
Showing 1 changed file with 38 additions and 37 deletions.
mapchete/commands/_execute.py: 75 changes (38 additions & 37 deletions)
@@ -1,4 +1,5 @@
 """Execute a process."""
+
 import logging
 from contextlib import AbstractContextManager
 from multiprocessing import cpu_count
@@ -145,32 +146,32 @@ def execute(
             all_observers.notify(
                 message=f"processing {len(tasks)} tasks on {workers} worker(s)"
             )
 
             all_observers.notify(message="waiting for executor ...")
-            with executor_getter(
-                concurrency=concurrency,
-                dask_scheduler=dask_settings.scheduler,
-                dask_client=dask_settings.client,
-                multiprocessing_start_method=multiprocessing_start_method,
-                max_workers=workers,
-                preprocessing_tasks=tasks.preprocessing_tasks_count,
-                tile_tasks=tasks.tile_tasks_count,
-            ) as executor:
-                if profiling:
-                    for profiler in preconfigured_profilers:
-                        executor.add_profiler(profiler)
-                else:
-                    executor.add_profiler(
-                        Profiler(name="time", decorator=measure_time)
-                    )
-                all_observers.notify(
-                    status=Status.running,
-                    progress=Progress(total=len(tasks)),
-                    message=f"sending {len(tasks)} tasks to {executor} ...",
-                    executor=executor,
-                )
-                try:
-                    # TODO it would be nice to track the time it took sending tasks to the executor
+            try:
+                with executor_getter(
+                    concurrency=concurrency,
+                    dask_scheduler=dask_settings.scheduler,
+                    dask_client=dask_settings.client,
+                    multiprocessing_start_method=multiprocessing_start_method,
+                    max_workers=workers,
+                    preprocessing_tasks=tasks.preprocessing_tasks_count,
+                    tile_tasks=tasks.tile_tasks_count,
+                ) as executor:
+                    if profiling:
+                        for profiler in preconfigured_profilers:
+                            executor.add_profiler(profiler)
+                    else:
+                        executor.add_profiler(
+                            Profiler(name="time", decorator=measure_time)
+                        )
+                    all_observers.notify(
+                        status=Status.running,
+                        progress=Progress(total=len(tasks)),
+                        message=f"sending {len(tasks)} tasks to {executor} ...",
+                        executor=executor,
+                    )
+                    # TODO it would be nice to track the time it took sending tasks to the executor
                     for count, task_info in enumerate(
                         mp.execute(
                             executor=executor,
@@ -186,20 +187,20 @@ def execute(
                     all_observers.notify(status=Status.done)
                     return
 
-                except cancel_on_exception:
-                    # special exception indicating job was cancelled from the outside
-                    all_observers.notify(status=Status.cancelled)
-                    return
+            except cancel_on_exception:
+                # special exception indicating job was cancelled from the outside
+                all_observers.notify(status=Status.cancelled)
+                return
 
-                except retry_on_exception as exception:
-                    if retries:
-                        retries -= 1
-                        all_observers.notify(
-                            status=Status.retrying,
-                            message=f"run failed due to {repr(exception)} (remaining retries: {retries})",
-                        )
-                    else:
-                        raise
+            except retry_on_exception as exception:
+                if retries:
+                    retries -= 1
+                    all_observers.notify(
+                        status=Status.retrying,
+                        message=f"run failed due to {repr(exception)} (remaining retries: {retries})",
+                    )
+                else:
+                    raise
 
     except Exception as exception:
         all_observers.notify(status=Status.failed, exception=exception)
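
In short, the diff moves the executor_getter() call inside the try ... except block, so an exception raised while the executor is being set up is handled by the same cancel/retry logic that previously only covered exceptions raised during task execution. The sketch below illustrates that pattern in isolation; the names (get_executor, ExecutorStartupError, run) and the while-loop retry driver are hypothetical stand-ins, not mapchete's API.

# Minimal sketch of the retry pattern this commit adopts. The names below are
# illustrative stand-ins, not mapchete's actual API, and the while-loop is an
# assumption about the retry mechanism outside the shown hunks.
import random
from contextlib import contextmanager


class ExecutorStartupError(RuntimeError):
    """Raised by the fake executor when it fails to start."""


@contextmanager
def get_executor():
    # Simulate an executor that fails to start roughly half of the time.
    if random.random() < 0.5:
        raise ExecutorStartupError("executor failed to start")
    yield "local-executor"


def run(retries: int = 3) -> None:
    while True:
        try:
            # Entering the context manager inside the try block means startup
            # errors are caught below instead of bypassing the retry logic.
            with get_executor() as executor:
                print(f"running tasks on {executor}")
                return
        except ExecutorStartupError as exception:
            if retries:
                retries -= 1
                print(f"run failed due to {exception!r} (remaining retries: {retries})")
            else:
                raise


if __name__ == "__main__":
    run()

Run a few times, a failed executor start is now retried and reported rather than aborting the run immediately, which is what the commit message describes for the real code.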
