Skip to content

Commit

Permalink
[TEST] test the runner launching first level or second level only
Browse files Browse the repository at this point in the history
  • Loading branch information
bclenet committed Apr 25, 2023
1 parent da8a9fa commit 45a64c2
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 21 deletions.
10 changes: 5 additions & 5 deletions narps_open/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def start(self, first_level_only: bool = False, group_level_only: bool = False)
for sub_workflow in workflow:
if not isinstance(sub_workflow, Workflow):
raise AttributeError('Workflow must be of type nipype.Workflow')

if nb_procs > 1:
sub_workflow.run('MultiProc', plugin_args={'n_procs': nb_procs})
else:
Expand All @@ -126,10 +126,10 @@ def start(self, first_level_only: bool = False, group_level_only: bool = False)
if not isinstance(workflow, Workflow):
raise AttributeError('Workflow must be of type nipype.Workflow')

if nb_procs > 1:
workflow.run('MultiProc', plugin_args={'n_procs': nb_procs})
else:
workflow.run()
if nb_procs > 1:
workflow.run('MultiProc', plugin_args={'n_procs': nb_procs})
else:
workflow.run()

if __name__ == '__main__':

Expand Down
1 change: 1 addition & 0 deletions narps_open/utils/configuration/testing_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dataset = "tests/data/original/ds001734/"
reproduced_results = "tests/data/reproduced/"
narps_results = "tests/data/results/"
test_data = "tests/test_data/"
test_runs = "run/"

[runner]
nb_procs = 8 # Maximum number of threads executed by the runner
119 changes: 103 additions & 16 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@
pytest -q test_runner.py -k <selected_test>
"""

from os import remove
from os.path import join, isfile, abspath

from datetime import datetime

from pytest import raises, mark

from nipype import Node, Workflow
from nipype.interfaces.utility import Split, Merge
from nipype.interfaces.utility import Function

from narps_open.utils.configuration import Configuration
from narps_open.runner import PipelineRunner
from narps_open.pipelines import Pipeline
from narps_open.pipelines.team_2T6S import PipelineTeam2T6S
Expand All @@ -25,35 +31,67 @@ class MockupPipeline(Pipeline):

def __init__(self):
    """ Create the mockup pipeline.

    Builds the absolute path to the runner's test file (where each fake
    workflow node writes a line) and removes any stale file left over
    from a previous test run, so assertions read a fresh file.
    """
    super().__init__()
    # NOTE(review): this span interleaved stale removed-diff lines from the
    # old Split-based get_preprocessing; they are dropped here.
    self.test_file = abspath(
        join(Configuration()['directories']['test_runs'], 'test_runner.txt'))
    if isfile(self.test_file):
        remove(self.test_file)

def __del__(self):
    """ Clean up the test file, if present, when the pipeline object dies. """
    file_path = self.test_file
    if isfile(file_path):
        remove(file_path)

def write_to_file(_, text_to_write: str, file_path: str):
    """ Helper used inside a nipype Node: append one line to a test file.

    The first parameter is a dummy input, only used to chain nodes and to
    defeat nipype's caching; it is never read.
    """
    with open(file_path, 'a', encoding = 'utf-8') as output_file:
        output_file.write(text_to_write)

def create_workflow(self, workflow_name: str):
    """ Return a nipype workflow with two chained nodes, each appending a
    line to the test file, so tests can check which workflows actually ran.

    NOTE(review): this span interleaved stale removed-diff lines (the old
    Split/Merge nodes and the hard-coded 'run' base_dir); they are dropped
    here, keeping only the new Function-node version.
    """
    node_1 = Node(Function(
        input_names = ['_', 'text_to_write', 'file_path'],
        output_names = ['_'],
        function = self.write_to_file),
        name = 'node_1'
    )
    # this input is set to now(), so that it changes at every run, thus
    # preventing nipype's cache from skipping the node
    node_1.inputs._ = datetime.now()
    node_1.inputs.text_to_write = 'MockupPipeline : '+workflow_name+' node_1\n'
    node_1.inputs.file_path = self.test_file

    node_2 = Node(Function(
        input_names = ['_', 'text_to_write', 'file_path'],
        output_names = [],
        function = self.write_to_file),
        name = 'node_2'
    )
    node_2.inputs.text_to_write = 'MockupPipeline : '+workflow_name+' node_2\n'
    node_2.inputs.file_path = self.test_file

    workflow = Workflow(
        base_dir = Configuration()['directories']['test_runs'],
        name = workflow_name
    )
    workflow.add_nodes([node_1, node_2])
    # chain node_1 -> node_2 through the dummy input, so the two lines
    # are always written in a deterministic order
    workflow.connect(node_1, '_', node_2, '_')

    return workflow

def get_preprocessing(self):
    """ Return a fake preprocessing workflow """
    workflow_name = 'TestPipelineRunner_preprocessing_workflow'
    return self.create_workflow(workflow_name)

def get_run_level_analysis(self):
    """ Return a fake run level workflow """
    # NOTE(review): a stale 'return None' (removed-diff line) preceded the
    # docstring here, making the real body unreachable; it is dropped.
    return self.create_workflow('TestPipelineRunner_run_level_workflow')

def get_subject_level_analysis(self):
    """ Return a fake subject level workflow """
    # NOTE(review): a stale 'return None' (removed-diff line) preceded the
    # docstring here, making the real body unreachable; it is dropped.
    return self.create_workflow('TestPipelineRunner_subject_level_workflow')

def get_group_level_analysis(self):
    """ Return a fake group level workflow """
    # NOTE(review): a stale 'return None' (removed-diff line) preceded the
    # docstring here; also fixed the docstring, which said "subject level".
    return self.create_workflow('TestPipelineRunner_group_level_workflow')

class MockupWrongPipeline(Pipeline):
""" A simple Pipeline class for test purposes """
Expand Down Expand Up @@ -182,6 +220,13 @@ def test_start_nok():
with raises(AttributeError):
runner.start()

# 2b - test starting a pipeline with wrong options (first_level_only + group_level_only)
runner = PipelineRunner('2T6S')
runner._pipeline = MockupPipeline() # hack the runner by setting a test Pipeline

with raises(AttributeError):
runner.start(True, True)

@staticmethod
@mark.unit_test
def test_start_ok():
Expand All @@ -190,3 +235,45 @@ def test_start_ok():
runner = PipelineRunner('2T6S')
runner._pipeline = MockupPipeline() # hack the runner by setting a test Pipeline
runner.start()

# 1 - read results of the pipeline
with open(
join(Configuration()['directories']['test_runs'], 'test_runner.txt'),
'r', encoding = 'utf-8') as file:
for workflow in [
'TestPipelineRunner_preprocessing_workflow',
'TestPipelineRunner_run_level_workflow',
'TestPipelineRunner_subject_level_workflow',
'TestPipelineRunner_group_level_workflow']:
assert file.readline() == 'MockupPipeline : '+workflow+' node_1\n'
assert file.readline() == 'MockupPipeline : '+workflow+' node_2\n'

# 2 - test starting a pipeline partly (group_level_only)
runner = PipelineRunner('2T6S')
runner._pipeline = MockupPipeline() # hack the runner by setting a test Pipeline
runner.start(False, True)

# 2 - read results of the pipeline
with open(
join(Configuration()['directories']['test_runs'], 'test_runner.txt'),
'r', encoding = 'utf-8') as file:
assert file.readline() == \
'MockupPipeline : TestPipelineRunner_group_level_workflow node_1\n'
assert file.readline() == \
'MockupPipeline : TestPipelineRunner_group_level_workflow node_2\n'

# 3 - test starting a pipeline partly (first_level_only)
runner = PipelineRunner('2T6S')
runner._pipeline = MockupPipeline() # hack the runner by setting a test Pipeline
runner.start(True, False)

# 3 - read results of the pipeline
with open(
join(Configuration()['directories']['test_runs'], 'test_runner.txt'),
'r', encoding = 'utf-8') as file:
for workflow in [
'TestPipelineRunner_preprocessing_workflow',
'TestPipelineRunner_run_level_workflow',
'TestPipelineRunner_subject_level_workflow']:
assert file.readline() == 'MockupPipeline : '+workflow+' node_1\n'
assert file.readline() == 'MockupPipeline : '+workflow+' node_2\n'

0 comments on commit 45a64c2

Please sign in to comment.