Commit

Update to match neurodatascience/nipoppy:main (#69)
michellewang authored Oct 3, 2023
2 parents b743a09 + 639517e commit 9c54a4a
Showing 12 changed files with 413 additions and 167 deletions.
2 changes: 1 addition & 1 deletion nipoppy/extractors/freesurfer/run_FS_utils.py
@@ -161,5 +161,5 @@ def run(FS_dir, participants_list, out_file, meas, fwhm, template):
print("-"*50)
else:
print("-"*50)
print("No partcipants found to process...")
print("No participants found to process...")
print("-"*50)
162 changes: 156 additions & 6 deletions nipoppy/sample_run_nipoppy.py
@@ -2,9 +2,21 @@
from pathlib import Path
import argparse
import json
import pandas as pd
import shutil
import numpy as np
from glob import glob
from joblib import Parallel, delayed
import nipoppy.workflow.logger as my_logger
# from nipoppy.workflow.tabular import generate_manifest
from nipoppy.workflow.dicom_org import run_dicom_org
from nipoppy.workflow.dicom_org import check_dicom_status
from nipoppy.workflow.bids_conv import run_bids_conv
from nipoppy.workflow.proc_pipe.mriqc import run_mriqc
from nipoppy.workflow.proc_pipe.fmriprep import run_fmriprep
from nipoppy.workflow.catalog import get_new_proc_participants
from nipoppy.workflow.catalog import generate_pybids_index
from nipoppy.trackers import run_tracker

# argparse
HELPTEXT = """
@@ -26,30 +38,168 @@
log_dir = f"{DATASET_ROOT}/scratch/logs/"
log_file = f"{log_dir}/nipoppy.log"

# Used to run trackers to identify new participants to process
dash_schema_file = f"{DATASET_ROOT}/proc/bagel_schema.json"

# bids_db_path
# TODO create a common BIDS_DB for mriqc and fmriprep once relative_paths issue is resolved
FMRIPREP_VERSION = global_configs["PROC_PIPELINES"]["fmriprep"]["VERSION"]
output_dir = f"{DATASET_ROOT}/derivatives/"
fmriprep_dir = f"{output_dir}/fmriprep/v{FMRIPREP_VERSION}"
# bids_db_path = f"{fmriprep_dir}/first_run/bids_db/"

session_id = args.session_id
session = f"ses-{session_id}"

n_jobs = args.n_jobs
MAX_BATCH = 10 # Max number of participants to run BEFORE cleaning up intermediate files
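# MAX_BATCH only applies to the fmriprep workflow below, where intermediate files are cleaned up after each batch.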

logger = my_logger.get_logger(log_file)
logger = my_logger.get_logger(log_file, level="INFO")

logger.info("-"*75)
logger.info(f"Starting nipoppy for {DATASET_ROOT} dataset...")
logger.info(f"dataset session (i.e visit): {session_id}")
logger.info("-"*75)

logger.info(f"dataset session (i.e visit): {session}")
logger.info(f"Running {n_jobs} jobs in parallel")

workflows = global_configs["WORKFLOWS"]
logger.info(f"Running {workflows} serially")
logger.info(f"Running workflows: {workflows} serially")

for wf in workflows:
logger.info("-"*50)
logger.info(f"Starting workflow: {wf}")
if wf == "dicom_org":
run_dicom_org.run(global_configs, session_id, n_jobs=n_jobs)
logger.info("-"*50)

if wf == "generate_manifest":
logger.info(f"***All sessions are fetched while generating manifest***")
# generate_manifest.run(global_configs, task="regenerate", dash_bagel=True, logger=logger)
check_dicom_status.run(global_config_file, regenerate=True, empty=False)

elif wf == "dicom_org":
run_dicom_org.run(global_configs, session_id, n_jobs=n_jobs, logger=logger)

elif wf == "bids_conv":
run_bids_conv.run(global_configs, session_id, n_jobs=n_jobs)
run_bids_conv.run(global_configs, session_id, n_jobs=n_jobs, logger=logger)

elif wf == "mriqc":
# Supported modalities (i.e. suffixes) for MRIQC
modalities = ["T1w", "T2w"]
ignore_patterns = ["/anat/{}_{}_acq-NM_{}","/anat/{}_{}_{}_FLAIR",
"/fmap/",
"/swi/",
"/perf"]

# Run mriqc tracker to regenerate bagel
run_tracker.run(global_configs, dash_schema_file, ["mriqc"], logger=logger)

proc_participants, _ = get_new_proc_participants(global_configs, session_id, pipeline="mriqc", logger=logger)
n_proc_participants = len(proc_participants)

if n_proc_participants > 0:
logger.info(f"Running MRIQC on {n_proc_participants} participants from session: {session} and for modalities: {modalities}")
# The default bids_db_path is proc/bids_db_{pipeline}
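# Generating the pybids index (an SQLite layout database) once up front avoids re-crawling the BIDS tree for every participant; presumably the per-participant runs pick it up from that default location.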
bids_db_path = generate_pybids_index(global_configs, session_id, pipeline="mriqc",
ignore_patterns=ignore_patterns, logger=logger)
logger.info(f"bids_db_path: {bids_db_path}")
# bids_db_path = None

if n_jobs > 1:
# Process in parallel! (Won't write to logs)
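# joblib's Parallel(...)(delayed(func)(args) for ...) dispatches one run_mriqc.run call per participant across n_jobs workers and returns the results in input order.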
mriqc_results = Parallel(n_jobs=n_jobs)(delayed(run_mriqc.run)(
global_configs=global_configs, session_id=session_id, participant_id=participant_id,
modalities=modalities, output_dir=None, logger=logger)
for participant_id in proc_participants)

else:
# Useful for debugging
mriqc_results = []
for participant_id in proc_participants:
res = run_mriqc.run(global_configs=global_configs, session_id=session_id, participant_id=participant_id,
modalities=modalities, output_dir=None, logger=logger)
mriqc_results.append(res)

# Rerun tracker for updated bagel
run_tracker.run(global_configs, dash_schema_file, ["mriqc"], logger=logger)

else:
logger.info(f"No new participants to run MRIQC on for session: {session}")

elif wf == "fmriprep":
ignore_patterns = ["/anat/{}_{}_{}_NM","/anat/{}_{}_{}_echo",
"/dwi/"
"/swi/",
"/perf"]
# Run fmriprep tracker to regenerate bagel
run_tracker.run(global_configs, dash_schema_file, ["fmriprep"], logger=logger)

proc_participants, _ = get_new_proc_participants(global_configs, session_id, pipeline="fmriprep", logger=logger)
n_proc_participants = len(proc_participants)

if n_proc_participants > 0:
# remove old bids_db
sql_db_file = f"{DATASET_ROOT}/proc/bids_db_fmriprep/bids_db.sqlite"
logger.info(f"Removing old bids_db from {sql_db_file}")
if os.path.exists(sql_db_file):
os.remove(sql_db_file)

logger.info(f"Running fmriprep on {n_proc_participants} participants from session: {session} and for modalities: {modalities}")
# The default bids_db_path is proc/bids_db_{pipeline}
bids_db_path = generate_pybids_index(global_configs, session_id, pipeline="fmriprep",
ignore_patterns=ignore_patterns, logger=logger)
logger.info(f"bids_db_path: {bids_db_path}")

# Don't run more than MAX_BATCH participants in parallel
# Also clean-up after MAX_BATCH participants to avoid storage issues
if n_proc_participants > MAX_BATCH:
n_batches = int(np.ceil(n_proc_participants/MAX_BATCH))
logger.info(f"Running fmriprep in {n_batches} batches of at most {MAX_BATCH} participants each")
proc_participant_batches = np.array_split(proc_participants, n_batches)

else:
proc_participant_batches = [proc_participants]
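# Illustration: 25 participants with MAX_BATCH = 10 gives n_batches = ceil(25/10) = 3, and np.array_split then yields batch sizes of 9, 8 and 8.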

for proc_participant_batch in proc_participant_batches:
proc_participant_batch = list(proc_participant_batch)
logger.info(f"Running fmriprep on participants: {proc_participant_batch}")
if n_jobs > 1:
# Process in parallel! (Won't write to logs)
fmriprep_results = Parallel(n_jobs=n_jobs)(delayed(run_fmriprep.run)(
global_configs=global_configs, session_id=session_id, participant_id=participant_id,
output_dir=None, anat_only=False, use_bids_filter=True, logger=logger)
for participant_id in proc_participant_batch)

else:
# Useful for debugging
fmriprep_results = []
for participant_id in proc_participant_batch:
res = run_fmriprep.run(global_configs=global_configs, session_id=session_id, participant_id=participant_id,
output_dir=None, anat_only=False, use_bids_filter=True, logger=logger)
fmriprep_results.append(res)

# Clean up intermediate files
logger.info(f"Cleaning up intermediate files from {fmriprep_dir}")
fmriprep_wf_dir = glob(f"{fmriprep_dir}/fmriprep*wf")
subject_home_dirs = glob(f"{fmriprep_dir}/output/fmriprep_home_sub-*")
run_toml_dirs = glob(f"{fmriprep_dir}/2023*")

logger.info(f"fmriprep_wf_dir:\n{fmriprep_wf_dir}")
logger.info(f"subject_home_dirs:\n{subject_home_dirs}")
logger.info(f"run_toml_dirs:\n{run_toml_dirs}")

for cleanup_dir in fmriprep_wf_dir + subject_home_dirs + run_toml_dirs:
shutil.rmtree(cleanup_dir)

# Rerun tracker for updated bagel
run_tracker.run(global_configs, dash_schema_file, ["fmriprep"], logger=logger)

else:
logger.error(f"Unknown workflow: {wf}")

logger.info("-"*50)
logger.info(f"Finishing workflow: {wf}")
logger.info("-"*50)

logger.info("-"*75)
logger.info(f"Finishing nipoppy run...")
logger.info("-"*75)
179 changes: 90 additions & 89 deletions nipoppy/trackers/bids_tracker.py
@@ -1,94 +1,95 @@
import numpy as np
import pandas as pd
from pathlib import Path
import argparse
import glob
from bids import BIDSLayout
import os


HELPTEXT = """
Script to check participant-session availability
"""
#Author: nikhil153
#Date: 1-Dec-2022

modality_suffic_dict = {
"anat": "T1w",
"dwi": "dwi"
# Status flags
from nipoppy.trackers.tracker import (
SUCCESS,
FAIL
)

from nipoppy.workflow.utils import participant_id_to_dicom_id

# File dicts per BIDS datatype
# "datatype":"suffix"
files_dict = {
"anat": ["T1w"],
"dwi": ["dwi"],
"fmap": ["phasediff", "magnitude1", "magnitude2"],
"func": ["bold"]
}
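# e.g. the "anat": ["T1w"] entry matches files like sub-01/ses-01/anat/sub-01_ses-01_run-1_T1w.nii.gz, and check_status below also requires the matching .json sidecar.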

# argparse
parser = argparse.ArgumentParser(description=HELPTEXT)

# data
parser.add_argument('--bids_dir', help='path to bids_dir with all the subjects')
parser.add_argument('--modalities', nargs='*', default=["anat"],
help='modalities to check')
parser.add_argument('--file_ext', default='nii.gz', help='file extension to query')
parser.add_argument('--output_csv', help='path to output csv file')

args = parser.parse_args()
bids_dir = args.bids_dir
modalities = args.modalities
file_ext = args.file_ext

print(f"Validating output in: {modalities}")

output_csv = args.output_csv
participants_tsv = f"{bids_dir}/participants.tsv"

# Check participants tsv and actual participant dirs
tsv_participants = set(pd.read_csv(participants_tsv,sep="\t")["participant_id"].values)
bids_dir_paths = glob.glob(f"{bids_dir}/sub*")
bids_dir_participants = set([os.path.basename(x) for x in bids_dir_paths])

participants_missing_in_tsv = list(bids_dir_participants - tsv_participants)
participants_missing_in_bids_dir = list(tsv_participants - bids_dir_participants)

print(f"n_participants_tsv: {len(tsv_participants)}, \
n_participants_bids_dir: {len(bids_dir_participants)}, \
n_participants_missing_in_tsv: {len(participants_missing_in_tsv)}, \
n_participants_missing_in_bids_dir: {len(participants_missing_in_bids_dir)}")

if tsv_participants == bids_dir_participants:
layout = BIDSLayout(bids_dir)
sessions = layout.get_sessions()

bids_status_df = pd.DataFrame()
for participant in tsv_participants:
participant_id = participant.split("-",2)[1]

session_df = pd.DataFrame(index=sessions, columns=modalities)
for ses in sessions:
f_count = []
for modality in modalities:
file_suffix = modality_suffic_dict[modality]
f = layout.get(subject=participant_id,
session=ses,
extension=file_ext,
suffix=file_suffix,
return_type='filename')

f_count.append(len(f))

session_df.loc[ses] = f_count

session_df = session_df.reset_index().rename(columns={"index":"session_id"})
session_df["participant_id"] = participant
bids_status_df = bids_status_df.append(session_df)

print(f"Saving bids_status_df at {output_csv}")
bids_status_df = bids_status_df.set_index("participant_id")
bids_status_df.to_csv(output_csv)

else:
print(f"participants_tsv and bids_dir participants mismatch...")
output_csv = os.path.join(os.path.dirname(output_csv) + "/mismatched_participants.csv")
missing_tsv_status = len(participants_missing_in_tsv) * ["participants_missing_in_tsv"]
missing_bids_status = len(participants_missing_in_bids_dir) * ["participants_missing_in_bids_dir"]
missing_df = pd.DataFrame()
missing_df["participant_id"] = participants_missing_in_tsv + participants_missing_in_bids_dir
missing_df["status"] = missing_tsv_status + missing_bids_status
print(f"Saving missing participants csv at {output_csv}")
missing_df.to_csv(output_csv,index=None)
def check_status(bids_layout, participant_id, session_id, run_id, datatype):
# Remove non-alphanumeric characters from participant_id
participant_id = participant_id_to_dicom_id(participant_id)
suffix_list = files_dict[datatype]
filepath_status_list = []
for suffix in suffix_list:
scan_file = bids_layout.get(subject=participant_id,
session=session_id,
datatype=datatype,
run=run_id,
suffix=suffix,
extension='nii.gz')

sidecar_file = bids_layout.get(subject=participant_id,
session=session_id,
datatype=datatype,
run=run_id,
suffix=suffix,
extension='json')


if (len(scan_file) > 0) & (len(sidecar_file) > 0):
filepath_status = (Path.is_file(Path(scan_file[0].path))) & (Path.is_file(Path(sidecar_file[0].path)))
else:
filepath_status = False

filepath_status_list.append(filepath_status)

if not any(filepath_status_list):
status_msg = FAIL
else:
status_msg = SUCCESS

return status_msg
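# Hypothetical example: check_status(layout, "01", "01", "1", "anat") returns SUCCESS only when both sub-01_ses-01_run-1_T1w.nii.gz and its .json sidecar exist.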

def check_T1w(bids_layout, participant_id, session_id, run_id):
datatype = "anat"
status = check_status(bids_layout, participant_id, session_id, run_id, datatype)
return status

def check_dwi(bids_layout, participant_id, session_id, run_id):
datatype = "dwi"
status = check_status(bids_layout, participant_id, session_id, run_id, datatype)
return status

def check_fmap(bids_layout, participant_id, session_id, run_id):
datatype = "fmap"
status = check_status(bids_layout, participant_id, session_id, run_id, datatype)
return status

def check_func(bids_layout, participant_id, session_id, run_id):
datatype = "func"
status = check_status(bids_layout, participant_id, session_id, run_id, datatype)
return status

def check_structural(bids_layout, participant_id, session_id, run_id):
T1_status = check_T1w(bids_layout, participant_id, session_id, run_id)
dwi_status = check_dwi(bids_layout, participant_id, session_id, run_id)
if (T1_status == SUCCESS) & (dwi_status == SUCCESS):
status = SUCCESS
else:
status = FAIL

return status

tracker_configs = {
"pipeline_complete": check_structural,

"PHASE__": {
"anat": check_T1w,
"dwi": check_dwi,
"fmap": check_fmap,
"func": check_func
}
}
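# As wired above, "pipeline_complete" maps to check_structural (both T1w and dwi must pass), while the "PHASE__" entries report per-datatype status for anat, dwi, fmap and func.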