
Commit

Merge pull request #4 from glatard/master
Metadata is now added to files involved in pipeline executions
glatard authored May 2, 2018
2 parents 4270a97 + 0df7869 commit 3947825
Showing 4 changed files with 83 additions and 16 deletions.
12 changes: 12 additions & 0 deletions README.md
@@ -26,3 +26,15 @@ run FSL bet on a test image and commit the results to the dataset.
datalad create test-dataset
conp-pipeline run test-dataset tests/fsl_bet.json tests/invocation.json
```

## Metadata

`conp-pipeline run` adds the following metadata to the pipeline inputs and
outputs (an example query is shown after the list):
* `<descriptor>`:
  - `conp-pipeline-role`: `pipeline-description`
  - various fields extracted from the Boutiques descriptor,
    including `name`, `description`, `tags`, `container-image-type`, etc.
* `<invocation>`: `conp-pipeline-role`: `invocation-description`.
* Pipeline input files: `conp-pipeline-role`: `input-file`.
* Pipeline results: `conp-pipeline-role`: `result-file`.
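
Once an execution has completed, these roles can be queried with `datalad
search`. A minimal sketch, mirroring the command used in this repository's
test suite (dataset name and file name are examples):

```
datalad -C test-dataset -c datalad.search.index-egrep-documenttype=all \
    search -f result-file | grep sub-01_T1w_brain.nii.gz
```

The `datalad.search.index-egrep-documenttype=all` setting asks the search
index to cover per-file metadata rather than only dataset-level records.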
50 changes: 47 additions & 3 deletions conp_pipeline/conp_pipeline.py
@@ -12,6 +12,7 @@
from boutiques import invocation as bosh_invocation
from datalad.distribution.dataset import Dataset
from datalad.distribution.add import Add
from datalad.metadata.aggregate import AggregateMetaData


class CONPPipelineError(Exception):
@@ -42,7 +43,9 @@ def add_to_execution(file_name, execution_dir, to_git):


def to_git_guess(file_name):
    return file_name.endswith('.json') or file_name.endswith('.txt')
    # return file_name.endswith('.json') or file_name.endswith('.txt')
    # It looks like a file must be in the annex to have metadata, so keep
    # everything in the annex for now
    return False
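    # A quick shell check of that constraint (file name hypothetical):
    # `git annex metadata <annexed-file>` lists the fields attached to an
    # annexed file, while a file committed straight to git carries none.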


def main(args=None):
@@ -92,7 +95,28 @@ def main(args=None):
    # Add descriptor to execution dir
    descriptor_file = add_to_execution(descriptor_file,
                                       execution_dir,
                                       to_git=True)
                                       to_git=to_git_guess(descriptor_file))

    # Add metadata to descriptor
    with open(descriptor_file) as fhandle:
        descriptor = json.load(fhandle)
    # git-annex metadata fields are multi-valued, hence the one-element lists
    descriptor_properties = {}
    descriptor_properties['conp-pipeline-role'] = ['pipeline-description']
    descriptor_properties['pipeline-format'] = ['boutiques']
    for field in ['name', 'description', 'tool-version', 'author',
                  'url', 'doi', 'tool-doi']:
        if descriptor.get(field):
            descriptor_properties[field] = [descriptor[field]]
    if descriptor.get('container-image'):
        descriptor_properties['container-image-type'] = \
            [descriptor['container-image']['type']]
    if descriptor.get('tests'):
        descriptor_properties['has-tests'] = [True]
    if descriptor.get('tags'):
        for tag, value in descriptor['tags'].items():
            descriptor_properties[tag] = [value]
    g = dataset.repo.set_metadata(descriptor_file,
                                  add=descriptor_properties)
    list(g)  # set_metadata returns a generator; the metadata is only
             # written once the generator is consumed

    # Add input data files to dataset and update invocation accordingly
    # TODO: check if it works with file lists
@@ -113,14 +137,25 @@
        # Add file to the dataset and update invocation accordingly
        file_path_in_dataset = add_to_execution(input_file,
                                                execution_dir,
                                                to_git=False)
                                                to_git=to_git_guess(
                                                    input_file))
        g = dataset.repo.set_metadata(file_path_in_dataset,
                                      init={'conp-pipeline-role':
                                            'input-file'})
        list(g)
        bosh_inputs[input_id] = file_path_in_dataset
        invocation[input_id] = file_path_in_dataset

    # Write updated invocation to dataset
    invocation_file = op.join(execution_dir, op.basename(invocation_file))
    with open(invocation_file, 'w') as fhandle:
        fhandle.write(json.dumps(invocation, indent=4, sort_keys=True))
    add_to_execution(invocation_file, execution_dir,
                     to_git=to_git_guess(invocation_file))
    g = dataset.repo.set_metadata(invocation_file,
                                  init={'conp-pipeline-role':
                                        'invocation-description'})
    list(g)

    # Run the execution in Clowdr
    info("Executing invocation with Clowdr", verbose)
@@ -144,14 +179,23 @@
    for file_name in os.listdir(task_dir):
        file_name = op.join(task_dir, file_name)
        if ((not op.basename(file_name).startswith("clowtask")) and
                (not op.basename(file_name).startswith("MD5E-")) and
                (file_name not in ignored_files)):
            dest_file = op.abspath(op.join(execution_dir,
                                           op.basename(file_name)))
            info("Found result file: {}".format(file_name), verbose)
            shutil.move(file_name, dest_file)
            Add.__call__(dest_file, to_git=to_git_guess(dest_file))
            g = dataset.repo.set_metadata(dest_file,
                                          init={'conp-pipeline-role':
                                                'result-file'})
            list(g)
    # Cleanup Clowdr dir
    shutil.rmtree(op.dirname(task_dir))

    # Aggregate DataLad metadata
    AggregateMetaData.__call__(dataset=dataset)

    info("Done!", verbose)
    # Add metadata to the descriptor so that others can find it (how?)
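
As a side note, the aggregation step above has a command-line counterpart
that can be used to re-index a dataset by hand. A minimal sketch, assuming
the `datalad aggregate-metadata` command of DataLad releases from this
period and an example dataset path:

```
# Re-aggregate metadata so that `datalad search` sees the latest annotations.
datalad aggregate-metadata -d test-dataset
```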

2 changes: 1 addition & 1 deletion setup.py
@@ -2,7 +2,7 @@
from setuptools import setup
import sys

VERSION = "0.1.1"
VERSION = "0.1.2"
DEPS = ["clowdr"]

setup(name="conp-pipeline",
35 changes: 23 additions & 12 deletions tests/test_conp_pipeline.py
@@ -8,11 +8,7 @@

class TestImport(TestCase):

    def test_conp_pipeline(self):
        command = "chmod 777 test_dataset/.git -R"
        os.system(command)
        shutil.rmtree("test_dataset", ignore_errors=True)
        command = ("datalad create test_dataset")
    def run_command_test(self, command):
        process = subprocess.Popen(command, shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
@@ -21,15 +17,30 @@ def test_conp_pipeline(self):
        process.communicate()
        self.assertFalse(process.returncode)

    def search(self, key, file_name):
        command = ("datalad -C test_dataset -c datalad.search.index-e"
                   "grep-documenttype=all search -f {} |"
                   " grep {}".format(key, file_name))
        self.run_command_test(command)

    def test_conp_pipeline(self):
        # Test dataset creation with DataLad
        command = "chmod 777 test_dataset/.git -R"
        os.system(command)
        shutil.rmtree("test_dataset", ignore_errors=True)

        command = ("datalad create test_dataset")
        self.run_command_test(command)

        # Test pipeline execution
        command = ("conp-pipeline run test_dataset tests/fsl_bet.json "
                   "tests/invocation.json")
        process = subprocess.Popen(command, shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        print(process.stdout.read())
        print(process.stderr.read())
        process.communicate()
        self.assertFalse(process.returncode)
        self.run_command_test(command)

        self.assertTrue(os.path.exists('test_dataset/execution/'
                                       'sub-01_T1w_brain.nii.gz'))

        # Test that some metadata was added
        self.search('boutiques', 'fsl_bet')
        self.search('result-file', 'sub-01_T1w_brain.nii.gz')
        self.search('invocation', 'invocation.json')
