diff --git a/config/config_flu_gisaid_dev.yaml b/config/config_flu_gisaid_dev.yaml index c04915b9..8fbadbf6 100644 --- a/config/config_flu_gisaid_dev.yaml +++ b/config/config_flu_gisaid_dev.yaml @@ -37,7 +37,7 @@ chunk_size: 10000 start_date_cutoff: # Don't process sequences after this date # Leave empty to ignore -end_date_cutoff: +end_date_cutoff: 2021-03-01 # Don't process sequences after X days ago # Leave empty to ignore diff --git a/workflow_flu_genbank_ingest/Snakefile b/workflow_flu_genbank_ingest/Snakefile index 038d03ed..e3f7aced 100644 --- a/workflow_flu_genbank_ingest/Snakefile +++ b/workflow_flu_genbank_ingest/Snakefile @@ -36,7 +36,7 @@ rule all: ) -rule download_chunk: +rule download_metadata_chunk: """Download the data feed in chunks, to avoid timeouts. """ output: @@ -46,12 +46,27 @@ rule download_chunk: end_time = lambda wildcards: chunks[int(wildcards.chunk)].end_time.strftime('%Y-%m-%dT%H:%M:%S.00Z') shell: """ - python3 scripts/download.py \ + python3 scripts/download_metadata.py \ --start-time {params.start_time} --end-time {params.end_time} \ > {output.feed} """ -rule combine_download_chunks: +rule download_sequence_chunk: + """Download the data feed metadata in chunks, to avoid timeouts. + """ + output: + feed = os.path.join(data_folder, 'feed_chunks', 'feed_{chunk}.fasta'), + params: + start_time = lambda wildcards: chunks[int(wildcards.chunk)].start_time.strftime('%Y-%m-%dT%H:%M:%S.00Z'), + end_time = lambda wildcards: chunks[int(wildcards.chunk)].end_time.strftime('%Y-%m-%dT%H:%M:%S.00Z') + shell: + """ + python3 scripts/download_sequences.py \ + --start-time {params.start_time} --end-time {params.end_time} \ + > {output.feed} + """ + +rule combine_metadata_chunks: """Combine all the downloaded chunks into a single CSV file. Combining CSV files without duplicating header: https://apple.stackexchange.com/a/348619 """ @@ -60,21 +75,70 @@ rule combine_download_chunks: params: chunk_glob = os.path.join(data_folder, 'feed_chunks', 'feed*.csv') output: - feed = os.path.join(data_folder, 'feed.csv'), + metadata = os.path.join(data_folder, 'metadata_dirty.csv'), status = touch(os.path.join( data_folder, "status", "download_" + today_str + ".done" )) shell: """ - awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.feed} + awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.metadata} """ +rule combine_sequence_chunks: + """Combine all the downloaded chunks into a single FASTA file. 
+ """ + input: + chunks = expand(os.path.join(data_folder, 'feed_chunks', 'feed_{chunk}.fasta'), chunk=DL_CHUNKS), + download_status = rules.combine_metadata_chunks.output.status # Force this step every day + params: + chunk_glob = os.path.join(data_folder, 'feed_chunks', 'feed*.fasta') + output: + feed = os.path.join(data_folder, 'feed.fasta'), + shell: + """ + cat {params.chunk_glob} > {output.feed} + """ + + +rule sequence_quality: + """Collect statistics about sequence quality + """ + input: + fasta = os.path.join(data_folder, "fasta_raw", "feed_{chunk}.fasta") + output: + quality = os.path.join(data_folder, "quality", "{chunk}.csv") + shell: + """ + python3 scripts/sequence_quality.py \ + --input {input.fasta} \ + --output {output.quality} + """ + +checkpoint combine_quality: + """Combine all quality result chunks + """ + input: + quality = expand( + os.path.join(data_folder, "quality", "{chunk}.csv"), + chunk=glob_wildcards(os.path.join(data_folder, "fasta_raw", "{i}.fa.gz")).i + ), + status = rules.all.input.copy_status + params: + chunk_glob = os.path.join(data_folder, "quality", "*.csv") + output: + quality = os.path.join(data_folder, "quality.csv") + shell: + """ + echo {input.quality} + awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.quality} + """ rule clean_metadata: """Clean metadata, incorporate lineage assignments into metadata """ input: - metadata_dirty = rules.combine_download_chunks.output.feed + metadata_dirty = rules.combine_metadata_chunks.output.metadata, + quality = rules.combine_quality.output.quality output: metadata_clean = os.path.join(data_folder, "metadata_no_serotype.csv"), metadata_virus = os.path.join(data_folder, "metadata_virus_no_serotype.csv") @@ -82,6 +146,7 @@ rule clean_metadata: """ python3 scripts/clean_metadata.py \ --metadata-in {input.metadata_dirty} \ + --quality {input.quality} \ --metadata-out {output.metadata_clean} \ --metadata-virus-out {output.metadata_virus} """ @@ -89,7 +154,7 @@ rule clean_metadata: rule write_ha_fasta: input: - feed = rules.combine_download_chunks.output.feed, + feed = rules.combine_sequence_chunks.output.feed, metadata = rules.clean_metadata.output.metadata_clean output: sequences = os.path.join(data_folder, 'ha.fa.gz') @@ -144,7 +209,7 @@ checkpoint chunk_sequences: We end up with hundreds of files, but the filesystem seems to be handling it well. 
""" input: - feed = rules.combine_download_chunks.output.feed, + feed = rules.combine_sequence_chunks.output.feed, metadata = rules.clean_metadata.output.metadata_clean, metadata_virus = rules.join_serotype_assignments.output.metadata_virus_plus_serotype, output: diff --git a/workflow_flu_genbank_ingest/scripts/chunk_sequences.py b/workflow_flu_genbank_ingest/scripts/chunk_sequences.py index 54b6f034..802f1dc8 100644 --- a/workflow_flu_genbank_ingest/scripts/chunk_sequences.py +++ b/workflow_flu_genbank_ingest/scripts/chunk_sequences.py @@ -11,18 +11,13 @@ """ import argparse -import csv import gzip import pandas as pd -import sys from collections import defaultdict from pathlib import Path -csv.field_size_limit(sys.maxsize) - - def flush_chunk(output_path, fasta_by_month_serotype_segment): for (month, serotype, segment), seqs in fasta_by_month_serotype_segment.items(): # print(month, serotype, segment) @@ -40,7 +35,7 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument( - "--feed", type=str, required=True, help="Path to data feed csv file" + "--feed", type=str, required=True, help="Path to feed FASTA file" ) parser.add_argument( "--metadata-in", type=str, required=True, help="Path to metadata csv file" @@ -87,13 +82,35 @@ def main(): # Open up the initial fasta file for the first chunk fasta_by_month_serotype_segment = defaultdict(list) + # Read sequences + cur_entry = "" + cur_seq = "" + entries = [] + + lines = fp_in.readlines() + for i, line in enumerate(lines): + # Strip whitespace + line = line.strip() + + # Ignore empty lines that aren't the last line + if not line and i <= (len(lines) - 1): + continue + + # If not the name of an entry, add this line to the current sequence + # (some FASTA files will have multiple lines per sequence) + if line[0] != ">": + cur_seq = cur_seq + line + + # Start of another entry = end of the previous entry + if line[0] == ">" or i == (len(lines) - 1): + # Avoid capturing the first one and pushing an empty sequence + if cur_entry: + entries.append((cur_entry, cur_seq)) + line_counter = 0 skip_counter = 0 - feed_reader = csv.DictReader(fp_in, delimiter=",", quotechar='"') - for row in feed_reader: - - # Flush results if chunk is full + for cur_entry, cur_seq in entries: if chunk_i == args.chunk_size: print("Writing {} sequences".format(chunk_i)) flush_chunk(output_path, fasta_by_month_serotype_segment) @@ -103,7 +120,7 @@ def main(): fasta_by_month_serotype_segment = defaultdict(list) # If this entry isn't present in the cleaned metadata, then skip - accession_id = row["genbank_accession"] + accession_id = cur_entry.split("|")[0].strip() if accession_id not in metadata.index: skip_counter += 1 continue @@ -126,7 +143,7 @@ def main(): # Store sequence in dictionary (By month, not day) fasta_by_month_serotype_segment[(month, serotype, segment)].append( - (accession_id, row["sequence"]) + (accession_id, cur_seq) ) # Iterate the intra-chunk counter diff --git a/workflow_flu_genbank_ingest/scripts/clean_metadata.py b/workflow_flu_genbank_ingest/scripts/clean_metadata.py index 0b910b5e..e47ec38d 100644 --- a/workflow_flu_genbank_ingest/scripts/clean_metadata.py +++ b/workflow_flu_genbank_ingest/scripts/clean_metadata.py @@ -250,8 +250,7 @@ def protein_to_segment(x): def collapse_segment_list(segments): - """Choose consensus segment from list of derived segments - """ + """Choose consensus segment from list of derived segments""" # Remove non-integers from this list segments = [x for x in segments if type(x) is int] @@ -306,6 +305,12 @@ def 
main(): parser = argparse.ArgumentParser() parser.add_argument("--metadata-in", type=str, required=True, help="Metadata in") + parser.add_argument( + "--quality", + type=str, + required=True, + help="Path to sequence quality input CSV file", + ) parser.add_argument("--metadata-out", type=str, required=True, help="Metadata out") parser.add_argument( "--metadata-virus-out", type=str, required=True, help="Aggregate metadata out" @@ -508,9 +513,11 @@ def main(): # Clean segments df["genome_coverage"] = df["genome_coverage"].apply(json.loads) df["proteins"] = df["genome_coverage"].apply( - lambda x: [p["name"] for p in x[0]["proteins"]] - if x[0]["proteins"] is not None - else [] + lambda x: ( + [p["name"] for p in x[0]["proteins"]] + if x[0]["proteins"] is not None + else [] + ) ) segment_df = df[["segments", "proteins"]].explode("proteins") segment_df["protein_segment"] = protein_to_segment(segment_df["proteins"]) @@ -599,6 +606,17 @@ def datetime_to_date(x): # are not correctly parsed and are cutoff when written to BAM files) df.loc[:, "virus_name"] = df["virus_name"].str.replace(r"\s", "-") + # Load quality and join to dataframe + quality_df = pd.read_csv(args.quality, index_col="Accession ID") + df = df.join(quality_df, how="left") + df["length"] = df["length"].fillna(0).astype(int) + # Calculate percent ambiguous, drop the num_ambiguous column + df["num_ambiguous"] = ((df["num_ambiguous"] / df["length"]) * 100).fillna(0) + df.rename(columns={"num_ambiguous": "percent_ambiguous"}, inplace=True) + + # Filter out entries without any sequence + df = df.loc[df["length"] > 0] + df.to_csv(args.metadata_out) # Write summary dataframe diff --git a/workflow_flu_genbank_ingest/scripts/download.py b/workflow_flu_genbank_ingest/scripts/download_metadata.py similarity index 93% rename from workflow_flu_genbank_ingest/scripts/download.py rename to workflow_flu_genbank_ingest/scripts/download_metadata.py index 34c88762..cc283966 100644 --- a/workflow_flu_genbank_ingest/scripts/download.py +++ b/workflow_flu_genbank_ingest/scripts/download_metadata.py @@ -128,9 +128,10 @@ import argparse import requests +RETRY_ATTEMPTS = 5 -def main(): +def main(): parser = argparse.ArgumentParser() parser.add_argument( "--start-time", @@ -218,7 +219,7 @@ def main(): ("authors", "Authors_csv"), # Authors_ss - array of authors ("publications", "PubMed_csv"), - ("sequence", "Nucleotide_seq"), + # ("sequence", "Nucleotide_seq"), # FastaMD5_s - MD5 of sequence # BioProjectCount_i - number of BioProjects # BioProject_csv - list of BioProjects - csv @@ -239,8 +240,22 @@ def main(): "User-Agent": "https://github.com/vector-engineering/covid-cg (covidcg@broadinstitute.org)", } - response = requests.get(endpoint, params=params, headers=headers, stream=True) - response.raise_for_status() + for _ in range(RETRY_ATTEMPTS): + try: + response = requests.get( + endpoint, params=params, headers=headers, stream=True + ) + response.raise_for_status() + break + except requests.exceptions.RequestException as e: + print(f"Error downloading metadata: {e}") + + if _ == RETRY_ATTEMPTS - 1: + raise Exception( + f"Failed to download metadata after {RETRY_ATTEMPTS} attempts" + ) + else: + print(f"Retrying download...") response_content = response.iter_content(chunk_size=1024, decode_unicode=True) diff --git a/workflow_flu_genbank_ingest/scripts/download_sequences.py b/workflow_flu_genbank_ingest/scripts/download_sequences.py new file mode 100644 index 00000000..4adb218a --- /dev/null +++ b/workflow_flu_genbank_ingest/scripts/download_sequences.py 
@@ -0,0 +1,108 @@
+#!/usr/bin/env python3
+# coding: utf-8
+
+"""
+(2020-12-21) Derived from:
+https://github.com/nextstrain/ncov-ingest: fetch-from-genbank script
+
+License is in LICENSE_NEXTSTRAIN, same folder
+
+------------------
+
+Download all influenza sequences from GenBank via
+NCBI Virus.
+
+Outputs sequences in FASTA format.
+
+The request this program makes is based on observing the network activity that
+
+    https://www.ncbi.nlm.nih.gov/labs/virus/vssi/#/virus?SeqType_s=Nucleotide&VirusLineage_ss=Severe%20acute%20respiratory%20syndrome%20coronavirus%202,%20taxid:2697049
+
+performs after clicking through the download interface. Some tweaks were made
+by comparing different download requests and guessing, which allows us to
+download the metadata + sequence in the same request instead of two.
+
+------------------
+
+Example download:
+
+https://www.ncbi.nlm.nih.gov/genomes/VirusVariation/vvsearch2/?fq=%7B!tag%3DSeqType_s%7DSeqType_s:(%22Nucleotide%22)&fq=VirusLineageId_ss:(208893)&cmd=download&sort=SourceDB_s%20desc,CreateDate_dt%20desc,id%20asc&dlfmt=fasta&fl=AccVer_s,Definition_s,Nucleotide_seq
+
+https://www.ncbi.nlm.nih.gov/labs/virus/vssi/#/virus?SeqType_s=Nucleotide&VirusLineage_ss=Human%20respiratory%20syncytial%20virus%20A,%20taxid:208893
+
+"""
+
+
+import argparse
+import requests
+
+RETRY_ATTEMPTS = 5
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--start-time",
+        type=str,
+        required=True,
+        help="Start time in format YYYY-MM-DDTHH:MM:SS.00Z",
+    )
+    parser.add_argument(
+        "--end-time",
+        type=str,
+        required=True,
+        help="End time in format YYYY-MM-DDTHH:MM:SS.00Z",
+    )
+    args = parser.parse_args()
+
+    endpoint = "https://www.ncbi.nlm.nih.gov/genomes/VirusVariation/vvsearch2/"
+    params = {
+        # Search criteria
+        "fq": [
+            '{!tag=SeqType_s}SeqType_s:("Nucleotide")',  # Nucleotide sequences (as opposed to protein)
+            "VirusLineageId_ss:(197911 OR 197912)",  # Alphainfluenzavirus OR Betainfluenzavirus
+            f"{{!tag=CreateDate_dt}}CreateDate_dt:([{args.start_time} TO {args.end_time}])",
+        ],
+        # Unclear, but seems necessary.
+        "q": "*:*",
+        # Response format
+        "cmd": "download",
+        "dlfmt": "fasta",
+        "fl": ",".join(["id", "AccVer_s", "Definition_s", "Nucleotide_seq"]),
+        # Stable sort with newest last so diffs work nicely. Columns are source
+        # data fields, not our output columns.
+        "sort": "SourceDB_s desc, CollectionDate_s asc, id asc",
+        # This isn't Entrez, but include the same email parameter it requires just
+        # to be nice.
+        "email": "covidcg@broadinstitute.org",
+    }
+
+    headers = {
+        "User-Agent": "https://github.com/vector-engineering/covid-cg (covidcg@broadinstitute.org)",
+    }
+
+    for _ in range(RETRY_ATTEMPTS):
+        try:
+            response = requests.get(
+                endpoint, params=params, headers=headers, stream=True
+            )
+            response.raise_for_status()
+            break
+        except requests.exceptions.RequestException as e:
+            print(f"Error downloading sequences: {e}")
+
+            if _ == RETRY_ATTEMPTS - 1:
+                raise Exception(
+                    f"Failed to download sequences after {RETRY_ATTEMPTS} attempts"
+                )
+            else:
+                print(f"Retrying download...")
+
+    response_content = response.iter_content(chunk_size=1024, decode_unicode=True)
+
+    for chunk in response_content:
+        print(chunk, end="")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/workflow_flu_genbank_ingest/scripts/sequence_quality.py b/workflow_flu_genbank_ingest/scripts/sequence_quality.py
new file mode 100644
index 00000000..6f6d1b2b
--- /dev/null
+++ b/workflow_flu_genbank_ingest/scripts/sequence_quality.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python3
+# coding: utf-8
+
+"""Collect statistics on sequence quality
+
+Author: Albert Chen - Vector Engineering Team (chena@broadinstitute.org)
+"""
+
+import argparse
+import gzip
+import re
+
+
+def main():
+    """Collect statistics on sequence quality"""
+
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument("--input", type=str, required=True, help="Input FASTA file")
+    parser.add_argument(
+        "--output",
+        type=str,
+        required=True,
+        help="Output CSV file",
+    )
+
+    args = parser.parse_args()
+
+    fp_in = gzip.open(args.input, "rt")
+    fp_out = open(args.output, "wt")
+
+    fp_out.write("Accession ID,length,num_ambiguous\n")
+
+    num_sequences = 0
+    cur_entry = ""
+    cur_seq = ""
+    while True:
+        line = fp_in.readline()
+
+        # Beginning of a new entry, or EOF = end of current entry
+        if not line or line[0] == ">":
+            if cur_entry:
+                num_ambiguous = 0
+                for char in cur_seq:
+                    if char == "N":
+                        num_ambiguous += 1
+
+                fp_out.write(f"{cur_entry},{str(len(cur_seq))},{str(num_ambiguous)}\n")
+                num_sequences += 1
+
+            # If it's the end, then break out
+            if not line:
+                break
+
+            # Reset sequence and name
+            cur_seq = ""
+            # Extract the name (up to the first whitespace)
+            # [1:] excludes the first '>'
+            # .split() breaks up the line into chunks separated by whitespace
+            # [0] gets the first chunk
+            # cur_entry = line[1:].split()[0]
+            # Nevermind, the fasta entries sometimes have spaces.....
+ # Just rstrip to remove the newline, that should work good enough + cur_entry = line[1:].rstrip() + + # Otherwise add sequence to the current entry + elif cur_entry: + cur_seq += re.sub(r"\s+", "", line).strip() + + fp_in.close() + fp_out.close() + + print("Processed {:,} sequences".format(num_sequences), flush=True) + + +if __name__ == "__main__": + main() diff --git a/workflow_flu_genbank_ingest/scripts/write_ha_fasta.py b/workflow_flu_genbank_ingest/scripts/write_ha_fasta.py index c6e01adb..92f77793 100644 --- a/workflow_flu_genbank_ingest/scripts/write_ha_fasta.py +++ b/workflow_flu_genbank_ingest/scripts/write_ha_fasta.py @@ -2,41 +2,60 @@ # coding: utf-8 import argparse -import csv import gzip import pandas as pd -import sys -csv.field_size_limit(sys.maxsize) def main(): parser = argparse.ArgumentParser() - parser.add_argument('--feed', type=str, required=True, help='Feed CSV file') - parser.add_argument('--metadata-in', type=str, required=True, help='Path to metadata csv file') - parser.add_argument('--out', type=str, required=True, help='Output fasta file suffix') + parser.add_argument("--feed", type=str, required=True, help="Feed CSV file") + parser.add_argument( + "--metadata-in", type=str, required=True, help="Path to metadata csv file" + ) + parser.add_argument( + "--out", type=str, required=True, help="Output fasta file suffix" + ) args = parser.parse_args() # Load metadata - metadata = pd.read_csv(args.metadata_in, index_col='Accession ID') + metadata = pd.read_csv(args.metadata_in, index_col="Accession ID") with open(args.feed, "r", newline="") as fp_in, gzip.open(args.out, "at") as fp_out: + # Read sequences + cur_entry = "" + cur_seq = "" - feed_reader = csv.DictReader(fp_in, delimiter=",", quotechar='"') - for row in feed_reader: + lines = fp_in.readlines() + for i, line in enumerate(lines): + # Strip whitespace + line = line.strip() - # If this entry isn't present in the cleaned metadata, then skip - accession_id = row['genbank_accession'] - if accession_id not in metadata.index: + # Ignore empty lines that aren't the last line + if not line and i <= (len(lines) - 1): continue - segment = metadata.at[accession_id, 'segment'] - if segment != 4: - continue + # If not the name of an entry, add this line to the current sequence + # (some FASTA files will have multiple lines per sequence) + if line[0] != ">": + cur_seq = cur_seq + line + + # Start of another entry = end of the previous entry + if line[0] == ">" or i == (len(lines) - 1): + # Avoid capturing the first one and pushing an empty sequence + if cur_entry: + # If this entry isn't present in the cleaned metadata, then skip + accession_id = cur_entry.split("|")[0].strip() + if accession_id not in metadata.index: + continue + + segment = metadata.at[accession_id, "segment"] + if segment != 4: + continue - fp_out.write('>{}\n{}\n'.format(accession_id, row['sequence'])) + fp_out.write(">{}\n{}\n".format(accession_id, cur_seq)) -if __name__ == '__main__': - main() \ No newline at end of file +if __name__ == "__main__": + main() diff --git a/workflow_rsv_genbank_ingest/Snakefile b/workflow_rsv_genbank_ingest/Snakefile index 67a6e3e8..495094f7 100644 --- a/workflow_rsv_genbank_ingest/Snakefile +++ b/workflow_rsv_genbank_ingest/Snakefile @@ -26,7 +26,7 @@ rule all: input: # Download latest data feed, process sequences os.path.join( - data_folder, "status", "download_" + today_str + ".done" + data_folder, "status", "download_metadata_" + today_str + ".done" ), os.path.join( data_folder, "status", "merge_sequences_" + 
today_str + ".done" @@ -41,8 +41,8 @@ rule all: seq_by_subtype_tar = os.path.join(data_folder, 'seq_by_subtype.tar.gz') -rule download_chunk: - """Download the data feed in chunks, to avoid timeouts. +rule download_metadata_chunk: + """Download the data feed metadata in chunks, to avoid timeouts. """ output: feed = os.path.join(data_folder, 'feed_chunks', 'feed_{chunk}.csv'), @@ -51,12 +51,28 @@ rule download_chunk: end_time = lambda wildcards: chunks[int(wildcards.chunk)].end_time.strftime('%Y-%m-%dT%H:%M:%S.00Z') shell: """ - python3 scripts/download.py \ + python3 scripts/download_metadata.py \ --start-time {params.start_time} --end-time {params.end_time} \ > {output.feed} """ -rule combine_download_chunks: + +rule download_sequence_chunk: + """Download the data feed metadata in chunks, to avoid timeouts. + """ + output: + feed = os.path.join(data_folder, 'feed_chunks', 'feed_{chunk}.fasta'), + params: + start_time = lambda wildcards: chunks[int(wildcards.chunk)].start_time.strftime('%Y-%m-%dT%H:%M:%S.00Z'), + end_time = lambda wildcards: chunks[int(wildcards.chunk)].end_time.strftime('%Y-%m-%dT%H:%M:%S.00Z') + shell: + """ + python3 scripts/download_sequences.py \ + --start-time {params.start_time} --end-time {params.end_time} \ + > {output.feed} + """ + +rule combine_metadata_chunks: """Combine all the downloaded chunks into a single CSV file. Combining CSV files without duplicating header: https://apple.stackexchange.com/a/348619 """ @@ -65,33 +81,81 @@ rule combine_download_chunks: params: chunk_glob = os.path.join(data_folder, 'feed_chunks', 'feed*.csv') output: - feed = os.path.join(data_folder, 'feed.csv'), + metadata_dirty = os.path.join(data_folder, 'metadata_dirty.csv'), status = touch(os.path.join( - data_folder, "status", "download_" + today_str + ".done" + data_folder, "status", "download_metadata_" + today_str + ".done" )) shell: """ - awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.feed} + awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.metadata_dirty} + """ + +rule combine_sequence_chunks: + """Combine all the downloaded chunks into a single FASTA file. 
+ """ + input: + chunks = expand(os.path.join(data_folder, 'feed_chunks', 'feed_{chunk}.fasta'), chunk=DL_CHUNKS), + download_status = rules.combine_metadata_chunks.output.status # Force this step every day + params: + chunk_glob = os.path.join(data_folder, 'feed_chunks', 'feed*.fasta') + output: + feed = os.path.join(data_folder, 'feed.fasta'), + shell: + """ + cat {params.chunk_glob} > {output.feed} + """ + +rule sequence_quality: + """Collect statistics about sequence quality + """ + input: + fasta = rules.download_sequence_chunk.output.feed + output: + quality = os.path.join(data_folder, "quality", "{chunk}.csv") + shell: + """ + python3 scripts/sequence_quality.py \ + --input {input.fasta} \ + --output {output.quality} + """ + +rule combine_quality: + """Combine all quality result chunks + """ + input: + quality = expand( + os.path.join(data_folder, "quality", "{chunk}.csv"), + chunk=DL_CHUNKS + ) + params: + chunk_glob = os.path.join(data_folder, "quality", "*.csv") + output: + quality = os.path.join(data_folder, "quality.csv") + shell: + """ + echo {input.quality} + awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.quality} """ rule clean_metadata: """Clean metadata """ input: - metadata_dirty = rules.combine_download_chunks.output.feed, + metadata_dirty = rules.combine_metadata_chunks.output.metadata_dirty, + quality = rules.combine_quality.output.quality output: metadata_clean = os.path.join(data_folder, "metadata_no_subtype.csv") shell: """ python3 scripts/clean_metadata.py \ --metadata-in {input.metadata_dirty} \ + --quality {input.quality} \ --metadata-out {output.metadata_clean} """ rule write_all_fasta: input: - feed = rules.combine_download_chunks.output.feed, - download_status = rules.combine_download_chunks.output.status, # Force this step every day + feed = rules.combine_sequence_chunks.output.feed, metadata = rules.clean_metadata.output.metadata_clean output: sequences = os.path.join(data_folder, 'all_sequences.fa.gz') @@ -141,7 +205,7 @@ rule split_sequences_by_subtype: """Split sequences by subtype """ input: - feed = rules.combine_download_chunks.output.feed, + feed = rules.combine_sequence_chunks.output.feed, metadata = rules.join_subtype_assignments.output.metadata_plus_subtype output: seq_A = os.path.join(data_folder, 'seq_by_subtype', 'sequences_A.fa.gz'), @@ -175,7 +239,7 @@ checkpoint chunk_sequences: hundreds of files, but the filesystem seems to be handling it well. 
""" input: - feed = rules.combine_download_chunks.output.feed, + feed = rules.combine_sequence_chunks.output.feed, metadata = rules.join_subtype_assignments.output.metadata_plus_subtype, output: fasta = directory(os.path.join(data_folder, "fasta_temp")) diff --git a/workflow_rsv_genbank_ingest/scripts/chunk_sequences.py b/workflow_rsv_genbank_ingest/scripts/chunk_sequences.py index 2f05718f..313072f7 100755 --- a/workflow_rsv_genbank_ingest/scripts/chunk_sequences.py +++ b/workflow_rsv_genbank_ingest/scripts/chunk_sequences.py @@ -11,18 +11,13 @@ """ import argparse -import csv import gzip import pandas as pd -import sys from collections import defaultdict from pathlib import Path -csv.field_size_limit(sys.maxsize) - - def flush_chunk(output_path, fasta_by_month_subtype): for (month, subtype), seqs in fasta_by_month_subtype.items(): # print(month, subtype) @@ -38,7 +33,7 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument( - "--feed", type=str, required=True, help="Path to data feed csv file" + "--feed", type=str, required=True, help="Path to data feed FASTA file" ) parser.add_argument( "--metadata-in", type=str, required=True, help="Path to metadata csv file" @@ -74,16 +69,19 @@ def main(): # Keep track of how far we're along the current chunk chunk_i = 0 - with open(args.feed, "r", newline="") as fp_in: + # Read sequences + cur_entry = "" + cur_seq = "" + + with open(args.feed, "rt", newline="") as fp_in: # Open up the initial fasta file for the first chunk fasta_by_month_subtype = defaultdict(list) line_counter = 0 skip_counter = 0 - feed_reader = csv.DictReader(fp_in, delimiter=",", quotechar='"') - for row in feed_reader: - + lines = fp_in.readlines() + for i, line in enumerate(lines): # Flush results if chunk is full if chunk_i == args.chunk_size: print("Writing {} sequences".format(chunk_i)) @@ -93,28 +91,45 @@ def main(): # Reset sequence dictionary fasta_by_month_subtype = defaultdict(list) - # If this entry isn't present in the cleaned metadata, then skip - accession_id = row["genbank_accession"] - if accession_id not in metadata.index: - skip_counter += 1 - continue + # Strip whitespace + line = line.strip() - subtype = metadata.at[accession_id, "subtype"] - if not subtype: - skip_counter += 1 + # Ignore empty lines that aren't the last line + if not line and i <= (len(lines) - 1): continue - month = metadata.at[accession_id, "submission_date"][0:7] - - # Store sequence in dictionary (By month, not day) - fasta_by_month_subtype[(month, subtype)].append( - (accession_id, row["sequence"]) - ) - - # Iterate the intra-chunk counter - chunk_i += 1 - - line_counter += 1 + # If not the name of an entry, add this line to the current sequence + # (some FASTA files will have multiple lines per sequence) + if line[0] != ">": + cur_seq = cur_seq + line + + # Start of another entry = end of the previous entry + if line[0] == ">" or i == (len(lines) - 1): + # Avoid capturing the first one and pushing an empty sequence + if cur_entry: + accession_id = cur_entry.split("|")[0].strip() + if accession_id in metadata.index: + subtype = metadata.at[accession_id, "subtype"] + if not subtype: + skip_counter += 1 + else: + month = metadata.at[accession_id, "submission_date"][0:7] + # Store sequence in dictionary (By month, not day) + fasta_by_month_subtype[(month, subtype)].append( + (accession_id, cur_seq) + ) + # Iterate the intra-chunk counter + chunk_i += 1 + line_counter += 1 + else: + skip_counter += 1 + + # Clear the entry and sequence + cur_entry = line[1:] + # Ignore anything 
past the first whitespace + if cur_entry: + cur_entry = cur_entry.split()[0] + cur_seq = "" # Flush the last chunk print("Writing {} sequences".format(chunk_i)) diff --git a/workflow_rsv_genbank_ingest/scripts/clean_metadata.py b/workflow_rsv_genbank_ingest/scripts/clean_metadata.py index ce9d1f8e..7f9304da 100755 --- a/workflow_rsv_genbank_ingest/scripts/clean_metadata.py +++ b/workflow_rsv_genbank_ingest/scripts/clean_metadata.py @@ -6,7 +6,6 @@ """ import argparse -import datetime import pandas as pd @@ -30,6 +29,9 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument("--metadata-in", type=str, required=True, help="Metadata in") + parser.add_argument( + "--quality", type=str, required=True, help="Path to input quality CSV file" + ) parser.add_argument("--metadata-out", type=str, required=True, help="Metadata out") args = parser.parse_args() @@ -98,10 +100,11 @@ def main(): # Remove "Z" from the end of the submission date string, and convert from # ISO datetime to ISO date - def datetime_to_date(x): - return datetime.datetime.fromisoformat(x[:-1]).strftime("%Y-%m-%d") - - df.loc[:, "submission_date"] = df["submission_date"].apply(datetime_to_date) + df.loc[:, "submission_date"] = pd.to_datetime( + df["submission_date"], errors="coerce" + ).apply(lambda x: x.strftime("%Y-%m-%d")) + # Remove rows with invalid submission dates + df = df.loc[~(df["submission_date"].isna())] # Parse location data def parse_genbank_location(s): @@ -163,6 +166,17 @@ def parse_genbank_location(s): # Segment = 1 df["segment"] = 1 + # Load quality and join to dataframe + quality_df = pd.read_csv(args.quality, index_col="Accession ID") + df = df.join(quality_df, how="left") + df["length"] = df["length"].fillna(0).astype(int) + # Calculate percent ambiguous, drop the num_ambiguous column + df["num_ambiguous"] = ((df["num_ambiguous"] / df["length"]) * 100).fillna(0) + df.rename(columns={"num_ambiguous": "percent_ambiguous"}, inplace=True) + + # Filter out entries without any sequence + df = df.loc[df["length"] > 0] + df.to_csv(args.metadata_out) diff --git a/workflow_rsv_genbank_ingest/scripts/download.py b/workflow_rsv_genbank_ingest/scripts/download_metadata.py similarity index 85% rename from workflow_rsv_genbank_ingest/scripts/download.py rename to workflow_rsv_genbank_ingest/scripts/download_metadata.py index ab2c68d6..57ad7d23 100755 --- a/workflow_rsv_genbank_ingest/scripts/download.py +++ b/workflow_rsv_genbank_ingest/scripts/download_metadata.py @@ -27,9 +27,10 @@ import argparse import requests +RETRY_ATTEMPTS = 5 -def main(): +def main(): parser = argparse.ArgumentParser() parser.add_argument( "--start-time", @@ -82,7 +83,7 @@ def main(): ("title", "Definition_s"), ("authors", "Authors_csv"), ("publications", "PubMed_csv"), - ("sequence", "Nucleotide_seq"), + # ("sequence", "Nucleotide_seq"), ("protein_names", "ProtNames_ss"), ] ), @@ -98,8 +99,22 @@ def main(): "User-Agent": "https://github.com/vector-engineering/covid-cg (covidcg@broadinstitute.org)", } - response = requests.get(endpoint, params=params, headers=headers, stream=True) - response.raise_for_status() + for _ in range(RETRY_ATTEMPTS): + try: + response = requests.get( + endpoint, params=params, headers=headers, stream=True + ) + response.raise_for_status() + break + except requests.exceptions.RequestException as e: + print(f"Error downloading metadata: {e}") + + if _ == RETRY_ATTEMPTS - 1: + raise Exception( + f"Failed to download metadata after {RETRY_ATTEMPTS} attempts" + ) + else: + print(f"Retrying download...") 
response_content = response.iter_content(chunk_size=1024, decode_unicode=True) diff --git a/workflow_rsv_genbank_ingest/scripts/download_sequences.py b/workflow_rsv_genbank_ingest/scripts/download_sequences.py new file mode 100644 index 00000000..b27eddb5 --- /dev/null +++ b/workflow_rsv_genbank_ingest/scripts/download_sequences.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# coding: utf-8 + +""" +(2020-12-21) Derived from: +https://github.com/nextstrain/ncov-ingest: fetch-from-genbank script + +License is in LICENSE_NEXTSTRAIN, same folder + +------------------ + +Download all RSV sequences and their curated metadata from GenBank via +NCBI Virus. + +Outputs newline-delimited JSON records. + +The request this program makes is based on observing the network activity that + + https://www.ncbi.nlm.nih.gov/labs/virus/vssi/#/virus?SeqType_s=Nucleotide&VirusLineage_ss=Severe%20acute%20respiratory%20syndrome%20coronavirus%202,%20taxid:2697049 + +performs after clicking through the download interface. Some tweaks were made +by comparing different download requests and guessing, which allows us to +download the metadata + sequence in the same request instead of two. + +------------------ + +Example download: + +https://www.ncbi.nlm.nih.gov/genomes/VirusVariation/vvsearch2/?fq=%7B!tag%3DSeqType_s%7DSeqType_s:(%22Nucleotide%22)&fq=VirusLineageId_ss:(208893)&cmd=download&sort=SourceDB_s%20desc,CreateDate_dt%20desc,id%20asc&dlfmt=fasta&fl=AccVer_s,Definition_s,Nucleotide_seq + +https://www.ncbi.nlm.nih.gov/labs/virus/vssi/#/virus?SeqType_s=Nucleotide&VirusLineage_ss=Human%20respiratory%20syncytial%20virus%20A,%20taxid:208893 + +""" + + +import argparse +import requests + +RETRY_ATTEMPTS = 5 + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--start-time", + type=str, + required=True, + help="Start time in format YYYY-MM-DDTHH:MM:SS.00Z", + ) + parser.add_argument( + "--end-time", + type=str, + required=True, + help="Start time in format YYYY-MM-DDTHH:MM:SS.00Z", + ) + args = parser.parse_args() + + endpoint = "https://www.ncbi.nlm.nih.gov/genomes/VirusVariation/vvsearch2/" + params = { + # Search criteria + "fq": [ + '{!tag=SeqType_s}SeqType_s:("Nucleotide")', # Nucleotide sequences (as opposed to protein) + "VirusLineageId_ss:(11250)", # NCBI Taxon id for RSV + f"{{!tag=CreateDate_dt}}CreateDate_dt:([{args.start_time} TO {args.end_time}])", + ], + # Unclear, but seems necessary. + "q": "*:*", + # Response format + "cmd": "download", + "dlfmt": "fasta", + "fl": ",".join(["id", "AccVer_s", "Definition_s", "Nucleotide_seq"]), + # Stable sort with newest last so diffs work nicely. Columns are source + # data fields, not our output columns. + "sort": "SourceDB_s desc, CollectionDate_s asc, id asc", + # This isn't Entrez, but include the same email parameter it requires just + # to be nice. 
+ "email": "covidcg@broadinstitute.org", + } + + headers = { + "User-Agent": "https://github.com/vector-engineering/covid-cg (covidcg@broadinstitute.org)", + } + + for _ in range(RETRY_ATTEMPTS): + try: + response = requests.get( + endpoint, params=params, headers=headers, stream=True + ) + response.raise_for_status() + break + except requests.exceptions.RequestException as e: + print(f"Error downloading metadata: {e}") + + if _ == RETRY_ATTEMPTS - 1: + raise Exception( + f"Failed to download metadata after {RETRY_ATTEMPTS} attempts" + ) + else: + print(f"Retrying download...") + + response_content = response.iter_content(chunk_size=1024, decode_unicode=True) + + for chunk in response_content: + print(chunk, end="") + + +if __name__ == "__main__": + main() diff --git a/workflow_rsv_genbank_ingest/scripts/sequence_quality.py b/workflow_rsv_genbank_ingest/scripts/sequence_quality.py new file mode 100644 index 00000000..85c0f807 --- /dev/null +++ b/workflow_rsv_genbank_ingest/scripts/sequence_quality.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 +# coding: utf-8 + +"""Collect statistics on sequence quality + +Author: Albert Chen - Vector Engineering Team (chena@broadinstitute.org) +""" + +import argparse +import re + + +def main(): + """Collect statistics on sequence quality""" + + parser = argparse.ArgumentParser() + + parser.add_argument("--input", type=str, required=True, help="Input FASTA file") + parser.add_argument( + "--output", + type=str, + required=True, + help="Output CSV file", + ) + + args = parser.parse_args() + + fp_in = open(args.input, "rt") + fp_out = open(args.output, "wt") + + fp_out.write("Accession ID,length,num_ambiguous\n") + + num_sequences = 0 + cur_entry = "" + cur_seq = "" + while True: + line = fp_in.readline() + + # Beginning of a new entry, or EOF = end of current entry + if not line or line[0] == ">": + if cur_entry: + num_ambiguous = 0 + for char in cur_seq: + if char == "N": + num_ambiguous += 1 + + # Clean up entry ID + cur_entry = cur_entry.split("|")[0].strip() + + fp_out.write( + f'"{cur_entry}",{str(len(cur_seq))},{str(num_ambiguous)}\n' + ) + num_sequences += 1 + + # If it's the end, then break out + if not line: + break + + # Reset sequence and name + cur_seq = "" + # Extract the name (up to the first whitespace) + # [1:] excludes the first '>' + # .split() breaks up the line into chunks separated by whitespace + # [0] gets the first chunk + # cur_entry = line[1:].split()[0] + # Nevermind, the fasta entries sometimes have spaces..... 
+ # Just rstrip to remove the newline, that should work good enough + cur_entry = line[1:].rstrip() + + # Otherwise add sequence to the current entry + elif cur_entry: + cur_seq += re.sub(r"\s+", "", line).strip() + + fp_in.close() + fp_out.close() + + print("Processed {:,} sequences".format(num_sequences), flush=True) + + +if __name__ == "__main__": + main() diff --git a/workflow_rsv_genbank_ingest/scripts/split_sequences_by_subtype.py b/workflow_rsv_genbank_ingest/scripts/split_sequences_by_subtype.py index c6695c1f..12a9b188 100644 --- a/workflow_rsv_genbank_ingest/scripts/split_sequences_by_subtype.py +++ b/workflow_rsv_genbank_ingest/scripts/split_sequences_by_subtype.py @@ -2,16 +2,11 @@ # coding: utf-8 import argparse -import csv import gzip import pandas as pd -import sys - -csv.field_size_limit(sys.maxsize) def main(): - """ python3 scripts/split_sequences_by_subtype.py \ --feed {input.feed} \ @@ -24,7 +19,7 @@ def main(): parser = argparse.ArgumentParser() - parser.add_argument("--feed", type=str, required=True, help="Input feed CSV") + parser.add_argument("--feed", type=str, required=True, help="Input feed FASTA") parser.add_argument( "--metadata", type=str, required=True, help="Input metadata + subtype CSV" ) @@ -44,23 +39,45 @@ def main(): seq_a = gzip.open(args.out_A, "wt") seq_b = gzip.open(args.out_B, "wt") - with open(args.feed, "r", newline="") as fp_in: - feed_reader = csv.DictReader(fp_in, delimiter=",", quotechar='"') - for row in feed_reader: + # Read sequences + cur_entry = "" + cur_seq = "" - # If this entry isn't present in the cleaned metadata, then skip - accession_id = row["genbank_accession"] - if accession_id not in metadata.index: - continue + with open(args.feed, "rt", newline="") as fp_in: + lines = fp_in.readlines() + + for i, line in enumerate(lines): + # Strip whitespace + line = line.strip() - subtype = metadata.at[accession_id, "subtype"] - if not subtype: + # Ignore empty lines that aren't the last line + if not line and i <= (len(lines) - 1): continue - if subtype == "A": - seq_a.write(">{}\n{}\n".format(accession_id, row["sequence"])) - else: - seq_b.write(">{}\n{}\n".format(accession_id, row["sequence"])) + # If not the name of an entry, add this line to the current sequence + # (some FASTA files will have multiple lines per sequence) + if line[0] != ">": + cur_seq = cur_seq + line + + # Start of another entry = end of the previous entry + if line[0] == ">" or i == (len(lines) - 1): + # Avoid capturing the first one and pushing an empty sequence + if cur_entry: + accession_id = cur_entry.split("|")[0].strip() + if accession_id in metadata.index: + subtype = metadata.at[accession_id, "subtype"] + if subtype: + if subtype == "A": + seq_a.write(">{}\n{}\n".format(accession_id, cur_seq)) + else: + seq_b.write(">{}\n{}\n".format(accession_id, cur_seq)) + + # Clear the entry and sequence + cur_entry = line[1:] + # Ignore anything past the first whitespace + if cur_entry: + cur_entry = cur_entry.split()[0] + cur_seq = "" seq_a.close() seq_b.close() diff --git a/workflow_rsv_genbank_ingest/scripts/write_all_fasta.py b/workflow_rsv_genbank_ingest/scripts/write_all_fasta.py index f9061d80..e77312fd 100755 --- a/workflow_rsv_genbank_ingest/scripts/write_all_fasta.py +++ b/workflow_rsv_genbank_ingest/scripts/write_all_fasta.py @@ -9,30 +9,68 @@ csv.field_size_limit(sys.maxsize) + def main(): parser = argparse.ArgumentParser() - parser.add_argument('--feed', type=str, required=True, help='Feed CSV file') - parser.add_argument('--metadata-in', type=str, 
required=True, help='Path to metadata csv file') - parser.add_argument('--out', type=str, required=True, help='Output fasta file suffix') + parser.add_argument("--feed", type=str, required=True, help="Feed FASTA file") + parser.add_argument( + "--metadata-in", type=str, required=True, help="Path to metadata csv file" + ) + parser.add_argument( + "--out", type=str, required=True, help="Output fasta file suffix" + ) args = parser.parse_args() # Load metadata - metadata = pd.read_csv(args.metadata_in, index_col='Accession ID') + metadata = pd.read_csv(args.metadata_in, index_col="Accession ID") + + # Read sequences + cur_entry = "" + cur_seq = "" - with open(args.feed, "r", newline="") as fp_in, gzip.open(args.out, "at") as fp_out: + write_count = 0 + skip_count = 0 - feed_reader = csv.DictReader(fp_in, delimiter=",", quotechar='"') - for row in feed_reader: + # with open(fasta_file, "rt", encoding="latin-1") as fp: + with open(args.feed, "rt", newline="") as fp_in, gzip.open( + args.out, "at" + ) as fp_out: + lines = fp_in.readlines() + for i, line in enumerate(lines): + # Strip whitespace + line = line.strip() - # If this entry isn't present in the cleaned metadata, then skip - accession_id = row['genbank_accession'] - if accession_id not in metadata.index: + # Ignore empty lines that aren't the last line + if not line and i <= (len(lines) - 1): continue - fp_out.write('>{}\n{}\n'.format(accession_id, row['sequence'])) + # If not the name of an entry, add this line to the current sequence + # (some FASTA files will have multiple lines per sequence) + if line[0] != ">": + cur_seq = cur_seq + line + + # Start of another entry = end of the previous entry + if line[0] == ">" or i == (len(lines) - 1): + # Avoid capturing the first one and pushing an empty sequence + if cur_entry: + accession_id = cur_entry.split("|")[0].strip() + if accession_id in metadata.index: + write_count += 1 + fp_out.write(">{}\n{}\n".format(accession_id, cur_seq)) + else: + skip_count += 1 + + # Clear the entry and sequence + cur_entry = line[1:] + # Ignore anything past the first whitespace + if cur_entry: + cur_entry = cur_entry.split()[0] + cur_seq = "" + + print(f"Wrote {write_count} entries, skipped {skip_count}") -if __name__ == '__main__': - main() \ No newline at end of file +if __name__ == "__main__": + main() diff --git a/workflow_sars2_genbank_ingest/Snakefile b/workflow_sars2_genbank_ingest/Snakefile index 87523e7d..c7412d8d 100644 --- a/workflow_sars2_genbank_ingest/Snakefile +++ b/workflow_sars2_genbank_ingest/Snakefile @@ -20,7 +20,10 @@ rule all: input: # Download latest data feed, process sequences download_status = os.path.join( - data_folder, "status", "download_" + today_str + ".done" + data_folder, "status", "download_metadata_" + today_str + ".done" + ), + chunk_status = os.path.join( + data_folder, "status", "chunk_data_" + today_str + ".done" ), copy_status = os.path.join( data_folder, "status", "merge_sequences_" + today_str + ".done" @@ -29,13 +32,12 @@ rule all: metadata = os.path.join(data_folder, "metadata.csv") min_date = pd.to_datetime(config.get('min_date', '2019-12-01')) -# max_date = pd.to_datetime(datetime.date.today().isoformat()) -max_date = pd.to_datetime('2021-03-01') +max_date = pd.to_datetime(config.get('end_date_cutoff', datetime.date.today().isoformat())) chunks = [d for d in pd.period_range(start=min_date, end=max_date, freq=config.get('dl_chunk_period', 'W'))] DL_CHUNKS = [i for i in range(len(chunks))] -rule download_chunk: +rule download_metadata_chunk: """Download the data 
feed in chunks, to avoid timeouts. """ output: @@ -45,12 +47,27 @@ rule download_chunk: end_time = lambda wildcards: chunks[int(wildcards.chunk)].end_time.strftime('%Y-%m-%dT%H:%M:%S.00Z') shell: """ - python3 scripts/download.py \ + python3 scripts/download_metadata.py \ + --start-time {params.start_time} --end-time {params.end_time} \ + > {output.feed} + """ + +rule download_sequence_chunk: + """Download the data feed metadata in chunks, to avoid timeouts. + """ + output: + feed = os.path.join(data_folder, 'feed_chunks', 'feed_{chunk}.fasta'), + params: + start_time = lambda wildcards: chunks[int(wildcards.chunk)].start_time.strftime('%Y-%m-%dT%H:%M:%S.00Z'), + end_time = lambda wildcards: chunks[int(wildcards.chunk)].end_time.strftime('%Y-%m-%dT%H:%M:%S.00Z') + shell: + """ + python3 scripts/download_sequences.py \ --start-time {params.start_time} --end-time {params.end_time} \ > {output.feed} """ -rule combine_download_chunks: +rule combine_metadata_chunks: """Combine all the downloaded chunks into a single CSV file. Combining CSV files without duplicating header: https://apple.stackexchange.com/a/348619 """ @@ -59,13 +76,13 @@ rule combine_download_chunks: params: chunk_glob = os.path.join(data_folder, 'feed_chunks', 'feed*.csv') output: - feed = os.path.join(data_folder, 'feed.csv'), + metadata = os.path.join(data_folder, 'metadata_dirty.csv'), status = touch(os.path.join( - data_folder, "status", "download_" + today_str + ".done" + data_folder, "status", "download_metadata_" + today_str + ".done" )) shell: """ - awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.feed} + awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.metadata} """ @@ -79,18 +96,21 @@ checkpoint chunk_data: We end up with hundreds of files, but the filesystem seems to be handling it well. 
""" input: - feed = rules.combine_download_chunks.output.feed + metadata = rules.combine_metadata_chunks.output.metadata, + sequence_chunks = expand(os.path.join(data_folder, 'feed_chunks', 'feed_{chunk}.fasta'), chunk=DL_CHUNKS) output: - metadata_dirty = os.path.join(data_folder, "metadata_dirty.csv") + status = touch(os.path.join( + data_folder, "status", "chunk_data_" + today_str + ".done" + )) params: fasta = os.path.join(data_folder, "fasta_temp"), chunk_size = config["chunk_size"] shell: """ python3 scripts/chunk_data.py \ - --data-feed {input.feed} \ + --metadata {input.metadata} \ + --sequence-chunks {input.sequence_chunks} \ --out-fasta {params.fasta} \ - --out-metadata {output.metadata_dirty} \ --chunk-size {params.chunk_size} \ --processes {workflow.cores} """ @@ -254,7 +274,7 @@ rule clean_metadata: """Clean metadata, incorporate lineage assignments into metadata """ input: - metadata_dirty = rules.chunk_data.output.metadata_dirty, + metadata_dirty = rules.combine_metadata_chunks.output.metadata, lineages = rules.combine_lineages.output.lineages, quality = rules.combine_quality.output.quality output: diff --git a/workflow_sars2_genbank_ingest/scripts/chunk_data.py b/workflow_sars2_genbank_ingest/scripts/chunk_data.py index 4a22bb7c..2c9521d8 100644 --- a/workflow_sars2_genbank_ingest/scripts/chunk_data.py +++ b/workflow_sars2_genbank_ingest/scripts/chunk_data.py @@ -6,21 +6,16 @@ """ import argparse -import csv import datetime import gzip import multiprocessing as mp import pandas as pd -import sys from collections import defaultdict from functools import partial from pathlib import Path -csv.field_size_limit(sys.maxsize) - - def write_sequences_day(fasta_out_path, seqs): # Mode 'at' is append, in text mode with gzip.open(fasta_out_path, "at") as fp_out: @@ -34,12 +29,12 @@ def main(): Parameters ---------- - data_feed: str - - Path to data feed csv file + metadata: str + - Path to metadata csv file + sequence_chunks: list of str + - List of paths to sequence FASTA files out_fasta: str - Path to fasta output directory - out_metadata: str - - Path to metadata.csv output file chunk_size: int - Number of records to hold in RAM before flushing to disk processes: int @@ -52,16 +47,17 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument( - "--data-feed", type=str, required=True, help="Path to data feed CSV file" - ) - parser.add_argument( - "--out-fasta", type=str, required=True, help="Path to output fasta directory" + "--metadata", type=str, required=True, help="Path to metadata CSV file" ) parser.add_argument( - "--out-metadata", + "--sequence-chunks", type=str, + nargs="+", required=True, - help="Path to output metadata CSV file", + help="Sequence FASTA files", + ) + parser.add_argument( + "--out-fasta", type=str, required=True, help="Path to output fasta directory" ) parser.add_argument( "--chunk-size", @@ -93,9 +89,7 @@ def main(): # Keep track of how far we're along the current chunk chunk_i = 0 - # Store metadata entries as a list of dictionaries, for now - # We'll wrap it in a pandas DataFrame later for easier serialization - metadata_df = [] + metadata = pd.read_csv(args.metadata, index_col="genbank_accession") def flush_chunk(fasta_by_subm_date): with mp.get_context("spawn").Pool(processes=args.processes) as pool: @@ -107,50 +101,65 @@ def flush_chunk(fasta_by_subm_date): pool.close() pool.join() - with open(args.data_feed, "r", newline="") as fp_in: - # Open up the initial fasta file for the first chunk - fasta_by_subm_date = defaultdict(list) - - 
line_counter = 0 - - feed_reader = csv.DictReader(fp_in, delimiter=",", quotechar='"') - for row in feed_reader: - # Flush results if chunk is full - if chunk_i == args.chunk_size: - flush_chunk(fasta_by_subm_date) - # Reset chunk counter - chunk_i = 0 - # Reset sequence dictionary - fasta_by_subm_date = defaultdict(list) - - # Add to metadata list - metadata_df.append({k: row[k] for k in row.keys() if k != "sequence"}) - - # Store sequence in dictionary - # Chop off the "Z" at the end of the submission time string, then parse - # as an ISO datetime format, then return just the year-month-day - subm_date = datetime.datetime.fromisoformat(row["submitted"][:-1]).strftime( - "%Y-%m-%d" - ) - fasta_by_subm_date[subm_date].append( - (row["genbank_accession"], row["sequence"]) - ) - - # Iterate the intra-chunk counter - chunk_i += 1 - - line_counter += 1 - - # Flush the last chunk - flush_chunk(fasta_by_subm_date) - - # Cast the list of dictionaries (list of metadata entries) into a pandas - # DataFrame, and then serialize it to disk - # Do this step since pandas can handle some special serialization options - # that I didn't want to implement manually (such as wrapping certain strings - # in double quotes) - metadata_df = pd.DataFrame(metadata_df) - metadata_df.to_csv(args.out_metadata, index=False) + for file in args.sequence_chunks: + with open(file, mode="rt", newline="") as fp_in: + # Open up the initial fasta file for the first chunk + fasta_by_subm_date = defaultdict(list) + + # Read sequences + cur_entry = "" + cur_seq = "" + + entries = [] + lines = fp_in.readlines() + for i, line in enumerate(lines): + # Strip whitespace + line = line.strip() + + # Ignore empty lines that aren't the last line + if not line and i <= (len(lines) - 1): + continue + + # If not the name of an entry, add this line to the current sequence + # (some FASTA files will have multiple lines per sequence) + if line[0] != ">": + cur_seq = cur_seq + line + + # Start of another entry = end of the previous entry + if line[0] == ">" or i == (len(lines) - 1): + # Avoid capturing the first one and pushing an empty sequence + if cur_entry: + entries.append((cur_entry, cur_seq)) + + for cur_entry, cur_seq in entries: + # Flush results if chunk is full + if chunk_i == args.chunk_size: + flush_chunk(fasta_by_subm_date) + # Reset chunk counter + chunk_i = 0 + # Reset sequence dictionary + fasta_by_subm_date = defaultdict(list) + + accession_id = cur_entry.split("|")[0].strip() + if accession_id not in metadata.index: + continue + + submitted = metadata.at[accession_id, "submitted"] + # Store sequence in dictionary + # Chop off the "Z" at the end of the submission time string, then parse + # as an ISO datetime format, then return just the year-month-day + subm_date = datetime.datetime.fromisoformat(submitted[:-1]).strftime( + "%Y-%m-%d" + ) + fasta_by_subm_date[subm_date].append((accession_id, cur_seq)) + + # Iterate the intra-chunk counter + chunk_i += 1 + + line_counter += 1 + + # Flush the last chunk + flush_chunk(fasta_by_subm_date) if __name__ == "__main__": diff --git a/workflow_sars2_genbank_ingest/scripts/download.py b/workflow_sars2_genbank_ingest/scripts/download_metadata.py similarity index 85% rename from workflow_sars2_genbank_ingest/scripts/download.py rename to workflow_sars2_genbank_ingest/scripts/download_metadata.py index 0022ce32..b36a6168 100644 --- a/workflow_sars2_genbank_ingest/scripts/download.py +++ b/workflow_sars2_genbank_ingest/scripts/download_metadata.py @@ -26,9 +26,10 @@ import argparse import 
requests +RETRY_ATTEMPTS = 5 -def main(): +def main(): parser = argparse.ArgumentParser() parser.add_argument( "--start-time", @@ -81,7 +82,7 @@ def main(): ("title", "Definition_s"), ("authors", "Authors_csv"), ("publications", "PubMed_csv"), - ("sequence", "Nucleotide_seq"), + # ("sequence", "Nucleotide_seq"), ] ), # Stable sort with newest last so diffs work nicely. Columns are source @@ -96,8 +97,22 @@ def main(): "User-Agent": "https://github.com/vector-engineering/covid-cg (covidcg@broadinstitute.org)", } - response = requests.get(endpoint, params=params, headers=headers, stream=True) - response.raise_for_status() + for _ in range(RETRY_ATTEMPTS): + try: + response = requests.get( + endpoint, params=params, headers=headers, stream=True + ) + response.raise_for_status() + break + except requests.exceptions.RequestException as e: + print(f"Error downloading metadata: {e}") + + if _ == RETRY_ATTEMPTS - 1: + raise Exception( + f"Failed to download metadata after {RETRY_ATTEMPTS} attempts" + ) + else: + print(f"Retrying download...") response_content = response.iter_content(chunk_size=1024, decode_unicode=True) diff --git a/workflow_sars2_genbank_ingest/scripts/download_sequences.py b/workflow_sars2_genbank_ingest/scripts/download_sequences.py new file mode 100644 index 00000000..54478234 --- /dev/null +++ b/workflow_sars2_genbank_ingest/scripts/download_sequences.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# coding: utf-8 + +""" +(2020-12-21) Derived from: +https://github.com/nextstrain/ncov-ingest: fetch-from-genbank script + +License is in LICENSE_NEXTSTRAIN, same folder + +------------------ + +Download all RSV sequences and their curated metadata from GenBank via +NCBI Virus. + +Outputs newline-delimited JSON records. + +The request this program makes is based on observing the network activity that + + https://www.ncbi.nlm.nih.gov/labs/virus/vssi/#/virus?SeqType_s=Nucleotide&VirusLineage_ss=Severe%20acute%20respiratory%20syndrome%20coronavirus%202,%20taxid:2697049 + +performs after clicking through the download interface. Some tweaks were made +by comparing different download requests and guessing, which allows us to +download the metadata + sequence in the same request instead of two. + +------------------ + +Example download: + +https://www.ncbi.nlm.nih.gov/genomes/VirusVariation/vvsearch2/?fq=%7B!tag%3DSeqType_s%7DSeqType_s:(%22Nucleotide%22)&fq=VirusLineageId_ss:(208893)&cmd=download&sort=SourceDB_s%20desc,CreateDate_dt%20desc,id%20asc&dlfmt=fasta&fl=AccVer_s,Definition_s,Nucleotide_seq + +https://www.ncbi.nlm.nih.gov/labs/virus/vssi/#/virus?SeqType_s=Nucleotide&VirusLineage_ss=Human%20respiratory%20syncytial%20virus%20A,%20taxid:208893 + +""" + + +import argparse +import requests + +RETRY_ATTEMPTS = 5 + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--start-time", + type=str, + required=True, + help="Start time in format YYYY-MM-DDTHH:MM:SS.00Z", + ) + parser.add_argument( + "--end-time", + type=str, + required=True, + help="Start time in format YYYY-MM-DDTHH:MM:SS.00Z", + ) + args = parser.parse_args() + + endpoint = "https://www.ncbi.nlm.nih.gov/genomes/VirusVariation/vvsearch2/" + params = { + # Search criteria + "fq": [ + '{!tag=SeqType_s}SeqType_s:("Nucleotide")', # Nucleotide sequences (as opposed to protein) + "VirusLineageId_ss:(2697049)", # NCBI Taxon id for RSV + f"{{!tag=CreateDate_dt}}CreateDate_dt:([{args.start_time} TO {args.end_time}])", + ], + # Unclear, but seems necessary. 
+        "q": "*:*",
+        # Response format
+        "cmd": "download",
+        "dlfmt": "fasta",
+        "fl": ",".join(["id", "AccVer_s", "Definition_s", "Nucleotide_seq"]),
+        # Stable sort with newest last so diffs work nicely. Columns are source
+        # data fields, not our output columns.
+        "sort": "SourceDB_s desc, CollectionDate_s asc, id asc",
+        # This isn't Entrez, but include the same email parameter it requires just
+        # to be nice.
+        "email": "covidcg@broadinstitute.org",
+    }
+
+    headers = {
+        "User-Agent": "https://github.com/vector-engineering/covid-cg (covidcg@broadinstitute.org)",
+    }
+
+    for _ in range(RETRY_ATTEMPTS):
+        try:
+            response = requests.get(
+                endpoint, params=params, headers=headers, stream=True
+            )
+            response.raise_for_status()
+            break
+        except requests.exceptions.RequestException as e:
+            print(f"Error downloading sequences: {e}")
+
+            if _ == RETRY_ATTEMPTS - 1:
+                raise Exception(
+                    f"Failed to download sequences after {RETRY_ATTEMPTS} attempts"
+                )
+            else:
+                print(f"Retrying download...")
+
+    response_content = response.iter_content(chunk_size=1024, decode_unicode=True)
+
+    for chunk in response_content:
+        print(chunk, end="")
+
+
+if __name__ == "__main__":
+    main()
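
------------------------------------------------------------------------
Supplementary sketches (illustrative only, not part of the patch above).

The new ingest scripts parse the downloaded FASTA feeds by reading the whole
file with readlines() and tracking cur_entry / cur_seq by hand; the same loop
is repeated in chunk_sequences.py, write_ha_fasta.py, write_all_fasta.py,
split_sequences_by_subtype.py and chunk_data.py, and in the flu
chunk_sequences.py and SARS-CoV-2 chunk_data.py variants the assignment of
cur_entry / cur_seq from the header line does not appear in the hunk, so
entries would stay empty as written. A generator along these lines (a sketch;
read_fasta is a hypothetical helper name, not something added by this patch)
factors the pattern out, streams the file instead of slurping it, and makes
the entry boundaries explicit. It assumes, as the scripts above do, that the
accession ID is everything before the first "|" in the header.

def read_fasta(fp):
    """Yield (accession_id, sequence) tuples from a FASTA file handle.

    Headers are expected to look like ">id|AccVer|Definition ...", as
    produced by the NCBI Virus download (fl=id,AccVer_s,Definition_s,...).
    """
    cur_entry = None
    cur_seq = []
    for line in fp:
        line = line.strip()
        if not line:
            continue
        if line[0] == ">":
            # Start of a new entry = end of the previous one
            if cur_entry is not None:
                yield cur_entry, "".join(cur_seq)
            # Accession ID = header text before the first '|', '>' removed
            cur_entry = line[1:].split("|")[0].strip()
            cur_seq = []
        elif cur_entry is not None:
            # Sequences may span multiple lines
            cur_seq.append(line)
    if cur_entry is not None:
        yield cur_entry, "".join(cur_seq)

For example, the body of write_ha_fasta.py could then reduce to a loop of the
form "for accession_id, cur_seq in read_fasta(fp_in): ...", with the metadata
lookup and segment check unchanged.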
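
The five-attempt retry block around requests.get() is now duplicated in every
download_metadata.py and download_sequences.py. A small shared helper could
keep the copies in sync; this is a sketch (fetch_with_retry and the pause
between attempts are assumptions of mine, not in the patch) that uses only the
same requests calls the patch already relies on.

import time

import requests

RETRY_ATTEMPTS = 5


def fetch_with_retry(endpoint, params, headers, attempts=RETRY_ATTEMPTS, wait=5):
    """Return a streaming response, retrying transient request failures."""
    for attempt in range(1, attempts + 1):
        try:
            response = requests.get(
                endpoint, params=params, headers=headers, stream=True
            )
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as err:
            print(f"Download attempt {attempt}/{attempts} failed: {err}")
            if attempt == attempts:
                # Give up after the last attempt
                raise
            time.sleep(wait)  # short pause before the next attempt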
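
Both clean_metadata.py scripts now left-join the combined quality table on
"Accession ID" and convert num_ambiguous into a percentage before dropping
rows with no sequence. A minimal pandas illustration of that step (the two toy
records are made up; column names follow the patch):

import pandas as pd

metadata = pd.DataFrame(
    {"virus_name": ["A/x/2021", "A/y/2021"]},
    index=pd.Index(["OK000001", "OK000002"], name="Accession ID"),
)
quality = pd.DataFrame(
    {"length": [1700], "num_ambiguous": [17]},
    index=pd.Index(["OK000001"], name="Accession ID"),
)

df = metadata.join(quality, how="left")
df["length"] = df["length"].fillna(0).astype(int)
# Rows with no matching sequence give NaN here; filled with 0 and dropped below
df["num_ambiguous"] = ((df["num_ambiguous"] / df["length"]) * 100).fillna(0)
df.rename(columns={"num_ambiguous": "percent_ambiguous"}, inplace=True)
df = df.loc[df["length"] > 0]
print(df)  # only OK000001 remains, with percent_ambiguous == 1.0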
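
In workflow_rsv_genbank_ingest/scripts/clean_metadata.py the new date handling
applies x.strftime(...) element-wise after pd.to_datetime(..., errors="coerce").
pandas' NaT does not support strftime, so an unparseable submission_date would
raise before the "Remove rows with invalid submission dates" filter ever runs.
A sketch of the same step using Series.dt.strftime, which turns NaT into NaN
so the filter behaves as the patch's comment intends (assuming that is the
goal):

import pandas as pd

df = pd.DataFrame({"submission_date": ["2021-02-03T00:00:00Z", "not-a-date"]})

df["submission_date"] = pd.to_datetime(
    df["submission_date"], errors="coerce"
).dt.strftime("%Y-%m-%d")
# Rows that failed to parse are now NaN instead of raising inside an apply()
df = df.loc[~df["submission_date"].isna()]
print(df)  # only the 2021-02-03 row is left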
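
The SARS-CoV-2 Snakefile now derives max_date from the config end_date_cutoff
(falling back to today) and splits the window into one download chunk per
period. A small sketch of how the chunk list and the --start-time/--end-time
values come out (the dates and weekly frequency are illustrative values taken
from the config diff):

import pandas as pd

min_date = pd.to_datetime("2019-12-01")
max_date = pd.to_datetime("2021-03-01")  # illustrative end_date_cutoff

# One download chunk per week, matching dl_chunk_period: 'W'
chunks = list(pd.period_range(start=min_date, end=max_date, freq="W"))
DL_CHUNKS = list(range(len(chunks)))

# Each chunk becomes a CreateDate_dt window for one download_*_chunk job
first = chunks[0]
print(first.start_time.strftime("%Y-%m-%dT%H:%M:%S.00Z"))
print(first.end_time.strftime("%Y-%m-%dT%H:%M:%S.00Z"))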