Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
timsavage committed Nov 27, 2024
2 parents ada8b16 + 742b462 commit 0a6e68f
Show file tree
Hide file tree
Showing 17 changed files with 1,014 additions and 648 deletions.
13 changes: 13 additions & 0 deletions HISTORY
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
0.17.0
======

Additions
---------

- Add required_vars option to Workflows to ensure that required variables
are set before the workflow is executed.
- Integrate required_vars into CLI.
- Add sensitive filter when outputing variables in flow-trace


0.16.0
======

Expand All @@ -18,6 +30,7 @@ Changes
- ``TryExcept`` now resolves nodes called after an exception taking subclasses
into account. This matches the behaviour of Python itself.


0.15.0
======

Expand Down
50 changes: 50 additions & 0 deletions docs/source/cli.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
###
CLI
###

A CLI is provided to trigger workflows that are defined in a ``flowfile.py`` file.

The primary CLI commands are ``list`` and ``run``.

A `flowfile` is simply a Python file in which a set of workflows are defined or imported.
The `flowfile` is loaded by the CLI and the workflows are made available for execution.


Listing available workflows
---------------------------

.. code-block:: bash
$ flow list --help
usage: flow list [-h] [-f FLOW_FILE]
options:
-h, --help show this help message and exit
-f FLOW_FILE, --flow-file FLOW_FILE
Location of flow file; default is ./flowfile.py
Running a workflow
------------------

.. code-block:: bash
$ flow run --help
usage: flow run [-h] [-f FLOW_FILE] [--dry-run] [--full-trace] NAME [KEY=VALUE ...]
positional arguments:
NAME Name of workflow
KEY=VALUE Key/Value arguments added to flow context
options:
-h, --help show this help message and exit
-f FLOW_FILE, --flow-file FLOW_FILE
Location of flow file; default is ./flowfile.py
--dry-run Dry run; do not execute actions
--full-trace Show full trace on error.
The ``run`` command takes a workflow name and a set of key/value pairs that are
added to the flow context.

The run command also includes tracing to report aid in the identification of
errors within a flow and where they occurred.
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Contents
:maxdepth: 2

getting-started
cli
reference/index
releases

Expand Down
1,305 changes: 699 additions & 606 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pyapp-flow"
version = "0.16"
version = "0.17"
description = "Application workflow framework"
authors = ["Tim Savage <[email protected]>"]
license = "BSD-3-Clause"
Expand Down Expand Up @@ -29,7 +29,7 @@ include = ["HISTORY"]
flow = "pyapp_flow.cli:main"

[tool.poetry.dependencies]
python = "^3.8"
python = "^3.10"
rich = ">=12.4.4,<14.0.0"
pyapp = "^4.10"
typing-extensions = "^4.0"
Expand Down
55 changes: 41 additions & 14 deletions src/pyapp_flow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,38 @@
"""Application Workflow"""

from typing import Optional, Type

from typing_extensions import Self

from . import errors as exceptions, steps
from .datastructures import WorkflowContext, Navigable, Branches
from .functions import extract_inputs, skip_step, call_nodes
from . import errors as exceptions
from . import steps
from .datastructures import Branches, Navigable, WorkflowContext
from .functions import (
call_nodes,
extract_inputs,
required_variables_in_context,
skip_step,
)
from .nodes import (
Node,
step,
inline,
Step,
SetVar,
ForEach,
Append,
CaptureErrors,
Conditional,
If,
FeatureEnabled,
Switch,
ForEach,
Group,
If,
LogMessage,
Append,
Node,
SetVar,
Step,
Switch,
TryExcept,
TryUntil,
Group,
inline,
step,
)
from .steps import (
alias,
)


Expand All @@ -46,16 +58,18 @@ class Workflow(Nodes):
workflow.
"""

__slots__ = ("_name", "description")
__slots__ = ("_name", "description", "_required_vars")

def __init__(self, name: str, description: str = None):
super().__init__()
self._name = name
self.description = description
self._required_vars = tuple()

def __call__(self, context: WorkflowContext):
context.info("⏩ Workflow: `%s`", context.format(self._name))
with context:
required_variables_in_context(self.name, self._required_vars, context)
self._execute(context)

@property
Expand All @@ -78,6 +92,7 @@ def execute(
context = context or WorkflowContext(dry_run=dry_run)
context.state.update(context_vars)
context.info("⏩ Workflow: `%s`", self._name)
required_variables_in_context(self.name, self._required_vars, context)
self._execute(context)
return context

Expand Down Expand Up @@ -109,3 +124,15 @@ def set_vars(self, **kwargs) -> Self:
"""
self._nodes.append(SetVar(**kwargs))
return self

def require_vars(self, **kwargs: Optional[Type]) -> Self:
"""Require variables to be present in the context.
If any type can be used, use ``typing.Any`` as the type.
:param kwargs: Key/Type pairs to check in the context.
:return: Returns self; fluent interface
"""
self._required_vars = tuple(kwargs.items())
return self
19 changes: 13 additions & 6 deletions src/pyapp_flow/cli/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import logging
import sys
from pathlib import Path
from typing import Dict
from types import ModuleType
from typing import Dict

from rich import print
from rich.traceback import Traceback

import pyapp_flow
from pyapp_flow import Workflow, WorkflowContext
from pyapp_flow import Workflow, WorkflowContext, errors

log = logging.getLogger(__package__)

Expand Down Expand Up @@ -78,35 +78,42 @@ def run_flow(
flow_module = _import_flow_file(flow_file)
except FileNotFoundError:
log.error("Flow file not found")
return 404
return 13
except Exception:
traceback = Traceback(
suppress=() if full_trace else [pyapp_flow],
show_locals=True,
)
print(traceback)
return 500
return 1

try:
flow = _resolve_flow(flow_module, name)
except RuntimeError as ex:
log.error(ex)
return 404
return 13

context = WorkflowContext(
dry_run=dry_run,
flow_path=flow_file.parent.resolve(),
)
try:
flow.execute(context, **args)

except errors.VariableError as ex:
if context.flow_trace:
print(context.flow_trace)
print(f"{flow.name} {ex}")
return 1

except Exception:
print(context.flow_trace)
traceback = Traceback(
suppress=() if full_trace else [pyapp_flow],
show_locals=True,
)
print(traceback)
return 501
return 1


def graph_flow(flow_file: Path, name: str):
Expand Down
4 changes: 3 additions & 1 deletion src/pyapp_flow/datastructures.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
Iterable,
)

from .helpers import mask_keys

Branches = Dict[str, Sequence["Navigable"]]
TRACE_STATE_KEY: Final[str] = "__trace"

Expand Down Expand Up @@ -59,7 +61,7 @@ def __rich__(self):
"""Rich repr of state."""
from rich.scope import render_scope

return render_scope(self, title="State Variables", sort_keys=True)
return render_scope(mask_keys(self), title="State Variables", sort_keys=True)

def copy(self) -> "State":
"""Copy and return a state instance."""
Expand Down
12 changes: 12 additions & 0 deletions src/pyapp_flow/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ class WorkflowException(Exception):
pass


class VariableError(WorkflowException, TypeError):
"""Common error for variables."""


class MissingVariableError(VariableError):
"""Variable not found in context."""


class VariableTypeError(VariableError):
"""Variable type is invalid."""


class WorkflowSetupError(WorkflowException):
"""Error setting up workflow"""

Expand Down
41 changes: 39 additions & 2 deletions src/pyapp_flow/functions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from typing import Callable, Mapping, Tuple, Sequence, Union, Iterable
from typing import Callable, Mapping, Tuple, Sequence, Union, Iterable, Any

from .datastructures import WorkflowContext
from .errors import WorkflowSetupError, SkipStep
from .errors import (
WorkflowSetupError,
SkipStep,
MissingVariableError,
VariableTypeError,
)
from .helpers import human_join_strings


def skip_step(message: str):
Expand Down Expand Up @@ -119,3 +125,34 @@ def merge_nested_entries(
merge_method(value)

return results


def required_variables_in_context(
node_name: str,
required_vars: Sequence[Tuple[str, type]],
context: WorkflowContext,
):
"""Check all variables are in the context."""
missing = []
invalid_types = []

for var_name, var_type in required_vars:
try:
value = context.state[var_name]
except KeyError:
missing.append(var_name)
else:
if var_type is not Any and not isinstance(value, var_type):
invalid_types.append(var_name)

if missing:
raise MissingVariableError(
f"{node_name} missing {len(missing)} required context variable: "
f"{human_join_strings(missing)}"
)

if invalid_types:
raise VariableTypeError(
f"{node_name} has {len(invalid_types)} context variable(s) with invalid types: "
f"{human_join_strings(invalid_types)}"
)
37 changes: 35 additions & 2 deletions src/pyapp_flow/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import logging
from contextlib import contextmanager
from typing import Union
from typing import Any


@contextmanager
def change_log_level(level: Union[int, str, None], *, logger: logging.Logger = None):
def change_log_level(level: int | str | None, *, logger: logging.Logger | None = None):
"""Temporarily change the log level of a logger."""
if level is None:
yield
Expand All @@ -18,3 +18,36 @@ def change_log_level(level: Union[int, str, None], *, logger: logging.Logger = N
yield
finally:
logger.setLevel(old_level)


def human_join_strings(items, *, conjunction: str = "and", empty: str = ""):
"""Join a list of strings with a human-readable conjunction."""
if not items:
return empty

if len(items) == 1:
return items[0]

return f"{', '.join(items[:-1])} {conjunction} {items[-1]}"


SENSITIVE_WORDS = ("credential", "authorization", "token", "secret", "password")


def set_sensitive_words(sensitive_words: tuple[str, ...]):
global SENSITIVE_WORDS
SENSITIVE_WORDS = tuple(sensitive_words)


def mask_keys(d: dict[str, Any], *, sensitive_words: tuple[str, ...] | None = None):
"""Mask dictionary values for keys that contain a sensitive name"""

sensitive_words = sensitive_words or SENSITIVE_WORDS

def _mask(key: str, value: Any) -> Any:
if any(word in key for word in sensitive_words):
return "****"
return value

return {key: _mask(key, value) for key, value in d.items()}

Loading

0 comments on commit 0a6e68f

Please sign in to comment.