diff --git a/.gitmodules b/.gitmodules index 4a838f24..e69de29b 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +0,0 @@ -[submodule "nipoppy/workflow/proc_pipe/tractoflow/tractoflow"] - path = nipoppy/workflow/proc_pipe/tractoflow/tractoflow - url = https://github.com/scilus/tractoflow.git - datalad-url = https://github.com/scilus/tractoflow.git diff --git a/nipoppy/trackers/tractoflow_tracker.py b/nipoppy/trackers/tractoflow_tracker.py index 591cd6e1..ed5fa7bb 100644 --- a/nipoppy/trackers/tractoflow_tracker.py +++ b/nipoppy/trackers/tractoflow_tracker.py @@ -7,58 +7,27 @@ INCOMPLETE="INCOMPLETE" UNAVAILABLE="UNAVAILABLE" -## dictionary for all output stages / file stems from TractoFlow in execution order - +# dictionary for most important stages to track a "complete" pipeline TractoFlow_Stages = { - "All": [ 'Bet_Prelim_DWI', 'Denoise_DWI', 'Eddy', 'Topup', 'Eddy_Topup', 'Bet_DWI', 'N4_DWI', 'Crop_DWI', 'Normalize_DWI', 'Extract_B0', 'Resample_B0', 'Resample_DWI', 'Denoise_T1', 'N4_T1', 'Resample_T1', 'Bet_T1', 'Crop_T1', 'Register_T1', 'Segment_Tissues', 'Extract_DTI_Shell', 'Extract_FODF_Shell', 'DTI_Metrics', 'FODF_Metrics', 'Compute_FRF', 'PFT_Tracking_Maps', 'PFT_Seeding_Mask', 'PFT_Tracking' ], - "DWIPreproc": [ 'Bet_Prelim_DWI', 'Denoise_DWI', 'Eddy', 'Topup', 'Eddy_Topup', 'Bet_DWI', 'N4_DWI', 'Crop_DWI', 'Normalize_DWI', 'Extract_B0', 'Resample_B0', 'Resample_DWI' ], - "DWIPreprocEddyTopup": [ 'Bet_Prelim_DWI', 'Denoise_DWI', 'Eddy', 'Topup', 'Eddy_Topup' ], - "DWIPreprocResampled":[ 'Bet_DWI', 'N4_DWI', 'Crop_DWI', 'Normalize_DWI', 'Extract_B0', 'Resample_B0', 'Resample_DWI' ], - "AnatPreproc": [ 'Denoise_T1', 'N4_T1', 'Resample_T1', 'Bet_T1', 'Crop_T1', 'Register_T1', 'Segment_Tissues' ], - "AnatResample": [ 'Denoise_T1', 'N4_T1', 'Resample_T1' ], - "AnatSegment": [ 'Bet_T1', 'Crop_T1', 'Register_T1', 'Segment_Tissues' ], - "DWIModel": [ 'Extract_DTI_Shell', 'Extract_FODF_Shell', 'DTI_Metrics', 'FODF_Metrics', 'Compute_FRF' ], - "DWITensor": [ 'Extract_DTI_Shell', 'DTI_Metrics' ], - "DWIFODF": [ 'Extract_FODF_Shell', 'FODF_Metrics', 'Compute_FRF' ], - "Tracking": [ 'PFT_Tracking_Maps', 'PFT_Seeding_Mask', 'PFT_Tracking' ] + "All": [ 'Extract_B0', 'Resample_DWI', 'Register_T1', 'Segment_Tissues', 'Extract_DTI_Shell', 'Extract_FODF_Shell', 'DTI_Metrics', 'FODF_Metrics', 'Compute_FRF', 'PFT_Tracking_Maps', 'PFT_Seeding_Mask', 'PFT_Tracking' ] } -# , 'Local_Tracking_Mask', 'Local_Seeding_mask', 'Local_Tracking' - +# the most critical file stems within each output to check TractoFlow_Procs = { - "Bet_Prelim_DWI": [ '__b0_bet_mask_dilated.nii.gz', '__b0_bet_mask.nii.gz', '__b0_bet.nii.gz' ], - "Denoise_DWI": [ '__dwi_denoised.nii.gz' ], - "Eddy": [ '__bval_eddy', '__dwi_corrected.nii.gz', '__dwi_eddy_corrected.bvec' ], - "Topup": [ '__corrected_b0s.nii.gz', '__rev_b0_warped.nii.gz', 'topup_results_fieldcoef.nii.gz', 'topup_results_movpar.txt' ], - "Eddy_Topup": [ '__b0_bet_mask.nii.gz', '__bval_eddy', '__dwi_corrected.nii.gz', '__dwi_eddy_corrected.bvec' ], - "Bet_DWI": [ '__b0_bet_mask.nii.gz', '__b0_bet.nii.gz', '__dwi_bet.nii.gz' ], - "N4_DWI": [ '__dwi_n4.nii.gz' ], - "Crop_DWI": [ '__b0_cropped.nii.gz', '__b0_mask_cropped.nii.gz', '__dwi_cropped.nii.gz' ], - "Normalize_DWI": [ '__dwi_normalized.nii.gz', '_fa_wm_mask.nii.gz' ], - "Extract_B0": [ '__b0.nii.gz' ], - "Resample_B0": [ '__b0_mask_resampled.nii.gz', '__b0_resampled.nii.gz' ], + "Extract_B0": [ '__b0_mask_resampled.nii.gz', '__b0_resampled.nii.gz' ], "Resample_DWI": [ '__dwi_resampled.nii.gz' ], "Extract_DTI_Shell": 
[ '__bval_dti', '__bvec_dti', '__dwi_dti.nii.gz' ], "Extract_FODF_Shell": [ '__bval_fodf', '__bvec_fodf', '__dwi_fodf.nii.gz' ], - "DTI_Metrics": [ '__ad.nii.gz', '__evecs.nii.gz', '__ga.nii.gz', '__pulsation_std_dwi.nii.gz', '__residual_q1_residuals.npy', '__tensor.nii.gz', '__evals_e1.nii.gz', '__evecs_v1.nii.gz', '__md.nii.gz', '__rd.nii.gz', '__residual_q3_residuals.npy', '__evals_e2.nii.gz', '__evecs_v2.nii.gz', '__mode.nii.gz', '__residual_iqr_residuals.npy', '__residual_residuals_stats.png', '__evals_e3.nii.gz', '__evecs_v3.nii.gz', '__nonphysical.nii.gz', '__residual_mean_residuals.npy', '__residual_std_residuals.npy', '__evals.nii.gz', '__fa.nii.gz', '__norm.nii.gz', '__residual.nii.gz', '__rgb.nii.gz' ], + "DTI_Metrics": [ '__ad.nii.gz', '__evecs.nii.gz', '__ga.nii.gz', '__tensor.nii.gz', '__md.nii.gz', '__rd.nii.gz', '__mode.nii.gz', '__evals.nii.gz', '__fa.nii.gz', '__norm.nii.gz', '__residual.nii.gz', '__rgb.nii.gz' ], "FODF_Metrics": [ '__afd_max.nii.gz', '__afd_sum.nii.gz', '__afd_total.nii.gz', '__fodf.nii.gz', '__nufo.nii.gz', '__peak_indices.nii.gz', '__peaks.nii.gz' ], "Compute_FRF": [ '__frf.txt' ], - "Denoise_T1": [ '__t1_denoised.nii.gz' ], - "N4_T1": [ '__t1_n4.nii.gz' ], - "Resample_T1": [ '__t1_resampled.nii.gz' ], - "Bet_T1": [ '__t1_bet_mask.nii.gz', '__t1_bet.nii.gz' ], - "Crop_T1": [ '__t1_bet_cropped.nii.gz', '__t1_bet_mask_cropped.nii.gz' ], "Register_T1": [ '__output0GenericAffine.mat', '__output1InverseWarp.nii.gz', '__output1Warp.nii.gz', '__t1_mask_warped.nii.gz', '__t1_warped.nii.gz' ], "Segment_Tissues": [ '__map_csf.nii.gz', '__map_gm.nii.gz', '__map_wm.nii.gz', '__mask_csf.nii.gz', '__mask_gm.nii.gz', '__mask_wm.nii.gz' ], "PFT_Tracking_Maps": [ '__interface.nii.gz', '__map_exclude.nii.gz', '__map_include.nii.gz' ], "PFT_Seeding_Mask": [ '__pft_seeding_mask.nii.gz' ], - "PFT_Tracking": [ '__pft_wholebrain_tracking.trk' ] + "PFT_Tracking": [ '__pft_tracking_prob_wm_seed_0.trk' ] } - # "Local_Tracking_Mask": [ '__?' ], - # "Local_Seeding_Mask": [ '__?' ], - # "Local_Tracking": [ '__?' ] - ## ## define functions to check if the files exist / stages complete ## @@ -74,9 +43,6 @@ def check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_ ## default to incomplete status_msg = UNAVAILABLE - ## turn off local tracking - configure as an optional step? 
- doLocalTracking = False - ## the valid options for task if not (task in list(stage_dict.keys())): raise ValueError("The requested report is not recognized.") @@ -84,26 +50,6 @@ def check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_ ## pull the processes to check files for procs = stage_dict[task] - ## check if Topup output exists - if it does, drop Eddy, otherwise drop Topup/Eddy_Topup - if os.path.exists(subject_dir + '/Topup'): - try: - procs.remove('Eddy') - except ValueError: - pass - else: - try: - procs.remove('Topup') - procs.remove('Eddy_Topup') - except ValueError: - pass - - # ## drop local tracking - # if (task=='Tracking'): - # if not doLocalTracking: - # procs.remove('Local_Tracking_Mask') - # procs.remove('Local_Seeding_Mask') - # procs.remove('Local_Tracking') - ## build the filepaths of the files that are supposed to exist files = [] for proc in procs: @@ -115,6 +61,10 @@ def check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_ ## build logical if files exist filesExist = [ os.path.exists(out) for out in files ] + + ## print missing files + #from itertools import compress + #print(list(compress(files, [not x for x in filesExist]))) ## fill in possible status files if any(filesExist): @@ -130,68 +80,68 @@ def check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_ ## return status return status_msg -def check_dwiPreproc(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): - """ docstring here - """ - status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='DWIPreproc') - return status_msg - -def check_anatPreproc(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): - """ docstring here - """ - status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='AnatPreproc') - return status_msg - -def check_dwiModel(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): - """ docstring here - """ - status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='DWIModel') - return status_msg - -def check_pftTracking(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): - """ docstring here - """ - status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='Tracking') - return status_msg - -def check_dwiPreprocEddyTopup(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): - """ docstring here - """ - status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='DWIPreprocEddyTopup') - return status_msg - -def check_dwiNormalize(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): - """ docstring here - """ - status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='DWIPreprocResampled') - return status_msg - -def check_anatReorient(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): - """ docstring here - """ - status_msg = check_tf_output(subject_dir, session_id, run_id, 
file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='AnatResample') - return status_msg - -def check_anatTracking(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): - """ docstring here - """ - status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='AnatSegment') - return status_msg - -def check_dwiModelTensor(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): - """ docstring here - """ - status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='DWITensor') - return status_msg - -def check_dwiModelFODF(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): - """ docstring here - """ - status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='DWIFODF') - return status_msg +# def check_dwiPreproc(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): +# """ docstring here +# """ +# status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='DWIPreproc') +# return status_msg + +# def check_anatPreproc(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): +# """ docstring here +# """ +# status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='AnatPreproc') +# return status_msg + +# def check_dwiModel(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): +# """ docstring here +# """ +# status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='DWIModel') +# return status_msg + +# def check_pftTracking(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): +# """ docstring here +# """ +# status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='Tracking') +# return status_msg + +# def check_dwiPreprocEddyTopup(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): +# """ docstring here +# """ +# status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='DWIPreprocEddyTopup') +# return status_msg + +# def check_dwiNormalize(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): +# """ docstring here +# """ +# status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='DWIPreprocResampled') +# return status_msg + +# def check_anatReorient(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): +# """ docstring here +# """ +# status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='AnatResample') +# return status_msg + +# def check_anatTracking(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): +# """ docstring here +# """ +# status_msg = check_tf_output(subject_dir, session_id, 
run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='AnatSegment') +# return status_msg + +# def check_dwiModelTensor(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): +# """ docstring here +# """ +# status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='DWITensor') +# return status_msg + +# def check_dwiModelFODF(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): +# """ docstring here +# """ +# status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='DWIFODF') +# return status_msg def check_tf_final(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages): - """ docstring here + """ Call the function to check for output files with the parameters set to check all the stages """ status_msg = check_tf_output(subject_dir, session_id, run_id, file_check_dict=TractoFlow_Procs, stage_dict=TractoFlow_Stages, task='All') return status_msg @@ -199,19 +149,75 @@ def check_tf_final(subject_dir, session_id, run_id, file_check_dict=TractoFlow_P ## the dictionary to return with the inspected outputs tracker_configs = { - "pipeline_complete": check_tf_final, - "PHASE_": { - "DWI-Preprocessing": check_dwiPreproc, - "Anat-Preprocessing": check_anatPreproc, - "DWI-ModelFitting": check_dwiModel, - "PFT-Tracking": check_pftTracking - }, - "STAGE_": { - "DWI-Preproc-EddyTopup": check_dwiPreprocEddyTopup, - "DWI-Preproc-Normalize": check_dwiNormalize, - "Anat-Preproc-Reorient": check_anatReorient, - "Anat-Preproc-Tracking": check_anatTracking, - "DWI-ModelFitting-Tensor": check_dwiModelTensor, - "DWI-ModelFitting-fODF": check_dwiModelFODF - } + "pipeline_complete": check_tf_final } + + +## keep complete lists because I don't want to have to dig them out of commit history + +# TractoFlow_Stages = { +# "All": [ 'Denoise_DWI', 'Gibbs_correction', 'Prepare_for_Topup', 'Prepare_dwi_for_eddy', 'Eddy', 'Topup', 'Eddy_Topup', 'Bet_DWI', 'N4_DWI', 'Crop_DWI', 'Normalize_DWI', 'Extract_B0', 'Resample_DWI', 'Denoise_T1', 'N4_T1', 'Resample_T1', 'Bet_T1', 'Crop_T1', 'Register_T1', 'Segment_Tissues', 'Extract_DTI_Shell', 'Extract_FODF_Shell', 'DTI_Metrics', 'FODF_Metrics', 'Compute_FRF', 'PFT_Tracking_Maps', 'PFT_Seeding_Mask', 'PFT_Tracking' ], +# "DWIPreproc": [ 'Denoise_DWI', 'Eddy', 'Topup', 'Eddy_Topup', 'Bet_DWI', 'N4_DWI', 'Crop_DWI', 'Normalize_DWI', 'Extract_B0', 'Resample_B0', 'Resample_DWI' ], +# "DWIPreprocEddyTopup": [ 'Denoise_DWI', 'Gibbs_correction', 'Prepare_for_Topup', 'Prepare_dwi_for_eddy', 'Eddy', 'Topup', 'Eddy_Topup' ], +# "DWIPreprocResampled":[ 'Bet_DWI', 'N4_DWI', 'Crop_DWI', 'Normalize_DWI', 'Extract_B0', 'Resample_B0', 'Resample_DWI' ], +# "AnatPreproc": [ 'Denoise_T1', 'N4_T1', 'Resample_T1', 'Bet_T1', 'Crop_T1', 'Register_T1', 'Segment_Tissues' ], +# "AnatResample": [ 'Denoise_T1', 'N4_T1', 'Resample_T1' ], +# "AnatSegment": [ 'Bet_T1', 'Crop_T1', 'Register_T1', 'Segment_Tissues' ], +# "DWIModel": [ 'Extract_DTI_Shell', 'Extract_FODF_Shell', 'DTI_Metrics', 'FODF_Metrics', 'Compute_FRF' ], +# "DWITensor": [ 'Extract_DTI_Shell', 'DTI_Metrics' ], +# "DWIFODF": [ 'Extract_FODF_Shell', 'FODF_Metrics', 'Compute_FRF' ], +# "Tracking": [ 'PFT_Tracking_Maps', 'PFT_Seeding_Mask', 'PFT_Tracking' ] +# } + +# # , 'Local_Tracking_Mask', 'Local_Seeding_mask', 'Local_Tracking' + +# TractoFlow_Procs = { 
+# "Bet_Prelim_DWI": [ '__b0_bet_mask_dilated.nii.gz', '__b0_bet_mask.nii.gz', '__b0_bet.nii.gz' ], +# "Bet_DWI": [ '__b0_bet_mask.nii.gz', '__b0_bet.nii.gz', '__b0_no_bet.nii.gz', '__dwi_bet.nii.gz' ], +# "Denoise_DWI": [ '__dwi_denoised.nii.gz' ], +# "Gibbs_correction": [ '__dwi_gibbs_corrected.nii.gz' ], +# "Prepare_for_Topup": [ '__b0_mean.nii.gz' ], +# "Prepare_dwi_for_eddy": [ '__?' ], +# "Eddy": [ '__bval_eddy', '__dwi_corrected.nii.gz', '__dwi_eddy_corrected.bvec' ], +# "Topup": [ '__corrected_b0s.nii.gz', '__rev_b0_warped.nii.gz', 'topup_results_fieldcoef.nii.gz', 'topup_results_movpar.txt' ], +# "Eddy_Topup": [ '__b0_bet_mask.nii.gz', '__bval_eddy', '__dwi_corrected.nii.gz', '__dwi_eddy_corrected.bvec' ], +# "Bet_DWI": [ '__b0_bet_mask.nii.gz', '__b0_bet.nii.gz', '__dwi_bet.nii.gz' ], +# "N4_DWI": [ '__dwi_n4.nii.gz' ], +# "Crop_DWI": [ '__b0_cropped.nii.gz', '__b0_mask_cropped.nii.gz', '__dwi_cropped.nii.gz' ], +# "Normalize_DWI": [ '__dwi_normalized.nii.gz', '_fa_wm_mask.nii.gz' ], +# "Extract_B0": [ '__b0_mask_resampled.nii.gz', '__b0_resampled.nii.gz' ], +# "Resample_DWI": [ '__dwi_resampled.nii.gz' ], +# "Extract_DTI_Shell": [ '__bval_dti', '__bvec_dti', '__dwi_dti.nii.gz' ], +# "Extract_FODF_Shell": [ '__bval_fodf', '__bvec_fodf', '__dwi_fodf.nii.gz' ], +# "DTI_Metrics": [ '__ad.nii.gz', '__evecs.nii.gz', '__ga.nii.gz', '__pulsation_std_dwi.nii.gz', '__residual_q1_residuals.npy', '__tensor.nii.gz', '__evals_e1.nii.gz', '__evecs_v1.nii.gz', '__md.nii.gz', '__rd.nii.gz', '__residual_q3_residuals.npy', '__evals_e2.nii.gz', '__evecs_v2.nii.gz', '__mode.nii.gz', '__residual_iqr_residuals.npy', '__residual_residuals_stats.png', '__evals_e3.nii.gz', '__evecs_v3.nii.gz', '__nonphysical.nii.gz', '__residual_mean_residuals.npy', '__residual_std_residuals.npy', '__evals.nii.gz', '__fa.nii.gz', '__norm.nii.gz', '__residual.nii.gz', '__rgb.nii.gz' ], +# "FODF_Metrics": [ '__afd_max.nii.gz', '__afd_sum.nii.gz', '__afd_total.nii.gz', '__fodf.nii.gz', '__nufo.nii.gz', '__peak_indices.nii.gz', '__peaks.nii.gz' ], +# "Compute_FRF": [ '__frf.txt' ], +# "Denoise_T1": [ '__t1_denoised.nii.gz' ], +# "N4_T1": [ '__t1_n4.nii.gz' ], +# "Resample_T1": [ '__t1_resampled.nii.gz' ], +# "Bet_T1": [ '__t1_bet_mask.nii.gz', '__t1_bet.nii.gz' ], +# "Crop_T1": [ '__t1_bet_cropped.nii.gz', '__t1_bet_mask_cropped.nii.gz' ], +# "Register_T1": [ '__output0GenericAffine.mat', '__output1InverseWarp.nii.gz', '__output1Warp.nii.gz', '__t1_mask_warped.nii.gz', '__t1_warped.nii.gz' ], +# "Segment_Tissues": [ '__map_csf.nii.gz', '__map_gm.nii.gz', '__map_wm.nii.gz', '__mask_csf.nii.gz', '__mask_gm.nii.gz', '__mask_wm.nii.gz' ], +# "PFT_Tracking_Maps": [ '__interface.nii.gz', '__map_exclude.nii.gz', '__map_include.nii.gz' ], +# "PFT_Seeding_Mask": [ '__pft_seeding_mask.nii.gz' ], +# "PFT_Tracking": [ '__pft_tracking_prob_wm_seed_0.trk' ] +# } + +# tracker_configs = { +# "pipeline_complete": check_tf_final, +# "PHASE_": { +# "DWI-Preprocessing": check_dwiPreproc, +# "Anat-Preprocessing": check_anatPreproc, +# "DWI-ModelFitting": check_dwiModel, +# "PFT-Tracking": check_pftTracking +# }, +# "STAGE_": { +# "DWI-Preproc-EddyTopup": check_dwiPreprocEddyTopup, +# "DWI-Preproc-Normalize": check_dwiNormalize, +# "Anat-Preproc-Reorient": check_anatReorient, +# "Anat-Preproc-Tracking": check_anatTracking, +# "DWI-ModelFitting-Tensor": check_dwiModelTensor, +# "DWI-ModelFitting-fODF": check_dwiModelFODF +# } +# } diff --git a/nipoppy/workflow/proc_pipe/tractoflow/run_tractoflow.py 
b/nipoppy/workflow/proc_pipe/tractoflow/run_tractoflow.py index a96e977f..5e38a140 100644 --- a/nipoppy/workflow/proc_pipe/tractoflow/run_tractoflow.py +++ b/nipoppy/workflow/proc_pipe/tractoflow/run_tractoflow.py @@ -5,12 +5,13 @@ import json import subprocess import tempfile - +import re + import numpy as np import nibabel as nib -from bids import BIDSLayout +from bids import BIDSLayout, BIDSLayoutIndexer -import workflow.logger as my_logger +import nipoppy.workflow.logger as my_logger #Author: bcmcpher #Date: 15-Jun-2023 @@ -21,22 +22,59 @@ MEM_MB = 4000 -def parse_data(bids_dir, participant_id, session_id, logger=None): +def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_filter=False, logger=None): """ Parse and verify the input files to build TractoFlow's simplified input to avoid their custom BIDS filter """ + ## load from global configs + DATASET_ROOT = global_configs["DATASET_ROOT"] + ## because why parse subject ID the same as bids ID? subj = participant_id.replace('sub-', '') - logger.info('Building BIDS Layout...') + ## build a regex of anything not subject id + srx = re.compile(f"sub-(?!{subj}.*$)") + + logger.info('Building Single Subject BIDS Layout...') + + ## build a BIDSLayoutIndexer to only pull subject ID + bidx = BIDSLayoutIndexer(ignore=[srx]) + + ## parse bids directory with subject filter + layout = BIDSLayout(bids_dir, indexer=bidx) + ## check if DB exists on disk first? BIDSLayout(database_path=var)? where is this saved? + ## should this be made / updated as part of BIDS-ification of dicoms? + + ## load the bids filter if it's called + if use_bids_filter: + + bidf_path = Path(f"{DATASET_ROOT}", 'proc', 'bids_filter_tractoflow.json') ## is this where it will always be? + ## or does this need to turn into a path to a filter to load? + + ## if a filter exists + if os.path.exists(bidf_path): + logger.info(' -- Expected bids_filter.json is found.') + f = open(bidf_path) + bidf = json.load(f) ## load the json as a dictionary + f.close() + ## does this validate the dictionary in any way? + ## https://github.com/nipreps/fmriprep/blob/20659650be367dff78f5e8c91c1856d4df7fcd4b/fmriprep/cli/parser.py#L72-L91 - ## parse directory - layout = BIDSLayout(bids_dir) + else: + logger.info(' -- Expected bids_filter.json is not found.') + bidf = {} ## make it empty + + else: + logger.info(' -- Not using a bids_filter.json') ## pull every t1w / dwi file name from BIDS layout - anat_files = layout.get(subject=subj, session=session_id, suffix='T1w', extension='.nii.gz', return_type='object') - dmri_files = layout.get(subject=subj, session=session_id, suffix='dwi', extension='.nii.gz', return_type='object') - + if bidf: + anat_files = layout.get(extension='.nii.gz', **bidf['t1w']) + dmri_files = layout.get(extension='.nii.gz', **bidf['dwi']) + else: + anat_files = layout.get(suffix='T1w', extension='.nii.gz') + dmri_files = layout.get(suffix='dwi', extension='.nii.gz') + ## preallocate candidate anatomical files canat = [] @@ -153,7 +191,7 @@ def parse_data(bids_dir, participant_id, session_id, logger=None): cdmri.append(dmri) logger.info("- "*25) - + ## if there's more than 1 candidate with bv* files if sum(cbv == 1) > 1: @@ -166,7 +204,9 @@ def parse_data(bids_dir, participant_id, session_id, logger=None): if x == 1: logger.info(f"File {idx+1}: {dmri_files[idx].filename}") dmrifs.append(dmri_files[idx]) - + + ## if each shell is in a separate file, that would need to be determined and fixed here. 
+ ## if there are more than 2, quit - bad input if len(dmrifs) > 2: raise ValueError('Too many candidate full sequences.') @@ -183,6 +223,18 @@ def parse_data(bids_dir, participant_id, session_id, logger=None): dmrifs1nv = dmrifs1.get_image().shape[3] dmrifs2nv = dmrifs2.get_image().shape[3] + ## get the sequence bvals + dmrifs1va = np.loadtxt(Path(bids_dir, participant_id, 'ses-' + session_id, 'dwi', dmrifs[0].filename.replace('.nii.gz', '.bval')).joinpath()) + dmrifs2va = np.loadtxt(Path(bids_dir, participant_id, 'ses-' + session_id, 'dwi', dmrifs[1].filename.replace('.nii.gz', '.bval')).joinpath()) + + ## get the sequence bvecs + dmrifs1ve = np.loadtxt(Path(bids_dir, participant_id, 'ses-' + session_id, 'dwi', dmrifs[0].filename.replace('.nii.gz', '.bvec')).joinpath()) + dmrifs2ve = np.loadtxt(Path(bids_dir, participant_id, 'ses-' + session_id, 'dwi', dmrifs[1].filename.replace('.nii.gz', '.bvec')).joinpath()) + + ## get the number of directions + dmrifs1wd = dmrifs1ve[:,dmrifs1va > 0] + dmrifs2wd = dmrifs2ve[:,dmrifs2va > 0] + ## if the phase encodings are the same axis if (dmrifs1pe[0] == dmrifs2pe[0]): @@ -191,16 +243,22 @@ def parse_data(bids_dir, participant_id, session_id, logger=None): ## if the phase encodings match exactly if (dmrifs1pe == dmrifs2pe): - ## THIS HAPPENS WITH A IDENTICAL RUN-1/RUN-2 IN PPMI - ## ARE THESE MEANT TO BE TIME 1 / TIME 2? - ## UNSURE HOW I SHOULD DEAL WITH MULTIPLE RUNS IN dMRI - NOT COMMON - logger.info('Sequences are not reverse encoded. Ignoring second sequence.') - didx = dmri_files.index(dmrifs1) + ## print log for surprising situation of matching files + logger.info('Sequences are not reverse encoded and are identical.') + logger.info('Was the phase encoding not flipped during acquisition, or are these sequences longitudinal?') + logger.info('Unsure how to parse. Ignoring shorter (or second) sequence.') + + ## pick the longer (or first) sequence to return + if dmrifs1wd.shape[1] >= dmrifs2wd.shape[1]: + didx = dmri_files.index(dmrifs1) + else: + didx = dmri_files.index(dmrifs2) + rpe_out = None ## they're the same axis in opposite orientations else: - + logger.info(f"Forward Phase Encoding: {dmrifs1pe}") logger.info(f"Reverse Phase Encoding: {dmrifs2pe}") @@ -209,10 +267,13 @@ def parse_data(bids_dir, participant_id, session_id, logger=None): logger.info('N volumes match. Assuming mirrored sequences.') - ## verify that bvecs match? - - dmrifs1nv = dmrifs1.get_image().shape[3] - dmrifs2nv = dmrifs1.get_image().shape[3] + ## verify that bvecs match + if np.all(dmrifs1wd == dmrifs2wd): + logger.info(' -- Verified weighted directions match.') + ## exact matching of bvecs may be fragile - add a failover tolerance? + + else: + logger.info(' -- Weighted directions are different.
Sequences do not match.') ## pull the first as forward didx = dmri_files.index(dmrifs1) @@ -260,7 +321,8 @@ def parse_data(bids_dir, participant_id, session_id, logger=None): nib.save(rpe_data, Path(tmp_dir, rpe_out).joinpath()) ## print a warning to log - logger.info('The sequences are suffieciently mismatched that an acquisition or conversion error is likely to have occurred.') + logger.info('The reverse sequence has non-b0 directed volumes that cannot be used and will be ignored during processing.') + logger.info('If that is not the expected result, check that the data has been converted correctly.') else: @@ -335,9 +397,21 @@ def parse_data(bids_dir, participant_id, session_id, logger=None): rpe_file = None else: rpe_file = Path(tmp_dir, rpe_out).joinpath() - + + ## set the phase encoding direction + if ('i' in dmri_files[didx].get_metadata()['PhaseEncodingDirection']): + phase = 'x' + elif ('j' in dmri_files[didx].get_metadata()['PhaseEncodingDirection']): + phase = 'y' + else: + logger.info('An unlikely phase encoding has been selected.') + phase = 'z' + + ## set the total readout time for topup + readout = dmri_files[didx].get_metadata()['TotalReadoutTime'] + ## return the paths to the input files to copy - return(dmrifile, bvalfile, bvecfile, anatfile, rpe_file) + return(dmrifile, bvalfile, bvecfile, anatfile, rpe_file, phase, readout) def run(participant_id, global_configs, session_id, output_dir, use_bids_filter, dti_shells=1000, fodf_shells=1000, sh_order=6, logger=None): """ Runs TractoFlow command with Nextflow @@ -349,9 +423,10 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter, TRACTOFLOW_CONTAINER = global_configs["PROC_PIPELINES"]["tractoflow"]["CONTAINER"] TRACTOFLOW_VERSION = global_configs["PROC_PIPELINES"]["tractoflow"]["VERSION"] TRACTOFLOW_CONTAINER = TRACTOFLOW_CONTAINER.format(TRACTOFLOW_VERSION) - SINGULARITY_TRACTOFLOW = f"{CONTAINER_STORE}{TRACTOFLOW_CONTAINER}" + SINGULARITY_TRACTOFLOW = f"{CONTAINER_STORE}/{TRACTOFLOW_CONTAINER}" LOGDIR = f"{DATASET_ROOT}/scratch/logs" - + SINGULARITY_COMMAND = global_configs["SINGULARITY_PATH"] + ## initialize the logger if logger is None: log_file = f"{LOGDIR}/{participant_id}_ses-{session_id}_tractoflow.log" @@ -370,38 +445,31 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter, bids_dir = f"{DATASET_ROOT}/bids" tractoflow_dir = f"{output_dir}/tractoflow/v{TRACTOFLOW_VERSION}" - ## Copy bids_filter.json `/bids/bids_filter.json` + ## Copy bids_filter.json if use_bids_filter: - logger.info(f"Copying ./bids_filter.json to {DATASET_ROOT}/bids/bids_filter.json (to be seen by Singularity container)") - shutil.copyfile(f"{CWD}/bids_filter.json", f"{bids_dir}/bids_filter.json") + if not os.path.exists(f"{DATASET_ROOT}/proc/bids_filter_tractoflow.json"): + logger.info(f"Copying ./bids_filter.json to {DATASET_ROOT}/proc/bids_filter_tractoflow.json (to be seen by Singularity container)") + shutil.copyfile(f"{CWD}/bids_filter.json", f"{DATASET_ROOT}/proc/bids_filter_tractoflow.json") + else: + logger.info(f"Using found {DATASET_ROOT}/proc/bids_filter_tractoflow.json") ## build paths for outputs - tractoflow_out_dir = f"{tractoflow_dir}/output/" + tractoflow_out_dir = f"{tractoflow_dir}/output/ses-{session_id}" tractoflow_home_dir = f"{tractoflow_out_dir}/{participant_id}" if not os.path.exists(Path(f"{tractoflow_home_dir}")): Path(f"{tractoflow_home_dir}").mkdir(parents=True, exist_ok=True) ## call the file parser to copy the correct files to the input structure - dmrifile,
bvalfile, bvecfile, anatfile, rpe_file = parse_data(bids_dir, participant_id, session_id, logger) - - ## nest inputs in rpe/no-rpe folders so tractoflow will parse mixed datasets w/o failing b/c data is "bad" - if rpe_file == None: - tractoflow_input_dir = f"{tractoflow_dir}/input/norpe" - else: - tractoflow_input_dir = f"{tractoflow_dir}/input/rpe" - - ## build paths for working inputs - - ## isolate datasets based on input shapes - if not rpe_file: - tractoflow_input_dir = f"{tractoflow_dir}/input/norpe" - else: - tractoflow_input_dir = f"{tractoflow_dir}/input/rpe" + dmrifile, bvalfile, bvecfile, anatfile, rpe_file, phase, readout = parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_filter, logger) + ## use_bids_filter may need to be a path to the filter so it can be loaded, not the logical flag ## build paths to working inputs - tractoflow_subj_dir = f"{tractoflow_input_dir}/{participant_id}" - tractoflow_work_dir = f"{tractoflow_dir}/work/{participant_id}" - + tractoflow_input_dir = f"{tractoflow_dir}/input" + tractoflow_nxtf_inp = f"{tractoflow_input_dir}/{participant_id}_ses-{session_id}" + tractoflow_subj_dir = f"{tractoflow_input_dir}/{participant_id}_ses-{session_id}/{participant_id}" + tractoflow_work_dir = f"{tractoflow_dir}/work/{participant_id}_ses-{session_id}" + nextflow_logdir = f"{LOGDIR}/nextflow" + ## check if working values already exist if not os.path.exists(Path(f"{tractoflow_subj_dir}")): Path(f"{tractoflow_subj_dir}").mkdir(parents=True, exist_ok=True) @@ -410,6 +478,10 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter, if not os.path.exists(Path(f"{tractoflow_work_dir}")): Path(f"{tractoflow_work_dir}").mkdir(parents=True, exist_ok=True) + ## build path to nextflow folder in logs for storing .nextflow* files for each launch + if not os.path.exists(Path(f"{nextflow_logdir}")): + Path(f"{nextflow_logdir}").mkdir(parents=True, exist_ok=True) + ## just make copies if they aren't already there - resume option cannot work w/ modified (recopied) files, so check first ## delete on success? if len(os.listdir(tractoflow_subj_dir)) == 0: @@ -446,7 +518,7 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter, odf_use = list(map(int, odf_use.split(","))) ## create merged list of requested shells - rshell = np.unique([ dti_use, odf_use ]) + rshell = np.unique([ dti_use + odf_use ]) logger.info(f'Requested shells: {rshell}') ## if any requested shell(s) are absent within data, error w/ useful warning @@ -472,14 +544,15 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter, tdir = np.unique(tvec, axis=0) ## compute and print the maximum shell - plmax = int(np.floor((-3 + np.sqrt(1 + 8 * tdir.shape[1])) / 2.0)) + plmax = int(np.floor((-3 + np.sqrt(1 + 8 * tdir.shape[1])) / 2.0)) logger.info(f"Single shell data has b = {int(bunq[1])} shell with a maximum possible lmax of {plmax}.") ## have to check the utility of every shell else: logger.info(f"Multishell data has shells b = {bunq[1:]}") - mlmax = [] + mlmax = [] + mldir = [] for shell in bunq[1:]: ## pull the shells @@ -489,23 +562,25 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter, ## check that vectors are unique tvec = bvec[:,tndir] tdir = np.unique(tvec, axis=0) - - ## compute and print the maximum shell - can get odd numbers?
- tlmax = int(np.floor((-3 + np.sqrt(1 + 8 * tdir.shape[1])) / 2.0)) + mldir.append(tdir.shape[1]) + + ## compute and print the maximum shell + tlmax = int(np.floor((-3 + np.sqrt(1 + 8 * tdir.shape[1])) / 2.0)) mlmax.append(tlmax) logger.info(f" -- Shell {int(shell)} has {tdir.shape[1]} directions capable of a max lmax: {tlmax}") - ## the minimum lmax across shells is used - plmax = min(mlmax) - logger.info(f"The maximum lmax across shells is: {plmax}") + ## the max lmax within any 1 shell is used + plmax = max(mlmax) + logger.info(f"The maximum lmax for any one shell is: {plmax}") ## if lmax too large, reset with warning if sh_order <= plmax: logger.info(f"Running model with requested lmax: {sh_order}") else: logger.info(f"The requested lmax ({sh_order}) exceeds the recommended capabilities of the data ({plmax})") + logger.info(f"Generally, you do not want to fit an lmax in excess of any one shell's ability in the data.") logger.info(f" -- You should redo with sh_order set to: {plmax}") - logger.info(f"Running model with overrode lmax: {sh_order}") + #logger.info(f"Running model with overrode lmax: {sh_order}") #sh_order = plmax ## hard coded inputs to the tractoflow command in mrproc @@ -518,12 +593,11 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter, ## path to pipelines TRACTOFLOW_PIPE=f'{DATASET_ROOT}/workflow/proc_pipe/tractoflow' - ## this is fixed for every run - nextflow is a dependency b/c it's too hard to package in the containers that will call this - ## this reality prompts the planned migration to micapipe - or anything else, honestly - NEXTFLOW_CMD=f"nextflow run {TRACTOFLOW_PIPE}/tractoflow/main.nf -with-singularity {SINGULARITY_TRACTOFLOW} -work-dir {tractoflow_work_dir} -with-trace {LOGDIR}/{participant_id}_ses-{session_id}_nf-trace.txt -with-report {LOGDIR}/{participant_id}_ses-{session_id}_nf-report.html" + ## nextflow arguments for logging - this is fixed for every run + NEXTFLOW_CMD=f"nextflow -log {LOGDIR}/{participant_id}_ses-{session_id}_nf-log.txt run /scilus_flows/tractoflow/main.nf -work-dir {tractoflow_work_dir} -with-trace {LOGDIR}/{participant_id}_ses-{session_id}_nf-trace.txt -with-report {LOGDIR}/{participant_id}_ses-{session_id}_nf-report.html" ## compose tractoflow arguments - TRACTOFLOW_CMD=f""" --input {tractoflow_input_dir} --output_dir {tractoflow_out_dir} --participant-label "{tf_id}" --dti_shells "0 {dti_use}" --fodf_shells "0 {odf_use}" --sh_order {sh_order} --profile {profile} --processes {ncore}""" + TRACTOFLOW_CMD=f""" --input {tractoflow_nxtf_inp} --output_dir {tractoflow_out_dir} --run_t1_denoising --run_gibbs_correction --encoding_direction {phase} --readout {readout} --dti_shells "0 {dti_use}" --fodf_shells "0 {odf_use}" --sh_order {sh_order} --profile {profile} --processes {ncore}""" ## TractoFlow arguments can be printed multiple ways that appear consistent with the documentation but are parsed incorrectly by nextflow. ## .nextflow.log (a run log that documents what is getting parsed by nextflow) shows additional quotes being added around the dti / fodf parameters.
Something like: "'0' '1000'" @@ -536,23 +610,27 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter, ## build command line call CMD_ARGS = NEXTFLOW_CMD + TRACTOFLOW_CMD - CMD=CMD_ARGS - + ## log what is called - logger.info(f"Running TractoFlow...") - logger.info("-"*50) - logger.info(f"CMD:\n{CMD_ARGS}") - logger.info("-"*50) - logger.info(f"Calling TractoFlow for participant: {participant_id}") + logger.info("-"*75) + logger.info(f"Running TractoFlow for participant: {participant_id}") + + ## singularity + SINGULARITY_CMD=f"{SINGULARITY_COMMAND} exec --cleanenv -H {nextflow_logdir} -B {nextflow_logdir}:/nextflow -B {DATASET_ROOT} {SINGULARITY_TRACTOFLOW}" + CMD=SINGULARITY_CMD + " " + CMD_ARGS + logger.info("+"*75) + logger.info(f"Command passed to system:\n\n{CMD}\n") + logger.info("+"*75) + ## there's probably a better way to try / catch the .run() call here try: + logger.info('Attempting Run') tractoflow_proc = subprocess.run(CMD, shell=True) - logger.info("-"*75) except Exception as e: - logger.error(f"TractoFlow run failed with exceptions: {e}") - logger.info("-"*75) + logger.error(f"TractoFlow run failed to launch with exception: {e}") + logger.info('End of TractoFlow run script.') if __name__ == '__main__': ## argparse diff --git a/nipoppy/workflow/proc_pipe/tractoflow/sample_bids_filter.json b/nipoppy/workflow/proc_pipe/tractoflow/sample_bids_filter.json new file mode 100755 index 00000000..e9234b65 --- /dev/null +++ b/nipoppy/workflow/proc_pipe/tractoflow/sample_bids_filter.json @@ -0,0 +1,15 @@ +{ + "t1w": { + "datatype": "anat", + "session": "01", + "run": "1", + "acquisition": null, + "suffix": "T1w" + }, + "dwi": { + "session": "01", + "acquisition": null, + "run": "1", + "suffix": "dwi" + } +} diff --git a/nipoppy/workflow/proc_pipe/tractoflow/tractoflow b/nipoppy/workflow/proc_pipe/tractoflow/tractoflow deleted file mode 160000 index 1766c72b..00000000 --- a/nipoppy/workflow/proc_pipe/tractoflow/tractoflow +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 1766c72b9c987b386b503d03c1eb6c9c36f83e13
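
Note on the lmax bound computed in run_tractoflow.py above: an order-lmax symmetric spherical-harmonic fit has (lmax + 1)(lmax + 2)/2 coefficients, so a shell with N unique weighted directions supports at most lmax = floor((-3 + sqrt(1 + 8N)) / 2). A minimal standalone sketch of that relationship (illustrative only; max_lmax is not a function in this repository):

import numpy as np

def max_lmax(n_directions: int) -> int:
    """Largest spherical-harmonic order whose (l+1)(l+2)/2 coefficients
    can be estimated from n_directions unique gradient directions."""
    return int(np.floor((-3 + np.sqrt(1 + 8 * n_directions)) / 2.0))

# 15 directions -> lmax 4 (15 coefficients); 28 directions -> lmax 6 (28 coefficients);
# odd orders can fall out of the floor (e.g. 60 -> 9), which is the "can get odd numbers?"
# remark above; fODF fits typically use an even lmax in practice
for n in (15, 28, 45, 60):
    l = max_lmax(n)
    print(f"{n} directions -> lmax {l} ({(l + 1) * (l + 2) // 2} coefficients)")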