Skip to content

Commit

Permalink
Merge pull request #396 from ungarj/release_futures
Browse files Browse the repository at this point in the history
explicitly release dask futures
  • Loading branch information
ungarj authored Dec 14, 2021
2 parents 0d14aea + 9c76804 commit 97a47ca
Showing 1 changed file with 45 additions and 3 deletions.
48 changes: 45 additions & 3 deletions mapchete/_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ def as_completed(
if max_submitted_tasks and (
len(self.running_futures) >= max_submitted_tasks
):
future = next(self._as_completed(self.running_futures))
yield self._raise_future_exception(future)
yield self._raise_future_exception(
next(self._as_completed(self.running_futures))
)

logger.debug("%s tasks submitted in %s", len(self.running_futures), timer)

Expand Down Expand Up @@ -155,11 +156,22 @@ def _wait(self, *args, **kwargs): # pragma: no cover
raise NotImplementedError()

def _raise_future_exception(self, future):
"""
Release future from cluster explicitly and wrap result around FinishedFuture object.
"""
self.running_futures.remove(future)
if future.exception(): # pragma: no cover
logger.debug("exception caught in future %s", future)
raise future.exception()
return future
# create minimal Future-like object with no references to the cluster
finished_future = FinishedFuture(future)
# explicitly release future
try:
future.release()
# logger.debug("%s released", future)
except AttributeError:
pass
return finished_future

@cached_property
def _executor(self):
Expand Down Expand Up @@ -565,3 +577,33 @@ def exception(self): # pragma: no cover
def cancelled(self): # pragma: no cover
"""Nothing to cancel here."""
return False


class FinishedFuture:
"""Wrapper class to mimick future interface."""

def __init__(self, future):
"""Set attributes."""
try:
self._result, self._exception = future.result(), None
except Exception as e: # pragma: no cover
self._result, self._exception = None, e

def result(self):
"""Return task result."""
if self._exception: # pragma: no cover
logger.exception(self._exception)
raise self._exception
return self._result

def exception(self): # pragma: no cover
"""Raise task exception if any."""
return self._exception

def cancelled(self): # pragma: no cover
"""Sequential futures cannot be cancelled."""
return False

def __repr__(self): # pragma: no cover
"""Return string representation."""
return f"FinishedFuture(result={self._result}, exception={self._exception})"

0 comments on commit 97a47ca

Please sign in to comment.