Skip to content

Commit

Permalink
Merge pull request #622 from ungarj/fix_621_tiles_exist_process_tiles
Browse files Browse the repository at this point in the history
fix tiles exist check on output tiles smaller than process tiles
  • Loading branch information
ungarj authored Feb 13, 2024
2 parents a1e0b18 + ff29ef4 commit 0b2363f
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 22 deletions.
30 changes: 21 additions & 9 deletions mapchete/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
from rasterio.session import Session as RioSession
from retry import retry

import mapchete
from mapchete.executor import Executor
from mapchete.settings import GDALHTTPOptions, IORetrySettings
from mapchete.settings import GDALHTTPOptions, IORetrySettings, mapchete_options
from mapchete.tile import BufferedTile
from mapchete.types import MPathLike

Expand Down Expand Up @@ -780,7 +781,7 @@ def _batch_tiles_by_attribute(tiles: List[BufferedTile], attribute: str = "row")


def _output_tiles_batches_exist(output_tiles_batches, config, is_https_without_ls):
with Executor(concurrency="threads") as executor:
with Executor(concurrency=mapchete_options.tiles_exist_concurrency) as executor:
for batch in executor.as_completed(
_output_tiles_batch_exists,
(list(b) for b in output_tiles_batches),
Expand All @@ -798,7 +799,7 @@ def _output_tiles_batch_exists(tiles, config, is_https_without_ls):
for output_tile in tiles
}
# iterate through output tile rows and determine existing output tiles
existing_tiles = _existing_tiles(
existing_tiles = _existing_output_tiles(
output_rows=[tiles[0].row],
output_paths=output_paths,
config=config,
Expand All @@ -811,7 +812,7 @@ def _output_tiles_batch_exists(tiles, config, is_https_without_ls):


def _process_tiles_batches_exist(process_tiles_batches, config, is_https_without_ls):
with Executor(concurrency="threads") as executor:
with Executor(concurrency=mapchete_options.tiles_exist_concurrency) as executor:
for batch in executor.as_completed(
_process_tiles_batch_exists,
(list(b) for b in process_tiles_batches),
Expand All @@ -821,32 +822,43 @@ def _process_tiles_batches_exist(process_tiles_batches, config, is_https_without


def _process_tiles_batch_exists(tiles, config, is_https_without_ls):
def _all_output_tiles_exist(process_tile, existing_output_tiles):
# a process tile only exists if all of its output tiles exist
for output_tile in config.output_pyramid.intersecting(process_tile):
if output_tile not in existing_output_tiles:
return False
else:
return True

if tiles:
zoom = tiles[0].zoom
# determine output tile rows
output_rows = sorted(
list(set(t.row for t in config.output_pyramid.intersecting(tiles[0])))
)
# determine output paths
# determine all output paths
output_paths = {
config.output_reader.get_path(output_tile).crop(-3): process_tile
config.output_reader.get_path(output_tile).crop(-3): output_tile
for process_tile in tiles
for output_tile in config.output_pyramid.intersecting(process_tile)
}
# iterate through output tile rows and determine existing process tiles
existing_tiles = _existing_tiles(
existing_output_tiles = _existing_output_tiles(
output_rows=output_rows,
output_paths=output_paths,
config=config,
zoom=zoom,
is_https_without_ls=is_https_without_ls,
)
return [(tile, tile in existing_tiles) for tile in tiles]
return [
(tile, _all_output_tiles_exist(tile, existing_output_tiles))
for tile in tiles
]
else: # pragma: no cover
return []


def _existing_tiles(
def _existing_output_tiles(
output_rows=None,
output_paths=None,
config=None,
Expand Down
3 changes: 3 additions & 0 deletions mapchete/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
from rasterio.errors import RasterioError

from mapchete.executor import Concurrency


# defaults sets according to the recommendations given at
# https://developmentseed.org/titiler/advanced/performance_tuning/
Expand Down Expand Up @@ -71,6 +73,7 @@ class IORetrySettings(BaseSettings):
class MapcheteOptions(BaseSettings):
# timeout granted when fetching future results or exceptions
future_timeout: int = 10
tiles_exist_concurrency: Concurrency = Concurrency.threads

# read from environment
model_config = SettingsConfigDict(env_prefix="MAPCHETE_")
Expand Down
41 changes: 28 additions & 13 deletions test/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,17 +201,23 @@ def test_tiles_exist_local(example_mapchete):
output_tiles = list(mp.config.output_pyramid.tiles_from_bounds(bounds, zoom))

# see which files were written and create set for output_tiles and process_tiles
out_path = os.path.join(
SCRIPTDIR, example_mapchete.dict["output"]["path"], str(zoom)
)
written_output_tiles = set()
for root, dirs, files in os.walk(out_path):
for file in files:
zoom, row = map(int, root.split("/")[-2:])
col = int(file.split(".")[0])
for rowdir in (example_mapchete.output_path / zoom).ls():
for file in rowdir.ls():
zoom, row, col = map(int, file.without_suffix().elements[-3:])
written_output_tiles.add(mp.config.output_pyramid.tile(zoom, row, col))
written_process_tiles = set(
[mp.config.process_pyramid.intersecting(t)[0] for t in written_output_tiles]

full_process_tiles = set(
[
tile
for tile in process_tiles
if all(
[
output_tile in written_output_tiles
for output_tile in mp.config.output_pyramid.intersecting(tile)
]
)
]
)

# process tiles
Expand All @@ -224,7 +230,7 @@ def test_tiles_exist_local(example_mapchete):
existing.add(tile)
else:
not_existing.add(tile)
assert existing == written_process_tiles
assert existing == full_process_tiles
assert not_existing
assert set(process_tiles) == existing.union(not_existing)

Expand Down Expand Up @@ -267,8 +273,17 @@ def test_tiles_exist_s3(gtiff_s3):
for t in output_tiles:
if mp.config.output_reader.tiles_exist(output_tile=t):
written_output_tiles.add(t)
written_process_tiles = set(
[mp.config.process_pyramid.intersecting(t)[0] for t in written_output_tiles]
full_process_tiles = set(
[
tile
for tile in process_tiles
if all(
[
output_tile in written_output_tiles
for output_tile in mp.config.output_pyramid.intersecting(tile)
]
)
]
)

# process tiles
Expand All @@ -281,7 +296,7 @@ def test_tiles_exist_s3(gtiff_s3):
existing.add(tile)
else:
not_existing.add(tile)
assert existing == written_process_tiles
assert existing == full_process_tiles
assert set(process_tiles) == existing.union(not_existing)

# output tiles
Expand Down
22 changes: 22 additions & 0 deletions test/test_processing_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,28 @@ def test_update_overviews(overviews, concurrency, process_graph, dask_executor):
assert not np.array_equal(overview_before, overview_after)


def test_larger_process_tiles_than_output_tiles(cleantopo_br):
config = cleantopo_br.dict.copy()
config["output"].update(metatiling=1)
config["pyramid"].update(metatiling=2)
zoom = 5
with mapchete.open(config) as mp:
# there are tasks to be done
assert mp.tasks(zoom=zoom)

# execute tasks
list(mp.execute(zoom=zoom))

# there are no tasks
assert not mp.tasks(zoom=zoom)

# remove one output tile
(cleantopo_br.output_path / 5).ls()[0].ls()[0].rm()

# now there are tasks to be done again
assert mp.tasks(zoom=zoom)


def test_baselevels_buffer(baselevels):
"""Baselevel interpolation using buffers."""
config = baselevels.dict
Expand Down

0 comments on commit 0b2363f

Please sign in to comment.