From 07388162341f565ba48503dc8bea6fbe4434ad94 Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Fri, 23 Aug 2024 21:28:43 -0700 Subject: [PATCH 1/2] Add param with no default to wf template --- metaflow/plugins/aip/aip.py | 10 +++++ .../plugins/aip/tests/test_argo_support.py | 40 ++++++++++++++++++- .../plugins/aip/tests/test_kfp_s3_sensor.py | 3 -- 3 files changed, 49 insertions(+), 4 deletions(-) diff --git a/metaflow/plugins/aip/aip.py b/metaflow/plugins/aip/aip.py index 6ad529579d..a27550d23a 100644 --- a/metaflow/plugins/aip/aip.py +++ b/metaflow/plugins/aip/aip.py @@ -300,6 +300,16 @@ def _create_workflow_yaml( # Service account is added through webhooks. workflow["spec"].pop("serviceAccountName", None) + + # Parameters with no defaults need to be added to WorkflowTemplates. + # This allows the parameters to be supplied by Workflow that reference the WorkflowTemplate. + workflow["spec"]["arguments"]["parameters"].extend( + [ + dict(name=param) + for param in self.flow._get_parameters() + if param not in flow_parameters + ] + ) else: raise NotImplementedError(f"Unsupported output format {kind}.") diff --git a/metaflow/plugins/aip/tests/test_argo_support.py b/metaflow/plugins/aip/tests/test_argo_support.py index 51a4943d5f..f6655d8579 100644 --- a/metaflow/plugins/aip/tests/test_argo_support.py +++ b/metaflow/plugins/aip/tests/test_argo_support.py @@ -1,15 +1,20 @@ +import importlib.util +import inspect import os +import sys from tempfile import TemporaryDirectory +import yaml import pytest import subprocess_tee from . import _python, obtain_flow_file_paths - +from metaflow import FlowSpec disabled_test_flows = [ "aip_flow.py", # kfp_preceding_component feature has been deprecated. ] +sys.path.append("flows") @pytest.mark.parametrize( @@ -52,6 +57,11 @@ def test_argo_flows(pytestconfig, flow_file_path: str) -> None: validation_cmd = f"argo lint {output_path}" else: validation_cmd = f"argo template lint {output_path}" + assert_workflow_template_contains_all_parameters( + flow_base_name=flow_base_name, + flow_path=full_path, + workflow_template_path=output_path, + ) assert ( subprocess_tee.run( @@ -61,3 +71,31 @@ def test_argo_flows(pytestconfig, flow_file_path: str) -> None: ).returncode == 0 ) + + +def assert_workflow_template_contains_all_parameters( + flow_base_name: str, flow_path: str, workflow_template_path: str +) -> None: + spec = importlib.util.spec_from_file_location(flow_base_name, flow_path) + module = importlib.util.module_from_spec(spec) + flow = None + for name in dir(module): + var = getattr(module, name) + if inspect.isclass(var) and issubclass(var, FlowSpec) and var != FlowSpec: + flow = var # Found valid flow class + assert flow is not None + flow_parameters = [ + param + for param in dir(flow) + if not param.startswith("_") and not callable(getattr(flow, param)) + ] + if not flow_parameters: + # No parameters found. Skipping. + return + + with open(workflow_template_path) as workflow_template_file: + workflow_output = yaml.safe_load(workflow_template_file) + output_params = workflow_output["spec"]["arguments"].get("parameters") + + for param in flow_parameters: + assert param in output_params diff --git a/metaflow/plugins/aip/tests/test_kfp_s3_sensor.py b/metaflow/plugins/aip/tests/test_kfp_s3_sensor.py index e8d4c115ad..efe383e28b 100644 --- a/metaflow/plugins/aip/tests/test_kfp_s3_sensor.py +++ b/metaflow/plugins/aip/tests/test_kfp_s3_sensor.py @@ -2,12 +2,9 @@ import marshal import os import tempfile -from unittest import mock -from unittest.mock import Mock, PropertyMock, call, patch import boto3 import pytest -from botocore.exceptions import ClientError from moto import mock_s3 from metaflow.plugins.aip.aip_s3_sensor import wait_for_s3_path From b44efde40d024e8222b7d29b426fbdcb85f0383e Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Tue, 1 Oct 2024 19:21:31 -0700 Subject: [PATCH 2/2] Fix param addition --- metaflow/plugins/aip/aip.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/metaflow/plugins/aip/aip.py b/metaflow/plugins/aip/aip.py index a27550d23a..5fa2023994 100644 --- a/metaflow/plugins/aip/aip.py +++ b/metaflow/plugins/aip/aip.py @@ -303,11 +303,17 @@ def _create_workflow_yaml( # Parameters with no defaults need to be added to WorkflowTemplates. # This allows the parameters to be supplied by Workflow that reference the WorkflowTemplate. + if "arguments" in workflow["spec"]: + parameters = workflow["spec"]["arguments"].get("parameters", {}) + else: + parameters = {} + workflow["spec"]["arguments"] = {"parameters": parameters} + workflow["spec"]["arguments"]["parameters"].extend( [ - dict(name=param) + {"name": param} for param in self.flow._get_parameters() - if param not in flow_parameters + if param not in parameters ] ) else: