Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-8651 Include parameters with no defaults in WorkflowTemplate #311

Open
wants to merge 2 commits into
base: feature/aip
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions metaflow/plugins/aip/aip.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,22 @@ def _create_workflow_yaml(

# Service account is added through webhooks.
workflow["spec"].pop("serviceAccountName", None)

# Parameters with no defaults need to be added to WorkflowTemplates.
# This allows the parameters to be supplied by Workflow that reference the WorkflowTemplate.
if "arguments" in workflow["spec"]:
parameters = workflow["spec"]["arguments"].get("parameters", {})
else:
parameters = {}
workflow["spec"]["arguments"] = {"parameters": parameters}

workflow["spec"]["arguments"]["parameters"].extend(
[
{"name": param}
for param in self.flow._get_parameters()
if param not in parameters
]
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this may not work:

File "/home/zservice/metaflow/metaflow/task.py", line 548, in run_step
    self._exec_step_function(step_func)
  File "/home/zservice/metaflow/metaflow/task.py", line 53, in _exec_step_function
    step_function()
  File "/home/zservice/metaflow/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py", line 89, in start
    self.submit_template(path)
  File "/home/zservice/metaflow/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py", line 198, in submit_template
    subprocess.run(
  File "/usr/local/lib/python3.9/subprocess.py", line 528, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['argo', 'template', '-n', 'metaflow-integration-testing-internal', 'create', '/tmp/wfdsk-ftf-test-argo-d64fe9ff-f1d7-4c7c-8551-c45c3c600063-0.yaml']' returned non-zero exit status 1.

see:

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: it'll also be important to ensure it's tested with a workflow compiled with an exit handler to ensure we don't hit argoproj/argo-workflows#6036

else:
raise NotImplementedError(f"Unsupported output format {kind}.")

Expand Down
40 changes: 39 additions & 1 deletion metaflow/plugins/aip/tests/test_argo_support.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import importlib.util
import inspect
import os
import sys
from tempfile import TemporaryDirectory
import yaml

import pytest
import subprocess_tee

from . import _python, obtain_flow_file_paths

from metaflow import FlowSpec

disabled_test_flows = [
"aip_flow.py", # kfp_preceding_component feature has been deprecated.
]
sys.path.append("flows")


@pytest.mark.parametrize(
Expand Down Expand Up @@ -52,6 +57,11 @@ def test_argo_flows(pytestconfig, flow_file_path: str) -> None:
validation_cmd = f"argo lint {output_path}"
else:
validation_cmd = f"argo template lint {output_path}"
assert_workflow_template_contains_all_parameters(
flow_base_name=flow_base_name,
flow_path=full_path,
workflow_template_path=output_path,
)

assert (
subprocess_tee.run(
Expand All @@ -61,3 +71,31 @@ def test_argo_flows(pytestconfig, flow_file_path: str) -> None:
).returncode
== 0
)


def assert_workflow_template_contains_all_parameters(
flow_base_name: str, flow_path: str, workflow_template_path: str
) -> None:
spec = importlib.util.spec_from_file_location(flow_base_name, flow_path)
module = importlib.util.module_from_spec(spec)
flow = None
for name in dir(module):
var = getattr(module, name)
if inspect.isclass(var) and issubclass(var, FlowSpec) and var != FlowSpec:
flow = var # Found valid flow class
assert flow is not None
flow_parameters = [
param
for param in dir(flow)
if not param.startswith("_") and not callable(getattr(flow, param))
]
if not flow_parameters:
# No parameters found. Skipping.
return

with open(workflow_template_path) as workflow_template_file:
workflow_output = yaml.safe_load(workflow_template_file)
output_params = workflow_output["spec"]["arguments"].get("parameters")

for param in flow_parameters:
assert param in output_params
3 changes: 0 additions & 3 deletions metaflow/plugins/aip/tests/test_kfp_s3_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
import marshal
import os
import tempfile
from unittest import mock
from unittest.mock import Mock, PropertyMock, call, patch

import boto3
import pytest
from botocore.exceptions import ClientError
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused imports.

from moto import mock_s3

from metaflow.plugins.aip.aip_s3_sensor import wait_for_s3_path
Expand Down
Loading