
Commit

Make backups of bagel.csv and check it when tracking pipelines (+ refactor status -> doughnut) (nipoppy#162)

* try to manually add back changes

* fix bugs from previous changes

* fix column order and participant_sessions selection

* move bagel filename/directory into utils

* update by session+pipeline instead of pipeline only

* fix bug where session_ids is overwritten

* address Nikhil comments + refactor status -> doughnut

* do not use get_end_time() for now because it can be very slow
michellewang authored Oct 2, 2023
1 parent 336ad63 commit 8211212
Showing 6 changed files with 221 additions and 153 deletions.
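
Note for readers of the diff below: the new tracker code calls a save_backup helper and a DNAME_BACKUPS_BAGEL constant imported from nipoppy.workflow.utils, which is not among the files shown here. The following is a minimal sketch of the backup-before-overwrite pattern the commit title refers to — the helper name and constant come from the imports in the diff, but the function body and the backup directory name are assumptions, not the actual nipoppy implementation:

import shutil
from datetime import datetime
from pathlib import Path

# Assumed value; the real constant is defined in nipoppy.workflow.utils.
DNAME_BACKUPS_BAGEL = ".bagels"

def save_backup(df, fpath, dname_backups):
    """Write df to fpath, keeping a timestamped copy of any existing file first."""
    fpath = Path(fpath)
    if fpath.exists():
        # Copy the current file into a backups subdirectory before overwriting it.
        dpath_backups = fpath.parent / dname_backups
        dpath_backups.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        shutil.copy2(fpath, dpath_backups / f"{fpath.stem}-{timestamp}{fpath.suffix}")
    df.to_csv(fpath, index=False)

In the tracker itself the call then looks like save_backup(df_bagel, fpath_bagel, DNAME_BACKUPS_BAGEL), as seen in the diff below.
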
175 changes: 120 additions & 55 deletions nipoppy/trackers/run_tracker.py
@@ -1,27 +1,34 @@
import pandas as pd
from pathlib import Path
#!/usr/bin/env python
import argparse
from nipoppy.trackers.tracker import UNAVAILABLE, Tracker, get_start_time
from nipoppy.trackers import fs_tracker, fmriprep_tracker, mriqc_tracker, tractoflow_tracker
import nipoppy.workflow.logger as my_logger
import json
import warnings
from pathlib import Path

import pandas as pd

import nipoppy.workflow.logger as my_logger
from nipoppy.trackers.tracker import Tracker, get_start_time, get_end_time, UNAVAILABLE, TRUE
from nipoppy.trackers import fs_tracker, fmriprep_tracker, mriqc_tracker, tractoflow_tracker
from nipoppy.workflow.utils import (
COL_SUBJECT_MANIFEST,
COL_BIDS_ID_MANIFEST,
COL_SESSION_MANIFEST,
COL_CONV_STATUS,
load_status,
session_id_to_bids_session
)
DNAME_BACKUPS_BAGEL,
FNAME_BAGEL,
FNAME_DOUGHNUT,
load_doughnut,
save_backup,
session_id_to_bids_session,
)

# Globals
PIPELINE_STATUS_COLUMNS = "PIPELINE_STATUS_COLUMNS"
pipeline_tracker_config_dict = {
"freesurfer": fs_tracker.tracker_configs,
"fmriprep": fmriprep_tracker.tracker_configs,
"mriqc": mriqc_tracker.tracker_configs,
"tractoflow": tractoflow_tracker.tracker_configs
"tractoflow": tractoflow_tracker.tracker_configs,
}
BIDS_PIPES = ["mriqc","fmriprep", "tractoflow"]

@@ -43,47 +50,76 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1,
else:
session_ids = [session_id]

logger.info(f"tracking session_ids: {session_ids}")

proc_status_dfs = [] # list of dataframes
for session_id in session_ids:
logger.info(f"tracking session_ids: {session_ids}")

session = session_id_to_bids_session(session_id)
for pipeline in pipelines:
pipe_tracker = Tracker(global_configs, dash_schema_file, pipeline)

dataset_root, _, version = pipe_tracker.get_global_configs()
schema = pipe_tracker.get_dash_schema()
tracker_configs = pipeline_tracker_config_dict[pipeline]

for pipeline in pipelines:
pipe_tracker = Tracker(global_configs, dash_schema_file, pipeline)
# TODO revise tracker class
DATASET_ROOT, session_ids, version = pipe_tracker.get_global_configs()
schema = pipe_tracker.get_dash_schema()
tracker_configs = pipeline_tracker_config_dict[pipeline]

# Grab BIDS participants from the doughnut
doughnut_file = f"{DATASET_ROOT}/scratch/raw_dicom/doughnut.csv"
doughnut_df = load_status(doughnut_file)
bids_participants = doughnut_df[(doughnut_df[COL_SESSION_MANIFEST]==session) &
(doughnut_df[COL_CONV_STATUS])][COL_BIDS_ID_MANIFEST].unique()
n_bids_participants = len(bids_participants)

logger.info("-"*50)
logger.info(f"pipeline: {pipeline}, version: {version}")
logger.info(f"n_participants: {n_bids_participants}, session_ids: {session_ids}")
logger.info("-"*50)

status_check_dict = pipe_tracker.get_pipe_tasks(tracker_configs, PIPELINE_STATUS_COLUMNS, pipeline, version)

dash_col_list = list(schema["GLOBAL_COLUMNS"].keys())

logger.info(f"Checking session: {session_id}")
_df = pd.DataFrame(index=bids_participants, columns=dash_col_list)
_df["session"] = session
_df["pipeline_name"] = pipeline
# Grab BIDS participants from the doughnut
doughnut_file = f"{DATASET_ROOT}/scratch/raw_dicom/{FNAME_DOUGHNUT}"
doughnut_df = load_doughnut(doughnut_file)
participants_total = doughnut_df[doughnut_df[COL_CONV_STATUS]][COL_BIDS_ID_MANIFEST].unique()
n_participants_total = len(participants_total)

logger.info("-"*50)
logger.info(f"pipeline: {pipeline}, version: {version}")
logger.info(f"n_participants_total: {n_participants_total}, session_ids: {session_ids}")
logger.info("-"*50)

status_check_dict = pipe_tracker.get_pipe_tasks(tracker_configs, PIPELINE_STATUS_COLUMNS, pipeline, version)

# only use non-prefixed columns at this stage
# for prefixed columns we need to generate the column name
dash_col_list = list(key for key, value in schema["GLOBAL_COLUMNS"].items() if not value["IsPrefixedColumn"])

for session_id in session_ids:
session = session_id_to_bids_session(session_id)
logger.info(f"Checking session: {session}")

participants_session = doughnut_df[(doughnut_df[COL_BIDS_ID_MANIFEST].isin(participants_total)) & (doughnut_df[COL_SESSION_MANIFEST] == session)][COL_BIDS_ID_MANIFEST].drop_duplicates().astype(str).str.strip().values
n_participants_session = len(participants_session)
logger.info(f"n_participants_session: {n_participants_session}")

_df = pd.DataFrame(index=participants_session, columns=dash_col_list)
_df[COL_SESSION_MANIFEST] = session
_df["pipeline_name"] = pipeline
_df["pipeline_version"] = version
_df["has_mri_data"] = TRUE # everyone in the doughnut file has MRI data

fpath_bagel = Path(dataset_root, 'derivatives', FNAME_BAGEL)
if fpath_bagel.exists():
df_bagel_old_full = load_bagel(fpath_bagel)

df_bagel_old_session = df_bagel_old_full.loc[df_bagel_old_full[COL_SESSION_MANIFEST] == session]
old_participants_session = set(df_bagel_old_session[COL_BIDS_ID_MANIFEST])
old_pipelines_session = set(df_bagel_old_session['pipeline_name'])

# make sure the number of participants is consistent across pipelines
if set(participants_session) != old_participants_session and not old_pipelines_session.issubset(set(pipelines)):
warnings.warn(
f'The existing bagel file might be obsolete (participant list does not match the doughnut file for session {session})'
f'. Rerun the tracker script with --pipelines {" ".join(old_pipelines_session.union(pipelines))}'
)

df_bagel_old = df_bagel_old_full.loc[
~(
(df_bagel_old_full["pipeline_name"] == pipeline) &
(df_bagel_old_full["pipeline_version"] == version) &
(df_bagel_old_full[COL_SESSION_MANIFEST] == session)
)
]

else:
df_bagel_old = None

for bids_id in bids_participants:
for bids_id in participants_session:
participant_id = doughnut_df[doughnut_df[COL_BIDS_ID_MANIFEST]==bids_id][COL_SUBJECT_MANIFEST].values[0]
_df.loc[bids_id,COL_SUBJECT_MANIFEST] = participant_id
logger.debug(f"bids_id: {bids_id}, participant_id: {participant_id}")
_df.loc[bids_id, COL_SUBJECT_MANIFEST] = participant_id
_df.loc[bids_id, COL_BIDS_ID_MANIFEST] = bids_id

if pipeline == "freesurfer":
subject_dir = f"{DATASET_ROOT}/derivatives/{pipeline}/v{version}/output/ses-{session_id}/{bids_id}"
@@ -97,29 +133,58 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1,

if dir_status:
for name, func in status_check_dict.items():
status = func(subject_dir, session, run_id)
status = func(subject_dir, session_id, run_id)
logger.info(f"task_name: {name}, status: {status}")
_df.loc[bids_id,name] = status
_df.loc[bids_id,"pipeline_starttime"] = get_start_time(subject_dir)
_df.loc[bids_id,"pipeline_endtime"] = UNAVAILABLE # TODO
# TODO only check files listed in the tracker config
_df.loc[bids_id,"pipeline_endtime"] = UNAVAILABLE # get_end_time(subject_dir)
else:
logger.error(f"Output for pipeline: {pipeline} not found for bids_id: {bids_id}, session: {session}")
for name in status_check_dict.keys():
_df.loc[bids_id,name] = UNAVAILABLE
_df.loc[bids_id,"pipeline_starttime"] = UNAVAILABLE
_df.loc[bids_id,"pipeline_endtime"] = UNAVAILABLE

proc_status_dfs.append(_df)
_df = _df.reset_index(drop=True)

# add old rows from other pipelines/sessions and sort for consistent order
df_bagel: pd.DataFrame = pd.concat([df_bagel_old, _df], axis='index')
df_bagel = df_bagel.sort_values(["pipeline_name", "pipeline_version", COL_BIDS_ID_MANIFEST], ignore_index=True)

proc_status_df = pd.concat(proc_status_dfs, axis='index')
# don't write a new file if no changes
try:
if len(df_bagel.compare(df_bagel_old_full)) == 0:
logger.info(f'\nNo change in bagel file for pipeline {pipeline}, session {session}')
continue
except Exception:
pass

# save bagel
save_backup(df_bagel, fpath_bagel, DNAME_BACKUPS_BAGEL)

# Save proc_status_df
tracker_csv = f"{DATASET_ROOT}/derivatives/bagel.csv"
proc_status_df = proc_status_df.drop(columns=COL_BIDS_ID_MANIFEST)
proc_status_df.index.name = COL_BIDS_ID_MANIFEST
proc_status_df.to_csv(tracker_csv)
def load_bagel(fpath_bagel):

logger.info(f"Saved to {tracker_csv}")
def time_converter(value):
# convert to datetime if possible
if str(value) != UNAVAILABLE:
return pd.to_datetime(value)
return value

df_bagel = pd.read_csv(
fpath_bagel,
dtype={
'has_mri_data': bool,
'participant_id': str,
'session': str,
},
converters={
'pipeline_starttime': time_converter,
'pipeline_endtime': time_converter,
}
)

return df_bagel

if __name__ == '__main__':
# argparse
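A side note on the no-change check introduced above: pandas' DataFrame.compare only accepts two identically-labeled frames and raises otherwise, which is what the broad try/except in the new tracker code guards against. A standalone illustration of that pattern, using made-up data rather than anything from the repository:

import pandas as pd

df_new = pd.DataFrame({"participant_id": ["01", "02"], "status": ["SUCCESS", "FAIL"]})
df_old = df_new.copy()

try:
    # compare() returns an empty frame when the two inputs match element-wise.
    unchanged = len(df_new.compare(df_old)) == 0
except ValueError:
    # compare() raises if the frames are not identically labeled; treat that as a change.
    unchanged = False

print(unchanged)  # True: nothing new to write, so the existing file can be left alone
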
20 changes: 10 additions & 10 deletions nipoppy/workflow/bids_conv/run_bids_conv.py
@@ -18,9 +18,9 @@
from nipoppy.workflow.utils import (
COL_CONV_STATUS,
COL_DICOM_ID,
DNAME_BACKUPS_STATUS,
FNAME_STATUS,
load_status,
DNAME_BACKUPS_DOUGHNUT,
FNAME_DOUGHNUT,
load_doughnut,
save_backup,
session_id_to_bids_session,
)
@@ -118,18 +118,18 @@ def run(global_configs, session_id, stage=2, overlays=None, n_jobs=2, dicom_id=N
logger.info(f"Running HeuDiConv stage: {stage}")
logger.info(f"Number of parallel jobs: {n_jobs}")

fpath_status = Path(DATASET_ROOT, 'scratch', 'raw_dicom', FNAME_STATUS)
fpath_doughnut = Path(DATASET_ROOT, 'scratch', 'raw_dicom', FNAME_DOUGHNUT)
bids_dir = f"{DATASET_ROOT}/bids/"

df_status = load_status(fpath_status)
df_doughnut = load_doughnut(fpath_doughnut)

# participants to process with Heudiconv
if dicom_id is None:
heudiconv_df = catalog.get_new_dicoms(fpath_status, session_id, logger)
heudiconv_df = catalog.get_new_dicoms(fpath_doughnut, session_id, logger)
else:
# filter by DICOM ID if needed
logger.info(f'Only running for participant: {dicom_id}')
heudiconv_df = df_status.loc[df_status[COL_DICOM_ID] == dicom_id]
heudiconv_df = df_doughnut.loc[df_doughnut[COL_DICOM_ID] == dicom_id]

heudiconv_participants = set(heudiconv_df["dicom_id"].values)
n_heudiconv_participants = len(heudiconv_participants)
@@ -185,8 +185,8 @@ def run(global_configs, session_id, stage=2, overlays=None, n_jobs=2, dicom_id=N

if len(new_participants_with_bids) > 0:
heudiconv_df.loc[heudiconv_df[COL_DICOM_ID].isin(new_participants_with_bids), COL_CONV_STATUS] = True
df_status.loc[heudiconv_df.index] = heudiconv_df
save_backup(df_status, fpath_status, DNAME_BACKUPS_STATUS)
df_doughnut.loc[heudiconv_df.index] = heudiconv_df
save_backup(df_doughnut, fpath_doughnut, DNAME_BACKUPS_DOUGHNUT)

else:
logger.info(f"No new participants found for bids conversion...")
@@ -206,7 +206,7 @@ def run(global_configs, session_id, stage=2, overlays=None, n_jobs=2, dicom_id=N
parser.add_argument('--stage', type=int, default=2, help='heudiconv stage (either 1 or 2, default: 2)')
parser.add_argument('--overlay', type=str, nargs='+', help='path(s) to Squashfs overlay(s)')
parser.add_argument('--n_jobs', type=int, default=2, help='number of parallel processes (default: 2)')
parser.add_argument('--dicom_id', type=str, help='dicom id for a single participant to run (default: run on all participants in the status file)')
parser.add_argument('--dicom_id', type=str, help='dicom id for a single participant to run (default: run on all participants in the doughnut file)')
parser.add_argument('--copy_files', nargs='+', type=str, help='path(s) to file(s) to copy to /scratch/proc in the container')

args = parser.parse_args()