Skip to content

Commit

Permalink
Merge pull request #96 from PennLINC/enh/slurm_job
Browse files Browse the repository at this point in the history
[ENH] test out BABS's Slurm version using toy BIDS App and toy dataset: enhanced code and fixed bugs
  • Loading branch information
Chenying Zhao authored May 22, 2023
2 parents 2e11bc5 + 0aae44d commit c8f86c3
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 75 deletions.
50 changes: 28 additions & 22 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,28 @@
// "--project-root",
// "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data/test_babs_multi-ses_toybidsapp",
// ],
"args": [
"--where_project",
"/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data",
"--project_name",
"test_babs_multi-ses_toybidsapp", // "test_babs_multi-ses_fmriprep",
"--input",
"BIDS",
"/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data/w2nu3",
// "https://osf.io/w2nu3/",
"--container_ds",
"/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data/toybidsapp-container-docker",
"--container_name",
"toybidsapp-0-0-7", // "fmriprep-20-2-3",
"--container_config_yaml_file",
"/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/babs/notebooks/example_container_toybidsapp.yaml",
// "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/babs/notebooks/example_container_fmriprep.yaml",
"--type_session",
"multi-ses",
"--type_system",
"sge",
// "--keep-if-failed"
]
// "args": [
// "--where_project",
// "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data",
// "--project_name",
// "test_babs_multi-ses_toybidsapp", // "test_babs_multi-ses_fmriprep",
// "--input",
// "BIDS",
// "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data/w2nu3",
// // "https://osf.io/w2nu3/",
// "--container_ds",
// "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data/toybidsapp-container-docker",
// "--container_name",
// "toybidsapp-0-0-7", // "fmriprep-20-2-3",
// "--container_config_yaml_file",
// "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/babs/notebooks/example_container_toybidsapp.yaml",
// // "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/babs/notebooks/example_container_fmriprep.yaml",
// "--type_session",
// "multi-ses",
// "--type_system",
// "sge",
// // "--keep-if-failed"
// ]
// "args": [
// "--where_project", "/cbica/projects/BABS/data",
// // "--project_name", "test_babs_multi-ses_toybidsapp",
Expand All @@ -56,6 +56,12 @@
// "--type_session", "multi-ses",
// "--type_system", "sge"
// ]
"args": [
"--project-root",
"/home/faird/zhaoc/data/test_babs_multi-ses_toybidsapp",
"--container-config-yaml-file",
"/home/faird/zhaoc/babs_tests/notebooks/bidsapp-toybidsapp-0-0-7_task-rawBIDS_system-slurm_cluster-MSI_egConfig.yaml",
"--job-account"
]
// "args": [
// "--project-root",
// "/cbica/projects/BABS/data/test_babs_multi-ses_toybidsapp",
Expand Down
14 changes: 11 additions & 3 deletions babs/babs.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,10 @@ def babs_bootstrap(self, input_ds,
if system.type == "sge":
gitignore_file.write("\n.SGE_datalad_lock")
elif system.type == "slurm":
# TODO: add command for `slurm`!!!
print("Not supported yet... To work on...")
gitignore_file.write("\n.SLURM_datalad_lock")
else:
warnings.warn("Not supporting systems other than SGE or Slurm"
+ " for '.gitignore'.")
# not to track lock file:
gitignore_file.write("\n" + "code/babs_proj_config.yaml.lock")
# not to track `job_status.csv`:
Expand Down Expand Up @@ -1336,8 +1338,13 @@ def babs_status(self, flags_resubmit,
df_job_updated.at[i_job, "job_state_code"] = state_code
# get the duration:
if "duration" in df_all_job_status:
# e.g., Slurm's `squeue` automatically returns the duration,
# so there is no need to calculate it again.
duration = df_all_job_status.at[job_id_str, "duration"]
else:
# This duration time may be slightly longer than actual
# time, as this is using current time, instead of
# the time when `qstat`/requesting job queue.
duration = calcu_runtime(
df_all_job_status.at[job_id_str, "JAT_start_time"])
df_job_updated.at[i_job, "duration"] = duration
Expand Down Expand Up @@ -1471,7 +1478,8 @@ def babs_status(self, flags_resubmit,
# message found in log files:
job_name = log_filename.split(".*")[0]
msg_job_account = \
check_job_account(job_id_str, job_name, username_lowercase, self.type_system)
check_job_account(job_id_str, job_name,
username_lowercase, self.type_system)
df_job_updated.at[i_job, "job_account"] = msg_job_account
# Done: submitted jobs that not 'is_done'

Expand Down
202 changes: 154 additions & 48 deletions babs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1644,7 +1644,7 @@ def report_job_status(df, analysis_path, config_msg_alert):
def request_all_job_status(type_system):
"""
This is to get all jobs' status
using `qstat` for SGE clusters and squeue for Slurm
using `qstat` for SGE clusters and `squeue` for Slurm
Parameters:
--------------
Expand Down Expand Up @@ -1693,46 +1693,82 @@ def _request_all_job_status_sge():
def _request_all_job_status_slurm():
    """
    This is to get all jobs' status for the current user on a Slurm cluster
    by calling `squeue`.

    Returns
    -----------
    df: pd.DataFrame
        Job status parsed from `squeue` printed messages.
        See `_parsing_squeue_out()` for the exact format; if there is no job
        in the queue, this is an empty DataFrame.
    """
    username = get_username()
    # `-o` format requests: JOBID, PARTITION, NAME, USER, ST, STATE, TIME
    # NOTE: `_parsing_squeue_out()` hard-codes the column order of this format,
    # so keep the two in sync.
    squeue_proc = subprocess.run(
        ["squeue", "-u", username, "-o", "%.18i %.9P %.8j %.8u %.2t %T %.10M"],
        stdout=subprocess.PIPE
    )
    std = squeue_proc.stdout.decode('utf-8')

    squeue_out_df = _parsing_squeue_out(std)
    return squeue_out_df


def _parsing_sacct_out(sacct_std):
header_l = sacct_std.splitlines()[0].split()
datarows = sacct_std.splitlines()[1:]

dict_ind = {"jobid": 0, "st": 4, "state": 5, "time": 6}
dict_val = dict((key, []) for key in dict_ind)


for fld in ["jobid", "st", "state", "time"]:
if header_l[dict_ind[fld]].lower() != fld:
raise Exception(f"error in the squeue output, expected {fld} and got {header_l[dict_ind[fld]].lower()}")

for row in datarows:
if "." not in row.split()[0]:
for key, ind in dict_ind.items():
dict_val[key].append(row.split()[ind])

# renaming the keys
dict_val["JB_job_number"] = dict_val.pop("jobid")
dict_val["@state"] =dict_val.pop("state")
dict_val["duration"] =dict_val.pop("time")

def _parsing_squeue_out(squeue_std):
"""
This is to parse printed messages from `squeue` on Slurm clusters
and to convert Slurm codes to SGE codes
state_slurm2sge = {"R": "r", "PD": "qw"}
dict_val["state"] = [state_slurm2sge.get(sl_st, "NA") for sl_st in dict_val.pop("st")]
Parameters
-------------
squeue_std: str
Standard output from running command `squeue` in terminal
df = pd.DataFrame(data=dict_val)
if dict_val["JB_job_number"]:
Returns
-----------
df: pd.DataFrame
Job status based on `squeue` printed messages.
If there is no job in the queue, df will be an empty DataFrame
(i.e., Columns: [], Index: [])
"""
# Sanity check: if there is no job in queue:
if len(squeue_std.splitlines()) <= 1:
# there is only a header, no job is in queue:
df = pd.DataFrame(data=[]) # empty dataframe
else: # there are job(s) in queue (e.g., pending or running)
header_l = squeue_std.splitlines()[0].split()
datarows = squeue_std.splitlines()[1:]

# column index of these column names:
# NOTE: this is hard coded! Please check out `_request_all_job_status_slurm()`
# for the format of printed messages from `squeue`
dict_ind = {"jobid": 0, "st": 4, "state": 5, "time": 6}
# initialize a dict for holding the values from all jobs:
# ROADMAP: pd.DataFrame is probably more memory efficient than dicts
dict_val = dict((key, []) for key in dict_ind)

# sanity check: these fields show up in the header we got:
for fld in ["jobid", "st", "state", "time"]:
if header_l[dict_ind[fld]].lower() != fld:
raise Exception("error in the `squeue` output,"
+ f" expected {fld} and got {header_l[dict_ind[fld]].lower()}")

for row in datarows:
if "." not in row.split()[0]:
for key, ind in dict_ind.items():
dict_val[key].append(row.split()[ind])
# e.g.: dict_val: {'jobid': ['157414586', '157414584'],
# 'st': ['PD', 'R'], 'state': ['PENDING', 'RUNNING'], 'time': ['0:00', '0:52']}

# Renaming the keys, to be consistent with results got from SGE clusters:
dict_val["JB_job_number"] = dict_val.pop("jobid")
# change to lowercase, and rename the key:
dict_val["@state"] = [x.lower() for x in dict_val.pop("state")]
dict_val["duration"] = dict_val.pop("time")
# e.g.,: dict_val: {'st': ['PD', 'R'], 'JB_job_number': ['157414586', '157414584'],
# '@state': ['pending', 'running'], 'duration': ['0:00', '0:52']}
# NOTE: the 'duration' format might be slightly different from results from
# function `calcu_runtime()` used by SGE clusters.

# job state mapping from slurm to sge:
state_slurm2sge = {"R": "r", "PD": "qw"}
dict_val["state"] = [state_slurm2sge.get(sl_st, "NA") for sl_st in dict_val.pop("st")]
# e.g.,: dict_val: {'JB_job_number': ['157414586', '157414584'],
# '@state': ['pending', 'running'], 'duration': ['0:00', '0:52'], 'state': ['qw', 'r']}

df = pd.DataFrame(data=dict_val)
df = df.set_index('JB_job_number')

return df
Expand All @@ -1749,13 +1785,23 @@ def calcu_runtime(start_time_str):
Can be got via `df.at['2820901', 'JAT_start_time']`
Example on CUBIC: ''
TODO: add type_system
Returns:
-----------------
duration_time_str: str
Duration time of running.
Format: '0:00:05.050744' (i.e., ~5sec), '2 days, 0:00:00'
Notes:
---------
TODO: add type_system if needed
Currently we don't need to add `type_system`. Whether 'duration' has been returned
is checked before current function is called.
However the format of the duration that got from Slurm cluster might be a bit different from
what we get here. See examples in function `_parsing_squeue_out()` for Slurm clusters.
This duration time may be slightly longer than actual
time, as this is using current time, instead of
the time when `qstat`/requesting job queue.
"""
# format of time in the job status requested:
format_job_status = '%Y-%m-%dT%H:%M:%S' # format in `qstat`
Expand Down Expand Up @@ -2005,28 +2051,88 @@ def _check_job_account_slurm(job_id_str, job_name, username_lowercase):
"""
get information for a finished job in Slurm by calling `sacct`
"""
proc_qacct = subprocess.run(
msg_no_sacct = "BABS: sacct doesn't provide information about the job."
if_no_sacct = False
msg_more_than_one = "BABS: sacct detects more than one job for this job ID."

len_char_jobid = 20
len_char_jobname = 50

the_delimiter = "!" # use a special delimiter for easy parsing
# ^^ if parsing with default e.g., space:
# will have problem when State is "CANCELLED by 78382" - it will also be parsed out...
proc_sacct = subprocess.run(
["sacct", "-u", username_lowercase,
"-j", job_id_str],
"-j", job_id_str,
"--format=JobID%" + str(len_char_jobid) + ","
+ "JobName%" + str(len_char_jobname) + ",State%30,ExitCode%15",
# ^^ specific format: column names and the number of chars
# e.g., '--format=JobID%20,JobName%50,State%30,ExitCode%15'
"--parsable2", # Output will be delimited without a delimiter at the end.
"--delimiter=" + the_delimiter],
stdout=subprocess.PIPE
)
# ref: https://slurm.schedmd.com/sacct.html

proc_qacct.check_returncode()
msg_l = proc_qacct.stdout.decode('utf-8').split("\n")
msg_head = msg_l[0].split()
if "State" not in msg_head or "JobID" not in msg_head or "JobName" not in msg_head:
return "sacct doesn't provide information about the job"

st_ind = msg_head.index("State")
jobid_ind = msg_head.index("JobID")
jobnm_ind = msg_head.index("JobName")
job_saact = msg_l[2].split() # the 2nd row should have the main job
proc_sacct.check_returncode()
# even if the job does not exist, there will still be printed msg from sacct,
# at least a header. So `check_returncode()` should always succeed.
msg_l = proc_sacct.stdout.decode('utf-8').split("\n")
msg_head = msg_l[0].split(the_delimiter) # list of column names

if job_saact[jobid_ind] != job_id_str or job_saact[jobnm_ind] != job_name:
return "sacct doesn't have the info for the specific job or the format is different"
# Check if there is any problem when calling `sacct` for this job:
if "State" not in msg_head or "JobID" not in msg_head or "JobName" not in msg_head:
if_no_sacct = True
if len(msg_l) <= 2 or msg_l[2] == '':
# if there is only header (len <= 2 or the 3rd element is empty):
if_no_sacct = True

return "sacct state: " + job_saact[st_ind]
if if_no_sacct: # there is no information about this job in sacct:
warnings.warn("`sacct` did not provide information about job " + job_id_str
+ ", " + job_name)
print("Hint: check if the job is still in the queue,"
" e.g., in state of pending, running, etc")
print("Hint: check if the username used for submitting this job"
+ " was not current username '" + username_lowercase + "'")
msg_toreturn = msg_no_sacct
else:
# create a pd.DataFrame for printed messages from `sacct`:
df = pd.DataFrame(data=[], columns=msg_head)
msg_l_jobs = msg_l[1:] # only keeps rows for jobs
# ^^ NOTE: if using `--parsable2` and `--delimiter`, there is no 2nd line of "----" dashes
for i_row in range(0, len(msg_l_jobs)):
if msg_l_jobs[i_row] == '': # empty
pass
else:
# add to df:
df.loc[len(df)] = msg_l_jobs[i_row].split(the_delimiter)

# find the row that matches the job id and job name
# i.e., without '.batch' or '.extern'; usually is the first line:
temp = df.index[(df["JobID"] == job_id_str)
& (df["JobName"] == job_name)].tolist()
if len(temp) == 0: # did not find the job:
warnings.warn("`sacct` did not provide information about job " + job_id_str
+ ", " + job_name)
print("Hint: check if the job is still in the queue,"
" e.g., in state of pending, running, etc")
print("Hint: check if the username used for submitting this job"
+ " was not current username '" + username_lowercase + "'")
print("Hint: check if the job ID is more than " + str(len_char_jobid) + " chars,"
" or job name is more than " + str(len_char_jobname) + " chars.")
msg_toreturn = msg_no_sacct
elif len(temp) > 1: # more than one matched:
warnings.warn("`sacct` detects more than one job for this job "
+ job_id_str
+ ", " + job_name)
print("Hint: check if the job ID is more than " + str(len_char_jobid) + " chars,"
" or job name is more than " + str(len_char_jobname) + " chars.")
msg_toreturn = msg_more_than_one
else: # expected, only one:
msg_toreturn = "sacct: state: " \
+ df.loc[temp[0], "State"] # `temp[0]`: first and the only element from list `temp`

return msg_toreturn

def _check_job_account_sge(job_id_str, job_name, username_lowercase):
"""
Expand Down
11 changes: 9 additions & 2 deletions notebooks/testing_babs_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@

# ++++++++++++++++++++++++++++++++++++++++++++++++
flag_instance = "toybidsapp"
type_session = "single-ses"
type_session = "multi-ses"
count = 1

flag_where = "cubic" # "cubic" or "local"
flag_where = "msi" # "cubic" or "local" or "msi"
# ++++++++++++++++++++++++++++++++++++++++++++++++

# where:
if flag_where == "cubic":
where_project = "/cbica/projects/BABS/data"
elif flag_where == "local":
where_project = "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data"
elif flag_where == "msi":
where_project = "/home/faird/zhaoc/data"
else:
raise Exception("not valid `flag_where`!")

Expand All @@ -36,6 +38,11 @@
else:
raise Exception("not valid `flag_instance`!")

project_root = op.join(where_project, project_name)

print("--project-root:")
print(project_root)

babs_project = op.join(where_project, project_name)

# babs_submit_main()
Expand Down

0 comments on commit c8f86c3

Please sign in to comment.