diff --git a/.gitignore b/.gitignore index 8aee8931..06c9e218 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,5 @@ venv/ .eggs/ htmlcov/ .pytest* -.env \ No newline at end of file +.env +dask-worker-space/ \ No newline at end of file diff --git a/doc/source/apidoc/mapchete.cli.options.rst b/doc/source/apidoc/mapchete.cli.options.rst new file mode 100644 index 00000000..bbbcf996 --- /dev/null +++ b/doc/source/apidoc/mapchete.cli.options.rst @@ -0,0 +1,7 @@ +mapchete.cli.options module +=========================== + +.. automodule:: mapchete.cli.options + :members: + :undoc-members: + :show-inheritance: diff --git a/doc/source/apidoc/mapchete.cli.rst b/doc/source/apidoc/mapchete.cli.rst index e628f73f..1685ca23 100644 --- a/doc/source/apidoc/mapchete.cli.rst +++ b/doc/source/apidoc/mapchete.cli.rst @@ -16,7 +16,7 @@ Submodules :maxdepth: 4 mapchete.cli.main - mapchete.cli.utils + mapchete.cli.options Module contents --------------- diff --git a/mapchete/__init__.py b/mapchete/__init__.py index f2261228..8c50305f 100644 --- a/mapchete/__init__.py +++ b/mapchete/__init__.py @@ -1,7 +1,8 @@ import logging from mapchete._core import open, Mapchete -from mapchete._processing import MapcheteProcess, ProcessInfo +from mapchete._executor import Executor +from mapchete._processing import Job, MapcheteProcess, ProcessInfo from mapchete.tile import count_tiles from mapchete._timer import Timer diff --git a/mapchete/_core.py b/mapchete/_core.py index 7b30d1ff..13c15e91 100644 --- a/mapchete/_core.py +++ b/mapchete/_core.py @@ -6,7 +6,7 @@ import os import threading -from mapchete.config import MapcheteConfig +from mapchete.config import MapcheteConfig, MULTIPROCESSING_DEFAULT_START_METHOD from mapchete.errors import MapcheteNodataTile from mapchete.formats import read_output_metadata from mapchete.io import fs_from_path, tiles_exist @@ -83,8 +83,8 @@ def open(some_input, with_cache=False, fs=None, fs_kwargs=None, **kwargs): ) kwargs.update(mode="readonly") return Mapchete(MapcheteConfig(config, **kwargs)) - else: - return Mapchete(MapcheteConfig(some_input, **kwargs), with_cache=with_cache) + + return Mapchete(MapcheteConfig(some_input, **kwargs), with_cache=with_cache) class Mapchete(object): @@ -155,9 +155,9 @@ def get_process_tiles(self, zoom=None): ): yield tile else: - for zoom in reversed(self.config.zoom_levels): + for i in reversed(self.config.zoom_levels): for tile in self.config.process_pyramid.tiles_from_geom( - self.config.area_at_zoom(zoom), zoom + self.config.area_at_zoom(i), i ): yield tile @@ -188,11 +188,14 @@ def batch_process( self, zoom=None, tile=None, + distributed=False, + dask_scheduler=None, multi=None, max_chunksize=1, multiprocessing_module=None, - multiprocessing_start_method="fork", + multiprocessing_start_method=MULTIPROCESSING_DEFAULT_START_METHOD, skip_output_check=False, + executor=None, ): """ Process a large batch of tiles. 
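Note on the API addition above: `batch_process()` now accepts `executor`, `distributed` and `dask_scheduler` keyword arguments, so a caller can either let it create its own executor or hand in a shared one. A minimal sketch of the intended call patterns (hedged: `example.mapchete` and the scheduler address are placeholders, not taken from this patch):

```python
import mapchete

with mapchete.open("example.mapchete") as mp:
    # as before: process tiles with local worker processes
    mp.batch_process(zoom=5, multi=2)
    # new: route tile tasks through a running dask scheduler
    mp.batch_process(zoom=5, dask_scheduler="tcp://127.0.0.1:8786")
```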
@@ -217,7 +220,7 @@ def batch_process( (default: multiprocessing) multiprocessing_start_method : str "fork", "forkserver" or "spawn" - (default: "fork") + (default: "spawn") skip_output_check : bool skip checking whether process tiles already have existing output before starting to process; @@ -226,11 +229,14 @@ def batch_process( self.batch_processor( zoom=zoom, tile=tile, + distributed=distributed, + dask_scheduler=dask_scheduler, multi=multi or multiprocessing.cpu_count(), max_chunksize=max_chunksize, multiprocessing_module=multiprocessing_module or multiprocessing, multiprocessing_start_method=multiprocessing_start_method, skip_output_check=skip_output_check, + executor=executor, ) ) @@ -238,11 +244,14 @@ def batch_processor( self, zoom=None, tile=None, + distributed=False, + dask_scheduler=None, multi=None, max_chunksize=1, multiprocessing_module=None, - multiprocessing_start_method="fork", + multiprocessing_start_method=MULTIPROCESSING_DEFAULT_START_METHOD, skip_output_check=False, + executor=None, ): """ Process a large batch of tiles and yield report messages per tile. @@ -265,7 +274,7 @@ def batch_processor( (default: multiprocessing) multiprocessing_start_method : str "fork", "forkserver" or "spawn" - (default: "fork") + (default: "spawn") skip_output_check : bool skip checking whether process tiles already have existing output before starting to process; @@ -276,18 +285,23 @@ def batch_processor( # run single tile if tile: yield _run_on_single_tile( - process=self, tile=self.config.process_pyramid.tile(*tuple(tile)) + executor=executor, + process=self, + tile=self.config.process_pyramid.tile(*tuple(tile)), ) # run area else: for process_info in _run_area( process=self, zoom_levels=list(_get_zoom_level(zoom, self)), + distributed=distributed, + dask_scheduler=dask_scheduler, multi=multi or multiprocessing.cpu_count(), max_chunksize=max_chunksize, multiprocessing_module=multiprocessing_module or multiprocessing, multiprocessing_start_method=multiprocessing_start_method, skip_output_check=skip_output_check, + executor=executor, ): yield process_info @@ -315,7 +329,7 @@ def count_tiles(self, minzoom=None, maxzoom=None, init_zoom=0): self.config.process_pyramid, minzoom, maxzoom, - init_zoom=0, + init_zoom=init_zoom, ) return self._count_tiles_cache[(minzoom, maxzoom)] @@ -344,8 +358,7 @@ def execute(self, process_tile, raise_nodata=False): except MapcheteNodataTile: if raise_nodata: # pragma: no cover raise - else: - return self.config.output.empty(process_tile) + return self.config.output.empty(process_tile) def read(self, output_tile): """ diff --git a/mapchete/_executor.py b/mapchete/_executor.py new file mode 100644 index 00000000..c6817274 --- /dev/null +++ b/mapchete/_executor.py @@ -0,0 +1,262 @@ +from cached_property import cached_property +import concurrent.futures +from functools import partial +from itertools import chain +import logging +import multiprocessing +import os + +from mapchete.config import MULTIPROCESSING_DEFAULT_START_METHOD +from mapchete.log import set_log_level + + +logger = logging.getLogger(__name__) + + +class Executor: + """ + Executor factory for dask and concurrent.futures executor + + Will move into the mapchete core package. 
+ """ + + def __new__(self, *args, concurrency=None, **kwargs): + if concurrency == "dask": + try: + return DaskExecutor(*args, **kwargs) + except ImportError as e: # pragma: no cover + raise ImportError( + f"this feature requires the mapchete[dask] extra: {e}" + ) + elif concurrency is None or kwargs.get("max_workers") == 1: + return SequentialExecutor(*args, **kwargs) + elif concurrency in ["processes", "threads"]: + return ConcurrentFuturesExecutor(*args, concurrency=concurrency, **kwargs) + else: # pragma: no cover + raise ValueError( + f"concurrency must be one of None, 'processes', 'threads' or 'dask', not {concurrency}" + ) + + +class _ExecutorBase: + """Define base methods and properties of executors.""" + + futures = None + _as_completed = None + _executor = None + _executor_cls = None + _executor_args = () + _executor_kwargs = {} + + def as_completed(self, func, iterable, fargs=None, fkwargs=None): + """Yield finished tasks.""" + fargs = fargs or () + fkwargs = fkwargs or {} + logger.debug("submitting tasks to executor") + futures = [ + self._executor.submit(func, *chain([item], fargs), **fkwargs) + for item in iterable + ] + self.futures.extend(futures) + logger.debug(f"added {len(futures)} tasks") + for future in self._as_completed(futures): + yield future + + def cancel(self): + logger.debug(f"cancel {len(self.futures)} futures...") + for future in self.futures: + future.cancel() + logger.debug(f"{len(self.futures)} futures cancelled") + + def close(self): # pragma: no cover + self.__exit__(None, None, None) + + def _as_completed(self, *args, **kwargs): # pragma: no cover + raise NotImplementedError() + + @cached_property + def _executor(self): + return self._executor_cls(*self._executor_args, **self._executor_kwargs) + + def __enter__(self): + """Enter context manager.""" + return self + + def __exit__(self, *args): + """Exit context manager.""" + logger.debug(f"closing executor {self._executor}...") + try: + self._executor.close() + except Exception: + self._executor.__exit__(*args) + logger.debug(f"closed executor {self._executor}") + + def __repr__(self): # pragma: no cover + return f"" + + +class DaskExecutor(_ExecutorBase): + """Execute tasks using dask cluster.""" + + def __init__( + self, + *args, + address=None, + dask_scheduler=None, + dask_client=None, + max_workers=None, + **kwargs, + ): + from dask.distributed import Client, LocalCluster + + self.futures = [] + self._executor_client = dask_client + if self._executor_client: # pragma: no cover + logger.debug(f"using existing dask client: {dask_client}") + else: + local_cluster_kwargs = dict( + n_workers=max_workers or os.cpu_count(), threads_per_worker=1 + ) + self._executor_cls = Client + self._executor_kwargs = dict( + address=dask_scheduler or LocalCluster(**local_cluster_kwargs), + ) + logger.debug( + f"starting dask.distributed.Client with kwargs {self._executor_kwargs}" + ) + + def cancel(self): + logger.debug(f"cancel {len(self.futures)} futures...") + for future in self.futures: + future.cancel() + logger.debug(f"{len(self.futures)} futures cancelled") + + def _as_completed(self, futures): + from dask.distributed import as_completed + + if futures: + for future in as_completed(futures): + yield future + + @cached_property + def _executor(self): + return self._executor_client or self._executor_cls( + *self._executor_args, **self._executor_kwargs + ) + + def __exit__(self, *args): + """Exit context manager.""" + if self._executor_client: # pragma: no cover + logger.debug("client not closing as it was passed 
on as kwarg") + else: + logger.debug(f"closing executor {self._executor}...") + try: + self._executor.close() + except Exception: # pragma: no cover + self._executor.__exit__(*args) + logger.debug(f"closed executor {self._executor}") + + +class ConcurrentFuturesExecutor(_ExecutorBase): + """Execute tasks using concurrent.futures.""" + + def __init__( + self, + *args, + max_workers=None, + concurrency="processes", + **kwargs, + ): + """Set attributes.""" + + self.max_workers = max_workers or os.cpu_count() + self.futures = [] + self._executor_kwargs = dict( + max_workers=self.max_workers, + initializer=set_log_level, + initargs=(logger.getEffectiveLevel(),), + ) + if concurrency == "processes": + self._executor_cls = concurrent.futures.ProcessPoolExecutor + start_method = ( + kwargs.get("multiprocessing_start_method") + or MULTIPROCESSING_DEFAULT_START_METHOD + ) + self._executor_kwargs.update( + mp_context=multiprocessing.get_context(method=start_method) + ) + elif concurrency == "threads": + self._executor_cls = concurrent.futures.ThreadPoolExecutor + else: # pragma: no cover + raise ValueError("concurrency must either be 'processes' or 'threads'") + logger.debug( + f"init ConcurrentFuturesExecutor using {concurrency} with {self.max_workers} workers" + ) + + def _as_completed(self, futures): + """Yield finished tasks.""" + for future in concurrent.futures.as_completed(futures): + yield future + + +class SequentialExecutor(_ExecutorBase): + """Execute tasks sequentially in single process.""" + + def __init__(self, *args, **kwargs): + """Set attributes.""" + logger.debug("init SequentialExecutor") + self.futures = [] + self._cancel = False + + def as_completed(self, func, iterable, fargs=None, fkwargs=None): + """Yield finished tasks.""" + fargs = fargs or [] + fkwargs = fkwargs or {} + for i in iterable: + if self._cancel: + return + yield FakeFuture(func, fargs=[i, *fargs], fkwargs=fkwargs) + + def cancel(self): + self._cancel = True + + def __exit__(self, *args): + """Exit context manager.""" + logger.debug("SequentialExecutor closed") + + def __repr__(self): # pragma: no cover + """Return string representation.""" + return "SequentialExecutor" + + +class FakeFuture: + """Wrapper class to mimick future interface.""" + + def __init__(self, func, fargs=None, fkwargs=None): + """Set attributes.""" + fargs = fargs or [] + fkwargs = fkwargs or {} + try: + self._result, self._exception = func(*fargs, **fkwargs), None + except Exception as e: # pragma: no cover + self._result, self._exception = None, e + + def result(self): + """Return task result.""" + if self._exception: + logger.exception(self._exception) + raise self._exception + else: + return self._result + + def exception(self): + """Raise task exception if any.""" + return self._exception + + def cancelled(self): # pragma: no cover + """Sequential futures cannot be cancelled.""" + return False + + def __repr__(self): # pragma: no cover + """Return string representation.""" + return f"FakeFuture(result={self._result}, exception={self._exception})" diff --git a/mapchete/_processing.py b/mapchete/_processing.py index dd51d967..8f2e1ff9 100644 --- a/mapchete/_processing.py +++ b/mapchete/_processing.py @@ -1,15 +1,17 @@ from collections import namedtuple -from functools import partial +from concurrent.futures._base import CancelledError from itertools import chain import logging import multiprocessing import os from traceback import format_exc +from typing import Generator from mapchete.commons import clip as commons_clip from 
mapchete.commons import contours as commons_contours from mapchete.commons import hillshade as commons_hillshade from mapchete.config import get_process_func +from mapchete._executor import Executor from mapchete.errors import MapcheteNodataTile, MapcheteProcessException from mapchete.io import raster from mapchete._timer import Timer @@ -21,6 +23,69 @@ ProcessInfo = namedtuple("ProcessInfo", "tile processed process_msg written write_msg") +class Job: + """ + Wraps the output of a processing function into a generator with known length. + + This class also exposes the internal Executor.cancel() function in order to cancel all remaining + tasks/futures. + + Will move into the mapchete core package. + """ + + def __init__( + self, + func: Generator, + fargs: tuple = None, + fkwargs: dict = None, + as_iterator: bool = False, + total: int = None, + executor_concurrency: str = "processes", + executor_kwargs: dict = None, + ): + self.func = func + self.fargs = fargs or () + self.fkwargs = fkwargs or {} + self.status = "pending" + self.executor = None + self.executor_concurrency = executor_concurrency + self.executor_kwargs = executor_kwargs or {} + self._total = total + self._as_iterator = as_iterator + if not as_iterator: + self._results = list(self._run()) + + def _run(self): + if self._total == 0: + return + with Executor( + concurrency=self.executor_concurrency, **self.executor_kwargs + ) as self.executor: + self.status = "running" + yield from self.func(*self.fargs, executor=self.executor, **self.fkwargs) + self.status = "finished" + + def cancel(self): + if self._as_iterator: + # requires client and futures + if self.executor is None: # pragma: no cover + raise ValueError("nothing to cancel because no executor is running") + self.executor.cancel() + self.status = "cancelled" + + def __len__(self): + return self._total + + def __iter__(self): + if self._as_iterator: + yield from self._run() + else: + yield from self._results + + def __repr__(self): # pragma: no cover + return f"<{self.status} Job with {self._total} tasks.>" + + class TileProcess: """ Class to process on a specific process tile. @@ -36,7 +101,7 @@ def __init__(self, tile=None, config=None, skip=False): self.skip = skip self.config_zoom_levels = None if skip else config.zoom_levels self.config_baselevels = None if skip else config.baselevels - self.process_path = None if skip else config.process_path + self.process = None if skip else config.process self.config_dir = None if skip else config.config_dir if skip or self.tile.zoom not in self.config_zoom_levels: self.input, self.process_func_params, self.output_params = {}, {}, {} @@ -85,7 +150,7 @@ def _execute(self): return self._interpolate_from_baselevel("higher") # Otherwise, execute from process file. 
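Stepping back to the `Job` wrapper introduced at the top of this file: it is what the `mapchete.commands` functions hand back to callers. A hedged sketch of its behaviour (the generator function below is invented for illustration, and thread concurrency is chosen so the inline function needs no pickling):

```python
from mapchete import Job

def _double_all(items, executor=None):
    # Job injects its Executor through the "executor" keyword
    for future in executor.as_completed(lambda x: 2 * x, items):
        yield future.result()

job = Job(
    _double_all,
    fargs=([1, 2, 3],),
    as_iterator=True,
    total=3,
    executor_concurrency="threads",
)
assert len(job) == 3  # length is known before any task has run
for result in job:  # with as_iterator=True, tasks start on iteration
    print(result)
```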
process_func = get_process_func( - process_path=self.process_path, config_dir=self.config_dir + process=self.process, config_dir=self.config_dir ) try: with Timer() as t: @@ -97,7 +162,7 @@ def _execute(self): input=self.input, output_params=self.output_params, ), - **self.process_func_params + **self.process_func_params, ) except MapcheteNodataTile: raise @@ -343,109 +408,12 @@ def clip(self, array, geometries, inverted=False, clip_buffer=0): ) -############################################################# -# wrappers helping to abstract multiprocessing and billiard # -############################################################# -class Executor: - """Wrapper class to be used with multiprocessing or billiard.""" - - def __init__( - self, - start_method="fork", - max_workers=None, - multiprocessing_module=multiprocessing, - ): - """Set attributes.""" - self.start_method = start_method - self.max_workers = max_workers or os.cpu_count() - self.multiprocessing_module = multiprocessing_module - if self.max_workers != 1: - logger.debug( - "init %s Executor with start_method %s and %s workers", - self.multiprocessing_module, - self.start_method, - self.max_workers, - ) - self._pool = self.multiprocessing_module.get_context( - self.start_method - ).Pool(self.max_workers) - - def as_completed( - self, func=None, iterable=None, fargs=None, fkwargs=None, chunksize=1 - ): - """Yield finished tasks.""" - fargs = fargs or [] - fkwargs = fkwargs or {} - if self.max_workers == 1: - for i in iterable: - yield _exception_wrapper(func, fargs, fkwargs, i) - else: - logger.debug( - "open multiprocessing.Pool and %s %s workers", - self.start_method, - self.max_workers, - ) - for finished_task in self._pool.imap_unordered( - partial(_exception_wrapper, func, fargs, fkwargs), - iterable, - chunksize=chunksize or 1, - ): - yield finished_task - - def __enter__(self): - """Enter context manager.""" - if self.max_workers != 1: - self._pool.__enter__() - return self - - def __exit__(self, *args): - """Exit context manager.""" - if self.max_workers != 1: - logger.debug("closing %s and workers", self._pool) - self._pool.__exit__(*args) - logger.debug("%s closed", self._pool) - - -class FinishedTask: - """Wrapper class to encapsulate exceptions.""" - - def __init__(self, func, fargs=None, fkwargs=None): - """Set attributes.""" - fargs = fargs or [] - fkwargs = fkwargs or {} - try: - self._result, self._exception = func(*fargs, **fkwargs), None - except Exception as e: # pragma: no cover - self._result, self._exception = None, e - - def result(self): - """Return task result.""" - if self._exception: - logger.exception(self._exception) - raise self._exception - else: - return self._result - - def exception(self): - """Raise task exception if any.""" - return self._exception - - def __repr__(self): - """Return string representation.""" - return "FinishedTask(result=%s, exception=%s)" % (self._result, self._exception) - - -def _exception_wrapper(func, fargs, fkwargs, i): - """Wrap function around FinishedTask object.""" - return FinishedTask(func, list(chain([i], fargs)), fkwargs) - - ########################### # batch execution options # ########################### -def _run_on_single_tile(process=None, tile=None): +def _run_on_single_tile(executor=None, process=None, tile=None): logger.debug("run process on single tile") return _execute_and_write( tile_process=TileProcess( @@ -461,8 +429,11 @@ def _run_on_single_tile(process=None, tile=None): def _run_area( + executor=None, process=None, zoom_levels=None, + 
distributed=False, + dask_scheduler=None, multi=None, max_chunksize=None, multiprocessing_module=None, @@ -475,9 +446,12 @@ def _run_area( # for output drivers requiring writing data in parent process if process.config.output.write_in_parent_process: for process_info in _run_multi( + executor=executor, func=_execute, zoom_levels=zoom_levels, process=process, + distributed=distributed, + dask_scheduler=dask_scheduler, multi=multi, multiprocessing_start_method=multiprocessing_start_method, multiprocessing_module=multiprocessing_module, @@ -490,10 +464,13 @@ def _run_area( # for output drivers which can write data in child processes else: for process_info in _run_multi( + executor=executor, func=_execute_and_write, fkwargs=dict(output_writer=process.config.output), zoom_levels=zoom_levels, process=process, + dask_scheduler=dask_scheduler, + distributed=distributed, multi=multi, multiprocessing_start_method=multiprocessing_start_method, multiprocessing_module=multiprocessing_module, @@ -520,9 +497,12 @@ def _filter_skipable(process=None, tiles=None, todo=None, target_set=None): def _run_multi( + executor=None, func=None, zoom_levels=None, process=None, + distributed=False, + dask_scheduler=None, multi=None, multiprocessing_start_method=None, multiprocessing_module=None, @@ -534,38 +514,102 @@ def _run_multi( total_tiles = process.count_tiles(min(zoom_levels), max(zoom_levels)) workers = min([multi, total_tiles]) num_processed = 0 - logger.debug("run process on %s tiles using %s workers", total_tiles, workers) # here we store the parents of processed tiles so we can update overviews # also in "continue" mode in case there were updates at the baselevel overview_parents = set() - with Timer() as t, Executor( + # If an Executor is passed on, don't close after processing. If no Executor is passed on, + # create one and properly close it afterwards. 
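The comment above describes the create-or-reuse contract for executors. A sketch of the same pattern in isolation, using the new factory (the payload function and arguments are illustrative only):

```python
from mapchete import Executor

# one executor shared across several batches avoids repeated pool startup
with Executor(concurrency="threads", max_workers=4) as executor:
    for batch in (["a", "b"], ["c", "d"]):
        for future in executor.as_completed(str.upper, batch):
            print(future.result())
# leaving the context closes the underlying pool exactly once
```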
+ create_executor = executor is None + executor = executor or Executor( max_workers=workers, + concurrency="dask" if dask_scheduler else "processes", start_method=multiprocessing_start_method, multiprocessing_module=multiprocessing_module, - ) as executor: - - for i, zoom in enumerate(zoom_levels): - - if skip_output_check: # pragma: no cover - # don't check outputs and simply proceed - todo = process.get_process_tiles(zoom) - else: - # check which process output already exists and which process tiles need - # to be added to todo list - todo = set() - logger.debug("check skippable tiles") - for process_info in _filter_skipable( - process=process, - tiles=process.get_process_tiles(zoom), - todo=todo, - target_set=( - overview_parents if process.config.baselevels and i else None + dask_scheduler=dask_scheduler, + ) + + try: + with Timer() as t: + logger.debug( + "run process on %s tiles using %s workers", total_tiles, workers + ) + + for i, zoom in enumerate(zoom_levels): + + if skip_output_check: # pragma: no cover + # don't check outputs and simply proceed + todo = process.get_process_tiles(zoom) + else: + # check which process output already exists and which process tiles need + # to be added to todo list + todo = set() + logger.debug("check skippable tiles") + for process_info in _filter_skipable( + process=process, + tiles=process.get_process_tiles(zoom), + todo=todo, + target_set=( + overview_parents + if process.config.baselevels and i + else None + ), + ): + num_processed += 1 + logger.debug( + "tile %s/%s finished: %s, %s, %s", + num_processed, + total_tiles, + process_info.tile, + process_info.process_msg, + process_info.write_msg, + ) + yield process_info + + # process all remaining tiles using todo list from before + for future in executor.as_completed( + func=func, + iterable=( + TileProcess( + tile=tile, + config=process.config, + skip=( + process.mode == "continue" + and process.config.output_reader.tiles_exist(tile) + ) + if skip_output_check + else False, + ) + for tile in todo ), + fkwargs=fkwargs, ): + # trigger output write for driver which require parent process for writing + if write_in_parent_process: + output_data, process_info = future.result() + process_info = _write( + process_info=process_info, + output_data=output_data, + output_writer=process.config.output, + ) + + # output already has been written, so just use task process info + else: + process_info = future.result() + + # in case of building overviews from baselevels, remember which parent + # tile needs to be updated later on + if ( + not skip_output_check + and process.config.baselevels + and process_info.processed + and process_info.tile.zoom > 0 + ): + overview_parents.add(process_info.tile.get_parent()) + num_processed += 1 - logger.info( + logger.debug( "tile %s/%s finished: %s, %s, %s", num_processed, total_tiles, @@ -575,60 +619,11 @@ def _run_multi( ) yield process_info - # process all remaining tiles using todo list from before - for task in executor.as_completed( - func=func, - iterable=( - TileProcess( - tile=tile, - config=process.config, - skip=( - process.mode == "continue" - and process.config.output_reader.tiles_exist(tile) - ) - if skip_output_check - else False, - ) - for tile in todo - ), - fkwargs=fkwargs, - chunksize=max_chunksize, - ): - # trigger output write for driver which require parent process for writing - if write_in_parent_process: - output_data, process_info = task.result() - process_info = _write( - process_info=process_info, - output_data=output_data, - 
output_writer=process.config.output, - ) - - # output already has been written, so just use task process info - else: - process_info = task.result() - - # in case of building overviews from baselevels, remember which parent - # tile needs to be updated later on - if ( - not skip_output_check - and process.config.baselevels - and process_info.processed - and process_info.tile.zoom > 0 - ): - overview_parents.add(process_info.tile.get_parent()) - - num_processed += 1 - logger.info( - "tile %s/%s finished: %s, %s, %s", - num_processed, - total_tiles, - process_info.tile, - process_info.process_msg, - process_info.write_msg, - ) - yield process_info + finally: + if create_executor: + executor.close() - logger.debug("%s tile(s) iterated in %s", str(num_processed), t) + logger.info("%s tile(s) iterated in %s", str(num_processed), t) ############################### @@ -636,7 +631,7 @@ ############################### -def _execute(tile_process=None): +def _execute(tile_process=None, **kwargs): logger.debug( (tile_process.tile.id, "running on %s" % multiprocessing.current_process().name) ) @@ -670,7 +665,7 @@ ) -def _write(process_info=None, output_data=None, output_writer=None): +def _write(process_info=None, output_data=None, output_writer=None, **kwargs): if process_info.processed: try: output_data = output_writer.streamline_output(output_data) @@ -702,7 +697,7 @@ return process_info -def _execute_and_write(tile_process=None, output_writer=None): +def _execute_and_write(tile_process=None, output_writer=None, **kwargs): output_data, process_info = _execute(tile_process=tile_process) return _write( process_info=process_info, output_data=output_data, output_writer=output_writer diff --git a/mapchete/cli/default/convert.py b/mapchete/cli/default/convert.py index 393dcce6..2cb32d21 100644 --- a/mapchete/cli/default/convert.py +++ b/mapchete/cli/default/convert.py @@ -7,7 +7,7 @@ import mapchete from mapchete import commands -from mapchete.cli import utils +from mapchete.cli import options from mapchete.formats import available_output_formats OUTPUT_FORMATS = available_output_formats() @@ -22,15 +22,15 @@ def _validate_bidx(ctx, param, bidx): @click.command(help="Convert outputs or other geodata.") -@utils.arg_tiledir -@utils.arg_output -@utils.opt_zoom -@utils.opt_bounds -@utils.opt_bounds_crs -@utils.opt_area -@utils.opt_area_crs -@utils.opt_point -@utils.opt_point_crs +@options.arg_tiledir +@options.arg_output +@options.opt_zoom +@options.opt_bounds +@options.opt_bounds_crs +@options.opt_area +@options.opt_area_crs +@options.opt_point +@options.opt_point_crs @click.option( "--clip-geometry", "-c", @@ -76,7 +76,7 @@ def _validate_bidx(ctx, param, bidx): default=0.0, help="Scaling offset (for raster output only).", ) -@utils.opt_resampling_method +@options.opt_resampling_method @click.option( "--overviews", is_flag=True, help="Generate overviews (single GTiff output only)." ) @@ -91,14 +91,16 @@ def _validate_bidx(ctx, param, bidx): is_flag=True, help="Write a valid COG. This will automatically generate overviews. 
(GTiff only)", ) -@utils.opt_overwrite -@utils.opt_verbose -@utils.opt_no_pbar -@utils.opt_debug -@utils.opt_multi -@utils.opt_logfile -@utils.opt_vrt -@utils.opt_idx_out_dir +@options.opt_overwrite +@options.opt_verbose +@options.opt_no_pbar +@options.opt_debug +@options.opt_workers +@options.opt_multi +@options.opt_concurrency +@options.opt_logfile +@options.opt_vrt +@options.opt_idx_out_dir def convert( tiledir, output, diff --git a/mapchete/cli/default/cp.py b/mapchete/cli/default/cp.py index b49cd516..e532ea61 100644 --- a/mapchete/cli/default/cp.py +++ b/mapchete/cli/default/cp.py @@ -2,27 +2,29 @@ import tqdm from mapchete import commands -from mapchete.cli import utils +from mapchete.cli import options @click.command(help="Copy TileDirectory from one source to another.") -@utils.arg_src_tiledir -@utils.arg_dst_tiledir -@utils.opt_zoom -@utils.opt_area -@utils.opt_area_crs -@utils.opt_bounds -@utils.opt_bounds_crs -@utils.opt_overwrite -@utils.opt_verbose -@utils.opt_no_pbar -@utils.opt_debug -@utils.opt_logfile -@utils.opt_multi -@utils.opt_http_username -@utils.opt_http_password -@utils.opt_src_fs_opts -@utils.opt_dst_fs_opts +@options.arg_src_tiledir +@options.arg_dst_tiledir +@options.opt_zoom +@options.opt_area +@options.opt_area_crs +@options.opt_bounds +@options.opt_bounds_crs +@options.opt_overwrite +@options.opt_verbose +@options.opt_no_pbar +@options.opt_debug +@options.opt_logfile +@options.opt_workers +@options.opt_dask_scheduler +@options.opt_concurrency +@options.opt_http_username +@options.opt_http_password +@options.opt_src_fs_opts +@options.opt_dst_fs_opts def cp(*args, debug=False, no_pbar=False, verbose=False, logfile=None, **kwargs): """Copy TileDirectory.""" # handle deprecated options diff --git a/mapchete/cli/default/create.py b/mapchete/cli/default/create.py index 1486dc47..dbc7e099 100644 --- a/mapchete/cli/default/create.py +++ b/mapchete/cli/default/create.py @@ -7,7 +7,7 @@ from shutil import copyfile from oyaml import dump -from mapchete.cli import utils +from mapchete.cli import options FORMAT_MANDATORY = { "GTiff": {"bands": None, "dtype": None}, @@ -29,12 +29,12 @@ @click.command(help="Create a new process.") -@utils.arg_create_mapchete_file -@utils.arg_process_file -@utils.arg_out_format -@utils.opt_out_path -@utils.opt_pyramid_type -@utils.opt_force +@options.arg_create_mapchete_file +@options.arg_process_file +@options.arg_out_format +@options.opt_out_path +@options.opt_pyramid_type +@options.opt_force def create( mapchete_file, process_file, diff --git a/mapchete/cli/default/execute.py b/mapchete/cli/default/execute.py index d8455da5..b85996a4 100644 --- a/mapchete/cli/default/execute.py +++ b/mapchete/cli/default/execute.py @@ -3,30 +3,32 @@ import mapchete from mapchete import commands -from mapchete.cli import utils +from mapchete.cli import options @click.command(help="Execute a process.") -@utils.arg_mapchete_files -@utils.opt_zoom -@utils.opt_bounds -@utils.opt_bounds_crs -@utils.opt_area -@utils.opt_area_crs -@utils.opt_point -@utils.opt_point_crs -@utils.opt_tile -@utils.opt_overwrite -@utils.opt_multi -@utils.opt_input_file -@utils.opt_logfile -@utils.opt_verbose -@utils.opt_no_pbar -@utils.opt_debug -@utils.opt_max_chunksize -@utils.opt_multiprocessing_start_method -@utils.opt_vrt -@utils.opt_idx_out_dir +@options.arg_mapchete_files +@options.opt_zoom +@options.opt_bounds +@options.opt_bounds_crs +@options.opt_area +@options.opt_area_crs +@options.opt_point +@options.opt_point_crs +@options.opt_tile +@options.opt_overwrite 
+@options.opt_concurrency +@options.opt_workers +@options.opt_multi +@options.opt_dask_scheduler +@options.opt_input_file +@options.opt_logfile +@options.opt_verbose +@options.opt_no_pbar +@options.opt_debug +@options.opt_max_chunksize +@options.opt_vrt +@options.opt_idx_out_dir def execute( mapchete_files, *args, diff --git a/mapchete/cli/default/formats.py b/mapchete/cli/default/formats.py index 85682eb1..059e377a 100644 --- a/mapchete/cli/default/formats.py +++ b/mapchete/cli/default/formats.py @@ -2,14 +2,14 @@ import click -from mapchete.cli import utils +from mapchete.cli import options from mapchete.formats import available_input_formats, available_output_formats @click.command(help="List available input and/or output formats.") -@utils.opt_input_formats -@utils.opt_output_formats -@utils.opt_debug +@options.opt_input_formats +@options.opt_output_formats +@options.opt_debug def formats(input_formats, output_formats, debug=False): """List input and/or output formats.""" if input_formats == output_formats: diff --git a/mapchete/cli/default/index.py b/mapchete/cli/default/index.py index ab6c15e6..e9ca6e3a 100644 --- a/mapchete/cli/default/index.py +++ b/mapchete/cli/default/index.py @@ -8,7 +8,7 @@ import tqdm import mapchete -from mapchete.cli import utils +from mapchete.cli import options from mapchete import commands @@ -19,31 +19,31 @@ @click.command(help="Create index of output tiles.") -@utils.arg_tiledir -@utils.opt_idx_out_dir -@utils.opt_geojson -@utils.opt_gpkg -@utils.opt_shp -@utils.opt_vrt -@utils.opt_txt -@utils.opt_fieldname -@utils.opt_basepath -@utils.opt_for_gdal -@utils.opt_zoom -@utils.opt_bounds -@utils.opt_bounds_crs -@utils.opt_area -@utils.opt_area_crs -@utils.opt_point -@utils.opt_point_crs -@utils.opt_tile -@utils.opt_http_username -@utils.opt_http_password -@utils.opt_fs_opts -@utils.opt_verbose -@utils.opt_no_pbar -@utils.opt_debug -@utils.opt_logfile +@options.arg_tiledir +@options.opt_idx_out_dir +@options.opt_geojson +@options.opt_gpkg +@options.opt_shp +@options.opt_vrt +@options.opt_txt +@options.opt_fieldname +@options.opt_basepath +@options.opt_for_gdal +@options.opt_zoom +@options.opt_bounds +@options.opt_bounds_crs +@options.opt_area +@options.opt_area_crs +@options.opt_point +@options.opt_point_crs +@options.opt_tile +@options.opt_http_username +@options.opt_http_password +@options.opt_fs_opts +@options.opt_verbose +@options.opt_no_pbar +@options.opt_debug +@options.opt_logfile def index(*args, debug=False, no_pbar=False, verbose=False, logfile=None, **kwargs): """Create various index files from process output.""" # handle deprecated options diff --git a/mapchete/cli/default/processes.py b/mapchete/cli/default/processes.py index c84bf6b2..17d95427 100644 --- a/mapchete/cli/default/processes.py +++ b/mapchete/cli/default/processes.py @@ -3,7 +3,7 @@ import click import logging -from mapchete.cli import utils +from mapchete.cli import options from mapchete.processes import process_names_docstrings logger = logging.getLogger(__name__) @@ -13,7 +13,7 @@ @click.option( "--process_name", "-n", type=click.STRING, help="Print docstring of process." 
) -@utils.opt_debug +@options.opt_debug def processes(process_name=None, docstrings=False, debug=False): """List available processes.""" processes = process_names_docstrings(process_name=process_name) diff --git a/mapchete/cli/default/rm.py b/mapchete/cli/default/rm.py index 5e02a927..7571c84d 100644 --- a/mapchete/cli/default/rm.py +++ b/mapchete/cli/default/rm.py @@ -2,23 +2,23 @@ import tqdm from mapchete import commands -from mapchete.cli import utils +from mapchete.cli import options @click.command(help="Remove tiles from TileDirectory.") -@utils.arg_tiledir -@utils.opt_zoom -@utils.opt_area -@utils.opt_area_crs -@utils.opt_bounds -@utils.opt_bounds_crs -@utils.opt_multi -@utils.opt_verbose -@utils.opt_no_pbar -@utils.opt_debug -@utils.opt_logfile -@utils.opt_force -@utils.opt_fs_opts +@options.arg_tiledir +@options.opt_zoom +@options.opt_area +@options.opt_area_crs +@options.opt_bounds +@options.opt_bounds_crs +@options.opt_multi +@options.opt_verbose +@options.opt_no_pbar +@options.opt_debug +@options.opt_logfile +@options.opt_force +@options.opt_fs_opts def rm( *args, force=False, diff --git a/mapchete/cli/default/serve.py b/mapchete/cli/default/serve.py index 1ac9e438..c0369b75 100755 --- a/mapchete/cli/default/serve.py +++ b/mapchete/cli/default/serve.py @@ -9,24 +9,24 @@ from rasterio.io import MemoryFile import mapchete -from mapchete.cli import utils +from mapchete.cli import options from mapchete.tile import BufferedTilePyramid logger = logging.getLogger(__name__) @click.command(help="Serve a process on localhost.") -@utils.arg_mapchete_files -@utils.opt_port -@utils.opt_internal_cache -@utils.opt_zoom -@utils.opt_bounds -@utils.opt_overwrite -@utils.opt_readonly -@utils.opt_memory -@utils.opt_input_file -@utils.opt_debug -@utils.opt_logfile +@options.arg_mapchete_files +@options.opt_port +@options.opt_internal_cache +@options.opt_zoom +@options.opt_bounds +@options.opt_overwrite +@options.opt_readonly +@options.opt_memory +@options.opt_input_file +@options.opt_debug +@options.opt_logfile def serve( mapchete_files, port=None, diff --git a/mapchete/cli/utils.py b/mapchete/cli/options.py similarity index 90% rename from mapchete/cli/utils.py rename to mapchete/cli/options.py index 99d4764e..6422c89d 100644 --- a/mapchete/cli/utils.py +++ b/mapchete/cli/options.py @@ -8,7 +8,11 @@ import tqdm import mapchete -from mapchete.config import raw_conf, bounds_from_opts +from mapchete.config import ( + raw_conf, + bounds_from_opts, + MULTIPROCESSING_DEFAULT_START_METHOD, +) from mapchete.formats import available_output_formats from mapchete.index import zoom_index_gen from mapchete.log import set_log_level, setup_logfile @@ -19,9 +23,6 @@ MULTIPROCESSING_START_METHODS = get_all_start_methods() -MULTIPROCESSING_START_METHOD_DEFAULT = ( - "fork" if "fork" in MULTIPROCESSING_START_METHODS else "spawn" -) # click callbacks # @@ -110,6 +111,10 @@ def _cb_key_val(ctx, param, value): return out +def _cb_none_concurrency(ctx, param, value): + return None if value == "none" else value + + # click arguments # ################### arg_mapchete_file = click.argument("mapchete_file", type=click.Path(exists=True)) @@ -137,13 +142,12 @@ def _cb_key_val(ctx, param, value): ################# opt_out_path = click.option( "--out-path", - "-op", type=click.Path(), default=os.path.join(os.getcwd(), "output"), help="Process output path.", ) opt_idx_out_dir = click.option( - "--idx-out-dir", "-od", type=click.Path(), help="Index output directory." 
+ "--idx-out-dir", type=click.Path(), help="Index output directory." ) opt_input_file = click.option( "--input-file", @@ -210,7 +214,13 @@ def _cb_key_val(ctx, param, value): "--multi", "-m", type=click.INT, - help="Number of concurrent processes.", + help="Number of workers when processing concurrently.", +) +opt_workers = click.option( + "--workers", + "-w", + type=click.INT, + help="Number of workers when processing concurrently.", ) opt_force = click.option( "--force", "-f", is_flag=True, help="Overwrite if files already exist." @@ -235,18 +245,17 @@ def _cb_key_val(ctx, param, value): ) opt_max_chunksize = click.option( "--max-chunksize", - "-c", type=click.INT, default=1, - help="Maximum number of process tiles to be queued for each worker. (default: 1)", + help="Maximum number of tasks to be queued for each worker. (default: 1)", ) opt_multiprocessing_start_method = click.option( "--multiprocessing-start-method", type=click.Choice(MULTIPROCESSING_START_METHODS), - default=MULTIPROCESSING_START_METHOD_DEFAULT, + default=MULTIPROCESSING_DEFAULT_START_METHOD, help=( "Method used by multiprocessing module to start child workers. Availability of " - f"methods depends on OS (default: {MULTIPROCESSING_START_METHOD_DEFAULT})" + f"methods depends on OS (default: {MULTIPROCESSING_DEFAULT_START_METHOD})" ), ) opt_input_formats = click.option( @@ -350,3 +359,13 @@ def _cb_key_val(ctx, param, value): callback=_cb_key_val, help="Configuration options for destination fsspec filesystem.", ) +opt_dask_scheduler = click.option( + "--dask-scheduler", type=click.STRING, help="Address for dask scheduler." +) +opt_concurrency = click.option( + "--concurrency", + type=click.Choice(["processes", "threads", "dask", "none"]), + default="processes", + callback=_cb_none_concurrency, + help="Decide which Executor to use for concurrent processing.", +) diff --git a/mapchete/commands/_convert.py b/mapchete/commands/_convert.py index cf533053..fa08494b 100644 --- a/mapchete/commands/_convert.py +++ b/mapchete/commands/_convert.py @@ -9,10 +9,10 @@ from shapely.geometry.base import BaseGeometry import tilematrix from typing import Callable, List, Tuple, Union +import warnings import mapchete from mapchete.commands._execute import execute -from mapchete.commands._job import empty_callback, Job from mapchete.config import raw_conf, raw_conf_output_pyramid from mapchete.formats import ( driver_from_file, @@ -40,6 +40,10 @@ def convert( point_crs: Tuple[float, float] = None, tile: Tuple[int, int, int] = None, overwrite: bool = False, + concurrency: str = "processes", + dask_scheduler: str = None, + dask_client=None, + workers: int = None, multi: int = None, clip_geometry: str = None, bidx: List[int] = None, @@ -57,7 +61,7 @@ def convert( cog: bool = False, msg_callback: Callable = None, as_iterator: bool = False, -) -> Job: +) -> mapchete.Job: """ Convert mapchete outputs or other geodata. @@ -93,8 +97,14 @@ def convert( Zoom, row and column of tile to be processed (cannot be used with zoom) overwrite : bool Overwrite existing output. - multi : int - Number of processes used to paralellize tile execution. + workers : int + Number of execution workers when processing concurrently. + concurrency : str + Concurrency to be used. Could either be "processes", "threads" or "dask". + dask_scheduler : str + URL to dask scheduler if required. + dask_client : dask.distributed.Client + Reusable Client instance if required. Otherwise a new client will be created. 
clip_geometry : str Path to Fiona-readable file by which output will be clipped. bidx : list of integers @@ -130,7 +140,7 @@ def convert( Returns ------- - Job instance either with already processed items or a generator with known length. + mapchete.Job instance either with already processed items or a generator with known length. Examples -------- @@ -147,7 +157,14 @@ def convert( Usage within a process bar. """ - msg_callback = msg_callback or empty_callback + + def _empty_callback(*args, **kwargs): + pass + + msg_callback = msg_callback or _empty_callback + if multi is not None: # pragma: no cover + warnings.warn("The 'multi' parameter is deprecated and is now named 'workers'") + workers = workers or multi or cpu_count() creation_options = creation_options or {} bidx = [bidx] if isinstance(bidx, int) else bidx try: @@ -260,7 +277,7 @@ def convert( "Process area is empty: clip bounds don't intersect with input bounds." ) # this returns a Job with an empty iterator - return Job(iter, [], as_iterator=as_iterator, total=0) + return mapchete.Job(None, (), as_iterator=as_iterator, total=0) # add process bounds and output type mapchete_config.update( bounds=(clip_intersection.bounds if clip_geometry else inp_bounds), @@ -279,7 +296,10 @@ def convert( bounds_crs=bounds_crs, area=area, area_crs=area_crs, - multi=multi or cpu_count(), + concurrency=concurrency, + dask_scheduler=dask_scheduler, + dask_client=dask_client, + workers=workers, as_iterator=as_iterator, msg_callback=msg_callback, ) diff --git a/mapchete/commands/_cp.py b/mapchete/commands/_cp.py index bc192076..7ca253fc 100644 --- a/mapchete/commands/_cp.py +++ b/mapchete/commands/_cp.py @@ -1,6 +1,7 @@ import fiona import json import logging +from multiprocessing import cpu_count import os from rasterio.crs import CRS from shapely.geometry import box, shape @@ -8,9 +9,9 @@ from shapely.ops import unary_union from tilematrix import TilePyramid from typing import Callable, List, Tuple, Union +import warnings import mapchete -from mapchete.commands._job import empty_callback, Job from mapchete.config import _guess_geometry from mapchete.formats import read_output_metadata from mapchete.io import fs_from_path, tiles_exist @@ -28,12 +29,16 @@ def cp( bounds: Tuple[float] = None, bounds_crs: Union[CRS, str] = None, overwrite: bool = False, + workers: int = None, multi: int = None, + concurrency: str = None, + dask_scheduler: str = None, + dask_client=None, src_fs_opts: dict = None, dst_fs_opts: dict = None, msg_callback: Callable = None, as_iterator: bool = False, -) -> Job: +) -> mapchete.Job: """ Copy TileDirectory from source to destination. @@ -56,8 +61,14 @@ def cp( CRS of area (default: process CRS). overwrite : bool Overwrite existing output. - multi : int + workers : int Number of threads used to check whether tiles exist. + concurrency : str + Concurrency to be used. Could either be "processes", "threads" or "dask". + dask_scheduler : str + URL to dask scheduler if required. + dask_client : dask.distributed.Client + Reusable Client instance if required. Otherwise a new client will be created. src_fs_opts : dict Configuration options for source fsspec filesystem. dst_fs_opts : dict @@ -69,7 +80,7 @@ def cp( Returns ------- - Job instance either with already processed items or a generator with known length. + mapchete.Job instance either with already processed items or a generator with known length. Examples -------- @@ -86,7 +97,14 @@ def cp( Usage within a process bar. 
""" - msg_callback = msg_callback or empty_callback + + def _empty_callback(*args): + pass + + msg_callback = msg_callback or _empty_callback + if multi is not None: # pragma: no cover + warnings.warn("The 'multi' parameter is deprecated and is now named 'workers'") + workers = workers or multi or cpu_count() src_fs_opts = src_fs_opts or {} dst_fs_opts = dst_fs_opts or {} if zoom is None: # pragma: no cover @@ -127,16 +145,24 @@ def cp( fs=dst_fs, fs_kwargs=dst_fs_opts, ) as dst_mp: - return Job( + return mapchete.Job( _copy_tiles, - msg_callback, - src_mp, - dst_mp, - tp, - multi, - src_fs, - dst_fs, - overwrite, + fargs=( + msg_callback, + src_mp, + dst_mp, + tp, + workers, + src_fs, + dst_fs, + overwrite, + ), + executor_concurrency=concurrency, + executor_kwargs=dict( + max_workers=workers, + dask_scheduler=dask_scheduler, + dask_client=dask_client, + ), as_iterator=as_iterator, total=src_mp.count_tiles(), ) @@ -147,10 +173,11 @@ def _copy_tiles( src_mp, dst_mp, tp, - multi, + workers, src_fs, dst_fs, overwrite, + executor=None, ): for z in src_mp.config.init_zoom_levels: msg_callback(f"copy tiles for zoom {z}...") @@ -168,7 +195,7 @@ def _copy_tiles( src_tiles_exist = { tile: exists for tile, exists in tiles_exist( - config=src_mp.config, output_tiles=tiles, multi=multi + config=src_mp.config, output_tiles=tiles, multi=workers ) } @@ -177,37 +204,56 @@ def _copy_tiles( dst_tiles_exist = { tile: exists for tile, exists in tiles_exist( - config=dst_mp.config, output_tiles=tiles, multi=multi + config=dst_mp.config, output_tiles=tiles, multi=workers ) } # copy copied = 0 - for tile in tiles: - src_path = src_mp.config.output_reader.get_path(tile) - # only copy if source tile exists - if src_tiles_exist[tile]: - # skip if destination tile exists and overwrite is deactivated - if dst_tiles_exist[tile] and not overwrite: - msg = f"{tile}: destination tile exists" - logger.debug(msg) - yield msg - continue - # copy from source to target - else: - dst_path = dst_mp.config.output_reader.get_path(tile) - _copy(src_fs, src_path, dst_fs, dst_path) - copied += 1 - msg = f"{tile}: copy {src_path} to {dst_path}" - logger.debug(msg) - yield msg - else: - msg = f"{tile}: source tile ({src_path}) does not exist" - logger.debug(msg) - yield msg + for future in executor.as_completed( + _copy_tile, + tiles, + fargs=( + src_mp, + dst_mp, + src_tiles_exist, + dst_tiles_exist, + src_fs, + dst_fs, + overwrite, + ), + ): + c, msg = future.result() + copied += c + yield msg + msg_callback(f"{copied} tiles copied") +def _copy_tile( + tile, src_mp, dst_mp, src_tiles_exist, dst_tiles_exist, src_fs, dst_fs, overwrite +): + src_path = src_mp.config.output_reader.get_path(tile) + # only copy if source tile exists + if src_tiles_exist[tile]: + # skip if destination tile exists and overwrite is deactivated + if dst_tiles_exist[tile] and not overwrite: + msg = f"{tile}: destination tile exists" + logger.debug(msg) + return 0, msg + # copy from source to target + else: + dst_path = dst_mp.config.output_reader.get_path(tile) + _copy(src_fs, src_path, dst_fs, dst_path) + msg = f"{tile}: copy {src_path} to {dst_path}" + logger.debug(msg) + return 1, msg + else: + msg = f"{tile}: source tile ({src_path}) does not exist" + logger.debug(msg) + return 0, msg + + def _copy(src_fs, src_path, dst_fs, dst_path): # create parent directories on local filesystems if dst_fs.protocol == "file": diff --git a/mapchete/commands/_execute.py b/mapchete/commands/_execute.py index d94eb888..e6c411cd 100755 --- a/mapchete/commands/_execute.py 
+++ b/mapchete/commands/_execute.py @@ -3,9 +3,9 @@ from rasterio.crs import CRS from shapely.geometry.base import BaseGeometry from typing import Callable, List, Tuple, Union +import warnings import mapchete -from mapchete.commands._job import empty_callback, Job from mapchete.config import bounds_from_opts, raw_conf, raw_conf_process_pyramid logger = logging.getLogger(__name__) @@ -23,12 +23,16 @@ def execute( tile: Tuple[int, int, int] = None, overwrite: bool = False, mode: str = "continue", + concurrency: str = "processes", + workers: int = None, multi: int = None, max_chunksize: int = None, multiprocessing_start_method: str = None, + dask_scheduler: str = None, + dask_client=None, msg_callback: Callable = None, as_iterator: bool = False, -) -> Job: +) -> mapchete.Job: """ Execute a Mapchete process. @@ -57,13 +61,19 @@ def execute( Overwrite existing output. mode : str Set process mode. One of "readonly", "continue" or "overwrite". - multi : int - Number of processes used to paralellize tile execution. + workers : int + Number of execution workers when processing concurrently. max_chunksize : int Maximum number of process tiles to be queued for each worker. (default: 1) multiprocessing_start_method : str Method used by multiprocessing module to start child workers. Availability of methods depends on OS. + concurrency : str + Concurrency to be used. Could either be "processes", "threads" or "dask". + dask_scheduler : str + URL to dask scheduler if required. + dask_client : dask.distributed.Client + Reusable Client instance if required. Otherwise a new client will be created. msg_callback : Callable Optional callback function for process messages. as_iterator : bool @@ -71,7 +81,7 @@ def execute( Returns ------- - Job instance either with already processed items or a generator with known length. + mapchete.Job instance either with already processed items or a generator with known length. Examples -------- @@ -89,8 +99,14 @@ def execute( Usage within a process bar. 
""" mode = "overwrite" if overwrite else mode - msg_callback = msg_callback or empty_callback - multi = multi or cpu_count() + + def _empty_callback(*args): + pass + + msg_callback = msg_callback or _empty_callback + if multi is not None: # pragma: no cover + warnings.warn("The 'multi' parameter is deprecated and is now named 'workers'") + workers = workers or multi or cpu_count() if tile: tile = raw_conf_process_pyramid(raw_conf(mapchete_config)).tile(*tile) @@ -119,16 +135,34 @@ def execute( if tile: msg_callback("processing 1 tile") else: - msg_callback(f"processing {tiles_count} tile(s) on {multi} worker(s)") - return Job( + msg_callback(f"processing {tiles_count} tile(s) on {workers} worker(s)") + # automatically use dask Executor if dask scheduler is defined + if dask_scheduler or dask_client: # pragma: no cover + concurrency = "dask" + # use sequential Executor if only one tile or only one worker is defined + elif tiles_count == 1 or workers == 1: + logger.debug( + f"using sequential Executor because there is only one {'tile' if tiles_count == 1 else 'worker'}" + ) + concurrency = None + return mapchete.Job( _msg_wrapper, - msg_callback, - mp, - tile=tile, - multi=multi, - zoom=None if tile else zoom, - max_chunksize=max_chunksize, - multiprocessing_start_method=multiprocessing_start_method, + fargs=( + msg_callback, + mp, + ), + fkwargs=dict( + tile=tile, + multi=workers, + zoom=None if tile else zoom, + ), + executor_concurrency=concurrency, + executor_kwargs=dict( + dask_scheduler=dask_scheduler, + dask_client=dask_client, + max_chunksize=max_chunksize, + multiprocessing_start_method=multiprocessing_start_method, + ), as_iterator=as_iterator, total=1 if tile else tiles_count, ) @@ -138,9 +172,9 @@ def execute( raise -def _msg_wrapper(msg_callback, mp, **kwargs): +def _msg_wrapper(msg_callback, mp, executor=None, **kwargs): try: - for process_info in mp.batch_processor(**kwargs): + for process_info in mp.batch_processor(executor=executor, **kwargs): yield process_info msg_callback( f"Tile {process_info.tile.id}: {process_info.process_msg}, {process_info.write_msg}" diff --git a/mapchete/commands/_index.py b/mapchete/commands/_index.py index 5b536fb7..49130dde 100644 --- a/mapchete/commands/_index.py +++ b/mapchete/commands/_index.py @@ -7,8 +7,6 @@ from typing import Callable, List, Tuple, Union import mapchete -from mapchete.commands._job import empty_callback, Job -from mapchete.cli import utils from mapchete.index import zoom_index_gen logger = logging.getLogger(__name__) @@ -37,7 +35,7 @@ def index( msg_callback: Callable = None, as_iterator: bool = False, **kwargs, -) -> Job: +) -> mapchete.Job: """ Create one or more indexes from a TileDirectory. @@ -89,7 +87,7 @@ def index( Returns ------- - Job instance either with already processed items or a generator with known length. + mapchete.Job instance either with already processed items or a generator with known length. 
Examples -------- @@ -112,7 +110,11 @@ def index( """At least one of '--geojson', '--gpkg', '--shp', '--vrt' or '--txt'""" """must be provided.""" ) - msg_callback = msg_callback or empty_callback + + def _empty_callback(*args): + pass + + msg_callback = msg_callback or _empty_callback fs_opts = fs_opts or {} msg_callback(f"create index(es) for {tiledir}") @@ -130,20 +132,22 @@ def index( area_crs=area_crs, ) as mp: - return Job( + return mapchete.Job( zoom_index_gen, - mp=mp, - zoom=None if tile else mp.config.init_zoom_levels, - tile=tile, - out_dir=idx_out_dir if idx_out_dir else mp.config.output.path, - geojson=geojson, - gpkg=gpkg, - shapefile=shp, - vrt=vrt, - txt=txt, - fieldname=fieldname, - basepath=basepath, - for_gdal=for_gdal, + fkwargs=dict( + mp=mp, + zoom=None if tile else mp.config.init_zoom_levels, + tile=tile, + out_dir=idx_out_dir if idx_out_dir else mp.config.output.path, + geojson=geojson, + gpkg=gpkg, + shapefile=shp, + vrt=vrt, + txt=txt, + fieldname=fieldname, + basepath=basepath, + for_gdal=for_gdal, + ), as_iterator=as_iterator, total=1 if tile else mp.count_tiles(), ) diff --git a/mapchete/commands/_job.py b/mapchete/commands/_job.py deleted file mode 100644 index 3e67bf9c..00000000 --- a/mapchete/commands/_job.py +++ /dev/null @@ -1,41 +0,0 @@ -from typing import Generator - - -class Job: - """Wraps the output of a processing function into a generator with known length.""" - - def __init__( - self, - func: Generator, - *fargs: dict, - as_iterator: bool = False, - total: int = None, - **fkwargs: dict, - ): - self.func = func - self.fargs = fargs - self.fkwargs = fkwargs - self._total = total - self._as_iterator = as_iterator - self._finished = False - if not as_iterator: - list(self.func(*self.fargs, **self.fkwargs)) - self._finished = True - - def __len__(self): - return self._total - - def __iter__(self): - if not self._as_iterator: # pragma: no cover - raise TypeError("initialize with 'as_iterator=True'") - yield from self.func(*self.fargs, **self.fkwargs) - self._finished = True - - def __repr__(self): # pragma: no cover - return ( - f"" - ) - - -def empty_callback(*args, **kwargs): - pass diff --git a/mapchete/commands/_rm.py b/mapchete/commands/_rm.py index b9213923..415f3af0 100644 --- a/mapchete/commands/_rm.py +++ b/mapchete/commands/_rm.py @@ -4,8 +4,6 @@ from typing import Callable, List, Tuple, Union import mapchete -from mapchete.cli import utils -from mapchete.commands._job import empty_callback, Job from mapchete.io import fs_from_path, tiles_exist logger = logging.getLogger(__name__) @@ -23,7 +21,7 @@ def rm( fs_opts: dict = None, msg_callback: Callable = None, as_iterator: bool = False, -) -> Job: +) -> mapchete.Job: """ Remove tiles from TileDirectory. @@ -53,7 +51,7 @@ def rm( Returns ------- - Job instance either with already processed items or a generator with known length. + mapchete.Job instance either with already processed items or a generator with known length. Examples -------- @@ -71,7 +69,11 @@ def rm( Usage within a process bar. 
""" - msg_callback = msg_callback or empty_callback + + def _empty_callback(*args): + pass + + msg_callback = msg_callback or _empty_callback fs_opts = fs_opts or {} if zoom is None: # pragma: no cover raise ValueError("zoom level(s) required") @@ -114,17 +116,19 @@ def rm( for zoom_tiles in tiles.values() for tile in zoom_tiles ] - return Job( + return mapchete.Job( _rm, - paths, - fs, - msg_callback, + fargs=( + paths, + fs, + msg_callback, + ), as_iterator=as_iterator, total=len(paths), ) -def _rm(paths, fs, msg_callback, recursive=False): +def _rm(paths, fs, msg_callback, recursive=False, **kwargs): """ Remove one or multiple paths from file system. diff --git a/mapchete/config.py b/mapchete/config.py index dfcaf997..4525940a 100644 --- a/mapchete/config.py +++ b/mapchete/config.py @@ -26,6 +26,7 @@ from shapely.geometry.base import BaseGeometry from shapely.ops import cascaded_union import sys +from tempfile import NamedTemporaryFile from tilematrix._funcs import Bounds import warnings @@ -58,7 +59,7 @@ # parameters which have to be provided in the configuration and their types _MANDATORY_PARAMETERS = [ - ("process", (str, type(None))), # path to .py file or module path + ("process", (str, list, type(None))), # path to .py file or module path ("pyramid", dict), # process pyramid definition ("input", (dict, type(None))), # files & other types ("output", dict), # process output parameters @@ -93,6 +94,8 @@ "mode", ] +MULTIPROCESSING_DEFAULT_START_METHOD = "spawn" + class MapcheteConfig(object): """ @@ -206,7 +209,7 @@ def __init__( # (2) check user process self.config_dir = self._raw["config_dir"] - self.process_name = self.process_path = self._raw["process"] + self.process_name = self.process_path = self.process = self._raw["process"] if self.mode != "readonly": logger.debug("validating process code") self.process_func @@ -548,7 +551,7 @@ def process_func(self): ) else: return get_process_func( - process_path=self.process_path, + process=self.process, config_dir=self.config_dir, run_compile=True, ) @@ -995,11 +998,11 @@ def bounds_from_opts( return -def get_process_func(process_path=None, config_dir=None, run_compile=False): +def get_process_func(process=None, config_dir=None, run_compile=False): """Import and return process function.""" - logger.debug("get process function from %s", process_path) + logger.debug(f"get process function from {process}") process_module = _load_process_module( - process_path=process_path, config_dir=config_dir, run_compile=run_compile + process=process, config_dir=config_dir, run_compile=run_compile ) try: if hasattr(process_module, "Process"): @@ -1010,37 +1013,51 @@ def get_process_func(process_path=None, config_dir=None, run_compile=False): if hasattr(process_module, "execute"): return process_module.execute else: - raise ImportError("No execute() function found in %s" % process_path) + raise ImportError("No execute() function found in %s" % process) except ImportError as e: raise MapcheteProcessImportError(e) -def _load_process_module(process_path=None, config_dir=None, run_compile=False): - if process_path.endswith(".py"): - module_path = os.path.join(config_dir, process_path) - if not os.path.isfile(module_path): - raise MapcheteConfigError(f"{module_path} is not available") - try: - if run_compile: - py_compile.compile(module_path, doraise=True) - module_name = os.path.splitext(os.path.basename(module_path))[0] - # load module - spec = importlib.util.spec_from_file_location(module_name, module_path) - module = importlib.util.module_from_spec(spec) - 
spec.loader.exec_module(module) - # required to make imported module available using multiprocessing - sys.modules[module_name] = module - # configure process file logger - add_module_logger(module.__name__) - except py_compile.PyCompileError as e: - raise MapcheteProcessSyntaxError(e) - except ImportError as e: - raise MapcheteProcessImportError(e) - else: - try: - module = importlib.import_module(process_path) - except ImportError as e: - raise MapcheteProcessImportError(e) +def _load_process_module(process=None, config_dir=None, run_compile=False): + tmpfile = None + try: + if isinstance(process, list): + tmpfile = NamedTemporaryFile(suffix=".py") + logger.debug(f"writing process code to temporary file {tmpfile.name}") + with open(tmpfile.name, "w") as dst: + for line in process: + dst.write(line + "\n") + process = tmpfile.name + if process.endswith(".py"): + module_path = absolute_path(path=process, base_dir=config_dir) + if not os.path.isfile(module_path): + raise MapcheteConfigError(f"{module_path} is not available") + try: + if run_compile: + py_compile.compile(module_path, doraise=True) + module_name = os.path.splitext(os.path.basename(module_path))[0] + # load module + spec = importlib.util.spec_from_file_location(module_name, module_path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + # required to make imported module available using multiprocessing + sys.modules[module_name] = module + # configure process file logger + add_module_logger(module.__name__) + except py_compile.PyCompileError as e: + raise MapcheteProcessSyntaxError(e) + except ImportError as e: + raise MapcheteProcessImportError(e) + else: + try: + module = importlib.import_module(process) + except ImportError as e: + raise MapcheteProcessImportError(e) + logger.debug(f"return process func: {module}") + finally: + if tmpfile: + logger.debug(f"removing {tmpfile.name}") + tmpfile.close() return module diff --git a/mapchete/index.py b/mapchete/index.py index cfd822fd..74ae3455 100644 --- a/mapchete/index.py +++ b/mapchete/index.py @@ -62,6 +62,7 @@ def zoom_index_gen( basepath=None, for_gdal=True, threading=False, + **kwargs, ): """ Generate indexes for given zoom level. diff --git a/mapchete/processes/__init__.py b/mapchete/processes/__init__.py index 490ba50a..bb58e52f 100644 --- a/mapchete/processes/__init__.py +++ b/mapchete/processes/__init__.py @@ -37,7 +37,7 @@ def _import(): return imported -def process_names_docstrings(process_name): +def process_names_docstrings(process_name=None): """ Return registered process module names and docstrings. 
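Note on the config.py changes above: the "process" configuration value may now also be a list of source code lines, which _load_process_module() writes to a temporary .py file before importing it. A minimal sketch of the intended use (the inline process body below is hypothetical; get_process_func() and its process parameter are as defined in the diff above):

    from mapchete.config import get_process_func

    # process code given as a list of lines instead of a .py path or module
    # path; _load_process_module() dumps it into a NamedTemporaryFile with a
    # .py suffix and imports it from there
    process_func = get_process_func(
        process=[
            "def execute(mp):",
            "    # hypothetical trivial process",
            "    return 'empty'",
        ],
        run_compile=True,
    )
    assert callable(process_func)
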
diff --git a/setup.py b/setup.py
index 71dadc38..22d8b59a 100644
--- a/setup.py
+++ b/setup.py
@@ -39,12 +39,15 @@
     "tqdm",
 ]
 req_contours = ["matplotlib"]
+req_dask = ["dask", "distributed"]
 req_geobuf = ["geobuf"]
 req_http = ["fsspec[http]", "aiohttp", "requests"]
 req_s3 = ["boto3", "fsspec[s3]", "s3fs>=0.5.1"]
 req_serve = ["flask", "werkzeug>=0.15"]
 req_vrt = ["lxml"]
-req_complete = req_contours + req_geobuf + req_http + req_s3 + req_serve + req_vrt
+req_complete = (
+    req_contours + req_dask + req_geobuf + req_http + req_s3 + req_serve + req_vrt
+)
 
 setup(
     name="mapchete",
@@ -94,6 +97,7 @@
     extras_require={
         "complete": req_complete,
         "contours": req_contours,
+        "dask": req_dask,
         "geobuf": req_geobuf,
         "s3": req_s3,
         "serve": req_serve,
diff --git a/test/conftest.py b/test/conftest.py
index 695224f3..5f351412 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -361,6 +361,13 @@ def example_mapchete():
     return ExampleConfig(path=path, dict=_dict_from_mapchete(path))
 
 
+@pytest.fixture
+def example_custom_process_mapchete():
+    """Fixture for example_custom_process.mapchete."""
+    path = os.path.join(TESTDATA_DIR, "example_custom_process.mapchete")
+    return ExampleConfig(path=path, dict=_dict_from_mapchete(path))
+
+
 @pytest.fixture
 def zoom_mapchete():
     """Fixture for zoom.mapchete."""
@@ -389,6 +396,13 @@ def cleantopo_br():
     return ExampleConfig(path=path, dict=_dict_from_mapchete(path))
 
 
+@pytest.fixture
+def cleantopo_br_metatiling_1():
+    """Fixture for cleantopo_br_metatiling_1.mapchete."""
+    path = os.path.join(TESTDATA_DIR, "cleantopo_br_metatiling_1.mapchete")
+    return ExampleConfig(path=path, dict=_dict_from_mapchete(path))
+
+
 @pytest.fixture
 def cleantopo_remote():
     """Fixture for cleantopo_remote.mapchete."""
diff --git a/test/test_cli.py b/test/test_cli.py
index 25163008..cf28e392 100644
--- a/test/test_cli.py
+++ b/test/test_cli.py
@@ -16,7 +16,7 @@
 
 import mapchete
 from mapchete.cli.main import main as mapchete_cli
-from mapchete.cli import utils
+from mapchete.cli import options
 from mapchete.errors import MapcheteProcessOutputError
 
 
@@ -73,7 +73,7 @@ def test_main(mp_tmpdir):
     )
 
 
-def test_create_and_execute(mp_tmpdir, cleantopo_br_tif):
-    """Run mapchete create and execute."""
+def test_create(mp_tmpdir, cleantopo_br_tif):
+    """Run mapchete create."""
     temp_mapchete = os.path.join(mp_tmpdir, "temp.mapchete")
     temp_process = os.path.join(mp_tmpdir, "temp.py")
@@ -118,48 +118,72 @@ def test_create_existing(mp_tmpdir):
         run_cli(args, expected_exit_code=-1)
 
 
-def test_execute_multiprocessing(mp_tmpdir, cleantopo_br, cleantopo_br_tif):
-    """Run mapchete execute with multiple workers."""
-    temp_mapchete = os.path.join(mp_tmpdir, "temp.mapchete")
-    temp_process = os.path.join(mp_tmpdir, "temp.py")
-    out_format = "GTiff"
-    # create from template
+def test_execute_concurrent_processes(mp_tmpdir, cleantopo_br_metatiling_1):
+    """Run mapchete execute with multiple worker processes."""
     run_cli(
         [
-            "create",
-            temp_mapchete,
-            temp_process,
-            out_format,
-            "--pyramid-type",
-            "geodetic",
+            "execute",
+            cleantopo_br_metatiling_1.path,
+            "--zoom",
+            "5",
+            "--workers",
+            "2",
+            "-d",
+            "--concurrency",
+            "processes",
         ]
     )
-    # edit configuration
-    with open(temp_mapchete, "r") as config_file:
-        config = yaml.safe_load(config_file)
-    config["output"].update(bands=1, dtype="uint8", path=mp_tmpdir)
-    with open(temp_mapchete, "w") as config_file:
-        config_file.write(yaml.dump(config, default_flow_style=False))
-    # run process with multiprocessing
-    with pytest.raises(MapcheteProcessOutputError):
-        run_cli(
-            [
-                "execute",
-                temp_mapchete,
-                "--zoom",
-                "5",
-                "-m",
-                
"2", - "-d", - ] - ) - # run example process with multiprocessing - run_cli(["execute", cleantopo_br.path, "--zoom", "5", "-m", "2", "-d"]) + + +def test_execute_concurrent_threads(mp_tmpdir, cleantopo_br_metatiling_1): + """Run mapchete execute with multiple workers.""" + run_cli( + [ + "execute", + cleantopo_br_metatiling_1.path, + "--zoom", + "5", + "--workers", + "2", + "-d", + "--concurrency", + "threads", + ], + ) + + +def test_execute_concurrent_dask(mp_tmpdir, cleantopo_br_metatiling_1): + """Run mapchete execute with multiple workers.""" + run_cli( + [ + "execute", + cleantopo_br_metatiling_1.path, + "--zoom", + "5", + "--workers", + "2", + "-d", + "--concurrency", + "dask", + ], + ) def test_execute_debug(mp_tmpdir, example_mapchete): """Using debug output.""" - run_cli(["execute", example_mapchete.path, "-t", "10", "500", "1040", "--debug"]) + run_cli( + [ + "execute", + example_mapchete.path, + "-t", + "10", + "500", + "1040", + "--debug", + "--concurrency", + "none", + ] + ) def test_execute_vrt(mp_tmpdir, cleantopo_br): @@ -172,7 +196,17 @@ def test_execute_vrt(mp_tmpdir, cleantopo_br): # run again, this time with custom output directory run_cli( - ["execute", cleantopo_br.path, "-z", "5", "--vrt", "--idx-out-dir", mp_tmpdir] + [ + "execute", + cleantopo_br.path, + "-z", + "5", + "--vrt", + "--idx-out-dir", + mp_tmpdir, + "--concurrency", + "none", + ] ) with mapchete.open(cleantopo_br.dict) as mp: vrt_path = os.path.join(mp_tmpdir, "5.vrt") @@ -180,15 +214,51 @@ def test_execute_vrt(mp_tmpdir, cleantopo_br): assert src.read().any() # run with single tile - run_cli(["execute", cleantopo_br.path, "-t", "5", "3", "7", "--vrt"]) + run_cli( + [ + "execute", + cleantopo_br.path, + "-t", + "5", + "3", + "7", + "--vrt", + "--concurrency", + "none", + ] + ) # no new entries - run_cli(["execute", cleantopo_br.path, "-t", "5", "0", "0", "--vrt"]) + run_cli( + [ + "execute", + cleantopo_br.path, + "-t", + "5", + "0", + "0", + "--vrt", + "--concurrency", + "none", + ] + ) def test_execute_verbose(mp_tmpdir, example_mapchete): """Using verbose output.""" - run_cli(["execute", example_mapchete.path, "-t", "10", "500", "1040", "--verbose"]) + run_cli( + [ + "execute", + example_mapchete.path, + "-t", + "10", + "500", + "1040", + "--verbose", + "--concurrency", + "none", + ] + ) def test_execute_logfile(mp_tmpdir, example_mapchete): @@ -204,6 +274,8 @@ def test_execute_logfile(mp_tmpdir, example_mapchete): "1040", "--logfile", logfile, + "--concurrency", + "none", ] ) assert os.path.isfile(logfile) @@ -213,7 +285,9 @@ def test_execute_logfile(mp_tmpdir, example_mapchete): def test_execute_wkt_area(mp_tmpdir, example_mapchete, wkt_geom): """Using area from WKT.""" - run_cli(["execute", example_mapchete.path, "--area", wkt_geom]) + run_cli( + ["execute", example_mapchete.path, "--area", wkt_geom, "--concurrency", "none"] + ) def test_execute_point(mp_tmpdir, example_mapchete, wkt_geom): @@ -226,6 +300,8 @@ def test_execute_point(mp_tmpdir, example_mapchete, wkt_geom): "--point", str(g.centroid.x), str(g.centroid.y), + "--concurrency", + "none", ] ) @@ -245,7 +321,17 @@ def test_formats(capfd): def test_convert_geodetic(cleantopo_br_tif, mp_tmpdir): """Automatic geodetic tile pyramid creation of raster files.""" - run_cli(["convert", cleantopo_br_tif, mp_tmpdir, "--output-pyramid", "geodetic"]) + run_cli( + [ + "convert", + cleantopo_br_tif, + mp_tmpdir, + "--output-pyramid", + "geodetic", + "--concurrency", + "none", + ] + ) for zoom, row, col in [(4, 15, 31), (3, 7, 15), (2, 3, 7), (1, 1, 
3)]: out_file = os.path.join(*[mp_tmpdir, str(zoom), str(row), str(col) + ".tif"]) with rasterio.open(out_file, "r") as src: @@ -257,7 +343,17 @@ def test_convert_geodetic(cleantopo_br_tif, mp_tmpdir): def test_convert_mercator(cleantopo_br_tif, mp_tmpdir): """Automatic mercator tile pyramid creation of raster files.""" - run_cli(["convert", cleantopo_br_tif, mp_tmpdir, "--output-pyramid", "mercator"]) + run_cli( + [ + "convert", + cleantopo_br_tif, + mp_tmpdir, + "--output-pyramid", + "mercator", + "--concurrency", + "none", + ] + ) for zoom, row, col in [(4, 15, 15), (3, 7, 7)]: out_file = os.path.join(*[mp_tmpdir, str(zoom), str(row), str(col) + ".tif"]) with rasterio.open(out_file, "r") as src: @@ -278,6 +374,8 @@ def test_convert_png(cleantopo_br_tif, mp_tmpdir): "mercator", "--output-format", "PNG", + "--concurrency", + "none", ] ) for zoom, row, col in [(4, 15, 15), (3, 7, 7)]: @@ -305,6 +403,8 @@ def test_convert_bidx(cleantopo_br_tif, mp_tmpdir): "3", "--bidx", "1", + "--concurrency", + "none", ] ) with rasterio.open(single_gtiff, "r") as src: @@ -327,6 +427,8 @@ def test_convert_single_gtiff(cleantopo_br_tif, mp_tmpdir): "geodetic", "-z", "3", + "--concurrency", + "none", ] ) with rasterio.open(single_gtiff, "r") as src: @@ -350,6 +452,8 @@ def test_convert_single_gtiff_cog(cleantopo_br_tif, mp_tmpdir): "-z", "3", "--cog", + "--concurrency", + "none", ] ) with rasterio.open(single_gtiff, "r") as src: @@ -375,6 +479,10 @@ def test_convert_single_gtiff_overviews(cleantopo_br_tif, mp_tmpdir): "--overviews", "--overviews-resampling-method", "bilinear", + "--multi", + "1", + "--concurrency", + "none", ] ) with rasterio.open(single_gtiff, "r") as src: @@ -397,6 +505,8 @@ def test_convert_remote_single_gtiff(http_raster, mp_tmpdir): "geodetic", "-z", "1", + "--concurrency", + "none", ] ) with rasterio.open(single_gtiff, "r") as src: @@ -417,6 +527,8 @@ def test_convert_dtype(cleantopo_br_tif, mp_tmpdir): "mercator", "--output-dtype", "uint8", + "--concurrency", + "none", ] ) for zoom, row, col in [(4, 15, 15), (3, 7, 7)]: @@ -441,6 +553,8 @@ def test_convert_scale_ratio(cleantopo_br_tif, mp_tmpdir): "uint8", "--scale-ratio", "0.003", + "--concurrency", + "none", ] ) for zoom, row, col in [(4, 15, 15), (3, 7, 7)]: @@ -466,6 +580,8 @@ def test_convert_scale_offset(cleantopo_br_tif, mp_tmpdir): "uint8", "--scale-offset", "1", + "--concurrency", + "none", ] ) for zoom, row, col in [(4, 15, 15), (3, 7, 7)]: @@ -490,6 +606,8 @@ def test_convert_clip(cleantopo_br_tif, mp_tmpdir, landpoly): "--clip-geometry", landpoly, "-v", + "--concurrency", + "none", ], output_contains="Process area is empty", ) @@ -506,6 +624,8 @@ def test_convert_zoom(cleantopo_br_tif, mp_tmpdir): "mercator", "-z", "3", + "--concurrency", + "none", ] ) for zoom, row, col in [(4, 15, 15), (2, 3, 0)]: @@ -524,6 +644,8 @@ def test_convert_zoom_minmax(cleantopo_br_tif, mp_tmpdir): "mercator", "-z", "3,4", + "--concurrency", + "none", ] ) for zoom, row, col in [(2, 3, 0)]: @@ -542,6 +664,8 @@ def test_convert_zoom_maxmin(cleantopo_br_tif, mp_tmpdir): "mercator", "-z", "4,3", + "--concurrency", + "none", ] ) for zoom, row, col in [(2, 3, 0)]: @@ -563,6 +687,8 @@ def test_convert_mapchete(cleantopo_br, mp_tmpdir): "--output-metatiling", "1", "-d", + "--concurrency", + "none", ] ) for zoom, row, col in [(4, 15, 31), (3, 7, 15), (2, 3, 7), (1, 1, 3)]: @@ -592,6 +718,8 @@ def test_convert_tiledir(cleantopo_br, mp_tmpdir): "--zoom", "1,4", "-d", + "--concurrency", + "none", ] ) for zoom, row, col in [(4, 15, 31), (3, 7, 15), (2, 3, 
7), (1, 1, 3)]: @@ -613,6 +741,8 @@ def test_convert_geojson(landpoly, mp_tmpdir): "geodetic", "--zoom", "4", + "--concurrency", + "none", ] ) for (zoom, row, col), control in zip([(4, 0, 7), (4, 1, 7)], [9, 32]): @@ -639,6 +769,8 @@ def test_convert_geobuf(landpoly, mp_tmpdir): "4", "--output-format", "Geobuf", + "--concurrency", + "none", ] ) for (zoom, row, col), control in zip([(4, 0, 7), (4, 1, 7)], [9, 32]): @@ -669,6 +801,8 @@ def test_convert_geobuf(landpoly, mp_tmpdir): "4", "--output-format", "GeoJSON", + "--concurrency", + "none", ] ) for (zoom, row, col), control in zip([(4, 0, 7), (4, 1, 7)], [9, zoom9_control]): @@ -695,6 +829,8 @@ def test_convert_geobuf_multipolygon(landpoly, mp_tmpdir): "Geobuf", "--output-geometry-type", "MultiPolygon", + "--concurrency", + "none", ] ) for (zoom, row, col), control in zip([(4, 0, 7), (4, 1, 7)], [7, 30]): @@ -723,6 +859,8 @@ def test_convert_vrt(cleantopo_br_tif, mp_tmpdir): "--vrt", "--zoom", "1,4", + "--concurrency", + "none", ] ) for zoom in [4, 3, 2, 1]: @@ -766,6 +904,8 @@ def test_convert_errors(s2_band_jp2, mp_tmpdir, s2_band, cleantopo_br, landpoly) mp_tmpdir, "--output-pyramid", "geodetic", + "--concurrency", + "none", ], expected_exit_code=1, output_contains="Zoom levels required.", @@ -784,6 +924,8 @@ def test_convert_errors(s2_band_jp2, mp_tmpdir, s2_band, cleantopo_br, landpoly) "5", "--output-format", "GeoJSON", + "--concurrency", + "none", ], expected_exit_code=1, output_contains=( @@ -802,6 +944,8 @@ def test_convert_errors(s2_band_jp2, mp_tmpdir, s2_band, cleantopo_br, landpoly) "geodetic", "--zoom", "5", + "--concurrency", + "none", ], expected_exit_code=1, output_contains=("Could not determine output from extension"), @@ -875,7 +1019,9 @@ def test_serve(client, mp_tmpdir): def test_index_geojson(mp_tmpdir, cleantopo_br): # execute process at zoom 3 - run_cli(["execute", cleantopo_br.path, "-z", "3", "--debug"]) + run_cli( + ["execute", cleantopo_br.path, "-z", "3", "--debug", "--concurrency", "none"] + ) # generate index for zoom 3 run_cli(["index", cleantopo_br.path, "-z", "3", "--geojson", "--debug"]) @@ -891,7 +1037,9 @@ def test_index_geojson(mp_tmpdir, cleantopo_br): def test_index_geojson_fieldname(mp_tmpdir, cleantopo_br): # execute process at zoom 3 - run_cli(["execute", cleantopo_br.path, "-z", "3", "--debug"]) + run_cli( + ["execute", cleantopo_br.path, "-z", "3", "--debug", "--concurrency", "none"] + ) # index and rename "location" to "new_fieldname" run_cli( @@ -917,7 +1065,9 @@ def test_index_geojson_fieldname(mp_tmpdir, cleantopo_br): def test_index_geojson_basepath(mp_tmpdir, cleantopo_br): # execute process at zoom 3 - run_cli(["execute", cleantopo_br.path, "-z", "3", "--debug"]) + run_cli( + ["execute", cleantopo_br.path, "-z", "3", "--debug", "--concurrency", "none"] + ) basepath = "http://localhost" # index and rename "location" to "new_fieldname" @@ -972,7 +1122,19 @@ def test_index_geojson_for_gdal(mp_tmpdir, cleantopo_br): def test_index_geojson_tile(mp_tmpdir, cleantopo_tl): # execute process for single tile - run_cli(["execute", cleantopo_tl.path, "-t", "3", "0", "0", "--debug"]) + run_cli( + [ + "execute", + cleantopo_tl.path, + "-t", + "3", + "0", + "0", + "--debug", + "--concurrency", + "none", + ] + ) # generate index run_cli(["index", cleantopo_tl.path, "-t", "3", "0", "0", "--geojson", "--debug"]) with mapchete.open(cleantopo_tl.dict) as mp: @@ -985,7 +1147,17 @@ def test_index_geojson_tile(mp_tmpdir, cleantopo_tl): def test_index_geojson_wkt_area(mp_tmpdir, cleantopo_br, wkt_geom): # 
execute process at zoom 3
-    run_cli(["execute", cleantopo_br.path, "--debug", "--area", wkt_geom])
+    run_cli(
+        [
+            "execute",
+            cleantopo_br.path,
+            "--debug",
+            "--area",
+            wkt_geom,
+            "--concurrency",
+            "none",
+        ]
+    )
 
     # generate index for zoom 3
     run_cli(["index", cleantopo_br.path, "--geojson", "--debug", "--area", wkt_geom])
@@ -998,7 +1170,9 @@ def test_index_geojson_wkt_area(mp_tmpdir, cleantopo_br, wkt_geom):
 
 def test_index_gpkg(mp_tmpdir, cleantopo_br):
     # execute process
-    run_cli(["execute", cleantopo_br.path, "-z", "5", "--debug"])
+    run_cli(
+        ["execute", cleantopo_br.path, "-z", "5", "--debug", "--concurrency", "none"]
+    )
 
     # generate index
     run_cli(["index", cleantopo_br.path, "-z", "5", "--gpkg", "--debug"])
@@ -1023,7 +1197,9 @@ def test_index_gpkg(mp_tmpdir, cleantopo_br):
 
 def test_index_shp(mp_tmpdir, cleantopo_br):
     # execute process
-    run_cli(["execute", cleantopo_br.path, "-z", "5", "--debug"])
+    run_cli(
+        ["execute", cleantopo_br.path, "-z", "5", "--debug", "--concurrency", "none"]
+    )
 
     # generate index
     run_cli(["index", cleantopo_br.path, "-z", "5", "--shp", "--debug"])
@@ -1048,7 +1224,9 @@ def test_index_shp(mp_tmpdir, cleantopo_br):
 
 def test_index_text(cleantopo_br):
     # execute process
-    run_cli(["execute", cleantopo_br.path, "-z", "5", "--debug"])
+    run_cli(
+        ["execute", cleantopo_br.path, "-z", "5", "--debug", "--concurrency", "none"]
+    )
 
     # generate index
     run_cli(["index", cleantopo_br.path, "-z", "5", "--txt", "--debug"])
@@ -1088,13 +1266,13 @@ def test_processes():
 
 def test_callback_errors(cleantopo_tl):
     run_cli(
-        ["execute", cleantopo_tl.path, "--zoom", "4,5,7"],
+        ["execute", cleantopo_tl.path, "--zoom", "4,5,7", "--concurrency", "none"],
         expected_exit_code=2,
         raise_exc=False,
         output_contains="zooms can be maximum two items",
     )
     run_cli(
-        ["execute", cleantopo_tl.path, "--zoom", "invalid"],
+        ["execute", cleantopo_tl.path, "--zoom", "invalid", "--concurrency", "none"],
         expected_exit_code=2,
         raise_exc=False,
         output_contains="zoom levels must be integer values",
@@ -1115,6 +1293,8 @@ def test_cp(mp_tmpdir, cleantopo_br, wkt_geom):
             "-90",
             "180",
             "-80.18582802550002",
+            "--concurrency",
+            "none",
         ]
     )
     out_path = os.path.join(TESTDATA_DIR, cleantopo_br.dict["output"]["path"])
@@ -1132,6 +1312,8 @@
             "-90",
             "180",
             "-80.18582802550002",
+            "--concurrency",
+            "none",
         ]
     )
     # copy all tiles
@@ -1142,14 +1324,36 @@ def test_cp(mp_tmpdir, cleantopo_br, wkt_geom):
             os.path.join(mp_tmpdir, "all"),
             "-z",
             "5",
+            "--concurrency",
+            "none",
         ]
     )
     # copy tiles and subset by area
     run_cli(
-        ["cp", out_path, os.path.join(mp_tmpdir, "all"), "-z", "5", "--area", wkt_geom]
+        [
+            "cp",
+            out_path,
+            os.path.join(mp_tmpdir, "all"),
+            "-z",
+            "5",
+            "--area",
+            wkt_geom,
+            "--concurrency",
+            "none",
+        ]
+    )
+    # copy local tiles using threads
+    run_cli(
+        [
+            "cp",
+            out_path,
+            os.path.join(mp_tmpdir, "all"),
+            "-z",
+            "5",
+            "--concurrency",
+            "threads",
+        ]
     )
-    # copy local tiles without using threads
-    run_cli(["cp", out_path, os.path.join(mp_tmpdir, "all"), "-z", "5", "--multi", "1"])
 
 
 def test_cp_http(mp_tmpdir, http_tiledir):
@@ -1166,6 +1370,8 @@ def test_cp_http(mp_tmpdir, http_tiledir):
             "1.0",
             "4.0",
             "2.0",
+            "--concurrency",
+            "none",
         ]
     )
 
@@ -1182,6 +1388,8 @@ def test_rm(mp_tmpdir, cleantopo_br):
             "-90",
             "180",
             "-80.18582802550002",
+            "--concurrency",
+            "none",
         ]
     )
     out_path = os.path.join(TESTDATA_DIR, cleantopo_br.dict["output"]["path"])
@@ -1227,7 +1435,7 @@ def 
test_fs_opt_extractor(): - kwargs = utils._cb_key_val( + kwargs = options._cb_key_val( None, None, [ diff --git a/test/test_commands.py b/test/test_commands.py index 45a9f88b..86460357 100644 --- a/test/test_commands.py +++ b/test/test_commands.py @@ -82,6 +82,8 @@ def test_execute(mp_tmpdir, cleantopo_br, cleantopo_br_tif): tp = TilePyramid("geodetic") tiles = list(tp.tiles_from_bounds(rasterio.open(cleantopo_br_tif).bounds, zoom)) job = execute(config, zoom=zoom) + for t in job: + assert t assert len(tiles) == len(job) with mapchete.open(config) as mp: for t in tiles: @@ -89,6 +91,18 @@ def test_execute(mp_tmpdir, cleantopo_br, cleantopo_br_tif): assert not src.read(masked=True).mask.all() +def test_execute_cancel(mp_tmpdir, cleantopo_br, cleantopo_br_tif): + zoom = 5 + config = cleantopo_br.dict + config["pyramid"].update(metatiling=1) + job = execute(config, zoom=zoom, as_iterator=True) + for i, t in enumerate(job): + job.cancel() + break + assert i == 0 + assert job.status == "cancelled" + + def test_execute_tile(mp_tmpdir, cleantopo_br): tile = (5, 30, 63) @@ -209,6 +223,7 @@ def test_convert_single_gtiff_overviews(cleantopo_br_tif, mp_tmpdir): zoom=7, overviews=True, overviews_resampling_method="bilinear", + concurrency=None, ) assert len(job) with rasterio.open(single_gtiff, "r") as src: @@ -222,7 +237,9 @@ def test_convert_single_gtiff_overviews(cleantopo_br_tif, mp_tmpdir): def test_convert_remote_single_gtiff(http_raster, mp_tmpdir): """Automatic geodetic tile pyramid creation of raster files.""" single_gtiff = os.path.join(mp_tmpdir, "single_out.tif") - job = convert(http_raster, single_gtiff, output_pyramid="geodetic", zoom=1) + job = convert( + http_raster, single_gtiff, output_pyramid="geodetic", zoom=1, concurrency=None + ) assert len(job) with rasterio.open(single_gtiff, "r") as src: assert src.meta["driver"] == "GTiff" diff --git a/test/test_config.py b/test/test_config.py index 0e823a5b..35339852 100644 --- a/test/test_config.py +++ b/test/test_config.py @@ -442,3 +442,8 @@ def test_init_overrides_config(example_mapchete): with mapchete.open(dict(example_mapchete.dict, bounds=process_bounds)) as mp: assert mp.config.bounds == process_bounds assert mp.config.init_bounds == process_bounds + + +def test_custom_process(example_custom_process_mapchete): + with mapchete.open(example_custom_process_mapchete.dict) as mp: + assert callable(mp.config.process_func) diff --git a/test/test_errors.py b/test/test_errors.py index 52290a9c..51e714c6 100644 --- a/test/test_errors.py +++ b/test/test_errors.py @@ -300,7 +300,7 @@ def test_finished_task(): assert task.exception() with pytest.raises(ZeroDivisionError): task.result() - assert "FinishedTask" in str(task) + assert "FakeFuture" in str(task) def test_strip_zoom_error(files_zooms): diff --git a/test/test_executor.py b/test/test_executor.py new file mode 100644 index 00000000..9c33b4d2 --- /dev/null +++ b/test/test_executor.py @@ -0,0 +1,99 @@ +import pytest +import time + +from mapchete import Executor +from mapchete._executor import FakeFuture + + +def _dummy_process(i, sleep=0): + time.sleep(sleep) + return i + 1 + + +def test_sequential_executor(): + items = 10 + count = 0 + with Executor(concurrency=None) as executor: + # process all + for future in executor.as_completed(_dummy_process, range(items)): + count += 1 + assert future.result() + assert items == count + + # abort + cancelled = False + for future in executor.as_completed(_dummy_process, range(items)): + if cancelled: + raise RuntimeError() + assert future.result() + 
cancelled = True
+            executor.cancel()
+
+
+def test_concurrent_futures_processes_executor():
+    items = 10
+    with Executor(concurrency="processes") as executor:
+        # process all
+        count = 0
+        for future in executor.as_completed(_dummy_process, range(items)):
+            count += 1
+            assert future.result()
+
+
+def test_concurrent_futures_processes_executor_cancel():
+    items = 100
+    with Executor(concurrency="processes", max_workers=2) as executor:
+        # abort
+        for future in executor.as_completed(
+            _dummy_process, range(items), fkwargs=dict(sleep=2)
+        ):
+            assert future.result()
+            executor.cancel()
+            break
+
+        assert any([future.cancelled() for future in executor.futures])
+
+
+def test_concurrent_futures_threads_executor():
+    items = 100
+    with Executor(concurrency="threads", max_workers=2) as executor:
+        # abort
+        for future in executor.as_completed(
+            _dummy_process, range(items), fkwargs=dict(sleep=2)
+        ):
+            assert future.result()
+            executor.cancel()
+            break
+
+        assert any([future.cancelled() for future in executor.futures])
+
+
+def test_dask_executor():
+    items = 100
+    with Executor(concurrency="dask", max_workers=2) as executor:
+        # abort
+        for future in executor.as_completed(
+            _dummy_process, range(items), fkwargs=dict(sleep=2)
+        ):
+            assert future.result()
+            executor.cancel()
+            break
+
+        assert any([future.cancelled() for future in executor.futures])
+
+
+def test_fake_future():
+    def task(*args, **kwargs):
+        return True
+
+    def failing_task(*args, **kwargs):
+        raise RuntimeError()
+
+    future = FakeFuture(task, fargs=[1, True], fkwargs=dict(foo="bar"))
+    assert future.result()
+    assert not future.exception()
+
+    future = FakeFuture(failing_task, fargs=[1, True], fkwargs=dict(foo="bar"))
+    with pytest.raises(RuntimeError):
+        future.result()
+    assert future.exception()
diff --git a/test/test_formats_tiledir_input.py b/test/test_formats_tiledir_input.py
index d4cf8bc5..53815185 100644
--- a/test/test_formats_tiledir_input.py
+++ b/test/test_formats_tiledir_input.py
@@ -157,18 +157,6 @@ def test_no_metadata_json(mp_tmpdir, cleantopo_br_tiledir):
     )
 
 
-@pytest.mark.remote
-def test_read_remote_raster_data(mp_tmpdir, cleantopo_remote):
-    """Read raster data."""
-    with mapchete.open(cleantopo_remote.path) as mp:
-        assert all(
-            [
-                next(iter(mp.config.input.values())).open(tile).read().any()
-                for tile in mp.get_process_tiles(1)
-            ]
-        )
-
-
 def test_parse_errors(geojson_tiledir, cleantopo_br_tiledir):
     """Different configuration exceptions."""
     # without path
diff --git a/test/test_mapchete.py b/test/test_mapchete.py
index 66e3632e..56a31072 100644
--- a/test/test_mapchete.py
+++ b/test/test_mapchete.py
@@ -500,10 +500,10 @@ def test_skip_tiles(mp_tmpdir, cleantopo_tl):
 
 def test_custom_grid(mp_tmpdir, custom_grid):
-    """Cutom grid processing."""
+    """Custom grid processing."""
     # process and save
-    with mapchete.open(custom_grid.dict) as mp:
+    with mapchete.open(custom_grid.path) as mp:
         mp.batch_process()
     # read written output
-    with mapchete.open(custom_grid.dict) as mp:
+    with mapchete.open(custom_grid.path) as mp:
         for tile in mp.get_process_tiles(5):
             data = mp.config.output.read(tile)
             assert data.any()
diff --git a/test/testdata/cleantopo_br_metatiling_1.mapchete b/test/testdata/cleantopo_br_metatiling_1.mapchete
new file mode 100644
index 00000000..43e651d5
--- /dev/null
+++ b/test/testdata/cleantopo_br_metatiling_1.mapchete
@@ -0,0 +1,15 @@
+process: ../example_process.py
+zoom_levels:
+    min: 0
+    max: 5
+pyramid:
+    grid: geodetic
+    pixelbuffer: 20
+input:
+    file1: cleantopo_br.tif
+output:
+    dtype: uint16
+    bands: 1
+    format: GTiff
+    
path: tmp/cleantopo_br_metatiling_1 + pixelbuffer: 20 diff --git a/test/testdata/example_custom_process.mapchete b/test/testdata/example_custom_process.mapchete new file mode 100644 index 00000000..782c1181 --- /dev/null +++ b/test/testdata/example_custom_process.mapchete @@ -0,0 +1,54 @@ +# mandatory parameters +###################### +# this is the location of user python code: +process: + - '"""Example process file."""' + - '' + - '' + - 'def execute(mp):' + - ' """User defined process."""' + - ' # Reading and writing data works like this:' + - ' with mp.open("file1") as raster_file:' + - ' if raster_file.is_empty():' + - ' return "empty"' + - ' # This assures a transparent tile instead of a pink error tile' + - ' # is returned when using mapchete serve.' + - ' dem = raster_file.read(resampling="bilinear")' + - ' return dem' + + +# zoom level range: +zoom_levels: + min: 7 + max: 11 +# or define single zoom level +# zoom_levels: 5 + +# geographical subset: +# bounds: [1.0, 2.0, 3.0, 4.0] + +# output pyramid definition + +pyramid: + grid: geodetic + metatiling: 1 # can be 1, 2, 4, 8, 16 (default 1) + + +input: + file1: + zoom>=10: dummy1.tif + file2: dummy2.tif +output: + path: tmp/example_custom_process + format: GTiff + dtype: float32 + bands: 3 + +# free parameters +################# +some_integer_parameter: 12 +some_float_parameter: 5.3 +some_string_parameter: + zoom<=7: string1 + zoom>7: string2 +some_bool_parameter: true
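
Taken together, the tests above sketch how the new execution machinery is meant to be used. A rough usage example (the mapchete.commands import path and the "example.mapchete" file are assumptions; Executor(concurrency=...), as_completed() and Job.cancel() all appear in this changeset):

    from mapchete import Executor
    from mapchete.commands import execute  # import path assumed

    def add_one(i):
        return i + 1

    # concurrency mirrors the new --concurrency CLI option:
    # None (sequential), "threads", "processes" or "dask"
    with Executor(concurrency="threads", max_workers=2) as executor:
        results = [f.result() for f in executor.as_completed(add_one, range(10))]
    assert sorted(results) == list(range(1, 11))

    # commands now return a Job which can be consumed lazily and cancelled
    # part-way through, as in test_execute_cancel above
    job = execute("example.mapchete", zoom=5, as_iterator=True)
    for count, process_info in enumerate(job):
        if count >= 10:
            job.cancel()  # job.status becomes "cancelled"
            break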