diff --git a/mapchete/_executor.py b/mapchete/_executor.py index 9ee36a84..d5164bf1 100644 --- a/mapchete/_executor.py +++ b/mapchete/_executor.py @@ -102,8 +102,9 @@ def as_completed( if max_submitted_tasks and ( len(self.running_futures) >= max_submitted_tasks ): - future = next(self._as_completed(self.running_futures)) - yield self._raise_future_exception(future) + yield self._raise_future_exception( + next(self._as_completed(self.running_futures)) + ) logger.debug("%s tasks submitted in %s", len(self.running_futures), timer) @@ -155,11 +156,22 @@ def _wait(self, *args, **kwargs): # pragma: no cover raise NotImplementedError() def _raise_future_exception(self, future): + """ + Release future from cluster explicitly and wrap result around FinishedFuture object. + """ self.running_futures.remove(future) if future.exception(): # pragma: no cover logger.debug("exception caught in future %s", future) raise future.exception() - return future + # create minimal Future-like object with no references to the cluster + finished_future = FinishedFuture(future) + # explicitly release future + try: + future.release() + # logger.debug("%s released", future) + except AttributeError: + pass + return finished_future @cached_property def _executor(self): @@ -565,3 +577,33 @@ def exception(self): # pragma: no cover def cancelled(self): # pragma: no cover """Nothing to cancel here.""" return False + + +class FinishedFuture: + """Wrapper class to mimick future interface.""" + + def __init__(self, future): + """Set attributes.""" + try: + self._result, self._exception = future.result(), None + except Exception as e: # pragma: no cover + self._result, self._exception = None, e + + def result(self): + """Return task result.""" + if self._exception: # pragma: no cover + logger.exception(self._exception) + raise self._exception + return self._result + + def exception(self): # pragma: no cover + """Raise task exception if any.""" + return self._exception + + def cancelled(self): # pragma: no cover + """Sequential futures cannot be cancelled.""" + return False + + def __repr__(self): # pragma: no cover + """Return string representation.""" + return f"FinishedFuture(result={self._result}, exception={self._exception})"