From 639517e9511b9ac6868b4bfb09b8fd9363a7420a Mon Sep 17 00:00:00 2001 From: Nikhil Bhagwat Date: Mon, 2 Oct 2023 22:22:33 -0400 Subject: [PATCH] Updates to trackers and global variable handling (#163) * moving sample_global_configs.json and tree.json to the nipoppy subdir * fixed import paths after refactor * fixed import paths after refactor * refactored and cleaned up mriqc run script * refactored and cleaned up mriqc run script * Started tracker refactoring to use doughnut * added a catalog function to identify new proc-participants from bagel and doughnut * added a catalog function to identify new proc-participants from bagel and doughnut * added custom (new subjects only) pybids layout (sqldb) generation * fixed sessions loop and incorporated utils.load_status into run_tracker.py * fixed import path errors and incorporated tracker.py for status flags * fixed global var imports and logging levels * updated sample_run_nipoppy to set log-level and prototypical mriqc run with tracker * updated bids_tracker to match proc_pipe tracker schema * minor fixes and comments * fixed Pandas future warning on setting an item of incompatible dtype * fixed another Pandas future warning on setting an item of incompatible dtype * 1) Updated mriqc and fmirpre run scripts to bind complete bids_dir path, 2) added sqldb generation with ignore list for subjects and datatype+acq, 3) updated sample_run_nipoppy.py to show the these two functionalities. * fixed fmriprep pytest * fixed codespell * fixed NM filename pattern * added functionality to custom map participant_id to bids_id * fixed minor codespell errors --- nipoppy/extractors/freesurfer/run_FS_utils.py | 2 +- nipoppy/sample_run_nipoppy.py | 162 +++++++++++++++- nipoppy/trackers/bids_tracker.py | 179 +++++++++--------- nipoppy/trackers/mriqc_tracker.py | 7 +- nipoppy/trackers/run_tracker.py | 72 +++++-- nipoppy/workflow/bids_conv/run_bids_conv.py | 2 +- nipoppy/workflow/catalog.py | 63 ++++-- .../workflow/dicom_org/check_dicom_status.py | 20 +- .../proc_pipe/fmriprep/run_fmriprep.py | 19 +- nipoppy/workflow/proc_pipe/mriqc/run_mriqc.py | 32 ++-- nipoppy/workflow/utils.py | 9 +- tests/test_workflow_fmriprep.py | 13 +- 12 files changed, 413 insertions(+), 167 deletions(-) diff --git a/nipoppy/extractors/freesurfer/run_FS_utils.py b/nipoppy/extractors/freesurfer/run_FS_utils.py index dbd42385..eab0e7b1 100644 --- a/nipoppy/extractors/freesurfer/run_FS_utils.py +++ b/nipoppy/extractors/freesurfer/run_FS_utils.py @@ -161,5 +161,5 @@ def run(FS_dir, participants_list, out_file, meas, fwhm, template): print("-"*50) else: print("-"*50) - print("No partcipants found to process...") + print("No participants found to process...") print("-"*50) \ No newline at end of file diff --git a/nipoppy/sample_run_nipoppy.py b/nipoppy/sample_run_nipoppy.py index ce74692e..58ec99cd 100644 --- a/nipoppy/sample_run_nipoppy.py +++ b/nipoppy/sample_run_nipoppy.py @@ -2,9 +2,21 @@ from pathlib import Path import argparse import json +import pandas as pd +import shutil +import numpy as np +from glob import glob +from joblib import Parallel, delayed import nipoppy.workflow.logger as my_logger +# from nipoppy.workflow.tabular import generate_manifest from nipoppy.workflow.dicom_org import run_dicom_org +from nipoppy.workflow.dicom_org import check_dicom_status from nipoppy.workflow.bids_conv import run_bids_conv +from nipoppy.workflow.proc_pipe.mriqc import run_mriqc +from nipoppy.workflow.proc_pipe.fmriprep import run_fmriprep +from nipoppy.workflow.catalog import get_new_proc_participants +from nipoppy.workflow.catalog import generate_pybids_index +from nipoppy.trackers import run_tracker # argparse HELPTEXT = """ @@ -26,30 +38,168 @@ log_dir = f"{DATASET_ROOT}/scratch/logs/" log_file = f"{log_dir}/nipoppy.log" +# Used to run trackers to identify new participants to process +dash_schema_file = f"{DATASET_ROOT}/proc/bagel_schema.json" + +# bids_db_path +# TODO create a common BIDS_DB for mriqc and fmriprep once relative_paths issue is resolved +FMRIPREP_VERSION = global_configs["PROC_PIPELINES"]["fmriprep"]["VERSION"] +output_dir = f"{DATASET_ROOT}/derivatives/" +fmriprep_dir = f"{output_dir}/fmriprep/v{FMRIPREP_VERSION}" +# bids_db_path = f"{fmriprep_dir}/first_run/bids_db/" + session_id = args.session_id +session = f"ses-{session_id}" + n_jobs = args.n_jobs +MAX_BATCH = 10 # Max number of participants to run BEFORE cleaning up intermediate files -logger = my_logger.get_logger(log_file) +logger = my_logger.get_logger(log_file, level="INFO") logger.info("-"*75) logger.info(f"Starting nipoppy for {DATASET_ROOT} dataset...") -logger.info(f"dataset session (i.e visit): {session_id}") +logger.info("-"*75) + +logger.info(f"dataset session (i.e visit): {session}") logger.info(f"Running {n_jobs} jobs in parallel") workflows = global_configs["WORKFLOWS"] -logger.info(f"Running {workflows} serially") +logger.info(f"Running workflows: {workflows} serially") for wf in workflows: logger.info("-"*50) logger.info(f"Starting workflow: {wf}") - if wf == "dicom_org": - run_dicom_org.run(global_configs, session_id, n_jobs=n_jobs) + logger.info("-"*50) + + if wf == "generate_manifest": + logger.info(f"***All sessions are fetched while generating manifest***") + # generate_manifest.run(global_configs, task="regenerate", dash_bagel=True, logger=logger) + check_dicom_status.run(global_config_file, regenerate=True, empty=False) + + elif wf == "dicom_org": + run_dicom_org.run(global_configs, session_id, n_jobs=n_jobs, logger=logger) + elif wf == "bids_conv": - run_bids_conv.run(global_configs, session_id, n_jobs=n_jobs) + run_bids_conv.run(global_configs, session_id, n_jobs=n_jobs, logger=logger) + + elif wf == "mriqc": + # Supported modalities (i.e. suffixes) for MRIQC + modalities = ["T1w", "T2w"] + ignore_patterns = ["/anat/{}_{}_acq-NM_{}","/anat/{}_{}_{}_FLAIR", + "/fmap/", + "/swi/", + "/perf"] + + # Run mriqc tracker to regenerate bagel + run_tracker.run(global_configs, dash_schema_file, ["mriqc"], logger=logger) + + proc_participants, _ = get_new_proc_participants(global_configs, session_id, pipeline="mriqc", logger=logger) + n_proc_participants = len(proc_participants) + + if n_proc_participants > 0: + logger.info(f"Running MRIQC on {n_proc_participants} participants from session: {session} and for modalities: {modalities}") + # The default bids_db_path is proc/bids_db_{pipeline} + bids_db_path = generate_pybids_index(global_configs, session_id, pipeline="mriqc", + ignore_patterns=ignore_patterns, logger=logger) + logger.info(f"bids_db_path: {bids_db_path}") + # bids_db_path = None + + if n_jobs > 1: + # Process in parallel! (Won't write to logs) + mriqc_results = Parallel(n_jobs=n_jobs)(delayed(run_mriqc.run)( + global_configs=global_configs, session_id=session_id, participant_id=participant_id, + modalities=modalities, output_dir=None, logger=logger) + for participant_id in proc_participants) + + else: + # Useful for debugging + mriqc_results = [] + for participant_id in proc_participants: + res = run_mriqc.run(global_configs=global_configs, session_id=session_id, participant_id=participant_id, + modalities=modalities, output_dir=None, logger=logger) + mriqc_results.append(res) + + # Rerun tracker for updated bagel + run_tracker.run(global_configs, dash_schema_file, ["mriqc"], logger=logger) + + else: + logger.info(f"No new participants to run MRIQC on for session: {session}") + + elif wf == "fmriprep": + ignore_patterns = ["/anat/{}_{}_{}_NM","/anat/{}_{}_{}_echo", + "/dwi/" + "/swi/", + "/perf"] + # Run fmriprep tracker to regenerate bagel + run_tracker.run(global_configs, dash_schema_file, ["fmriprep"], logger=logger) + + proc_participants, _ = get_new_proc_participants(global_configs, session_id, pipeline="fmriprep", logger=logger) + n_proc_participants = len(proc_participants) + + if n_proc_participants > 0: + # remove old bids_db + sql_db_file = f"{DATASET_ROOT}/proc/bids_db_fmriprep/bids_db.sqlite" + logger.info(f"Removing old bids_db from {sql_db_file}") + if os.path.exists(sql_db_file): + os.remove(sql_db_file) + + logger.info(f"Running fmriprep on {n_proc_participants} participants from session: {session} and for modalities: {modalities}") + # The default bids_db_path is proc/bids_db_{pipeline} + bids_db_path = generate_pybids_index(global_configs, session_id, pipeline="fmriprep", + ignore_patterns=ignore_patterns, logger=logger) + logger.info(f"bids_db_path: {bids_db_path}") + + # Don't run more than MAX_BATCH participants in parallel + # Also clean-up after MAX_BATCH participants to avoid storage issues + if n_proc_participants > MAX_BATCH: + n_batches = int(np.ceil(n_proc_participants/MAX_BATCH)) + logger.info(f"Running fmriprep in {n_batches} batches of at most {MAX_BATCH} participants each") + proc_participant_batches = np.array_split(proc_participants, n_batches) + + else: + proc_participant_batches = [proc_participants] + + for proc_participant_batch in proc_participant_batches: + proc_participant_batch = list(proc_participant_batch) + logger.info(f"Running fmriprep on participants: {proc_participant_batch}") + if n_jobs > 1: + # Process in parallel! (Won't write to logs) + fmiprep_results = Parallel(n_jobs=n_jobs)(delayed(run_fmriprep.run)( + global_configs=global_configs, session_id=session_id, participant_id=participant_id, + output_dir=None, anat_only=False, use_bids_filter=True, logger=logger) + for participant_id in proc_participant_batch) + + else: + # Useful for debugging + fmiprep_results = [] + for participant_id in proc_participant_batch: + res = run_fmriprep.run(global_configs=global_configs, session_id=session_id, participant_id=participant_id, + output_dir=None, anat_only=False, use_bids_filter=True, logger=logger) + fmiprep_results.append(res) + + # Clean up intermediate files + logger.info(f"Cleaning up intermediate files from {fmriprep_dir}") + fmriprep_wf_dir = glob(f"{fmriprep_dir}/fmriprep*wf") + subject_home_dirs = glob(f"{fmriprep_dir}/output/fmriprep_home_sub-*") + run_toml_dirs = glob(f"{fmriprep_dir}/2023*") + + logger.info(f"fmriprep_wf_dir:\n{fmriprep_wf_dir}") + logger.info(f"subject_home_dirs:\n{subject_home_dirs}") + logger.info(f"run_toml_dirs:\n{run_toml_dirs}") + + for cleanup_dir in fmriprep_wf_dir + subject_home_dirs + run_toml_dirs: + shutil.rmtree(cleanup_dir) + + # Rerun tracker for updated bagel + run_tracker.run(global_configs, dash_schema_file, ["fmriprep"], logger=logger) + else: logger.error(f"Unknown workflow: {wf}") + + logger.info("-"*50) logger.info(f"Finishing workflow: {wf}") logger.info("-"*50) +logger.info("-"*75) logger.info(f"Finishing nipoppy run...") logger.info("-"*75) \ No newline at end of file diff --git a/nipoppy/trackers/bids_tracker.py b/nipoppy/trackers/bids_tracker.py index c8b3e4aa..70b89961 100644 --- a/nipoppy/trackers/bids_tracker.py +++ b/nipoppy/trackers/bids_tracker.py @@ -1,94 +1,95 @@ -import numpy as np -import pandas as pd from pathlib import Path -import argparse -import glob -from bids import BIDSLayout import os - -HELPTEXT = """ -Script to check participant-session availability -""" -#Author: nikhil153 -#Date: 1-Dec-2022 - -modality_suffic_dict = { - "anat": "T1w", - "dwi": "dwi" +# Status flags +from nipoppy.trackers.tracker import ( + SUCCESS, + FAIL +) + +from nipoppy.workflow.utils import participant_id_to_dicom_id + +# File dicts per BIDS datatype +# "datatype":"suffix" +files_dict = { + "anat": ["T1w"], + "dwi": ["dwi"], + "fmap": ["phasediff", "magnitude1", "magnitude2"], + "func": ["bold"] } -# argparse -parser = argparse.ArgumentParser(description=HELPTEXT) - -# data -parser.add_argument('--bids_dir', help='path to bids_dir with all the subjects') -parser.add_argument('--modalities', nargs='*', default=["anat"], - help='modalities to check') -parser.add_argument('--file_ext', default='nii.gz', help='file extension to query') -parser.add_argument('--output_csv', help='path to output csv file') - -args = parser.parse_args() -bids_dir = args.bids_dir -modalities = args.modalities -file_ext = args.file_ext - -print(f"Validating output in: {modalities}") - -output_csv = args.output_csv -participants_tsv = f"{bids_dir}/participants.tsv" - -# Check participants tsv and actual participant dirs -tsv_participants = set(pd.read_csv(participants_tsv,sep="\t")["participant_id"].values) -bids_dir_paths = glob.glob(f"{bids_dir}/sub*") -bids_dir_participants = set([os.path.basename(x) for x in bids_dir_paths]) - -participants_missing_in_tsv = list(bids_dir_participants - tsv_participants) -participants_missing_in_bids_dir = list(tsv_participants - bids_dir_participants) - -print(f"n_participants_tsv: {len(tsv_participants)}, \ - n_participants_bids_dir: {len(bids_dir_participants)}, \ - n_participants_missing_in_tsv: {len(participants_missing_in_tsv)}, \ - n_participants_missing_in_bids_dir: {len(participants_missing_in_bids_dir)}") - -if tsv_participants == bids_dir_participants: - layout = BIDSLayout(bids_dir) - sessions = layout.get_sessions() - - bids_status_df = pd.DataFrame() - for participant in tsv_participants: - participant_id = participant.split("-",2)[1] - - session_df = pd.DataFrame(index=sessions, columns=modalities) - for ses in sessions: - f_count = [] - for modality in modalities: - file_suffix = modality_suffic_dict[modality] - f = layout.get(subject=participant_id, - session=ses, - extension=file_ext, - suffix=file_suffix, - return_type='filename') - - f_count.append(len(f)) - - session_df.loc[ses] = f_count - - session_df = session_df.reset_index().rename(columns={"index":"session_id"}) - session_df["participant_id"] = participant - bids_status_df = bids_status_df.append(session_df) - - print(f"Saving bids_status_df at {output_csv}") - bids_status_df = bids_status_df.set_index("participant_id") - bids_status_df.to_csv(output_csv) - -else: - print(f"participants_tsv and bids_dir participants mismatch...") - output_csv = os.path.join(os.path.dirname(output_csv) + "/mismatched_participants.csv") - missing_tsv_status = len(participants_missing_in_tsv) * ["participants_missing_in_tsv"] - missing_bids_status = len(participants_missing_in_bids_dir) * ["participants_missing_in_bids_dir"] - missing_df = pd.DataFrame() - missing_df["participant_id"] = participants_missing_in_tsv + participants_missing_in_bids_dir - missing_df["status"] = missing_tsv_status + missing_bids_status - print(f"Saving missing participants csv at {output_csv}") - missing_df.to_csv(output_csv,index=None) +def check_staus(bids_layout, participant_id, session_id, run_id, datatype): + # Remove non-alphanumeric characters from participant_id + participant_id = participant_id_to_dicom_id(participant_id) + suffix_list = files_dict[datatype] + filepath_status_list = [] + for suffix in suffix_list: + scan_file = bids_layout.get(subject=participant_id, + session=session_id, + datatype=datatype, + run=run_id, + suffix=suffix, + extension='nii.gz') + + sidecar_file = bids_layout.get(subject=participant_id, + session=session_id, + datatype=datatype, + run=run_id, + suffix=suffix, + extension='json') + + + if (len(scan_file) > 0) & (len(sidecar_file) > 0): + filepath_status = (Path.is_file(Path(scan_file[0].path))) & (Path.is_file(Path(sidecar_file[0].path))) + else: + filepath_status = False + + filepath_status_list.append(filepath_status) + + if not any(filepath_status_list): + status_msg = FAIL + else: + status_msg = SUCCESS + + return status_msg + +def check_T1w(bids_layout, participant_id, session_id, run_id): + datatype = "anat" + status = check_staus(bids_layout, participant_id, session_id, run_id, datatype) + return status + +def check_dwi(bids_layout, participant_id, session_id, run_id): + datatype = "dwi" + status = check_staus(bids_layout, participant_id, session_id, run_id, datatype) + return status + +def check_fmap(bids_layout, participant_id, session_id, run_id): + datatype = "fmap" + status = check_staus(bids_layout, participant_id, session_id, run_id, datatype) + return status + +def check_func(bids_layout, participant_id, session_id, run_id): + datatype = "func" + status = check_staus(bids_layout, participant_id, session_id, run_id, datatype) + return status + +def check_structural(bids_layout, participant_id, session_id, run_id): + T1_status = check_T1w(bids_layout, participant_id, session_id, run_id) + dwi_status = check_dwi(bids_layout, participant_id, session_id, run_id) + if (T1_status == SUCCESS) & (dwi_status == SUCCESS): + status = SUCCESS + else: + status = FAIL + + return status + +tracker_configs = { + "pipeline_complete": check_structural, + + "PHASE__": { + "anat": check_T1w, + "dwi": check_dwi, + "fmap": check_fmap, + "func": check_func + } +} \ No newline at end of file diff --git a/nipoppy/trackers/mriqc_tracker.py b/nipoppy/trackers/mriqc_tracker.py index 2e6c8b34..d92cec53 100644 --- a/nipoppy/trackers/mriqc_tracker.py +++ b/nipoppy/trackers/mriqc_tracker.py @@ -22,13 +22,14 @@ } def check_staus(subject_dir, session_id, run_id, file_dict): - participant_id = os.path.basename(subject_dir) + bids_id = os.path.basename(subject_dir) + session = f"ses-{session_id}" filepath_status_list = [] for k,v in file_dict.items(): if k == "html": - filepath = Path(f"{subject_dir}/" + v.format(participant_id, session_id, run_id)) + filepath = Path(f"{subject_dir}/" + v.format(bids_id, session, run_id)) else: - filepath = Path(f"{subject_dir}/" + v.format(session_id, participant_id, session_id, run_id)) + filepath = Path(f"{subject_dir}/" + v.format(session, bids_id, session, run_id)) # print(f"filepath: {filepath}") filepath_status = Path.is_file(filepath) diff --git a/nipoppy/trackers/run_tracker.py b/nipoppy/trackers/run_tracker.py index 03dd5baf..e26844f9 100755 --- a/nipoppy/trackers/run_tracker.py +++ b/nipoppy/trackers/run_tracker.py @@ -1,15 +1,18 @@ #!/usr/bin/env python import argparse import json +import bids +import json import warnings from pathlib import Path - import pandas as pd import nipoppy.workflow.logger as my_logger from nipoppy.trackers.tracker import Tracker, get_start_time, get_end_time, UNAVAILABLE, TRUE from nipoppy.trackers import fs_tracker, fmriprep_tracker, mriqc_tracker, tractoflow_tracker from nipoppy.workflow.utils import ( + BIDS_SUBJECT_PREFIX, + BIDS_SESSION_PREFIX, COL_SUBJECT_MANIFEST, COL_BIDS_ID_MANIFEST, COL_SESSION_MANIFEST, @@ -25,6 +28,7 @@ # Globals PIPELINE_STATUS_COLUMNS = "PIPELINE_STATUS_COLUMNS" pipeline_tracker_config_dict = { + "heudiconv": bids_tracker.tracker_configs, "freesurfer": fs_tracker.tracker_configs, "fmriprep": fmriprep_tracker.tracker_configs, "mriqc": mriqc_tracker.tracker_configs, @@ -32,30 +36,43 @@ } BIDS_PIPES = ["mriqc","fmriprep", "tractoflow"] -def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1, logger=None): +def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1, logger=None, log_level="INFO"): """ driver code running pipeline specific trackers """ DATASET_ROOT = global_configs["DATASET_ROOT"] + # for bids tracker + bids_dir = f"{DATASET_ROOT}/bids/" + + # Grab BIDS participants from the doughnut + doughnut_file = f"{DATASET_ROOT}/scratch/raw_dicom/doughnut.csv" + doughnut_df = load_status(doughnut_file) + # logging log_dir = f"{DATASET_ROOT}/scratch/logs/" if logger is None: - log_file = f"{log_dir}/mriqc.log" - logger = my_logger.get_logger(log_file) + log_file = f"{log_dir}/tracker.log" + logger = my_logger.get_logger(log_file, level=log_level) logger.info(f"Tracking pipelines: {pipelines}") if session_id == "ALL": - session_ids = global_configs["SESSIONS"] + sessions = global_configs["SESSIONS"] else: - session_ids = [session_id] + sessions = [f"ses-{session_id}"] logger.info(f"tracking session_ids: {session_ids}") for pipeline in pipelines: pipe_tracker = Tracker(global_configs, dash_schema_file, pipeline) - dataset_root, _, version = pipe_tracker.get_global_configs() + # TODO revise tracker class + # DATASET_ROOT, session_ids, version = pipe_tracker.get_global_configs() + if pipeline == "heudiconv": + version = global_configs["BIDS"][pipeline]["VERSION"] + else: + version = global_configs["PROC_PIPELINES"][pipeline]["VERSION"] + schema = pipe_tracker.get_dash_schema() tracker_configs = pipeline_tracker_config_dict[pipeline] @@ -90,6 +107,19 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1, _df["pipeline_version"] = version _df["has_mri_data"] = TRUE # everyone in the doughnut file has MRI data + # Set correct dtype based on dash schema to avoid panads warning + # i.e. "FutureWarning: Setting an item of incompatible dtype" + dash_col_dtype = "str" + for dash_col, _ in status_check_dict.items(): + _df[dash_col] = _df[dash_col].astype(dash_col_dtype) + + # BIDS (i.e. heudiconv tracker is slightly different than proc_pipes) + if pipeline == "heudiconv": + # Generate BIDSLayout only once per tracker run and not for each participant + bids_layout = bids.BIDSLayout(bids_dir, validate=False) + logger.debug(f"bids_dir: {bids_dir}") + logger.debug(f"bids_layout: {bids_layout.get_subjects()}") + fpath_bagel = Path(dataset_root, 'derivatives', FNAME_BAGEL) if fpath_bagel.exists(): df_bagel_old_full = load_bagel(fpath_bagel) @@ -121,26 +151,33 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1, _df.loc[bids_id, COL_SUBJECT_MANIFEST] = participant_id _df.loc[bids_id, COL_BIDS_ID_MANIFEST] = bids_id - if pipeline == "freesurfer": - subject_dir = f"{DATASET_ROOT}/derivatives/{pipeline}/v{version}/output/ses-{session_id}/{bids_id}" + if pipeline == "heudiconv": + subject_dir = f"{DATASET_ROOT}/bids/{bids_id}" + elif pipeline == "freesurfer": + subject_dir = f"{DATASET_ROOT}/derivatives/{pipeline}/v{version}/output/{session}/{bids_id}" elif pipeline in BIDS_PIPES: subject_dir = f"{DATASET_ROOT}/derivatives/{pipeline}/v{version}/output/{bids_id}" else: - logger.info(f"unknown pipeline: {pipeline}") + logger.error(f"unknown pipeline: {pipeline}") dir_status = Path(subject_dir).is_dir() logger.debug(f"subject_dir:{subject_dir}, dir_status: {dir_status}") if dir_status: for name, func in status_check_dict.items(): - status = func(subject_dir, session_id, run_id) - logger.info(f"task_name: {name}, status: {status}") + if pipeline == "heudiconv": + status = func(bids_layout, participant_id, session_id, run_id) + else: + status = func(subject_dir, session_id, run_id) + + logger.debug(f"task_name: {name}, status: {status}") + _df.loc[bids_id,name] = status _df.loc[bids_id,"pipeline_starttime"] = get_start_time(subject_dir) # TODO only check files listed in the tracker config _df.loc[bids_id,"pipeline_endtime"] = UNAVAILABLE # get_end_time(subject_dir) else: - logger.error(f"Output for pipeline: {pipeline} not found for bids_id: {bids_id}, session: {session}") + logger.warning(f"Output for pipeline: {pipeline} not found for bids_id: {bids_id}, session: {session}") for name in status_check_dict.keys(): _df.loc[bids_id,name] = UNAVAILABLE _df.loc[bids_id,"pipeline_starttime"] = UNAVAILABLE @@ -164,7 +201,6 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1, save_backup(df_bagel, fpath_bagel, DNAME_BACKUPS_BAGEL) def load_bagel(fpath_bagel): - def time_converter(value): # convert to datetime if possible if str(value) != UNAVAILABLE: @@ -183,7 +219,7 @@ def time_converter(value): 'pipeline_endtime': time_converter, } ) - + return df_bagel if __name__ == '__main__': @@ -195,7 +231,8 @@ def time_converter(value): parser.add_argument('--global_config', type=str, help='path to global config file for your nipoppy dataset', required=True) parser.add_argument('--dash_schema', type=str, help='path to dashboard schema to display tracker status', required=True) parser.add_argument('--pipelines', nargs='+', help='list of pipelines to track', required=True) - parser.add_argument('--session_id', type=str, default="ALL", help='session_id (default = ALL') + parser.add_argument('--session_id', type=str, default="ALL", help='session_id (default = ALL)') + parser.add_argument('--log_level', type=str, default="INFO", help='log level') args = parser.parse_args() # read global configs @@ -209,5 +246,6 @@ def time_converter(value): dash_schema_file = args.dash_schema pipelines = args.pipelines session_id = args.session_id + log_level = args.log_level - run(global_configs, dash_schema_file, pipelines, session_id) + run(global_configs, dash_schema_file, pipelines, session_id, log_level=log_level) diff --git a/nipoppy/workflow/bids_conv/run_bids_conv.py b/nipoppy/workflow/bids_conv/run_bids_conv.py index cf460197..04f2733f 100755 --- a/nipoppy/workflow/bids_conv/run_bids_conv.py +++ b/nipoppy/workflow/bids_conv/run_bids_conv.py @@ -163,7 +163,7 @@ def run(global_configs, session_id, stage=2, overlays=None, n_jobs=2, dicom_id=N n_heudiconv_success = np.sum(heudiconv_results) logger.info(f"Successfully ran Heudiconv (Stage 1 or Stage 2) for {n_heudiconv_success} out of {n_heudiconv_participants} participants") - # Check succussful bids (NOTE: will count partial conversion as successful) + # Check successful bids (NOTE: will count partial conversion as successful) participants_with_bids = { parse_file_entities(dpath)['subject'] for dpath in diff --git a/nipoppy/workflow/catalog.py b/nipoppy/workflow/catalog.py index a85fb131..db2d1080 100644 --- a/nipoppy/workflow/catalog.py +++ b/nipoppy/workflow/catalog.py @@ -11,9 +11,16 @@ COL_ORG_STATUS, COL_SESSION_MANIFEST, COL_SUBJECT_MANIFEST, + COL_BIDS_ID_MANIFEST, load_doughnut, ) +from nipoppy.trackers.tracker import ( + SUCCESS, + FAIL, + UNAVAILABLE +) + def read_and_process_doughnut(fpath_doughnut, session_id, logger): # read current participant manifest df_doughnut = load_doughnut(fpath_doughnut) @@ -162,8 +169,8 @@ def get_new_proc_participants(global_configs, session_id, pipeline, logger): # Grab BIDS participants from the doughnut doughnut_file = f"{DATASET_ROOT}/scratch/raw_dicom/doughnut.csv" doughnut_df = pd.read_csv(doughnut_file) - doughnut_df["converted"] = doughnut_df["converted"].astype(bool) - bids_participants = doughnut_df[(doughnut_df["session"]==session) & (doughnut_df["converted"])]["bids_id"].unique() + doughnut_df[COL_CONV_STATUS] = doughnut_df[COL_CONV_STATUS].astype(bool) + bids_participants = doughnut_df[(doughnut_df[COL_SESSION_MANIFEST]==session) & (doughnut_df[COL_CONV_STATUS])][COL_BIDS_ID_MANIFEST].unique() n_bids_participants = len(bids_participants) logger.info(f"n_bids_participants: {n_bids_participants}, session_id: {session_id}") @@ -171,9 +178,9 @@ def get_new_proc_participants(global_configs, session_id, pipeline, logger): # Grab processed participants from the bagel bagel_file = f"{DATASET_ROOT}/derivatives/bagel.csv" bagel_df = pd.read_csv(bagel_file) - bagel_df = bagel_df[bagel_df["session"] == session] + bagel_df = bagel_df[bagel_df[COL_SESSION_MANIFEST] == session] bagel_df = bagel_df[(bagel_df["pipeline_name"] == pipeline) & (bagel_df["pipeline_version"] == pipeline_version)] - on_disk_participants = bagel_df[bagel_df["pipeline_complete"]=="SUCCESS"]["bids_id"].unique() + on_disk_participants = bagel_df[bagel_df["pipeline_complete"]==SUCCESS][COL_BIDS_ID_MANIFEST].unique() n_on_disk_participants = len(on_disk_participants) logger.info(f"n_on_disk_participants: {n_on_disk_participants}") @@ -183,31 +190,47 @@ def get_new_proc_participants(global_configs, session_id, pipeline, logger): n_new_proc_participants = len(new_proc_participants) logger.info(f"n_new_proc_participants: {n_new_proc_participants}") - return new_proc_participants + return new_proc_participants, on_disk_participants -def generate_pybids_index(global_configs, session_id, pipeline, logger, bids_db_path=None): +# NOTE - currently not using it because of pybids warning on "absolute_paths=False" +def generate_pybids_index(global_configs, session_id, pipeline, ignore_patterns, logger, run_id=1, bids_db_path=None): """ Generates a pybids index for a selected list of bids_ids using --ignore argument. You can pass a list of folder names, or a regex pattern to the ignore argument. """ DATASET_ROOT = global_configs["DATASET_ROOT"] bids_dir = f"{DATASET_ROOT}/bids" - - pipeline_version = global_configs["PROC_PIPELINES"][pipeline]["VERSION"] - session = f"ses-{session_id}" if bids_db_path == None: - bids_db_path = f"{DATASET_ROOT}/proc/bids_db" + bids_db_path = f"{DATASET_ROOT}/proc/bids_db_{pipeline}" - # Grab processed participants from the bagel - bagel_file = f"{DATASET_ROOT}/derivatives/bagel.csv" - bagel_df = pd.read_csv(bagel_file) - bagel_df = bagel_df[bagel_df["session"] == session] - bagel_df = bagel_df[(bagel_df["pipeline_name"] == pipeline) & (bagel_df["pipeline_version"] == pipeline_version)] - on_disk_participants = bagel_df[bagel_df["pipeline_complete"]=="SUCCESS"]["bids_id"].unique() + # Get list of on-disk and new participants + # We will completely ignore the on-disk participants in the BIDS index + # We will ignore certain modalities / acq patterns for the new participants (avoid mriqc and fmriprep errors) + # Example ignore patterns: ["/anat/{}_{}_{}_NM"] + + new_proc_participants, on_disk_participants = get_new_proc_participants(global_configs, session_id, pipeline, logger) + n_on_disk_participants = len(on_disk_participants) + logger.info(f"ignoring ({n_on_disk_participants}) n_on_disk_participants from pybids index") + + # Completely ignore these subjects + ignore_subjects = on_disk_participants + ignore_session = f"ses-{session_id}" + ignore_run = f"run-{run_id}" + # Need to have "ses-" appended to the subject_id to avoid wildcard matching + ignore_pattern_list = list(pd.Series(ignore_subjects) + f"/{ignore_session}") + + # Ignore specific sessions and datatypes / acq patterns for these subjects + index_subjects = new_proc_participants + ignore_SRE_patterns = ignore_patterns + + for sub in index_subjects: + for sre_pattern in ignore_SRE_patterns: + sre_pattern = sre_pattern.format(sub, ignore_session, ignore_run) + ignore_pattern = f"{sub}/{ignore_session}{sre_pattern}" + ignore_pattern_list.append(ignore_pattern) - ignore_list = [f"{bids_dir}/{p}" for p in on_disk_participants] - logger.info(f"ignoring {n_on_disk_participants} (n_on_disk_participants) from pybids index") + logger.info(f"ignoring {len(ignore_pattern_list)} subjects + datatype + acq patterns from pybids index") # Check if old db exists if Path.is_dir(Path(bids_db_path)): @@ -217,8 +240,8 @@ def generate_pybids_index(global_configs, session_id, pipeline, logger, bids_db_ # TODO # Check diff against previous index and only update if there are new participants - indexer = BIDSLayoutIndexer(ignore=ignore_list) - layout = BIDSLayout(bids_dir, indexer=indexer) + indexer = BIDSLayoutIndexer(ignore=ignore_pattern_list) + layout = BIDSLayout(bids_dir, indexer=indexer) # Throws deprecation warning indexed_subjects = layout.get(return_type='id', target='subject', suffix='T1w') n_indexed_subjects = len(indexed_subjects) diff --git a/nipoppy/workflow/dicom_org/check_dicom_status.py b/nipoppy/workflow/dicom_org/check_dicom_status.py index 65dc7784..136c5bf4 100755 --- a/nipoppy/workflow/dicom_org/check_dicom_status.py +++ b/nipoppy/workflow/dicom_org/check_dicom_status.py @@ -95,10 +95,24 @@ def run(global_config_file, regenerate=False, empty=False): ' in the dataset (can be slow)' ) + # Check for custom ID maps + # example: participant_id --> bids_id + # TODO: Check if this would work for participant_id --> dicom_dir + if "CUSTOM_ID_MAPS" in global_config.keys(): + custom_id_maps = global_config["CUSTOM_ID_MAPS"] + if "participant_id_to_bids_id" in custom_id_maps.keys(): + map_file = custom_id_maps["participant_id_to_bids_id"] + print(f"Using custom participant_id_to_bids_id mapping from: {map_file}") + else: + map_file = None + # generate bids_id - df_doughnut.loc[:, COL_BIDS_ID_MANIFEST] = df_doughnut[COL_SUBJECT_MANIFEST].apply( - participant_id_to_bids_id - ) + df_status[COL_BIDS_ID_MANIFEST] = df_status.apply( + lambda row: participant_id_to_bids_id( + row[COL_SUBJECT_MANIFEST], + map_file), + axis='columns' + ) # initialize dicom dir (cannot be inferred directly from participant id) df_doughnut.loc[:, COL_PARTICIPANT_DICOM_DIR] = np.nan diff --git a/nipoppy/workflow/proc_pipe/fmriprep/run_fmriprep.py b/nipoppy/workflow/proc_pipe/fmriprep/run_fmriprep.py index a73a9639..558040a8 100644 --- a/nipoppy/workflow/proc_pipe/fmriprep/run_fmriprep.py +++ b/nipoppy/workflow/proc_pipe/fmriprep/run_fmriprep.py @@ -24,6 +24,7 @@ def run_fmriprep(participant_id: str, bids_dir, + proc_dir, fmriprep_dir, fs_dir, templateflow_dir, @@ -37,23 +38,27 @@ def run_fmriprep(participant_id: str, fmriprep_home_dir = f"{fmriprep_out_dir}/fmriprep_home_{participant_id}/" Path(f"{fmriprep_home_dir}").mkdir(parents=True, exist_ok=True) + # BIDS DB created for fmriprep by run_nipoppy.py + bids_db_dir = f"/fmripre_proc/bids_db_fmriprep" + # Singularity CMD SINGULARITY_CMD=f"singularity run \ - -B {bids_dir}:/data_dir \ + -B {bids_dir}:{bids_dir} \ -B {fmriprep_home_dir}:/home/fmriprep --home /home/fmriprep --cleanenv \ -B {fmriprep_out_dir}:/output \ + -B {proc_dir}:/fmripre_proc \ -B {templateflow_dir}:{SINGULARITY_TEMPLATEFLOW_DIR} \ -B {fmriprep_dir}:/work \ -B {fs_dir}:{SINGULARITY_FS_DIR} \ {SINGULARITY_CONTAINER}" # Compose fMRIPrep command - fmriprep_CMD=f" /data_dir /output participant --participant-label {participant_id} \ + fmriprep_CMD=f" {bids_dir} /output participant --participant-label {participant_id} \ -w /work \ --output-spaces MNI152NLin2009cAsym:res-2 anat fsnative \ --fs-subjects-dir {SINGULARITY_FS_DIR} \ --skip_bids_validation \ - --bids-database-dir /work/first_run/bids_db/ \ + --bids-database-dir {bids_db_dir} \ --fs-license-file {SINGULARITY_FS_LICENSE} \ --return-all-components -v \ --write-graph --notrack \ @@ -65,7 +70,7 @@ def run_fmriprep(participant_id: str, # Append optional args if use_bids_filter: logger.info("Using bids_filter.json") - bids_filter_str = "--bids-filter-file /data_dir/bids_filter.json" + bids_filter_str = f"--bids-filter-file /fmripre_proc/bids_filter_fmriprep.json" fmriprep_CMD = f"{fmriprep_CMD} {bids_filter_str}" if anat_only: @@ -124,6 +129,7 @@ def run(participant_id: str, output_dir = f"{DATASET_ROOT}/derivatives/" bids_dir = f"{DATASET_ROOT}/bids/" + proc_dir = f"{DATASET_ROOT}/proc/" fmriprep_dir = f"{output_dir}/fmriprep/v{FMRIPREP_VERSION}" # Check and create session_dirs for freesurfer since it won't happen automatically @@ -137,12 +143,13 @@ def run(participant_id: str, # Copy bids_filter.json `/bids/bids_filter.json` if use_bids_filter: - logger.info(f"Copying ./bids_filter.json to {DATASET_ROOT}/bids/bids_filter.json (to be seen by Singularity container)") - shutil.copyfile(f"{CWD}/bids_filter.json", f"{bids_dir}/bids_filter.json") + logger.info(f"Copying ./bids_filter.json to {proc_dir}/bids_filter_fmriprep.json (to be seen by Singularity container)") + shutil.copyfile(f"{CWD}/bids_filter.json", f"{proc_dir}/bids_filter_fmriprep.json") # launch fmriprep run_fmriprep(participant_id, bids_dir, + proc_dir, fmriprep_dir, fs_dir, TEMPLATEFLOW_DIR, diff --git a/nipoppy/workflow/proc_pipe/mriqc/run_mriqc.py b/nipoppy/workflow/proc_pipe/mriqc/run_mriqc.py index 1541be72..3e3bb298 100644 --- a/nipoppy/workflow/proc_pipe/mriqc/run_mriqc.py +++ b/nipoppy/workflow/proc_pipe/mriqc/run_mriqc.py @@ -5,11 +5,15 @@ from pathlib import Path import os -def run(participant_id, global_configs, session_id, output_dir, modalities, bids_db_path=None, logger=None): +SINGULARITY_TEMPLATEFLOW_DIR = "/templateflow" +os.environ['SINGULARITYENV_TEMPLATEFLOW_HOME'] = SINGULARITY_TEMPLATEFLOW_DIR + +def run(participant_id, global_configs, session_id, output_dir, modalities, bids_db_dir=None, logger=None): """ Runs mriqc command """ DATASET_ROOT = global_configs["DATASET_ROOT"] CONTAINER_STORE = global_configs["CONTAINER_STORE"] + TEMPLATEFLOW_DIR = global_configs["TEMPLATEFLOW_DIR"] MRIQC_CONTAINER = global_configs["PROC_PIPELINES"]["mriqc"]["CONTAINER"] MRIQC_VERSION = global_configs["PROC_PIPELINES"]["mriqc"]["VERSION"] MRIQC_CONTAINER = MRIQC_CONTAINER.format(MRIQC_VERSION) @@ -18,9 +22,15 @@ def run(participant_id, global_configs, session_id, output_dir, modalities, bids bids_dir = f"{DATASET_ROOT}/bids/" proc_dir = f"{DATASET_ROOT}/proc/" - if bids_db_path is None: - bids_db_path = f"{DATASET_ROOT}/proc/bids_db" - bids_db_dir = os.path.basename(bids_db_path) + # logging + log_dir = f"{DATASET_ROOT}/scratch/logs/" + if logger is None: + log_file = f"{log_dir}/mriqc.log" + logger = my_logger.get_logger(log_file) + + if bids_db_dir is None: + bids_db_dir = f"/mriqc_proc/bids_db_mriqc" + logger.info(f"bids_db_dir: {bids_db_dir}") if output_dir is None: @@ -34,13 +44,6 @@ def run(participant_id, global_configs, session_id, output_dir, modalities, bids mriqc_work_dir = f"{output_dir}/mriqc/v{MRIQC_VERSION}/work/" Path(mriqc_work_dir).mkdir(parents=True, exist_ok=True) - # logging - log_dir = f"{DATASET_ROOT}/scratch/logs/" - - if logger is None: - log_file = f"{log_dir}/mriqc.log" - logger = my_logger.get_logger(log_file) - logger.info("Starting mriqc run...") logger.info(f"participant: {participant_id}, session: {session_id}") logger.info(f"bids_dir: {bids_dir}") @@ -49,21 +52,22 @@ def run(participant_id, global_configs, session_id, output_dir, modalities, bids # Singularity CMD SINGULARITY_CMD=f"singularity run \ - -B {bids_dir}:/data:ro \ + -B {bids_dir}:{bids_dir}:ro \ -B {proc_dir}:/mriqc_proc \ -B {mriqc_output_dir}:/out \ -B {mriqc_work_dir}:/work \ + -B {TEMPLATEFLOW_DIR}:{SINGULARITY_TEMPLATEFLOW_DIR} \ {SINGULARITY_CONTAINER} " # Compose mriqc command modalities_str = " ".join(modalities) - MRIQC_CMD=f"/data /out participant \ + MRIQC_CMD=f"{bids_dir} /out participant \ --participant-label {participant_id} \ --session-id {session_id} \ --modalities {modalities_str} \ --no-sub \ --work-dir /work \ - --bids-database-dir /mriqc_proc/{bids_db_dir}" + --bids-database-dir {bids_db_dir}" # --bids-database-wipe" # wiping and regerating bids db with catalog.py CMD_ARGS = SINGULARITY_CMD + MRIQC_CMD diff --git a/nipoppy/workflow/utils.py b/nipoppy/workflow/utils.py index 4e48e1f2..5c58f2b6 100644 --- a/nipoppy/workflow/utils.py +++ b/nipoppy/workflow/utils.py @@ -49,8 +49,13 @@ def participant_id_to_dicom_id(participant_id): def dicom_id_to_bids_id(dicom_id): return f'{BIDS_SUBJECT_PREFIX}{dicom_id}' -def participant_id_to_bids_id(participant_id): - return dicom_id_to_bids_id(participant_id_to_dicom_id(participant_id)) +def participant_id_to_bids_id(participant_id, custom_map=None): + if custom_map == None: + bids_id = dicom_id_to_bids_id(participant_id_to_dicom_id(participant_id)) + else: + _df = pd.read_csv(custom_map) + bids_id = _df.loc[(_df["participant_id"]==participant_id)]["bids_id"].values[0] + return bids_id def session_id_to_bids_session(session_id): # add BIDS prefix if it doesn't already exist diff --git a/tests/test_workflow_fmriprep.py b/tests/test_workflow_fmriprep.py index fb5211ee..d4fa52cc 100644 --- a/tests/test_workflow_fmriprep.py +++ b/tests/test_workflow_fmriprep.py @@ -68,15 +68,17 @@ def test_run_fmriprep(tmp_path, use_bids_filter, anat_only): log_file = tmp_path / "fmriprep.log" bids_dir = "bids_dir" + proc_dir = "fmripre_proc" fmriprep_dir = tmp_path / "fmriprep_dir" fs_dir = "fs_dir" templateflow_dir = "templateflow_dir" participant_id = "01" - singularity_container = "fmriprep.simg" + singularity_container = "fmriprep.sif" cmd = run_fmriprep( participant_id=participant_id, bids_dir=bids_dir, + proc_dir=proc_dir, fmriprep_dir=fmriprep_dir, fs_dir=fs_dir, templateflow_dir=templateflow_dir, @@ -88,8 +90,9 @@ def test_run_fmriprep(tmp_path, use_bids_filter, anat_only): # fmt: off expected_cmd = [ 'singularity', 'run', - '-B', f'{bids_dir}:/data_dir', - '-B', f'{fmriprep_dir}/output//fmriprep_home_{participant_id}/:/home/fmriprep', + '-B', f'{bids_dir}:{bids_dir}', + '-B', f'{proc_dir}:/fmriprep_proc', + '-B', f'{fmriprep_dir}/output/fmriprep_home_{participant_id}/:/home/fmriprep', '--home', '/home/fmriprep', '--cleanenv', '-B', f'{fmriprep_dir}/output/:/output', @@ -102,7 +105,7 @@ def test_run_fmriprep(tmp_path, use_bids_filter, anat_only): '--output-spaces', 'MNI152NLin2009cAsym:res-2', 'anat', 'fsnative', '--fs-subjects-dir', '/fsdir/', '--skip_bids_validation', - '--bids-database-dir', '/work/first_run/bids_db/', + '--bids-database-dir', '/fmripre_proc/bids_db_fmriprep', '--fs-license-file', '/fsdir/license.txt', '--return-all-components', '-v', @@ -114,7 +117,7 @@ def test_run_fmriprep(tmp_path, use_bids_filter, anat_only): # fmt: on if use_bids_filter: - expected_cmd += ["--bids-filter-file", "/data_dir/bids_filter.json"] + expected_cmd += ["--bids-filter-file", "/fmripre_proc/bids_filter_fmriprep.json"] if anat_only: expected_cmd += ["--anat-only"]