Add Pydantic V1 IO models for use with Hera Runner (#920)
**Pull Request Checklist**
- [x] Part of #858 
- [x] Tests added
- [ ] ~Documentation/examples added~ See #939 
- [x] [Good commit messages](https://cbea.ms/git-commit/) and/or PR
title

**Description of PR**
Currently, Hera I/O with annotated params can become extremely verbose.
The output syntax is especially error-prone.

This PR introduces custom Input/Output BaseModels for users to subclass,
which allow a cleaner arrangement of inputs and outputs for functions.
These are available under the `script_pydantic_io` experimental feature
flag.

With these Pydantic input/output models, the following should be noted:
* Duplicated param names (for normal Parameters as well as the new
models) and duplicated artifact names are now detected in Hera rather
than when linted by Argo. A Parameter and an Artifact sharing a name
is legal in the Argo spec, as they exist in different scopes, e.g.
```yaml
...
      inputs:
        parameters:
          - name: my-name
            default: test
        artifacts:
          - name: my-name
            path: /tmp
            optional: true
...
```
* `exit_code` and `result` are reserved attributes for the
`RunnerOutput`. Users who want their own parameters with these names
would have to specify them with an annotated parameter, e.g.
`my_exit_code: Annotated[int, Parameter(name="exit_code")]` (TBC with a
test)
* Scripts cannot have a return tuple containing any `RunnerOutput`, to
avoid multiple `exit_code`s being specified. @samj1912 / @flaviuvadan
this is up for debate, but I think discouraging tuples in favour of a
single script template outputting a single `RunnerOutput` subclass
would encourage better practices, and it keeps the logic clearer on the
Hera side. Users can still use inline output parameters alongside the
`RunnerOutput` return annotation
* Having multiple input parameters when using a `RunnerInput` in the
function params **is not legal**
* A `RunnerInput`'s `__fields__`, as defined by pydantic, are used to
"explode" the input class into its constituent parameters for the Argo
spec, i.e. using the following class as an input param to a script function:
```py
class MyInput(RunnerInput):
    my_input_str: str
    my_input_int: int

@script(constructor="runner")
def my_func(my_input: MyInput):
    ...
```
will create the script template `my_func` in YAML with Parameters
`my_input_str` and `my_input_int`, NOT `my_input`; [see the
example](https://github.com/argoproj-labs/hera/blob/92f11d341eb29d2501b9ee5be57a703160b35e24/docs/examples/workflows/experimental/script_pydantic_io.md).
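
The "explode" and duplicate-name points above can be illustrated with a rough, standard-library-only sketch. It is not Hera's implementation: `Param` and `explode` are hypothetical names invented for this illustration, and a plain dataclass stands in for a `RunnerInput` subclass.

```python
from dataclasses import dataclass, fields
from typing import Annotated, List, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class Param:
    """Hypothetical stand-in for a template parameter (not hera.workflows.Parameter)."""

    name: str


@dataclass
class MyInput:
    """Plain dataclass standing in for a RunnerInput subclass."""

    my_input_str: str
    my_input_int: Annotated[int, Param(name="renamed-int")]


def explode(cls) -> List[str]:
    """Return one parameter name per field, honouring Annotated name overrides."""
    hints = get_type_hints(cls, include_extras=True)
    names = []
    for f in fields(cls):
        hint = hints[f.name]
        override = None
        if get_origin(hint) is Annotated:
            # get_args(hint)[0] is the base type; the rest is metadata
            for meta in get_args(hint)[1:]:
                if isinstance(meta, Param):
                    override = meta.name
        names.append(override or f.name)
    # Fail early on clashing names instead of waiting for a later lint step
    duplicates = {n for n in names if names.count(n) > 1}
    if duplicates:
        raise ValueError(f"duplicate parameter names: {sorted(duplicates)}")
    return names


print(explode(MyInput))
```

Each field becomes its own parameter name, so the enclosing argument name (`my_input` in the PR's example) never appears in the result.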

---------

Signed-off-by: Elliot Gunton <[email protected]>
elliotgunton authored Jan 30, 2024
1 parent d267da3 commit 7998f2e
Showing 14 changed files with 1,280 additions and 34 deletions.
103 changes: 103 additions & 0 deletions docs/examples/workflows/experimental/script_pydantic_io.md
@@ -0,0 +1,103 @@
# Script Pydantic Io






=== "Hera"

```python linenums="1"
try:
    from pydantic.v1 import BaseModel
except ImportError:
    from pydantic import BaseModel

from hera.shared import global_config
from hera.workflows import Artifact, ArtifactLoader, Parameter, Workflow, script
from hera.workflows.io import RunnerInput, RunnerOutput

try:
    from typing import Annotated  # type: ignore
except ImportError:
    from typing_extensions import Annotated  # type: ignore

global_config.experimental_features["script_annotations"] = True
global_config.experimental_features["script_pydantic_io"] = True


class MyObject(BaseModel):
    a_dict: dict = {}
    a_str: str = "a default string"


class MyInput(RunnerInput):
    param_int: Annotated[int, Parameter(name="param-input")] = 42
    an_object: Annotated[MyObject, Parameter(name="obj-input")] = MyObject(
        a_dict={"my-key": "a-value"}, a_str="hello world!"
    )
    artifact_int: Annotated[int, Artifact(name="artifact-input", loader=ArtifactLoader.json)]


class MyOutput(RunnerOutput):
    param_int: Annotated[int, Parameter(name="param-output")]
    artifact_int: Annotated[int, Artifact(name="artifact-output")]


@script(constructor="runner")
def pydantic_io(
    my_input: MyInput,
) -> MyOutput:
    return MyOutput(exit_code=1, result="Test!", param_int=42, artifact_int=my_input.param_int)


with Workflow(generate_name="pydantic-io-") as w:
    pydantic_io()
```

=== "YAML"

```yaml linenums="1"
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: pydantic-io-
spec:
  templates:
  - inputs:
      artifacts:
      - name: artifact-input
        path: /tmp/hera-inputs/artifacts/artifact-input
      parameters:
      - default: '42'
        name: param-input
      - default: '{"a_dict": {"my-key": "a-value"}, "a_str": "hello world!"}'
        name: obj-input
    name: pydantic-io
    outputs:
      artifacts:
      - name: artifact-output
        path: /tmp/hera-outputs/artifacts/artifact-output
      parameters:
      - name: param-output
        valueFrom:
          path: /tmp/hera-outputs/parameters/param-output
    script:
      args:
      - -m
      - hera.workflows.runner
      - -e
      - examples.workflows.experimental.script_pydantic_io:pydantic_io
      command:
      - python
      env:
      - name: hera__script_annotations
        value: ''
      - name: hera__outputs_directory
        value: /tmp/hera-outputs
      - name: hera__script_pydantic_io
        value: ''
      image: python:3.8
      source: '{{inputs.parameters}}'
```

41 changes: 41 additions & 0 deletions examples/workflows/experimental/script-pydantic-io.yaml
@@ -0,0 +1,41 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: pydantic-io-
spec:
  templates:
  - inputs:
      artifacts:
      - name: artifact-input
        path: /tmp/hera-inputs/artifacts/artifact-input
      parameters:
      - default: '42'
        name: param-input
      - default: '{"a_dict": {"my-key": "a-value"}, "a_str": "hello world!"}'
        name: obj-input
    name: pydantic-io
    outputs:
      artifacts:
      - name: artifact-output
        path: /tmp/hera-outputs/artifacts/artifact-output
      parameters:
      - name: param-output
        valueFrom:
          path: /tmp/hera-outputs/parameters/param-output
    script:
      args:
      - -m
      - hera.workflows.runner
      - -e
      - examples.workflows.experimental.script_pydantic_io:pydantic_io
      command:
      - python
      env:
      - name: hera__script_annotations
        value: ''
      - name: hera__outputs_directory
        value: /tmp/hera-outputs
      - name: hera__script_pydantic_io
        value: ''
      image: python:3.8
      source: '{{inputs.parameters}}'
45 changes: 45 additions & 0 deletions examples/workflows/experimental/script_pydantic_io.py
@@ -0,0 +1,45 @@
try:
    from pydantic.v1 import BaseModel
except ImportError:
    from pydantic import BaseModel

from hera.shared import global_config
from hera.workflows import Artifact, ArtifactLoader, Parameter, Workflow, script
from hera.workflows.io import RunnerInput, RunnerOutput

try:
    from typing import Annotated  # type: ignore
except ImportError:
    from typing_extensions import Annotated  # type: ignore

global_config.experimental_features["script_annotations"] = True
global_config.experimental_features["script_pydantic_io"] = True


class MyObject(BaseModel):
    a_dict: dict = {}
    a_str: str = "a default string"


class MyInput(RunnerInput):
    param_int: Annotated[int, Parameter(name="param-input")] = 42
    an_object: Annotated[MyObject, Parameter(name="obj-input")] = MyObject(
        a_dict={"my-key": "a-value"}, a_str="hello world!"
    )
    artifact_int: Annotated[int, Artifact(name="artifact-input", loader=ArtifactLoader.json)]


class MyOutput(RunnerOutput):
    param_int: Annotated[int, Parameter(name="param-output")]
    artifact_int: Annotated[int, Artifact(name="artifact-output")]


@script(constructor="runner")
def pydantic_io(
    my_input: MyInput,
) -> MyOutput:
    return MyOutput(exit_code=1, result="Test!", param_int=42, artifact_int=my_input.param_int)


with Workflow(generate_name="pydantic-io-") as w:
    pydantic_io()
105 changes: 105 additions & 0 deletions src/hera/workflows/io.py
@@ -0,0 +1,105 @@
"""Input/output models for the Hera runner."""
from collections import ChainMap
from typing import Any, List, Optional, Union

from hera.shared._pydantic import BaseModel
from hera.shared.serialization import serialize
from hera.workflows.artifact import Artifact
from hera.workflows.parameter import Parameter

try:
    from inspect import get_annotations  # type: ignore
except ImportError:
    from hera.workflows._inspect import get_annotations  # type: ignore

try:
    from typing import Annotated, get_args, get_origin  # type: ignore
except ImportError:
    from typing_extensions import Annotated, get_args, get_origin  # type: ignore


class RunnerInput(BaseModel):
    """Input model usable by the Hera Runner.
    RunnerInput is a Pydantic model which users can create a subclass of. When a subclass
    of RunnerInput is used as a function parameter type, the Hera Runner will take the fields
    of the user's subclass to create template input parameters and artifacts. See the example
    for the script_pydantic_io experimental feature.
    """

    @classmethod
    def _get_parameters(cls, object_override: "Optional[RunnerInput]" = None) -> List[Parameter]:
        parameters = []
        annotations = {k: v for k, v in ChainMap(*(get_annotations(c) for c in cls.__mro__)).items()}

        for field in cls.__fields__:
            if get_origin(annotations[field]) is Annotated:
                if isinstance(get_args(annotations[field])[1], Parameter):
                    param = get_args(annotations[field])[1]
                    if object_override:
                        param.default = serialize(getattr(object_override, field))
                    elif cls.__fields__[field].default:
                        # Serialize the value (usually done in Parameter's validator)
                        param.default = serialize(cls.__fields__[field].default)
                    parameters.append(param)
            else:
                # Create a Parameter from basic type annotations
                if object_override:
                    parameters.append(Parameter(name=field, default=serialize(getattr(object_override, field))))
                else:
                    parameters.append(Parameter(name=field, default=cls.__fields__[field].default))
        return parameters

    @classmethod
    def _get_artifacts(cls) -> List[Artifact]:
        artifacts = []
        annotations = {k: v for k, v in ChainMap(*(get_annotations(c) for c in cls.__mro__)).items()}

        for field in cls.__fields__:
            if get_origin(annotations[field]) is Annotated:
                if isinstance(get_args(annotations[field])[1], Artifact):
                    artifact = get_args(annotations[field])[1]
                    if artifact.path is None:
                        artifact.path = artifact._get_default_inputs_path()
                    artifacts.append(artifact)
        return artifacts


class RunnerOutput(BaseModel):
    """Output model usable by the Hera Runner.
    RunnerOutput is a Pydantic model which users can create a subclass of. When a subclass
    of RunnerOutput is used as a function return type, the Hera Runner will take the fields
    of the user's subclass to create template output parameters and artifacts. See the example
    for the script_pydantic_io experimental feature.
    """

    exit_code: int = 0
    result: Any

    @classmethod
    def _get_outputs(cls) -> List[Union[Artifact, Parameter]]:
        outputs = []
        annotations = {k: v for k, v in ChainMap(*(get_annotations(c) for c in cls.__mro__)).items()}

        for field in cls.__fields__:
            if field in {"exit_code", "result"}:
                continue
            if get_origin(annotations[field]) is Annotated:
                if isinstance(get_args(annotations[field])[1], (Parameter, Artifact)):
                    outputs.append(get_args(annotations[field])[1])
            else:
                # Create a Parameter from basic type annotations
                outputs.append(Parameter(name=field, default=cls.__fields__[field].default))
        return outputs

    @classmethod
    def _get_output(cls, field_name: str) -> Union[Artifact, Parameter]:
        annotations = {k: v for k, v in ChainMap(*(get_annotations(c) for c in cls.__mro__)).items()}
        annotation = annotations[field_name]
        if get_origin(annotation) is Annotated:
            if isinstance(get_args(annotation)[1], (Parameter, Artifact)):
                return get_args(annotation)[1]

        # Create a Parameter from basic type annotations
        return Parameter(name=field_name, default=cls.__fields__[field_name].default)
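
Each classmethod in `io.py` builds its `annotations` dict from a `ChainMap` over the class's MRO. The following is a minimal sketch of how that pattern resolves inherited and overridden annotations, using plain classes rather than Pydantic models. `Base`, `Child` and `merged_annotations` are hypothetical names for illustration, and the fallback `get_annotations` is a simplified assumption, not the helper in `hera.workflows._inspect`.

```python
from collections import ChainMap

try:
    from inspect import get_annotations  # available on Python 3.10+
except ImportError:

    def get_annotations(obj):
        # Simplified fallback: only the annotations declared on obj itself
        return dict(vars(obj).get("__annotations__", {}))


class Base:
    exit_code: int
    result: object


class Child(Base):
    result: str  # overrides the annotation inherited from Base
    extra: float


def merged_annotations(cls):
    # ChainMap resolves keys left to right, so classes earlier in the MRO win
    return dict(ChainMap(*(get_annotations(c) for c in cls.__mro__)))


merged = merged_annotations(Child)
print(merged)
```

Because `Child` precedes `Base` in the MRO, its `result: str` annotation shadows the one on `Base`, while `exit_code` is still picked up from the parent.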