Skip to content

Commit

Permalink
Merge pull request #5 from gadorlhiac/ENH/airflow_integration
Browse files Browse the repository at this point in the history
ENH S3DF Airflow integration
  • Loading branch information
valmar authored Apr 5, 2024
2 parents 06128d9 + deabc5e commit 885b25b
Show file tree
Hide file tree
Showing 15 changed files with 938 additions and 3 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/psdag.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: psDAG_Sync

on:
push:
branches: [ dev ]
pull_request:
branches: [ main ]

jobs:
deploy-dags:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Pushes to another repository
uses: cpina/github-action-push-to-another-repository@main
env:
SSH_DEPLOY_KEY: ${{ secrets.SSH_PSDAG_PUSH_KEY }}
with:
source-directory: 'workflows/airflow'
destination-github-username: 'slac-lcls'
destination-repository-name: 'psdag'
commit-message: 'LUTE DAG Update (GH Action)'
target-directory: '/lute'
target-branch: main
5 changes: 5 additions & 0 deletions config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,16 @@ Test:
compound_var:
int_var: 10
dict_var: {"a": "b"}
throw_error: False # Set True to test Task failure

TestBinary:
executable: "/sdf/home/d/dorlhiac/test_tasks/test_threads"
p_arg1: 4 # Number of cores

TestBinaryErr:
executable: "/sdf/home/d/dorlhiac/test_tasks/test_threads_err"
p_arg1: 4 # Number of cores

TestSocket:
array_size: 8000 # Size of arrays to send. 8000 floats ~ 6.4e4
num_arrays: 10 # Number of arrays to send.
Expand Down
149 changes: 149 additions & 0 deletions docs/tutorial/creating_workflows.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Workflows with Airflow
**Note:** Airflow uses the term **DAG**, or directed acyclic graph, to describe workflows of tasks with defined (and acyclic) connectivities. This page will use the terms workflow and DAG interchangeably.

## Relevant Components
In addition to the core LUTE package, a number of components are generally involved to run a workflow. The current set of scripts and objects are used to interface with Airflow, and the SLURM job scheduler. The core LUTE library can also be used to run workflows using different backends, and in the future these may be supported.

For building and running workflows using SLURM and Airflow, the following components are necessary, and will be described in more detail below:
- Airflow launch script: `launch_airflow.py`
- SLURM submission script: `submit_slurm.sh`
- Airflow operators:
- `JIDSlurmOperator`

## Launch/Submission Scripts
## `launch_airflow.py`
Sends a request to an Airflow instance to submit a specific DAG (workflow). This script prepares an HTTP request with the appropriate parameters in a specific format.

A request involves the following information, most of which is retrieved automatically:
```py
dag_run_data: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = {
"dag_run_id": str(uuid.uuid4()),
"conf": {
"experiment": os.environ.get("EXPERIMENT"),
"run_id": f"{os.environ.get('RUN_NUM')}{datetime.datetime.utcnow().isoformat()}",
"JID_UPDATE_COUNTERS": os.environ.get("JID_UPDATE_COUNTERS"),
"ARP_ROOT_JOB_ID": os.environ.get("ARP_JOB_ID"),
"ARP_LOCATION": os.environ.get("ARP_LOCATION", "S3DF"),
"Authorization": os.environ.get("Authorization"),
"user": getpass.getuser(),
"lute_params": params,
"slurm_params": extra_args,
},
}
```
Note that the environment variables are used to fill in the appropriate information because this script is intended to be launched primarily from the ARP (which passes these variables). The ARP allows for the launch job to be defined in the experiment eLog and submitted automatically for each new DAQ run. The environment variables `EXPERIMENT` and `RUN` can alternatively be defined prior to submitting the script on the command-line.

The script takes a number of parameters:

```bash
launch_airflow.py -c <path_to_config_yaml> -w <workflow_name> [--debug] [--test]
```

- `-c` refers to the path of the configuration YAML that contains the parameters for each **managed** `Task` in the requested workflow.
- `-w` is the name of the DAG (workflow) to run. By convention each DAG is named by the Python file it is defined in. (See below).
- `--debug` is an optional flag to run all steps of the workflow in debug mode for verbose logging and output.
- `--test` is an optional flag which will use the test Airflow instance. By default the script will make requests of the standard production Airflow instance.


## `submit_slurm.sh`
Launches a job on the S3DF batch nodes using the SLURM job scheduler. This script launches a single **managed** `Task` at a time. The usage is as follows:
```bash
submit_slurm.sh -c <path_to_config_yaml> -t <MANAGED_task_name> [--debug] [--SLURM_ARGS ...]
```
As a reminder the **managed** `Task` refers to the `Executor`-`Task` combination. The script does not parse any SLURM specific parameters, and instead passes them transparently to SLURM. At least the following two SLURM arguments must be provided:
```bash
--partition=<...> # Usually partition=milano
--account=<...> # Usually account=lcls:$EXPERIMENT
```
Generally, resource requests will also be included, such as the number of cores to use. A complete call may look like the following:
```bash
submit_slurm.sh -c /sdf/data/lcls/ds/hutch/experiment/scratch/config.yaml -t Tester --partition=milano --account=lcls:experiment --ntasks=100 [...]
```

When running a workflow using the `launch_airflow.py` script, each step of the workflow will be submitted using this script.

## Operators
`Operator`s are the objects submitted as individual steps of a DAG by Airflow. They are conceptually linked to the idea of a task in that each task of a workflow is generally an operator. Care should be taken, not to confuse them with LUTE `Task`s or **managed** `Task`s though. There is, however, usually a one-to-one correspondance between a `Task` and an `Operator`.

Airflow runs on a K8S cluster which has no access to the experiment data. When we ask Airflow to run a DAG, it will launch an `Operator` for each step of the DAG. However, the `Operator` itself cannot perform productive analysis without access to the data. The solution employed by `LUTE` is to have a limited set of `Operator`s which do not perform analysis, but instead request that a `LUTE` **managed** `Task`s be submitted on the batch nodes where it can access the data. There may be small differences between how the various provided `Operator`s do this, but in general they will all make a request to the **job interface daemon** (JID) that a new SLURM job be scheduled using the `submit_slurm.sh` script described above.

Therefore, running a typical Airflow DAG involves the following steps:
1. `launch_airflow.py` script is submitted, usually from a definition in the eLog.
2. The `launch_airflow` script requests that Airflow run a specific DAG.
3. The Airflow instance begins submitting the `Operator`s that makeup the DAG definition.
4. Each `Operator` sends a request to the `JID` to submit a job.
5. The `JID` submits the `elog_submit.sh` script with the appropriate **managed** `Task`.
6. The **managed** `Task` runs on the batch nodes, while the `Operator`, requesting updates from the JID on job status, waits for it to complete.
7. Once a **managed** `Task` completes, the `Operator` will receieve this information and tell the Airflow server whether the job completed successfully or resulted in failure.
8. The Airflow server will then launch the next step of the DAG, and so on, until every step has been executed.

Currently, the following `Operator`s are maintained:
- `JIDSlurmOperator`: The standard `Operator`. Each instance has a one-to-one correspondance with a LUTE **managed** `Task`.

### `JIDSlurmOperator` arguments
- `task_id`: This is nominally the name of the task on the Airflow side. However, for simplicity this is used 1-1 to match the name of a **managed** Task defined in LUTE's `managed_tasks.py` module. I.e., it should the name of an `Executor("Task")` object which will run the specific Task of interest. This **must** match the name of a defined managed Task.
- `max_cores`: Used to cap the maximum number of cores which should be requested of SLURM. By default all jobs will run with the same number of cores, which should be specified when running the `launch_airflow.py` script (either from the ARP, or by hand). This behaviour was chosen because in general we want to increase or decrease the core-count for all Tasks uniformly, and we don't want to have to specify core number arguments for each job individually. Nonetheless, on occassion it may be necessary to cap the number of cores a specific job will use. E.g. if the default value specified when launching the Airflow DAG is multiple cores, and one job is single threaded, the core count can be capped for that single job to 1, while the rest run with multiple cores.


# Creating a new workflow
Defining a new workflow involves creating a **new** module (Python file) in the directory `workflows/airflow`, creating a number of `Operator` instances within the module, and then drawing the connectivity between them. At the top of the file an Airflow DAG is created and given a name. By convention all `LUTE` workflows use the name of the file as the name of the DAG. The following code can be copied exactly into the file:

```py
from datetime import datetime
import os
from airflow import DAG
from lute.operators.jidoperators import JIDSlurmOperator # Import other operators if needed

dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}"
description: str = (
"Run SFX processing using PyAlgos peak finding and experimental phasing"
)

dag: DAG = DAG(
dag_id=dag_id,
start_date=datetime(2024, 3, 18),
schedule_interval=None,
description=description,
)
```

Once the DAG has been created, a number of `Operator`s must be created to run the various LUTE analysis operations. As an example consider a partial SFX processing workflow which includes steps for peak finding, indexing, merging, and calculating figures of merit. Each of the 4 steps will have an `Operator` instance which will launch a corresponding `LUTE` **managed** `Task`, for example:

```py
# Using only the JIDSlurmOperator
# syntax: JIDSlurmOperator(task_id="LuteManagedTaskName", dag=dag) # optionally, max_cores=123)
peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPyAlgos", dag=dag)

# We specify a maximum number of cores for the rest of the jobs.
indexer: JIDSlurmOperator = JIDSlurmOperator(
max_cores=120, task_id="CrystFELIndexer", dag=dag
)

# Merge
merger: JIDSlurmOperator = JIDSlurmOperator(
max_cores=120, task_id="PartialatorMerger", dag=dag
)

# Figures of merit
hkl_comparer: JIDSlurmOperator = JIDSlurmOperator(
max_cores=8, task_id="HKLComparer", dag=dag
)
```

Finally, the dependencies between the `Operator`s are "drawn", defining the execution order of the various steps. The `>>` operator has been overloaded for the `Operator` class, allowing it to be used to specify the next step in the DAG. In this case, a completely linear DAG is drawn as:

```py
peak_finder >> indexer >> merger >> hkl_comparer
```

Parallel execution can be added by using the `>>` operator multiple times. Consider a `task1` which upon successful completion starts a `task2` and `task3` in parallel. This dependency can be added to the DAG using:

```py
#task1: JIDSlurmOperator = JIDSlurmOperator(...)
#task2 ...

task1 >> task2
task1 >> task3
```

As each DAG is defined in pure Python, standard control structures (loops, if statements, etc.) can be used to create more complex workflow arrangements.
108 changes: 108 additions & 0 deletions launch_scripts/launch_airflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.59-py3/bin/python

"""Script submitted by Automated Run Processor (ARP) to trigger an Airflow DAG.
This script is submitted by the ARP to the batch nodes. It triggers Airflow to
begin running the tasks of the specified directed acyclic graph (DAG).
"""

__author__ = "Gabriel Dorlhiac"

import os
import uuid
import getpass
import datetime
import logging
import argparse
from typing import Dict, Union, List

import requests
from requests.auth import HTTPBasicAuth

if __debug__:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)

logger: logging.Logger = logging.getLogger(__name__)


def _retrieve_pw(instance: str = "prod") -> str:
path: str = "/sdf/group/lcls/ds/tools/lute/airflow_{instance}.txt"
if instance == "prod" or instance == "test":
path = path.format(instance)
else:
raise ValueError('`instance` must be either "test" or "prod"!')

with open(path, "r") as f:
pw: str = f.readline().strip()
return pw


if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="trigger_airflow_lute_dag",
description="Trigger Airflow to begin executing a LUTE DAG.",
epilog="Refer to https://github.com/slac-lcls/lute for more information.",
)
parser.add_argument("-c", "--config", type=str, help="Path to config YAML file.")
parser.add_argument("-d", "--debug", help="Run in debug mode.", action="store_true")
parser.add_argument(
"--test", help="Use test Airflow instance.", action="store_true"
)
parser.add_argument(
"-w", "--workflow", type=str, help="Workflow to run.", default="test"
)

args: argparse.Namespace
extra_args: List[str] # Should contain all SLURM arguments!
args, extra_args = parser.parse_known_args()
airflow_instance: str
instance_str: str
if args.test:
airflow_instance = "http://172.24.5.190:8080/"
instance_str = "test"
else:
airflow_instance = "http://172.24.5.247:8080/"
instance_str = "prod"

airflow_api_endpoints: Dict[str, str] = {
"health": "api/v1/health",
"run_dag": f"api/v1/dags/lute_{args.workflow}/dagRuns",
}

resp: requests.models.Response = requests.get(
f"{airflow_instance}/{airflow_api_endpoints['health']}",
auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)),
)
resp.raise_for_status()

params: Dict[str, Union[str, int, List[str]]] = {
"config_file": args.config,
"debug": args.debug,
}

# Experiment, run #, and ARP env variables come from ARP submission only
dag_run_data: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = {
"dag_run_id": str(uuid.uuid4()),
"conf": {
"experiment": os.environ.get("EXPERIMENT"),
"run_id": f"{os.environ.get('RUN_NUM')}{datetime.datetime.utcnow().isoformat()}",
"JID_UPDATE_COUNTERS": os.environ.get("JID_UPDATE_COUNTERS"),
"ARP_ROOT_JOB_ID": os.environ.get("ARP_JOB_ID"),
"ARP_LOCATION": os.environ.get("ARP_LOCATION", "S3DF"),
"Authorization": os.environ.get("Authorization"),
"user": getpass.getuser(),
"lute_location": os.path.abspath(f"{os.path.dirname(__file__)}/.."),
"lute_params": params,
"slurm_params": extra_args,
},
}

resp: requests.models.Response = requests.post(
f"{airflow_instance}/{airflow_api_endpoints['run_dag']}",
json=dag_run_data,
auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)),
)
resp.raise_for_status()
logger.info(resp.text)
92 changes: 92 additions & 0 deletions launch_scripts/submit_slurm.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#!/bin/bash
usage()
{
cat << EOF
$(basename "$0"):
Submit a LUTE managed Task using SLURM on S3DF.
Options:
-c|--config
ABSOLUTE path to the LUTE configuration YAML. Must be absolute.
-h|--help
Display this message.
-t|--taskname
Name of the LUTE managed Task to run.
NOTE: This script does not parse SLURM arguments, but a number of them are
mandatory. All additional arguments are transparently passed to SLURM.
You will need to provide at least the queue and account using, e.g.:
--partition=milano --account=lcls:<experiment>
EOF
}

POSITIONAL=()
while [[ $# -gt 0 ]]
do
flag="$1"

case $flag in
-c|--config)
CONFIGPATH="$2"
shift
shift
;;
-h|--help)
usage
exit
;;
-t|--taskname)
TASK="$2"
shift
shift
;;
--debug)
DEBUG=1
shift
;;
*)
POS+=("$1")
shift
;;
esac
done
set -- "${POS[@]}"

if [[ -z ${CONFIGPATH} || -z ${TASK} ]]; then
echo "Path to LUTE config and Task name are required!"
usage
exit
fi

# Assume all other arguments are for SLURM
SLURM_ARGS=$@

# Setup logfile names - $EXPERIMENT and $RUN will be available if ARP submitted
FORMAT_RUN=$(printf "%04d" ${RUN:-0})
LOG_FILE="${TASK}_${EXPERIMENT:-$EXP}_r${FORMAT_RUN}_$(date +'%Y-%m-%d_%H-%M-%S')"
SLURM_ARGS+=" --output=${LOG_FILE}.out"
SLURM_ARGS+=" --error=${LOG_FILE}.err"

export LUTE_SOCKET="/tmp/lute_${RANDOM}.sock"

# By default source the psana environment since most Tasks will use it.
source /sdf/group/lcls/ds/ana/sw/conda1/manage/bin/psconda.sh

export LUTE_PATH="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd | sed s/launch_scripts//g )"
EXECUTABLE="${LUTE_PATH}run_task.py"

if [[ ${DEBUG} ]]; then
echo "Running in debug mode - verbose logging."
CMD="python -B ${EXECUTABLE} -c ${CONFIGPATH} -t ${TASK}"
else
echo "Running in standard mode."
CMD="python -OB ${EXECUTABLE} -c ${CONFIGPATH} -t ${TASK}"
fi

echo "Submitting task ${TASK}"
if [[ $DEBUG ]]; then
echo "Running ${TASK} with SLURM arguments: ${SLURM_ARGS}"
echo "Using socket ${LUTE_SOCKET}"
echo "${CMD}"
fi

sbatch $SLURM_ARGS --wrap "${CMD}"
Loading

0 comments on commit 885b25b

Please sign in to comment.