Skip to content

Commit

Permalink
ENH: add action to estimate MAG abundance (#177)
Browse files Browse the repository at this point in the history
  • Loading branch information
misialq authored Aug 30, 2024
1 parent e16ac9d commit c58cd10
Show file tree
Hide file tree
Showing 34 changed files with 409 additions and 27 deletions.
3 changes: 2 additions & 1 deletion q2_moshpit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
from . import abundance
from . import busco
from . import eggnog
from . import partition
Expand All @@ -29,5 +30,5 @@
'kaiju_class', 'kaiju_db', 'dereplicate_mags', 'eggnog',
'busco', 'prodigal', 'kraken_helpers', 'partition',
'filter_derep_mags', 'filter_mags', 'get_feature_lengths',
'filter_reads_pangenome'
'abundance', 'filter_reads_pangenome'
]
14 changes: 9 additions & 5 deletions q2_moshpit/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@

from q2_types.feature_data_mag import MAGSequencesDirFmt

# Warning printed before invoking external command-line tools (see
# `run_command` below); factored into a constant so every runner prints
# the same message.
EXTERNAL_CMD_WARNING = (
    "Running external command line application(s). "
    "This may print messages to stdout and/or stderr.\n"
    "The command(s) being run are below. These commands "
    "cannot be manually re-run as they will depend on "
    "temporary files that no longer exist."
)


def run_command(cmd, env=None, verbose=True, pipe=False, **kwargs):
if verbose:
print("Running external command line application(s). This may print "
"messages to stdout and/or stderr.")
print("The command(s) being run are below. These commands cannot "
"be manually re-run as they will depend on temporary files that "
"no longer exist.")
print(EXTERNAL_CMD_WARNING)
print("\nCommand:", end=' ')
print(" ".join(cmd), end='\n\n')

Expand Down
11 changes: 11 additions & 0 deletions q2_moshpit/abundance/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# ----------------------------------------------------------------------------
# Copyright (c) 2022-2023, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------

from .abundance import estimate_mag_abundance

__all__ = ["estimate_mag_abundance", ]
186 changes: 186 additions & 0 deletions q2_moshpit/abundance/abundance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# ----------------------------------------------------------------------------
# Copyright (c) 2022-2023, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import glob
import os
import tempfile

import pandas as pd

from q2_assembly._utils import run_commands_with_pipe
from q2_moshpit._utils import run_command
from q2_types.per_sample_sequences import BAMDirFmt


def rpkm(
    df: pd.DataFrame,
    length_col: str = "length",
    read_counts_col: str = "numreads",
) -> pd.Series:
    """
    Calculate Reads Per Kilobase (of MAG), per Million mapped reads (RPKM).

    Args:
        df (pd.DataFrame): DataFrame containing the read counts and lengths
            for each sample and MAG; must contain a "sample-id" column.
        length_col (str): Name of the column containing the MAG lengths.
        read_counts_col (str): Name of the column containing the read counts.

    Returns:
        A pandas Series containing the RPKM values for each sample and MAG,
        aligned with the index of ``df``.
    """
    # Reads per kilobase of MAG sequence. Computed as a local Series so the
    # caller's frame is not mutated with a scratch "rpk" column.
    rpk = df[read_counts_col] / (df[length_col] / 10**3)
    # Total mapped reads per sample (the "per million" scaling factor).
    reads_per_sample = df.groupby("sample-id")[read_counts_col].sum()
    return rpk * 10**6 / df["sample-id"].map(reads_per_sample)


def tpm(
    df: pd.DataFrame,
    length_col: str = "length",
    read_counts_col: str = "numreads",
) -> pd.Series:
    """
    Calculate Transcripts Per Million (TPM).

    Args:
        df (pd.DataFrame): DataFrame containing the read counts and lengths
            for each sample and MAG; must contain a "sample-id" column.
        length_col (str): Name of the column containing the MAG lengths.
        read_counts_col (str): Name of the column containing the read counts.

    Returns:
        A pandas Series containing the TPM values for each sample and MAG,
        aligned with the index of ``df``.
    """
    # Reads per kilobase, using the same per-kilobase convention as `rpkm`
    # (counts / (length / 1000)). Any constant factor cancels in the
    # per-sample normalization below, so the returned TPM values are
    # unaffected by the scale of this intermediate. Computed locally so the
    # caller's frame is not mutated with a scratch "rpk" column.
    rpk = df[read_counts_col] / (df[length_col] / 10**3)
    # Per-sample sum of RPK values is the TPM normalization factor.
    rpk_per_sample = rpk.groupby(df["sample-id"]).sum()
    return rpk / df["sample-id"].map(rpk_per_sample) * 10**6


def _merge_frames(
coverage_df: pd.DataFrame, lengths_df: pd.DataFrame
) -> pd.DataFrame:
"""
Merge coverage data with lengths data on MAG IDs.
Args:
coverage_df (pd.DataFrame): DataFrame containing coverage data
for each sample and MAG.
lengths_df (pd.DataFrame): DataFrame containing lengths for each MAG.
Returns:
A merged DataFrame with summed coverage data and lengths
for each MAG per sample.
"""
coverage_summed = coverage_df.groupby(
["sample-id", "mag-id"]
).sum().reset_index(drop=False)
coverage_summed = coverage_summed.merge(
lengths_df, left_on="mag-id", right_index=True
)
return coverage_summed


def _calculate_coverage(
    sample_fp: str, sample_id: str, temp_dir: str,
    min_mapq: int, min_query_len: int, min_base_quality: int,
    min_read_len: int, threads: int
) -> pd.DataFrame:
    """
    Calculate the coverage of a sample.

    This function filters and sorts the sample's BAM file using samtools,
    calculates the coverage, and then reads the coverage file into a
    DataFrame annotated with the sample ID and MAG ID.

    Args:
        sample_fp (str): The file path of the sample.
        sample_id (str): The ID of the sample.
        temp_dir (str): The directory to store temporary files.
        min_mapq (int): The minimum mapping quality.
        min_query_len (int): The minimum query length.
        min_base_quality (int): The minimum base quality.
        min_read_len (int): The minimum read length.
        threads (int): The number of threads to use.

    Returns:
        pd.DataFrame: A DataFrame containing the coverage information.
    """
    output_fp = os.path.join(str(temp_dir), f"{sample_id}.bam")
    coverage_fp = os.path.join(str(temp_dir), f"{sample_id}.coverage.tsv")

    # Filter and sort the BAM file. The sort command takes no input file on
    # purpose: it must read the piped output of `samtools view`. Passing
    # `sample_fp` to it would make samtools sort the *unfiltered* input and
    # silently discard the -q/-m filters applied above.
    run_commands_with_pipe(
        cmd1=[
            "samtools", "view",
            "-q", str(min_mapq), "-m", str(min_query_len),
            "--threads", str(threads), sample_fp
        ],
        cmd2=[
            "samtools", "sort",
            "-o", output_fp, "--threads", str(threads)
        ],
        verbose=True
    )

    # calculate the coverage
    run_command(
        cmd=[
            "samtools", "coverage",
            "-Q", str(min_base_quality), "-l", str(min_read_len),
            "-o", coverage_fp, output_fp
        ],
        verbose=True
    )

    # The first column of the coverage table (contig name) becomes the index.
    df = pd.read_csv(coverage_fp, sep="\t", index_col=0)
    df["sample-id"] = sample_id
    # Contig names are assumed to be "<mag-id>_<contig-suffix>" — the MAG ID
    # is everything before the first underscore.
    df["mag-id"] = df.index.map(lambda x: x.split("_", maxsplit=1)[0])

    return df


def estimate_mag_abundance(
    maps: BAMDirFmt,
    mag_lengths: pd.DataFrame,
    metric: str = "rpkm",
    min_mapq: int = 0,
    min_query_len: int = 0,
    min_base_quality: int = 0,
    min_read_len: int = 0,
    threads: int = 1,
) -> pd.DataFrame:
    """
    Estimate MAG abundance from reads-to-MAG alignment maps.

    Coverage is computed per sample with samtools, summed per MAG, merged
    with the MAG lengths, and normalized with the requested metric.

    Args:
        maps (BAMDirFmt): Directory of per-sample BAM files; file names
            are expected to look like "<sample-id>_alignment*.bam".
        mag_lengths (pd.DataFrame): Per-MAG lengths, indexed by MAG ID.
        metric (str): Abundance metric, "rpkm" or "tpm".
        min_mapq (int): Minimum mapping quality.
        min_query_len (int): Minimum query length.
        min_base_quality (int): Minimum base quality.
        min_read_len (int): Minimum read length.
        threads (int): Number of samtools threads.

    Returns:
        pd.DataFrame: Feature table of abundances (samples x MAGs).

    Raises:
        ValueError: If no BAM files are found or the metric is unknown.
    """
    # Map sample IDs to their BAM file paths.
    sample_ids_bam = {
        os.path.basename(x).split("_alignment")[0]: x for x
        in glob.glob(os.path.join(str(maps), "*.bam"))
    }
    # Fail early with a clear message instead of letting pd.concat([])
    # raise a cryptic "No objects to concatenate" error below.
    if not sample_ids_bam:
        raise ValueError(
            "No BAM files were found in the provided alignment maps."
        )

    try:
        metric_func = {"rpkm": rpkm, "tpm": tpm}[metric]
    except KeyError:
        raise ValueError(
            f"Unsupported abundance metric: '{metric}'. "
            "Available metrics are: 'rpkm', 'tpm'."
        ) from None

    # calculate coverage for each sample
    with tempfile.TemporaryDirectory() as temp_dir:
        dfs = [
            _calculate_coverage(
                sample_fp, sample_id, temp_dir, min_mapq, min_query_len,
                min_base_quality, min_read_len, threads
            )
            for sample_id, sample_fp in sample_ids_bam.items()
        ]

        coverage_df = pd.concat(dfs)
        coverage_summed = _merge_frames(coverage_df, mag_lengths)
        coverage_summed["abundance"] = metric_func(coverage_summed)

        # transform into a feature table (samples x MAGs)
        feature_table = coverage_summed.pivot(
            index='sample-id', columns='mag-id', values='abundance'
        )
        feature_table.index.name = "sample-id"

    return feature_table
7 changes: 7 additions & 0 deletions q2_moshpit/abundance/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# ----------------------------------------------------------------------------
# Copyright (c) 2022-2023, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
3 changes: 3 additions & 0 deletions q2_moshpit/abundance/tests/data/mag-length.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
id length
2317a4bd-778d-46fa-901c-88428b2b863e 3500000
890676ee-1310-477d-b713-457ef661b8f3 3650000

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
102 changes: 102 additions & 0 deletions q2_moshpit/abundance/tests/test_abundance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# ----------------------------------------------------------------------------
# Copyright (c) 2022-2023, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import pandas as pd
import qiime2
from qiime2.plugin.testing import TestPluginBase

from q2_moshpit.abundance.abundance import rpkm, tpm, estimate_mag_abundance
from q2_types.per_sample_sequences import BAMDirFmt


class TestAbundance(TestPluginBase):
    """Tests for MAG abundance estimation (rpkm/tpm and the full action)."""

    package = "q2_moshpit.abundance.tests"

    def setUp(self):
        super().setUp()
        # Toy coverage table: 3 samples x 4 MAGs with known lengths and read
        # counts, used to check the rpkm/tpm calculations against hand-computed
        # values.
        self.df = pd.DataFrame({
            'sample-id': [*['s1']*4, *['s2']*4, *['s3']*4],
            'mag-id': ['m1', 'm2', 'm3', 'm4']*3,
            'length': [2000, 4000, 1000, 10000]*3,
            'numreads': [10, 20, 5, 0, 12, 25, 8, 0, 30, 60, 15, 1],
        })
        # NOTE(review): this fixture is not referenced by any test visible
        # here — confirm whether it is still needed.
        self.mags_derep = qiime2.Artifact.import_data(
            'FeatureData[MAG]', self.get_data_path('mag-sequences')
        )
        # MAG lengths table (indexed by MAG UUID), both as a raw DataFrame
        # and imported as a QIIME 2 artifact.
        self.mag_length_df = pd.read_csv(
            self.get_data_path('mag-length.tsv'), sep='\t', index_col=0
        )
        self.mag_length = qiime2.Artifact.import_data(
            'FeatureData[SequenceCharacteristics % Properties("length")]',
            self.mag_length_df
        )
        # Pre-computed reads-to-MAG alignment maps (BAM files) used by the
        # end-to-end test below.
        self.mapped_reads = qiime2.Artifact.import_data(
            'FeatureData[AlignmentMap]', self.get_data_path('reads-mapped')
        )

    def test_rpkm(self):
        # Expected values computed by hand from self.df, e.g. for s1:
        # rpk = 5 for m1-m3, total reads = 35 -> 5e6 / 35 = 142857.14.
        obs = rpkm(self.df)
        exp = pd.Series(
            [142857.14, 142857.14, 142857.14, 0.0,
             133333.33, 138888.89, 177777.78, 0.0,
             141509.43, 141509.43, 141509.43, 943.40]
        )
        pd.testing.assert_series_equal(obs, exp)

    def test_tpm(self):
        # Expected values computed by hand from self.df; TPM values within
        # each sample sum to 1e6.
        obs = tpm(self.df)
        exp = pd.Series(
            [333333.33, 333333.33, 333333.33, 0.0,
             296296.3, 308641.98, 395061.73, 0.0,
             332594.24, 332594.24, 332594.24, 2217.29]
        )
        pd.testing.assert_series_equal(obs, exp)

    def test_estimate_mag_abundance(self):
        # End-to-end run of the action against real BAM files; expected RPKM
        # values are checked loosely (rtol=0.1) since they depend on
        # samtools' coverage output.
        obs = estimate_mag_abundance(
            maps=self.mapped_reads.view(BAMDirFmt),
            mag_lengths=self.mag_length_df,
            metric='rpkm'
        )
        exp = pd.DataFrame({
            "2317a4bd-778d-46fa-901c-88428b2b863e":
                [0.0, 285.7, 142.9, 271.4, 14.3],
            "890676ee-1310-477d-b713-457ef661b8f3":
                [274.0, 0.0, 137.0, 13.7, 260.3]
            },
            index=pd.Index(
                [f"sample{i}" for i in range(1, 6)],
                name="sample-id"
            )
        )
        exp.columns.name = "mag-id"
        pd.testing.assert_frame_equal(obs, exp, check_exact=False, rtol=0.1)

        # just double-check that we have what we expect:
        # 2317: E. coli, 8906: M. tuberculosis
        #         |  2317     |  8906
        # sample1 | 0 reads   | 100 reads
        # sample2 | 100 reads | 0 reads
        # sample3 | 50 reads  | 50 reads
        # sample4 | 95 reads  | 5 reads
        # sample5 | 5 reads   | 95 reads
        # Normalizing each row to relative abundance should recover the
        # read proportions above.
        obs_rel = obs.div(obs.sum(axis=1), axis=0)
        exp_rel = pd.DataFrame({
            "2317a4bd-778d-46fa-901c-88428b2b863e":
                [0.0, 1.0, 0.5, 0.95, 0.05],
            "890676ee-1310-477d-b713-457ef661b8f3":
                [1.0, 0.0, 0.5, 0.05, 0.95]
            },
            index=pd.Index(
                [f"sample{i}" for i in range(1, 6)],
                name="sample-id"
            )
        )
        exp_rel.columns.name = "mag-id"
        pd.testing.assert_frame_equal(
            obs_rel, exp_rel, check_exact=False, rtol=0.1
        )
2 changes: 1 addition & 1 deletion q2_moshpit/eggnog/orthologs/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def _search_runner(
- output_loc: Directory where the output files will be stored.
- num_cpus: Number of CPUs to use for the search.
- db_in_memory: Boolean indicating whether the database should be loaded
into memory.
into memory.
- runner_args: Additional arguments to pass to the eggNOG-mapper command.
"""
cmd = [
Expand Down
Loading

0 comments on commit c58cd10

Please sign in to comment.