From 97036cbb52e4f4728f2e2160bd55f0af60f52bb3 Mon Sep 17 00:00:00 2001 From: kreczko Date: Tue, 15 Oct 2024 16:49:39 +0100 Subject: [PATCH 1/3] feat: add simple PythonOperator --- docs/examples/hello_world.yaml | 5 ++++ src/fasthep_flow/operators/__init__.py | 4 ++- src/fasthep_flow/operators/py_call.py | 36 ++++++++++++++++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 src/fasthep_flow/operators/py_call.py diff --git a/docs/examples/hello_world.yaml b/docs/examples/hello_world.yaml index 7da8277..cdade40 100644 --- a/docs/examples/hello_world.yaml +++ b/docs/examples/hello_world.yaml @@ -9,3 +9,8 @@ tasks: kwargs: bash_command: touch arguments: ["/tmp/date.txt"] + - name: printPython + type: "fasthep_flow.operators.PythonOperator" + kwargs: + python_callable: print + arguments: ["Hello World!"] diff --git a/src/fasthep_flow/operators/__init__.py b/src/fasthep_flow/operators/__init__.py index 6af9d94..f53dc7a 100644 --- a/src/fasthep_flow/operators/__init__.py +++ b/src/fasthep_flow/operators/__init__.py @@ -1,8 +1,10 @@ """Module for defining basic operators.""" + from __future__ import annotations from .base import Operator from .bash import BashOperator, LocalBashOperator +from .py_call import PythonOperator # only Operator is exposed to the user, everything else is imported directly by the workflow -__all__ = ["BashOperator", "LocalBashOperator", "Operator"] +__all__ = ["BashOperator", "LocalBashOperator", "Operator", "PythonOperator"] diff --git a/src/fasthep_flow/operators/py_call.py b/src/fasthep_flow/operators/py_call.py new file mode 100644 index 0000000..6811fc9 --- /dev/null +++ b/src/fasthep_flow/operators/py_call.py @@ -0,0 +1,36 @@ +"""Python related operators.""" +from __future__ import annotations + +import io +from collections.abc import Callable +from contextlib import redirect_stderr, redirect_stdout +from typing import Any + +from .base import Operator + + +class PythonOperator(Operator): + python_callable: Callable[..., Any] + arguments: list[Any] + + def __init__(self, **kwargs: Any): + self.configure(**kwargs) + + def configure(self, **kwargs: Any) -> None: + """Configure the operator.""" + self.python_callable = kwargs.pop("python_callable") + self.arguments = kwargs.pop("arguments") + + def __call__(self, **kwargs: Any) -> dict[str, Any]: + stdout, stderr = io.StringIO(), io.StringIO() + with redirect_stdout(stdout), redirect_stderr(stderr): + result = self.python_callable(*self.arguments) + result = self.python_callable(*self.arguments) + return { + "result": result, + "stdout": stdout.getvalue(), + "stderr": stderr.getvalue(), + } + + def __repr__(self) -> str: + return f"PythonOperator(python_callable={self.python_callable}, arguments={self.arguments})" From 8af41f4b222db21d1678fc4b85d83494e8edadfb Mon Sep 17 00:00:00 2001 From: kreczko Date: Tue, 15 Oct 2024 16:57:36 +0100 Subject: [PATCH 2/3] feat: add ResultType for validation of operator output --- src/fasthep_flow/operators/base.py | 20 ++++++++++++++++++++ src/fasthep_flow/operators/bash.py | 6 ++++-- src/fasthep_flow/operators/py_call.py | 14 ++++++++------ 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/fasthep_flow/operators/base.py b/src/fasthep_flow/operators/base.py index 7ef62b1..b71fe17 100644 --- a/src/fasthep_flow/operators/base.py +++ b/src/fasthep_flow/operators/base.py @@ -1,6 +1,8 @@ """Definition of the Operator protocol.""" + from __future__ import annotations +from dataclasses import dataclass from typing import Any, Protocol @@ -18,3 +20,21 @@ def __repr__(self) -> str: def configure(self, **kwargs: Any) -> None: """General function to configure the operator.""" + + +@dataclass +class ResultType: + """The result type of an operator. Can add validation here if needed.""" + + result: Any + stdout: str + stderr: str + exit_code: int + + def to_dict(self) -> dict[str, Any]: + return { + "result": self.result, + "stdout": self.stdout, + "stderr": self.stderr, + "exit_code": self.exit_code, + } diff --git a/src/fasthep_flow/operators/bash.py b/src/fasthep_flow/operators/bash.py index ecdcf8c..c432eea 100644 --- a/src/fasthep_flow/operators/bash.py +++ b/src/fasthep_flow/operators/bash.py @@ -3,7 +3,7 @@ from typing import Any -from .base import Operator +from .base import Operator, ResultType try: # try to import plumbum @@ -31,7 +31,9 @@ def configure(self, **kwargs: Any) -> None: def __call__(self, **kwargs: Any) -> dict[str, Any]: command = plumbum.local[self.bash_command] exit_code, stdout, stderr = command.run(*self.arguments) - return {"stdout": stdout, "stderr": stderr, "exit_code": exit_code} + return ResultType( + result=None, stdout=stdout, stderr=stderr, exit_code=exit_code + ).to_dict() def __repr__(self) -> str: return f'LocalBashOperator(bash_command="{self.bash_command}", arguments={self.arguments})' diff --git a/src/fasthep_flow/operators/py_call.py b/src/fasthep_flow/operators/py_call.py index 6811fc9..dcc8a18 100644 --- a/src/fasthep_flow/operators/py_call.py +++ b/src/fasthep_flow/operators/py_call.py @@ -1,4 +1,5 @@ """Python related operators.""" + from __future__ import annotations import io @@ -6,7 +7,7 @@ from contextlib import redirect_stderr, redirect_stdout from typing import Any -from .base import Operator +from .base import Operator, ResultType class PythonOperator(Operator): @@ -26,11 +27,12 @@ def __call__(self, **kwargs: Any) -> dict[str, Any]: with redirect_stdout(stdout), redirect_stderr(stderr): result = self.python_callable(*self.arguments) result = self.python_callable(*self.arguments) - return { - "result": result, - "stdout": stdout.getvalue(), - "stderr": stderr.getvalue(), - } + return ResultType( + result=result, + stdout=stdout.getvalue(), + stderr=stderr.getvalue(), + exit_code=0, + ).to_dict() def __repr__(self) -> str: return f"PythonOperator(python_callable={self.python_callable}, arguments={self.arguments})" From 1415cb4a897bc61314efe9cb2be6f8abbc069741 Mon Sep 17 00:00:00 2001 From: kreczko Date: Tue, 15 Oct 2024 17:04:03 +0100 Subject: [PATCH 3/3] fix: pylint errors --- src/fasthep_flow/operators/base.py | 1 + src/fasthep_flow/operators/py_call.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/src/fasthep_flow/operators/base.py b/src/fasthep_flow/operators/base.py index b71fe17..14a7673 100644 --- a/src/fasthep_flow/operators/base.py +++ b/src/fasthep_flow/operators/base.py @@ -32,6 +32,7 @@ class ResultType: exit_code: int def to_dict(self) -> dict[str, Any]: + """Convert the ResultType to a dictionary.""" return { "result": self.result, "stdout": self.stdout, diff --git a/src/fasthep_flow/operators/py_call.py b/src/fasthep_flow/operators/py_call.py index dcc8a18..d3ba98b 100644 --- a/src/fasthep_flow/operators/py_call.py +++ b/src/fasthep_flow/operators/py_call.py @@ -11,6 +11,8 @@ class PythonOperator(Operator): + """A Python operator. This operator wraps a Python callable.""" + python_callable: Callable[..., Any] arguments: list[Any]