diff --git a/.github/workflows/psdag.yml b/.github/workflows/psdag.yml new file mode 100644 index 00000000..4a4e2f02 --- /dev/null +++ b/.github/workflows/psdag.yml @@ -0,0 +1,24 @@ +name: psDAG_Sync + +on: + push: + branches: [ dev ] + pull_request: + branches: [ main ] + +jobs: + deploy-dags: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Pushes to another repository + uses: cpina/github-action-push-to-another-repository@main + env: + SSH_DEPLOY_KEY: ${{ secrets.SSH_PSDAG_PUSH_KEY }} + with: + source-directory: 'workflows/airflow' + destination-github-username: 'slac-lcls' + destination-repository-name: 'psdag' + commit-message: 'LUTE DAG Update (GH Action)' + target-directory: '/lute' + target-branch: main diff --git a/config/test.yaml b/config/test.yaml index 2c4cf37f..d6d5b01d 100644 --- a/config/test.yaml +++ b/config/test.yaml @@ -113,11 +113,16 @@ Test: compound_var: int_var: 10 dict_var: {"a": "b"} + throw_error: False # Set True to test Task failure TestBinary: executable: "/sdf/home/d/dorlhiac/test_tasks/test_threads" p_arg1: 4 # Number of cores +TestBinaryErr: + executable: "/sdf/home/d/dorlhiac/test_tasks/test_threads_err" + p_arg1: 4 # Number of cores + TestSocket: array_size: 8000 # Size of arrays to send. 8000 floats ~ 6.4e4 num_arrays: 10 # Number of arrays to send. diff --git a/docs/tutorial/creating_workflows.md b/docs/tutorial/creating_workflows.md new file mode 100644 index 00000000..acd4a5c9 --- /dev/null +++ b/docs/tutorial/creating_workflows.md @@ -0,0 +1,149 @@ +# Workflows with Airflow +**Note:** Airflow uses the term **DAG**, or directed acyclic graph, to describe workflows of tasks with defined (and acyclic) connectivities. This page will use the terms workflow and DAG interchangeably. + +## Relevant Components +In addition to the core LUTE package, a number of components are generally involved to run a workflow. The current set of scripts and objects are used to interface with Airflow, and the SLURM job scheduler. The core LUTE library can also be used to run workflows using different backends, and in the future these may be supported. + +For building and running workflows using SLURM and Airflow, the following components are necessary, and will be described in more detail below: +- Airflow launch script: `launch_airflow.py` +- SLURM submission script: `submit_slurm.sh` +- Airflow operators: + - `JIDSlurmOperator` + +## Launch/Submission Scripts +## `launch_airflow.py` +Sends a request to an Airflow instance to submit a specific DAG (workflow). This script prepares an HTTP request with the appropriate parameters in a specific format. + +A request involves the following information, most of which is retrieved automatically: +```py +dag_run_data: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = { + "dag_run_id": str(uuid.uuid4()), + "conf": { + "experiment": os.environ.get("EXPERIMENT"), + "run_id": f"{os.environ.get('RUN_NUM')}{datetime.datetime.utcnow().isoformat()}", + "JID_UPDATE_COUNTERS": os.environ.get("JID_UPDATE_COUNTERS"), + "ARP_ROOT_JOB_ID": os.environ.get("ARP_JOB_ID"), + "ARP_LOCATION": os.environ.get("ARP_LOCATION", "S3DF"), + "Authorization": os.environ.get("Authorization"), + "user": getpass.getuser(), + "lute_params": params, + "slurm_params": extra_args, + }, +} +``` +Note that the environment variables are used to fill in the appropriate information because this script is intended to be launched primarily from the ARP (which passes these variables). 
The ARP allows the launch job to be defined in the experiment eLog and submitted automatically for each new DAQ run. Alternatively, the environment variables `EXPERIMENT` and `RUN` can be defined prior to submitting the script on the command line.

The script takes a number of parameters:

```bash
launch_airflow.py -c <path_to_config_yaml> -w <workflow_name> [--debug] [--test]
```

- `-c` refers to the path of the configuration YAML that contains the parameters for each **managed** `Task` in the requested workflow.
- `-w` is the name of the DAG (workflow) to run. By convention, each DAG is named after the Python file it is defined in (see below).
- `--debug` is an optional flag to run all steps of the workflow in debug mode for verbose logging and output.
- `--test` is an optional flag which will use the test Airflow instance. By default the script will make requests of the standard production Airflow instance.
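For illustration, a by-hand invocation might look like the following sketch. The configuration path, experiment name, and run number are placeholders; additional unrecognized arguments (here the SLURM options) are forwarded to the batch scheduler when each step is submitted.

```bash
# When not submitting via the ARP, define the experiment and run number first
# (placeholder values shown).
export EXPERIMENT="mfxl1234567"
export RUN="12"

launch_airflow.py -c /path/to/lute/config.yaml -w test \
    --partition=milano --account=lcls:$EXPERIMENT --ntasks=100
```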
## `submit_slurm.sh`
Launches a job on the S3DF batch nodes using the SLURM job scheduler. This script launches a single **managed** `Task` at a time. The usage is as follows:
```bash
submit_slurm.sh -c <path_to_config_yaml> -t <managed_task_name> [--debug] [--SLURM_ARGS ...]
```
As a reminder, the **managed** `Task` refers to the `Executor`-`Task` combination. The script does not parse any SLURM-specific parameters; it passes them through to SLURM transparently. At least the following two SLURM arguments must be provided:
```bash
--partition=<...> # Usually partition=milano
--account=<...> # Usually account=lcls:$EXPERIMENT
```
Generally, resource requests will also be included, such as the number of cores to use. A complete call may look like the following:
```bash
submit_slurm.sh -c /sdf/data/lcls/ds/hutch/experiment/scratch/config.yaml -t Tester --partition=milano --account=lcls:experiment --ntasks=100 [...]
```

When running a workflow using the `launch_airflow.py` script, each step of the workflow is submitted using this script.

## Operators
`Operator`s are the objects submitted as the individual steps of a DAG by Airflow. They are conceptually linked to the idea of a task in that each task of a workflow is generally an operator. Care should be taken not to confuse them with LUTE `Task`s or **managed** `Task`s, though. There is, however, usually a one-to-one correspondence between a `Task` and an `Operator`.

Airflow runs on a K8S cluster which has no access to the experiment data. When we ask Airflow to run a DAG, it launches an `Operator` for each step of the DAG. However, the `Operator` itself cannot perform productive analysis without access to the data. The solution employed by `LUTE` is to have a limited set of `Operator`s which do not perform analysis, but instead request that a `LUTE` **managed** `Task` be submitted to the batch nodes, where it can access the data. There may be small differences between how the various provided `Operator`s do this, but in general they will all make a request to the **job interface daemon** (JID) to schedule a new SLURM job using the `submit_slurm.sh` script described above.

Therefore, running a typical Airflow DAG involves the following steps:
1. The `launch_airflow.py` script is submitted, usually from a definition in the eLog.
2. The `launch_airflow` script requests that Airflow run a specific DAG.
3. The Airflow instance begins submitting the `Operator`s that make up the DAG definition.
4. Each `Operator` sends a request to the `JID` to submit a job.
5. The `JID` submits the `elog_submit.sh` script with the appropriate **managed** `Task`.
6. The **managed** `Task` runs on the batch nodes, while the `Operator`, requesting updates from the JID on job status, waits for it to complete.
7. Once a **managed** `Task` completes, the `Operator` receives this information and tells the Airflow server whether the job completed successfully or resulted in failure.
8. The Airflow server then launches the next step of the DAG, and so on, until every step has been executed.

Currently, the following `Operator`s are maintained:
- `JIDSlurmOperator`: The standard `Operator`. Each instance has a one-to-one correspondence with a LUTE **managed** `Task`.

### `JIDSlurmOperator` arguments
- `task_id`: This is nominally the name of the task on the Airflow side. However, for simplicity this is used 1-1 to match the name of a **managed** `Task` defined in LUTE's `managed_tasks.py` module. I.e., it should be the name of an `Executor("Task")` object which will run the specific `Task` of interest. This **must** match the name of a defined **managed** `Task`.
- `max_cores`: Used to cap the maximum number of cores requested of SLURM. By default all jobs run with the same number of cores, which should be specified when running the `launch_airflow.py` script (either from the ARP, or by hand). This behaviour was chosen because in general we want to increase or decrease the core count for all `Task`s uniformly, and we don't want to have to specify core-count arguments for each job individually. Nonetheless, on occasion it may be necessary to cap the number of cores a specific job will use. E.g. if the default value specified when launching the Airflow DAG is many cores and one job is single-threaded, that job's core count can be capped at 1 while the rest run with multiple cores.


# Creating a new workflow
Defining a new workflow involves creating a **new** module (Python file) in the directory `workflows/airflow`, creating a number of `Operator` instances within the module, and then drawing the connectivity between them. At the top of the file an Airflow DAG is created and given a name. By convention, all `LUTE` workflows use the name of the file as the name of the DAG. The following code can be copied exactly into the file:

```py
from datetime import datetime
import os
from airflow import DAG
from lute.operators.jidoperators import JIDSlurmOperator  # Import other operators if needed

dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}"
description: str = (
    "Run SFX processing using PyAlgos peak finding and experimental phasing"
)

dag: DAG = DAG(
    dag_id=dag_id,
    start_date=datetime(2024, 3, 18),
    schedule_interval=None,
    description=description,
)
```

Once the DAG has been created, a number of `Operator`s must be created to run the various LUTE analysis operations. As an example, consider a partial SFX processing workflow which includes steps for peak finding, indexing, merging, and calculating figures of merit. Each of the 4 steps will have an `Operator` instance which will launch a corresponding `LUTE` **managed** `Task`, for example:

```py
# Using only the JIDSlurmOperator
# syntax: JIDSlurmOperator(task_id="LuteManagedTaskName", dag=dag)  # optionally: max_cores=123
peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPyAlgos", dag=dag)

# We specify a maximum number of cores for the rest of the jobs.
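# Note: max_cores only caps the core count requested of SLURM for the individual
# step below; the default count for every step comes from the SLURM arguments
# passed when launching the workflow (see the `JIDSlurmOperator` arguments above).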
+indexer: JIDSlurmOperator = JIDSlurmOperator( + max_cores=120, task_id="CrystFELIndexer", dag=dag +) + +# Merge +merger: JIDSlurmOperator = JIDSlurmOperator( + max_cores=120, task_id="PartialatorMerger", dag=dag +) + +# Figures of merit +hkl_comparer: JIDSlurmOperator = JIDSlurmOperator( + max_cores=8, task_id="HKLComparer", dag=dag +) +``` + +Finally, the dependencies between the `Operator`s are "drawn", defining the execution order of the various steps. The `>>` operator has been overloaded for the `Operator` class, allowing it to be used to specify the next step in the DAG. In this case, a completely linear DAG is drawn as: + +```py +peak_finder >> indexer >> merger >> hkl_comparer +``` + +Parallel execution can be added by using the `>>` operator multiple times. Consider a `task1` which upon successful completion starts a `task2` and `task3` in parallel. This dependency can be added to the DAG using: + +```py +#task1: JIDSlurmOperator = JIDSlurmOperator(...) +#task2 ... + +task1 >> task2 +task1 >> task3 +``` + +As each DAG is defined in pure Python, standard control structures (loops, if statements, etc.) can be used to create more complex workflow arrangements. diff --git a/launch_scripts/launch_airflow.py b/launch_scripts/launch_airflow.py new file mode 100755 index 00000000..c82b5dcf --- /dev/null +++ b/launch_scripts/launch_airflow.py @@ -0,0 +1,108 @@ +#!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.59-py3/bin/python + +"""Script submitted by Automated Run Processor (ARP) to trigger an Airflow DAG. + +This script is submitted by the ARP to the batch nodes. It triggers Airflow to +begin running the tasks of the specified directed acyclic graph (DAG). +""" + +__author__ = "Gabriel Dorlhiac" + +import os +import uuid +import getpass +import datetime +import logging +import argparse +from typing import Dict, Union, List + +import requests +from requests.auth import HTTPBasicAuth + +if __debug__: + logging.basicConfig(level=logging.DEBUG) +else: + logging.basicConfig(level=logging.INFO) + +logger: logging.Logger = logging.getLogger(__name__) + + +def _retrieve_pw(instance: str = "prod") -> str: + path: str = "/sdf/group/lcls/ds/tools/lute/airflow_{instance}.txt" + if instance == "prod" or instance == "test": + path = path.format(instance) + else: + raise ValueError('`instance` must be either "test" or "prod"!') + + with open(path, "r") as f: + pw: str = f.readline().strip() + return pw + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + prog="trigger_airflow_lute_dag", + description="Trigger Airflow to begin executing a LUTE DAG.", + epilog="Refer to https://github.com/slac-lcls/lute for more information.", + ) + parser.add_argument("-c", "--config", type=str, help="Path to config YAML file.") + parser.add_argument("-d", "--debug", help="Run in debug mode.", action="store_true") + parser.add_argument( + "--test", help="Use test Airflow instance.", action="store_true" + ) + parser.add_argument( + "-w", "--workflow", type=str, help="Workflow to run.", default="test" + ) + + args: argparse.Namespace + extra_args: List[str] # Should contain all SLURM arguments! 
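    # parse_known_args() returns the arguments this parser recognizes and leaves
    # everything else in `extra_args`; those leftover strings (e.g. a hypothetical
    # "--partition=milano --account=lcls:abcd123 --ntasks=100") are passed through
    # as SLURM arguments via the "slurm_params" entry of the DAG run configuration below.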
+ args, extra_args = parser.parse_known_args() + airflow_instance: str + instance_str: str + if args.test: + airflow_instance = "http://172.24.5.190:8080/" + instance_str = "test" + else: + airflow_instance = "http://172.24.5.247:8080/" + instance_str = "prod" + + airflow_api_endpoints: Dict[str, str] = { + "health": "api/v1/health", + "run_dag": f"api/v1/dags/lute_{args.workflow}/dagRuns", + } + + resp: requests.models.Response = requests.get( + f"{airflow_instance}/{airflow_api_endpoints['health']}", + auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)), + ) + resp.raise_for_status() + + params: Dict[str, Union[str, int, List[str]]] = { + "config_file": args.config, + "debug": args.debug, + } + + # Experiment, run #, and ARP env variables come from ARP submission only + dag_run_data: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = { + "dag_run_id": str(uuid.uuid4()), + "conf": { + "experiment": os.environ.get("EXPERIMENT"), + "run_id": f"{os.environ.get('RUN_NUM')}{datetime.datetime.utcnow().isoformat()}", + "JID_UPDATE_COUNTERS": os.environ.get("JID_UPDATE_COUNTERS"), + "ARP_ROOT_JOB_ID": os.environ.get("ARP_JOB_ID"), + "ARP_LOCATION": os.environ.get("ARP_LOCATION", "S3DF"), + "Authorization": os.environ.get("Authorization"), + "user": getpass.getuser(), + "lute_location": os.path.abspath(f"{os.path.dirname(__file__)}/.."), + "lute_params": params, + "slurm_params": extra_args, + }, + } + + resp: requests.models.Response = requests.post( + f"{airflow_instance}/{airflow_api_endpoints['run_dag']}", + json=dag_run_data, + auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)), + ) + resp.raise_for_status() + logger.info(resp.text) diff --git a/launch_scripts/submit_slurm.sh b/launch_scripts/submit_slurm.sh new file mode 100755 index 00000000..baf055d6 --- /dev/null +++ b/launch_scripts/submit_slurm.sh @@ -0,0 +1,92 @@ +#!/bin/bash +usage() +{ + cat << EOF +$(basename "$0"): + Submit a LUTE managed Task using SLURM on S3DF. + Options: + -c|--config + ABSOLUTE path to the LUTE configuration YAML. Must be absolute. + -h|--help + Display this message. + -t|--taskname + Name of the LUTE managed Task to run. + + NOTE: This script does not parse SLURM arguments, but a number of them are + mandatory. All additional arguments are transparently passed to SLURM. + You will need to provide at least the queue and account using, e.g.: + --partition=milano --account=lcls: +EOF +} + +POSITIONAL=() +while [[ $# -gt 0 ]] +do + flag="$1" + + case $flag in + -c|--config) + CONFIGPATH="$2" + shift + shift + ;; + -h|--help) + usage + exit + ;; + -t|--taskname) + TASK="$2" + shift + shift + ;; + --debug) + DEBUG=1 + shift + ;; + *) + POS+=("$1") + shift + ;; + esac +done +set -- "${POS[@]}" + +if [[ -z ${CONFIGPATH} || -z ${TASK} ]]; then + echo "Path to LUTE config and Task name are required!" + usage + exit +fi + +# Assume all other arguments are for SLURM +SLURM_ARGS=$@ + +# Setup logfile names - $EXPERIMENT and $RUN will be available if ARP submitted +FORMAT_RUN=$(printf "%04d" ${RUN:-0}) +LOG_FILE="${TASK}_${EXPERIMENT:-$EXP}_r${FORMAT_RUN}_$(date +'%Y-%m-%d_%H-%M-%S')" +SLURM_ARGS+=" --output=${LOG_FILE}.out" +SLURM_ARGS+=" --error=${LOG_FILE}.err" + +export LUTE_SOCKET="/tmp/lute_${RANDOM}.sock" + +# By default source the psana environment since most Tasks will use it. 
+source /sdf/group/lcls/ds/ana/sw/conda1/manage/bin/psconda.sh + +export LUTE_PATH="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd | sed s/launch_scripts//g )" +EXECUTABLE="${LUTE_PATH}run_task.py" + +if [[ ${DEBUG} ]]; then + echo "Running in debug mode - verbose logging." + CMD="python -B ${EXECUTABLE} -c ${CONFIGPATH} -t ${TASK}" +else + echo "Running in standard mode." + CMD="python -OB ${EXECUTABLE} -c ${CONFIGPATH} -t ${TASK}" +fi + +echo "Submitting task ${TASK}" +if [[ $DEBUG ]]; then + echo "Running ${TASK} with SLURM arguments: ${SLURM_ARGS}" + echo "Using socket ${LUTE_SOCKET}" + echo "${CMD}" +fi + +sbatch $SLURM_ARGS --wrap "${CMD}" diff --git a/lute/execution/ipc.py b/lute/execution/ipc.py index 541af8a6..ff28427f 100644 --- a/lute/execution/ipc.py +++ b/lute/execution/ipc.py @@ -197,6 +197,7 @@ def read(self, proc: subprocess.Popen) -> Message: else: contents = f"{contents} ({signal})" signal = None + return Message(contents=contents, signal=signal) def _safe_unpickle_decode(self, maybe_mixed: bytes) -> Optional[str]: @@ -401,7 +402,14 @@ def _create_socket(self) -> socket.socket: try: socket_path = os.environ["LUTE_SOCKET"] except KeyError as err: - socket_path = "/tmp/.lock.sock" + import uuid + import tempfile + + # Define a path,up and add to environment + # Executor-side always created first, Task will use the same one + socket_path = f"{tempfile.gettempdir()}/lute_{uuid.uuid4().hex}.sock" + os.environ["LUTE_SOCKET"] = socket_path + logger.debug(f"SocketCommunicator defines socket_path: {socket_path}") data_socket: socket.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) diff --git a/lute/io/models/tests.py b/lute/io/models/tests.py index 315b90ca..089e7624 100644 --- a/lute/io/models/tests.py +++ b/lute/io/models/tests.py @@ -20,6 +20,7 @@ __all__ = [ "TestParameters", "TestBinaryParameters", + "TestBinaryErrParameters", "TestSocketParameters", "TestWriteOutputParameters", "TestReadOutputParameters", @@ -49,6 +50,7 @@ class CompoundVar(BaseModel): dict_var: Dict[str, str] = {"a": "b"} compound_var: CompoundVar + throw_error: bool = False class TestBinaryParameters(BaseBinaryParameters): @@ -56,6 +58,13 @@ class TestBinaryParameters(BaseBinaryParameters): p_arg1: int = 1 +class TestBinaryErrParameters(BaseBinaryParameters): + """Same as TestBinary, but exits with non-zero code.""" + + executable: str = "/sdf/home/d/dorlhiac/test_tasks/test_threads_err" + p_arg1: int = 1 + + class TestSocketParameters(TaskParameters): array_size: int = 10000 num_arrays: int = 10 diff --git a/lute/managed_tasks.py b/lute/managed_tasks.py index aa69a947..1ef46ada 100644 --- a/lute/managed_tasks.py +++ b/lute/managed_tasks.py @@ -7,10 +7,12 @@ ####### Tester: Executor = Executor("Test") BinaryTester: Executor = Executor("TestBinary") +BinaryErrTester = Executor("TestBinaryErr") SocketTester: Executor = Executor("TestSocket") WriteTester: Executor = Executor("TestWriteOutput") ReadTester: Executor = Executor("TestReadOutput") + # SmallData-related ################### SmallDataProducer: Executor = Executor("SubmitSMD") diff --git a/lute/tasks/task.py b/lute/tasks/task.py index 4ff7689a..7d021c5c 100644 --- a/lute/tasks/task.py +++ b/lute/tasks/task.py @@ -11,11 +11,10 @@ import time from abc import ABC, abstractmethod -from typing import Any, List, Dict, Union, Type, TextIO, Optional +from typing import Any, List, Dict, Union, Type, TextIO import os import warnings import signal -import types from ..io.models.base import ( TaskParameters, diff --git a/lute/tasks/test.py 
b/lute/tasks/test.py index db94f1d7..4831d35f 100644 --- a/lute/tasks/test.py +++ b/lute/tasks/test.py @@ -35,6 +35,8 @@ def _run(self) -> None: time.sleep(1) msg: Message = Message(contents=f"Test message {i}") self._report_to_executor(msg) + if self._task_parameters.throw_error: + raise RuntimeError("Testing Error!") def _post_run(self) -> None: self._result.summary = "Test Finished." diff --git a/workflows/airflow/minairflow.py b/workflows/airflow/minairflow.py new file mode 100644 index 00000000..a9d3fbbe --- /dev/null +++ b/workflows/airflow/minairflow.py @@ -0,0 +1,23 @@ +"""Test Airflow <-> JID requests. + +Minimal example to test requests from Airflow to the JID. +""" + +from datetime import datetime +import os +from airflow import DAG +from lute.operators.jidoperators import RequestOnlyOperator + +dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}" +description: str = "Test Airflow <-> JID API" + +dag: DAG = DAG( + dag_id=dag_id, + start_date=datetime(1970, 1, 1), + schedule_interval=None, + description=description, +) + +requester: RequestOnlyOperator = RequestOnlyOperator(task_id="MakeRequest", dag=dag) + +requester diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py new file mode 100644 index 00000000..8ce2787a --- /dev/null +++ b/workflows/airflow/operators/jidoperators.py @@ -0,0 +1,336 @@ +"""Airflow Operators for running LUTE tasks via Airflow. + +Operators submit managed tasks to run and monitor task status. Status is +reported to Airflow which manages the execution order of a directed acyclic +graph (DAG) to determine which managed task to submit and when. + +Classes: + JIDSlurmOperator: Submits a managed task to run on S3DF batch nodes via the + job interface daemon (JID). Airflow itself has no access to data or the + file system mounted on the batch node so submission and monitoring is + done exclusively via the JID API. +""" + +__all__ = ["JIDSlurmOperator", "RequestOnlyOperator"] +__author__ = "Fred Poitevin, Murali Shankar" + +import uuid +import getpass +import time +import logging +import re +from typing import Dict, Any, Union, List, Optional + +import requests + +from airflow.models import BaseOperator +from airflow.exceptions import AirflowException +from airflow.plugins_manager import AirflowPlugin +from airflow.utils.decorators import apply_defaults + +if __debug__: + logging.basicConfig(level=logging.DEBUG) +else: + logging.basicConfig(level=logging.INFO) + +logger: logging.Logger = logging.getLogger(__name__) + + +class RequestOnlyOperator(BaseOperator): + """This Operator makes a JID request and exits.""" + + @apply_defaults + def __init__( + self, + user: str = getpass.getuser(), + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) # Initializes self.task_id + self.user: str = user + + def execute(self, context: Dict[str, Any]) -> None: + """Method called by Airflow which submits SLURM Job via JID. + + Args: + context (Dict[str, Any]): Airflow dictionary object. + https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html + contains a list of available variables and their description. 
+ """ + # logger.info(f"Attempting to run at {self.get_location(context)}...") + logger.info(f"Attempting to run at S3DF.") + dagrun_config: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = ( + context.get("dag_run").conf + ) + jid_job_definition: Dict[str, str] = { + "_id": str(uuid.uuid4()), + "name": self.task_id, + "executable": f"myexecutable.sh", + "trigger": "MANUAL", + "location": dagrun_config.get("ARP_LOCATION", "S3DF"), + "parameters": "--partition=milano --account=lcls:data", + "run_as_user": self.user, + } + + control_doc: Dict[str, Union[str, Dict[str, str]]] = { + "_id": str(uuid.uuid4()), + "arp_root_job_id": dagrun_config.get("ARP_ROOT_JOB_ID"), + "experiment": dagrun_config.get("experiment"), + "run_num": dagrun_config.get("run_id"), + "user": dagrun_config.get("user"), + "status": "", + "tool_id": "", + "def_id": str(uuid.uuid4()), + "def": jid_job_definition, + } + + uri: str = ( + "https://psdm.slac.stanford.edu/arps3dfjid/jid/ws/{experiment}/start_job" + ) + # Endpoints have the string "{experiment}" in them + uri = uri.format(experiment=dagrun_config.get("experiment")) + auth: Any = dagrun_config.get("Authorization") + logger.info(f"Calling {uri} with {control_doc}...") + + logger.info(requests.__file__) + resp: requests.models.Response = requests.post( + uri, json=control_doc, headers={"Authorization": auth} + ) + logger.info(f"Status code: {resp.status_code}") + logger.info(requests.__file__) + + +class JIDSlurmOperator(BaseOperator): + """Airflow Operator which submits SLURM jobs through the JID.""" + + ui_color: str = "#006699" + + jid_api_location: str = "https://psdm.slac.stanford.edu/arps3dfjid/jid/ws" + """S3DF JID API location.""" + + jid_api_endpoints: Dict[str, str] = { + "start_job": "{experiment}/start_job", + "job_statuses": "job_statuses", + "job_log_file": "{experiment}/job_log_file", + } + + @apply_defaults + def __init__( + self, + user: str = getpass.getuser(), + poke_interval: float = 30.0, + max_cores: Optional[int] = None, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) # Initializes self.task_id + self.lute_location: str = "" + self.user: str = user + self.poke_interval: float = poke_interval + self.max_cores: Optional[int] = max_cores + + def create_control_doc( + self, context: Dict[str, Any] + ) -> Dict[str, Union[str, Dict[str, str]]]: + """Prepare the control document for job submission via the JID. + + Translates and Airflow dictionary to the representation needed by the + JID. + + Args: + context (Dict[str, Any]): Airflow dictionary object. + https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html + contains a list of available variables and their description. + + Returns: + control_doc (Dict[str, Union[str, Dict[str, str]]]): JID job control + dictionary. + """ + + dagrun_config: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = ( + context.get("dag_run").conf + ) + + self.lute_location = dagrun_config.get( + "lute_location", "/sdf/group/lcls/ds/tools/lute/latest" + ) + lute_params: Dict[str, str] = dagrun_config.get("lute_params", {}) + + config_path: str = lute_params["config_file"] + # Note that task_id is from the parent class. + # When defining the Operator instances the id is assumed to match a + # managed task! 
+ lute_param_str: str + if lute_params["debug"]: + lute_param_str = f"--taskname {self.task_id} --config {config_path} --debug" + else: + lute_param_str = f"--taskname {self.task_id} --config {config_path}" + + # slurm_params holds a List[str] + slurm_param_str: str = " ".join(dagrun_config.get("slurm_params")) + # Cap max cores used by a managed Task if that is requested + pattern: str = r"(?<=\bntasks=)\d+" + ntasks: int + try: + ntasks = int(re.findall(pattern, slurm_param_str)[0]) + except IndexError as err: # If `ntasks` not passed - 1 is default + ntasks = 1 + if self.max_cores is not None and ntasks > self.max_cores: + slurm_param_str = re.sub(pattern, f"{self.max_cores}", slurm_param_str) + + parameter_str: str = f"{lute_param_str} {slurm_param_str}" + + jid_job_definition: Dict[str, str] = { + "_id": str(uuid.uuid4()), + "name": self.task_id, + "executable": f"{self.lute_location}/launch_scripts/submit_slurm.sh", + "trigger": "MANUAL", + "location": dagrun_config.get("ARP_LOCATION", "S3DF"), + "parameters": parameter_str, + "run_as_user": self.user, + } + + control_doc: Dict[str, Union[str, Dict[str, str]]] = { + "_id": str(uuid.uuid4()), + "arp_root_job_id": dagrun_config.get("ARP_ROOT_JOB_ID"), + "experiment": dagrun_config.get("experiment"), + "run_num": dagrun_config.get("run_id"), + "user": dagrun_config.get("user"), + "status": "", + "tool_id": "", + "def_id": str(uuid.uuid4()), + "def": jid_job_definition, + } + + return control_doc + + def parse_response( + self, resp: requests.models.Response, check_for_error: List[str] + ) -> Dict[str, Any]: + """Parse a JID HTTP response. + + Args: + resp (requests.models.Response): The response object from a JID + HTTP request. + check_for_error (List[str]): A list of strings/patterns to search + for in response. Exception is raised if there are any matches. + + Returns: + value (Dict[str, Any]): Dictionary containing HTTP response value. + + Raises: + AirflowException: Raised to translate multiple errors into object + properly handled by the Airflow server. + """ + logger.info(f"{resp.status_code}: {resp.content}") + if not resp.status_code in (200,): + raise AirflowException(f"Bad response from JID {resp}: {resp.content}") + try: + json: Dict[str, Union[str, int]] = resp.json() + if not json.get("success", "") in (True,): + raise AirflowException(f"Error from JID {resp}: {resp.content}") + value: Dict[str, Any] = json.get("value") + + for pattern in check_for_error: + if pattern in value: + raise AirflowException( + f"Response failed due to string match {pattern} against response {value}" + ) + return value + except Exception as err: + raise AirflowException( + f"Response from JID not parseable, unknown error: {err}" + ) + + def rpc( + self, + endpoint: str, + control_doc: Union[ + List[Dict[str, Union[str, Dict[str, str]]]], + Dict[str, Union[str, Dict[str, str]]], + ], + context: Dict[str, Any], + check_for_error: List[str] = [], + ) -> Union[List[Dict[str, Any]], Dict[str, Any]]: + """Submit job via JID and retrieve responses. + + Remote Procedure Call (RPC). + + Args: + endpoint (str): Which API endpoint to use. + + control_doc (Dict[str, Union[str, Dict[str, str]]]): Dictionary for + JID call. + + context (Dict[str, Any]): Airflow dictionary object. + https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html + contains a list of available variables and their description. + + check_for_error (List[str]): A list of keywords to search for in a + response to indicate error conditions. Default []. 
+ + Returns: + value (Dict[str, Any]): Dictionary containing HTTP response value. + """ + # if not self.get_location(context) in self.locations: + # raise AirflowException(f"JID location {self.get_location(context)} is not configured") + dagrun_config: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = ( + context.get("dag_run").conf + ) + experiment: str = dagrun_config.get("experiment") + auth: Any = dagrun_config.get("Authorization") + + uri: str = f"{self.jid_api_location}/{self.jid_api_endpoints[endpoint]}" + # Endpoints have the string "{experiment}" in them + uri = uri.format(experiment=experiment) + + logger.info(f"Calling {uri} with {control_doc}...") + + resp: requests.models.Response = requests.post( + uri, json=control_doc, headers={"Authorization": auth} + ) + logger.info(f" + {resp.status_code}: {resp.content.decode('utf-8')}") + + value: Dict[str, Any] = self.parse_response(resp, check_for_error) + + return value + + def execute(self, context: Dict[str, Any]) -> None: + """Method called by Airflow which submits SLURM Job via JID. + + Args: + context (Dict[str, Any]): Airflow dictionary object. + https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html + contains a list of available variables and their description. + """ + # logger.info(f"Attempting to run at {self.get_location(context)}...") + logger.info(f"Attempting to run at S3DF.") + control_doc = self.create_control_doc(context) + logger.info(control_doc) + logger.info(f"{self.jid_api_location}/{self.jid_api_endpoints['start_job']}") + # start_job requires a dictionary + msg: Dict[str, Any] = self.rpc( + endpoint="start_job", control_doc=control_doc, context=context + ) + logger.info(f"JobID {msg['tool_id']} successfully submitted!") + + jobs: List[Dict[str, Any]] = [msg] + time.sleep(10) # Wait for job to queue.... FIXME + logger.info("Checking for job completion.") + while jobs[0].get("status") in ("RUNNING", "SUBMITTED"): + jobs = self.rpc( + endpoint="job_statuses", + control_doc=jobs, # job_statuses requires a list + context=context, + check_for_error=[" error: ", "Traceback"], + ) + time.sleep(self.poke_interval) + + # Logs out to xcom + out = self.rpc("job_log_file", jobs[0], context) + context["task_instance"].xcom_push(key="log", value=out) + + +class JIDPlugins(AirflowPlugin): + name = "jid_plugins" + operators = [JIDSlurmOperator] diff --git a/workflows/airflow/psocake_sfx_phasing.py b/workflows/airflow/psocake_sfx_phasing.py new file mode 100644 index 00000000..e5b733ce --- /dev/null +++ b/workflows/airflow/psocake_sfx_phasing.py @@ -0,0 +1,65 @@ +"""SFX Processing DAG (W/ Experimental Phasing) + +Runs SFX processing, beginning with the PyAlgos peak finding algorithm. +This workflow is used for data requiring experimental phasing. Do NOT use this +DAG if you are using molecular replacement. + +Note: + The task_id MUST match the managed task name when defining DAGs - it is used + by the operator to properly launch it. + + dag_id names must be unique, and they are not namespaced via folder + hierarchy. I.e. all DAGs on an Airflow instance must have unique ids. The + Airflow instance used by LUTE is currently shared by other software - DAG + IDs should always be prefixed with `lute_`. 
LUTE scripts should append this + internally, so a DAG "lute_test" can be triggered by asking for "test" +""" + +from datetime import datetime +import os +from airflow import DAG +from lute.operators.jidoperators import JIDSlurmOperator + +dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}" +description: str = ( + "Run SFX processing using Psocake peak finding and experimental phasing" +) + +dag: DAG = DAG( + dag_id=dag_id, + start_date=datetime(2024, 3, 18), + schedule_interval=None, + description=description, +) + +peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPsocake", dag=dag) + +indexer: JIDSlurmOperator = JIDSlurmOperator( + max_cores=120, task_id="CrystFELIndexer", dag=dag +) + +# Merge +merger: JIDSlurmOperator = JIDSlurmOperator( + max_cores=120, task_id="PartialatorMerger", dag=dag +) + +# Figures of merit +hkl_comparer: JIDSlurmOperator = JIDSlurmOperator( + max_cores=8, task_id="HKLComparer", dag=dag +) + +# HKL conversions +hkl_manipulator: JIDSlurmOperator = JIDSlurmOperator( + max_cores=8, task_id="HKLManipulator", dag=dag +) + +# SHELX Tasks +shelxc: JIDSlurmOperator = JIDSlurmOperator( + max_cores=20, task_id="SHELXCRunner", dag=dag +) + + +peak_finder >> indexer >> merger >> hkl_manipulator >> shelxc +merger >> hkl_comparer + +# Run summaries diff --git a/workflows/airflow/pyalgos_sfx.py b/workflows/airflow/pyalgos_sfx.py new file mode 100644 index 00000000..ddd6e165 --- /dev/null +++ b/workflows/airflow/pyalgos_sfx.py @@ -0,0 +1,63 @@ +"""SFX Processing DAG (W/ Experimental Phasing) + +Runs SFX processing, beginning with the PyAlgos peak finding algorithm. +This workflow is used for data requiring experimental phasing. Do NOT use this +DAG if you are using molecular replacement. + +Note: + The task_id MUST match the managed task name when defining DAGs - it is used + by the operator to properly launch it. + + dag_id names must be unique, and they are not namespaced via folder + hierarchy. I.e. all DAGs on an Airflow instance must have unique ids. The + Airflow instance used by LUTE is currently shared by other software - DAG + IDs should always be prefixed with `lute_`. 
LUTE scripts should append this + internally, so a DAG "lute_test" can be triggered by asking for "test" +""" + +from datetime import datetime +import os +from airflow import DAG +from lute.operators.jidoperators import JIDSlurmOperator + +dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}" +description: str = ( + "Run SFX processing using PyAlgos peak finding and Molecular Replacement" +) + +dag: DAG = DAG( + dag_id=dag_id, + start_date=datetime(2024, 3, 18), + schedule_interval=None, + description=description, +) + +peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPyAlgos", dag=dag) + +indexer: JIDSlurmOperator = JIDSlurmOperator( + max_cores=120, task_id="CrystFELIndexer", dag=dag +) + +# Merge +merger: JIDSlurmOperator = JIDSlurmOperator( + max_cores=120, task_id="PartialatorMerger", dag=dag +) + +# Figures of merit +hkl_comparer: JIDSlurmOperator = JIDSlurmOperator( + max_cores=8, task_id="HKLComparer", dag=dag +) + +# HKL conversions +hkl_manipulator: JIDSlurmOperator = JIDSlurmOperator( + max_cores=8, task_id="HKLManipulator", dag=dag +) + +# SHELX Tasks +dimple_runner: JIDSlurmOperator = JIDSlurmOperator(task_id="DimpleSolver", dag=dag) + + +peak_finder >> indexer >> merger >> hkl_manipulator >> dimple_runner +merger >> hkl_comparer + +# Run summaries diff --git a/workflows/airflow/test.py b/workflows/airflow/test.py new file mode 100644 index 00000000..a4a44014 --- /dev/null +++ b/workflows/airflow/test.py @@ -0,0 +1,50 @@ +"""Test Airflow DAG. + +Runs all managed Tasks either in sequence or parallel. + +Note: + The task_id MUST match the managed task name when defining DAGs - it is used + by the operator to properly launch it. + + dag_id names must be unique, and they are not namespaced via folder + hierarchy. I.e. all DAGs on an Airflow instance must have unique ids. The + Airflow instance used by LUTE is currently shared by other software - DAG + IDs should always be prefixed with `lute_`. LUTE scripts should append this + internally, so a DAG "lute_test" can be triggered by asking for "test" +""" + +from datetime import datetime +import os +from airflow import DAG +from lute.operators.jidoperators import JIDSlurmOperator + +dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}" +description: str = "Run managed test Task in sequence and parallel" + +dag: DAG = DAG( + dag_id=dag_id, + start_date=datetime(1970, 1, 1), + schedule_interval=None, + description=description, +) + +tester: JIDSlurmOperator = JIDSlurmOperator(max_cores=2, task_id="Tester", dag=dag) +binary_tester: JIDSlurmOperator = JIDSlurmOperator( + max_cores=5, task_id="BinaryTester", dag=dag +) +binary_err_tester: JIDSlurmOperator = JIDSlurmOperator( + max_cores=5, task_id="BinaryErrTester", dag=dag +) +socket_tester: JIDSlurmOperator = JIDSlurmOperator( + max_cores=2, task_id="SocketTester", dag=dag +) +write_tester: JIDSlurmOperator = JIDSlurmOperator( + max_cores=2, task_id="WriteTester", dag=dag +) +read_tester: JIDSlurmOperator = JIDSlurmOperator( + max_cores=2, task_id="ReadTester", dag=dag +) + +tester >> binary_tester +tester >> binary_err_tester # Second Task should fail +tester >> socket_tester >> write_tester >> read_tester
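# Resulting test DAG shape: Tester fans out into three parallel branches:
#   Tester >> BinaryTester
#   Tester >> BinaryErrTester   (expected to fail)
#   Tester >> SocketTester >> WriteTester >> ReadTester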