From 4ee116ba60f7ba84593901f929a282e1729a4b92 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Thu, 5 Dec 2024 14:52:22 -0800 Subject: [PATCH] MNT Try softer approach to Airflow dynamic DAG --- tests/run_functional.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/tests/run_functional.py b/tests/run_functional.py index 49810bfe..91f44eb1 100644 --- a/tests/run_functional.py +++ b/tests/run_functional.py @@ -343,6 +343,15 @@ def run_workflow( resp.raise_for_status() else: raise + # Let's wait some time and update again... + # Try and allow Airflow to get used to the Dynamic DAG it doesn't like... + time.sleep(2) + resp = requests.patch( + f"{airflow_instance}/{airflow_api_endpoints['update_defn']}", + json=new_workflow, + auth=auth, + ) + resp.raise_for_status() logger.debug("Sent new workflow definition.") resp = requests.get( f"{airflow_instance}/{airflow_api_endpoints['mod_dag']}", @@ -379,13 +388,6 @@ def run_workflow( }, } - resp = requests.post( - f"{airflow_instance}/{airflow_api_endpoints['run_dag']}", - json=dag_run_data, - auth=auth, - ) - resp.raise_for_status() - # Get Task information task_ids: List[str] # Airflow shouldn't have list of Tasks yet so we parse manually @@ -403,7 +405,7 @@ def get_names(wf_defn: Dict[str, Any], names: List[str]) -> None: f"Contains Managed Tasks (alphabetical, not execution order):\n\t- {task_id_str}" ) - # Submit again... hack around Airflow not liking these dynamic DAGs + # Submit hopefully Airflow will do okay dag_run_data["dag_run_id"] = str(uuid.uuid4()) resp = requests.post( f"{airflow_instance}/{airflow_api_endpoints['run_dag']}", @@ -441,6 +443,7 @@ def get_names(wf_defn: Dict[str, Any], names: List[str]) -> None: None, "scheduled", "queued", + "removed", ): if task_id not in logged_running: # Should be "running" by first time it reaches here.