From bdf125fa06054aab69fe40f39b569bfc22b056e6 Mon Sep 17 00:00:00 2001
From: Dan Fornika
Date: Sat, 17 Sep 2022 13:07:57 -0700
Subject: [PATCH] Store new libraries to db while updating others (#7)

* Store new libraries to db while updating others

* Rename read_number to read_type in logging
---
 auto_fastq_symlink/__main__.py    | 18 ++++-
 auto_fastq_symlink/core.py        | 94 +++++++++++++++++++------------
 auto_fastq_symlink/db.py          | 11 ++++
 auto_fastq_symlink/samplesheet.py | 16 ++++-
 4 files changed, 92 insertions(+), 47 deletions(-)

diff --git a/auto_fastq_symlink/__main__.py b/auto_fastq_symlink/__main__.py
index b109e61..b11d685 100644
--- a/auto_fastq_symlink/__main__.py
+++ b/auto_fastq_symlink/__main__.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 
 import argparse
+import datetime
 import json
 import time
 import logging
@@ -24,7 +25,7 @@ def main():
         log_level = logging.INFO
 
     logging.basicConfig(
-        format='{"timestamp": "%(asctime)s.%(msecs)03d", "log_level": "%(levelname)s", "module", "%(module)s", "function_name": "%(funcName)s", "line_num", %(lineno)d, "message": %(message)s}',
+        format='{"timestamp": "%(asctime)s.%(msecs)03d", "level": "%(levelname)s", "module": "%(module)s", "function_name": "%(funcName)s", "line_num": %(lineno)d, "message": %(message)s}',
         datefmt='%Y-%m-%dT%H:%M:%S',
         encoding='utf-8',
         level=log_level,
@@ -34,14 +35,11 @@ def main():
     scan_interval = args.scan_interval
 
     # We'll trap any KeyboardInterrupt and toggle this to True,
-    # then exit at a safe time (in between scan & symlink passes).
+    # then exit at a safe time (between runs, or after a full scan of all runs).
     quit_when_safe = False
 
     while(True):
         try:
-            if quit_when_safe:
-                exit(0)
-
             if args.config:
                 logging.info(json.dumps({"event_type": "load_config_start", "config_file": os.path.abspath(args.config)}))
                 try:
@@ -54,10 +52,18 @@ def main():
                     logging.error(json.dumps({"event_type": "load_config_failed", "config_file": os.path.abspath(args.config)}))

             # All of the action happens here.
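+            # core.scan() yields runs one at a time; each run is symlinked as it is found,
+            # and the whole scan-and-symlink pass is timed so its duration can be logged below.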
+            scan_start_timestamp = datetime.datetime.now()
             for run in core.scan(config):
                 core.symlink_run(config, run)
                 if quit_when_safe:
-                    exit(0)
+                    exit(0)
+            scan_complete_timestamp = datetime.datetime.now()
+            scan_duration_delta = scan_complete_timestamp - scan_start_timestamp
+            scan_duration_seconds = scan_duration_delta.total_seconds()
+            logging.info(json.dumps({"event_type": "scan_complete", "scan_duration_seconds": scan_duration_seconds}))
+
+            if quit_when_safe:
+                exit(0)
 
             if "scan_interval_seconds" in config:
                 scan_interval = config['scan_interval_seconds']
diff --git a/auto_fastq_symlink/core.py b/auto_fastq_symlink/core.py
index 1a31a87..3a585fd 100644
--- a/auto_fastq_symlink/core.py
+++ b/auto_fastq_symlink/core.py
@@ -3,12 +3,15 @@
 import logging
 import os
 import re
+from typing import Iterable
 
 import auto_fastq_symlink.samplesheet as ss
 import auto_fastq_symlink.db as db
 
 
-def collect_project_info(config):
+def collect_project_info(config: dict[str, object]) -> dict[str, object]:
+    """
+    Collect the project definitions from the application config.
+    """
     projects = {}
     projects = config['projects']
     # projects_json_safe = auto_fastq_symlink.config.make_config_json_serializable(config)
@@ -107,7 +110,7 @@ def _determine_library_id_header(samplesheet, instrument_type):
     return library_id_header
 
 
-def sanitize_library_id(library_id: str):
+def _sanitize_library_id(library_id: str):
     """
     """
     sanitized_library_id = library_id.strip()
@@ -117,17 +120,29 @@ def sanitize_library_id(library_id: str):
     for s in symbols:
         sanitized_library_id = sanitized_library_id.replace(s, '-')
     sanitized_library_id = re.sub('-+', '-', sanitized_library_id) # collapse runs of dashes into a single dash
+
     return sanitized_library_id
 
 
-def find_libraries(run, samplesheet, fastq_extensions):
+def find_libraries(run: dict[str, object], samplesheet: dict[str, object], fastq_extensions: list[str]) -> list[dict[str, object]]:
     """
+    Use parsed samplesheet and run info to find all libraries on the run, along with their project ID and fastq paths.
+
+    :param run: The run info.
+    :type run: dict[str, object]
+    :param samplesheet: The parsed SampleSheet.
+    :type samplesheet: dict[str, object]
+    :param fastq_extensions: A list of valid fastq filename extensions (defined in config)
+    :type fastq_extensions: list[str]
+    :return: Libraries found on the run.
+    :rtype: list[dict[str, object]]
     """
+    run_id = run['run_id']
     libraries = []
     # If we can't confirm that the fastq directory exists, no sense continuing.
     # Short-circuit and return an empty list.
     if not run['fastq_directory'] or not os.path.exists(run['fastq_directory']):
-        logging.error(json.dumps({"event_type": "find_libraries_failed", "run_fastq_directory": run['fastq_directory']}))
+        logging.error(json.dumps({"event_type": "find_libraries_failed", "sequencing_run_id": run_id, "run_fastq_directory": run['fastq_directory']}))
         return libraries
     libraries_section = _determine_libraries_section(samplesheet, run['instrument_type'])
     project_header = _determine_project_header(samplesheet, run['instrument_type'])
@@ -142,14 +157,16 @@ def find_libraries(run, samplesheet, fastq_extensions):
     for item in samplesheet[libraries_section]:
         required_item_keys = [project_header, library_id_header]
         if not all([k in item for k in required_item_keys]):
+            logging.warning(json.dumps({"event_type": "library_missing_required_fields", "required_fields": required_item_keys, "library": item}))
             continue
         library = {}
-        library_id = sanitize_library_id(item[library_id_header])
+        library_id = _sanitize_library_id(item[library_id_header])
+        project_id = item[project_header]
         if library_id == "" or library_id in found_library_ids:
             continue
-        logging.debug(json.dumps({"event_type": "found_library", "library_id": library_id}))
         library['library_id'] = library_id
-        library['project_id'] = item[project_header]
+        library['project_id'] = project_id
+        logging.debug(json.dumps({"event_type": "found_library", "library_id": library_id, "project_id": project_id}))
         r1_fastq_filenames = list(filter(lambda x: re.match(library_id + '.*' + '_R1_' + '.*', x), run_fastq_files))
         if len(r1_fastq_filenames) > 0:
             r1_fastq_filename = r1_fastq_filenames[0]
@@ -163,22 +180,23 @@ def find_libraries(run, samplesheet, fastq_extensions):
         r1_fastq_path = os.path.join(run['fastq_directory'], r1_fastq_filename) if r1_fastq_filename else None
         r2_fastq_path = os.path.join(run['fastq_directory'], r2_fastq_filename) if r2_fastq_filename else None
         if r1_fastq_path and os.path.exists(r1_fastq_path):
-            logging.debug(json.dumps({"event_type": "found_library_fastq_file", "library_id": library_id, "fastq_path": r1_fastq_path}))
+            logging.debug(json.dumps({"event_type": "found_library_fastq_file", "library_id": library_id, "read_type": "R1", "fastq_path": r1_fastq_path}))
             library['fastq_path_r1'] = r1_fastq_path
         else:
             library['fastq_path_r1'] = None
         if r2_fastq_path and os.path.exists(r2_fastq_path):
-            logging.debug(json.dumps({"event_type": "found_library_fastq_file", "library_id": library_id, "fastq_path": r2_fastq_path}))
+            logging.debug(json.dumps({"event_type": "found_library_fastq_file", "library_id": library_id, "read_type": "R2", "fastq_path": r2_fastq_path}))
             library['fastq_path_r2'] = r2_fastq_path
         else:
             library['fastq_path_r2'] = None
         found_library_ids.add(library_id)
         libraries.append(library)
+
     return libraries
 
 
-def find_runs(run_parent_dirs: list[str], fastq_extensions: list[str]) -> dict[str, object]:
+def find_runs(run_parent_dirs: list[str], fastq_extensions: list[str]) -> Iterable[dict[str, object]]:
     """
     Find all sequencing runs under all of the `run_parent_dirs` from the config.
 
     Runs are found by matching sub-directory names against the following regexes: `"\d{6}_M\d{5}_\d+_\d{9}-[A-Z0-9]{5}"` (MiSeq) and `"\d{6}_VH\d{5}_\d+_[A-Z0-9]{9}"` (NextSeq)
@@ -188,7 +206,7 @@ def find_runs(run_parent_dirs: list[str], fastq_extensions: list[str]) -> dict[s
     :param fastq_extensions: List of valid fastq file extensions (eg: `[".fastq", ".fastq.gz", ".fq", ".fq.gz"]`)
     :type fastq_extensions: list[str]
     :return: Dictionary of sequencing run info, indexed by sequencing run ID.
-    :rtype: dict[str, object]
+    :rtype: Iterable[dict[str, object]]
     """
     run = {}
     miseq_run_id_regex = "\d{6}_M\d{5}_\d+_\d{9}-[A-Z0-9]{5}"
@@ -203,7 +221,7 @@ def find_runs(run_parent_dirs: list[str], fastq_extensions: list[str]) -> dict[s
                 instrument_type = "miseq"
             elif re.match(nextseq_run_id_regex, run_id):
                 instrument_type = "nextseq"
-        if subdir.is_dir() and instrument_type != None:
+        if subdir.is_dir() and instrument_type != None and os.path.exists(os.path.join(subdir.path, "upload_complete.json")):
            samplesheet_paths = ss.find_samplesheets(subdir.path, instrument_type)
            fastq_directory = _find_fastq_directory(subdir.path, instrument_type)
            if fastq_directory != None:
@@ -215,7 +233,7 @@ def find_runs(run_parent_dirs: list[str], fastq_extensions: list[str]) -> dict[s
                    "run_directory": subdir.path,
                    "fastq_directory": fastq_directory,
                }
-               samplesheet_to_parse = ss.choose_samplesheet_to_parse(run['samplesheet_files'], run['instrument_type'])
+               samplesheet_to_parse = ss.choose_samplesheet_to_parse(run['samplesheet_files'], run['instrument_type'], run_id)
               if samplesheet_to_parse:
                   logging.debug(json.dumps({"event_type": "samplesheet_found", "sequencing_run_id": run_id, "samplesheet_path": samplesheet_to_parse}))
               else:
@@ -262,15 +280,16 @@ def find_symlinks(projects):
     return symlinks_by_project
 
 
-def determine_symlinks_to_create_for_run(config, run_id):
+def determine_symlinks_to_create_for_run(config: dict[str, object], run_id: str) -> dict[str, list[dict[str, str]]]:
     """
     :param config: Application config
     :type config: dict[str, object]
     :param run_id: Sequencing run identifier
     :type run_id: str
-    :return: Dictionary of symlinks to create, indexed by project ID
-    :rtype: dict[str, object]
+    :return: Dictionary of fastq file paths for which symlinks should be created, indexed by project ID
+    :rtype: dict[str, list[dict[str, str]]]
     """
+    logging.debug(json.dumps({"event_type": "determine_symlinks_for_run_start", "sequencing_run_id": run_id}))
     symlinks_to_create_by_project_id = {}
 
     existing_symlinks = db.get_symlinks_by_run_id(config, run_id)
@@ -302,18 +321,33 @@ def determine_symlinks_to_create_for_run(config, run_id):
             }
             symlinks_to_create_by_project_id[project_id].append(fastq_path_r2)
 
+    total_num_symlinks_to_create = 0
+    num_symlinks_to_create_by_project_id = {}
+    for project_id, symlinks in symlinks_to_create_by_project_id.items():
+        num_symlinks_to_create_by_project_id[project_id] = len(symlinks)
+        total_num_symlinks_to_create += len(symlinks)
+    logging.debug(json.dumps({
+        "event_type": "determine_symlinks_for_run_complete",
+        "sequencing_run_id": run_id,
+        "total_num_symlinks_to_create": total_num_symlinks_to_create,
+        "num_symlinks_to_create_by_project_id": num_symlinks_to_create_by_project_id,
+    }))
+
     return symlinks_to_create_by_project_id
 
 
-def create_symlinks(config: dict[str, object], symlinks_to_create_by_project_id: dict[str, object]):
+def create_symlinks(config: dict[str, object], symlinks_to_create_by_project_id: dict[str, list[dict[str, str]]], run_id: str):
     """
     :param config:
     :type config: dict[str, object]
     :param symlinks_to_create_by_project_id:
     :type symlinks_to_create_by_project_id: dict[str, object]
+    :param run_id: Sequencing run identifier
+    :type run_id: str
     :return: Dictionary of symlinks created by project ID.
-    :rtype: dict[str, object]
+    :rtype: dict[str, list[dict[str, str]]]
     """
+    logging.debug(json.dumps({"event_type": "create_symlinks_start", "sequencing_run_id": run_id}))
+
     symlinks_complete_by_project_id = {}
     for project_id, symlinks in symlinks_to_create_by_project_id.items():
         project_fastq_symlinks_dir = config['projects'][project_id]['fastq_symlinks_dir']
@@ -347,6 +381,7 @@ def create_symlinks(config: dict[str, object], symlinks_to_create_by_project_id:
             except FileExistsError as e:
                 logging.warning(json.dumps({
                     "event_type": "attempted_to_create_existing_symlink",
+                    "sequencing_run_id": run_id,
                     "symlink_target": symlink['target'],
                     "symlink_path": symlink['path'],
                 }))
@@ -362,7 +397,7 @@ def create_symlinks(config: dict[str, object], symlinks_to_create_by_project_id:
     return symlinks_complete_by_project_id
 
 
-def scan(config: dict[str, object]) -> dict[str, object]:
+def scan(config: dict[str, object]) -> Iterable[dict[str, object]]:
     """
     Scanning involves looking for all existing runs and storing them to the database,
     then looking for all existing symlinks and storing them to the database.
@@ -405,9 +440,7 @@ def scan(config: dict[str, object]) -> dict[str, object]:
             num_runs_found += 1
             yield run
 
-    logging.debug(json.dumps({"event_type": "find_and_store_runs_complete", "num_runs_found": num_runs_found}))
-
-    logging.info(json.dumps({"event_type": "scan_complete", "num_runs_found": num_runs_found, "num_symlinks_found": num_symlinks_found}))
+    logging.info(json.dumps({"event_type": "find_and_store_runs_complete", "num_runs_found": num_runs_found}))
 
 def symlink_run(config: dict[str, object], run: dict[str, object]):
     """
@@ -423,24 +456,11 @@ def symlink_run(config: dict[str, object], run: dict[str, object]):
     :rtype: NoneType
     """
     run_id = run['run_id']
-    logging.info(json.dumps({"event_type": "symlink_run_start", "sequencing_run_id": run_id}))
-    logging.debug(json.dumps({"event_type": "determine_symlinks_for_run_start", "sequencing_run_id": run_id}))
+    logging.debug(json.dumps({"event_type": "symlink_run_start", "sequencing_run_id": run_id}))
 
     symlinks_to_create = determine_symlinks_to_create_for_run(config, run_id)
 
-    total_num_symlinks_to_create = 0
-    num_symlinks_to_create_by_project_id = {}
-    for project_id, symlinks in symlinks_to_create.items():
-        num_symlinks_to_create_by_project_id[project_id] = len(symlinks)
-    logging.debug(json.dumps({
-        "event_type": "determine_symlinks_for_run_complete",
-        "sequencing_run_id": run_id,
-        "total_num_symlinks_to_create": total_num_symlinks_to_create,
-        "num_symlinks_to_create_by_project_id": num_symlinks_to_create_by_project_id,
-    }))
-
-    logging.debug(json.dumps({"event_type": "create_symlinks_start", "sequencing_run_id": run_id}))
-    symlinks_complete_by_project_id = create_symlinks(config, symlinks_to_create)
+    symlinks_complete_by_project_id = create_symlinks(config, symlinks_to_create, run_id)
     total_num_symlinks_created = 0
     for project_id, symlinks_complete in symlinks_complete_by_project_id.items():
         total_num_symlinks_created += len(symlinks_complete)
diff --git a/auto_fastq_symlink/db.py b/auto_fastq_symlink/db.py
index 92bfedc..d9348b3 100644
--- a/auto_fastq_symlink/db.py
+++ b/auto_fastq_symlink/db.py
@@ -97,6 +97,17 @@ def update_libraries(session: Session, run: dict[str, object]):
             existing_library.fastq_path_r2 = library['fastq_path_r2']
             session.commit()
             logging.debug(json.dumps({"event_type": "library_updated", "sequencing_run_id": run_id, "library_id": library['library_id']}))
"library_updated", "sequencing_run_id": run_id, "library_id": library['library_id']})) + else: + l = Library( + library_id = library['library_id'], + sequencing_run_id = run_id, + project_id = library['project_id'], + fastq_path_r1 = library['fastq_path_r1'], + fastq_path_r2 = library['fastq_path_r2'], + ) + session.add(l) + session.commit() + logging.debug(json.dumps({"event_type": "library_stored", "sequencing_run_id": run_id, "library_id": library['library_id']})) def store_run(config: dict[str, object], run: dict[str, object]): diff --git a/auto_fastq_symlink/samplesheet.py b/auto_fastq_symlink/samplesheet.py index cb55f1a..211607e 100644 --- a/auto_fastq_symlink/samplesheet.py +++ b/auto_fastq_symlink/samplesheet.py @@ -370,8 +370,16 @@ def find_samplesheets(run_dir, instrument_type): return samplesheet_paths -def choose_samplesheet_to_parse(samplesheet_paths, instrument_type): +def choose_samplesheet_to_parse(samplesheet_paths: list[str], instrument_type: str, run_id: str): """ + A run directory may have multiple SampleSheet.csv files in it. Choose only one to parse. + + :param samplesheet_paths: List of paths to SampleSheet.csv files + :type samplesheet_paths: list[str] + :param instrument_type: Instrument type, should be one of: "miseq", "nextseq" + :type instrument_type: str + :param run_id: Sequencing run ID + :type run_id: str """ samplesheet_to_parse = None if instrument_type == 'miseq': @@ -394,8 +402,8 @@ def choose_samplesheet_to_parse(samplesheet_paths, instrument_type): samplesheets_by_analysis_num[analysis_num] = samplesheet_path largest_analysis_num = 0 for analysis_num, samplesheet_path in samplesheets_by_analysis_num.items(): - if int(analysis_num) > largest_analysis_num: - largest_analysis_num = int(analysis_num) + if analysis_num > largest_analysis_num: + largest_analysis_num = analysis_num if largest_analysis_num > 0: samplesheet_to_parse = samplesheets_by_analysis_num[largest_analysis_num] if not samplesheet_to_parse: @@ -405,7 +413,7 @@ def choose_samplesheet_to_parse(samplesheet_paths, instrument_type): # If there isn't a top-level "SampleSheet.csv", and there are more than # one SampleSheet, then we have no other way of deciding which is preferable. pass - # print(json.dumps(samplesheets_by_analysis_num, indent=2)) + return samplesheet_to_parse