diff --git a/Cargo.lock b/Cargo.lock
index 98dca2e4b78145..f0603e177b8d42 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -15306,6 +15306,7 @@ dependencies = [
  "serde_json",
  "sui-config",
  "sui-core",
+ "sui-indexer-alt-framework",
  "sui-protocol-config",
  "sui-storage",
  "sui-types",
diff --git a/crates/sui-snapshot/Cargo.toml b/crates/sui-snapshot/Cargo.toml
index cdc3db01304bc0..f223db60dd5a00 100644
--- a/crates/sui-snapshot/Cargo.toml
+++ b/crates/sui-snapshot/Cargo.toml
@@ -25,6 +25,7 @@ prometheus.workspace = true
 sui-types.workspace = true
 sui-config.workspace = true
 sui-core.workspace = true
+sui-indexer-alt-framework.workspace = true
 sui-storage.workspace = true
 sui-protocol-config.workspace = true
 fastcrypto = { workspace = true, features = ["copy_key"] }
diff --git a/crates/sui-snapshot/src/reader.rs b/crates/sui-snapshot/src/reader.rs
index befe6a857a42db..b22abf56a2acf7 100644
--- a/crates/sui-snapshot/src/reader.rs
+++ b/crates/sui-snapshot/src/reader.rs
@@ -26,6 +26,7 @@ use std::sync::Arc;
 use sui_config::object_storage_config::ObjectStoreConfig;
 use sui_core::authority::authority_store_tables::{AuthorityPerpetualTables, LiveObject};
 use sui_core::authority::AuthorityStore;
+use sui_indexer_alt_framework::task::TrySpawnStreamExt;
 use sui_storage::blob::{Blob, BlobEncoding};
 use sui_storage::object_store::http::HttpDownloaderBuilder;
 use sui_storage::object_store::util::{copy_file, copy_files, path_to_filesystem};
@@ -41,6 +42,7 @@ use tracing::{error, info};
 pub type SnapshotChecksums = (DigestByBucketAndPartition, Accumulator);
 pub type DigestByBucketAndPartition = BTreeMap<u32, BTreeMap<u32, [u8; 32]>>;
 pub type Sha3DigestType = Arc<Mutex<BTreeMap<u32, BTreeMap<u32, [u8; 32]>>>>;
+#[derive(Clone)]
 pub struct StateSnapshotReaderV1 {
     epoch: u64,
     local_staging_dir_root: PathBuf,
@@ -235,33 +237,54 @@ impl StateSnapshotReaderV1 {
             ),
         );
 
-        for (bucket, part_files) in self.ref_files.clone().iter() {
-            for (part, _part_file) in part_files.iter() {
-                let mut sha3_digests = sha3_digests.lock().await;
-                let ref_iter = self.ref_iter(*bucket, *part)?;
-                let mut hasher = Sha3_256::default();
-                let mut empty = true;
-                self.object_files
-                    .get(bucket)
-                    .context(format!("No bucket exists for: {bucket}"))?
-                    .get(part)
-                    .context(format!("No part exists for bucket: {bucket}, part: {part}"))?;
-                for object_ref in ref_iter {
-                    hasher.update(object_ref.2.inner());
-                    empty = false;
-                }
-                if !empty {
-                    sha3_digests
-                        .entry(*bucket)
-                        .or_insert(BTreeMap::new())
-                        .entry(*part)
-                        .or_insert(hasher.finalize().digest);
+        let ref_files_iter = self.ref_files.clone().into_iter();
+        futures::stream::iter(ref_files_iter)
+            .flat_map(|(bucket, part_files)| {
+                futures::stream::iter(
+                    part_files
+                        .into_iter()
+                        .map(move |(part, part_file)| (bucket, part, part_file)),
+                )
+            })
+            .try_for_each_spawned(self.concurrency, |(bucket, part, _part_file)| {
+                let sha3_digests = sha3_digests.clone();
+                let object_files = self.object_files.clone();
+                let bar = checksum_progress_bar.clone();
+                let this = self.clone();
+
+                async move {
+                    let ref_iter = this.ref_iter(bucket, part)?;
+                    let mut hasher = Sha3_256::default();
+                    let mut empty = true;
+
+                    object_files
+                        .get(&bucket)
+                        .context(format!("No bucket exists for: {bucket}"))?
+                        .get(&part)
+                        .context(format!("No part exists for bucket: {bucket}, part: {part}"))?;
+
+                    for object_ref in ref_iter {
+                        hasher.update(object_ref.2.inner());
+                        empty = false;
+                    }
+
+                    if !empty {
+                        let mut digests = sha3_digests.lock().await;
+                        digests
+                            .entry(bucket)
+                            .or_insert(BTreeMap::new())
+                            .entry(part)
+                            .or_insert(hasher.finalize().digest);
+                    }
+
+                    bar.inc(1);
+                    bar.set_message(format!("Bucket: {}, Part: {}", bucket, part));
+                    Ok::<(), anyhow::Error>(())
                 }
-                checksum_progress_bar.inc(1);
-                checksum_progress_bar.set_message(format!("Bucket: {}, Part: {}", bucket, part));
-            }
-        }
+            })
+            .await?;
         checksum_progress_bar.finish_with_message("Checksumming complete");
+        info!("Checksumming complete");
         Ok((sha3_digests, num_part_files))
     }
 