Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send {{workflow.failures}} to exit_handler #278

Open
wants to merge 2 commits into
base: feature/aip
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
include:
- project: 'analytics/artificial-intelligence/ai-platform/aip-infrastructure/ci-templates/ci-cd-template'
ref: &include_ref 'v4'
# ref: &include_ref 'v4'
ref: &include_ref '4e25c7200369aec17fcf8de723db360a7726151a'
file: 'environments/devex.yml'
- project: 'analytics/artificial-intelligence/ai-platform/aip-infrastructure/ci-templates/ci-cd-template'
ref: *include_ref
Expand Down
1 change: 1 addition & 0 deletions metaflow/plugins/aip/aip.py
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,7 @@ def _get_user_defined_exit_handler_op(
" --status {{workflow.status}}"
f" --metaflow_configs_json {json.dumps(json.dumps(metaflow_configs))}"
" --retries {{retries}}"
" --failures {{workflow.failures}}"
),
]

Expand Down
3 changes: 3 additions & 0 deletions metaflow/plugins/aip/aip_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def kubeflow_pipelines(obj):
@click.option("--flow_parameters_json")
@click.option("--metaflow_configs_json")
@click.option("--retries")
@click.option("--failures")
@click.pass_obj
def user_defined_exit_handler(
obj,
Expand All @@ -65,6 +66,7 @@ def user_defined_exit_handler(
flow_parameters_json: str,
metaflow_configs_json: str,
retries: int,
failures: str,
):
# call user defined exit handler
invoke_user_defined_exit_handler(
Expand All @@ -76,6 +78,7 @@ def user_defined_exit_handler(
flow_parameters_json,
metaflow_configs_json,
retries,
failures,
)


Expand Down
4 changes: 4 additions & 0 deletions metaflow/plugins/aip/aip_udf_exit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def invoke_user_defined_exit_handler(
flow_parameters_json: str,
metaflow_configs_json: str,
retries: int,
failures: str,
):
"""
The environment variables that this depends on:
Expand All @@ -35,6 +36,8 @@ def invoke_user_defined_exit_handler(
import json
import os

print(f"{failures=}")

env_variables: Dict[str, str] = json.loads(env_variables_json)

def get_env(name, default=None) -> str:
Expand Down Expand Up @@ -74,4 +77,5 @@ def get_env(name, default=None) -> str:
metaflow_run_id=run_id,
argo_ui_url=argo_ui_url,
retries=int(retries),
failures=failures,
)
40 changes: 30 additions & 10 deletions metaflow/tutorials/00-helloworld/helloworld.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
from metaflow import FlowSpec, step
from metaflow import FlowSpec, step, resources, exit_handler, Parameter, JSONType

from metaflow.plugins.aip import exit_handler_resources, exit_handler_retry


@exit_handler_retry(times=1)
# @exit_handler_resources(cpu="600m", memory="600Mi")
def my_exit_handler(
status: str,
flow_parameters: dict,
argo_workflow_run_name: str,
metaflow_run_id: str,
argo_ui_url: str,
retries: int,
failures: str,
) -> None:
print(f"{failures=}")
print(f"{flow_parameters=}")
if retries == 0:
raise Exception("oopsie")


@exit_handler(func=my_exit_handler, on_success=True)
class HelloFlow(FlowSpec):
"""
A flow where Metaflow prints 'Hi'.
Expand All @@ -9,23 +29,23 @@ class HelloFlow(FlowSpec):

"""

s3_keys = Parameter("s3_keys", type=JSONType)
hi = Parameter("hi", default=None, help="The word to print.")
hi2 = Parameter("hi2")

@step
def start(self):
"""
This is the 'start' step. All flows must have a step named 'start' that
is the first step in the flow.

"""
self.x = 42
self.y = {"foo": "bar", "baz": "qux", "quux": "corge"}
# raise Exception("foo")
print("HelloFlow is starting.")
self.next(self.hello)

@step
def hello(self):
"""
A step for metaflow to introduce itself.

"""
print("Metaflow says: Hi!")
# import time
# time.sleep(3)
self.next(self.end)

@step
Expand Down
Loading