diff --git a/narps_open/runner.py b/narps_open/runner.py index 56e2588d..403cb237 100644 --- a/narps_open/runner.py +++ b/narps_open/runner.py @@ -117,7 +117,7 @@ def start(self, first_level_only: bool = False, group_level_only: bool = False) for sub_workflow in workflow: if not isinstance(sub_workflow, Workflow): raise AttributeError('Workflow must be of type nipype.Workflow') - + if nb_procs > 1: sub_workflow.run('MultiProc', plugin_args={'n_procs': nb_procs}) else: @@ -126,10 +126,10 @@ def start(self, first_level_only: bool = False, group_level_only: bool = False) if not isinstance(workflow, Workflow): raise AttributeError('Workflow must be of type nipype.Workflow') - if nb_procs > 1: - workflow.run('MultiProc', plugin_args={'n_procs': nb_procs}) - else: - workflow.run() + if nb_procs > 1: + workflow.run('MultiProc', plugin_args={'n_procs': nb_procs}) + else: + workflow.run() if __name__ == '__main__': diff --git a/narps_open/utils/configuration/testing_config.toml b/narps_open/utils/configuration/testing_config.toml index 4e67d3ac..595c22c3 100644 --- a/narps_open/utils/configuration/testing_config.toml +++ b/narps_open/utils/configuration/testing_config.toml @@ -7,6 +7,7 @@ dataset = "tests/data/original/ds001734/" reproduced_results = "tests/data/reproduced/" narps_results = "tests/data/results/" test_data = "tests/test_data/" +test_runs = "run/" [runner] nb_procs = 8 # Maximum number of threads executed by the runner \ No newline at end of file diff --git a/tests/test_runner.py b/tests/test_runner.py index b75654f4..90407227 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -11,11 +11,17 @@ pytest -q test_runner.py -k """ +from os import remove +from os.path import join, isfile, abspath + +from datetime import datetime + from pytest import raises, mark from nipype import Node, Workflow -from nipype.interfaces.utility import Split, Merge +from nipype.interfaces.utility import Function +from narps_open.utils.configuration import Configuration from narps_open.runner import PipelineRunner from narps_open.pipelines import Pipeline from narps_open.pipelines.team_2T6S import PipelineTeam2T6S @@ -25,35 +31,67 @@ class MockupPipeline(Pipeline): def __init__(self): super().__init__() - - def get_preprocessing(self): - node_1 = Node( - Split(), + self.test_file = abspath( + join(Configuration()['directories']['test_runs'], 'test_runner.txt')) + if isfile(self.test_file): + remove(self.test_file) + + def __del__(self): + if isfile(self.test_file): + remove(self.test_file) + + def write_to_file(_, text_to_write: str, file_path: str): + """ Method used inside a nipype Node, to write a line in a test file """ + with open(file_path, 'a', encoding = 'utf-8') as file: + file.write(text_to_write) + + def create_workflow(self, workflow_name: str): + """ Return a nipype worflow with two nodes writing in a file """ + node_1 = Node(Function( + input_names = ['_', 'text_to_write', 'file_path'], + output_names = ['_'], + function = self.write_to_file), name = 'node_1' ) - node_1.inputs.inlist = [1, 2, 3] # the list to split - node_1.inputs.splits = [1, 2] # the number of elements per output lists - - node_2 = Node( - Merge(2), + # this input is set to now(), so that it changes at every run, thus preventing + # nipype's cache to work + node_1.inputs._ = datetime.now() + node_1.inputs.text_to_write = 'MockupPipeline : '+workflow_name+' node_1\n' + node_1.inputs.file_path = self.test_file + + node_2 = Node(Function( + input_names = ['_', 'text_to_write', 'file_path'], + output_names = [], + function = self.write_to_file), name = 'node_2' ) + node_2.inputs.text_to_write = 'MockupPipeline : '+workflow_name+' node_2\n' + node_2.inputs.file_path = self.test_file - workflow = Workflow(base_dir = 'run', name = 'TestPipelineRunner_preprocessing_workflow') + workflow = Workflow( + base_dir = Configuration()['directories']['test_runs'], + name = workflow_name + ) workflow.add_nodes([node_1, node_2]) - workflow.connect(node_1, 'out1', node_2, 'in1') - workflow.connect(node_1, 'out2', node_2, 'in2') + workflow.connect(node_1, '_', node_2, '_') return workflow + def get_preprocessing(self): + """ Return a fake preprocessing workflow """ + return self.create_workflow('TestPipelineRunner_preprocessing_workflow') + def get_run_level_analysis(self): - return None + """ Return a fake run level workflow """ + return self.create_workflow('TestPipelineRunner_run_level_workflow') def get_subject_level_analysis(self): - return None + """ Return a fake subject level workflow """ + return self.create_workflow('TestPipelineRunner_subject_level_workflow') def get_group_level_analysis(self): - return None + """ Return a fake subject level workflow """ + return self.create_workflow('TestPipelineRunner_group_level_workflow') class MockupWrongPipeline(Pipeline): """ A simple Pipeline class for test purposes """ @@ -182,6 +220,13 @@ def test_start_nok(): with raises(AttributeError): runner.start() + # 2b - test starting a pipeline with wrong options (fist_level_only + group_level_only) + runner = PipelineRunner('2T6S') + runner._pipeline = MockupPipeline() # hack the runner by setting a test Pipeline + + with raises(AttributeError): + runner.start(True, True) + @staticmethod @mark.unit_test def test_start_ok(): @@ -190,3 +235,45 @@ def test_start_ok(): runner = PipelineRunner('2T6S') runner._pipeline = MockupPipeline() # hack the runner by setting a test Pipeline runner.start() + + # 1 - read results of the pipeline + with open( + join(Configuration()['directories']['test_runs'], 'test_runner.txt'), + 'r', encoding = 'utf-8') as file: + for workflow in [ + 'TestPipelineRunner_preprocessing_workflow', + 'TestPipelineRunner_run_level_workflow', + 'TestPipelineRunner_subject_level_workflow', + 'TestPipelineRunner_group_level_workflow']: + assert file.readline() == 'MockupPipeline : '+workflow+' node_1\n' + assert file.readline() == 'MockupPipeline : '+workflow+' node_2\n' + + # 2 - test starting a pipeline partly (group_level_only) + runner = PipelineRunner('2T6S') + runner._pipeline = MockupPipeline() # hack the runner by setting a test Pipeline + runner.start(False, True) + + # 2 - read results of the pipeline + with open( + join(Configuration()['directories']['test_runs'], 'test_runner.txt'), + 'r', encoding = 'utf-8') as file: + assert file.readline() == \ + 'MockupPipeline : TestPipelineRunner_group_level_workflow node_1\n' + assert file.readline() == \ + 'MockupPipeline : TestPipelineRunner_group_level_workflow node_2\n' + + # 3 - test starting a pipeline partly (first_level_only) + runner = PipelineRunner('2T6S') + runner._pipeline = MockupPipeline() # hack the runner by setting a test Pipeline + runner.start(True, False) + + # 3 - read results of the pipeline + with open( + join(Configuration()['directories']['test_runs'], 'test_runner.txt'), + 'r', encoding = 'utf-8') as file: + for workflow in [ + 'TestPipelineRunner_preprocessing_workflow', + 'TestPipelineRunner_run_level_workflow', + 'TestPipelineRunner_subject_level_workflow']: + assert file.readline() == 'MockupPipeline : '+workflow+' node_1\n' + assert file.readline() == 'MockupPipeline : '+workflow+' node_2\n'