Skip to content

Commit

Permalink
MNT Try softer approach to Airflow dynamic DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
gadorlhiac committed Dec 5, 2024
1 parent 89b8c96 commit 4ee116b
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions tests/run_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,15 @@ def run_workflow(
resp.raise_for_status()
else:
raise
# Let's wait some time and update again...
# Try and allow Airflow to get used to the Dynamic DAG it doesn't like...
time.sleep(2)
resp = requests.patch(
f"{airflow_instance}/{airflow_api_endpoints['update_defn']}",
json=new_workflow,
auth=auth,
)
resp.raise_for_status()
logger.debug("Sent new workflow definition.")
resp = requests.get(
f"{airflow_instance}/{airflow_api_endpoints['mod_dag']}",
Expand Down Expand Up @@ -379,13 +388,6 @@ def run_workflow(
},
}

resp = requests.post(
f"{airflow_instance}/{airflow_api_endpoints['run_dag']}",
json=dag_run_data,
auth=auth,
)
resp.raise_for_status()

# Get Task information
task_ids: List[str]
# Airflow shouldn't have list of Tasks yet so we parse manually
Expand All @@ -403,7 +405,7 @@ def get_names(wf_defn: Dict[str, Any], names: List[str]) -> None:
f"Contains Managed Tasks (alphabetical, not execution order):\n\t- {task_id_str}"
)

# Submit again... hack around Airflow not liking these dynamic DAGs
# Submit hopefully Airflow will do okay
dag_run_data["dag_run_id"] = str(uuid.uuid4())
resp = requests.post(
f"{airflow_instance}/{airflow_api_endpoints['run_dag']}",
Expand Down Expand Up @@ -441,6 +443,7 @@ def get_names(wf_defn: Dict[str, Any], names: List[str]) -> None:
None,
"scheduled",
"queued",
"removed",
):
if task_id not in logged_running:
# Should be "running" by first time it reaches here.
Expand Down

0 comments on commit 4ee116b

Please sign in to comment.