Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Allow unrecognized pipeline names and versions in processing status file #401

Merged
merged 23 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions bagel/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,18 +414,9 @@ def derivatives(
f"We found missing values in the following rows (first row is zero): {row_indices}."
)

pipelines = status_df[PROC_STATUS_COLS["pipeline_name"]].unique()
derivative_utils.check_pipelines_are_recognized(pipelines)

# TODO: Do we need to check all versions across all pipelines first, and report all unrecognized versions together?
for pipeline in pipelines:
versions = status_df[
status_df[PROC_STATUS_COLS["pipeline_name"]] == pipeline
][PROC_STATUS_COLS["pipeline_version"]].unique()

derivative_utils.check_pipeline_versions_are_recognized(
pipeline, versions
)
derivative_utils.check_at_least_one_pipeline_version_is_recognized(
status_df=status_df
)

jsonld_dataset = model_utils.extract_and_validate_jsonld_dataset(
jsonld_path
Expand Down
2 changes: 1 addition & 1 deletion bagel/mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def get_pipeline_catalog(url: str, path: Path) -> list[dict]:
) from e


def parse_pipeline_catalog():
def parse_pipeline_catalog() -> tuple[dict, dict]:
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
"""
Load the pipeline catalog and return a dictionary of pipeline names and their URIs in the Nipoppy namespace,
and a dictionary of pipeline names and their supported versions in Nipoppy.
Expand Down
90 changes: 75 additions & 15 deletions bagel/utilities/derivative_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import warnings
from typing import Iterable

import pandas as pd
Expand All @@ -17,35 +18,92 @@
}


def check_pipelines_are_recognized(pipelines: Iterable[str]):
"""Check that all pipelines in the processing status file are supported by Nipoppy."""
def get_recognized_pipelines(pipelines: Iterable[str]) -> list:
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
"""
Check that all pipelines in the processing status file are supported by Nipoppy.
Raise an error if all pipelines are unrecognized, otherwise warn about unrecognized pipelines.
"""
allowed_pipelines_message = (
f"Allowed pipeline names are the following pipelines supported natively in Nipoppy (https://github.com/nipoppy/pipeline-catalog):\n"
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
f"{list(mappings.KNOWN_PIPELINE_URIS.keys())}"
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
)
recognized_pipelines = list(
set(pipelines).intersection(mappings.KNOWN_PIPELINE_URIS)
)
unrecognized_pipelines = list(
set(pipelines).difference(mappings.KNOWN_PIPELINE_URIS)
)
if len(unrecognized_pipelines) > 0:

if not recognized_pipelines:
raise LookupError(
f"The processing status file contains no recognized pipelines in the column '{PROC_STATUS_COLS['pipeline_name']}'.\n"
f"{allowed_pipelines_message}"
)
if unrecognized_pipelines:
warnings.warn(
f"The processing status file contains unrecognized pipelines in the column '{PROC_STATUS_COLS['pipeline_name']}': "
f"{unrecognized_pipelines}. "
f"Allowed pipeline names are the following pipelines supported natively in Nipoppy (https://github.com/nipoppy/pipeline-catalog): \n"
f"{mappings.KNOWN_PIPELINE_URIS}"
f"{unrecognized_pipelines}. These will be ignored.\n"
f"{allowed_pipelines_message}"
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
)
return recognized_pipelines


def check_pipeline_versions_are_recognized(
def classify_pipeline_versions(
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
pipeline: str, versions: Iterable[str]
):
) -> tuple[list, list]:
"""
Check that all pipeline versions in the processing status file are supported by Nipoppy.
Assumes that the input pipeline name is recognized.
For a given pipeline, return the recognized and unrecognized pipeline versions in the processing status file
based on the Nipoppy pipeline catalog, and return both as lists.
"""
recognized_versions = list(
set(versions).intersection(mappings.KNOWN_PIPELINE_VERSIONS[pipeline])
)
unrecognized_versions = list(
set(versions).difference(mappings.KNOWN_PIPELINE_VERSIONS[pipeline])
)
if len(unrecognized_versions) > 0:

return recognized_versions, unrecognized_versions


def check_at_least_one_pipeline_version_is_recognized(status_df: pd.DataFrame):
"""
Check that at least one pipeline name and version combination found in the processing status file is supported by Nipoppy.
"""
more_info_message = (
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
"Allowed processing pipelines and versions are those supported natively in Nipoppy. "
"For a full list, see https://github.com/nipoppy/pipeline-catalog."
)

recognized_pipelines = get_recognized_pipelines(
status_df[PROC_STATUS_COLS["pipeline_name"]].unique()
)

any_recognized_versions = 0
unrecognized_pipeline_versions = {}
for pipeline in recognized_pipelines:
versions = status_df[
status_df[PROC_STATUS_COLS["pipeline_name"]] == pipeline
][PROC_STATUS_COLS["pipeline_version"]].unique()
alyssadai marked this conversation as resolved.
Show resolved Hide resolved

recognized_versions, unrecognized_versions = (
classify_pipeline_versions(pipeline, versions)
)

any_recognized_versions += len(recognized_versions)
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
if len(unrecognized_versions) > 0:
unrecognized_pipeline_versions[pipeline] = unrecognized_versions

if not any_recognized_versions:
# TODO: Consider simply exiting with a message and no output instead?
raise LookupError(
f"The processing status file contains unrecognized {pipeline} versions in the column '{PROC_STATUS_COLS['pipeline_version']}': {unrecognized_versions}. "
f"Allowed {pipeline} versions are the following versions supported natively in Nipoppy (https://github.com/nipoppy/pipeline-catalog): \n"
f"{mappings.KNOWN_PIPELINE_VERSIONS[pipeline]}"
f"The processing status file contains no recognized versions of {recognized_pipelines} in the column '{PROC_STATUS_COLS['pipeline_version']}'.\n"
f"{more_info_message}"
)
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
if unrecognized_pipeline_versions:
warnings.warn(
f"The processing status file contains unrecognized versions of the following pipelines in the column '{PROC_STATUS_COLS['pipeline_version']}': "
f"{unrecognized_pipeline_versions}. These will be ignored.\n"
f"{more_info_message}"
)


Expand All @@ -61,8 +119,10 @@ def create_completed_pipelines(session_proc_df: pd.DataFrame) -> list:
PROC_STATUS_COLS["pipeline_version"],
]
):
# Check that all pipeline steps have succeeded
if (
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
pipeline in mappings.KNOWN_PIPELINE_URIS
and version in mappings.KNOWN_PIPELINE_VERSIONS[pipeline]
) and (
session_pipe_df[PROC_STATUS_COLS["status"]].str.lower()
== "success"
).all():
Expand Down
3 changes: 3 additions & 0 deletions tests/data/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ _incomplete.tsv | Has a missing value in the `bids_participant_id` column | Fail
_unique_sessions.csv | Includes a unique subject-session (`sub-01`, `ses-03`) not found in the synthetic dataset | Pass
_missing_sessions.tsv | One subject (`sub-02`) is missing all session labels | Pass
_no_bids_sessions.tsv | Has session labels in all rows for `session_id`, but no values in `bids_session_id` column | Pass
_unrecognized_pipelines.tsv | Includes some pipeline names and versions not found in the pipeline catalog | Pass
_no_recognized_pipelines.tsv | Includes pipeline names found in the pipeline catalog, but no recognized versions | Fail



## Example expected CLI outputs
Expand Down
6 changes: 6 additions & 0 deletions tests/data/proc_status_no_recognized_pipelines.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
participant_id bids_participant_id session_id bids_session_id pipeline_name pipeline_version pipeline_step status
01 sub-01 01 ses-01 fmriprep unknown.version1 step1 FAIL
01 sub-01 01 ses-01 fmriprep unknown.version1 step2 INCOMPLETE
01 sub-01 01 ses-01 fmriprep unknown.version2 default SUCCESS
01 sub-01 01 ses-01 freesurfer unknown.version3 default SUCCESS
01 sub-01 02 ses-02 freesurfer unknown.version3 default UNAVAILABLE
6 changes: 6 additions & 0 deletions tests/data/proc_status_unrecognized_pipelines.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
participant_id bids_participant_id session_id bids_session_id pipeline_name pipeline_version pipeline_step status
01 sub-01 01 ses-01 fmriprep unknown.version step1 FAIL
01 sub-01 01 ses-01 fmriprep unknown.version step2 INCOMPLETE
01 sub-01 01 ses-01 unknown-pipeline 1.0.0 default SUCCESS
01 sub-01 01 ses-01 freesurfer 7.3.2 default SUCCESS
01 sub-01 02 ses-02 freesurfer 7.3.2 default UNAVAILABLE
96 changes: 96 additions & 0 deletions tests/integration/test_cli_derivatives.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest

from bagel import mappings
from bagel.cli import bagel


Expand Down Expand Up @@ -205,3 +206,98 @@ def test_custom_imaging_sessions_created_for_missing_session_labels(

# Note: order of items does not matter for dict comparison
assert custom_ses_completed_pipes == completed_pipes_for_missing_ses_sub


def test_unrecognized_pipelines_and_versions_excluded_from_output(
runner,
test_data,
test_data_upload_path,
default_derivatives_output_path,
load_test_json,
):
"""
Test that when a subset of pipelines or versions from a processing status file are unrecognized,
they are excluded from the output JSONLD with informative warnings, without causing the derivatives command to fail.
"""
with pytest.warns(UserWarning) as w:
result = runner.invoke(
bagel,
[
"derivatives",
"-t",
test_data / "proc_status_unrecognized_pipelines.tsv",
"-p",
test_data_upload_path / "example_synthetic.jsonld",
"-o",
default_derivatives_output_path,
],
catch_exceptions=False,
)

assert result.exit_code == 0, f"Errored out. STDOUT: {result.output}"

assert len(w) == 2
warnings = [warning.message.args[0] for warning in w]
for warn_substrings in [
("unrecognized pipelines", "unknown-pipeline"),
("unrecognized versions", "{'fmriprep': ['unknown.version']}"),
]:
assert any(
all(substr in warning for substr in warn_substrings)
for warning in warnings
)
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
alyssadai marked this conversation as resolved.
Show resolved Hide resolved

output = load_test_json(default_derivatives_output_path)

sessions_with_completed_pipes = {}
for sub in output["hasSamples"]:
if sub["hasLabel"] == "sub-01":
for ses in sub["hasSession"]:
if (
ses["schemaKey"] == "ImagingSession"
and "hasCompletedPipeline" in ses
):
sessions_with_completed_pipes[ses["hasLabel"]] = ses[
"hasCompletedPipeline"
]
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
surchs marked this conversation as resolved.
Show resolved Hide resolved
surchs marked this conversation as resolved.
Show resolved Hide resolved
surchs marked this conversation as resolved.
Show resolved Hide resolved
sourcery-ai[bot] marked this conversation as resolved.
Show resolved Hide resolved

assert sessions_with_completed_pipes.keys() == {"ses-01"}
ses01_completed_pipes = sessions_with_completed_pipes["ses-01"]
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
assert len(ses01_completed_pipes) == 1
assert (
ses01_completed_pipes[0]["hasPipelineName"]["identifier"]
== f"{mappings.NP.pf}:freesurfer"
)
assert ses01_completed_pipes[0]["hasPipelineVersion"] == "7.3.2"


def test_error_when_no_pipeline_version_combos_recognized(
runner,
test_data,
test_data_upload_path,
default_derivatives_output_path,
load_test_json,
):
"""
Test that when there is no recognized pipeline-version combination in the processing status file,
an error is raised and no output JSONLD is created.
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
"""
with pytest.raises(LookupError) as e:
runner.invoke(
bagel,
[
"derivatives",
"-t",
test_data / "proc_status_no_recognized_pipelines.tsv",
"-p",
test_data_upload_path / "example_synthetic.jsonld",
"-o",
default_derivatives_output_path,
],
catch_exceptions=False,
)

assert "no recognized versions" in str(e.value)
assert (
not default_derivatives_output_path.exists()
), "A JSONLD was created despite inputs being invalid."
63 changes: 38 additions & 25 deletions tests/unit/test_derivative_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,44 +114,57 @@ def test_pipeline_versions_are_loaded():
)
alyssadai marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.parametrize(
"pipelines, unrecog_pipelines",
[
(["fmriprep", "pipeline1"], ["pipeline1"]),
(["pipelineA", "pipelineB"], ["pipelineA", "pipelineB"]),
],
)
def test_unrecognized_pipeline_names_raise_error(pipelines, unrecog_pipelines):
"""Test that pipeline names not found in the pipeline catalog raise an informative error."""
with pytest.raises(LookupError) as e:
derivative_utils.check_pipelines_are_recognized(pipelines)
def test_warning_raised_when_some_pipeline_names_unrecognized():
"""
Test that when a subset of pipeline names are not found in the pipeline catalog,
an informative warning is raised but the recognized pipeline names are successfully returned.
"""
pipelines = ["fmriprep", "fakepipeline1"]

with pytest.warns(UserWarning) as w:
recognized_pipelines = derivative_utils.get_recognized_pipelines(
pipelines
surchs marked this conversation as resolved.
Show resolved Hide resolved
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
)

assert all(
substr in str(e.value)
for substr in ["unrecognized pipelines"] + unrecog_pipelines
substr in str(w[0].message.args[0])
for substr in ["unrecognized pipelines", "fakepipeline1"]
)
assert recognized_pipelines == ["fmriprep"]


def test_error_raised_when_no_pipeline_names_recognized():
"""
Test that when no provided pipeline names are found in the pipeline catalog,
an informative error is raised.
"""
pipelines = ["fakepipeline1", "fakepipeline2"]

with pytest.raises(LookupError) as e:
derivative_utils.get_recognized_pipelines(pipelines)

assert "no recognized pipelines" in str(e.value)
alyssadai marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.parametrize(
"fmriprep_versions, unrecog_versions",
"fmriprep_versions, expctd_recog_versions, expctd_unrecog_versions",
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
[
(["20.2.7", "vA.B"], ["vA.B"]),
(["C.D.E", "F.G.H"], ["C.D.E", "F.G.H"]),
(["20.2.7", "vA.B"], ["20.2.7"], ["vA.B"]),
(["C.D.E", "F.G.H"], [], ["C.D.E", "F.G.H"]),
],
)
def test_unrecognized_pipeline_versions_raise_error(
fmriprep_versions, unrecog_versions
def test_pipeline_versions_classified_correctly(
fmriprep_versions, expctd_recog_versions, expctd_unrecog_versions
):
"""Test that versions of a pipeline not found in the pipeline catalog raise an informative error."""
with pytest.raises(LookupError) as e:
derivative_utils.check_pipeline_versions_are_recognized(
"""Test that versions of a pipeline are correctly classified as recognized or unrecognized according to the pipeline catalog."""
recog_versions, unrecog_versions = (
derivative_utils.classify_pipeline_versions(
"fmriprep", fmriprep_versions
)

assert all(
substr in str(e.value)
for substr in ["unrecognized fmriprep versions"] + unrecog_versions
)
# The order of the versions in the lists is not guaranteed
assert set(recog_versions) == set(expctd_recog_versions)
assert set(unrecog_versions) == set(expctd_unrecog_versions)


def test_create_completed_pipelines():
alyssadai marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Loading