ENH Airflow Workflow Submission from Command-line #29

Merged: 5 commits, Jul 2, 2024

docs/usage.md (24 changes: 20 additions & 4 deletions)
@@ -5,7 +5,7 @@ LUTE is publicly available on [GitHub](https://github.com/slac-lcls/lute). In
```
# Navigate to the directory of your choice.
git clone git@github.com:slac-lcls/lute
```
The repostiory directory structure is as follows:
The repository directory structure is as follows:

```
lute
  ...
```
@@ -24,6 +24,7 @@ In general, most interactions with the software will be through scripts located

### A note on utilities
In the `utilities` directory there are two useful programs to provide assistance with using the software:

- `utilities/dbview`: LUTE stores all parameters for every analysis routine it runs (as well as results) in a database. This database is stored in the `work_dir` defined in the YAML file (see below). The `dbview` utility is a TUI application (Text-based user interface) which runs in the terminal. It allows you to navigate a LUTE database using the arrow keys, etc. Usage is: `utilities/dbview -p <path/to/lute.db>`.
- `utilities/lute_help`: This utility provides help and usage information for running LUTE software. E.g., it provides access to parameter descriptions to assist in properly filling out a configuration YAML. Its usage is described in slightly more detail below; example invocations of both utilities are sketched after this list.
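
In these sketches the database path is a placeholder, and the `-t <TaskName>` option shown for `lute_help` is an assumption here - its full usage is described further below.

```bash
# Browse a LUTE database in the terminal (TUI application); the path is a placeholder.
utilities/dbview -p /path/to/work_dir/lute.db

# Print parameter descriptions for a Task (option and Task name are illustrative).
utilities/lute_help -t SomeTaskName
```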

@@ -32,6 +33,7 @@ In the `utilities` directory there are two useful programs to provide assistance
LUTE runs code as `Task`s that are managed by an `Executor`. The `Executor` provides modifications to the environment the `Task` runs in, as well as controls details of inter-process communication, reporting results to the eLog, etc. Combinations of specific `Executor`s and `Task`s are already provided, and are referred to as **managed** `Task`s. **Managed** `Task`s are submitted as a single unit. They can be run individually, or a series of independent steps can be submitted all at once in the form of a workflow, or **directed acyclic graph** (**DAG**). This latter option makes use of Airflow to manage the individual execution steps.

Running analysis with LUTE is the process of submitting one or more **managed** `Task`s. This is generally a two-step process.

1. First, a configuration YAML file is prepared. This contains the parameterizations of all the `Task`s which you may run.
2. Individual **managed** `Task` submission, or workflow (**DAG**) submission.

@@ -70,7 +72,8 @@ In the first document, the header, it is important that the `work_dir` is proper

The actual analysis parameters are defined in the second document. As these vary from `Task` to `Task`, a full description will not be provided here. An actual template with real `Task` parameters is available in `config/test.yaml`. Your analysis POC can also help you set up and choose the correct `Task`s to include as a starting point. The template YAML file has further descriptions of what each parameter does and how to fill it out. You can also refer to the `lute_help` program described under the following sub-heading.
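
As a rough sketch of the overall layout (the header fields other than `work_dir` are illustrative here; `config/test.yaml` remains the authoritative template), a configuration file consists of two YAML documents:

```yaml
---
# First document: the header. work_dir is where results and the database are written.
title: "My analysis description"        # illustrative field
experiment: "EXPL1000000"               # illustrative experiment name
run: 10                                 # illustrative run number
work_dir: "/path/to/output/directory"
...
---
# Second document: parameters for each Task you may run.
SomeTask:                               # hypothetical Task name
  some_parameter: "some_value"          # hypothetical parameter
...
```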

Some things to consider and possible points of confusion:
**Some things to consider and possible points of confusion:**

- While we will be submitting **managed** `Task`s, the parameters are defined at the `Task` level. I.e. the **managed** `Task` and the `Task` itself have different names, and the names in the YAML refer to the latter. This is because a single `Task` can be run using different `Executor` configurations, but using the same parameters. The list of **managed** `Task`s is in `lute/managed_tasks.py`. A table is also provided below for some routines of interest.


@@ -136,6 +139,7 @@ no_image_data (boolean) - Default: False
## Running Managed `Task`s and Workflows (DAGs)
After a YAML file has been filled in, you can run a `Task`. There are multiple ways to submit a `Task`, but these three are the most likely:
1. Run a single **managed** `Task` interactively by running `python ...`
2. Run a single **managed** `Task` as a batch job (e.g. on S3DF) via a SLURM submission `submit_slurm.sh ...`
3. Run a DAG (workflow with multiple **managed** `Task`s).
@@ -151,6 +155,7 @@ The simplest submission method is just to run Python interactively. In most case
The command-line arguments in square brackets `[]` are optional, while those in `<>` must be provided (an example invocation is sketched after this list):
- `-O` is the flag controlling whether you run in debug or non-debug mode. **By default, i.e. if you do NOT provide this flag, you will run in debug mode**, which enables verbose printing. Passing `-O` will turn off debug to minimize output.
- `-t <ManagedTaskName>` is the name of the **managed** `Task` you want to run.
- `-c </path/...>` is the path to the configuration YAML.
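
A sketch of an interactive submission, assuming you are in the root of your LUTE installation and using the `Tester` managed `Task` that also appears in the debugging example later in this document:

```bash
# Run a single managed Task interactively. Omit -O to keep verbose debug
# output; add it to turn debug off. The YAML path is a placeholder.
python -B run_task.py -t Tester -c /path/to/config.yaml
```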
@@ -163,28 +168,30 @@ On S3DF you can also submit individual **managed** `Task`s to run as batch jobs.
As before, command-line arguments in square brackets `[]` are optional, while those in `<>` must be provided:

- `-t <ManagedTaskName>` is the name of the **managed** `Task` you want to run.
- `-c </path/...>` is the path to the configuration YAML.
- `--debug` is the flag to control whether or not to run in debug mode.

In addition to the LUTE-specific arguments, SLURM arguments must also be provided (`$SLURM_ARGS` above). You can provide as many as you want; however, you will need to provide at least:

- `--partition=<partition/queue>` - The queue to run on; in general for LCLS this is `milano`.
- `--account=lcls:<experiment>` - The account to use for batch job accounting.

You will likely also want to provide at a minimum:

- `--ntasks=<...>` to control the number of cores allocated.

In general, it is best to prefer the long form of the SLURM arguments (`--arg=<...>`) in order to avoid potential clashes with present or future LUTE arguments. An example submission is sketched below.
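
The following sketch uses placeholder paths, managed `Task` name, partition, account, and core count:

```bash
/path/to/lute/launch_scripts/submit_slurm.sh -t Tester -c /path/to/config.yaml \
    --partition=milano --account=lcls:<experiment> --ntasks=64
```
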
### Workflow (DAG) submission

**NOTE**: Support for submitting Airflow DAGs from the command-line is coming soon. As of 2024/05/03, you will need to use the instructions for **DAG Submission from the** `eLog` described below. This is due to authentication requirements - support for the new API calls is in the works.

Finally, you can submit a full workflow (e.g. SFX analysis, smalldata production and summary results, geometry optimization...). This can be done using a single script, `submit_launch_airflow.sh`, similarly to the SLURM submission above:
```bash
> launch_scripts/submit_launch_airflow.sh /path/to/lute/launch_scripts/launch_airflow.py -c </path/to/yaml.yaml> -w <dag_name> [--debug] [--test] $SLURM_ARGS
```
The submission process is slightly more complicated in this case. A more in-depth explanation is provided under "Airflow Launch Steps" in the advanced usage section below, if interested. The parameters are as follows - as before, command-line arguments in square brackets `[]` are optional, while those in `<>` must be provided:

- The **first argument** (must be first) is the full path to the `launch_scripts/launch_airflow.py` script located in whatever LUTE installation you are running. All other arguments can come afterwards in any order.
- `-c </path/...>` is the path to the configuration YAML to use.
- `-w <dag_name>` is the name of the DAG (workflow) to run. This replaces the task name provided when using the other two methods above. A DAG list is provided below.
@@ -194,13 +201,15 @@ The submission process is slightly more complicated in this case. A more in-dept
The `$SLURM_ARGS` must be provided in the same manner as when submitting an individual **managed** `Task` by hand to be run as batch job with the script above. **Note** that these parameters will be used as the starting point for the SLURM arguments of **every managed** `Task` in the DAG; however, individual steps in the DAG may have overrides built-in where appropriate to make sure that step is not submitted with potentially incompatible arguments. For example, a single threaded analysis `Task` may be capped to running on one core, even if in general everything should be running on 100 cores, per the SLURM argument provided. These caps are added during development and cannot be disabled through configuration changes in the YAML.
**DAG List** (an example submission command follows this list)

- `find_peaks_index`
- `psocake_sfx_phasing`
- `pyalgos_sfx`
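
The sketch below uses the `pyalgos_sfx` DAG from the list above; installation paths, account, and core count are placeholders:

```bash
/path/to/lute/launch_scripts/submit_launch_airflow.sh \
    /path/to/lute/launch_scripts/launch_airflow.py \
    -c /path/to/config.yaml -w pyalgos_sfx \
    --partition=milano --account=lcls:<experiment> --ntasks=100
```
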
#### DAG Submission from the `eLog`

You can use the script in the previous section to submit jobs through the eLog. To do so, navigate to the `Workflow > Definitions` tab using the blue navigation bar at the top of the eLog. On this tab, in the top-right corner (underneath the help and zoom icons), you can click the `+` sign to add a new workflow. This will bring up a "Workflow definition" UI window. When filling out the eLog workflow definition, the following fields are needed (all of them):

- `Name`: You can name the workflow anything you like. It should probably be something descriptive, e.g. if you are using LUTE to run smalldata_tools, you may call the workflow `lute_smd`.
- `Executable`: In this field you will put the **full path** to the `submit_launch_airflow.sh` script: `/path/to/lute/launch_scripts/submit_launch_airflow.sh`.
- `Parameters`: You will use the parameters as described above. Remember the first argument will be the **full path** to the `launch_airflow.py` script (this is NOT the same as the bash script used in the executable!): `/full/path/to/lute/launch_scripts/launch_airflow.py -c <path/to/yaml> -w <dag_name> [--debug] [--test] $SLURM_ARGS`. An example of a filled-out definition follows this list.
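
The values below are placeholders (workflow name, installation paths, DAG name, and SLURM arguments):

```
Name:       lute_smd
Executable: /path/to/lute/launch_scripts/submit_launch_airflow.sh
Parameters: /path/to/lute/launch_scripts/launch_airflow.py -c /path/to/config.yaml -w pyalgos_sfx --partition=milano --account=lcls:<experiment> --ntasks=100
```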
@@ -258,10 +267,12 @@ Task:
In order to use parameter `c`, you would use `{{ Task.param_set.c }}` as the substitution.
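
A minimal sketch of the kind of nested parameter block this refers to, together with a second `Task` using the substitution (all names and values here are illustrative):

```yaml
Task:
  param_set:
    a: 1
    b: "two"
    c: "/some/path"

OtherTask:
  # Picks up the value of c from the block above, i.e. "/some/path".
  input_path: "{{ Task.param_set.c }}"
```
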
Take care when using substitutions! This process will not try to guess for you. When a substitution is not available, e.g. due to misspelling, one of two things will happen:
- If it was an environment variable that does not exist, no substitution will be performed, although a message will be printed. I.e. you will be left with `param: /my/failed/{{ $SUBSTITUTION }}` as your parameter. This may or may not fail the model validation step, but is likely not what you intended.
- If it was an attempt at substituting another YAML parameter which does not exist, an exception will be thrown and the program will exit.
**Defining your own parameters**

The configuration file is **not** validated in its totality, only on a `Task`-by-`Task` basis, but it **is read** in its totality. E.g. when running `MyTask` only that portion of the configuration is validated, even though the entire file has been read and is available for substitutions. As a result, it is safe to introduce extra entries into the YAML file, as long as they are not entered under a specific `Task`'s configuration. This may be useful to create your own global substitutions, for example if there is a key variable that may be used across different `Task`s.

E.g. consider a case where you want to create a more generic configuration file where a single variable is used by multiple `Task`s. This single variable may be changed between experiments, for instance, but is likely static for the duration of a single set of analyses. In order to avoid a mistake when changing the configuration between experiments, you can define this special variable (or variables) as a separate entry in the YAML and make use of substitutions in each `Task`'s configuration. This way the variable only needs to be changed in one place.
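
A minimal sketch of this pattern; the top-level key and `Task` names are illustrative, and the `{{ ... }}` form for a custom top-level entry is assumed to follow the same substitution syntax described above:

```yaml
shared_dir: "/path/shared/between/tasks"   # custom, non-Task entry

RunTask1:
  outfile: "{{ shared_dir }}/output.h5"

RunTask2:
  infile: "{{ shared_dir }}/output.h5"
```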
@@ -290,6 +301,7 @@ RunTask3:
### Gotchas!

**Order matters**

While in general you can use parameters that appear later in a YAML document to substitute for values of parameters that appear earlier, the substitutions themselves are performed in order of appearance. It is therefore **NOT possible** to correctly use a later parameter as a substitution for an earlier one if the later one itself depends on a substitution. The YAML document, however, can be rearranged without error. The order in the YAML document has no effect on execution order, which is determined purely by the workflow definition. As mentioned above, the document is not validated in its entirety, so rearrangements are allowed. For example, consider the following situation, which produces an incorrect substitution:
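
A minimal sketch of the failure mode, with illustrative `Task` names, parameters, and environment variable:

```yaml
RunTaskOne:
  # Resolved FIRST (order of appearance). At this point RunTaskTwo.outfile
  # still literally contains "{{ $WORK_DIR }}/out.h5", so that unresolved
  # string is what gets copied in here.
  infile: "{{ RunTaskTwo.outfile }}"

RunTaskTwo:
  # Only resolved after RunTaskOne.infile has already been processed.
  outfile: "{{ $WORK_DIR }}/out.h5"
```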
@@ -343,6 +355,7 @@ RunTaskOne:
On the other hand, relationships such as these may point to inconsistencies in the dependencies between `Task`s, which may warrant a refactor.
**Found unhashable key**

To avoid YAML parsing issues when using the substitution syntax, be sure to quote your substitutions. Before substitution is performed, a dictionary is first constructed by the `pyyaml` package, which parses the document - it may fail to parse the document and raise an exception if the substitutions are not quoted. E.g. write `param: "{{ ... }}"` (quoted) rather than `param: {{ ... }}` (unquoted).
@@ -361,6 +374,7 @@ During validation, Pydantic will by default cast variables if possible, because
Special markers have been inserted at certain points in the execution flow for LUTE. These can be enabled by setting the environment variables detailed below. These are intended to allow developers to exit the program at certain points to investigate behaviour or a bug. For instance, when working on configuration parsing, an environment variable can be set which exits the program after passing this step. This allows you to run LUTE otherwise as normal (described above), without having to modify any additional code or insert your own early exits.
Types of debug markers:
- `LUTE_DEBUG_EXIT`: Will exit the program at this point if the corresponding environment variable has been set.
Developers can insert these markers as needed into their code to add new exit points, although as a rule of thumb they should be used sparingly, and generally only after major steps in the execution flow (e.g. after parsing, after beginning a task, after returning a result, etc.).
@@ -386,12 +400,14 @@ MYENVVAR=1 python -B run_task.py -t Tester -c config/test.yaml
## Airflow Launch and DAG Execution Steps
The Airflow launch process actually involves a number of steps and is rather complicated. There are two wrapper steps prior to getting to the actual Airflow API communication.

1. `launch_scripts/submit_launch_airflow.sh` is run.
2. This script calls `/sdf/group/lcls/ds/tools/lute_launcher` with all the same parameters that it was called with.
3. `lute_launcher` runs the `launch_scripts/launch_airflow.py` script which was provided as the first argument. This is the **true** launch script.
4. `launch_airflow.py` communicates with the Airflow API, requesting that a specific DAG be launched. It then continues to run, gathering the individual logs and the exit status of each step of the DAG.
5. Airflow will then enter a loop of communication where it asks the JID to submit each step of the requested DAG as a batch job using `launch_scripts/submit_slurm.sh`.

There are some specific reasons for this complexity:

- The use of `submit_launch_airflow.sh` as a thin wrapper around `lute_launcher` is to allow the true Airflow launch script to be a long-lived job. This is for compatibility with the eLog and the ARP. When run from the eLog as a workflow, the job submission process must occur within 30 seconds due to a timeout built into the system. This is fine when submitting jobs to run on the batch nodes, as the submission to the queue takes very little time. So here, `submit_launch_airflow.sh` serves as a thin script to have `lute_launcher` run as a batch job. It can then run as a long-lived job (for the duration of the entire DAG) collecting log files all in one place. This allows the log for each stage of the Airflow DAG to be inspected in a single file, and through the eLog browser interface.
- The use of `lute_launcher` as a wrapper around `launch_airflow.py` is to manage authentication and credentials. The `launch_airflow.py` script requires loading credentials in order to authenticate against the Airflow API. For the average user this is not possible, unless the script is run from within the `lute_launcher` process.
launch_scripts/launch_airflow.py (89 changes: 88 additions & 1 deletion)
@@ -1,4 +1,4 @@
#!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.60-py3/bin/python
#!/sdf/group/lcls/ds/ana/sw/conda1/inst/envs/ana-4.0.62-py3/bin/python

"""Script submitted by Automated Run Processor (ARP) to trigger an Airflow DAG.
@@ -46,6 +46,44 @@ def _retrieve_pw(instance: str = "prod") -> str:
    return pw


def _request_arp_token(exp: str, lifetime: int = 300) -> str:
    """Request an ARP token via Kerberos endpoint.

    A token is required for job submission.

    Args:
        exp (str): The experiment to request the token for. All tokens are
            scoped to a single experiment.

        lifetime (int): The lifetime, in minutes, of the token. After the token
            expires, it can no longer be used for job submission. The maximum
            time you can request is 480 minutes (i.e. 8 hours). NOTE: since this
            token is used for the entirety of a workflow, it must have a lifetime
            equal to or longer than the duration of the workflow's execution time.

    Returns:
        formatted_token (str): The token formatted for use as the value of an
            Authorization ("Bearer") header.
    """
    from kerberos import GSSError
    from krtc import KerberosTicket

    try:
        krbheaders: Dict[str, str] = KerberosTicket(
            "HTTP@pswww.slac.stanford.edu"
        ).getAuthHeaders()
    except GSSError:
        logger.info(
            "Cannot proceed without credentials. Try running `kinit` from the command-line."
        )
        raise
    base_url: str = "https://pswww.slac.stanford.edu/ws-kerb/lgbk/lgbk"
    token_endpoint: str = (
        f"{base_url}/{exp}/ws/generate_arp_token?token_lifetime={lifetime}"
    )
    resp: requests.models.Response = requests.get(token_endpoint, headers=krbheaders)
    resp.raise_for_status()
    token: str = resp.json()["value"]
    formatted_token: str = f"Bearer {token}"
    return formatted_token


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="trigger_airflow_lute_dag",
@@ -60,10 +98,50 @@ def _retrieve_pw(instance: str = "prod") -> str:
    parser.add_argument(
        "-w", "--workflow", type=str, help="Workflow to run.", default="test"
    )
    # Optional arguments for when running from command-line
    parser.add_argument(
        "-e",
        "--experiment",
        type=str,
        help="Provide an experiment if not running with ARP.",
        required=False,
    )
    parser.add_argument(
        "-r",
        "--run",
        type=str,
        help="Provide a run number if not running with ARP.",
        required=False,
    )

    args: argparse.Namespace
    extra_args: List[str]  # Should contain all SLURM arguments!
    args, extra_args = parser.parse_known_args()
    # Check if was submitted from ARP - look for token
    use_kerberos: bool = False
    if os.getenv("Authorization") is None:
        use_kerberos = True
        cache_file: Optional[str] = os.getenv("KRB5CCNAME")
        if cache_file is None:
            logger.info("No Kerberos cache. Try running `kinit` and resubmitting.")
            sys.exit(-1)

        if args.experiment is None or args.run is None:
            logger.info(
                (
                    "You must provide a `-e ${EXPERIMENT}` and `-r ${RUN_NUM}` "
                    "if not running with the ARP!\n"
                    "If you submitted this from the eLog and are seeing this error "
                    "please contact the maintainers."
                )
            )
            sys.exit(-1)
        os.environ["EXPERIMENT"] = args.experiment
        os.environ["RUN_NUM"] = args.run

        os.environ["Authorization"] = _request_arp_token(args.experiment)
        os.environ["ARP_JOB_ID"] = str(uuid.uuid4())

    airflow_instance: str
    instance_str: str
    if args.test:
Expand Down Expand Up @@ -194,6 +272,15 @@ def _retrieve_pw(instance: str = "prod") -> str:
            logger.info(f"DAG exited: {dag_state}")
            break

    if use_kerberos:
        # We had to do some funny business to get Kerberos credentials...
        # Cleanup now that we're done
        logger.debug("Removing duplicate Kerberos credentials.")
        # This should be defined if we get here
        # Format is FILE:/.../...
        os.remove(cache_file[5:])
        os.rmdir(f"{os.path.expanduser('~')}/.tmp_cache")

    if dag_state == "failed":
        sys.exit(1)
    else: