Skip to content

Commit

Permalink
Merge branch '21_kmer_query' into 22_kmer_build
Browse files Browse the repository at this point in the history
  • Loading branch information
VinzentRisch committed Mar 27, 2024
2 parents 46ddd3f + 5e134c3 commit 9010c71
Show file tree
Hide file tree
Showing 44 changed files with 1,353 additions and 5 deletions.
222 changes: 222 additions & 0 deletions q2_amr/card/kmer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
import json
import os
import shutil
import subprocess
import tempfile
import warnings

from q2_amr.card.utils import load_card_db, run_command
from q2_amr.types import (
CARDAlleleAnnotationDirectoryFormat,
CARDAnnotationDirectoryFormat,
CARDDatabaseDirectoryFormat,
CARDKmerDatabaseDirectoryFormat,
CARDMAGsKmerAnalysisDirectoryFormat,
CARDReadsAlleleKmerAnalysisDirectoryFormat,
CARDReadsGeneKmerAnalysisDirectoryFormat,
)


def kmer_query_mags_card(
ctx, amr_annotations, kmer_db, card_db, minimum=10, threads=1, num_partitions=None
):
# Define all actions used by the pipeline
partition_method = ctx.get_action("amr", "partition_mags_annotations")
collate_method = ctx.get_action("amr", "collate_mags_kmer_analyses")
kmer_query = ctx.get_action("amr", "_kmer_query_mags")

# Partition the annotations
(partitioned_annotations,) = partition_method(amr_annotations, num_partitions)

kmer_analyses = []

# Run _kmer_query_mags for every partition
for partition in partitioned_annotations.values():
(kmer_analysis,) = kmer_query(
card_db=card_db,
kmer_db=kmer_db,
amr_annotations=partition,
minimum=minimum,
threads=threads,
)

# Append resulting kmer analysis artifacts to lists
kmer_analyses.append(kmer_analysis)

# Collate kmer analysis artifacts
(collated_kmer_analyses,) = collate_method(kmer_analyses)

return collated_kmer_analyses


def kmer_query_reads_card(
ctx, amr_annotations, card_db, kmer_db, minimum=10, threads=1, num_partitions=None
):
# Define all actions used by the pipeline
partition_method = ctx.get_action("amr", "partition_reads_allele_annotations")
collate_method_allele = ctx.get_action("amr", "collate_reads_allele_kmer_analyses")
collate_method_gene = ctx.get_action("amr", "collate_reads_gene_kmer_analyses")
kmer_query = ctx.get_action("amr", "_kmer_query_reads")

# Partition the annotations
(partitioned_annotations,) = partition_method(amr_annotations, num_partitions)

kmer_analyses_allele = []
kmer_analyses_gene = []

# Run _kmer_query_reads for every partition
for part in partitioned_annotations.values():
(kmer_analysis_allele, kmer_analysis_gene) = kmer_query(
card_db=card_db,
kmer_db=kmer_db,
amr_annotations=part,
minimum=minimum,
threads=threads,
)
# Append resulting kmer analysis artifacts to lists
kmer_analyses_allele.append(kmer_analysis_allele)
kmer_analyses_gene.append(kmer_analysis_gene)

# Collate kmer analysis artifacts
(collated_kmer_analysis_allele,) = collate_method_allele(kmer_analyses_allele)
(collated_kmer_analysis_gene,) = collate_method_gene(kmer_analyses_gene)

return collated_kmer_analysis_allele, collated_kmer_analysis_gene


def _kmer_query_mags(
card_db: CARDDatabaseDirectoryFormat,
kmer_db: CARDKmerDatabaseDirectoryFormat,
amr_annotations: CARDAnnotationDirectoryFormat,
minimum: int = 10,
threads: int = 1,
) -> CARDMAGsKmerAnalysisDirectoryFormat:
return _kmer_query_helper(card_db, kmer_db, amr_annotations, minimum, threads)


def _kmer_query_reads(
card_db: CARDDatabaseDirectoryFormat,
kmer_db: CARDKmerDatabaseDirectoryFormat,
amr_annotations: CARDAlleleAnnotationDirectoryFormat,
minimum: int = 10,
threads: int = 1,
) -> (
CARDReadsAlleleKmerAnalysisDirectoryFormat,
CARDReadsGeneKmerAnalysisDirectoryFormat,
):
return _kmer_query_helper(card_db, kmer_db, amr_annotations, minimum, threads)


def _kmer_query_helper(card_db, kmer_db, amr_annotations, minimum, threads):
if type(amr_annotations) is CARDAlleleAnnotationDirectoryFormat:
annotation_file = "sorted.length_100.bam"
input_type = "bwt"

reads_allele_kmer_analysis = CARDReadsAlleleKmerAnalysisDirectoryFormat()
reads_gene_kmer_analysis = CARDReadsGeneKmerAnalysisDirectoryFormat()
kmer_analysis = (reads_allele_kmer_analysis, reads_gene_kmer_analysis)

else:
annotation_file = "amr_annotation.json"
input_type = "rgi"

mags_kmer_analysis = CARDMAGsKmerAnalysisDirectoryFormat()
kmer_analysis = mags_kmer_analysis

with tempfile.TemporaryDirectory() as tmp:
# Load all necessary database files and retrieve Kmer size
kmer_size = load_card_db(tmp=tmp, card_db=card_db, kmer_db=kmer_db, kmer=True)

# Run once per annotation file
for root, dirs, files in os.walk(str(amr_annotations)):
if annotation_file in files:
input_path = os.path.join(root, annotation_file)

# Run kmer_query
_run_rgi_kmer_query(
tmp=tmp,
input_file=input_path,
input_type=input_type,
kmer_size=kmer_size,
minimum=minimum,
threads=threads,
)

# Define path to JSON kmer analysis file and split it into components
path_json = os.path.join(tmp, f"output_{kmer_size}mer_analysis.json")
path_split = input_path.split(os.path.sep)

# Load dictionary from result JSON file to check if its empty and print
# warning
with open(path_json) as json_file:
json_dict = json.load(json_file)

if not json_dict:
warnings.warn(
f"No taxonomic prediction could be made for "
f"{path_split[-2]}.",
UserWarning,
)

# Define filenames and paths to des directories for reads or MAGs
# analysis
if type(amr_annotations) is CARDAlleleAnnotationDirectoryFormat:
files = (
f"output_{kmer_size}mer_analysis.allele.txt",
f"output_{kmer_size}mer_analysis.json",
f"output_{kmer_size}mer_analysis.gene.txt",
)
des_dir_allele = os.path.join(
str(reads_allele_kmer_analysis), path_split[-2]
)
des_dir_gene = os.path.join(
str(reads_gene_kmer_analysis), path_split[-2]
)
des_dirs = [des_dir_allele, des_dir_allele, des_dir_gene]

else:
files = (
f"output_{kmer_size}mer_analysis_rgi_summary.txt",
f"output_{kmer_size}mer_analysis.json",
)
des_dir = os.path.join(
str(mags_kmer_analysis), path_split[-3], path_split[-2]
)
des_dirs = [des_dir, des_dir]

# Copy Kmer analysis files into the destination directories and remove
# "output_" prefix from filenames
for file, des_dir in zip(files, des_dirs):
os.makedirs(des_dir, exist_ok=True)
des_path = os.path.join(des_dir, file[7:])
shutil.move(os.path.join(tmp, file), des_path)

return kmer_analysis


def _run_rgi_kmer_query(tmp, input_file, input_type, kmer_size, minimum, threads):
cmd = [
"rgi",
"kmer_query",
"--input",
input_file,
f"--{input_type}",
"--kmer_size",
kmer_size,
"--minimum",
str(minimum),
"--threads",
str(threads),
"--output",
"output",
"--local",
]

try:
run_command(cmd, tmp, verbose=True)
except subprocess.CalledProcessError as e:
raise Exception(
"An error was encountered while running rgi, "
f"(return code {e.returncode}), please inspect "
"stdout and stderr to learn more."
)
Loading

0 comments on commit 9010c71

Please sign in to comment.