Merge pull request #391 from ungarj/dask_executor_cleanup
Dask executor cleanup
ungarj authored Nov 22, 2021
2 parents 5b4b44c + 0a7a5e1 commit 4f454b9
Showing 7 changed files with 130 additions and 149 deletions.
16 changes: 16 additions & 0 deletions mapchete/_core.py
@@ -203,6 +203,7 @@ def batch_preprocessor(
self,
dask_scheduler=None,
dask_max_submitted_tasks=500,
dask_chunksize=100,
workers=None,
executor=None,
):
@@ -218,6 +219,8 @@
URL to a dask scheduler if distributed execution is desired
dask_max_submitted_tasks : int
Maximum number of tasks allowed to be submitted to the dask scheduler at once. (default: 500)
dask_chunksize : int
Number of tasks submitted to the scheduler per batch. (default: 100)
workers : int
number of workers to be used for local processing
executor : mapchete.Executor
@@ -230,6 +233,7 @@
process=self,
dask_scheduler=dask_scheduler,
dask_max_submitted_tasks=dask_max_submitted_tasks,
dask_chunksize=dask_chunksize,
workers=workers,
executor=executor,
)
@@ -238,6 +242,7 @@ def batch_preprocess(
self,
dask_scheduler=None,
dask_max_submitted_tasks=500,
dask_chunksize=100,
workers=None,
executor=None,
):
@@ -251,6 +256,8 @@
URL to a dask scheduler if distributed execution is desired
dask_max_submitted_tasks : int
Maximum number of tasks allowed to be submitted to the dask scheduler at once. (default: 500)
dask_chunksize : int
Number of tasks submitted to the scheduler per batch. (default: 100)
workers : int
number of workers to be used for local processing
executor : mapchete.Executor
@@ -260,6 +267,7 @@ def batch_preprocess(
self.batch_preprocessor(
dask_scheduler=dask_scheduler,
dask_max_submitted_tasks=dask_max_submitted_tasks,
dask_chunksize=dask_chunksize,
workers=workers,
executor=executor,
)
@@ -271,6 +279,7 @@ def batch_process(
tile=None,
dask_scheduler=None,
dask_max_submitted_tasks=500,
dask_chunksize=100,
multi=None,
workers=None,
multiprocessing_module=None,
@@ -297,6 +306,8 @@ def batch_process(
URL to a dask scheduler if distributed execution is desired
dask_max_submitted_tasks : int
Maximum number of tasks allowed to be submitted to the dask scheduler at once. (default: 500)
dask_chunksize : int
Number of tasks submitted to the scheduler per batch. (default: 100)
multiprocessing_module : module
either Python's standard 'multiprocessing' or Celery's 'billiard' module
(default: multiprocessing)
@@ -315,6 +326,7 @@ def batch_process(
tile=tile,
dask_scheduler=dask_scheduler,
dask_max_submitted_tasks=dask_max_submitted_tasks,
dask_chunksize=dask_chunksize,
workers=workers,
multi=multi,
multiprocessing_module=multiprocessing_module,
@@ -330,6 +342,7 @@ def batch_processor(
tile=None,
dask_scheduler=None,
dask_max_submitted_tasks=500,
dask_chunksize=100,
multi=None,
workers=None,
multiprocessing_module=None,
@@ -352,6 +365,8 @@ def batch_processor(
URL to a dask scheduler if distributed execution is desired
dask_max_submitted_tasks : int
Maximum number of tasks allowed to be submitted to the dask scheduler at once. (default: 500)
dask_chunksize : int
Number of tasks submitted to the scheduler per batch. (default: 100)
multi : int
number of workers (default: number of CPU cores)
multiprocessing_module : module
@@ -392,6 +407,7 @@ def batch_processor(
zoom_levels=list(_get_zoom_level(zoom, self)),
dask_scheduler=dask_scheduler,
dask_max_submitted_tasks=dask_max_submitted_tasks,
dask_chunksize=dask_chunksize,
workers=workers or multiprocessing.cpu_count(),
multiprocessing_module=multiprocessing_module or multiprocessing,
multiprocessing_start_method=multiprocessing_start_method,
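
The dask_chunksize keyword added above is threaded through every batch entry point in _core.py. A minimal usage sketch follows; the configuration path, zoom level and scheduler address are illustrative placeholders, and the zoom keyword is assumed from mapchete's public batch_process API rather than taken from this excerpt:

    import mapchete

    # placeholders: configuration path, zoom level and scheduler address are illustrative
    with mapchete.open("example.mapchete") as mp:
        mp.batch_process(
            zoom=5,
            dask_scheduler="tcp://dask-scheduler:8786",
            dask_max_submitted_tasks=500,  # cap on futures held on the cluster at once
            dask_chunksize=100,            # tasks handed to the scheduler per submission batch
        )
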
155 changes: 78 additions & 77 deletions mapchete/_executor.py
@@ -68,7 +68,7 @@ def as_completed(
item_skip_bool=False,
**kwargs,
):
"""Submit tasks to executor in chunks and start yielding finished futures after each chunk."""
"""Submit tasks to executor and start yielding finished futures."""
try:
fargs = fargs or ()
fkwargs = fkwargs or {}
@@ -234,7 +234,7 @@ def as_completed(
max_submitted_tasks=500,
raise_cancelled=False,
item_skip_bool=False,
chunks=100,
chunksize=100,
**kwargs,
):
"""
@@ -259,7 +259,7 @@
Further function keyword arguments.
max_submitted_tasks : int
Maximum number of tasks allowed to be submitted to the dask scheduler at once. (default: 500)
chunks : int
chunksize : int
Submit tasks in chunks to scheduler.
raise_cancelled : bool
If a future contains a CancelledError without the Executor having initiated the
@@ -273,7 +273,7 @@
from dask.distributed import as_completed

max_submitted_tasks = max_submitted_tasks or 1
chunks = chunks or 1
chunksize = chunksize or 1

cancelled_exc = None

@@ -282,85 +282,86 @@
fkwargs = fkwargs or {}
ac_iterator = as_completed(loop=self._executor.loop)

with Timer() as duration:
count = 0
submitted = 0
chunk = []
for count, item in enumerate(iterable, 1):
if self.cancelled: # pragma: no cover
logger.debug("executor cancelled")
return

# skip task submission if option is activated
if item_skip_bool:
item, skip, skip_info = item
if skip:
yield SkippedFuture(item, skip_info=skip_info)
continue

chunk.append(item)

# submit chunk of tasks, if
# (1) chunksize is reached, or
# (2) current chunk would not exceed maximum allowed number of running tasks
if len(chunk) % chunks == 0 or max_submitted_tasks - len(
self.running_futures
) - 1 <= len(chunk):
logger.debug("%s running futures", len(self.running_futures))
logger.debug("submitting %s items to cluster", len(chunk))
self._submit_chunk(
ac_iterator=ac_iterator,
chunk=chunk,
func=func,
fargs=fargs,
fkwargs=fkwargs,
)
submitted += len(chunk)
chunk = []
chunk = []
for item in iterable:

# abort if execution is cancelled
if self.cancelled: # pragma: no cover
logger.debug("executor cancelled")
return

# skip task submission if option is activated
if item_skip_bool:
item, skip, skip_info = item
if skip:
yield SkippedFuture(item, skip_info=skip_info)
continue

# add processing item to chunk
chunk.append(item)

# submit chunk of tasks, if
# (1) chunksize is reached, or
# (2) remaining free task spots are less than tasks in chunk
remaining_spots = max_submitted_tasks - len(self.running_futures)
if len(chunk) % chunksize == 0 or remaining_spots == len(chunk):
logger.debug(
"submitted futures (tracked): %s", len(self.running_futures)
)
logger.debug("remaining spots for futures: %s", remaining_spots)
logger.debug("current chunk size: %s", len(chunk))
self._submit_chunk(
ac_iterator=ac_iterator,
chunk=chunk,
func=func,
fargs=fargs,
fkwargs=fkwargs,
)
chunk = []

# yield finished tasks, if
# (1) there are finished tasks available, or
# (2) maximum allowed number of running tasks is reached
if ac_iterator.has_ready() or (
len(self.running_futures) >= max_submitted_tasks
):
# yield batch of finished futures
batch = ac_iterator.next_batch()
for future in batch:
logger.debug("%s remaining futures", ac_iterator.count())
try:
yield self._raise_future_exception(future)
except CancelledError as exc: # pragma: no cover
cancelled_exc = exc

# submit last chunk of items
self._submit_chunk(
ac_iterator=ac_iterator,
chunk=chunk,
func=func,
fargs=fargs,
fkwargs=fkwargs,
# yield finished tasks, if
# (1) there are finished tasks available, or
# (2) maximum allowed number of running tasks is reached
max_submitted_tasks_reached = (
len(self.running_futures) >= max_submitted_tasks
)
submitted += len(chunk)
chunk = []
logger.debug(
"%s remaining futures after submitting task %s",
ac_iterator.count(),
count,
)

logger.debug("%s tasks submitted in %s", count, duration)
# yield remaining futures as they finish
if ac_iterator is not None:
for future in ac_iterator:
if self.cancelled: # pragma: no cover
logger.debug("executor cancelled")
return
logger.debug("%s remaining futures", ac_iterator.count())
if ac_iterator.has_ready() or max_submitted_tasks_reached:
# yield batch of finished futures
# if maximum submitted tasks limit is reached, block call and wait for finished futures
logger.debug(
"wait for finished tasks: %s", max_submitted_tasks_reached
)
batch = ac_iterator.next_batch(block=max_submitted_tasks_reached)
logger.debug("%s tasks ready for yielding", len(batch))
for future in batch:
try:
yield self._raise_future_exception(future)
except CancelledError as exc: # pragma: no cover
cancelled_exc = exc
logger.debug(
"%s futures still on cluster", len(self.running_futures)
)

# submit last chunk of items
self._submit_chunk(
ac_iterator=ac_iterator,
chunk=chunk,
func=func,
fargs=fargs,
fkwargs=fkwargs,
)
chunk = []
# yield remaining futures as they finish
if ac_iterator is not None:
logger.debug("yield %s remaining futures", len(self.running_futures))
for future in ac_iterator:
if self.cancelled: # pragma: no cover
logger.debug("executor cancelled")
return
try:
yield self._raise_future_exception(future)
except CancelledError as exc: # pragma: no cover
cancelled_exc = exc

finally:
# reset so futures won't linger here for next call
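
The reworked as_completed() above collects items into chunks of chunksize, submits a chunk when it is full or would exhaust the max_submitted_tasks budget, and yields finished futures in between, blocking only when the budget is reached. Below is a standalone sketch of that submit-in-chunks pattern against dask.distributed directly; the scheduler address and the square task are placeholders, and the real executor additionally tracks its own running futures, skip handling and cancellation:

    from dask.distributed import Client, as_completed

    def square(x):
        return x * x

    client = Client("tcp://dask-scheduler:8786")  # placeholder scheduler address
    ac = as_completed()  # empty iterator; futures are added as they get submitted
    max_submitted_tasks, chunksize = 500, 100
    chunk = []
    for item in range(10_000):
        chunk.append(item)
        # submit when the chunk is full or would use up the remaining task budget
        remaining_spots = max_submitted_tasks - ac.count()
        if len(chunk) % chunksize == 0 or remaining_spots == len(chunk):
            ac.update(client.map(square, chunk))
            chunk = []
        # yield finished futures; block only when the task budget is exhausted
        budget_reached = ac.count() >= max_submitted_tasks
        if ac.has_ready() or budget_reached:
            for future in ac.next_batch(block=budget_reached):
                future.result()
    # submit the final partial chunk and drain whatever is left on the cluster
    ac.update(client.map(square, chunk))
    for future in ac:
        future.result()
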
7 changes: 7 additions & 0 deletions mapchete/_processing.py
@@ -431,6 +431,7 @@ def _preprocess(
process=None,
dask_scheduler=None,
dask_max_submitted_tasks=500,
dask_chunksize=100,
workers=None,
multiprocessing_module=None,
multiprocessing_start_method=None,
@@ -462,6 +463,7 @@ def _preprocess(
func=_preprocess_task_wrapper,
iterable=list(tasks.items()),
max_submitted_tasks=dask_max_submitted_tasks,
chunksize=dask_chunksize,
),
1,
):
@@ -528,6 +530,7 @@ def _run_area(
zoom_levels=None,
dask_scheduler=None,
dask_max_submitted_tasks=500,
dask_chunksize=100,
workers=None,
multiprocessing_module=None,
multiprocessing_start_method=None,
@@ -545,6 +548,7 @@ def _run_area(
process=process,
dask_scheduler=dask_scheduler,
dask_max_submitted_tasks=dask_max_submitted_tasks,
dask_chunksize=dask_chunksize,
workers=workers,
multiprocessing_start_method=multiprocessing_start_method,
multiprocessing_module=multiprocessing_module,
@@ -563,6 +567,7 @@ def _run_area(
process=process,
dask_scheduler=dask_scheduler,
dask_max_submitted_tasks=dask_max_submitted_tasks,
dask_chunksize=dask_chunksize,
workers=workers,
multiprocessing_start_method=multiprocessing_start_method,
multiprocessing_module=multiprocessing_module,
@@ -595,6 +600,7 @@ def _run_multi(
process=None,
dask_scheduler=None,
dask_max_submitted_tasks=500,
dask_chunksize=100,
workers=None,
multiprocessing_start_method=None,
multiprocessing_module=None,
@@ -663,6 +669,7 @@ def _run_multi(
),
fkwargs=fkwargs,
max_submitted_tasks=dask_max_submitted_tasks,
chunksize=dask_chunksize,
item_skip_bool=True,
):
# tiles which were not processed
4 changes: 4 additions & 0 deletions mapchete/commands/_convert.py
@@ -44,6 +44,7 @@ def convert(
concurrency: str = "processes",
dask_scheduler: str = None,
dask_max_submitted_tasks: int = 500,
dask_chunksize: int = 100,
dask_client=None,
workers: int = None,
multi: int = None,
@@ -107,6 +108,8 @@
URL to dask scheduler if required.
dask_max_submitted_tasks : int
Maximum number of tasks allowed to be submitted to the dask scheduler at once. (default: 500)
dask_chunksize : int
Number of tasks submitted to the scheduler per batch. (default: 100)
dask_client : dask.distributed.Client
Reusable Client instance if required. Otherwise a new client will be created.
clip_geometry : str
@@ -312,6 +315,7 @@ def _empty_callback(*args, **kwargs):
concurrency=concurrency,
dask_scheduler=dask_scheduler,
dask_max_submitted_tasks=dask_max_submitted_tasks,
dask_chunksize=dask_chunksize,
dask_client=dask_client,
workers=workers,
as_iterator=as_iterator,
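
With the keyword exposed in the convert command as well, a dask-backed conversion could look roughly like the following sketch; the leading input/output arguments, the output_pyramid and zoom keywords, and the "dask" concurrency value are assumptions based on mapchete's convert API and are not part of this excerpt:

    from mapchete.commands import convert

    # input path, output directory, pyramid and zoom level are illustrative placeholders
    convert(
        "input.tif",
        "output/",
        output_pyramid="geodetic",
        zoom=8,
        concurrency="dask",
        dask_scheduler="tcp://dask-scheduler:8786",
        dask_max_submitted_tasks=500,
        dask_chunksize=100,
    )
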