Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PythonOperator #35

Merged
merged 3 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/examples/hello_world.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,8 @@ tasks:
kwargs:
bash_command: touch
arguments: ["/tmp/date.txt"]
- name: printPython
type: "fasthep_flow.operators.PythonOperator"
kwargs:
python_callable: print
arguments: ["Hello World!"]
4 changes: 3 additions & 1 deletion src/fasthep_flow/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Module for defining basic operators."""

from __future__ import annotations

from .base import Operator
from .bash import BashOperator, LocalBashOperator
from .py_call import PythonOperator

# only Operator is exposed to the user, everything else is imported directly by the workflow
# Single authoritative export list; it includes PythonOperator, which is imported above.
__all__ = ["BashOperator", "LocalBashOperator", "Operator", "PythonOperator"]
21 changes: 21 additions & 0 deletions src/fasthep_flow/operators/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Definition of the Operator protocol."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Protocol


Expand All @@ -18,3 +20,22 @@ def __repr__(self) -> str:

def configure(self, **kwargs: Any) -> None:
    """General function to configure the operator.

    Accepts arbitrary keyword arguments and returns nothing. This
    definition carries no logic of its own (the body is only this
    docstring).
    NOTE(review): presumably each concrete operator overrides it to read
    the settings it needs from ``kwargs`` - confirm against implementers.
    """


@dataclass
class ResultType:
    """Outcome of running an operator.

    Bundles the returned value with the captured output streams and an
    exit code. The fields are stored as given; no validation is done here.
    """

    result: Any
    stdout: str
    stderr: str
    exit_code: int

    def to_dict(self) -> dict[str, Any]:
        """Return the four fields as a plain dictionary keyed by field name."""
        field_names = ("result", "stdout", "stderr", "exit_code")
        # Values are passed through by reference, not copied.
        return {name: getattr(self, name) for name in field_names}
6 changes: 4 additions & 2 deletions src/fasthep_flow/operators/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from typing import Any

from .base import Operator
from .base import Operator, ResultType

try:
# try to import plumbum
Expand Down Expand Up @@ -31,7 +31,9 @@ def configure(self, **kwargs: Any) -> None:
def __call__(self, **kwargs: Any) -> dict[str, Any]:
    """Run the configured command through plumbum and report its outcome.

    ``self.bash_command`` is looked up via ``plumbum.local`` and run with
    ``self.arguments`` as its positional arguments. ``kwargs`` is accepted
    but not used.

    Returns:
        A dictionary with the keys ``result``, ``stdout``, ``stderr`` and
        ``exit_code``. ``result`` is always ``None`` here.
    """
    command = plumbum.local[self.bash_command]
    exit_code, stdout, stderr = command.run(*self.arguments)
    # Build the return value through ResultType only, so the "result" key
    # is always present; the earlier bare-dict return made this unreachable.
    return ResultType(
        result=None, stdout=stdout, stderr=stderr, exit_code=exit_code
    ).to_dict()

def __repr__(self) -> str:
    """Return a constructor-like string showing the command and its arguments."""
    command, args = self.bash_command, self.arguments
    return f'LocalBashOperator(bash_command="{command}", arguments={args})'
Expand Down
40 changes: 40 additions & 0 deletions src/fasthep_flow/operators/py_call.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Python related operators."""

from __future__ import annotations

import io
from collections.abc import Callable
from contextlib import redirect_stderr, redirect_stdout
from typing import Any

from .base import Operator, ResultType


class PythonOperator(Operator):
    """A Python operator. This operator wraps a Python callable.

    Calling the operator invokes ``python_callable`` with ``arguments`` as
    positional arguments. Text written to ``sys.stdout`` / ``sys.stderr``
    during that call is captured and returned with the callable's value.
    """

    python_callable: Callable[..., Any]
    arguments: list[Any]

    def __init__(self, **kwargs: Any):
        """Create the operator; all keyword arguments go to ``configure``."""
        self.configure(**kwargs)

    def configure(self, **kwargs: Any) -> None:
        """Configure the operator.

        Reads ``python_callable`` and ``arguments`` from ``kwargs``; any
        other keys are ignored.

        Raises:
            KeyError: if ``python_callable`` or ``arguments`` is missing.
        """
        self.python_callable = kwargs.pop("python_callable")
        self.arguments = kwargs.pop("arguments")

    def __call__(self, **kwargs: Any) -> dict[str, Any]:
        """Invoke the wrapped callable once and return its outcome.

        ``kwargs`` is accepted but not used. Exceptions raised by the
        callable are not caught here.

        Returns:
            A dictionary with the keys ``result`` (the callable's return
            value), ``stdout`` and ``stderr`` (captured text) and
            ``exit_code`` (always ``0``).
        """
        stdout, stderr = io.StringIO(), io.StringIO()
        # Call exactly once, inside the redirection: a second call would
        # repeat side effects and its output could escape or be duplicated.
        with redirect_stdout(stdout), redirect_stderr(stderr):
            result = self.python_callable(*self.arguments)
        return ResultType(
            result=result,
            stdout=stdout.getvalue(),
            stderr=stderr.getvalue(),
            exit_code=0,
        ).to_dict()

    def __repr__(self) -> str:
        """Return a constructor-like string showing the callable and arguments."""
        return f"PythonOperator(python_callable={self.python_callable}, arguments={self.arguments})"
Loading