From 2a468c4a2f382f1c5c4887787724042e3fe067d0 Mon Sep 17 00:00:00 2001 From: Joachim Ungar Date: Mon, 12 Jun 2023 16:00:04 +0200 Subject: [PATCH] don't let MPath.makedirs() decide whether to only create parent directories or not; ProcessFixture now processes all preprocessing tasks using SequentialExecutor (#546) --- mapchete/_core.py | 2 +- mapchete/formats/base.py | 6 +----- mapchete/io/_json.py | 2 +- mapchete/io/_misc.py | 2 +- mapchete/io/raster.py | 3 +-- mapchete/io/vector.py | 3 +-- mapchete/path.py | 6 ++---- mapchete/stac.py | 2 +- mapchete/testing.py | 18 +++++++++++------- test/test_mapchete.py | 3 +-- test/test_path.py | 2 +- 11 files changed, 22 insertions(+), 27 deletions(-) diff --git a/mapchete/_core.py b/mapchete/_core.py index 96af0066..aabf4176 100644 --- a/mapchete/_core.py +++ b/mapchete/_core.py @@ -707,7 +707,7 @@ def write_stac(self, indent=4): bands_type=self.config.output.stac_asset_type, ) logger.debug("write STAC item JSON to %s", self.config.output.stac_path) - self.config.output.stac_path.makedirs() + self.config.output.stac_path.parent.makedirs() with self.config.output.stac_path.open("w") as dst: dst.write(json.dumps(item.to_dict(), indent=indent)) except ReprojectionFailed: diff --git a/mapchete/formats/base.py b/mapchete/formats/base.py index 748bbd60..59bd4153 100644 --- a/mapchete/formats/base.py +++ b/mapchete/formats/base.py @@ -6,7 +6,6 @@ """ import logging -import os import types import warnings from itertools import chain @@ -110,7 +109,6 @@ def exists(self): def cleanup(self): """Optional cleanup function called when Mapchete exits.""" - pass def add_preprocessing_task( self, func, fargs=None, fkwargs=None, key=None, geometry=None, bounds=None @@ -230,7 +228,6 @@ def __enter__(self): def __exit__(self, t, v, tb): """Clean up.""" - pass class OutputDataBaseFunctions: @@ -447,7 +444,7 @@ def prepare_path(self, tile): tile : ``BufferedTile`` must be member of output ``TilePyramid`` """ - self.get_path(tile).makedirs() + self.get_path(tile).parent.makedirs() def output_is_valid(self, process_data): """ @@ -508,7 +505,6 @@ def streamline_output(self, process_data): def close(self, exc_type=None, exc_value=None, exc_traceback=None): """Gets called if process is closed.""" - pass class TileDirectoryOutputReader(OutputDataReader): diff --git a/mapchete/io/_json.py b/mapchete/io/_json.py index d10bd367..1a640cf6 100644 --- a/mapchete/io/_json.py +++ b/mapchete/io/_json.py @@ -10,7 +10,7 @@ def write_json(path, params, fs=None, **kwargs): """Write local or remote.""" logger.debug(f"write {params} to {path}") path = MPath.from_inp(path, fs=fs, **kwargs) - path.makedirs() + path.parent.makedirs() with path.open(mode="w") as dst: json.dump(params, dst, sort_keys=True, indent=4) diff --git a/mapchete/io/_misc.py b/mapchete/io/_misc.py index d9c2311b..19614037 100644 --- a/mapchete/io/_misc.py +++ b/mapchete/io/_misc.py @@ -183,7 +183,7 @@ def copy(src_path, dst_path, src_fs=None, dst_fs=None, overwrite=False): raise IOError(f"{dst_path} already exists") # create parent directories on local filesystems - dst_path.makedirs() + dst_path.parent.makedirs() # copy either within a filesystem or between filesystems if src_path.fs == dst_path.fs: diff --git a/mapchete/io/raster.py b/mapchete/io/raster.py index c1f439c4..f3252013 100644 --- a/mapchete/io/raster.py +++ b/mapchete/io/raster.py @@ -54,7 +54,6 @@ def rasterio_read(path, mode="r", **kwargs): Wrapper around rasterio.open but rasterio.Env is set according to path properties. """ path = MPath.from_inp(path) - with path.rio_env() as env: logger.debug("reading %s with GDAL options %s", str(path), env.options) with rasterio.open(path, mode=mode, **kwargs) as src: @@ -1292,7 +1291,7 @@ def convert_raster(inp, out, overwrite=False, exists_ok=True, **kwargs): dst.write(src.read()) else: logger.debug("copy %s to %s", inp, (out)) - out.makedirs() + out.parent.makedirs() copy(inp, out, overwrite=overwrite) diff --git a/mapchete/io/vector.py b/mapchete/io/vector.py index ce01ff89..dbe33bb1 100644 --- a/mapchete/io/vector.py +++ b/mapchete/io/vector.py @@ -93,7 +93,6 @@ def fiona_write(path, mode="w", fs=None, in_memory=True, *args, **kwargs): FionaRemoteWriter if target is remote, otherwise return fiona.open(). """ path = MPath.from_inp(path) - if path.is_remote(): if "s3" in path.protocols: # pragma: no cover try: @@ -611,7 +610,7 @@ def convert_vector(inp, out, overwrite=False, exists_ok=True, **kwargs): dst.writerecords(src) else: logger.debug("copy %s to %s", str(inp), str(out)) - out.makedirs() + out.parent.makedirs() copy(inp, out, overwrite=overwrite) diff --git a/mapchete/path.py b/mapchete/path.py index 880dcc1d..82076db7 100644 --- a/mapchete/path.py +++ b/mapchete/path.py @@ -279,10 +279,8 @@ def makedirs(self, exist_ok=True) -> None: # create parent directories on local filesystems if self.fs.protocol == "file": # if path has no suffix, assume a file path and only create parent directories - if self.suffix != "": - self.fs.makedirs(self.dirname, exist_ok=exist_ok) - else: - self.fs.makedirs(self, exist_ok=exist_ok) + logger.debug("create directory %s", str(self)) + self.fs.makedirs(self, exist_ok=exist_ok) def ls(self, detail=False): if detail: diff --git a/mapchete/stac.py b/mapchete/stac.py index b0cb7e00..3edb9d9e 100644 --- a/mapchete/stac.py +++ b/mapchete/stac.py @@ -453,7 +453,7 @@ def create_prototype_files(mp): else: logger.debug("creating prototype tile %s", tile_path) out_profile = mp.config.output.profile(prototype_tile) - tile_path.makedirs() + tile_path.parent.makedirs() write_raster_window( in_tile=prototype_tile, in_data=ma.masked_array( diff --git a/mapchete/testing.py b/mapchete/testing.py index b2a8f7ee..3d36ebfc 100644 --- a/mapchete/testing.py +++ b/mapchete/testing.py @@ -84,7 +84,7 @@ def __enter__(self, *args): temp_path = ( self._inp_cache_tempdir / key / "cache" / path.name ) - temp_path.makedirs() + temp_path.parent.makedirs() val["cache"]["path"] = temp_path # replace output path with temporary path if self._tempdir: @@ -98,7 +98,7 @@ def __enter__(self, *args): self.path = self._tempdir / self.path.name # dump modified mapchete config to temporary directory - self.path.makedirs() + self.path.parent.makedirs() with self.path.open("w") as dst: dst.write(dict_to_yaml(self.dict)) @@ -144,11 +144,15 @@ def mp(self, batch_preprocess=True): """ Return Mapchete object from mapchete.open(). """ - if not self._mp: - self._mp = mapchete.open(self.dict) - if batch_preprocess: - self._mp.batch_preprocess() - return self._mp + from mapchete._executor import SequentialExecutor + + with SequentialExecutor() as executor: + if not self._mp: + self._mp = mapchete.open(self.dict) + if batch_preprocess: + self._mp.batch_preprocess(executor=executor) + + return self._mp def first_process_tile(self, zoom=None): zoom = zoom or max(self.mp().config.zoom_levels) diff --git a/test/test_mapchete.py b/test/test_mapchete.py index 274e185e..8d7f3973 100644 --- a/test/test_mapchete.py +++ b/test/test_mapchete.py @@ -353,7 +353,6 @@ def test_baselevels_output_buffer(mp_tmpdir, baselevels_output_buffer): ) subset = src.read(window=window, masked=True) assert not subset.mask.any() - pass def test_baselevels_buffer_antimeridian(mp_tmpdir, baselevels): @@ -394,7 +393,7 @@ def test_processing(mp_tmpdir, cleantopo_br, cleantopo_tl): mosaic = create_mosaic(tiles) try: temp_vrt = mp_tmpdir / zoom + ".vrt" - temp_vrt.makedirs() + temp_vrt.parent.makedirs() gdalbuildvrt = "gdalbuildvrt %s %s/%s/*/*.tif > /dev/null" % ( temp_vrt, mp.config.output.path, diff --git a/test/test_path.py b/test/test_path.py index ff097c2d..052685d9 100644 --- a/test/test_path.py +++ b/test/test_path.py @@ -106,7 +106,7 @@ def test_parse_local_dirpath(path_str): def test_makedirs_filepath(mp_tmpdir): path = MPath(mp_tmpdir).joinpath("path_mkdir_test", "file.ext") - path.makedirs() + path.parent.makedirs() assert path.parent.exists() assert not path.exists()