Skip to content

Commit

Permalink
MRG: integrate sketching utils from directsketch (#516)
Browse files Browse the repository at this point in the history
* add buildutils

* try using buildutils

* rename handle

* add params printing; better moltype filtering

* comment out tests for now

* update tests for new buildrecord default strategy

* require moltype in param str

* MRG: buildutils for singlesketch (#523)

* buildutils for singlesketch

* consolidate sig building

* consolidate sig writing

* gz + zip writing!

* check gz is actually gz

* black formatting

* clippy fixes; clean up

* also print sig templates for singlesketch

* upd tests for moltype requirement

* select in-place; rm unused filter_empty impl
  • Loading branch information
bluegenes authored Nov 20, 2024
1 parent fe07de9 commit c8229d5
Show file tree
Hide file tree
Showing 7 changed files with 1,559 additions and 338 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ rustworkx-core = "0.15.1"
streaming-stats = "0.2.3"
rust_decimal = { version = "1.36.0", features = ["maths"] }
rust_decimal_macros = "1.36.0"
getset = "0.1"

[dev-dependencies]
assert_cmd = "2.0.16"
Expand Down
176 changes: 70 additions & 106 deletions src/manysketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,13 @@
use anyhow::{anyhow, Result};
use rayon::prelude::*;

use crate::utils::{load_fasta_fromfile, parse_params_str, sigwriter, Params};
use camino::Utf8Path as Path;
use needletail::parse_fastx_file;
use sourmash::cmd::ComputeParameters;
use sourmash::signature::Signature;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;

pub fn build_siginfo(params: &[Params], input_moltype: &str) -> Vec<Signature> {
let mut sigs = Vec::new();

for param in params.iter().cloned() {
match input_moltype {
// if dna, only build dna sigs. if protein, only build protein sigs, etc
"dna" | "DNA" if !param.is_dna => continue,
"protein" if !param.is_protein && !param.is_dayhoff && !param.is_hp => continue,
_ => (),
}

// Adjust ksize value based on the is_protein flag
let adjusted_ksize = if param.is_protein || param.is_dayhoff || param.is_hp {
param.ksize * 3
} else {
param.ksize
};

let cp = ComputeParameters::builder()
.ksizes(vec![adjusted_ksize])
.scaled(param.scaled)
.protein(param.is_protein)
.dna(param.is_dna)
.dayhoff(param.is_dayhoff)
.hp(param.is_hp)
.num_hashes(param.num)
.track_abundance(param.track_abundance)
.build();

let sig = Signature::from_params(&cp);
sigs.push(sig);
}

sigs
}
use crate::utils::buildutils::{BuildCollection, MultiSelect, MultiSelection};
use crate::utils::{load_fasta_fromfile, zipwriter_handle};

pub fn manysketch(
filelist: String,
Expand All @@ -71,25 +35,25 @@ pub fn manysketch(
bail!("Output must be a zip file.");
}

// set up a multi-producer, single-consumer channel that receives Signature
// set up a multi-producer, single-consumer channel that receives BuildCollection
let (send, recv) =
std::sync::mpsc::sync_channel::<Option<Vec<Signature>>>(rayon::current_num_threads());
// need to use Arc so we can write the manifest after all sigs have written
// let send = std::sync::Arc::new(send);
std::sync::mpsc::sync_channel::<Option<BuildCollection>>(rayon::current_num_threads());

// & spawn a thread that is dedicated to printing to a buffered output
let thrd = sigwriter(recv, output);
let thrd = zipwriter_handle(recv, output);

// parse param string into params_vec, print error if fail
let param_result = parse_params_str(param_str);
let params_vec = match param_result {
Ok(params) => params,
// params --> buildcollection
let sig_template_result = BuildCollection::from_param_str(param_str.as_str());
let sig_templates = match sig_template_result {
Ok(sig_templates) => sig_templates,
Err(e) => {
eprintln!("Error parsing params string: {}", e);
bail!("Failed to parse params string");
bail!("Failed to parse params string: {}", e);
}
};

// print sig templates to build
let _params = sig_templates.summarize_params();

// iterate over filelist_paths
let processed_fastas = AtomicUsize::new(0);
let failed_paths = AtomicUsize::new(0);
Expand All @@ -103,22 +67,23 @@ pub fn manysketch(
.filter_map(|fastadata| {
let name = &fastadata.name;
let filenames = &fastadata.paths;
let moltype = &fastadata.input_type;
// build sig templates for these sketches from params, check if there are sigs to build
let sig_templates = build_siginfo(&params_vec, moltype);
let input_moltype = &fastadata.input_type;
let mut sigs = sig_templates.clone();
// filter sig templates for this fasta by moltype
// atm, we only do DNA->DNA, prot->prot Future -- figure out if we need to modify to allow translate/skip
let multiselection = MultiSelection::from_input_moltype(input_moltype.as_str())
.expect("could not build selection from input moltype");

sigs.select(&multiselection)
.expect("could not select on sig_templates");

// if no sigs to build, skip this iteration
if sig_templates.is_empty() {
if sigs.is_empty() {
skipped_paths.fetch_add(filenames.len(), atomic::Ordering::SeqCst);
processed_fastas.fetch_add(1, atomic::Ordering::SeqCst);
return None;
}

let mut sigs = sig_templates.clone();
// have name / filename been set for each sig yet?
let mut set_name = false;
// if merging multiple files, sourmash sets filename as last filename
let last_filename = filenames.last().unwrap();

for filename in filenames {
// increment processed_fastas counter; make 1-based for % reporting
let i = processed_fastas.fetch_add(1, atomic::Ordering::SeqCst);
Expand All @@ -132,60 +97,59 @@ pub fn manysketch(
percent_processed
);
}

// Open fasta file reader
let mut reader = match parse_fastx_file(filename) {
Ok(r) => r,
Err(err) => {
eprintln!("Error opening file {}: {:?}", filename, err);
failed_paths.fetch_add(1, atomic::Ordering::SeqCst);
return None;
}
};

// parse fasta and add to signature
while let Some(record_result) = reader.next() {
match record_result {
Ok(record) => {
// do we need to normalize to make sure all the bases are consistently capitalized?
// let norm_seq = record.normalize(false);
sigs.iter_mut().for_each(|sig| {
if singleton {
let record_name = std::str::from_utf8(record.id())
.expect("could not get record id");
sig.set_name(record_name);
sig.set_filename(filename.as_str());
} else if !set_name {
sig.set_name(name);
// sourmash sets filename to last filename if merging fastas
sig.set_filename(last_filename.as_str());
};
if moltype == "protein" {
sig.add_protein(&record.seq())
.expect("Failed to add protein");
} else {
sig.add_sequence(&record.seq(), true)
.expect("Failed to add sequence");
// if not force, panics with 'N' in dna sequence
if singleton {
// Open fasta file reader
let mut reader = match parse_fastx_file(filename) {
Ok(r) => r,
Err(err) => {
eprintln!("Error opening file {}: {:?}", filename, err);
failed_paths.fetch_add(1, atomic::Ordering::SeqCst);
return None;
}
};

while let Some(record_result) = reader.next() {
match record_result {
Ok(record) => {
if let Err(err) = sigs.build_singleton_sigs(
record,
input_moltype,
filename.to_string(),
) {
eprintln!(
"Error building signatures from file: {}, {:?}",
filename, err
);
// do we want to keep track of singleton sigs that fail? if so, how?
}
// send singleton sigs for writing
if let Err(e) = send.send(Some(sigs)) {
eprintln!("Unable to send internal data: {:?}", e);
return None;
}
});
if !set_name {
set_name = true;
sigs = sig_templates.clone();
}
Err(err) => eprintln!("Error while processing record: {:?}", err),
}
Err(err) => eprintln!("Error while processing record: {:?}", err),
}
if singleton {
// write sigs immediately to avoid memory issues
if let Err(e) = send.send(Some(sigs.clone())) {
eprintln!("Unable to send internal data: {:?}", e);
return None;
} else {
match sigs.build_sigs_from_file_or_stdin(
input_moltype,
name.clone(),
filename.to_string(),
) {
Ok(_record_count) => {}
Err(err) => {
eprintln!(
"Error building signatures from file: {}, {:?}",
filename, err
);
failed_paths.fetch_add(1, atomic::Ordering::SeqCst);
}
sigs = sig_templates.clone();
}
}
}
// if singleton sketches, they have already been written; only write aggregate sketches
// if singleton sketches, they have already been written; only send aggregated sketches to be written
if singleton {
None
} else {
Expand All @@ -194,7 +158,7 @@ pub fn manysketch(
})
.try_for_each_with(
send.clone(),
|s: &mut std::sync::mpsc::SyncSender<Option<Vec<Signature>>>, sigs| {
|s: &mut std::sync::mpsc::SyncSender<Option<BuildCollection>>, sigs| {
if let Err(e) = s.send(Some(sigs)) {
Err(format!("Unable to send internal data: {:?}", e))
} else {
Expand Down
92 changes: 65 additions & 27 deletions src/python/tests/test_sketch.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ def test_manysketch_bad_fa_csv_2(runtmp, capfd):
captured = capfd.readouterr()
print(captured.err)
assert "Could not load fasta files: no signatures created." in captured.err
assert "Error opening file bad2.fa: ParseError" in captured.err
assert "Error building signatures from file: bad2.fa" in captured.err


def test_manysketch_bad_fa_csv_3(runtmp, capfd):
Expand Down Expand Up @@ -453,32 +453,7 @@ def test_manysketch_bad_param_str_moltype(runtmp, capfd):
captured = capfd.readouterr()
print(captured.err)
assert (
"Error parsing params string: No moltype provided in params string k=31,scaled=100"
in captured.err
)
assert "Failed to parse params string" in captured.err


def test_manysketch_bad_param_str_ksize(runtmp, capfd):
# no ksize provided in param str
fa_csv = runtmp.output("db-fa.txt")

fa1 = get_test_data("short.fa")
fa2 = get_test_data("short2.fa")
fa3 = get_test_data("short3.fa")

make_assembly_csv(fa_csv, [fa1, fa2, fa3])
output = runtmp.output("out.zip")

with pytest.raises(utils.SourmashCommandFailed):
runtmp.sourmash(
"scripts", "manysketch", fa_csv, "-o", output, "-p", "dna,scaled=100"
)

captured = capfd.readouterr()
print(captured.err)
assert (
"Error parsing params string: No ksizes provided in params string dna,scaled=100"
"Error parsing params string 'k=31,scaled=100': No moltype provided"
in captured.err
)
assert "Failed to parse params string" in captured.err
Expand Down Expand Up @@ -1397,3 +1372,66 @@ def test_singlesketch_multimoltype_fail(runtmp):
"-p",
"protein,dna,k=7",
)


def test_singlesketch_gzipped_output(runtmp):
"""Test singlesketch with gzipped output."""
fa1 = get_test_data("short.fa")
output = runtmp.output("short.sig.gz")

# Run the singlesketch command
runtmp.sourmash("scripts", "singlesketch", fa1, "-o", output)

# Check if the output exists and contains the expected data
assert os.path.exists(output)

# Verify the file is gzipped
import gzip

try:
with gzip.open(output, "rt") as f:
f.read(1) # Try to read a single character to ensure it's valid gzip
except gzip.BadGzipFile:
assert False, f"Output file {output} is not a valid gzipped file."

# check the signatures
sig = sourmash.load_one_signature(output)

assert sig.name == "short.fa"
assert sig.minhash.ksize == 31
assert sig.minhash.is_dna
assert sig.minhash.scaled == 1000

# validate against sourmash sketch
output2 = runtmp.output("short2.sig")
runtmp.sourmash("sketch", "dna", fa1, "-o", output2)
sig2 = sourmash.load_one_signature(output2)
assert sig.minhash.hashes == sig2.minhash.hashes


def test_singlesketch_zip_output(runtmp):
"""Test singlesketch with zip output."""
fa1 = get_test_data("short.fa")
output = runtmp.output("short.zip")

# Run the singlesketch command
runtmp.sourmash("scripts", "singlesketch", fa1, "-o", output)

# Check if the output exists and contains the expected data
assert os.path.exists(output)
idx = sourmash.load_file_as_index(output)
sigs = list(idx.signatures())
assert len(sigs) == 1
print(sigs)
sig = sigs[0]

assert sig.name == "short.fa"
assert sig.minhash.ksize == 31
assert sig.minhash.is_dna
assert sig.minhash.scaled == 1000

# validate against sourmash sketch
output2 = runtmp.output("short2.sig")
runtmp.sourmash("sketch", "dna", fa1, "-o", output2)
sig2 = sourmash.load_one_signature(output2)
assert sig.minhash.hashes == sig2.minhash.hashes
Loading

0 comments on commit c8229d5

Please sign in to comment.