Skip to content

Commit

Permalink
Merge pull request #346 from ungarj/dask
Browse files Browse the repository at this point in the history
add dask support for distributed processing
  • Loading branch information
ungarj authored Aug 17, 2021
2 parents f7d1a74 + 3ed0555 commit a58e007
Show file tree
Hide file tree
Showing 37 changed files with 1,364 additions and 571 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ venv/
.eggs/
htmlcov/
.pytest*
.env
.env
dask-worker-space/
7 changes: 7 additions & 0 deletions doc/source/apidoc/mapchete.cli.options.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mapchete.cli.options module
===========================

.. automodule:: mapchete.cli.options
:members:
:undoc-members:
:show-inheritance:
2 changes: 1 addition & 1 deletion doc/source/apidoc/mapchete.cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Submodules
:maxdepth: 4

mapchete.cli.main
mapchete.cli.utils
mapchete.cli.options

Module contents
---------------
Expand Down
3 changes: 2 additions & 1 deletion mapchete/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging

from mapchete._core import open, Mapchete
from mapchete._processing import MapcheteProcess, ProcessInfo
from mapchete._executor import Executor
from mapchete._processing import Job, MapcheteProcess, ProcessInfo
from mapchete.tile import count_tiles
from mapchete._timer import Timer

Expand Down
39 changes: 26 additions & 13 deletions mapchete/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import os
import threading

from mapchete.config import MapcheteConfig
from mapchete.config import MapcheteConfig, MULTIPROCESSING_DEFAULT_START_METHOD
from mapchete.errors import MapcheteNodataTile
from mapchete.formats import read_output_metadata
from mapchete.io import fs_from_path, tiles_exist
Expand Down Expand Up @@ -83,8 +83,8 @@ def open(some_input, with_cache=False, fs=None, fs_kwargs=None, **kwargs):
)
kwargs.update(mode="readonly")
return Mapchete(MapcheteConfig(config, **kwargs))
else:
return Mapchete(MapcheteConfig(some_input, **kwargs), with_cache=with_cache)

return Mapchete(MapcheteConfig(some_input, **kwargs), with_cache=with_cache)


class Mapchete(object):
Expand Down Expand Up @@ -155,9 +155,9 @@ def get_process_tiles(self, zoom=None):
):
yield tile
else:
for zoom in reversed(self.config.zoom_levels):
for i in reversed(self.config.zoom_levels):
for tile in self.config.process_pyramid.tiles_from_geom(
self.config.area_at_zoom(zoom), zoom
self.config.area_at_zoom(i), i
):
yield tile

Expand Down Expand Up @@ -188,11 +188,14 @@ def batch_process(
self,
zoom=None,
tile=None,
distributed=False,
dask_scheduler=None,
multi=None,
max_chunksize=1,
multiprocessing_module=None,
multiprocessing_start_method="fork",
multiprocessing_start_method=MULTIPROCESSING_DEFAULT_START_METHOD,
skip_output_check=False,
executor=None,
):
"""
Process a large batch of tiles.
Expand All @@ -217,7 +220,7 @@ def batch_process(
(default: multiprocessing)
multiprocessing_start_method : str
"fork", "forkserver" or "spawn"
(default: "fork")
(default: "spawn")
skip_output_check : bool
skip checking whether process tiles already have existing output before
starting to process;
Expand All @@ -226,23 +229,29 @@ def batch_process(
self.batch_processor(
zoom=zoom,
tile=tile,
distributed=distributed,
dask_scheduler=dask_scheduler,
multi=multi or multiprocessing.cpu_count(),
max_chunksize=max_chunksize,
multiprocessing_module=multiprocessing_module or multiprocessing,
multiprocessing_start_method=multiprocessing_start_method,
skip_output_check=skip_output_check,
executor=executor,
)
)

def batch_processor(
self,
zoom=None,
tile=None,
distributed=False,
dask_scheduler=None,
multi=None,
max_chunksize=1,
multiprocessing_module=None,
multiprocessing_start_method="fork",
multiprocessing_start_method=MULTIPROCESSING_DEFAULT_START_METHOD,
skip_output_check=False,
executor=None,
):
"""
Process a large batch of tiles and yield report messages per tile.
Expand All @@ -265,7 +274,7 @@ def batch_processor(
(default: multiprocessing)
multiprocessing_start_method : str
"fork", "forkserver" or "spawn"
(default: "fork")
(default: "spawn")
skip_output_check : bool
skip checking whether process tiles already have existing output before
starting to process;
Expand All @@ -276,18 +285,23 @@ def batch_processor(
# run single tile
if tile:
yield _run_on_single_tile(
process=self, tile=self.config.process_pyramid.tile(*tuple(tile))
executor=executor,
process=self,
tile=self.config.process_pyramid.tile(*tuple(tile)),
)
# run area
else:
for process_info in _run_area(
process=self,
zoom_levels=list(_get_zoom_level(zoom, self)),
distributed=distributed,
dask_scheduler=dask_scheduler,
multi=multi or multiprocessing.cpu_count(),
max_chunksize=max_chunksize,
multiprocessing_module=multiprocessing_module or multiprocessing,
multiprocessing_start_method=multiprocessing_start_method,
skip_output_check=skip_output_check,
executor=executor,
):
yield process_info

Expand Down Expand Up @@ -315,7 +329,7 @@ def count_tiles(self, minzoom=None, maxzoom=None, init_zoom=0):
self.config.process_pyramid,
minzoom,
maxzoom,
init_zoom=0,
init_zoom=init_zoom,
)
return self._count_tiles_cache[(minzoom, maxzoom)]

Expand Down Expand Up @@ -344,8 +358,7 @@ def execute(self, process_tile, raise_nodata=False):
except MapcheteNodataTile:
if raise_nodata: # pragma: no cover
raise
else:
return self.config.output.empty(process_tile)
return self.config.output.empty(process_tile)

def read(self, output_tile):
"""
Expand Down
Loading

0 comments on commit a58e007

Please sign in to comment.