ENH: replace building standard DBs by fetching the pre-built ones (#34)
* ENH: replace building standard DBs by fetching the pre-built ones
* use response streaming
* ENH: build Bracken database for custom Kraken 2 DBs
* small refactor
* add bracken conda pkg
* add tests
* add xmltodict to conda meta
* add missing test files
* add Bracken citation
* add download progress bar
* Lint
* Add tqdm to the list of dependencies
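The streamed download with a progress bar named above follows the usual requests/tqdm pattern; here is a minimal, self-contained sketch of that technique (the URL, file name, and chunk size are placeholders, not values from this commit):

import requests
from tqdm import tqdm

# Placeholder URL: any large file reachable over HTTP(S).
url = "https://example.com/large-file.tar.gz"
# stream=True keeps the body out of memory until it is iterated chunk by chunk.
response = requests.get(url, stream=True)
total = int(response.headers.get("content-length", 0))

with open("large-file.tar.gz", "wb") as f, tqdm(
    total=total, unit="B", unit_scale=True, unit_divisor=1024
) as bar:
    for chunk in response.iter_content(chunk_size=8192):
        if chunk:  # skip keep-alive chunks
            f.write(chunk)
            bar.update(len(chunk))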
misialq authored Jun 15, 2023
1 parent 3ee536b commit 8b4d715
Showing 8 changed files with 487 additions and 264 deletions.
3 changes: 3 additions & 0 deletions ci/recipe/meta.yaml
@@ -17,11 +17,14 @@ requirements:
- setuptools

run:
- bracken
- kraken2
- metabat2
- samtools
- qiime2 {{ qiime2_epoch }}.*
- q2-types-genomics {{ qiime2_epoch }}.*
- tqdm
- xmltodict

test:
requires:
14 changes: 14 additions & 0 deletions q2_moshpit/citations.bib
@@ -24,6 +24,20 @@ @article{wood2019
pages = {257},
}

@article{lu2017,
title = {Bracken: Estimating Species Abundance in Metagenomics Data},
shorttitle = {Bracken},
author = {Lu, Jennifer and Breitwieser, Florian P. and Thielen, Peter and Salzberg, Steven L.},
year = {2017},
month = jan,
journal = {PeerJ Computer Science},
volume = {3},
pages = {e104},
publisher = {{PeerJ Inc.}},
issn = {2376-5992},
doi = {10.7717/peerj-cs.104},
}

@article{kang2019,
title = {{{MetaBAT}} 2: {{An}} Adaptive Binning Algorithm for Robust and Efficient Genome Reconstruction from Metagenome Assemblies},
author = {Kang, Dongwan D. and Li, Feng and Kirton, Edward and Thomas, Ashleigh and Egan, Rob and An, Hong and Wang, Zhong},
239 changes: 163 additions & 76 deletions q2_moshpit/kraken2/database.py
@@ -7,46 +7,43 @@
# ----------------------------------------------------------------------------
import glob
import os
import re
import shutil
import subprocess
import tarfile
import tempfile
from contextlib import ExitStack
from copy import deepcopy
from typing import List

import requests
import xmltodict
from q2_types.feature_data import DNAFASTAFormat
from tqdm import tqdm

from q2_moshpit._utils import _process_common_input_params, run_command
from q2_types_genomics.kraken2 import (
Kraken2DBDirectoryFormat,
Kraken2DBDirectoryFormat, BrackenDBDirectoryFormat,
)

from q2_moshpit.kraken2.utils import _process_kraken2_arg


def _build_standard_db(db_dir: str, all_kwargs: dict):
kwargs = {
k: v for k, v in all_kwargs.items()
if k in ["threads", "no_masking", "use_ftp", "fast_build"]
}
common_args = _process_common_input_params(
processing_func=_process_kraken2_arg, params=kwargs
)
if all_kwargs["max_db_size"] > 0:
common_args.extend(
["--max-db-size", str(all_kwargs["max_db_size"])]
)
cmd = [
"kraken2-build", "--standard", "--db", db_dir, *common_args
]
try:
run_command(cmd=cmd, verbose=True)
except subprocess.CalledProcessError as e:
        raise Exception(
            "An error was encountered while building the standard "
            f"library (return code {e.returncode}); please inspect "
            "stdout and stderr to learn more."
        )
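# Maps the user-facing collection names to the identifiers used in the
# file names of the pre-built databases hosted on S3.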
COLLECTIONS = {
"standard": "standard",
"viral": "viral",
"minusb": "minusb",
"standard8": "standard_08gb",
"standard16": "standard_16gb",
"pluspf": "pluspf",
"pluspf8": "pluspf_08gb",
"pluspf16": "pluspf_16gb",
"pluspfp": "pluspfp",
"pluspfp8": "pluspfp_08gb",
"pluspfp16": "pluspfp_16gb",
"eupathdb": "eupathdb48",
}
S3_COLLECTIONS_URL = 'https://genome-idx.s3.amazonaws.com'
CHUNK_SIZE = 8192


def _fetch_taxonomy(db_dir: str, threads: int, use_ftp: bool):
@@ -115,7 +112,7 @@ def _add_seqs_to_library(db_dir: str, seqs: DNAFASTAFormat, no_masking: bool):
)


def _build_database(db_dir: str, all_kwargs: dict):
def _build_kraken2_database(db_dir: str, all_kwargs: dict):
kwargs = {
k: v for k, v in all_kwargs.items()
if k in ["threads", "minimizer_len", "minimizer_spaces",
@@ -141,19 +138,129 @@ def _build_database(db_dir: str, all_kwargs: dict):
)


def _move_db_files(source: str, destination: str):
files = glob.glob(f"{source}/*.k2d")
def _build_bracken_database(
kraken2_db_dir: str, threads: int, kmer_len: int, read_len: int
):
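    # bracken-build writes its outputs (e.g. "database{read_len}mers.kmer_distrib")
    # into the Kraken2 database directory itself.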
cmd = [
"bracken-build", "-d", kraken2_db_dir, "-t", str(threads),
"-k", str(kmer_len), "-l", str(read_len)
]
try:
run_command(cmd=cmd, verbose=True)
except subprocess.CalledProcessError as e:
        raise Exception(
            "An error was encountered while building the Bracken "
            f"database (return code {e.returncode}); please inspect "
            "stdout and stderr to learn more."
        )


def _find_latest_db(collection: str, response: requests.Response) -> str:
collection_id = COLLECTIONS[collection]
pattern = fr'kraken\/k2_{collection_id}_\d{{8}}.tar.gz'
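    # For "standard8", for example, this matches keys like
    # "kraken/k2_standard_08gb_20230314.tar.gz" (illustrative date stamp).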

s3_objects = xmltodict.parse(response.content)
s3_objects = s3_objects.get('ListBucketResult')
if not s3_objects:
raise ValueError(
'No databases were found in the response returned by S3. '
'Please try again.'
)
s3_objects = [obj for obj in s3_objects['Contents']
if re.match(pattern, obj['Key'])]
s3_objects = sorted(
s3_objects, key=lambda x: x['LastModified'], reverse=True
)
latest_db = s3_objects[0]['Key']
return latest_db


def _fetch_db_collection(collection: str, tmp_dir: str):
err_msg = 'Could not connect to the server. Please check your internet ' \
'connection and try again. The error was: {}.'
try:
response = requests.get(S3_COLLECTIONS_URL)
except requests.exceptions.ConnectionError as e:
raise ValueError(err_msg.format(e))

if response.status_code == 200:
latest_db = _find_latest_db(collection, response)
print(f'Found the latest "{collection}" database: {latest_db}.')
else:
raise ValueError(
'Could not fetch the list of available databases. '
f'Status code was: {response.status_code}. '
'Please try again later.'
)

db_uri = f'{S3_COLLECTIONS_URL}/{latest_db}'
try:
response = requests.get(db_uri, stream=True)
total_size = int(response.headers.get('content-length', 0))
if total_size > 0:
progress_bar = tqdm(
desc=f'Downloading the "{latest_db}" database',
total=total_size, unit='B',
unit_scale=True, unit_divisor=1024,
)
        db_path = os.path.join(tmp_dir, os.path.split(db_uri)[-1])
        with open(db_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                if chunk:
                    f.write(chunk)
                if total_size > 0:
                    progress_bar.update(len(chunk))
        if total_size > 0:
            progress_bar.close()
except requests.exceptions.ConnectionError as e:
raise ValueError(err_msg.format(e))

msg = "Download finished. Extracting database files..."
print(f"{msg}", end="", flush=True)
with tarfile.open(db_path, "r:gz") as tar:
tar.extractall(path=tmp_dir)
print(f"\r{msg} Done.", flush=True)


def _move_db_files(source: str, destination: str, extension: str = "k2d"):
files = glob.glob(f"{source}/*.{extension}")
for file in files:
new_file = os.path.join(destination, os.path.split(file)[-1])
shutil.move(file, new_file)


def _fetch_prebuilt_dbs(bracken_db, kraken2_db, collection, tmp):
# Find files with the latest version
_fetch_db_collection(collection=collection, tmp_dir=tmp)
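    # The pre-built tarballs contain both the Kraken2 files (*.k2d) and the
    # Bracken files (*.kmer_distrib), so one download covers both databases.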
# Move the Kraken2/Bracken database files to the final location
_move_db_files(tmp, str(kraken2_db.path), extension="k2d")
_move_db_files(tmp, str(bracken_db.path), extension="kmer_distrib")


def _build_dbs_from_seqs(bracken_db, kraken2_db, seqs, tmp_dir, common_args):
# Fetch taxonomy (also needed for custom databases)
_fetch_taxonomy(
db_dir=tmp_dir, threads=common_args["threads"],
use_ftp=common_args["use_ftp"]
)
for seq in seqs:
_add_seqs_to_library(
db_dir=tmp_dir, seqs=seq, no_masking=common_args["no_masking"]
)
# Build the Kraken2 database
_build_kraken2_database(db_dir=tmp_dir, all_kwargs=common_args)
# Build the Bracken database
for rl in common_args["read_len"]:
_build_bracken_database(
kraken2_db_dir=tmp_dir, threads=common_args["threads"],
kmer_len=common_args["kmer_len"], read_len=rl
)
# Move the Kraken2/Bracken database files to the final location
_move_db_files(tmp_dir, str(kraken2_db.path), extension="k2d")
_move_db_files(tmp_dir, str(bracken_db.path), extension="kmer_distrib")


def build_kraken_db(
seqs: DNAFASTAFormat = None,
standard: bool = False,
library_path: str = None,
libraries: List[str] = None,
library_exists: str = 'skip',
collection: str = None,
threads: int = 1,
kmer_len: int = 35,
minimizer_len: int = 31,
@@ -163,52 +270,32 @@ def build_kraken_db(
use_ftp: bool = False,
load_factor: float = 0.7,
fast_build: bool = False,
) -> Kraken2DBDirectoryFormat:
db = Kraken2DBDirectoryFormat()
db_dir = str(db.path)
    read_len: List[int] = None,
) -> (Kraken2DBDirectoryFormat, BrackenDBDirectoryFormat):
kraken2_db = Kraken2DBDirectoryFormat()
bracken_db = BrackenDBDirectoryFormat()

if standard and libraries:
raise ValueError(
'Standard Kraken2 database was requested but some libraries '
'were also provided. Please provide either only the "standard" '
'option or a list of "libraries" to be fetched for the database.'
)
if not read_len:
# use the same values as in the pre-built databases
read_len = [50, 75, 100, 150, 200, 250, 300]

with ExitStack() as stack:
if not library_path:
temp = tempfile.TemporaryDirectory()
temp_dir = temp.name
stack.enter_context(temp)
else:
os.makedirs(library_path, exist_ok=True)
temp_dir = library_path

# If requested, build the standard Kraken2 database
if standard:
_build_standard_db(db_dir=temp_dir, all_kwargs=locals())
_move_db_files(source=temp_dir, destination=db_dir)
return db

# Fetch taxonomy (required regardless of the build source)
_fetch_taxonomy(db_dir=temp_dir, threads=threads, use_ftp=use_ftp)

# If requested, download all the libraries
if libraries:
_fetch_libraries(
db_dir=temp_dir, libraries=libraries, all_kwargs=locals()
)

# If provided, add the additional sequences to the database
with tempfile.TemporaryDirectory() as tmp:
if seqs:
for seq in seqs:
_add_seqs_to_library(
db_dir=temp_dir, seqs=seq, no_masking=no_masking
)

# Finally, build the actual database
_build_database(db_dir=temp_dir, all_kwargs=locals())
# Construct the custom-made database
common_args = {k: v for k, v in locals().items()
if k not in ["seqs", "collection"]}

# Move the database files (*.k2d) to the final location
_move_db_files(source=temp_dir, destination=db_dir)
        # Build the Kraken2/Bracken databases from the provided sequences
_build_dbs_from_seqs(
bracken_db, kraken2_db, seqs, tmp, common_args
)
elif collection:
_fetch_prebuilt_dbs(bracken_db, kraken2_db, collection, tmp)
else:
raise ValueError(
                'You need to either provide a list of sequences to build the '
                'database from, or a valid collection name to be fetched from '
                'the "Kraken 2/Bracken RefSeq indexes" resource.'
)

return db
return kraken2_db, bracken_db
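For reference, a minimal sketch of calling the rewritten function directly with the new collection parameter (in practice this runs through the QIIME 2 framework; "standard8" is one of the keys from COLLECTIONS above):

from q2_moshpit.kraken2.database import build_kraken_db

# Fetch the pre-built 8 GB standard Kraken2/Bracken databases.
kraken2_db, bracken_db = build_kraken_db(collection="standard8")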