Skip to content

Commit

Permalink
BUG Fix test_dynamic so it can be marked failed when user_workflow fa…
Browse files Browse the repository at this point in the history
…ils.
  • Loading branch information
gadorlhiac committed Dec 6, 2024
1 parent 4ee116b commit b434743
Showing 1 changed file with 31 additions and 1 deletion.
32 changes: 31 additions & 1 deletion workflows/airflow/test_dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
from datetime import datetime
import os
import time
import sys
from typing import Optional, Any, Dict, List

from airflow import configuration
from airflow.decorators import dag, task, task_group
from airflow.utils.trigger_rule import TriggerRule
from airflow.models import Variable
from airflow.models import Variable, DagBag, TaskInstance

from lute.operators.jidoperators import JIDSlurmOperator

Expand Down Expand Up @@ -67,7 +69,35 @@ def user_workflow():

@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_workflow(**context):
folder: Any = configuration.get("core", "DAGS_FOLDER")
dagbag: DagBag = DagBag(folder)
dag_ref: Any = dagbag.dags[dag_id]

tg: Any = dag_ref.task_group.get_child_by_label("user_workflow")
execution_date: str = context.get("logical_date")

# Collect the TaskGroup state now by looking at state of Tasks in the group
# Otherwise this information gets hidden when we delete the user_workflow
# This Task (`delete_workflow`) always succeeds, so if we don't collect
# the information the workflow is marked as successful even when it fails
task: "DAGNode"

Check failure on line 83 in workflows/airflow/test_dynamic.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (F821)

workflows/airflow/test_dynamic.py:83:16: F821 Undefined name `DAGNode`
ti: TaskInstance
user_wf_state: str = "success"
for task in tg.children.values():
try:
ti = TaskInstance(task, execution_date)
if ti.current_state() != "success":
print(
f"{task.task_id} was marked {ti.current_state()}. "
"Marking task group the same."
)
user_wf_state = ti.current_state()
except Exception as err:
print(err)
Variable.delete(key="user_workflow")
if user_wf_state != "success":
print(f"User workflow does not report success: {user_wf_state}")
sys.exit(-1)

retrieve_workflow() >> user_workflow() >> delete_workflow()

Expand Down

0 comments on commit b434743

Please sign in to comment.