From 57ea600c64fa5f84d9ed64d0e88f32d49911d4fd Mon Sep 17 00:00:00 2001
From: Blaise Bruer
Date: Fri, 6 Sep 2024 01:25:51 -0500
Subject: [PATCH] Cleanup digest function to use u64 instead of i64

Digest sizes are never negative, so carrying them around as i64 was
purely historical: it kept the field in sync with the Digest proto.
Once the proto value is converted at the boundary, nothing else in
the code base needs the signed type.

Also implement a standard format/display function for DigestInfo
using the common '{hash}-{size}' format, remove direct access to the
internals of DigestInfo, and require member access to go through
accessors.
---
 nativelink-service/src/bytestream_server.rs   |   2 +-
 nativelink-service/src/cas_server.rs          |   4 +-
 nativelink-service/tests/cas_server_test.rs   |  36 +-----
 nativelink-store/src/cas_utils.rs             |   2 +-
 nativelink-store/src/dedup_store.rs           |   8 +-
 nativelink-store/src/existence_cache_store.rs |   2 +-
 nativelink-store/src/filesystem_store.rs      |   6 +-
 nativelink-store/src/grpc_store.rs            |   8 +-
 nativelink-store/src/shard_store.rs           |  18 +--
 .../src/size_partitioning_store.rs            |  12 +-
 nativelink-store/src/verify_store.rs          |  12 +-
 nativelink-store/tests/cas_utils_test.rs      |  20 +--
 .../tests/compression_store_test.rs           |   5 +-
 .../tests/fast_slow_store_test.rs             |   6 +-
 .../tests/filesystem_store_test.rs            |  52 ++------
 nativelink-store/tests/memory_store_test.rs   |  10 +-
 nativelink-store/tests/redis_store_test.rs    |  10 +-
 nativelink-store/tests/s3_store_test.rs       |  10 +-
 nativelink-util/src/action_messages.rs        |  12 +-
 nativelink-util/src/common.rs                 | 120 +++++++++++++++---
 nativelink-util/src/digest_hasher.rs          |   6 +-
 nativelink-util/src/store_trait.rs            |   2 +-
 .../tests/proto_stream_utils_test.rs          |   4 +-
 .../src/running_actions_manager.rs            |   9 +-
 24 files changed, 184 insertions(+), 192 deletions(-)

diff --git a/nativelink-service/src/bytestream_server.rs b/nativelink-service/src/bytestream_server.rs
index 5e30ca86b..71b7eff7a 100644
--- a/nativelink-service/src/bytestream_server.rs
+++ b/nativelink-service/src/bytestream_server.rs
@@ -253,7 +253,7 @@ impl ByteStreamServer {
                 digest,
                 rx,
                 UploadSizeInfo::ExactSize(
-                    usize::try_from(digest.size_bytes).err_tip(|| "Invalid digest size")?,
+                    usize::try_from(digest.size_bytes()).err_tip(|| "Invalid digest size")?,
                 ),
             )
             .await
diff --git a/nativelink-service/src/cas_server.rs b/nativelink-service/src/cas_server.rs
index 5eb062d4c..372e1e2d2 100644
--- a/nativelink-service/src/cas_server.rs
+++ b/nativelink-service/src/cas_server.rs
@@ -123,7 +123,7 @@ impl CasServer {
                 .err_tip(|| "Digest not found in request")?;
             let request_data = request.data;
             let digest_info = DigestInfo::try_from(digest.clone())?;
-            let size_bytes = usize::try_from(digest_info.size_bytes)
+            let size_bytes = usize::try_from(digest_info.size_bytes())
                 .err_tip(|| "Digest size_bytes was not convertible to usize")?;
             error_if!(
                 size_bytes != request_data.len(),
@@ -280,7 +280,7 @@ impl CasServer {
         // `next_page_token` will return the `{hash_str}:{size_bytes}` of the next request's first directory digest.
         // It will be an empty string when it reached the end of the directory tree.
         let next_page_token: String = if let Some(value) = deque.front() {
-            format!("{}-{}", value.hash_str(), value.size_bytes)
+            format!("{value}")
         } else {
             String::new()
         };
diff --git a/nativelink-service/tests/cas_server_test.rs b/nativelink-service/tests/cas_server_test.rs
index 65e0219af..9d1128197 100644
--- a/nativelink-service/tests/cas_server_test.rs
+++ b/nativelink-service/tests/cas_server_test.rs
@@ -387,11 +387,7 @@ async fn get_tree_read_directories_without_paging() -> Result<(), Box<dyn std::error::Error>> {
[The four hunks in this file, along with the diff header for
nativelink-store/src/cas_utils.rs, were lost in extraction; per the diffstat
they replace hand-built "{hash}-{size}" strings with the new Display form.]
 pub fn is_zero_digest(digest: impl Into<StoreKey<'_>>) -> bool {
     match digest.into() {
-        StoreKey::Digest(digest) => digest.size_bytes == 0 && ZERO_BYTE_DIGESTS.contains(&digest),
+        StoreKey::Digest(digest) => digest.size_bytes() == 0 && ZERO_BYTE_DIGESTS.contains(&digest),
         _ => false,
     }
 }
diff --git a/nativelink-store/src/dedup_store.rs b/nativelink-store/src/dedup_store.rs
index bfeaa8815..d497451e6 100644
--- a/nativelink-store/src/dedup_store.rs
+++ b/nativelink-store/src/dedup_store.rs
@@ -128,9 +128,7 @@ impl DedupStore {
         let digests: Vec<_> = index_entries
             .entries
             .into_iter()
-            .map(|index_entry| {
-                DigestInfo::new(index_entry.packed_hash, index_entry.size_bytes).into()
-            })
+            .map(StoreKey::Digest)
             .collect();
         let mut sum = 0;
         for size in self.content_store.has_many(&digests).await? {
@@ -181,7 +179,7 @@ impl StoreDriver for DedupStore {
             .map(|r| r.err_tip(|| "Failed to decode frame from fast_cdc"))
             .map_ok(|frame| async move {
                 let hash = blake3::hash(&frame[..]).into();
-                let index_entry = DigestInfo::new(hash, frame.len() as i64);
+                let index_entry = DigestInfo::new(hash, frame.len() as u64);
                 if self
                     .content_store
                     .has(index_entry)
@@ -266,7 +264,7 @@ impl StoreDriver for DedupStore {
         let mut entries = Vec::with_capacity(index_entries.entries.len());
         for entry in index_entries.entries {
             let first_byte = current_entries_sum;
-            let entry_size = usize::try_from(entry.size_bytes)
+            let entry_size = usize::try_from(entry.size_bytes())
                 .err_tip(|| "Failed to convert to usize in DedupStore")?;
             current_entries_sum += entry_size;
             // Filter any items who's end byte is before the first requested byte.
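Note: the one-line call sites above lean on the new Display impl;
format!("{digest}") now yields the same `{hash}-{size}` string the removed
format! calls built by hand. A minimal sketch of the behavior this patch
introduces (the hex constant and the size 142 are made up for illustration):

    use nativelink_util::common::DigestInfo;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Illustrative 64-char hex string; not a real content hash.
        let hash = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
        let digest = DigestInfo::try_new(hash, 142)?;

        // Display renders the common '{hash}-{size}' format, so this
        // replaces format!("{}-{}", digest.hash_str(), digest.size_bytes)
        // at the call sites touched by this patch.
        assert_eq!(format!("{digest}"), format!("{hash}-142"));
        Ok(())
    }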
diff --git a/nativelink-store/src/existence_cache_store.rs b/nativelink-store/src/existence_cache_store.rs index b4a84e6a4..43b925fbf 100644 --- a/nativelink-store/src/existence_cache_store.rs +++ b/nativelink-store/src/existence_cache_store.rs @@ -209,7 +209,7 @@ impl StoreDriver for ExistenceCacheStore { .get_part(digest, writer, offset, length) .await; if result.is_ok() { - let size = usize::try_from(digest.size_bytes) + let size = usize::try_from(digest.size_bytes()) .err_tip(|| "Could not convert size_bytes in ExistenceCacheStore::get_part")?; let _ = self .existence_cache diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 541740d25..6b7fa3e1b 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -132,7 +132,7 @@ impl Drop for EncodedFilePath { #[inline] fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString { - format!("{}/{}-{}", folder, digest.hash_str(), digest.size_bytes).into() + format!("{folder}/{digest}").into() } pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static { @@ -301,11 +301,13 @@ impl Debug for FileEntryImpl { fn make_temp_digest(digest: &mut DigestInfo) { static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0); - digest.packed_hash[24..].clone_from_slice( + let mut hash = *digest.packed_hash(); + hash[24..].clone_from_slice( &DELETE_FILE_COUNTER .fetch_add(1, Ordering::Relaxed) .to_le_bytes(), ); + digest.set_packed_hash(hash); } impl LenEntry for FileEntryImpl { diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index 6f0e38d7a..8366d7638 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -565,7 +565,7 @@ impl StoreDriver for GrpcStore { { match missing_digests.binary_search(&digest) { Ok(_) => *result = None, - Err(_) => *result = Some(usize::try_from(digest.size_bytes)?), + Err(_) => *result = Some(usize::try_from(digest.size_bytes())?), } } @@ -589,7 +589,7 @@ impl StoreDriver for GrpcStore { &self.instance_name, Uuid::new_v4().hyphenated().encode_lower(&mut buf), digest.hash_str(), - digest.size_bytes, + digest.size_bytes(), ); struct LocalState { @@ -666,7 +666,7 @@ impl StoreDriver for GrpcStore { } // Shortcut for empty blobs. - if digest.size_bytes == 0 { + if digest.size_bytes() == 0 { return writer.send_eof(); } @@ -674,7 +674,7 @@ impl StoreDriver for GrpcStore { "{}/blobs/{}/{}", &self.instance_name, digest.hash_str(), - digest.size_bytes, + digest.size_bytes(), ); struct LocalState<'a> { diff --git a/nativelink-store/src/shard_store.rs b/nativelink-store/src/shard_store.rs index 1dc4e18a4..7dc6761c9 100644 --- a/nativelink-store/src/shard_store.rs +++ b/nativelink-store/src/shard_store.rs @@ -93,30 +93,30 @@ impl ShardStore { // array length. They optimize especially well when the optimizer can easily determine // the slice length, e.g. <[u8; 4]>::try_from(&slice[4..8]).unwrap(). Array implements // TryFrom returning. 
-        let size_bytes = digest.size_bytes.to_le_bytes();
+        let size_bytes = digest.size_bytes().to_le_bytes();
         0.bitxor(u32::from_le_bytes(
-            digest.packed_hash[0..4].try_into().unwrap(),
+            digest.packed_hash()[0..4].try_into().unwrap(),
         ))
         .bitxor(u32::from_le_bytes(
-            digest.packed_hash[4..8].try_into().unwrap(),
+            digest.packed_hash()[4..8].try_into().unwrap(),
         ))
         .bitxor(u32::from_le_bytes(
-            digest.packed_hash[8..12].try_into().unwrap(),
+            digest.packed_hash()[8..12].try_into().unwrap(),
         ))
         .bitxor(u32::from_le_bytes(
-            digest.packed_hash[12..16].try_into().unwrap(),
+            digest.packed_hash()[12..16].try_into().unwrap(),
         ))
         .bitxor(u32::from_le_bytes(
-            digest.packed_hash[16..20].try_into().unwrap(),
+            digest.packed_hash()[16..20].try_into().unwrap(),
         ))
         .bitxor(u32::from_le_bytes(
-            digest.packed_hash[20..24].try_into().unwrap(),
+            digest.packed_hash()[20..24].try_into().unwrap(),
         ))
         .bitxor(u32::from_le_bytes(
-            digest.packed_hash[24..28].try_into().unwrap(),
+            digest.packed_hash()[24..28].try_into().unwrap(),
         ))
         .bitxor(u32::from_le_bytes(
-            digest.packed_hash[28..32].try_into().unwrap(),
+            digest.packed_hash()[28..32].try_into().unwrap(),
         ))
         .bitxor(u32::from_le_bytes(size_bytes[0..4].try_into().unwrap()))
         .bitxor(u32::from_le_bytes(size_bytes[4..8].try_into().unwrap()))
diff --git a/nativelink-store/src/size_partitioning_store.rs b/nativelink-store/src/size_partitioning_store.rs
index 2ac1cde7c..3d319beaf 100644
--- a/nativelink-store/src/size_partitioning_store.rs
+++ b/nativelink-store/src/size_partitioning_store.rs
@@ -26,7 +26,7 @@ use tokio::join;
 #[derive(MetricsComponent)]
 pub struct SizePartitioningStore {
     #[metric(help = "Size to partition our data")]
-    partition_size: i64,
+    partition_size: u64,
     #[metric(group = "lower_store")]
     lower_store: Store,
     #[metric(group = "upper_store")]
@@ -40,7 +40,7 @@ impl SizePartitioningStore {
         upper_store: Store,
     ) -> Arc<Self> {
         Arc::new(SizePartitioningStore {
-            partition_size: config.size as i64,
+            partition_size: config.size,
             lower_store,
             upper_store,
         })
@@ -61,7 +61,7 @@ impl StoreDriver for SizePartitioningStore {
                 non_digest_sample = Some(k.borrow().into_owned());
                 return false;
             };
-            digest.size_bytes < self.partition_size
+            digest.size_bytes() < self.partition_size
         });
         if let Some(non_digest) = non_digest_sample {
             return Err(make_input_err!(
@@ -110,7 +110,7 @@ impl StoreDriver for SizePartitioningStore {
                 ))
             }
         };
-        if digest.size_bytes < self.partition_size {
+        if digest.size_bytes() < self.partition_size {
             return self.lower_store.update(digest, reader, size_info).await;
         }
         self.upper_store.update(digest, reader, size_info).await
@@ -131,7 +131,7 @@ impl StoreDriver for SizePartitioningStore {
                 ))
             }
         };
-        if digest.size_bytes < self.partition_size {
+        if digest.size_bytes() < self.partition_size {
             return self
                 .lower_store
                 .get_part(digest, writer, offset, length)
@@ -150,7 +150,7 @@ impl StoreDriver for SizePartitioningStore {
             StoreKey::Digest(digest) => digest,
             _ => return self,
         };
-        if digest.size_bytes < self.partition_size {
+        if digest.size_bytes() < self.partition_size {
             return self.lower_store.inner_store(Some(digest));
         }
         self.upper_store.inner_store(Some(digest))
diff --git a/nativelink-store/src/verify_store.rs b/nativelink-store/src/verify_store.rs
index c5178c397..8d9c87358 100644
--- a/nativelink-store/src/verify_store.rs
+++ b/nativelink-store/src/verify_store.rs
@@ -61,7 +61,7 @@ impl VerifyStore {
         mut tx: DropCloserWriteHalf,
         mut rx: DropCloserReadHalf,
         maybe_expected_digest_size: Option<u64>,
-        original_hash: [u8; 32],
+        original_hash:
&[u8; 32], mut maybe_hasher: Option<&mut D>, ) -> Result<(), Error> { let mut sum_size: u64 = 0; @@ -114,7 +114,8 @@ impl VerifyStore { } } if let Some(hasher) = maybe_hasher.as_mut() { - let hash_result: [u8; 32] = hasher.finalize_digest().packed_hash; + let digest = hasher.finalize_digest(); + let hash_result = digest.packed_hash(); if original_hash != hash_result { self.hash_verification_failures.inc(); return Err(make_input_err!( @@ -167,15 +168,14 @@ impl StoreDriver for VerifyStore { )); } }; - let digest_size = u64::try_from(digest.size_bytes) - .err_tip(|| "Digest size_bytes was not convertible to usize")?; + let digest_size = digest.size_bytes(); if let UploadSizeInfo::ExactSize(expected_size) = size_info { if self.verify_size && expected_size as u64 != digest_size { self.size_verification_failures.inc(); return Err(make_input_err!( "Expected size to match. Got {} but digest says {} on update", expected_size, - digest.size_bytes + digest_size )); } } @@ -203,7 +203,7 @@ impl StoreDriver for VerifyStore { tx, reader, maybe_digest_size, - digest.packed_hash, + digest.packed_hash(), hasher.as_mut(), ); diff --git a/nativelink-store/tests/cas_utils_test.rs b/nativelink-store/tests/cas_utils_test.rs index f352de961..13035b07e 100644 --- a/nativelink-store/tests/cas_utils_test.rs +++ b/nativelink-store/tests/cas_utils_test.rs @@ -19,10 +19,7 @@ use sha2::{Digest, Sha256}; #[test] fn sha256_is_zero_digest() { - let digest = DigestInfo { - packed_hash: Sha256::new().finalize().into(), - size_bytes: 0, - }; + let digest = DigestInfo::new(Sha256::new().finalize().into(), 0); assert!(is_zero_digest(digest)); } @@ -30,19 +27,13 @@ fn sha256_is_zero_digest() { fn sha256_is_non_zero_digest() { let mut hasher = Sha256::new(); hasher.update(b"a"); - let digest = DigestInfo { - packed_hash: hasher.finalize().into(), - size_bytes: 1, - }; + let digest = DigestInfo::new(hasher.finalize().into(), 1); assert!(!is_zero_digest(digest)); } #[test] fn blake_is_zero_digest() { - let digest = DigestInfo { - packed_hash: Blake3::new().finalize().into(), - size_bytes: 0, - }; + let digest = DigestInfo::new(Blake3::new().finalize().into(), 0); assert!(is_zero_digest(digest)); } @@ -50,9 +41,6 @@ fn blake_is_zero_digest() { fn blake_is_non_zero_digest() { let mut hasher = Blake3::new(); hasher.update(b"a"); - let digest = DigestInfo { - packed_hash: hasher.finalize().into(), - size_bytes: 1, - }; + let digest = DigestInfo::new(hasher.finalize().into(), 1); assert!(!is_zero_digest(digest)); } diff --git a/nativelink-store/tests/compression_store_test.rs b/nativelink-store/tests/compression_store_test.rs index 65f75de4e..479f4afd9 100644 --- a/nativelink-store/tests/compression_store_test.rs +++ b/nativelink-store/tests/compression_store_test.rs @@ -487,10 +487,7 @@ async fn check_footer_test() -> Result<(), Error> { #[nativelink_test] async fn get_part_is_zero_digest() -> Result<(), Error> { - let digest = DigestInfo { - packed_hash: Sha256::new().finalize().into(), - size_bytes: 0, - }; + let digest = DigestInfo::new(Sha256::new().finalize().into(), 0); const BLOCK_SIZE: u32 = 32 * 1024; let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index 47748d0c8..96b3ccdc9 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -249,7 +249,7 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { if 
let Some(has_digest) = self.digest { for (digest, result) in digests.iter().zip(results.iter_mut()) { if *digest == has_digest.into() { - *result = Some(has_digest.size_bytes as usize); + *result = Some(has_digest.size_bytes() as usize); } } } @@ -286,7 +286,7 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { ) -> Result<(), Error> { // Gets called in the slow store and we provide the data that's // sent to the upstream and the fast store. - let bytes = length.unwrap_or(key.into_digest().size_bytes as usize) - offset; + let bytes = length.unwrap_or(key.into_digest().size_bytes() as usize) - offset; let data = vec![0_u8; bytes]; writer.send(Bytes::copy_from_slice(&data)).await?; writer.send_eof() @@ -350,7 +350,7 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { // Drop get_part as soon as rx.drain() completes tokio::select!( res = rx.drain() => res, - res = fast_slow_store.get_part(digest, tx, 0, Some(digest.size_bytes as usize)) => res, + res = fast_slow_store.get_part(digest, tx, 0, Some(digest.size_bytes() as usize)) => res, ) }, async move { diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index 352ab4d0e..bdb3cea71 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -328,12 +328,7 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> { store.update_oneshot(digest1, VALUE1.into()).await?; - let expected_file_name = OsString::from(format!( - "{}/{}-{}", - content_path, - digest1.hash_str(), - digest1.size_bytes - )); + let expected_file_name = OsString::from(format!("{}/{}", content_path, digest1,)); { // Check to ensure our file exists where it should and content matches. let data = read_file_contents(&expected_file_name).await?; @@ -669,18 +664,8 @@ async fn oldest_entry_evicted_with_access_times_loaded_from_disk() -> Result<(), fs::create_dir_all(&content_path).await?; // Make the two files on disk before loading the store. 
- let file1 = OsString::from(format!( - "{}/{}-{}", - content_path, - digest1.hash_str(), - digest1.size_bytes - )); - let file2 = OsString::from(format!( - "{}/{}-{}", - content_path, - digest2.hash_str(), - digest2.size_bytes - )); + let file1 = OsString::from(format!("{}/{}", content_path, digest1,)); + let file2 = OsString::from(format!("{}/{}", content_path, digest2,)); write_file(&file1, VALUE1.as_bytes()).await?; write_file(&file2, VALUE2.as_bytes()).await?; set_file_atime(&file1, FileTime::from_unix_time(0, 0))?; @@ -1060,10 +1045,7 @@ async fn get_part_timeout_test() -> Result<(), Error> { #[serial] #[nativelink_test] async fn get_part_is_zero_digest() -> Result<(), Error> { - let digest = DigestInfo { - packed_hash: Sha256::new().finalize().into(), - size_bytes: 0, - }; + let digest = DigestInfo::new(Sha256::new().finalize().into(), 0); let content_path = make_temp_path("content_path"); let temp_path = make_temp_path("temp_path"); @@ -1105,10 +1087,7 @@ async fn get_part_is_zero_digest() -> Result<(), Error> { #[serial] #[nativelink_test] async fn has_with_results_on_zero_digests() -> Result<(), Error> { - let digest = DigestInfo { - packed_hash: Sha256::new().finalize().into(), - size_bytes: 0, - }; + let digest = DigestInfo::new(Sha256::new().finalize().into(), 0); let content_path = make_temp_path("content_path"); let temp_path = make_temp_path("temp_path"); @@ -1145,12 +1124,7 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> { loop { yield_fn().await?; - let empty_digest_file_name = OsString::from(format!( - "{}/{}-{}", - content_path, - digest.hash_str(), - digest.size_bytes - )); + let empty_digest_file_name = OsString::from(format!("{}/{}", content_path, digest,)); let file_metadata = fs::metadata(empty_digest_file_name) .await @@ -1253,12 +1227,7 @@ async fn update_file_future_drops_before_rename() -> Result<(), Error> { .get_file_path_locked(move |file_path| async move { assert_eq!( file_path, - OsString::from(format!( - "{}/{}-{}", - content_path, - digest.hash_str(), - digest.size_bytes - )) + OsString::from(format!("{}/{}", content_path, digest,)) ); Ok(()) }) @@ -1290,12 +1259,7 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> { store.update_oneshot(digest, VALUE1.into()).await?; - let stored_file_path = OsString::from(format!( - "{}/{}-{}", - content_path, - digest.hash_str(), - digest.size_bytes - )); + let stored_file_path = OsString::from(format!("{}/{}", content_path, digest,)); std::fs::remove_file(stored_file_path)?; let digest_result = store diff --git a/nativelink-store/tests/memory_store_test.rs b/nativelink-store/tests/memory_store_test.rs index b22d71672..b1bc32da0 100644 --- a/nativelink-store/tests/memory_store_test.rs +++ b/nativelink-store/tests/memory_store_test.rs @@ -240,10 +240,7 @@ async fn errors_with_invalid_inputs() -> Result<(), Error> { #[nativelink_test] async fn get_part_is_zero_digest() -> Result<(), Error> { - let digest = DigestInfo { - packed_hash: Sha256::new().finalize().into(), - size_bytes: 0, - }; + let digest = DigestInfo::new(Sha256::new().finalize().into(), 0); let store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); let store_clone = store.clone(); @@ -269,10 +266,7 @@ async fn get_part_is_zero_digest() -> Result<(), Error> { #[nativelink_test] async fn has_with_results_on_zero_digests() -> Result<(), Error> { - let digest = DigestInfo { - packed_hash: Sha256::new().finalize().into(), - size_bytes: 0, - }; + let digest = 
DigestInfo::new(Sha256::new().finalize().into(), 0); let keys = vec![digest.into()]; let mut results = vec![None]; diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs index 7b40d15d7..c5ae01210 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -126,7 +126,7 @@ async fn upload_and_get_data() -> Result<(), Error> { // Construct a digest for our data and create a key based on that digest. let digest = DigestInfo::try_new(VALID_HASH1, 2)?; - let packed_hash_hex = format!("{}-{}", digest.hash_str(), digest.size_bytes); + let packed_hash_hex = format!("{digest}"); // Construct our Redis store with a mocked out backend. let temp_key = RedisValue::Bytes(make_temp_key(&packed_hash_hex).into()); @@ -211,7 +211,7 @@ async fn upload_and_get_data_with_prefix() -> Result<(), Error> { let prefix = "TEST_PREFIX-"; let digest = DigestInfo::try_new(VALID_HASH1, 2)?; - let packed_hash_hex = format!("{prefix}{}-{}", digest.hash_str(), digest.size_bytes); + let packed_hash_hex = format!("{prefix}{digest}"); let temp_key = RedisValue::Bytes(make_temp_key(&packed_hash_hex).into()); let real_key = RedisValue::Bytes(packed_hash_hex.into()); @@ -337,7 +337,7 @@ async fn test_large_downloads_are_chunked() -> Result<(), Error> { let data = Bytes::from(vec![0u8; READ_CHUNK_SIZE + 128]); let digest = DigestInfo::try_new(VALID_HASH1, 1)?; - let packed_hash_hex = format!("{}-{}", digest.hash_str(), digest.size_bytes); + let packed_hash_hex = format!("{digest}"); let temp_key = RedisValue::Bytes(make_temp_key(&packed_hash_hex).into()); let real_key = RedisValue::Bytes(packed_hash_hex.into()); @@ -435,7 +435,7 @@ async fn yield_between_sending_packets_in_update() -> Result<(), Error> { let data_p2 = Bytes::from(vec![0u8; 4 * 1024]); let digest = DigestInfo::try_new(VALID_HASH1, 2)?; - let packed_hash_hex = format!("{}-{}", digest.hash_str(), digest.size_bytes); + let packed_hash_hex = format!("{digest}"); let temp_key = RedisValue::Bytes(make_temp_key(&packed_hash_hex).into()); let real_key = RedisValue::Bytes(packed_hash_hex.into()); @@ -525,7 +525,7 @@ async fn zero_len_items_exist_check() -> Result<(), Error> { let mocks = Arc::new(MockRedisBackend::new()); let digest = DigestInfo::try_new(VALID_HASH1, 0)?; - let packed_hash_hex = format!("{}-{}", digest.hash_str(), digest.size_bytes); + let packed_hash_hex = format!("{digest}"); let real_key = RedisValue::Bytes(packed_hash_hex.into()); mocks diff --git a/nativelink-store/tests/s3_store_test.rs b/nativelink-store/tests/s3_store_test.rs index afc030ab3..593e547f2 100644 --- a/nativelink-store/tests/s3_store_test.rs +++ b/nativelink-store/tests/s3_store_test.rs @@ -603,10 +603,7 @@ async fn ensure_empty_string_in_stream_works_test() -> Result<(), Error> { #[nativelink_test] async fn get_part_is_zero_digest() -> Result<(), Error> { - let digest = DigestInfo { - packed_hash: Sha256::new().finalize().into(), - size_bytes: 0, - }; + let digest = DigestInfo::new(Sha256::new().finalize().into(), 0); let mock_client = StaticReplayClient::new(vec![]); let test_config = Builder::new() @@ -646,10 +643,7 @@ async fn get_part_is_zero_digest() -> Result<(), Error> { #[nativelink_test] async fn has_with_results_on_zero_digests() -> Result<(), Error> { - let digest = DigestInfo { - packed_hash: Sha256::new().finalize().into(), - size_bytes: 0, - }; + let digest = DigestInfo::new(Sha256::new().finalize().into(), 0); let keys = vec![digest.into()]; let mut results = vec![None]; diff 
--git a/nativelink-util/src/action_messages.rs b/nativelink-util/src/action_messages.rs
index d5a9ad365..d389c17c5 100644
--- a/nativelink-util/src/action_messages.rs
+++ b/nativelink-util/src/action_messages.rs
@@ -215,11 +215,10 @@ impl std::fmt::Display for ActionUniqueQualifier {
         f.write_fmt(format_args!(
             // Note: We use underscores because it makes escaping easier
             // for redis.
-            "{}/{}/{}-{}/{}",
+            "{}/{}/{}/{}",
             unique_key.instance_name,
             unique_key.digest_function,
-            unique_key.digest.hash_str(),
-            unique_key.digest.size_bytes,
+            unique_key.digest,
             if cachable { 'c' } else { 'u' },
         ))
     }
@@ -243,11 +242,8 @@ pub struct ActionUniqueKey {
 impl std::fmt::Display for ActionUniqueKey {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.write_fmt(format_args!(
-            "{}/{}/{}-{}",
-            self.instance_name,
-            self.digest_function,
-            self.digest.hash_str(),
-            self.digest.size_bytes
+            "{}/{}/{}",
+            self.instance_name, self.digest_function, self.digest,
         ))
     }
 }
diff --git a/nativelink-util/src/common.rs b/nativelink-util/src/common.rs
index 19b3673c9..b3afc4de4 100644
--- a/nativelink-util/src/common.rs
+++ b/nativelink-util/src/common.rs
@@ -12,13 +12,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use std::cmp::Ordering;
 use std::collections::HashMap;
 use std::fmt;
 use std::hash::Hash;
+use std::{cmp::Ordering, fmt::Display};

 use bytes::{BufMut, Bytes, BytesMut};
-use hex::FromHex;
+// use hex::FromHex;
 use nativelink_error::{make_input_err, Error, ResultExt};
 use nativelink_metric::{
     MetricFieldData, MetricKind, MetricPublishKnownKindData, MetricsComponent,
@@ -26,6 +26,7 @@ use nativelink_proto::build::bazel::remote::execution::v2::Digest;
 use prost::Message;
 use serde::{Deserialize, Serialize};
+use tracing::{event, Level};

 pub use crate::fs;

@@ -33,10 +34,10 @@
 #[repr(C)]
 pub struct DigestInfo {
     /// Raw hash in packed form.
-    pub packed_hash: [u8; 32],
+    packed_hash: PackedHash,

     /// Possibly the size of the digest in bytes.
-    pub size_bytes: i64,
+    size_bytes: u64,
 }

 impl MetricsComponent for DigestInfo {
     fn publish(
         &self,
         _kind: MetricKind,
         field_metadata: MetricFieldData,
     ) -> Result<MetricPublishKnownKindData, Error> {
-        format!("{}-{}", self.hash_str(), self.size_bytes)
+        format!("{}-{}", self.packed_hash, self.size_bytes)
             .publish(MetricKind::String, field_metadata)
     }
 }

 impl DigestInfo {
-    pub const fn new(packed_hash: [u8; 32], size_bytes: i64) -> Self {
+    pub const fn new(packed_hash: [u8; 32], size_bytes: u64) -> Self {
         DigestInfo {
             size_bytes,
-            packed_hash,
+            packed_hash: PackedHash(packed_hash),
         }
     }

     pub fn try_new<T>(hash: &str, size_bytes: T) -> Result<Self, Error>
     where
-        T: TryInto<i64> + std::fmt::Display + Copy,
+        T: TryInto<u64> + std::fmt::Display + Copy,
     {
         let packed_hash =
-            <[u8; 32]>::from_hex(hash).err_tip(|| format!("Invalid sha256 hash: {hash}"))?;
+            PackedHash::from_hex(hash).err_tip(|| format!("Invalid sha256 hash: {hash}"))?;
         let size_bytes = size_bytes
             .try_into()
-            .map_err(|_| make_input_err!("Could not convert {} into i64", size_bytes))?;
+            .map_err(|_| make_input_err!("Could not convert {} into u64", size_bytes))?;
         Ok(DigestInfo {
             size_bytes,
             packed_hash,
         })
     }

     pub fn hash_str(&self) -> String {
-        hex::encode(self.packed_hash)
+        format!("{}", self.packed_hash)
     }

     pub const fn zero_digest() -> DigestInfo {
         DigestInfo {
             size_bytes: 0,
             // Magic hash of a sha256 of empty string.
-            packed_hash: [0u8; 32],
+            packed_hash: PackedHash::new(),
         }
     }
+
+    pub const fn packed_hash(&self) -> &[u8; 32] {
+        &self.packed_hash.0
+    }
+
+    pub fn set_packed_hash(&mut self, packed_hash: [u8; 32]) {
+        self.packed_hash = PackedHash(packed_hash);
+    }
+
+    pub const fn size_bytes(&self) -> u64 {
+        self.size_bytes
+    }
+}
+
+impl Display for DigestInfo {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}-{}", self.packed_hash, self.size_bytes)
+    }
 }

 impl fmt::Debug for DigestInfo {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("DigestInfo")
+            .field("hash", &self.packed_hash)
             .field("size_bytes", &self.size_bytes)
-            .field("hash", &self.hash_str())
             .finish()
     }
 }
@@ -113,10 +131,14 @@ impl TryFrom<Digest> for DigestInfo {
     type Error = Error;

     fn try_from(digest: Digest) -> Result<Self, Self::Error> {
-        let packed_hash = <[u8; 32]>::from_hex(&digest.hash)
+        let packed_hash = PackedHash::from_hex(&digest.hash)
             .err_tip(|| format!("Invalid sha256 hash: {}", digest.hash))?;
+        let size_bytes = digest
+            .size_bytes
+            .try_into()
+            .map_err(|_| make_input_err!("Could not convert {} into u64", digest.size_bytes))?;
         Ok(DigestInfo {
-            size_bytes: digest.size_bytes,
+            size_bytes,
             packed_hash,
         })
     }
 }
@@ -126,10 +148,14 @@ impl TryFrom<&Digest> for DigestInfo {
     type Error = Error;

     fn try_from(digest: &Digest) -> Result<Self, Self::Error> {
-        let packed_hash = <[u8; 32]>::from_hex(&digest.hash)
+        let packed_hash = PackedHash::from_hex(&digest.hash)
             .err_tip(|| format!("Invalid sha256 hash: {}", digest.hash))?;
+        let size_bytes = digest
+            .size_bytes
+            .try_into()
+            .map_err(|_| make_input_err!("Could not convert {} into u64", digest.size_bytes))?;
         Ok(DigestInfo {
-            size_bytes: digest.size_bytes,
+            size_bytes,
             packed_hash,
         })
     }
 }
@@ -139,7 +165,16 @@ impl From<DigestInfo> for Digest {
     fn from(val: DigestInfo) -> Self {
         Digest {
             hash: val.hash_str(),
-            size_bytes: val.size_bytes,
+            size_bytes: val.size_bytes.try_into().unwrap_or_else(|e| {
+                event!(
+                    Level::ERROR,
+                    "Could not convert {} into i64 - {e:?}",
+                    val.size_bytes
+                );
+                // This is a placeholder value that can help a user identify
+                // that the conversion failed.
+                -255
+            }),
         }
     }
 }
@@ -148,11 +183,56 @@ impl From<&DigestInfo> for Digest {
     fn from(val: &DigestInfo) -> Self {
         Digest {
             hash: val.hash_str(),
-            size_bytes: val.size_bytes,
+            size_bytes: val.size_bytes.try_into().unwrap_or_else(|e| {
+                event!(
+                    Level::ERROR,
+                    "Could not convert {} into i64 - {e:?}",
+                    val.size_bytes
+                );
+                // This is a placeholder value that can help a user identify
+                // that the conversion failed.
+                -255
+            }),
         }
     }
 }

+#[derive(Serialize, Deserialize, Default, Clone, Copy, Eq, PartialEq, Hash, PartialOrd, Ord)]
+struct PackedHash([u8; 32]);
+
+impl PackedHash {
+    pub const fn new() -> Self {
+        PackedHash([0; 32])
+    }
+
+    fn from_hex(hash: &str) -> Result<Self, Error> {
+        let mut packed_hash = [0u8; 32];
+        hex::decode_to_slice(hash, &mut packed_hash)
+            .map_err(|e| make_input_err!("Invalid sha256 hash: {hash} - {e:?}"))?;
+        Ok(PackedHash(packed_hash))
+    }
+}
+
+impl fmt::Display for PackedHash {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // Remember: 32 bytes * 2 hex characters per byte when
+        // going to hex.
+        let mut hash = [0u8; std::mem::size_of::<Self>() * 2];
+        hex::encode_to_slice(self.0, &mut hash).map_err(|_| fmt::Error)?;
+        match std::str::from_utf8(&hash) {
+            Ok(hash) => f.write_str(hash)?,
+            Err(_) => f.write_str(&format!("Could not convert hash to utf8 {:?}", self.0))?,
+        }
+        Ok(())
+    }
+}
+
+impl fmt::Debug for PackedHash {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.write_fmt(format_args!("{self}"))
+    }
+}
+
 // Simple utility trait that makes it easier to apply `.try_map` to Vec<T>.
 // This will convert one vector into another vector with a different type.
 pub trait VecExt<T> {
diff --git a/nativelink-util/src/digest_hasher.rs b/nativelink-util/src/digest_hasher.rs
index 72721bffe..6d57785a3 100644
--- a/nativelink-util/src/digest_hasher.rs
+++ b/nativelink-util/src/digest_hasher.rs
@@ -221,7 +221,7 @@ pub enum DigestHasherFuncImpl {
 /// The individual implementation of the hash function.
 pub struct DigestHasherImpl {
-    hashed_size: i64,
+    hashed_size: u64,
     hash_func_impl: DigestHasherFuncImpl,
 }
@@ -243,7 +243,7 @@ impl DigestHasherImpl {
 impl DigestHasher for DigestHasherImpl {
     #[inline]
     fn update(&mut self, input: &[u8]) {
-        self.hashed_size += input.len() as i64;
+        self.hashed_size += input.len() as u64;
         match &mut self.hash_func_impl {
             DigestHasherFuncImpl::Sha256(h) => sha2::digest::Update::update(h, input),
             DigestHasherFuncImpl::Blake3(h) => {
@@ -288,7 +288,7 @@ impl DigestHasher for DigestHasherImpl {
                     make_err!(Code::Internal, "Error in blake3's update_mmap: {e:?}")
                 })?;
                 Result::<_, Error>::Ok((
-                    DigestInfo::new(hasher.finalize().into(), hasher.count() as i64),
+                    DigestInfo::new(hasher.finalize().into(), hasher.count()),
                     file,
                 ))
             })
diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs
index 0f081f7ff..55d221c01 100644
--- a/nativelink-util/src/store_trait.rs
+++ b/nativelink-util/src/store_trait.rs
@@ -245,7 +245,7 @@ impl<'a> StoreKey<'a> {
         match self {
             StoreKey::Str(Cow::Owned(s)) => Cow::Borrowed(s),
             StoreKey::Str(Cow::Borrowed(s)) => Cow::Borrowed(s),
-            StoreKey::Digest(d) => Cow::Owned(format!("{}-{}", d.hash_str(), d.size_bytes)),
+            StoreKey::Digest(d) => Cow::Owned(format!("{d}")),
         }
     }
 }
diff --git a/nativelink-util/tests/proto_stream_utils_test.rs b/nativelink-util/tests/proto_stream_utils_test.rs
index b177c2dbc..b1e19104e 100644
--- a/nativelink-util/tests/proto_stream_utils_test.rs
+++ b/nativelink-util/tests/proto_stream_utils_test.rs
@@ -35,12 +35,12 @@ async fn ensure_no_errors_if_only_first_message_has_resource_name_set() -> Resul
     let (tx, rx) = tokio::sync::mpsc::unbounded_channel::>();
     const RAW_DATA: &str = "thisdatafoo";
-    const DIGEST: DigestInfo = DigestInfo::new([0u8; 32], RAW_DATA.len() as i64);
+    const DIGEST: DigestInfo = DigestInfo::new([0u8; 32], RAW_DATA.len() as u64);
     let message1 = WriteRequest {
         resource_name: format!(
             "{INSTANCE_NAME}/uploads/some-uuid/blobs/{}/{}",
             DIGEST.hash_str(),
-            DIGEST.size_bytes
+            DIGEST.size_bytes()
         ),
         write_offset: 0,
         finish_write: false,
diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs
index 7ed7191f3..cfb7aa804 100644
--- a/nativelink-worker/src/running_actions_manager.rs
+++ b/nativelink-worker/src/running_actions_manager.rs
@@ -287,7 +287,7 @@ async fn upload_file(
         .update_with_whole_file(
             digest.into(),
             resumeable_file,
-            UploadSizeInfo::ExactSize(digest.size_bytes as usize),
+            UploadSizeInfo::ExactSize(digest.size_bytes() as usize),
         )
         .await
         .err_tip(|| format!("for {full_path:?}"))?;
@@ -1473,10
+1473,13 @@ impl UploadActionResults { hasher.proto_digest_func().as_str_name().to_lowercase(), ); template_str.replace("action_digest_hash", action_digest_info.hash_str()); - template_str.replace("action_digest_size", action_digest_info.size_bytes); + template_str.replace("action_digest_size", action_digest_info.size_bytes()); if let Some(historical_digest_info) = maybe_historical_digest_info { template_str.replace("historical_results_hash", historical_digest_info.hash_str()); - template_str.replace("historical_results_size", historical_digest_info.size_bytes); + template_str.replace( + "historical_results_size", + historical_digest_info.size_bytes(), + ); } else { template_str.replace("historical_results_hash", ""); template_str.replace("historical_results_size", "");
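Note: with the DigestInfo fields private, callers that used to poke at
digest.packed_hash directly now do a read-modify-write through the
accessors, exactly the pattern make_temp_digest follows in
filesystem_store.rs. A minimal sketch of that pattern (the tag_digest
helper and its marker value are illustrative, not part of this patch):

    use nativelink_util::common::DigestInfo;

    // Overwrite the trailing 8 hash bytes with a marker, mirroring the
    // counter suffix make_temp_digest stamps onto temp-file digests.
    fn tag_digest(digest: &mut DigestInfo, marker: u64) {
        let mut hash = *digest.packed_hash();
        hash[24..].copy_from_slice(&marker.to_le_bytes());
        digest.set_packed_hash(hash);
    }

    fn main() {
        let mut digest = DigestInfo::zero_digest();
        tag_digest(&mut digest, 1);
        assert_eq!(&digest.packed_hash()[24..], &1u64.to_le_bytes()[..]);
        assert_eq!(digest.size_bytes(), 0);
    }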