Skip to content

Commit

Permalink
added reads pipeline for parsl parallelization
Browse files Browse the repository at this point in the history
  • Loading branch information
VinzentRisch committed Mar 25, 2024
1 parent 6065096 commit df5530e
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 21 deletions.
55 changes: 55 additions & 0 deletions q2_amr/card/reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

import pandas as pd
from q2_types.per_sample_sequences import (
PairedEndSequencesWithQuality,
SequencesWithQuality,
SingleLanePerSamplePairedEndFastqDirFmt,
SingleLanePerSampleSingleEndFastqDirFmt,
)
from q2_types.sample_data import SampleData

from q2_amr.card.utils import create_count_table, load_card_db, read_in_txt, run_command
from q2_amr.types import (
Expand All @@ -19,6 +22,58 @@


def annotate_reads_card(
    ctx,
    reads,
    card_db,
    aligner="kma",
    threads=1,
    include_wildcard=False,
    include_other_models=False,
    num_partitions=None,
):
    """Annotate reads against CARD partition by partition and collate the results.

    The reads are split with the ``demux`` partitioning action that matches
    their semantic type, every partition is annotated with the
    ``_annotate_reads_card`` action, and the per-partition outputs are combined
    into one result per output kind.

    Parameters
    ----------
    ctx
        Context object used to look up the actions via ``ctx.get_action``.
    reads
        Reads artifact; its ``type`` must be a subtype of
        ``SampleData[SequencesWithQuality]`` or
        ``SampleData[PairedEndSequencesWithQuality]``.
    card_db
        CARD database, forwarded unchanged to ``_annotate_reads_card``.
    aligner, threads, include_wildcard, include_other_models
        Forwarded unchanged to ``_annotate_reads_card`` for every partition.
    num_partitions
        Forwarded to the partitioning action as its second argument.

    Returns
    -------
    tuple
        ``(collated_allele_annotations, collated_gene_annotations,
        collated_allele_tables, collated_gene_tables)``.

    Raises
    ------
    TypeError
        If ``reads.type`` is neither single-end nor paired-end sequence data.
    """
    annotate = ctx.get_action("amr", "_annotate_reads_card")
    collate_allele_annotations = ctx.get_action(
        "amr", "collate_reads_allele_annotations"
    )
    collate_gene_annotations = ctx.get_action("amr", "collate_reads_gene_annotations")
    merge_tables = ctx.get_action("feature-table", "merge")

    # Pick the partitioning action that matches the layout of the reads.
    if reads.type <= SampleData[SequencesWithQuality]:
        partition_method = ctx.get_action("demux", "partition_samples_single")
    elif reads.type <= SampleData[PairedEndSequencesWithQuality]:
        partition_method = ctx.get_action("demux", "partition_samples_paired")
    else:
        # Without this branch an unexpected type would leave `partition_method`
        # unbound and fail below with an opaque UnboundLocalError.
        raise TypeError(
            f"Unsupported reads type {reads.type!r}: expected "
            "SampleData[SequencesWithQuality] or "
            "SampleData[PairedEndSequencesWithQuality]."
        )

    (partitioned_seqs,) = partition_method(reads, num_partitions)

    allele_annotations = []
    gene_annotations = []
    allele_tables = []
    gene_tables = []

    # Annotate each partition on its own and gather the four outputs by kind.
    for read in partitioned_seqs.values():
        (allele_annotation, gene_annotation, allele_table, gene_table) = annotate(
            read, card_db, aligner, threads, include_wildcard, include_other_models
        )

        allele_annotations.append(allele_annotation)
        gene_annotations.append(gene_annotation)
        allele_tables.append(allele_table)
        gene_tables.append(gene_table)

    # Combine the per-partition results into a single result per output kind.
    (collated_allele_annotations,) = collate_allele_annotations(allele_annotations)
    (collated_gene_annotations,) = collate_gene_annotations(gene_annotations)
    (collated_allele_tables,) = merge_tables(allele_tables)
    (collated_gene_tables,) = merge_tables(gene_tables)

    return (
        collated_allele_annotations,
        collated_gene_annotations,
        collated_allele_tables,
        collated_gene_tables,
    )


def _annotate_reads_card(
reads: Union[
SingleLanePerSamplePairedEndFastqDirFmt, SingleLanePerSampleSingleEndFastqDirFmt
],
Expand Down
6 changes: 3 additions & 3 deletions q2_amr/card/tests/test_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
)
from qiime2.plugin.testing import TestPluginBase

from q2_amr.card.reads import annotate_reads_card, run_rgi_bwt
from q2_amr.card.reads import _annotate_reads_card, run_rgi_bwt
from q2_amr.types import (
CARDAlleleAnnotationDirectoryFormat,
CARDDatabaseDirectoryFormat,
Expand Down Expand Up @@ -78,8 +78,8 @@ def annotate_reads_card_test_body(self, read_type):
), patch("q2_amr.card.reads.read_in_txt", mock_read_in_txt), patch(
"q2_amr.card.reads.create_count_table", mock_create_count_table
):
# Run annotate_reads_card function
result = annotate_reads_card(reads, card_db)
# Run _annotate_reads_card function
result = _annotate_reads_card(reads, card_db)

# Retrieve the path to cwd directory from mock_run_rgi_bwt arguments
first_call_args = mock_run_rgi_bwt.call_args_list[0]
Expand Down
81 changes: 63 additions & 18 deletions q2_amr/plugin_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
collate_reads_gene_annotations,
collate_reads_gene_kmer_analyses,
)
from q2_amr.card.reads import annotate_reads_card
from q2_amr.card.reads import _annotate_reads_card, annotate_reads_card
from q2_amr.types import (
CARDAnnotationJSONFormat,
CARDAnnotationTXTFormat,
Expand Down Expand Up @@ -137,32 +137,77 @@
citations=[citations["alcock_card_2023"]],
)

P_aligner, T_allele_annotation, T_gene_annotation = TypeMap(
P_aligner, T_allele_annotation = TypeMap(
{
Str % Choices("kma"): SampleData[CARDAlleleAnnotation % Properties("kma")],
Str
% Choices("kma"): (
SampleData[CARDAlleleAnnotation % Properties("kma")],
SampleData[CARDGeneAnnotation % Properties("kma")],
),
Str
% Choices("bowtie2"): (
SampleData[CARDAlleleAnnotation % Properties("bowtie2")],
SampleData[CARDGeneAnnotation % Properties("bowtie2")],
),
Str
% Choices("bwa"): (
SampleData[CARDAlleleAnnotation % Properties("bwa")],
SampleData[CARDGeneAnnotation % Properties("bwa")],
),
% Choices("bowtie2"): SampleData[CARDAlleleAnnotation % Properties("bowtie2")],
Str % Choices("bwa"): SampleData[CARDAlleleAnnotation % Properties("bwa")],
}
)

plugin.methods.register_function(
# Register `annotate_reads_card` through `plugin.pipelines`, declaring its
# inputs, parameters, outputs and user-facing descriptions.
plugin.pipelines.register_function(
function=annotate_reads_card,
# Reads may be paired-end or single-end; a CARD database is also required.
inputs={
"reads": SampleData[PairedEndSequencesWithQuality | SequencesWithQuality],
"card_db": CARDDatabase,
},
parameters={
"aligner": P_aligner,
# Range(0, None, inclusive_start=False): start 0 is excluded, no upper end given.
"threads": Int % Range(0, None, inclusive_start=False),
"include_wildcard": Bool,
"include_other_models": Bool,
"num_partitions": Int % Range(0, None, inclusive_start=False),
},
# Only the allele annotation uses the TypeMap output variable
# (T_allele_annotation); the gene annotation is declared as plain
# SampleData[CARDGeneAnnotation].
outputs=[
("amr_allele_annotation", T_allele_annotation),
("amr_gene_annotation", SampleData[CARDGeneAnnotation]),
("allele_feature_table", FeatureTable[Frequency]),
("gene_feature_table", FeatureTable[Frequency]),
],
input_descriptions={
"reads": "Paired or single end reads.",
"card_db": "CARD Database",
},
parameter_descriptions={
"aligner": "Specify alignment tool.",
"threads": "Number of threads (CPUs) to use.",
"include_wildcard": "Additionally align reads to the in silico predicted "
"allelic variants available in CARD's Resistomes & Variants"
" data set. This is highly recommended for non-clinical "
"samples .",
"include_other_models": "The default settings will align reads against "
"CARD's protein homolog models. With include_other_"
"models set to True, reads are additionally aligned to "
"protein variant models, rRNA mutation models, and "
"protein over-expression models. These three model "
"types require comparison to CARD's curated lists of "
"mutations known to confer phenotypic antibiotic "
"resistance to differentiate alleles conferring "
"resistance from antibiotic susceptible alleles, "
"but RGI as of yet does not perform this comparison. "
"Use these results with caution.",
"num_partitions": "Number of partitions that should run in parallel.",
},
output_descriptions={
"amr_allele_annotation": "AMR annotation mapped on alleles.",
"amr_gene_annotation": "AMR annotation mapped on genes.",
"allele_feature_table": "Frequency table of ARGs in all samples for allele "
"mapping.",
"gene_feature_table": "Frequency table of ARGs in all samples for gene "
"mapping.",
},
name="Annotate reads with antimicrobial resistance genes from CARD.",
description="Annotate reads with antimicrobial resistance genes from CARD.",
citations=[citations["alcock_card_2023"]],
)

plugin.methods.register_function(
function=_annotate_reads_card,
inputs={
"reads": SampleData[PairedEndSequencesWithQuality | SequencesWithQuality],
"card_db": CARDDatabase,
},
parameters={
"aligner": P_aligner,
"threads": Int % Range(0, None, inclusive_start=False),
Expand All @@ -171,7 +216,7 @@
},
outputs=[
("amr_allele_annotation", T_allele_annotation),
("amr_gene_annotation", T_gene_annotation),
("amr_gene_annotation", SampleData[CARDGeneAnnotation]),
("allele_feature_table", FeatureTable[Frequency]),
("gene_feature_table", FeatureTable[Frequency]),
],
Expand Down

0 comments on commit df5530e

Please sign in to comment.