From 9325a665b28ba1260a21ef83df96ee8b988dd57b Mon Sep 17 00:00:00 2001
From: gadorlhiac
Date: Mon, 15 Apr 2024 15:47:59 -0700
Subject: [PATCH] SKL Retrieve log file (XCom) when Task job finishes or fails

While polling the DAG run state, also poll the task instances of the run.
When a task reaches "success" or "failed", its "log" XCom entry is
requested and written to the launcher's logger exactly once per task.

- Fix the get_xcom endpoint: the path was missing the "dagRuns" segment.
- Remember which tasks were already reported so the XCom is not
  requested (and the task state not re-logged) on every 1 s poll.
- A 404 on the XCom request (no log pushed) is logged and skipped instead
  of aborting the monitoring loop; other HTTP errors still raise.
- Annotate the task instance payload as a list of dicts.
- Exit with status 1 (instead of -1) when the DAG run failed.

---
Notes (ignored by git-am):
  Line breaks and indentation of this patch were reconstructed assuming
  4-space indentation and a blank context line after `break`; check the
  context against launch_scripts/launch_airflow.py before applying.
  The post-image blob id in the index line was not recomputed.

 launch_scripts/launch_airflow.py | 38 ++++++++++++++++++++++++++++----
 1 file changed, 34 insertions(+), 4 deletions(-)

diff --git a/launch_scripts/launch_airflow.py b/launch_scripts/launch_airflow.py
index 2c8944e2..18a8f1c2 100755
--- a/launch_scripts/launch_airflow.py
+++ b/launch_scripts/launch_airflow.py
@@ -16,7 +16,7 @@ import logging
 import argparse
 import time
 
-from typing import Dict, Union, List
+from typing import Dict, Union, List, Optional, Any, Set
 
 import requests
 from requests.auth import HTTPBasicAuth
@@ -72,8 +72,8 @@ def _retrieve_pw(instance: str = "prod") -> str:
         "health": "api/v1/health",
         "run_dag": f"api/v1/dags/lute_{args.workflow}/dagRuns",
         "get_tasks": f"api/v1/dags/lute_{args.workflow}/tasks",
-        "get_xcom": (
-            f"api/v1/dags/lute_{args.workflow}/{{dag_run_id}}/taskInstances"
+        "get_xcom": (  # Need to format dag_run_id, task_id, xcom_key
+            f"api/v1/dags/lute_{args.workflow}/dagRuns/{{dag_run_id}}/taskInstances"
             f"/{{task_id}}/xcomEntries/{{xcom_key}}"
         ),
     }
@@ -137,15 +137,45 @@ def _retrieve_pw(instance: str = "prod") -> str:
     xcom_key: str = "log"
+    # Tasks whose log was already requested - each one is reported only once.
+    logged_tasks: Set[str] = set()
     while True:
         time.sleep(1)
+        # DAG Status
         resp = requests.get(url, auth=auth)
         resp.raise_for_status()
         state = resp.json()["state"]
+        # Check Task instances
+        task_url: str = f"{url}/taskInstances"
+        resp = requests.get(task_url, auth=auth)
+        resp.raise_for_status()
+        instance_information: List[Dict[str, Any]] = resp.json()["task_instances"]
+        for inst in instance_information:
+            task_id: str = inst["task_id"]
+            task_state: Optional[str] = inst["state"]
+            if not task_state or task_id in logged_tasks:
+                continue
+            logger.info(f"{task_id} state: {task_state}")
+            if task_state not in ("success", "failed"):
+                continue
+            # Only pushed to XCOM at the end of each Task
+            xcom_url: str = (
+                f"{airflow_instance}/{airflow_api_endpoints['get_xcom']}"
+            ).format(dag_run_id=dag_run_id, task_id=task_id, xcom_key=xcom_key)
+            resp = requests.get(xcom_url, auth=auth)
+            logged_tasks.add(task_id)
+            if resp.status_code == 404:
+                # No log was pushed for this Task - keep monitoring the DAG.
+                logger.info(f"No {xcom_key} XCom available for {task_id}")
+                continue
+            resp.raise_for_status()
+            logs: str = resp.json()["value"]
+            logger.info(f"Logs for {task_id}:\n{logs}")
+
         if state in ("queued", "running"):
             continue
         logger.info(f"DAG is {state}")
         break
 
     if state == "failed":
-        sys.exit(-1)
+        sys.exit(1)
     else:
         sys.exit(0)