From d80a373f98b1fab750be30370eb4d6511aaeaf67 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 6 Sep 2023 01:58:19 -0700 Subject: [PATCH] Update run-tests.py (#84) Signed-off-by: Kevin Su --- boilerplate/flyte/end2end/run-tests.py | 75 ++++++++++++-------------- 1 file changed, 34 insertions(+), 41 deletions(-) diff --git a/boilerplate/flyte/end2end/run-tests.py b/boilerplate/flyte/end2end/run-tests.py index 6427681..eb2b28d 100644 --- a/boilerplate/flyte/end2end/run-tests.py +++ b/boilerplate/flyte/end2end/run-tests.py @@ -1,19 +1,19 @@ #!/usr/bin/env python3 -import click import datetime import json import sys import time import traceback +from typing import Dict, List, Mapping, Tuple + +import click import requests -from typing import List, Mapping, Tuple, Dict -from flytekit.remote import FlyteRemote +from flytekit.configuration import Config from flytekit.models.core.execution import WorkflowExecutionPhase -from flytekit.configuration import Config, ImageConfig, SerializationSettings +from flytekit.remote import FlyteRemote from flytekit.remote.executions import FlyteWorkflowExecution - WAIT_TIME = 10 MAX_ATTEMPTS = 200 @@ -22,15 +22,14 @@ # starting with "core". FLYTESNACKS_WORKFLOW_GROUPS: Mapping[str, List[Tuple[str, dict]]] = { "lite": [ - ("basics.hello_world.my_wf", {}), - ("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}), + ("basics.hello_world.hello_world_wf", {}), ], "core": [ - ("basics.deck.wf", {}), + # ("development_lifecycle.decks.image_renderer_wf", {}), # The chain_workflows example in flytesnacks expects to be running in a sandbox. - ("control_flow.chain_entities.chain_workflows_wf", {}), - ("control_flow.dynamics.wf", {"s1": "Pear", "s2": "Earth"}), - ("control_flow.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}), + ("advanced_composition.chain_entities.chain_workflows_wf", {}), + ("advanced_composition.dynamics.wf", {"s1": "Pear", "s2": "Earth"}), + ("advanced_composition.map_task.my_map_workflow", {"a": [1, 2, 3, 4, 5]}), # Workflows that use nested executions cannot be launched via flyteremote. # This issue is being tracked in https://github.com/flyteorg/flyte/issues/1482. # ("control_flow.run_conditions.multiplier", {"my_input": 0.5}), @@ -41,24 +40,22 @@ # ("control_flow.run_conditions.nested_conditions", {"my_input": 0.4}), # ("control_flow.run_conditions.consume_outputs", {"my_input": 0.4, "seed": 7}), # ("control_flow.run_merge_sort.merge_sort", {"numbers": [5, 4, 3, 2, 1], "count": 5}), - ("control_flow.subworkflows.parent_wf", {"a": 3}), - ("control_flow.subworkflows.nested_parent_wf", {"a": 3}), - ("basics.basic_workflow.my_wf", {"a": 50, "b": "hello"}), + ("advanced_composition.subworkflows.parent_workflow", {"my_input1": "hello"}), + ("advanced_composition.subworkflows.nested_parent_wf", {"a": 3}), + ("basics.workflow.simple_wf", {"x": [1, 2, 3], "y": [1, 2, 3]}), # TODO: enable new files and folders workflows # ("basics.files.rotate_one_workflow", {"in_image": "https://upload.wikimedia.org/wikipedia/commons/d/d2/Julia_set_%28C_%3D_0.285%2C_0.01%29.jpg"}), # ("basics.folders.download_and_rotate", {}), - ("basics.hello_world.my_wf", {}), - ("basics.lp.my_wf", {"val": 4}), - ("basics.lp.go_greet", {"day_of_week": "5", "number": 3, "am": True}), - ("basics.named_outputs.my_wf", {}), + ("basics.hello_world.hello_world_wf", {}), + ("basics.named_outputs.simple_wf_with_named_outputs", {}), # # Getting a 403 for the wikipedia image # # ("basics.reference_task.wf", {}), - ("type_system.custom_objects.wf", {"x": 10, "y": 20}), + ("data_types_and_io.custom_objects.wf", {"x": 10, "y": 20}), # Enums are not supported in flyteremote # ("type_system.enums.enum_wf", {"c": "red"}), - ("type_system.schema.df_wf", {"a": 42}), - ("type_system.typed_schema.wf", {}), - #("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}), + ("data_types_and_io.schema.df_wf", {"a": 42}), + ("data_types_and_io.typed_schema.wf", {}), + # ("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}), ], "integrations-k8s-spark": [ ("k8s_spark_plugin.pyspark_pi.my_spark", {"triggered_date": datetime.datetime.now()}), @@ -97,12 +94,14 @@ def execute_workflow(remote, version, workflow_name, inputs): wf = remote.fetch_workflow(name=workflow_name, version=version) return remote.execute(wf, inputs=inputs, wait=False) + def executions_finished(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]) -> bool: for executions in executions_by_wfgroup.values(): if not all([execution.is_done for execution in executions]): return False return True + def sync_executions(remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]): try: for executions in executions_by_wfgroup.values(): @@ -120,6 +119,7 @@ def report_executions(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecuti for execution in executions: print(execution) + def schedule_workflow_groups( tag: str, workflow_groups: List[str], @@ -140,17 +140,12 @@ def schedule_workflow_groups( # Wait for all executions to finish attempt = 0 - while attempt == 0 or ( - not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS - ): + while attempt == 0 or (not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS): attempt += 1 - print( - f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s" - ) + print(f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s") time.sleep(WAIT_TIME) sync_executions(remote, executions_by_wfgroup) - report_executions(executions_by_wfgroup) results = {} @@ -193,14 +188,17 @@ def run( # For a given release tag and priority, this function filters the workflow groups from the flytesnacks # manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ]. - manifest_url = "https://raw.githubusercontent.com/flyteorg/flytesnacks/" \ - f"{flytesnacks_release_tag}/flyte_tests_manifest.json" + manifest_url = ( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/" f"{flytesnacks_release_tag}/flyte_tests_manifest.json" + ) r = requests.get(manifest_url) parsed_manifest = r.json() workflow_groups = [] - workflow_groups = ["lite"] if "lite" in priorities else [ - group["name"] for group in parsed_manifest if group["priority"] in priorities - ] + workflow_groups = ( + ["lite"] + if "lite" in priorities + else [group["name"] for group in parsed_manifest if group["priority"] in priorities] + ) results = [] valid_workgroups = [] @@ -217,10 +215,7 @@ def run( valid_workgroups.append(workflow_group) results_by_wfgroup = schedule_workflow_groups( - flytesnacks_release_tag, - valid_workgroups, - remote, - terminate_workflow_on_failure + flytesnacks_release_tag, valid_workgroups, remote, terminate_workflow_on_failure ) for workflow_group, succeeded in results_by_wfgroup.items(): @@ -274,9 +269,7 @@ def cli( terminate_workflow_on_failure, ): print(f"return_non_zero_on_failure={return_non_zero_on_failure}") - results = run( - flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure - ) + results = run(flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure) # Write a json object in its own line describing the result of this run to stdout print(f"Result of run:\n{json.dumps(results)}")