Skip to content

Commit

Permalink
Update to match neurodatascience/nipoppy:main (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
michellewang authored Jun 11, 2024
2 parents c621f1b + 00c104f commit 307dbbd
Show file tree
Hide file tree
Showing 12 changed files with 592 additions and 50 deletions.
33 changes: 33 additions & 0 deletions nipoppy_cli/nipoppy/tabular/bagel.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,36 @@ class Bagel(BaseTabular):

# set the model
model = BagelModel

def get_completed_participants_sessions(
    self,
    pipeline_name: str,
    pipeline_version: str,
    participant: Optional[str] = None,
    session: Optional[str] = None,
):
    """
    Get participant-session pairs that have successfully completed a pipeline run.

    Can optionally filter within a specific participant and/or session.

    Parameters
    ----------
    pipeline_name : str
        Only rows whose pipeline name column equals this value are kept.
    pipeline_version : str
        Only rows whose pipeline version column equals this value are kept.
    participant : Optional[str]
        If given, restrict the result to this participant. ``None`` (the
        default) means no filtering on participant.
    session : Optional[str]
        If given, restrict the result to this session. ``None`` (the
        default) means no filtering on session.

    Yields
    ------
    tuple
        One (participant ID, session) row tuple, as produced by
        ``itertuples(index=False)``, for each matching row whose completion
        status equals ``self.status_success``.
    """
    # rows for the requested pipeline/version that completed successfully
    is_match = (
        (self[self.col_pipeline_name] == pipeline_name)
        & (self[self.col_pipeline_version] == pipeline_version)
        & (self[self.col_pipeline_complete] == self.status_success)
    )

    # only narrow the selection when a filter was actually requested; the
    # arguments themselves are left untouched (no rebinding to a set)
    if participant is not None:
        is_match &= self[self.col_participant_id] == participant
    if session is not None:
        is_match &= self[self.col_session] == session

    yield from self.loc[
        is_match, [self.col_participant_id, self.col_session]
    ].itertuples(index=False)
11 changes: 9 additions & 2 deletions nipoppy_cli/nipoppy/workflows/bids_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,16 @@ def get_participants_sessions_to_run(
self, participant: Optional[str], session: Optional[str]
):
"""Return participant-session pairs to run the pipeline on."""
return self.doughnut.get_organized_participants_sessions(
participant=participant, session=session
participants_sessions_bidsified = set(
self.doughnut.get_bidsified_participants_sessions(
participant=participant, session=session
)
)
for participant_session in self.doughnut.get_organized_participants_sessions(
participant=participant, session=session
):
if participant_session not in participants_sessions_bidsified:
yield participant_session

def run_single(self, participant: str, session: str):
"""Run BIDS conversion on a single participant/session."""
Expand Down
24 changes: 23 additions & 1 deletion nipoppy_cli/nipoppy/workflows/dicom_reorg.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import pydicom

from nipoppy.tabular.doughnut import update_doughnut
from nipoppy.utils import StrOrPathLike
from nipoppy.workflows.base import BaseWorkflow

Expand Down Expand Up @@ -129,12 +130,33 @@ def run_single(self, participant: str, session: str):
status=True,
)

def get_participants_sessions_to_run(self):
    """
    Yield the participant-session pairs that still need to be reorganized.

    These are the pairs reported as downloaded by the doughnut, minus the
    pairs it already reports as organized.
    """
    already_organized = set(self.doughnut.get_organized_participants_sessions())
    yield from (
        pair
        for pair in self.doughnut.get_downloaded_participants_sessions()
        if pair not in already_organized
    )

def run_setup(self):
    """
    Refresh the doughnut before running, in case it is not up-to-date.

    The current doughnut is passed to ``update_doughnut`` together with the
    manifest, the DICOM directory map, the relevant layout directories and
    the logger; the returned doughnut replaces ``self.doughnut``.
    """
    # gather the arguments in the same order the helper is normally called with
    update_kwargs = {
        "doughnut": self.doughnut,
        "manifest": self.manifest,
        "dicom_dir_map": self.dicom_dir_map,
        "dpath_downloaded": self.layout.dpath_raw_dicom,
        "dpath_organized": self.layout.dpath_sourcedata,
        "dpath_bidsified": self.layout.dpath_bids,
        "logger": self.logger,
    }
    self.doughnut = update_doughnut(**update_kwargs)

def run_main(self):
"""Reorganize all downloaded DICOM files."""
for (
participant,
session,
) in self.doughnut.get_downloaded_participants_sessions():
) in self.get_participants_sessions_to_run():
try:
self.run_single(participant, session)
except Exception as exception:
Expand Down
25 changes: 11 additions & 14 deletions nipoppy_cli/nipoppy/workflows/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,26 +359,23 @@ def run_cleanup(self, **kwargs):
self.rm(self.dpath_pipeline_work)
return super().run_cleanup(**kwargs)

@abstractmethod
def get_participants_sessions_to_run(
self, participant: Optional[str], session: Optional[str]
):
"""Return participant-session pairs to run the pipeline on."""
# TODO add option in Boutiques descriptor of pipeline
# 1. "manifest" (or "all"?)
# 2. "downloaded" but not "organized" (from doughnut)
# 3. "organized" but not "bidsified" (from doughnut)
# 4. "bidsified" but not completed (from doughnut/bagel)
# 5. "dataset" (i.e. apply on entire dataset, do not loop over anything)

# for now just check the participants/sessions that have BIDS data
return self.doughnut.get_bidsified_participants_sessions(
participant=participant, session=session
)
"""
Return participant-session pairs to loop over with run_single().
This is an abstract method that should be defined explicitly in subclasses.
"""

@abstractmethod
def run_single(self, participant: Optional[str], session: Optional[str]):
"""Run on a single participant/session."""
pass
"""
Run on a single participant/session.
This is an abstract method that should be defined explicitly in subclasses.
"""

def generate_fpath_log(
self,
Expand Down
30 changes: 30 additions & 0 deletions nipoppy_cli/nipoppy/workflows/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from nipoppy.config.boutiques import BoutiquesConfig
from nipoppy.config.container import ContainerConfig, prepare_container
from nipoppy.tabular.bagel import Bagel
from nipoppy.utils import StrOrPathLike
from nipoppy.workflows.pipeline import BasePipelineWorkflow

Expand Down Expand Up @@ -146,6 +147,35 @@ def launch_boutiques_run(

return descriptor_str, invocation_str

def get_participants_sessions_to_run(
    self, participant: Optional[str], session: Optional[str]
):
    """Yield the participant-session pairs that still need to be run.

    Specifically, these are the pairs that have BIDS data (according to the
    doughnut) but that have not previously successfully completed this
    pipeline name/version (according to the bagel file). If the bagel file
    does not exist, no pair is considered completed.

    Parameters
    ----------
    participant : Optional[str]
        Passed through to the doughnut and bagel queries as the participant
        filter.
    session : Optional[str]
        Passed through to the doughnut and bagel queries as the session
        filter.

    Yields
    ------
    Participant-session pairs as returned by
    ``self.doughnut.get_bidsified_participants_sessions()``, excluding those
    found in the bagel's completed pairs.
    """
    self.check_pipeline_version()  # in case this is called outside of run()

    # always a set, so the container type is the same whether or not a
    # bagel file exists
    participants_sessions_completed = set()
    if self.layout.fpath_imaging_bagel.exists():
        bagel = Bagel.load(self.layout.fpath_imaging_bagel)
        participants_sessions_completed.update(
            bagel.get_completed_participants_sessions(
                pipeline_name=self.pipeline_name,
                pipeline_version=self.pipeline_version,
                participant=participant,
                session=session,
            )
        )

    for participant_session in self.doughnut.get_bidsified_participants_sessions(
        participant=participant, session=session
    ):
        if participant_session not in participants_sessions_completed:
            yield participant_session

def run_single(self, participant: str, session: str):
"""Run pipeline on a single participant/session."""
# set up PyBIDS database
Expand Down
9 changes: 8 additions & 1 deletion nipoppy_cli/nipoppy/workflows/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,21 @@ def check_status(self, relative_paths: StrOrPathLike):
f"Checking path {self.dpath_pipeline_output / relative_path}"
)

# TODO handle potentially zipped archives
matches = list(self.dpath_pipeline_output.glob(str(relative_path)))
self.logger.debug(f"Matches: {matches}")
if not matches:
return Bagel.status_fail

return Bagel.status_success

def get_participants_sessions_to_run(
    self, participant: Optional[str], session: Optional[str]
):
    """
    Return the participant-session pairs to run the tracker on.

    The pairs are whatever the doughnut reports as bidsified for the given
    participant/session filters.
    """
    doughnut = self.doughnut
    bidsified_pairs = doughnut.get_bidsified_participants_sessions(
        participant=participant,
        session=session,
    )
    return bidsified_pairs

def run_single(self, participant: str, session: str):
"""Run tracker on a single participant/session."""
# load tracker configs from file
Expand Down
73 changes: 73 additions & 0 deletions nipoppy_cli/tests/test_tabular_bagel.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,76 @@ def test_add_or_update_records(data_orig, data_new, data_expected):
bagel = bagel.add_or_update_records(data_new)
expected_bagel = Bagel(data_expected).validate()
assert bagel.equals(expected_bagel)


@pytest.mark.parametrize(
    "data,pipeline_name,pipeline_version,participant,session,expected",
    [
        # only the row with a success status is returned
        (
            [
                ["01", "ses-1", "pipeline1", "1.0", Bagel.status_success],
                ["02", "ses-1", "pipeline1", "1.0", Bagel.status_fail],
                ["03", "ses-1", "pipeline1", "1.0", Bagel.status_incomplete],
                ["04", "ses-1", "pipeline1", "1.0", Bagel.status_unavailable],
            ],
            "pipeline1",
            "1.0",
            None,
            None,
            [("01", "ses-1")],
        ),
        # same participant/session across pipelines and versions: only the
        # requested pipeline name and version is matched
        (
            [
                ["S01", "ses-BL", "pipeline1", "1.0", Bagel.status_success],
                ["S01", "ses-BL", "pipeline1", "2.0", Bagel.status_success],
                ["S01", "ses-BL", "pipeline2", "1.0", Bagel.status_success],
                ["S01", "ses-BL", "pipeline2", "2.0", Bagel.status_success],
            ],
            "pipeline2",
            "2.0",
            None,
            None,
            [("S01", "ses-BL")],
        ),
        # explicit participant and session filters
        (
            [
                ["S01", "ses-BL", "pipeline1", "1.0", Bagel.status_success],
                ["S01", "ses-BL", "pipeline1", "2.0", Bagel.status_success],
                ["S01", "ses-BL", "pipeline2", "1.0", Bagel.status_success],
                ["S01", "ses-M12", "pipeline2", "2.0", Bagel.status_success],
                ["S02", "ses-BL", "pipeline1", "1.0", Bagel.status_success],
                ["S02", "ses-BL", "pipeline1", "2.0", Bagel.status_success],
                ["S02", "ses-BL", "pipeline2", "2.0", Bagel.status_success],
                ["S02", "ses-M12", "pipeline2", "2.0", Bagel.status_success],
            ],
            "pipeline2",
            "2.0",
            "S02",
            "ses-M12",
            [("S02", "ses-M12")],
        ),
    ],
)
def test_get_completed_participants_sessions(
    data, pipeline_name, pipeline_version, participant, session, expected
):
    """Check which participant-session pairs are reported as completed."""
    # column order must match the order of the values in each data row
    column_names = [
        Bagel.col_participant_id,
        Bagel.col_session,
        Bagel.col_pipeline_name,
        Bagel.col_pipeline_version,
        Bagel.col_pipeline_complete,
    ]
    bagel = Bagel(data, columns=column_names).validate()

    completed = bagel.get_completed_participants_sessions(
        pipeline_name=pipeline_name,
        pipeline_version=pipeline_version,
        participant=participant,
        session=session,
    )

    # convert each yielded row to a plain tuple before comparing
    assert [tuple(pair) for pair in completed] == expected
87 changes: 87 additions & 0 deletions nipoppy_cli/tests/test_workflow_bids_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pytest

from nipoppy.config.main import Config
from nipoppy.tabular.doughnut import Doughnut
from nipoppy.workflows.bids_conversion import BidsConversionRunner

from .conftest import create_empty_dataset, get_config
Expand Down Expand Up @@ -39,3 +40,89 @@ def test_setup(config: Config, tmp_path: Path):
config.save(workflow.layout.fpath_config)
workflow.run_setup()
assert not workflow.dpath_pipeline.exists()


@pytest.mark.parametrize(
    "doughnut_data,participant,session,expected",
    [
        # no filters: only the organized-but-not-bidsified pair is expected
        (
            [
                {
                    Doughnut.col_participant_id: "S01",
                    Doughnut.col_session: "ses-1",
                    Doughnut.col_organized: True,
                    Doughnut.col_bidsified: False,
                },
                {
                    Doughnut.col_participant_id: "S01",
                    Doughnut.col_session: "ses-2",
                    Doughnut.col_organized: True,
                    Doughnut.col_bidsified: True,
                },
                {
                    Doughnut.col_participant_id: "S02",
                    Doughnut.col_session: "ses-3",
                    Doughnut.col_organized: False,
                    Doughnut.col_bidsified: False,
                },
            ],
            None,
            None,
            [("S01", "ses-1")],
        ),
        # explicit participant and session filters
        (
            [
                {
                    Doughnut.col_participant_id: "P01",
                    Doughnut.col_session: "ses-A",
                    Doughnut.col_organized: True,
                    Doughnut.col_bidsified: False,
                },
                {
                    Doughnut.col_participant_id: "P01",
                    Doughnut.col_session: "ses-B",
                    Doughnut.col_organized: True,
                    Doughnut.col_bidsified: False,
                },
                {
                    Doughnut.col_participant_id: "P02",
                    Doughnut.col_session: "ses-B",
                    Doughnut.col_organized: True,
                    Doughnut.col_bidsified: False,
                },
            ],
            "P01",
            "ses-B",
            [("P01", "ses-B")],
        ),
    ],
)
def test_get_participants_sessions_to_run(
    doughnut_data, participant, session, expected, tmp_path: Path
):
    """Check which participant-session pairs are selected for BIDS conversion."""
    workflow = BidsConversionRunner(
        dpath_root=tmp_path / "my_dataset",
        pipeline_name="heudiconv",
        pipeline_version="0.12.2",
        pipeline_step="prepare",
    )

    # complete each partial record with the remaining doughnut columns
    records = []
    for data in doughnut_data:
        record = dict(data)
        record.update(
            {
                Doughnut.col_visit: data[Doughnut.col_session],
                Doughnut.col_datatype: None,
                Doughnut.col_participant_dicom_dir: "",
                Doughnut.col_dicom_id: "",
                Doughnut.col_bids_id: "",
                Doughnut.col_downloaded: False,
            }
        )
        records.append(record)
    workflow.doughnut = Doughnut().add_or_update_records(records=records)

    to_run = workflow.get_participants_sessions_to_run(
        participant=participant, session=session
    )

    # convert each yielded row to a plain tuple before comparing
    assert [tuple(pair) for pair in to_run] == expected
Loading

0 comments on commit 307dbbd

Please sign in to comment.