From 821121222384e52e97b24d1c7a3d00983f8af4f7 Mon Sep 17 00:00:00 2001 From: Michelle Wang Date: Mon, 2 Oct 2023 16:15:08 -0400 Subject: [PATCH] Make backups of `bagel.csv` and check it when tracking pipelines (+ refactor status -> doughnut) (#162) * try to manually add back changes * fix bugs from previous changes * fix column order and participant_sessions selection * move bagel filename/directory into utils * update by session+pipeline instead of pipeline only * fix bug where session_ids is overwritten * address Nikhil comments + refactor status -> doughnut * do not use get_end_time() for now because it can be very slow --- nipoppy/trackers/run_tracker.py | 175 ++++++++++++------ nipoppy/workflow/bids_conv/run_bids_conv.py | 20 +- nipoppy/workflow/catalog.py | 56 +++--- .../workflow/dicom_org/check_dicom_status.py | 94 +++++----- nipoppy/workflow/dicom_org/run_dicom_org.py | 19 +- nipoppy/workflow/utils.py | 10 +- 6 files changed, 221 insertions(+), 153 deletions(-) diff --git a/nipoppy/trackers/run_tracker.py b/nipoppy/trackers/run_tracker.py index e9f3f88b..03dd5baf 100755 --- a/nipoppy/trackers/run_tracker.py +++ b/nipoppy/trackers/run_tracker.py @@ -1,19 +1,26 @@ -import pandas as pd -from pathlib import Path +#!/usr/bin/env python import argparse -from nipoppy.trackers.tracker import UNAVAILABLE, Tracker, get_start_time -from nipoppy.trackers import fs_tracker, fmriprep_tracker, mriqc_tracker, tractoflow_tracker -import nipoppy.workflow.logger as my_logger import json +import warnings +from pathlib import Path + +import pandas as pd +import nipoppy.workflow.logger as my_logger +from nipoppy.trackers.tracker import Tracker, get_start_time, get_end_time, UNAVAILABLE, TRUE +from nipoppy.trackers import fs_tracker, fmriprep_tracker, mriqc_tracker, tractoflow_tracker from nipoppy.workflow.utils import ( COL_SUBJECT_MANIFEST, COL_BIDS_ID_MANIFEST, COL_SESSION_MANIFEST, COL_CONV_STATUS, - load_status, - session_id_to_bids_session -) + DNAME_BACKUPS_BAGEL, + FNAME_BAGEL, + FNAME_DOUGHNUT, + load_doughnut, + save_backup, + session_id_to_bids_session, +) # Globals PIPELINE_STATUS_COLUMNS = "PIPELINE_STATUS_COLUMNS" @@ -21,7 +28,7 @@ "freesurfer": fs_tracker.tracker_configs, "fmriprep": fmriprep_tracker.tracker_configs, "mriqc": mriqc_tracker.tracker_configs, - "tractoflow": tractoflow_tracker.tracker_configs + "tractoflow": tractoflow_tracker.tracker_configs, } BIDS_PIPES = ["mriqc","fmriprep", "tractoflow"] @@ -43,47 +50,76 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1, else: session_ids = [session_id] - logger.info(f"tracking session_ids: {session_ids}") - - proc_status_dfs = [] # list of dataframes - for session_id in session_ids: + logger.info(f"tracking session_ids: {session_ids}") - session = session_id_to_bids_session(session_id) + for pipeline in pipelines: + pipe_tracker = Tracker(global_configs, dash_schema_file, pipeline) + + dataset_root, _, version = pipe_tracker.get_global_configs() + schema = pipe_tracker.get_dash_schema() + tracker_configs = pipeline_tracker_config_dict[pipeline] - for pipeline in pipelines: - pipe_tracker = Tracker(global_configs, dash_schema_file, pipeline) - - # TODO revise tracker class - DATASET_ROOT, session_ids, version = pipe_tracker.get_global_configs() - schema = pipe_tracker.get_dash_schema() - tracker_configs = pipeline_tracker_config_dict[pipeline] - - # Grab BIDS participants from the doughnut - doughnut_file = f"{DATASET_ROOT}/scratch/raw_dicom/doughnut.csv" - doughnut_df = load_status(doughnut_file) - 
bids_participants = doughnut_df[(doughnut_df[COL_SESSION_MANIFEST]==session) & - (doughnut_df[COL_CONV_STATUS])][COL_BIDS_ID_MANIFEST].unique() - n_bids_participants = len(bids_participants) - - logger.info("-"*50) - logger.info(f"pipeline: {pipeline}, version: {version}") - logger.info(f"n_participants: {n_bids_participants}, session_ids: {session_ids}") - logger.info("-"*50) - - status_check_dict = pipe_tracker.get_pipe_tasks(tracker_configs, PIPELINE_STATUS_COLUMNS, pipeline, version) - - dash_col_list = list(schema["GLOBAL_COLUMNS"].keys()) - - logger.info(f"Checking session: {session_id}") - _df = pd.DataFrame(index=bids_participants, columns=dash_col_list) - _df["session"] = session - _df["pipeline_name"] = pipeline + # Grab BIDS participants from the doughnut + doughnut_file = f"{DATASET_ROOT}/scratch/raw_dicom/{FNAME_DOUGHNUT}" + doughnut_df = load_doughnut(doughnut_file) + participants_total = doughnut_df[doughnut_df[COL_CONV_STATUS]][COL_BIDS_ID_MANIFEST].unique() + n_participants_total = len(participants_total) + + logger.info("-"*50) + logger.info(f"pipeline: {pipeline}, version: {version}") + logger.info(f"n_participants_total: {n_participants_total}, session_ids: {session_ids}") + logger.info("-"*50) + + status_check_dict = pipe_tracker.get_pipe_tasks(tracker_configs, PIPELINE_STATUS_COLUMNS, pipeline, version) + + # only use non-prefixed columns at this stage + # for prefixed columns we need to generate the column name + dash_col_list = list(key for key, value in schema["GLOBAL_COLUMNS"].items() if not value["IsPrefixedColumn"]) + + for session_id in session_ids: + session = session_id_to_bids_session(session_id) + logger.info(f"Checking session: {session}") + + participants_session = doughnut_df[(doughnut_df[COL_BIDS_ID_MANIFEST].isin(participants_total)) & (doughnut_df[COL_SESSION_MANIFEST] == session)][COL_BIDS_ID_MANIFEST].drop_duplicates().astype(str).str.strip().values + n_participants_session = len(participants_session) + logger.info(f"n_participants_session: {n_participants_session}") + + _df = pd.DataFrame(index=participants_session, columns=dash_col_list) + _df[COL_SESSION_MANIFEST] = session + _df["pipeline_name"] = pipeline _df["pipeline_version"] = version + _df["has_mri_data"] = TRUE # everyone in the doughnut file has MRI data + + fpath_bagel = Path(dataset_root, 'derivatives', FNAME_BAGEL) + if fpath_bagel.exists(): + df_bagel_old_full = load_bagel(fpath_bagel) + + df_bagel_old_session = df_bagel_old_full.loc[df_bagel_old_full[COL_SESSION_MANIFEST] == session] + old_participants_session = set(df_bagel_old_session[COL_BIDS_ID_MANIFEST]) + old_pipelines_session = set(df_bagel_old_session['pipeline_name']) + + # make sure the number of participants is consistent across pipelines + if set(participants_session) != old_participants_session and not old_pipelines_session.issubset(set(pipelines)): + warnings.warn( + f'The existing bagel file might be obsolete (participant list does not match the doughnut file for session {session})' + f'. 
Rerun the tracker script with --pipelines {" ".join(old_pipelines_session.union(pipelines))}' + ) + + df_bagel_old = df_bagel_old_full.loc[ + ~( + (df_bagel_old_full["pipeline_name"] == pipeline) & + (df_bagel_old_full["pipeline_version"] == version) & + (df_bagel_old_full[COL_SESSION_MANIFEST] == session) + ) + ] + + else: + df_bagel_old = None - for bids_id in bids_participants: + for bids_id in participants_session: participant_id = doughnut_df[doughnut_df[COL_BIDS_ID_MANIFEST]==bids_id][COL_SUBJECT_MANIFEST].values[0] - _df.loc[bids_id,COL_SUBJECT_MANIFEST] = participant_id - logger.debug(f"bids_id: {bids_id}, participant_id: {participant_id}") + _df.loc[bids_id, COL_SUBJECT_MANIFEST] = participant_id + _df.loc[bids_id, COL_BIDS_ID_MANIFEST] = bids_id if pipeline == "freesurfer": subject_dir = f"{DATASET_ROOT}/derivatives/{pipeline}/v{version}/output/ses-{session_id}/{bids_id}" @@ -97,11 +133,12 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1, if dir_status: for name, func in status_check_dict.items(): - status = func(subject_dir, session, run_id) + status = func(subject_dir, session_id, run_id) logger.info(f"task_name: {name}, status: {status}") _df.loc[bids_id,name] = status _df.loc[bids_id,"pipeline_starttime"] = get_start_time(subject_dir) - _df.loc[bids_id,"pipeline_endtime"] = UNAVAILABLE # TODO + # TODO only check files listed in the tracker config + _df.loc[bids_id,"pipeline_endtime"] = UNAVAILABLE # get_end_time(subject_dir) else: logger.error(f"Output for pipeline: {pipeline} not found for bids_id: {bids_id}, session: {session}") for name in status_check_dict.keys(): @@ -109,17 +146,45 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id=1, _df.loc[bids_id,"pipeline_starttime"] = UNAVAILABLE _df.loc[bids_id,"pipeline_endtime"] = UNAVAILABLE - proc_status_dfs.append(_df) + _df = _df.reset_index(drop=True) + + # add old rows from other pipelines/sessions and sort for consistent order + df_bagel: pd.DataFrame = pd.concat([df_bagel_old, _df], axis='index') + df_bagel = df_bagel.sort_values(["pipeline_name", "pipeline_version", COL_BIDS_ID_MANIFEST], ignore_index=True) - proc_status_df = pd.concat(proc_status_dfs, axis='index') + # don't write a new file if no changes + try: + if len(df_bagel.compare(df_bagel_old_full)) == 0: + logger.info(f'\nNo change in bagel file for pipeline {pipeline}, session {session}') + continue + except Exception: + pass + + # save bagel + save_backup(df_bagel, fpath_bagel, DNAME_BACKUPS_BAGEL) - # Save proc_status_df - tracker_csv = f"{DATASET_ROOT}/derivatives/bagel.csv" - proc_status_df = proc_status_df.drop(columns=COL_BIDS_ID_MANIFEST) - proc_status_df.index.name = COL_BIDS_ID_MANIFEST - proc_status_df.to_csv(tracker_csv) +def load_bagel(fpath_bagel): - logger.info(f"Saved to {tracker_csv}") + def time_converter(value): + # convert to datetime if possible + if str(value) != UNAVAILABLE: + return pd.to_datetime(value) + return value + + df_bagel = pd.read_csv( + fpath_bagel, + dtype={ + 'has_mri_data': bool, + 'participant_id': str, + 'session': str, + }, + converters={ + 'pipeline_starttime': time_converter, + 'pipeline_endtime': time_converter, + } + ) + + return df_bagel if __name__ == '__main__': # argparse diff --git a/nipoppy/workflow/bids_conv/run_bids_conv.py b/nipoppy/workflow/bids_conv/run_bids_conv.py index dc334928..cf460197 100755 --- a/nipoppy/workflow/bids_conv/run_bids_conv.py +++ b/nipoppy/workflow/bids_conv/run_bids_conv.py @@ -18,9 +18,9 @@ from nipoppy.workflow.utils 
import ( COL_CONV_STATUS, COL_DICOM_ID, - DNAME_BACKUPS_STATUS, - FNAME_STATUS, - load_status, + DNAME_BACKUPS_DOUGHNUT, + FNAME_DOUGHNUT, + load_doughnut, save_backup, session_id_to_bids_session, ) @@ -118,18 +118,18 @@ def run(global_configs, session_id, stage=2, overlays=None, n_jobs=2, dicom_id=N logger.info(f"Running HeuDiConv stage: {stage}") logger.info(f"Number of parallel jobs: {n_jobs}") - fpath_status = Path(DATASET_ROOT, 'scratch', 'raw_dicom', FNAME_STATUS) + fpath_doughnut = Path(DATASET_ROOT, 'scratch', 'raw_dicom', FNAME_DOUGHNUT) bids_dir = f"{DATASET_ROOT}/bids/" - df_status = load_status(fpath_status) + df_doughnut = load_doughnut(fpath_doughnut) # participants to process with Heudiconv if dicom_id is None: - heudiconv_df = catalog.get_new_dicoms(fpath_status, session_id, logger) + heudiconv_df = catalog.get_new_dicoms(fpath_doughnut, session_id, logger) else: # filter by DICOM ID if needed logger.info(f'Only running for participant: {dicom_id}') - heudiconv_df = df_status.loc[df_status[COL_DICOM_ID] == dicom_id] + heudiconv_df = df_doughnut.loc[df_doughnut[COL_DICOM_ID] == dicom_id] heudiconv_participants = set(heudiconv_df["dicom_id"].values) n_heudiconv_participants = len(heudiconv_participants) @@ -185,8 +185,8 @@ def run(global_configs, session_id, stage=2, overlays=None, n_jobs=2, dicom_id=N if len(new_participants_with_bids) > 0: heudiconv_df.loc[heudiconv_df[COL_DICOM_ID].isin(new_participants_with_bids), COL_CONV_STATUS] = True - df_status.loc[heudiconv_df.index] = heudiconv_df - save_backup(df_status, fpath_status, DNAME_BACKUPS_STATUS) + df_doughnut.loc[heudiconv_df.index] = heudiconv_df + save_backup(df_doughnut, fpath_doughnut, DNAME_BACKUPS_DOUGHNUT) else: logger.info(f"No new participants found for bids conversion...") @@ -206,7 +206,7 @@ def run(global_configs, session_id, stage=2, overlays=None, n_jobs=2, dicom_id=N parser.add_argument('--stage', type=int, default=2, help='heudiconv stage (either 1 or 2, default: 2)') parser.add_argument('--overlay', type=str, nargs='+', help='path(s) to Squashfs overlay(s)') parser.add_argument('--n_jobs', type=int, default=2, help='number of parallel processes (default: 2)') - parser.add_argument('--dicom_id', type=str, help='dicom id for a single participant to run (default: run on all participants in the status file)') + parser.add_argument('--dicom_id', type=str, help='dicom id for a single participant to run (default: run on all participants in the doughnut file)') parser.add_argument('--copy_files', nargs='+', type=str, help='path(s) to file(s) to copy to /scratch/proc in the container') args = parser.parse_args() diff --git a/nipoppy/workflow/catalog.py b/nipoppy/workflow/catalog.py index 913940da..a85fb131 100644 --- a/nipoppy/workflow/catalog.py +++ b/nipoppy/workflow/catalog.py @@ -11,18 +11,18 @@ COL_ORG_STATUS, COL_SESSION_MANIFEST, COL_SUBJECT_MANIFEST, - load_status, + load_doughnut, ) -def read_and_process_status(status_csv, session_id, logger): +def read_and_process_doughnut(fpath_doughnut, session_id, logger): # read current participant manifest - status_df = load_status(status_csv) + df_doughnut = load_doughnut(fpath_doughnut) session = f"ses-{session_id}" # filter session - status_df = status_df.loc[status_df[COL_SESSION_MANIFEST] == session] + df_doughnut = df_doughnut.loc[df_doughnut[COL_SESSION_MANIFEST] == session] - return status_df + return df_doughnut def list_dicoms(dcm_dir, logger): # check current dicom dir @@ -50,11 +50,11 @@ def list_bids(bids_dir, session_id, logger): return 
current_bids_session_dirs -def get_new_downloads(status_csv, raw_dicom_dir, session_id, logger): +def get_new_downloads(fpath_doughnut, raw_dicom_dir, session_id, logger): """ Identify new dicoms not yet inside /scratch/raw_dicom """ - status_df = read_and_process_status(status_csv, session_id, logger) - participants = set(status_df[COL_SUBJECT_MANIFEST]) + df_doughnut = read_and_process_doughnut(fpath_doughnut, session_id, logger) + participants = set(df_doughnut[COL_SUBJECT_MANIFEST]) n_participants = len(participants) logger.info("-"*50) @@ -66,52 +66,52 @@ def get_new_downloads(status_csv, raw_dicom_dir, session_id, logger): logger.info("-"*50) n_available_raw_dicom_dirs = len(available_raw_dicom_dirs) - available_raw_dicom_dirs_participant_ids = list(status_df[status_df[COL_PARTICIPANT_DICOM_DIR].isin(available_raw_dicom_dirs)][COL_SUBJECT_MANIFEST].astype(str).values) + available_raw_dicom_dirs_participant_ids = list(df_doughnut[df_doughnut[COL_PARTICIPANT_DICOM_DIR].isin(available_raw_dicom_dirs)][COL_SUBJECT_MANIFEST].astype(str).values) - # check mismatch between status file and raw_dicoms + # check mismatch between doughnut file and raw_dicoms download_dicom_dir_participant_ids = set(participants) - set(available_raw_dicom_dirs_participant_ids) n_download_dicom_dirs = len(download_dicom_dir_participant_ids) - download_df = status_df[status_df[COL_SUBJECT_MANIFEST].isin(download_dicom_dir_participant_ids)] + download_df = df_doughnut[df_doughnut[COL_SUBJECT_MANIFEST].isin(download_dicom_dir_participant_ids)] logger.info("-"*50) logger.info(f"Identifying participants to be downloaded\n\n \ - - n_participants (listed in the status file): {n_participants}\n \ + - n_participants (listed in the doughnut file): {n_participants}\n \ - n_available_raw_dicom_dirs: {n_available_raw_dicom_dirs}\n \ - n_download_dicom_dirs: {n_download_dicom_dirs}\n") logger.info("-"*50) return download_df -def get_new_raw_dicoms(status_csv, session_id, logger): +def get_new_raw_dicoms(fpath_doughnut, session_id, logger): """ Identify new raw_dicoms not yet reorganized inside /dicom """ - status_df = read_and_process_status(status_csv, session_id, logger) - participants_all = set(status_df[COL_SUBJECT_MANIFEST]) + df_doughnut = read_and_process_doughnut(fpath_doughnut, session_id, logger) + participants_all = set(df_doughnut[COL_SUBJECT_MANIFEST]) n_participants_all = len(participants_all) # check raw dicom dir (downloaded) - downloaded = set(status_df.loc[status_df[COL_DOWNLOAD_STATUS], COL_SUBJECT_MANIFEST]) + downloaded = set(df_doughnut.loc[df_doughnut[COL_DOWNLOAD_STATUS], COL_SUBJECT_MANIFEST]) n_downloaded = len(downloaded) # check current dicom dir (already reorganized) - downloaded_but_not_reorganized = downloaded & set(status_df.loc[~status_df[COL_ORG_STATUS], COL_SUBJECT_MANIFEST]) + downloaded_but_not_reorganized = downloaded & set(df_doughnut.loc[~df_doughnut[COL_ORG_STATUS], COL_SUBJECT_MANIFEST]) n_downloaded_but_not_reorganized = len(downloaded_but_not_reorganized) - reorg_df = status_df.loc[status_df[COL_SUBJECT_MANIFEST].isin(downloaded_but_not_reorganized)] + reorg_df = df_doughnut.loc[df_doughnut[COL_SUBJECT_MANIFEST].isin(downloaded_but_not_reorganized)] # check participant dicom dirs if not reorg_df[COL_PARTICIPANT_DICOM_DIR].isna().all(): - logger.info("Using dicom filename from the status file") + logger.info("Using dicom filename from the doughnut file") else: - logger.warning(f"{COL_PARTICIPANT_DICOM_DIR} is not specified in the status file") + 
logger.warning(f"{COL_PARTICIPANT_DICOM_DIR} is not specified in the doughnut file") logger.info(f"Assuming {COL_SUBJECT_MANIFEST} is the dicom filename") reorg_df[COL_PARTICIPANT_DICOM_DIR] = reorg_df[COL_SUBJECT_MANIFEST].copy() logger.info("-"*50) logger.info( f"Identifying participants to be reorganized\n\n" - f"- n_participants_all (listed in the status file): {n_participants_all}\n" + f"- n_participants_all (listed in the doughnut file): {n_participants_all}\n" f"- n_downloaded: {n_downloaded}\n" f"- n_missing: {n_participants_all - n_downloaded}\n" f"- n_downloaded_but_not_reorganized: {n_downloaded_but_not_reorganized}\n" @@ -120,27 +120,27 @@ def get_new_raw_dicoms(status_csv, session_id, logger): return reorg_df -def get_new_dicoms(status_csv, session_id, logger): +def get_new_dicoms(fpath_doughnut, session_id, logger): """ Identify new dicoms not yet BIDSified """ - status_df = read_and_process_status(status_csv, session_id, logger) - participants_all = set(status_df[COL_SUBJECT_MANIFEST]) + df_doughnut = read_and_process_doughnut(fpath_doughnut, session_id, logger) + participants_all = set(df_doughnut[COL_SUBJECT_MANIFEST]) n_participants_all = len(participants_all) # check current dicom dir (reorganized) - organized = set(status_df.loc[status_df[COL_ORG_STATUS], COL_SUBJECT_MANIFEST]) + organized = set(df_doughnut.loc[df_doughnut[COL_ORG_STATUS], COL_SUBJECT_MANIFEST]) n_organized = len(organized) # check bids dir (already converted) - organized_but_not_bids = organized & set(status_df.loc[~status_df[COL_CONV_STATUS], COL_SUBJECT_MANIFEST]) + organized_but_not_bids = organized & set(df_doughnut.loc[~df_doughnut[COL_CONV_STATUS], COL_SUBJECT_MANIFEST]) n_organized_but_not_bids = len(organized_but_not_bids) - heudiconv_df = status_df.loc[status_df[COL_SUBJECT_MANIFEST].isin(organized_but_not_bids)] + heudiconv_df = df_doughnut.loc[df_doughnut[COL_SUBJECT_MANIFEST].isin(organized_but_not_bids)] logger.info("-"*50) logger.info( "Identifying participants to be BIDSified\n\n" - f"- n_participants (listed in the status file): {n_participants_all}\n" + f"- n_participants (listed in the doughnut file): {n_participants_all}\n" f"- n_organized: {n_organized}\n" f"- n_missing: {n_participants_all - n_organized}\n" f"- n_organized_but_not_bids: {n_organized_but_not_bids}\n" diff --git a/nipoppy/workflow/dicom_org/check_dicom_status.py b/nipoppy/workflow/dicom_org/check_dicom_status.py index 6b97e24c..65dc7784 100755 --- a/nipoppy/workflow/dicom_org/check_dicom_status.py +++ b/nipoppy/workflow/dicom_org/check_dicom_status.py @@ -20,8 +20,8 @@ COL_SESSION_MANIFEST, COL_SUBJECT_MANIFEST, COLS_STATUS, - DNAME_BACKUPS_STATUS, - FNAME_STATUS, + DNAME_BACKUPS_DOUGHNUT, + FNAME_DOUGHNUT, FNAME_MANIFEST, load_manifest, participant_id_to_bids_id, @@ -30,7 +30,7 @@ save_backup, ) -DPATH_STATUS_RELATIVE = Path('scratch', 'raw_dicom') +DPATH_DOUGHNUT_RELATIVE = Path('scratch', 'raw_dicom') FPATH_MANIFEST_RELATIVE = Path('tabular') / FNAME_MANIFEST FLAG_EMPTY = '--empty' @@ -51,8 +51,8 @@ def run(global_config_file, regenerate=False, empty=False): dpath_organized_dicom = dpath_dataset / 'dicom' dpath_converted = dpath_dataset / 'bids' - # get path to status file - fpath_status_symlink = dpath_dataset / DPATH_STATUS_RELATIVE / FNAME_STATUS + # get path to doughnut file + fpath_doughnut_symlink = dpath_dataset / DPATH_DOUGHNUT_RELATIVE / FNAME_DOUGHNUT # load manifest fpath_manifest = dpath_dataset / FPATH_MANIFEST_RELATIVE @@ -71,46 +71,46 @@ def run(global_config_file, regenerate=False, empty=False): ) 
# only participants with imaging data have non-empty session column - df_status = df_manifest.loc[~df_manifest[COL_SESSION_MANIFEST].isna()].copy() + df_doughnut = df_manifest.loc[~df_manifest[COL_SESSION_MANIFEST].isna()].copy() # sanity check that everyone who has session_id also has non-empty datatype list - has_datatypes = df_status.set_index(COL_SUBJECT_MANIFEST)[COL_DATATYPE_MANIFEST].apply(lambda datatypes: len(datatypes) > 0) + has_datatypes = df_doughnut.set_index(COL_SUBJECT_MANIFEST)[COL_DATATYPE_MANIFEST].apply(lambda datatypes: len(datatypes) > 0) participants_without_datatypes = has_datatypes.loc[~has_datatypes].index.values if len(participants_without_datatypes) > 0: raise ValueError( f'Some participants have a value in "{COL_SESSION_MANIFEST}" but nothing in "{COL_DATATYPE_MANIFEST}": {participants_without_datatypes}' ) - # look for existing status file - if fpath_status_symlink.exists() and not empty: - df_status_old = pd.read_csv(fpath_status_symlink, dtype=str) + # look for existing doughnut file + if fpath_doughnut_symlink.exists() and not empty: + df_doughnut_old = pd.read_csv(fpath_doughnut_symlink, dtype=str) else: - df_status_old = None + df_doughnut_old = None if (not empty) and (not regenerate): raise ValueError( - f'Did not find an existing {FNAME_STATUS} file' + f'Did not find an existing {FNAME_DOUGHNUT} file' f'. Use {FLAG_EMPTY} to create an empty one' f' or {FLAG_REGENERATE} to create one based on current files' ' in the dataset (can be slow)' ) # generate bids_id - df_status.loc[:, COL_BIDS_ID_MANIFEST] = df_status[COL_SUBJECT_MANIFEST].apply( + df_doughnut.loc[:, COL_BIDS_ID_MANIFEST] = df_doughnut[COL_SUBJECT_MANIFEST].apply( participant_id_to_bids_id ) # initialize dicom dir (cannot be inferred directly from participant id) - df_status.loc[:, COL_PARTICIPANT_DICOM_DIR] = np.nan + df_doughnut.loc[:, COL_PARTICIPANT_DICOM_DIR] = np.nan # populate dicom_id - df_status.loc[:, COL_DICOM_ID] = df_status[COL_SUBJECT_MANIFEST].apply( + df_doughnut.loc[:, COL_DICOM_ID] = df_doughnut[COL_SUBJECT_MANIFEST].apply( participant_id_to_dicom_id ) # initialize all status columns for col in [COL_DOWNLOAD_STATUS, COL_ORG_STATUS, COL_CONV_STATUS]: - df_status[col] = False + df_doughnut[col] = False if regenerate: @@ -127,7 +127,7 @@ def run(global_config_file, regenerate=False, empty=False): 'See sample_dicom_dir_func.py for an example.' 
) - df_status[COL_PARTICIPANT_DICOM_DIR] = df_status.apply( + df_doughnut[COL_PARTICIPANT_DICOM_DIR] = df_doughnut.apply( lambda row: participant_id_to_dicom_dir( row[COL_SUBJECT_MANIFEST], str(row[COL_SESSION_MANIFEST]).removeprefix(BIDS_SESSION_PREFIX), @@ -137,67 +137,67 @@ def run(global_config_file, regenerate=False, empty=False): ) # look for raw DICOM: scratch/raw_dicom/session/dicom_dir - df_status[COL_DOWNLOAD_STATUS] = check_status( - df_status, dpath_downloaded_dicom, COL_PARTICIPANT_DICOM_DIR, session_first=True, + df_doughnut[COL_DOWNLOAD_STATUS] = check_status( + df_doughnut, dpath_downloaded_dicom, COL_PARTICIPANT_DICOM_DIR, session_first=True, ) # look for organized DICOM - df_status[COL_ORG_STATUS] = check_status( - df_status, dpath_organized_dicom, COL_DICOM_ID, session_first=True, + df_doughnut[COL_ORG_STATUS] = check_status( + df_doughnut, dpath_organized_dicom, COL_DICOM_ID, session_first=True, ) # look for BIDS: bids/bids_id/session - df_status[COL_CONV_STATUS] = check_status( - df_status, dpath_converted, COL_BIDS_ID_MANIFEST, session_first=False, + df_doughnut[COL_CONV_STATUS] = check_status( + df_doughnut, dpath_converted, COL_BIDS_ID_MANIFEST, session_first=False, ) # warn user if there are rows with a 'True' column after one or more 'False' columns has_lost_files = ( - (df_status[COL_CONV_STATUS] & ~(df_status[COL_ORG_STATUS] | df_status[COL_DOWNLOAD_STATUS])) | - (df_status[COL_ORG_STATUS] & ~df_status[COL_DOWNLOAD_STATUS]) + (df_doughnut[COL_CONV_STATUS] & ~(df_doughnut[COL_ORG_STATUS] | df_doughnut[COL_DOWNLOAD_STATUS])) | + (df_doughnut[COL_ORG_STATUS] & ~df_doughnut[COL_DOWNLOAD_STATUS]) ) if has_lost_files.any(): warnings.warn( 'Some participants-session pairs seem to have lost files:' - f'\n{df_status.loc[has_lost_files]}' + f'\n{df_doughnut.loc[has_lost_files]}' ) else: - df_status = df_status.set_index([COL_SUBJECT_MANIFEST, COL_SESSION_MANIFEST]) + df_doughnut = df_doughnut.set_index([COL_SUBJECT_MANIFEST, COL_SESSION_MANIFEST]) - if df_status_old is not None: + if df_doughnut_old is not None: subject_session_pairs_old = pd.Index(zip( - df_status_old[COL_SUBJECT_MANIFEST], - df_status_old[COL_SESSION_MANIFEST], + df_doughnut_old[COL_SUBJECT_MANIFEST], + df_doughnut_old[COL_SESSION_MANIFEST], )) - df_status_deleted_rows = df_status_old.loc[~subject_session_pairs_old.isin(df_status.index)] + df_doughnut_deleted_rows = df_doughnut_old.loc[~subject_session_pairs_old.isin(df_doughnut.index)] - # error if new status file loses subject-session pairs - if len(df_status_deleted_rows) > 0: + # error if new doughnut file loses subject-session pairs + if len(df_doughnut_deleted_rows) > 0: raise RuntimeError( - 'Some of the subject/session pairs in the old status file do not' + 'Some of the subject/session pairs in the old doughnut file do not' ' seem to exist anymore:' - f'\n{df_status_deleted_rows}' - f'\nUse {FLAG_REGENERATE} to fully regenerate the status file') + f'\n{df_doughnut_deleted_rows}' + f'\nUse {FLAG_REGENERATE} to fully regenerate the doughnut file') else: subject_session_pairs_old = pd.Index([]) - df_status_new_rows = df_status.loc[~df_status.index.isin(subject_session_pairs_old)] - df_status_new_rows = df_status_new_rows.reset_index()[COLS_STATUS] - df_status = pd.concat([df_status_old, df_status_new_rows], axis='index') - print(f'\nAdded {len(df_status_new_rows)} rows to existing status file') + df_doughnut_new_rows = df_doughnut.loc[~df_doughnut.index.isin(subject_session_pairs_old)] + df_doughnut_new_rows = 
df_doughnut_new_rows.reset_index()[COLS_STATUS] + df_doughnut = pd.concat([df_doughnut_old, df_doughnut_new_rows], axis='index') + print(f'\nAdded {len(df_doughnut_new_rows)} rows to existing doughnut file') - df_status = df_status[COLS_STATUS].drop_duplicates(ignore_index=True) - df_status = df_status.sort_values([COL_SUBJECT_MANIFEST, COL_SESSION_MANIFEST], ignore_index=True) + df_doughnut = df_doughnut[COLS_STATUS].drop_duplicates(ignore_index=True) + df_doughnut = df_doughnut.sort_values([COL_SUBJECT_MANIFEST, COL_SESSION_MANIFEST], ignore_index=True) # do not write file if there are no changes from previous one - if df_status_old is not None and df_status.equals(df_status_old): - print(f'\nNo change from existing status file. Will not write new status file.') + if df_doughnut_old is not None and df_doughnut.equals(df_doughnut_old): + print(f'\nNo change from existing doughnut file. Will not write new doughnut file.') return # save backup and make symlink - save_backup(df_status, fpath_status_symlink, DNAME_BACKUPS_STATUS) + save_backup(df_doughnut, fpath_doughnut_symlink, DNAME_BACKUPS_DOUGHNUT) def check_status(df: pd.DataFrame, dpath, col_dname, session_first=True): @@ -234,12 +234,12 @@ def check_dir(dpath): help='path to global config file for your nipoppy dataset (required)') parser.add_argument( FLAG_REGENERATE, action='store_true', - help=('regenerate entire status file' + help=('regenerate entire doughnut file' ' (default: only append rows for new subjects/sessions)'), ) parser.add_argument( FLAG_EMPTY, action='store_true', - help='generate empty status file (without checking what\'s on the disk)') + help='generate empty doughnut file (without checking what\'s on the disk)') args = parser.parse_args() # parse diff --git a/nipoppy/workflow/dicom_org/run_dicom_org.py b/nipoppy/workflow/dicom_org/run_dicom_org.py index c0a3182d..40e93847 100755 --- a/nipoppy/workflow/dicom_org/run_dicom_org.py +++ b/nipoppy/workflow/dicom_org/run_dicom_org.py @@ -11,10 +11,11 @@ import nipoppy.workflow.catalog as catalog from nipoppy.workflow.dicom_org.utils import search_dicoms, copy_dicoms from nipoppy.workflow.utils import ( - COL_ORG_STATUS, - DNAME_BACKUPS_STATUS, - load_status, - participant_id_to_dicom_id, + COL_ORG_STATUS, + DNAME_BACKUPS_DOUGHNUT, + FNAME_DOUGHNUT, + load_doughnut, + participant_id_to_dicom_id, save_backup, session_id_to_bids_session, ) @@ -65,8 +66,8 @@ def run(global_configs, session_id, logger=None, use_symlinks=True, skip_dcm_che log_dir = f"{DATASET_ROOT}/scratch/logs/" invalid_dicom_dir = f"{log_dir}/invalid_dicom_dir/" - fpath_status = f"{DATASET_ROOT}/scratch/raw_dicom/doughnut.csv" - df_status = load_status(fpath_status) + fpath_doughnut = f"{DATASET_ROOT}/scratch/raw_dicom/{FNAME_DOUGHNUT}" + df_doughnut = load_doughnut(fpath_doughnut) if logger is None: log_file = f"{log_dir}/dicom_org.log" @@ -78,7 +79,7 @@ def run(global_configs, session_id, logger=None, use_symlinks=True, skip_dcm_che logger.info(f"session: {session}") logger.info(f"Number of parallel jobs: {n_jobs}") - reorg_df = catalog.get_new_raw_dicoms(fpath_status, session_id, logger) + reorg_df = catalog.get_new_raw_dicoms(fpath_doughnut, session_id, logger) n_dicom_reorg_participants = len(reorg_df) # start reorganizing @@ -107,8 +108,8 @@ def run(global_configs, session_id, logger=None, use_symlinks=True, skip_dcm_che logger.info(f"DICOMs are now copied into {dicom_dir} and ready for bids conversion!") reorg_df[COL_ORG_STATUS] = True - df_status.loc[reorg_df.index] = reorg_df - 
save_backup(df_status, fpath_status, DNAME_BACKUPS_STATUS) + df_doughnut.loc[reorg_df.index] = reorg_df + save_backup(df_doughnut, fpath_doughnut, DNAME_BACKUPS_DOUGHNUT) else: logger.info(f"No new participants found for dicom reorg...") diff --git a/nipoppy/workflow/utils.py b/nipoppy/workflow/utils.py index 3e5b635f..4e48e1f2 100644 --- a/nipoppy/workflow/utils.py +++ b/nipoppy/workflow/utils.py @@ -10,9 +10,11 @@ # directory/file names DNAME_BACKUPS_MANIFEST = '.manifests' -DNAME_BACKUPS_STATUS = '.doughnuts' +DNAME_BACKUPS_DOUGHNUT = '.doughnuts' +DNAME_BACKUPS_BAGEL = '.bagels' FNAME_MANIFEST = 'manifest.csv' -FNAME_STATUS = 'doughnut.csv' +FNAME_DOUGHNUT = 'doughnut.csv' +FNAME_BAGEL = 'bagel.csv' # for creating backups TIMESTAMP_FORMAT = '%Y%m%d_%H%M' @@ -92,9 +94,9 @@ def load_manifest(fpath_manifest): converters={COL_DATATYPE_MANIFEST: pd.eval} ) -def load_status(fpath_status): +def load_doughnut(fpath_doughnut): return pd.read_csv( - fpath_status, + fpath_doughnut, dtype={ col: str for col in [
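
Note: the patch repeatedly calls save_backup(df, fpath, dname_backups) for both the doughnut and the new bagel file (and the check_dicom_status.py hunk refers to "save backup and make symlink"), but the helper's body in nipoppy/workflow/utils.py is outside this excerpt. Below is a minimal, illustrative sketch of the timestamped-backup-plus-symlink pattern those calls imply, reusing TIMESTAMP_FORMAT and the DNAME_BACKUPS_* directory names from utils.py; the exact backup filename layout and symlink handling are assumptions, not the repository's actual implementation.

    from datetime import datetime
    from pathlib import Path

    import pandas as pd

    TIMESTAMP_FORMAT = '%Y%m%d_%H%M'  # from nipoppy/workflow/utils.py

    def save_backup_sketch(df: pd.DataFrame, fpath_symlink, dname_backups):
        # Illustrative sketch only -- the real save_backup() lives in
        # nipoppy/workflow/utils.py and is not shown in this patch excerpt.
        fpath_symlink = Path(fpath_symlink)
        timestamp = datetime.now().strftime(TIMESTAMP_FORMAT)
        # e.g. <dataset>/scratch/raw_dicom/.doughnuts/doughnut-<timestamp>.csv
        #  or  <dataset>/derivatives/.bagels/bagel-<timestamp>.csv (assumed layout)
        fpath_backup = (
            fpath_symlink.parent / dname_backups /
            f'{fpath_symlink.stem}-{timestamp}{fpath_symlink.suffix}'
        )
        fpath_backup.parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(fpath_backup, index=False)
        # repoint the "latest" file at the newest backup so readers such as
        # load_doughnut()/load_bagel() always see the most recent copy
        if fpath_symlink.is_symlink() or fpath_symlink.exists():
            fpath_symlink.unlink()
        fpath_symlink.symlink_to(fpath_backup.relative_to(fpath_symlink.parent))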