From 3f8bb8c32cf8f4e8d454958d56248bfba18ec306 Mon Sep 17 00:00:00 2001 From: cmaumet Date: Fri, 13 Dec 2024 14:43:17 +0100 Subject: [PATCH 1/9] print echo times --- narps_open/pipelines/team_98BT.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/narps_open/pipelines/team_98BT.py b/narps_open/pipelines/team_98BT.py index 2c1a1911..74ad0dc7 100755 --- a/narps_open/pipelines/team_98BT.py +++ b/narps_open/pipelines/team_98BT.py @@ -151,6 +151,10 @@ def get_fieldmap_info(fieldmap_info_file, magnitude_files): short_echo_time = min(float(fieldmap_info['EchoTime1']), float(fieldmap_info['EchoTime2'])) long_echo_time = max(float(fieldmap_info['EchoTime1']), float(fieldmap_info['EchoTime2'])) + print('Echo times: ------') + print(short_echo_time) + print(long_echo_time) + if short_echo_time == float(fieldmap_info['EchoTime1']): magnitude_file = magnitude_files[0] elif short_echo_time == float(fieldmap_info['EchoTime2']): From 4c8c429f19ba3b7be35f73ca9c12095bf42cdc35 Mon Sep 17 00:00:00 2001 From: cmaumet Date: Fri, 13 Dec 2024 14:49:36 +0100 Subject: [PATCH 2/9] reverse order of anat and other preproc (for testing) --- narps_open/pipelines/team_98BT.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/narps_open/pipelines/team_98BT.py b/narps_open/pipelines/team_98BT.py index 74ad0dc7..bba58851 100755 --- a/narps_open/pipelines/team_98BT.py +++ b/narps_open/pipelines/team_98BT.py @@ -337,8 +337,8 @@ def get_preprocessing(self): Returns: a list of nipype.WorkFlow """ return [ - self.get_dartel_template_sub_workflow(), - self.get_preprocessing_sub_workflow() + self.get_preprocessing_sub_workflow(), + self.get_dartel_template_sub_workflow() ] def get_preprocessing_outputs(self): From e1c8726338f083b6f079ae497497ed401292ff36 Mon Sep 17 00:00:00 2001 From: cmaumet Date: Fri, 13 Dec 2024 14:57:29 +0100 Subject: [PATCH 3/9] revert to correct order of anat and other preproc --- narps_open/pipelines/team_98BT.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/narps_open/pipelines/team_98BT.py b/narps_open/pipelines/team_98BT.py index bba58851..74ad0dc7 100755 --- a/narps_open/pipelines/team_98BT.py +++ b/narps_open/pipelines/team_98BT.py @@ -337,8 +337,8 @@ def get_preprocessing(self): Returns: a list of nipype.WorkFlow """ return [ - self.get_preprocessing_sub_workflow(), - self.get_dartel_template_sub_workflow() + self.get_dartel_template_sub_workflow(), + self.get_preprocessing_sub_workflow() ] def get_preprocessing_outputs(self): From 3dc5638785b08a03c409993bc2ba5836a39d9407 Mon Sep 17 00:00:00 2001 From: cmaumet Date: Fri, 13 Dec 2024 15:37:16 +0100 Subject: [PATCH 4/9] set a parametric modulation for accept or reject --- narps_open/pipelines/team_98BT.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/narps_open/pipelines/team_98BT.py b/narps_open/pipelines/team_98BT.py index 74ad0dc7..bb14a708 100755 --- a/narps_open/pipelines/team_98BT.py +++ b/narps_open/pipelines/team_98BT.py @@ -151,10 +151,6 @@ def get_fieldmap_info(fieldmap_info_file, magnitude_files): short_echo_time = min(float(fieldmap_info['EchoTime1']), float(fieldmap_info['EchoTime2'])) long_echo_time = max(float(fieldmap_info['EchoTime1']), float(fieldmap_info['EchoTime2'])) - print('Echo times: ------') - print(short_echo_time) - print(long_echo_time) - if short_echo_time == float(fieldmap_info['EchoTime1']): magnitude_file = magnitude_files[0] elif short_echo_time == float(fieldmap_info['EchoTime2']): @@ -473,10 +469,14 @@ def get_subject_information(event_file: str, short_run_id: int): durations.append(4.0) weights_gain.append(float(info[2])) weights_loss.append(float(info[3])) - if 'accept' in str(info[5]): + if 'weakly_accept' in str(info[5]): answers.append(1) + elif 'strongly_accept' in str(info[5]): + answers.append(2) + elif 'weakly_reject' in str(info[5]): + answers.append(-1) else: - answers.append(0) + answers.append(-2) # Create Bunch return Bunch( From 9ce943acd395731a862a57222efee8dd7eeb3801 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Boris=20Cl=C3=A9net?= Date: Tue, 17 Dec 2024 09:55:39 +0100 Subject: [PATCH 5/9] Refactoring pipeline --- narps_open/pipelines/team_98BT.py | 421 ++++++++++++++++-------------- 1 file changed, 220 insertions(+), 201 deletions(-) diff --git a/narps_open/pipelines/team_98BT.py b/narps_open/pipelines/team_98BT.py index bb14a708..e495be82 100755 --- a/narps_open/pipelines/team_98BT.py +++ b/narps_open/pipelines/team_98BT.py @@ -58,60 +58,57 @@ def get_dartel_template_sub_workflow(self): Returns: - dartel : nipype.WorkFlow """ - # Infosource Node - To iterate on subjects + + # Init workflow + dartel_workflow = Workflow( + base_dir = self.directories.working_dir, name = 'dartel_workflow') + + # IDENTITY INTERFACE - To iterate on subjects information_source = Node(IdentityInterface( fields = ['subject_id']), name = 'information_source') information_source.iterables = ('subject_id', self.subject_list) - # SelectFiles Node - to select necessary files + # SELECT FILES - Select anat file template = { 'anat' : join('sub-{subject_id}', 'anat', 'sub-{subject_id}_T1w.nii.gz') } select_files = Node(SelectFiles(template), name = 'select_files') select_files.inputs.base_directory = self.directories.dataset_dir + dartel_workflow.connect(information_source, 'subject_id', select_files, 'subject_id') - # Gunzip node - SPM do not use .nii.gz files + # GUNZIP - SPM do not use .nii.gz files gunzip_anat = Node(Gunzip(), name = 'gunzip_anat') + dartel_workflow.connect(select_files, 'anat', gunzip_anat, 'in_file') - def get_dartel_input(structural_files): - print(structural_files) - return structural_files - - dartel_input = JoinNode(Function( - function = get_dartel_input, - input_names = ['structural_files'], - output_names = ['structural_files']), - name = 'dartel_input', + # IDENTITY INTERFACE - Join all gunziped files + dartel_inputs = JoinNode(IdentityInterface(fields = ['structural_files']), + name = 'dartel_inputs', joinsource = 'information_source', joinfield = 'structural_files') + dartel_workflow.connect(gunzip_anat, 'out_file', dartel_inputs, 'structural_files') + # RENAME - Rename files before dartel workflow rename_dartel = MapNode(Rename(format_string = 'subject_id_%(subject_id)s_struct'), iterfield = ['in_file', 'subject_id'], name = 'rename_dartel') rename_dartel.inputs.subject_id = self.subject_list rename_dartel.inputs.keep_ext = True + dartel_workflow.connect(dartel_inputs, 'structural_files', rename_dartel) + # DARTEL - Using a already existing workflow dartel_sub_workflow = create_DARTEL_template(name = 'dartel_sub_workflow') dartel_sub_workflow.inputs.inputspec.template_prefix = 'template' + dartel_workflow.connect( + rename_dartel, 'out_file', dartel_sub_workflow, 'inputspec.structural_files') - # DataSink Node - store the wanted results in the wanted repository + # DATASINK - store the wanted results in the wanted repository data_sink = Node(DataSink(), name = 'data_sink') data_sink.inputs.base_directory = self.directories.output_dir - - # Create dartel workflow and connect its nodes - dartel_workflow = Workflow( - base_dir = self.directories.working_dir, name = 'dartel_workflow') - dartel_workflow.connect([ - (information_source, select_files, [('subject_id', 'subject_id')]), - (select_files, gunzip_anat, [('anat', 'in_file')]), - (gunzip_anat, dartel_input, [('out_file', 'structural_files')]), - (dartel_input, rename_dartel, [('structural_files', 'in_file')]), - (rename_dartel, dartel_sub_workflow, [('out_file', 'inputspec.structural_files')]), - (dartel_sub_workflow, data_sink, [ - ('outputspec.template_file', 'dartel_template.@template_file'), - ('outputspec.flow_fields', 'dartel_template.@flow_fields')]) - ]) + dartel_workflow.connect( + dartel_sub_workflow, 'outputspec.template_file', data_sink, 'dartel_template.@template_file') + dartel_workflow.connect( + dartel_sub_workflow, 'outputspec.flow_fields', data_sink, 'dartel_template.@flow_fields') # Remove large files, if requested if Configuration()['pipelines']['remove_unused_data']: @@ -122,12 +119,8 @@ def get_dartel_input(structural_files): input_names = ['_', 'file_name'], output_names = [] ), name = 'remove_gunzip') - - # Add connections - dartel_workflow.connect([ - (gunzip_anat, remove_gunzip, [('out_file', 'file_name')]), - (data_sink, remove_gunzip, [('out_file', '_')]) - ]) + dartel_workflow.connect(gunzip_anat, 'out_file', remove_gunzip, 'file_name') + dartel_workflow.connect(data_sink, 'out_file', remove_gunzip, '_') return dartel_workflow @@ -165,19 +158,26 @@ def get_preprocessing_sub_workflow(self): Returns: - preprocessing : nipype.WorkFlow """ - # Infosource Node - To iterate on subjects - information_source = Node(IdentityInterface( + # Create preprocessing workflow + preprocessing = Workflow(base_dir = self.directories.working_dir, name = 'preprocessing') + + # IDENTITY INTERFACE - To iterate on subjects + information_source_subjects = Node(IdentityInterface( + fields = ['subject_id']), + name = 'information_source_subjects') + information_source_subjects.iterables = ('subject_id', self.subject_list) + + # IDENTITY INTERFACE - To iterate on runs + information_source_runs = Node(IdentityInterface( fields = ['subject_id', 'run_id']), - name = 'information_source') - information_source.iterables = [ - ('subject_id', self.subject_list), ('run_id', self.run_list) - ] + name = 'information_source_runs') + information_source_runs.iterables = ('run_id', self.run_list) + preprocessing.connect( + information_source_runs, 'subject_id', information_source_subjects, 'subject_id') - # SelectFiles Node - Select necessary files + # SELECT FILES - Select necessary subject files templates = { 'anat' : join('sub-{subject_id}', 'anat', 'sub-{subject_id}_T1w.nii.gz'), - 'func' : join('sub-{subject_id}', 'func', - 'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz'), 'magnitude' : join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_magnitude*.nii.gz'), 'phasediff' : join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_phasediff.nii.gz'), 'info_fmap' : join('sub-{subject_id}', 'fmap', 'sub-{subject_id}_phasediff.json'), @@ -186,27 +186,51 @@ def get_preprocessing_sub_workflow(self): 'dartel_template' :join( self.directories.output_dir, 'dartel_template', 'template_6.nii') } - select_files = Node(SelectFiles(templates), name = 'select_files') - select_files.inputs.base_directory = self.directories.dataset_dir + select_subject_files = Node(SelectFiles(templates), name = 'select_subject_files') + select_subject_files.inputs.base_directory = self.directories.dataset_dir + preprocessing.connect( + information_source_subjects, 'subject_id', select_subject_files, 'subject_id') - # Gunzip nodes - gunzip files because SPM do not use .nii.gz files + # SELECT FILES - Select necessary run files + templates = { + 'func' : join('sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold.nii.gz') + } + select_run_files = Node(SelectFiles(templates), name = 'select_run_files') + select_run_files.inputs.base_directory = self.directories.dataset_dir + preprocessing.connect(information_source, 'subject_id', select_run_files, 'subject_id') + preprocessing.connect(information_source, 'run_id', select_run_files, 'run_id') + + # GUNZIP - gunzip files because SPM do not use .nii.gz files gunzip_anat = Node(Gunzip(), name = 'gunzip_anat') gunzip_func = Node(Gunzip(), name = 'gunzip_func') gunzip_magnitude = Node(Gunzip(), name = 'gunzip_magnitude') gunzip_phasediff = Node(Gunzip(), name = 'gunzip_phasediff') - - # Function Node get_fieldmap_info - + preprocessing.connect(select_subject_files, 'anat', gunzip_anat, 'in_file'), + preprocessing.connect(select_run_files, 'func', gunzip_func, 'in_file') + preprocessing.connect(select_subject_files, 'phasediff', gunzip_phasediff, 'in_file') + + # FUNCTION Node get_fieldmap_info - Retrieve magnitude and phasediff metadata to decide + # which files to use for the fieldmap node, and what echo times fieldmap_info = Node(Function( function = self.get_fieldmap_info, input_names = ['fieldmap_info_file', 'magnitude_files'], output_names = ['echo_times', 'magnitude_file']), name = 'fieldmap_info') + preprocessing.connect( + select_subject_files, 'info_fmap', fieldmap_info, 'fieldmap_info_file') + preprocessing.connect(select_subject_files, 'magnitude', fieldmap_info, 'magnitude_files') + preprocessing.connect(fieldmap_info, 'magnitude_file', gunzip_magnitude, 'in_file') - # FieldMap Node - + # FIELDMAP Node - create fieldmap from phasediff and magnitude files fieldmap = Node(FieldMap(), name = 'fieldmap') fieldmap.inputs.blip_direction = -1 fieldmap.inputs.total_readout_time = TaskInformation()['TotalReadoutTime'] - + preprocessing.connect(fieldmap_info, 'echo_times', fieldmap, 'echo_times') + preprocessing.connect(gunzip_magnitude, 'out_file', fieldmap, 'magnitude_file') + preprocessing.connect(gunzip_phasediff, 'out_file', fieldmap, 'phase_file') + preprocessing.connect(gunzip_func, 'out_file', fieldmap, 'epi_file') + # Get SPM Tissue Probability Maps file spm_tissues_file = join(SPMInfo.getinfo()['path'], 'tpm', 'TPM.nii') @@ -222,6 +246,7 @@ def get_preprocessing_sub_workflow(self): [(spm_tissues_file, 5), 4, (True,False), (False, False)], [(spm_tissues_file, 6), 2, (False,False), (False, False)] ] + preprocessing.connect(gunzip_anat, 'out_file', segmentation, 'channel_files') # Slice timing - SPM slice time correction with default parameters slice_timing = Node(SliceTiming(), name = 'slice_timing') @@ -230,74 +255,62 @@ def get_preprocessing_sub_workflow(self): slice_timing.inputs.slice_order = TaskInformation()['SliceTiming'] slice_timing.inputs.time_acquisition = TaskInformation()['AcquisitionTime'] slice_timing.inputs.time_repetition = TaskInformation()['RepetitionTime'] + preprocessing.connect(gunzip_func, 'out_file', slice_timing, 'in_files') # Motion correction - SPM realign and unwarp motion_correction = Node(RealignUnwarp(), name = 'motion_correction') motion_correction.inputs.interp = 4 - + preprocessing.connect(fieldmap, 'vdm', motion_correction, 'phase_map') + preprocessing.connect(slice_timing, 'timecorrected_files', motion_correction, 'in_files') + # Intrasubject coregistration extract_first = Node(ExtractROI(), name = 'extract_first') extract_first.inputs.t_min = 1 extract_first.inputs.t_size = 1 extract_first.inputs.output_type = 'NIFTI' - + preprocessing.connect( + motion_correction, 'realigned_unwarped_files', extract_first, 'in_file') + preprocessing.connect(extract_first, 'roi_file', coregistration, 'source') + coregistration = Node(Coregister(), name = 'coregistration') coregistration.inputs.cost_function = 'nmi' coregistration.inputs.jobtype = 'estimate' - + preprocessing.connect( + motion_correction, 'realigned_unwarped_files', coregistration, 'apply_to_files') + preprocessing.connect(gunzip_anat, 'out_file', coregistration, 'target') + dartel_norm_func = Node(DARTELNorm2MNI(), name = 'dartel_norm_func') dartel_norm_func.inputs.fwhm = self.fwhm dartel_norm_func.inputs.modulate = False dartel_norm_func.inputs.voxel_size = (2.3, 2.3, 2.15) - + preprocessing.connect( + select_subject_files, 'dartel_flow_field', dartel_norm_func, 'flowfield_files') + preprocessing.connect( + select_subject_files, 'dartel_template', dartel_norm_func, 'template_file') + preprocessing.connect( + coregistration, 'coregistered_files', dartel_norm_func, 'apply_to_files') + dartel_norm_anat = Node(DARTELNorm2MNI(), name = 'dartel_norm_anat') dartel_norm_anat.inputs.fwhm = self.fwhm dartel_norm_anat.inputs.voxel_size = (1, 1, 1) - + preprocessing.connect( + select_subject_files, 'dartel_flow_field', dartel_norm_anat, 'flowfield_files') + preprocessing.connect( + select_subject_files, 'dartel_template', dartel_norm_anat, 'template_file') + preprocessing.connect(gunzip_anat, 'out_file', dartel_norm_anat, 'apply_to_files') + # DataSink Node - store the wanted results in the wanted repository data_sink = Node(DataSink(), name = 'data_sink') data_sink.inputs.base_directory = self.directories.output_dir - - # Create preprocessing workflow and connect its nodes - preprocessing = Workflow(base_dir = self.directories.working_dir, name = 'preprocessing') - preprocessing.connect([ - (information_source, select_files, [ - ('subject_id', 'subject_id'), - ('run_id', 'run_id')]), - (select_files, gunzip_anat, [('anat', 'in_file')]), - (select_files, gunzip_func, [('func', 'in_file')]), - (select_files, gunzip_phasediff, [('phasediff', 'in_file')]), - (select_files, fieldmap_info, [ - ('info_fmap', 'fieldmap_info_file'), - ('magnitude', 'magnitude_files')]), - (fieldmap_info, gunzip_magnitude, [('magnitude_file', 'in_file')]), - (fieldmap_info, fieldmap, [('echo_times', 'echo_times')]), - (gunzip_magnitude, fieldmap, [('out_file', 'magnitude_file')]), - (gunzip_phasediff, fieldmap, [('out_file', 'phase_file')]), - (gunzip_func, fieldmap, [('out_file', 'epi_file')]), - (fieldmap, motion_correction, [('vdm', 'phase_map')]), - (gunzip_anat, segmentation, [('out_file', 'channel_files')]), - (gunzip_func, slice_timing, [('out_file', 'in_files')]), - (slice_timing, motion_correction, [('timecorrected_files', 'in_files')]), - (motion_correction, coregistration, [('realigned_unwarped_files', 'apply_to_files')]), - (gunzip_anat, coregistration, [('out_file', 'target')]), - (motion_correction, extract_first, [('realigned_unwarped_files', 'in_file')]), - (extract_first, coregistration, [('roi_file', 'source')]), - (select_files, dartel_norm_func, [ - ('dartel_flow_field', 'flowfield_files'), - ('dartel_template', 'template_file')]), - (select_files, dartel_norm_anat, [ - ('dartel_flow_field', 'flowfield_files'), - ('dartel_template', 'template_file')]), - (gunzip_anat, dartel_norm_anat, [('out_file', 'apply_to_files')]), - (coregistration, dartel_norm_func, [('coregistered_files', 'apply_to_files')]), - (dartel_norm_func, data_sink, [ - ('normalized_files', 'preprocessing.@normalized_files')]), - (motion_correction, data_sink, [ - ('realigned_unwarped_files', 'preprocessing.@motion_corrected'), - ('realignment_parameters', 'preprocessing.@param')]), - (segmentation, data_sink, [('normalized_class_images', 'preprocessing.@seg')]), - ]) + preprocessing.connect( + dartel_norm_func, 'normalized_files', data_sink, 'preprocessing.@normalized_files') + preprocessing.connect( + motion_correction, 'realigned_unwarped_files', + data_sink, 'preprocessing.@motion_corrected') + preprocessing.connect( + motion_correction, 'realignment_parameters', data_sink, 'preprocessing.@param') + preprocessing.connect( + segmentation, 'normalized_class_images', data_sink, 'preprocessing.@seg') # Remove large files, if requested if Configuration()['pipelines']['remove_unused_data']: @@ -312,8 +325,6 @@ def get_preprocessing_sub_workflow(self): input_names = ['_', 'file_name'], output_names = [] ), name = 'remove_after_datasink', iterfield = 'file_name') - - # Add connections preprocessing.connect([ (gunzip_func, merge_removable_files, [('out_file', 'in1')]), (gunzip_phasediff, merge_removable_files, [('out_file', 'in2')]), @@ -506,13 +517,19 @@ def get_subject_level_analysis(self): Returns: - subject_level_analysis : nipype.WorkFlow """ - # Infosource Node - To iterate on subjects + # Init workflow + subject_level_analysis = Workflow( + base_dir = self.directories.working_dir, + name = 'subject_level_analysis' + ) + + # IDENTITY INTERFACE - To iterate on subjects information_source = Node(IdentityInterface( fields = ['subject_id']), name = 'information_source') information_source.iterables = [('subject_id', self.subject_list)] - # SelectFiles Node - to select necessary files + # SELECT FILES - to select necessary files templates = { 'func' : join(self.directories.output_dir, 'preprocessing', '_run_id_*_subject_id_{subject_id}', @@ -531,20 +548,19 @@ def get_subject_level_analysis(self): } select_files = Node(SelectFiles(templates),name = 'select_files') select_files.inputs.base_directory = self.directories.results_dir + subject_level_analysis.connect( + information_source, 'subject_id', select_files, 'subject_id') - # DataSink Node - store the wanted results in the wanted repository - data_sink = Node(DataSink(), name = 'data_sink') - data_sink.inputs.base_directory = self.directories.output_dir - - # Get Subject Info - get subject specific condition information + # FUNCTION node get_subject_information - get subject specific condition information subject_information = MapNode(Function( function = self.get_subject_information, input_names = ['event_file', 'short_run_id'], output_names = ['subject_info']), name = 'subject_information', iterfield = ['event_file', 'short_run_id']) subject_information.inputs.short_run_id = list(range(1, len(self.run_list) + 1)) - - # Get parameters + subject_level_analysis.connect(select_files, 'events', subject_information, 'event_file') + + # FUNCTION node get_parameters_file - Get subject parameters parameters = MapNode(Function( function = self.get_parameters_file, input_names = [ @@ -560,57 +576,56 @@ def get_subject_level_analysis(self): name = 'parameters') parameters.inputs.run_id = self.run_list parameters.inputs.working_dir = self.directories.working_dir + subject_level_analysis.connect(information_source, 'subject_id', parameters, 'subject_id') + subject_level_analysis.connect(select_files, 'motion_correction', parameters, 'func_file') + subject_level_analysis.connect(select_files, 'param', parameters,'parameters_file') + subject_level_analysis.connect(select_files, 'wc2', parameters,'wc2_file') - # SpecifyModel - Generates SPM-specific Model + # SPECIFY MODEL - Generates SPM-specific Model specify_model = Node(SpecifySPMModel(), name = 'specify_model') specify_model.inputs.concatenate_runs = False specify_model.inputs.input_units = 'secs' specify_model.inputs.output_units = 'secs' specify_model.inputs.time_repetition = TaskInformation()['RepetitionTime'] specify_model.inputs.high_pass_filter_cutoff = 128 - - # Level1Design - Generates an SPM design matrix + subject_level_analysis.connect(select_files, 'func', specify_model, 'functional_runs') + subject_level_analysis.connect( + subject_information, 'subject_info', specify_model, 'subject_info') + subject_level_analysis.connect( + parameters, 'new_parameters_file', specify_model, 'realignment_parameters') + + # LEVEL1DESIGN - Generates an SPM design matrix model_design = Node(Level1Design(), name = 'model_design') model_design.inputs.bases = {'hrf': {'derivs': [1, 1]}} model_design.inputs.timing_units = 'secs' model_design.inputs.interscan_interval = TaskInformation()['RepetitionTime'] + subject_level_analysis.connect(specify_model, 'session_info', model_design, 'session_info') - # EstimateModel - estimate the parameters of the model + # ESTIMATE MODEL - estimate the parameters of the model model_estimate = Node(EstimateModel(), name = 'model_estimate') model_estimate.inputs.estimation_method = {'Classical': 1} + subject_level_analysis.connect( + model_design, 'spm_mat_file', model_estimate, 'spm_mat_file') - # EstimateContrast - estimates contrasts + # ESTIMATE CONTRAST - estimates contrasts contrast_estimate = Node(EstimateContrast(), name = 'contrast_estimate') contrast_estimate.inputs.contrasts = self.subject_level_contrasts - - # Create l1 analysis workflow and connect its nodes - subject_level_analysis = Workflow( - base_dir = self.directories.working_dir, - name = 'subject_level_analysis' - ) - subject_level_analysis.connect([ - (information_source, select_files, [('subject_id', 'subject_id')]), - (information_source, parameters, [('subject_id', 'subject_id')]), - (select_files, subject_information, [('events', 'event_file')]), - (select_files, parameters, [ - ('motion_correction', 'func_file'), - ('param', 'parameters_file'), - ('wc2', 'wc2_file')]), - (select_files, specify_model, [('func', 'functional_runs')]), - (subject_information, specify_model, [('subject_info', 'subject_info')]), - (parameters, specify_model, [ - ('new_parameters_file', 'realignment_parameters')]), - (specify_model, model_design, [('session_info', 'session_info')]), - (model_design, model_estimate, [('spm_mat_file', 'spm_mat_file')]), - (model_estimate, contrast_estimate, [ - ('spm_mat_file', 'spm_mat_file'), - ('beta_images', 'beta_images'), - ('residual_image', 'residual_image')]), - (contrast_estimate, data_sink, [ - ('con_images', 'subject_level_analysis.@con_images'), - ('spmT_images', 'subject_level_analysis.@spmT_images'), - ('spm_mat_file', 'subject_level_analysis.@spm_mat_file')]) - ]) + subject_level_analysis.connect( + model_estimate, 'spm_mat_file', contrast_estimate, 'spm_mat_file') + subject_level_analysis.connect( + model_estimate, 'beta_images', contrast_estimate, 'beta_images') + subject_level_analysis.connect( + model_estimate, 'residual_image', contrast_estimate,'residual_image') + + # DATASINK - store the wanted results in the wanted repository + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + subject_level_analysis.connect( + contrast_estimate, 'con_images', data_sink, 'subject_level_analysis.@con_images') + subject_level_analysis.connect( + contrast_estimate, 'spmT_images', data_sink, 'subject_level_analysis.@spmT_images') + subject_level_analysis.connect( + contrast_estimate, 'spm_mat_file', data_sink, 'subject_level_analysis.@spm_mat_file') return subject_level_analysis @@ -665,26 +680,28 @@ def get_group_level_analysis_sub_workflow(self, method): # Compute the number of participants used to do the analysis nb_subjects = len(self.subject_list) - # Infosource - a function free node to iterate over the list of subject names + # Init workflow + group_level_analysis = Workflow( + base_dir = self.directories.working_dir, + name = f'group_level_analysis_{method}_nsub_{nb_subjects}') + + # IDENTITY INTERFACE - iterate over the list of contrasts information_source = Node( IdentityInterface( fields = ['contrast_id', 'subjects']), name = 'information_source') information_source.iterables = [('contrast_id', self.contrast_list)] - # SelectFiles + # SELECT FILES - get contrast files from subject-level output templates = { 'contrasts' : join(self.directories.output_dir, 'subject_level_analysis', '_subject_id_*', 'con_{contrast_id}.nii') } - select_files = Node(SelectFiles(templates), name = 'select_files') select_files.inputs.base_directory = self.directories.results_dir select_files.inputs.force_list = True - - # Datasink - save important files - data_sink = Node(DataSink(), name = 'data_sink') - data_sink.inputs.base_directory = self.directories.output_dir + group_level_analysis.connect( + information_source, 'contrast_id', select_files, 'contrast_id') # Function Node get_equal_range_subjects # Get subjects in the equalRange group and in the subject_list @@ -726,16 +743,23 @@ def get_group_level_analysis_sub_workflow(self, method): ), name = 'get_contrasts', iterfield = 'input_str' ) + group_level_analysis.connect(select_files, 'contrasts', get_contrasts, 'input_str') - # Estimate model + # ESTIMATE MODEL (inputs will be set later, depending on the method) estimate_model = Node(EstimateModel(), name = 'estimate_model') estimate_model.inputs.estimation_method = {'Classical':1} - # Estimate contrasts + # ESTIMATE CONTRAST estimate_contrast = Node(EstimateContrast(), name = 'estimate_contrast') estimate_contrast.inputs.group_contrast = True - - # Create thresholded maps + group_level_analysis.connect( + estimate_model, 'spm_mat_file', estimate_contrast, 'spm_mat_file') + group_level_analysis.connect( + estimate_model, 'residual_image', estimate_contrast, 'residual_image') + group_level_analysis.connect( + estimate_model, 'beta_images', estimate_contrast, 'beta_images') + + # THRESHOLD - Create thresholded maps threshold = MapNode(Threshold(), name = 'threshold', iterfield = ['stat_image', 'contrast_index']) threshold.inputs.use_fwe_correction = False @@ -744,28 +768,8 @@ def get_group_level_analysis_sub_workflow(self, method): threshold.inputs.use_topo_fdr = False threshold.inputs.force_activation = True threshold.synchronize = True - - group_level_analysis = Workflow( - base_dir = self.directories.working_dir, - name = f'group_level_analysis_{method}_nsub_{nb_subjects}') - group_level_analysis.connect([ - (information_source, select_files, [('contrast_id', 'contrast_id')]), - (select_files, get_contrasts, [('contrasts', 'input_str')]), - (estimate_model, estimate_contrast, [ - ('spm_mat_file', 'spm_mat_file'), - ('residual_image', 'residual_image'), - ('beta_images', 'beta_images')]), - (estimate_contrast, threshold, [ - ('spm_mat_file', 'spm_mat_file'), - ('spmT_images', 'stat_image')]), - (threshold, data_sink, [ - ('thresholded_map', f'group_level_analysis_{method}_nsub_{nb_subjects}.@thresh')]), - (estimate_model, data_sink, [ - ('mask_image', f'group_level_analysis_{method}_nsub_{nb_subjects}.@mask')]), - (estimate_contrast, data_sink, [ - ('spm_mat_file', f'group_level_analysis_{method}_nsub_{nb_subjects}.@spm_mat'), - ('spmT_images', f'group_level_analysis_{method}_nsub_{nb_subjects}.@T'), - ('con_images', f'group_level_analysis_{method}_nsub_{nb_subjects}.@con')])]) + group_level_analysis.connect(estimate_contrast, 'spm_mat_file', threshold, 'spm_mat_file') + group_level_analysis.connect(estimate_contrast, 'spmT_images', threshold, 'stat_image') if method in ('equalRange', 'equalIndifference'): estimate_contrast.inputs.contrasts = [ @@ -773,18 +777,15 @@ def get_group_level_analysis_sub_workflow(self, method): ('Group', 'T', ['mean'], [-1]) ] - threshold.inputs.contrast_index = [1, 2] - - # Specify design matrix + # ONE SAMPLE T TEST DESIGN - Create the design matrix one_sample_t_test_design = Node(OneSampleTTestDesign(), name = 'one_sample_t_test_design') + group_level_analysis.connect( + get_contrasts, ('out_list', clean_list), one_sample_t_test_design, 'in_files') + group_level_analysis.connect( + one_sample_t_test_design, 'spm_mat_file', estimate_model, 'spm_mat_file') - group_level_analysis.connect([ - (get_contrasts, one_sample_t_test_design, [ - (('out_list', clean_list), 'in_files') - ]), - (one_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')]) - ]) + threshold.inputs.contrast_index = [1, 2] if method == 'equalRange': group_level_analysis.connect( @@ -799,12 +800,14 @@ def get_group_level_analysis_sub_workflow(self, method): ) elif method == 'groupComp': + group_level_analysis.connect( + get_equal_range_subjects, ('out_list', complete_subject_ids), + get_contrasts, 'elements') + estimate_contrast.inputs.contrasts = [ ('Eq range vs Eq indiff in loss', 'T', ['Group_{1}', 'Group_{2}'], [1, -1]) ] - threshold.inputs.contrast_index = [1] - # Function Node elements_in_string # Get contrast files for required subjects # Note : using a MapNode with elements_in_string requires using clean_list to remove @@ -816,27 +819,43 @@ def get_group_level_analysis_sub_workflow(self, method): ), name = 'get_contrasts_2', iterfield = 'input_str' ) + group_level_analysis.connect(select_files, 'contrasts', get_contrasts_2, 'input_str') + group_level_analysis.connect( + get_equal_indifference_subjects, ('out_list', complete_subject_ids), + get_contrasts_2, 'elements') - # Node for the design matrix + # TWO SAMPLE T TEST DESIGN - Create the design matrix two_sample_t_test_design = Node(TwoSampleTTestDesign(), name = 'two_sample_t_test_design') + group_level_analysis.connect( + get_contrasts, ('out_list', clean_list), + two_sample_t_test_design, 'group1_files') + group_level_analysis.connect( + get_contrasts_2, ('out_list', clean_list), + two_sample_t_test_design, 'group2_files') + group_level_analysis.connect( + two_sample_t_test_design, 'spm_mat_file', estimate_model, 'spm_mat_file') - group_level_analysis.connect([ - (select_files, get_contrasts_2, [('contrasts', 'input_str')]), - (get_equal_range_subjects, get_contrasts, [ - (('out_list', complete_subject_ids), 'elements') - ]), - (get_equal_indifference_subjects, get_contrasts_2, [ - (('out_list', complete_subject_ids), 'elements') - ]), - (get_contrasts, two_sample_t_test_design, [ - (('out_list', clean_list), 'group1_files') - ]), - (get_contrasts_2, two_sample_t_test_design, [ - (('out_list', clean_list), 'group2_files') - ]), - (two_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')]) - ]) + threshold.inputs.contrast_index = [1] + + # Datasink - save important files + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + group_level_analysis.connect( + threshold, 'thresholded_map', + data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@thresh') + group_level_analysis.connect( + estimate_model, 'mask_image', + data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@mask') + group_level_analysis.connect( + estimate_contrast, 'spm_mat_file', + data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@spm_mat') + group_level_analysis.connect( + estimate_contrast, 'spmT_images', + data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@T') + group_level_analysis.connect( + estimate_contrast, 'con_images', + data_sink, f'group_level_analysis_{method}_nsub_{nb_subjects}.@con') return group_level_analysis From e63016450ebec6278fdb6c18cb200aeedaf7fead Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Boris=20Cl=C3=A9net?= Date: Tue, 17 Dec 2024 10:47:55 +0100 Subject: [PATCH 6/9] [REFAC] Some bug fixes after refac [skip ci] --- narps_open/pipelines/team_98BT.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/narps_open/pipelines/team_98BT.py b/narps_open/pipelines/team_98BT.py index e495be82..7d7ae3a5 100755 --- a/narps_open/pipelines/team_98BT.py +++ b/narps_open/pipelines/team_98BT.py @@ -59,7 +59,7 @@ def get_dartel_template_sub_workflow(self): - dartel : nipype.WorkFlow """ - # Init workflow + # Init workflow dartel_workflow = Workflow( base_dir = self.directories.working_dir, name = 'dartel_workflow') @@ -94,7 +94,7 @@ def get_dartel_template_sub_workflow(self): name = 'rename_dartel') rename_dartel.inputs.subject_id = self.subject_list rename_dartel.inputs.keep_ext = True - dartel_workflow.connect(dartel_inputs, 'structural_files', rename_dartel) + dartel_workflow.connect(dartel_inputs, 'structural_files', rename_dartel, 'in_file') # DARTEL - Using a already existing workflow dartel_sub_workflow = create_DARTEL_template(name = 'dartel_sub_workflow') @@ -198,8 +198,8 @@ def get_preprocessing_sub_workflow(self): } select_run_files = Node(SelectFiles(templates), name = 'select_run_files') select_run_files.inputs.base_directory = self.directories.dataset_dir - preprocessing.connect(information_source, 'subject_id', select_run_files, 'subject_id') - preprocessing.connect(information_source, 'run_id', select_run_files, 'run_id') + preprocessing.connect(information_source_runs, 'subject_id', select_run_files, 'subject_id') + preprocessing.connect(information_source_runs, 'run_id', select_run_files, 'run_id') # GUNZIP - gunzip files because SPM do not use .nii.gz files gunzip_anat = Node(Gunzip(), name = 'gunzip_anat') @@ -209,7 +209,7 @@ def get_preprocessing_sub_workflow(self): preprocessing.connect(select_subject_files, 'anat', gunzip_anat, 'in_file'), preprocessing.connect(select_run_files, 'func', gunzip_func, 'in_file') preprocessing.connect(select_subject_files, 'phasediff', gunzip_phasediff, 'in_file') - + # FUNCTION Node get_fieldmap_info - Retrieve magnitude and phasediff metadata to decide # which files to use for the fieldmap node, and what echo times fieldmap_info = Node(Function( @@ -230,7 +230,7 @@ def get_preprocessing_sub_workflow(self): preprocessing.connect(gunzip_magnitude, 'out_file', fieldmap, 'magnitude_file') preprocessing.connect(gunzip_phasediff, 'out_file', fieldmap, 'phase_file') preprocessing.connect(gunzip_func, 'out_file', fieldmap, 'epi_file') - + # Get SPM Tissue Probability Maps file spm_tissues_file = join(SPMInfo.getinfo()['path'], 'tpm', 'TPM.nii') @@ -262,7 +262,7 @@ def get_preprocessing_sub_workflow(self): motion_correction.inputs.interp = 4 preprocessing.connect(fieldmap, 'vdm', motion_correction, 'phase_map') preprocessing.connect(slice_timing, 'timecorrected_files', motion_correction, 'in_files') - + # Intrasubject coregistration extract_first = Node(ExtractROI(), name = 'extract_first') extract_first.inputs.t_min = 1 @@ -270,15 +270,15 @@ def get_preprocessing_sub_workflow(self): extract_first.inputs.output_type = 'NIFTI' preprocessing.connect( motion_correction, 'realigned_unwarped_files', extract_first, 'in_file') - preprocessing.connect(extract_first, 'roi_file', coregistration, 'source') - + coregistration = Node(Coregister(), name = 'coregistration') coregistration.inputs.cost_function = 'nmi' coregistration.inputs.jobtype = 'estimate' preprocessing.connect( motion_correction, 'realigned_unwarped_files', coregistration, 'apply_to_files') preprocessing.connect(gunzip_anat, 'out_file', coregistration, 'target') - + preprocessing.connect(extract_first, 'roi_file', coregistration, 'source') + dartel_norm_func = Node(DARTELNorm2MNI(), name = 'dartel_norm_func') dartel_norm_func.inputs.fwhm = self.fwhm dartel_norm_func.inputs.modulate = False @@ -289,7 +289,7 @@ def get_preprocessing_sub_workflow(self): select_subject_files, 'dartel_template', dartel_norm_func, 'template_file') preprocessing.connect( coregistration, 'coregistered_files', dartel_norm_func, 'apply_to_files') - + dartel_norm_anat = Node(DARTELNorm2MNI(), name = 'dartel_norm_anat') dartel_norm_anat.inputs.fwhm = self.fwhm dartel_norm_anat.inputs.voxel_size = (1, 1, 1) @@ -298,7 +298,7 @@ def get_preprocessing_sub_workflow(self): preprocessing.connect( select_subject_files, 'dartel_template', dartel_norm_anat, 'template_file') preprocessing.connect(gunzip_anat, 'out_file', dartel_norm_anat, 'apply_to_files') - + # DataSink Node - store the wanted results in the wanted repository data_sink = Node(DataSink(), name = 'data_sink') data_sink.inputs.base_directory = self.directories.output_dir @@ -559,7 +559,7 @@ def get_subject_level_analysis(self): name = 'subject_information', iterfield = ['event_file', 'short_run_id']) subject_information.inputs.short_run_id = list(range(1, len(self.run_list) + 1)) subject_level_analysis.connect(select_files, 'events', subject_information, 'event_file') - + # FUNCTION node get_parameters_file - Get subject parameters parameters = MapNode(Function( function = self.get_parameters_file, @@ -593,7 +593,7 @@ def get_subject_level_analysis(self): subject_information, 'subject_info', specify_model, 'subject_info') subject_level_analysis.connect( parameters, 'new_parameters_file', specify_model, 'realignment_parameters') - + # LEVEL1DESIGN - Generates an SPM design matrix model_design = Node(Level1Design(), name = 'model_design') model_design.inputs.bases = {'hrf': {'derivs': [1, 1]}} From f08ba4b14f739a46bfdb00cf147c2028ec385c98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Boris=20Cl=C3=A9net?= Date: Thu, 19 Dec 2024 16:31:32 +0100 Subject: [PATCH 7/9] Possible bug with contrast naming [skip ci] --- narps_open/pipelines/team_98BT.py | 10 ++++------ tests/pipelines/test_team_98BT.py | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/narps_open/pipelines/team_98BT.py b/narps_open/pipelines/team_98BT.py index 7d7ae3a5..6bc9a848 100755 --- a/narps_open/pipelines/team_98BT.py +++ b/narps_open/pipelines/team_98BT.py @@ -38,8 +38,8 @@ def __init__(self): self.contrast_list = ['0001', '0002', '0003', '0004'] # Define contrasts - gain_conditions = [f'gamble_run{r}xgain_run{r}^1' for r in range(1,len(self.run_list) + 1)] - loss_conditions = [f'gamble_run{r}xloss_run{r}^1' for r in range(1,len(self.run_list) + 1)] + gain_conditions = [f'gamble_run{r}xgain^1' for r in range(1,len(self.run_list) + 1)] + loss_conditions = [f'gamble_run{r}xloss^1' for r in range(1,len(self.run_list) + 1)] self.subject_level_contrasts = [ ('pos_gain', 'T', gain_conditions, [1, 1, 1, 1]), ('pos_loss', 'T', loss_conditions, [1, 1, 1, 1]), @@ -498,10 +498,7 @@ def get_subject_information(event_file: str, short_run_id: int): tmod = None, pmod = [ Bunch( - name = [ - f'gain_run{short_run_id}', - f'loss_run{short_run_id}', - f'answers_run{short_run_id}'], + name = ['gain', 'loss', 'answers'], poly = [1, 1, 1], param = [weights_gain, weights_loss, answers] ) @@ -599,6 +596,7 @@ def get_subject_level_analysis(self): model_design.inputs.bases = {'hrf': {'derivs': [1, 1]}} model_design.inputs.timing_units = 'secs' model_design.inputs.interscan_interval = TaskInformation()['RepetitionTime'] + model_design.inputs.volterra_expansion_order = 2 subject_level_analysis.connect(specify_model, 'session_info', model_design, 'session_info') # ESTIMATE MODEL - estimate the parameters of the model diff --git a/tests/pipelines/test_team_98BT.py b/tests/pipelines/test_team_98BT.py index ad09d2bf..6ee646c4 100644 --- a/tests/pipelines/test_team_98BT.py +++ b/tests/pipelines/test_team_98BT.py @@ -135,7 +135,7 @@ def test_subject_information(): assert bunch.regressors is None pmod = bunch.pmod[0] assert isinstance(pmod, Bunch) - assert pmod.name == ['gain_run1', 'loss_run1', 'answers_run1'] + assert pmod.name == ['gain', 'loss', 'answers'] assert pmod.poly == [1, 1, 1] helpers.compare_float_2d_arrays(pmod.param, [ [14.0, 34.0, 38.0, 10.0, 16.0], From 217ee12e685469858b0b38ef7b16cdb35626592c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Boris=20Cl=C3=A9net?= Date: Fri, 20 Dec 2024 09:26:42 +0100 Subject: [PATCH 8/9] Preprocessing outputs paths [skip ci] --- narps_open/pipelines/team_98BT.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/narps_open/pipelines/team_98BT.py b/narps_open/pipelines/team_98BT.py index 6bc9a848..b0f1a664 100755 --- a/narps_open/pipelines/team_98BT.py +++ b/narps_open/pipelines/team_98BT.py @@ -364,9 +364,8 @@ def get_preprocessing_outputs(self): } parameter_sets = product(*parameters.values()) output_dir = join( - self.directories.output_dir, - 'preprocessing', - '_run_id_{run_id}_subject_id_{subject_id}' + self.directories.output_dir, 'preprocessing', + '_subject_id_{subject_id}', '_run_id_{run_id}' ) templates = [ # Realignment parameters @@ -529,16 +528,16 @@ def get_subject_level_analysis(self): # SELECT FILES - to select necessary files templates = { 'func' : join(self.directories.output_dir, - 'preprocessing', '_run_id_*_subject_id_{subject_id}', + 'preprocessing', '_subject_id_{subject_id}', '_run_id_*', 'swuasub-{subject_id}_task-MGT_run-*_bold.nii'), 'motion_correction': join(self.directories.output_dir, - 'preprocessing', '_run_id_*_subject_id_{subject_id}', + 'preprocessing', '_subject_id_{subject_id}', '_run_id_*', 'uasub-{subject_id}_task-MGT_run-*_bold.nii'), 'param' : join(self.directories.output_dir, - 'preprocessing', '_run_id_*_subject_id_{subject_id}', + 'preprocessing', '_subject_id_{subject_id}', '_run_id_*', 'rp_asub-{subject_id}_task-MGT_run-*_bold.txt'), 'wc2' : join(self.directories.output_dir, - 'preprocessing', '_run_id_01_subject_id_{subject_id}', + 'preprocessing', '_subject_id_{subject_id}', '_run_id_01', 'wc2sub-{subject_id}_T1w.nii'), 'events' : join(self.directories.dataset_dir, 'sub-{subject_id}', 'func', 'sub-{subject_id}_task-MGT_run-*_events.tsv') From 5d503792c62e288ad36bedeb4203fb3597444a25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Boris=20Cl=C3=A9net?= Date: Fri, 20 Dec 2024 14:31:20 +0100 Subject: [PATCH 9/9] Remove preprocessing files [skip ci] --- narps_open/pipelines/team_98BT.py | 41 +++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/narps_open/pipelines/team_98BT.py b/narps_open/pipelines/team_98BT.py index b0f1a664..aa4eadd1 100755 --- a/narps_open/pipelines/team_98BT.py +++ b/narps_open/pipelines/team_98BT.py @@ -315,24 +315,39 @@ def get_preprocessing_sub_workflow(self): # Remove large files, if requested if Configuration()['pipelines']['remove_unused_data']: - # Merge Node - Merge file names to be removed after datasink node is performed - merge_removable_files = Node(Merge(5), name = 'merge_removable_files') - merge_removable_files.inputs.ravel_inputs = True + # Merge Node - Merge anat file names to be removed after datasink node is performed + merge_removable_anat_files = Node(Merge(3), name = 'merge_removable_anat_files') + merge_removable_anat_files.inputs.ravel_inputs = True - # Function Nodes remove_files - Remove sizeable files once they aren't needed - remove_after_datasink = MapNode(Function( + # Function Nodes remove_files - Remove sizeable anat files once they aren't needed + remove_anat_after_datasink = MapNode(Function( function = remove_parent_directory, input_names = ['_', 'file_name'], output_names = [] - ), name = 'remove_after_datasink', iterfield = 'file_name') + ), name = 'remove_anat_after_datasink', iterfield = 'file_name') preprocessing.connect([ - (gunzip_func, merge_removable_files, [('out_file', 'in1')]), - (gunzip_phasediff, merge_removable_files, [('out_file', 'in2')]), - (gunzip_magnitude, merge_removable_files, [('out_file', 'in3')]), - (fieldmap, merge_removable_files, [('vdm', 'in4')]), - (slice_timing, merge_removable_files, [('timecorrected_files', 'in5')]), - (merge_removable_files, remove_after_datasink, [('out', 'file_name')]), - (data_sink, remove_after_datasink, [('out_file', '_')]) + (gunzip_phasediff, merge_removable_anat_files, [('out_file', 'in1')]), + (gunzip_magnitude, merge_removable_anat_files, [('out_file', 'in2')]), + (fieldmap, merge_removable_anat_files, [('vdm', 'in3')]), + (merge_removable_anat_files, remove_anat_after_datasink, [('out', 'file_name')]), + (data_sink, remove_anat_after_datasink, [('out_file', '_')]) + ]) + + # Merge Node - Merge func file names to be removed after datasink node is performed + merge_removable_func_files = Node(Merge(2), name = 'merge_removable_func_files') + merge_removable_func_files.inputs.ravel_inputs = True + + # Function Nodes remove_files - Remove sizeable func files once they aren't needed + remove_func_after_datasink = MapNode(Function( + function = remove_parent_directory, + input_names = ['_', 'file_name'], + output_names = [] + ), name = 'remove_func_after_datasink', iterfield = 'file_name') + preprocessing.connect([ + (gunzip_func, merge_removable_func_files, [('out_file', 'in1')]), + (slice_timing, merge_removable_func_files, [('timecorrected_files', 'in2')]), + (merge_removable_func_files, remove_func_after_datasink, [('out', 'file_name')]), + (data_sink, remove_func_after_datasink, [('out_file', '_')]) ]) return preprocessing