diff --git a/mapchete/commands/_execute.py b/mapchete/commands/_execute.py index 80941cf7..82427097 100644 --- a/mapchete/commands/_execute.py +++ b/mapchete/commands/_execute.py @@ -1,4 +1,5 @@ """Execute a process.""" + import logging from contextlib import AbstractContextManager from multiprocessing import cpu_count @@ -145,32 +146,32 @@ def execute( all_observers.notify( message=f"processing {len(tasks)} tasks on {workers} worker(s)" ) - all_observers.notify(message="waiting for executor ...") - with executor_getter( - concurrency=concurrency, - dask_scheduler=dask_settings.scheduler, - dask_client=dask_settings.client, - multiprocessing_start_method=multiprocessing_start_method, - max_workers=workers, - preprocessing_tasks=tasks.preprocessing_tasks_count, - tile_tasks=tasks.tile_tasks_count, - ) as executor: - if profiling: - for profiler in preconfigured_profilers: - executor.add_profiler(profiler) - else: - executor.add_profiler( - Profiler(name="time", decorator=measure_time) + + try: + with executor_getter( + concurrency=concurrency, + dask_scheduler=dask_settings.scheduler, + dask_client=dask_settings.client, + multiprocessing_start_method=multiprocessing_start_method, + max_workers=workers, + preprocessing_tasks=tasks.preprocessing_tasks_count, + tile_tasks=tasks.tile_tasks_count, + ) as executor: + if profiling: + for profiler in preconfigured_profilers: + executor.add_profiler(profiler) + else: + executor.add_profiler( + Profiler(name="time", decorator=measure_time) + ) + all_observers.notify( + status=Status.running, + progress=Progress(total=len(tasks)), + message=f"sending {len(tasks)} tasks to {executor} ...", + executor=executor, ) - all_observers.notify( - status=Status.running, - progress=Progress(total=len(tasks)), - message=f"sending {len(tasks)} tasks to {executor} ...", - executor=executor, - ) - # TODO it would be nice to track the time it took sending tasks to the executor - try: + # TODO it would be nice to track the time it took sending tasks to the executor for count, task_info in enumerate( mp.execute( executor=executor, @@ -186,20 +187,20 @@ def execute( all_observers.notify(status=Status.done) return - except cancel_on_exception: - # special exception indicating job was cancelled from the outside - all_observers.notify(status=Status.cancelled) - return + except cancel_on_exception: + # special exception indicating job was cancelled from the outside + all_observers.notify(status=Status.cancelled) + return - except retry_on_exception as exception: - if retries: - retries -= 1 - all_observers.notify( - status=Status.retrying, - message=f"run failed due to {repr(exception)} (remaining retries: {retries})", - ) - else: - raise + except retry_on_exception as exception: + if retries: + retries -= 1 + all_observers.notify( + status=Status.retrying, + message=f"run failed due to {repr(exception)} (remaining retries: {retries})", + ) + else: + raise except Exception as exception: all_observers.notify(status=Status.failed, exception=exception)