From b434743e34910bbd25f84aeba0a225ec328ff3e1 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Thu, 5 Dec 2024 17:13:26 -0800 Subject: [PATCH] BUG Fix test_dynamic so it can be marked failed when user_workflow fails. --- workflows/airflow/test_dynamic.py | 32 ++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/workflows/airflow/test_dynamic.py b/workflows/airflow/test_dynamic.py index 3340e7f1..ade734c9 100644 --- a/workflows/airflow/test_dynamic.py +++ b/workflows/airflow/test_dynamic.py @@ -13,11 +13,13 @@ from datetime import datetime import os import time +import sys from typing import Optional, Any, Dict, List +from airflow import configuration from airflow.decorators import dag, task, task_group from airflow.utils.trigger_rule import TriggerRule -from airflow.models import Variable +from airflow.models import Variable, DagBag, TaskInstance from lute.operators.jidoperators import JIDSlurmOperator @@ -67,7 +69,35 @@ def user_workflow(): @task(trigger_rule=TriggerRule.ALL_DONE) def delete_workflow(**context): + folder: Any = configuration.get("core", "DAGS_FOLDER") + dagbag: DagBag = DagBag(folder) + dag_ref: Any = dagbag.dags[dag_id] + + tg: Any = dag_ref.task_group.get_child_by_label("user_workflow") + execution_date: str = context.get("logical_date") + + # Collect the TaskGroup state now by looking at state of Tasks in the group + # Otherwise this information gets hidden when we delete the user_workflow + # This Task (`delete_workflow`) always succeeds, so if we don't collect + # the information the workflow is marked as successful even when it fails + task: "DAGNode" + ti: TaskInstance + user_wf_state: str = "success" + for task in tg.children.values(): + try: + ti = TaskInstance(task, execution_date) + if ti.current_state() != "success": + print( + f"{task.task_id} was marked {ti.current_state()}. " + "Marking task group the same." + ) + user_wf_state = ti.current_state() + except Exception as err: + print(err) Variable.delete(key="user_workflow") + if user_wf_state != "success": + print(f"User workflow does not report success: {user_wf_state}") + sys.exit(-1) retrieve_workflow() >> user_workflow() >> delete_workflow()