diff --git a/Cargo.lock b/Cargo.lock
index bfadd3f..baa3b38 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1632,7 +1632,7 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 [[package]]
 name = "trgt"
-version = "1.1.1"
+version = "1.2.0"
 dependencies = [
  "arrayvec",
  "bio",
diff --git a/Cargo.toml b/Cargo.toml
index 955294e..21dbd3d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "trgt"
-version = "1.1.1"
+version = "1.2.0"
 edition = "2021"
 build = "build.rs"
diff --git a/LICENSE-THIRDPARTY.json b/LICENSE-THIRDPARTY.json
index e4a9f26..6b67ff3 100644
--- a/LICENSE-THIRDPARTY.json
+++ b/LICENSE-THIRDPARTY.json
@@ -1367,6 +1367,15 @@
         "license_file": null,
         "description": "Semantic version parsing and comparison."
     },
+    {
+        "name": "semver",
+        "version": "1.0.23",
+        "authors": "David Tolnay <dtolnay@gmail.com>",
+        "repository": "https://github.com/dtolnay/semver",
+        "license": "Apache-2.0 OR MIT",
+        "license_file": null,
+        "description": "Parser and evaluator for Cargo's flavor of Semantic Versioning"
+    },
     {
         "name": "serde",
         "version": "1.0.197",
@@ -1621,7 +1630,7 @@
     },
     {
         "name": "trgt",
-        "version": "1.0.0",
+        "version": "1.2.0",
         "authors": null,
         "repository": null,
         "license": null,
diff --git a/README.md b/README.md
index 3dd3649..a54ff71 100644
--- a/README.md
+++ b/README.md
@@ -119,12 +119,25 @@ tandem repeats at genome scale. 2024](https://www.nature.com/articles/s41587-023
   - Updated error handling: Malformed entries are now logged as errors without terminating the program.
   - Added shorthand CLI options to simplify command usage.
 - 1.1.0
-  - Added a new subcommand `trgt merge`. This command merges VCF files generated by `trgt genotype` into a joint VCF file. **Works with VCFs generated by all versions of TRGT** (the resulting joint VCF will always be in the TRGT v1.0+ format which includes padding bases).
+  - Added a new subcommand `trgt merge`. This command merges VCF files generated by `trgt genotype` into a joint VCF file. **Works with VCFs generated by all versions of TRGT** (the resulting joint VCF will always be in the TRGT ≥v1.0.0 format which includes padding bases).
   - Added subsampling of regions with ultra-high coverage (`>MAX_DEPTH * 3`, by default 750); implemented via reservoir sampling.
   - Fixed a cluster genotyper bug that occurred when only a single read covered a locus.
   - Added new logic for filtering non-HiFi reads: remove up to 3% of lower quality reads that do not match the expected repeat sequence.
 - 1.1.1
   - Hotfix: Read filtering logic no longer removes reads without RQ tags.
+- 1.1.2
+  - Hotfix: Prevent genotyping without reads.
+  - Added the `--disable-bam-output` flag to `trgt genotype`, allowing users to disable BAMlet generation. **However, please note that BAMlets are still required for downstream tasks like trgt plot.**
+- 1.2.0
+  - `trgt merge`:
+    - Multi-sample VCF merging: Added support for merging TRGT VCFs with any number of samples, allowing updates to large, population-scale datasets with new samples.
+    - Synced contig indexing: Introduced support for VCFs with inconsistent contig orderings. Additionally, the new `--contig` flag allows specifying a comma-separated list of contigs to be merged.
+    - The reference genome is no longer required when merging TRGT VCFs from version 1.0.0 or later.
+    - Merging now skips and logs problematic loci by default. Use the `--quit-on-errors` flag to terminate on errors. Statistics are logged post-merge, including counts of failed and skipped TRs.
+  - `trgt validate`
+    - Always outputs statistics directly to stdout and stderr instead of logging them.
+  - Bug fix:
+    - Resolved issue with handling bgzip-compressed BED files.
 
 ### DISCLAIMER
diff --git a/build.rs b/build.rs
index aef09a5..3a031b0 100644
--- a/build.rs
+++ b/build.rs
@@ -11,7 +11,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
     {
         Ok(_) => {}
         Err(_e) => {
-            println!("cargo:rustc-env=VERGEN_GIT_DESCRIBE=unknown");
+            println!("cargo:rustc-env=VERGEN_GIT_DESCRIBE=");
         }
     }
     Ok(())
diff --git a/docs/cli.md b/docs/cli.md
index 718c310..00d978e 100644
--- a/docs/cli.md
+++ b/docs/cli.md
@@ -3,6 +3,7 @@
 Commands:
 - `genotype`
 - `plot`
+- `merge`
 - `validate`
 
 Options:
@@ -33,6 +34,7 @@ Advanced:
 - `--output-flank-len <OUTPUT_FLANK_LEN>` Length of flanking sequence to report on output [default: 50].
 - `--fixed-flanks` Keep flank length fixed.
 - `--min-read-quality <MIN_READ_QUALITY>` Minimum HiFi rq value required to use a read for genotyping [default: 0.98].
+- `--disable-bam-output` Disable BAM output.
 - `--max-depth <MAX_DEPTH>` Maximum locus depth [default: 250].
 
 ## Plot command-line
@@ -52,17 +54,34 @@ Plotting:
   allele. Waterfall plots display unaligned repeat sequences without aligning them to the (consensus) allele. Waterfall plots are especially useful for QC of repeat calls and for visualization of mosaic expansions [default: allele].
-- `--show <SHOW>` either motifs (`motifs`) or methylation
-  (`meth`) is visualized, [default: motifs].
+- `--show <SHOW>` either motifs (`motifs`) or methylation
+  (`meth`) is visualized [default: motifs].
 
 Advanced:
 - `--flank-len <FLANK_LEN>` Length of flanking regions [default: 50].
 
+## Merge command-line
+
+Options:
+- `-v, --vcf <VCF>...` VCF files to merge.
+- `-g, --genome <FASTA>` Path to the FASTA file containing reference genome.
+- `-o, --output <OUTPUT>` Write output to a file [standard output].
+
+Advanced:
+- `-O, --output-type <OUTPUT_TYPE>` Output type: u|b|v|z, u/b: un/compressed BCF, v/z: un/compressed VCF.
+- `--skip-n <SKIP_N>` Skip the first N records.
+- `--process-n <PROCESS_N>` Only process N records.
+- `--print-header` Print only the merged header and exit.
+- `--force-single` Run even if there is only one file on input.
+- `--no-version` Do not append version and command line to the header.
+- `--quit-on-errors` Quit immediately on errors during merging.
+- `--contig <CONTIG>` Process only the specified contigs (comma-separated list).
+
 ## Validate command-line
 
 Options:
-- `-g, --genome <FASTA>` Path to the FASTA file containing reference genome
+- `-g, --genome <FASTA>` Path to the FASTA file containing reference genome.
 - `-b, --repeats <REPEATS>` BED file with repeat coordinates and structure.
 
 Advanced:
-- `--flank-len <FLANK_LEN>` Length of flanking regions [default: 50]
\ No newline at end of file
+- `--flank-len <FLANK_LEN>` Length of flanking regions [default: 50].
\ No newline at end of file diff --git a/src/cli.rs b/src/cli.rs index 9eebea4..1c44344 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -13,11 +13,12 @@ use std::{ }; pub static FULL_VERSION: Lazy = Lazy::new(|| { - format!( - "{}-{}", - env!("CARGO_PKG_VERSION"), - env!("VERGEN_GIT_DESCRIBE") - ) + let git_describe = env!("VERGEN_GIT_DESCRIBE"); + if git_describe.is_empty() { + env!("CARGO_PKG_VERSION").to_string() + } else { + format!("{}-{}", env!("CARGO_PKG_VERSION"), git_describe) + } }); #[derive(Parser)] @@ -63,6 +64,7 @@ pub struct MergeArgs { #[clap(help = "VCF files to merge")] #[clap(value_name = "VCF")] #[clap(num_args = 1..)] + #[arg(value_parser = check_file_exists)] pub vcfs: Vec, #[clap(short = 'g')] @@ -70,7 +72,7 @@ pub struct MergeArgs { #[clap(help = "Path to reference genome FASTA")] #[clap(value_name = "FASTA")] #[arg(value_parser = check_file_exists)] - pub genome_path: PathBuf, + pub genome_path: Option, #[clap(short = 'o')] #[clap(long = "output")] @@ -134,6 +136,18 @@ pub struct MergeArgs { #[clap(value_parser(["exact"]))] #[clap(default_value = "exact")] pub merge_strategy: String, + + #[clap(help_heading("Advanced"))] + #[clap(long = "quit-on-errors")] + #[clap(help = "Quit immediately on errors during merging")] + pub quit_on_error: bool, + + #[clap(help_heading("Advanced"))] + #[clap(long = "contig")] + #[clap(value_name = "CONTIG")] + #[clap(help = "Process only the specified contigs (comma-separated list)")] + #[clap(value_delimiter = ',')] + pub contigs: Option>, } #[derive(Parser, Debug)] @@ -248,6 +262,11 @@ pub struct GenotypeArgs { // #[arg(value_parser = ensure_unit_float)] pub min_hifi_read_qual: f64, + #[clap(help_heading("Advanced"))] + #[clap(long = "disable-bam-output")] + #[clap(help = "Disable BAM output")] + pub disable_bam_output: bool, + #[clap(help_heading("Advanced"))] #[clap(long = "max-depth")] #[clap(value_name = "MAX_DEPTH")] @@ -363,7 +382,8 @@ pub fn init_verbose(args: &Cli) { let filter_level: LevelFilter = match args.verbosity { 0 => LevelFilter::Warn, 1 => LevelFilter::Info, - _ => LevelFilter::Debug, + 2 => LevelFilter::Debug, + _ => LevelFilter::Trace, }; env_logger::Builder::from_default_env() diff --git a/src/commands/genotype.rs b/src/commands/genotype.rs index d00c590..fa7b2da 100644 --- a/src/commands/genotype.rs +++ b/src/commands/genotype.rs @@ -57,9 +57,15 @@ pub fn trgt(args: GenotypeArgs) -> Result<()> { })?; let output_flank_len = std::cmp::min(args.flank_len, args.output_flank_len); - let mut bam_writer = create_writer(&args.output_prefix, "spanning.bam", |path| { - BamWriter::new(path, bam_header, output_flank_len) - })?; + let bam_writer = if !args.disable_bam_output { + Some(create_writer( + &args.output_prefix, + "spanning.bam", + |path| BamWriter::new(path, bam_header, output_flank_len), + )?) 
+ } else { + None + }; let (sender_locus, receiver_locus) = bounded(CHANNEL_BUFFER_SIZE); let locus_stream_thread = thread::spawn(move || { @@ -75,9 +81,15 @@ pub fn trgt(args: GenotypeArgs) -> Result<()> { let (sender_result, receiver_result) = bounded(CHANNEL_BUFFER_SIZE); let writer_thread = thread::spawn(move || { - for (locus, results) in &receiver_result { - vcf_writer.write(&locus, &results); - bam_writer.write(&locus, &results); + if let Some(mut bam_writer) = bam_writer { + for (locus, results) in &receiver_result { + vcf_writer.write(&locus, &results); + bam_writer.write(&locus, &results); + } + } else { + for (locus, results) in &receiver_result { + vcf_writer.write(&locus, &results); + } } }); diff --git a/src/commands/merge.rs b/src/commands/merge.rs index e94127e..f5cc7d3 100644 --- a/src/commands/merge.rs +++ b/src/commands/merge.rs @@ -12,7 +12,7 @@ pub fn merge(args: MergeArgs) -> Result<()> { return Ok(()); } - vcf_processor.merge_variants(); + vcf_processor.merge_variants()?; // TODO: If --output, --write-index is set and the output is compressed, index the file log::info!("Total execution time: {:.2?}", start_timer.elapsed()); diff --git a/src/commands/validate.rs b/src/commands/validate.rs index 4a10091..209de00 100644 --- a/src/commands/validate.rs +++ b/src/commands/validate.rs @@ -1,6 +1,9 @@ use crate::cli::ValidateArgs; use crate::trgt::locus::get_loci; -use crate::utils::{open_catalog_reader, open_genome_reader, Genotyper, Karyotype, Result}; +use crate::utils::{ + format_number_with_commas, open_catalog_reader, open_genome_reader, Genotyper, Karyotype, + Result, +}; pub fn validate(args: ValidateArgs) -> Result<()> { let catalog_reader = open_catalog_reader(&args.repeats_path)?; @@ -24,7 +27,7 @@ pub fn validate(args: ValidateArgs) -> Result<()> { success_count += 1 } Err(e) => { - log::error!("{}", e); + eprintln!("{}", e); error_count += 1; } } @@ -37,7 +40,7 @@ pub fn validate(args: ValidateArgs) -> Result<()> { let success_percentage = (success_count as f64 / total as f64) * 100.0; let error_percentage = (error_count as f64 / total as f64) * 100.0; - log::info!( + println!( "Motifs per Locus - Range: [{},{}], Median: {:.2}, Mean: {:.2}, StdDev: {:.2}", motifs_stats.min, motifs_stats.max, @@ -45,22 +48,21 @@ pub fn validate(args: ValidateArgs) -> Result<()> { motifs_stats.median, motifs_stats.std_dev ); - log::info!( + println!( "TR Lengths - Range: [{},{}], Median: {:.2}, Mean: {:.2}, StdDev: {:.2}", - tr_stats.min, - tr_stats.max, - tr_stats.mean, - tr_stats.median, - tr_stats.std_dev + tr_stats.min, tr_stats.max, tr_stats.mean, tr_stats.median, tr_stats.std_dev ); match error_count { - 0 => log::info!("Validation successful. Loci pass={}", success_count), - _ => log::info!( - "Validation failed. Loci pass={} ({:.2}%), fail={} ({:.2}%)", - success_count, + 0 => println!( + "Validation successful. Loci pass = {}", + format_number_with_commas(success_count) + ), + _ => println!( + "Validation failed. 
Loci pass = {} ({:.2}%), fail = {} ({:.2}%)", + format_number_with_commas(success_count), success_percentage, - error_count, + format_number_with_commas(error_count), error_percentage ), } diff --git a/src/merge/strategy/exact.rs b/src/merge/strategy/exact.rs index 74df07b..c5bb31d 100644 --- a/src/merge/strategy/exact.rs +++ b/src/merge/strategy/exact.rs @@ -1,20 +1,26 @@ +use crate::utils::Result; use rust_htslib::bcf::record::GenotypeAllele; use std::collections::{HashMap, HashSet}; +#[allow(clippy::type_complexity)] pub fn merge_exact( - sample_gts: Vec>, + vcf_gts: Vec>>, sample_alleles: Vec>, -) -> (Vec>, Vec<&[u8]>) { +) -> Result<(Vec>>, Vec<&[u8]>)> { let mut ref_allele: Option<&[u8]> = None; let mut all_alleles: HashSet<&[u8]> = HashSet::new(); for sample_allele in sample_alleles.iter() { if !sample_allele.is_empty() { - if let Some(ref_allele) = &ref_allele { - assert_eq!( - ref_allele, &sample_allele[0], - "Reference alleles do not match" - ); + if let Some(existing_ref_allele) = &ref_allele { + if existing_ref_allele != &sample_allele[0] { + return Err(format_args!( + "Reference alleles do not match: '{}' and '{}'", + String::from_utf8_lossy(existing_ref_allele), + String::from_utf8_lossy(sample_allele[0]) + ) + .to_string()); + } } else { ref_allele = Some(sample_allele[0]); } @@ -23,10 +29,13 @@ pub fn merge_exact( } } } - let ref_allele = ref_allele.expect("No reference allele found"); + let ref_allele = ref_allele.ok_or_else(|| "No reference allele found".to_string())?; let mut sorted_alleles: Vec<&[u8]> = all_alleles.into_iter().collect(); - sorted_alleles.sort_by_key(|a| a.len()); + sorted_alleles.sort_by(|a, b| match a.len().cmp(&b.len()) { + std::cmp::Ordering::Equal => a.cmp(b), + other => other, + }); sorted_alleles.insert(0, ref_allele); let allele_to_index: HashMap<&[u8], usize> = sorted_alleles @@ -35,37 +44,47 @@ pub fn merge_exact( .map(|(idx, &allele)| (allele, idx)) .collect(); - let mut out_sample_gts: Vec> = Vec::new(); - for (i, sample_gt) in sample_gts.iter().enumerate() { - let mut out_gt: Vec = Vec::new(); - for gt in sample_gt { - match gt { - GenotypeAllele::PhasedMissing | GenotypeAllele::UnphasedMissing => out_gt.push(*gt), - GenotypeAllele::Phased(pos) | GenotypeAllele::Unphased(pos) => { - let pos_usize: usize = (*pos).try_into().expect("Index out of range"); - let pos_converted = allele_to_index[&sample_alleles[i][pos_usize]]; - let new_gt = match gt { - GenotypeAllele::Phased(_) => GenotypeAllele::Phased(pos_converted as i32), - GenotypeAllele::Unphased(_) => { - GenotypeAllele::Unphased(pos_converted as i32) - } - _ => unreachable!(), - }; - out_gt.push(new_gt); + let mut out_sample_gts = Vec::new(); + for (i, vcf_gt) in vcf_gts.iter().enumerate() { + let mut out_gt = Vec::new(); + for sample_gt in vcf_gt { + let mut s_gt = Vec::new(); + for gt in sample_gt { + match gt { + GenotypeAllele::PhasedMissing | GenotypeAllele::UnphasedMissing => { + s_gt.push(*gt) + } + GenotypeAllele::Phased(pos) | GenotypeAllele::Unphased(pos) => { + let pos_usize: usize = (*pos) + .try_into() + .map_err(|_| format!("Index out of range: {}", pos))?; + let pos_converted = sample_alleles[i] + .get(pos_usize) + .and_then(|allele| allele_to_index.get(allele)) + .ok_or_else(|| { + format!( + "Index out of bounds or allele not found in index: {:?}", + &sample_alleles[i].get(pos_usize) + ) + })?; + let new_gt = match gt { + GenotypeAllele::Phased(_) => { + GenotypeAllele::Phased(*pos_converted as i32) + } + GenotypeAllele::Unphased(_) => { + 
GenotypeAllele::Unphased(*pos_converted as i32) + } + _ => unreachable!(), + }; + s_gt.push(new_gt); + } } } + out_gt.push(s_gt); } out_sample_gts.push(out_gt); } - (out_sample_gts, sorted_alleles) -} - -#[allow(dead_code)] -fn vec_to_comma_separated_string(vec: Vec<&[u8]>) -> String { - vec.into_iter() - .map(|slice| String::from_utf8_lossy(slice).to_string()) - .collect::>() - .join(",") + Ok((out_sample_gts, sorted_alleles)) } #[cfg(test)] @@ -75,14 +94,26 @@ mod tests { #[test] fn test_merge_exact() { let sample_gts = vec![ - vec![GenotypeAllele::Unphased(1), GenotypeAllele::Unphased(2)], - vec![GenotypeAllele::Unphased(1), GenotypeAllele::Unphased(2)], - vec![GenotypeAllele::Unphased(0), GenotypeAllele::Unphased(0)], - vec![ + vec![vec![ + GenotypeAllele::Unphased(1), + GenotypeAllele::Unphased(2), + ]], + vec![vec![ + GenotypeAllele::Unphased(1), + GenotypeAllele::Unphased(2), + ]], + vec![vec![ + GenotypeAllele::Unphased(0), + GenotypeAllele::Unphased(0), + ]], + vec![vec![ GenotypeAllele::UnphasedMissing, GenotypeAllele::UnphasedMissing, - ], - vec![GenotypeAllele::Unphased(1), GenotypeAllele::Unphased(2)], + ]], + vec![vec![ + GenotypeAllele::Unphased(1), + GenotypeAllele::Unphased(2), + ]], ]; let sample_alleles = vec![ @@ -117,7 +148,7 @@ mod tests { ], ]; - let (out_gts, sorted_alleles) = merge_exact(sample_gts, sample_alleles); + let (out_gts, sorted_alleles) = merge_exact(sample_gts, sample_alleles).unwrap(); assert_eq!( sorted_alleles[0], @@ -127,26 +158,26 @@ mod tests { assert_eq!(sorted_alleles.len(), 6); assert_eq!( - out_gts[0], + out_gts[0][0], vec![GenotypeAllele::Unphased(1), GenotypeAllele::Unphased(2)] ); assert_eq!( - out_gts[1], + out_gts[1][0], vec![GenotypeAllele::Unphased(1), GenotypeAllele::Unphased(3)] ); assert_eq!( - out_gts[2], + out_gts[2][0], vec![GenotypeAllele::Unphased(0), GenotypeAllele::Unphased(0)] ); assert_eq!( - out_gts[3], + out_gts[3][0], vec![ GenotypeAllele::UnphasedMissing, GenotypeAllele::UnphasedMissing ] ); assert_eq!( - out_gts[4], + out_gts[4][0], vec![GenotypeAllele::Unphased(4), GenotypeAllele::Unphased(5)] ); } @@ -154,8 +185,11 @@ mod tests { #[test] fn test_merge_exact_phasing() { let sample_gts = vec![ - vec![GenotypeAllele::Unphased(1), GenotypeAllele::Unphased(2)], - vec![GenotypeAllele::Phased(1), GenotypeAllele::Phased(2)], + vec![vec![ + GenotypeAllele::Unphased(1), + GenotypeAllele::Unphased(2), + ]], + vec![vec![GenotypeAllele::Phased(1), GenotypeAllele::Phased(2)]], ]; let sample_alleles = vec![ @@ -175,7 +209,7 @@ mod tests { ], ]; - let (out_gts, sorted_alleles) = merge_exact(sample_gts, sample_alleles); + let (out_gts, sorted_alleles) = merge_exact(sample_gts, sample_alleles).unwrap(); assert_eq!( sorted_alleles[0], @@ -185,11 +219,11 @@ mod tests { assert_eq!(sorted_alleles.len(), 5); assert_eq!( - out_gts[0], + out_gts[0][0], vec![GenotypeAllele::Unphased(1), GenotypeAllele::Unphased(2)] ); assert_eq!( - out_gts[1], + out_gts[1][0], vec![GenotypeAllele::Phased(3), GenotypeAllele::Phased(4)] ); } diff --git a/src/merge/vcf_processor.rs b/src/merge/vcf_processor.rs index 663207a..5ba4b9b 100644 --- a/src/merge/vcf_processor.rs +++ b/src/merge/vcf_processor.rs @@ -5,36 +5,47 @@ use super::{ }; use crate::{ cli::MergeArgs, - utils::{open_genome_reader, Result}, + utils::{format_number_with_commas, open_genome_reader, Result}, }; use once_cell::sync::Lazy; use rust_htslib::{ - bcf::{self, header::HeaderView, record::GenotypeAllele, Record}, + bcf::{self, record::GenotypeAllele, Record}, faidx, }; use semver::Version; 
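
The `merge_exact` rewrite above makes two changes worth noting: assertions become recoverable `Result` errors, and length ties in the allele table are now broken lexicographically, so ALT ordering is deterministic (the old `sort_by_key(|a| a.len())` left equal-length alleles in `HashSet` iteration order, which is unspecified). A minimal, dependency-free sketch of the reindexing idea (illustrative names, plain `usize` genotype indices standing in for `GenotypeAllele`, missing-genotype handling omitted):

    use std::collections::{HashMap, HashSet};

    // Build one deterministic allele table from per-sample allele lists that
    // share a REF allele, then remap every genotype index into that table.
    fn unify_alleles<'a>(
        sample_alleles: &[Vec<&'a [u8]>],
        sample_gts: &[Vec<usize>],
    ) -> Result<(Vec<Vec<usize>>, Vec<&'a [u8]>), String> {
        let mut ref_allele: Option<&'a [u8]> = None;
        let mut alts: HashSet<&'a [u8]> = HashSet::new();
        for alleles in sample_alleles {
            if let Some((&first, rest)) = alleles.split_first() {
                match ref_allele {
                    Some(r) if r != first => return Err("REF mismatch".into()),
                    _ => ref_allele = Some(first),
                }
                alts.extend(rest.iter().copied());
            }
        }
        let r = ref_allele.ok_or_else(|| "no REF allele found".to_string())?;
        // Length first, lexicographic second: the tiebreak is what makes the
        // output reproducible from run to run.
        let mut table: Vec<&[u8]> = alts.into_iter().filter(|a| *a != r).collect();
        table.sort_by(|a, b| a.len().cmp(&b.len()).then_with(|| a.cmp(b)));
        table.insert(0, r);
        let index: HashMap<&[u8], usize> =
            table.iter().enumerate().map(|(i, &a)| (a, i)).collect();
        let gts: Vec<Vec<usize>> = sample_gts
            .iter()
            .zip(sample_alleles)
            .map(|(gt, alleles)| gt.iter().map(|&g| index[alleles[g]]).collect())
            .collect();
        Ok((gts, table))
    }
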
-use std::{any::Any, cmp::Ordering, collections::BinaryHeap, env}; +use std::{ + any::Any, + cmp::Ordering, + collections::{BinaryHeap, HashSet}, + env, +}; const MISSING_INTEGER: i32 = i32::MIN; const VECTOR_END_INTEGER: i32 = i32::MIN + 1; static MISSING_FLOAT: Lazy = Lazy::new(|| f32::from_bits(0x7F80_0001)); static VECTOR_END_FLOAT: Lazy = Lazy::new(|| f32::from_bits(0x7F80_0002)); -fn _vec_to_comma_separated_string(vec: Vec<&[u8]>) -> String { - vec.into_iter() - .map(|slice| String::from_utf8_lossy(slice).to_string()) - .collect::>() - .join(",") +struct FormatData { + als: Vec, + allrs: Vec>, + sds: Vec, + mcs: Vec>, + mss: Vec>, + aps: Vec, + ams: Vec, } -fn _header_to_string(header: &bcf::Header) -> String { - unsafe { - let header_ptr = header.inner; - let mut header_len: i32 = 0; - let header_cstr = rust_htslib::htslib::bcf_hdr_fmt_text(header_ptr, 0, &mut header_len); - std::ffi::CStr::from_ptr(header_cstr) - .to_string_lossy() - .into_owned() +impl FormatData { + fn new() -> Self { + FormatData { + als: Vec::new(), + allrs: Vec::new(), + sds: Vec::new(), + mcs: Vec::new(), + mss: Vec::new(), + aps: Vec::new(), + ams: Vec::new(), + } } } @@ -100,7 +111,7 @@ struct VcfRecordWithSource { impl PartialEq for VcfRecordWithSource { fn eq(&self, other: &Self) -> bool { - self.record.rid() == other.record.rid() && self.record.pos() == other.record.pos() + self.record.pos() == other.record.pos() } } @@ -114,21 +125,19 @@ impl PartialOrd for VcfRecordWithSource { impl Ord for VcfRecordWithSource { fn cmp(&self, other: &Self) -> Ordering { - match self.record.rid().cmp(&other.record.rid()) { - Ordering::Equal => self.record.pos().cmp(&other.record.pos()).reverse(), - other => other.reverse(), - } + self.record.pos().cmp(&other.record.pos()).reverse() } } pub struct VcfProcessor { pub readers: Vec, pub writer: VcfWriter, - pub genome_reader: faidx::Reader, // TODO: Make optional? Only needed for <1.0 - // TODO: add args struct + pub genome_reader: Option, + pub contig_order: Vec, pub skip_n: usize, pub process_n: usize, pub needs_padding: bool, + pub quit_on_error: bool, } impl VcfProcessor { @@ -138,22 +147,52 @@ impl VcfProcessor { return Err("Expected two or more files to merge, got only one. Use --force-single to proceed anyway".into()); } - let genome_reader = open_genome_reader(&args.genome_path)?; - let out_header = Self::create_output_header(&vcf_readers, args)?; - let writer = VcfWriter::new(&out_header, &args.output_type, args.output.as_ref())?; + let mut contig_order = vcf_readers.get_contig_order()?; + + if let Some(ref user_contigs) = args.contigs { + let user_contig_set: HashSet<&String> = user_contigs.iter().collect(); + let original_contig_set: HashSet<&String> = contig_order.iter().collect(); + + if !user_contig_set.is_subset(&original_contig_set) { + let missing_contigs: Vec<&&String> = + user_contig_set.difference(&original_contig_set).collect(); + return Err(format!( + "The following user-specified contigs do not exist in the VCF files: {:?}", + missing_contigs + )); + } + contig_order.retain(|contig| user_contig_set.contains(contig)); + } let needs_padding = vcf_readers .readers .iter() .any(|reader| reader.version.major < Version::new(1, 0, 0).major); + let genome_reader = if needs_padding { + Some(open_genome_reader(args.genome_path.as_ref().ok_or( + "A reference genome is required for merging pre v1.0 TRGT VCFs, provide as --genome ref.fa" + )?)?) 
+ } else { + None + }; + + let out_header = Self::create_output_header(&vcf_readers, args)?; + let writer = VcfWriter::new(&out_header, &args.output_type, args.output.as_ref())?; + + if needs_padding { + log::debug!("At least one VCF file is pre-1.0 and needs base padding!"); + } + Ok(VcfProcessor { readers: vcf_readers.readers, writer, genome_reader, + contig_order, skip_n: args.skip_n.unwrap_or(0), process_n: args.process_n.unwrap_or(usize::MAX), needs_padding, + quit_on_error: args.quit_on_error, }) } @@ -186,11 +225,194 @@ impl VcfProcessor { ); out_header.push_record(version_line.as_bytes()); - let command_line = env::args().collect::>().join(" "); - let command_line = format!("##{}Command={}", env!("CARGO_PKG_NAME"), command_line); + let command_line = format!( + "##{}Command={}", + env!("CARGO_PKG_NAME"), + env::args().collect::>().join(" ") + ); out_header.push_record(command_line.as_bytes()); } + pub fn merge_variants(&mut self) -> Result<()> { + let mut n = 0; + let mut n_processed = 0; + let mut n_failed = 0; + + let mut sample_records = vec![None; self.readers.len()]; + + for contig in &self.contig_order.clone() { + let mut heap = self.init_heap(contig)?; + + while let Some(min_element) = heap.pop() { + let min_pos = min_element.record.pos(); + sample_records[min_element.reader_index] = Some(min_element.record); + + while let Some(peek_next_element) = heap.peek() { + if peek_next_element.record.pos() == min_pos { + let next_element = heap.pop().unwrap(); + sample_records[next_element.reader_index] = Some(next_element.record); + } else { + break; + } + } + + if n >= self.skip_n { + log::trace!("Processing: {}:{}", contig, min_pos); + if self.needs_padding { + self.add_padding_base(&mut sample_records, contig, min_pos); + } + match self.merge_variant(&sample_records, contig, min_pos) { + Ok(_) => { + n_processed += 1; + if n_processed >= self.process_n { + return Ok(()); + } + } + Err(e) => { + if self.quit_on_error { + return Err(e); + } + n_failed += 1; + log::warn!("{} Skipping...", e); + } + } + } + n += 1; + + self.update_heap(&mut heap, &sample_records)?; + sample_records.fill(None); + } + } + let mut log_message = format!( + "Successfully merged {} TR sites.", + format_number_with_commas(n_processed) + ); + if n_failed > 0 { + log_message.push_str(&format!( + " Failed to merge {} TR sites!", + format_number_with_commas(n_failed) + )); + } + log::info!("{}", log_message); + Ok(()) + } + + fn init_heap(&mut self, contig: &str) -> Result> { + let mut heap = BinaryHeap::new(); + for (index, reader) in self.readers.iter_mut().enumerate() { + let rid = match reader.header.name2rid(contig.as_bytes()) { + Ok(id) => id, + Err(_) => continue, + }; + + if reader.reader.fetch(rid, 0, None).is_err() { + continue; + } + + if reader.advance() { + heap.push(VcfRecordWithSource { + record: reader.current_record.clone(), + reader_index: index, + }); + } + } + Ok(heap) + } + + fn update_heap( + &mut self, + heap: &mut BinaryHeap, + sample_records: &[Option], + ) -> Result<()> { + for (index, record) in sample_records.iter().enumerate() { + if record.is_some() && self.readers[index].advance() { + heap.push(VcfRecordWithSource { + record: self.readers[index].current_record.clone(), + reader_index: index, + }); + } + } + Ok(()) + } + + fn get_padding_base(&self, contig: &str, pos: i64) -> Vec { + if let Some(genome_reader) = &self.genome_reader { + genome_reader + .fetch_seq(contig, pos as usize, pos as usize) + .unwrap_or_else(|_| panic!("Failed to fetch sequence for chromosome {}", contig)) + 
.to_ascii_uppercase() + } else { + panic!("Genome reader is not available, but padding is required") + } + } + + fn add_padding_base(&mut self, sample_records: &mut [Option], contig: &str, pos: i64) { + let padding_base = self.get_padding_base(contig, pos); + + for (record, reader) in sample_records.iter_mut().zip(&self.readers) { + if reader.version.major >= Version::new(1, 0, 0).major { + continue; + } + + let Some(record) = record else { continue }; + + let al_0 = record + .format(b"AL") + .integer() + .expect("Error accessing FORMAT AL")[0] + .iter() + .min() + .cloned() + .unwrap(); + + // Skip zero-length allele records + if al_0 != 0 { + let new_alleles: Vec> = record + .alleles() + .iter() + .map(|allele| { + let mut new_allele = Vec::with_capacity(1 + allele.len()); + new_allele.extend_from_slice(&padding_base); + new_allele.extend_from_slice(allele); + new_allele + }) + .collect(); + let new_alleles_refs: Vec<&[u8]> = + new_alleles.iter().map(|a| a.as_slice()).collect(); + record + .set_alleles(&new_alleles_refs) + .expect("Failed to set alleles") + } + } + } + + fn merge_variant( + &mut self, + sample_records: &[Option], + contig: &str, + pos: i64, + ) -> Result<()> { + let template_record = self.get_template_record(sample_records); + self.set_dummy_record_fields(template_record); + + let mut format_data = FormatData::new(); + // The outermost Vec represents different VCFs > different samples within a VCF > genotype alleles of a sample + let mut gt_vecs: Vec>> = Vec::new(); + let mut alleles: Vec> = Vec::new(); + + for (record_i, record) in sample_records.iter().enumerate() { + if let Some(record) = record { + self.process_record(record, &mut format_data, &mut gt_vecs, &mut alleles); + } else { + self.process_missing_record(record_i, &mut format_data, &mut gt_vecs, &mut alleles); + } + } + + self.merge_and_write_data(format_data, gt_vecs, alleles) + .map_err(|e| format!("Failed to merge at {}:{}: {}.", contig, pos, e,))?; + Ok(()) + } + fn set_info_field(&mut self, record: &Record, field_name: &[u8], field_type: FieldType) { match field_type { FieldType::String => { @@ -210,311 +432,227 @@ impl VcfProcessor { } } - fn merge_variant(&mut self, sample_records: &[Option]) { + fn get_template_record<'a>(&self, sample_records: &'a [Option]) -> &'a Record { let template_index = sample_records.iter().position(|r| r.is_some()).unwrap(); - let template_record = sample_records[template_index].as_ref().unwrap(); + sample_records[template_index].as_ref().unwrap() + } + fn set_dummy_record_fields(&mut self, template_record: &Record) { self.writer.dummy_record.set_rid(template_record.rid()); self.writer.dummy_record.set_pos(template_record.pos()); self.writer.dummy_record.set_qual(template_record.qual()); - // TODO: Consolidate logic to allow for generic INFO fields: i32, f32, etc... self.set_info_field(template_record, b"TRID", FieldType::String); self.set_info_field(template_record, b"END", FieldType::Integer); self.set_info_field(template_record, b"MOTIFS", FieldType::String); self.set_info_field(template_record, b"STRUC", FieldType::String); + } - // TODO: Clean this up - // TODO: Consolidate logic to allow for generic FORMAT fields: i32, f32, etc... 
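
The new `merge_variants`/`init_heap`/`update_heap` trio shown above is a textbook k-way merge: one `BinaryHeap` holds at most one pending record per reader, ordered by position (reversed, since Rust's heap is a max-heap, mirroring the reversed `Ord` on `VcfRecordWithSource`), each pop refills only the reader that was consumed, and same-position records are drained together via `peek`. A self-contained sketch of the pattern, with sorted `Vec<i64>` position lists standing in for indexed VCF readers:

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    // Merge k sorted position streams into one sorted (pos, reader_index) list.
    fn kway_merge(readers: &[Vec<i64>]) -> Vec<(i64, usize)> {
        let mut cursors = vec![0usize; readers.len()];
        let mut heap: BinaryHeap<Reverse<(i64, usize)>> = BinaryHeap::new();
        // Seed the heap with each reader's first record, like init_heap.
        for (i, r) in readers.iter().enumerate() {
            if let Some(&pos) = r.first() {
                heap.push(Reverse((pos, i)));
                cursors[i] = 1;
            }
        }
        let mut merged = Vec::new();
        while let Some(Reverse((pos, i))) = heap.pop() {
            merged.push((pos, i));
            // Refill only the reader just consumed, like update_heap.
            if let Some(&next) = readers[i].get(cursors[i]) {
                heap.push(Reverse((next, i)));
                cursors[i] += 1;
            }
        }
        merged
    }

    fn main() {
        let merged = kway_merge(&[vec![100, 2000], vec![50, 100]]);
        assert_eq!(merged, vec![(50, 1), (100, 0), (100, 1), (2000, 0)]);
    }
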
- let mut als = Vec::new(); - let mut allrs = Vec::new(); - let mut sds = Vec::new(); - let mut mcs = Vec::new(); - let mut mss = Vec::new(); - let mut aps = Vec::new(); - let mut ams = Vec::new(); - let mut gt_vecs = Vec::new(); - let mut alleles = Vec::new(); - - for record in sample_records.iter() { - if let Some(record) = record { - // TODO: Allow multiple Samples per record, at the moment we just take the first element - alleles.push(record.alleles()); + fn process_record<'a>( + &self, + record: &'a Record, + format_data: &mut FormatData, + gt_vecs: &mut Vec>>, + alleles: &mut Vec>, + ) { + alleles.push(record.alleles().to_vec()); + self.process_genotypes(record, gt_vecs); + self.process_format_fields(record, format_data); + } - // GT - let gt_field = record.genotypes().unwrap(); - let gt = gt_field.get(0); - gt_vecs.push(gt.iter().copied().collect()); + fn process_missing_record( + &self, + record_i: usize, + format_data: &mut FormatData, + gt_vecs: &mut Vec>>, + alleles: &mut Vec>, + ) { + alleles.push(vec![]); + let mut tmp_gt_vec = Vec::new(); + for _ in 0..self.readers[record_i].sample_n { + tmp_gt_vec.push(vec![GenotypeAllele::UnphasedMissing]); + PushMissingAndEnd::push_missing_and_end(&mut format_data.als); + PushMissingAndEnd::push_missing_and_end(&mut format_data.sds); + PushMissingAndEnd::push_missing_and_end(&mut format_data.aps); + PushMissingAndEnd::push_missing_and_end(&mut format_data.ams); + PushMissingAndEnd::push_missing_and_end(&mut format_data.allrs); + PushMissingAndEnd::push_missing_and_end(&mut format_data.mcs); + PushMissingAndEnd::push_missing_and_end(&mut format_data.mss); + } + gt_vecs.push(tmp_gt_vec); + } - // TODO: Factor out redundancy - let al_field = record - .format(b"AL") - .integer() - .expect("Error accessing FORMAT AL"); - als.extend(al_field[0].iter().copied()); - if al_field[0].len() == 1 { - als.push(VECTOR_END_INTEGER); - } + fn process_genotypes(&self, record: &Record, gt_vecs: &mut Vec>>) { + let gt_field = record.genotypes().unwrap(); + let mut file_gt_vecs: Vec> = Vec::new(); + for i in 0..record.sample_count() { + let gt = gt_field.get(i as usize); + file_gt_vecs.push(gt.iter().copied().collect()); + } + gt_vecs.push(file_gt_vecs); + } - let allr = match record.format(b"ALLR").string() { - Ok(field) => field[0].to_vec(), - // Handle TRGT <=v0.3.4 - Err(_) => { - let alci_field = record.format(b"ALCI").string().unwrap(); - alci_field[0].to_vec() - } - }; - allrs.push(allr); + fn merge_and_write_data( + &mut self, + format_data: FormatData, + gt_vecs: Vec>>, + alleles: Vec>, + ) -> Result<()> { + let (out_gts, out_alleles) = merge_exact(gt_vecs, alleles)?; + self.writer + .dummy_record + .set_alleles(&out_alleles) + .map_err(|e| e.to_string())?; - let sd_field = record - .format(b"SD") - .integer() - .expect("Error accessing FORMAT SD"); - sds.extend(sd_field[0].iter().copied()); - if sd_field[0].len() == 1 { - sds.push(VECTOR_END_INTEGER); - } + let gts_new = self.flatten_genotypes(out_gts); + self.write_format_fields(gts_new, format_data)?; - let mc_field = record - .format(b"MC") - .string() - .expect("Error acessing FORMAT MC"); - let mc = mc_field[0].to_vec(); - mcs.push(mc); - - let ms_field = record - .format(b"MS") - .string() - .expect("Error acessing FORMAT MS"); - let ms = ms_field[0].to_vec(); - mss.push(ms); - - let ap_field = record - .format(b"AP") - .float() - .expect("Error accessing FORMAT AP"); - aps.extend(ap_field[0].iter().copied()); - if ap_field[0].len() == 1 { - aps.push(*VECTOR_END_FLOAT); - } + self.writer + 
.writer + .write(&self.writer.dummy_record) + .map_err(|e| e.to_string())?; + self.writer.dummy_record.clear(); + Ok(()) + } - let am_field = match record.format(b"AM").float() { - Ok(field) => field[0].to_vec(), - // Handle TRGT <=v0.4.0 - Err(_) => { - let int_field = record - .format(b"AM") - .integer() - .expect("Error accessing FORMAT AM as an integer"); - int_field[0] - .iter() - .map(|&i| { - // Account for missing values - if i == i32::MIN { - *MISSING_FLOAT - } else { - i as f32 / 255.0 - } - }) - .collect::>() - } - }; - ams.extend(am_field.iter().copied()); - if am_field.len() == 1 { - ams.push(*VECTOR_END_FLOAT); + fn flatten_genotypes(&self, out_gts: Vec>>) -> Vec { + let mut gts_new = Vec::new(); + for file_gts in out_gts { + for sample_gt in file_gts { + let mut converted_sample_gt: Vec = + sample_gt.iter().map(|gt| i32::from(*gt)).collect(); + if converted_sample_gt.len() == 1 { + converted_sample_gt.push(VECTOR_END_INTEGER); } - } else { - gt_vecs.push(vec![GenotypeAllele::UnphasedMissing]); - alleles.push(vec![]); - - PushMissingAndEnd::push_missing_and_end(&mut als); - PushMissingAndEnd::push_missing_and_end(&mut sds); - PushMissingAndEnd::push_missing_and_end(&mut aps); - PushMissingAndEnd::push_missing_and_end(&mut ams); - PushMissingAndEnd::push_missing_and_end(&mut allrs); - PushMissingAndEnd::push_missing_and_end(&mut mcs); - PushMissingAndEnd::push_missing_and_end(&mut mss); + gts_new.extend(converted_sample_gt); } } + gts_new + } - // Merge alleles and genotypes - let (out_gts, out_alleles) = merge_exact(gt_vecs, alleles); - self.writer.dummy_record.set_alleles(&out_alleles).unwrap(); - - // Flatten to a 1D 2D representation using - let mut gts_new: Vec = Vec::new(); - for sample_gt in out_gts { - let mut converted_sample_gt: Vec = - sample_gt.iter().map(|gt| i32::from(*gt)).collect(); - if converted_sample_gt.len() == 1 { - converted_sample_gt.push(VECTOR_END_INTEGER); - } - gts_new.extend(converted_sample_gt); - } - // - + fn write_format_fields(&mut self, gts_new: Vec, format_data: FormatData) -> Result<()> { self.writer .dummy_record .push_format_integer(b"GT", >s_new) - .unwrap(); - + .map_err(|e| e.to_string())?; self.writer .dummy_record - .push_format_integer(b"AL", &als) - .unwrap(); - + .push_format_integer(b"AL", &format_data.als) + .map_err(|e| e.to_string())?; self.writer .dummy_record - .push_format_string(b"ALLR", &allrs) - .unwrap(); - + .push_format_string(b"ALLR", &format_data.allrs) + .map_err(|e| e.to_string())?; self.writer .dummy_record - .push_format_integer(b"SD", &sds) - .unwrap(); - + .push_format_integer(b"SD", &format_data.sds) + .map_err(|e| e.to_string())?; self.writer .dummy_record - .push_format_string(b"MC", &mcs) - .unwrap(); - + .push_format_string(b"MC", &format_data.mcs) + .map_err(|e| e.to_string())?; self.writer .dummy_record - .push_format_string(b"MS", &mss) - .unwrap(); - + .push_format_string(b"MS", &format_data.mss) + .map_err(|e| e.to_string())?; self.writer .dummy_record - .push_format_float(b"AP", &aps) - .unwrap(); - + .push_format_float(b"AP", &format_data.aps) + .map_err(|e| e.to_string())?; self.writer .dummy_record - .push_format_float(b"AM", &ams) - .unwrap(); + .push_format_float(b"AM", &format_data.ams) + .map_err(|e| e.to_string())?; + Ok(()) + } - self.writer.writer.write(&self.writer.dummy_record).unwrap(); + fn process_format_fields(&self, record: &Record, format_data: &mut FormatData) { + self.process_integer_field(record, b"AL", &mut format_data.als); - self.writer.dummy_record.clear(); + match 
record.format(b"ALLR").string() { + Ok(_) => self.process_string_field(record, b"ALLR", &mut format_data.allrs), + // Handle TRGT <=v0.3.4 + Err(_) => self.process_string_field(record, b"ALCI", &mut format_data.allrs), + } + + self.process_integer_field(record, b"SD", &mut format_data.sds); + self.process_string_field(record, b"MC", &mut format_data.mcs); + self.process_string_field(record, b"MS", &mut format_data.mss); + self.process_float_field(record, b"AP", &mut format_data.aps); + self.process_am_field(record, &mut format_data.ams); } - fn init_heap(&mut self) -> BinaryHeap { - let mut heap = BinaryHeap::new(); - for (index, reader) in self.readers.iter_mut().enumerate() { - if reader.advance() { - heap.push(VcfRecordWithSource { - record: reader.current_record.clone(), - reader_index: index, - }); + fn process_integer_field(&self, record: &Record, field_name: &[u8], values: &mut Vec) { + let field = record.format(field_name).integer().unwrap_or_else(|_| { + panic!( + "Error accessing FORMAT {}", + String::from_utf8_lossy(field_name) + ) + }); + for sample_values in field.iter() { + values.extend(sample_values.iter().copied()); + if sample_values.len() <= 1 { + values.push(VECTOR_END_INTEGER); } } - heap } - fn update_heap( - &mut self, - heap: &mut BinaryHeap, - sample_records: &[Option], - ) { - for (index, record) in sample_records.iter().enumerate() { - if record.is_some() && self.readers[index].advance() { - heap.push(VcfRecordWithSource { - record: self.readers[index].current_record.clone(), - reader_index: index, - }); + fn process_float_field(&self, record: &Record, field_name: &[u8], values: &mut Vec) { + let field = record.format(field_name).float().unwrap_or_else(|_| { + panic!( + "Error accessing FORMAT {}", + String::from_utf8_lossy(field_name) + ) + }); + for sample_values in field.iter() { + values.extend(sample_values.iter().copied()); + if sample_values.len() <= 1 { + values.push(*VECTOR_END_FLOAT); } } } - pub fn merge_variants(&mut self) { - let mut n = 0; - let mut n_processed = 0; - - let mut sample_records = vec![None; self.readers.len()]; - let mut heap = self.init_heap(); - while let Some(min_element) = heap.pop() { - let min_rid = min_element.record.rid().unwrap(); - let min_pos = min_element.record.pos(); - sample_records[min_element.reader_index] = Some(min_element.record); - - while let Some(peek_next_element) = heap.peek() { - if peek_next_element.record.rid().unwrap() == min_rid - && peek_next_element.record.pos() == min_pos - { - let next_element = heap.pop().unwrap(); - sample_records[next_element.reader_index] = Some(next_element.record); - } else { - break; - } - } - - if n >= self.skip_n { - log::info!("Processing: {}:{}", min_rid, min_pos); - if self.needs_padding { - let padding_base = self.get_padding_base( - min_rid, - min_pos, - &self.readers[min_element.reader_index].header, - ); - self.add_padding_base(&mut sample_records, padding_base); - } - - self.merge_variant(&sample_records); - n_processed += 1; - if n_processed >= self.process_n { - break; - } - } - n += 1; - - self.update_heap(&mut heap, &sample_records); - sample_records.fill(None); + fn process_string_field(&self, record: &Record, field_name: &[u8], values: &mut Vec>) { + let field = record.format(field_name).string().unwrap_or_else(|_| { + panic!( + "Error accessing FORMAT {}", + String::from_utf8_lossy(field_name) + ) + }); + for sample_value in field.iter() { + values.push(sample_value.to_vec()); } } - fn add_padding_base(&mut self, sample_records: &mut [Option], padding_base: 
Vec) { - for (index, record) in sample_records.iter_mut().enumerate() { - if self.readers[index].version.major < Version::new(1, 0, 0).major { - if let Some(record) = record { - let al_0 = record - .format(b"AL") - .integer() - .expect("Error accessing FORMAT AL")[0] + fn process_am_field(&self, record: &Record, ams: &mut Vec) { + let am_field = record.format(b"AM"); + match am_field.float() { + Ok(_) => self.process_float_field(record, b"AM", ams), + // Handle TRGT <=v0.4.0 + Err(_) => { + let int_field = record + .format(b"AM") + .integer() + .unwrap_or_else(|_| panic!("Error accessing FORMAT AM as an integer")); + for sample_am in int_field.iter() { + let converted_am: Vec = sample_am .iter() - .min() - .cloned() - .unwrap(); - // Zero-length allele records do not need to be updated - if al_0 != 0 { - let new_alleles: Vec> = record - .alleles() - .iter() - .map(|allele| { - let mut new_allele = padding_base.to_vec(); - new_allele.extend_from_slice(allele); - new_allele - }) - .collect(); - let new_alleles_refs: Vec<&[u8]> = - new_alleles.iter().map(|a| a.as_slice()).collect(); - record - .set_alleles(&new_alleles_refs) - .expect("Failed to set alleles") + .map(|&i| { + if i == i32::MIN { + *MISSING_FLOAT + } else { + i as f32 / 255.0 + } + }) + .collect(); + ams.extend(converted_am); + if sample_am.len() <= 1 { + ams.push(*VECTOR_END_FLOAT); } } } } } - - fn get_padding_base(&self, rid: u32, pos: i64, header: &HeaderView) -> Vec { - let chrom = header.rid2name(rid).unwrap(); - let chrom_str = std::str::from_utf8(chrom).expect("Invalid UTF-8 sequence"); - self.genome_reader - .fetch_seq(chrom_str, pos as usize, pos as usize) - .map(|seq| seq.to_vec()) - .ok() - .unwrap() - } } #[cfg(test)] @@ -545,7 +683,7 @@ mod tests { record3.set_pos(50); let mut record4 = reader.empty_record(); - record4.set_rid(Some(10)); + record4.set_rid(Some(1)); record4.set_pos(99); let mut heap = BinaryHeap::new(); @@ -572,14 +710,14 @@ mod tests { let r = heap.pop().unwrap(); assert_eq!(r.record.rid(), Some(1)); - assert_eq!(r.record.pos(), 100); + assert_eq!(r.record.pos(), 99); let r = heap.pop().unwrap(); assert_eq!(r.record.rid(), Some(1)); - assert_eq!(r.record.pos(), 2000); + assert_eq!(r.record.pos(), 100); let r = heap.pop().unwrap(); - assert_eq!(r.record.rid(), Some(10)); - assert_eq!(r.record.pos(), 99); + assert_eq!(r.record.rid(), Some(1)); + assert_eq!(r.record.pos(), 2000); } } diff --git a/src/merge/vcf_reader.rs b/src/merge/vcf_reader.rs index e5f6d64..3426392 100644 --- a/src/merge/vcf_reader.rs +++ b/src/merge/vcf_reader.rs @@ -2,47 +2,97 @@ use crate::utils::Result; use rust_htslib::bcf::{self, header::HeaderView, Header, HeaderRecord, Read}; use semver::Version; use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, + io::{BufRead, BufReader, Seek}, path::{Path, PathBuf}, }; +const GZIP_MAGIC_NUMBER: [u8; 2] = [0x1f, 0x8b]; + +fn add_extension(path: &Path, ext: &str) -> PathBuf { + let mut file_name = path.file_name().unwrap().to_os_string(); + file_name.push("."); + file_name.push(ext); + path.with_file_name(file_name) +} + +fn is_indexed(file: &Path) -> bool { + let csi_index = add_extension(file, "csi"); + let tbi_index = add_extension(file, "tbi"); + csi_index.exists() || tbi_index.exists() +} + +fn validate_vcf_file(file: &Path) -> Result<()> { + let mut fp = std::fs::File::open(file).map_err(|e| format!("Failed to open file: {}", e))?; + let mut buffer = [0; 2]; + std::io::Read::read_exact(&mut fp, &mut buffer) + .map_err(|e| format!("Failed to read file: {}", e))?; + 
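
A note on the two-byte check that follows: bgzip output passes it as well, because BGZF is ordinary gzip with the FEXTRA flag set and a "BC" extra subfield (per the htslib/SAM specification), which is why an indexed `.vcf.gz` produced by bgzip validates here. A hypothetical helper, not part of this patch, that would distinguish the two under that assumption:

    // BGZF block layout: gzip magic (bytes 0-1), CM (2), FLG (3, FEXTRA = 0x04),
    // MTIME (4-7), XFL (8), OS (9), XLEN (10-11), then subfield ID "BC" (12-13).
    fn looks_like_bgzf(header: &[u8]) -> bool {
        header.len() >= 18
            && header[0..2] == [0x1f, 0x8b] // gzip magic
            && header[3] & 0x04 != 0        // FLG.FEXTRA set
            && header[12..14] == *b"BC"     // BGZF subfield identifier
    }
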
+ if buffer != GZIP_MAGIC_NUMBER { + return Err(format!("File {} is not gzip compressed", file.display())); + } + + fp.rewind() + .map_err(|e| format!("Failed to rewind file: {}", e))?; + + let gz = flate2::read::GzDecoder::new(fp); + let mut reader = BufReader::new(gz); + let mut first_line = String::new(); + reader + .read_line(&mut first_line) + .map_err(|e| format!("Failed to read file: {}", e))?; + + if !first_line.trim().starts_with("##fileformat=VCFv") { + return Err(format!("File {} is not a valid VCF file", file.display())); + } + + Ok(()) +} + pub struct VcfReader { pub reader: bcf::IndexedReader, pub header: bcf::header::HeaderView, pub current_record: bcf::Record, pub version: Version, pub index: usize, + pub sample_n: u32, + pub file_path: String, } impl VcfReader { pub fn new(file: PathBuf, index: usize) -> Result { - log::info!("Start loading VCF {:?}", &file); - // TODO: Check if file is a VCF - // TODO: Check if indexed VCF - // TODO: Check if valid VCF + log::debug!("Start opening VCF {:?}", &file); + + validate_vcf_file(&file).map_err(|e| format!("Error validating VCF: {}", e))?; + + if !is_indexed(&file) { + return Err(format!("VCF file {} is not indexed", file.display())); + } + let reader = bcf::IndexedReader::from_path(&file) .map_err(|e| format!("Failed to open VCF file {}: {}", file.display(), e))?; let header = reader.header().clone(); let version = get_trgt_version(&header, &file)?; - log::debug!("{:?} has version: {}", file.file_name().unwrap(), version); + let sample_n = header.sample_count(); - if header.sample_count() > 1 { - return Err(format!( - "Unsupported: VCF file with multiple samples: {}", - file.display() - )); - } + log::debug!( + "{:?} has version: {}, samples n = {}", + file.file_name().unwrap(), + version, + sample_n + ); - // TODO: Create a normalized struct for variant records let current_record = reader.empty_record(); - log::info!("Finished loading VCF {:?}", &file); + log::debug!("Finished opening VCF {:?}", &file); Ok(VcfReader { reader, header, current_record, version, index, + sample_n, + file_path: file.to_string_lossy().into_owned(), }) } @@ -75,19 +125,18 @@ impl VcfReader { } fn get_trgt_version(vcf_header: &HeaderView, file: &Path) -> Result { - // TODO: Add logic to deal with merged TRGT VCFs (assume latest version?) 
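
For context on the version handling in this function: the parsed `semver::Version` ultimately gates merge behavior, e.g. `VcfProcessor::new` above treats any reader whose major version is below 1 as needing a padding base (and hence a reference genome). A small sketch of that gate using the `semver` crate this change adds (assumes `semver = "1"` as a dependency; the function name is illustrative):

    use semver::Version;

    // Pre-1.0 TRGT VCFs lack the padding base, so any such reader in the
    // merge set forces the padding path.
    fn any_needs_padding(trgt_versions: &[Version]) -> bool {
        trgt_versions.iter().any(|v| v.major < 1)
    }

    fn main() {
        let old = Version::parse("0.5.0").unwrap();
        let new = Version::parse("1.2.0").unwrap();
        assert!(any_needs_padding(&[old, new.clone()]));
        assert!(!any_needs_padding(&[new]));
    }
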
let mut trgt_version = None; for record in vcf_header.header_records().iter() { if let HeaderRecord::Generic { key, value } = record { if key == "trgtVersion" { trgt_version = Some(value.clone()); - break; + // Don't break here, continue to find the last occurrence } } } - // If trgtVersion is not in the header its either a Result { for record in vcf_header.header_records().iter() { if let HeaderRecord::Format { key: _, values } = record { if let Some(id) = values.get("ID") { - if id == "ALLR" { - has_allr = true; - } - if id == "ALCI" { - has_alci = true; - } - if id == "AM" { - is_integer_am = values.get("Type").unwrap() == "Integer"; + match id.as_str() { + "ALLR" => has_allr = true, + "ALCI" => has_alci = true, + "AM" => { + if let Some(typ) = values.get("Type") { + is_integer_am = typ == "Integer"; + } + } + _ => {} } } } @@ -119,7 +169,8 @@ fn get_trgt_version(vcf_header: &HeaderView, file: &Path) -> Result { } } - let version = Version::parse(&trgt_version.unwrap()).unwrap(); + let version = Version::parse(&trgt_version.unwrap()) + .map_err(|e| format!("Failed to parse version: {}", e))?; Ok(version) } @@ -135,15 +186,46 @@ impl VcfReaders { .enumerate() .map(|(index, file)| VcfReader::new(file, index)) .collect::>>()?; + Ok(VcfReaders { readers }) } + pub fn get_contig_order(&self) -> Result> { + let mut contig_map: HashMap> = HashMap::new(); + let mut contig_order = Vec::new(); + for reader in &self.readers { + for record in reader.header.header_records() { + if let HeaderRecord::Contig { values, .. } = record { + let id = values.get("ID").unwrap().to_string(); + let length = values + .get("length") + .and_then(|l| l.parse::().ok()) + .unwrap_or(0); + let entry = contig_map.entry(id.clone()).or_insert_with(|| { + contig_order.push(id.clone()); + HashSet::new() + }); + entry.insert(length); + } + } + } + for id in &contig_order { + let lengths = contig_map.get(id).unwrap(); + if lengths.len() > 1 { + return Err(format!( + "Inconsistent contig definitions found in VCF headers: Contig '{}' is defined with multiple lengths: {:?}", + id, lengths + )); + } + } + Ok(contig_order) + } + pub fn merge_headers(&self, dst_header: &mut Header, force_samples: bool) -> Result<()> { let mut observed_sample_ids = HashSet::new(); for reader in &self.readers { let src_header = &reader.header; - // TODO: error handling unsafe { dst_header.inner = rust_htslib::htslib::bcf_hdr_merge(dst_header.inner, src_header.inner); diff --git a/src/trgt/workflows/tr.rs b/src/trgt/workflows/tr.rs index cadc6fe..7ab0e16 100644 --- a/src/trgt/workflows/tr.rs +++ b/src/trgt/workflows/tr.rs @@ -39,10 +39,6 @@ pub fn analyze( let (reads, spans) = get_spanning_reads(locus, params, reads); - if reads.is_empty() { - return Ok(LocusResult::empty()); - } - const MIN_RQ_FOR_PURITY: f64 = 0.9; let (reads, spans) = if params.min_read_qual < MIN_RQ_FOR_PURITY { let ret = filter_impure_trs(locus, &reads, &spans, MIN_RQ_FOR_PURITY); @@ -58,6 +54,10 @@ pub fn analyze( (reads, spans) }; + if reads.is_empty() { + return Ok(LocusResult::empty()); + } + let trs = reads .iter() .zip(spans.iter()) diff --git a/src/utils/mod.rs b/src/utils/mod.rs index c8c1195..70eef20 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -18,4 +18,4 @@ pub use math::median; pub use ploidy::Ploidy; pub use readers::{open_catalog_reader, open_genome_reader}; pub use region::GenomicRegion; -pub use util::{handle_error_and_exit, Result}; +pub use util::{format_number_with_commas, handle_error_and_exit, Result}; diff --git a/src/utils/readers.rs 
b/src/utils/readers.rs index 5659f05..b926fa0 100644 --- a/src/utils/readers.rs +++ b/src/utils/readers.rs @@ -1,5 +1,5 @@ use super::Result; -use flate2::read::GzDecoder; +use flate2::read::MultiGzDecoder; use rust_htslib::faidx; use std::fs::File; use std::io::{BufReader, Read as ioRead}; @@ -12,7 +12,7 @@ pub fn open_catalog_reader(path: &Path) -> Result>> { } let file = File::open(path).map_err(|e| e.to_string())?; if is_gzipped(path) { - let gz_decoder = GzDecoder::new(file); + let gz_decoder = MultiGzDecoder::new(file); if gz_decoder.header().is_some() { Ok(BufReader::new(Box::new(gz_decoder))) } else { diff --git a/src/utils/util.rs b/src/utils/util.rs index fcecd8b..71411c3 100644 --- a/src/utils/util.rs +++ b/src/utils/util.rs @@ -1,6 +1,143 @@ +use std::fmt::{Binary, Display}; + pub type Result = std::result::Result; pub fn handle_error_and_exit(err: String) -> ! { log::error!("{}", err); std::process::exit(1); } + +pub fn format_number_with_commas(n: T) -> String +where + T: Display + Binary, +{ + let s = n.to_string(); + let (sign, digits) = s.strip_prefix('-').map_or(("", s.as_str()), |d| ("-", d)); + + if let 0..=3 = digits.len() { + return s; + } + + let mut result = String::with_capacity(digits.len() + (digits.len() - 1) / 3 + sign.len()); + for (digit_count, c) in digits.chars().rev().enumerate() { + if digit_count > 0 && digit_count % 3 == 0 { + result.push(','); + } + result.push(c); + } + + result = result.chars().rev().collect(); + if !sign.is_empty() { + result.insert_str(0, sign); + } + + result +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_format_number_with_commas_i8() { + assert_eq!(format_number_with_commas(0i8), "0"); + assert_eq!(format_number_with_commas(100i8), "100"); + assert_eq!(format_number_with_commas(-100i8), "-100"); + assert_eq!(format_number_with_commas(i8::MAX), "127"); + assert_eq!(format_number_with_commas(i8::MIN), "-128"); + } + + #[test] + fn test_format_number_with_commas_u8() { + assert_eq!(format_number_with_commas(0u8), "0"); + assert_eq!(format_number_with_commas(100u8), "100"); + assert_eq!(format_number_with_commas(u8::MAX), "255"); + } + + #[test] + fn test_format_number_with_commas_i16() { + assert_eq!(format_number_with_commas(0i16), "0"); + assert_eq!(format_number_with_commas(1_000i16), "1,000"); + assert_eq!(format_number_with_commas(-1_000i16), "-1,000"); + assert_eq!(format_number_with_commas(i16::MAX), "32,767"); + assert_eq!(format_number_with_commas(i16::MIN), "-32,768"); + } + + #[test] + fn test_format_number_with_commas_u16() { + assert_eq!(format_number_with_commas(0u16), "0"); + assert_eq!(format_number_with_commas(1_000u16), "1,000"); + assert_eq!(format_number_with_commas(u16::MAX), "65,535"); + } + + #[test] + fn test_format_number_with_commas_i32() { + assert_eq!(format_number_with_commas(0i32), "0"); + assert_eq!(format_number_with_commas(10_000i32), "10,000"); + assert_eq!(format_number_with_commas(-10_000i32), "-10,000"); + assert_eq!(format_number_with_commas(i32::MAX), "2,147,483,647"); + assert_eq!(format_number_with_commas(i32::MIN), "-2,147,483,648"); + } + + #[test] + fn test_format_number_with_commas_u32() { + assert_eq!(format_number_with_commas(0u32), "0"); + assert_eq!(format_number_with_commas(10_000u32), "10,000"); + assert_eq!(format_number_with_commas(u32::MAX), "4,294,967,295"); + } + + #[test] + fn test_format_number_with_commas_i64() { + assert_eq!(format_number_with_commas(0i64), "0"); + assert_eq!(format_number_with_commas(1_000_000i64), "1,000,000"); + 
assert_eq!(format_number_with_commas(-1_000_000i64), "-1,000,000"); + assert_eq!( + format_number_with_commas(i64::MAX), + "9,223,372,036,854,775,807" + ); + assert_eq!( + format_number_with_commas(i64::MIN), + "-9,223,372,036,854,775,808" + ); + } + + #[test] + fn test_format_number_with_commas_u64() { + assert_eq!(format_number_with_commas(0u64), "0"); + assert_eq!(format_number_with_commas(1_000_000u64), "1,000,000"); + assert_eq!( + format_number_with_commas(u64::MAX), + "18,446,744,073,709,551,615" + ); + } + + #[test] + fn test_format_number_with_commas_usize() { + assert_eq!(format_number_with_commas(0usize), "0"); + assert_eq!( + format_number_with_commas(1_234_567_890usize), + "1,234,567,890" + ); + assert_eq!( + format_number_with_commas(usize::MAX), + "18,446,744,073,709,551,615" + ); + } + + #[test] + fn test_format_number_with_commas_isize() { + assert_eq!(format_number_with_commas(0isize), "0"); + assert_eq!( + format_number_with_commas(-1_234_567_890isize), + "-1,234,567,890" + ); + assert_eq!( + format_number_with_commas(isize::MAX), + "9,223,372,036,854,775,807" + ); + assert_eq!( + format_number_with_commas(isize::MIN), + "-9,223,372,036,854,775,808" + ); + } +}
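
Finally, the `GzDecoder` to `MultiGzDecoder` switch in `src/utils/readers.rs` is what resolves the bgzip-compressed BED issue noted in the changelog: a BGZF file is a concatenation of independent gzip members, and `GzDecoder` stops at the first member boundary while `MultiGzDecoder` decodes the whole series. A runnable sketch of the difference on an in-memory two-member stream (the BED-like content is made up):

    use flate2::read::{GzEncoder, MultiGzDecoder};
    use flate2::Compression;
    use std::io::Read;

    fn main() -> std::io::Result<()> {
        // Build a two-member gzip stream, as `cat a.gz b.gz` or bgzip would.
        let mut stream = Vec::new();
        for chunk in [&b"chr1\t100\t200\n"[..], &b"chr2\t300\t400\n"[..]] {
            let mut member = Vec::new();
            GzEncoder::new(chunk, Compression::default()).read_to_end(&mut member)?;
            stream.extend_from_slice(&member);
        }

        // GzDecoder would stop after "chr1\t100\t200\n"; MultiGzDecoder reads
        // every member, which is what a bgzip-compressed BED file requires.
        let mut text = String::new();
        MultiGzDecoder::new(&stream[..]).read_to_string(&mut text)?;
        assert_eq!(text, "chr1\t100\t200\nchr2\t300\t400\n");
        Ok(())
    }
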