diff --git a/mapchete/path.py b/mapchete/path.py index 50899df1..8a7d5771 100644 --- a/mapchete/path.py +++ b/mapchete/path.py @@ -17,8 +17,9 @@ from rasterio.session import Session as RioSession from retry import retry +import mapchete from mapchete.executor import Executor -from mapchete.settings import GDALHTTPOptions, IORetrySettings +from mapchete.settings import GDALHTTPOptions, IORetrySettings, mapchete_options from mapchete.tile import BufferedTile from mapchete.types import MPathLike @@ -780,7 +781,7 @@ def _batch_tiles_by_attribute(tiles: List[BufferedTile], attribute: str = "row") def _output_tiles_batches_exist(output_tiles_batches, config, is_https_without_ls): - with Executor(concurrency="threads") as executor: + with Executor(concurrency=mapchete_options.tiles_exist_concurrency) as executor: for batch in executor.as_completed( _output_tiles_batch_exists, (list(b) for b in output_tiles_batches), @@ -798,7 +799,7 @@ def _output_tiles_batch_exists(tiles, config, is_https_without_ls): for output_tile in tiles } # iterate through output tile rows and determine existing output tiles - existing_tiles = _existing_tiles( + existing_tiles = _existing_output_tiles( output_rows=[tiles[0].row], output_paths=output_paths, config=config, @@ -811,7 +812,7 @@ def _output_tiles_batch_exists(tiles, config, is_https_without_ls): def _process_tiles_batches_exist(process_tiles_batches, config, is_https_without_ls): - with Executor(concurrency="threads") as executor: + with Executor(concurrency=mapchete_options.tiles_exist_concurrency) as executor: for batch in executor.as_completed( _process_tiles_batch_exists, (list(b) for b in process_tiles_batches), @@ -821,32 +822,43 @@ def _process_tiles_batches_exist(process_tiles_batches, config, is_https_without def _process_tiles_batch_exists(tiles, config, is_https_without_ls): + def _all_output_tiles_exist(process_tile, existing_output_tiles): + # a process tile only exists if all of its output tiles exist + for output_tile in config.output_pyramid.intersecting(process_tile): + if output_tile not in existing_output_tiles: + return False + else: + return True + if tiles: zoom = tiles[0].zoom # determine output tile rows output_rows = sorted( list(set(t.row for t in config.output_pyramid.intersecting(tiles[0]))) ) - # determine output paths + # determine all output paths output_paths = { - config.output_reader.get_path(output_tile).crop(-3): process_tile + config.output_reader.get_path(output_tile).crop(-3): output_tile for process_tile in tiles for output_tile in config.output_pyramid.intersecting(process_tile) } # iterate through output tile rows and determine existing process tiles - existing_tiles = _existing_tiles( + existing_output_tiles = _existing_output_tiles( output_rows=output_rows, output_paths=output_paths, config=config, zoom=zoom, is_https_without_ls=is_https_without_ls, ) - return [(tile, tile in existing_tiles) for tile in tiles] + return [ + (tile, _all_output_tiles_exist(tile, existing_output_tiles)) + for tile in tiles + ] else: # pragma: no cover return [] -def _existing_tiles( +def _existing_output_tiles( output_rows=None, output_paths=None, config=None, diff --git a/mapchete/settings.py b/mapchete/settings.py index 16d272fa..23d14693 100644 --- a/mapchete/settings.py +++ b/mapchete/settings.py @@ -9,6 +9,8 @@ from pydantic_settings import BaseSettings, SettingsConfigDict from rasterio.errors import RasterioError +from mapchete.executor import Concurrency + # defaults sets according to the recommendations given at # https://developmentseed.org/titiler/advanced/performance_tuning/ @@ -71,6 +73,7 @@ class IORetrySettings(BaseSettings): class MapcheteOptions(BaseSettings): # timeout granted when fetching future results or exceptions future_timeout: int = 10 + tiles_exist_concurrency: Concurrency = Concurrency.threads # read from environment model_config = SettingsConfigDict(env_prefix="MAPCHETE_") diff --git a/test/test_io.py b/test/test_io.py index 8e6e1330..1be47325 100644 --- a/test/test_io.py +++ b/test/test_io.py @@ -201,17 +201,23 @@ def test_tiles_exist_local(example_mapchete): output_tiles = list(mp.config.output_pyramid.tiles_from_bounds(bounds, zoom)) # see which files were written and create set for output_tiles and process_tiles - out_path = os.path.join( - SCRIPTDIR, example_mapchete.dict["output"]["path"], str(zoom) - ) written_output_tiles = set() - for root, dirs, files in os.walk(out_path): - for file in files: - zoom, row = map(int, root.split("/")[-2:]) - col = int(file.split(".")[0]) + for rowdir in (example_mapchete.output_path / zoom).ls(): + for file in rowdir.ls(): + zoom, row, col = map(int, file.without_suffix().elements[-3:]) written_output_tiles.add(mp.config.output_pyramid.tile(zoom, row, col)) - written_process_tiles = set( - [mp.config.process_pyramid.intersecting(t)[0] for t in written_output_tiles] + + full_process_tiles = set( + [ + tile + for tile in process_tiles + if all( + [ + output_tile in written_output_tiles + for output_tile in mp.config.output_pyramid.intersecting(tile) + ] + ) + ] ) # process tiles @@ -224,7 +230,7 @@ def test_tiles_exist_local(example_mapchete): existing.add(tile) else: not_existing.add(tile) - assert existing == written_process_tiles + assert existing == full_process_tiles assert not_existing assert set(process_tiles) == existing.union(not_existing) @@ -267,8 +273,17 @@ def test_tiles_exist_s3(gtiff_s3): for t in output_tiles: if mp.config.output_reader.tiles_exist(output_tile=t): written_output_tiles.add(t) - written_process_tiles = set( - [mp.config.process_pyramid.intersecting(t)[0] for t in written_output_tiles] + full_process_tiles = set( + [ + tile + for tile in process_tiles + if all( + [ + output_tile in written_output_tiles + for output_tile in mp.config.output_pyramid.intersecting(tile) + ] + ) + ] ) # process tiles @@ -281,7 +296,7 @@ def test_tiles_exist_s3(gtiff_s3): existing.add(tile) else: not_existing.add(tile) - assert existing == written_process_tiles + assert existing == full_process_tiles assert set(process_tiles) == existing.union(not_existing) # output tiles diff --git a/test/test_processing_base.py b/test/test_processing_base.py index 954c6085..6b6b5a57 100644 --- a/test/test_processing_base.py +++ b/test/test_processing_base.py @@ -324,6 +324,28 @@ def test_update_overviews(overviews, concurrency, process_graph, dask_executor): assert not np.array_equal(overview_before, overview_after) +def test_larger_process_tiles_than_output_tiles(cleantopo_br): + config = cleantopo_br.dict.copy() + config["output"].update(metatiling=1) + config["pyramid"].update(metatiling=2) + zoom = 5 + with mapchete.open(config) as mp: + # there are tasks to be done + assert mp.tasks(zoom=zoom) + + # execute tasks + list(mp.execute(zoom=zoom)) + + # there are no tasks + assert not mp.tasks(zoom=zoom) + + # remove one output tile + (cleantopo_br.output_path / 5).ls()[0].ls()[0].rm() + + # now there are tasks to be done again + assert mp.tasks(zoom=zoom) + + def test_baselevels_buffer(baselevels): """Baselevel interpolation using buffers.""" config = baselevels.dict