diff --git a/nipoppy/trackers/bagel_schema.json b/nipoppy/trackers/bagel_schema.json new file mode 100644 index 00000000..a2f1a411 --- /dev/null +++ b/nipoppy/trackers/bagel_schema.json @@ -0,0 +1,94 @@ +{ + "GLOBAL_COLUMNS": { + "participant_id": { + "Description": "Participant identifier within a given dataset.", + "dtype": "str", + "IsRequired": true, + "IsPrefixedColumn": false + }, + "bids_id": { + "Description": "BIDS dataset identifier for a participant, if available/different from the participant_id.", + "dtype": "str", + "IsRequired": false, + "IsPrefixedColumn": false + }, + "session": { + "Description": "Participant session ID.", + "dtype": "str", + "IsRequired": true, + "IsPrefixedColumn": false + }, + "has_mri_data": { + "Description": "Whether or not participant had MRI data acquired in a given session.", + "dtype": "bool", + "IsRequired": false, + "Range": [true, false], + "IsPrefixedColumn": false + }, + "HAS_DATATYPE__": { + "Description": "Whether or not participant session has specified raw BIDS datatype. Column suffix should correspond to a specific BIDS subdirectory. e.g., 'HAS_DATATYPE__anat'", + "dtype": "bool", + "IsRequired": false, + "Range": [true, false], + "IsPrefixedColumn": true + }, + "HAS_IMAGE__": { + "Description": "Whether or not participant session has specified imaging file. Column suffix should correspond to a BIDS file suffix. e.g. 'HAS_IMAGE__T1w'", + "dtype": "bool", + "IsRequired": false, + "Range": [true, false], + "IsPrefixedColumn": true + }, + "pipeline_name": { + "Description": "Name of a pipeline that was run for the participant, if applicable. Example value: 'freesurfer'", + "dtype": "str", + "IsRequired": true, + "MissingValue": "UNAVAILABLE", + "IsPrefixedColumn": false + }, + "pipeline_version": { + "description": "Version of pipeline that was run. Must have a value if the value for 'pipeline_name' is not 'UNAVAILABLE'. Example value: '7.3.0'", + "dtype": "str", + "IsRequired": true, + "MissingValue": "UNAVAILABLE", + "IsPrefixedColumn": false + }, + "pipeline_starttime": { + "Description": "Date/time that pipeline run was started. In format of 'YYYY-MM-DD HH:MM:SS'.", + "dtype": "str", + "IsRequired": true, + "MissingValue": "UNAVAILABLE", + "IsPrefixedColumn": false + }, + "pipeline_endtime": { + "Description": "Date/time that pipeline run ended. In format of 'YYYY-MM-DD HH:MM:SS'.", + "dtype": "str", + "IsRequired": false, + "MissingValue": "UNAVAILABLE", + "IsPrefixedColumn": false + } + }, + "PIPELINE_STATUS_COLUMNS": { + "pipeline_complete": { + "Description": "Status of pipeline run. 'SUCCESS': All stages of pipeline (as configured by user) finished successfully (all expected pipeline output files are present). 'FAIL': At least one stage of the pipeline failed. 'INCOMPLETE': Pipeline has not yet been run for the participant or at least one stage is unfinished/still running. 'UNAVAILABLE': Relevant data modality for pipeline not available for participant.", + "dtype": "str", + "IsRequired": true, + "Range": ["SUCCESS", "FAIL", "INCOMPLETE", "UNAVAILABLE"], + "IsPrefixedColumn": false + }, + "PHASE__": { + "Description": "Completion status of tracker-specified phase/subworkflow of a pipeline. This prefix must be followed by a second that is a composite of {pipeline_name}-{pipeline_version} to be grouped to the relevant pipeline. e.g., 'PHASE__fmriprep-20.2.7__func'. Each phase may correspond to a specific output subdirectory, and may be associated with multiple related output files. If phase and stage columns are both present, each phase is expected to correspond to >= 1 stage. 'SUCCESS': All output files corresponding to phase are present. 'FAIL': At least one output file of phase is missing. This status may be used to indicate that the phase crashed. 'INCOMPLETE': Output files for phase are not present. This status may be used to indicate that the phase was not configured for the run (e.g., if it corresponds to a specific derivative type). 'UNAVAILABLE': Relevant data modality for pipeline not available for participant. '' (no value): Specified phase not part of pipeline described by current row/record.", + "dtype": "str", + "IsRequired": false, + "Range": ["SUCCESS", "FAIL", "INCOMPLETE", "UNAVAILABLE", ""], + "IsPrefixedColumn": true + }, + "STAGE__": { + "Description": "Completion status of tracker-specified stage of a pipeline. This prefix must be followed by a second that is a composite of {pipeline_name}-{pipeline_version} to be grouped to the relevant pipeline. e.g., 'STAGE__fmriprep-20.2.7__space-MNI152Lin_res-1'. Each stage may correspond to a single output file or a few linked outputs expected to always coexist. If phase and stage columns are both present, each phase is expected to correspond to >= 1 stage. 'SUCCESS': All output files corresponding to stage are present. 'FAIL': At least one output file of stage is missing. This status may be used to indicate that the stage crashed. 'INCOMPLETE': Output files for phase are not present. This status may be used to indicate that this stage was not configured for the run. 'UNAVAILABLE': Relevant data modality for pipeline not available for participant. '' (no value): Specified stage not part of pipeline described by current row/record.", + "dtype": "str", + "IsRequired": false, + "Range": ["SUCCESS", "FAIL", "INCOMPLETE", "UNAVAILABLE", ""], + "IsPrefixedColumn": true + } + } +} \ No newline at end of file diff --git a/nipoppy/trackers/run_tracker.py b/nipoppy/trackers/run_tracker.py index 1e06e10a..0b93c0d2 100755 --- a/nipoppy/trackers/run_tracker.py +++ b/nipoppy/trackers/run_tracker.py @@ -1,28 +1,26 @@ #!/usr/bin/env python import argparse -import json import bids import json +import tarfile import warnings from pathlib import Path import pandas as pd import nipoppy.workflow.logger as my_logger -from nipoppy.trackers.tracker import Tracker, get_start_time, get_end_time, UNAVAILABLE, INCOMPLETE, TRUE +from nipoppy.trackers.tracker import Tracker, get_start_time, get_end_time, SUCCESS, UNAVAILABLE, INCOMPLETE, TRUE from nipoppy.trackers import bids_tracker, fs_tracker, fmriprep_tracker, mriqc_tracker, tractoflow_tracker from nipoppy.workflow.utils import ( - BIDS_SUBJECT_PREFIX, BIDS_SESSION_PREFIX, + COL_DATATYPE_MANIFEST, COL_SUBJECT_MANIFEST, COL_BIDS_ID_MANIFEST, COL_SESSION_MANIFEST, - COL_CONV_STATUS, DNAME_BACKUPS_BAGEL, FNAME_BAGEL, - FNAME_DOUGHNUT, - load_doughnut, + FNAME_MANIFEST, + load_manifest, save_backup, - session_id_to_bids_session, ) # Globals @@ -34,6 +32,14 @@ "mriqc": mriqc_tracker.tracker_configs, "tractoflow": tractoflow_tracker.tracker_configs, } +PIPELINE_REQUIRED_DATATYPES = { + "heudiconv": [], + "freesurfer": ["anat"], + "fmriprep": ["anat"], + "mriqc": ["anat"], + "tractoflow": ["anat", "dwi"], +} +ALL_DATATYPES = sorted(["anat", "dwi", "func", "fmap"]) BIDS_PIPES = ["mriqc","fmriprep"] NO_TRACKER_PIPES = ["maget_brain"] @@ -45,10 +51,6 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id="1 # for bids tracker bids_dir = f"{DATASET_ROOT}/bids/" - # Grab BIDS participants from the doughnut - doughnut_file = f"{DATASET_ROOT}/scratch/raw_dicom/doughnut.csv" - doughnut_df = load_doughnut(doughnut_file) - # logging log_dir = f"{DATASET_ROOT}/scratch/logs/" if logger is None: @@ -56,6 +58,7 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id="1 logger = my_logger.get_logger(log_file, level=log_level) logger.info(f"Tracking pipelines: {pipelines}") + logger.info(f"Tracking run: {run_id} and acq_label: {acq_label}") if session_id == "ALL": sessions = global_configs["SESSIONS"] @@ -69,7 +72,6 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id="1 pipe_tracker = Tracker(global_configs, dash_schema_file, pipeline) # TODO revise tracker class - # DATASET_ROOT, session_ids, version = pipe_tracker.get_global_configs() if pipeline == "heudiconv": version = global_configs["BIDS"][pipeline]["VERSION"] else: @@ -82,15 +84,15 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id="1 else: logger.warning(f"Skipping pipeline: {pipeline}. Tracker not listed in the config") - # Grab BIDS participants from the doughnut - doughnut_file = f"{DATASET_ROOT}/scratch/raw_dicom/{FNAME_DOUGHNUT}" - doughnut_df = load_doughnut(doughnut_file) - participants_total = doughnut_df[doughnut_df[COL_CONV_STATUS]][COL_BIDS_ID_MANIFEST].unique() - n_participants_total = len(participants_total) + # Grab BIDS participants from the manifest + fpath_manifest = f"{DATASET_ROOT}/tabular/{FNAME_MANIFEST}" + df_manifest = load_manifest(fpath_manifest) + df_manifest_imaging = df_manifest.loc[df_manifest[COL_DATATYPE_MANIFEST].apply(lambda x: len(x) != 0)] + n_participants_with_imaging = len(df_manifest_imaging[COL_BIDS_ID_MANIFEST].unique()) logger.info("-"*50) logger.info(f"pipeline: {pipeline}, version: {version}") - logger.info(f"n_participants_total: {n_participants_total}, sessions: {sessions}") + logger.info(f"n_participants_with_imaging: {n_participants_with_imaging}, sessions: {sessions}") logger.info("-"*50) status_check_dict = pipe_tracker.get_pipe_tasks(tracker_configs, PIPELINE_STATUS_COLUMNS, pipeline, version) @@ -99,13 +101,16 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id="1 # for prefixed columns we need to generate the column name dash_col_list = list(key for key, value in schema["GLOBAL_COLUMNS"].items() if not value["IsPrefixedColumn"]) # status_check_dict will typically only have minimal pipeline_complete key + for datatype in ALL_DATATYPES: + dash_col_list.append(f"HAS_DATATYPE__{datatype}") dash_col_list = dash_col_list + list(status_check_dict.keys()) for session in sessions: session_id = session.removeprefix(BIDS_SESSION_PREFIX) logger.info(f"Checking session: {session}") - participants_session = doughnut_df[(doughnut_df[COL_BIDS_ID_MANIFEST].isin(participants_total)) & (doughnut_df[COL_SESSION_MANIFEST] == session)][COL_BIDS_ID_MANIFEST].drop_duplicates().astype(str).str.strip().values + df_manifest_session = df_manifest_imaging.loc[df_manifest_imaging[COL_SESSION_MANIFEST] == session] + participants_session = df_manifest_session[COL_BIDS_ID_MANIFEST].unique() n_participants_session = len(participants_session) logger.info(f"n_participants_session: {n_participants_session}") @@ -113,9 +118,9 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id="1 _df[COL_SESSION_MANIFEST] = session _df["pipeline_name"] = pipeline _df["pipeline_version"] = version - _df["has_mri_data"] = TRUE # everyone in the doughnut file has MRI data + _df["has_mri_data"] = TRUE # everyone in participants_session has MRI data - # Set correct dtype based on dash schema to avoid panads warning + # Set correct dtype based on dash schema to avoid pandas warning # i.e. "FutureWarning: Setting an item of incompatible dtype" dash_col_dtype = "str" for dash_col, _ in status_check_dict.items(): @@ -139,7 +144,7 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id="1 # make sure the number of participants is consistent across pipelines if set(participants_session) != old_participants_session and not old_pipelines_session.issubset(set(pipelines)): warnings.warn( - f'The existing bagel file might be obsolete (participant list does not match the doughnut file for session {session})' + f'The existing bagel file might be obsolete (participant list does not match the manifest file for session {session})' f'. Rerun the tracker script with --pipelines {" ".join(old_pipelines_session.union(pipelines))}' ) @@ -154,61 +159,106 @@ def run(global_configs, dash_schema_file, pipelines, session_id="ALL", run_id="1 else: df_bagel_old = None - for bids_id in participants_session: - participant_id = doughnut_df[doughnut_df[COL_BIDS_ID_MANIFEST]==bids_id][COL_SUBJECT_MANIFEST].values[0] + for bids_id, participant_id, available_datatypes in df_manifest_session[[COL_BIDS_ID_MANIFEST, COL_SUBJECT_MANIFEST, COL_DATATYPE_MANIFEST]].itertuples(index=False): _df.loc[bids_id, COL_SUBJECT_MANIFEST] = participant_id _df.loc[bids_id, COL_BIDS_ID_MANIFEST] = bids_id + # TODO eventually we should move these to the {pipeline}_tracker.py files if pipeline == "heudiconv": subject_dir = f"{DATASET_ROOT}/bids/{bids_id}" subject_ses_dir = f"{subject_dir}/{session}" - elif pipeline in ["freesurfer","tractoflow"]: - subject_dir = f"{DATASET_ROOT}/derivatives/{pipeline}/v{version}/output/{session}/{bids_id}" + elif pipeline in ["freesurfer", "tractoflow"]: + subject_dir = f"{DATASET_ROOT}/derivatives/{pipeline}/{version}/output/{session}/{bids_id}" subject_ses_dir = subject_dir elif pipeline in BIDS_PIPES: - subject_dir = f"{DATASET_ROOT}/derivatives/{pipeline}/v{version}/output/{bids_id}" + subject_dir = f"{DATASET_ROOT}/derivatives/{pipeline}/{version}/output/{bids_id}" + # NOTE temporary solution while we refactor tracker configs to be version-specific + if pipeline == "fmriprep": + subject_ses_dir = f"{subject_dir}/{session}" + subject_ses_tar_paths = [ + Path(subject_ses_dir).with_suffix('.tar'), + Path(subject_ses_dir).with_suffix('.tar.gz'), + ] + if ( + not Path(subject_dir).is_dir() and + not any([path.exists() for path in subject_ses_tar_paths]) + ): + subject_dir = f"{DATASET_ROOT}/derivatives/{pipeline}/{version}/output/fmriprep/{bids_id}" subject_ses_dir = f"{subject_dir}/{session}" elif pipeline in NO_TRACKER_PIPES: logger.warning(f"pipeline: {pipeline} does not have a tracker yet...") else: logger.error(f"unknown pipeline: {pipeline}") - - subject_ses_dir_status = Path(subject_ses_dir).is_dir() - logger.debug(f"subject_dir:{subject_ses_dir_status}, dir_status: {subject_ses_dir_status}") - # TODO incorporate manifest datatype availability - if subject_ses_dir_status: - for name, func in status_check_dict.items(): - if pipeline == "heudiconv": - status = func(bids_layout, participant_id, session_id, run_id, acq_label) - else: - status = func(subject_dir, session_id, run_id, acq_label) + # populate HAS_DATATYPE__ columns + # and check if all required datatypes are available + required_datatypes = PIPELINE_REQUIRED_DATATYPES[pipeline] + has_required_datatypes = True + for datatype in ALL_DATATYPES: + _df.loc[bids_id, f"HAS_DATATYPE__{datatype}"] = datatype in available_datatypes + if (datatype in required_datatypes) and (datatype not in available_datatypes): + has_required_datatypes = False + + if has_required_datatypes: + + subject_ses_dir_status = Path(subject_ses_dir).is_dir() + subject_ses_tar_paths = [ + Path(subject_ses_dir).with_suffix('.tar'), + Path(subject_ses_dir).with_suffix('.tar.gz'), + ] + subject_ses_tar_status = any([path.exists() for path in subject_ses_tar_paths]) + logger.debug(f"subject_ses_dir: {subject_ses_dir}, dir_status: {subject_ses_dir_status}, subject_ses_tar_status: {subject_ses_tar_status}") + + if subject_ses_tar_status: + logger.debug(f"subject_ses_dir: {subject_ses_dir} is a tar file") + for name in status_check_dict.keys(): + if name == 'pipeline_complete': + _df.loc[bids_id,name] = SUCCESS + else: + # here, UNAVAILABLE refers to the functionality not being implemented yet for phases/stages + # unrelated to pipeline_complete being UNAVAILABLE, which is related to the datatypes column in the manifest + _df.loc[bids_id,name] = UNAVAILABLE # TODO check if files are available in the tar file + _df.loc[bids_id,"pipeline_starttime"] = UNAVAILABLE + _df.loc[bids_id,"pipeline_endtime"] = UNAVAILABLE + elif subject_ses_dir_status: + for name, func in status_check_dict.items(): + if pipeline == "heudiconv": + status = func(bids_layout, participant_id, session_id, run_id, acq_label) + else: + status = func(subject_dir, session_id, run_id, acq_label) - logger.debug(f"task_name: {name}, status: {status}") + logger.debug(f"task_name: {name}, status: {status}") - _df.loc[bids_id,name] = status - _df.loc[bids_id,"pipeline_starttime"] = get_start_time(subject_dir) - # TODO only check files listed in the tracker config - _df.loc[bids_id,"pipeline_endtime"] = UNAVAILABLE # get_end_time(subject_dir) + _df.loc[bids_id,name] = status + _df.loc[bids_id,"pipeline_starttime"] = get_start_time(subject_dir) + # TODO only check files listed in the tracker config + _df.loc[bids_id,"pipeline_endtime"] = UNAVAILABLE # get_end_time(subject_dir) + else: + logger.debug(f"{pipeline} output is expected based on manifest but not found for bids_id: {bids_id}, session: {session}") + for name in status_check_dict.keys(): + _df.loc[bids_id, name] = INCOMPLETE + _df.loc[bids_id, "pipeline_starttime"] = UNAVAILABLE + _df.loc[bids_id, "pipeline_endtime"] = UNAVAILABLE else: - logger.debug(f"{pipeline} output is expected based on manifest but not found for bids_id: {bids_id}, session: {session}") - for name in status_check_dict.keys(): - _df.loc[bids_id,name] = INCOMPLETE - _df.loc[bids_id,"pipeline_starttime"] = UNAVAILABLE - _df.loc[bids_id,"pipeline_endtime"] = UNAVAILABLE + logger.debug(f"{pipeline} output is not expected based on manifest for bids_id: {bids_id}, session: {session}") + for name in status_check_dict.keys(): + _df.loc[bids_id, name] = UNAVAILABLE + _df.loc[bids_id, "pipeline_starttime"] = UNAVAILABLE + _df.loc[bids_id, "pipeline_endtime"] = UNAVAILABLE _df = _df.reset_index(drop=True) # add old rows from other pipelines/sessions and sort for consistent order - df_bagel: pd.DataFrame = pd.concat([df_bagel_old, _df], axis='index') - df_bagel = df_bagel.sort_values(["pipeline_name", "pipeline_version", COL_BIDS_ID_MANIFEST], ignore_index=True) + df_bagel: pd.DataFrame = pd.concat([df_bagel_old, _df], axis='index', ignore_index=True) + df_bagel = df_bagel.sort_values(["pipeline_name", "pipeline_version", COL_BIDS_ID_MANIFEST, COL_SESSION_MANIFEST], ignore_index=True) # don't write a new file if no changes try: - if len(df_bagel.compare(df_bagel_old_full)) == 0: + if (df_bagel_old is not None) and (df_bagel.shape == df_bagel_old_full.shape) and (set(df_bagel.columns) == set(df_bagel_old_full.columns)) and (len(df_bagel.compare(df_bagel_old_full)) == 0): logger.info(f'No change in bagel file for pipeline {pipeline}, session {session}') continue - except Exception: + except Exception as exception: + logger.warning(exception) pass # save bagel