Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implemented "with-timeout" plugin for testing the startup phase of lo… #732

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pydra/engine/tests/test_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ... import mark
from pathlib import Path
from datetime import datetime
from ..task import ShellCommandTask


@mark.task
Expand Down Expand Up @@ -612,3 +613,19 @@ def alter_input(x):
@mark.task
def to_tuple(x, y):
return (x, y)


def test_serial_with_untriggered_timeout():
sleeper = ShellCommandTask(name="sleeper", executable=["sleep", "0.5"])
with Submitter(plugin="with-timeout", timeout=1) as sub:
result = sleeper(submitter=sub)

assert result.output.return_code == 0


def test_serial_with_triggered_timeout():
sleeper = ShellCommandTask(name="sleeper", executable=["sleep", "1.5"])
with Submitter(plugin="with-timeout", timeout=1) as sub:
result = sleeper(submitter=sub)

assert result is None
34 changes: 31 additions & 3 deletions pydra/engine/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from tempfile import gettempdir
from pathlib import Path
from shutil import copyfile, which
from .specs import Result

import concurrent.futures as cf

Expand Down Expand Up @@ -130,6 +131,7 @@

def __init__(self, **kwargs):
"""Initialize worker."""
super().__init__()
logger.debug("Initialize SerialWorker")

def run_el(self, interface, rerun=False, environment=None, **kwargs):
Expand All @@ -150,9 +152,6 @@
await asyncio.gather(*futures)
return set()

# async def fetch_finished(self, futures):
# return await asyncio.wait(futures)


class ConcurrentFuturesWorker(Worker):
"""A worker to execute in parallel using Python's concurrent futures."""
Expand Down Expand Up @@ -1039,12 +1038,41 @@
pass


class WithTimeoutWorker(ConcurrentFuturesWorker):
"""A worker used to test the start-up phase of long running tasks. Tasks are initiated
and run up until a specified timeout.

If the task completes before the timeout then results are returned as normal, if not,
then None is returned instead"""

def __init__(self, timeout=10, **kwargs):
"""Initialize Worker."""
super().__init__(n_procs=1)
self.timeout = timeout

Check warning on line 1051 in pydra/engine/workers.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/workers.py#L1050-L1051

Added lines #L1050 - L1051 were not covered by tests
# self.loop = asyncio.get_event_loop()
logger.debug("Initialize worker with a timeout")

Check warning on line 1053 in pydra/engine/workers.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/workers.py#L1053

Added line #L1053 was not covered by tests

def run_el(self, runnable, rerun=False, **kwargs):
"""Run a task."""
return self.exec_with_timeout(runnable, rerun=rerun)

Check warning on line 1057 in pydra/engine/workers.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/workers.py#L1057

Added line #L1057 was not covered by tests

async def exec_with_timeout(self, runnable, rerun=False):
try:
result = await asyncio.wait_for(

Check warning on line 1061 in pydra/engine/workers.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/workers.py#L1060-L1061

Added lines #L1060 - L1061 were not covered by tests
self.exec_as_coro(runnable, rerun=rerun), timeout=self.timeout
)
except asyncio.TimeoutError:
result = Result(output=None, runtime=None, errored=False)
return result

Check warning on line 1066 in pydra/engine/workers.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/workers.py#L1064-L1066

Added lines #L1064 - L1066 were not covered by tests


WORKERS = {
"serial": SerialWorker,
"cf": ConcurrentFuturesWorker,
"slurm": SlurmWorker,
"dask": DaskWorker,
"sge": SGEWorker,
"with-timeout": WithTimeoutWorker,
**{
"psij-" + subtype: lambda subtype=subtype: PsijWorker(subtype=subtype)
for subtype in ["local", "slurm"]
Expand Down
Loading