From 04a8367a4000c6cb420c25e059d203749fd451c7 Mon Sep 17 00:00:00 2001 From: Joachim Ungar Date: Wed, 17 Nov 2021 10:36:36 +0100 Subject: [PATCH 1/2] use generators when creating indexes --- mapchete/index.py | 86 +++++++++----------------------------------- mapchete/io/_path.py | 2 -- 2 files changed, 17 insertions(+), 71 deletions(-) diff --git a/mapchete/index.py b/mapchete/index.py index e1ffb1d3..862e7e8b 100644 --- a/mapchete/index.py +++ b/mapchete/index.py @@ -152,91 +152,39 @@ def zoom_index_gen( logger.debug("use the following index writers: %s", index_writers) - # all output tiles for given process area - logger.debug("determine affected output tiles") if tile: - output_tiles = set( - mp.config.output_pyramid.intersecting( - mp.config.process_pyramid.tile(*tile) - ) + output_tiles_batches = mp.config.output_pyramid.tiles_from_bounds( + mp.config.process_pyramid.tile(*tile).bounds, zoom, batch_by="row" ) else: - output_tiles = set( - [ - t - for t in mp.config.output_pyramid.tiles_from_geom( - mp.config.area_at_zoom(zoom), zoom - ) - # this is required to omit tiles touching the config area - if t.bbox.intersection(mp.config.area_at_zoom(zoom)).area - ] - ) - # check which tiles exist in any index - logger.debug("check which tiles exist in index(es)") - existing_in_any_index = set( - t - for t in output_tiles - if any( - [ - i.entry_exists( - tile=t, - path=_tile_path( - orig_path=mp.config.output.get_path(t), - basepath=basepath, - for_gdal=for_gdal, - ), - ) - for i in index_writers - ] + output_tiles_batches = mp.config.output_pyramid.tiles_from_geom( + mp.config.area_at_zoom(zoom), zoom, batch_by="row" ) - ) - logger.debug( - "{}/{} tiles found in index(es)".format( - len(existing_in_any_index), len(output_tiles) - ) - ) - # tiles which do not exist in any index - for t, output_exists in tiles_exist( - mp.config, output_tiles=output_tiles.difference(existing_in_any_index) + for output_tile, exists in tiles_exist( + mp.config, output_tiles_batches=output_tiles_batches ): - tile_path = _tile_path( - orig_path=mp.config.output.get_path(t), - basepath=basepath, - for_gdal=for_gdal, - ) - indexes = [ - i - for i in index_writers - if not i.entry_exists(tile=t, path=tile_path) - ] - if indexes and output_exists: - logger.debug("%s exists", tile_path) - logger.debug("write to %s indexes" % len(indexes)) - for index in indexes: - index.write(t, tile_path) - # yield tile for progress information - yield t - # tiles which exist in at least one index - for t in existing_in_any_index: tile_path = _tile_path( - orig_path=mp.config.output.get_path(t), + orig_path=mp.config.output.get_path(output_tile), basepath=basepath, for_gdal=for_gdal, ) + # get indexes where tile entry does not exist indexes = [ - i - for i in index_writers - if not i.entry_exists(tile=t, path=tile_path) + index_writer + for index_writer in index_writers + if not index_writer.entry_exists(tile=output_tile, path=tile_path) ] - if indexes: + + if indexes and exists: logger.debug("%s exists", tile_path) - logger.debug("write to %s indexes" % len(indexes)) + logger.debug("write to %s indexes", len(indexes)) for index in indexes: - index.write(t, tile_path) + index.write(output_tile, tile_path) + # yield tile for progress information - yield t + yield output_tile def _index_file_path(out_dir, zoom, ext): diff --git a/mapchete/io/_path.py b/mapchete/io/_path.py index 1944f745..f5f7cf0c 100644 --- a/mapchete/io/_path.py +++ b/mapchete/io/_path.py @@ -180,8 +180,6 @@ def _tiles(): yield from 
_process_tiles_batches_exist(process_tiles_batches, config) elif output_tiles_batches: yield from _output_tiles_batches_exist(output_tiles_batches, config) - else: - return def _batch_tiles_by_row(tiles): From 0a7a5e1d9ae92245f1416899814ddc663ef4b53c Mon Sep 17 00:00:00 2001 From: Joachim Ungar Date: Fri, 19 Nov 2021 14:40:47 +0100 Subject: [PATCH 2/2] cleanup dask executor; expose dask chunksize to execute and convert commands --- mapchete/_core.py | 16 ++++ mapchete/_executor.py | 155 +++++++++++++++++----------------- mapchete/_processing.py | 7 ++ mapchete/commands/_convert.py | 4 + mapchete/commands/_execute.py | 9 +- 5 files changed, 113 insertions(+), 78 deletions(-) diff --git a/mapchete/_core.py b/mapchete/_core.py index b608050d..f7e8fca6 100644 --- a/mapchete/_core.py +++ b/mapchete/_core.py @@ -203,6 +203,7 @@ def batch_preprocessor( self, dask_scheduler=None, dask_max_submitted_tasks=500, + dask_chunksize=100, workers=None, executor=None, ): @@ -218,6 +219,8 @@ def batch_preprocessor( URL to a dask scheduler if distributed execution is desired dask_max_submitted_tasks : int Make sure that not more tasks are submitted to dask scheduler at once. (default: 500) + dask_chunksize : int + Number of tasks submitted to the scheduler at once. (default: 100) workers : int number of workers to be used for local processing executor : mapchete.Executor @@ -230,6 +233,7 @@ def batch_preprocessor( process=self, dask_scheduler=dask_scheduler, dask_max_submitted_tasks=dask_max_submitted_tasks, + dask_chunksize=dask_chunksize, workers=workers, executor=executor, ) @@ -238,6 +242,7 @@ def batch_preprocess( self, dask_scheduler=None, dask_max_submitted_tasks=500, + dask_chunksize=100, workers=None, executor=None, ): @@ -251,6 +256,8 @@ def batch_preprocess( URL to a dask scheduler if distributed execution is desired dask_max_submitted_tasks : int Make sure that not more tasks are submitted to dask scheduler at once. (default: 500) + dask_chunksize : int + Number of tasks submitted to the scheduler at once. (default: 100) workers : int number of workers to be used for local processing executor : mapchete.Executor @@ -260,6 +267,7 @@ def batch_preprocess( self.batch_preprocessor( dask_scheduler=dask_scheduler, dask_max_submitted_tasks=dask_max_submitted_tasks, + dask_chunksize=dask_chunksize, workers=workers, executor=executor, ) @@ -271,6 +279,7 @@ def batch_process( tile=None, dask_scheduler=None, dask_max_submitted_tasks=500, + dask_chunksize=100, multi=None, workers=None, multiprocessing_module=None, @@ -297,6 +306,8 @@ def batch_process( URL to a dask scheduler if distributed execution is desired dask_max_submitted_tasks : int Make sure that not more tasks are submitted to dask scheduler at once. (default: 500) + dask_chunksize : int + Number of tasks submitted to the scheduler at once. 
(default: 100) multiprocessing_module : module either Python's standard 'multiprocessing' or Celery's 'billiard' module (default: multiprocessing) @@ -315,6 +326,7 @@ def batch_process( tile=tile, dask_scheduler=dask_scheduler, dask_max_submitted_tasks=dask_max_submitted_tasks, + dask_chunksize=dask_chunksize, workers=workers, multi=multi, multiprocessing_module=multiprocessing_module, @@ -330,6 +342,7 @@ def batch_processor( tile=None, dask_scheduler=None, dask_max_submitted_tasks=500, + dask_chunksize=100, multi=None, workers=None, multiprocessing_module=None, @@ -352,6 +365,8 @@ def batch_processor( URL to a dask scheduler if distributed execution is desired dask_max_submitted_tasks : int Make sure that not more tasks are submitted to dask scheduler at once. (default: 500) + dask_chunksize : int + Number of tasks submitted to the scheduler at once. (default: 100) multi : int number of workers (default: number of CPU cores) multiprocessing_module : module @@ -392,6 +407,7 @@ def batch_processor( zoom_levels=list(_get_zoom_level(zoom, self)), dask_scheduler=dask_scheduler, dask_max_submitted_tasks=dask_max_submitted_tasks, + dask_chunksize=dask_chunksize, workers=workers or multiprocessing.cpu_count(), multiprocessing_module=multiprocessing_module or multiprocessing, multiprocessing_start_method=multiprocessing_start_method, diff --git a/mapchete/_executor.py b/mapchete/_executor.py index 2f23b73c..9ee36a84 100644 --- a/mapchete/_executor.py +++ b/mapchete/_executor.py @@ -68,7 +68,7 @@ def as_completed( item_skip_bool=False, **kwargs, ): - """Submit tasks to executor in chunks and start yielding finished futures after each chunk.""" + """Submit tasks to executor and start yielding finished futures.""" try: fargs = fargs or () fkwargs = fkwargs or {} @@ -234,7 +234,7 @@ def as_completed( max_submitted_tasks=500, raise_cancelled=False, item_skip_bool=False, - chunks=100, + chunksize=100, **kwargs, ): """ @@ -259,7 +259,7 @@ def as_completed( Further function keyword arguments. max_submitted_tasks : int Make sure that not more tasks are submitted to dask scheduler at once. (default: 500) - chunks : int + chunksize : int Submit tasks in chunks to scheduler. 
raise_cancelled : bool If a future contains a CancelledError without the Exectuor having initiated the @@ -273,7 +273,7 @@ def as_completed( from dask.distributed import as_completed max_submitted_tasks = max_submitted_tasks or 1 - chunks = chunks or 1 + chunksize = chunksize or 1 cancelled_exc = None @@ -282,85 +282,86 @@ def as_completed( fkwargs = fkwargs or {} ac_iterator = as_completed(loop=self._executor.loop) - with Timer() as duration: - count = 0 - submitted = 0 - chunk = [] - for count, item in enumerate(iterable, 1): - if self.cancelled: # pragma: no cover - logger.debug("executor cancelled") - return - - # skip task submission if option is activated - if item_skip_bool: - item, skip, skip_info = item - if skip: - yield SkippedFuture(item, skip_info=skip_info) - continue - - chunk.append(item) - - # submit chunk of tasks, if - # (1) chunksize is reached, or - # (2) current chunk would not exceed maximum allowed number of running tasks - if len(chunk) % chunks == 0 or max_submitted_tasks - len( - self.running_futures - ) - 1 <= len(chunk): - logger.debug("%s running futures", len(self.running_futures)) - logger.debug("submitting %s items to cluster", len(chunk)) - self._submit_chunk( - ac_iterator=ac_iterator, - chunk=chunk, - func=func, - fargs=fargs, - fkwargs=fkwargs, - ) - submitted += len(chunk) - chunk = [] + chunk = [] + for item in iterable: + + # abort if execution is cancelled + if self.cancelled: # pragma: no cover + logger.debug("executor cancelled") + return + + # skip task submission if option is activated + if item_skip_bool: + item, skip, skip_info = item + if skip: + yield SkippedFuture(item, skip_info=skip_info) + continue + + # add processing item to chunk + chunk.append(item) + + # submit chunk of tasks, if + # (1) chunksize is reached, or + # (2) remaining free task spots are less than tasks in chunk + remaining_spots = max_submitted_tasks - len(self.running_futures) + if len(chunk) % chunksize == 0 or remaining_spots == len(chunk): + logger.debug( + "submitted futures (tracked): %s", len(self.running_futures) + ) + logger.debug("remaining spots for futures: %s", remaining_spots) + logger.debug("current chunk size: %s", len(chunk)) + self._submit_chunk( + ac_iterator=ac_iterator, + chunk=chunk, + func=func, + fargs=fargs, + fkwargs=fkwargs, + ) + chunk = [] - # yield finished tasks, if - # (1) there are finished tasks available, or - # (2) maximum allowed number of running tasks is reached - if ac_iterator.has_ready() or ( - len(self.running_futures) >= max_submitted_tasks - ): - # yield batch of finished futures - batch = ac_iterator.next_batch() - for future in batch: - logger.debug("%s remaining futures", ac_iterator.count()) - try: - yield self._raise_future_exception(future) - except CancelledError as exc: # pragma: no cover - cancelled_exc = exc - - # submit last chunk of items - self._submit_chunk( - ac_iterator=ac_iterator, - chunk=chunk, - func=func, - fargs=fargs, - fkwargs=fkwargs, + # yield finished tasks, if + # (1) there are finished tasks available, or + # (2) maximum allowed number of running tasks is reached + max_submitted_tasks_reached = ( + len(self.running_futures) >= max_submitted_tasks ) - submitted += len(chunk) - chunk = [] - logger.debug( - "%s remaining futures after submitting task %s", - ac_iterator.count(), - count, - ) - - logger.debug("%s tasks submitted in %s", count, duration) - # yield remaining futures as they finish - if ac_iterator is not None: - for future in ac_iterator: - if self.cancelled: # pragma: no cover - 
logger.debug("executor cancelled") - return - logger.debug("%s remaining futures", ac_iterator.count()) + if ac_iterator.has_ready() or max_submitted_tasks_reached: + # yield batch of finished futures + # if maximum submitted tasks limit is reached, block call and wait for finished futures + logger.debug( + "wait for finished tasks: %s", max_submitted_tasks_reached + ) + batch = ac_iterator.next_batch(block=max_submitted_tasks_reached) + logger.debug("%s tasks ready for yielding", len(batch)) + for future in batch: try: yield self._raise_future_exception(future) except CancelledError as exc: # pragma: no cover cancelled_exc = exc + logger.debug( + "%s futures still on cluster", len(self.running_futures) + ) + + # submit last chunk of items + self._submit_chunk( + ac_iterator=ac_iterator, + chunk=chunk, + func=func, + fargs=fargs, + fkwargs=fkwargs, + ) + chunk = [] + # yield remaining futures as they finish + if ac_iterator is not None: + logger.debug("yield %s remaining futures", len(self.running_futures)) + for future in ac_iterator: + if self.cancelled: # pragma: no cover + logger.debug("executor cancelled") + return + try: + yield self._raise_future_exception(future) + except CancelledError as exc: # pragma: no cover + cancelled_exc = exc finally: # reset so futures won't linger here for next call diff --git a/mapchete/_processing.py b/mapchete/_processing.py index 359d71f8..2e73c5d5 100644 --- a/mapchete/_processing.py +++ b/mapchete/_processing.py @@ -431,6 +431,7 @@ def _preprocess( process=None, dask_scheduler=None, dask_max_submitted_tasks=500, + dask_chunksize=100, workers=None, multiprocessing_module=None, multiprocessing_start_method=None, @@ -462,6 +463,7 @@ def _preprocess( func=_preprocess_task_wrapper, iterable=list(tasks.items()), max_submitted_tasks=dask_max_submitted_tasks, + chunksize=dask_chunksize, ), 1, ): @@ -528,6 +530,7 @@ def _run_area( zoom_levels=None, dask_scheduler=None, dask_max_submitted_tasks=500, + dask_chunksize=100, workers=None, multiprocessing_module=None, multiprocessing_start_method=None, @@ -545,6 +548,7 @@ def _run_area( process=process, dask_scheduler=dask_scheduler, dask_max_submitted_tasks=dask_max_submitted_tasks, + dask_chunksize=dask_chunksize, workers=workers, multiprocessing_start_method=multiprocessing_start_method, multiprocessing_module=multiprocessing_module, @@ -563,6 +567,7 @@ def _run_area( process=process, dask_scheduler=dask_scheduler, dask_max_submitted_tasks=dask_max_submitted_tasks, + dask_chunksize=dask_chunksize, workers=workers, multiprocessing_start_method=multiprocessing_start_method, multiprocessing_module=multiprocessing_module, @@ -595,6 +600,7 @@ def _run_multi( process=None, dask_scheduler=None, dask_max_submitted_tasks=500, + dask_chunksize=100, workers=None, multiprocessing_start_method=None, multiprocessing_module=None, @@ -663,6 +669,7 @@ def _run_multi( ), fkwargs=fkwargs, max_submitted_tasks=dask_max_submitted_tasks, + chunksize=dask_chunksize, item_skip_bool=True, ): # tiles which were not processed diff --git a/mapchete/commands/_convert.py b/mapchete/commands/_convert.py index 956d53bc..3b1da984 100644 --- a/mapchete/commands/_convert.py +++ b/mapchete/commands/_convert.py @@ -44,6 +44,7 @@ def convert( concurrency: str = "processes", dask_scheduler: str = None, dask_max_submitted_tasks: int = 500, + dask_chunksize: int = 100, dask_client=None, workers: int = None, multi: int = None, @@ -107,6 +108,8 @@ def convert( URL to dask scheduler if required. 
dask_max_submitted_tasks : int Make sure that not more tasks are submitted to dask scheduler at once. (default: 500) + dask_chunksize : int + Number of tasks submitted to the scheduler at once. (default: 100) dask_client : dask.distributed.Client Reusable Client instance if required. Otherwise a new client will be created. clip_geometry : str @@ -312,6 +315,7 @@ def _empty_callback(*args, **kwargs): concurrency=concurrency, dask_scheduler=dask_scheduler, dask_max_submitted_tasks=dask_max_submitted_tasks, + dask_chunksize=dask_chunksize, dask_client=dask_client, workers=workers, as_iterator=as_iterator, diff --git a/mapchete/commands/_execute.py b/mapchete/commands/_execute.py index 9c212fa1..63b03650 100755 --- a/mapchete/commands/_execute.py +++ b/mapchete/commands/_execute.py @@ -31,7 +31,8 @@ def execute( multi: int = None, multiprocessing_start_method: str = None, dask_scheduler: str = None, - dask_max_submitted_tasks=500, + dask_max_submitted_tasks=1000, + dask_chunksize=100, dask_client=None, msg_callback: Callable = None, as_iterator: bool = False, @@ -75,6 +76,8 @@ def execute( URL to dask scheduler if required. dask_max_submitted_tasks : int Make sure that not more tasks are submitted to dask scheduler at once. (default: 500) + dask_chunksize : int + Number of tasks submitted to the scheduler at once. (default: 100) dask_client : dask.distributed.Client Reusable Client instance if required. Otherwise a new client will be created. msg_callback : Callable @@ -161,6 +164,7 @@ def _empty_callback(_): workers=workers, zoom=None if tile else zoom, dask_max_submitted_tasks=dask_max_submitted_tasks, + dask_chunksize=dask_chunksize, ), executor_concurrency=concurrency, executor_kwargs=dict( @@ -184,6 +188,7 @@ def _process_everything( executor=None, workers=None, dask_max_submitted_tasks=500, + dask_chunksize=100, **kwargs, ): try: @@ -191,6 +196,7 @@ def _process_everything( executor=executor, workers=workers, dask_max_submitted_tasks=dask_max_submitted_tasks, + dask_chunksize=dask_chunksize, ): # pragma: no cover yield preprocessing_task_info msg_callback(preprocessing_task_info) @@ -198,6 +204,7 @@ def _process_everything( executor=executor, workers=workers, dask_max_submitted_tasks=dask_max_submitted_tasks, + dask_chunksize=dask_chunksize, **kwargs, ): yield process_info
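
Usage note (not part of the patch itself): a minimal sketch of how the newly exposed
dask_chunksize option can be passed next to the existing dask_scheduler and
dask_max_submitted_tasks options through Mapchete.batch_process(), whose signature is
extended above. The configuration path and scheduler address are placeholders, and the
snippet assumes the public mapchete.open() API.

    # sketch, assuming a valid "example.mapchete" configuration and a dask
    # scheduler reachable at the (placeholder) address below
    import mapchete

    with mapchete.open("example.mapchete") as mp:
        # tasks are submitted to the scheduler in chunks of dask_chunksize items,
        # while dask_max_submitted_tasks caps how many futures are kept on the
        # cluster at any one time
        mp.batch_process(
            zoom=5,
            dask_scheduler="tcp://localhost:8786",
            dask_max_submitted_tasks=500,
            dask_chunksize=100,
        )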