Skip to content

Commit

Permalink
Merge branch 'dev' into ENH/yaml_substitutions
Browse files Browse the repository at this point in the history
  • Loading branch information
gadorlhiac committed May 2, 2024
2 parents 05716b2 + 1ba79dd commit 3b59475
Show file tree
Hide file tree
Showing 6 changed files with 739 additions and 58 deletions.
7 changes: 7 additions & 0 deletions docs/tutorial/creating_workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ In addition to the core LUTE package, a number of components are generally invol

For building and running workflows using SLURM and Airflow, the following components are necessary, and will be described in more detail below:
- Airflow launch script: `launch_airflow.py`
- This has a wrapper batch submission script: `submit_launch_airflow.sh` . When running using the ARP (from the eLog), you **MUST** use this wrapper script instead of the Python script directly.
- SLURM submission script: `submit_slurm.sh`
- Airflow operators:
- `JIDSlurmOperator`
Expand Down Expand Up @@ -45,6 +46,12 @@ launch_airflow.py -c <path_to_config_yaml> -w <workflow_name> [--debug] [--test]
- `--test` is an optional flag which will use the test Airflow instance. By default the script will make requests of the standard production Airflow instance.


**Lifetime**
This script will run for the entire duration of the **workflow (DAG)**. After making the initial request of Airflow to launch the DAG, it will enter a status update loop which will keep track of each individual job (each job runs one managed `Task`) submitted by Airflow. At the end of each job it will collect the log file, in addition to providing a few other status updates/debugging messages, and append it to its own log. This allows all logging for the entire workflow (DAG) to be inspected from an individual file. This is particularly useful when running via the eLog, because only a single log file is displayed.

### `submit_launch_airflow.sh`
This script is only necessary when running from the eLog using the ARP. The initial job submitted by the ARP can not have a duration of longer than 30 seconds, as it will then time out. As the `launch_airflow.py` job will live for the entire duration of the workflow, which is often much longer than 30 seconds, the solution was to have a wrapper which submits the `launch_airflow.py` script to run on the S3DF batch nodes. Usage of this script is identical to `launch_airflow.py`. All the arguments are passed transparently to the underlying Python script. The wrapper will simply launch a batch job using minimal resources (1 core).

## `submit_slurm.sh`
Launches a job on the S3DF batch nodes using the SLURM job scheduler. This script launches a single **managed** `Task` at a time. The usage is as follows:
```bash
Expand Down
112 changes: 102 additions & 10 deletions launch_scripts/launch_airflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.59-py3/bin/python
#!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.60-py3/bin/python

"""Script submitted by Automated Run Processor (ARP) to trigger an Airflow DAG.
Expand All @@ -8,23 +8,30 @@

__author__ = "Gabriel Dorlhiac"

import sys
import os
import uuid
import getpass
import datetime
import logging
import argparse
from typing import Dict, Union, List
import time
from typing import Dict, Union, List, Optional, Any

import requests
from requests.auth import HTTPBasicAuth

# Requests, urllib have lots of debug statements. Only set level for this logger
logger: logging.Logger = logging.getLogger("Launch_Airflow")
handler: logging.Handler = logging.StreamHandler()
formatter: logging.Formatter = logging.Formatter(logging.BASIC_FORMAT)
handler.setFormatter(formatter)
logger.addHandler(handler)

if __debug__:
logging.basicConfig(level=logging.DEBUG)
logger.setLevel(logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)

logger: logging.Logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def _retrieve_pw(instance: str = "prod") -> str:
Expand Down Expand Up @@ -69,11 +76,18 @@ def _retrieve_pw(instance: str = "prod") -> str:
airflow_api_endpoints: Dict[str, str] = {
"health": "api/v1/health",
"run_dag": f"api/v1/dags/lute_{args.workflow}/dagRuns",
"get_tasks": f"api/v1/dags/lute_{args.workflow}/tasks",
"get_xcom": ( # Need to format dag_run_id, task_id, xcom_key
f"api/v1/dags/lute_{args.workflow}/dagRuns/{{dag_run_id}}/taskInstances"
f"/{{task_id}}/xcomEntries/{{xcom_key}}"
),
}

pw: str = _retrieve_pw(instance_str)
auth: HTTPBasicAuth = HTTPBasicAuth("btx", pw)
resp: requests.models.Response = requests.get(
f"{airflow_instance}/{airflow_api_endpoints['health']}",
auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)),
auth=auth,
)
resp.raise_for_status()

Expand All @@ -99,10 +113,88 @@ def _retrieve_pw(instance: str = "prod") -> str:
},
}

resp: requests.models.Response = requests.post(
resp = requests.post(
f"{airflow_instance}/{airflow_api_endpoints['run_dag']}",
json=dag_run_data,
auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)),
auth=auth,
)
resp.raise_for_status()
dag_run_id: str = dag_run_data["dag_run_id"]
logger.info(f"Submitted DAG (Workflow): {args.workflow}\nDAG_RUN_ID: {dag_run_id}")
dag_state: str = resp.json()["state"]
logger.info(f"DAG is {dag_state}")

# Get Task information
resp = requests.get(
f"{airflow_instance}/{airflow_api_endpoints['get_tasks']}",
auth=auth,
)
resp.raise_for_status()
logger.info(resp.text)
task_ids: List[str] = [task["task_id"] for task in resp.json()["tasks"]]
task_id_str: str = ",\n\t- ".join(tid for tid in task_ids)
logger.info(
f"Contains Managed Tasks (alphabetical, not execution order):\n\t- {task_id_str}"
)

# Enter loop for checking status
time.sleep(1)
# Same as run_dag endpoint, but needs to include the dag_run_id on the end
url: str = f"{airflow_instance}/{airflow_api_endpoints['run_dag']}/{dag_run_id}"
# Pulling logs for each Task via XCom
xcom_key: str = "log"
completed_tasks: Dict[str, str] = {} # Remember exit status of each Task
logged_running: List[str] = [] # Keep track to only print "running" once
while True:
time.sleep(1)
# DAG Status
resp = requests.get(url, auth=auth)
resp.raise_for_status()
dag_state = resp.json()["state"]
# Check Task instances
task_url: str = f"{url}/taskInstances"
resp = requests.get(task_url, auth=auth)
resp.raise_for_status()
instance_information: Dict[str, Any] = resp.json()["task_instances"]
for inst in instance_information:
task_id: str = inst["task_id"]
task_state: Optional[str] = inst["state"]
if task_id not in completed_tasks and task_state not in (None, "scheduled"):
if task_id not in logged_running:
# Should be "running" by first time it reaches here.
# Or e.g. "upstream_failed"... Setup to skip "scheduled"
logger.info(f"{task_id} state: {task_state}")
logged_running.append(task_id)

if task_state in ("success", "failed"):
# Only pushed to XCOM at the end of each Task
xcom_url: str = (
f"{airflow_instance}/{airflow_api_endpoints['get_xcom']}"
)
xcom_url = xcom_url.format(
dag_run_id=dag_run_id,
task_id=task_id,
xcom_key=xcom_key,
)
resp = requests.get(xcom_url, auth=auth)
resp.raise_for_status()
logs: str = resp.json()["value"] # Only want to print once.
logger.info(f"Providing logs for {task_id}")
print("-" * 50, flush=True)
print(logs, flush=True)
print("-" * 50, flush=True)
logger.info(f"End of logs for {task_id}")
completed_tasks[task_id] = task_state

elif task_state in ("upstream_failed"):
# upstream_failed never launches so has no log
completed_tasks[task_id] = task_state

if dag_state in ("queued", "running"):
continue
logger.info(f"DAG exited: {dag_state}")
break

if dag_state == "failed":
sys.exit(1)
else:
sys.exit(0)
29 changes: 29 additions & 0 deletions launch_scripts/submit_launch_airflow.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/bin/bash

# Need to capture partition and account for SLURM
while [[ $# -gt 0 ]]
do
case "$1" in
--partition=*)
PARTITION="${1#*=}"
shift
;;
--account=*)
ACCOUNT="${1#*=}"
shift
;;
*)
POS+=("$1")
shift
;;
esac
done
set -- "${POS[@]}"

CMD="${@}"
CMD="${CMD} --partition=${PARTITION} --account=${ACCOUNT}"
echo $CMD
CMD="/sdf/group/lcls/ds/tools/lute/lute_launcher ${CMD}"
SLURM_ARGS="--partition=${PARTITION} --account=${ACCOUNT} --ntasks=1"
echo "Running ${CMD} with ${SLURM_ARGS}"
sbatch $SLURM_ARGS --wrap "${CMD}"
113 changes: 65 additions & 48 deletions lute/execution/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from ..tasks.dataclasses import *
from ..io.models.base import TaskParameters
from ..io.db import record_analysis_db
from ..io.elog import post_elog_run_status

if __debug__:
warnings.simplefilter("default")
Expand Down Expand Up @@ -275,6 +276,27 @@ def _finalize_task(self, proc: subprocess.Popen) -> None:
"""
...

def _submit_cmd(self, executable_path: str, params: str) -> str:
"""Return a formatted command for launching Task subprocess.
May be overridden by subclasses.
Args:
executable_path (str): Path to the LUTE subprocess script.
params (str): String of formatted command-line arguments.
Returns:
cmd (str): Appropriately formatted command for this Executor.
"""
cmd: str = ""
if __debug__:
cmd = f"python -B {executable_path} {params}"
else:
cmd = f"python -OB {executable_path} {params}"

return cmd

def execute_task(self) -> None:
"""Run the requested Task as a subprocess."""
lute_path: Optional[str] = os.getenv("LUTE_PATH")
Expand All @@ -286,12 +308,7 @@ def execute_task(self) -> None:
config_path: str = self._analysis_desc.task_env["LUTE_CONFIGPATH"]
params: str = f"-c {config_path} -t {self._analysis_desc.task_result.task_name}"

cmd: str = ""
if __debug__:
cmd = f"python -B {executable_path} {params}"
else:
cmd = f"python -OB {executable_path} {params}"

cmd: str = self._submit_cmd(executable_path, params)
proc: subprocess.Popen = self._submit_task(cmd)

while self._task_is_running(proc):
Expand All @@ -308,10 +325,12 @@ def execute_task(self) -> None:
if ret := proc.returncode:
logger.info(f"Task failed with return code: {ret}")
self._analysis_desc.task_result.task_status = TaskStatus.FAILED
self.Hooks.task_failed(self, msg=Message())
elif self._analysis_desc.task_result.task_status == TaskStatus.RUNNING:
# Ret code is 0, no exception was thrown, task forgot to set status
self._analysis_desc.task_result.task_status = TaskStatus.COMPLETED
logger.debug(f"Task did not change from RUNNING status. Assume COMPLETED.")
self.Hooks.task_done(self, msg=Message())
self._store_configuration()
for comm in self._communicators:
comm.clear_communicator()
Expand Down Expand Up @@ -406,22 +425,42 @@ def task_started(self: Executor, msg: Message):
f"Executor: {self._analysis_desc.task_result.task_name} started"
)
self._analysis_desc.task_result.task_status = TaskStatus.RUNNING
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "RUNNING",
}
post_elog_run_status(elog_data)

self.add_hook("task_started", task_started)

def task_failed(self: Executor, msg: Message): ...
def task_failed(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "FAILED",
}
post_elog_run_status(elog_data)

self.add_hook("task_failed", task_failed)

def task_stopped(self: Executor, msg: Message): ...
def task_stopped(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "STOPPED",
}
post_elog_run_status(elog_data)

self.add_hook("task_stopped", task_stopped)

def task_done(self: Executor, msg: Message): ...
def task_done(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "COMPLETED",
}
post_elog_run_status(elog_data)

self.add_hook("task_done", task_done)

def task_cancelled(self: Executor, msg: Message): ...
def task_cancelled(self: Executor, msg: Message):
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "CANCELLED",
}
post_elog_run_status(elog_data)

self.add_hook("task_cancelled", task_cancelled)

Expand All @@ -430,6 +469,10 @@ def task_result(self: Executor, msg: Message):
self._analysis_desc.task_result = msg.contents
logger.info(self._analysis_desc.task_result.summary)
logger.info(self._analysis_desc.task_result.task_status)
elog_data: Dict[str, str] = {
f"{self._analysis_desc.task_result.task_name} status": "COMPLETED",
}
post_elog_run_status(elog_data)

self.add_hook("task_result", task_result)

Expand Down Expand Up @@ -476,20 +519,20 @@ class MPIExecutor(Executor):
for the Executor itself.
Methods:
execute_task(): Run the task as a subprocess using `mpirun`.
_submit_cmd: Run the task as a subprocess using `mpirun`.
"""

def execute_task(self) -> None:
"""Run the requested Task as a subprocess."""
lute_path: Optional[str] = os.getenv("LUTE_PATH")
if lute_path is None:
logger.debug("Absolute path to subprocess.py not found.")
lute_path = os.path.abspath(f"{os.path.dirname(__file__)}/../..")
os.environ["LUTE_PATH"] = lute_path
executable_path: str = f"{lute_path}/subprocess_task.py"
config_path: str = self._analysis_desc.task_env["LUTE_CONFIGPATH"]
params: str = f"-c {config_path} -t {self._analysis_desc.task_result.task_name}"
def _submit_cmd(self, executable_path: str, params: str) -> str:
"""Override submission command to use `mpirun`
Args:
executable_path (str): Path to the LUTE subprocess script.
params (str): String of formatted command-line arguments.
Returns:
cmd (str): Appropriately formatted command for this Executor.
"""
py_cmd: str = ""
nprocs: int = max(
int(os.environ.get("SLURM_NPROCS", len(os.sched_getaffinity(0)))) - 1, 1
Expand All @@ -501,30 +544,4 @@ def execute_task(self) -> None:
py_cmd = f"python -OB -u -m mpi4py.run {executable_path} {params}"

cmd: str = f"{mpi_cmd} {py_cmd}"
proc: subprocess.Popen = self._submit_task(cmd)

while self._task_is_running(proc):
self._task_loop(proc)
time.sleep(self._analysis_desc.poll_interval)

os.set_blocking(proc.stdout.fileno(), True)
os.set_blocking(proc.stderr.fileno(), True)

self._finalize_task(proc)
proc.stdout.close()
proc.stderr.close()
proc.wait()
if ret := proc.returncode:
logger.info(f"Task failed with return code: {ret}")
self._analysis_desc.task_result.task_status = TaskStatus.FAILED
elif self._analysis_desc.task_result.task_status == TaskStatus.RUNNING:
# Ret code is 0, no exception was thrown, task forgot to set status
self._analysis_desc.task_result.task_status = TaskStatus.COMPLETED
logger.debug(f"Task did not change from RUNNING status. Assume COMPLETED.")
self._store_configuration()
for comm in self._communicators:
comm.clear_communicator()

if self._analysis_desc.task_result.task_status == TaskStatus.FAILED:
logger.info("Exiting after Task failure. Result recorded.")
sys.exit(-1)
return cmd
Loading

0 comments on commit 3b59475

Please sign in to comment.