Skip to content

Commit

Permalink
Add param with no default to wf template
Browse files Browse the repository at this point in the history
  • Loading branch information
cloudw committed Aug 24, 2024
1 parent 7942609 commit fba549e
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 4 deletions.
14 changes: 14 additions & 0 deletions metaflow/plugins/aip/aip.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,20 @@ def _create_workflow_yaml(

# Service account is added through webhooks.
workflow["spec"].pop("serviceAccountName", None)

workflow["spec"]["arguments"]["parameters"] = [
dict(name=k, value=json.dumps(v) if isinstance(v, dict) else v)
for k, v in flow_parameters.items()
]
# Parameters with no defaults need to be added to WorkflowTemplates.
# This allows the parameters to be supplied by Workflow that reference the WorkflowTemplate.
workflow["spec"]["arguments"]["parameters"].extend(
[
dict(name=param)
for param in self.flow._get_parameters()
if param not in flow_parameters
]
)
else:
raise NotImplementedError(f"Unsupported output format {kind}.")

Expand Down
40 changes: 39 additions & 1 deletion metaflow/plugins/aip/tests/test_argo_support.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import importlib.util
import inspect
import os
import sys
from tempfile import TemporaryDirectory
import yaml

import pytest
import subprocess_tee

from . import _python, obtain_flow_file_paths

from metaflow import FlowSpec

disabled_test_flows = [
"aip_flow.py", # kfp_preceding_component feature has been deprecated.
]
sys.path.append("flows")


@pytest.mark.parametrize(
Expand Down Expand Up @@ -52,6 +57,11 @@ def test_argo_flows(pytestconfig, flow_file_path: str) -> None:
validation_cmd = f"argo lint {output_path}"
else:
validation_cmd = f"argo template lint {output_path}"
assert_workflow_template_contains_all_parameters(
flow_base_name=flow_base_name,
flow_path=full_path,
workflow_template_path=output_path,
)

assert (
subprocess_tee.run(
Expand All @@ -61,3 +71,31 @@ def test_argo_flows(pytestconfig, flow_file_path: str) -> None:
).returncode
== 0
)


def assert_workflow_template_contains_all_parameters(
flow_base_name: str, flow_path: str, workflow_template_path: str
) -> None:
spec = importlib.util.spec_from_file_location(flow_base_name, flow_path)
module = importlib.util.module_from_spec(spec)
flow = None
for name in dir(module):
var = getattr(module, name)
if inspect.isclass(var) and issubclass(var, FlowSpec) and var != FlowSpec:
flow = var # Found valid flow class
assert flow is not None
flow_parameters = [
param
for param in dir(flow)
if not param.startswith("_") and not callable(getattr(flow, param))
]
if not flow_parameters:
# No parameters found. Skipping.
return

with open(workflow_template_path) as workflow_template_file:
workflow_output = yaml.safe_load(workflow_template_file)
output_params = workflow_output["spec"]["arguments"].get("parameters")

for param in flow_parameters:
assert param in output_params
3 changes: 0 additions & 3 deletions metaflow/plugins/aip/tests/test_kfp_s3_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
import marshal
import os
import tempfile
from unittest import mock
from unittest.mock import Mock, PropertyMock, call, patch

import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_s3

from metaflow.plugins.aip.aip_s3_sensor import wait_for_s3_path
Expand Down
Empty file.

0 comments on commit fba549e

Please sign in to comment.