From 350dada8b7bb31443ae3ce2eb1fb3d3b23897fcf Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 8 Jan 2024 13:30:57 -0800 Subject: [PATCH 01/41] SKL Rough draft for airflow integration scripts --- run_task.py | 2 +- subprocess_task.py | 2 +- workflows/airflow/operators/jidoperators.py | 204 ++++++++++++++++++++ workflows/airflow/test.py | 35 ++++ workflows/run_airflow_dag.py | 112 +++++++++++ 5 files changed, 353 insertions(+), 2 deletions(-) create mode 100644 workflows/airflow/operators/jidoperators.py create mode 100644 workflows/airflow/test.py create mode 100644 workflows/run_airflow_dag.py diff --git a/run_task.py b/run_task.py index 610b751c..b9c44c6b 100644 --- a/run_task.py +++ b/run_task.py @@ -15,7 +15,7 @@ logger: logging.Logger = logging.getLogger(__name__) parser: argparse.ArgumentParser = argparse.ArgumentParser( - prog="LUTE Managed Task", + prog="run_managed_task", description="Run a LUTE managed task.", epilog="Refer to https://github.com/slac-lcls/lute for more information.", ) diff --git a/subprocess_task.py b/subprocess_task.py index 64c6bc1a..d9c2e997 100644 --- a/subprocess_task.py +++ b/subprocess_task.py @@ -14,7 +14,7 @@ logger: logging.Logger = logging.getLogger(__name__) parser: argparse.ArgumentParser = argparse.ArgumentParser( - prog="LUTE Task", + prog="run_subprocess_task", description="Analysis Task run as a subprocess managed by a LUTE Executor.", epilog="Refer to https://github.com/slac-lcls/lute for more information.", ) diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py new file mode 100644 index 00000000..1f1379cd --- /dev/null +++ b/workflows/airflow/operators/jidoperators.py @@ -0,0 +1,204 @@ +"""Airflow Operators for running LUTE tasks via Airflow. + +Operators submit managed tasks to run and monitor task status. Status is +reported to Airflow which manages the execution order of a directed acyclic +graph (DAG) to determine which managed task to submit and when. + +Classes: + JIDSlurmOperator: Submits a managed task to run on S3DF batch nodes via the + job interface daemon (JID). Airflow itself has no access to data or the + file system mounted on the batch node so submission and monitoring is + done exclusively via the JID API. 
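+
+Usage:
+    An illustrative sketch of defining a DAG with this operator. The task_id
+    must match the name of a LUTE managed Task (e.g. "Tester"); the DAG id
+    and schedule below are placeholder values.
+
+        from datetime import datetime
+        from airflow import DAG
+        from lute.operators.jidoperators import JIDSlurmOperator
+
+        dag = DAG(
+            dag_id="lute_test",
+            start_date=datetime(1970, 1, 1),
+            schedule_interval=None,
+        )
+        tester = JIDSlurmOperator(task_id="Tester", dag=dag)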
+""" + +__all__ = ["JIDSlurmOperator"] +__author__ = "Fred Poitevin, Murali Shankar" + +import uuid +import getpass +import time +import requests +import logging +from typing import Dict, Any, Union, List + +from airflow.models import BaseOperator +from airflow.exceptions import AirflowException +from airflow.plugins_manager import AirflowPlugin +from airflow.utils.decorators import apply_defaults + +if __debug__: + logging.basicConfig(level=logging.DEBUG) +else: + logging.basicConfig(level=logging.INFO) + +logger: logging.Logger = logging.getLogger(__name__) + + +class JIDSlurmOperator(BaseOperator): + """Airflow Operator which submits SLURM jobs through the JID.""" + + ui_color: str = "#006699" + + jid_api_location: str = "http://psdm.slac.stanford.edu/arps3dfjid/jid/ws" + """S3DF JID API location.""" + + jid_api_endpoints: Dict[str, str] = {} + + @apply_defaults + def __init__( + self, + lute_location: str, + user: str = getpass.getuser(), + poke_interval: float = 30.0, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.task_id: str = "" + self.lute_executable: str = f"{lute_location}/run_task.py" + self.user: str = "" + self.poke_interval: float = poke_interval + + def create_control_doc( + self, context: Dict[str, Any] + ) -> Dict[str, Union[str, Dict[str, str]]]: + """ + + Args: + context: Airflow dictionary object. + https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html + contains a list of available variables and their description. + """ + + dagrun_config: Dict[str, Union[str, Dict[str, str]]] = context.get( + "dag_run" + ).conf + + command_line_params: Dict[str, str] = dagrun_config.get("parameters", {}) + + config_path: str = command_line_params["config"] + # Note that task_id is from the parent class. + # When defining the Operator instances the id is assumed to match a + # managed task! + parameter_string: str = f"--taskname {self.task_id} --config {config_path}" + + jid_job_definition: Dict[str, str] = { + "_id": str(uuid.uuid4()), + "name": self.task_id, + "executable": self.lute_executable, + "trigger": "MANUAL", + "location": "S3DF", # self.get_location(context) + "parameters": parameter_string, + "run_as_user": self.user, + } + + control_doc: Dict[str, Union[str, Dict[str, str]]] = { + "_id": str(uuid.uuid4()), + "arp_root_job_id": dagrun_config.get("ARP_ROOT_JOB_ID"), + "experiment": dagrun_config.get("experiment"), + "run_id": dagrun_config.get("run_id"), + "user": dagrun_config.get("user"), + "status": "", + "tool_id": "", + "def_id": str(uuid.uuid4()), + "def": jid_job_definition, + } + + return control_doc + + def parse_response( + self, resp: requests.models.Response, check_for_error: List[str] + ) -> Dict[str, Any]: + """Parse a JID HTTP response. + + Args: + resp (requests.models.Response): The response object from a JID + HTTP request. + check_for_error (List[str]): A list of strings/patterns to search + for in response. Exception is raised if there are any matches. + Raises: + AirflowException: Raised to translate multiple errors into object + properly handled by the Airflow server. 
+ """ + logger.info(f"{resp.status_code}: {resp.content}") + if not resp.status_code in (200,): + raise AirflowException(f"Bad response from JID {resp}: {resp.content}") + try: + json: Dict[str, Union[str, int]] = resp.json() + if not json.get("success", "") in (True,): + raise AirflowException(f"Error from JID {resp}: {resp.content}") + value: Dict[str, Any] = json.get("value") + + for pattern in check_for_error: + if pattern in value: + raise AirflowException( + f"Response failed due to string match {pattern} against response {value}" + ) + return value + except Exception as err: + raise AirflowException( + f"Response from JID not parseable, unknown error: {err}" + ) + + def rpc( + self, + endpoint: str, + control_doc: Dict[str, Union[str, Dict[str, str]]], + context: Dict[str, Any], + check_for_error: List[str] = [], + ) -> Dict[str, Any]: + # if not self.get_location(context) in self.locations: + # raise AirflowException(f"JID location {self.get_location(context)} is not configured") + dagrun_config: Dict[str, str] = context.get("dag_run").conf + experiment: str = dagrun_config.get("experiment") + auth: Any = dagrun_config.get("Authorization") + + uri: str = f"{self.jid_api_location}/{self.jid_api_endpoints[endpoint]}" + # Endpoints have the string "{experiment}" in them + uri = uri.format(experiment=experiment) + + logger.info(f"Calling {uri} with {control_doc}...") + + resp: requests.models.Response = requests.post( + uri, json=control_doc, headers={"Authorization": auth} + ) + logger.info(f" + {resp.status_code}: {resp.content.decode('utf-8')}") + + value: Dict[str, Any] = self.parse_response(resp, check_for_error) + + return value + + def execute(self, context: Dict[str, Any]) -> None: + """Method called by Airflow which submits SLURM Job via JID.""" + # logger.info(f"Attempting to run at {self.get_location(context)}...") + logger.info(f"Attempting to run at S3DF.") + control_doc = self.create_control_doc(context) + logger.info(control_doc) + logger.info(f"{self.jid_api_location}/{self.jid_api_endpoints['start_job']}") + msg: Dict[str, Any] = self.rpc( + endpoint="start_job", control_doc=control_doc, context=context + ) + logger.info(f"JobID {msg['tool_id']} successfully submitted!") + + jobs: List[Dict[str, Any]] = [msg] + time.sleep(10) # Wait for job to queue.... FIXME + logger.info("Checking for job completion.") + while jobs[0].get("status") in ("RUNNING", "SUBMITTED"): + jobs = [ + self.rpc( + "job_statuses", + jobs[0], + context, + check_for_error=[" error: ", "Traceback"], + ) + ] + time.sleep(self.poke_interval) + + # Logs out to xcom + out = self.rpc("job_log_file", jobs[0], context) + context["task_instance"].xcom_push(key="log", value=out) + + +class JIDPlugins(AirflowPlugin): + name = "jid_plugins" + operators = [JIDSlurmOperator] diff --git a/workflows/airflow/test.py b/workflows/airflow/test.py new file mode 100644 index 00000000..4bd12ee2 --- /dev/null +++ b/workflows/airflow/test.py @@ -0,0 +1,35 @@ +"""Test Airflow DAG. + +Runs all managed Tasks either in sequence or parallel. + +Note: + The task_id MUST match the managed task name when defining DAGs - it is used + by the operator to properly launch it. + + dag_id names must be unique, and they are not namespaced via folder + hierarchy. I.e. all DAGs on an Airflow instance must have unique ids. The + Airflow instance used by LUTE is currently shared by other software - DAG + IDs should always be prefixed with `lute_`. 
LUTE scripts should append this + internally, so a DAG "lute_test" can be triggered by asking for "test" +""" +from datetime import datetime +import os +from airflow import DAG +from lute.operators.jidoperators import JIDSlurmOperator + +dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}" +description: str = "Run managed test Task in sequence and parallel" + +dag: DAG = DAG( + dag_id=dag_id, + start_date=datetime(1970, 1, 1), + schedule_interval=None, + description=description, +) + +tester: JIDSlurmOperator = JIDSlurmOperator(task_id="Tester", dag=dag) +binary_tester: JIDSlurmOperator = JIDSlurmOperator(task_id="BinaryTester", dag=dag) +socket_tester: JIDSlurmOperator = JIDSlurmOperator(task_id="SocketTester", dag=dag) + +tester >> binary_tester +tester >> socket_tester diff --git a/workflows/run_airflow_dag.py b/workflows/run_airflow_dag.py new file mode 100644 index 00000000..f94cc8eb --- /dev/null +++ b/workflows/run_airflow_dag.py @@ -0,0 +1,112 @@ +#!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.47-py3/bin/python + +"""Script submitted by Automated Run Processor (ARP) to trigger Airflow DAG. + +This script is submitted by the ARP to the batch nodes. It triggers Airflow to +begin running the tasks of the specified directed acyclic graph (DAG). +""" + +__author__ = "Gabriel Dorlhiac" + +import os +import uuid +import getpass +import datetime +import logging +import argparse +import requests +from typing import Dict, Union + +if __debug__: + logging.basicConfig(level=logging.DEBUG) +else: + logging.basicConfig(level=logging.INFO) + +logger: logging.Logger = logging.getLogger(__name__) + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + prog="trigger_airflow_lute_dag", + description="Trigger Airflow to begin executing a LUTE DAG.", + epilog="Refer to https://github.com/slac-lcls/lute for more information.", + ) + parser.add_argument( + "-a", + "--account", + type=str, + help="SLURM account.", + default=f"lcls:{os.environ.get('EXPERIMENT', '')}", + ) + parser.add_argument("-c", "--config", type=str, help="Path to config YAML file.") + parser.add_argument( + "-d", "--debug", type=str, help="Run in debug mode.", action="store_true" + ) + parser.add_argument( + "-l", + "--lute_path", + type=str, + help="Path to the LUTE installation to use.", + default="/sdf/group/lcls/ds/tools/lute", + ) + parser.add_argument( + "-n", + "--ncores", + type=int, + help="Number of cores. 
Add an extra core for the Executor.", + default=65, + ) + parser.add_argument( + "-q", "--queue", type=str, help="SLURM queue.", default="milano" + ) + parser.add_argument( + "-r", "--reservation", type=str, help="SLURM reservation", default="" + ) + parser.add_argument( + "-w", "--workflow", type=str, help="Workflow to run.", default="test" + ) + + args: argparse.Namespace = parser.parse_args() + airflow_instance: str = "http://172.24.5.247:8080/" + + airflow_api_endpoints: Dict[str, str] = { + "health": "api/v1/health", + "run_dag": f"api/v1/dags/lute_{args.workflow}/dagRuns", + } + + resp: requests.models.Response = requests.get( + f"{airflow_instance}/{airflow_api_endpoints['health']}", + auth=HTTPBasicAuth(), # NEED AUTH SOLUTION + ) + resp.raise_for_status() + + params: Dict[str, Union[str, int]] = { + "config_file": args.config, + # "dag": f"lute_{args.workflow}", + "queue": args.queue, + "ncores": args.ncores, + "experiment": os.environ["EXPERIMENT"], + "run_num": os.environ["RUN_NUM"], + "account": args.account, + } + + dag_run_data: Dict[str, Union[str, Dict[str, Union[str, int]]]] = { + "dag_run_id": str(uuid.uuid4()), + "conf": { + "experiment": os.environ["EXPERIMENT"], + "run_id": f"{os.environ['RUN_NUM']}{datetime.datetime.utcnow().isoformat()}", + "JID_UPDATE_COUNTERS": os.environ["JID_UPDATE_COUNTERS"], + "ARP_ROOT_JOB_ID": os.environ["ARP_JOB_ID"], + "ARP_LOCATION": "S3DF", # os.environ["ARP_LOCATION"], + "Authorization": os.environ["Authorization"], + "user": getpass.getuser(), + "parameters": params, + }, + } + + resp: requests.models.Response = requests.post( + f"{airflow_instance}/{airflow_api_endpoints['run_dag']}", + json=dag_run_data, + auth=HTTPBasicAuth(), # NEED AUTH SOLUTION + ) + resp.raise_for_status() + logger.info(resp.text) From 4c269254289c3282ff076b6388ff1435e4e8e6d1 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Wed, 21 Feb 2024 11:27:00 -0800 Subject: [PATCH 02/41] ENH Add SLURM task submission script --- launch_scripts/submit_slurm.sh | 75 ++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100755 launch_scripts/submit_slurm.sh diff --git a/launch_scripts/submit_slurm.sh b/launch_scripts/submit_slurm.sh new file mode 100755 index 00000000..04f0196a --- /dev/null +++ b/launch_scripts/submit_slurm.sh @@ -0,0 +1,75 @@ +#!/bin/bash +usage() +{ + cat << EOF +$(basename "$0"): + Submit a LUTE managed Task using SLURM on S3DF. + Options: + -c|--config + Path to the LUTE configuration YAML. + -h|--help + Display this message. + -t|--taskname + Name of the LUTE managed Task to run. +EOF +} + +POSITIONAL=() +while [[ $# -gt 0 ]] +do + flag="$1" + + case $flag in + -c|--config) + CONFIGPATH="$2" + shift + shift + ;; + -h|--help) + usage + exit + ;; + -t|--taskname) + TASK="$2" + shift + shift + ;; + --debug) + DEBUG=1 + shift + shift + ;; + *) + POS+=("$1") + shift + ;; + esac +done +set -- "${POS[@]}" + +# Assume all other arguments are for SLURM +SLURM_ARGS=$@ + +if [[ -z ${CONFIGPATH} || -z ${TASK} ]]; then + echo "Path to LUTE config amd Task name are required!" + usage + exit +fi + +# By default source the psana environment since most Tasks will use it. +if [[ $HOSTNAME =~ "sdf" ]]; then + source /sdf/group/lcls/ds/ana/sw/conda1/manage/bin/psconda.sh +fi + +export LUTE_PATH="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd | sed s/launch_scripts//g )" +EXECUTABLE="${LUTE_PATH}run_task.py" + +if [[ ${DEBUG} ]]; then + echo "Running in debug mode - verbose logging." 
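+    # -B: do not write .pyc files. Omitting -O leaves __debug__ True, so the
+    # `if __debug__:` blocks in the LUTE scripts select DEBUG-level logging.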
+ CMD="python -B ${EXECUTABLE} -c ${CONFIGPATH} -t ${TASK}" +else + echo "Running in standard mode." + CMD="python -OB ${EXECUTABLE} -c ${CONFIGPATH} -t ${TASK}" +fi + +sbatch $SLURM_ARGS --wrap "${CMD}" From 5ac8fff0372c50a3d04968ca4084ec956c6fc5f9 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Wed, 21 Feb 2024 11:45:58 -0800 Subject: [PATCH 03/41] MNT Move workflowss/run_airflow_dag.py to launch_scripts/launch_airflow.py --- .../launch_airflow.py | 62 +++++-------------- 1 file changed, 17 insertions(+), 45 deletions(-) rename workflows/run_airflow_dag.py => launch_scripts/launch_airflow.py (55%) diff --git a/workflows/run_airflow_dag.py b/launch_scripts/launch_airflow.py similarity index 55% rename from workflows/run_airflow_dag.py rename to launch_scripts/launch_airflow.py index f94cc8eb..372e7e66 100644 --- a/workflows/run_airflow_dag.py +++ b/launch_scripts/launch_airflow.py @@ -1,6 +1,6 @@ #!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.47-py3/bin/python -"""Script submitted by Automated Run Processor (ARP) to trigger Airflow DAG. +"""Script submitted by Automated Run Processor (ARP) to trigger an Airflow DAG. This script is submitted by the ARP to the batch nodes. It triggers Airflow to begin running the tasks of the specified directed acyclic graph (DAG). @@ -15,7 +15,7 @@ import logging import argparse import requests -from typing import Dict, Union +from typing import Dict, Union, List if __debug__: logging.basicConfig(level=logging.DEBUG) @@ -30,42 +30,17 @@ description="Trigger Airflow to begin executing a LUTE DAG.", epilog="Refer to https://github.com/slac-lcls/lute for more information.", ) - parser.add_argument( - "-a", - "--account", - type=str, - help="SLURM account.", - default=f"lcls:{os.environ.get('EXPERIMENT', '')}", - ) parser.add_argument("-c", "--config", type=str, help="Path to config YAML file.") parser.add_argument( "-d", "--debug", type=str, help="Run in debug mode.", action="store_true" ) - parser.add_argument( - "-l", - "--lute_path", - type=str, - help="Path to the LUTE installation to use.", - default="/sdf/group/lcls/ds/tools/lute", - ) - parser.add_argument( - "-n", - "--ncores", - type=int, - help="Number of cores. Add an extra core for the Executor.", - default=65, - ) - parser.add_argument( - "-q", "--queue", type=str, help="SLURM queue.", default="milano" - ) - parser.add_argument( - "-r", "--reservation", type=str, help="SLURM reservation", default="" - ) parser.add_argument( "-w", "--workflow", type=str, help="Workflow to run.", default="test" ) - args: argparse.Namespace = parser.parse_args() + args: argparse.Namespace + extra_args: List[str] # Should contain all SLURM arguments! 
+ args, extra_args = parser.parse_known_args() airflow_instance: str = "http://172.24.5.247:8080/" airflow_api_endpoints: Dict[str, str] = { @@ -79,27 +54,24 @@ ) resp.raise_for_status() - params: Dict[str, Union[str, int]] = { + params: Dict[str, Union[str, int, List[str]]] = { "config_file": args.config, - # "dag": f"lute_{args.workflow}", - "queue": args.queue, - "ncores": args.ncores, - "experiment": os.environ["EXPERIMENT"], - "run_num": os.environ["RUN_NUM"], - "account": args.account, + "debug": args.debug, } - dag_run_data: Dict[str, Union[str, Dict[str, Union[str, int]]]] = { + # Experiment, run #, and ARP env variables come from ARP submission only + dag_run_data: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = { "dag_run_id": str(uuid.uuid4()), "conf": { - "experiment": os.environ["EXPERIMENT"], - "run_id": f"{os.environ['RUN_NUM']}{datetime.datetime.utcnow().isoformat()}", - "JID_UPDATE_COUNTERS": os.environ["JID_UPDATE_COUNTERS"], - "ARP_ROOT_JOB_ID": os.environ["ARP_JOB_ID"], - "ARP_LOCATION": "S3DF", # os.environ["ARP_LOCATION"], - "Authorization": os.environ["Authorization"], + "experiment": os.environ.get("EXPERIMENT"), + "run_id": f"{os.environ.get('RUN_NUM')}{datetime.datetime.utcnow().isoformat()}", + "JID_UPDATE_COUNTERS": os.environ.get("JID_UPDATE_COUNTERS"), + "ARP_ROOT_JOB_ID": os.environ.get("ARP_JOB_ID"), + "ARP_LOCATION": os.environ.get("ARP_LOCATION", "S3DF"), + "Authorization": os.environ.get("Authorization"), "user": getpass.getuser(), - "parameters": params, + "lute_params": params, + "slurm_params": extra_args, }, } From 14a1dc2fdb7fe6d41a3c05547981bfc6ea9efb02 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Wed, 21 Feb 2024 11:49:21 -0800 Subject: [PATCH 04/41] SKL Begin modifying JID Operator - separate handling of SLURM and LUTE args --- launch_scripts/launch_airflow.py | 2 +- workflows/airflow/operators/jidoperators.py | 83 +++++++++++++++++---- 2 files changed, 69 insertions(+), 16 deletions(-) diff --git a/launch_scripts/launch_airflow.py b/launch_scripts/launch_airflow.py index 372e7e66..902b609d 100644 --- a/launch_scripts/launch_airflow.py +++ b/launch_scripts/launch_airflow.py @@ -39,7 +39,7 @@ ) args: argparse.Namespace - extra_args: List[str] # Should contain all SLURM arguments! + extra_args: List[str] # Should contain all SLURM arguments! args, extra_args = parser.parse_known_args() airflow_instance: str = "http://172.24.5.247:8080/" diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index 1f1379cd..9a8d5756 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -42,7 +42,11 @@ class JIDSlurmOperator(BaseOperator): jid_api_location: str = "http://psdm.slac.stanford.edu/arps3dfjid/jid/ws" """S3DF JID API location.""" - jid_api_endpoints: Dict[str, str] = {} + jid_api_endpoints: Dict[str, str] = { + "start_job": "{experiment}/start_job", + "job_statuses": "job_statuses", + "job_log_file": "{experiment}/job_log_file", + } @apply_defaults def __init__( @@ -55,40 +59,57 @@ def __init__( ) -> None: super().__init__(*args, **kwargs) self.task_id: str = "" - self.lute_executable: str = f"{lute_location}/run_task.py" + self.lute_location: str = ( + f"{lute_location}/run_task.py" # switch to os.path.split(__file__)... 
+ ) self.user: str = "" self.poke_interval: float = poke_interval def create_control_doc( self, context: Dict[str, Any] ) -> Dict[str, Union[str, Dict[str, str]]]: - """ + """Prepare the control document for job submission via the JID. + + Translates and Airflow dictionary to the representation needed by the + JID. Args: - context: Airflow dictionary object. + context (Dict[str, Any]): Airflow dictionary object. https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html contains a list of available variables and their description. + + Returns: + control_doc (Dict[str, Union[str, Dict[str, str]]]): JID job control + dictionary. """ - dagrun_config: Dict[str, Union[str, Dict[str, str]]] = context.get( - "dag_run" - ).conf + dagrun_config: Dict[ + str, Union[str, Dict[str, Union[str, int, List[str]]]] + ] = context.get("dag_run").conf - command_line_params: Dict[str, str] = dagrun_config.get("parameters", {}) + lute_params: Dict[str, str] = dagrun_config.get("lute_params", {}) - config_path: str = command_line_params["config"] + config_path: str = lute_params["config_file"] # Note that task_id is from the parent class. # When defining the Operator instances the id is assumed to match a # managed task! - parameter_string: str = f"--taskname {self.task_id} --config {config_path}" + lute_param_str: str + if lute_params["debug"]: + lute_param_str = f"--taskname {self.task_id} --config {config_path} --debug" + else: + lute_param_str = f"--taskname {self.task_id} --config {config_path}" + + # slurm_params holds a List[str] + slurm_param_str: str = " ".join(dagrun_config.get("slurm_params")) + parameter_str: str = f"{lute_param_str} {slurm_param_str}" jid_job_definition: Dict[str, str] = { "_id": str(uuid.uuid4()), "name": self.task_id, - "executable": self.lute_executable, + "executable": f"{self.lute_location}/launch_scripts/submit_slurm.sh", "trigger": "MANUAL", - "location": "S3DF", # self.get_location(context) - "parameters": parameter_string, + "location": dagrun_config.get("ARP_LOCATION", "S3DF"), + "parameters": parameter_str, "run_as_user": self.user, } @@ -116,6 +137,10 @@ def parse_response( HTTP request. check_for_error (List[str]): A list of strings/patterns to search for in response. Exception is raised if there are any matches. + + Returns: + value (Dict[str, Any]): Dictionary containing HTTP response value. + Raises: AirflowException: Raised to translate multiple errors into object properly handled by the Airflow server. @@ -147,9 +172,31 @@ def rpc( context: Dict[str, Any], check_for_error: List[str] = [], ) -> Dict[str, Any]: + """Submit job via JID and retrieve responses. + + Remote Procedure Call (RPC). + + Args: + endpoint (str): Which API endpoint to use. + + control_doc (Dict[str, Union[str, Dict[str, str]]]): Dictionary for + JID call. + + context (Dict[str, Any]): Airflow dictionary object. + https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html + contains a list of available variables and their description. + + check_for_error (List[str]): A list of keywords to search for in a + response to indicate error conditions. Default []. + + Returns: + value (Dict[str, Any]): Dictionary containing HTTP response value. 
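+
+        Note:
+            `endpoint` must be one of the keys of `jid_api_endpoints` defined
+            on this class, i.e. "start_job", "job_statuses" or "job_log_file".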
+ """ # if not self.get_location(context) in self.locations: # raise AirflowException(f"JID location {self.get_location(context)} is not configured") - dagrun_config: Dict[str, str] = context.get("dag_run").conf + dagrun_config: Dict[ + str, Union[str, Dict[str, Union[str, int, List[str]]]] + ] = context.get("dag_run").conf experiment: str = dagrun_config.get("experiment") auth: Any = dagrun_config.get("Authorization") @@ -169,7 +216,13 @@ def rpc( return value def execute(self, context: Dict[str, Any]) -> None: - """Method called by Airflow which submits SLURM Job via JID.""" + """Method called by Airflow which submits SLURM Job via JID. + + Args: + context (Dict[str, Any]): Airflow dictionary object. + https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html + contains a list of available variables and their description. + """ # logger.info(f"Attempting to run at {self.get_location(context)}...") logger.info(f"Attempting to run at S3DF.") control_doc = self.create_control_doc(context) From 3c6e51692d245047a50a29dd207cdb9fc9054ad3 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 26 Feb 2024 08:25:26 -0800 Subject: [PATCH 05/41] MNT assign user attribute to value passed by initializer --- workflows/airflow/operators/jidoperators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index 9a8d5756..e217a511 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -62,7 +62,7 @@ def __init__( self.lute_location: str = ( f"{lute_location}/run_task.py" # switch to os.path.split(__file__)... ) - self.user: str = "" + self.user: str = user self.poke_interval: float = poke_interval def create_control_doc( From f20e20eec582d4d1e2c4b105670dcb8f1d7572be Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Wed, 28 Feb 2024 11:35:58 -0800 Subject: [PATCH 06/41] ENH Add option for operator to cap cores requested of SLURM --- workflows/airflow/operators/jidoperators.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index e217a511..988e8abf 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -19,7 +19,8 @@ import time import requests import logging -from typing import Dict, Any, Union, List +import re +from typing import Dict, Any, Union, List, Optional from airflow.models import BaseOperator from airflow.exceptions import AirflowException @@ -54,6 +55,7 @@ def __init__( lute_location: str, user: str = getpass.getuser(), poke_interval: float = 30.0, + max_cores: Optional[int] = None, *args, **kwargs, ) -> None: @@ -64,6 +66,7 @@ def __init__( ) self.user: str = user self.poke_interval: float = poke_interval + self.max_cores: Optional[int] = max_cores def create_control_doc( self, context: Dict[str, Any] @@ -101,6 +104,16 @@ def create_control_doc( # slurm_params holds a List[str] slurm_param_str: str = " ".join(dagrun_config.get("slurm_params")) + # Cap max cores used by a managed Task if that is requested + pattern: str = r"(?<=\bntasks=)\d+" + ntasks: int + try: + ntasks = int(re.findall(pattern, slurm_param_str)[0]) + except IndexError as err: # If `ntasks` not passed - 1 is default + ntasks = 1 + if self.max_cores is not None and ntasks > self.max_cores: + slurm_param_str = re.sub(pattern, f"{self.max_cores}", slurm_param_str) + parameter_str: str = 
f"{lute_param_str} {slurm_param_str}" jid_job_definition: Dict[str, str] = { From 1581f3c68846d64a357363172826158fb131bd10 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Thu, 29 Feb 2024 08:52:59 -0800 Subject: [PATCH 07/41] MNT Add support for test airflow instance --- launch_scripts/launch_airflow.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/launch_scripts/launch_airflow.py b/launch_scripts/launch_airflow.py index 902b609d..2b77b338 100644 --- a/launch_scripts/launch_airflow.py +++ b/launch_scripts/launch_airflow.py @@ -32,8 +32,9 @@ ) parser.add_argument("-c", "--config", type=str, help="Path to config YAML file.") parser.add_argument( - "-d", "--debug", type=str, help="Run in debug mode.", action="store_true" + "-d", "--debug", help="Run in debug mode.", action="store_true" ) + parser.add_argument("--test", help="Use test Airflow instance.", action="store_true") parser.add_argument( "-w", "--workflow", type=str, help="Workflow to run.", default="test" ) @@ -41,7 +42,11 @@ args: argparse.Namespace extra_args: List[str] # Should contain all SLURM arguments! args, extra_args = parser.parse_known_args() - airflow_instance: str = "http://172.24.5.247:8080/" + airflow_instance: str + if args.test: + airflow_instance = "http://172.24.5.190:8080/" + else: + airflow_instance = "http://172.24.5.247:8080/" airflow_api_endpoints: Dict[str, str] = { "health": "api/v1/health", From 5cdd5fa0c802b8946e88076b2c9a7c63e05bf675 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Thu, 29 Feb 2024 09:34:07 -0800 Subject: [PATCH 08/41] MNT Pull lute location from location of launch script and pass it through Airflow context --- launch_scripts/launch_airflow.py | 6 ++++-- workflows/airflow/operators/jidoperators.py | 13 ++++++------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/launch_scripts/launch_airflow.py b/launch_scripts/launch_airflow.py index 2b77b338..b8f37a73 100644 --- a/launch_scripts/launch_airflow.py +++ b/launch_scripts/launch_airflow.py @@ -15,6 +15,7 @@ import logging import argparse import requests +from requests.auth import HTTPBasicAuth from typing import Dict, Union, List if __debug__: @@ -31,10 +32,10 @@ epilog="Refer to https://github.com/slac-lcls/lute for more information.", ) parser.add_argument("-c", "--config", type=str, help="Path to config YAML file.") + parser.add_argument("-d", "--debug", help="Run in debug mode.", action="store_true") parser.add_argument( - "-d", "--debug", help="Run in debug mode.", action="store_true" + "--test", help="Use test Airflow instance.", action="store_true" ) - parser.add_argument("--test", help="Use test Airflow instance.", action="store_true") parser.add_argument( "-w", "--workflow", type=str, help="Workflow to run.", default="test" ) @@ -75,6 +76,7 @@ "ARP_LOCATION": os.environ.get("ARP_LOCATION", "S3DF"), "Authorization": os.environ.get("Authorization"), "user": getpass.getuser(), + "lute_location": os.path.abspath(f"{os.path.dirname(__file__)}/.."), "lute_params": params, "slurm_params": extra_args, }, diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index 988e8abf..0cc3ad39 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -52,18 +52,14 @@ class JIDSlurmOperator(BaseOperator): @apply_defaults def __init__( self, - lute_location: str, user: str = getpass.getuser(), poke_interval: float = 30.0, max_cores: Optional[int] = None, *args, **kwargs, ) -> None: - super().__init__(*args, 
**kwargs) - self.task_id: str = "" - self.lute_location: str = ( - f"{lute_location}/run_task.py" # switch to os.path.split(__file__)... - ) + super().__init__(*args, **kwargs) # Initializes self.task_id + self.lute_location: str = "" self.user: str = user self.poke_interval: float = poke_interval self.max_cores: Optional[int] = max_cores @@ -90,6 +86,9 @@ def create_control_doc( str, Union[str, Dict[str, Union[str, int, List[str]]]] ] = context.get("dag_run").conf + self.lute_location = dagrun_config.get( + "lute_location", "/sdf/group/lcls/ds/tools/lute/latest" + ) lute_params: Dict[str, str] = dagrun_config.get("lute_params", {}) config_path: str = lute_params["config_file"] @@ -109,7 +108,7 @@ def create_control_doc( ntasks: int try: ntasks = int(re.findall(pattern, slurm_param_str)[0]) - except IndexError as err: # If `ntasks` not passed - 1 is default + except IndexError as err: # If `ntasks` not passed - 1 is default ntasks = 1 if self.max_cores is not None and ntasks > self.max_cores: slurm_param_str = re.sub(pattern, f"{self.max_cores}", slurm_param_str) From ecd10e3c12e9f59cfe6d5323bc8ba76b13763387 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Thu, 29 Feb 2024 09:34:36 -0800 Subject: [PATCH 09/41] MNT Add additional tasks to test DAG --- workflows/airflow/test.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/workflows/airflow/test.py b/workflows/airflow/test.py index 4bd12ee2..44aca9ef 100644 --- a/workflows/airflow/test.py +++ b/workflows/airflow/test.py @@ -27,9 +27,11 @@ description=description, ) -tester: JIDSlurmOperator = JIDSlurmOperator(task_id="Tester", dag=dag) -binary_tester: JIDSlurmOperator = JIDSlurmOperator(task_id="BinaryTester", dag=dag) -socket_tester: JIDSlurmOperator = JIDSlurmOperator(task_id="SocketTester", dag=dag) +tester: JIDSlurmOperator = JIDSlurmOperator(max_cores=2, task_id="Tester", dag=dag) +binary_tester: JIDSlurmOperator = JIDSlurmOperator(max_cores=5, task_id="BinaryTester", dag=dag) +socket_tester: JIDSlurmOperator = JIDSlurmOperator(max_cores=2, task_id="SocketTester", dag=dag) +write_tester: JIDSlurmOperator = JIDSlurmOperator(max_cores=2, task_id="WriteTester", dag=dag) +read_tester: JIDSlurmOperator = JIDSlurmOperator(max_cores=2, task_id="ReadTester", dag=dag) tester >> binary_tester -tester >> socket_tester +tester >> socket_tester >> write_tester >> read_tester From 6137d632438903b1c9fb3757a23f6d202f83d7ec Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Thu, 29 Feb 2024 16:39:28 -0800 Subject: [PATCH 10/41] MNT Adjust python version and change the parameter name for JID request --- launch_scripts/launch_airflow.py | 4 ++-- workflows/airflow/operators/jidoperators.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/launch_scripts/launch_airflow.py b/launch_scripts/launch_airflow.py index b8f37a73..b69929fc 100644 --- a/launch_scripts/launch_airflow.py +++ b/launch_scripts/launch_airflow.py @@ -1,4 +1,4 @@ -#!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.47-py3/bin/python +#!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.59-py3/bin/python """Script submitted by Automated Run Processor (ARP) to trigger an Airflow DAG. 
@@ -16,7 +16,7 @@ import argparse import requests from requests.auth import HTTPBasicAuth -from typing import Dict, Union, List +from typing import Dict, Union, List, Any if __debug__: logging.basicConfig(level=logging.DEBUG) diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index 0cc3ad39..b76a3d5a 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -129,7 +129,7 @@ def create_control_doc( "_id": str(uuid.uuid4()), "arp_root_job_id": dagrun_config.get("ARP_ROOT_JOB_ID"), "experiment": dagrun_config.get("experiment"), - "run_id": dagrun_config.get("run_id"), + "run_num": dagrun_config.get("run_id"), "user": dagrun_config.get("user"), "status": "", "tool_id": "", From e3ec629bb1dfc41b698a496d308663bc27b460f3 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Fri, 1 Mar 2024 08:14:58 -0800 Subject: [PATCH 11/41] BUG Debugging test workflows and minor changes to get JID requests to work. --- workflows/airflow/minairflow.py | 22 ++++++ workflows/airflow/operators/jidoperators.py | 77 +++++++++++++++++++-- workflows/airflow/test.py | 16 +++-- 3 files changed, 107 insertions(+), 8 deletions(-) create mode 100644 workflows/airflow/minairflow.py diff --git a/workflows/airflow/minairflow.py b/workflows/airflow/minairflow.py new file mode 100644 index 00000000..b86ce9f6 --- /dev/null +++ b/workflows/airflow/minairflow.py @@ -0,0 +1,22 @@ +"""Test Airflow <-> JID requests. + +Minimal example to test requests from Airflow to the JID. +""" +from datetime import datetime +import os +from airflow import DAG +from lute.operators.jidoperators import RequestOnlyOperator + +dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}" +description: str = "Test Airflow <-> JID API" + +dag: DAG = DAG( + dag_id=dag_id, + start_date=datetime(1970, 1, 1), + schedule_interval=None, + description=description, +) + +requester: RequestOnlyOperator = RequestOnlyOperator(task_id="MakeRequest", dag=dag) + +requester diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index b76a3d5a..e4bbffc1 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -11,7 +11,7 @@ done exclusively via the JID API. """ -__all__ = ["JIDSlurmOperator"] +__all__ = ["JIDSlurmOperator", "RequestOnlyOperator"] __author__ = "Fred Poitevin, Murali Shankar" import uuid @@ -34,6 +34,75 @@ logger: logging.Logger = logging.getLogger(__name__) +class RequestOnlyOperator(BaseOperator): + """This Operator makes a JID request and exits.""" + + @apply_defaults + def __init__( + self, + user: str = getpass.getuser(), + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) # Initializes self.task_id + self.user: str = user + + def execute(self, context: Dict[str, Any]) -> None: + """Method called by Airflow which submits SLURM Job via JID. + + Args: + context (Dict[str, Any]): Airflow dictionary object. + https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html + contains a list of available variables and their description. 
+ """ + # logger.info(f"Attempting to run at {self.get_location(context)}...") + logger.info(f"Attempting to run at S3DF.") + dagrun_config: Dict[ + str, Union[str, Dict[str, Union[str, int, List[str]]]] + ] = context.get("dag_run").conf + jid_job_definition: Dict[str, str] = { + "_id": str(uuid.uuid4()), + "name": self.task_id, + "executable": f"myexecutable.sh", + "trigger": "MANUAL", + "location": dagrun_config.get("ARP_LOCATION", "S3DF"), + "parameters": "--partition=milano --account=lcls:data", + "run_as_user": self.user, + } + + control_doc: Dict[str, Union[str, Dict[str, str]]] = { + "_id": str(uuid.uuid4()), + "arp_root_job_id": dagrun_config.get("ARP_ROOT_JOB_ID"), + "experiment": dagrun_config.get("experiment"), + "run_num": dagrun_config.get("run_id"), + "user": dagrun_config.get("user"), + "status": "", + "tool_id": "", + "def_id": str(uuid.uuid4()), + "def": jid_job_definition, + } + + uri: str = f"http://psdm.slac.stanford.edu/arps3dfjid/jid/ws/start_job/" + # Endpoints have the string "{experiment}" in them + auth: Any = dagrun_config.get("Authorization") + logger.info(f"Calling {uri} with {control_doc}...") + + print(requests.__file__) + resp: requests.models.Response = requests.post( + uri, json=control_doc, headers={"Authorization": auth} + ) + print(requests.__file__) + json: Dict[str, Union[str, int]] = resp.json() + if not json.get("success", "") in (True,): + raise AirflowException(f"Error from JID {resp}: {resp.content}") + value: Dict[str, Any] = json.get("value") + logger.info(f"JobID {value['tool_id']} successfully submitted!") + control_doc = self.create_control_doc(context) + logger.info(control_doc) + logger.info(f"{self.jid_api_location}/{self.jid_api_endpoints['start_job']}") + msg: Dict[str, Any] = self.rpc( + endpoint="start_job", control_doc=control_doc, context=context + ) class JIDSlurmOperator(BaseOperator): """Airflow Operator which submits SLURM jobs through the JID.""" @@ -44,9 +113,9 @@ class JIDSlurmOperator(BaseOperator): """S3DF JID API location.""" jid_api_endpoints: Dict[str, str] = { - "start_job": "{experiment}/start_job", - "job_statuses": "job_statuses", - "job_log_file": "{experiment}/job_log_file", + "start_job": "{experiment}/start_job/", + "job_statuses": "job_statuses/", + "job_log_file": "{experiment}/job_log_file/", } @apply_defaults diff --git a/workflows/airflow/test.py b/workflows/airflow/test.py index 44aca9ef..fee0e1ea 100644 --- a/workflows/airflow/test.py +++ b/workflows/airflow/test.py @@ -28,10 +28,18 @@ ) tester: JIDSlurmOperator = JIDSlurmOperator(max_cores=2, task_id="Tester", dag=dag) -binary_tester: JIDSlurmOperator = JIDSlurmOperator(max_cores=5, task_id="BinaryTester", dag=dag) -socket_tester: JIDSlurmOperator = JIDSlurmOperator(max_cores=2, task_id="SocketTester", dag=dag) -write_tester: JIDSlurmOperator = JIDSlurmOperator(max_cores=2, task_id="WriteTester", dag=dag) -read_tester: JIDSlurmOperator = JIDSlurmOperator(max_cores=2, task_id="ReadTester", dag=dag) +binary_tester: JIDSlurmOperator = JIDSlurmOperator( + max_cores=5, task_id="BinaryTester", dag=dag +) +socket_tester: JIDSlurmOperator = JIDSlurmOperator( + max_cores=2, task_id="SocketTester", dag=dag +) +write_tester: JIDSlurmOperator = JIDSlurmOperator( + max_cores=2, task_id="WriteTester", dag=dag +) +read_tester: JIDSlurmOperator = JIDSlurmOperator( + max_cores=2, task_id="ReadTester", dag=dag +) tester >> binary_tester tester >> socket_tester >> write_tester >> read_tester From 298ddddf0280735d4cd5e7fffb8d0783dcc918a2 Mon Sep 17 00:00:00 2001 From: 
gadorlhiac Date: Fri, 1 Mar 2024 08:22:58 -0800 Subject: [PATCH 12/41] BUG Indentation. --- workflows/airflow/operators/jidoperators.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index e4bbffc1..5070d9c8 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -34,6 +34,7 @@ logger: logging.Logger = logging.getLogger(__name__) + class RequestOnlyOperator(BaseOperator): """This Operator makes a JID request and exits.""" @@ -44,10 +45,10 @@ def __init__( *args, **kwargs, ) -> None: - super().__init__(*args, **kwargs) # Initializes self.task_id + super().__init__(*args, **kwargs) # Initializes self.task_id self.user: str = user - def execute(self, context: Dict[str, Any]) -> None: + def execute(self, context: Dict[str, Any]) -> None: """Method called by Airflow which submits SLURM Job via JID. Args: @@ -97,12 +98,7 @@ def execute(self, context: Dict[str, Any]) -> None: raise AirflowException(f"Error from JID {resp}: {resp.content}") value: Dict[str, Any] = json.get("value") logger.info(f"JobID {value['tool_id']} successfully submitted!") - control_doc = self.create_control_doc(context) - logger.info(control_doc) - logger.info(f"{self.jid_api_location}/{self.jid_api_endpoints['start_job']}") - msg: Dict[str, Any] = self.rpc( - endpoint="start_job", control_doc=control_doc, context=context - ) + class JIDSlurmOperator(BaseOperator): """Airflow Operator which submits SLURM jobs through the JID.""" @@ -127,7 +123,7 @@ def __init__( *args, **kwargs, ) -> None: - super().__init__(*args, **kwargs) # Initializes self.task_id + super().__init__(*args, **kwargs) # Initializes self.task_id self.lute_location: str = "" self.user: str = user self.poke_interval: float = poke_interval From 008cbc34ae1c4730cf3329180465e47547f5754c Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Fri, 1 Mar 2024 08:37:22 -0800 Subject: [PATCH 13/41] BUG Fix URI for minimal operator --- workflows/airflow/operators/jidoperators.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index 5070d9c8..4e5d3da0 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -83,8 +83,11 @@ def execute(self, context: Dict[str, Any]) -> None: "def": jid_job_definition, } - uri: str = f"http://psdm.slac.stanford.edu/arps3dfjid/jid/ws/start_job/" + uri: str = ( + "http://psdm.slac.stanford.edu/arps3dfjid/jid/ws/{experiment}/start_job/" + ) # Endpoints have the string "{experiment}" in them + uri = uri.format(experiment=dagrun_config.get("experiment")) auth: Any = dagrun_config.get("Authorization") logger.info(f"Calling {uri} with {control_doc}...") From 5d4ffc199c400bebc49a21c822d2e3d54a476f3a Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Fri, 1 Mar 2024 08:41:08 -0800 Subject: [PATCH 14/41] MNT Make minimal operator even simpler --- workflows/airflow/operators/jidoperators.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index 4e5d3da0..84a24de8 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -96,11 +96,6 @@ def execute(self, context: Dict[str, Any]) -> None: uri, json=control_doc, headers={"Authorization": auth} ) print(requests.__file__) - json: 
Dict[str, Union[str, int]] = resp.json() - if not json.get("success", "") in (True,): - raise AirflowException(f"Error from JID {resp}: {resp.content}") - value: Dict[str, Any] = json.get("value") - logger.info(f"JobID {value['tool_id']} successfully submitted!") class JIDSlurmOperator(BaseOperator): From 961517824da5440b7782d0b88fcd3e7c139c3763 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Fri, 1 Mar 2024 08:47:37 -0800 Subject: [PATCH 15/41] MNT Remove trailing slash from JID operator --- workflows/airflow/operators/jidoperators.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index 84a24de8..defb771d 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -91,11 +91,12 @@ def execute(self, context: Dict[str, Any]) -> None: auth: Any = dagrun_config.get("Authorization") logger.info(f"Calling {uri} with {control_doc}...") - print(requests.__file__) + logger.info(requests.__file__) resp: requests.models.Response = requests.post( uri, json=control_doc, headers={"Authorization": auth} ) - print(requests.__file__) + logger.info(f"Status code: {resp.status_code}") + logger.info(requests.__file__) class JIDSlurmOperator(BaseOperator): @@ -107,9 +108,9 @@ class JIDSlurmOperator(BaseOperator): """S3DF JID API location.""" jid_api_endpoints: Dict[str, str] = { - "start_job": "{experiment}/start_job/", - "job_statuses": "job_statuses/", - "job_log_file": "{experiment}/job_log_file/", + "start_job": "{experiment}/start_job", + "job_statuses": "job_statuses", + "job_log_file": "{experiment}/job_log_file", } @apply_defaults From a9096277738ca2ed0632626942cd1f40c351b1ff Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Fri, 1 Mar 2024 09:01:17 -0800 Subject: [PATCH 16/41] BUG remove trailing slash from JID endpoints --- workflows/airflow/operators/jidoperators.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index defb771d..7e10bec6 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -84,7 +84,7 @@ def execute(self, context: Dict[str, Any]) -> None: } uri: str = ( - "http://psdm.slac.stanford.edu/arps3dfjid/jid/ws/{experiment}/start_job/" + "http://psdm.slac.stanford.edu/arps3dfjid/jid/ws/{experiment}/start_job" ) # Endpoints have the string "{experiment}" in them uri = uri.format(experiment=dagrun_config.get("experiment")) From cf7bf3478aca05fb1675476ad466c208cb335af3 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Fri, 1 Mar 2024 14:39:05 -0800 Subject: [PATCH 17/41] BUG Must use HTTPS to avoid redirects and POST to GET --- workflows/airflow/operators/jidoperators.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index 7e10bec6..1f18e346 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -84,7 +84,7 @@ def execute(self, context: Dict[str, Any]) -> None: } uri: str = ( - "http://psdm.slac.stanford.edu/arps3dfjid/jid/ws/{experiment}/start_job" + "https://psdm.slac.stanford.edu/arps3dfjid/jid/ws/{experiment}/start_job" ) # Endpoints have the string "{experiment}" in them uri = uri.format(experiment=dagrun_config.get("experiment")) @@ -104,7 +104,7 @@ class JIDSlurmOperator(BaseOperator): ui_color: 
str = "#006699" - jid_api_location: str = "http://psdm.slac.stanford.edu/arps3dfjid/jid/ws" + jid_api_location: str = "https://psdm.slac.stanford.edu/arps3dfjid/jid/ws" """S3DF JID API location.""" jid_api_endpoints: Dict[str, str] = { From e71fb39a4e4c32501674a1a4d93d3790dd6f9fea Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Fri, 1 Mar 2024 15:20:26 -0800 Subject: [PATCH 18/41] BUG List/Dictionary issue. Need to FIX typing --- workflows/airflow/operators/jidoperators.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index 1f18e346..8b575cd1 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -313,14 +313,12 @@ def execute(self, context: Dict[str, Any]) -> None: time.sleep(10) # Wait for job to queue.... FIXME logger.info("Checking for job completion.") while jobs[0].get("status") in ("RUNNING", "SUBMITTED"): - jobs = [ - self.rpc( - "job_statuses", - jobs[0], - context, - check_for_error=[" error: ", "Traceback"], - ) - ] + jobs = self.rpc( + endpoint="job_statuses", + control_doc=jobs, + context=context, + check_for_error=[" error: ", "Traceback"], + ) time.sleep(self.poke_interval) # Logs out to xcom From fb3d90900eaf467b870571a4ba8d0a872ef05732 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 4 Mar 2024 08:56:15 -0800 Subject: [PATCH 19/41] ENH Add logfile names to submit_slurm. Make executor use absolute path for subprocess.py --- launch_scripts/submit_slurm.sh | 30 +++++++++++++++++++++++------- lute/execution/executor.py | 8 +++++++- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/launch_scripts/submit_slurm.sh b/launch_scripts/submit_slurm.sh index 04f0196a..29037563 100755 --- a/launch_scripts/submit_slurm.sh +++ b/launch_scripts/submit_slurm.sh @@ -6,11 +6,16 @@ $(basename "$0"): Submit a LUTE managed Task using SLURM on S3DF. Options: -c|--config - Path to the LUTE configuration YAML. + ABSOLUTE path to the LUTE configuration YAML. Must be absolute. -h|--help Display this message. -t|--taskname Name of the LUTE managed Task to run. + + NOTE: This script does not parse SLURM arguments, but a number of them are + mandatory. All additional arguments are transparently passed to SLURM. + You will need to provide at least the queue and account using, e.g.: + --partition=milano --account=lcls: EOF } @@ -47,19 +52,28 @@ do done set -- "${POS[@]}" -# Assume all other arguments are for SLURM -SLURM_ARGS=$@ - if [[ -z ${CONFIGPATH} || -z ${TASK} ]]; then echo "Path to LUTE config amd Task name are required!" usage exit fi +# Assume all other arguments are for SLURM +SLURM_ARGS=$@ + +# Setup logfile names - $EXPERIMENT and $RUN will be available if ARP submitted +FORMAT_RUN=$(printf "%04d" ${RUN:-0}) +LOG_FILE="${TASK}_${EXPERIMENT:-EXP}_r${FORMAT_RUN}_$(date +'%Y-%m-%d_%H-%M-%S')" +SLURM_ARGS+=" --output=${LOG_FILE}.out" +SLURM_ARGS+=" --error=${LOG_FILE}.err" +echo "Running ${TASK} with SLURM arguments: ${SLURM_ARGS}" + +export LUTE_SOCKET="/tmp/lute_${RANDOM}.sock" + +echo "Using socket ${LUTE_SOCKET}" + # By default source the psana environment since most Tasks will use it. 
-if [[ $HOSTNAME =~ "sdf" ]]; then - source /sdf/group/lcls/ds/ana/sw/conda1/manage/bin/psconda.sh -fi +source /sdf/group/lcls/ds/ana/sw/conda1/manage/bin/psconda.sh export LUTE_PATH="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd | sed s/launch_scripts//g )" EXECUTABLE="${LUTE_PATH}run_task.py" @@ -72,4 +86,6 @@ else CMD="python -OB ${EXECUTABLE} -c ${CONFIGPATH} -t ${TASK}" fi +echo "${CMD}" + sbatch $SLURM_ARGS --wrap "${CMD}" diff --git a/lute/execution/executor.py b/lute/execution/executor.py index b2ba7b7e..56d4039f 100644 --- a/lute/execution/executor.py +++ b/lute/execution/executor.py @@ -259,7 +259,13 @@ def _finalize_task(self, proc: subprocess.Popen) -> None: def execute_task(self) -> None: """Run the requested Task as a subprocess.""" - executable_path: str = "subprocess_task.py" + lute_path: Optional[str] = os.getenv("LUTE_PATH") + executable_path: str + if lute_path is not None: + executable_path = f"{lute_path}/subprocess_task.py" + else: + logger.debug("Absolute path to subprocess.py not found.") + executable_path = "subprocess_task.py" config_path: str = self._analysis_desc.task_env["LUTE_CONFIGPATH"] params: str = f"-c {config_path} -t {self._analysis_desc.task_result.task_name}" From 32edcf8d1c4c276dbe1d1a63dce23175832431ec Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 4 Mar 2024 10:07:41 -0800 Subject: [PATCH 20/41] MNT Make launch script executable. Get correct RUN_NUM env var. --- launch_scripts/launch_airflow.py | 0 launch_scripts/submit_slurm.sh | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) mode change 100644 => 100755 launch_scripts/launch_airflow.py diff --git a/launch_scripts/launch_airflow.py b/launch_scripts/launch_airflow.py old mode 100644 new mode 100755 diff --git a/launch_scripts/submit_slurm.sh b/launch_scripts/submit_slurm.sh index 29037563..8b2fa4c8 100755 --- a/launch_scripts/submit_slurm.sh +++ b/launch_scripts/submit_slurm.sh @@ -62,7 +62,7 @@ fi SLURM_ARGS=$@ # Setup logfile names - $EXPERIMENT and $RUN will be available if ARP submitted -FORMAT_RUN=$(printf "%04d" ${RUN:-0}) +FORMAT_RUN=$(printf "%04d" ${RUN_NUM:-0}) LOG_FILE="${TASK}_${EXPERIMENT:-EXP}_r${FORMAT_RUN}_$(date +'%Y-%m-%d_%H-%M-%S')" SLURM_ARGS+=" --output=${LOG_FILE}.out" SLURM_ARGS+=" --error=${LOG_FILE}.err" From f4dc65f1fe0ddde03c7be172b5632dad2f0a4124 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 4 Mar 2024 10:36:11 -0800 Subject: [PATCH 21/41] ENH Add random socket path fallback in communicator code. Add logging and property to communicator --- lute/execution/ipc.py | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/lute/execution/ipc.py b/lute/execution/ipc.py index 82b2a416..ee673035 100644 --- a/lute/execution/ipc.py +++ b/lute/execution/ipc.py @@ -29,6 +29,8 @@ import pickle import subprocess import select +import logging +import warnings from typing import Optional, Any, Set from typing_extensions import Self from dataclasses import dataclass @@ -45,6 +47,18 @@ "TASK_RESULT", } +if __debug__: + warnings.simplefilter("default") + os.environ["PYTHONWARNINGS"] = "default" + logging.basicConfig(level=logging.DEBUG) + logging.captureWarnings(True) +else: + logging.basicConfig(level=logging.INFO) + warnings.simplefilter("ignore") + os.environ["PYTHONWARNINGS"] = "ignore" + +logger: logging.Logger = logging.getLogger(__name__) + class Party(Enum): """Identifier for which party (side/end) is using a communicator. 
@@ -327,7 +341,14 @@ def _create_socket(self) -> socket.socket: try: socket_path = os.environ["LUTE_SOCKET"] except KeyError as err: - socket_path = "/tmp/.lock.sock" + import uuid + import tempfile + + # Define a path,up and add to environment + # Executor-side always created first, Task will use the same one + socket_path = f"{tempfile.gettempdir()}/lute_{uuid.uuid4().hex}.sock" + os.environ["LUTE_SOCKET"] = socket_path + logger.debug(f"SocketCommunicator defines socket_path: {socket_path}") data_socket: socket.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) @@ -362,5 +383,10 @@ def _clean_up(self) -> None: if self._party == Party.EXECUTOR: os.unlink(socket_path) + @property + def socket_path(self) -> str: + socket_path: str = self._data_socket.getsockname() + return socket_path + def __exit__(self): self._clean_up() From d98878f8c57fe7b8982b2bf738f262b241e7ec21 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 4 Mar 2024 11:02:38 -0800 Subject: [PATCH 22/41] MNT Set LUTE_PATH in Executor if the env var isn't set already --- lute/execution/executor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lute/execution/executor.py b/lute/execution/executor.py index 56d4039f..ce9c0154 100644 --- a/lute/execution/executor.py +++ b/lute/execution/executor.py @@ -265,7 +265,9 @@ def execute_task(self) -> None: executable_path = f"{lute_path}/subprocess_task.py" else: logger.debug("Absolute path to subprocess.py not found.") - executable_path = "subprocess_task.py" + lute_path = os.path.abspath(f"{os.path.dirname(__file__)}/../..") + os.environ["LUTE_PATH"] = lute_path + executable_path = f"{lute_path}/subprocess_task.py" config_path: str = self._analysis_desc.task_env["LUTE_CONFIGPATH"] params: str = f"-c {config_path} -t {self._analysis_desc.task_result.task_name}" From d1a5da0f55b0da550d5394b71bf51342e8904021 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 4 Mar 2024 11:10:11 -0800 Subject: [PATCH 23/41] MNT Formatting and removing unused import --- launch_scripts/launch_airflow.py | 3 ++- lute/execution/executor.py | 1 - workflows/airflow/operators/jidoperators.py | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/launch_scripts/launch_airflow.py b/launch_scripts/launch_airflow.py index b69929fc..8913e680 100755 --- a/launch_scripts/launch_airflow.py +++ b/launch_scripts/launch_airflow.py @@ -14,9 +14,10 @@ import datetime import logging import argparse +from typing import Dict, Union, List, Any + import requests from requests.auth import HTTPBasicAuth -from typing import Dict, Union, List, Any if __debug__: logging.basicConfig(level=logging.DEBUG) diff --git a/lute/execution/executor.py b/lute/execution/executor.py index ce9c0154..54fb6ecb 100644 --- a/lute/execution/executor.py +++ b/lute/execution/executor.py @@ -35,7 +35,6 @@ from typing import Dict, Callable, List, Union, Any, Tuple, Optional from typing_extensions import Self, TypeAlias from abc import ABC, abstractmethod -from dataclasses import dataclass import warnings import types import resource diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index 8b575cd1..bcaa908c 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -17,11 +17,12 @@ import uuid import getpass import time -import requests import logging import re from typing import Dict, Any, Union, List, Optional +import requests + from airflow.models import BaseOperator from airflow.exceptions import 
AirflowException from airflow.plugins_manager import AirflowPlugin From aed4b767459405d347610b63456c8d579e0d3290 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Tue, 5 Mar 2024 10:10:56 -0800 Subject: [PATCH 24/41] ENH Add failed task handling --- config/test.yaml | 5 +++++ lute/execution/executor.py | 28 +++++++++++++++++++++++++++- lute/execution/ipc.py | 18 +++++++++--------- lute/io/models/tests.py | 6 ++++++ lute/managed_tasks.py | 1 + lute/tasks/test.py | 2 ++ 6 files changed, 50 insertions(+), 10 deletions(-) diff --git a/config/test.yaml b/config/test.yaml index 90884678..48c91aa4 100644 --- a/config/test.yaml +++ b/config/test.yaml @@ -113,11 +113,16 @@ Test: compound_var: int_var: 10 dict_var: {"a": "b"} + throw_error: False # Set True to test Task failure TestBinary: executable: "/sdf/home/d/dorlhiac/test_tasks/test_threads" p_arg1: 4 # Number of cores +TestBinaryErr: + executable: "/sdf/home/d/dorlhiac/test_tasks/test_threads_err" + p_arg1: 4 # Number of cores + TestSocket: array_size: 8000 # Size of arrays to send. 8000 floats ~ 6.4e4 num_arrays: 10 # Number of arrays to send. diff --git a/lute/execution/executor.py b/lute/execution/executor.py index 54fb6ecb..969f6514 100644 --- a/lute/execution/executor.py +++ b/lute/execution/executor.py @@ -39,6 +39,7 @@ import types import resource import copy +import re from .ipc import * from ..tasks.task import * @@ -256,6 +257,21 @@ def _finalize_task(self, proc: subprocess.Popen) -> None: """ ... + def _check_exceptions(self, msg: Message) -> bool: + """Check for exceptions in subprocess output.""" + if not isinstance(msg.contents, str): + return False + err_patterns: List[str] = ["(?s)Traceback \(most recent call last\).*Error"] + for pattern in err_patterns: + if m := re.search(pattern, msg.contents): + err: str = re.findall("[A-Za-z]*Error", msg.contents)[0] + logger.info( + f"\tTask failed with (at least) error: {err}\n\n{m.group()}" + ) + self._analysis_desc.task_result.task_status = TaskStatus.FAILED + return True + return False + def execute_task(self) -> None: """Run the requested Task as a subprocess.""" lute_path: Optional[str] = os.getenv("LUTE_PATH") @@ -288,8 +304,15 @@ def execute_task(self) -> None: self._finalize_task(proc) proc.stdout.close() proc.stderr.close() - self._store_configuration() proc.wait() + if ret := proc.returncode: + logger.info(f"Task failed with return code: {ret}") + self._analysis_desc.task_result.task_status = TaskStatus.FAILED + elif self._analysis_desc.task_result.task_status == TaskStatus.RUNNING: + # Ret code is 0, no exception was thrown, task forgot to set status + self._analysis_desc.task_result.task_status = TaskStatus.COMPLETED + logger.debug(f"Task did not change from RUNNING status. 
Assume COMPLETED.") + self._store_configuration() for comm in self._communicators: comm.clear_communicator() @@ -378,6 +401,7 @@ def task_started(self: Executor, msg: Message): logger.info( f"Executor: {self._analysis_desc.task_result.task_name} started" ) + self._analysis_desc.task_result.task_status = TaskStatus.RUNNING self.add_hook("task_started", task_started) @@ -417,6 +441,8 @@ def _task_loop(self, proc: subprocess.Popen) -> None: """ for communicator in self._communicators: msg: Message = communicator.read(proc) + if self._check_exceptions(msg): + break if msg.signal is not None and msg.signal.upper() in LUTE_SIGNALS: hook: Callable[[None], None] = getattr(self.Hooks, msg.signal.lower()) hook(self, msg) diff --git a/lute/execution/ipc.py b/lute/execution/ipc.py index ee673035..89d572f6 100644 --- a/lute/execution/ipc.py +++ b/lute/execution/ipc.py @@ -191,15 +191,15 @@ def read(self, proc: subprocess.Popen) -> Message: except UnicodeDecodeError as err: contents: str = pickle.loads(contents) - if signal and signal not in LUTE_SIGNALS: - # Some tasks write on stderr - # If the signal channel has "non-signal" info, add it to - # contents - if not contents: - contents = f"({signal})" - else: - contents = f"{contents} ({signal})" - signal: str = "" + if signal and signal not in LUTE_SIGNALS: + # Some tasks write on stderr + # If the signal channel has "non-signal" info, add it to + # contents + if not contents: + contents = f"({signal})" + else: + contents = f"{contents} ({signal})" + signal: str = "" return Message(contents=contents, signal=signal) def write(self, msg: Message) -> None: diff --git a/lute/io/models/tests.py b/lute/io/models/tests.py index 315b90ca..59ffc87b 100644 --- a/lute/io/models/tests.py +++ b/lute/io/models/tests.py @@ -20,6 +20,7 @@ __all__ = [ "TestParameters", "TestBinaryParameters", + "TestBinaryErrParameters", "TestSocketParameters", "TestWriteOutputParameters", "TestReadOutputParameters", @@ -49,12 +50,17 @@ class CompoundVar(BaseModel): dict_var: Dict[str, str] = {"a": "b"} compound_var: CompoundVar + throw_error: bool = False class TestBinaryParameters(BaseBinaryParameters): executable: str = "/sdf/home/d/dorlhiac/test_tasks/test_threads" p_arg1: int = 1 +class TestBinaryErrParameters(BaseBinaryParameters): + """Same as TestBinary, but exits with non-zero code.""" + executable: str = "/sdf/home/d/dorlhiac/test_tasks/test_threads_err" + p_arg1: int = 1 class TestSocketParameters(TaskParameters): array_size: int = 10000 diff --git a/lute/managed_tasks.py b/lute/managed_tasks.py index 19d80dc8..8d456d4a 100644 --- a/lute/managed_tasks.py +++ b/lute/managed_tasks.py @@ -5,6 +5,7 @@ ####### Tester = Executor("Test") BinaryTester = Executor("TestBinary") +BinaryErrTester = Executor("TestBinaryErr") SocketTester = Executor("TestSocket") WriteTester = Executor("TestWriteOutput") ReadTester = Executor("TestReadOutput") diff --git a/lute/tasks/test.py b/lute/tasks/test.py index 1b1bb5c8..ead14512 100644 --- a/lute/tasks/test.py +++ b/lute/tasks/test.py @@ -35,6 +35,8 @@ def _run(self) -> None: time.sleep(1) msg: Message = Message(contents=f"Test message {i}") self._report_to_executor(msg) + if self._task_parameters.throw_error: + raise RuntimeError("Testing Error!") def _post_run(self) -> None: self._result.summary = "Test Finished." From bbb664101f162234e0f4814e3d536005d3230f21 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Tue, 5 Mar 2024 10:39:31 -0800 Subject: [PATCH 25/41] MNT Remove regex exception catching. 
Unneeded if POSIX signal handler can be made to work with subprocess --- lute/execution/executor.py | 18 ------------------ lute/io/models/tests.py | 3 +++ 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/lute/execution/executor.py b/lute/execution/executor.py index 969f6514..fcc66d0e 100644 --- a/lute/execution/executor.py +++ b/lute/execution/executor.py @@ -39,7 +39,6 @@ import types import resource import copy -import re from .ipc import * from ..tasks.task import * @@ -257,21 +256,6 @@ def _finalize_task(self, proc: subprocess.Popen) -> None: """ ... - def _check_exceptions(self, msg: Message) -> bool: - """Check for exceptions in subprocess output.""" - if not isinstance(msg.contents, str): - return False - err_patterns: List[str] = ["(?s)Traceback \(most recent call last\).*Error"] - for pattern in err_patterns: - if m := re.search(pattern, msg.contents): - err: str = re.findall("[A-Za-z]*Error", msg.contents)[0] - logger.info( - f"\tTask failed with (at least) error: {err}\n\n{m.group()}" - ) - self._analysis_desc.task_result.task_status = TaskStatus.FAILED - return True - return False - def execute_task(self) -> None: """Run the requested Task as a subprocess.""" lute_path: Optional[str] = os.getenv("LUTE_PATH") @@ -441,8 +425,6 @@ def _task_loop(self, proc: subprocess.Popen) -> None: """ for communicator in self._communicators: msg: Message = communicator.read(proc) - if self._check_exceptions(msg): - break if msg.signal is not None and msg.signal.upper() in LUTE_SIGNALS: hook: Callable[[None], None] = getattr(self.Hooks, msg.signal.lower()) hook(self, msg) diff --git a/lute/io/models/tests.py b/lute/io/models/tests.py index 59ffc87b..089e7624 100644 --- a/lute/io/models/tests.py +++ b/lute/io/models/tests.py @@ -57,11 +57,14 @@ class TestBinaryParameters(BaseBinaryParameters): executable: str = "/sdf/home/d/dorlhiac/test_tasks/test_threads" p_arg1: int = 1 + class TestBinaryErrParameters(BaseBinaryParameters): """Same as TestBinary, but exits with non-zero code.""" + executable: str = "/sdf/home/d/dorlhiac/test_tasks/test_threads_err" p_arg1: int = 1 + class TestSocketParameters(TaskParameters): array_size: int = 10000 num_arrays: int = 10 From 7495c9a58f2f8e1861d29ab3bf5c1d9789ef3213 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Tue, 5 Mar 2024 12:23:34 -0800 Subject: [PATCH 26/41] BUG Disable custom signal handlers for now. Breaks passing return codes from subprocess --- lute/execution/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lute/execution/executor.py b/lute/execution/executor.py index fcc66d0e..eaa337a6 100644 --- a/lute/execution/executor.py +++ b/lute/execution/executor.py @@ -476,4 +476,4 @@ def sigchld_handler(signum: _SIGNUM, frame: types.FrameType) -> None: logger.info("Task stopped.") -signal.signal(signal.SIGCHLD, sigchld_handler) +# signal.signal(signal.SIGCHLD, sigchld_handler) From 08b6a7eec650f496bbccbef6e5b3939eed1ac35e Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Tue, 5 Mar 2024 13:25:17 -0800 Subject: [PATCH 27/41] MNT Move timeout handling to subprocess.py, remove sigchld handling altogether. 
--- lute/execution/executor.py | 42 ++------------------------------------ lute/tasks/task.py | 25 +---------------------- subprocess_task.py | 28 ++++++++++++++++++++++++- 3 files changed, 30 insertions(+), 65 deletions(-) diff --git a/lute/execution/executor.py b/lute/execution/executor.py index eaa337a6..a747d368 100644 --- a/lute/execution/executor.py +++ b/lute/execution/executor.py @@ -32,12 +32,10 @@ import time import os import signal -from typing import Dict, Callable, List, Union, Any, Tuple, Optional -from typing_extensions import Self, TypeAlias +from typing import Dict, Callable, List, Optional +from typing_extensions import Self from abc import ABC, abstractmethod import warnings -import types -import resource import copy from .ipc import * @@ -441,39 +439,3 @@ def _finalize_task(self, proc: subprocess.Popen) -> None: reporting to third party services, etc. """ self._task_loop(proc) # Perform a final read. - - -_SIGNUM: TypeAlias = Union[int, signal.Signals] -_HANDLER: TypeAlias = Union[ - Callable[[int, Union[types.FrameType, None]], Any], int, signal.Handlers, None -] - - -def get_executor(where: str) -> Optional[Executor]: - """Return the current Executor.""" - objects: Dict[str, Any] = globals() - for _, obj in objects.items(): - if isinstance(obj, Executor): - return obj - return None - - -def sigchld_handler(signum: _SIGNUM, frame: types.FrameType) -> None: - """Handle child Task suspension and resumption from outside of Executor.""" - # (pid, status, resource usage - can maybe infer errors, etc.) - ret: Tuple[int, int, resource.struct_rusage] = os.wait4( - -1, os.WUNTRACED | os.WCONTINUED - ) - if os.WIFCONTINUED(ret[1]): - executor: Optional[Executor] = get_executor(__name__) - if executor: - executor._analysis_desc.task_result.task_status = TaskStatus.RUNNING - logger.info("Task resumed.") - elif os.WIFSTOPPED(ret[1]): - executor: Optional[Executor] = get_executor(__name__) - if executor: - executor._analysis_desc.task_result.task_status = TaskStatus.STOPPED - logger.info("Task stopped.") - - -# signal.signal(signal.SIGCHLD, sigchld_handler) diff --git a/lute/tasks/task.py b/lute/tasks/task.py index 94478fed..b1df14d4 100644 --- a/lute/tasks/task.py +++ b/lute/tasks/task.py @@ -11,11 +11,10 @@ import time from abc import ABC, abstractmethod -from typing import Any, List, Dict, Union, Type, TextIO, Optional +from typing import Any, List, Dict, Union, Type, TextIO import os import warnings import signal -import types from ..io.models.base import ( TaskParameters, @@ -315,25 +314,3 @@ def _signal_start(self) -> None: signal: str = "NO_PICKLE_MODE" msg: Message = Message(signal=signal) self._report_to_executor(msg) - - -def get_task(where: str) -> Optional[Task]: - """Return the current Task.""" - objects: Dict[str, Any] = globals() - for _, obj in objects.items(): - if isinstance(obj, Task): - return obj - return None - - -def timeout_handler(signum: int, frame: types.FrameType) -> None: - """Log and exit gracefully on Task timeout.""" - task: Optional[Task] = get_task(__name__) - if task: - msg: Message = Message(contents="Timed out.", signal="TASK_FAILED") - task._report_to_executor(msg) - task.clean_up_timeout() - sys.exit(-1) - - -signal.signal(signal.SIGALRM, timeout_handler) diff --git a/subprocess_task.py b/subprocess_task.py index c04cc78b..e08f5f7e 100644 --- a/subprocess_task.py +++ b/subprocess_task.py @@ -1,12 +1,38 @@ import sys import argparse import logging -from typing import Type +import signal +import types +from typing import Type, Optional, Dict, Any 
+from lute.tasks.task import Task +from lute.execution.ipc import Message from lute.io.config import * from lute.io.models.base import TaskParameters from lute import tasks + +def get_task() -> Optional[Task]: + """Return the current Task.""" + objects: Dict[str, Any] = globals() + for _, obj in objects.items(): + if isinstance(obj, Task): + return obj + return None + + +def timeout_handler(signum: int, frame: types.FrameType) -> None: + """Log and exit gracefully on Task timeout.""" + task: Optional[Task] = get_task() + if task: + msg: Message = Message(contents="Timed out.", signal="TASK_FAILED") + task._report_to_executor(msg) + task.clean_up_timeout() + sys.exit(-1) + + +signal.signal(signal.SIGALRM, timeout_handler) + if __debug__: logging.basicConfig(level=logging.DEBUG) else: From 50e2e860981ed42b48dfafa78bfac6641a6f57d8 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Wed, 6 Mar 2024 08:43:37 -0800 Subject: [PATCH 28/41] BUG Remove extra shift in submit_slurm which caused script to skip an argument if --debug passed before it. Move print statements to debug block --- launch_scripts/submit_slurm.sh | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/launch_scripts/submit_slurm.sh b/launch_scripts/submit_slurm.sh index 8b2fa4c8..a29ec5a2 100755 --- a/launch_scripts/submit_slurm.sh +++ b/launch_scripts/submit_slurm.sh @@ -42,7 +42,6 @@ do --debug) DEBUG=1 shift - shift ;; *) POS+=("$1") @@ -53,7 +52,7 @@ done set -- "${POS[@]}" if [[ -z ${CONFIGPATH} || -z ${TASK} ]]; then - echo "Path to LUTE config amd Task name are required!" + echo "Path to LUTE config and Task name are required!" usage exit fi @@ -66,12 +65,9 @@ FORMAT_RUN=$(printf "%04d" ${RUN_NUM:-0}) LOG_FILE="${TASK}_${EXPERIMENT:-EXP}_r${FORMAT_RUN}_$(date +'%Y-%m-%d_%H-%M-%S')" SLURM_ARGS+=" --output=${LOG_FILE}.out" SLURM_ARGS+=" --error=${LOG_FILE}.err" -echo "Running ${TASK} with SLURM arguments: ${SLURM_ARGS}" export LUTE_SOCKET="/tmp/lute_${RANDOM}.sock" -echo "Using socket ${LUTE_SOCKET}" - # By default source the psana environment since most Tasks will use it. 
source /sdf/group/lcls/ds/ana/sw/conda1/manage/bin/psconda.sh @@ -86,6 +82,11 @@ else CMD="python -OB ${EXECUTABLE} -c ${CONFIGPATH} -t ${TASK}" fi -echo "${CMD}" +echo "Submitting task ${TASK}" +if [[ $DEBUG ]]; then + echo "Running ${TASK} with SLURM arguments: ${SLURM_ARGS}" + echo "Using socket ${LUTE_SOCKET}" + echo "${CMD}" +fi sbatch $SLURM_ARGS --wrap "${CMD}" From 165c30e8b5302ae702104c15ab36ffc21711f6bb Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Wed, 6 Mar 2024 09:03:28 -0800 Subject: [PATCH 29/41] UTL Add GH Action for syncing DAGs to psdag automatically --- .github/workflows/psdag.yml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 .github/workflows/psdag.yml diff --git a/.github/workflows/psdag.yml b/.github/workflows/psdag.yml new file mode 100644 index 00000000..4a4e2f02 --- /dev/null +++ b/.github/workflows/psdag.yml @@ -0,0 +1,24 @@ +name: psDAG_Sync + +on: + push: + branches: [ dev ] + pull_request: + branches: [ main ] + +jobs: + deploy-dags: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Pushes to another repository + uses: cpina/github-action-push-to-another-repository@main + env: + SSH_DEPLOY_KEY: ${{ secrets.SSH_PSDAG_PUSH_KEY }} + with: + source-directory: 'workflows/airflow' + destination-github-username: 'slac-lcls' + destination-repository-name: 'psdag' + commit-message: 'LUTE DAG Update (GH Action)' + target-directory: '/lute' + target-branch: main From aa595ca341aaf5b2433cc66cc78caa416145c324 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Wed, 6 Mar 2024 14:40:09 -0800 Subject: [PATCH 30/41] MNT Type hints for operator. --- workflows/airflow/operators/jidoperators.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index bcaa908c..6f394b71 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -245,10 +245,13 @@ def parse_response( def rpc( self, endpoint: str, - control_doc: Dict[str, Union[str, Dict[str, str]]], + control_doc: Union[ + List[Dict[str, Union[str, Dict[str, str]]]], + Dict[str, Union[str, Dict[str, str]]], + ], context: Dict[str, Any], check_for_error: List[str] = [], - ) -> Dict[str, Any]: + ) -> Union[List[Dict[str, Any]], Dict[str, Any]]: """Submit job via JID and retrieve responses. Remote Procedure Call (RPC). 
@@ -305,6 +308,7 @@ def execute(self, context: Dict[str, Any]) -> None: control_doc = self.create_control_doc(context) logger.info(control_doc) logger.info(f"{self.jid_api_location}/{self.jid_api_endpoints['start_job']}") + # start_job requires a dictionary msg: Dict[str, Any] = self.rpc( endpoint="start_job", control_doc=control_doc, context=context ) @@ -316,7 +320,7 @@ def execute(self, context: Dict[str, Any]) -> None: while jobs[0].get("status") in ("RUNNING", "SUBMITTED"): jobs = self.rpc( endpoint="job_statuses", - control_doc=jobs, + control_doc=jobs, # job_statuses requires a list context=context, check_for_error=[" error: ", "Traceback"], ) From 326b2bea408d6a0e9c423498aa929cadbf676337 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 18 Mar 2024 10:09:45 -0700 Subject: [PATCH 31/41] MNT Formatting --- lute/execution/executor.py | 45 ++++++++------------- lute/execution/ipc.py | 3 +- lute/io/db.py | 7 +++- lute/io/models/__init__.py | 1 + lute/io/models/base.py | 1 + lute/tasks/task.py | 6 +-- workflows/airflow/minairflow.py | 1 + workflows/airflow/operators/jidoperators.py | 20 ++++----- workflows/airflow/test.py | 1 + 9 files changed, 41 insertions(+), 44 deletions(-) diff --git a/lute/execution/executor.py b/lute/execution/executor.py index a747d368..01cff27a 100644 --- a/lute/execution/executor.py +++ b/lute/execution/executor.py @@ -89,26 +89,19 @@ class Hooks: signal. """ - def no_pickle_mode(self: Self, msg: Message): - ... + def no_pickle_mode(self: Self, msg: Message): ... - def task_started(self: Self, msg: Message): - ... + def task_started(self: Self, msg: Message): ... - def task_failed(self: Self, msg: Message): - ... + def task_failed(self: Self, msg: Message): ... - def task_stopped(self: Self, msg: Message): - ... + def task_stopped(self: Self, msg: Message): ... - def task_done(self: Self, msg: Message): - ... + def task_done(self: Self, msg: Message): ... - def task_cancelled(self: Self, msg: Message): - ... + def task_cancelled(self: Self, msg: Message): ... - def task_result(self: Self, msg: Message): - ... + def task_result(self: Self, msg: Message): ... def __init__( self, @@ -199,13 +192,13 @@ def update_environment( if "PATH" in env: sep: str = os.pathsep if update_path == "prepend": - env[ - "PATH" - ] = f"{env['PATH']}{sep}{self._analysis_desc.task_env['PATH']}" + env["PATH"] = ( + f"{env['PATH']}{sep}{self._analysis_desc.task_env['PATH']}" + ) elif update_path == "append": - env[ - "PATH" - ] = f"{self._analysis_desc.task_env['PATH']}{sep}{env['PATH']}" + env["PATH"] = ( + f"{self._analysis_desc.task_env['PATH']}{sep}{env['PATH']}" + ) elif update_path == "overwrite": pass else: @@ -387,23 +380,19 @@ def task_started(self: Executor, msg: Message): self.add_hook("task_started", task_started) - def task_failed(self: Executor, msg: Message): - ... + def task_failed(self: Executor, msg: Message): ... self.add_hook("task_failed", task_failed) - def task_stopped(self: Executor, msg: Message): - ... + def task_stopped(self: Executor, msg: Message): ... self.add_hook("task_stopped", task_stopped) - def task_done(self: Executor, msg: Message): - ... + def task_done(self: Executor, msg: Message): ... self.add_hook("task_done", task_done) - def task_cancelled(self: Executor, msg: Message): - ... + def task_cancelled(self: Executor, msg: Message): ... 
self.add_hook("task_cancelled", task_cancelled) diff --git a/lute/execution/ipc.py b/lute/execution/ipc.py index 89d572f6..18013aec 100644 --- a/lute/execution/ipc.py +++ b/lute/execution/ipc.py @@ -118,8 +118,7 @@ def __repr__(self): def __enter__(self) -> Self: return self - def __exit__(self) -> None: - ... + def __exit__(self) -> None: ... def stage_communicator(self): """Alternative method for staging outside of context manager.""" diff --git a/lute/io/db.py b/lute/io/db.py index e8b058a7..42728dcf 100644 --- a/lute/io/db.py +++ b/lute/io/db.py @@ -65,7 +65,12 @@ def _cfg_to_exec_entry_cols( def _params_to_entry_cols( params: TaskParameters, -) -> Tuple[Dict[str, Any], Dict[str, str], Dict[str, Any], Dict[str, str],]: +) -> Tuple[ + Dict[str, Any], + Dict[str, str], + Dict[str, Any], + Dict[str, str], +]: """Adapts a TaskParameters object to be entered into a table. Extracts the appropriate names and types from a TaskParameters object. diff --git a/lute/io/models/__init__.py b/lute/io/models/__init__.py index f8c25c0d..1a307c94 100644 --- a/lute/io/models/__init__.py +++ b/lute/io/models/__init__.py @@ -1,4 +1,5 @@ """Pydantic models for Task parameters and configuration.""" + from .base import * from .tests import * from .smd import * diff --git a/lute/io/models/base.py b/lute/io/models/base.py index c1fd391f..1931a5bb 100644 --- a/lute/io/models/base.py +++ b/lute/io/models/base.py @@ -16,6 +16,7 @@ TemplateConfig(BaseModel): Class for holding information on where templates are stored in order to properly handle ThirdPartyParameter objects. """ + __all__ = [ "TaskParameters", "AnalysisHeader", diff --git a/lute/tasks/task.py b/lute/tasks/task.py index b1df14d4..fcd123c4 100644 --- a/lute/tasks/task.py +++ b/lute/tasks/task.py @@ -242,9 +242,9 @@ def _pre_run(self) -> None: identified. """ super()._pre_run() - full_schema: Dict[ - str, Union[str, Dict[str, Any]] - ] = self._task_parameters.schema() + full_schema: Dict[str, Union[str, Dict[str, Any]]] = ( + self._task_parameters.schema() + ) for param, value in self._task_parameters.dict().items(): # Clunky test with __dict__[param] because compound model-types are # converted to `dict`. E.g. type(value) = dict not AnalysisHeader diff --git a/workflows/airflow/minairflow.py b/workflows/airflow/minairflow.py index b86ce9f6..a9d3fbbe 100644 --- a/workflows/airflow/minairflow.py +++ b/workflows/airflow/minairflow.py @@ -2,6 +2,7 @@ Minimal example to test requests from Airflow to the JID. """ + from datetime import datetime import os from airflow import DAG diff --git a/workflows/airflow/operators/jidoperators.py b/workflows/airflow/operators/jidoperators.py index 6f394b71..8ce2787a 100644 --- a/workflows/airflow/operators/jidoperators.py +++ b/workflows/airflow/operators/jidoperators.py @@ -59,9 +59,9 @@ def execute(self, context: Dict[str, Any]) -> None: """ # logger.info(f"Attempting to run at {self.get_location(context)}...") logger.info(f"Attempting to run at S3DF.") - dagrun_config: Dict[ - str, Union[str, Dict[str, Union[str, int, List[str]]]] - ] = context.get("dag_run").conf + dagrun_config: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = ( + context.get("dag_run").conf + ) jid_job_definition: Dict[str, str] = { "_id": str(uuid.uuid4()), "name": self.task_id, @@ -147,9 +147,9 @@ def create_control_doc( dictionary. 
""" - dagrun_config: Dict[ - str, Union[str, Dict[str, Union[str, int, List[str]]]] - ] = context.get("dag_run").conf + dagrun_config: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = ( + context.get("dag_run").conf + ) self.lute_location = dagrun_config.get( "lute_location", "/sdf/group/lcls/ds/tools/lute/latest" @@ -274,9 +274,9 @@ def rpc( """ # if not self.get_location(context) in self.locations: # raise AirflowException(f"JID location {self.get_location(context)} is not configured") - dagrun_config: Dict[ - str, Union[str, Dict[str, Union[str, int, List[str]]]] - ] = context.get("dag_run").conf + dagrun_config: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = ( + context.get("dag_run").conf + ) experiment: str = dagrun_config.get("experiment") auth: Any = dagrun_config.get("Authorization") @@ -320,7 +320,7 @@ def execute(self, context: Dict[str, Any]) -> None: while jobs[0].get("status") in ("RUNNING", "SUBMITTED"): jobs = self.rpc( endpoint="job_statuses", - control_doc=jobs, # job_statuses requires a list + control_doc=jobs, # job_statuses requires a list context=context, check_for_error=[" error: ", "Traceback"], ) diff --git a/workflows/airflow/test.py b/workflows/airflow/test.py index fee0e1ea..16f5feb7 100644 --- a/workflows/airflow/test.py +++ b/workflows/airflow/test.py @@ -12,6 +12,7 @@ IDs should always be prefixed with `lute_`. LUTE scripts should append this internally, so a DAG "lute_test" can be triggered by asking for "test" """ + from datetime import datetime import os from airflow import DAG From cedcf4ecd62729f2d759dab854a8c1b97710cc4e Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 18 Mar 2024 10:25:06 -0700 Subject: [PATCH 32/41] MNT Remove duplication missed in merge conflict resolution --- subprocess_task.py | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/subprocess_task.py b/subprocess_task.py index 5ec86aa4..5e6b1a34 100644 --- a/subprocess_task.py +++ b/subprocess_task.py @@ -13,28 +13,6 @@ from lute.io.models.base import TaskParameters, BaseBinaryParameters -def get_task() -> Optional[Task]: - """Return the current Task.""" - objects: Dict[str, Any] = globals() - for _, obj in objects.items(): - if isinstance(obj, Task): - return obj - return None - - -def timeout_handler(signum: int, frame: types.FrameType) -> None: - """Log and exit gracefully on Task timeout.""" - task: Optional[Task] = get_task() - if task: - msg: Message = Message(contents="Timed out.", signal="TASK_FAILED") - task._report_to_executor(msg) - task.clean_up_timeout() - sys.exit(-1) - - -signal.signal(signal.SIGALRM, timeout_handler) - - def get_task() -> Optional[Task]: """Return the current Task.""" objects: Dict[str, Any] = globals() From 139d691ace7d576503e4f4d321a9a3db9cc146c6 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 18 Mar 2024 10:31:50 -0700 Subject: [PATCH 33/41] ENH Add a failing task to the test dag --- workflows/airflow/test.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/workflows/airflow/test.py b/workflows/airflow/test.py index 16f5feb7..a4a44014 100644 --- a/workflows/airflow/test.py +++ b/workflows/airflow/test.py @@ -32,6 +32,9 @@ binary_tester: JIDSlurmOperator = JIDSlurmOperator( max_cores=5, task_id="BinaryTester", dag=dag ) +binary_err_tester: JIDSlurmOperator = JIDSlurmOperator( + max_cores=5, task_id="BinaryErrTester", dag=dag +) socket_tester: JIDSlurmOperator = JIDSlurmOperator( max_cores=2, task_id="SocketTester", dag=dag ) @@ -43,4 +46,5 @@ ) tester >> binary_tester +tester >> 
binary_err_tester # Second Task should fail tester >> socket_tester >> write_tester >> read_tester From fceffe1d412e7508813e1fe8138ec2977794fc83 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 18 Mar 2024 10:32:09 -0700 Subject: [PATCH 34/41] MNT Formatting. --- lute/execution/executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/lute/execution/executor.py b/lute/execution/executor.py index f0408e6f..43f3f495 100644 --- a/lute/execution/executor.py +++ b/lute/execution/executor.py @@ -519,4 +519,3 @@ def execute_task(self) -> None: self._store_configuration() for comm in self._communicators: comm.clear_communicator() - From 687e6c34aac8cd522b1390a4c095deee095eb565 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 18 Mar 2024 14:08:17 -0700 Subject: [PATCH 35/41] SKL Begin DAG for SFX processing using PyAlgos peak finder, and experimental phasing --- workflows/airflow/pyalgos_sfx_phasing.py | 53 ++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 workflows/airflow/pyalgos_sfx_phasing.py diff --git a/workflows/airflow/pyalgos_sfx_phasing.py b/workflows/airflow/pyalgos_sfx_phasing.py new file mode 100644 index 00000000..c9217a44 --- /dev/null +++ b/workflows/airflow/pyalgos_sfx_phasing.py @@ -0,0 +1,53 @@ +"""SFX Processing DAG (W/ Experimental Phasing) + +Runs SFX processing, beginning with the PyAlgos peak finding algorithm. +This workflow is used for data requiring experimental phasing. Do NOT use this +DAG if you are using molecular replacement. + +Note: + The task_id MUST match the managed task name when defining DAGs - it is used + by the operator to properly launch it. + + dag_id names must be unique, and they are not namespaced via folder + hierarchy. I.e. all DAGs on an Airflow instance must have unique ids. The + Airflow instance used by LUTE is currently shared by other software - DAG + IDs should always be prefixed with `lute_`. LUTE scripts should append this + internally, so a DAG "lute_test" can be triggered by asking for "test" +""" + +from datetime import datetime +import os +from airflow import DAG +from lute.operators.jidoperators import JIDSlurmOperator + +dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}" +description: str = "Run SFX processing using PyAlgos peak finding and experimental phasing" + +dag: DAG = DAG( + dag_id=dag_id, + start_date=datetime(2024, 3, 18), + schedule_interval=None, + description=description, +) + +peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPyAlgos", dag=dag) + +indexer: JIDSlurmOperator = JIDSlurmOperator(max_cores=120, task_id="CrystFELIndexer", dag=dag) + +# Merge +merger: JIDSlurmOperator = JIDSlurmOperator(max_cores=120, task_id="PartialatorMerger", dag=dag) + +# Figures of merit +hkl_comparer: JIDSlurmOperator = JIDSlurmOperator(max_cores=8, task_id="HKLComparer", dag=dag) + +# HKL conversions +hkl_manipulator: JIDSlurmOperator = JIDSlurmOperator(max_cores=8, task_id="HKLManipulator", dag=dag) + +# SHELX Tasks +shelxc: JIDSlurmOperator = JIDSlurmOperator(max_cores=20, task_id="SHELXCRunner", dag=dag) + + +peak_finder >> indexer >> merger >> hkl_manipulator >> shelxc +merger >> hkl_comparer + +# Run summaries From 1f6d81a3d44dc83fc7464a38363afa0adb5bf7c9 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 18 Mar 2024 14:21:44 -0700 Subject: [PATCH 36/41] ENH Authentication workaround until LDAP is working. 
Requires launcher binary on S3DF

---
 launch_scripts/launch_airflow.py | 22 +++++++++++++++++++---
 workflows/airflow/pyalgos_sfx_phasing.py | 24 ++++++++++++++++------
 2 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/launch_scripts/launch_airflow.py b/launch_scripts/launch_airflow.py
index 8913e680..c82b5dcf 100755
--- a/launch_scripts/launch_airflow.py
+++ b/launch_scripts/launch_airflow.py
@@ -14,7 +14,7 @@
 import datetime
 import logging
 import argparse
-from typing import Dict, Union, List, Any
+from typing import Dict, Union, List
 
 import requests
 from requests.auth import HTTPBasicAuth
@@ -26,6 +26,19 @@
 
 logger: logging.Logger = logging.getLogger(__name__)
 
+
+def _retrieve_pw(instance: str = "prod") -> str:
+    path: str = "/sdf/group/lcls/ds/tools/lute/airflow_{instance}.txt"
+    if instance == "prod" or instance == "test":
+        path = path.format(instance=instance)
+    else:
+        raise ValueError('`instance` must be either "test" or "prod"!')
+
+    with open(path, "r") as f:
+        pw: str = f.readline().strip()
+    return pw
+
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(
         prog="trigger_airflow_lute_dag",
@@ -45,10 +58,13 @@
     extra_args: List[str]  # Should contain all SLURM arguments!
     args, extra_args = parser.parse_known_args()
     airflow_instance: str
+    instance_str: str
     if args.test:
         airflow_instance = "http://172.24.5.190:8080/"
+        instance_str = "test"
     else:
         airflow_instance = "http://172.24.5.247:8080/"
+        instance_str = "prod"
 
     airflow_api_endpoints: Dict[str, str] = {
         "health": "api/v1/health",
@@ -57,7 +73,7 @@
 
     resp: requests.models.Response = requests.get(
         f"{airflow_instance}/{airflow_api_endpoints['health']}",
-        auth=HTTPBasicAuth(),  # NEED AUTH SOLUTION
+        auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)),
     )
     resp.raise_for_status()
@@ -86,7 +102,7 @@
     resp: requests.models.Response = requests.post(
         f"{airflow_instance}/{airflow_api_endpoints['run_dag']}",
         json=dag_run_data,
-        auth=HTTPBasicAuth(),  # NEED AUTH SOLUTION
+        auth=HTTPBasicAuth("btx", _retrieve_pw(instance_str)),
     )
     resp.raise_for_status()
     logger.info(resp.text)
diff --git a/workflows/airflow/pyalgos_sfx_phasing.py b/workflows/airflow/pyalgos_sfx_phasing.py
index c9217a44..1bdbe02b 100644
--- a/workflows/airflow/pyalgos_sfx_phasing.py
+++ b/workflows/airflow/pyalgos_sfx_phasing.py
@@ -21,7 +21,9 @@
 from lute.operators.jidoperators import JIDSlurmOperator
 
 dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}"
-description: str = "Run SFX processing using PyAlgos peak finding and experimental phasing"
+description: str = (
+    "Run SFX processing using PyAlgos peak finding and experimental phasing"
+)
 
 dag: DAG = DAG(
     dag_id=dag_id,
@@ -32,19 +34,29 @@
 
 peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPyAlgos", dag=dag)
 
-indexer: JIDSlurmOperator = JIDSlurmOperator(max_cores=120, task_id="CrystFELIndexer", dag=dag)
+indexer: JIDSlurmOperator = JIDSlurmOperator(
+    max_cores=120, task_id="CrystFELIndexer", dag=dag
+)
 
 # Merge
-merger: JIDSlurmOperator = JIDSlurmOperator(max_cores=120, task_id="PartialatorMerger", dag=dag)
+merger: JIDSlurmOperator = JIDSlurmOperator(
+    max_cores=120, task_id="PartialatorMerger", dag=dag
+)
 
 # Figures of merit
-hkl_comparer: JIDSlurmOperator = JIDSlurmOperator(max_cores=8, task_id="HKLComparer", dag=dag)
+hkl_comparer: JIDSlurmOperator = JIDSlurmOperator(
+    max_cores=8, task_id="HKLComparer", dag=dag
+)
 
 # HKL conversions
-hkl_manipulator: JIDSlurmOperator = JIDSlurmOperator(max_cores=8, task_id="HKLManipulator", dag=dag)
+hkl_manipulator: JIDSlurmOperator 
= JIDSlurmOperator( + max_cores=8, task_id="HKLManipulator", dag=dag +) # SHELX Tasks -shelxc: JIDSlurmOperator = JIDSlurmOperator(max_cores=20, task_id="SHELXCRunner", dag=dag) +shelxc: JIDSlurmOperator = JIDSlurmOperator( + max_cores=20, task_id="SHELXCRunner", dag=dag +) peak_finder >> indexer >> merger >> hkl_manipulator >> shelxc From f43ef3ddde057ece1979ea103327e9f99f59a08c Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Mon, 18 Mar 2024 14:24:01 -0700 Subject: [PATCH 37/41] DOC Begin basic documentation on adding new workflows. --- docs/tutorial/creating_workflows.md | 39 +++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 docs/tutorial/creating_workflows.md diff --git a/docs/tutorial/creating_workflows.md b/docs/tutorial/creating_workflows.md new file mode 100644 index 00000000..bccf3def --- /dev/null +++ b/docs/tutorial/creating_workflows.md @@ -0,0 +1,39 @@ +# Creating a new workflow + +## Relevant Components +In addition to the core LUTE package, a number of components are generally involved to run a workflow. The current set of scripts and objects are used to interface with Airflow, and the SLURM job scheduler. The core LUTE library can also be used to run workflows using different backends, and in the future these may be supported. + +For building and running workflows using SLURM and Airflow, the following components are necessary, and will be described in more detail below: +- Airflow launch script: `launch_airflow.py` +- SLURM submission script: `submit_slurm.sh` +- Airflow operators: + - `JIDSlurmOperator` + +## Launch Scripts +## `launch_airflow.py` +Sends a request to an Airflow instance to submit a specific DAG. This script prepares an HTTP request with the appropriate parameters in a specific format. + +A request involves the following information: +```py +dag_run_data: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = { + "dag_run_id": str(uuid.uuid4()), + "conf": { + "experiment": os.environ.get("EXPERIMENT"), + "run_id": f"{os.environ.get('RUN_NUM')}{datetime.datetime.utcnow().isoformat()}", + "JID_UPDATE_COUNTERS": os.environ.get("JID_UPDATE_COUNTERS"), + "ARP_ROOT_JOB_ID": os.environ.get("ARP_JOB_ID"), + "ARP_LOCATION": os.environ.get("ARP_LOCATION", "S3DF"), + "Authorization": os.environ.get("Authorization"), + "user": getpass.getuser(), + "lute_params": params, + "slurm_params": extra_args, + }, +} +``` + +## `submit_slurm.sh` + +## Operators +### `JIDSlurmOperator` arguments +- `task_id`: This is nominally the name of the task on the Airflow side. However, for simplicity this is used 1-1 to match the name of a **managed** Task defined in LUTE's `managed_tasks.py` module. I.e., it should the name of an `Executor("Task")` object which will run the specific Task of interest. This **must** match the name of a defined managed Task. +- `max_cores`: Used to cap the maximum number of cores which should be requested of SLURM. By default all jobs will run with the same number of cores, which should be specified when running the `launch_airflow.py` script (either from the ARP, or by hand). This behaviour was chosen because in general we want to increase or decrease the core-count for all Tasks uniformly, and we don't want to have to specify core number arguments for each job individually. Nonetheless, on occassion it may be necessary to cap the number of cores a specific job will use. E.g. 
if the default value specified when launching the Airflow DAG is multiple cores, and one job is single threaded, the core count can be capped for that single job to 1, while the rest run with multiple cores. From 385af7df0783cd84f5fb9df730b5f9ca033fecd9 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Tue, 19 Mar 2024 14:32:18 -0700 Subject: [PATCH 38/41] BUG Fix environment variables in SLURM script --- launch_scripts/submit_slurm.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/launch_scripts/submit_slurm.sh b/launch_scripts/submit_slurm.sh index a29ec5a2..baf055d6 100755 --- a/launch_scripts/submit_slurm.sh +++ b/launch_scripts/submit_slurm.sh @@ -61,8 +61,8 @@ fi SLURM_ARGS=$@ # Setup logfile names - $EXPERIMENT and $RUN will be available if ARP submitted -FORMAT_RUN=$(printf "%04d" ${RUN_NUM:-0}) -LOG_FILE="${TASK}_${EXPERIMENT:-EXP}_r${FORMAT_RUN}_$(date +'%Y-%m-%d_%H-%M-%S')" +FORMAT_RUN=$(printf "%04d" ${RUN:-0}) +LOG_FILE="${TASK}_${EXPERIMENT:-$EXP}_r${FORMAT_RUN}_$(date +'%Y-%m-%d_%H-%M-%S')" SLURM_ARGS+=" --output=${LOG_FILE}.out" SLURM_ARGS+=" --error=${LOG_FILE}.err" From 3e826d98618c59546c43d70fbae7dfe13e2d56b0 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Tue, 26 Mar 2024 12:35:10 -0700 Subject: [PATCH 39/41] DOC Additional workflow docs --- docs/tutorial/creating_workflows.md | 118 +++++++++++++++++++++++++++- 1 file changed, 114 insertions(+), 4 deletions(-) diff --git a/docs/tutorial/creating_workflows.md b/docs/tutorial/creating_workflows.md index bccf3def..acd4a5c9 100644 --- a/docs/tutorial/creating_workflows.md +++ b/docs/tutorial/creating_workflows.md @@ -1,4 +1,5 @@ -# Creating a new workflow +# Workflows with Airflow +**Note:** Airflow uses the term **DAG**, or directed acyclic graph, to describe workflows of tasks with defined (and acyclic) connectivities. This page will use the terms workflow and DAG interchangeably. ## Relevant Components In addition to the core LUTE package, a number of components are generally involved to run a workflow. The current set of scripts and objects are used to interface with Airflow, and the SLURM job scheduler. The core LUTE library can also be used to run workflows using different backends, and in the future these may be supported. @@ -9,11 +10,11 @@ For building and running workflows using SLURM and Airflow, the following compon - Airflow operators: - `JIDSlurmOperator` -## Launch Scripts +## Launch/Submission Scripts ## `launch_airflow.py` -Sends a request to an Airflow instance to submit a specific DAG. This script prepares an HTTP request with the appropriate parameters in a specific format. +Sends a request to an Airflow instance to submit a specific DAG (workflow). This script prepares an HTTP request with the appropriate parameters in a specific format. -A request involves the following information: +A request involves the following information, most of which is retrieved automatically: ```py dag_run_data: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = { "dag_run_id": str(uuid.uuid4()), @@ -30,10 +31,119 @@ dag_run_data: Dict[str, Union[str, Dict[str, Union[str, int, List[str]]]]] = { }, } ``` +Note that the environment variables are used to fill in the appropriate information because this script is intended to be launched primarily from the ARP (which passes these variables). The ARP allows for the launch job to be defined in the experiment eLog and submitted automatically for each new DAQ run. 
The environment variables `EXPERIMENT` and `RUN` can alternatively be defined prior to submitting the script on the command-line.
+
+The script takes a number of parameters:
+
+```bash
+launch_airflow.py -c <path_to_config_yaml> -w <dag_name> [--debug] [--test]
+```
+
+- `-c` refers to the path of the configuration YAML that contains the parameters for each **managed** `Task` in the requested workflow.
+- `-w` is the name of the DAG (workflow) to run. By convention each DAG is named after the Python file it is defined in. (See below).
+- `--debug` is an optional flag to run all steps of the workflow in debug mode for verbose logging and output.
+- `--test` is an optional flag which will use the test Airflow instance. By default the script will make requests of the standard production Airflow instance.
+
 
 ## `submit_slurm.sh`
+Launches a job on the S3DF batch nodes using the SLURM job scheduler. This script launches a single **managed** `Task` at a time. The usage is as follows:
+```bash
+submit_slurm.sh -c <path_to_config_yaml> -t <managed_task_name> [--debug] [--SLURM_ARGS ...]
+```
+As a reminder, the **managed** `Task` refers to the `Executor`-`Task` combination. The script does not parse any SLURM-specific parameters, and instead passes them transparently to SLURM. At least the following two SLURM arguments must be provided:
+```bash
+--partition=<...> # Usually partition=milano
+--account=<...> # Usually account=lcls:$EXPERIMENT
+```
+Generally, resource requests will also be included, such as the number of cores to use. A complete call may look like the following:
+```bash
+submit_slurm.sh -c /sdf/data/lcls/ds/hutch/experiment/scratch/config.yaml -t Tester --partition=milano --account=lcls:experiment --ntasks=100 [...]
+```
+
+When running a workflow using the `launch_airflow.py` script, each step of the workflow will be submitted using this script.
 
 ## Operators
+`Operator`s are the objects submitted as individual steps of a DAG by Airflow. They are conceptually linked to the idea of a task in that each task of a workflow is generally an operator. Care should be taken not to confuse them with LUTE `Task`s or **managed** `Task`s, though. There is, however, usually a one-to-one correspondence between a `Task` and an `Operator`.
+
+Airflow runs on a K8S cluster which has no access to the experiment data. When we ask Airflow to run a DAG, it will launch an `Operator` for each step of the DAG. However, the `Operator` itself cannot perform productive analysis without access to the data. The solution employed by `LUTE` is to have a limited set of `Operator`s which do not perform analysis, but instead request that `LUTE` **managed** `Task`s be submitted on the batch nodes where they can access the data. There may be small differences between how the various provided `Operator`s do this, but in general they will all make a request to the **job interface daemon** (JID) that a new SLURM job be scheduled using the `submit_slurm.sh` script described above.
+
+Therefore, running a typical Airflow DAG involves the following steps:
+1. The `launch_airflow.py` script is submitted, usually from a definition in the eLog.
+2. The `launch_airflow` script requests that Airflow run a specific DAG.
+3. The Airflow instance begins submitting the `Operator`s that make up the DAG definition.
+4. Each `Operator` sends a request to the `JID` to submit a job.
+5. The `JID` submits the `elog_submit.sh` script with the appropriate **managed** `Task`.
+6. The **managed** `Task` runs on the batch nodes, while the `Operator`, requesting updates from the JID on job status, waits for it to complete.
+7. 
Once a **managed** `Task` completes, the `Operator` will receive this information and tell the Airflow server whether the job completed successfully or resulted in failure.
+8. The Airflow server will then launch the next step of the DAG, and so on, until every step has been executed.
+
+Currently, the following `Operator`s are maintained:
+- `JIDSlurmOperator`: The standard `Operator`. Each instance has a one-to-one correspondence with a LUTE **managed** `Task`.
+
 ### `JIDSlurmOperator` arguments
 - `task_id`: This is nominally the name of the task on the Airflow side. However, for simplicity this is used 1-1 to match the name of a **managed** Task defined in LUTE's `managed_tasks.py` module. I.e., it should be the name of an `Executor("Task")` object which will run the specific Task of interest. This **must** match the name of a defined managed Task.
 - `max_cores`: Used to cap the maximum number of cores which should be requested of SLURM. By default all jobs will run with the same number of cores, which should be specified when running the `launch_airflow.py` script (either from the ARP, or by hand). This behaviour was chosen because in general we want to increase or decrease the core-count for all Tasks uniformly, and we don't want to have to specify core number arguments for each job individually. Nonetheless, on occasion it may be necessary to cap the number of cores a specific job will use. E.g. if the default value specified when launching the Airflow DAG is multiple cores, and one job is single threaded, the core count can be capped for that single job to 1, while the rest run with multiple cores.
+
+
+# Creating a new workflow
+Defining a new workflow involves creating a **new** module (Python file) in the directory `workflows/airflow`, creating a number of `Operator` instances within the module, and then drawing the connectivity between them. At the top of the file an Airflow DAG is created and given a name. By convention all `LUTE` workflows use the name of the file as the name of the DAG. The following code can be copied exactly into the file:
+
+```py
+from datetime import datetime
+import os
+from airflow import DAG
+from lute.operators.jidoperators import JIDSlurmOperator # Import other operators if needed
+
+dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}"
+description: str = (
+    "Run SFX processing using PyAlgos peak finding and experimental phasing"
+)
+
+dag: DAG = DAG(
+    dag_id=dag_id,
+    start_date=datetime(2024, 3, 18),
+    schedule_interval=None,
+    description=description,
+)
+```
+
+Once the DAG has been created, a number of `Operator`s must be created to run the various LUTE analysis operations. As an example, consider a partial SFX processing workflow which includes steps for peak finding, indexing, merging, and calculating figures of merit. Each of the 4 steps will have an `Operator` instance which will launch a corresponding `LUTE` **managed** `Task`, for example:
+
+```py
+# Using only the JIDSlurmOperator
+# syntax: JIDSlurmOperator(task_id="LuteManagedTaskName", dag=dag) # optionally, max_cores=123
+peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPyAlgos", dag=dag)
+
+# We specify a maximum number of cores for the rest of the jobs. 
+indexer: JIDSlurmOperator = JIDSlurmOperator(
+    max_cores=120, task_id="CrystFELIndexer", dag=dag
+)
+
+# Merge
+merger: JIDSlurmOperator = JIDSlurmOperator(
+    max_cores=120, task_id="PartialatorMerger", dag=dag
+)
+
+# Figures of merit
+hkl_comparer: JIDSlurmOperator = JIDSlurmOperator(
+    max_cores=8, task_id="HKLComparer", dag=dag
+)
+```
+
+Finally, the dependencies between the `Operator`s are "drawn", defining the execution order of the various steps. The `>>` operator has been overloaded for the `Operator` class, allowing it to be used to specify the next step in the DAG. In this case, a completely linear DAG is drawn as:
+
+```py
+peak_finder >> indexer >> merger >> hkl_comparer
+```
+
+Parallel execution can be added by using the `>>` operator multiple times. Consider a `task1` which upon successful completion starts a `task2` and `task3` in parallel. This dependency can be added to the DAG using:
+
+```py
+#task1: JIDSlurmOperator = JIDSlurmOperator(...)
+#task2 ...
+
+task1 >> task2
+task1 >> task3
+```
+
+As each DAG is defined in pure Python, standard control structures (loops, if statements, etc.) can be used to create more complex workflow arrangements.

From 7c90114f2192b8f92ef577156c75cf9df875db71 Mon Sep 17 00:00:00 2001
From: gadorlhiac
Date: Fri, 29 Mar 2024 08:14:58 -0700
Subject: [PATCH 40/41] ENH Add psocake SFX workflow. Switch pyalgos SFX workflow to be for molecular replacement.

---
 ..._sfx_phasing.py => psocake_sfx_phasing.py} | 4 +-
 workflows/airflow/pyalgos_sfx.py | 65 +++++++++++++++++++
 2 files changed, 67 insertions(+), 2 deletions(-)
 rename workflows/airflow/{pyalgos_sfx_phasing.py => psocake_sfx_phasing.py} (95%)
 create mode 100644 workflows/airflow/pyalgos_sfx.py

diff --git a/workflows/airflow/pyalgos_sfx_phasing.py b/workflows/airflow/psocake_sfx_phasing.py
similarity index 95%
rename from workflows/airflow/pyalgos_sfx_phasing.py
rename to workflows/airflow/psocake_sfx_phasing.py
index 1bdbe02b..e5b733ce 100644
--- a/workflows/airflow/pyalgos_sfx_phasing.py
+++ b/workflows/airflow/psocake_sfx_phasing.py
@@ -22,7 +22,7 @@
 
 dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}"
 description: str = (
-    "Run SFX processing using PyAlgos peak finding and experimental phasing"
+    "Run SFX processing using Psocake peak finding and experimental phasing"
 )
 
 dag: DAG = DAG(
@@ -32,7 +32,7 @@
     description=description,
 )
 
-peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPyAlgos", dag=dag)
+peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPsocake", dag=dag)
 
 indexer: JIDSlurmOperator = JIDSlurmOperator(
     max_cores=120, task_id="CrystFELIndexer", dag=dag
diff --git a/workflows/airflow/pyalgos_sfx.py b/workflows/airflow/pyalgos_sfx.py
new file mode 100644
index 00000000..e417486a
--- /dev/null
+++ b/workflows/airflow/pyalgos_sfx.py
@@ -0,0 +1,65 @@
+"""SFX Processing DAG (W/ Molecular Replacement)
+
+Runs SFX processing, beginning with the PyAlgos peak finding algorithm.
+This workflow is used for data analyzed by molecular replacement. Do NOT use
+this DAG if you require experimental phasing.
+
+Note:
+    The task_id MUST match the managed task name when defining DAGs - it is used
+    by the operator to properly launch it.
+
+    dag_id names must be unique, and they are not namespaced via folder
+    hierarchy. I.e. all DAGs on an Airflow instance must have unique ids. The
+    Airflow instance used by LUTE is currently shared by other software - DAG
+    IDs should always be prefixed with `lute_`. 
LUTE scripts should append this + internally, so a DAG "lute_test" can be triggered by asking for "test" +""" + +from datetime import datetime +import os +from airflow import DAG +from lute.operators.jidoperators import JIDSlurmOperator + +dag_id: str = f"lute_{os.path.splitext(os.path.basename(__file__))[0]}" +description: str = ( + "Run SFX processing using PyAlgos peak finding and Molecular Replacement" +) + +dag: DAG = DAG( + dag_id=dag_id, + start_date=datetime(2024, 3, 18), + schedule_interval=None, + description=description, +) + +peak_finder: JIDSlurmOperator = JIDSlurmOperator(task_id="PeakFinderPyAlgos", dag=dag) + +indexer: JIDSlurmOperator = JIDSlurmOperator( + max_cores=120, task_id="CrystFELIndexer", dag=dag +) + +# Merge +merger: JIDSlurmOperator = JIDSlurmOperator( + max_cores=120, task_id="PartialatorMerger", dag=dag +) + +# Figures of merit +hkl_comparer: JIDSlurmOperator = JIDSlurmOperator( + max_cores=8, task_id="HKLComparer", dag=dag +) + +# HKL conversions +hkl_manipulator: JIDSlurmOperator = JIDSlurmOperator( + max_cores=8, task_id="HKLManipulator", dag=dag +) + +# SHELX Tasks +dimple_runner: JIDSlurmOperator = JIDSlurmOperator( + task_id="DimpleSolver", dag=dag +) + + +peak_finder >> indexer >> merger >> hkl_manipulator >> dimple_runner +merger >> hkl_comparer + +# Run summaries From deabc5ea286b5d579fb80e3a64b8ac78dc045300 Mon Sep 17 00:00:00 2001 From: gadorlhiac Date: Fri, 29 Mar 2024 08:16:31 -0700 Subject: [PATCH 41/41] MNT Formatting. --- workflows/airflow/pyalgos_sfx.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/workflows/airflow/pyalgos_sfx.py b/workflows/airflow/pyalgos_sfx.py index e417486a..ddd6e165 100644 --- a/workflows/airflow/pyalgos_sfx.py +++ b/workflows/airflow/pyalgos_sfx.py @@ -54,9 +54,7 @@ ) # SHELX Tasks -dimple_runner: JIDSlurmOperator = JIDSlurmOperator( - task_id="DimpleSolver", dag=dag -) +dimple_runner: JIDSlurmOperator = JIDSlurmOperator(task_id="DimpleSolver", dag=dag) peak_finder >> indexer >> merger >> hkl_manipulator >> dimple_runner
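
The following is a usage sketch rather than part of the patches above: it illustrates how the new `pyalgos_sfx` DAG could be triggered through the `launch_airflow.py` script documented in `creating_workflows.md`. The configuration path, account, and core count are placeholder values, not settings taken from these commits.

```bash
# Hypothetical invocation (all values are placeholders): -c points at the YAML
# holding parameters for every managed Task in the DAG, -w names the workflow
# module, and any remaining arguments are passed through to SLURM.
./launch_scripts/launch_airflow.py \
    -c /sdf/data/lcls/ds/hutch/experiment/scratch/lute.yaml \
    -w pyalgos_sfx \
    --partition=milano \
    --account=lcls:experiment \
    --ntasks=64
```

Note that the `-w` value omits the `lute_` prefix; per the DAG docstrings above, the launch script is expected to append it internally before requesting the matching `dag_id` from Airflow.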