From 20969cfef3082631a2e17673c42b36f47b0c9f0b Mon Sep 17 00:00:00 2001 From: Joachim Ungar Date: Fri, 1 Oct 2021 10:58:28 +0200 Subject: [PATCH 1/3] make sure Executor.running_futures is cleared after earh as_completed() and cancel() call --- mapchete/_executor.py | 5 +++++ test/test_executor.py | 7 ++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/mapchete/_executor.py b/mapchete/_executor.py index d98b86b0..0a2e2816 100644 --- a/mapchete/_executor.py +++ b/mapchete/_executor.py @@ -84,6 +84,9 @@ def as_completed(self, func, iterable, fargs=None, fkwargs=None, chunks=100): yield _raise_future_exception(future) except CancelledError: # pragma: no cover return + finally: + # reset so futures won't linger here for next call + self.running_futures = set() def _finished_futures(self): done = set() @@ -105,6 +108,8 @@ def cancel(self): future.cancel() logger.debug(f"{len(self.running_futures)} futures cancelled") self.wait() + # reset so futures won't linger here for next call + self.running_futures = set() def wait(self): logger.debug("wait for running futures to finish...") diff --git a/test/test_executor.py b/test/test_executor.py index dcd352b3..92c1677f 100644 --- a/test/test_executor.py +++ b/test/test_executor.py @@ -45,6 +45,7 @@ def test_concurrent_futures_processes_executor_as_completed(): for future in executor.as_completed(_dummy_process, range(items)): count += 1 assert future.result() + assert not executor.running_futures def test_concurrent_futures_processes_executor_cancel_as_completed(): @@ -58,7 +59,7 @@ def test_concurrent_futures_processes_executor_cancel_as_completed(): executor.cancel() break - assert any([future.cancelled() for future in executor.running_futures]) + assert not executor.running_futures def test_concurrent_futures_processes_executor_map(): @@ -79,7 +80,7 @@ def test_concurrent_futures_threads_executor_as_completed(): executor.cancel() break - assert any([future.cancelled() for future in executor.running_futures]) + assert not executor.running_futures def test_concurrent_futures_threads_executor_map(): @@ -100,7 +101,7 @@ def test_dask_executor_as_completed(): executor.cancel() break - assert any([future.cancelled() for future in executor.running_futures]) + assert not executor.running_futures def test_concurrent_futures_dask_executor_map(): From 44606eabff94a0ca6c38637054ea578883e9a86b Mon Sep 17 00:00:00 2001 From: Joachim Ungar Date: Fri, 1 Oct 2021 10:58:55 +0200 Subject: [PATCH 2/3] skip COG tests if driver is not available in GDAL --- test/test_formats_geotiff.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/test/test_formats_geotiff.py b/test/test_formats_geotiff.py index 79048112..b92548f1 100644 --- a/test/test_formats_geotiff.py +++ b/test/test_formats_geotiff.py @@ -18,6 +18,14 @@ from mapchete.tile import BufferedTilePyramid +def _gdal_cog_available(): + with rasterio.Env() as env: + return "COG" in env.drivers() + + +GDAL_COG_AVAILABLE = _gdal_cog_available() + + def test_output_data(mp_tmpdir): """Check GeoTIFF as output data.""" output_params = dict( @@ -395,6 +403,9 @@ def test_output_single_gtiff_s3_tempfile(output_single_gtiff, mp_s3_tmpdir): assert path_exists(mp.config.output.path) +@pytest.mark.skipif( + not GDAL_COG_AVAILABLE, reason="GDAL>=3.1 with COG driver is required" +) def test_output_single_gtiff_cog(output_single_gtiff_cog): tile_id = (5, 3, 7) with mapchete.open(output_single_gtiff_cog.dict) as mp: @@ -423,6 +434,9 @@ def test_output_single_gtiff_cog(output_single_gtiff_cog): assert cog_validate(mp.config.output.path, strict=True) +@pytest.mark.skipif( + not GDAL_COG_AVAILABLE, reason="GDAL>=3.1 with COG driver is required" +) def test_output_single_gtiff_cog_tempfile(output_single_gtiff_cog): tile_id = (5, 3, 7) with mapchete.open( @@ -457,6 +471,9 @@ def test_output_single_gtiff_cog_tempfile(output_single_gtiff_cog): @pytest.mark.remote +@pytest.mark.skipif( + not GDAL_COG_AVAILABLE, reason="GDAL>=3.1 with COG driver is required" +) def test_output_single_gtiff_cog_s3(output_single_gtiff_cog, mp_s3_tmpdir): tile_id = (5, 3, 7) with mapchete.open( From 0d2273fe0ceec4520533a75557bd97bda6ab7b48 Mon Sep 17 00:00:00 2001 From: Joachim Ungar Date: Fri, 1 Oct 2021 11:25:21 +0200 Subject: [PATCH 3/3] fix fargs and fkwargs ommission --- mapchete/_executor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mapchete/_executor.py b/mapchete/_executor.py index 0a2e2816..b2ec5cb1 100644 --- a/mapchete/_executor.py +++ b/mapchete/_executor.py @@ -99,7 +99,7 @@ def _finished_futures(self): self.running_futures.difference_update(done) def map(self, func, iterable, fargs=None, fkwargs=None): - return self._map(func, iterable, fargs=None, fkwargs=None) + return self._map(func, iterable, fargs=fargs, fkwargs=fkwargs) def cancel(self): self.cancelled = True @@ -109,6 +109,7 @@ def cancel(self): logger.debug(f"{len(self.running_futures)} futures cancelled") self.wait() # reset so futures won't linger here for next call + self.running_futures = set() def wait(self):