Cleanup digest function to use u64 instead of i64

Using i64 in the codebase for this was odd in the first place. The historical
reason was keeping the type in sync with the Digest proto, but once the proto
value is converted there is no need to carry the signed type around.

Also implement a more standard format/display function for DigestInfo
using the common '{hash}-{size}' format.

Finally, direct access to DigestInfo's internals is removed; all member access
now goes through accessors.

allada committed Sep 6, 2024
1 parent 84eab85 commit 57ea600
Showing 24 changed files with 184 additions and 192 deletions.
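
For orientation, a minimal sketch of the DigestInfo surface this commit converges on. Field layout and exact signatures here are assumptions inferred from the diff, not the crate's verbatim source:

use std::fmt;

// Sketch: internals are private; callers go through accessors.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct DigestInfo {
    packed_hash: [u8; 32],
    size_bytes: u64, // previously i64, signed only to mirror the Digest proto
}

impl DigestInfo {
    pub const fn new(packed_hash: [u8; 32], size_bytes: u64) -> Self {
        Self { packed_hash, size_bytes }
    }
    pub const fn size_bytes(&self) -> u64 {
        self.size_bytes
    }
    pub const fn packed_hash(&self) -> &[u8; 32] {
        &self.packed_hash
    }
    pub fn set_packed_hash(&mut self, hash: [u8; 32]) {
        self.packed_hash = hash;
    }
    pub fn hash_str(&self) -> String {
        self.packed_hash.iter().map(|b| format!("{b:02x}")).collect()
    }
}

// The common '{hash}-{size}' rendering now used for file paths and page tokens.
impl fmt::Display for DigestInfo {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}-{}", self.hash_str(), self.size_bytes)
    }
}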
2 changes: 1 addition & 1 deletion nativelink-service/src/bytestream_server.rs
@@ -253,7 +253,7 @@ impl ByteStreamServer {
digest,
rx,
UploadSizeInfo::ExactSize(
- usize::try_from(digest.size_bytes).err_tip(|| "Invalid digest size")?,
+ usize::try_from(digest.size_bytes()).err_tip(|| "Invalid digest size")?,
),
)
.await
4 changes: 2 additions & 2 deletions nativelink-service/src/cas_server.rs
@@ -123,7 +123,7 @@ impl CasServer {
.err_tip(|| "Digest not found in request")?;
let request_data = request.data;
let digest_info = DigestInfo::try_from(digest.clone())?;
- let size_bytes = usize::try_from(digest_info.size_bytes)
+ let size_bytes = usize::try_from(digest_info.size_bytes())
.err_tip(|| "Digest size_bytes was not convertible to usize")?;
error_if!(
size_bytes != request_data.len(),
@@ -280,7 +280,7 @@ impl CasServer {
// `next_page_token` will return the `{hash_str}-{size_bytes}` of the next request's first directory digest.
// It will be an empty string when it reaches the end of the directory tree.
let next_page_token: String = if let Some(value) = deque.front() {
format!("{}-{}", value.hash_str(), value.size_bytes)
format!("{value}")
} else {
String::new()
};
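Illustrative only: with the Display impl in place, the new token is byte-for-byte what the old format! call produced. A hypothetical check, using the sketch above:

// Hypothetical digest; real tokens carry a 64-character hex hash.
let digest = DigestInfo::new([0u8; 32], 142);
assert_eq!(
    format!("{digest}"),
    format!("{}-{}", digest.hash_str(), digest.size_bytes()),
);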
36 changes: 6 additions & 30 deletions nativelink-service/tests/cas_server_test.rs
@@ -387,11 +387,7 @@ async fn get_tree_read_directories_without_paging() -> Result<(), Box<dyn std::error::Error>>
.get_tree(Request::new(GetTreeRequest {
instance_name: INSTANCE_NAME.to_string(),
page_size: 0,
- page_token: format!(
-     "{}-{}",
-     root_directory_digest_info.hash_str(),
-     root_directory_digest_info.size_bytes
- ),
+ page_token: format!("{}", root_directory_digest_info),
root_digest: Some(root_directory_digest_info.into()),
digest_function: digest_function::Value::Sha256.into(),
}))
@@ -442,11 +438,7 @@ async fn get_tree_read_directories_with_paging() -> Result<(), Box<dyn std::error::Error>>
.get_tree(Request::new(GetTreeRequest {
instance_name: INSTANCE_NAME.to_string(),
page_size: 2,
- page_token: format!(
-     "{}-{}",
-     root_directory_digest_info.hash_str(),
-     root_directory_digest_info.size_bytes
- ),
+ page_token: format!("{}", root_directory_digest_info),
root_digest: Some(root_directory_digest_info.into()),
digest_function: digest_function::Value::Sha256.into(),
}))
@@ -461,22 +453,14 @@ async fn get_tree_read_directories_with_paging() -> Result<(), Box<dyn std::error::Error>>
.await,
vec![GetTreeResponse {
directories: vec![root_directory.clone(), sub_directories[0].clone()],
- next_page_token: format!(
-     "{}-{}",
-     sub_directory_digest_infos[1].hash_str(),
-     sub_directory_digest_infos[1].size_bytes
- ),
+ next_page_token: format!("{}", sub_directory_digest_infos[1]),
}]
);
let raw_response = cas_server
.get_tree(Request::new(GetTreeRequest {
instance_name: INSTANCE_NAME.to_string(),
page_size: 2,
- page_token: format!(
-     "{}-{}",
-     sub_directory_digest_infos[1].hash_str(),
-     sub_directory_digest_infos[1].size_bytes
- ),
+ page_token: format!("{}", sub_directory_digest_infos[1]),
root_digest: Some(root_directory_digest_info.into()),
digest_function: digest_function::Value::Sha256.into(),
}))
@@ -491,22 +475,14 @@ async fn get_tree_read_directories_with_paging() -> Result<(), Box<dyn std::error::Error>>
.await,
vec![GetTreeResponse {
directories: vec![sub_directories[1].clone(), sub_directories[2].clone()],
- next_page_token: format!(
-     "{}-{}",
-     sub_directory_digest_infos[3].hash_str(),
-     sub_directory_digest_infos[3].size_bytes
- ),
+ next_page_token: format!("{}", sub_directory_digest_infos[3]),
}]
);
let raw_response = cas_server
.get_tree(Request::new(GetTreeRequest {
instance_name: INSTANCE_NAME.to_string(),
page_size: 2,
- page_token: format!(
-     "{}-{}",
-     sub_directory_digest_infos[3].hash_str(),
-     sub_directory_digest_infos[3].size_bytes
- ),
+ page_token: format!("{}", sub_directory_digest_infos[3]),
root_digest: Some(root_directory_digest_info.into()),
digest_function: digest_function::Value::Sha256.into(),
}))
2 changes: 1 addition & 1 deletion nativelink-store/src/cas_utils.rs
@@ -39,7 +39,7 @@ pub const ZERO_BYTE_DIGESTS: [DigestInfo; 2] = [
#[inline]
pub fn is_zero_digest<'a>(digest: impl Into<StoreKey<'a>>) -> bool {
match digest.into() {
- StoreKey::Digest(digest) => digest.size_bytes == 0 && ZERO_BYTE_DIGESTS.contains(&digest),
+ StoreKey::Digest(digest) => digest.size_bytes() == 0 && ZERO_BYTE_DIGESTS.contains(&digest),
_ => false,
}
}
8 changes: 3 additions & 5 deletions nativelink-store/src/dedup_store.rs
@@ -128,9 +128,7 @@ impl DedupStore {
let digests: Vec<_> = index_entries
.entries
.into_iter()
- .map(|index_entry| {
-     DigestInfo::new(index_entry.packed_hash, index_entry.size_bytes).into()
- })
+ .map(StoreKey::Digest)
.collect();
let mut sum = 0;
for size in self.content_store.has_many(&digests).await? {
@@ -181,7 +179,7 @@ impl StoreDriver for DedupStore {
.map(|r| r.err_tip(|| "Failed to decode frame from fast_cdc"))
.map_ok(|frame| async move {
let hash = blake3::hash(&frame[..]).into();
- let index_entry = DigestInfo::new(hash, frame.len() as i64);
+ let index_entry = DigestInfo::new(hash, frame.len() as u64);
if self
.content_store
.has(index_entry)
@@ -266,7 +264,7 @@ impl StoreDriver for DedupStore {
let mut entries = Vec::with_capacity(index_entries.entries.len());
for entry in index_entries.entries {
let first_byte = current_entries_sum;
- let entry_size = usize::try_from(entry.size_bytes)
+ let entry_size = usize::try_from(entry.size_bytes())
.err_tip(|| "Failed to convert to usize in DedupStore")?;
current_entries_sum += entry_size;
// Filter out any items whose end byte is before the first requested byte.
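The surrounding get_part loop decides which dedup chunks overlap the requested byte window. A self-contained sketch of that selection logic with simplified types (this helper is illustrative, not the store's actual code):

// Sketch: keep indices of chunks overlapping [offset, offset + length).
fn select_chunks(chunk_sizes: &[u64], offset: u64, length: u64) -> Vec<usize> {
    let window_end = offset.saturating_add(length);
    let mut chunk_start = 0u64;
    let mut selected = Vec::new();
    for (index, &size) in chunk_sizes.iter().enumerate() {
        let chunk_end = chunk_start + size;
        if chunk_end > offset && chunk_start < window_end {
            selected.push(index); // chunk overlaps the requested window
        }
        chunk_start = chunk_end;
    }
    selected
}

For example, with chunk sizes [10, 10, 10] and a request at offset 15 for length 10, only chunks 1 and 2 are fetched.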
2 changes: 1 addition & 1 deletion nativelink-store/src/existence_cache_store.rs
@@ -209,7 +209,7 @@ impl<I: InstantWrapper> StoreDriver for ExistenceCacheStore<I> {
.get_part(digest, writer, offset, length)
.await;
if result.is_ok() {
- let size = usize::try_from(digest.size_bytes)
+ let size = usize::try_from(digest.size_bytes())
.err_tip(|| "Could not convert size_bytes in ExistenceCacheStore::get_part")?;
let _ = self
.existence_cache
6 changes: 4 additions & 2 deletions nativelink-store/src/filesystem_store.rs
@@ -132,7 +132,7 @@ impl Drop for EncodedFilePath {

#[inline]
fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString {
format!("{}/{}-{}", folder, digest.hash_str(), digest.size_bytes).into()
format!("{folder}/{digest}").into()
}

pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
@@ -301,11 +301,13 @@ impl Debug for FileEntryImpl {

fn make_temp_digest(digest: &mut DigestInfo) {
static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
- digest.packed_hash[24..].clone_from_slice(
+ let mut hash = *digest.packed_hash();
+ hash[24..].clone_from_slice(
&DELETE_FILE_COUNTER
.fetch_add(1, Ordering::Relaxed)
.to_le_bytes(),
);
+ digest.set_packed_hash(hash);
}

impl LenEntry for FileEntryImpl {
8 changes: 4 additions & 4 deletions nativelink-store/src/grpc_store.rs
@@ -565,7 +565,7 @@ impl StoreDriver for GrpcStore {
{
match missing_digests.binary_search(&digest) {
Ok(_) => *result = None,
- Err(_) => *result = Some(usize::try_from(digest.size_bytes)?),
+ Err(_) => *result = Some(usize::try_from(digest.size_bytes())?),
}
}

@@ -589,7 +589,7 @@ impl StoreDriver for GrpcStore {
&self.instance_name,
Uuid::new_v4().hyphenated().encode_lower(&mut buf),
digest.hash_str(),
- digest.size_bytes,
+ digest.size_bytes(),
);

struct LocalState {
@@ -666,15 +666,15 @@ impl StoreDriver for GrpcStore {
}

// Shortcut for empty blobs.
- if digest.size_bytes == 0 {
+ if digest.size_bytes() == 0 {
return writer.send_eof();
}

let resource_name = format!(
"{}/blobs/{}/{}",
&self.instance_name,
digest.hash_str(),
- digest.size_bytes,
+ digest.size_bytes(),
);

struct LocalState<'a> {
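For reference, these strings follow the REAPI ByteStream resource-name convention: reads use "{instance_name}/blobs/{hash}/{size}" and writes prepend "uploads/{uuid}/". A hypothetical helper mirroring the read-side format string above:

// Illustrative only; the store builds these strings inline.
fn read_resource_name(instance_name: &str, digest: &DigestInfo) -> String {
    format!("{}/blobs/{}/{}", instance_name, digest.hash_str(), digest.size_bytes())
}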
18 changes: 9 additions & 9 deletions nativelink-store/src/shard_store.rs
@@ -93,30 +93,30 @@ impl ShardStore {
// array length. They optimize especially well when the optimizer can easily determine
// the slice length, e.g. <[u8; 4]>::try_from(&slice[4..8]).unwrap(). Array implements
// TryFrom returning.
- let size_bytes = digest.size_bytes.to_le_bytes();
+ let size_bytes = digest.size_bytes().to_le_bytes();
0.bitxor(u32::from_le_bytes(
- digest.packed_hash[0..4].try_into().unwrap(),
+ digest.packed_hash()[0..4].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
- digest.packed_hash[4..8].try_into().unwrap(),
+ digest.packed_hash()[4..8].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
- digest.packed_hash[8..12].try_into().unwrap(),
+ digest.packed_hash()[8..12].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
- digest.packed_hash[12..16].try_into().unwrap(),
+ digest.packed_hash()[12..16].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
- digest.packed_hash[16..20].try_into().unwrap(),
+ digest.packed_hash()[16..20].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
- digest.packed_hash[20..24].try_into().unwrap(),
+ digest.packed_hash()[20..24].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
- digest.packed_hash[24..28].try_into().unwrap(),
+ digest.packed_hash()[24..28].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
- digest.packed_hash[28..32].try_into().unwrap(),
+ digest.packed_hash()[28..32].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(size_bytes[0..4].try_into().unwrap()))
.bitxor(u32::from_le_bytes(size_bytes[4..8].try_into().unwrap()))
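The unrolled XOR chain above folds the whole digest into one u32 shard key. A compact loop equivalent for reference; this is a sketch, and the hand-unrolled original is deliberate, since fixed indices let the optimizer elide slice-length checks, per the comment in the hunk:

// Sketch: XOR-fold the 32-byte hash and 8-byte size into a shard key.
fn shard_key(digest: &DigestInfo) -> u32 {
    let mut key = 0u32;
    for word in digest.packed_hash().chunks_exact(4) {
        key ^= u32::from_le_bytes(word.try_into().unwrap());
    }
    for word in digest.size_bytes().to_le_bytes().chunks_exact(4) {
        key ^= u32::from_le_bytes(word.try_into().unwrap());
    }
    key
}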
12 changes: 6 additions & 6 deletions nativelink-store/src/size_partitioning_store.rs
@@ -26,7 +26,7 @@ use tokio::join;
#[derive(MetricsComponent)]
pub struct SizePartitioningStore {
#[metric(help = "Size to partition our data")]
- partition_size: i64,
+ partition_size: u64,
#[metric(group = "lower_store")]
lower_store: Store,
#[metric(group = "upper_store")]
@@ -40,7 +40,7 @@ impl SizePartitioningStore {
upper_store: Store,
) -> Arc<Self> {
Arc::new(SizePartitioningStore {
- partition_size: config.size as i64,
+ partition_size: config.size,
lower_store,
upper_store,
})
@@ -61,7 +61,7 @@ impl StoreDriver for SizePartitioningStore {
non_digest_sample = Some(k.borrow().into_owned());
return false;
};
- digest.size_bytes < self.partition_size
+ digest.size_bytes() < self.partition_size
});
if let Some(non_digest) = non_digest_sample {
return Err(make_input_err!(
@@ -110,7 +110,7 @@ impl StoreDriver for SizePartitioningStore {
))
}
};
- if digest.size_bytes < self.partition_size {
+ if digest.size_bytes() < self.partition_size {
return self.lower_store.update(digest, reader, size_info).await;
}
self.upper_store.update(digest, reader, size_info).await
@@ -131,7 +131,7 @@ impl StoreDriver for SizePartitioningStore {
))
}
};
- if digest.size_bytes < self.partition_size {
+ if digest.size_bytes() < self.partition_size {
return self
.lower_store
.get_part(digest, writer, offset, length)
@@ -150,7 +150,7 @@ impl StoreDriver for SizePartitioningStore {
StoreKey::Digest(digest) => digest,
_ => return self,
};
- if digest.size_bytes < self.partition_size {
+ if digest.size_bytes() < self.partition_size {
return self.lower_store.inner_store(Some(digest));
}
self.upper_store.inner_store(Some(digest))
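Note the comparison is strict: a blob whose size exactly equals partition_size routes to the upper store. The rule, stated minimally (illustrative helper, not part of the diff):

// Strictly less than the threshold goes to the lower store.
fn routes_to_lower(digest: &DigestInfo, partition_size: u64) -> bool {
    digest.size_bytes() < partition_size
}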
12 changes: 6 additions & 6 deletions nativelink-store/src/verify_store.rs
@@ -61,7 +61,7 @@ impl VerifyStore {
mut tx: DropCloserWriteHalf,
mut rx: DropCloserReadHalf,
maybe_expected_digest_size: Option<u64>,
- original_hash: [u8; 32],
+ original_hash: &[u8; 32],
mut maybe_hasher: Option<&mut D>,
) -> Result<(), Error> {
let mut sum_size: u64 = 0;
@@ -114,7 +114,8 @@ impl VerifyStore {
}
}
if let Some(hasher) = maybe_hasher.as_mut() {
- let hash_result: [u8; 32] = hasher.finalize_digest().packed_hash;
+ let digest = hasher.finalize_digest();
+ let hash_result = digest.packed_hash();
if original_hash != hash_result {
self.hash_verification_failures.inc();
return Err(make_input_err!(
@@ -167,15 +168,14 @@ impl StoreDriver for VerifyStore {
));
}
};
- let digest_size = u64::try_from(digest.size_bytes)
-     .err_tip(|| "Digest size_bytes was not convertible to usize")?;
+ let digest_size = digest.size_bytes();
if let UploadSizeInfo::ExactSize(expected_size) = size_info {
if self.verify_size && expected_size as u64 != digest_size {
self.size_verification_failures.inc();
return Err(make_input_err!(
"Expected size to match. Got {} but digest says {} on update",
expected_size,
- digest.size_bytes
+ digest_size
));
}
}
@@ -203,7 +203,7 @@ impl StoreDriver for VerifyStore {
tx,
reader,
maybe_digest_size,
- digest.packed_hash,
+ digest.packed_hash(),
hasher.as_mut(),
);

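A simplified, non-streaming sketch of the check VerifyStore performs. The real store verifies incrementally through a DigestHasher while forwarding bytes; the buffering and direct Sha256 usage here are illustrative assumptions:

use sha2::{Digest, Sha256};

// Sketch: verify a fully-buffered blob against its digest.
fn verify_blob(data: &[u8], expected: &DigestInfo) -> Result<(), String> {
    if data.len() as u64 != expected.size_bytes() {
        return Err(format!(
            "Expected size to match. Got {} but digest says {}",
            data.len(),
            expected.size_bytes()
        ));
    }
    let actual: [u8; 32] = Sha256::digest(data).into();
    if &actual != expected.packed_hash() {
        return Err("Hashes do not match".to_string());
    }
    Ok(())
}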
20 changes: 4 additions & 16 deletions nativelink-store/tests/cas_utils_test.rs
@@ -19,40 +19,28 @@ use sha2::{Digest, Sha256};

#[test]
fn sha256_is_zero_digest() {
- let digest = DigestInfo {
-     packed_hash: Sha256::new().finalize().into(),
-     size_bytes: 0,
- };
+ let digest = DigestInfo::new(Sha256::new().finalize().into(), 0);
assert!(is_zero_digest(digest));
}

#[test]
fn sha256_is_non_zero_digest() {
let mut hasher = Sha256::new();
hasher.update(b"a");
- let digest = DigestInfo {
-     packed_hash: hasher.finalize().into(),
-     size_bytes: 1,
- };
+ let digest = DigestInfo::new(hasher.finalize().into(), 1);
assert!(!is_zero_digest(digest));
}

#[test]
fn blake_is_zero_digest() {
- let digest = DigestInfo {
-     packed_hash: Blake3::new().finalize().into(),
-     size_bytes: 0,
- };
+ let digest = DigestInfo::new(Blake3::new().finalize().into(), 0);
assert!(is_zero_digest(digest));
}

#[test]
fn blake_is_non_zero_digest() {
let mut hasher = Blake3::new();
hasher.update(b"a");
- let digest = DigestInfo {
-     packed_hash: hasher.finalize().into(),
-     size_bytes: 1,
- };
+ let digest = DigestInfo::new(hasher.finalize().into(), 1);
assert!(!is_zero_digest(digest));
}