Store new libraries to db while updating others (#7)
* Store new libraries to db while updating others

* Rename read_number to read_type in logging
dfornika authored Sep 17, 2022
1 parent 87f99bc commit bdf125f
Showing 4 changed files with 92 additions and 47 deletions.
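The first change closes a gap in db.update_libraries(): a library not already present in the database was previously skipped, and is now inserted. A minimal sketch of the resulting store-or-update flow, assuming a SQLAlchemy Library model like the one in auto_fastq_symlink/db.py (the model stub and query details here are illustrative, not copied from the repo):

    from sqlalchemy import Column, String
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class Library(Base):  # illustrative stand-in for the real model in db.py
        __tablename__ = 'library'
        library_id = Column(String, primary_key=True)
        sequencing_run_id = Column(String, primary_key=True)
        project_id = Column(String)
        fastq_path_r1 = Column(String)
        fastq_path_r2 = Column(String)

    def store_or_update_library(session: Session, run_id: str, library: dict) -> None:
        """Update the row for a known library, or insert a new one."""
        existing = (
            session.query(Library)
            .filter_by(sequencing_run_id=run_id, library_id=library['library_id'])
            .one_or_none()
        )
        if existing is not None:
            # Known library: refresh its fastq paths.
            existing.fastq_path_r1 = library['fastq_path_r1']
            existing.fastq_path_r2 = library['fastq_path_r2']
        else:
            # New library: store a fresh record (the behaviour this commit adds).
            session.add(Library(
                library_id=library['library_id'],
                sequencing_run_id=run_id,
                project_id=library['project_id'],
                fastq_path_r1=library['fastq_path_r1'],
                fastq_path_r2=library['fastq_path_r2'],
            ))
        session.commit()

Looking up by (sequencing_run_id, library_id) before deciding to insert keeps the operation idempotent across repeated scans of the same run.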
18 changes: 12 additions & 6 deletions auto_fastq_symlink/__main__.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python

import argparse
import datetime
import json
import time
import logging
@@ -24,7 +25,7 @@ def main():
log_level = logging.INFO

logging.basicConfig(
format='{"timestamp": "%(asctime)s.%(msecs)03d", "log_level": "%(levelname)s", "module", "%(module)s", "function_name": "%(funcName)s", "line_num", %(lineno)d, "message": %(message)s}',
format='{"timestamp": "%(asctime)s.%(msecs)03d", "level": "%(levelname)s", "module", "%(module)s", "function_name": "%(funcName)s", "line_num", %(lineno)d, "message": %(message)s}',
datefmt='%Y-%m-%dT%H:%M:%S',
encoding='utf-8',
level=log_level,
@@ -34,14 +35,11 @@ def main():
scan_interval = args.scan_interval

# We'll trap any KeyboardInterrupt and toggle this to True,
# then exit at a safe time (in between scan & symlink passes).
# then exit at a safe time (in between runs or at the end of a scan of all runs)
quit_when_safe = False

while(True):
try:
if quit_when_safe:
exit(0)

if args.config:
logging.info(json.dumps({"event_type": "load_config_start", "config_file": os.path.abspath(args.config)}))
try:
@@ -54,10 +52,18 @@ def main():
logging.error(json.dumps({"event_type": "load_config_failed", "config_file": os.path.abspath(args.config)}))

# All of the action happens here.
scan_start_timestamp = datetime.datetime.now()
for run in core.scan(config):
core.symlink_run(config, run)
if quit_when_safe:
exit(0)
scan_complete_timestamp = datetime.datetime.now()
scan_duration_delta = scan_complete_timestamp - scan_start_timestamp
scan_duration_seconds = scan_duration_delta.total_seconds()
logging.info(json.dumps({"event_type": "scan_complete", "scan_duration_seconds": scan_duration_seconds}))

if quit_when_safe:
exit(0)

if "scan_interval_seconds" in config:
scan_interval = config['scan_interval_seconds']
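Taken together, the revised main loop times each full scan and only honours a pending shutdown request between runs or after a completed scan. A minimal sketch of that trap-and-exit-safely pattern (scan() and symlink_run() below are trivial stand-ins for the core module's functions, and the loop returns where the real module calls exit(0)):

    import datetime
    import json
    import logging
    import time

    def scan(config):                  # stand-in for core.scan()
        yield from config.get("runs", [])

    def symlink_run(config, run):      # stand-in for core.symlink_run()
        pass

    def run_loop(config, scan_interval=10):
        quit_when_safe = False
        while True:
            try:
                scan_start = datetime.datetime.now()
                for run in scan(config):
                    symlink_run(config, run)
                    if quit_when_safe:
                        return         # safe exit point: between runs
                scan_seconds = (datetime.datetime.now() - scan_start).total_seconds()
                logging.info(json.dumps({"event_type": "scan_complete",
                                         "scan_duration_seconds": scan_seconds}))
                if quit_when_safe:
                    return             # safe exit point: after a full scan
                time.sleep(scan_interval)
            except KeyboardInterrupt:
                # Trap Ctrl-C, but defer the exit until a safe point.
                quit_when_safe = True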
94 changes: 57 additions & 37 deletions auto_fastq_symlink/core.py
@@ -3,12 +3,15 @@
import logging
import os
import re
from typing import Iterable

import auto_fastq_symlink.samplesheet as ss
import auto_fastq_symlink.db as db


def collect_project_info(config):
def collect_project_info(config: dict[str, object]) -> dict[str, str]:
"""
"""
projects = {}
projects = config['projects']
# projects_json_safe = auto_fastq_symlink.config.make_config_json_serializable(config)
@@ -107,7 +110,7 @@ def _determine_library_id_header(samplesheet, instrument_type):
return library_id_header


def sanitize_library_id(library_id: str):
def _sanitize_library_id(library_id: str):
"""
"""
sanitized_library_id = library_id.strip()
@@ -117,17 +120,29 @@ def sanitize_library_id(library_id: str):
for s in symbols:
sanitized_library_id = sanitized_library_id.replace(s, '-')
sanitized_library_id = re.sub('-+', '-', sanitized_library_id)  # collapse repeated dashes

return sanitized_library_id


def find_libraries(run, samplesheet, fastq_extensions):
def find_libraries(run: dict[str, object], samplesheet: dict[str, object], fastq_extensions: list[str]) -> list[dict[str, object]]:
"""
Use parsed samplesheet and run info to find all libraries on the run, along with their project ID and fastq paths.
:param run: The run info.
:type run: dict[str, object]
:param samplesheet: The parsed SampleSheet.
:type samplesheet: dict[str, object]
:param fastq_extensions: A list of valid fastq filename extensions (defined in config)
:type fastq_extensions: list[str]
:return: Libraries
:rtype: list[dict[str, object]]
"""
run_id = run['run_id']
libraries = []
# If we can't confirm that the fastq directory exists, no sense continuing.
# Short-circuit and return an empty list.
if not run['fastq_directory'] or not os.path.exists(run['fastq_directory']):
logging.error(json.dumps({"event_type": "find_libraries_failed", "run_fastq_directory": run['fastq_directory']}))
logging.error(json.dumps({"event_type": "find_libraries_failed", "sequencing_run_id": run_id, "run_fastq_directory": run['fastq_directory']}))
return libraries
libraries_section = _determine_libraries_section(samplesheet, run['instrument_type'])
project_header = _determine_project_header(samplesheet, run['instrument_type'])
@@ -142,14 +157,16 @@ def find_libraries(run, samplesheet, fastq_extensions):
for item in samplesheet[libraries_section]:
required_item_keys = [project_header, library_id_header]
if not all([k in item for k in required_item_keys]):
logging.warning(json.dumps({"event_type": "library_missing_required_fields", "required_fields": required_item_keys, "library": item}))
continue
library = {}
library_id = sanitize_library_id(item[library_id_header])
library_id = _sanitize_library_id(item[library_id_header])
project_id = item[project_header]
if library_id == "" or library_id in found_library_ids:
continue
logging.debug(json.dumps({"event_type": "found_library", "library_id": library_id}))
library['library_id'] = library_id
library['project_id'] = item[project_header]
library['project_id'] = project_id
logging.debug(json.dumps({"event_type": "found_library", "library_id": library_id, "project_id": project_id}))
r1_fastq_filenames = list(filter(lambda x: re.match(library_id + '.*' + '_R1_' + '.*', x), run_fastq_files))
if len(r1_fastq_filenames) > 0:
r1_fastq_filename = r1_fastq_filenames[0]
@@ -163,22 +180,23 @@ def find_libraries(run, samplesheet, fastq_extensions):
r1_fastq_path = os.path.join(run['fastq_directory'], r1_fastq_filename) if r1_fastq_filename else None
r2_fastq_path = os.path.join(run['fastq_directory'], r2_fastq_filename) if r2_fastq_filename else None
if r1_fastq_path and os.path.exists(r1_fastq_path):
logging.debug(json.dumps({"event_type": "found_library_fastq_file", "library_id": library_id, "fastq_path": r1_fastq_path}))
logging.debug(json.dumps({"event_type": "found_library_fastq_file", "library_id": library_id, "read_type": "R1", "fastq_path": r1_fastq_path}))
library['fastq_path_r1'] = r1_fastq_path
else:
library['fastq_path_r1'] = None
if r2_fastq_path and os.path.exists(r2_fastq_path):
logging.debug(json.dumps({"event_type": "found_library_fastq_file", "library_id": library_id, "fastq_path": r2_fastq_path}))
logging.debug(json.dumps({"event_type": "found_library_fastq_file", "library_id": library_id, "read_type": "R2", "fastq_path": r2_fastq_path}))
library['fastq_path_r2'] = r2_fastq_path
else:
library['fastq_path_r2'] = None
found_library_ids.add(library_id)
libraries.append(library)


return libraries


def find_runs(run_parent_dirs: list[str], fastq_extensions: list[str]) -> dict[str, object]:
def find_runs(run_parent_dirs: list[str], fastq_extensions: list[str]) -> Iterable[dict[str, object]]:
"""
Find all sequencing runs under all of the `run_parent_dirs` from the config.
Runs are found by matching sub-directory names against the following regexes: `"\d{6}_M\d{5}_\d+_\d{9}-[A-Z0-9]{5}"` (MiSeq) and `"\d{6}_VH\d{5}_\d+_[A-Z0-9]{9}"` (NextSeq)
@@ -188,7 +206,7 @@ def find_runs(run_parent_dirs: list[str], fastq_extensions: list[str]) -> dict[str, object]:
:param fastq_extensions: List of valid fastq file extensions (eg: `[".fastq", ".fastq.gz", ".fq", ".fq.gz"]`)
:type fastq_extensions: list[str]
:return: Sequencing run info, one dict per run.
:rtype: dict[str, object]
:rtype: Iterable[dict[str, object]]
"""
run = {}
miseq_run_id_regex = r"\d{6}_M\d{5}_\d+_\d{9}-[A-Z0-9]{5}"
@@ -203,7 +221,7 @@ def find_runs(run_parent_dirs: list[str], fastq_extensions: list[str]) -> dict[str, object]:
instrument_type = "miseq"
elif re.match(nextseq_run_id_regex, run_id):
instrument_type = "nextseq"
if subdir.is_dir() and instrument_type != None:
if subdir.is_dir() and instrument_type != None and os.path.exists(os.path.join(subdir.path, "upload_complete.json")):
samplesheet_paths = ss.find_samplesheets(subdir.path, instrument_type)
fastq_directory = _find_fastq_directory(subdir.path, instrument_type)
if fastq_directory != None:
@@ -215,7 +233,7 @@ def find_runs(run_parent_dirs: list[str], fastq_extensions: list[str]) -> dict[str, object]:
"run_directory": subdir.path,
"fastq_directory": fastq_directory,
}
samplesheet_to_parse = ss.choose_samplesheet_to_parse(run['samplesheet_files'], run['instrument_type'])
samplesheet_to_parse = ss.choose_samplesheet_to_parse(run['samplesheet_files'], run['instrument_type'], run_id)
if samplesheet_to_parse:
logging.debug(json.dumps({"event_type": "samplesheet_found", "sequencing_run_id": run_id, "samplesheet_path": samplesheet_to_parse}))
else:
@@ -262,15 +280,16 @@ def find_symlinks(projects):
return symlinks_by_project


def determine_symlinks_to_create_for_run(config, run_id):
def determine_symlinks_to_create_for_run(config: dict[str, object], run_id: str) -> dict[str, list[dict[str, str]]]:
"""
:param config: Application config
:type config: dict[str, object]
:param run_id: Sequencing run identifier
:type run_id: str
:return: Dictionary of symlinks to create, indexed by project ID
:rtype: dict[str, object]
:return: Dictionary of file paths to fastq files for which symlinks should be created, indexed by project ID
:rtype: dict[str, list[dict[str, str]]]
"""
logging.debug(json.dumps({"event_type": "determine_symlinks_for_run_start", "sequencing_run_id": run_id}))
symlinks_to_create_by_project_id = {}

existing_symlinks = db.get_symlinks_by_run_id(config, run_id)
@@ -302,18 +321,33 @@
}
symlinks_to_create_by_project_id[project_id].append(fastq_path_r2)

total_num_symlinks_to_create = 0
num_symlinks_to_create_by_project_id = {}
for project_id, symlinks in symlinks_to_create_by_project_id.items():
num_symlinks_to_create_by_project_id[project_id] = len(symlinks)
total_num_symlinks_to_create += len(symlinks)
logging.debug(json.dumps({
"event_type": "determine_symlinks_for_run_complete",
"sequencing_run_id": run_id,
"total_num_symlinks_to_create": total_num_symlinks_to_create,
"num_symlinks_to_create_by_project_id": num_symlinks_to_create_by_project_id,
}))

return symlinks_to_create_by_project_id


def create_symlinks(config: dict[str, object], symlinks_to_create_by_project_id: dict[str, object]):
def create_symlinks(config: dict[str, object], symlinks_to_create_by_project_id: dict[str, list[dict[str, str]]], run_id: str):
"""
:param config: Application config
:type config: dict[str, object]
:param symlinks_to_create_by_project_id: Symlinks to create, indexed by project ID
:type symlinks_to_create_by_project_id: dict[str, list[dict[str, str]]]
:param run_id: Sequencing run identifier
:type run_id: str
:return: Dictionary of symlinks created by project ID.
:rtype: dict[str, object]
:rtype: dict[str, list[dict[str, str]]]
"""
logging.debug(json.dumps({"event_type": "create_symlinks_start", "sequencing_run_id": run_id}))

symlinks_complete_by_project_id = {}
for project_id, symlinks in symlinks_to_create_by_project_id.items():
project_fastq_symlinks_dir = config['projects'][project_id]['fastq_symlinks_dir']
@@ -347,6 +381,7 @@ def create_symlinks(config: dict[str, object], symlinks_to_create_by_project_id: dict[str, object]):
except FileExistsError as e:
logging.warning(json.dumps({
"event_type": "attempted_to_create_existing_symlink",
"sequencing_run_id": run_id,
"symlink_target": symlink['target'],
"symlink_path": symlink['path'],
}))
@@ -362,7 +397,7 @@ def create_symlinks(config: dict[str, object], symlinks_to_create_by_project_id: dict[str, object]):
return symlinks_complete_by_project_id


def scan(config: dict[str, object]) -> dict[str, object]:
def scan(config: dict[str, object]) -> Iterable[dict[str, object]]:
"""
Scanning involves looking for all existing runs and storing them to the database,
then looking for all existing symlinks and storing them to the database.
@@ -405,9 +440,7 @@ def scan(config: dict[str, object]) -> dict[str, object]:
num_runs_found += 1
yield run

logging.debug(json.dumps({"event_type": "find_and_store_runs_complete", "num_runs_found": num_runs_found}))

logging.info(json.dumps({"event_type": "scan_complete", "num_runs_found": num_runs_found, "num_symlinks_found": num_symlinks_found}))
logging.info(json.dumps({"event_type": "find_and_store_runs_complete", "num_runs_found": num_runs_found}))


def symlink_run(config: dict[str, object], run: dict[str, object]):
@@ -423,24 +456,11 @@ def symlink_run(config: dict[str, object], run: dict[str, object]):
:rtype: NoneType
"""
run_id = run['run_id']
logging.info(json.dumps({"event_type": "symlink_run_start", "sequencing_run_id": run_id}))
logging.debug(json.dumps({"event_type": "determine_symlinks_for_run_start", "sequencing_run_id": run_id}))
logging.debug(json.dumps({"event_type": "symlink_run_start", "sequencing_run_id": run_id}))

symlinks_to_create = determine_symlinks_to_create_for_run(config, run_id)
total_num_symlinks_to_create = 0
num_symlinks_to_create_by_project_id = {}
for project_id, symlinks in symlinks_to_create.items():
num_symlinks_to_create_by_project_id[project_id] = len(symlinks)

logging.debug(json.dumps({
"event_type": "determine_symlinks_for_run_complete",
"sequencing_run_id": run_id,
"total_num_symlinks_to_create": total_num_symlinks_to_create,
"num_symlinks_to_create_by_project_id": num_symlinks_to_create_by_project_id,
}))

logging.debug(json.dumps({"event_type": "create_symlinks_start", "sequencing_run_id": run_id}))
symlinks_complete_by_project_id = create_symlinks(config, symlinks_to_create)
symlinks_complete_by_project_id = create_symlinks(config, symlinks_to_create, run_id)
total_num_symlinks_created = 0
for project_id, symlinks_complete in symlinks_complete_by_project_id.items():
total_num_symlinks_created += len(symlinks_complete)
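A core.py change that is easy to miss in the hunks above: find_runs() now also requires an upload_complete.json marker file before it treats a directory as a run, so partially-uploaded runs are never scanned. A minimal sketch of that gate, reusing the run-ID regexes from the find_runs() docstring (the marker-file name comes from the diff; the helper itself is illustrative):

    import os
    import re
    from typing import Optional

    MISEQ_RUN_ID_REGEX = r"\d{6}_M\d{5}_\d+_\d{9}-[A-Z0-9]{5}"
    NEXTSEQ_RUN_ID_REGEX = r"\d{6}_VH\d{5}_\d+_[A-Z0-9]{9}"

    def classify_completed_run_dir(path: str) -> Optional[str]:
        """Return 'miseq' or 'nextseq' for a fully-uploaded run directory, else None."""
        run_id = os.path.basename(path)
        if re.match(MISEQ_RUN_ID_REGEX, run_id):
            instrument_type = "miseq"
        elif re.match(NEXTSEQ_RUN_ID_REGEX, run_id):
            instrument_type = "nextseq"
        else:
            return None  # directory name doesn't look like a sequencing run
        if not os.path.isdir(path):
            return None
        # New in this commit: skip runs whose upload hasn't finished.
        if not os.path.exists(os.path.join(path, "upload_complete.json")):
            return None
        return instrument_type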
11 changes: 11 additions & 0 deletions auto_fastq_symlink/db.py
@@ -97,6 +97,17 @@ def update_libraries(session: Session, run: dict[str, object]):
existing_library.fastq_path_r2 = library['fastq_path_r2']
session.commit()
logging.debug(json.dumps({"event_type": "library_updated", "sequencing_run_id": run_id, "library_id": library['library_id']}))
else:
l = Library(
library_id = library['library_id'],
sequencing_run_id = run_id,
project_id = library['project_id'],
fastq_path_r1 = library['fastq_path_r1'],
fastq_path_r2 = library['fastq_path_r2'],
)
session.add(l)
session.commit()
logging.debug(json.dumps({"event_type": "library_stored", "sequencing_run_id": run_id, "library_id": library['library_id']}))


def store_run(config: dict[str, object], run: dict[str, object]):
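Every event in these modules is logged as a json.dumps() payload through the JSON-shaped format string configured in __main__.py, so each log line is itself parseable JSON. A minimal, self-contained sketch of that setup (the run ID and library ID are made up for illustration):

    import json
    import logging

    logging.basicConfig(
        format='{"timestamp": "%(asctime)s.%(msecs)03d", "level": "%(levelname)s", '
               '"module": "%(module)s", "function_name": "%(funcName)s", '
               '"line_num": %(lineno)d, "message": %(message)s}',
        datefmt='%Y-%m-%dT%H:%M:%S',
        level=logging.DEBUG,
    )

    logging.debug(json.dumps({
        "event_type": "library_stored",
        "sequencing_run_id": "220101_M00123_0001_000000000-ABCDE",  # made-up run ID
        "library_id": "sample-01",                                  # made-up library ID
    }))
    # Emits one JSON object per line, with the event embedded under "message".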
16 changes: 12 additions & 4 deletions auto_fastq_symlink/samplesheet.py
@@ -370,8 +370,16 @@ def find_samplesheets(run_dir, instrument_type):
return samplesheet_paths


def choose_samplesheet_to_parse(samplesheet_paths, instrument_type):
def choose_samplesheet_to_parse(samplesheet_paths: list[str], instrument_type: str, run_id: str):
"""
A run directory may have multiple SampleSheet.csv files in it. Choose only one to parse.
:param samplesheet_paths: List of paths to SampleSheet.csv files
:type samplesheet_paths: list[str]
:param instrument_type: Instrument type, should be one of: "miseq", "nextseq"
:type instrument_type: str
:param run_id: Sequencing run ID
:type run_id: str
"""
samplesheet_to_parse = None
if instrument_type == 'miseq':
@@ -394,8 +402,8 @@ def choose_samplesheet_to_parse(samplesheet_paths, instrument_type):
samplesheets_by_analysis_num[analysis_num] = samplesheet_path
largest_analysis_num = 0
for analysis_num, samplesheet_path in samplesheets_by_analysis_num.items():
if int(analysis_num) > largest_analysis_num:
largest_analysis_num = int(analysis_num)
if analysis_num > largest_analysis_num:
largest_analysis_num = analysis_num
if largest_analysis_num > 0:
samplesheet_to_parse = samplesheets_by_analysis_num[largest_analysis_num]
if not samplesheet_to_parse:
@@ -405,7 +413,7 @@ def choose_samplesheet_to_parse(samplesheet_paths, instrument_type):
# If there isn't a top-level "SampleSheet.csv" and there is more than
# one SampleSheet, then we have no other way of deciding which is preferable.
pass
# print(json.dumps(samplesheets_by_analysis_num, indent=2))


return samplesheet_to_parse

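For NextSeq runs, choose_samplesheet_to_parse() keeps only the samplesheet from the most recent analysis; with the analysis numbers now parsed as ints upstream, the comparison above no longer needs an int() conversion. A minimal sketch of the selection rule (the SampleSheet_a<N>.csv filename pattern is an assumption for illustration, not necessarily the repo's convention):

    import os
    import re
    from typing import Optional

    def choose_latest_analysis_samplesheet(samplesheet_paths: list) -> Optional[str]:
        """Pick the samplesheet with the largest analysis number, if any."""
        samplesheets_by_analysis_num = {}
        for path in samplesheet_paths:
            match = re.search(r"SampleSheet_a(\d+)\.csv$", os.path.basename(path))
            if match:
                # Convert once at parse time, so comparisons are int vs int.
                samplesheets_by_analysis_num[int(match.group(1))] = path
        if not samplesheets_by_analysis_num:
            return None
        return samplesheets_by_analysis_num[max(samplesheets_by_analysis_num)]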
