diff --git a/nipoppy/extractors/freesurfer/run_FS_utils.py b/nipoppy/extractors/freesurfer/run_FS_utils.py
index eab0e7b1..04083301 100644
--- a/nipoppy/extractors/freesurfer/run_FS_utils.py
+++ b/nipoppy/extractors/freesurfer/run_FS_utils.py
@@ -16,10 +16,13 @@ def get_mris_preproc_cmd(FS_dir, participants_list, out_file, meas="thickness",
     """
     participants_str_list = []
    for participant in participants_list:
-        dirpath = Path(f"{FS_dir}/{participant}")
-        dirpath_status = Path.is_dir(dirpath)
-        if dirpath_status:
+        fpath_lh = Path(f"{FS_dir}/{participant}/surf/lh.thickness")
+        fpath_rh = Path(f"{FS_dir}/{participant}/surf/rh.thickness")
+        fpath_status = Path.is_file(fpath_lh) & Path.is_file(fpath_rh)
+        if fpath_status:
             participants_str_list.append(f"--s {participant}")
+        else:
+            print(f"ignoring {participant} with missing surf files...")

     participants_str = ' '.join(participants_str_list)
     FS_CMD_dict = {}
@@ -61,7 +64,7 @@ def run(FS_dir, participants_list, out_file, meas, fwhm, template):
 parser = argparse.ArgumentParser(description=HELPTEXT)
 parser.add_argument('--global_config', type=str, help='path to global config file for your nipoppy dataset', required=True)
 parser.add_argument('--session_id', type=str, help='session_id', required=True)
-parser.add_argument('--visit_id', type=str, help='visit_id', required=True)
+parser.add_argument('--visit_id', type=str, default=None, help='visit_id')
 parser.add_argument('--group', type=str, default=None, help='filter participants based on a specific group value in the csv')
 parser.add_argument('--output_dir', type=str, default=None, help='out_file path for the processed / aggregated output')
 parser.add_argument('--meas', type=str, default="thickness", help='cortical measure')
@@ -81,7 +84,11 @@ def run(FS_dir, participants_list, out_file, meas, fwhm, template):
 template = args.template

 session = f"ses-{session_id}"
-visit = f"V{visit_id}"
+
+if visit_id is None:
+    visit = f"V{session_id}"
+else:
+    visit = f"V{visit_id}"

 # Read global config
 with open(global_config_file, 'r') as f:
@@ -99,9 +106,13 @@ def run(FS_dir, participants_list, out_file, meas, fwhm, template):
 FS_dir = f"{dataset_root}/derivatives/freesurfer/v{FS_VERSION}/output/{session}/"
 FS_license = f"{FS_dir}/license.txt"
+
+if output_dir is None:
+    output_dir = f"{dataset_root}/derivatives/freesurfer/v{FS_VERSION}/surfmaps/{session}/"
+
+Path(f"{output_dir}").mkdir(parents=True, exist_ok=True)
+
 # grab bids_ids
 manifest = f"{dataset_root}/tabular/manifest.csv"
@@ -114,6 +125,13 @@ def run(FS_dir, participants_list, out_file, meas, fwhm, template):
 # Read participant lists and filter by session and group
 manifest_df = pd.read_csv(manifest)
 manifest_df = manifest_df[manifest_df["session"] == session]
+
+if "bids_id" not in manifest_df.columns:
+    # Append bids id col from id_map_file (participant_id --> bids_id)
+    id_mapping_csv = f"{dataset_root}/scratch/participant_id_bids_id_map.csv"
+    id_mapping_df = pd.read_csv(id_mapping_csv)
+    manifest_df = pd.merge(manifest_df, id_mapping_df[["participant_id","bids_id"]], on=["participant_id"])
+
 manifest_df = manifest_df[~manifest_df["bids_id"].isna()]
 n_bids = len(manifest_df["bids_id"].unique())
@@ -150,9 +168,8 @@ def run(FS_dir, participants_list, out_file, meas, fwhm, template):
 out_file = f"/output_dir/surf_concat_{group}_{fwhm}mm.mgh"
-run(FS_dir, proc_participants, out_file, meas, fwhm, template)
-
 print("Running mris_preproc separately for left and right hemisphere\n")
+run(FS_dir, proc_participants, out_file, meas, fwhm, template)
print(" -"*30) print("") diff --git a/nipoppy/sample_run_nipoppy.py b/nipoppy/sample_run_nipoppy.py index 58ec99cd..d308eb0c 100644 --- a/nipoppy/sample_run_nipoppy.py +++ b/nipoppy/sample_run_nipoppy.py @@ -10,7 +10,7 @@ import nipoppy.workflow.logger as my_logger # from nipoppy.workflow.tabular import generate_manifest from nipoppy.workflow.dicom_org import run_dicom_org -from nipoppy.workflow.dicom_org import check_dicom_status +from nipoppy.workflow import make_doughnut from nipoppy.workflow.bids_conv import run_bids_conv from nipoppy.workflow.proc_pipe.mriqc import run_mriqc from nipoppy.workflow.proc_pipe.fmriprep import run_fmriprep @@ -18,6 +18,37 @@ from nipoppy.workflow.catalog import generate_pybids_index from nipoppy.trackers import run_tracker +# helper functions +def get_proc_batches(proc_participants, MAX_BATCH, logger): + """ Generates MAX_BATCH participants to run at any given time before clean-up + """ + n_proc_participants = len(proc_participants) + if n_proc_participants > MAX_BATCH: + n_batches = int(np.ceil(n_proc_participants/MAX_BATCH)) + logger.info(f"Running fmriprep in {n_batches} batches of at most {MAX_BATCH} participants each") + proc_participant_batches = np.array_split(proc_participants, n_batches) + else: + proc_participant_batches = [proc_participants] + + return proc_participant_batches + +def refresh_bids_db(global_configs, session_id, pipeline, ignore_patterns, logger): + """ Remove and rebuilds the bids_db for the given pipeline with the given ignore_patterns + """ + DATASET_ROOT = global_configs["DATASET_ROOT"] + # remove old bids_db + sql_db_file = f"{DATASET_ROOT}/proc/bids_db_{pipeline}/bids_db.sqlite" + logger.info(f"Removing old bids_db from {sql_db_file}") + if os.path.exists(sql_db_file): + os.remove(sql_db_file) + + # The default bids_db_path is proc/bids_db_{pipeline} + bids_db_path = generate_pybids_index(global_configs, session_id, pipeline=pipeline, + ignore_patterns=ignore_patterns, logger=logger) + logger.info(f"bids_db_path: {bids_db_path}") + + return bids_db_path + # argparse HELPTEXT = """ Top level script to orchestrate workflows as specified in the global_config.json @@ -25,7 +56,10 @@ parser = argparse.ArgumentParser(description=HELPTEXT) parser.add_argument('--global_config', type=str, required=True, help='path to global config file for your nipoppy dataset') parser.add_argument('--session_id', type=str, required=True, help='current session or visit ID for the dataset') +parser.add_argument('--workflows', type=str, nargs="*", help='workflows to run in order (default: all))') +parser.add_argument('--use_hpc', action='store_true', help='dispatch proc_pipe jobs to HPC (default: False)') parser.add_argument('--n_jobs', type=int, default=4, help='number of parallel processes') +parser.add_argument('--n_max_cleanup', type=int, default=10, help='number of participants to run before cleaning up intermediate files') args = parser.parse_args() @@ -35,36 +69,56 @@ global_configs = json.load(f) DATASET_ROOT = global_configs["DATASET_ROOT"] + log_dir = f"{DATASET_ROOT}/scratch/logs/" log_file = f"{log_dir}/nipoppy.log" +logger = my_logger.get_logger(log_file, level="INFO") # Used to run trackers to identify new participants to process dash_schema_file = f"{DATASET_ROOT}/proc/bagel_schema.json" # bids_db_path -# TODO create a common BIDS_DB for mriqc and fmriprep once relative_paths issue is resolved FMRIPREP_VERSION = global_configs["PROC_PIPELINES"]["fmriprep"]["VERSION"] output_dir = f"{DATASET_ROOT}/derivatives/" fmriprep_dir = 
f"{output_dir}/fmriprep/v{FMRIPREP_VERSION}" -# bids_db_path = f"{fmriprep_dir}/first_run/bids_db/" session_id = args.session_id session = f"ses-{session_id}" -n_jobs = args.n_jobs -MAX_BATCH = 10 # Max number of participants to run BEFORE cleaning up intermediate files +# HPC job list dir +hpc_job_list_dir = f"{DATASET_ROOT}/proc/" -logger = my_logger.get_logger(log_file, level="INFO") +# Number of parallel jobs to run +n_jobs = args.n_jobs +# Max number of participants to run BEFORE cleaning up intermediate files +MAX_BATCH = args.n_max_cleanup +# Use HPC to run proc_pipe jobs +use_hpc = args.use_hpc + +# Workflows to run +workflows = args.workflows +if not workflows: + workflows = global_configs["WORKFLOWS"] + +# if workflows include proc_pipes then include all proc_pipes from global config (freesurfer is not run by default) +proc_pipes = list(global_configs["PROC_PIPELINES"].keys()) +if "proc_pipes" in workflows: + logger.info(f"Running all proc_pipes: {proc_pipes}") + workflows.remove("proc_pipes") + workflows.extend(proc_pipes) + +# Run all available trackers at the end +ALL_TRACKERS = ["heudiconv"] + proc_pipes logger.info("-"*75) logger.info(f"Starting nipoppy for {DATASET_ROOT} dataset...") logger.info("-"*75) logger.info(f"dataset session (i.e visit): {session}") -logger.info(f"Running {n_jobs} jobs in parallel") - -workflows = global_configs["WORKFLOWS"] logger.info(f"Running workflows: {workflows} serially") +logger.info(f"Running {n_jobs} jobs in parallel (only for local workflows)") +logger.info(f"Cleaning up intermediate files after {MAX_BATCH} participants (only for local workflows)") +logger.info(f"Using HPC for proc pipelines: {use_hpc}") for wf in workflows: logger.info("-"*50) @@ -74,7 +128,8 @@ if wf == "generate_manifest": logger.info(f"***All sessions are fetched while generating manifest***") # generate_manifest.run(global_configs, task="regenerate", dash_bagel=True, logger=logger) - check_dicom_status.run(global_config_file, regenerate=True, empty=False) + logger.info(f"test run NOT generating manifest") + make_doughnut.run(global_config_file, regenerate=True, empty=False) elif wf == "dicom_org": run_dicom_org.run(global_configs, session_id, n_jobs=n_jobs, logger=logger) @@ -91,37 +146,37 @@ "/perf"] # Run mriqc tracker to regenerate bagel - run_tracker.run(global_configs, dash_schema_file, ["mriqc"], logger=logger) + run_tracker.run(global_configs, dash_schema_file, [wf], session_id=session_id, logger=logger) - proc_participants, _ = get_new_proc_participants(global_configs, session_id, pipeline="mriqc", logger=logger) + proc_participants, _ = get_new_proc_participants(global_configs, session_id, pipeline=wf, logger=logger) n_proc_participants = len(proc_participants) if n_proc_participants > 0: logger.info(f"Running MRIQC on {n_proc_participants} participants from session: {session} and for modalities: {modalities}") - # The default bids_db_path is proc/bids_db_{pipeline} - bids_db_path = generate_pybids_index(global_configs, session_id, pipeline="mriqc", - ignore_patterns=ignore_patterns, logger=logger) - logger.info(f"bids_db_path: {bids_db_path}") - # bids_db_path = None - - if n_jobs > 1: - # Process in parallel! 
-                mriqc_results = Parallel(n_jobs=n_jobs)(delayed(run_mriqc.run)(
-                    global_configs=global_configs, session_id=session_id, participant_id=participant_id,
-                    modalities=modalities, output_dir=None, logger=logger)
-                    for participant_id in proc_participants)
+            bids_db_path = refresh_bids_db(global_configs, session_id, wf, ignore_patterns, logger)

+            # Generate a list of participants to run on HPC
+            if use_hpc:
+                hpc_job_list_file = f"{hpc_job_list_dir}/hpc_job_list_{wf}_{session}.txt"
+                logger.info(f"Generating HPC job list for {n_proc_participants} participants: {hpc_job_list_file}")
+                pd.DataFrame(data=proc_participants).to_csv(hpc_job_list_file, header=False, index=False)
+            else:
-                # Useful for debugging
-                mriqc_results = []
-                for participant_id in proc_participants:
-                    res = run_mriqc.run(global_configs=global_configs, session_id=session_id, participant_id=participant_id,
-                        modalities=modalities, output_dir=None, logger=logger)
-                    mriqc_results.append(res)
-
-            # Rerun tracker for updated bagel
-            run_tracker.run(global_configs, dash_schema_file, ["mriqc"], logger=logger)
+                if n_jobs > 1:
+                    # Process in parallel! (Won't write to logs)
+                    wf_results = Parallel(n_jobs=n_jobs)(delayed(run_mriqc.run)(
+                        global_configs=global_configs, session_id=session_id, participant_id=participant_id,
+                        modalities=modalities, output_dir=None, logger=logger)
+                        for participant_id in proc_participants)
+                else:
+                    # Useful for debugging
+                    wf_results = []
+                    for participant_id in proc_participants:
+                        res = run_mriqc.run(global_configs=global_configs, session_id=session_id, participant_id=participant_id,
+                            modalities=modalities, output_dir=None, logger=logger)
+                        wf_results.append(res)
+
         else:
             logger.info(f"No new participants to run MRIQC on for session: {session}")
@@ -131,68 +186,90 @@
                            "/swi/", "/perf"]

         # Run fmriprep tracker to regenerate bagel
-        run_tracker.run(global_configs, dash_schema_file, ["fmriprep"], logger=logger)
+        run_tracker.run(global_configs, dash_schema_file, [wf], session_id=session_id, logger=logger)

-        proc_participants, _ = get_new_proc_participants(global_configs, session_id, pipeline="fmriprep", logger=logger)
+        proc_participants, _ = get_new_proc_participants(global_configs, session_id, pipeline=wf, logger=logger)
         n_proc_participants = len(proc_participants)

         if n_proc_participants > 0:
-            # remove old bids_db
-            sql_db_file = f"{DATASET_ROOT}/proc/bids_db_fmriprep/bids_db.sqlite"
-            logger.info(f"Removing old bids_db from {sql_db_file}")
-            if os.path.exists(sql_db_file):
-                os.remove(sql_db_file)
-
             logger.info(f"Running fmriprep on {n_proc_participants} participants from session: {session} and for modalities: {modalities}")

-            # The default bids_db_path is proc/bids_db_{pipeline}
-            bids_db_path = generate_pybids_index(global_configs, session_id, pipeline="fmriprep",
-                                                 ignore_patterns=ignore_patterns, logger=logger)
-            logger.info(f"bids_db_path: {bids_db_path}")
-
-            # Don't run more than MAX_BATCH participants in parallel
-            # Also clean-up after MAX_BATCH participants to avoid storage issues
-            if n_proc_participants > MAX_BATCH:
-                n_batches = int(np.ceil(n_proc_participants/MAX_BATCH))
-                logger.info(f"Running fmriprep in {n_batches} batches of at most {MAX_BATCH} participants each")
-                proc_participant_batches = np.array_split(proc_participants, n_batches)
+            bids_db_path = refresh_bids_db(global_configs, session_id, wf, ignore_patterns, logger)

-            else:
-                proc_participant_batches = [proc_participants]
+            # Generate a list of participants to run on HPC
+            if use_hpc:
+                hpc_job_list_file = f"{hpc_job_list_dir}/hpc_job_list_{wf}_{session}.txt"
= f"{hpc_job_list_dir}/hpc_job_list_{wf}_{session}.txt" + logger.info(f"Generating HPC job list for {n_proc_participants} participants: {hpc_job_list_file}") + pd.DataFrame(data=proc_participants).to_csv(hpc_job_list_file, header=False, index=False) - for proc_participant_batch in proc_participant_batches: - proc_participant_batch = list(proc_participant_batch) - logger.info(f"Running fmriprep on participants: {proc_participant_batch}") - if n_jobs > 1: - # Process in parallel! (Won't write to logs) - fmiprep_results = Parallel(n_jobs=n_jobs)(delayed(run_fmriprep.run)( - global_configs=global_configs, session_id=session_id, participant_id=participant_id, - output_dir=None, anat_only=False, use_bids_filter=True, logger=logger) - for participant_id in proc_participant_batch) + else: + proc_participant_batches = get_proc_batches(proc_participants, MAX_BATCH, logger) + for proc_participant_batch in proc_participant_batches: + proc_participant_batch = list(proc_participant_batch) + logger.info(f"Running fmriprep on participants: {proc_participant_batch}") + if n_jobs > 1: + # Process in parallel! (Won't write to logs) + wf_results = Parallel(n_jobs=n_jobs)(delayed(run_fmriprep.run)( + global_configs=global_configs, session_id=session_id, participant_id=participant_id, + output_dir=None, anat_only=False, use_bids_filter=True, logger=logger) + for participant_id in proc_participant_batch) + + else: + # Useful for debugging + wf_results = [] + for participant_id in proc_participant_batch: + res = run_fmriprep.run(global_configs=global_configs, session_id=session_id, participant_id=participant_id, + output_dir=None, anat_only=False, use_bids_filter=True, logger=logger) + wf_results.append(res) + + # Clean up intermediate files + logger.info(f"Cleaning up intermediate files from {fmriprep_dir}") + fmriprep_wf_dir = glob(f"{fmriprep_dir}/fmriprep*wf") + subject_home_dirs = glob(f"{fmriprep_dir}/output/fmriprep_home_sub-*") + run_toml_dirs = glob(f"{fmriprep_dir}/2023*") + + logger.info(f"fmriprep_wf_dir:\n{fmriprep_wf_dir}") + logger.info(f"subject_home_dirs:\n{subject_home_dirs}") + logger.info(f"run_toml_dirs:\n{run_toml_dirs}") + + for cleanup_dir in fmriprep_wf_dir + subject_home_dirs + run_toml_dirs: + shutil.rmtree(cleanup_dir) + + elif wf == "freesurfer": + logger.info(f"freesurfer is currently run within the fmriprep run") + + elif wf == "tractoflow": + # Run tractoflow tracker to regenerate bagel + run_tracker.run(global_configs, dash_schema_file, [wf], session_id=session_id, logger=logger) + + proc_participants, _ = get_new_proc_participants(global_configs, session_id, pipeline=wf, logger=logger) + n_proc_participants = len(proc_participants) + + if n_proc_participants > 0: + # Generate a list of participants to run on HPC + if use_hpc: + hpc_job_list_file = f"{hpc_job_list_dir}/hpc_job_list_{wf}_{session}.txt" + logger.info(f"Generating HPC job list for {n_proc_participants} participants: {hpc_job_list_file}") + pd.DataFrame(data=proc_participants).to_csv(hpc_job_list_file, header=False, index=False) - else: - # Useful for debugging - fmiprep_results = [] - for participant_id in proc_participant_batch: - res = run_fmriprep.run(global_configs=global_configs, session_id=session_id, participant_id=participant_id, - output_dir=None, anat_only=False, use_bids_filter=True, logger=logger) - fmiprep_results.append(res) + else: + proc_participant_batches = get_proc_batches(proc_participants, MAX_BATCH, logger) + for proc_participant_batch in proc_participant_batches: + proc_participant_batch = 
+                    logger.info(f"Running fmriprep on participants: {proc_participant_batch}")
+                    if n_jobs > 1:
+                        # Process in parallel! (Won't write to logs)
+                        wf_results = Parallel(n_jobs=n_jobs)(delayed(run_fmriprep.run)(
+                            global_configs=global_configs, session_id=session_id, participant_id=participant_id,
+                            output_dir=None, anat_only=False, use_bids_filter=True, logger=logger)
+                            for participant_id in proc_participant_batch)
+
+                    else:
+                        # Useful for debugging
+                        wf_results = []
+                        for participant_id in proc_participant_batch:
+                            res = run_fmriprep.run(global_configs=global_configs, session_id=session_id, participant_id=participant_id,
+                                output_dir=None, anat_only=False, use_bids_filter=True, logger=logger)
+                            wf_results.append(res)

-            # Clean up intermediate files
-            logger.info(f"Cleaning up intermediate files from {fmriprep_dir}")
-            fmriprep_wf_dir = glob(f"{fmriprep_dir}/fmriprep*wf")
-            subject_home_dirs = glob(f"{fmriprep_dir}/output/fmriprep_home_sub-*")
-            run_toml_dirs = glob(f"{fmriprep_dir}/2023*")
-
-            logger.info(f"fmriprep_wf_dir:\n{fmriprep_wf_dir}")
-            logger.info(f"subject_home_dirs:\n{subject_home_dirs}")
-            logger.info(f"run_toml_dirs:\n{run_toml_dirs}")
-
-            for cleanup_dir in fmriprep_wf_dir + subject_home_dirs + run_toml_dirs:
-                shutil.rmtree(cleanup_dir)
-
-            # Rerun tracker for updated bagel
-            run_tracker.run(global_configs, dash_schema_file, ["fmriprep"], logger=logger)
-
     else:
         logger.error(f"Unknown workflow: {wf}")
@@ -200,6 +277,10 @@
     logger.info(f"Finishing workflow: {wf}")
     logger.info("-"*50)

+# Rerun tracker(s) to update bagel
+logger.info(f"Running ALL trackers: {ALL_TRACKERS} to update bagel")
+run_tracker.run(global_configs, dash_schema_file, ALL_TRACKERS, session_id="ALL", logger=logger)
+
 logger.info("-"*75)
 logger.info(f"Finishing nipoppy run...")
 logger.info("-"*75)
\ No newline at end of file
diff --git a/nipoppy/trackers/bids_tracker.py b/nipoppy/trackers/bids_tracker.py
index 70b89961..e5a12e96 100644
--- a/nipoppy/trackers/bids_tracker.py
+++ b/nipoppy/trackers/bids_tracker.py
@@ -18,7 +18,7 @@
     "func": ["bold"]
 }

-def check_staus(bids_layout, participant_id, session_id, run_id, datatype):
+def check_status(bids_layout, participant_id, session_id, datatype, run_id, acq_label):
     # Remove non-alphanumeric characters from participant_id
     participant_id = participant_id_to_dicom_id(participant_id)
     suffix_list = files_dict[datatype]
@@ -27,6 +27,7 @@ def check_staus(bids_layout, participant_id, session_id, run_id, datatype):
         scan_file = bids_layout.get(subject=participant_id,
                                     session=session_id,
                                     datatype=datatype,
+                                    acquisition=acq_label,
                                     run=run_id,
                                     suffix=suffix,
                                     extension='nii.gz')
@@ -34,6 +35,7 @@ def check_staus(bids_layout, participant_id, session_id, run_id, datatype):
         sidecar_file = bids_layout.get(subject=participant_id,
                                        session=session_id,
                                        datatype=datatype,
+                                       acquisition=acq_label,
                                        run=run_id,
                                        suffix=suffix,
                                        extension='json')
@@ -53,29 +55,29 @@ def check_staus(bids_layout, participant_id, session_id, run_id, datatype):
     return status_msg

-def check_T1w(bids_layout, participant_id, session_id, run_id):
+def check_T1w(bids_layout, participant_id, session_id, run_id, acq_label=None):
     datatype = "anat"
-    status = check_staus(bids_layout, participant_id, session_id, run_id, datatype)
+    status = check_status(bids_layout, participant_id, session_id, datatype, run_id, acq_label)
     return status

-def check_dwi(bids_layout, participant_id, session_id, run_id):
+def check_dwi(bids_layout, participant_id, session_id, run_id, acq_label=None):
     datatype = "dwi"
"dwi" - status = check_staus(bids_layout, participant_id, session_id, run_id, datatype) + status = check_status(bids_layout, participant_id, session_id, datatype, run_id, acq_label) return status -def check_fmap(bids_layout, participant_id, session_id, run_id): +def check_fmap(bids_layout, participant_id, session_id, run_id, acq_label=None): datatype = "fmap" - status = check_staus(bids_layout, participant_id, session_id, run_id, datatype) + status = check_status(bids_layout, participant_id, session_id, datatype, run_id, acq_label) return status -def check_func(bids_layout, participant_id, session_id, run_id): +def check_func(bids_layout, participant_id, session_id, run_id, acq_label=None): datatype = "func" - status = check_staus(bids_layout, participant_id, session_id, run_id, datatype) + status = check_status(bids_layout, participant_id, session_id, datatype, run_id, acq_label) return status -def check_structural(bids_layout, participant_id, session_id, run_id): - T1_status = check_T1w(bids_layout, participant_id, session_id, run_id) - dwi_status = check_dwi(bids_layout, participant_id, session_id, run_id) +def check_structural(bids_layout, participant_id, session_id, run_id, acq_label=None): + T1_status = check_T1w(bids_layout, participant_id, session_id, run_id, acq_label) + dwi_status = check_dwi(bids_layout, participant_id, session_id, run_id, acq_label) if (T1_status == SUCCESS) & (dwi_status == SUCCESS): status = SUCCESS else: diff --git a/nipoppy/trackers/fmriprep_tracker.py b/nipoppy/trackers/fmriprep_tracker.py index c6418622..8100d5d6 100644 --- a/nipoppy/trackers/fmriprep_tracker.py +++ b/nipoppy/trackers/fmriprep_tracker.py @@ -8,6 +8,7 @@ # sub-MNI0056D864854_ses-01_task-rest_run-1_space-T1w_desc-preproc_bold.nii.gz # sub-MNI0056D864854_ses-01_task-rest_run-1_space-T1w_desc-brain_mask.json # sub-PD01134_ses-01_task-rest_run-1_space-MNI152NLin2009cSym_res-1_desc-brain_mask.json +# sub-YLOPD160_ses-01_acq-bold_run-1_magnitude1.json (ACQ) # Globals (any one of this would qualify as success) default_tpl_spaces = ["MNI152NLin2009cAsym","MNI152NLin2009cSym"] @@ -30,11 +31,16 @@ "preproc_bold.nii": "desc-preproc_bold.nii.gz", } -def check_output(subject_dir, file_check_dict, session_id, run_id, modality, - tpl_spaces=default_tpl_spaces, tpl_resolutions=default_tpl_resolutions, task=None): +def check_output(subject_dir, file_check_dict, session_id, run_id, acq_label, modality, + tpl_spaces=default_tpl_spaces, tpl_resolutions=default_tpl_resolutions, task_label=None): + + # bids file-name tags in the correct order + bids_id = os.path.basename(subject_dir) session = f"ses-{session_id}" + task = f"task-{task_label}" + acq = f"acq-{acq_label}" run = f"run-{run_id}" - participant_id = os.path.basename(subject_dir) + status_msg = SUCCESS for k,v in file_check_dict.items(): if status_msg == SUCCESS: @@ -43,16 +49,24 @@ def check_output(subject_dir, file_check_dict, session_id, run_id, modality, for tpl_res in tpl_resolutions: file_suffix = f"space-{tpl_space}_{tpl_res}_{v}" if modality == "anat": - if run_id == None: - filepath = Path(f"{subject_dir}/{session}/{modality}/{participant_id}_{session}_{file_suffix}") + if (run_id == None) & (acq_label == None): + filepath = Path(f"{subject_dir}/{session}/{modality}/{bids_id}_{session}_{file_suffix}") + elif (run_id == None) & (acq_label != None): + filepath = Path(f"{subject_dir}/{session}/{modality}/{bids_id}_{session}_{acq}_{file_suffix}") + elif (run_id != None) & (acq_label == None): + filepath = 
Path(f"{subject_dir}/{session}/{modality}/{bids_id}_{session}_{run}_{file_suffix}") else: - filepath = Path(f"{subject_dir}/{session}/{modality}/{participant_id}_{session}_{run}_{file_suffix}") + filepath = Path(f"{subject_dir}/{session}/{modality}/{bids_id}_{session}_{acq}_{run}_{file_suffix}") elif modality == "func": - if run_id == None: - filepath = Path(f"{subject_dir}/{session}/{modality}/{participant_id}_{session}_{task}_{file_suffix}") + if (run_id == None) & (acq_label == None): + filepath = Path(f"{subject_dir}/{session}/{modality}/{bids_id}_{session}_{task}_{file_suffix}") + elif (run_id == None) & (acq_label != None): + filepath = Path(f"{subject_dir}/{session}/{modality}/{bids_id}_{session}_{task}_{acq}_{file_suffix}") + elif (run_id != None) & (acq_label == None): + filepath = Path(f"{subject_dir}/{session}/{modality}/{bids_id}_{session}_{task}_{run}_{file_suffix}") else: - filepath = Path(f"{subject_dir}/{session}/{modality}/{participant_id}_{session}_{task}_{run}_{file_suffix}") + filepath = Path(f"{subject_dir}/{session}/{modality}/{bids_id}_{session}_{task}_{acq}_{run}_{file_suffix}") else: print(f"Unknown modality: {modality}") @@ -69,67 +83,69 @@ def check_output(subject_dir, file_check_dict, session_id, run_id, modality, return status_msg -def check_anat_output(subject_dir, session_id, run_id): +def check_anat_output(subject_dir, session_id, run_id, acq_label=None): """ Check output paths for anat stream """ modality = "anat" - status_msg = check_output(subject_dir, anat_files_dict, session_id, run_id, modality) + status_msg = check_output(subject_dir, anat_files_dict, session_id, run_id, acq_label, modality) return status_msg -def check_func_output(subject_dir, session_id, run_id, task="task-rest"): +def check_func_output(subject_dir, session_id, run_id, acq_label=None, task_label="rest"): """ Check output paths for func stream """ modality = "func" - status_msg = check_output(subject_dir, func_files_dict, session_id, run_id, modality, task=task) + status_msg = check_output(subject_dir, func_files_dict, session_id, run_id, acq_label, modality, task_label=task_label) return status_msg -# TODO -def check_MNI152NLin2009cSym(subject_dir, session_id, run_id): +# TODO ------------------ Add custom trackers ------------------ +def check_MNI152NLin2009cSym(subject_dir, session_id, run_id, acq_label=None): """ Checks availability of MNI152NLin2009cSym space images """ custom_tpl_spaces = ["MNI152NLin2009cSym"] custom_tpl_resolutions = ["res-1"] modality = "anat" file_dict = anat_files_dict - status_msg = check_output(subject_dir, file_dict, session_id, run_id, modality, + status_msg = check_output(subject_dir, file_dict, session_id, run_id, acq_label, modality, tpl_spaces=custom_tpl_spaces, tpl_resolutions=custom_tpl_resolutions) return status_msg -def check_MNI152NLin2009cAsym(subject_dir, session_id, run_id): +def check_MNI152NLin2009cAsym(subject_dir, session_id, run_id, acq_label=None): """ Checks availability of MNI152NLin2009cAsym space images """ custom_tpl_spaces = ["MNI152NLin2009cAsym"] custom_tpl_resolutions = ["res-1"] modality = "anat" file_dict = anat_files_dict - status_msg = check_output(subject_dir, file_dict, session_id, run_id, modality, + status_msg = check_output(subject_dir, file_dict, session_id, run_id, acq_label, modality, tpl_spaces=custom_tpl_spaces, tpl_resolutions=custom_tpl_resolutions) return status_msg -def check_MNI152NLin6Sym(subject_dir, session_id, run_id): +def check_MNI152NLin6Sym(subject_dir, session_id, run_id, acq_label=None): """ 
 Checks availability of MNI152NLin6Sym space images
     """
     custom_tpl_spaces = ["MNI152NLin6Sym"]
     custom_tpl_resolutions = ["res-1"]
     modality = "anat"
     file_dict = anat_files_dict
-    status_msg = check_output(subject_dir, file_dict, session_id, run_id, modality,
+    status_msg = check_output(subject_dir, file_dict, session_id, run_id, acq_label, modality,
                               tpl_spaces=custom_tpl_spaces, tpl_resolutions=custom_tpl_resolutions)
     return status_msg

-def check_MNI152Lin(subject_dir, session_id, run_id):
+def check_MNI152Lin(subject_dir, session_id, run_id, acq_label=None):
     """ Checks availability of MNI152Lin space images
     """
     custom_tpl_spaces = ["MNI152Lin"]
     custom_tpl_resolutions = ["res-1"]
     modality = "anat"
     file_dict = anat_files_dict
-    status_msg = check_output(subject_dir, file_dict, session_id, run_id, modality,
+    status_msg = check_output(subject_dir, file_dict, session_id, run_id, acq_label, modality,
                               tpl_spaces=custom_tpl_spaces, tpl_resolutions=custom_tpl_resolutions)
     return status_msg

+# TODO ------------------ Add custom trackers ------------------
+
 tracker_configs = {
     "pipeline_complete": check_anat_output,
diff --git a/nipoppy/trackers/fs_tracker.py b/nipoppy/trackers/fs_tracker.py
index 2cf867d4..1108ec7c 100644
--- a/nipoppy/trackers/fs_tracker.py
+++ b/nipoppy/trackers/fs_tracker.py
@@ -72,7 +72,7 @@ def check_stats(subject_dir, PARCELS=DEFAULT_PARCELS):
     return filepath_status & aseg_status

-def check_run_status(subject_dir, session_id=None, run_id=None):
+def check_run_status(subject_dir, session_id=None, run_id=None, acq_label=None):
     check_list = [check_fsdirs,check_mri,check_label,check_surf,check_stats]
     status_list = []
     for cl in check_list:
@@ -84,7 +84,7 @@ def check_run_status(subject_dir, session_id=None, run_id=None):
         status_msg = FAIL
     return status_msg

-def check_parcels(subject_dir, session_id=None, run_id=None):
+def check_parcels(subject_dir, session_id=None, run_id=None, acq_label=None):
     stats_status = check_stats(subject_dir,ALL_PARCELS)
     if stats_status:
         status_msg = SUCCESS
diff --git a/nipoppy/trackers/mriqc_tracker.py b/nipoppy/trackers/mriqc_tracker.py
index d92cec53..9bd8647e 100644
--- a/nipoppy/trackers/mriqc_tracker.py
+++ b/nipoppy/trackers/mriqc_tracker.py
@@ -7,29 +7,63 @@
     FAIL
 )

+## PPMI example:
+# json: <>/mriqc/23.1.0/output/sub-3000/ses-BL/anat/sub-3000_ses-BL_acq-sag3D_run-01_T1w.json
+# html: <>/mriqc/23.1.0/output/sub-3000_ses-BL_acq-sag3D_run-01_T1w.html
+
+## YLO example:
+# json: <>/mriqc/23.1.0/output/sub-YLOPD169/ses-01/anat/sub-YLOPD169_ses-01_run-1_T1w.json
+# html: <>/mriqc/23.1.0/output/sub-YLOPD169_ses-01_run-1_T1w.html
+
 # relative to subject_dir
-T1w_files_dict = {
-    "html" : "../{}_{}_run-{}_T1w.html",
-    "json" : "{}/anat/{}_{}_run-{}_T1w.json",
-}
-T2w_files_dict = {
-    "html" : "../{}_{}_run-1_T2w.html",
-    "json" : "{}/anat/{}_{}_run-{}_T2w.json",
+# need to do this separately for anat and func since the suffix need not match the filepath inclusive of datatype
+anat_files_dict = {
+    "html" : "../{}.html",
+    "json" : "{}/anat/{}.json",
 }
 func_files_dict = {
-    "html" : "../{}_{}_task-rest_run-{}_bold.html",
-    "json" : "{}/func/{}_{}_task-rest_run-{}_bold.json",
+    "html" : "../{}.html",
+    "json" : "{}/func/{}.json",
 }

-def check_staus(subject_dir, session_id, run_id, file_dict):
+def check_status(subject_dir, session_id, run_id, acq_label, task_label, suffix, file_dict):
+
+    # bids file-name tags in the correct order
     bids_id = os.path.basename(subject_dir)
     session = f"ses-{session_id}"
+    task = f"task-{task_label}"
+    acq = f"acq-{acq_label}"
+    run = f"run-{run_id}"
= f"run-{run_id}" + + if suffix in ["T1w", "T2w"]: + if (run_id == None) & (acq_label == None): + fname = Path(f"{bids_id}_{session}_{suffix}") + elif (run_id == None) & (acq_label != None): + fname = Path(f"{bids_id}_{session}_{acq}_{suffix}") + elif (run_id != None) & (acq_label == None): + fname = Path(f"{bids_id}_{session}_{run}_{suffix}") + else: + fname = Path(f"{bids_id}_{session}_{acq}_{run}_{suffix}") + + elif suffix == "bold": + if (run_id == None) & (acq_label == None): + fname = Path(f"{bids_id}_{session}_{task}_{suffix}") + elif (run_id == None) & (acq_label != None): + fname = Path(f"{bids_id}_{session}_{task}_{acq}_{suffix}") + elif (run_id != None) & (acq_label == None): + fname = Path(f"{bids_id}_{session}_{task}_{run}_{suffix}") + else: + fname = Path(f"{bids_id}_{session}_{task}_{acq}_{run}_{suffix}") + + else: + print(f"Unknown suffix: {suffix}") + filepath_status_list = [] for k,v in file_dict.items(): if k == "html": - filepath = Path(f"{subject_dir}/" + v.format(bids_id, session, run_id)) + filepath = Path(f"{subject_dir}/" + v.format(fname)) else: - filepath = Path(f"{subject_dir}/" + v.format(session, bids_id, session, run_id)) + filepath = Path(f"{subject_dir}/" + v.format(session, fname)) # print(f"filepath: {filepath}") filepath_status = Path.is_file(filepath) @@ -43,16 +77,17 @@ def check_staus(subject_dir, session_id, run_id, file_dict): return status_msg -def check_T1w(subject_dir, session_id, run_id): - return check_staus(subject_dir, session_id, run_id, T1w_files_dict) +def check_T1w(subject_dir, session_id, run_id, acq_label=None, task_label=None): + suffix = "T1w" + return check_status(subject_dir, session_id, run_id, acq_label, task_label, suffix, anat_files_dict) -def check_T2w(subject_dir, session_id,run_id): - return check_staus(subject_dir, session_id, run_id, T2w_files_dict) - -def check_func(subject_dir, session_id, run_id): - return check_staus(subject_dir, session_id, run_id, func_files_dict) - +def check_T2w(subject_dir, session_id,run_id, acq_label=None, task_label=None): + suffix = "T2w" + return check_status(subject_dir, session_id, run_id, acq_label, task_label, suffix, anat_files_dict) +def check_func(subject_dir, session_id, run_id, acq_label=None, task_label="rest"): + suffix = "bold" + return check_status(subject_dir, session_id, run_id, acq_label, task_label, suffix, func_files_dict) tracker_configs = { "pipeline_complete": check_T1w, diff --git a/nipoppy/trackers/run_tracker.py b/nipoppy/trackers/run_tracker.py index e26844f9..dbde6357 100755 --- a/nipoppy/trackers/run_tracker.py +++ b/nipoppy/trackers/run_tracker.py @@ -9,7 +9,7 @@ import nipoppy.workflow.logger as my_logger from nipoppy.trackers.tracker import Tracker, get_start_time, get_end_time, UNAVAILABLE, TRUE -from nipoppy.trackers import fs_tracker, fmriprep_tracker, mriqc_tracker, tractoflow_tracker +from nipoppy.trackers import bids_tracker, fs_tracker, fmriprep_tracker, mriqc_tracker, tractoflow_tracker from nipoppy.workflow.utils import ( BIDS_SUBJECT_PREFIX, BIDS_SESSION_PREFIX, @@ -35,8 +35,9 @@ "tractoflow": tractoflow_tracker.tracker_configs, } BIDS_PIPES = ["mriqc","fmriprep", "tractoflow"] +NO_TRACKER_PIPES = ["maget_brain"] -def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1, logger=None, log_level="INFO"): +def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id="1", acq_label=None, logger=None, log_level="INFO"): """ driver code running pipeline specific trackers """ DATASET_ROOT = global_configs["DATASET_ROOT"] 
@@ -46,7 +47,7 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1,
     # Grab BIDS participants from the doughnut
     doughnut_file = f"{DATASET_ROOT}/scratch/raw_dicom/doughnut.csv"
-    doughnut_df = load_status(doughnut_file)
+    doughnut_df = load_doughnut(doughnut_file)

     # logging
     log_dir = f"{DATASET_ROOT}/scratch/logs/"
@@ -59,9 +60,10 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1,
     if session_id == "ALL":
         sessions = global_configs["SESSIONS"]
     else:
-        sessions = [f"ses-{session_id}"]
+        sessions = [f"{BIDS_SESSION_PREFIX}{session_id}"]

-    logger.info(f"tracking session_ids: {session_ids}")
+    logger.info(f"tracking session: {sessions}")
+    logger.info(f"tracking run: {run_id} and acq_label: {acq_label}")

     for pipeline in pipelines:
         pipe_tracker = Tracker(global_configs, dash_schema_file, pipeline)
@@ -74,7 +76,11 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1,
         version = global_configs["PROC_PIPELINES"][pipeline]["VERSION"]
         schema = pipe_tracker.get_dash_schema()
-        tracker_configs = pipeline_tracker_config_dict[pipeline]
+
+        if pipeline in list(pipeline_tracker_config_dict.keys()):
+            tracker_configs = pipeline_tracker_config_dict[pipeline]
+        else:
+            logger.warning(f"Skipping pipeline: {pipeline}. Tracker not listed in the config")

         # Grab BIDS participants from the doughnut
         doughnut_file = f"{DATASET_ROOT}/scratch/raw_dicom/{FNAME_DOUGHNUT}"
@@ -84,7 +90,7 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1,
         logger.info("-"*50)
         logger.info(f"pipeline: {pipeline}, version: {version}")
-        logger.info(f"n_participants_total: {n_participants_total}, session_ids: {session_ids}")
+        logger.info(f"n_participants_total: {n_participants_total}, sessions: {sessions}")
         logger.info("-"*50)

         status_check_dict = pipe_tracker.get_pipe_tasks(tracker_configs, PIPELINE_STATUS_COLUMNS, pipeline, version)
@@ -92,9 +98,11 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1,
         # only use non-prefixed columns at this stage
         # for prefixed columns we need to generate the column name
         dash_col_list = list(key for key, value in schema["GLOBAL_COLUMNS"].items() if not value["IsPrefixedColumn"])
+        # status_check_dict will typically only have minimal pipeline_complete key
+        dash_col_list = dash_col_list + list(status_check_dict.keys())

-        for session_id in session_ids:
-            session = session_id_to_bids_session(session_id)
+        for session in sessions:
+            session_id = session.removeprefix(BIDS_SESSION_PREFIX)
             logger.info(f"Checking session: {session}")

             participants_session = doughnut_df[(doughnut_df[COL_BIDS_ID_MANIFEST].isin(participants_total)) & (doughnut_df[COL_SESSION_MANIFEST] == session)][COL_BIDS_ID_MANIFEST].drop_duplicates().astype(str).str.strip().values
@@ -120,7 +128,7 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1,
             logger.debug(f"bids_dir: {bids_dir}")
             logger.debug(f"bids_layout: {bids_layout.get_subjects()}")

-            fpath_bagel = Path(dataset_root, 'derivatives', FNAME_BAGEL)
+            fpath_bagel = Path(DATASET_ROOT, 'derivatives', FNAME_BAGEL)
             if fpath_bagel.exists():
                 df_bagel_old_full = load_bagel(fpath_bagel)
@@ -157,18 +165,20 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1,
                     subject_dir = f"{DATASET_ROOT}/derivatives/{pipeline}/v{version}/output/{session}/{bids_id}"
                 elif pipeline in BIDS_PIPES:
                     subject_dir = f"{DATASET_ROOT}/derivatives/{pipeline}/v{version}/output/{bids_id}"
+                elif pipeline in NO_TRACKER_PIPES:
+                    logger.warning(f"pipeline: {pipeline} does not have a tracker yet...")
                 else:
                     logger.error(f"unknown pipeline: {pipeline}")

                 dir_status = Path(subject_dir).is_dir()
                 logger.debug(f"subject_dir:{subject_dir}, dir_status: {dir_status}")

-                if dir_status:
+                if dir_status:
                     for name, func in status_check_dict.items():
                         if pipeline == "heudiconv":
-                            status = func(bids_layout, participant_id, session_id, run_id)
+                            status = func(bids_layout, participant_id, session_id, run_id, acq_label)
                         else:
-                            status = func(subject_dir, session_id, run_id)
+                            status = func(subject_dir, session_id, run_id, acq_label)

                         logger.debug(f"task_name: {name}, status: {status}")
@@ -177,7 +187,7 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1,
                     # TODO only check files listed in the tracker config
                     _df.loc[bids_id,"pipeline_endtime"] = UNAVAILABLE # get_end_time(subject_dir)
                 else:
-                    logger.warning(f"Output for pipeline: {pipeline} not found for bids_id: {bids_id}, session: {session}")
+                    logger.debug(f"Output for pipeline: {pipeline} not found for bids_id: {bids_id}, session: {session}")
                     for name in status_check_dict.keys():
                         _df.loc[bids_id,name] = UNAVAILABLE
                     _df.loc[bids_id,"pipeline_starttime"] = UNAVAILABLE
@@ -192,7 +202,7 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1,
             # don't write a new file if no changes
             try:
                 if len(df_bagel.compare(df_bagel_old_full)) == 0:
-                    logger.info(f'\nNo change in bagel file for pipeline {pipeline}, session {session}')
+                    logger.info(f'No change in bagel file for pipeline {pipeline}, session {session}')
                     continue
             except Exception:
                 pass
@@ -231,7 +241,9 @@ def time_converter(value):
     parser.add_argument('--global_config', type=str, help='path to global config file for your nipoppy dataset', required=True)
     parser.add_argument('--dash_schema', type=str, help='path to dashboard schema to display tracker status', required=True)
     parser.add_argument('--pipelines', nargs='+', help='list of pipelines to track', required=True)
-    parser.add_argument('--session_id', type=str, default="ALL", help='session_id (default = ALL)')
+    parser.add_argument('--session_id', type=str, default="ALL", help='bids session_id')
+    parser.add_argument('--run_id', type=str, default="1", help='bids run_id')
+    parser.add_argument('--acq_label', type=str, default=None, help='bids acq label')
     parser.add_argument('--log_level', type=str, default="INFO", help='log level')
     args = parser.parse_args()
@@ -246,6 +258,8 @@ def time_converter(value):
     dash_schema_file = args.dash_schema
     pipelines = args.pipelines
     session_id = args.session_id
+    run_id = args.run_id
+    acq_label = args.acq_label
     log_level = args.log_level

-    run(global_configs, dash_schema_file, pipelines, session_id, log_level=log_level)
+    run(global_configs, dash_schema_file, pipelines, session_id, run_id, acq_label, log_level=log_level)
diff --git a/nipoppy/trackers/tractoflow_tracker.py b/nipoppy/trackers/tractoflow_tracker.py
index ed5fa7bb..43468a8b 100644
--- a/nipoppy/trackers/tractoflow_tracker.py
+++ b/nipoppy/trackers/tractoflow_tracker.py
@@ -140,7 +140,7 @@ def check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_
 # status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='DWIFODF')
 # return status_msg

-def check_tf_final(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages):
+def check_tf_final(subject_dir, session_id, run_id, acq_label=None, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages):
     """
     Call the function to check for output files with the parameters set to check all the stages
     """
     status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='All')
diff --git a/nipoppy/workflow/dicom_org/check_dicom_status.py b/nipoppy/workflow/make_doughnut.py
similarity index 98%
rename from nipoppy/workflow/dicom_org/check_dicom_status.py
rename to nipoppy/workflow/make_doughnut.py
index 136c5bf4..c5008b1e 100755
--- a/nipoppy/workflow/dicom_org/check_dicom_status.py
+++ b/nipoppy/workflow/make_doughnut.py
@@ -107,7 +107,7 @@ def run(global_config_file, regenerate=False, empty=False):
         map_file = None

     # generate bids_id
-    df_status[COL_BIDS_ID_MANIFEST] = df_status.apply(
+    df_doughnut[COL_BIDS_ID_MANIFEST] = df_doughnut.apply(
         lambda row: participant_id_to_bids_id(
             row[COL_SUBJECT_MANIFEST],
             map_file),
@@ -222,7 +222,7 @@ def check_dir(dpath):
             return False

     dpath = Path(dpath)
-    status = pd.Series(np.nan, index=df.index)
+    status = pd.Series(np.nan, index=df.index, dtype=bool)
     for session in df[COL_SESSION_MANIFEST].drop_duplicates():
         if pd.isna(session):
             continue
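
Note: a minimal usage sketch for the updated tracker entry point follows, assuming a nipoppy dataset whose global config and bagel schema sit at hypothetical paths; the session_id, run_id and acq_label values are illustrative only and map onto the new keyword arguments added to run_tracker.run() in this patch.

    import json

    from nipoppy.trackers import run_tracker

    # Hypothetical paths -- replace with your dataset's own global config and dashboard schema
    global_config_file = "/data/my_dataset/proc/global_configs.json"
    dash_schema_file = "/data/my_dataset/proc/bagel_schema.json"

    with open(global_config_file, "r") as f:
        global_configs = json.load(f)

    # Regenerate the bagel for mriqc and fmriprep for one session, filtering on an
    # acquisition label (acq_label=None keeps the previous behaviour of ignoring acq)
    run_tracker.run(
        global_configs,
        dash_schema_file,
        pipelines=["mriqc", "fmriprep"],
        session_id="01",
        run_id="1",
        acq_label="sag3D",
        log_level="INFO",
    )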