From 47fed19afc49e60e535d6f1ce1ebbda190d72ed0 Mon Sep 17 00:00:00 2001
From: Brent McPherson
Date: Mon, 18 Mar 2024 14:42:48 -0400
Subject: [PATCH] Remove symlinks from tractoflow output (#200)

* added logic to check outputs, remove symlinks, and clean unnecessary directories.

* edits from pr request
---
 .../proc_pipe/tractoflow/run_tractoflow.py | 196 +++++++++++-------
 1 file changed, 123 insertions(+), 73 deletions(-)

diff --git a/nipoppy/workflow/proc_pipe/tractoflow/run_tractoflow.py b/nipoppy/workflow/proc_pipe/tractoflow/run_tractoflow.py
index d3cdcf3b..2ad4f88d 100644
--- a/nipoppy/workflow/proc_pipe/tractoflow/run_tractoflow.py
+++ b/nipoppy/workflow/proc_pipe/tractoflow/run_tractoflow.py
@@ -6,7 +6,7 @@
 import subprocess
 import tempfile
 import re
- 
+
 import numpy as np
 import nibabel as nib
 from bids import BIDSLayout, BIDSLayoutIndexer
@@ -23,6 +23,31 @@
 MEM_MB = 4000
 
+
+def move_target_to_symlink(symlink: Path) -> None:
+    '''
+    Convert a symlink into its target file by moving the target to the linked location.
+
+    This is for removing unnecessary complexity from completed TractoFlow outputs and
+    facilitating the archiving and moving of results.
+    '''
+    # if the symlink is a link
+    if symlink.is_symlink():
+
+        # extract the target path from the link
+        target = symlink.resolve()
+
+        # remove the link
+        symlink.unlink()
+
+        # rename (move) the target of the link to replace the link
+        target.rename(symlink)
+
+    # otherwise, error and exit
+    else:
+        raise ValueError(f"symlink is not a symlink: {symlink}\nDoing nothing.")
+
+
 def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_filter=False, logger=None):
     """ Parse and verify the input files to build TractoFlow's simplified input to avoid their custom BIDS filter
     """
@@ -40,7 +65,7 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
     ## build a BIDSLayoutIndexer to only pull subject ID
     bidx = BIDSLayoutIndexer(ignore=[srx])
- 
+
     ## parse bids directory with subject filter
     layout = BIDSLayout(bids_dir, indexer=bidx)
     ## check if DB exists on disk first? BIDSLayout(database_path=var)? where is this saved?
@@ -61,10 +86,10 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
             f.close()
             ## does this validate the dictionary in any way?
             ## https://github.com/nipreps/fmriprep/blob/20659650be367dff78f5e8c91c1856d4df7fcd4b/fmriprep/cli/parser.py#L72-L91
- 
+
         else:
             logger.info(' -- Expected bids_filter.json is not found.')
- 
+
     else:
         logger.info(' -- Not using a bids_filter.json')
@@ -87,7 +112,7 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
         tvol = anat.get_image()
 
         # ## because PPMI doesn't have full sidecars
- 
+
         try:
             tmcmode = tmeta['MatrixCoilMode']
         except:
@@ -102,7 +127,7 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
             tprotocol = tmeta['ProtocolName']
         except:
             tprotocol = 'unknown'
- 
+
         logger.info("- "*25)
         logger.info(anat.filename)
         logger.info(f"Scan Type: {tmcmode}")
@@ -119,10 +144,10 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
         ## look for Neuromelanin type scan in name somewhere
         if ('neuromel' in tprotocol.lower()):
             continue
- 
+
         ## heudiconv heuristics file has some fields that could be reused.
         ## how much effort are we supposed to spend generalizing parsing to other inputs?
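
The resolve / unlink / rename sequence in the new `move_target_to_symlink` helper is the core of this patch. A minimal standalone sketch of the same pathlib pattern, using throwaway paths that are not part of the patch:

```python
from pathlib import Path
import tempfile

# throwaway directory with a real file and a symlink pointing at it
tmp = Path(tempfile.mkdtemp())
target = tmp / "data.txt"
target.write_text("example payload")
link = tmp / "link.txt"
link.symlink_to(target)

# the sequence used by the helper
resolved = link.resolve()  # absolute path of the real file behind the link
link.unlink()              # drop the symlink itself
resolved.rename(link)      # move the real file into the link's place

assert not link.is_symlink() and link.read_text() == "example payload"
```
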
- 
+
         ## append the data if it passes all the skips
         canat.append(anat)
@@ -131,7 +156,7 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
     ## error if nothing passes
     if len(canat) == 0:
         raise ValueError(f'No valid T1 anat file for {participant_id} in ses-{session_id}.')
- 
+
     ## check how many candidates there are
     if len(canat) > 1:
         logger.info('Still have to pick one...')
@@ -142,13 +167,13 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
     logger.info(f"Selected anat file: {oanat.filename}")
     logger.info("= "*25)
- 
+
     ## preallocate candidate dmri inputs
     cdmri = []
     cbv = np.empty(len(dmri_files))
     cnv = np.empty(len(dmri_files))
     cpe = []
- 
+
     logger.info("Parsing Diffusion Files...")
     for idx, dmri in enumerate(dmri_files):
@@ -159,7 +184,7 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
             tpedir = tmeta['PhaseEncodingDirection']
         except:
             raise ValueError("INCOMPLETE SIDECAR: ['PhaseEncodingDirection'] is not defined in sidecar. This is required to accurately parse the dMRI data.")
- 
+
         logger.info("- "*25)
         logger.info(dmri.filename)
         logger.info(f"Encoding Direction: {tmeta['PhaseEncodingDirection']}")
@@ -167,7 +192,7 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
         ## store phase encoding data
         cpe.append(tmeta['PhaseEncodingDirection'])
- 
+
         ## store image dimension
         if len(tvol.shape) == 4:
             cnv[idx] = tvol.shape[-1]
@@ -175,7 +200,7 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
             cnv[idx] = 1
         else:
             raise ValueError(f'dMRI File: {dmri.filename} is not 3D/4D.')
- 
+
         ## build paths to bvec / bval data
         tbvec = Path(bids_dir, participant_id, 'ses-' + session_id, 'dwi', dmri.filename.replace('.nii.gz', '.bvec')).joinpath()
         tbval = Path(bids_dir, participant_id, 'ses-' + session_id, 'dwi', dmri.filename.replace('.nii.gz', '.bval')).joinpath()
@@ -190,16 +215,16 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
         ## append to output (?)
         cdmri.append(dmri)
- 
+
         logger.info("- "*25)
- 
+
     ## if there's more than 1 candidate with bv* files
     if sum(cbv == 1) > 1:
- 
+
         logger.info("Continue checks assuming 2 directed files...")
         dmrifs = []
- 
+
         ## pull the full sequences
         for idx, x in enumerate(cbv):
             if x == 1:
@@ -207,7 +232,7 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
                 dmrifs.append(dmri_files[idx])
                 ## if each shell is in a separate file, that would need to be determined and fixed here.
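
The `cnv` bookkeeping in the hunks above relies on the NIfTI header to distinguish a 4D series from a lone 3D volume. A minimal sketch of that check, assuming a hypothetical filename:

```python
import nibabel as nib

img = nib.load("sub-01_ses-1_dwi.nii.gz")  # hypothetical path
if len(img.shape) == 4:
    n_volumes = img.shape[-1]  # 4D series: the last axis counts volumes
elif len(img.shape) == 3:
    n_volumes = 1              # a single 3D volume, e.g. a lone b0
else:
    raise ValueError(f"{img.get_filename()} is not 3D/4D.")
```
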
- 
+
         ## if there are more than 2, quit - bad input
         if len(dmrifs) > 2:
             raise ValueError('Too many candidate full sequences.')
@@ -215,7 +240,7 @@
         ## split out to separate files
         dmrifs1 = dmrifs[0]
         dmrifs2 = dmrifs[1]
- 
+
         ## pull phase encoding direction
         dmrifs1pe = dmrifs1.get_metadata()['PhaseEncodingDirection']
         dmrifs2pe = dmrifs2.get_metadata()['PhaseEncodingDirection']
@@ -235,7 +260,7 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
        ## get the number of directions
        dmrifs1wd = dmrifs1ve[:,dmrifs1va > 0]
        dmrifs2wd = dmrifs2ve[:,dmrifs2va > 0]
- 
+
        ## if the phase encodings are the same axis
        if (dmrifs1pe[0] == dmrifs2pe[0]):
@@ -254,37 +279,37 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
                didx = dmri_files.index(dmrifs1)
            else:
                didx = dmri_files.index(dmrifs2)
- 
+
            rpe_out = None
 
        ## they're the same axis in opposite orientations
        else:
- 
+
            logger.info(f"Forward Phase Encoding (FPE) File: {dmrifs1.filename}")
            logger.info(f"Reverse Phase Encoding (RPE) File: {dmrifs2.filename}")
            logger.info(f"FPE Direction / RPE Direction: {dmrifs1pe} / {dmrifs2pe}")
- 
+
            ## if the sequences are the same length
            if (dmrifs1nv == dmrifs2nv):
- 
+
                logger.info('N volumes match. Assuming mirrored sequences.')
 
                ## verify that bvecs match
                if np.allclose(dmrifs1wd, dmrifs2wd):
                    logger.info(' -- Verified weighted directions match.')
                    ## identity matching bvecs may be fragile - add a failover tolerance?
- 
+
                else:
                    logger.info(' -- Weighted directions are different. Sequences do not match.')
 
                ## pull the first as forward
-               didx = dmri_files.index(dmrifs1) 
+               didx = dmri_files.index(dmrifs1)
 
                ## pull the second as reverse
                rpeimage = dmrifs2.get_image()
 
                ## load image data
-               rpedata = rpeimage.get_fdata() 
+               rpedata = rpeimage.get_fdata()
 
                ## load bval data
                rpeb0s = np.loadtxt(Path(bids_dir, participant_id, 'ses-' + session_id, 'dwi', dmrifs2.filename.replace('.nii.gz', '.bval')).joinpath())
@@ -302,12 +327,12 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
            else:
                didx = dmri_files.index(dmrifs1)
- 
+
                ## pull the second as reverse
                rpeimage = dmrifs2.get_image()
 
                ## load image data
-               rpedata = rpeimage.get_fdata() 
+               rpedata = rpeimage.get_fdata()
 
                ## load bval data
                rpeb0s = np.loadtxt(Path(bids_dir, participant_id, 'ses-' + session_id, 'dwi', dmrifs2.filename.replace('.nii.gz', '.bval')).joinpath())
@@ -325,34 +350,34 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
                ## print a warning to log
                logger.info('The reverse sequence has non-b0 directed volumes that cannot be used and will be ignored during processing.')
                logger.info('If that is not the expected result, check that the data has been converted correctly.')
- 
+
        else:
            raise ValueError(f'The phase encodings are on different axes: {dmrifs1pe}, {dmrifs2pe}\nCannot determine what to do.')
- 
+
    elif sum(cbv == 1) == 1:
- 
+
        logger.info("Continue checks assuming 1 directed file...")
 
        ## pull the index of the bvec that exists
        didx = np.argmax(cbv)
- 
+
        ## pull phase encoding for directed volume
        fpe = cpe[didx]
 
        ## clean fpe of unnecessary + if it's there
        if fpe[-1] == "+":
            fpe = fpe[0]
- 
+
        ## determine the reverse phase encoding
        if (len(fpe) == 1):
            rpe = fpe + "-"
        else:
            rpe = fpe[0]
- 
+
        ## look for the reverse phase encoded file in the other candidates
        if (rpe in cpe):
- 
+
            rpeb0 = dmri_files[cpe.index(rpe)]
            rpevol = rpeb0.get_image()
            tmp_dir = tempfile.mkdtemp() #TemporaryDirectory()
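
The single-directed-file branch above derives the reverse phase encoding by flipping the BIDS `PhaseEncodingDirection` code. A minimal sketch of that string logic (the helper name is hypothetical):

```python
def flip_phase_encoding(pe: str) -> str:
    """Flip a BIDS PhaseEncodingDirection code, e.g. 'j' <-> 'j-'."""
    pe = pe.rstrip("+")  # 'j+' is equivalent to 'j'
    return pe[:-1] if pe.endswith("-") else pe + "-"

assert flip_phase_encoding("j") == "j-"
assert flip_phase_encoding("j-") == "j"
assert flip_phase_encoding("i+") == "i-"
```
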
@@ -361,7 +386,7 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
            logger.info(f"Forward Phase Encoding (FPE) File: {dmri_files[didx].filename}")
            logger.info(f"Reverse Phase Encoding (RPE) File: {rpeb0.filename}")
            logger.info(f"FPE Direction / RPE Direction: {fpe} / {rpe}")
- 
+
            ## if the rpe file has multiple volumes
            if len(rpevol.shape) == 4:
@@ -374,23 +399,23 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
                rpe_data = nib.nifti1.Nifti1Image(rpedat, rpevol.affine)
                rpe_shape = rpe_data.shape
                nib.save(rpe_data, Path(tmp_dir, rpe_out).joinpath())
- 
+
            else:
                logger.info('A 3D RPE file is present: Copying the single RPE b0 volume...')
 
                ## otherwise, copy the input file to tmp
                rpe_shape = rpevol.shape
                shutil.copyfile(rpeb0, rpe_out)
- 
+
        else:
- 
+
            logger.info("No valid RPE file is found in candidate files")
            rpe_out = None
 
    else:
        raise ValueError('No valid dMRI files found.')
- 
+
    logger.info("= "*25)
- 
+
    ## default assignments
    dmrifile = Path(bids_dir, participant_id, 'ses-' + session_id, 'dwi', dmri_files[didx].filename).joinpath()
    bvalfile = Path(bids_dir, participant_id, 'ses-' + session_id, 'dwi', dmri_files[didx].filename.replace('.nii.gz', '.bval')).joinpath()
@@ -414,7 +439,7 @@ def parse_data(global_configs, bids_dir, participant_id, session_id, use_bids_fi
    ## set the total readout time for topup
    readout = dmri_files[didx].get_metadata()['TotalReadoutTime']
- 
+
    ## return the paths to the input files to copy
    return(dmrifile, bvalfile, bvecfile, anatfile, rpe_file, phase, readout)
@@ -431,7 +456,7 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter,
    SINGULARITY_TRACTOFLOW = f"{CONTAINER_STORE}/{TRACTOFLOW_CONTAINER}"
    LOGDIR = Path(f"{DATASET_ROOT}/scratch/logs").resolve()
    SINGULARITY_COMMAND = global_configs["SINGULARITY_PATH"]
- 
+
    ## initialize the logger
    if logger is None:
        log_file = f"{LOGDIR}/{participant_id}_ses-{session_id}_tractoflow.log"
@@ -450,14 +475,14 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter,
    bids_dir = Path(f"{DATASET_ROOT}/bids").resolve()
    tractoflow_dir = f"{output_dir}/tractoflow/{TRACTOFLOW_VERSION}"
 
-   ## Copy bids_filter.json 
+   ## Copy bids_filter.json
    if use_bids_filter:
        if not os.path.exists(f"{DATASET_ROOT}/proc/bids_filter_tractoflow.json"):
            logger.info(f"Copying ./bids_filter.json to {DATASET_ROOT}/proc/bids_filter_tractoflow.json (to be seen by Singularity container)")
            shutil.copyfile(f"{CWD}/bids_filter.json", f"{DATASET_ROOT}/proc/bids_filter_tractoflow.json")
        else:
            logger.info(f"Using found {DATASET_ROOT}/proc/bids_filter_tractoflow.json")
- 
+
    ## build paths for outputs
    tractoflow_out_dir = f"{tractoflow_dir}/output/ses-{session_id}"
    tractoflow_home_dir = f"{tractoflow_out_dir}/{participant_id}"
@@ -474,7 +499,7 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter,
    tractoflow_subj_dir = f"{tractoflow_input_dir}/{participant_id}_ses-{session_id}/{participant_id}"
    tractoflow_work_dir = f"{tractoflow_dir}/work/{participant_id}_ses-{session_id}"
    nextflow_logdir = f"{LOGDIR}/nextflow"
- 
+
    ## check if working values already exist
    if not os.path.exists(Path(f"{tractoflow_subj_dir}")):
        Path(f"{tractoflow_subj_dir}").mkdir(parents=True, exist_ok=True)
@@ -486,7 +511,7 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter,
    ## build path to nextflow folder in logs for storing .nextflow* files for each launch
    if not os.path.exists(Path(f"{nextflow_logdir}")):
        Path(f"{nextflow_logdir}").mkdir(parents=True, exist_ok=True)
- 
+
    ## just make copies if they aren't already there - resume option cannot work w/ modified (recopied) files, so check first
    ## delete on success?
    if len(os.listdir(tractoflow_subj_dir)) == 0:
        shutil.copyfile(dmrifile, Path(tractoflow_subj_dir, 'dwi.nii.gz').joinpath())
        shutil.copyfile(anatfile, Path(tractoflow_subj_dir, 't1.nii.gz').joinpath())
        if os.path.exists(rpe_file):
            shutil.copyfile(rpe_file, Path(tractoflow_subj_dir, 'rev_b0.nii.gz').joinpath())
- 
+
    ## load the bval / bvec data
    bval = np.loadtxt(bvalfile)
    bvec = np.loadtxt(bvecfile)
    #sh_order = int(sh_order)
    #logger.info(f'bval: {bval}')
- 
+
    ## round shells to get b0s that are ~50 / group shells that are off by +/- 10
    rval = bval + 49  ## this either does too much or not enough rounding for YLO dataset
    rval = np.floor(rval / 100) * 100
    rval = rval.astype(int)  ## I don't know how to get around just overwriting with the "fix"
    np.savetxt(Path(tractoflow_subj_dir, 'bval').joinpath(), rval, fmt='%1.0i', newline=' ')
- 
+
    ## pull the number of shells
    bunq = np.unique(rval)
    nshell = bunq.size - 1
@@ -518,7 +543,7 @@
    ## where only the shell closest to 1000 is used
    #dten = rval[np.where(np.abs(rval - 1000) == np.min(np.abs(rval) - 1000))[0]]
    dten = rval[np.abs(rval-1000) == np.min(np.abs(rval - 1000))][0]
- 
+
    if dti_shells is None:
        logger.info(f'No requested shell(s) passed to tensor fitting. Automatically extracting the shell closest to 1000.')
        dti_use = str(dten)
@@ -532,7 +557,7 @@
        logger.info(f'The ODF will be fit on data with b = {odf_use}')
    else:
        odf_use = fodf_shells
- 
+
    ## fix the passed bvals
    ## convert to integer
@@ -556,20 +581,20 @@
        odf_use = ' '.join(map(str, odf_use))
 
    ## pull lmax possible from all directions
- 
+
    ## logical index of b0 values
    b0idx = rval == 0
- 
+
    ## check that vectors are unique
    tvec = bvec[:,~b0idx]
    tdir = np.unique(tvec, axis=0)
- 
+
    ## compute and print the maximum lmax
    dlmax = int(np.floor((-3 + np.sqrt(1 + 8 * tdir.shape[1])) / 2.0))
    logger.info(f'The largest supported lmax using the whole dMRI sequence is: {dlmax}')
    logger.info(f' -- The lmax is generally restricted to the highest lmax supported by any one shell.')
    logger.info(f' -- At most, the lmax should not exceed the highest lmax supported by any one shell.')
- 
+
    if sh_order is None:
        sh_order = dlmax
        logger.info(f'No lmax is requested by user. Setting lmax to the largest value supported by the data.')
@@ -579,14 +604,14 @@
        logger.info(f'Determining if data supports a max lmax of {sh_order}.')
    else:
        raise ValueError(f'Requested lmax of {sh_order} is higher than the data can support.')
- 
+
    ## deal with multishell data
    if nshell == 1:
        plmax = dlmax
        logger.info(f"Single shell data has b = {int(bunq[1])}")
        logger.info(f" -- Shell b = {int(bunq[1])} has {tdir.shape[1]} directions capable of a max lmax: {plmax}.")
        logger.info(f"The maximum lmax for the single shell data is: {plmax}")
- 
+
    ## have to check the utility of every shell
    else:
@@ -603,7 +628,7 @@
            tvec = bvec[:,tndir]
            tdir = np.unique(tvec, axis=0)
            mldir.append(tdir.shape[1])
- 
+
            ## compute and print the maximum lmax per shell
            tlmax = int(np.floor((-3 + np.sqrt(1 + 8 * tdir.shape[1])) / 2.0))
            mlmax.append(tlmax)
@@ -612,7 +637,7 @@
        ## the max lmax within any 1 shell is used
        plmax = max(mlmax)
        logger.info(f"The maximum lmax for any one shell is: {plmax}")
- 
+
    ## if lmax too large, reset with warning
    if int(sh_order) <= plmax:
        logger.info(f"Running CSD model with lmax: {sh_order}")
@@ -636,10 +661,10 @@
    ## path to pipelines
    TRACTOFLOW_PIPE=f'{DATASET_ROOT}/workflow/proc_pipe/tractoflow'
- 
+
    ## nextflow arguments for logging - this is fixed for every run
    NEXTFLOW_CMD=f"nextflow -log {LOGDIR}/{participant_id}_ses-{session_id}_nf-log.txt run /scilus_flows/tractoflow/main.nf -work-dir {tractoflow_work_dir} -with-trace {LOGDIR}/{participant_id}_ses-{session_id}_nf-trace.txt -with-report {LOGDIR}/{participant_id}_ses-{session_id}_nf-report.html"
- 
+
    ## compose tractoflow arguments
    TRACTOFLOW_CMD=f""" --input {tractoflow_nxtf_inp} --output_dir {tractoflow_out_dir} --run_t1_denoising --run_gibbs_correction --encoding_direction {phase} --readout {readout} --dti_shells "0 {dti_use}" --fodf_shells "0 {odf_use}" --sh_order {sh_order} --profile {profile} --processes {ncore}"""
@@ -647,39 +672,64 @@
    ## .nextflow.log (a run log that documents what is getting parsed by nextflow) shows additional quotes being added around the dti / fodf parameters. Something like: "'0' '1000'"
    ## Obviously, this breaks the calls that nextflow tries to make at some point (typically around Normalize_DWI) because half the command becomes an unfinished text block from the mysteriously added quotes.
    ## I don't know if the problem is Python printing unhelpful/inaccurate text to the user or if nextflow can't parse its own input arguments correctly.
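
The `dlmax`/`tlmax` expressions in the preceding hunks invert the spherical-harmonic coefficient count: a symmetric (even-order) basis of order lmax has (lmax + 1)(lmax + 2)/2 coefficients, which must not exceed the number of unique weighted directions N, giving lmax = floor((-3 + sqrt(1 + 8N)) / 2). A quick self-check of that formula:

```python
import numpy as np

def max_lmax(n_directions: int) -> int:
    # largest lmax with (lmax + 1)(lmax + 2) / 2 <= n_directions
    return int(np.floor((-3 + np.sqrt(1 + 8 * n_directions)) / 2.0))

assert max_lmax(6) == 2   # classic 6-direction DTI minimum
assert max_lmax(28) == 6  # 28 coefficients fit exactly at lmax 6
assert max_lmax(45) == 8  # 45 coefficients fit exactly at lmax 8
```
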
- 
+
    ## add resume option if working directory is not empty
    if not len(os.listdir(tractoflow_work_dir)) == 0:
        TRACTOFLOW_CMD = TRACTOFLOW_CMD + " -resume"
- 
+
    ## build command line call
-   CMD_ARGS = NEXTFLOW_CMD + TRACTOFLOW_CMD 
- 
+   CMD_ARGS = NEXTFLOW_CMD + TRACTOFLOW_CMD
+
    ## log what is called
    logger.info("-"*75)
    logger.info(f"Running TractoFlow for participant: {participant_id}")
 
-   ## singularity 
+   ## singularity
    SINGULARITY_CMD=f"{SINGULARITY_COMMAND} exec --cleanenv -H {nextflow_logdir} -B {nextflow_logdir}:/nextflow -B {LOGDIR} -B {output_dir} {SINGULARITY_TRACTOFLOW}"
    CMD=SINGULARITY_CMD + " " + CMD_ARGS
    logger.info("+"*75)
    logger.info(f"Command passed to system:\n\n{CMD}\n")
    logger.info("+"*75)
- 
+
    ## there's probably a better way to try / catch the .run() call here
    try:
        logger.info('Attempting Run')
        tractoflow_proc = subprocess.run(CMD, shell=True)
+
    except Exception as e:
        logger.error(f"TractoFlow run failed to launch with exception: {e}")
 
+   # do clean-up
+
+   # check if tractogram is a valid file
+   trk = nib.streamlines.load(Path(tractoflow_out_dir, 'PFT_Tracking', f"{participant_id}__pft_tracking_prob_wm_seed_0.trk"))
+
+   # if the last file created (tractogram) can be validly loaded,
+   # processing is done.
+   if trk:
+
+       logger.info('Cleaning up symlinks and input directories...')
+
+       # find every file in the output path (this should grab symlinks)
+       out_files = Path(tractoflow_out_dir).rglob("*")
+
+       # convert symlinks to real files, skipping anything that is not a link
+       for out in out_files:
+           if out.is_symlink():
+               move_target_to_symlink(out)
+
+       # remove tractoflow_input_dir
+       shutil.rmtree(tractoflow_nxtf_inp)
+
+       # remove tractoflow_work_dir
+       shutil.rmtree(tractoflow_work_dir)
+
    logger.info('End of TractoFlow run script.')
 
 if __name__ == '__main__':
     ## argparse
     HELPTEXT = """
-    Script to run TractoFlow 
+    Script to run TractoFlow
     """
 
     ## parse inputs
@@ -692,7 +742,7 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter,
    parser.add_argument('--dti_shells', type=str, default=None, help='shell value(s) on which a tensor will be fit', required=False)
    parser.add_argument('--fodf_shells', type=str, default=None, help='shell value(s) on which the CSD will be fit', required=False)
    parser.add_argument('--sh_order', type=str, default=None, help='The order of the CSD function to fit', required=False)
- 
+
    ## extract arguments
    args = parser.parse_args()
    global_config_file = args.global_config
@@ -711,5 +761,5 @@ def run(participant_id, global_configs, session_id, output_dir, use_bids_filter,
    # add sub- prefix to participant_id if needed
    participant_id = participant_id_to_bids_id(participant_id, double_prefix=False)
 
-   ## make valid tractoflow call based on inputs 
+   ## make valid tractoflow call based on inputs
    run(participant_id, global_configs, session_id, output_dir, use_bids_filter, dti_shells, fodf_shells, sh_order)
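
The cleanup added at the end of run() can be exercised on a toy tree before trusting it on real outputs. A minimal sketch, with paths standing in for tractoflow_out_dir and tractoflow_work_dir:

```python
import shutil
import tempfile
from pathlib import Path

root = Path(tempfile.mkdtemp())
work = root / "work"    # stands in for tractoflow_work_dir
out = root / "output"   # stands in for tractoflow_out_dir
work.mkdir()
out.mkdir()

# simulate a Nextflow result: the real file lives in work/, output/ links to it
real = work / "fa.nii.gz"
real.write_bytes(b"fake image bytes")
(out / "fa.nii.gz").symlink_to(real)

# convert every symlink under output/ into a real file, then drop work/
for path in out.rglob("*"):
    if path.is_symlink():
        target = path.resolve()
        path.unlink()
        target.rename(path)
shutil.rmtree(work)

assert (out / "fa.nii.gz").read_bytes() == b"fake image bytes"
```

Converting the links first is what makes removing the work directory safe: once the targets have been moved into the output tree, nothing under output/ still points into work/.
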