Skip to content

Commit

Permalink
restructure processing module
Browse files Browse the repository at this point in the history
  • Loading branch information
ungarj committed Nov 7, 2023
1 parent 9572f56 commit 644bded
Show file tree
Hide file tree
Showing 11 changed files with 870 additions and 853 deletions.
3 changes: 1 addition & 2 deletions mapchete/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from mapchete.executor import Executor, MFuture
from mapchete.formats import read_output_metadata
from mapchete.path import MPath, fs_from_path
from mapchete.processing import Job, Mapchete, MapcheteProcess, ProcessInfo
from mapchete.processing import Job, Mapchete, MapcheteProcess
from mapchete.tile import count_tiles
from mapchete.timer import Timer
from mapchete.types import MPathLike
Expand All @@ -17,7 +17,6 @@
"count_tiles",
"Mapchete",
"MapcheteProcess",
"ProcessInfo",
"Timer",
"Executor",
"MFuture",
Expand Down
2 changes: 1 addition & 1 deletion mapchete/commands/_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import mapchete
from mapchete.config.parse import bounds_from_opts, raw_conf, raw_conf_process_pyramid
from mapchete.processing.job import PreprocessingProcessInfo, TileProcessInfo
from mapchete.processing.types import PreprocessingProcessInfo, TileProcessInfo

logger = logging.getLogger(__name__)

Expand Down
7 changes: 7 additions & 0 deletions mapchete/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,10 @@ class ProcessingMode(str, Enum):
READONLY = "readonly"
OVERWRITE = "overwrite"
MEMORY = "memory"


class Concurrency(str, Enum):
none = "none"
threads = "threads"
processes = "processes"
dask = "dask"
6 changes: 4 additions & 2 deletions mapchete/executor/future.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def exception(self, **kwargs) -> Union[Exception, None]:

def cancelled(self) -> bool: # pragma: no cover
"""Sequential futures cannot be cancelled."""
return False
return self._cancelled

def _populate_from_future(self, timeout: int = FUTURE_TIMEOUT, **kwargs):
"""Fill internal cache with future.result() if future was provided."""
Expand Down Expand Up @@ -179,7 +179,9 @@ def failed_or_cancelled(self) -> bool:
This is a workaround between the slightly different APIs of dask and concurrent.futures.
It also tries to avoid potentially expensive calls to the dask scheduler.
"""
if self.status:
if self.cancelled():
return True
elif self.status:
return self.status in ["error", "cancelled"]
# concurrent.futures futures
else:
Expand Down
4 changes: 2 additions & 2 deletions mapchete/processing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from mapchete.processing.base import Mapchete
from mapchete.processing.job import Job, ProcessInfo
from mapchete.processing.job import Job
from mapchete.processing.mp import MapcheteProcess

__all__ = ["Mapchete", "Job", "ProcessInfo", "MapcheteProcess"]
__all__ = ["Mapchete", "Job", "MapcheteProcess"]
10 changes: 5 additions & 5 deletions mapchete/processing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from mapchete.errors import MapcheteNodataTile, ReprojectionFailed
from mapchete.executor import MULTIPROCESSING_DEFAULT_START_METHOD
from mapchete.path import tiles_exist
from mapchete.processing.job import (
ProcessInfo,
from mapchete.processing.compute import (
TileProcessInfo,
_preprocess,
_run_area,
_run_on_single_tile,
Expand Down Expand Up @@ -494,7 +494,7 @@ def write(self, process_tile, data):
):
message = "output exists, not overwritten"
logger.debug((process_tile.id, message))
return ProcessInfo(
return TileProcessInfo(
tile=process_tile,
processed=False,
process_msg=None,
Expand All @@ -504,7 +504,7 @@ def write(self, process_tile, data):
elif data is None:
message = "output empty, nothing written"
logger.debug((process_tile.id, message))
return ProcessInfo(
return TileProcessInfo(
tile=process_tile,
processed=False,
process_msg=None,
Expand All @@ -516,7 +516,7 @@ def write(self, process_tile, data):
self.config.output.write(process_tile=process_tile, data=data)
message = "output written in %s" % t
logger.debug((process_tile.id, message))
return ProcessInfo(
return TileProcessInfo(
tile=process_tile,
processed=False,
process_msg=None,
Expand Down
Loading

0 comments on commit 644bded

Please sign in to comment.