Skip to content

Commit

Permalink
Merge pull request #364 from ungarj/fix_loitering_futures
Browse files Browse the repository at this point in the history
loitering futures & COG tests fix
  • Loading branch information
ungarj authored Oct 1, 2021
2 parents f6992a7 + 0d2273f commit 126997e
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 4 deletions.
8 changes: 7 additions & 1 deletion mapchete/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ def as_completed(self, func, iterable, fargs=None, fkwargs=None, chunks=100):
yield _raise_future_exception(future)
except CancelledError: # pragma: no cover
return
finally:
# reset so futures won't linger here for next call
self.running_futures = set()

def _finished_futures(self):
done = set()
Expand All @@ -96,7 +99,7 @@ def _finished_futures(self):
self.running_futures.difference_update(done)

def map(self, func, iterable, fargs=None, fkwargs=None):
return self._map(func, iterable, fargs=None, fkwargs=None)
return self._map(func, iterable, fargs=fargs, fkwargs=fkwargs)

def cancel(self):
self.cancelled = True
Expand All @@ -105,6 +108,9 @@ def cancel(self):
future.cancel()
logger.debug(f"{len(self.running_futures)} futures cancelled")
self.wait()
# reset so futures won't linger here for next call

self.running_futures = set()

def wait(self):
logger.debug("wait for running futures to finish...")
Expand Down
7 changes: 4 additions & 3 deletions test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def test_concurrent_futures_processes_executor_as_completed():
for future in executor.as_completed(_dummy_process, range(items)):
count += 1
assert future.result()
assert not executor.running_futures


def test_concurrent_futures_processes_executor_cancel_as_completed():
Expand All @@ -58,7 +59,7 @@ def test_concurrent_futures_processes_executor_cancel_as_completed():
executor.cancel()
break

assert any([future.cancelled() for future in executor.running_futures])
assert not executor.running_futures


def test_concurrent_futures_processes_executor_map():
Expand All @@ -79,7 +80,7 @@ def test_concurrent_futures_threads_executor_as_completed():
executor.cancel()
break

assert any([future.cancelled() for future in executor.running_futures])
assert not executor.running_futures


def test_concurrent_futures_threads_executor_map():
Expand All @@ -100,7 +101,7 @@ def test_dask_executor_as_completed():
executor.cancel()
break

assert any([future.cancelled() for future in executor.running_futures])
assert not executor.running_futures


def test_concurrent_futures_dask_executor_map():
Expand Down
17 changes: 17 additions & 0 deletions test/test_formats_geotiff.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@
from mapchete.tile import BufferedTilePyramid


def _gdal_cog_available():
with rasterio.Env() as env:
return "COG" in env.drivers()


GDAL_COG_AVAILABLE = _gdal_cog_available()


def test_output_data(mp_tmpdir):
"""Check GeoTIFF as output data."""
output_params = dict(
Expand Down Expand Up @@ -395,6 +403,9 @@ def test_output_single_gtiff_s3_tempfile(output_single_gtiff, mp_s3_tmpdir):
assert path_exists(mp.config.output.path)


@pytest.mark.skipif(
not GDAL_COG_AVAILABLE, reason="GDAL>=3.1 with COG driver is required"
)
def test_output_single_gtiff_cog(output_single_gtiff_cog):
tile_id = (5, 3, 7)
with mapchete.open(output_single_gtiff_cog.dict) as mp:
Expand Down Expand Up @@ -423,6 +434,9 @@ def test_output_single_gtiff_cog(output_single_gtiff_cog):
assert cog_validate(mp.config.output.path, strict=True)


@pytest.mark.skipif(
not GDAL_COG_AVAILABLE, reason="GDAL>=3.1 with COG driver is required"
)
def test_output_single_gtiff_cog_tempfile(output_single_gtiff_cog):
tile_id = (5, 3, 7)
with mapchete.open(
Expand Down Expand Up @@ -457,6 +471,9 @@ def test_output_single_gtiff_cog_tempfile(output_single_gtiff_cog):


@pytest.mark.remote
@pytest.mark.skipif(
not GDAL_COG_AVAILABLE, reason="GDAL>=3.1 with COG driver is required"
)
def test_output_single_gtiff_cog_s3(output_single_gtiff_cog, mp_s3_tmpdir):
tile_id = (5, 3, 7)
with mapchete.open(
Expand Down

0 comments on commit 126997e

Please sign in to comment.