From fd6ce9739059a87edc3863eaedceac790a7d3297 Mon Sep 17 00:00:00 2001
From: gadorlhiac
Date: Mon, 15 Apr 2024 14:56:37 -0700
Subject: [PATCH] SKL Prepare loop for querying state from Airflow in launch
 job. Also bump python env version

---
 launch_scripts/launch_airflow.py | 53 +++++++++++++++++++++++++++++---
 lute/io/elog.py                  |  1 +
 2 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/launch_scripts/launch_airflow.py b/launch_scripts/launch_airflow.py
index 9e19a96b..2c8944e2 100755
--- a/launch_scripts/launch_airflow.py
+++ b/launch_scripts/launch_airflow.py
@@ -1,4 +1,4 @@
-#!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.59-py3/bin/python
+#!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.60-py3/bin/python
 """Script submitted by Automated Run Processor (ARP) to trigger an Airflow DAG.
 
@@ -8,12 +8,14 @@ __author__ = "Gabriel Dorlhiac"
 
+import sys
 import os
 import uuid
 import getpass
 import datetime
 import logging
 import argparse
+import time
 from typing import Dict, Union, List
 
 import requests
 from requests.auth import HTTPBasicAuth
 
@@ -69,11 +71,18 @@ def _retrieve_pw(instance: str = "prod") -> str:
     airflow_api_endpoints: Dict[str, str] = {
         "health": "api/v1/health",
         "run_dag": f"api/v1/dags/lute_{args.workflow}/dagRuns",
+        "get_tasks": f"api/v1/dags/lute_{args.workflow}/tasks",
+        "get_xcom": (
+            f"api/v1/dags/lute_{args.workflow}/dagRuns/{{dag_run_id}}"
+            f"/taskInstances/{{task_id}}/xcomEntries/{{xcom_key}}"
+        ),
     }
 
+    pw: str = _retrieve_pw(instance_str)
+    auth: HTTPBasicAuth = HTTPBasicAuth("btx", pw)
     resp: requests.models.Response = requests.get(
         f"{airflow_instance}/{airflow_api_endpoints['health']}",
-        auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)),
+        auth=auth,
     )
     resp.raise_for_status()
 
@@ -99,10 +108,44 @@ def _retrieve_pw(instance: str = "prod") -> str:
         },
     }
 
-    resp: requests.models.Response = requests.post(
+    resp = requests.post(
         f"{airflow_instance}/{airflow_api_endpoints['run_dag']}",
         json=dag_run_data,
-        auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)),
+        auth=auth,
     )
     resp.raise_for_status()
-    logger.info(resp.text)
+    dag_run_id: str = dag_run_data["dag_run_id"]
+    logger.info(f"Submitted DAG (Workflow): {args.workflow}\nDAG_RUN_ID: {dag_run_id}")
+    state: str = resp.json()["state"]
+    logger.info(f"DAG is {state}")
+
+    # Get Task information
+    resp = requests.get(
+        f"{airflow_instance}/{airflow_api_endpoints['get_tasks']}",
+        auth=auth,
+    )
+    resp.raise_for_status()
+    task_ids: List[str] = [task["task_id"] for task in resp.json()["tasks"]]
+    task_id_str: str = ",\n\t- ".join(tid for tid in task_ids)
+    logger.info(f"Contains Managed Tasks (in no particular order):\n\t- {task_id_str}")
+
+    # Enter loop for checking status
+    time.sleep(1)
+    # Same as the run_dag endpoint, but with the dag_run_id appended
+    url: str = f"{airflow_instance}/{airflow_api_endpoints['run_dag']}/{dag_run_id}"
+    # Key for pulling each Task's log via XCom (prepared for later use)
+    xcom_key: str = "log"
+    while True:
+        time.sleep(1)
+        resp = requests.get(url, auth=auth)
+        resp.raise_for_status()
+        state = resp.json()["state"]
+        if state in ("queued", "running"):
+            continue
+        logger.info(f"DAG is {state}")
+        break
+
+    if state == "failed":
+        sys.exit(-1)
+    else:
+        sys.exit(0)
diff --git a/lute/io/elog.py b/lute/io/elog.py
index d010938c..cb220511 100644
--- a/lute/io/elog.py
+++ b/lute/io/elog.py
@@ -340,6 +340,7 @@ def _get_current_run_status(update_url: str) -> Dict[str, str]:
     get_url: str = update_url.replace("replace_counters", "get_counters")
     requests.get(get_url)
 
+
 def post_elog_run_status(
     data: Dict[str, Union[str, int, float]], update_url: Optional[str] = None
 ) -> None:
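
Note (not part of the patch): the get_xcom endpoint and xcom_key are
prepared above but not yet called from the polling loop. Below is a minimal
sketch of how a follow-up could pull a Task's log via XCom once the DAG
leaves the queued/running states. The helper name pull_task_log is
hypothetical; the endpoint template, "btx" user, and variable names mirror
those in launch_airflow.py, and the "value" field is assumed to follow
Airflow's XComEntry response schema:

    import logging
    from typing import Any

    import requests
    from requests.auth import HTTPBasicAuth

    logger = logging.getLogger(__name__)


    def pull_task_log(
        airflow_instance: str,
        xcom_endpoint: str,  # airflow_api_endpoints["get_xcom"] from above
        dag_run_id: str,
        task_id: str,
        auth: HTTPBasicAuth,
        xcom_key: str = "log",
    ) -> Any:
        """Fetch one Task's XCom entry from the Airflow REST API."""
        # The f-string template above leaves {dag_run_id}, {task_id} and
        # {xcom_key} as literal placeholders (doubled braces), so
        # str.format() can fill them in here.
        url: str = f"{airflow_instance}/" + xcom_endpoint.format(
            dag_run_id=dag_run_id, task_id=task_id, xcom_key=xcom_key
        )
        resp = requests.get(url, auth=auth)
        resp.raise_for_status()
        return resp.json()["value"]

    # Possible use after the status loop exits, e.g.:
    #     for tid in task_ids:
    #         logger.info(f"{tid} log:\n{pull_task_log(airflow_instance, ...)}")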