Separate download for FASTA files via NCBI virus (#640)
* Separate download for FASTA files via NCBI virus

* clean up imports

* Separate sequence chunks for SARS2 Genbank workflow

* Fix typo

* Another typo

* remove unused arg for chunk_data

* Remove old metadata concat code

* Migrate flu genbank workflow

* Move status output to trigger redownloads of sequences every day

* Retry downloads if they fail for whatever reason

* Fix placeholder scrape date limit

* Fix for mangled submission date formats

* Fix typo in script name

* Sequence quality for flu genbank workflow

* Sequence quality for RSV workflow
atc3 authored Feb 24, 2024
1 parent 7df2e91 commit d61f167
Showing 20 changed files with 1,034 additions and 211 deletions.
2 changes: 1 addition & 1 deletion config/config_flu_gisaid_dev.yaml
@@ -37,7 +37,7 @@ chunk_size: 10000
start_date_cutoff:
# Don't process sequences after this date
# Leave empty to ignore
end_date_cutoff:
end_date_cutoff: 2021-03-01

# Don't process sequences after X days ago
# Leave empty to ignore
81 changes: 73 additions & 8 deletions workflow_flu_genbank_ingest/Snakefile
@@ -36,7 +36,7 @@ rule all:
)


rule download_chunk:
rule download_metadata_chunk:
"""Download the data feed in chunks, to avoid timeouts.
"""
output:
@@ -46,12 +46,27 @@ rule download_chunk:
end_time = lambda wildcards: chunks[int(wildcards.chunk)].end_time.strftime('%Y-%m-%dT%H:%M:%S.00Z')
shell:
"""
python3 scripts/download.py \
python3 scripts/download_metadata.py \
--start-time {params.start_time} --end-time {params.end_time} \
> {output.feed}
"""

rule combine_download_chunks:
rule download_sequence_chunk:
"""Download the data feed metadata in chunks, to avoid timeouts.
"""
output:
feed = os.path.join(data_folder, 'feed_chunks', 'feed_{chunk}.fasta'),
params:
start_time = lambda wildcards: chunks[int(wildcards.chunk)].start_time.strftime('%Y-%m-%dT%H:%M:%S.00Z'),
end_time = lambda wildcards: chunks[int(wildcards.chunk)].end_time.strftime('%Y-%m-%dT%H:%M:%S.00Z')
shell:
"""
python3 scripts/download_sequences.py \
--start-time {params.start_time} --end-time {params.end_time} \
> {output.feed}
"""

rule combine_metadata_chunks:
"""Combine all the downloaded chunks into a single CSV file.
Combining CSV files without duplicating header: https://apple.stackexchange.com/a/348619
"""
@@ -60,36 +75,86 @@ rule combine_download_chunks:
params:
chunk_glob = os.path.join(data_folder, 'feed_chunks', 'feed*.csv')
output:
feed = os.path.join(data_folder, 'feed.csv'),
metadata = os.path.join(data_folder, 'metadata_dirty.csv'),
status = touch(os.path.join(
data_folder, "status", "download_" + today_str + ".done"
))
shell:
"""
awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.feed}
awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.metadata}
"""

rule combine_sequence_chunks:
"""Combine all the downloaded chunks into a single FASTA file.
"""
input:
chunks = expand(os.path.join(data_folder, 'feed_chunks', 'feed_{chunk}.fasta'), chunk=DL_CHUNKS),
download_status = rules.combine_metadata_chunks.output.status # Force this step every day
params:
chunk_glob = os.path.join(data_folder, 'feed_chunks', 'feed*.fasta')
output:
feed = os.path.join(data_folder, 'feed.fasta'),
shell:
"""
cat {params.chunk_glob} > {output.feed}
"""


rule sequence_quality:
"""Collect statistics about sequence quality
"""
input:
fasta = os.path.join(data_folder, "fasta_raw", "feed_{chunk}.fasta")
output:
quality = os.path.join(data_folder, "quality", "{chunk}.csv")
shell:
"""
python3 scripts/sequence_quality.py \
--input {input.fasta} \
--output {output.quality}
"""

checkpoint combine_quality:
"""Combine all quality result chunks
"""
input:
quality = expand(
os.path.join(data_folder, "quality", "{chunk}.csv"),
chunk=glob_wildcards(os.path.join(data_folder, "fasta_raw", "{i}.fa.gz")).i
),
status = rules.all.input.copy_status
params:
chunk_glob = os.path.join(data_folder, "quality", "*.csv")
output:
quality = os.path.join(data_folder, "quality.csv")
shell:
"""
echo {input.quality}
awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.quality}
"""

rule clean_metadata:
"""Clean metadata, incorporate lineage assignments into metadata
"""
input:
metadata_dirty = rules.combine_download_chunks.output.feed
metadata_dirty = rules.combine_metadata_chunks.output.metadata,
quality = rules.combine_quality.output.quality
output:
metadata_clean = os.path.join(data_folder, "metadata_no_serotype.csv"),
metadata_virus = os.path.join(data_folder, "metadata_virus_no_serotype.csv")
shell:
"""
python3 scripts/clean_metadata.py \
--metadata-in {input.metadata_dirty} \
--quality {input.quality} \
--metadata-out {output.metadata_clean} \
--metadata-virus-out {output.metadata_virus}
"""


rule write_ha_fasta:
input:
feed = rules.combine_download_chunks.output.feed,
feed = rules.combine_sequence_chunks.output.feed,
metadata = rules.clean_metadata.output.metadata_clean
output:
sequences = os.path.join(data_folder, 'ha.fa.gz')
@@ -144,7 +209,7 @@ checkpoint chunk_sequences:
We end up with hundreds of files, but the filesystem seems to be handling it well.
"""
input:
feed = rules.combine_download_chunks.output.feed,
feed = rules.combine_sequence_chunks.output.feed,
metadata = rules.clean_metadata.output.metadata_clean,
metadata_virus = rules.join_serotype_assignments.output.metadata_virus_plus_serotype,
output:
41 changes: 29 additions & 12 deletions workflow_flu_genbank_ingest/scripts/chunk_sequences.py
@@ -11,18 +11,13 @@
"""

import argparse
import csv
import gzip
import pandas as pd
import sys

from collections import defaultdict
from pathlib import Path


csv.field_size_limit(sys.maxsize)


def flush_chunk(output_path, fasta_by_month_serotype_segment):
for (month, serotype, segment), seqs in fasta_by_month_serotype_segment.items():
# print(month, serotype, segment)
@@ -40,7 +35,7 @@ def main():
parser = argparse.ArgumentParser()

parser.add_argument(
"--feed", type=str, required=True, help="Path to data feed csv file"
"--feed", type=str, required=True, help="Path to feed FASTA file"
)
parser.add_argument(
"--metadata-in", type=str, required=True, help="Path to metadata csv file"
@@ -87,13 +82,35 @@ def main():
# Open up the initial fasta file for the first chunk
fasta_by_month_serotype_segment = defaultdict(list)

# Read sequences
cur_entry = ""
cur_seq = ""
entries = []

lines = fp_in.readlines()
for i, line in enumerate(lines):
# Strip whitespace
line = line.strip()

# Ignore empty lines that aren't the last line
if not line and i <= (len(lines) - 1):
continue

# If not the name of an entry, add this line to the current sequence
# (some FASTA files will have multiple lines per sequence)
if line[0] != ">":
cur_seq = cur_seq + line

# Start of another entry = end of the previous entry
if line[0] == ">" or i == (len(lines) - 1):
# Avoid capturing the first one and pushing an empty sequence
if cur_entry:
entries.append((cur_entry, cur_seq))

# Record the new entry name (the header without ">") and reset the sequence buffer
cur_entry = line[1:]
cur_seq = ""

line_counter = 0
skip_counter = 0

feed_reader = csv.DictReader(fp_in, delimiter=",", quotechar='"')
for row in feed_reader:

# Flush results if chunk is full
for cur_entry, cur_seq in entries:
if chunk_i == args.chunk_size:
print("Writing {} sequences".format(chunk_i))
flush_chunk(output_path, fasta_by_month_serotype_segment)
Expand All @@ -103,7 +120,7 @@ def main():
fasta_by_month_serotype_segment = defaultdict(list)

# If this entry isn't present in the cleaned metadata, then skip
accession_id = row["genbank_accession"]
accession_id = cur_entry.split("|")[0].strip()
if accession_id not in metadata.index:
skip_counter += 1
continue
Expand All @@ -126,7 +143,7 @@ def main():

# Store sequence in dictionary (By month, not day)
fasta_by_month_serotype_segment[(month, serotype, segment)].append(
(accession_id, row["sequence"])
(accession_id, cur_seq)
)

# Iterate the intra-chunk counter
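
The rewritten chunking script now takes the accession from the FASTA header rather than from a genbank_accession CSV column. A tiny illustration of that parsing, using a made-up pipe-delimited header (the real NCBI Virus export format may differ):

    # cur_entry is the header line with the leading ">" already stripped
    cur_entry = "OQ123456.1 |Influenza A virus (A/example/2023(H1N1))|4"
    accession_id = cur_entry.split("|")[0].strip()
    print(accession_id)  # OQ123456.1
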
28 changes: 23 additions & 5 deletions workflow_flu_genbank_ingest/scripts/clean_metadata.py
@@ -250,8 +250,7 @@ def protein_to_segment(x):


def collapse_segment_list(segments):
"""Choose consensus segment from list of derived segments
"""
"""Choose consensus segment from list of derived segments"""
# Remove non-integers from this list
segments = [x for x in segments if type(x) is int]

@@ -306,6 +305,12 @@ def main():
parser = argparse.ArgumentParser()

parser.add_argument("--metadata-in", type=str, required=True, help="Metadata in")
parser.add_argument(
"--quality",
type=str,
required=True,
help="Path to sequence quality input CSV file",
)
parser.add_argument("--metadata-out", type=str, required=True, help="Metadata out")
parser.add_argument(
"--metadata-virus-out", type=str, required=True, help="Aggregate metadata out"
@@ -508,9 +513,11 @@ def main():
# Clean segments
df["genome_coverage"] = df["genome_coverage"].apply(json.loads)
df["proteins"] = df["genome_coverage"].apply(
lambda x: [p["name"] for p in x[0]["proteins"]]
if x[0]["proteins"] is not None
else []
lambda x: (
[p["name"] for p in x[0]["proteins"]]
if x[0]["proteins"] is not None
else []
)
)
segment_df = df[["segments", "proteins"]].explode("proteins")
segment_df["protein_segment"] = protein_to_segment(segment_df["proteins"])
@@ -599,6 +606,17 @@ def datetime_to_date(x):
# are not correctly parsed and are cutoff when written to BAM files)
df.loc[:, "virus_name"] = df["virus_name"].str.replace(r"\s", "-")

# Load quality and join to dataframe
quality_df = pd.read_csv(args.quality, index_col="Accession ID")
df = df.join(quality_df, how="left")
df["length"] = df["length"].fillna(0).astype(int)
# Calculate percent ambiguous, drop the num_ambiguous column
df["num_ambiguous"] = ((df["num_ambiguous"] / df["length"]) * 100).fillna(0)
df.rename(columns={"num_ambiguous": "percent_ambiguous"}, inplace=True)

# Filter out entries without any sequence
df = df.loc[df["length"] > 0]

df.to_csv(args.metadata_out)

# Write summary dataframe
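
To make the new quality handling concrete, here is a small, self-contained pandas example that mirrors the steps added above; the accession IDs and numbers are invented:

    import pandas as pd

    df = pd.DataFrame(
        {"virus_name": ["A-example-1", "A-example-2"]},
        index=pd.Index(["ACC1", "ACC2"], name="Accession ID"),
    )
    quality_df = pd.DataFrame(
        {"length": [1700], "num_ambiguous": [17]},
        index=pd.Index(["ACC1"], name="Accession ID"),
    )

    # Left-join quality stats; entries with no sequence end up with length 0
    df = df.join(quality_df, how="left")
    df["length"] = df["length"].fillna(0).astype(int)

    # Convert the ambiguous-base count into a percentage of sequence length
    df["num_ambiguous"] = ((df["num_ambiguous"] / df["length"]) * 100).fillna(0)
    df.rename(columns={"num_ambiguous": "percent_ambiguous"}, inplace=True)

    # Drop entries that had no sequence at all
    df = df.loc[df["length"] > 0]
    print(df)  # ACC1 only: length 1700, percent_ambiguous 1.0
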
@@ -128,9 +128,10 @@
import argparse
import requests

RETRY_ATTEMPTS = 5

def main():

def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--start-time",
@@ -218,7 +219,7 @@ def main():
("authors", "Authors_csv"),
# Authors_ss - array of authors
("publications", "PubMed_csv"),
("sequence", "Nucleotide_seq"),
# ("sequence", "Nucleotide_seq"),
# FastaMD5_s - MD5 of sequence
# BioProjectCount_i - number of BioProjects
# BioProject_csv - list of BioProjects - csv
@@ -239,8 +240,22 @@ def main():
"User-Agent": "https://github.com/vector-engineering/covid-cg ([email protected])",
}

response = requests.get(endpoint, params=params, headers=headers, stream=True)
response.raise_for_status()
for _ in range(RETRY_ATTEMPTS):
try:
response = requests.get(
endpoint, params=params, headers=headers, stream=True
)
response.raise_for_status()
break
except requests.exceptions.RequestException as e:
print(f"Error downloading metadata: {e}")

if _ == RETRY_ATTEMPTS - 1:
raise Exception(
f"Failed to download metadata after {RETRY_ATTEMPTS} attempts"
)
else:
print(f"Retrying download...")

response_content = response.iter_content(chunk_size=1024, decode_unicode=True)

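The retry loop above is written inline in the download script. Factored into a reusable helper, the same pattern looks roughly like this (a sketch, not part of the commit; a production version would typically also sleep or back off between attempts):

    import requests

    RETRY_ATTEMPTS = 5

    def get_with_retries(url, attempts=RETRY_ATTEMPTS, **kwargs):
        """GET a URL, retrying on any requests exception before giving up."""
        for attempt in range(attempts):
            try:
                response = requests.get(url, **kwargs)
                response.raise_for_status()
                return response
            except requests.exceptions.RequestException as e:
                print(f"Error downloading: {e}")
                if attempt == attempts - 1:
                    raise Exception(
                        f"Failed to download after {attempts} attempts"
                    )
                print("Retrying download...")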