From e4fbe739139e5fed625097fc31196e4a6b0124a2 Mon Sep 17 00:00:00 2001 From: Tien Tong Date: Thu, 5 Dec 2024 08:55:42 -0500 Subject: [PATCH] Initial changes to implement array submission Not all babs arguments are modified yet, such as babs-submit --job --- babs/babs.py | 662 +++++++++++++++++++++++++++----------------------- babs/cli.py | 6 +- babs/utils.py | 265 ++++++++++++++------ 3 files changed, 554 insertions(+), 379 deletions(-) diff --git a/babs/babs.py b/babs/babs.py index 7f99c8a9..f76aaf5e 100644 --- a/babs/babs.py +++ b/babs/babs.py @@ -38,9 +38,10 @@ generate_cmd_datalad_run, generate_cmd_determine_zipfilename, get_list_sub_ses, - submit_one_job, - df_update_one_job, - prepare_job_ind_list, + submit_array, + df_submit_update, + df_status_update, + prepare_job_array_df, submit_one_test_job, read_job_status_csv, report_job_status, @@ -116,6 +117,9 @@ def __init__(self, project_root, type_session, type_system): job_status_path_abs: str Absolute path of `job_status_path_abs`. Example: '/path/to/analysis/code/job_status.csv' + job_submit_path_abs: str + Absolute path of `job_submit_path_abs`. + Example: '/path/to/analysis/code/job_submit.csv' ''' # validation: @@ -153,6 +157,8 @@ def __init__(self, project_root, type_session, type_system): self.job_status_path_rel = 'code/job_status.csv' self.job_status_path_abs = op.join(self.analysis_path, self.job_status_path_rel) + self.job_submit_path_abs = op.join(self.analysis_path, + 'code/job_submit.csv') def datalad_save(self, path, message=None): """ @@ -325,8 +331,8 @@ def babs_bootstrap(self, input_ds, babs_proj_config_file.write("input_ds:\n") # input dataset's name(s) for i_ds in range(0, input_ds.num_ds): babs_proj_config_file.write(" $INPUT_DATASET_#" + str(i_ds+1) + ":\n") - babs_proj_config_file.write(" name: '" + input_ds.df["name"][i_ds] + "'\n") - babs_proj_config_file.write(" path_in: '" + input_ds.df["path_in"][i_ds] + "'\n") + babs_proj_config_file.write(" name: '" + input_ds.df.loc[i_ds, "name"] + "'\n") + babs_proj_config_file.write(" path_in: '" + input_ds.df.loc[i_ds, "path_in"] + "'\n") babs_proj_config_file.write(" path_data_rel: 'TO_BE_FILLED'\n") babs_proj_config_file.write(" is_zipped: 'TO_BE_FILLED'\n") # container ds: @@ -368,18 +374,18 @@ def babs_bootstrap(self, input_ds, for i_ds in range(0, input_ds.num_ds): # path to cloned dataset: i_ds_path = op.join(self.analysis_path, - input_ds.df["path_now_rel"][i_ds]) + input_ds.df.loc[i_ds, "path_now_rel"]) print("Cloning input dataset #" + str(i_ds+1) + ": '" - + input_ds.df["name"][i_ds] + "'") + + input_ds.df.loc[i_ds, "name"] + "'") # clone input dataset(s) as sub-dataset into `analysis` dataset: dlapi.clone(dataset=self.analysis_path, - source=input_ds.df["path_in"][i_ds], # input dataset(s) + source=input_ds.df.loc[i_ds, "path_in"], # input dataset(s) path=i_ds_path) # path to clone into # amend the previous commit with a nicer commit message: proc_git_commit_amend = subprocess.run( ["git", "commit", "--amend", "-m", - "Register input data dataset '" + input_ds.df["name"][i_ds] + "Register input data dataset '" + input_ds.df.loc[i_ds, "name"] + "' as a subdataset"], cwd=self.analysis_path, stdout=subprocess.PIPE @@ -406,10 +412,10 @@ def babs_bootstrap(self, input_ds, ds_index_str = "$INPUT_DATASET_#" + str(i_ds+1) # update `path_data_rel`: babs_proj_config["input_ds"][ds_index_str]["path_data_rel"] = \ - input_ds.df["path_data_rel"][i_ds] + input_ds.df.loc[i_ds, "path_data_rel"] # update `is_zipped`: babs_proj_config["input_ds"][ds_index_str]["is_zipped"] = \ 
- input_ds.df["is_zipped"][i_ds] + input_ds.df.loc[i_ds, "is_zipped"] # dump: write_yaml(babs_proj_config, self.config_path, if_filelock=True) # datalad save: update: @@ -519,7 +525,7 @@ def babs_bootstrap(self, input_ds, print("DataLad dropping input dataset's contents...") for i_ds in range(0, input_ds.num_ds): _ = self.analysis_datalad_handle.drop( - path=input_ds.df["path_now_rel"][i_ds], + path=input_ds.df.loc[i_ds, "path_now_rel"], recursive=True, # and potential subdataset reckless='availability') # not to check availability @@ -586,7 +592,7 @@ def clean_up(self, input_ds): print("Removing input dataset(s) if cloned...") for i_ds in range(0, input_ds.num_ds): # check if it exists yet: - path_now_abs = op.join(self.analysis_path, input_ds.df["path_now_rel"][i_ds]) + path_now_abs = op.join(self.analysis_path, input_ds.df.loc[i_ds, "path_now_rel"]) if op.exists(path_now_abs): # this input dataset has been cloned: # use `datalad remove` to remove: _ = self.analysis_datalad_handle.remove( @@ -697,18 +703,18 @@ def babs_check_setup(self, input_ds, flag_job_test): # check each input ds: for i_ds in range(0, input_ds.num_ds): - path_now_abs = input_ds.df["path_now_abs"][i_ds] + path_now_abs = input_ds.df.loc[i_ds, "path_now_abs"] # check if the dir of this input ds exists: assert op.exists(path_now_abs), \ "The path to the cloned input dataset #" + str(i_ds + 1) \ - + " '" + input_ds.df["name"][i_ds] + "' does not exist: " \ + + " '" + input_ds.df.loc[i_ds, "name"] + "' does not exist: " \ + path_now_abs # check if dir of input ds is a datalad dataset: assert op.exists(op.join(path_now_abs, ".datalad/config")), \ "The input dataset #" + str(i_ds + 1) \ - + " '" + input_ds.df["name"][i_ds] + "' is not a valid DataLad dataset:" \ + + " '" + input_ds.df.loc[i_ds, "name"] + "' is not a valid DataLad dataset:" \ + " There is no file '.datalad/config' in its directory: " + path_now_abs # ROADMAP: check if input dataset ID saved in YAML file @@ -1021,47 +1027,66 @@ def babs_submit(self, count=1, df_job_specified=None): # in `cli.py` # Load the csv file - lock_path = self.job_status_path_abs + ".lock" + lock_path = self.job_submit_path_abs + ".lock" lock = FileLock(lock_path) try: with lock.acquire(timeout=5): # lock the file, i.e., lock job status df df_job = read_job_status_csv(self.job_status_path_abs) - # getting list of jobs indices to submit (based either on df_job_specified or count): - job_ind_list = prepare_job_ind_list(df_job, df_job_specified, count, self.type_session) - df_job_updated = df_job.copy() - for i_progress, i_job in enumerate(job_ind_list): - # Submit a job: - if self.type_session == "single-ses": - sub = df_job.at[i_job, "sub_id"] - ses = None - else: # multi-ses - sub = df_job.at[i_job, "sub_id"] - ses = df_job.at[i_job, "ses_id"] - - job_id, _, log_filename = \ - submit_one_job(self.analysis_path, - self.type_session, - self.type_system, - sub, ses) - df_job_updated = df_update_one_job(df_job_updated, i_job, job_id, - log_filename, submitted=True) + # create and save a job array df to submit (based either on df_job_specified or count): + df_job_submit = prepare_job_array_df( + df_job, df_job_specified, count, self.type_session + ) + # only run `babs-submit` when there are subjects/sessions not yet submitted + if df_job_submit.shape[0] > 0: + maxarray = str(df_job_submit.shape[0]) + # run array submission + job_id, _, task_id_list, log_filename_list = submit_array( + self.analysis_path, self.type_session, self.type_system, maxarray + ) + # Update 
`analysis/code/job_submit.csv` with new status + df_job_submit_updated = df_submit_update( + df_job_submit, + job_id, + task_id_list, + log_filename_list, + submitted=True, + ) + # Update `analysis/code/job_status.csv` with new status + df_job_updated = df_job.copy() + df_job_updated = df_status_update( + df_job_updated, + df_job_submit_updated, + submitted=True, + ) + # COMMENT OUT BECAUSE ONLY 1 JOB IS SUBMITTED AT A TIME # if it's several times of `count_report_progress`: - if (i_progress + 1) % count_report_progress == 0: - print('So far ' + str(i_progress + 1) + ' jobs have been submitted.') - - num_rows_to_print = 6 - print("\nFirst " + str(num_rows_to_print) - + " rows of 'analysis/code/job_status.csv':") - with pd.option_context('display.max_rows', None, - 'display.max_columns', None, - 'display.width', 120): # default is 80 characters... - # ^^ print all the columns and rows (with returns) - print(df_job_updated.head(num_rows_to_print)) # only first several rows - - # save updated df: - df_job_updated.to_csv(self.job_status_path_abs, index=False) + # if (i_progress + 1) % count_report_progress == 0: + # print('So far ' + str(i_progress + 1) + ' jobs have been submitted.') + + num_rows_to_print = 6 + print( + "\nFirst " + + str(num_rows_to_print) + + " rows of 'analysis/code/job_status.csv':" + ) + with pd.option_context( + "display.max_rows", + None, + "display.max_columns", + None, + "display.width", + 120, + ): # default is 80 characters... + # ^^ print all the columns and rows (with returns) + print( + df_job_updated.head(num_rows_to_print) + ) # only first several rows + + # save updated df: + df_job_updated.to_csv(self.job_status_path_abs, index=False) + df_job_submit_updated.to_csv(self.job_submit_path_abs, index=False) # here, the job status was not checked, so message from `report_job_status()` # based on current df is not trustable: @@ -1074,7 +1099,7 @@ def babs_submit(self, count=1, df_job_specified=None): print("Another instance of this application currently holds the lock.") def babs_status(self, flags_resubmit, - df_resubmit_job_specific=None, reckless=False, + df_resubmit_task_specific=None, reckless=False, container_config_yaml_file=None, job_account=False): """ @@ -1086,12 +1111,12 @@ def babs_status(self, flags_resubmit, Under what condition to perform job resubmit. Element choices are: 'failed', 'pending'. CLI does not support 'stalled' right now, as it's not tested. - df_resubmit_job_specific: pd.DataFrame or None + df_resubmit_task_specific: pd.DataFrame or None list of specified job(s) to resubmit, requested by `--resubmit-job` columns: 'sub_id' (and 'ses_id', if multi-ses) if `--resubmit-job` was not specified in `babs-status`, it will be None. reckless: bool - Whether to resubmit jobs listed in `df_resubmit_job_specific`, + Whether to resubmit jobs listed in `df_resubmit_task_specific`, even they're done or running. This is used when `--resubmit-job`. 
NOTE: currently this argument has not been tested; @@ -1113,7 +1138,7 @@ def babs_status(self, flags_resubmit, from .constants import MSG_NO_ALERT_IN_LOGS # Load the csv file - lock_path = self.job_status_path_abs + ".lock" + lock_path = self.job_submit_path_abs + ".lock" lock = FileLock(lock_path) # Prepare for checking alert messages in log files: @@ -1145,44 +1170,47 @@ def babs_status(self, flags_resubmit, # Update job status, and resubmit if requested: # get the list of jobs submitted, but `is_done` is not True: temp = (df_job['has_submitted']) & (~df_job['is_done']) - list_index_job_tocheck = df_job.index[temp].tolist() - for i_job in list_index_job_tocheck: + list_index_task_tocheck = df_job.index[temp].tolist() + for i_task in list_index_task_tocheck: # Get basic information for this job: - job_id = df_job.at[i_job, "job_id"] + job_id = df_job.at[i_task, "job_id"] job_id_str = str(job_id) - log_filename = df_job.at[i_job, "log_filename"] # with "*" + task_id = df_job.at[i_task, "task_id"] + task_id_str = str(task_id) + job_task_id_str = job_id_str + "_" + task_id_str # eg: 3536406_1 + log_filename = df_job.at[i_task, "log_filename"] # with "*" log_fn = op.join(self.analysis_path, "logs", log_filename) # abs path o_fn = log_fn.replace(".*", ".o") # did_resubmit = False # reset: did not resubmit this job if self.type_session == "single-ses": - sub = df_job.at[i_job, "sub_id"] + sub = df_job.at[i_task, "sub_id"] ses = None branchname = "job-" + job_id_str + "-" + sub # e.g., job-00000-sub-01 elif self.type_session == "multi-ses": - sub = df_job.at[i_job, "sub_id"] - ses = df_job.at[i_job, "ses_id"] + sub = df_job.at[i_task, "sub_id"] + ses = df_job.at[i_task, "ses_id"] branchname = "job-" + job_id_str + "-" + sub + "-" + ses # e.g., job-00000-sub-01-ses-B - # Check if resubmission of this job is requested: - if_request_resubmit_this_job = False - if df_resubmit_job_specific is not None: + # Check if resubmission of this task is requested: + if_request_resubmit_this_task = False + if df_resubmit_task_specific is not None: if self.type_session == "single-ses": - temp = df_resubmit_job_specific['sub_id'] == sub + temp = df_resubmit_task_specific['sub_id'] == sub elif self.type_session == "multi-ses": - temp = (df_resubmit_job_specific['sub_id'] == sub) & \ - (df_resubmit_job_specific['ses_id'] == ses) + temp = (df_resubmit_task_specific['sub_id'] == sub) & \ + (df_resubmit_task_specific['ses_id'] == ses) if any(temp): # any matched; `temp` is pd.Series of True or False - if_request_resubmit_this_job = True + if_request_resubmit_this_task = True # print("debugging purpose: request to resubmit job: " + sub + ", " + ses) # ^^ only for multi-ses! 
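                    # Note (an illustrative assumption, based on `_parsing_squeue_out()`
                    # further below): `df_all_job_status` is expected to be indexed by the
                    # Slurm array notation "<job_id>_<task_id>", so a per-task lookup in
                    # this loop would look like (made-up IDs):
                    #
                    #   df_all_job_status.at["3536406_1", "@state"]  # e.g. 'running' or 'pending'
                    #   df_all_job_status.at["3536406_1", "state"]   # e.g. 'r' or 'qw', as tested below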
# Update the "last_line_stdout_file": - df_job_updated.at[i_job, "last_line_stdout_file"] = \ + df_job_updated.at[i_task, "last_line_stdout_file"] = \ get_last_line(o_fn) # Check if any alert message in log files for this job: @@ -1192,36 +1220,36 @@ def babs_status(self, flags_resubmit, alert_message_in_log_files, if_no_alert_in_log, if_found_log_files = \ get_alert_message_in_log_files(config_msg_alert, log_fn) # ^^ the function will handle even if `config_msg_alert=None` - df_job_updated.at[i_job, "alert_message"] = \ + df_job_updated.at[i_task, "alert_message"] = \ alert_message_in_log_files # Check if there is a branch in output RIA: # check if branch name of current job is in the list of all branches: if branchname in list_branches: # found the branch: - df_job_updated.at[i_job, "is_done"] = True + df_job_updated.at[i_task, "is_done"] = True # reset/update: - df_job_updated.at[i_job, "job_state_category"] = np.nan - df_job_updated.at[i_job, "job_state_code"] = np.nan - df_job_updated.at[i_job, "duration"] = np.nan + df_job_updated.at[i_task, "job_state_category"] = np.nan + df_job_updated.at[i_task, "job_state_code"] = np.nan + df_job_updated.at[i_task, "duration"] = np.nan # ROADMAP: ^^ get duration via `qacct` # (though qacct may not be accurate) - df_job_updated.at[i_job, "is_failed"] = False + df_job_updated.at[i_task, "is_failed"] = False # check if echoed "SUCCESS": - # TODO ^^ - + # TODO ^^ + else: # did not find the branch # Check the job status: - if job_id_str in df_all_job_status.index.to_list(): + if job_task_id_str in df_all_job_status.index.to_list(): # ^^ if `df` is empty, `.index.to_list()` will return [] - state_category = df_all_job_status.at[job_id_str, '@state'] - state_code = df_all_job_status.at[job_id_str, 'state'] + state_category = df_all_job_status.at[job_task_id_str, '@state'] + state_code = df_all_job_status.at[job_task_id_str, 'state'] # ^^ column `@state`: 'running' or 'pending' if state_code == "r": # Check if resubmit is requested: - if if_request_resubmit_this_job & (not reckless): + if if_request_resubmit_this_task & (not reckless): # requested resubmit, but without `reckless`: print msg to_print = "Although resubmission for job: " + sub if self.type_session == "multi-ses": @@ -1232,56 +1260,63 @@ def babs_status(self, flags_resubmit, # can add this ^^ back after supporting `--reckless` in CLI warnings.warn(to_print) - if if_request_resubmit_this_job & reckless: # force to resubmit: - # Resubmit: - # did_resubmit = True - # print a message: - to_print = "Resubmit job for " + sub - if self.type_session == "multi-ses": - to_print += ", " + ses - to_print += ", although it was running," \ - + " resubmit for this job was requested" \ - + " and `--reckless` was specified." - print(to_print) + # COMMENT OUT BECAUSE reckless is always False + # AND THIS HAS BEEN REMOVE FROM CLI + # if if_request_resubmit_this_task & reckless: # force to resubmit: + # # Resubmit: + # # did_resubmit = True + # # print a message: + # to_print = "Resubmit job for " + sub + # if self.type_session == "multi-ses": + # to_print += ", " + ses + # to_print += ", although it was running," \ + # + " resubmit for this job was requested" \ + # + " and `--reckless` was specified." 
+ # print(to_print) - # kill original one - proc_kill = subprocess.run( - [get_cmd_cancel_job(self.type_system), - job_id_str], # e.g., `qdel ` - stdout=subprocess.PIPE - ) - proc_kill.check_returncode() - # submit new one: - job_id_updated, _, log_filename = \ - submit_one_job(self.analysis_path, - self.type_session, - self.type_system, - sub, ses) - # update fields: - df_job_updated = df_update_one_job(df_job_updated, i_job, job_id_updated, - log_filename, debug=True) + # # kill original one + # proc_kill = subprocess.run( + # [get_cmd_cancel_job(self.type_system), + # job_id_str], # e.g., `qdel ` + # stdout=subprocess.PIPE + # ) + # proc_kill.check_returncode() + # # submit new one: + # job_id_updated, _, log_filename = \ + # submit_one_job(self.analysis_path, + # self.type_session, + # self.type_system, + # sub, ses) + # # update fields: + # df_job_updated = df_update_one_job(df_job_updated, i_job, job_id_updated, + # log_filename, debug=True) else: # just let it run: - df_job_updated.at[i_job, "job_state_category"] = state_category - df_job_updated.at[i_job, "job_state_code"] = state_code + df_job_updated.at[i_task, "job_state_category"] = state_category + df_job_updated.at[i_task, "job_state_code"] = state_code # get the duration: if "duration" in df_all_job_status: # e.g., slurm `squeue` automatically returns the duration, # so no need to calcu again. - duration = df_all_job_status.at[job_id_str, "duration"] + duration = df_all_job_status.at[job_task_id_str, "duration"] else: # This duration time may be slightly longer than actual # time, as this is using current time, instead of # the time when `qstat`/requesting job queue. duration = calcu_runtime( - df_all_job_status.at[job_id_str, "JAT_start_time"]) - df_job_updated.at[i_job, "duration"] = duration + df_all_job_status.at[job_task_id_str, "JAT_start_time"]) + df_job_updated.at[i_task, "duration"] = duration # do nothing else, just wait elif state_code == "qw": - if ('pending' in flags_resubmit) or (if_request_resubmit_this_job): + # pending so set `is_failed` to False + df_job_updated.at[i_task, "is_failed"] = False + # resubmit pending + if ('pending' in flags_resubmit) or (if_request_resubmit_this_task): # Resubmit: # did_resubmit = True + df_job_updated.at[i_task, "needs_resubmit"] = True + # print a message: to_print = "Resubmit job for " + sub if self.type_session == "multi-ses": @@ -1296,75 +1331,69 @@ def babs_status(self, flags_resubmit, stdout=subprocess.PIPE ) proc_kill.check_returncode() - # submit new one: - job_id_updated, _, log_filename = \ - submit_one_job(self.analysis_path, - self.type_session, - self.type_system, - sub, ses) - # update fields: - df_job_updated = df_update_one_job(df_job_updated, i_job, job_id_updated, - log_filename, debug=True) + # RESUBMIT ARRAY BELOW + else: # not to resubmit: # update fields: - df_job_updated.at[i_job, "job_state_category"] = state_category - df_job_updated.at[i_job, "job_state_code"] = state_code - - elif state_code == "eqw": - # NOTE: comment out resubmission of `eqw` jobs - # as this was not tested out; - # also, equivalent `eqw` code on Slurm was not mapped either. 
- - if ('stalled' in flags_resubmit) or (if_request_resubmit_this_job): - # requested resubmit, - # but currently not support resubmitting stalled jobs: - # print warning msg: - to_print = "Although resubmission for job: " + sub - if self.type_session == "multi-ses": - to_print += ", " + ses - to_print += " was requested, as this job is stalled" \ - + " (e.g., job state code 'eqw' on SGE)," \ - + " BABS won't resubmit this job." - warnings.warn(to_print) - - # # Resubmit: - # # did_resubmit = True - # # print a message: - # to_print = "Resubmit job for " + sub - # if self.type_session == "multi-ses": - # to_print += ", " + ses - # to_print += ", as it was stalled and resubmit was requested." - # print(to_print) - - # # kill original one - # proc_kill = subprocess.run( - # [get_cmd_cancel_job(self.type_system), - # job_id_str], # e.g., `qdel ` - # stdout=subprocess.PIPE - # ) - # proc_kill.check_returncode() - # # submit new one: - # job_id_updated, _, log_filename = \ - # submit_one_job(self.analysis_path, - # self.type_session, - # self.type_system, - # sub, ses) - # # update fields: - # df_job_updated = df_update_one_job(df_job_updated, i_job, job_id_updated, - # log_filename, debug=True) - # else: # not to resubmit: - - # only update fields: - df_job_updated.at[i_job, "job_state_category"] = state_category - df_job_updated.at[i_job, "job_state_code"] = state_code + df_job_updated.at[i_task, "job_state_category"] = state_category + df_job_updated.at[i_task, "job_state_code"] = state_code + + # COMMENT OUT BECAUSE "eqw" is SGE STATE + # elif state_code == "eqw": + # # NOTE: comment out resubmission of `eqw` jobs + # # as this was not tested out; + # # also, equivalent `eqw` code on Slurm was not mapped either. + + # if ('stalled' in flags_resubmit) or (if_request_resubmit_this_task): + # # requested resubmit, + # # but currently not support resubmitting stalled jobs: + # # print warning msg: + # to_print = "Although resubmission for job: " + sub + # if self.type_session == "multi-ses": + # to_print += ", " + ses + # to_print += " was requested, as this job is stalled" \ + # + " (e.g., job state code 'eqw' on SGE)," \ + # + " BABS won't resubmit this job." + # warnings.warn(to_print) + + # # # Resubmit: + # # # did_resubmit = True + # # # print a message: + # # to_print = "Resubmit job for " + sub + # # if self.type_session == "multi-ses": + # # to_print += ", " + ses + # # to_print += ", as it was stalled and resubmit was requested." 
+ # # print(to_print) + + # # # kill original one + # # proc_kill = subprocess.run( + # # [get_cmd_cancel_job(self.type_system), + # # job_id_str], # e.g., `qdel ` + # # stdout=subprocess.PIPE + # # ) + # # proc_kill.check_returncode() + # # # submit new one: + # # job_id_updated, _, log_filename = \ + # # submit_one_job(self.analysis_path, + # # self.type_session, + # # self.type_system, + # # sub, ses) + # # # update fields: + # # df_job_updated = df_update_one_job(df_job_updated, i_job, job_id_updated, + # # log_filename, debug=True) + # # else: # not to resubmit: + + # # only update fields: + # df_job_updated.at[i_task, "job_state_category"] = state_category + # df_job_updated.at[i_task, "job_state_code"] = state_code else: # did not find in `df_all_job_status`, i.e., job queue # probably error - df_job_updated.at[i_job, "is_failed"] = True + df_job_updated.at[i_task, "is_failed"] = True # reset: - df_job_updated.at[i_job, "job_state_category"] = np.nan - df_job_updated.at[i_job, "job_state_code"] = np.nan - df_job_updated.at[i_job, "duration"] = np.nan + df_job_updated.at[i_task, "job_state_category"] = np.nan + df_job_updated.at[i_task, "job_state_code"] = np.nan + df_job_updated.at[i_task, "duration"] = np.nan # ROADMAP: ^^ get duration via `qacct` if if_found_log_files == False: # bool or np.nan # If there is no log files, the alert message would be 'np.nan'; @@ -1373,42 +1402,35 @@ def babs_status(self, flags_resubmit, # change the 'alert_message' to no alert in logs, # so that when reporting job status, # info from job accounting will be reported - df_job_updated.at[i_job, "alert_message"] = \ + df_job_updated.at[i_task, "alert_message"] = \ MSG_NO_ALERT_IN_LOGS # check the log file: # TODO ^^ - # TODO: assign error category in df; also print it out + # TODO: assign error category in df; also print it out # resubmit if requested: - if ("failed" in flags_resubmit) or (if_request_resubmit_this_job): + elif ("failed" in flags_resubmit) or (if_request_resubmit_this_task): # Resubmit: # did_resubmit = True + df_job_updated.at[i_task, "needs_resubmit"] = True + # print a message: to_print = "Resubmit job for " + sub if self.type_session == "multi-ses": to_print += ", " + ses - to_print += ", as it is failed and resubmit was requested." + to_print += ", as it failed and resubmit was requested." print(to_print) # no need to kill original one! # As it already failed and out of job queue... 
+ # RESUBMIT ARRAY BELOW - # submit new one: - job_id_updated, _, log_filename = \ - submit_one_job(self.analysis_path, - self.type_session, - self.type_system, - sub, ses) - - # update fields: - df_job_updated = df_update_one_job(df_job_updated, i_job, job_id_updated, - log_filename, debug=True) else: # resubmit 'error' was not requested: # reset: - df_job_updated.at[i_job, "job_state_category"] = np.nan - df_job_updated.at[i_job, "job_state_code"] = np.nan - df_job_updated.at[i_job, "duration"] = np.nan + df_job_updated.at[i_task, "job_state_category"] = np.nan + df_job_updated.at[i_task, "job_state_code"] = np.nan + df_job_updated.at[i_task, "duration"] = np.nan # ROADMAP: ^^ get duration via `qacct` # If `--job-account` is requested: @@ -1420,46 +1442,74 @@ def babs_status(self, flags_resubmit, check_job_account(job_id_str, job_name, username_lowercase, self.type_system) df_job_updated.at[i_job, "job_account"] = msg_job_account + + # Collect all to-be-resubmitted tasks into a single DataFrame + df_job_resubmit = df_job_updated[df_job_updated["needs_resubmit"] == True].copy() + df_job_resubmit.reset_index(drop=True, inplace=True) + if df_job_resubmit.shape[0] > 0: + maxarray = str(df_job_resubmit.shape[0]) + # run array submission + job_id, _, task_id_list, log_filename_list = submit_array( + self.analysis_path, self.type_session, self.type_system, maxarray + ) + # Update `analysis/code/job_submit.csv` with new status + df_job_resubmit_updated = df_submit_update( + df_job_resubmit, + job_id, + task_id_list, + log_filename_list, + submitted=True, + ) + # Update `analysis/code/job_status.csv` with new status + df_job_updated = df_status_update( + df_job_updated, + df_job_resubmit_updated, + submitted=True, + ) + df_job_resubmit_updated.to_csv(self.job_submit_path_abs, index=False) # Done: submitted jobs that not 'is_done' # For 'is_done' jobs in previous round: temp = (df_job['has_submitted']) & (df_job['is_done']) - list_index_job_is_done = df_job.index[temp].tolist() - for i_job in list_index_job_is_done: + list_index_task_is_done = df_job.index[temp].tolist() + for i_task in list_index_task_is_done: # Get basic information for this job: - job_id = df_job.at[i_job, "job_id"] + job_id = df_job.at[i_task, "job_id"] job_id_str = str(job_id) - log_filename = df_job.at[i_job, "log_filename"] # with "*" + task_id = df_job.at[i_task, "task_id"] + task_id_str = str(task_id) + job_task_id_str = job_id_str + "_" + task_id_str # eg: 3536406_1 + log_filename = df_job.at[i_task, "log_filename"] # with "*" log_fn = op.join(self.analysis_path, "logs", log_filename) # abs path o_fn = log_fn.replace(".*", ".o") if self.type_session == "single-ses": - sub = df_job.at[i_job, "sub_id"] + sub = df_job.at[i_task, "sub_id"] ses = None branchname = "job-" + job_id_str + "-" + sub # e.g., job-00000-sub-01 elif self.type_session == "multi-ses": - sub = df_job.at[i_job, "sub_id"] - ses = df_job.at[i_job, "ses_id"] + sub = df_job.at[i_task, "sub_id"] + ses = df_job.at[i_task, "ses_id"] branchname = "job-" + job_id_str + "-" + sub + "-" + ses # e.g., job-00000-sub-01-ses-B # Check if resubmission of this job is requested: - if_request_resubmit_this_job = False - if df_resubmit_job_specific is not None: + if_request_resubmit_this_task = False + if df_resubmit_task_specific is not None: if self.type_session == "single-ses": - temp = df_resubmit_job_specific['sub_id'] == sub + temp = df_resubmit_task_specific['sub_id'] == sub elif self.type_session == "multi-ses": - temp = (df_resubmit_job_specific['sub_id'] == sub) & 
\ - (df_resubmit_job_specific['ses_id'] == ses) + temp = (df_resubmit_task_specific['sub_id'] == sub) & \ + (df_resubmit_task_specific['ses_id'] == ses) if any(temp): # any matched; `temp` is pd.Series of True or False - if_request_resubmit_this_job = True + if_request_resubmit_this_task = True # print("debugging purpose: request to resubmit job:" + sub + ", " + ses) # ^^ only for multi-ses # if want to resubmit, but `--reckless` is NOT specified: print msg: - if if_request_resubmit_this_job & (not reckless): + if if_request_resubmit_this_task & (not reckless): to_print = "Although resubmission for job: " + sub if self.type_session == "multi-ses": to_print += ", " + ses @@ -1469,54 +1519,57 @@ def babs_status(self, flags_resubmit, # can add this ^^ back after supporting `--reckless` in CLI warnings.warn(to_print) + # COMMENT OUT BECAUSE reckless is always False + # AND THIS HAS BEEN REMOVE FROM CLI # if resubmit is requested, and `--reckless` is specified: - if if_request_resubmit_this_job & reckless: - # Resubmit: - # did_resubmit = True - # print a message: - to_print = "Resubmit job for " + sub - if self.type_session == "multi-ses": - to_print += ", " + ses - to_print += ", although it is done," \ - + " resubmit for this job was requested" \ - + " and `--reckless` was specified." - print(to_print) - - # TODO: delete the original branch? - - # kill original one - proc_kill = subprocess.run( - [get_cmd_cancel_job(self.type_system), - job_id_str], # e.g., `qdel ` - stdout=subprocess.PIPE - ) - proc_kill.check_returncode() - # submit new one: - job_id_updated, _, log_filename = \ - submit_one_job(self.analysis_path, - self.type_session, - self.type_system, - sub, ses) - # update fields: - df_job_updated = df_update_one_job(df_job_updated, i_job, job_id_updated, - log_filename, done=False, debug=True) + # if if_request_resubmit_this_task & reckless: + # # Resubmit: + # # did_resubmit = True + # # print a message: + # to_print = "Resubmit job for " + sub + # if self.type_session == "multi-ses": + # to_print += ", " + ses + # to_print += ", although it is done," \ + # + " resubmit for this job was requested" \ + # + " and `--reckless` was specified." + # print(to_print) + + # # TODO: delete the original branch? + + # # kill original one + # proc_kill = subprocess.run( + # [get_cmd_cancel_job(self.type_system), + # job_id_str], # e.g., `qdel ` + # stdout=subprocess.PIPE + # ) + # proc_kill.check_returncode() + # # submit new one: + # job_id_updated, _, log_filename = \ + # submit_one_job(self.analysis_path, + # self.type_session, + # self.type_system, + # sub, ses) + # # update fields: + # df_job_updated = df_update_one_job(df_job_updated, i_job, job_id_updated, + # log_filename, done=False, debug=True) + else: # did not request resubmit, or `--reckless` is None: # just perform normal stuff for a successful job: # Update the "last_line_stdout_file": - df_job_updated.at[i_job, "last_line_stdout_file"] = \ + df_job_updated.at[i_task, "last_line_stdout_file"] = \ get_last_line(o_fn) # Check if any alert message in log files for this job: # this is to update `alert_message` in case user changes configs in yaml alert_message_in_log_files, if_no_alert_in_log, _ = \ get_alert_message_in_log_files(config_msg_alert, log_fn) # ^^ the function will handle even if `config_msg_alert=None` - df_job_updated.at[i_job, "alert_message"] = \ + df_job_updated.at[i_task, "alert_message"] = \ alert_message_in_log_files # Done: 'is_done' jobs. 
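            # A note on branch names under array submission (an observation from the code
            # above, not new behaviour added here): every task of one array shares
            # ${SLURM_ARRAY_JOB_ID}, so the per-task output branches are distinguished
            # only by subject (and session), e.g. for array job 3536406:
            #
            #   "job-3536406-sub-01"          # single-ses
            #   "job-3536406-sub-01-ses-B"    # multi-ses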
# For jobs that haven't been submitted yet: # just to throw out warnings if `--resubmit-job` was requested... - if df_resubmit_job_specific is not None: + if df_resubmit_task_specific is not None: # only keep those not submitted: df_job_not_submitted = df_job[~df_job["has_submitted"]] # only keep columns of `sub_id` and `ses_id`: @@ -1526,7 +1579,7 @@ def babs_status(self, flags_resubmit, df_job_not_submitted_slim = df_job_not_submitted[["sub_id", "ses_id"]] # check if `--resubmit-job` was requested for any these jobs: - df_intersection = df_resubmit_job_specific.merge(df_job_not_submitted_slim) + df_intersection = df_resubmit_task_specific.merge(df_job_not_submitted_slim) if len(df_intersection) > 0: warnings.warn("Jobs for some of the subjects (and sessions) requested in" + " `--resubmit-job` haven't been submitted yet." @@ -1890,9 +1943,9 @@ def __init__(self, input_cli): # change the `input_cli` from nested list to a pandas dataframe: for i in range(0, self.num_ds): - self.df["name"][i] = input_cli[i][0] - self.df["path_in"][i] = input_cli[i][1] - self.df["path_now_rel"][i] = op.join("inputs/data", self.df["name"][i]) + self.df.loc[i, "name"] = input_cli[i][0] + self.df.loc[i, "path_in"] = input_cli[i][1] + self.df.loc[i, "path_now_rel"] = op.join("inputs/data", self.df.loc[i, "name"]) # sanity check: input ds names should not be identical: if len(set(self.df["name"].tolist())) != self.num_ds: # length of the set = number of ds @@ -1981,8 +2034,8 @@ def assign_path_now_abs(self, analysis_path): """ for i in range(0, self.num_ds): - self.df["path_now_abs"][i] = op.join(analysis_path, - self.df["path_now_rel"][i]) + self.df.loc[i, "path_now_abs"] = op.join(analysis_path, + self.df.loc[i, "path_now_rel"]) def check_if_zipped(self): """ @@ -1994,7 +2047,7 @@ def check_if_zipped(self): # Determine if it's a zipped dataset, for each input ds: for i_ds in range(0, self.num_ds): - temp_list = glob.glob(self.df["path_now_abs"][i_ds] + "/sub-*") + temp_list = glob.glob(self.df.loc[i_ds, "path_now_abs"] + "/sub-*") count_zip = 0 count_dir = 0 for i_temp in range(0, len(temp_list)): @@ -2004,29 +2057,29 @@ def check_if_zipped(self): count_zip += 1 if (count_zip > 0) & (count_dir == 0): # all are zip files: - self.df["is_zipped"][i_ds] = True - print("input dataset '" + self.df["name"][i_ds] + "'" + self.df.loc[i_ds, "is_zipped"] = True + print("input dataset '" + self.df.loc[i_ds, "name"] + "'" + " is considered as a zipped dataset.") elif (count_dir > 0) & (count_zip == 0): # all are directories: - self.df["is_zipped"][i_ds] = False - print("input dataset '" + self.df["name"][i_ds] + "'" + self.df.loc[i_ds, "is_zipped"] = False + print("input dataset '" + self.df.loc[i_ds, "name"]+ "'" + " is considered as an unzipped dataset.") elif (count_zip > 0) & (count_dir > 0): # detect both: - self.df["is_zipped"][i_ds] = True # consider as zipped - print("input dataset '" + self.df["name"][i_ds] + "'" + self.df.loc[i_ds, "is_zipped"] = True # consider as zipped + print("input dataset '" + self.df.loc[i_ds, "name"] + "'" + " has both zipped files and unzipped folders;" + " thus it's considered as a zipped dataset.") else: # did not detect any of them... 
raise Exception("BABS did not detect any folder or zip file of `sub-*`" - + " in input dataset '" + self.df["name"][i_ds] + "'.") + + " in input dataset '" + self.df.loc[i_ds, "name"] + "'.") # Assign `path_data_rel`: for i_ds in range(0, self.num_ds): - if self.df["is_zipped"][i_ds] is True: # zipped ds - self.df["path_data_rel"][i_ds] = op.join(self.df["path_now_rel"][i_ds], - self.df["name"][i_ds]) + if self.df.loc[i_ds, "is_zipped"] is True: # zipped ds + self.df.loc[i_ds, "path_data_rel"] = op.join(self.df.loc[i_ds, "path_now_rel"], + self.df.loc[i_ds, "name"]) else: # unzipped ds: - self.df["path_data_rel"][i_ds] = self.df["path_now_rel"][i_ds] + self.df.loc[i_ds, "path_data_rel"] = self.df.loc[i_ds, "path_now_rel"] def check_validity_zipped_input_dataset(self, type_session): """ @@ -2050,29 +2103,29 @@ def check_validity_zipped_input_dataset(self, type_session): print("Performing sanity check for any zipped input dataset..." " Getting example zip file(s) to check...") for i_ds in range(0, self.num_ds): - if self.df["is_zipped"][i_ds] is True: # zipped ds + if self.df.loc[i_ds, "is_zipped"] is True: # zipped ds # Sanity check #1: zip filename: ---------------------------------- if type_session == "multi-ses": # check if matches the pattern of `sub-*_ses-*_*.zip`: - temp_list = glob.glob(self.df["path_now_abs"][i_ds] - + "/sub-*_ses-*_" + self.df["name"][i_ds] + "*.zip") + temp_list = glob.glob(self.df.loc[i_ds, "path_now_abs"] + + "/sub-*_ses-*_" + self.df.loc[i_ds, "name"] + "*.zip") temp_list = sorted(temp_list) # sort by name if len(temp_list) == 0: # did not find any matched raise Exception("In zipped input dataset #" + str(i_ds + 1) - + " (named '" + self.df["name"][i_ds] + "')," + + " (named '" + self.df.loc[i_ds, "name"] + "')," + " no zip filename matches the pattern of" + " 'sub-*_ses-*_" - + self.df["name"][i_ds] + "*.zip'") + + self.df.loc[i_ds, "name"] + "*.zip'") elif type_session == "single-ses": - temp_list = glob.glob(self.df["path_now_abs"][i_ds] - + "/sub-*_" + self.df["name"][i_ds] + "*.zip") + temp_list = glob.glob(self.df.loc[i_ds, "path_now_abs"] + + "/sub-*_" + self.df.loc[i_ds, "name"] + "*.zip") temp_list = sorted(temp_list) # sort by name if len(temp_list) == 0: # did not find any matched raise Exception("In zipped input dataset #" + str(i_ds + 1) - + " (named '" + self.df["name"][i_ds] + "')," + + " (named '" + self.df.loc[i_ds, "name"] + "')," + " no zip filename matches the pattern of" + " 'sub-*_" - + self.df["name"][i_ds] + "*.zip'") + + self.df.loc[i_ds, "name"] + "*.zip'") # not to check below stuff anymore: # # also check there should not be `_ses-*_` # temp_list_2 = glob.glob(self.df["path_now_abs"][i_ds] @@ -2087,7 +2140,7 @@ def check_validity_zipped_input_dataset(self, type_session): # Sanity check #2: foldername within zipped file: ------------------- temp_zipfile = temp_list[0] # try out the first zipfile temp_zipfilename = op.basename(temp_zipfile) - dlapi.get(path=temp_zipfile, dataset=self.df["path_now_abs"][i_ds]) + dlapi.get(path=temp_zipfile, dataset=self.df.loc[i_ds, "path_now_abs"]) # unzip to a temporary folder and get the foldername temp_unzip_to = tempfile.mkdtemp() shutil.unpack_archive(temp_zipfile, temp_unzip_to) @@ -2095,14 +2148,14 @@ def check_validity_zipped_input_dataset(self, type_session): # remove the temporary folder: shutil.rmtree(temp_unzip_to) # `datalad drop` the zipfile: - dlapi.drop(path=temp_zipfile, dataset=self.df["path_now_abs"][i_ds]) + dlapi.drop(path=temp_zipfile, dataset=self.df.loc[i_ds, "path_now_abs"]) # 
check if there is folder named as ds's name: - if self.df["name"][i_ds] not in list_unzip_foldernames: + if self.df.loc[i_ds, "name"] not in list_unzip_foldernames: warnings.warn("In input dataset #" + str(i_ds + 1) - + " (named '" + self.df["name"][i_ds] + + " (named '" + self.df.loc[i_ds, "name"] + "'), there is no folder called '" - + self.df["name"][i_ds] + "' in zipped input file '" + + self.df.loc[i_ds, "name"] + "' in zipped input file '" + temp_zipfilename + "'. This may cause error" + " when running BIDS App for this subject/session") @@ -2295,7 +2348,7 @@ def generate_bash_run_bidsapp(self, bash_path, input_ds, type_session): flag_fs_license = False path_fs_license = None # copied from `generate_cmd_singularityRun_from_config`: - singuRun_input_dir = input_ds.df["path_data_rel"][0] + singuRun_input_dir = input_ds.df.loc[0, "path_data_rel"] else: # print("Generate singularity run command from `container_config_yaml_file`") # # contain \ for each key-value @@ -2333,9 +2386,9 @@ def generate_bash_run_bidsapp(self, bash_path, input_ds, type_session): # zip filename as input of bash file: for i_ds in range(0, input_ds.num_ds): - if input_ds.df["is_zipped"][i_ds] is True: # is zipped: + if input_ds.df.loc[i_ds, "is_zipped"] is True: # is zipped: count_inputs_bash += 1 - bash_file.write(input_ds.df["name"][i_ds].upper() + bash_file.write(input_ds.df.loc[i_ds, "name"].upper() + '_ZIP="$' + str(count_inputs_bash) + '"\n') bash_file.write("\n") @@ -2500,11 +2553,17 @@ def generate_bash_participant_job(self, bash_path, input_ds, type_session, bash_file.write("\n") bash_file.write('dssource="$1"\t# i.e., `input_ria`\n') bash_file.write('pushgitremote="$2"\t# i.e., `output_ria`\n') - bash_file.write('subid="$3"\n') - - if type_session == "multi-ses": - # also have the input of `sesid`: - bash_file.write('sesid="$4"\n') + bash_file.write('SUBJECT_CSV="$3"\n') + bash_file.write("\n") + bash_file.write( + "subject_row=$(head -n $((${SLURM_ARRAY_TASK_ID} + 1)) ${SUBJECT_CSV} | tail -n 1)\n" + ) + bash_file.write( + "subid=$(echo \"$subject_row\" | sed -n 's/^.*\\(sub-[A-Za-z0-9]*\\).*$/\\1/p')\n" + ) + bash_file.write( + "sesid=$(echo \"$subject_row\" | sed -n 's/^.*\\(ses-[A-Za-z0-9]*\\).*$/\\1/p')\n" + ) # Change path to a temporary job compute workspace: # the path is based on what users provide in section 'job_compute_space' in YAML file: @@ -2517,7 +2576,7 @@ def generate_bash_participant_job(self, bash_path, input_ds, type_session, if system.type == "sge": varname_jobid = "JOB_ID" elif system.type == "slurm": - varname_jobid = "SLURM_JOBID" + varname_jobid = "SLURM_ARRAY_JOB_ID" if type_session == "multi-ses": bash_file.write('BRANCH="job-${' + varname_jobid + '}-${subid}-${sesid}"' + '\n') @@ -2550,10 +2609,10 @@ def generate_bash_participant_job(self, bash_path, input_ds, type_session, bash_file.write( "\n# Pull down the input subject (or dataset) but don't retrieve data contents:\n") for i_ds in range(0, input_ds.num_ds): - if input_ds.df["is_zipped"][i_ds] is False: # unzipped ds: + if input_ds.df.loc[i_ds, "is_zipped"] is False: # unzipped ds: # seems regardless of multi-ses or not # as for multi-ses, it might uses other ses's data e.g., anat? - bash_file.write('datalad get -n "' + input_ds.df["path_now_rel"][i_ds] + bash_file.write('datalad get -n "' + input_ds.df.loc[i_ds, "path_now_rel"] + "/${subid}" + '"' + "\n") # ^^ `-n` means "Get (clone) a registered subdataset, but don’t retrieve data" # here input ds is a sub-dataset of dataset `analysis`. 
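        # A rough Python equivalent of what the generated bash above does with
        # ${SLURM_ARRAY_TASK_ID} and ${SUBJECT_CSV} (an illustrative sketch only; the
        # real job script uses head/tail/sed). Line 1 of `job_submit.csv` is the header
        # and task IDs start at 1 (see `df_submit_update()`), so task N reads CSV line N+1:
        #
        #   import re
        #   with open("code/job_submit.csv") as f:     # the path passed in as "$3"
        #       rows = f.read().splitlines()
        #   task_id = 3                                # value of ${SLURM_ARRAY_TASK_ID}
        #   subject_row = rows[task_id]                # rows[0] is the header line
        #   subid = re.search(r"(sub-[A-Za-z0-9]+)", subject_row).group(1)
        #   m = re.search(r"(ses-[A-Za-z0-9]+)", subject_row)
        #   sesid = m.group(1) if m else None          # empty/None for single-ses data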
@@ -2564,7 +2623,7 @@ def generate_bash_participant_job(self, bash_path, input_ds, type_session, # TODO: try out if adding `-r` still works? # remove other sub's data: - bash_file.write("(cd " + input_ds.df["path_now_rel"][i_ds] + bash_file.write("(cd " + input_ds.df.loc[i_ds, "path_now_rel"] + " && rm -rf `find . -type d -name 'sub*'" + " | grep -v $subid`" + ")" + "\n") """ @@ -2574,7 +2633,7 @@ def generate_bash_participant_job(self, bash_path, input_ds, type_session, """ else: # zipped ds: bash_file.write('datalad get -n "' - + input_ds.df["path_now_rel"][i_ds] + + input_ds.df.loc[i_ds, "path_now_rel"] + '"' + "\n") # e.g., `datalad get -n "inputs/data/freesurfer"` # ^^ should NOT only get specific zip file, as right now we need to @@ -2584,7 +2643,7 @@ def generate_bash_participant_job(self, bash_path, input_ds, type_session, # bash_file.write('datalad get -n "' + input_ds.df["path_now_rel"][i_ds] # + "/${subid}_*" + input_ds.df["name"][i_ds] + "*.zip" # + '"' + "\n") - bash_file.write("(cd " + input_ds.df["path_now_rel"][i_ds] + bash_file.write("(cd " + input_ds.df.loc[i_ds, "path_now_rel"] + " && rm -f `ls sub-*.zip | grep -v ${subid}`" + ")" + "\n") """ @@ -2619,7 +2678,7 @@ def generate_bash_participant_job(self, bash_path, input_ds, type_session, bash_file.write("echo ${BRANCH}" + "\n") # each input dataset: for i_ds in range(0, input_ds.num_ds): - bash_file.write("datalad drop -d " + input_ds.df["path_now_rel"][i_ds] + " -r" + bash_file.write("datalad drop -d " + input_ds.df.loc[i_ds, "path_now_rel"] + " -r" + " --reckless availability" # previous `--nocheck` (deprecated) + " --reckless modification" # ^^ previous `--if-dirty ignore` (deprecated) @@ -2792,7 +2851,7 @@ def generate_job_submit_template(self, yaml_path, babs, system, test=False): # Write into the bash file: yaml_file = open(yaml_path, "a") # open in append mode if not test: - yaml_file.write("# '${sub_id}' and '${ses_id}' are placeholders." + "\n") + yaml_file.write("# '${max_array}' is a placeholder." 
+ "\n") # Variables to use: # `dssource`: Input RIA: dssource = babs.input_ria_url + "#" + babs.analysis_dataset_id @@ -2811,9 +2870,7 @@ def generate_job_submit_template(self, yaml_path, babs, system, test=False): if test: job_name = self.container_name[0:3] + "_" + "test_job" else: - job_name = self.container_name[0:3] + "_" + "${sub_id}" - if babs.type_session == "multi-ses": - job_name += "_${ses_id}" + job_name = self.container_name[0:3] # Now, we can define stdout and stderr file names/paths: if system.type == "sge": @@ -2822,23 +2879,22 @@ def generate_job_submit_template(self, yaml_path, babs, system, test=False): + "-o " + babs.analysis_path + "/logs" elif system.type == "slurm": # slurm clusters also need exact filenames: - eo_args = "-e " + babs.analysis_path + f"/logs/{job_name}.e%A " \ - + "-o " + babs.analysis_path + f"/logs/{job_name}.o%A" + eo_args = "-e " + babs.analysis_path + f"/logs/{job_name}.e%A_%a " \ + + "-o " + babs.analysis_path + f"/logs/{job_name}.o%A_%a" + # array task id starts from 0 so that max_array == count + array_args = "--array=1-${max_array}" # Generate the job submission command, with sub ID and ses ID as placeholders: - cmd = submit_head + " " + env_flags + name_flag_str + job_name + " " + eo_args + " " + cmd = submit_head + " " + env_flags + name_flag_str + job_name + " " + \ + eo_args + " " + array_args + " " if test: cmd += babs.analysis_path + "/code/check_setup/call_test_job.sh" else: # if test is False, the type of session will be checked - if babs.type_session == "single-ses": - cmd += babs.analysis_path + "/code/participant_job.sh" + " " \ - + dssource + " " \ - + pushgitremote + " " + "${sub_id}" - elif babs.type_session == "multi-ses": - cmd += babs.analysis_path + "/code/participant_job.sh" + " " \ - + dssource + " " \ - + pushgitremote + " " + "${sub_id} ${ses_id}" + cmd += babs.analysis_path + "/code/participant_job.sh" + " " \ + + dssource + " " \ + + pushgitremote + " " \ + + babs.job_submit_path_abs yaml_file.write("cmd_template: '" + cmd + "'" + "\n") yaml_file.write("job_name_template: '" + job_name + "'\n") diff --git a/babs/cli.py b/babs/cli.py index 6111b379..e4c4c25d 100644 --- a/babs/cli.py +++ b/babs/cli.py @@ -304,7 +304,7 @@ def babs_submit_cli(): --all # if specified, will submit all remaining jobs that haven't been submitted. --job sub-id ses-id # can repeat - If none of these flags are specified, will only submit one job. + If none of these flags are specified, will only submit one job array task. """ parser = argparse.ArgumentParser( @@ -836,10 +836,10 @@ def get_existing_babs_proj(project_root): for i_ds in range(0, input_ds.num_ds): ds_index_str = "$INPUT_DATASET_#" + str(i_ds+1) # `path_data_rel`: - input_ds.df["path_data_rel"][i_ds] = \ + input_ds.df.loc[i_ds, "path_data_rel"] = \ babs_proj_config["input_ds"][ds_index_str]["path_data_rel"] # `is_zipped`: - input_ds.df["is_zipped"][i_ds] = \ + input_ds.df.loc[i_ds, "is_zipped"] = \ babs_proj_config["input_ds"][ds_index_str]["is_zipped"] return babs_proj, input_ds diff --git a/babs/utils.py b/babs/utils.py index d8103475..462e8b93 100644 --- a/babs/utils.py +++ b/babs/utils.py @@ -154,9 +154,11 @@ def validate_type_system(type_system): For valid ones, the type string will be changed to lower case. If not valid, raise error message. 
""" - list_supported = ['sge', 'slurm'] + list_supported = ['slurm'] if type_system.lower() in list_supported: type_system = type_system.lower() # change to lower case, if needed + elif type_system.lower() == 'sge': + raise Exception("We no longer support SGE. Use BABS 0.0.8 for SGE support.") else: raise Exception("Invalid cluster system type: '" + type_system + "'!" + " Currently BABS only support one of these: " @@ -1413,10 +1415,10 @@ def get_list_sub_ses(input_ds, config, babs): elif babs.type_session == "multi-ses": return dict_sub_ses -def submit_one_job(analysis_path, type_session, type_system, sub, ses=None, +def submit_array(analysis_path, type_session, type_system, maxarray, flag_print_message=True): """ - This is to submit one job. + This is to submit a job array based on template yaml file. Parameters: ---------------- @@ -1426,10 +1428,8 @@ def submit_one_job(analysis_path, type_session, type_system, sub, ses=None, multi-ses or single-ses type_system: str the type of job scheduling system, "sge" or "slurm" - sub: str - subject id - ses: str or None - session id. For type-session == "single-ses", this is None + maxarray: str + max index of the array (first index is always 1) flag_print_message: bool to print a message (True) or not (False) @@ -1439,9 +1439,11 @@ def submit_one_job(analysis_path, type_session, type_system, sub, ses=None, the int version of ID of the submitted job. job_id_str: str the string version of ID of the submitted job. - log_filename: str - log filename of this job. - Example: 'qsi_sub-01_ses-A.*'; user needs to replace '*' with 'o', 'e', etc + task_id_list: list + the list of task ID (dtype int) from the submitted job, starting from 1. + log_filename: list + the list of log filenames (dtype str) of this job. + Example: 'qsi_sub-01_ses-A.*_'; user needs to replace '*' with 'o', 'e', etc Notes: ----------------- @@ -1459,14 +1461,15 @@ def submit_one_job(analysis_path, type_session, type_system, sub, ses=None, cmd_template = templates["cmd_template"] job_name_template = templates["job_name_template"] - if type_session == "single-ses": - cmd = cmd_template.replace("${sub_id}", sub) - to_print = "Job for " + sub - job_name = job_name_template.replace("${sub_id}", sub) - else: # multi-ses - cmd = cmd_template.replace("${sub_id}", sub).replace("${ses_id}", ses) - to_print = "Job for " + sub + ", " + ses - job_name = job_name_template.replace("${sub_id}", sub).replace("${ses_id}", ses) + cmd = cmd_template.replace("${max_array}", maxarray) + to_print = "Job for an array of " + maxarray + job_name = job_name_template.replace("${max_array}", str(int(maxarray)-1)) + + # COMMENT OUT BECAUSE sub and ses AREN'T NEEDED FOR JOB SUBMISSION + # if type_session == "single-ses": + # sub_list_path = op.join(analysis_path, "code", "sub_final_inclu.csv") + # elif type_session == "multi-ses": + # sub_list_path = op.join(analysis_path, "code", "sub_ses_final_inclu.csv") # print(cmd) # run the command, get the job id: @@ -1487,33 +1490,95 @@ def submit_one_job(analysis_path, type_session, type_system, sub, ses=None, raise Exception("type system can be slurm or sge") job_id = int(job_id_str) - # log filename: - log_filename = job_name + ".*" + job_id_str + task_id_list = [] + log_filename_list = [] + + for i_array in range(int(maxarray)): + task_id_list.append(i_array + 1) # minarray starts from 1 + # log filename: + log_filename_list.append( + job_name + ".*" + job_id_str + "_" + str(i_array + 1)) to_print += " has been submitted (job ID: " + job_id_str + ")." 
if flag_print_message: print(to_print) - return job_id, job_id_str, log_filename + return job_id, job_id_str, task_id_list, log_filename_list -def df_update_one_job(df_jobs, i_job, job_id, log_filename, submitted=None, done=None, debug=False): +def df_submit_update(df_job_submit, job_id, task_id_list, log_filename_list, + submitted=None, done=None, debug=False): """ - This is to update one job's status and information in the dataframe df_jobs, - mostly used after job submission or resubmission. Therefore, a lot of fields will be reset. - For other cases (e.g., to update job status to running state / successfully finished state, etc.), - you may directly update df_jobs without using this function. + This is to update the status of one array task in the dataframe df_job_submit + (file: code/job_status.csv). This + function is mostly used after job submission or resubmission. Therefore, + a lot of fields will be reset. For other cases (e.g., to update job status + to running state / successfully finished state, etc.), you may directly + update df_jobs without using this function. + Parameters: + ---------------- + df_job_submit: pd.DataFrame + dataframe of the submitted job + job_id: int + the int version of ID of the submitted job. + task_id_list: list + list of task id (dtype int), starts from 1 + log_filename_list: list + list log filename (dtype str) of the submitted job + submitted: bool or None + whether the has_submitted field has to be updated + done: bool or None + whether the is_done field has to be updated + debug: bool + whether the job auditing fields need to be reset to np.nan + (fields include last_line_stdout_file, alert_message, and job_account). + + Returns: + ------------------ + df_job_submit: pd.DataFrame + dataframe of the submitted job, updated + """ + # Updating df_job_submit: + # looping through each array task id in `task_id_list` + for ind in range(len(task_id_list)): #`task_id_list` starts from 1 + df_job_submit.loc[ind, "job_id"] = job_id + df_job_submit.loc[ind, "task_id"] = int(task_id_list[ind]) + df_job_submit.at[ind, "log_filename"] = log_filename_list[ind] + # reset fields: + df_job_submit.loc[ind, "needs_resubmit"] = False + df_job_submit.loc[ind, "is_failed"] = np.nan + df_job_submit.loc[ind, "job_state_category"] = np.nan + df_job_submit.loc[ind, "job_state_code"] = np.nan + df_job_submit.loc[ind, "duration"] = np.nan + if submitted is not None: + # update the status: + df_job_submit.loc[ind, "has_submitted"] = submitted + if done is not None: + # update the status: + df_job_submit.loc[ind, "is_done"] = done + if debug: + df_job_submit.loc[ind, "last_line_stdout_file"] = np.nan + df_job_submit.loc[ind, "alert_message"] = np.nan + df_job_submit.loc[ind, "job_account"] = np.nan + return df_job_submit + + +def df_status_update(df_jobs, df_job_submit, submitted=None, done=None, debug=False): + """ + This is to update the status of one array task in the dataframe df_jobs + (file: code/job_status.csv). This is done by inserting information from + the updated dataframe df_job_submit (file: code/job_submit.csv). This + function is mostly used after job submission or resubmission. Therefore, + a lot of fields will be reset. For other cases (e.g., to update job status + to running state / successfully finished state, etc.), you may directly + update df_jobs without using this function. 
Parameters: ---------------- df_jobs: pd.DataFrame dataframe of jobs and their status - i_job: int - index of the job to be updated - job_id: int - job id - log_filename: str - log filename of this job. + df_job_submit: pd.DataFrame + dataframe of the to-be-submitted job submitted: bool or None whether the has_submitted field has to be updated done: bool or None @@ -1525,32 +1590,45 @@ def df_update_one_job(df_jobs, i_job, job_id, log_filename, submitted=None, done Returns: ------------------ df_jobs: pd.DataFrame - dataframe of jobs, updated - """ - # assign into `df_job_updated`: - df_jobs.at[i_job, "job_id"] = job_id - df_jobs.at[i_job, "log_filename"] = log_filename - # reset fields: - df_jobs.at[i_job, "is_failed"] = np.nan - df_jobs.at[i_job, "job_state_category"] = np.nan - df_jobs.at[i_job, "job_state_code"] = np.nan - df_jobs.at[i_job, "duration"] = np.nan - if submitted is not None: - # update the status: - df_jobs.at[i_job, "has_submitted"] = submitted - if done is not None: - # update the status: - df_jobs.at[i_job, "is_done"] = done - if debug: - df_jobs.at[i_job, "last_line_stdout_file"] = np.nan - df_jobs.at[i_job, "alert_message"] = np.nan - df_jobs.at[i_job, "job_account"] = np.nan + dataframe of jobs and their status, updated + """ + # Updating df_jobs + for index, row in df_job_submit.iterrows(): + sub_id = row['sub_id'] + + if 'ses_id' in df_jobs.columns: + ses_id = row['ses_id'] + # Locate the corresponding rows in df_jobs + mask = (df_jobs['sub_id'] == sub_id) & (df_jobs['ses_id'] == ses_id) + elif 'ses_id' not in df_jobs.columns: + mask = (df_jobs['sub_id'] == sub_id) + + # Update df_jobs fields based on the latest info in df_job_submit + df_jobs.loc[mask, "job_id"] = row['job_id'] + df_jobs.loc[mask, "task_id"] = row['task_id'] + df_jobs.loc[mask, "log_filename"] = row['log_filename'] + # reset fields: + df_jobs.loc[mask, "needs_resubmit"] = row['needs_resubmit'] + df_jobs.loc[mask, "is_failed"] = row['is_failed'] + df_jobs.loc[mask, "job_state_category"] = row['job_state_category'] + df_jobs.loc[mask, "job_state_code"] = row['job_state_code'] + df_jobs.loc[mask, "duration"] = row['duration'] + if submitted is not None: + # update the status: + df_jobs.loc[mask, "has_submitted"] = row['has_submitted'] + if done is not None: + # update the status: + df_jobs.loc[mask, "is_done"] = row['is_done'] + if debug: + df_jobs.loc[mask, "last_line_stdout_file"] = row['last_line_stdout_file'] + df_jobs.loc[mask, "alert_message"] = row['alert_message'] + df_jobs.loc[mask, "job_account"] = row['job_account'] return df_jobs -def prepare_job_ind_list(df_job, df_job_specified, count, type_session): +def prepare_job_array_df(df_job, df_job_specified, count, type_session): """ - This is to prepare the list of job indices to be submitted. + This is to prepare the df_job_submit to be submitted. Parameters: ---------------- @@ -1565,20 +1643,21 @@ def prepare_job_ind_list(df_job, df_job_specified, count, type_session): Returns: ------------------ - job_ind_list: list + df_job_submit: pd.DataFrame list of job indices to be submitted, these are indices from the full job status dataframe `df_job` """ - job_ind_list = [] + df_job_submit = pd.DataFrame() # Check if there is still jobs to submit: total_has_submitted = int(df_job["has_submitted"].sum()) - if total_has_submitted == df_job.shape[0]: # all submitted + if total_has_submitted == df_job.shape[0]: # all submitted print("All jobs have already been submitted. 
" + "Use `babs-status` to check job status.") - return job_ind_list + return df_job_submit # See if user has specified list of jobs to submit: - if df_job_specified is not None: + # NEED TO WORK ON THIS + if df_job_specified is not None: # NEED TO WORK ON THIS print("Will only submit specified jobs...") for j_job in range(0, df_job_specified.shape[0]): # find the index in the full `df_job`: @@ -1615,15 +1694,13 @@ def prepare_job_ind_list(df_job, df_job_specified, count, type_session): + " If you want to resubmit it," \ + " please use `babs-status --resubmit`" print(to_print) - else: # taking into account the `count` argument - j_count = 0 - for i_job in range(0, df_job.shape[0]): - if not df_job["has_submitted"][i_job]: # to run - job_ind_list.append(i_job) - j_count += 1 - if j_count == count: - break - return job_ind_list + else: # taking into account the `count` argument + df_remain = df_job[df_job.has_submitted == False] + if count > 0: + df_job_submit = df_remain[:count].reset_index(drop=True) + else: # if count is None or negative, run all + df_job_submit = df_remain.copy().reset_index(drop=True) + return df_job_submit def submit_one_test_job(analysis_path, type_system, flag_print_message=True): @@ -1727,6 +1804,7 @@ def create_job_status_csv(babs): df_job["job_state_code"] = np.nan df_job["duration"] = np.nan df_job["is_done"] = False # = has branch in output_ria + df_job["needs_resubmit"] = False # df_job["echo_success"] = np.nan # echoed success in log file; # TODO # # if ^^ is False, but `is_done` is True, did not successfully clean the space df_job["is_failed"] = np.nan @@ -1768,9 +1846,18 @@ def read_job_status_csv(csv_path): loaded dataframe """ df = pd.read_csv(csv_path, - dtype={"job_id": 'int', - 'has_submitted': 'bool', - 'is_done': 'bool' + dtype={"job_id": 'Int64', + "task_id": 'Int64', + "log_filename": 'str', + 'has_submitted': 'boolean', + 'is_done': 'boolean', + 'is_failed': 'boolean', + 'needs_resubmit': 'boolean', + 'last_line_stdout_file': 'str', + 'job_state_category': 'str', + 'job_state_code': 'str', + 'duration': 'str', + "alert_message": 'str' }) return df @@ -1805,7 +1892,7 @@ def report_job_status(df, analysis_path, config_msg_alert): if total_has_submitted > 0: # there is at least one job submitted total_is_done = int(df["is_done"].sum()) print("Among submitted jobs,") - print(str(total_is_done) + ' job(s) are successfully finished;') + print(str(total_is_done) + ' job(s) successfully finished;') if total_is_done == total_jobs: print("All jobs are completed!") @@ -1819,7 +1906,7 @@ def report_job_status(df, analysis_path, config_msg_alert): # TODO: add stalled one total_is_failed = int(df["is_failed"].sum()) - print(str(total_is_failed) + ' job(s) are failed.') + print(str(total_is_failed) + ' job(s) failed.') # if there is job failed: print more info by categorizing msg: if total_is_failed > 0: @@ -2009,6 +2096,38 @@ def _parsing_squeue_out(squeue_std): df = pd.DataFrame(data=dict_val) df = df.set_index('JB_job_number') + # df for array submission looked different + # Need to expand rows like 3556872_[98-1570] to 3556872_98, 3556872_99, etc + # This code only expects the first line to be pending array tasks, 3556872_[98-1570] + if '[' in df.index[0]: + first_row = df.iloc[0] + range_parts = re.search(r"\[(\d+-\d+)", df.index[0]).group(1) # get the array range + start, end = map(int, range_parts.split("-")) # get min and max pending array + job_id = df.index[0].split("_")[0] + + expanded_rows = [] + for task_id in range(start, end + 1): + 
expanded_rows.append( + { + "JB_job_number": f"{job_id}_{task_id}", + "@state": first_row["@state"], + "duration": first_row["duration"], + "state": first_row["state"], + "job_id": job_id, + "task_id": task_id, + } + ) + # Convert expanded rows to DataFrame + expanded_df = pd.DataFrame(expanded_rows).set_index("JB_job_number") + # Process the rest of the DataFrame + remaining_df = df.iloc[1:].copy() + remaining_df["job_id"] = remaining_df.index.str.split("_").str[0] + remaining_df["task_id"] = remaining_df.index.str.split("_").str[1].astype(int) + # Combine and sort + final_df = pd.concat([expanded_df, remaining_df]) + final_df = final_df.sort_values(by=["job_id", "task_id"]) + return final_df + return df
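The block above expands a compressed pending entry such as `3556872_[98-1570]` into one row per array task before the remaining rows are processed. Below is a minimal, standalone sketch of that expansion with made-up squeue-like values (an illustration only; `_parsing_squeue_out()` itself works on the parsed `squeue` output and assumes the compressed entry, if any, is the first row):

import re

import pandas as pd

# Made-up squeue-like rows: one compressed pending-array entry plus two running tasks.
rows = [
    {"JB_job_number": "3556872_[98-100]", "@state": "pending", "duration": "0:00", "state": "PD"},
    {"JB_job_number": "3556872_1", "@state": "running", "duration": "1:23", "state": "R"},
    {"JB_job_number": "3556872_2", "@state": "running", "duration": "1:20", "state": "R"},
]
df = pd.DataFrame(rows).set_index("JB_job_number")

expanded = []
for idx, row in df.iterrows():
    if "[" in idx:
        # compressed pending range, e.g. "3556872_[98-100]" -> tasks 98..100
        job_id = idx.split("_")[0]
        start, end = map(int, re.search(r"\[(\d+)-(\d+)", idx).groups())
        for task_id in range(start, end + 1):
            expanded.append({"JB_job_number": f"{job_id}_{task_id}", **row.to_dict(),
                             "job_id": job_id, "task_id": task_id})
    else:
        # already one row per task, e.g. "3556872_1"
        job_id, task_id = idx.split("_")
        expanded.append({"JB_job_number": idx, **row.to_dict(),
                         "job_id": job_id, "task_id": int(task_id)})

final_df = (pd.DataFrame(expanded)
            .set_index("JB_job_number")
            .sort_values(by=["job_id", "task_id"]))
print(final_df)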