Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MRG: update zip crate to 2.0 #385

Merged
merged 9 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ updates:
allow:
- dependency-type: "direct"
open-pull-requests-limit: 10
ignore:
- dependency-name: "zip"
- package-ecosystem: "github-actions"
directory: "/"
schedule:
Expand Down
47 changes: 40 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ log = "0.4.22"
env_logger = { version = "0.11.3", optional = true }
simple-error = "0.3.1"
anyhow = "1.0.86"
zip = { version = "0.6", default-features = false, features = ["deflate"] }
zip = { version = "2.0", default-features = false }
tempfile = "3.10"
needletail = "0.5.1"
csv = "1.3.0"
Expand Down
23 changes: 11 additions & 12 deletions src/manysketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use anyhow::{anyhow, Result};
use rayon::prelude::*;

use crate::utils::{load_fasta_fromfile, sigwriter, Params, ZipMessage};
use crate::utils::{load_fasta_fromfile, sigwriter, Params};
use camino::Utf8Path as Path;
use needletail::parse_fastx_file;
use sourmash::cmd::ComputeParameters;
Expand Down Expand Up @@ -139,9 +139,10 @@ pub fn manysketch(
}

// set up a multi-producer, single-consumer channel that receives Signature
let (send, recv) = std::sync::mpsc::sync_channel::<ZipMessage>(rayon::current_num_threads());
let (send, recv) =
std::sync::mpsc::sync_channel::<Option<Vec<Signature>>>(rayon::current_num_threads());
// need to use Arc so we can write the manifest after all sigs have written
let send = std::sync::Arc::new(send);
// let send = std::sync::Arc::new(send);

// & spawn a thread that is dedicated to printing to a buffered output
let thrd = sigwriter(recv, output);
Expand Down Expand Up @@ -243,7 +244,7 @@ pub fn manysketch(
}
if singleton {
// write sigs immediately to avoid memory issues
if let Err(e) = send.send(ZipMessage::SignatureData(sigs.clone())) {
if let Err(e) = send.send(Some(sigs.clone())) {
eprintln!("Unable to send internal data: {:?}", e);
return None;
}
Expand All @@ -260,21 +261,19 @@ pub fn manysketch(
})
.try_for_each_with(
send.clone(),
|s: &mut std::sync::Arc<std::sync::mpsc::SyncSender<ZipMessage>>, sigs| {
if let Err(e) = s.send(ZipMessage::SignatureData(sigs)) {
|s: &mut std::sync::mpsc::SyncSender<Option<Vec<Signature>>>, sigs| {
if let Err(e) = s.send(Some(sigs)) {
Err(format!("Unable to send internal data: {:?}", e))
} else {
Ok(())
}
},
);

// After the parallel work, send the WriteManifest message
std::sync::Arc::try_unwrap(send)
.unwrap()
.send(ZipMessage::WriteManifest)
.unwrap();

// Send None to sigwriter to signal completion + write manifest
if let Err(e) = send.send(None) {
eprintln!("Unable to send completion signal: {:?}", e);
}
// do some cleanup and error handling -
if let Err(e) = send_result {
eprintln!("Error during parallel processing: {}", e);
Expand Down
41 changes: 19 additions & 22 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::io::{BufRead, BufReader, BufWriter, Write};
use std::panic;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
use zip::write::{ExtendedFileOptions, FileOptions, ZipWriter};
use zip::CompressionMethod;

use sourmash::ani_utils::{ani_ci_from_containment, ani_from_containment};
use sourmash::collection::Collection;
Expand Down Expand Up @@ -1277,32 +1279,30 @@ impl Hash for Params {
}
}

pub enum ZipMessage {
SignatureData(Vec<Signature>),
WriteManifest,
}

pub fn sigwriter(
recv: std::sync::mpsc::Receiver<ZipMessage>,
recv: std::sync::mpsc::Receiver<Option<Vec<Signature>>>,
output: String,
) -> std::thread::JoinHandle<Result<()>> {
std::thread::spawn(move || -> Result<()> {
// cast output as pathbuf
// cast output as PathBuf
let outpath: PathBuf = output.into();

let file_writer = open_output_file(&outpath);

let options = zip::write::FileOptions::default()
.compression_method(zip::CompressionMethod::Stored)
let options = FileOptions::default()
.compression_method(CompressionMethod::Stored)
.unix_permissions(0o644)
.large_file(true);
let mut zip = zip::ZipWriter::new(file_writer);

let mut zip = ZipWriter::new(file_writer);
let mut manifest_rows: Vec<Record> = Vec::new();
// keep track of md5sum occurrences to prevent overwriting duplicates
// keep track of MD5 sum occurrences to prevent overwriting duplicates
let mut md5sum_occurrences: HashMap<String, usize> = HashMap::new();

// Process all incoming signatures
while let Ok(message) = recv.recv() {
match message {
ZipMessage::SignatureData(sigs) => {
Some(sigs) => {
for sig in sigs.iter() {
let md5sum_str = sig.md5sum();
let count = md5sum_occurrences.entry(md5sum_str.clone()).or_insert(0);
Expand All @@ -1312,22 +1312,19 @@ pub fn sigwriter(
} else {
format!("signatures/{}.sig.gz", md5sum_str)
};
write_signature(sig, &mut zip, options, &sig_filename);
write_signature(sig, &mut zip, options.clone(), &sig_filename);
let records: Vec<Record> = Record::from_sig(sig, sig_filename.as_str());
manifest_rows.extend(records);
}
}
ZipMessage::WriteManifest => {
None => {
// Write the manifest and finish the ZIP file
println!("Writing manifest");
// Start the CSV file inside the zip
zip.start_file("SOURMASH-MANIFEST.csv", options).unwrap();
zip.start_file("SOURMASH-MANIFEST.csv", options)?;
let manifest: Manifest = manifest_rows.clone().into();
manifest.to_writer(&mut zip)?;

// Properly finish writing to the ZIP file
if let Err(e) = zip.finish() {
eprintln!("Error finalizing ZIP file: {:?}", e);
}
zip.finish()?;
break;
}
}
}
Expand Down Expand Up @@ -1357,7 +1354,7 @@ pub fn csvwriter_thread<T: Serialize + Send + 'static>(
pub fn write_signature(
sig: &Signature,
zip: &mut zip::ZipWriter<BufWriter<File>>,
zip_options: zip::write::FileOptions,
zip_options: zip::write::FileOptions<ExtendedFileOptions>,
sig_filename: &str,
) {
let wrapped_sig = vec![sig];
Expand Down