diff --git a/docs/examples/hello_world.yaml b/docs/examples/hello_world.yaml index 7da8277..cdade40 100644 --- a/docs/examples/hello_world.yaml +++ b/docs/examples/hello_world.yaml @@ -9,3 +9,8 @@ tasks: kwargs: bash_command: touch arguments: ["/tmp/date.txt"] + - name: printPython + type: "fasthep_flow.operators.PythonOperator" + kwargs: + python_callable: print + arguments: ["Hello World!"] diff --git a/src/fasthep_flow/operators/__init__.py b/src/fasthep_flow/operators/__init__.py index 6af9d94..f53dc7a 100644 --- a/src/fasthep_flow/operators/__init__.py +++ b/src/fasthep_flow/operators/__init__.py @@ -1,8 +1,10 @@ """Module for defining basic operators.""" + from __future__ import annotations from .base import Operator from .bash import BashOperator, LocalBashOperator +from .py_call import PythonOperator # only Operator is exposed to the user, everything else is imported directly by the workflow -__all__ = ["BashOperator", "LocalBashOperator", "Operator"] +__all__ = ["BashOperator", "LocalBashOperator", "Operator", "PythonOperator"] diff --git a/src/fasthep_flow/operators/base.py b/src/fasthep_flow/operators/base.py index 7ef62b1..14a7673 100644 --- a/src/fasthep_flow/operators/base.py +++ b/src/fasthep_flow/operators/base.py @@ -1,6 +1,8 @@ """Definition of the Operator protocol.""" + from __future__ import annotations +from dataclasses import dataclass from typing import Any, Protocol @@ -18,3 +20,22 @@ def __repr__(self) -> str: def configure(self, **kwargs: Any) -> None: """General function to configure the operator.""" + + +@dataclass +class ResultType: + """The result type of an operator. Can add validation here if needed.""" + + result: Any + stdout: str + stderr: str + exit_code: int + + def to_dict(self) -> dict[str, Any]: + """Convert the ResultType to a dictionary.""" + return { + "result": self.result, + "stdout": self.stdout, + "stderr": self.stderr, + "exit_code": self.exit_code, + } diff --git a/src/fasthep_flow/operators/bash.py b/src/fasthep_flow/operators/bash.py index ecdcf8c..c432eea 100644 --- a/src/fasthep_flow/operators/bash.py +++ b/src/fasthep_flow/operators/bash.py @@ -3,7 +3,7 @@ from typing import Any -from .base import Operator +from .base import Operator, ResultType try: # try to import plumbum @@ -31,7 +31,9 @@ def configure(self, **kwargs: Any) -> None: def __call__(self, **kwargs: Any) -> dict[str, Any]: command = plumbum.local[self.bash_command] exit_code, stdout, stderr = command.run(*self.arguments) - return {"stdout": stdout, "stderr": stderr, "exit_code": exit_code} + return ResultType( + result=None, stdout=stdout, stderr=stderr, exit_code=exit_code + ).to_dict() def __repr__(self) -> str: return f'LocalBashOperator(bash_command="{self.bash_command}", arguments={self.arguments})' diff --git a/src/fasthep_flow/operators/py_call.py b/src/fasthep_flow/operators/py_call.py new file mode 100644 index 0000000..d3ba98b --- /dev/null +++ b/src/fasthep_flow/operators/py_call.py @@ -0,0 +1,40 @@ +"""Python related operators.""" + +from __future__ import annotations + +import io +from collections.abc import Callable +from contextlib import redirect_stderr, redirect_stdout +from typing import Any + +from .base import Operator, ResultType + + +class PythonOperator(Operator): + """A Python operator. This operator wraps a Python callable.""" + + python_callable: Callable[..., Any] + arguments: list[Any] + + def __init__(self, **kwargs: Any): + self.configure(**kwargs) + + def configure(self, **kwargs: Any) -> None: + """Configure the operator.""" + self.python_callable = kwargs.pop("python_callable") + self.arguments = kwargs.pop("arguments") + + def __call__(self, **kwargs: Any) -> dict[str, Any]: + stdout, stderr = io.StringIO(), io.StringIO() + with redirect_stdout(stdout), redirect_stderr(stderr): + result = self.python_callable(*self.arguments) + result = self.python_callable(*self.arguments) + return ResultType( + result=result, + stdout=stdout.getvalue(), + stderr=stderr.getvalue(), + exit_code=0, + ).to_dict() + + def __repr__(self) -> str: + return f"PythonOperator(python_callable={self.python_callable}, arguments={self.arguments})"