From c7035ee985dc608afa4c14b3d067812a6cbec7e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Boris=20Cl=C3=A9net?= Date: Tue, 23 Jan 2024 17:48:53 +0100 Subject: [PATCH] First level of U26C --- narps_open/pipelines/team_U26C.py | 343 +++--------------- tests/pipelines/test_team_U26C.py | 151 ++++++++ .../pipelines/team_U26C/confounds.tsv | 3 + 3 files changed, 210 insertions(+), 287 deletions(-) create mode 100644 tests/pipelines/test_team_U26C.py create mode 100644 tests/test_data/pipelines/team_U26C/confounds.tsv diff --git a/narps_open/pipelines/team_U26C.py b/narps_open/pipelines/team_U26C.py index a6f434f8..58c1d2bf 100755 --- a/narps_open/pipelines/team_U26C.py +++ b/narps_open/pipelines/team_U26C.py @@ -50,85 +50,64 @@ def get_run_level_analysis(self): # @staticmethod # Starting python 3.10, staticmethod should be used here # Otherwise it produces a TypeError: 'staticmethod' object is not callable - def get_subject_information(event_files: list, model: str): + def get_subject_information(event_files: list): """ Create Bunchs for SpecifySPMModel. Parameters : - event_files: list of str, list of events files (one per run) for the subject - - model: str, either 'gain' or 'loss' Returns : - subject_information : list of Bunch for 1st level analysis. """ - '''Picks onsets and durations per condition and adds them to lists. - This function specifically picks onsets for the speech vs speaker - where the presentation is clear or in noise. - The function accepts event files. - - 'subject_id' is a string, i.e., sub-001 - ''' + from nipype.interfaces.base import Bunch - cond_names = ['gamble'] - onset = {} - duration = {} + onsets = {} + durations = {} weights_gain = {} weights_loss = {} - runs = ['01', '02', '03', '04'] - - for r in range(len(runs)): # Loop over number of runs. - onset.update({s + '_run' + str(r+1): [] for s in cond_names}) - duration.update({s + '_run' + str(r+1): [] for s in cond_names}) - weights_gain.update({'gain_run' + str(r+1): []}) - weights_loss.update({'loss_run' + str(r+1): []}) - - base_name = '/data/pt_nmc002/other/narps/event_tsvs/' - # subject_id = 'sub-001' - for ir, run in enumerate(runs): - f_events = base_name + subject_id + \ - '_task-MGT_run-' + runs[ir] + '_events.tsv' - with open(f_events, 'rt') as f: - next(f) # skip the header - for line in f: - info = line.strip().split() - for cond in cond_names: - val = cond + '_run' + str(ir+1) - val_gain = 'gain_run' + str(ir+1) - val_loss = 'loss_run' + str(ir+1) - onset[val].append(float(info[0])) - duration[val].append(float(info[1])) - weights_gain[val_gain].append(float(info[2])) - weights_loss[val_loss].append(float(info[3])) - # if cond == 'gain': - # weights[val].append(float(info[2])) - # elif cond == 'loss': - # weights[val].append(float(info[3])) - # elif cond == 'task-activ': - # weights[val].append(float(1)) - from nipype.interfaces.base import Bunch - # Bunching is done per run, i.e. cond1_run1, cond2_run1, etc. - subjectinfo = [] - for r in range(len(runs)): - - cond = [c + '_run' + str(r+1) for c in cond_names] - gain = 'gain_run' + str(r+1) - loss = 'loss_run' + str(r+1) - - subjectinfo.insert(r, - Bunch(conditions=cond, - onsets=[onset[k] for k in cond], - durations=[duration[k] for k in cond], - amplitudes=None, - tmod=None, - pmod=[Bunch(name=[gain, loss], - poly=[1, 1], - param=[weights_gain[gain], - weights_loss[loss]])], - regressor_names=None, - regressors=None)) - - return subjectinfo + subject_info = [] + + for run_id, event_file in enumerate(event_files): + + trial_key = f'gamble_run{run_id + 1}' + gain_key = f'gain_run{run_id + 1}' + loss_key = f'loss_run{run_id + 1}' + + onsets.update({trial_key: []}) + durations.update({trial_key: []}) + weights_gain.update({gain_key: []}) + weights_loss.update({loss_key: []}) + + with open(event_file, 'rt') as file: + next(file) # skip the header + + for line in file: + info = line.strip().split() + onsets[trial_key].append(float(info[0])) + durations[trial_key].append(float(info[1])) + weights_gain[gain_key].append(float(info[2])) + weights_loss[loss_key].append(float(info[3])) + + # Create a Bunch per run, i.e. cond1_run1, cond2_run1, etc. + subject_info.append( + Bunch( + conditions = [trial_key], + onsets = [onsets[trial_key]], + durations = [durations[trial_key]], + amplitudes = None, + tmod = None, + pmod = [Bunch( + name = [gain_key, loss_key], + poly = [1, 1], + param = [weights_gain[gain_key], weights_loss[loss_key]] + )], + regressor_names = None, + regressors = None + )) + + return subject_info # @staticmethod # Starting python 3.10, staticmethod should be used here # Otherwise it produces a TypeError: 'staticmethod' object is not callable @@ -191,7 +170,7 @@ def get_subject_level_analysis(self): # Identitiy interface Node - to iterate over subject_id and run infosource = Node(interface=IdentityInterface(fields=['subject_id']), name = 'infosource') - infosource.iterables = [('subject_id', subs)] + infosource.iterables = [('subject_id', self.subject_list)] # Select files from derivatives templates = { @@ -210,13 +189,13 @@ def get_subject_level_analysis(self): # Smooth warped functionals. smooth = Node(Smooth(), name = 'smooth') - smooth.inputs.overwrite = False - smooth.iterables = ('fwhm', fwhmlist) + smooth.inputs.fwhm = self.fwhm + smooth.overwrite = False # Function node get_subject_information - get subject specific condition information getsubinforuns = Node(Function( - function = pick_onsets, - input_names = ['subject_id'], + function = self.get_subject_information, + input_names = ['event_files'], output_names = ['subject_info'] ), name = 'getsubinforuns') @@ -231,49 +210,38 @@ def get_subject_level_analysis(self): confounds.inputs.run_id = self.run_list modelspec = Node(SpecifySPMModel(), name = 'modelspec') - modelspec.inputs.overwrite = False modelspec.inputs.concatenate_runs = False modelspec.inputs.input_units = 'secs' modelspec.inputs.output_units = 'secs' modelspec.inputs.time_repetition = TaskInformation()['RepetitionTime'] modelspec.inputs.high_pass_filter_cutoff = 128 + modelspec.overwrite = False level1design = Node(Level1Design(), name = 'level1design') - level1design.inputs.overwrite = False level1design.inputs.bases = {'hrf': {'derivs': [0, 0]}} level1design.inputs.timing_units = 'secs' level1design.inputs.interscan_interval = TaskInformation()['RepetitionTime'] + level1design.overwrite = False level1estimate = Node(EstimateModel(), name = 'level1estimate') - level1estimate.inputs.overwrite = False level1estimate.inputs.estimation_method = {'Classical': 1} + level1estimate.overwrite = False contrast_estimate = Node(EstimateContrast(), name = 'contraste_estimate') - contrast_estimate.inputs.overwrite=False, + contrast_estimate.inputs.contrasts = self.subject_level_contrasts contrast_estimate.config = {'execution': {'remove_unnecessary_outputs': False}} - - contrasts = Node(Function( - function = con_setup, - input_names = ['subject_id'], - output_names = ['contrasts'] - ), - name = 'contrasts') + contrast_estimate.overwrite = False subject_level_analysis = Workflow( base_dir = self.directories.working_dir, name = 'subject_level_analysis' ) subject_level_analysis.connect([ (infosource, selectderivs, [('subject_id', 'subject_id')]), - (infosource, contrasts, [('subject_id', 'subject_id')]), - (infosource, getsubinforuns, [('subject_id', 'subject_id')]), + (infosource, getsubinforuns, [('events', 'event_files')]), (infosource, confounds, [('subject_id', 'subject_id')]), (selectderivs, gunzip, [('func', 'in_file')]), (selectderivs, confounds, [('confounds', 'filepath')]), (gunzip, smooth, [('out_file', 'in_files')]), - (contrasts, contrast_estimate, [('contrasts', 'contrasts')]), - (contrast_estimate, selectcontrast, [('con_images', 'inlist')]), - (selectcontrast, overlaystats, [('out', 'stat_image')]), - (overlaystats, slicestats, [('out_file', 'in_file')]), (getsubinforuns, modelspec, [('subject_info', 'subject_info')]), (confounds, modelspec, [('confounds_file', 'realignment_parameters')]), (smooth, modelspec, [('smoothed_files', 'functional_runs')]), @@ -329,23 +297,6 @@ def get_group_level_analysis(self): Returns; - a list of nipype.WorkFlow """ - return_list = [] - - self.model_list = ['gain', 'loss'] - self.contrast_list = ['0001'] - return_list.append(self.get_group_level_analysis_sub_workflow('equalRange')) - return_list.append(self.get_group_level_analysis_sub_workflow('equalIndifference')) - - self.model_list = ['loss'] - self.contrast_list = ['0001'] - return_list.append(self.get_group_level_analysis_sub_workflow('groupComp')) - - self.model_list = ['loss'] - self.contrast_list = ['0002'] - return_list.append(self.get_group_level_analysis_sub_workflow('equalRange')) - return_list.append(self.get_group_level_analysis_sub_workflow('equalIndifference')) - - return return_list def get_group_level_analysis_sub_workflow(self, method): """ @@ -357,188 +308,6 @@ def get_group_level_analysis_sub_workflow(self, method): Returns: - group_level_analysis: nipype.WorkFlow """ - # Compute the number of participants used to do the analysis - nb_subjects = len(self.subject_list) - - # Infosource - iterate over the list of contrasts - information_source = Node(IdentityInterface( - fields = ['model_type', 'contrast_id']), - name = 'information_source') - information_source.iterables = [ - ('model_type', self.model_list), - ('contrast_id', self.contrast_list) - ] - - # SelectFiles Node - templates = { - # Contrast files for all participants - 'contrasts' : join(self.directories.output_dir, - 'subject_level_analysis_{model_type}', '_subject_id_*', 'con_{contrast_id}.nii' - ) - } - select_files = Node(SelectFiles(templates), name = 'select_files') - select_files.inputs.base_directory = self.directories.dataset_dir - select_files.inputs.force_list = True - - # Datasink - save important files - data_sink = Node(DataSink(), name = 'data_sink') - data_sink.inputs.base_directory = self.directories.output_dir - - # Function Node get_equal_range_subjects - # Get subjects in the equalRange group and in the subject_list - get_equal_range_subjects = Node(Function( - function = list_intersection, - input_names = ['list_1', 'list_2'], - output_names = ['out_list'] - ), - name = 'get_equal_range_subjects' - ) - get_equal_range_subjects.inputs.list_1 = get_group('equalRange') - get_equal_range_subjects.inputs.list_2 = self.subject_list - - # Function Node get_equal_indifference_subjects - # Get subjects in the equalIndifference group and in the subject_list - get_equal_indifference_subjects = Node(Function( - function = list_intersection, - input_names = ['list_1', 'list_2'], - output_names = ['out_list'] - ), - name = 'get_equal_indifference_subjects' - ) - get_equal_indifference_subjects.inputs.list_1 = get_group('equalIndifference') - get_equal_indifference_subjects.inputs.list_2 = self.subject_list - - # Create a function to complete the subject ids out from the get_equal_*_subjects nodes - # If not complete, subject id '001' in search patterns - # would match all contrast files with 'con_0001.nii'. - complete_subject_ids = lambda l : [f'_subject_id_{a}' for a in l] - - # Function Node elements_in_string - # Get contrast files for required subjects - # Note : using a MapNode with elements_in_string requires using clean_list to remove - # None values from the out_list - get_contrasts = MapNode(Function( - function = elements_in_string, - input_names = ['input_str', 'elements'], - output_names = ['out_list'] - ), - name = 'get_contrasts', iterfield = 'input_str' - ) - - # Estimate model - estimate_model = Node(EstimateModel(), name = 'estimate_model') - estimate_model.inputs.estimation_method = {'Classical':1} - - # Estimate contrasts - estimate_contrast = Node(EstimateContrast(), name = 'estimate_contrast') - estimate_contrast.inputs.group_contrast = True - - # Create thresholded maps - threshold = MapNode(Threshold(), name = 'threshold', - iterfield = ['stat_image', 'contrast_index']) - threshold.inputs.contrast_index = 1 - threshold.inputs.use_topo_fdr = True - threshold.inputs.use_fwe_correction = False - threshold.inputs.extent_threshold = 0 - threshold.inputs.height_threshold = 0.001 - threshold.inputs.height_threshold_type = 'p-value' - threshold.synchronize = True - - group_level_analysis = Workflow( - base_dir = self.directories.working_dir, - name = f'group_level_analysis_{method}_nsub_{nb_subjects}') - group_level_analysis.connect([ - (information_source, select_files, [ - ('contrast_id', 'contrast_id'), - ('model_type', 'model_type')]), - (select_files, get_contrasts, [('contrasts', 'input_str')]), - (estimate_model, estimate_contrast, [ - ('spm_mat_file', 'spm_mat_file'), - ('residual_image', 'residual_image'), - ('beta_images', 'beta_images')]), - (estimate_contrast, threshold, [ - ('spm_mat_file', 'spm_mat_file'), - ('spmT_images', 'stat_image')]), - (estimate_model, data_sink, [ - ('mask_image', f'group_level_analysis_{method}_nsub_{nb_subjects}.@mask')]), - (estimate_contrast, data_sink, [ - ('spm_mat_file', f'group_level_analysis_{method}_nsub_{nb_subjects}.@spm_mat'), - ('spmT_images', f'group_level_analysis_{method}_nsub_{nb_subjects}.@T'), - ('con_images', f'group_level_analysis_{method}_nsub_{nb_subjects}.@con')]), - (threshold, data_sink, [ - ('thresholded_map', f'group_level_analysis_{method}_nsub_{nb_subjects}.@thresh')])]) - - if method in ('equalRange', 'equalIndifference'): - estimate_contrast.inputs.contrasts = [ - ('Group', 'T', ['mean'], [1]), - ('Group', 'T', ['mean'], [-1]) - ] - threshold.inputs.contrast_index = [1, 2] - - # Specify design matrix - one_sample_t_test_design = Node(OneSampleTTestDesign(), - name = 'one_sample_t_test_design') - group_level_analysis.connect([ - (get_contrasts, one_sample_t_test_design, [ - (('out_list', clean_list), 'in_files') - ]), - (one_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')]) - ]) - - if method == 'equalRange': - group_level_analysis.connect([ - (get_equal_range_subjects, get_contrasts, [ - (('out_list', complete_subject_ids), 'elements') - ]) - ]) - - elif method == 'equalIndifference': - group_level_analysis.connect([ - (get_equal_indifference_subjects, get_contrasts, [ - (('out_list', complete_subject_ids), 'elements') - ]) - ]) - - elif method == 'groupComp': - estimate_contrast.inputs.contrasts = [ - ('Eq range vs Eq indiff in loss', 'T', ['Group_{1}', 'Group_{2}'], [1, -1]) - ] - threshold.inputs.contrast_index = [1] - - # Function Node elements_in_string - # Get contrast files for required subjects - # Note : using a MapNode with elements_in_string requires using clean_list to remove - # None values from the out_list - get_contrasts_2 = MapNode(Function( - function = elements_in_string, - input_names = ['input_str', 'elements'], - output_names = ['out_list'] - ), - name = 'get_contrasts_2', iterfield = 'input_str' - ) - - # Specify design matrix - two_sample_t_test_design = Node(TwoSampleTTestDesign(), - name = 'two_sample_t_test_design') - - group_level_analysis.connect([ - (select_files, get_contrasts_2, [('contrasts', 'input_str')]), - (get_equal_range_subjects, get_contrasts, [ - (('out_list', complete_subject_ids), 'elements') - ]), - (get_equal_indifference_subjects, get_contrasts_2, [ - (('out_list', complete_subject_ids), 'elements') - ]), - (get_contrasts, two_sample_t_test_design, [ - (('out_list', clean_list), 'group1_files') - ]), - (get_contrasts_2, two_sample_t_test_design, [ - (('out_list', clean_list), 'group2_files') - ]), - (two_sample_t_test_design, estimate_model, [('spm_mat_file', 'spm_mat_file')]) - ]) - - return group_level_analysis def get_group_level_outputs(self): """ Return all names for the files the group level analysis is supposed to generate. """ diff --git a/tests/pipelines/test_team_U26C.py b/tests/pipelines/test_team_U26C.py new file mode 100644 index 00000000..b0971e07 --- /dev/null +++ b/tests/pipelines/test_team_U26C.py @@ -0,0 +1,151 @@ +#!/usr/bin/python +# coding: utf-8 + +""" Tests of the 'narps_open.pipelines.team_U26C' module. + +Launch this test with PyTest + +Usage: +====== + pytest -q test_team_U26C.py + pytest -q test_team_U26C.py -k +""" +from os import mkdir +from os.path import join, exists +from shutil import rmtree +from filecmp import cmp + +from pytest import helpers, mark, fixture +from numpy import isclose +from nipype import Workflow +from nipype.interfaces.base import Bunch + +from narps_open.utils.configuration import Configuration +from narps_open.pipelines.team_U26C import PipelineTeamU26C + +TEMPORARY_DIR = join(Configuration()['directories']['test_runs'], 'test_U26C') + +@fixture +def remove_test_dir(): + """ A fixture to remove temporary directory created by tests """ + + rmtree(TEMPORARY_DIR, ignore_errors = True) + mkdir(TEMPORARY_DIR) + yield # test runs here + #rmtree(TEMPORARY_DIR, ignore_errors = True) + +def compare_float_2d_arrays(array_1, array_2): + """ Assert array_1 and array_2 are close enough """ + + assert len(array_1) == len(array_2) + for reference_array, test_array in zip(array_1, array_2): + assert len(reference_array) == len(test_array) + assert isclose(reference_array, test_array).all() + +class TestPipelinesTeamU26C: + """ A class that contains all the unit tests for the PipelineTeamU26C class.""" + + @staticmethod + @mark.unit_test + def test_create(): + """ Test the creation of a PipelineTeamU26C object """ + + pipeline = PipelineTeamU26C() + + # 1 - check the parameters + assert pipeline.fwhm == 5.0 + assert pipeline.team_id == 'U26C' + + # 2 - check workflows + assert pipeline.get_preprocessing() is None + assert pipeline.get_run_level_analysis() is None + assert isinstance(pipeline.get_subject_level_analysis(), Workflow) + group_level = pipeline.get_group_level_analysis() + + """assert len(group_level) == 3 + for sub_workflow in group_level: + assert isinstance(sub_workflow, Workflow)""" + + @staticmethod + @mark.unit_test + def test_outputs(): + """ Test the expected outputs of a PipelineTeamU26C object """ + pipeline = PipelineTeamU26C() + # 1 - 1 subject outputs + """pipeline.subject_list = ['001'] + assert len(pipeline.get_preprocessing_outputs()) == 0 + assert len(pipeline.get_run_level_outputs()) == 0 + assert len(pipeline.get_subject_level_outputs()) == 7 + assert len(pipeline.get_group_level_outputs()) == 63 + assert len(pipeline.get_hypotheses_outputs()) == 18 + + # 2 - 4 subjects outputs + pipeline.subject_list = ['001', '002', '003', '004'] + assert len(pipeline.get_preprocessing_outputs()) == 0 + assert len(pipeline.get_run_level_outputs()) == 0 + assert len(pipeline.get_subject_level_outputs()) == 28 + assert len(pipeline.get_group_level_outputs()) == 63 + assert len(pipeline.get_hypotheses_outputs()) == 18""" + + @staticmethod + @mark.unit_test + def test_subject_information(): + """ Test the get_subject_information method """ + + # Get test files + test_file = join(Configuration()['directories']['test_data'], 'pipelines', 'events.tsv') + info = PipelineTeamU26C.get_subject_information([test_file, test_file]) + + # Compare bunches to expected + bunch = info[0] + assert isinstance(bunch, Bunch) + assert bunch.conditions == ['gamble_run1'] + compare_float_2d_arrays(bunch.onsets, [[4.071, 11.834, 19.535, 27.535, 36.435]]) + compare_float_2d_arrays(bunch.durations, [[4.0, 4.0, 4.0, 4.0, 4.0]]) + assert bunch.amplitudes == None + assert bunch.tmod == None + assert bunch.pmod[0].name == ['gain_run1', 'loss_run1'] + assert bunch.pmod[0].poly == [1, 1] + compare_float_2d_arrays(bunch.pmod[0].param, [[14.0, 34.0, 38.0, 10.0, 16.0], [6.0, 14.0, 19.0, 15.0, 17.0]]) + assert bunch.regressor_names == None + assert bunch.regressors == None + + bunch = info[1] + assert isinstance(bunch, Bunch) + assert bunch.conditions == ['gamble_run2'] + compare_float_2d_arrays(bunch.onsets, [[4.071, 11.834, 19.535, 27.535, 36.435]]) + compare_float_2d_arrays(bunch.durations, [[4.0, 4.0, 4.0, 4.0, 4.0]]) + assert bunch.amplitudes == None + assert bunch.tmod == None + assert bunch.pmod[0].name == ['gain_run2', 'loss_run2'] + assert bunch.pmod[0].poly == [1, 1] + compare_float_2d_arrays(bunch.pmod[0].param, [[14.0, 34.0, 38.0, 10.0, 16.0], [6.0, 14.0, 19.0, 15.0, 17.0]]) + assert bunch.regressor_names == None + assert bunch.regressors == None + + @staticmethod + @mark.unit_test + def test_confounds_file(remove_test_dir): + """ Test the get_confounds_file method """ + + confounds_file = join( + Configuration()['directories']['test_data'], 'pipelines', 'confounds.tsv') + reference_file = join( + Configuration()['directories']['test_data'], 'pipelines', 'team_U26C', 'confounds.tsv') + + # Get new confounds file + PipelineTeamU26C.get_confounds_file(confounds_file, 'sid', 'rid', TEMPORARY_DIR) + + # Check confounds file was created + created_confounds_file = join( + TEMPORARY_DIR, 'confounds_files', 'confounds_file_sub-sid_run-rid.tsv') + assert exists(created_confounds_file) + + # Check contents + assert cmp(reference_file, created_confounds_file) + + @staticmethod + @mark.pipeline_test + def test_execution(): + """ Test the execution of a PipelineTeamU26C and compare results """ + helpers.test_pipeline_evaluation('U26C') diff --git a/tests/test_data/pipelines/team_U26C/confounds.tsv b/tests/test_data/pipelines/team_U26C/confounds.tsv new file mode 100644 index 00000000..05925432 --- /dev/null +++ b/tests/test_data/pipelines/team_U26C/confounds.tsv @@ -0,0 +1,3 @@ +6551.281999999999 6476.4653 0.0 0.0 0.0 0.0 -0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 +6484.7285 6473.4890000000005 -0.00996895 -0.0313444 -3.00931e-06 0.00132687 -0.000384193 -0.00016819 -0.00996895 -0.0313444 -3.00931e-06 0.00132687 -0.000384193 -0.00016819 +6441.5337 6485.7256 -2.56954e-05 -0.00923735 0.0549667 0.000997278 -0.00019745 -0.000398988 0.009943254600000001 0.022107050000000003 0.05496970931 -0.00032959199999999986 0.000186743 -0.00023079800000000002