SKL Prepare loop for querying state from Airflow in launch job. Also bump python env version
gadorlhiac committed Apr 15, 2024
1 parent 7b32d5e commit fd6ce97
Showing 2 changed files with 49 additions and 5 deletions.
launch_scripts/launch_airflow.py (48 additions, 5 deletions)
@@ -1,4 +1,4 @@
-#!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.59-py3/bin/python
+#!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.60-py3/bin/python
 
 """Script submitted by Automated Run Processor (ARP) to trigger an Airflow DAG.
@@ -8,12 +8,14 @@
 
 __author__ = "Gabriel Dorlhiac"
 
+import sys
 import os
 import uuid
 import getpass
 import datetime
 import logging
 import argparse
+import time
 from typing import Dict, Union, List
 
 import requests
@@ -69,11 +71,18 @@ def _retrieve_pw(instance: str = "prod") -> str:
     airflow_api_endpoints: Dict[str, str] = {
         "health": "api/v1/health",
         "run_dag": f"api/v1/dags/lute_{args.workflow}/dagRuns",
+        "get_tasks": f"api/v1/dags/lute_{args.workflow}/tasks",
+        "get_xcom": (
+            f"api/v1/dags/lute_{args.workflow}/{{dag_run_id}}/taskInstances"
+            f"/{{task_id}}/xcomEntries/{{xcom_key}}"
+        ),
     }
 
+    pw: str = _retrieve_pw(instance_str)
+    auth: HTTPBasicAuth = HTTPBasicAuth("btx", pw)
     resp: requests.models.Response = requests.get(
         f"{airflow_instance}/{airflow_api_endpoints['health']}",
-        auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)),
+        auth=auth,
     )
     resp.raise_for_status()
 
@@ -99,10 +108,44 @@ def _retrieve_pw(instance: str = "prod") -> str:
         },
     }
 
-    resp: requests.models.Response = requests.post(
+    resp = requests.post(
         f"{airflow_instance}/{airflow_api_endpoints['run_dag']}",
         json=dag_run_data,
-        auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)),
+        auth=auth,
     )
     resp.raise_for_status()
-    logger.info(resp.text)
+    dag_run_id: str = dag_run_data["dag_run_id"]
+    logger.info(f"Submitted DAG (Workflow): {args.workflow}\nDAG_RUN_ID: {dag_run_id}")
+    state: str = resp.json()["state"]
+    logger.info(f"DAG is {state}")
+
+    # Get Task information
+    resp = requests.get(
+        f"{airflow_instance}/{airflow_api_endpoints['get_tasks']}",
+        auth=auth,
+    )
+    resp.raise_for_status()
+    task_ids: List[str] = [task["task_id"] for task in resp.json()["tasks"]]
+    task_id_str: str = ",\n\t- ".join(tid for tid in task_ids)
+    logger.info(f"Contains Managed Tasks (in no particular order):\n\t- {task_id_str}")
+
+    # Enter loop for checking status
+    time.sleep(1)
+    # Same as run_dag endpoint, but needs to include the dag_run_id on the end
+    url: str = f"{airflow_instance}/{airflow_api_endpoints['run_dag']}/{dag_run_id}"
+    # Pulling logs for each Task via XCom
+    xcom_key: str = "log"
+    while True:
+        time.sleep(1)
+        resp = requests.get(url, auth=auth)
+        resp.raise_for_status()
+        state = resp.json()["state"]
+        if state in ("queued", "running"):
+            continue
+        logger.info(f"DAG is {state}")
+        break
+
+    if state == "failed":
+        sys.exit(-1)
+    else:
+        sys.exit(0)
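
A note on the pieces this commit prepares but does not yet use: the doubled braces in the "get_xcom" template survive the f-string as literal {dag_run_id}, {task_id}, and {xcom_key} placeholders for a later str.format() call, and xcom_key = "log" is assigned but never read -- the loop above only polls the DAG state, matching the "prepare" wording of the commit message. Below is a minimal sketch of how the prepared endpoint could later pull each Task's log once the run finishes; the fetch_task_logs helper and its wiring are illustrative assumptions, not code from this commit.

# Illustrative sketch only (not part of this commit).
from typing import Dict, List

import requests
from requests.auth import HTTPBasicAuth


def fetch_task_logs(
    airflow_instance: str,
    xcom_endpoint_template: str,
    dag_run_id: str,
    task_ids: List[str],
    auth: HTTPBasicAuth,
    xcom_key: str = "log",
) -> Dict[str, str]:
    """Return a task_id -> log text mapping pulled from Airflow XComs.

    `xcom_endpoint_template` is the "get_xcom" entry defined above. Note
    that Airflow's stable REST API nests task instances under a dagRuns/
    segment (api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/...),
    which the template above appears to omit, so it may need adjusting
    before being wired in.
    """
    logs: Dict[str, str] = {}
    for task_id in task_ids:
        # Fill the literal {dag_run_id}/{task_id}/{xcom_key} placeholders.
        url: str = f"{airflow_instance}/" + xcom_endpoint_template.format(
            dag_run_id=dag_run_id, task_id=task_id, xcom_key=xcom_key
        )
        resp: requests.models.Response = requests.get(url, auth=auth)
        if resp.ok:
            # An XCom entry's payload is returned under the "value" key.
            logs[task_id] = resp.json()["value"]
    return logs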
lute/io/elog.py (1 addition, 0 deletions)
@@ -340,6 +340,7 @@ def _get_current_run_status(update_url: str) -> Dict[str, str]:
     get_url: str = update_url.replace("replace_counters", "get_counters")
     requests.get(get_url)
 
+
 def post_elog_run_status(
     data: Dict[str, Union[str, int, float]], update_url: Optional[str] = None
 ) -> None:
