From 4d59acabde4f73e36987abcac449bc832a598b3a Mon Sep 17 00:00:00 2001 From: Tom Close Date: Sat, 24 Feb 2024 22:10:48 +1100 Subject: [PATCH] implemented "with-timeout" plugin for testing the startup phase of long-running shell-command tasks --- pydra/engine/tests/test_submitter.py | 17 ++++++++++++++ pydra/engine/workers.py | 34 +++++++++++++++++++++++++--- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/pydra/engine/tests/test_submitter.py b/pydra/engine/tests/test_submitter.py index d65247e96a..75d94f6abc 100644 --- a/pydra/engine/tests/test_submitter.py +++ b/pydra/engine/tests/test_submitter.py @@ -17,6 +17,7 @@ from ... import mark from pathlib import Path from datetime import datetime +from ..task import ShellCommandTask @mark.task @@ -612,3 +613,19 @@ def alter_input(x): @mark.task def to_tuple(x, y): return (x, y) + + +def test_serial_with_untriggered_timeout(): + sleeper = ShellCommandTask(name="sleeper", executable=["sleep", "0.5"]) + with Submitter(plugin="with-timeout", timeout=1) as sub: + result = sleeper(submitter=sub) + + assert result.output.return_code == 0 + + +def test_serial_with_triggered_timeout(): + sleeper = ShellCommandTask(name="sleeper", executable=["sleep", "1.5"]) + with Submitter(plugin="with-timeout", timeout=1) as sub: + result = sleeper(submitter=sub) + + assert result is None diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py index 155a2800d9..7c301b5629 100644 --- a/pydra/engine/workers.py +++ b/pydra/engine/workers.py @@ -7,6 +7,7 @@ from tempfile import gettempdir from pathlib import Path from shutil import copyfile, which +from .specs import Result import concurrent.futures as cf @@ -130,6 +131,7 @@ class SerialWorker(Worker): def __init__(self, **kwargs): """Initialize worker.""" + super().__init__() logger.debug("Initialize SerialWorker") def run_el(self, interface, rerun=False, environment=None, **kwargs): @@ -150,9 +152,6 @@ async def fetch_finished(self, futures): await asyncio.gather(*futures) return set() - # async def fetch_finished(self, futures): - # return await asyncio.wait(futures) - class ConcurrentFuturesWorker(Worker): """A worker to execute in parallel using Python's concurrent futures.""" @@ -1039,12 +1038,41 @@ def close(self): pass +class WithTimeoutWorker(ConcurrentFuturesWorker): + """A worker used to test the start-up phase of long running tasks. Tasks are initiated + and run up until a specified timeout. + + If the task completes before the timeout then results are returned as normal, if not, + then None is returned instead""" + + def __init__(self, timeout=10, **kwargs): + """Initialize Worker.""" + super().__init__(n_procs=1) + self.timeout = timeout + # self.loop = asyncio.get_event_loop() + logger.debug("Initialize worker with a timeout") + + def run_el(self, runnable, rerun=False, **kwargs): + """Run a task.""" + return self.exec_with_timeout(runnable, rerun=rerun) + + async def exec_with_timeout(self, runnable, rerun=False): + try: + result = await asyncio.wait_for( + self.exec_as_coro(runnable, rerun=rerun), timeout=self.timeout + ) + except asyncio.TimeoutError: + result = Result(output=None, runtime=None, errored=False) + return result + + WORKERS = { "serial": SerialWorker, "cf": ConcurrentFuturesWorker, "slurm": SlurmWorker, "dask": DaskWorker, "sge": SGEWorker, + "with-timeout": WithTimeoutWorker, **{ "psij-" + subtype: lambda subtype=subtype: PsijWorker(subtype=subtype) for subtype in ["local", "slurm"]