From 57fea46e1e43f31302c923dc7bc15c6bad4714a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Boris=20Cl=C3=A9net?= <117362283+bclenet@users.noreply.github.com> Date: Fri, 19 Apr 2024 10:06:09 +0200 Subject: [PATCH 1/3] O6R6 reproduction (#192) * First version of O6R6 [skip ci] * Group level of O6R6 * Bug in subject level analysis * Mask input in subject level * Input mask names in subject level analysis * Changes on group level analysis for O6R6 * Remove useless smoothing * Tstat outputs of randomise [skip ci] --- narps_open/pipelines/__init__.py | 2 +- narps_open/pipelines/team_O6R6.py | 727 +++++++++++++++++++++++ narps_open/utils/correlation/__main__.py | 2 +- tests/pipelines/test_team_O6R6.py | 116 ++++ 4 files changed, 845 insertions(+), 2 deletions(-) create mode 100644 narps_open/pipelines/team_O6R6.py create mode 100644 tests/pipelines/test_team_O6R6.py diff --git a/narps_open/pipelines/__init__.py b/narps_open/pipelines/__init__.py index 87a8dc70..348337a2 100644 --- a/narps_open/pipelines/__init__.py +++ b/narps_open/pipelines/__init__.py @@ -59,7 +59,7 @@ 'L9G5': None, 'O03M': None, 'O21U': None, - 'O6R6': None, + 'O6R6': 'PipelineTeamO6R6', 'P5F3': None, 'Q58J': None, 'Q6O0': 'PipelineTeamQ6O0', diff --git a/narps_open/pipelines/team_O6R6.py b/narps_open/pipelines/team_O6R6.py new file mode 100644 index 00000000..ec78a7d8 --- /dev/null +++ b/narps_open/pipelines/team_O6R6.py @@ -0,0 +1,727 @@ +#!/usr/bin/python +# coding: utf-8 + +""" Write the work of NARPS team O6R6 using Nipype """ + +from os.path import join +from itertools import product + +from nipype import Workflow, Node, MapNode +from nipype.interfaces.utility import IdentityInterface, Function, Split +from nipype.interfaces.io import SelectFiles, DataSink +from nipype.interfaces.fsl import ( + IsotropicSmooth, Level1Design, FEATModel, + L2Model, Merge, FLAMEO, FILMGLS, MultipleRegressDesign, + FSLCommand, Randomise + ) +from nipype.algorithms.modelgen import SpecifyModel +from nipype.interfaces.fsl.maths import MultiImageMaths + +from narps_open.utils.configuration import Configuration +from narps_open.pipelines import Pipeline +from narps_open.data.task import TaskInformation +from narps_open.data.participants import get_group +from narps_open.core.common import list_intersection, elements_in_string, clean_list +from narps_open.core.interfaces import InterfaceFactory + +# Setup FSL +FSLCommand.set_default_output_type('NIFTI_GZ') + +class PipelineTeamO6R6(Pipeline): + """ A class that defines the pipeline of team O6R6 """ + + def __init__(self): + super().__init__() + self.team_id = 'O6R6' + self.contrast_list = ['1', '2'] + self.run_level_contrasts = [ + ('effect_of_gain', 'T', ['gain_trial', 'loss_trial'], [1, 0]), + ('effect_of_loss', 'T', ['gain_trial', 'loss_trial'], [0, 1]) + ] + + def get_preprocessing(self): + """ No preprocessing has been done by team O6R6 """ + return None + + def get_subject_information(event_file, group): + """ + Create Bunchs for specifyModel. + + Parameters : + - event_file : str, file corresponding to the run and the subject to analyze + - group : str, the group to which belong the subject + + Returns : + - subject_info : list of Bunch for 1st level analysis. + """ + from nipype.interfaces.base import Bunch + + onsets_trial = [] + durations_trial = [] + weights_trial = [] + onsets_gain_trial = [] + durations_gain_trial = [] + weights_gain_trial = [] + onsets_loss_trial = [] + durations_loss_trial = [] + weights_loss_trial = [] + + with open(event_file, 'rt') as file: + next(file) # skip the header + + for line in file: + info = line.strip().split() + + if float(info[4]) > 0.0: # Response time exists + onsets_trial.append(float(info[0])) + durations_trial.append(float(info[4])) + weights_trial.append(1.0) + + gain_amount = float(info[2]) + loss_amount = float(info[3]) + gain_weight = 1.0 + loss_weight = 1.0 + if group == 'equalIndifference': + gain_weight = (gain_amount/2) - 4.0 + loss_weight = (loss_amount/2) - 4.0 + elif group == 'equalRange': + gain_weight = gain_amount - 4.0 + loss_weight = loss_amount - 4.0 + + # Classify trial as "gain" or "loss" + if (gain_amount / loss_amount) > 1.93: # "Gain" trial + onsets_gain_trial.append(float(info[0])) + durations_gain_trial.append(float(info[4])) + weights_gain_trial.append(gain_weight) + else: # "Loss" trial + onsets_loss_trial.append(float(info[0])) + durations_loss_trial.append(float(info[4])) + weights_loss_trial.append(loss_weight) + + return [ + Bunch( + conditions = ['trial', 'gain_trial', 'loss_trial'], + onsets = [onsets_trial, onsets_gain_trial, onsets_loss_trial], + durations = [durations_trial, durations_gain_trial, durations_loss_trial], + amplitudes = [weights_trial, weights_gain_trial, weights_loss_trial] + ) + ] + + def get_subject_group(subject_id: str): + """ + Return the group of the subject (either 'equalRange' or 'equalIndifference'). + + Parameters : + - subject_id : str, the subject identifier + + Returns : + - group : str, the group to which belong the subject + """ + from narps_open.data.participants import get_group + + if subject_id in get_group('equalRange'): + return 'equalRange' + return 'equalIndifference' + + def get_run_level_analysis(self): + """ + Create the run level analysis workflow. + + Returns: + - run_level : nipype.WorkFlow + """ + # Create run level analysis workflow and connect its nodes + run_level = Workflow( + base_dir = self.directories.working_dir, + name = 'run_level_analysis' + ) + + # IdentityInterface Node - Iterate on subject and runs + information_source = Node(IdentityInterface( + fields = ['subject_id', 'run_id']), + name = 'information_source') + information_source.iterables = [ + ('subject_id', self.subject_list), + ('run_id', self.run_list) + ] + + # SelectFiles - Get necessary files + templates = { + 'func' : join('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_space-MNI152NLin2009cAsym_preproc.nii.gz'), + 'events' : join('sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_events.tsv') + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.dataset_dir + run_level.connect(information_source, 'subject_id', select_files, 'subject_id') + run_level.connect(information_source, 'run_id', select_files, 'run_id') + + # Function Node get_subject_group + # This returns the name of the subject's group + subject_group = Node(Function( + function = self.get_subject_group, + input_names = ['subject_id'], + output_names = ['group'] + ), + name = 'subject_group' + ) + run_level.connect(information_source, 'subject_id', subject_group, 'subject_id') + + # Get Subject Info - get subject specific condition information + subject_information = Node(Function( + function = self.get_subject_information, + input_names = ['event_file', 'group'], + output_names = ['subject_info'] + ), name = 'subject_information') + run_level.connect(select_files, 'events', subject_information, 'event_file') + run_level.connect(subject_group, 'group', subject_information, 'group') + + # SpecifyModel Node - Generate run level model + specify_model = Node(SpecifyModel(), name = 'specify_model') + specify_model.inputs.high_pass_filter_cutoff = 100 + specify_model.inputs.input_units = 'secs' + specify_model.inputs.time_repetition = TaskInformation()['RepetitionTime'] + run_level.connect(select_files, 'func', specify_model, 'functional_runs') + run_level.connect(subject_information, 'subject_info', specify_model, 'subject_info') + + # Level1Design Node - Generate files for run level computation + model_design = Node(Level1Design(), name = 'model_design') + model_design.inputs.bases = {'dgamma' : {'derivs' : True }} + model_design.inputs.interscan_interval = TaskInformation()['RepetitionTime'] + model_design.inputs.model_serial_correlations = True + model_design.inputs.contrasts = self.run_level_contrasts + run_level.connect(specify_model, 'session_info', model_design, 'session_info') + + # FEATModel Node - Generate run level model + model_generation = Node(FEATModel(), name = 'model_generation') + run_level.connect(model_design, 'ev_files', model_generation, 'ev_files') + run_level.connect(model_design, 'fsf_files', model_generation, 'fsf_file') + + # FILMGLS Node - Estimate first level model + model_estimate = Node(FILMGLS(), name='model_estimate') + run_level.connect(select_files, 'func', model_estimate, 'in_file') + run_level.connect(model_generation, 'con_file', model_estimate, 'tcon_file') + run_level.connect(model_generation, 'design_file', model_estimate, 'design_file') + + # DataSink Node - store the wanted results in the wanted directory + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + run_level.connect(model_estimate, 'results_dir', data_sink, 'run_level_analysis.@results') + run_level.connect( + model_generation, 'design_file', data_sink, 'run_level_analysis.@design_file') + run_level.connect( + model_generation, 'design_image', data_sink, 'run_level_analysis.@design_img') + + return run_level + + def get_run_level_outputs(self): + """ Return the names of the files the run level analysis is supposed to generate. """ + + parameters = { + 'run_id' : self.run_list, + 'subject_id' : self.subject_list, + 'contrast_id' : self.contrast_list, + } + parameter_sets = product(*parameters.values()) + output_dir = join(self.directories.output_dir, + 'run_level_analysis', '_run_id_{run_id}_subject_id_{subject_id}') + templates = [ + join(output_dir, 'results', 'cope{contrast_id}.nii.gz'), + join(output_dir, 'results', 'tstat{contrast_id}.nii.gz'), + join(output_dir, 'results', 'varcope{contrast_id}.nii.gz'), + join(output_dir, 'results', 'zstat{contrast_id}.nii.gz') + ] + return [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets for template in templates] + + def get_subject_level_analysis(self): + """ + Create the subject level analysis workflow. + + Returns: + - subject_level_analysis : nipype.WorkFlow + """ + # Second level (single-subject, mean of all four scans) analysis workflow. + subject_level = Workflow( + base_dir = self.directories.working_dir, + name = 'subject_level_analysis') + + # Infosource Node - To iterate on subject and runs + information_source = Node(IdentityInterface( + fields = ['subject_id', 'contrast_id']), + name = 'information_source') + information_source.iterables = [ + ('subject_id', self.subject_list), + ('contrast_id', self.contrast_list) + ] + + # SelectFiles node - to select necessary files + templates = { + 'cope' : join(self.directories.output_dir, + 'run_level_analysis', '_run_id_*_subject_id_{subject_id}', 'results', + 'cope{contrast_id}.nii.gz'), + 'varcope' : join(self.directories.output_dir, + 'run_level_analysis', '_run_id_*_subject_id_{subject_id}', 'results', + 'varcope{contrast_id}.nii.gz'), + 'masks' : join('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-*_bold_space-MNI152NLin2009cAsym_brainmask.nii.gz') + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory= self.directories.dataset_dir + subject_level.connect(information_source, 'subject_id', select_files, 'subject_id') + subject_level.connect(information_source, 'contrast_id', select_files, 'contrast_id') + + # Merge Node - Merge copes files for each subject + merge_copes = Node(Merge(), name = 'merge_copes') + merge_copes.inputs.dimension = 't' + subject_level.connect(select_files, 'cope', merge_copes, 'in_files') + + # Merge Node - Merge varcopes files for each subject + merge_varcopes = Node(Merge(), name = 'merge_varcopes') + merge_varcopes.inputs.dimension = 't' + subject_level.connect(select_files, 'varcope', merge_varcopes, 'in_files') + + # Split Node - Split mask list to serve them as inputs of the MultiImageMaths node. + split_masks = Node(Split(), name = 'split_masks') + split_masks.inputs.splits = [1, len(self.run_list) - 1] + split_masks.inputs.squeeze = True # Unfold one-element splits removing the list + subject_level.connect(select_files, 'masks', split_masks, 'inlist') + + # MultiImageMaths Node - Create a subject mask by + # computing the intersection of all run masks. + mask_intersection = Node(MultiImageMaths(), name = 'mask_intersection') + mask_intersection.inputs.op_string = '-mul %s ' * (len(self.run_list) - 1) + subject_level.connect(split_masks, 'out1', mask_intersection, 'in_file') + subject_level.connect(split_masks, 'out2', mask_intersection, 'operand_files') + + # L2Model Node - Generate subject specific second level model + generate_model = Node(L2Model(), name = 'generate_model') + generate_model.inputs.num_copes = len(self.run_list) + + # FLAMEO Node - Estimate model + estimate_model = Node(FLAMEO(), name = 'estimate_model') + estimate_model.inputs.run_mode = 'flame1' + subject_level.connect(mask_intersection, 'out_file', estimate_model, 'mask_file') + subject_level.connect(merge_copes, 'merged_file', estimate_model, 'cope_file') + subject_level.connect(merge_varcopes, 'merged_file', estimate_model, 'var_cope_file') + subject_level.connect(generate_model, 'design_mat', estimate_model, 'design_file') + subject_level.connect(generate_model, 'design_con', estimate_model, 't_con_file') + subject_level.connect(generate_model, 'design_grp', estimate_model, 'cov_split_file') + + # DataSink Node - store the wanted results in the wanted directory + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + subject_level.connect( + mask_intersection, 'out_file', data_sink, 'subject_level_analysis.@mask') + subject_level.connect(estimate_model, 'zstats', data_sink, 'subject_level_analysis.@stats') + subject_level.connect( + estimate_model, 'tstats', data_sink, 'subject_level_analysis.@tstats') + subject_level.connect(estimate_model, 'copes', data_sink, 'subject_level_analysis.@copes') + subject_level.connect( + estimate_model, 'var_copes', data_sink, 'subject_level_analysis.@varcopes') + + return subject_level + + def get_subject_level_outputs(self): + """ Return the names of the files the subject level analysis is supposed to generate. """ + + parameters = { + 'contrast_id' : self.contrast_list, + 'subject_id' : self.subject_list, + 'file' : ['cope1.nii.gz', 'tstat1.nii.gz', 'varcope1.nii.gz', 'zstat1.nii.gz'] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_{subject_id}','{file}' + ) + return_list = [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + parameters = { + 'contrast_id' : self.contrast_list, + 'subject_id' : self.subject_list, + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_{subject_id}', + 'sub-{subject_id}_task-MGT_run-01_bold_space-MNI152NLin2009cAsym_brainmask_maths.nii.gz' + ) + return_list += [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + return return_list + + def get_one_sample_t_test_regressors(subject_list: list) -> dict: + """ + Create dictionary of regressors for one sample t-test group analysis. + + Parameters: + - subject_list: ids of subject in the group for which to do the analysis + + Returns: + - dict containing named lists of regressors. + """ + + return dict(group_mean = [1 for _ in subject_list]) + + def get_two_sample_t_test_regressors( + equal_range_ids: list, + equal_indifference_ids: list, + subject_list: list, + ) -> dict: + """ + Create dictionary of regressors for two sample t-test group analysis. + + Parameters: + - equal_range_ids: ids of subjects in equal range group + - equal_indifference_ids: ids of subjects in equal indifference group + - subject_list: ids of subject for which to do the analysis + + Returns: + - regressors, dict: containing named lists of regressors. + - groups, list: group identifiers to distinguish groups in FSL analysis. + """ + + # Create 2 lists containing n_sub values which are + # * 1 if the participant is on the group + # * 0 otherwise + equal_range_regressors = [1 if i in equal_range_ids else 0 for i in subject_list] + equal_indifference_regressors = [ + 1 if i in equal_indifference_ids else 0 for i in subject_list + ] + + # Create regressors output : a dict with the two list + regressors = dict( + equalRange = equal_range_regressors, + equalIndifference = equal_indifference_regressors + ) + + # Create groups outputs : a list with 1 for equalRange subjects and 2 for equalIndifference + groups = [1 if i == 1 else 2 for i in equal_range_regressors] + + return regressors, groups + + def get_group_level_analysis(self): + """ + Return all workflows for the group level analysis. + + Returns; + - a list of nipype.WorkFlow + """ + + methods = ['equalRange', 'equalIndifference', 'groupComp'] + return [self.get_group_level_analysis_sub_workflow(method) for method in methods] + + def get_group_level_analysis_sub_workflow(self, method): + """ + Return a workflow for the group level analysis. + + Parameters: + - method: one of 'equalRange', 'equalIndifference' or 'groupComp' + + Returns: + - group_level: nipype.WorkFlow + """ + # Compute the number of participants used to do the analysis + nb_subjects = len(self.subject_list) + + # Declare the workflow + group_level = Workflow( + base_dir = self.directories.working_dir, + name = f'group_level_analysis_{method}_nsub_{nb_subjects}') + + # Infosource Node - iterate over the contrasts generated by the subject level analysis + information_source = Node(IdentityInterface( + fields = ['contrast_id']), + name = 'information_source') + information_source.iterables = [('contrast_id', self.contrast_list)] + + # SelectFiles Node - select necessary files + templates = { + 'cope' : join(self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_*', + 'cope1.nii.gz'), + 'varcope' : join(self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_*', + 'varcope1.nii.gz'), + 'masks': join(self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_1_subject_id_*', + 'sub-*_task-MGT_run-*_bold_space-MNI152NLin2009cAsym_brainmask_maths.nii.gz') + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.results_dir + group_level.connect(information_source, 'contrast_id', select_files, 'contrast_id') + + # Function Node elements_in_string + # Get contrast of parameter estimates (cope) for these subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_copes = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_copes', iterfield = 'input_str' + ) + group_level.connect(select_files, 'cope', get_copes, 'input_str') + + # Function Node elements_in_string + # Get variance of the estimated copes (varcope) for these subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_varcopes = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_varcopes', iterfield = 'input_str' + ) + group_level.connect(select_files, 'varcope', get_varcopes, 'input_str') + + # Merge Node - Merge cope files + merge_copes = Node(Merge(), name = 'merge_copes') + merge_copes.inputs.dimension = 't' + group_level.connect(get_copes, ('out_list', clean_list), merge_copes, 'in_files') + + # Merge Node - Merge cope files + merge_varcopes = Node(Merge(), name = 'merge_varcopes') + merge_varcopes.inputs.dimension = 't' + group_level.connect(get_varcopes, ('out_list', clean_list), merge_varcopes, 'in_files') + + # Split Node - Split mask list to serve them as inputs of the MultiImageMaths node. + split_masks = Node(Split(), name = 'split_masks') + split_masks.inputs.splits = [1, len(self.subject_list) - 1] + split_masks.inputs.squeeze = True # Unfold one-element splits removing the list + group_level.connect(select_files, 'masks', split_masks, 'inlist') + + # MultiImageMaths Node - Create a subject mask by + # computing the intersection of all run masks. + mask_intersection = Node(MultiImageMaths(), name = 'mask_intersection') + mask_intersection.inputs.op_string = '-mul %s ' * (len(self.subject_list) - 1) + group_level.connect(split_masks, 'out1', mask_intersection, 'in_file') + group_level.connect(split_masks, 'out2', mask_intersection, 'operand_files') + + # MultipleRegressDesign Node - Specify model + specify_model = Node(MultipleRegressDesign(), name = 'specify_model') + + # FLAMEO Node - Estimate model + estimate_model = Node(FLAMEO(), name = 'estimate_model') + estimate_model.inputs.run_mode = 'flame1' + group_level.connect(mask_intersection, 'out_file', estimate_model, 'mask_file') + group_level.connect(merge_copes, 'merged_file', estimate_model, 'cope_file') + group_level.connect(merge_varcopes, 'merged_file', estimate_model, 'var_cope_file') + group_level.connect(specify_model, 'design_mat', estimate_model, 'design_file') + group_level.connect(specify_model, 'design_con', estimate_model, 't_con_file') + group_level.connect(specify_model, 'design_grp', estimate_model, 'cov_split_file') + + # Randomise Node - Perform clustering on statistical output + randomise = Node(Randomise(), name = 'randomise') + randomise.inputs.tfce = True + randomise.inputs.num_perm = 5000 + randomise.inputs.c_thresh = 0.05 + group_level.connect(mask_intersection, 'out_file', randomise, 'mask') + group_level.connect(merge_copes, 'merged_file', randomise, 'in_file') + group_level.connect(specify_model, 'design_con', randomise, 'tcon') + group_level.connect(specify_model, 'design_mat', randomise, 'design_mat') + + # Datasink Node - Save important files + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + group_level.connect(estimate_model, 'zstats', data_sink, + f'group_level_analysis_{method}_nsub_{nb_subjects}.@zstats') + group_level.connect(estimate_model, 'tstats', data_sink, + f'group_level_analysis_{method}_nsub_{nb_subjects}.@tstats') + group_level.connect(randomise,'t_corrected_p_files', data_sink, + f'group_level_analysis_{method}_nsub_{nb_subjects}.@t_corrected_p_files') + group_level.connect(randomise,'tstat_files', data_sink, + f'group_level_analysis_{method}_nsub_{nb_subjects}.@tstat_files') + + if method in ('equalIndifference', 'equalRange'): + # Setup a one sample t-test + specify_model.inputs.contrasts = [ + ['group_mean', 'T', ['group_mean'], [1]], + ['group_mean_neg', 'T', ['group_mean'], [-1]] + ] + + # Function Node get_group_subjects - Get subjects in the group and in the subject_list + get_group_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_group_subjects' + ) + get_group_subjects.inputs.list_1 = get_group(method) + get_group_subjects.inputs.list_2 = self.subject_list + group_level.connect(get_group_subjects, 'out_list', get_copes, 'elements') + group_level.connect(get_group_subjects, 'out_list', get_varcopes, 'elements') + + # Function Node get_one_sample_t_test_regressors + # Get regressors in the equalRange and equalIndifference method case + regressors_one_sample = Node( + Function( + function = self.get_one_sample_t_test_regressors, + input_names = ['subject_list'], + output_names = ['regressors'] + ), + name = 'regressors_one_sample', + ) + group_level.connect(get_group_subjects, 'out_list', regressors_one_sample, 'subject_list') + group_level.connect(regressors_one_sample, 'regressors', specify_model, 'regressors') + + elif method == 'groupComp': + + # Select copes and varcopes corresponding to the selected subjects + # Indeed the SelectFiles node asks for all (*) subjects available + get_copes.inputs.elements = self.subject_list + get_varcopes.inputs.elements = self.subject_list + + # Setup a two sample t-test + specify_model.inputs.contrasts = [ + ['equalRange_sup', 'T', ['equalRange', 'equalIndifference'], [1, -1]] + ] + + # Function Node get_equal_range_subjects + # Get subjects in the equalRange group and in the subject_list + get_equal_range_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_range_subjects' + ) + get_equal_range_subjects.inputs.list_1 = get_group('equalRange') + get_equal_range_subjects.inputs.list_2 = self.subject_list + + # Function Node get_equal_indifference_subjects + # Get subjects in the equalIndifference group and in the subject_list + get_equal_indifference_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_indifference_subjects' + ) + get_equal_indifference_subjects.inputs.list_1 = get_group('equalIndifference') + get_equal_indifference_subjects.inputs.list_2 = self.subject_list + + # Function Node get_two_sample_t_test_regressors + # Get regressors in the groupComp method case + regressors_two_sample = Node( + Function( + function = self.get_two_sample_t_test_regressors, + input_names = [ + 'equal_range_ids', + 'equal_indifference_ids', + 'subject_list', + ], + output_names = ['regressors', 'groups'] + ), + name = 'regressors_two_sample', + ) + regressors_two_sample.inputs.subject_list = self.subject_list + + # Add missing connections + group_level.connect( + get_equal_range_subjects, 'out_list', regressors_two_sample, 'equal_range_ids') + group_level.connect( + get_equal_indifference_subjects, 'out_list', + regressors_two_sample, 'equal_indifference_ids') + group_level.connect(regressors_two_sample, 'regressors', specify_model, 'regressors') + group_level.connect(regressors_two_sample, 'groups', specify_model, 'groups') + + return group_level + + def get_group_level_outputs(self): + """ Return all names for the files the group level analysis is supposed to generate. """ + + # Handle equalRange and equalIndifference + parameters = { + 'contrast_id': self.contrast_list, + 'method': ['equalRange', 'equalIndifference'], + 'file': [ + 'randomise_tfce_corrp_tstat1.nii.gz', + 'randomise_tfce_corrp_tstat2.nii.gz', + 'tstat1.nii.gz', + 'tstat2.nii.gz', + 'zstat1.nii.gz', + 'zstat2.nii.gz' + ] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'group_level_analysis_{method}_nsub_'+f'{len(self.subject_list)}', + '_contrast_id_{contrast_id}', + '{file}' + ) + return_list = [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + # Handle groupComp + parameters = { + 'contrast_id': self.contrast_list, + 'file': [ + 'randomise_tfce_corrp_tstat1.nii.gz', + 'tstat1.nii.gz', + 'zstat1.nii.gz' + ] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + f'group_level_analysis_groupComp_nsub_{len(self.subject_list)}', + '_contrast_id_{contrast_id}', + '{file}' + ) + return_list += [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + return return_list + + def get_hypotheses_outputs(self): + """ Return all hypotheses output file names. """ + + nb_sub = len(self.subject_list) + files = [ + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat2'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'zstat2.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat2'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'zstat2.nii.gz'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz'), + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz') + ] + return [join(self.directories.output_dir, f) for f in files] diff --git a/narps_open/utils/correlation/__main__.py b/narps_open/utils/correlation/__main__.py index d086499b..36ff4aa0 100644 --- a/narps_open/utils/correlation/__main__.py +++ b/narps_open/utils/correlation/__main__.py @@ -19,7 +19,7 @@ def main(): parser = ArgumentParser(description = 'Compare reproduced files to original results.') parser.add_argument('-t', '--team', type = str, required = True, help = 'the team ID', choices = get_implemented_pipelines()) - subjects.add_argument('-n', '--nsubjects', type=str, required = True, + parser.add_argument('-n', '--nsubjects', type=int, required = True, help='the number of subjects to be selected') arguments = parser.parse_args() diff --git a/tests/pipelines/test_team_O6R6.py b/tests/pipelines/test_team_O6R6.py new file mode 100644 index 00000000..ef13926b --- /dev/null +++ b/tests/pipelines/test_team_O6R6.py @@ -0,0 +1,116 @@ +#!/usr/bin/python +# coding: utf-8 + +""" Tests of the 'narps_open.pipelines.team_O6R6' module. + +Launch this test with PyTest + +Usage: +====== + pytest -q test_team_O6R6.py + pytest -q test_team_O6R6.py -k +""" +from os.path import join, exists, abspath +from filecmp import cmp + +from pytest import helpers, mark +from nipype import Workflow, Node, Function +from nipype.interfaces.base import Bunch + +from narps_open.utils.configuration import Configuration +from narps_open.pipelines.team_O6R6 import PipelineTeamO6R6 + +class TestPipelinesTeamO6R6: + """ A class that contains all the unit tests for the PipelineTeamO6R6 class.""" + + @staticmethod + @mark.unit_test + def test_create(): + """ Test the creation of a PipelineTeamO6R6 object """ + + pipeline = PipelineTeamO6R6() + + # 1 - check the parameters + assert pipeline.team_id == 'O6R6' + + # 2 - check workflows + assert pipeline.get_preprocessing() is None + assert isinstance(pipeline.get_run_level_analysis(), Workflow) + assert isinstance(pipeline.get_subject_level_analysis(), Workflow) + group_level = pipeline.get_group_level_analysis() + assert len(group_level) == 3 + for sub_workflow in group_level: + assert isinstance(sub_workflow, Workflow) + + @staticmethod + @mark.unit_test + def test_outputs(): + """ Test the expected outputs of a PipelineTeamO6R6 object """ + + pipeline = PipelineTeamO6R6() + + # 1 - 1 subject outputs + pipeline.subject_list = ['001'] + helpers.test_pipeline_outputs(pipeline, [0, 2*1*4*4, 2*4*1 + 2*1, 8*4*2 + 4*4, 18]) + + # 2 - 4 subjects outputs + pipeline.subject_list = ['001', '002', '003', '004'] + helpers.test_pipeline_outputs(pipeline, [0, 2*4*4*4, 2*4*4 + 2*4, 8*4*2 + 4*4, 18]) + + @staticmethod + @mark.unit_test + def test_subject_information(): + """ Test the get_subject_information method """ + + # Get test files + test_file = join(Configuration()['directories']['test_data'], 'pipelines', 'events.tsv') + + # Prepare several scenarii + info_ei = PipelineTeamO6R6.get_subject_information(test_file, 'equalIndifference') + info_er = PipelineTeamO6R6.get_subject_information(test_file, 'equalRange') + + # Compare bunches to expected + bunch = info_ei[0] + assert isinstance(bunch, Bunch) + assert bunch.conditions == ['trial', 'gain_trial', 'loss_trial'] + helpers.compare_float_2d_arrays(bunch.onsets, [ + [4.071, 11.834, 27.535, 36.435], + [4.071, 11.834], + [27.535, 36.435] + ]) + helpers.compare_float_2d_arrays(bunch.durations, [ + [2.388, 2.289, 2.08, 2.288], + [2.388, 2.289], + [2.08, 2.288] + ]) + helpers.compare_float_2d_arrays(bunch.amplitudes, [ + [1.0, 1.0, 1.0, 1.0], + [3.0, 13.0], + [3.5, 4.5] + ]) + + # Compare bunches to expected + bunch = info_er[0] + assert isinstance(bunch, Bunch) + assert bunch.conditions == ['trial', 'gain_trial', 'loss_trial'] + helpers.compare_float_2d_arrays(bunch.onsets, [ + [4.071, 11.834, 27.535, 36.435], + [4.071, 11.834], + [27.535, 36.435] + ]) + helpers.compare_float_2d_arrays(bunch.durations, [ + [2.388, 2.289, 2.08, 2.288], + [2.388, 2.289], + [2.08, 2.288] + ]) + helpers.compare_float_2d_arrays(bunch.amplitudes, [ + [1.0, 1.0, 1.0, 1.0], + [10.0, 30.0], + [11.0, 13.0] + ]) + + @staticmethod + @mark.pipeline_test + def test_execution(): + """ Test the execution of a PipelineTeamO6R6 and compare results """ + helpers.test_pipeline_evaluation('O6R6') From f35ea9720f7b86b2e9f3dafdf40271609c23e0fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Boris=20Cl=C3=A9net?= <117362283+bclenet@users.noreply.github.com> Date: Mon, 29 Apr 2024 16:04:13 +0200 Subject: [PATCH 2/3] O21U reproduction (#202) * Starting O21U reproduction [skip ci] * Group level outputs * Hypotheses names * Changing unit test for O6R6 * Changing unit test for o21U --- narps_open/pipelines/__init__.py | 2 +- narps_open/pipelines/team_O21U.py | 742 ++++++++++++++++++ tests/pipelines/test_team_O21U.py | 128 +++ tests/pipelines/test_team_O6R6.py | 4 +- .../pipelines/team_O21U/confounds.tsv | 3 + 5 files changed, 876 insertions(+), 3 deletions(-) create mode 100644 narps_open/pipelines/team_O21U.py create mode 100644 tests/pipelines/test_team_O21U.py create mode 100644 tests/test_data/pipelines/team_O21U/confounds.tsv diff --git a/narps_open/pipelines/__init__.py b/narps_open/pipelines/__init__.py index 348337a2..dc1b254f 100644 --- a/narps_open/pipelines/__init__.py +++ b/narps_open/pipelines/__init__.py @@ -58,7 +58,7 @@ 'L7J7': 'PipelineTeamL7J7', 'L9G5': None, 'O03M': None, - 'O21U': None, + 'O21U': 'PipelineTeamO21U', 'O6R6': 'PipelineTeamO6R6', 'P5F3': None, 'Q58J': None, diff --git a/narps_open/pipelines/team_O21U.py b/narps_open/pipelines/team_O21U.py new file mode 100644 index 00000000..6fa70c53 --- /dev/null +++ b/narps_open/pipelines/team_O21U.py @@ -0,0 +1,742 @@ +#!/usr/bin/python +# coding: utf-8 + +""" Write the work of NARPS team O21U using Nipype """ + +from os.path import join +from itertools import product + +from nipype import Workflow, Node, MapNode +from nipype.interfaces.utility import IdentityInterface, Function, Split +from nipype.interfaces.io import SelectFiles, DataSink +from nipype.interfaces.fsl import ( + IsotropicSmooth, Level1Design, FEATModel, + L2Model, Merge, FLAMEO, FILMGLS, MultipleRegressDesign, + FSLCommand, Cluster + ) +from nipype.algorithms.modelgen import SpecifyModel +from nipype.interfaces.fsl.maths import MultiImageMaths + +from narps_open.utils.configuration import Configuration +from narps_open.pipelines import Pipeline +from narps_open.data.task import TaskInformation +from narps_open.data.participants import get_group +from narps_open.core.common import list_intersection, elements_in_string, clean_list +from narps_open.core.interfaces import InterfaceFactory + +# Setup FSL +FSLCommand.set_default_output_type('NIFTI_GZ') + +class PipelineTeamO21U(Pipeline): + """ A class that defines the pipeline of team O21U """ + + def __init__(self): + super().__init__() + self.fwhm = 5.0 + self.team_id = 'O21U' + self.contrast_list = ['1', '2'] + self.run_level_contrasts = [ + ('effect_of_gain', 'T', ['gain', 'loss'], [1, 0]), + ('effect_of_loss', 'T', ['gain', 'loss'], [0, 1]) + ] + + def get_preprocessing(self): + """ No preprocessing has been done by team O21U """ + return None + + def get_subject_information(event_file): + """ + Create Bunchs for specifyModel. + + Parameters : + - event_file : str, file corresponding to the run and the subject to analyze + + Returns : + - subject_info : list of Bunch for 1st level analysis. + """ + from numpy import mean + from nipype.interfaces.base import Bunch + + onsets = [] + durations = [] + amplitudes_trial = [] + amplitudes_gain = [] + amplitudes_loss = [] + + with open(event_file, 'rt') as file: + next(file) # skip the header + + for line in file: + info = line.strip().split() + onsets.append(float(info[0])) + durations.append(float(info[1])) + amplitudes_trial.append(1.0) + amplitudes_gain.append(float(info[2])) + amplitudes_loss.append(float(info[3])) + + return [ + Bunch( + conditions = ['trial', 'gain', 'loss'], + onsets = [onsets] * 3, + durations = [durations] * 3, + amplitudes = [amplitudes_trial, amplitudes_gain, amplitudes_loss] + ) + ] + + def get_confounds_file(filepath, subject_id, run_id): + """ + Create a tsv file with only desired confounds per subject per run. + + Parameters : + - filepath : path to the subject confounds file (i.e. one per run) + - subject_id : subject for whom the 1st level analysis is made + - run_id: run for which the 1st level analysis is made + + Return : + - confounds_file : paths to new files containing only desired confounds. + """ + from os.path import abspath + + from pandas import read_csv, DataFrame + from numpy import array, transpose + + data_frame = read_csv(filepath, sep = '\t', header=0) + retained_confounds = DataFrame(transpose(array([ + data_frame['FramewiseDisplacement'], data_frame['X'], data_frame['Y'], data_frame['Z'], + data_frame['RotX'], data_frame['RotY'], data_frame['RotZ'] + ]))) + + # Write confounds to a file + confounds_file = abspath(f'confounds_file_sub-{subject_id}_run-{run_id}.tsv') + with open(confounds_file, 'w', encoding = 'utf-8') as writer: + writer.write(retained_confounds.to_csv( + sep = '\t', index = False, header = False, na_rep = '0.0')) + + return confounds_file + + def get_run_level_analysis(self): + """ + Create the run level analysis workflow. + + Returns: + - run_level : nipype.WorkFlow + """ + # Create run level analysis workflow and connect its nodes + run_level = Workflow( + base_dir = self.directories.working_dir, + name = 'run_level_analysis' + ) + + # IdentityInterface Node - Iterate on subject and runs + information_source = Node(IdentityInterface( + fields = ['subject_id', 'run_id']), + name = 'information_source') + information_source.iterables = [ + ('subject_id', self.subject_list), + ('run_id', self.run_list) + ] + + # SelectFiles - Get necessary files + templates = { + 'func' : join('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_space-MNI152NLin2009cAsym_preproc.nii.gz'), + 'mask' : join('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_space-MNI152NLin2009cAsym_brainmask.nii.gz'), + 'events' : join('sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_events.tsv') + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.dataset_dir + run_level.connect(information_source, 'subject_id', select_files, 'subject_id') + run_level.connect(information_source, 'run_id', select_files, 'run_id') + + # MultiImageMaths Node - Apply mask to func + mask_func = Node(MultiImageMaths(), name = 'mask_func') + mask_func.inputs.op_string = '-mul %s ' + run_level.connect(select_files, 'func', mask_func, 'in_file') + run_level.connect(select_files, 'mask', mask_func, 'operand_files') + + # IsotropicSmooth Node - Smoothing data + smoothing_func = Node(IsotropicSmooth(), name = 'smoothing_func') + smoothing_func.inputs.fwhm = self.fwhm + run_level.connect(mask_func, 'out_file', smoothing_func, 'in_file') + + # Get Subject Info - get subject specific condition information + subject_information = Node(Function( + function = self.get_subject_information, + input_names = ['event_file'], + output_names = ['subject_info'] + ), name = 'subject_information') + run_level.connect(select_files, 'events', subject_information, 'event_file') + + # SpecifyModel Node - Generate run level model + specify_model = Node(SpecifyModel(), name = 'specify_model') + specify_model.inputs.high_pass_filter_cutoff = 100 + specify_model.inputs.input_units = 'secs' + specify_model.inputs.time_repetition = TaskInformation()['RepetitionTime'] + run_level.connect(smoothing_func, 'out_file', specify_model, 'functional_runs') + run_level.connect(subject_information, 'subject_info', specify_model, 'subject_info') + + # Level1Design Node - Generate files for run level computation + model_design = Node(Level1Design(), name = 'model_design') + model_design.inputs.bases = {'dgamma' : {'derivs' : True }} + model_design.inputs.interscan_interval = TaskInformation()['RepetitionTime'] + model_design.inputs.model_serial_correlations = True + model_design.inputs.contrasts = self.run_level_contrasts + run_level.connect(specify_model, 'session_info', model_design, 'session_info') + + # FEATModel Node - Generate run level model + model_generation = Node(FEATModel(), name = 'model_generation') + run_level.connect(model_design, 'ev_files', model_generation, 'ev_files') + run_level.connect(model_design, 'fsf_files', model_generation, 'fsf_file') + + # FILMGLS Node - Estimate first level model + model_estimate = Node(FILMGLS(), name='model_estimate') + run_level.connect(smoothing_func, 'out_file', model_estimate, 'in_file') + run_level.connect(model_generation, 'con_file', model_estimate, 'tcon_file') + run_level.connect(model_generation, 'design_file', model_estimate, 'design_file') + + # DataSink Node - store the wanted results in the wanted directory + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + run_level.connect(model_estimate, 'results_dir', data_sink, 'run_level_analysis.@results') + run_level.connect( + model_generation, 'design_file', data_sink, 'run_level_analysis.@design_file') + run_level.connect( + model_generation, 'design_image', data_sink, 'run_level_analysis.@design_img') + + # Remove large files, if requested + if Configuration()['pipelines']['remove_unused_data']: + remove_masked = Node( + InterfaceFactory.create('remove_parent_directory'), + name = 'remove_masked') + run_level.connect(smoothing_func, 'out_file', remove_masked, '_') + run_level.connect(mask_func, 'out_file', remove_masked, 'file_name') + + remove_smooth = Node( + InterfaceFactory.create('remove_parent_directory'), + name = 'remove_smooth') + run_level.connect(data_sink, 'out_file', remove_smooth, '_') + run_level.connect(smoothing_func, 'out_file', remove_smooth, 'file_name') + + return run_level + + def get_run_level_outputs(self): + """ Return the names of the files the run level analysis is supposed to generate. """ + + parameters = { + 'run_id' : self.run_list, + 'subject_id' : self.subject_list, + 'contrast_id' : self.contrast_list, + } + parameter_sets = product(*parameters.values()) + output_dir = join(self.directories.output_dir, + 'run_level_analysis', '_run_id_{run_id}_subject_id_{subject_id}') + templates = [ + join(output_dir, 'results', 'cope{contrast_id}.nii.gz'), + join(output_dir, 'results', 'tstat{contrast_id}.nii.gz'), + join(output_dir, 'results', 'varcope{contrast_id}.nii.gz'), + join(output_dir, 'results', 'zstat{contrast_id}.nii.gz') + ] + return [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets for template in templates] + + def get_subject_level_analysis(self): + """ + Create the subject level analysis workflow. + + Returns: + - subject_level_analysis : nipype.WorkFlow + """ + # Second level (single-subject, mean of all four scans) analysis workflow. + subject_level = Workflow( + base_dir = self.directories.working_dir, + name = 'subject_level_analysis') + + # Infosource Node - To iterate on subject and runs + information_source = Node(IdentityInterface( + fields = ['subject_id', 'contrast_id']), + name = 'information_source') + information_source.iterables = [ + ('subject_id', self.subject_list), + ('contrast_id', self.contrast_list) + ] + + # SelectFiles node - to select necessary files + templates = { + 'cope' : join(self.directories.output_dir, + 'run_level_analysis', '_run_id_*_subject_id_{subject_id}', 'results', + 'cope{contrast_id}.nii.gz'), + 'varcope' : join(self.directories.output_dir, + 'run_level_analysis', '_run_id_*_subject_id_{subject_id}', 'results', + 'varcope{contrast_id}.nii.gz'), + 'masks' : join('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-*_bold_space-MNI152NLin2009cAsym_brainmask.nii.gz') + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory= self.directories.dataset_dir + subject_level.connect(information_source, 'subject_id', select_files, 'subject_id') + subject_level.connect(information_source, 'contrast_id', select_files, 'contrast_id') + + # Merge Node - Merge copes files for each subject + merge_copes = Node(Merge(), name = 'merge_copes') + merge_copes.inputs.dimension = 't' + subject_level.connect(select_files, 'cope', merge_copes, 'in_files') + + # Merge Node - Merge varcopes files for each subject + merge_varcopes = Node(Merge(), name = 'merge_varcopes') + merge_varcopes.inputs.dimension = 't' + subject_level.connect(select_files, 'varcope', merge_varcopes, 'in_files') + + # Split Node - Split mask list to serve them as inputs of the MultiImageMaths node. + split_masks = Node(Split(), name = 'split_masks') + split_masks.inputs.splits = [1, len(self.run_list) - 1] + split_masks.inputs.squeeze = True # Unfold one-element splits removing the list + subject_level.connect(select_files, 'masks', split_masks, 'inlist') + + # MultiImageMaths Node - Create a subject mask by + # computing the intersection of all run masks. + mask_intersection = Node(MultiImageMaths(), name = 'mask_intersection') + mask_intersection.inputs.op_string = '-mul %s ' * (len(self.run_list) - 1) + subject_level.connect(split_masks, 'out1', mask_intersection, 'in_file') + subject_level.connect(split_masks, 'out2', mask_intersection, 'operand_files') + + # L2Model Node - Generate subject specific second level model + generate_model = Node(L2Model(), name = 'generate_model') + generate_model.inputs.num_copes = len(self.run_list) + + # FLAMEO Node - Estimate model + estimate_model = Node(FLAMEO(), name = 'estimate_model') + estimate_model.inputs.run_mode = 'flame1' + subject_level.connect(mask_intersection, 'out_file', estimate_model, 'mask_file') + subject_level.connect(merge_copes, 'merged_file', estimate_model, 'cope_file') + subject_level.connect(merge_varcopes, 'merged_file', estimate_model, 'var_cope_file') + subject_level.connect(generate_model, 'design_mat', estimate_model, 'design_file') + subject_level.connect(generate_model, 'design_con', estimate_model, 't_con_file') + subject_level.connect(generate_model, 'design_grp', estimate_model, 'cov_split_file') + + # DataSink Node - store the wanted results in the wanted directory + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + subject_level.connect( + mask_intersection, 'out_file', data_sink, 'subject_level_analysis.@mask') + subject_level.connect(estimate_model, 'zstats', data_sink, 'subject_level_analysis.@stats') + subject_level.connect( + estimate_model, 'tstats', data_sink, 'subject_level_analysis.@tstats') + subject_level.connect(estimate_model, 'copes', data_sink, 'subject_level_analysis.@copes') + subject_level.connect( + estimate_model, 'var_copes', data_sink, 'subject_level_analysis.@varcopes') + + return subject_level + + def get_subject_level_outputs(self): + """ Return the names of the files the subject level analysis is supposed to generate. """ + + parameters = { + 'contrast_id' : self.contrast_list, + 'subject_id' : self.subject_list, + 'file' : ['cope1.nii.gz', 'tstat1.nii.gz', 'varcope1.nii.gz', 'zstat1.nii.gz'] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_{subject_id}','{file}' + ) + return_list = [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + parameters = { + 'contrast_id' : self.contrast_list, + 'subject_id' : self.subject_list, + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_{subject_id}', + 'sub-{subject_id}_task-MGT_run-01_bold_space-MNI152NLin2009cAsym_brainmask_maths.nii.gz' + ) + return_list += [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + return return_list + + def get_one_sample_t_test_regressors(subject_list: list) -> dict: + """ + Create dictionary of regressors for one sample t-test group analysis. + + Parameters: + - subject_list: ids of subject in the group for which to do the analysis + + Returns: + - dict containing named lists of regressors. + """ + + return dict(group_mean = [1 for _ in subject_list]) + + def get_two_sample_t_test_regressors( + equal_range_ids: list, + equal_indifference_ids: list, + subject_list: list, + ) -> dict: + """ + Create dictionary of regressors for two sample t-test group analysis. + + Parameters: + - equal_range_ids: ids of subjects in equal range group + - equal_indifference_ids: ids of subjects in equal indifference group + - subject_list: ids of subject for which to do the analysis + + Returns: + - regressors, dict: containing named lists of regressors. + - groups, list: group identifiers to distinguish groups in FSL analysis. + """ + + # Create 2 lists containing n_sub values which are + # * 1 if the participant is on the group + # * 0 otherwise + equal_range_regressors = [1 if i in equal_range_ids else 0 for i in subject_list] + equal_indifference_regressors = [ + 1 if i in equal_indifference_ids else 0 for i in subject_list + ] + + # Create regressors output : a dict with the two list + regressors = dict( + equalRange = equal_range_regressors, + equalIndifference = equal_indifference_regressors + ) + + # Create groups outputs : a list with 1 for equalRange subjects and 2 for equalIndifference + groups = [1 if i == 1 else 2 for i in equal_range_regressors] + + return regressors, groups + + def get_group_level_analysis(self): + """ + Return all workflows for the group level analysis. + + Returns; + - a list of nipype.WorkFlow + """ + + methods = ['equalRange', 'equalIndifference', 'groupComp'] + return [self.get_group_level_analysis_sub_workflow(method) for method in methods] + + def get_group_level_analysis_sub_workflow(self, method): + """ + Return a workflow for the group level analysis. + + Parameters: + - method: one of 'equalRange', 'equalIndifference' or 'groupComp' + + Returns: + - group_level: nipype.WorkFlow + """ + # Compute the number of participants used to do the analysis + nb_subjects = len(self.subject_list) + + # Declare the workflow + group_level = Workflow( + base_dir = self.directories.working_dir, + name = f'group_level_analysis_{method}_nsub_{nb_subjects}') + + # Infosource Node - iterate over the contrasts generated by the subject level analysis + information_source = Node(IdentityInterface( + fields = ['contrast_id']), + name = 'information_source') + information_source.iterables = [('contrast_id', self.contrast_list)] + + # SelectFiles Node - select necessary files + templates = { + 'cope' : join(self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_*', + 'cope1.nii.gz'), + 'varcope' : join(self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_*', + 'varcope1.nii.gz'), + 'masks': join(self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_1_subject_id_*', + 'sub-*_task-MGT_run-*_bold_space-MNI152NLin2009cAsym_brainmask_maths.nii.gz') + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.results_dir + group_level.connect(information_source, 'contrast_id', select_files, 'contrast_id') + + # Function Node elements_in_string + # Get contrast of parameter estimates (cope) for these subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_copes = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_copes', iterfield = 'input_str' + ) + group_level.connect(select_files, 'cope', get_copes, 'input_str') + + # Function Node elements_in_string + # Get variance of the estimated copes (varcope) for these subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_varcopes = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_varcopes', iterfield = 'input_str' + ) + group_level.connect(select_files, 'varcope', get_varcopes, 'input_str') + + # Merge Node - Merge cope files + merge_copes = Node(Merge(), name = 'merge_copes') + merge_copes.inputs.dimension = 't' + group_level.connect(get_copes, ('out_list', clean_list), merge_copes, 'in_files') + + # Merge Node - Merge cope files + merge_varcopes = Node(Merge(), name = 'merge_varcopes') + merge_varcopes.inputs.dimension = 't' + group_level.connect(get_varcopes, ('out_list', clean_list), merge_varcopes, 'in_files') + + # Split Node - Split mask list to serve them as inputs of the MultiImageMaths node. + split_masks = Node(Split(), name = 'split_masks') + split_masks.inputs.splits = [1, len(self.subject_list) - 1] + split_masks.inputs.squeeze = True # Unfold one-element splits removing the list + group_level.connect(select_files, 'masks', split_masks, 'inlist') + + # MultiImageMaths Node - Create a subject mask by + # computing the intersection of all run masks. + mask_intersection = Node(MultiImageMaths(), name = 'mask_intersection') + mask_intersection.inputs.op_string = '-mul %s ' * (len(self.subject_list) - 1) + group_level.connect(split_masks, 'out1', mask_intersection, 'in_file') + group_level.connect(split_masks, 'out2', mask_intersection, 'operand_files') + + # MultipleRegressDesign Node - Specify model + specify_model = Node(MultipleRegressDesign(), name = 'specify_model') + + # FLAMEO Node - Estimate model + estimate_model = Node(FLAMEO(), name = 'estimate_model') + estimate_model.inputs.run_mode = 'flame1' + group_level.connect(mask_intersection, 'out_file', estimate_model, 'mask_file') + group_level.connect(merge_copes, 'merged_file', estimate_model, 'cope_file') + group_level.connect(merge_varcopes, 'merged_file', estimate_model, 'var_cope_file') + group_level.connect(specify_model, 'design_mat', estimate_model, 'design_file') + group_level.connect(specify_model, 'design_con', estimate_model, 't_con_file') + group_level.connect(specify_model, 'design_grp', estimate_model, 'cov_split_file') + + # Cluster Node - Perform clustering on statistical output + cluster = MapNode( + Cluster(), + name = 'cluster', + iterfield = ['in_file', 'cope_file'], + synchronize = True + ) + cluster.inputs.threshold = 2.3 + cluster.inputs.out_threshold_file = True + group_level.connect(estimate_model, 'zstats', cluster, 'in_file') + group_level.connect(estimate_model, 'copes', cluster, 'cope_file') + + # Datasink Node - Save important files + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + group_level.connect(estimate_model, 'zstats', data_sink, + f'group_level_analysis_{method}_nsub_{nb_subjects}.@zstats') + group_level.connect(estimate_model, 'tstats', data_sink, + f'group_level_analysis_{method}_nsub_{nb_subjects}.@tstats') + group_level.connect(cluster,'threshold_file', data_sink, + f'group_level_analysis_{method}_nsub_{nb_subjects}.@threshold_file') + + if method in ('equalIndifference', 'equalRange'): + # Setup a one sample t-test + specify_model.inputs.contrasts = [ + ['group_mean', 'T', ['group_mean'], [1]], + ['group_mean_neg', 'T', ['group_mean'], [-1]] + ] + + # Function Node get_group_subjects - Get subjects in the group and in the subject_list + get_group_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_group_subjects' + ) + get_group_subjects.inputs.list_1 = get_group(method) + get_group_subjects.inputs.list_2 = self.subject_list + group_level.connect(get_group_subjects, 'out_list', get_copes, 'elements') + group_level.connect(get_group_subjects, 'out_list', get_varcopes, 'elements') + + # Function Node get_one_sample_t_test_regressors + # Get regressors in the equalRange and equalIndifference method case + regressors_one_sample = Node( + Function( + function = self.get_one_sample_t_test_regressors, + input_names = ['subject_list'], + output_names = ['regressors'] + ), + name = 'regressors_one_sample', + ) + group_level.connect(get_group_subjects, 'out_list', regressors_one_sample, 'subject_list') + group_level.connect(regressors_one_sample, 'regressors', specify_model, 'regressors') + + elif method == 'groupComp': + + # Select copes and varcopes corresponding to the selected subjects + # Indeed the SelectFiles node asks for all (*) subjects available + get_copes.inputs.elements = self.subject_list + get_varcopes.inputs.elements = self.subject_list + + # Setup a two sample t-test + specify_model.inputs.contrasts = [ + ['equalRange_sup', 'T', ['equalRange', 'equalIndifference'], [1, -1]] + ] + + # Function Node get_equal_range_subjects + # Get subjects in the equalRange group and in the subject_list + get_equal_range_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_range_subjects' + ) + get_equal_range_subjects.inputs.list_1 = get_group('equalRange') + get_equal_range_subjects.inputs.list_2 = self.subject_list + + # Function Node get_equal_indifference_subjects + # Get subjects in the equalIndifference group and in the subject_list + get_equal_indifference_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_indifference_subjects' + ) + get_equal_indifference_subjects.inputs.list_1 = get_group('equalIndifference') + get_equal_indifference_subjects.inputs.list_2 = self.subject_list + + # Function Node get_two_sample_t_test_regressors + # Get regressors in the groupComp method case + regressors_two_sample = Node( + Function( + function = self.get_two_sample_t_test_regressors, + input_names = [ + 'equal_range_ids', + 'equal_indifference_ids', + 'subject_list', + ], + output_names = ['regressors', 'groups'] + ), + name = 'regressors_two_sample', + ) + regressors_two_sample.inputs.subject_list = self.subject_list + + # Add missing connections + group_level.connect( + get_equal_range_subjects, 'out_list', regressors_two_sample, 'equal_range_ids') + group_level.connect( + get_equal_indifference_subjects, 'out_list', + regressors_two_sample, 'equal_indifference_ids') + group_level.connect(regressors_two_sample, 'regressors', specify_model, 'regressors') + group_level.connect(regressors_two_sample, 'groups', specify_model, 'groups') + + return group_level + + def get_group_level_outputs(self): + """ Return all names for the files the group level analysis is supposed to generate. """ + + # Handle equalRange and equalIndifference + parameters = { + 'contrast_id': self.contrast_list, + 'method': ['equalRange', 'equalIndifference'], + 'file': [ + '_cluster0/zstat1_threshold.nii.gz', + '_cluster1/zstat2_threshold.nii.gz', + 'tstat1.nii.gz', + 'tstat2.nii.gz', + 'zstat1.nii.gz', + 'zstat2.nii.gz' + ] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'group_level_analysis_{method}_nsub_'+f'{len(self.subject_list)}', + '_contrast_id_{contrast_id}', + '{file}' + ) + return_list = [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + # Handle groupComp + parameters = { + 'contrast_id': self.contrast_list, + 'file': [ + '_cluster0/zstat1_threshold.nii.gz', + 'tstat1.nii.gz', + 'zstat1.nii.gz' + ] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + f'group_level_analysis_groupComp_nsub_{len(self.subject_list)}', + '_contrast_id_{contrast_id}', + '{file}' + ) + return_list += [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + return return_list + + def get_hypotheses_outputs(self): + """ Return all hypotheses output file names. """ + + nb_sub = len(self.subject_list) + files = [ + # Hypothesis 1 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat2'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + # Hypothesis 2 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat2'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + # Hypothesis 3 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat2'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + # Hypothesis 4 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat2'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'zstat1.nii.gz'), + # Hypothesis 5 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'zstat2.nii.gz'), + # Hypothesis 6 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'zstat2.nii.gz'), + # Hypothesis 7 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz'), + # Hypothesis 8 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz'), + # Hypothesis 9 + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz') + ] + return [join(self.directories.output_dir, f) for f in files] diff --git a/tests/pipelines/test_team_O21U.py b/tests/pipelines/test_team_O21U.py new file mode 100644 index 00000000..3d65297a --- /dev/null +++ b/tests/pipelines/test_team_O21U.py @@ -0,0 +1,128 @@ +#!/usr/bin/python +# coding: utf-8 + +""" Tests of the 'narps_open.pipelines.team_O21U' module. + +Launch this test with PyTest + +Usage: +====== + pytest -q test_team_O21U.py + pytest -q test_team_O21U.py -k +""" +from os.path import join, exists, abspath +from filecmp import cmp + +from pytest import helpers, mark +from nipype import Workflow, Node, Function +from nipype.interfaces.base import Bunch + +from narps_open.utils.configuration import Configuration +from narps_open.pipelines.team_O21U import PipelineTeamO21U + +class TestPipelinesTeamO21U: + """ A class that contains all the unit tests for the PipelineTeamO21U class.""" + + @staticmethod + @mark.unit_test + def test_create(): + """ Test the creation of a PipelineTeamO21U object """ + + pipeline = PipelineTeamO21U() + + # 1 - check the parameters + assert pipeline.fwhm == 5.0 + assert pipeline.team_id == 'O21U' + + # 2 - check workflows + assert pipeline.get_preprocessing() is None + assert isinstance(pipeline.get_run_level_analysis(), Workflow) + assert isinstance(pipeline.get_subject_level_analysis(), Workflow) + group_level = pipeline.get_group_level_analysis() + assert len(group_level) == 3 + for sub_workflow in group_level: + assert isinstance(sub_workflow, Workflow) + + @staticmethod + @mark.unit_test + def test_outputs(): + """ Test the expected outputs of a PipelineTeamO21U object """ + + pipeline = PipelineTeamO21U() + + # 1 - 1 subject outputs + pipeline.subject_list = ['001'] + helpers.test_pipeline_outputs(pipeline, [0, 4*1*2*4, 4*2*1 + 2*1, 6*2*2 + 3*2, 18]) + + # 2 - 4 subjects outputs + pipeline.subject_list = ['001', '002', '003', '004'] + helpers.test_pipeline_outputs(pipeline, [0, 4*4*2*4, 4*2*4 + 2*4, 6*2*2 + 3*2, 18]) + + @staticmethod + @mark.unit_test + def test_subject_information(): + """ Test the get_subject_information method """ + + # Get test files + test_file = join(Configuration()['directories']['test_data'], 'pipelines', 'events.tsv') + + # Prepare several scenarii + info_missed = PipelineTeamO21U.get_subject_information(test_file) + + # Compare bunches to expected + bunch = info_missed[0] + assert isinstance(bunch, Bunch) + assert bunch.conditions == ['trial', 'gain', 'loss'] + helpers.compare_float_2d_arrays(bunch.onsets, [ + [4.071, 11.834, 19.535, 27.535, 36.435], + [4.071, 11.834, 19.535, 27.535, 36.435], + [4.071, 11.834, 19.535, 27.535, 36.435] + ]) + helpers.compare_float_2d_arrays(bunch.durations, [ + [4.0, 4.0, 4.0, 4.0, 4.0], + [4.0, 4.0, 4.0, 4.0, 4.0], + [4.0, 4.0, 4.0, 4.0, 4.0] + ]) + helpers.compare_float_2d_arrays(bunch.amplitudes, [ + [1.0, 1.0, 1.0, 1.0, 1.0], + [14.0, 34.0, 38.0, 10.0, 16.0], + [6.0, 14.0, 19.0, 15.0, 17.0] + ]) + + @staticmethod + @mark.unit_test + def test_confounds_file(temporary_data_dir): + """ Test the get_confounds_file method """ + + # Get input and reference output file + confounds_file = abspath(join( + Configuration()['directories']['test_data'], 'pipelines', 'confounds.tsv')) + reference_file = abspath(join( + Configuration()['directories']['test_data'], + 'pipelines', 'team_O21U', 'confounds.tsv')) + + # Create new confounds file + confounds_node = Node(Function( + input_names = ['filepath', 'subject_id', 'run_id'], + output_names = ['confounds_file'], + function = PipelineTeamO21U.get_confounds_file), + name = 'confounds_node') + confounds_node.base_dir = temporary_data_dir + confounds_node.inputs.filepath = confounds_file + confounds_node.inputs.subject_id = 'sid' + confounds_node.inputs.run_id = 'rid' + confounds_node.run() + + # Check confounds file was created + created_confounds_file = abspath(join( + temporary_data_dir, confounds_node.name, 'confounds_file_sub-sid_run-rid.tsv')) + assert exists(created_confounds_file) + + # Check contents + assert cmp(reference_file, created_confounds_file) + + @staticmethod + @mark.pipeline_test + def test_execution(): + """ Test the execution of a PipelineTeamO21U and compare results """ + helpers.test_pipeline_evaluation('O21U') diff --git a/tests/pipelines/test_team_O6R6.py b/tests/pipelines/test_team_O6R6.py index ef13926b..bd571d4d 100644 --- a/tests/pipelines/test_team_O6R6.py +++ b/tests/pipelines/test_team_O6R6.py @@ -51,11 +51,11 @@ def test_outputs(): # 1 - 1 subject outputs pipeline.subject_list = ['001'] - helpers.test_pipeline_outputs(pipeline, [0, 2*1*4*4, 2*4*1 + 2*1, 8*4*2 + 4*4, 18]) + helpers.test_pipeline_outputs(pipeline, [0, 2*1*4*4, 2*4*1 + 2*1, 6*2*2 + 3*2, 18]) # 2 - 4 subjects outputs pipeline.subject_list = ['001', '002', '003', '004'] - helpers.test_pipeline_outputs(pipeline, [0, 2*4*4*4, 2*4*4 + 2*4, 8*4*2 + 4*4, 18]) + helpers.test_pipeline_outputs(pipeline, [0, 2*4*4*4, 2*4*4 + 2*4, 6*2*2 + 3*2, 18]) @staticmethod @mark.unit_test diff --git a/tests/test_data/pipelines/team_O21U/confounds.tsv b/tests/test_data/pipelines/team_O21U/confounds.tsv new file mode 100644 index 00000000..04e5f158 --- /dev/null +++ b/tests/test_data/pipelines/team_O21U/confounds.tsv @@ -0,0 +1,3 @@ +0.0 0.0 0.0 0.0 0.0 -0.0 0.0 +0.1352790093099999 -0.00996895 -0.0313444 -3.00931e-06 0.00132687 -0.000384193 -0.00016819 +0.12437666391 -2.56954e-05 -0.00923735 0.0549667 0.000997278 -0.00019745 -0.000398988 From 13c7bc3edf44f3f129f905609d4c2f75af726e4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Boris=20Cl=C3=A9net?= <117362283+bclenet@users.noreply.github.com> Date: Thu, 13 Jun 2024 14:38:28 +0200 Subject: [PATCH 3/3] 4TQ6 reproduction (#191) * Repro of 4TQ6 from Elodie's work * 4TQ8 marked as implemented * Last checks on 4TQ6 * [TEST] 4TQ6 --- narps_open/pipelines/__init__.py | 2 +- narps_open/pipelines/team_4TQ6.py | 697 ++++++++++++++++++++++++++ narps_open/pipelines/team_4TQ6_wip.py | 563 --------------------- tests/pipelines/test_team_4TQ6.py | 96 ++++ 4 files changed, 794 insertions(+), 564 deletions(-) create mode 100644 narps_open/pipelines/team_4TQ6.py delete mode 100755 narps_open/pipelines/team_4TQ6_wip.py create mode 100644 tests/pipelines/test_team_4TQ6.py diff --git a/narps_open/pipelines/__init__.py b/narps_open/pipelines/__init__.py index dc1b254f..d6af5905 100644 --- a/narps_open/pipelines/__init__.py +++ b/narps_open/pipelines/__init__.py @@ -27,7 +27,7 @@ '43FJ': None, '46CD': None, '4SZ2': None, - '4TQ6': None, + '4TQ6': 'PipelineTeam4TQ6', '50GV': None, '51PW': 'PipelineTeam51PW', '5G9K': None, diff --git a/narps_open/pipelines/team_4TQ6.py b/narps_open/pipelines/team_4TQ6.py new file mode 100644 index 00000000..222c11cd --- /dev/null +++ b/narps_open/pipelines/team_4TQ6.py @@ -0,0 +1,697 @@ +#!/usr/bin/python +# coding: utf-8 + +""" Write the work of NARPS team 4TQ6 using Nipype """ + +from os.path import join +from itertools import product + +from nipype import Workflow, Node, MapNode +from nipype.interfaces.utility import IdentityInterface, Function, Split +from nipype.interfaces.io import SelectFiles, DataSink +from nipype.interfaces.fsl import ( + IsotropicSmooth, Level1Design, FEATModel, + L2Model, Merge, FLAMEO, FILMGLS, MultipleRegressDesign, + FSLCommand, Randomise + ) +from nipype.algorithms.modelgen import SpecifyModel +from nipype.interfaces.fsl.maths import MultiImageMaths + +from narps_open.utils.configuration import Configuration +from narps_open.pipelines import Pipeline +from narps_open.data.task import TaskInformation +from narps_open.data.participants import get_group +from narps_open.core.common import list_intersection, elements_in_string, clean_list +from narps_open.core.interfaces import InterfaceFactory + +# Setup FSL +FSLCommand.set_default_output_type('NIFTI_GZ') + +class PipelineTeam4TQ6(Pipeline): + """ A class that defines the pipeline of team 4TQ6 """ + + def __init__(self): + super().__init__() + self.fwhm = 6.0 + self.team_id = '4TQ6' + self.contrast_list = ['1', '2'] + self.run_level_contrasts = [ + ('effect_of_gain', 'T', ['trial', 'gain', 'loss'], [0, 1, 0]), + ('effect_of_loss', 'T', ['trial', 'gain', 'loss'], [0, 0, 1]) + ] + + def get_preprocessing(self): + """ No preprocessing has been done by team 4TQ6 """ + return None + + def get_subject_information(event_file): + """ + Create Bunchs for specifyModel. + + Parameters : + - event_file : str, file corresponding to the run and the subject to analyze + + Returns : + - subject_info : list of Bunch for 1st level analysis. + """ + from numpy import mean + from nipype.interfaces.base import Bunch + + onsets = [] + durations = [] + amplitudes_trial = [] + amplitudes_gain = [] + amplitudes_loss = [] + + with open(event_file, 'rt') as file: + next(file) # skip the header + + for line in file: + info = line.strip().split() + onsets.append(float(info[0])) + durations.append(float(info[1])) + amplitudes_trial.append(1.0) + amplitudes_gain.append(float(info[2])) + amplitudes_loss.append(float(info[3])) + + return [ + Bunch( + conditions = ['trial', 'gain', 'loss'], + onsets = [onsets] * 3, + durations = [durations] * 3, + amplitudes = [amplitudes_trial, amplitudes_gain, amplitudes_loss] + ) + ] + + def get_run_level_analysis(self): + """ + Create the run level analysis workflow. + + Returns: + - run_level : nipype.WorkFlow + """ + # Create run level analysis workflow and connect its nodes + run_level = Workflow( + base_dir = self.directories.working_dir, + name = 'run_level_analysis' + ) + + # IdentityInterface Node - Iterate on subject and runs + information_source = Node(IdentityInterface( + fields = ['subject_id', 'run_id']), + name = 'information_source') + information_source.iterables = [ + ('subject_id', self.subject_list), + ('run_id', self.run_list) + ] + + # SelectFiles - Get necessary files + templates = { + 'func' : join('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_bold_space-MNI152NLin2009cAsym_preproc.nii.gz'), + 'events' : join('sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-{run_id}_events.tsv') + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.dataset_dir + run_level.connect(information_source, 'subject_id', select_files, 'subject_id') + run_level.connect(information_source, 'run_id', select_files, 'run_id') + + # IsotropicSmooth Node - Smoothing data + smoothing_func = Node(IsotropicSmooth(), name = 'smoothing_func') + smoothing_func.inputs.fwhm = self.fwhm + run_level.connect(select_files, 'func', smoothing_func, 'in_file') + + # Get Subject Info - get subject specific condition information + subject_information = Node(Function( + function = self.get_subject_information, + input_names = ['event_file'], + output_names = ['subject_info'] + ), name = 'subject_information') + run_level.connect(select_files, 'events', subject_information, 'event_file') + + # SpecifyModel Node - Generate run level model + specify_model = Node(SpecifyModel(), name = 'specify_model') + specify_model.inputs.high_pass_filter_cutoff = 60 + specify_model.inputs.input_units = 'secs' + specify_model.inputs.time_repetition = TaskInformation()['RepetitionTime'] + run_level.connect(smoothing_func, 'out_file', specify_model, 'functional_runs') + run_level.connect(subject_information, 'subject_info', specify_model, 'subject_info') + + # Level1Design Node - Generate files for run level computation + model_design = Node(Level1Design(), name = 'model_design') + model_design.inputs.bases = {'dgamma' : {'derivs' : True }} + model_design.inputs.interscan_interval = TaskInformation()['RepetitionTime'] + model_design.inputs.model_serial_correlations = True + model_design.inputs.contrasts = self.run_level_contrasts + run_level.connect(specify_model, 'session_info', model_design, 'session_info') + + # FEATModel Node - Generate run level model + model_generation = Node(FEATModel(), name = 'model_generation') + run_level.connect(model_design, 'ev_files', model_generation, 'ev_files') + run_level.connect(model_design, 'fsf_files', model_generation, 'fsf_file') + + # FILMGLS Node - Estimate first level model + model_estimate = Node(FILMGLS(), name='model_estimate') + run_level.connect(smoothing_func, 'out_file', model_estimate, 'in_file') + run_level.connect(model_generation, 'con_file', model_estimate, 'tcon_file') + run_level.connect(model_generation, 'design_file', model_estimate, 'design_file') + + # DataSink Node - store the wanted results in the wanted directory + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + run_level.connect(model_estimate, 'results_dir', data_sink, 'run_level_analysis.@results') + run_level.connect( + model_generation, 'design_file', data_sink, 'run_level_analysis.@design_file') + run_level.connect( + model_generation, 'design_image', data_sink, 'run_level_analysis.@design_img') + + # Remove large files, if requested + if Configuration()['pipelines']['remove_unused_data']: + remove_smooth = Node( + InterfaceFactory.create('remove_parent_directory'), + name = 'remove_smooth') + run_level.connect(data_sink, 'out_file', remove_smooth, '_') + run_level.connect(smoothing_func, 'out_file', remove_smooth, 'file_name') + + return run_level + + def get_run_level_outputs(self): + """ Return the names of the files the run level analysis is supposed to generate. """ + + parameters = { + 'run_id' : self.run_list, + 'subject_id' : self.subject_list, + 'contrast_id' : self.contrast_list, + } + parameter_sets = product(*parameters.values()) + output_dir = join(self.directories.output_dir, + 'run_level_analysis', '_run_id_{run_id}_subject_id_{subject_id}') + templates = [ + join(output_dir, 'results', 'cope{contrast_id}.nii.gz'), + join(output_dir, 'results', 'tstat{contrast_id}.nii.gz'), + join(output_dir, 'results', 'varcope{contrast_id}.nii.gz'), + join(output_dir, 'results', 'zstat{contrast_id}.nii.gz') + ] + return [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets for template in templates] + + def get_subject_level_analysis(self): + """ + Create the subject level analysis workflow. + + Returns: + - subject_level_analysis : nipype.WorkFlow + """ + # Second level (single-subject, mean of all four scans) analysis workflow. + subject_level = Workflow( + base_dir = self.directories.working_dir, + name = 'subject_level_analysis') + + # Infosource Node - To iterate on subject and runs + information_source = Node(IdentityInterface( + fields = ['subject_id', 'contrast_id']), + name = 'information_source') + information_source.iterables = [ + ('subject_id', self.subject_list), + ('contrast_id', self.contrast_list) + ] + + # SelectFiles node - to select necessary files + templates = { + 'cope' : join(self.directories.output_dir, + 'run_level_analysis', '_run_id_*_subject_id_{subject_id}', 'results', + 'cope{contrast_id}.nii.gz'), + 'varcope' : join(self.directories.output_dir, + 'run_level_analysis', '_run_id_*_subject_id_{subject_id}', 'results', + 'varcope{contrast_id}.nii.gz'), + 'masks' : join('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', + 'sub-{subject_id}_task-MGT_run-*_bold_space-MNI152NLin2009cAsym_brainmask.nii.gz') + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory= self.directories.dataset_dir + subject_level.connect(information_source, 'subject_id', select_files, 'subject_id') + subject_level.connect(information_source, 'contrast_id', select_files, 'contrast_id') + + # Merge Node - Merge copes files for each subject + merge_copes = Node(Merge(), name = 'merge_copes') + merge_copes.inputs.dimension = 't' + subject_level.connect(select_files, 'cope', merge_copes, 'in_files') + + # Merge Node - Merge varcopes files for each subject + merge_varcopes = Node(Merge(), name = 'merge_varcopes') + merge_varcopes.inputs.dimension = 't' + subject_level.connect(select_files, 'varcope', merge_varcopes, 'in_files') + + # Split Node - Split mask list to serve them as inputs of the MultiImageMaths node. + split_masks = Node(Split(), name = 'split_masks') + split_masks.inputs.splits = [1, len(self.run_list) - 1] + split_masks.inputs.squeeze = True # Unfold one-element splits removing the list + subject_level.connect(select_files, 'masks', split_masks, 'inlist') + + # MultiImageMaths Node - Create a subject mask by + # computing the intersection of all run masks. + mask_intersection = Node(MultiImageMaths(), name = 'mask_intersection') + mask_intersection.inputs.op_string = '-mul %s ' * (len(self.run_list) - 1) + subject_level.connect(split_masks, 'out1', mask_intersection, 'in_file') + subject_level.connect(split_masks, 'out2', mask_intersection, 'operand_files') + + # L2Model Node - Generate subject specific second level model + generate_model = Node(L2Model(), name = 'generate_model') + generate_model.inputs.num_copes = len(self.run_list) + + # FLAMEO Node - Estimate model + estimate_model = Node(FLAMEO(), name = 'estimate_model') + estimate_model.inputs.run_mode = 'flame1' + subject_level.connect(mask_intersection, 'out_file', estimate_model, 'mask_file') + subject_level.connect(merge_copes, 'merged_file', estimate_model, 'cope_file') + subject_level.connect(merge_varcopes, 'merged_file', estimate_model, 'var_cope_file') + subject_level.connect(generate_model, 'design_mat', estimate_model, 'design_file') + subject_level.connect(generate_model, 'design_con', estimate_model, 't_con_file') + subject_level.connect(generate_model, 'design_grp', estimate_model, 'cov_split_file') + + # DataSink Node - store the wanted results in the wanted directory + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + subject_level.connect( + mask_intersection, 'out_file', data_sink, 'subject_level_analysis.@mask') + subject_level.connect(estimate_model, 'zstats', data_sink, 'subject_level_analysis.@stats') + subject_level.connect( + estimate_model, 'tstats', data_sink, 'subject_level_analysis.@tstats') + subject_level.connect(estimate_model, 'copes', data_sink, 'subject_level_analysis.@copes') + subject_level.connect( + estimate_model, 'var_copes', data_sink, 'subject_level_analysis.@varcopes') + + return subject_level + + def get_subject_level_outputs(self): + """ Return the names of the files the subject level analysis is supposed to generate. """ + + parameters = { + 'contrast_id' : self.contrast_list, + 'subject_id' : self.subject_list, + 'file' : ['cope1.nii.gz', 'tstat1.nii.gz', 'varcope1.nii.gz', 'zstat1.nii.gz'] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_{subject_id}','{file}' + ) + return_list = [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + parameters = { + 'contrast_id' : self.contrast_list, + 'subject_id' : self.subject_list, + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_{subject_id}', + 'sub-{subject_id}_task-MGT_run-01_bold_space-MNI152NLin2009cAsym_brainmask_maths.nii.gz' + ) + return_list += [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + return return_list + + def get_one_sample_t_test_regressors(subject_list: list) -> dict: + """ + Create dictionary of regressors for one sample t-test group analysis. + + Parameters: + - subject_list: ids of subject in the group for which to do the analysis + + Returns: + - dict containing named lists of regressors. + """ + + return dict(group_mean = [1 for _ in subject_list]) + + def get_two_sample_t_test_regressors( + equal_range_ids: list, + equal_indifference_ids: list, + subject_list: list, + ) -> dict: + """ + Create dictionary of regressors for two sample t-test group analysis. + + Parameters: + - equal_range_ids: ids of subjects in equal range group + - equal_indifference_ids: ids of subjects in equal indifference group + - subject_list: ids of subject for which to do the analysis + + Returns: + - regressors, dict: containing named lists of regressors. + - groups, list: group identifiers to distinguish groups in FSL analysis. + """ + + # Create 2 lists containing n_sub values which are + # * 1 if the participant is on the group + # * 0 otherwise + equal_range_regressors = [1 if i in equal_range_ids else 0 for i in subject_list] + equal_indifference_regressors = [ + 1 if i in equal_indifference_ids else 0 for i in subject_list + ] + + # Create regressors output : a dict with the two list + regressors = dict( + equalRange = equal_range_regressors, + equalIndifference = equal_indifference_regressors + ) + + # Create groups outputs : a list with 1 for equalRange subjects and 2 for equalIndifference + groups = [1 if i == 1 else 2 for i in equal_range_regressors] + + return regressors, groups + + def get_group_level_analysis(self): + """ + Return all workflows for the group level analysis. + + Returns; + - a list of nipype.WorkFlow + """ + + methods = ['equalRange', 'equalIndifference', 'groupComp'] + return [self.get_group_level_analysis_sub_workflow(method) for method in methods] + + def get_group_level_analysis_sub_workflow(self, method): + """ + Return a workflow for the group level analysis. + + Parameters: + - method: one of 'equalRange', 'equalIndifference' or 'groupComp' + + Returns: + - group_level: nipype.WorkFlow + """ + # Compute the number of participants used to do the analysis + nb_subjects = len(self.subject_list) + + # Declare the workflow + group_level = Workflow( + base_dir = self.directories.working_dir, + name = f'group_level_analysis_{method}_nsub_{nb_subjects}') + + # Infosource Node - iterate over the contrasts generated by the subject level analysis + information_source = Node(IdentityInterface( + fields = ['contrast_id']), + name = 'information_source') + information_source.iterables = [('contrast_id', self.contrast_list)] + + # SelectFiles Node - select necessary files + templates = { + 'cope' : join(self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_*', + 'cope1.nii.gz'), + 'varcope' : join(self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_{contrast_id}_subject_id_*', + 'varcope1.nii.gz'), + 'masks': join(self.directories.output_dir, + 'subject_level_analysis', '_contrast_id_1_subject_id_*', + 'sub-*_task-MGT_run-*_bold_space-MNI152NLin2009cAsym_brainmask_maths.nii.gz') + } + select_files = Node(SelectFiles(templates), name = 'select_files') + select_files.inputs.base_directory = self.directories.results_dir + group_level.connect(information_source, 'contrast_id', select_files, 'contrast_id') + + # Function Node elements_in_string + # Get contrast of parameter estimates (cope) for these subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_copes = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_copes', iterfield = 'input_str' + ) + group_level.connect(select_files, 'cope', get_copes, 'input_str') + + # Function Node elements_in_string + # Get variance of the estimated copes (varcope) for these subjects + # Note : using a MapNode with elements_in_string requires using clean_list to remove + # None values from the out_list + get_varcopes = MapNode(Function( + function = elements_in_string, + input_names = ['input_str', 'elements'], + output_names = ['out_list'] + ), + name = 'get_varcopes', iterfield = 'input_str' + ) + group_level.connect(select_files, 'varcope', get_varcopes, 'input_str') + + # Merge Node - Merge cope files + merge_copes = Node(Merge(), name = 'merge_copes') + merge_copes.inputs.dimension = 't' + group_level.connect(get_copes, ('out_list', clean_list), merge_copes, 'in_files') + + # Merge Node - Merge cope files + merge_varcopes = Node(Merge(), name = 'merge_varcopes') + merge_varcopes.inputs.dimension = 't' + group_level.connect(get_varcopes, ('out_list', clean_list), merge_varcopes, 'in_files') + + # Split Node - Split mask list to serve them as inputs of the MultiImageMaths node. + split_masks = Node(Split(), name = 'split_masks') + split_masks.inputs.splits = [1, len(self.subject_list) - 1] + split_masks.inputs.squeeze = True # Unfold one-element splits removing the list + group_level.connect(select_files, 'masks', split_masks, 'inlist') + + # MultiImageMaths Node - Create a subject mask by + # computing the intersection of all run masks. + mask_intersection = Node(MultiImageMaths(), name = 'mask_intersection') + mask_intersection.inputs.op_string = '-mul %s ' * (len(self.subject_list) - 1) + group_level.connect(split_masks, 'out1', mask_intersection, 'in_file') + group_level.connect(split_masks, 'out2', mask_intersection, 'operand_files') + + # MultipleRegressDesign Node - Specify model + specify_model = Node(MultipleRegressDesign(), name = 'specify_model') + + # FLAMEO Node - Estimate model + estimate_model = Node(FLAMEO(), name = 'estimate_model') + estimate_model.inputs.run_mode = 'flame1' + group_level.connect(mask_intersection, 'out_file', estimate_model, 'mask_file') + group_level.connect(merge_copes, 'merged_file', estimate_model, 'cope_file') + group_level.connect(merge_varcopes, 'merged_file', estimate_model, 'var_cope_file') + group_level.connect(specify_model, 'design_mat', estimate_model, 'design_file') + group_level.connect(specify_model, 'design_con', estimate_model, 't_con_file') + group_level.connect(specify_model, 'design_grp', estimate_model, 'cov_split_file') + + # Randomise Node - Perform clustering on statistical output + randomise = Node(Randomise(), name = 'randomise') + randomise.inputs.tfce = True + randomise.inputs.vox_p_values = True + randomise.inputs.num_perm = 5000 + randomise.inputs.c_thresh = 0.05 + randomise.inputs.tfce_E = 0.01 + group_level.connect(mask_intersection, 'out_file', randomise, 'mask') + group_level.connect(merge_copes, 'merged_file', randomise, 'in_file') + group_level.connect(specify_model, 'design_con', randomise, 'tcon') + group_level.connect(specify_model, 'design_mat', randomise, 'design_mat') + + # Datasink Node - Save important files + data_sink = Node(DataSink(), name = 'data_sink') + data_sink.inputs.base_directory = self.directories.output_dir + group_level.connect(estimate_model, 'zstats', data_sink, + f'group_level_analysis_{method}_nsub_{nb_subjects}.@zstats') + group_level.connect(estimate_model, 'tstats', data_sink, + f'group_level_analysis_{method}_nsub_{nb_subjects}.@tstats') + group_level.connect(randomise,'t_corrected_p_files', data_sink, + f'group_level_analysis_{method}_nsub_{nb_subjects}.@t_corrected_p_files') + + if method in ('equalIndifference', 'equalRange'): + # Setup a one sample t-test + specify_model.inputs.contrasts = [ + ['group_mean', 'T', ['group_mean'], [1]], + ['group_mean_neg', 'T', ['group_mean'], [-1]] + ] + + # Function Node get_group_subjects - Get subjects in the group and in the subject_list + get_group_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_group_subjects' + ) + get_group_subjects.inputs.list_1 = get_group(method) + get_group_subjects.inputs.list_2 = self.subject_list + group_level.connect(get_group_subjects, 'out_list', get_copes, 'elements') + group_level.connect(get_group_subjects, 'out_list', get_varcopes, 'elements') + + # Function Node get_one_sample_t_test_regressors + # Get regressors in the equalRange and equalIndifference method case + regressors_one_sample = Node( + Function( + function = self.get_one_sample_t_test_regressors, + input_names = ['subject_list'], + output_names = ['regressors'] + ), + name = 'regressors_one_sample', + ) + group_level.connect(get_group_subjects, 'out_list', regressors_one_sample, 'subject_list') + group_level.connect(regressors_one_sample, 'regressors', specify_model, 'regressors') + + elif method == 'groupComp': + + # Select copes and varcopes corresponding to the selected subjects + # Indeed the SelectFiles node asks for all (*) subjects available + get_copes.inputs.elements = self.subject_list + get_varcopes.inputs.elements = self.subject_list + + # Setup a two sample t-test + specify_model.inputs.contrasts = [ + ['equalRange_sup', 'T', ['equalRange', 'equalIndifference'], [1, -1]] + ] + + # Function Node get_equal_range_subjects + # Get subjects in the equalRange group and in the subject_list + get_equal_range_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_range_subjects' + ) + get_equal_range_subjects.inputs.list_1 = get_group('equalRange') + get_equal_range_subjects.inputs.list_2 = self.subject_list + + # Function Node get_equal_indifference_subjects + # Get subjects in the equalIndifference group and in the subject_list + get_equal_indifference_subjects = Node(Function( + function = list_intersection, + input_names = ['list_1', 'list_2'], + output_names = ['out_list'] + ), + name = 'get_equal_indifference_subjects' + ) + get_equal_indifference_subjects.inputs.list_1 = get_group('equalIndifference') + get_equal_indifference_subjects.inputs.list_2 = self.subject_list + + # Function Node get_two_sample_t_test_regressors + # Get regressors in the groupComp method case + regressors_two_sample = Node( + Function( + function = self.get_two_sample_t_test_regressors, + input_names = [ + 'equal_range_ids', + 'equal_indifference_ids', + 'subject_list', + ], + output_names = ['regressors', 'groups'] + ), + name = 'regressors_two_sample', + ) + regressors_two_sample.inputs.subject_list = self.subject_list + + # Add missing connections + group_level.connect( + get_equal_range_subjects, 'out_list', regressors_two_sample, 'equal_range_ids') + group_level.connect( + get_equal_indifference_subjects, 'out_list', + regressors_two_sample, 'equal_indifference_ids') + group_level.connect(regressors_two_sample, 'regressors', specify_model, 'regressors') + group_level.connect(regressors_two_sample, 'groups', specify_model, 'groups') + + return group_level + + def get_group_level_outputs(self): + """ Return all names for the files the group level analysis is supposed to generate. """ + + # Handle equalRange and equalIndifference + parameters = { + 'contrast_id': self.contrast_list, + 'method': ['equalRange', 'equalIndifference'], + 'file': [ + 'randomise_tfce_corrp_tstat1.nii.gz', + 'randomise_tfce_corrp_tstat2.nii.gz', + 'tstat1.nii.gz', + 'tstat2.nii.gz', + 'zstat1.nii.gz', + 'zstat2.nii.gz' + ] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + 'group_level_analysis_{method}_nsub_'+f'{len(self.subject_list)}', + '_contrast_id_{contrast_id}', + '{file}' + ) + return_list = [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + # Handle groupComp + parameters = { + 'contrast_id': self.contrast_list, + 'file': [ + 'randomise_tfce_corrp_tstat1.nii.gz', + 'tstat1.nii.gz', + 'zstat1.nii.gz' + ] + } + parameter_sets = product(*parameters.values()) + template = join( + self.directories.output_dir, + f'group_level_analysis_groupComp_nsub_{len(self.subject_list)}', + '_contrast_id_{contrast_id}', + '{file}' + ) + return_list += [template.format(**dict(zip(parameters.keys(), parameter_values)))\ + for parameter_values in parameter_sets] + + return return_list + + def get_hypotheses_outputs(self): + """ Return all hypotheses output file names. """ + + nb_sub = len(self.subject_list) + files = [ + # Hypothesis 1 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat2'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'zstat2.nii.gz'), + # Hypothesis 2 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat2'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'zstat2.nii.gz'), + # Hypothesis 3 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat2'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_1', 'zstat2.nii.gz'), + # Hypothesis 4 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'randomise_tfce_corrp_tstat2'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_1', 'zstat2.nii.gz'), + # Hypothesis 5 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz'), + # Hypothesis 6 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz'), + # Hypothesis 7 + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalIndifference_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz'), + # Hypothesis 8 + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_equalRange_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz'), + # Hypothesis 9 + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_2', 'randomise_tfce_corrp_tstat1'), + join(f'group_level_analysis_groupComp_nsub_{nb_sub}', + '_contrast_id_2', 'zstat1.nii.gz') + ] + return [join(self.directories.output_dir, f) for f in files] diff --git a/narps_open/pipelines/team_4TQ6_wip.py b/narps_open/pipelines/team_4TQ6_wip.py deleted file mode 100755 index 28e9aada..00000000 --- a/narps_open/pipelines/team_4TQ6_wip.py +++ /dev/null @@ -1,563 +0,0 @@ -# pylint: skip-file -from nipype.interfaces.fsl import (BET, ICA_AROMA, FAST, MCFLIRT, FLIRT, FNIRT, ApplyWarp, SUSAN, - Info, ImageMaths, IsotropicSmooth, Threshold, Level1Design, FEATModel, - L2Model, Merge, FLAMEO, ContrastMgr,Cluster, FILMGLS, Randomise, MultipleRegressDesign) -from nipype.algorithms.modelgen import SpecifyModel - -from niflow.nipype1.workflows.fmri.fsl import create_susan_smooth -from nipype.interfaces.utility import IdentityInterface, Function -from nipype.interfaces.io import SelectFiles, DataSink -from nipype.algorithms.misc import Gunzip -from nipype import Workflow, Node, MapNode -from nipype.interfaces.base import Bunch - -from os.path import join as opj -import os - -def get_session_infos(event_file): - ''' - Create Bunchs for specifyModel. - - Parameters : - - event_file : str, file corresponding to the run and the subject to analyze - - Returns : - - subject_info : list of Bunch for 1st level analysis. - ''' - from os.path import join as opj - from nipype.interfaces.base import Bunch - import numpy as np - - cond_names = ['trial', 'gain', 'loss'] - - onset = {} - duration = {} - amplitude = {} - - for c in cond_names: # For each condition. - onset.update({c : []}) # creates dictionary items with empty lists - duration.update({c : []}) - amplitude.update({c : []}) - - with open(event_file, 'rt') as f: - next(f) # skip the header - - for line in f: - info = line.strip().split() - # Creates list with onsets, duration and loss/gain for amplitude (FSL) - for c in cond_names: - if c == 'gain': - onset[c].append(float(info[0])) - duration[c].append(float(info[4])) - amplitude[c].append(float(info[2])) - elif c == 'loss': - onset[c].append(float(info[0])) - duration[c].append(float(info[4])) - amplitude[c].append(float(info[3])) - elif c == 'trial': - onset[c].append(float(info[0])) - duration[c].append(float(info[4])) - amplitude[c].append(float(1)) - - - - subject_info = [] - - subject_info.append(Bunch(conditions=cond_names, - onsets=[onset[k] for k in cond_names], - durations=[duration[k] for k in cond_names], - amplitudes=[amplitude[k] for k in cond_names], - regressor_names=None, - regressors=None)) - - return subject_info - -# Linear contrast effects: 'Gain' vs. baseline, 'Loss' vs. baseline. -def get_contrasts(subject_id): - ''' - Create the list of tuples that represents contrasts. - Each contrast is in the form : - (Name,Stat,[list of condition names],[weights on those conditions]) - - Parameters: - - subject_id: str, ID of the subject - - Returns: - - contrasts: list of tuples, list of contrasts to analyze - ''' - # list of condition names - conditions = ['trial', 'gain', 'loss'] - - # create contrasts - gain = ('gain', 'T', conditions, [0, 1, 0]) - - loss = ('loss', 'T', conditions, [0, 0, 1]) - - # contrast list - contrasts = [gain, loss] - - return contrasts - - -def rm_smoothed_files(files, subject_id, run_id, result_dir, working_dir): - import shutil - from os.path import join as opj - - smooth_dir = opj(result_dir, working_dir, 'l1_analysis', f"_run_id_{run_id}_subject_id_{subject_id}", 'smooth') - - try: - shutil.rmtree(smooth_dir) - except OSError as e: - print(e) - else: - print("The directory is deleted successfully") - - -def get_l1_analysis(subject_list, run_list, TR, fwhm, exp_dir, output_dir, working_dir, result_dir): - """ - Returns the first level analysis workflow. - - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the analysis - - run_list: list of str, list of runs for which you want to do the analysis - - fwhm: float, fwhm for smoothing step - - TR: float, time repetition used during acquisition - - Returns: - - l1_analysis : Nipype WorkFlow - """ - # Infosource Node - To iterate on subject and runs - infosource = Node(IdentityInterface(fields = ['subject_id', 'run_id']), name = 'infosource') - infosource.iterables = [('subject_id', subject_list), - ('run_id', run_list)] - - # Templates to select files node - func_file = opj('derivatives', 'fmriprep', 'sub-{subject_id}', 'func', - 'sub-{subject_id}_task-MGT_run-{run_id}_bold_space-MNI152NLin2009cAsym_preproc.nii.gz') - - event_file = opj('sub-{subject_id}', 'func', 'sub-{subject_id}_task-MGT_run-{run_id}_events.tsv') - - template = {'func' : func_file, 'event' : event_file} - - # SelectFiles node - to select necessary files - selectfiles = Node(SelectFiles(template, base_directory=exp_dir), name = 'selectfiles') - - # DataSink Node - store the wanted results in the wanted repository - datasink = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink') - - ## Skullstripping - skullstrip = Node(BET(frac = 0.1, functional = True, mask = True), name = 'skullstrip') - - ## Smoothing - smooth = Node(IsotropicSmooth(fwhm = 6), name = 'smooth') - - # Node contrasts to get contrasts - contrasts = Node(Function(function=get_contrasts, - input_names=['subject_id'], - output_names=['contrasts']), - name='contrasts') - - # Get Subject Info - get subject specific condition information - get_subject_infos = Node(Function(input_names=['event_file'], - output_names=['subject_info'], - function=get_session_infos), - name='get_subject_infos') - - specify_model = Node(SpecifyModel(high_pass_filter_cutoff = 60, - input_units = 'secs', - time_repetition = TR), name = 'specify_model') - - # First temporal derivatives of the two regressors were also used, - # along with temporal filtering (60 s) of all the independent variable time-series. - # No motion parameter regressors used. - - l1_design = Node(Level1Design(bases = {'dgamma':{'derivs' : True}}, - interscan_interval = TR, - model_serial_correlations = True), name = 'l1_design') - - model_generation = Node(FEATModel(), name = 'model_generation') - - model_estimate = Node(FILMGLS(), name='model_estimate') - - remove_smooth = Node(Function(input_names = ['subject_id', 'run_id', 'files', 'result_dir', 'working_dir'], - function = rm_smoothed_files), name = 'remove_smooth') - - remove_smooth.inputs.result_dir = result_dir - remove_smooth.inputs.working_dir = working_dir - - # Create l1 analysis workflow and connect its nodes - l1_analysis = Workflow(base_dir = opj(result_dir, working_dir), name = "l1_analysis") - - l1_analysis.connect([(infosource, selectfiles, [('subject_id', 'subject_id'), - ('run_id', 'run_id')]), - (selectfiles, get_subject_infos, [('event', 'event_file')]), - (infosource, contrasts, [('subject_id', 'subject_id')]), - (selectfiles, skullstrip, [('func', 'in_file')]), - (skullstrip, smooth, [('out_file', 'in_file')]), - (smooth, specify_model, [('out_file', 'functional_runs')]), - (get_subject_infos, specify_model, [('subject_info', 'subject_info')]), - (contrasts, l1_design, [('contrasts', 'contrasts')]), - (specify_model, l1_design, [('session_info', 'session_info')]), - (l1_design, model_generation, [('ev_files', 'ev_files'), ('fsf_files', 'fsf_file')]), - (smooth, model_estimate, [('out_file', 'in_file')]), - (model_generation, model_estimate, [('con_file', 'tcon_file'), - ('design_file', 'design_file')]), - (infosource, remove_smooth, [('subject_id', 'subject_id'), ('run_id', 'run_id')]), - (model_estimate, remove_smooth, [('results_dir', 'files')]), - (model_estimate, datasink, [('results_dir', 'l1_analysis.@results')]), - (model_generation, datasink, [('design_file', 'l1_analysis.@design_file'), - ('design_image', 'l1_analysis.@design_img')]), - (skullstrip, datasink, [('mask_file', 'l1_analysis.@skullstriped')]) - ]) - - return l1_analysis - -def get_l2_analysis(subject_list, contrast_list, run_list, exp_dir, output_dir, working_dir, result_dir, data_dir): - """ - Returns the 2nd level of analysis workflow. - - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the preprocessing - - contrast_list: list of str, list of contrasts to analyze - - run_list: list of str, list of runs for which you want to do the analysis - - - Returns: - - l2_analysis: Nipype WorkFlow - """ - # Infosource Node - To iterate on subject and runs - infosource_2ndlevel = Node(IdentityInterface(fields=['subject_id', 'contrast_id']), name='infosource_2ndlevel') - infosource_2ndlevel.iterables = [('subject_id', subject_list), ('contrast_id', contrast_list)] - - # Templates to select files node - copes_file = opj(output_dir, 'l1_analysis', '_run_id_*_subject_id_{subject_id}', 'results', - 'cope{contrast_id}.nii.gz') - - varcopes_file = opj(output_dir, 'l1_analysis', '_run_id_*_subject_id_{subject_id}', 'results', - 'varcope{contrast_id}.nii.gz') - - mask_file = opj(data_dir, 'NARPS-4TQ6', 'hypo1_unthresh.nii.gz') - - template = {'cope' : copes_file, 'varcope' : varcopes_file, 'mask':mask_file} - - # SelectFiles node - to select necessary files - selectfiles_2ndlevel = Node(SelectFiles(template, base_directory=result_dir), name = 'selectfiles_2ndlevel') - - datasink_2ndlevel = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink_2ndlevel') - - # Generate design matrix - specify_model_2ndlevel = Node(L2Model(num_copes = len(run_list)), name='l2model_2ndlevel') - - # Merge copes and varcopes files for each subject - merge_copes_2ndlevel = Node(Merge(dimension='t'), name='merge_copes_2ndlevel') - - merge_varcopes_2ndlevel = Node(Merge(dimension='t'), name='merge_varcopes_2ndlevel') - - # Second level (single-subject, mean of all four scans) analyses: Fixed effects analysis. - flame = Node(FLAMEO(run_mode = 'flame1'), - name='flameo') - - l2_analysis = Workflow(base_dir = opj(result_dir, working_dir), name = "l2_analysis") - - l2_analysis.connect([(infosource_2ndlevel, selectfiles_2ndlevel, [('subject_id', 'subject_id'), - ('contrast_id', 'contrast_id')]), - (selectfiles_2ndlevel, merge_copes_2ndlevel, [('cope', 'in_files')]), - (selectfiles_2ndlevel, merge_varcopes_2ndlevel, [('varcope', 'in_files')]), - (selectfiles_2ndlevel, flame, [('mask', 'mask_file')]), - (merge_copes_2ndlevel, flame, [('merged_file', 'cope_file')]), - (merge_varcopes_2ndlevel, flame, [('merged_file', 'var_cope_file')]), - (specify_model_2ndlevel, flame, [('design_mat', 'design_file'), - ('design_con', 't_con_file'), - ('design_grp', 'cov_split_file')]), - (flame, datasink_2ndlevel, [('zstats', 'l2_analysis.@stats'), - ('tstats', 'l2_analysis.@tstats'), - ('copes', 'l2_analysis.@copes'), - ('var_copes', 'l2_analysis.@varcopes')])]) - - return l2_analysis - -def get_subgroups_contrasts(copes, varcopes, subject_ids, participants_file): - ''' - Parameters : - - copes: original file list selected by selectfiles node - - varcopes: original file list selected by selectfiles node - - subject_ids: list of subject IDs that are analyzed - - participants_file: str, file containing participants characteristics - - This function return the file list containing only the files belonging to subject in the wanted group. - ''' - - from os.path import join as opj - - equalRange_id = [] - equalIndifference_id = [] - - subject_list = ['sub-' + str(i) for i in subject_ids] - - with open(participants_file, 'rt') as f: - next(f) # skip the header - - for line in f: - info = line.strip().split() - - if info[0] in subject_list and info[1] == "equalIndifference": - equalIndifference_id.append(info[0][-3:]) - elif info[0] in subject_list and info[1] == "equalRange": - equalRange_id.append(info[0][-3:]) - - copes_equalIndifference = [] - copes_equalRange = [] - copes_global = [] - varcopes_equalIndifference = [] - varcopes_equalRange = [] - varcopes_global = [] - - for file in copes: - sub_id = file.split('/') - if sub_id[-2][-3:] in equalIndifference_id: - copes_equalIndifference.append(file) - elif sub_id[-2][-3:] in equalRange_id: - copes_equalRange.append(file) - if sub_id[-2][-3:] in subject_ids: - copes_global.append(file) - - for file in varcopes: - sub_id = file.split('/') - if sub_id[-2][-3:] in equalIndifference_id: - varcopes_equalIndifference.append(file) - elif sub_id[-2][-3:] in equalRange_id: - varcopes_equalRange.append(file) - if sub_id[-2][-3:] in subject_ids: - varcopes_global.append(file) - - print('ER', copes_equalRange, 'EI',copes_equalIndifference) - - return copes_equalIndifference, copes_equalRange, varcopes_equalIndifference, varcopes_equalRange, equalIndifference_id, equalRange_id, copes_global, varcopes_global - -def get_regs(equalRange_id, equalIndifference_id, method, subject_list): - """ - Create dictionary of regressors for group analysis. - - Parameters: - - equalRange_id: list of str, ids of subjects in equal range group - - equalIndifference_id: list of str, ids of subjects in equal indifference group - - method: one of "equalRange", "equalIndifference" or "groupComp" - - subject_list: list of str, ids of subject for which to do the analysis - - Returns: - - regressors: dict, dictionary of regressors used to distinguish groups in FSL group analysis - """ - if method == "equalRange": - regressors = dict(group_mean = [1 for i in range(len(equalRange_id))]) - - elif method == "equalIndifference": - regressors = dict(group_mean = [1 for i in range(len(equalIndifference_id))]) - - elif method == "groupComp": - equalRange_reg = [1 for i in range(len(equalRange_id) + len(equalIndifference_id))] - equalIndifference_reg = [0 for i in range(len(equalRange_id) + len(equalIndifference_id))] - - for i, sub_id in enumerate(subject_list): - if sub_id in equalIndifference_id: - index = i - equalIndifference_reg[index] = 1 - equalRange_reg[index] = 0 - - regressors = dict(equalRange = equalRange_reg, - equalIndifference = equalIndifference_reg) - - return regressors - -def get_group_workflow(subject_list, n_sub, contrast_list, method, exp_dir, output_dir, - working_dir, result_dir, data_dir): - """ - Returns the group level of analysis workflow. - - Parameters: - - exp_dir: str, directory where raw data are stored - - result_dir: str, directory where results will be stored - - working_dir: str, name of the sub-directory for intermediate results - - output_dir: str, name of the sub-directory for final results - - subject_list: list of str, list of subject for which you want to do the preprocessing - - contrast_list: list of str, list of contrasts to analyze - - method: one of "equalRange", "equalIndifference" or "groupComp" - - n_sub: int, number of subjects - - Returns: - - l2_analysis: Nipype WorkFlow - """ - # Infosource Node - To iterate on subject and runs - infosource_3rdlevel = Node(IdentityInterface(fields = ['contrast_id', 'exp_dir', 'result_dir', - 'output_dir', 'working_dir', 'subject_list', 'method'], - exp_dir = exp_dir, result_dir = result_dir, - output_dir = output_dir, working_dir = working_dir, - subject_list = subject_list, method = method), - name = 'infosource_3rdlevel') - infosource_3rdlevel.iterables = [('contrast_id', contrast_list)] - - # Templates to select files node - copes_file = opj(output_dir, 'l2_analysis', '_contrast_id_{contrast_id}_subject_id_*', - 'cope1.nii.gz') - - varcopes_file = opj(output_dir, 'l2_analysis', '_contrast_id_{contrast_id}_subject_id_*', - 'varcope1.nii.gz') - - participants_file = opj(exp_dir, 'participants.tsv') - - mask_file = opj(data_dir, 'NARPS-4TQ6', 'hypo1_unthresh.nii.gz') - - template = {'cope' : copes_file, 'varcope' : varcopes_file, 'participants' : participants_file, - 'mask' : mask_file} - - # SelectFiles node - to select necessary files - selectfiles_3rdlevel = Node(SelectFiles(template, base_directory=result_dir), name = 'selectfiles_3rdlevel') - - datasink_3rdlevel = Node(DataSink(base_directory=result_dir, container=output_dir), name='datasink_3rdlevel') - - merge_copes_3rdlevel = Node(Merge(dimension = 't'), name = 'merge_copes_3rdlevel') - merge_varcopes_3rdlevel = Node(Merge(dimension = 't'), name = 'merge_varcopes_3rdlevel') - - subgroups_contrasts = Node(Function(input_names = ['copes', 'varcopes', 'subject_ids', 'participants_file'], - output_names = ['copes_equalIndifference', 'copes_equalRange', - 'varcopes_equalIndifference', 'varcopes_equalRange', - 'equalIndifference_id', 'equalRange_id', 'copes_global', - 'varcopes_global'], - function = get_subgroups_contrasts), - name = 'subgroups_contrasts') - - specifymodel_3rdlevel = Node(MultipleRegressDesign(), name = 'specifymodel_3rdlevel') - - flame_3rdlevel = Node(FLAMEO(run_mode = 'flame1'), - name='flame_3rdlevel') - - regs = Node(Function(input_names = ['equalRange_id', 'equalIndifference_id', 'method', 'subject_list'], - output_names = ['regressors'], - function = get_regs), name = 'regs') - regs.inputs.method = method - regs.inputs.subject_list = subject_list - - randomise = Node(Randomise(num_perm = 5000, tfce=True, vox_p_values=True, c_thresh=0.05, tfce_E=0.01), - name = "randomise") - - l3_analysis = Workflow(base_dir = opj(result_dir, working_dir), name = f"l3_analysis_{method}_nsub_{n_sub}") - - l3_analysis.connect([(infosource_3rdlevel, selectfiles_3rdlevel, [('contrast_id', 'contrast_id')]), - (infosource_3rdlevel, subgroups_contrasts, [('subject_list', 'subject_ids')]), - (selectfiles_3rdlevel, subgroups_contrasts, [('cope', 'copes'), ('varcope', 'varcopes'), - ('participants', 'participants_file')]), - (selectfiles_3rdlevel, flame_3rdlevel, [('mask', 'mask_file')]), - (selectfiles_3rdlevel, randomise, [('mask', 'mask')]), - (subgroups_contrasts, regs, [('equalRange_id', 'equalRange_id'), - ('equalIndifference_id', 'equalIndifference_id')]), - (regs, specifymodel_3rdlevel, [('regressors', 'regressors')])]) - - - if method == 'equalIndifference' or method == 'equalRange': - specifymodel_3rdlevel.inputs.contrasts = [["group_mean", "T", ["group_mean"], [1]], ["group_mean_neg", "T", ["group_mean"], [-1]]] - - if method == 'equalIndifference': - l3_analysis.connect([(subgroups_contrasts, merge_copes_3rdlevel, - [('copes_equalIndifference', 'in_files')]), - (subgroups_contrasts, merge_varcopes_3rdlevel, - [('varcopes_equalIndifference', 'in_files')])]) - elif method == 'equalRange': - l3_analysis.connect([(subgroups_contrasts, merge_copes_3rdlevel, [('copes_equalRange', 'in_files')]), - (subgroups_contrasts, merge_varcopes_3rdlevel, [('varcopes_equalRange', 'in_files')])]) - - elif method == "groupComp": - specifymodel_3rdlevel.inputs.contrasts = [["equalRange_sup", "T", ["equalRange", "equalIndifference"], - [1, -1]]] - - l3_analysis.connect([(subgroups_contrasts, merge_copes_3rdlevel, [('copes_global', 'in_files')]), - (subgroups_contrasts, merge_varcopes_3rdlevel, [('varcopes_global', 'in_files')])]) - - l3_analysis.connect([(merge_copes_3rdlevel, flame_3rdlevel, [('merged_file', 'cope_file')]), - (merge_varcopes_3rdlevel, flame_3rdlevel, [('merged_file', 'var_cope_file')]), - (specifymodel_3rdlevel, flame_3rdlevel, [('design_mat', 'design_file'), - ('design_con', 't_con_file'), - ('design_grp', 'cov_split_file')]), - (merge_copes_3rdlevel, randomise, [('merged_file', 'in_file')]), - (specifymodel_3rdlevel, randomise, [('design_mat', 'design_mat'), - ('design_con', 'tcon')]), - (randomise, datasink_3rdlevel, [('t_corrected_p_files', - f"l3_analysis_{method}_nsub_{n_sub}.@tcorpfile"), - ('tstat_files', f"l3_analysis_{method}_nsub_{n_sub}.@tstat")]), - (flame_3rdlevel, datasink_3rdlevel, [('zstats', - f"l3_analysis_{method}_nsub_{n_sub}.@zstats"), - ('tstats', - f"l3_analysis_{method}_nsub_{n_sub}.@tstats")]), - ]) - - return l3_analysis - -def reorganize_results(result_dir, output_dir, n_sub, team_ID): - """ - Reorganize the results to analyze them. - - Parameters: - - result_dir: str, directory where results will be stored - - output_dir: str, name of the sub-directory for final results - - n_sub: float, number of subject used for the analysis - - team_ID: str, ID of the team to reorganize results - - """ - from os.path import join as opj - import os - import shutil - import gzip - import nibabel as nib - import numpy as np - - h1 = opj(result_dir, output_dir, f"l3_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_1') - h2 = opj(result_dir, output_dir, f"l3_analysis_equalRange_nsub_{n_sub}", '_contrast_id_1') - h3 = opj(result_dir, output_dir, f"l3_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_1') - h4 = opj(result_dir, output_dir, f"l3_analysis_equalRange_nsub_{n_sub}", '_contrast_id_1') - h5 = opj(result_dir, output_dir, f"l3_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_2') - h6 = opj(result_dir, output_dir, f"l3_analysis_equalRange_nsub_{n_sub}", '_contrast_id_2') - h7 = opj(result_dir, output_dir, f"l3_analysis_equalIndifference_nsub_{n_sub}", '_contrast_id_2') - h8 = opj(result_dir, output_dir, f"l3_analysis_equalRange_nsub_{n_sub}", '_contrast_id_2') - h9 = opj(result_dir, output_dir, f"l3_analysis_groupComp_nsub_{n_sub}", '_contrast_id_2') - - h = [h1, h2, h3, h4, h5, h6, h7, h8, h9] - - repro_unthresh = [opj(filename, "zstat2.nii.gz") if i in [4, 5] else opj(filename, "zstat1.nii.gz") for i, filename in enumerate(h)] - - repro_thresh = [opj(filename, "randomise_tfce_corrp_tstat2.nii.gz") if i in [4, 5] else opj(filename, 'randomise_tfce_corrp_tstat1.nii.gz') for i, filename in enumerate(h)] - - if not os.path.isdir(opj(result_dir, "NARPS-reproduction")): - os.mkdir(opj(result_dir, "NARPS-reproduction")) - - for i, filename in enumerate(repro_unthresh): - f_in = filename - f_out = opj(result_dir, "NARPS-reproduction", f"team_{team_ID}_nsub_{n_sub}_hypo{i+1}_unthresholded.nii.gz") - shutil.copyfile(f_in, f_out) - - for i, filename in enumerate(repro_thresh): - f_in = filename - f_out = opj(result_dir, "NARPS-reproduction", f"team_{team_ID}_nsub_{n_sub}_hypo{i+1}_thresholded.nii.gz") - shutil.copyfile(f_in, f_out) - - for i, filename in enumerate(repro_thresh): - f_in = filename - img = nib.load(filename) - original_affine = img.affine.copy() - spm = nib.load(repro_unthresh[i]) - new_img = img.get_fdata().astype(float) > 0 - new_img = new_img.astype(float) - print(np.unique(new_img)) - print(np.unique(spm.get_fdata())) - new_img = new_img * spm.get_fdata() - print(np.unique(new_img)) - new_spm = nib.Nifti1Image(new_img, original_affine) - nib.save(new_spm, opj(result_dir, "NARPS-reproduction", - f"team_{team_ID}_nsub_{n_sub}_hypo{i+1}_thresholded.nii.gz")) - - print(f"Results files of team {team_ID} reorganized.") diff --git a/tests/pipelines/test_team_4TQ6.py b/tests/pipelines/test_team_4TQ6.py new file mode 100644 index 00000000..f0e2d6f6 --- /dev/null +++ b/tests/pipelines/test_team_4TQ6.py @@ -0,0 +1,96 @@ +#!/usr/bin/python +# coding: utf-8 + +""" Tests of the 'narps_open.pipelines.team_4TQ6' module. + +Launch this test with PyTest + +Usage: +====== + pytest -q test_team_4TQ6.py + pytest -q test_team_4TQ6.py -k +""" +from os.path import join, exists, abspath +from filecmp import cmp + +from pytest import helpers, mark +from nipype import Workflow, Node, Function +from nipype.interfaces.base import Bunch + +from narps_open.utils.configuration import Configuration +from narps_open.pipelines.team_4TQ6 import PipelineTeam4TQ6 + +class TestPipelinesTeam4TQ6: + """ A class that contains all the unit tests for the PipelineTeam4TQ6 class.""" + + @staticmethod + @mark.unit_test + def test_create(): + """ Test the creation of a PipelineTeam4TQ6 object """ + + pipeline = PipelineTeam4TQ6() + + # 1 - check the parameters + assert pipeline.fwhm == 6.0 + assert pipeline.team_id == '4TQ6' + + # 2 - check workflows + assert pipeline.get_preprocessing() is None + assert isinstance(pipeline.get_run_level_analysis(), Workflow) + assert isinstance(pipeline.get_subject_level_analysis(), Workflow) + group_level = pipeline.get_group_level_analysis() + assert len(group_level) == 3 + for sub_workflow in group_level: + assert isinstance(sub_workflow, Workflow) + + @staticmethod + @mark.unit_test + def test_outputs(): + """ Test the expected outputs of a PipelineTeam4TQ6 object """ + + pipeline = PipelineTeam4TQ6() + + # 1 - 1 subject outputs + pipeline.subject_list = ['001'] + helpers.test_pipeline_outputs(pipeline, [0, 4*1*2*4, 4*2*1 + 2*1, 6*2*2 + 3*2, 18]) + + # 2 - 4 subjects outputs + pipeline.subject_list = ['001', '002', '003', '004'] + helpers.test_pipeline_outputs(pipeline, [0, 4*4*2*4, 4*2*4 + 2*4, 6*2*2 + 3*2, 18]) + + @staticmethod + @mark.unit_test + def test_subject_information(): + """ Test the get_subject_information method """ + + # Get test files + test_file = join(Configuration()['directories']['test_data'], 'pipelines', 'events.tsv') + + # Prepare several scenarii + info_missed = PipelineTeam4TQ6.get_subject_information(test_file) + + # Compare bunches to expected + bunch = info_missed[0] + assert isinstance(bunch, Bunch) + assert bunch.conditions == ['trial', 'gain', 'loss'] + helpers.compare_float_2d_arrays(bunch.onsets, [ + [4.071, 11.834, 19.535, 27.535, 36.435], + [4.071, 11.834, 19.535, 27.535, 36.435], + [4.071, 11.834, 19.535, 27.535, 36.435] + ]) + helpers.compare_float_2d_arrays(bunch.durations, [ + [4.0, 4.0, 4.0, 4.0, 4.0], + [4.0, 4.0, 4.0, 4.0, 4.0], + [4.0, 4.0, 4.0, 4.0, 4.0] + ]) + helpers.compare_float_2d_arrays(bunch.amplitudes, [ + [1.0, 1.0, 1.0, 1.0, 1.0], + [14.0, 34.0, 38.0, 10.0, 16.0], + [6.0, 14.0, 19.0, 15.0, 17.0] + ]) + + @staticmethod + @mark.pipeline_test + def test_execution(): + """ Test the execution of a PipelineTeam4TQ6 and compare results """ + helpers.test_pipeline_evaluation('4TQ6')