Skip to content

Commit

Permalink
SKL Retrieve log file (XCom) when Task job finishes or fails
Browse files Browse the repository at this point in the history
  • Loading branch information
gadorlhiac committed Apr 15, 2024
1 parent fd6ce97 commit 9325a66
Showing 1 changed file with 29 additions and 4 deletions.
33 changes: 29 additions & 4 deletions launch_scripts/launch_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import logging
import argparse
import time
from typing import Dict, Union, List
from typing import Dict, Union, List, Optional, Any

import requests
from requests.auth import HTTPBasicAuth
Expand Down Expand Up @@ -72,8 +72,8 @@ def _retrieve_pw(instance: str = "prod") -> str:
"health": "api/v1/health",
"run_dag": f"api/v1/dags/lute_{args.workflow}/dagRuns",
"get_tasks": f"api/v1/dags/lute_{args.workflow}/tasks",
"get_xcom": (
f"api/v1/dags/lute_{args.workflow}/{{dag_run_id}}/taskInstances"
"get_xcom": ( # Need to format dag_run_id, task_id, xcom_key
f"api/v1/dags/lute_{args.workflow}/dagRuns/{{dag_run_id}}/taskInstances"
f"/{{task_id}}/xcomEntries/{{xcom_key}}"
),
}
Expand Down Expand Up @@ -137,15 +137,40 @@ def _retrieve_pw(instance: str = "prod") -> str:
# Poll the DAG run once per second until it leaves the queued/running states.
# On every poll the state of each task instance is logged, and the XCom value
# stored under `xcom_key` is retrieved for tasks that have finished.
xcom_key: str = "log"
# Task ids whose XCom value has already been emitted. Without this, the same
# XCom entry would be re-requested on every poll for every finished task.
logged_tasks: set = set()
while True:
    time.sleep(1)
    # DAG Status
    resp = requests.get(url, auth=auth)
    resp.raise_for_status()
    state = resp.json()["state"]
    # Check Task instances
    task_url: str = f"{url}/taskInstances"
    resp = requests.get(task_url, auth=auth)
    resp.raise_for_status()
    # The response field is iterated as a sequence of per-task dictionaries.
    instance_information: List[Dict[str, Any]] = resp.json()["task_instances"]
    for inst in instance_information:
        task_id: str = inst["task_id"]
        task_state: Optional[str]
        # A falsy state (e.g. None) is skipped: nothing to report yet.
        if task_state := inst["state"]:
            logger.info(f"{task_id} state: {task_state}")
            if task_state in ("success", "failed") and task_id not in logged_tasks:
                # Only pushed to XCOM at the end of each Task
                xcom_url: str = (
                    f"{airflow_instance}/{airflow_api_endpoints['get_xcom']}"
                )
                # The endpoint template still carries {dag_run_id}, {task_id}
                # and {xcom_key} placeholders that are filled in here.
                xcom_url = xcom_url.format(
                    dag_run_id=dag_run_id,
                    task_id=task_id,
                    xcom_key=xcom_key,
                )
                resp = requests.get(xcom_url, auth=auth)
                resp.raise_for_status()
                logs: str = resp.json()["value"]
                # Emit the retrieved value exactly once per task.
                logger.info(f"{task_id} log:\n{logs}")
                logged_tasks.add(task_id)

    # Keep polling while the DAG run has not reached a final state.
    if state in ("queued", "running"):
        continue
    logger.info(f"DAG is {state}")
    break

# Translate the final DAG state into the process exit status: non-zero when
# the run failed, zero otherwise. A single exit call per branch; a second
# call placed after the first could never execute.
if state == "failed":
    sys.exit(1)
else:
    sys.exit(0)

0 comments on commit 9325a66

Please sign in to comment.