Merge pull request #98 from PennLINC/enh/slurm_directives

[FIX/ENH] Fix directives and job accounting; enhance example container config YAML filenames

Chenying Zhao authored May 25, 2023
2 parents c8f86c3 + e3057fa commit 6e70a6e

Showing 26 changed files with 620 additions and 165 deletions.
10 changes: 8 additions & 2 deletions .vscode/launch.json
@@ -56,10 +56,16 @@
// "--type_session", "multi-ses",
// "--type_system", "sge"
// ]
// "args": [
// "--project-root",
// "/home/faird/zhaoc/data/test_babs_multi-ses_toybidsapp",
// "--container-config-yaml-file",
// "/home/faird/zhaoc/babs_tests/notebooks/bidsapp-toybidsapp-0-0-7_task-rawBIDS_system-slurm_cluster-MSI_egConfig.yaml",
// "--job-account"
// ]
"args": [
"--project-root",
"/home/faird/zhaoc/data/test_babs_multi-ses_toybidsapp""--container-config-yaml-file",
"/home/faird/zhaoc/babs_tests/notebooks/bidsapp-toybidsapp-0-0-7_task-rawBIDS_system-slurm_cluster-MSI_egConfig.yaml",
"/home/faird/zhaoc/data/test_babs_single-ses_PNC_fmriprep_sloppy",
"--job-account"
]
// "args": [
23 changes: 19 additions & 4 deletions babs/babs.py
@@ -1179,6 +1179,8 @@ def babs_status(self, flags_resubmit,
# `create_job_status_csv(self)` has been called in `babs_status()`
# in `cli.py`

from .constants import MSG_NO_ALERT_IN_LOGS

# Load the csv file
lock_path = self.job_status_path_abs + ".lock"
lock = FileLock(lock_path)
@@ -1256,7 +1258,7 @@ def babs_status(self, flags_resubmit,
# NOTE: in theory we could skip jobs that failed in the previous round,
# but that would make assigning variables hard; so we do not skip them
# if df_job.at[i_job, "is_failed"] is not True: # np.nan or False
alert_message_in_log_files, if_no_alert_in_log = \
alert_message_in_log_files, if_no_alert_in_log, if_found_log_files = \
get_alert_message_in_log_files(config_msg_alert, log_fn)
# ^^ the function will handle even if `config_msg_alert=None`
df_job_updated.at[i_job, "alert_message"] = \
@@ -1436,6 +1438,15 @@ def babs_status(self, flags_resubmit,
df_job_updated.at[i_job, "job_state_code"] = np.nan
df_job_updated.at[i_job, "duration"] = np.nan
# ROADMAP: ^^ get duration via `qacct`
if if_found_log_files == False: # bool or np.nan
# If there are no log files, the alert message would be `np.nan`;
# however, this is a failed job, so it should have log files,
# unless it was killed by the user while pending.
# Change the 'alert_message' to "no alert in logs",
# so that when reporting job status,
# info from job accounting will be reported.
df_job_updated.at[i_job, "alert_message"] = \
MSG_NO_ALERT_IN_LOGS

# check the log file:
# TODO ^^
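
A minimal runnable sketch of this fallback, assuming the constant's value quoted in the developer's note at the end of this diff (illustrative only, not BABS source):

    import numpy as np

    # Assumed value, quoted from the developer's note in docs/source/babs-status.rst:
    MSG_NO_ALERT_IN_LOGS = "BABS: No alert message found in log files."

    # What get_alert_message_in_log_files() yields for a failed job whose
    # log files were never created (e.g., killed by the user while pending):
    alert_message = np.nan
    if_found_log_files = False  # may be True, False, or np.nan

    if if_found_log_files == False:  # explicit comparison, as the value may be np.nan
        # report "no alert" instead of np.nan, so that job-accounting info
        # is still reported for this failed job
        alert_message = MSG_NO_ALERT_IN_LOGS

    print(alert_message)  # BABS: No alert message found in log files.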
@@ -1575,7 +1586,7 @@ def babs_status(self, flags_resubmit,
get_last_line(o_fn)
# Check if any alert message in log files for this job:
# this is to update `alert_message` in case user changes configs in yaml
alert_message_in_log_files, if_no_alert_in_log = \
alert_message_in_log_files, if_no_alert_in_log, _ = \
get_alert_message_in_log_files(config_msg_alert, log_fn)
# ^^ the function will handle even if `config_msg_alert=None`
df_job_updated.at[i_job, "alert_message"] = \
@@ -2542,7 +2553,9 @@ def generate_bash_participant_job(self, bash_path, input_ds, type_session,
# Write into the bash file:
bash_file = open(bash_path, "a") # open in append mode

bash_file.write("#!/bin/bash\n")
# NOTE: do not automatically generate the interpreting shell line;
# instead, let users specify it in the container config YAML file
# via `interpreting_shell`

# Cluster resources requesting:
cmd_bashhead_resources = generate_bashhead_resources(system, self.config)
@@ -2745,7 +2758,9 @@ def generate_bash_test_job(self, folder_check_setup,
# Write into the bash file:
bash_file = open(fn_call_test_job, "a") # open in append mode

bash_file.write("#!/bin/bash\n")
# NOTE: do not automatically generate the interpreting shell line;
# instead, let users specify it in the container config YAML file
# via `interpreting_shell`

# Cluster resources requesting:
cmd_bashhead_resources = generate_bashhead_resources(system, self.config)
6 changes: 4 additions & 2 deletions babs/cli.py
@@ -493,8 +493,10 @@ def babs_status_cli():
action='store_true',
# ^^ if `--job-account` is specified, args.job_account = True; otherwise, False
help="Whether to account failed jobs, which may take some time."
" If `--resubmit failed` or `--resubmit-job` for this failed job is also requested,"
" this `--job-account` will be skipped.")
" When using ``--job-account``, please also add ``--container_config_yaml_file``."
" If ``--resubmit failed`` or ``--resubmit-job`` (for some failed jobs)"
" is also requested,"
" this ``--job-account`` will be skipped.")

return parser

10 changes: 7 additions & 3 deletions babs/dict_cluster_systems.yaml
@@ -3,18 +3,22 @@

# format: <name in UI>: "<cluster-understandable format>"
# placeholder "$VALUE" will be replaced by the real value provided by the user.
# For 'interpreting_shell': nothing else will be added by BABS
# For other keys: a cluster-type-specific prefix will be added
# e.g., '#$ ' for SGE clusters
# e.g., '#SBATCH ' for Slurm clusters

sge:
interpreting_shell: "-S $VALUE" # "-S /bin/bash" on cubic
interpreting_shell: "#!$VALUE" # "#!/bin/bash" on cubic
hard_memory_limit: "-l h_vmem=$VALUE" # "-l h_vmem=25G" on cubic
soft_memory_limit: "-l s_vmem=$VALUE" # "-l s_vmem=23.5G" on cubic
temporary_disk_space: "-l tmpfree=$VALUE" # "-l tmpfree=200G" on cubic
number_of_cpus: "-pe threaded $VALUE" # "-pe threaded N" or a range: "-pe threaded N-M", N<M on cubic
hard_runtime_limit: "-l h_rt=$VALUE" # "-l h_rt=24:00:00" on cubic
slurm:
interpreting_shell: ""
interpreting_shell: "#!$VALUE" # e.g., "#!/bin/bash -l" on MSI
hard_memory_limit: "--mem=$VALUE"
soft_memory_limit: ""
temporary_disk_space: "--tmp=$VALUE" # "--tmp=20g" on MSI
temporary_disk_space: "--tmp=$VALUE" # "#SBATCH --tmp=20g" on MSI
number_of_cpus: "--cpus-per-task=$VALUE"
hard_runtime_limit: "--time=$VALUE"
62 changes: 50 additions & 12 deletions babs/utils.py
@@ -653,12 +653,19 @@ def generate_one_bashhead_resources(system, key, value):
This does not include "\n" at the end.
e.g., "#$ -S /bin/bash".
Notes:
---------
For the interpreting shell, regardless of system type,
the generated line will be '#!' + the value the user provided.
"""
cmd = "#"
if system.type == "sge":
cmd += "$ " # e.g., `#$ -l h_vmem=xxx`
elif system.type == "slurm":
cmd += "SBATCH " # e.g., `#SBATCH --xxx=yyy`
if key == "interpreting_shell":
cmd = "" # directly use the format provided in the dict
else:
cmd = "#"
if system.type == "sge":
cmd += "$ " # e.g., `#$ -l h_vmem=xxx`
elif system.type == "slurm":
cmd += "SBATCH " # e.g., `#SBATCH --xxx=yyy`

# find the key in the `system.dict`:
if key not in system.dict:
@@ -675,8 +682,8 @@

def generate_bashhead_resources(system, config):
"""
This is to generate the head of the bash file
for requesting cluster resources.
This is to generate the directives ("head of the bash file")
for requesting cluster resources, specifying the interpreting shell, etc.
Parameters:
------------
@@ -700,10 +707,25 @@
raise Exception("There is no section `cluster_resources`"
+ " in `container_config_yaml_file`!")

# loop: for each key, call `generate_one_bashhead_resources()`:
# generate the command for interpreting shell first:
if "interpreting_shell" not in config["cluster_resources"]:
warnings.warn("The interpreting shell was not specified for 'participant_job.sh'."
+ " This should be specified using 'interpreting_shell'"
+ " under section 'cluster_resources' in container's"
+ " configuration YAML file.")
else:
key = "interpreting_shell"
value = config["cluster_resources"][key]
one_cmd = generate_one_bashhead_resources(system, key, value)
cmd += one_cmd + "\n"

# loop for other keys:
# for each key, call `generate_one_bashhead_resources()`:
for key, value in config["cluster_resources"].items():
if key == "customized_text":
pass # handle this below
elif key == "interpreting_shell":
pass # has been handled - see above
else:
one_cmd = generate_one_bashhead_resources(system, key, value)
cmd += one_cmd + "\n"
@@ -1926,6 +1948,9 @@ def get_alert_message_in_log_files(config_msg_alert, log_fn):
When `alert_message` is `msg_no_alert`,
or is `np.nan` (`if_valid_alert_msg=False`), this is True;
Otherwise (any other message), this is False
if_found_log_files: bool or np.nan
np.nan if `config_msg_alert` is None, as it's unknown whether log files exist
Otherwise, True or False depending on whether any log files were found
Notes:
-----------------
@@ -1938,15 +1963,18 @@
msg_no_alert = MSG_NO_ALERT_IN_LOGS
if_valid_alert_msg = True # by default, `alert_message` is valid (i.e., not np.nan)
# this is to avoid check `np.isnan(alert_message)`, as `np.isnan(str)` causes error.
if_found_log_files = np.nan

if config_msg_alert is None:
alert_message = np.nan
if_valid_alert_msg = False
if_found_log_files = np.nan # unknown if log files exist or not
else:
o_fn = log_fn.replace("*", 'o')
e_fn = log_fn.replace("*", 'e')

if op.exists(o_fn) or op.exists(e_fn): # either exists:
if_found_log_files = True
found_message = False
alert_message = msg_no_alert

@@ -1977,6 +2005,7 @@ def get_alert_message_in_log_files(config_msg_alert, log_fn):
break # no need to go to next log file

else: # neither o_fn nor e_fn exists yet:
if_found_log_files = False
alert_message = np.nan
if_valid_alert_msg = False

@@ -1986,7 +2015,7 @@ def get_alert_message_in_log_files(config_msg_alert, log_fn):
else: # `alert_message`: np.nan or any other message:
if_no_alert_in_log = False

return alert_message, if_no_alert_in_log
return alert_message, if_no_alert_in_log, if_found_log_files

def get_username():
"""
@@ -2073,18 +2102,24 @@ def _check_job_account_slurm(job_id_str, job_name, username_lowercase):
stdout=subprocess.PIPE
)
# ref: https://slurm.schedmd.com/sacct.html
# also based on ref: https://github.com/ComputeCanada/slurm_utils/blob/master/sacct-all.py

proc_sacct.check_returncode()
# Even if the job does not exist, sacct still prints something,
# at least a header line, so `check_returncode()` should always succeed.
msg_l = proc_sacct.stdout.decode('utf-8').split("\n")
msg_l = proc_sacct.stdout.decode('utf-8').split("\n") # all lines from `sacct`
# 1st line: column names
# 2nd line onward: job information
# ^^ when using `--parsable2` and `--delimiter`, there is no line of "----" dashes
# Usually there is more than one job line;
# however, if the job was manually killed while pending, there will be only one job line.
msg_head = msg_l[0].split(the_delimiter) # list of column names

# Check if there is any problem when calling `sacct` for this job:
if "State" not in msg_head or "JobID" not in msg_head or "JobName" not in msg_head:
if_no_sacct = True
if len(msg_l) <= 2 or msg_l[2] == '':
# if there is only header (len <= 2 or the 3rd element is empty):
if len(msg_l) <= 1 or msg_l[1] == '':
# if there is only header (len <= 1 or the 2nd element is empty):
if_no_sacct = True

if if_no_sacct: # there is no information about this job in sacct:
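
A minimal sketch of the header-only check above (illustrative, not BABS source); the sacct output string is fabricated to mimic `--parsable2 --delimiter="|"` output for a job killed while pending:

    the_delimiter = "|"
    sacct_stdout = "JobID|JobName|State\n"  # fabricated: only the header line, no job rows
    msg_l = sacct_stdout.split("\n")          # all lines from `sacct`
    msg_head = msg_l[0].split(the_delimiter)  # list of column names

    if_no_sacct = False
    if "State" not in msg_head or "JobID" not in msg_head or "JobName" not in msg_head:
        if_no_sacct = True
    if len(msg_l) <= 1 or msg_l[1] == '':
        # with `--parsable2` there is no "----" line, so job rows start at index 1;
        # an empty index 1 means sacct returned no accounting rows for this job
        if_no_sacct = True

    print(if_no_sacct)  # True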
@@ -2189,6 +2224,9 @@ def _check_job_account_sge(job_id_str, job_name, username_lowercase):
print("Hint: check if the job is still in the queue, e.g., in state of qw, r, etc")
print("Hint: check if the username used for submitting this job"
+ " was not current username '" + username_lowercase + "'")
print("Hint: check if the job was killed during pending state")
# ^^ for SGE cluster: job manually killed during pending: `qacct` will fail:
# "error: job id xxx not found"
msg_toreturn = msg_failed_to_call_qacct

return msg_toreturn
49 changes: 46 additions & 3 deletions docs/source/babs-status.rst
@@ -1,9 +1,52 @@
*************************************
##################################################
``babs-status``: Check job status
*************************************
##################################################

.. contents:: Table of Contents

**********************
Command-Line Arguments
**********************

.. argparse::
    :ref: babs.cli.babs_status_cli
    :prog: babs-status
    :nodefault:
    :nodefaultconst:


**********************
Example commands
**********************

Basic use: you'll only get a job summary (number of jobs finished/pending/running/failed):

.. code-block:: bash

    babs-status \
        --project-root /path/to/my_BABS_project

Failed job auditing: only using alert messages in log files:

.. code-block:: bash

    babs-status \
        --project-root /path/to/my_BABS_project \
        --container-config-yaml-file /path/to/container_config.yaml

Failed job auditing: using alert messages in log files, plus performing job accounting for jobs
without alert messages in log files:

.. code-block:: bash

    babs-status \
        --project-root /path/to/my_BABS_project \
        --container-config-yaml-file /path/to/container_config.yaml \
        --job-account

When using ``--job-account``, you should also use ``--container-config-yaml-file``.

.. developer's note: it seems that if only using `--job-account` without `--container-config-yaml-file`,
.. although job accounting commands will be called (taking more time),
.. it won't report messages such as "Among job(s) that are failed and don't have alert message in log files:"
.. This is probably because "alert_message" was cleared, so no job has "BABS: No alert message found in log files."