Cleanup digest function to use u64 instead of i64 (#1327)
It is odd that we used i64 for digest sizes in the code base in the
first place. The historical reason was keeping the field in sync with
the Digest proto, but once the proto message is converted into our
internal type there is no need to carry the signed type along.

Also implement a more standard format/display function for DigestInfo
using the common '{hash}-{size}' format.

Also removes direct access to the internals of DigestInfo, so all
member access now goes through accessors.
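
For orientation, here is a minimal sketch of the API shape this leaves behind, with field and method names inferred from the call sites in the diff below (the real `DigestInfo` lives in nativelink-util and its definition is not part of this page):

```rust
use std::fmt;

pub struct DigestInfo {
    packed_hash: [u8; 32], // illustrative; inferred from `packed_hash()` call sites
    size_bytes: u64,       // previously i64, mirroring the Digest proto
}

impl DigestInfo {
    pub const fn new(packed_hash: [u8; 32], size_bytes: u64) -> Self {
        Self { packed_hash, size_bytes }
    }

    // Fields are no longer public; all access goes through accessors.
    pub const fn size_bytes(&self) -> u64 {
        self.size_bytes
    }

    pub const fn packed_hash(&self) -> &[u8; 32] {
        &self.packed_hash
    }

    pub fn set_packed_hash(&mut self, hash: [u8; 32]) {
        self.packed_hash = hash;
    }

    pub fn hash_str(&self) -> String {
        self.packed_hash.iter().map(|b| format!("{b:02x}")).collect()
    }
}

// The common "{hash}-{size}" format used below for page tokens and file paths.
impl fmt::Display for DigestInfo {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}-{}", self.hash_str(), self.size_bytes)
    }
}

// Debug delegates to Display, matching the `DigestInfo("…-0")` strings
// asserted in the updated tests.
impl fmt::Debug for DigestInfo {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "DigestInfo(\"{self}\")")
    }
}
```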
allada authored Sep 6, 2024
1 parent 57c784a commit 140b7cb
Showing 26 changed files with 193 additions and 209 deletions.
2 changes: 1 addition & 1 deletion nativelink-service/src/bytestream_server.rs
@@ -253,7 +253,7 @@ impl ByteStreamServer {
digest,
rx,
UploadSizeInfo::ExactSize(
usize::try_from(digest.size_bytes).err_tip(|| "Invalid digest size")?,
usize::try_from(digest.size_bytes()).err_tip(|| "Invalid digest size")?,
),
)
.await
4 changes: 2 additions & 2 deletions nativelink-service/src/cas_server.rs
@@ -123,7 +123,7 @@ impl CasServer {
.err_tip(|| "Digest not found in request")?;
let request_data = request.data;
let digest_info = DigestInfo::try_from(digest.clone())?;
let size_bytes = usize::try_from(digest_info.size_bytes)
let size_bytes = usize::try_from(digest_info.size_bytes())
.err_tip(|| "Digest size_bytes was not convertible to usize")?;
error_if!(
size_bytes != request_data.len(),
@@ -280,7 +280,7 @@ impl CasServer {
// `next_page_token` will return the `{hash_str}-{size_bytes}` of the next request's first directory digest.
// It will be an empty string when it reached the end of the directory tree.
let next_page_token: String = if let Some(value) = deque.front() {
format!("{}-{}", value.hash_str(), value.size_bytes)
format!("{value}")
} else {
String::new()
};
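
With the new `Display` impl, the token above is just `format!("{value}")`. For completeness, a sketch of the reverse direction (illustrative only; the token-parsing code is not part of this hunk), which works because hex hashes never contain '-':

```rust
/// Splits a "{hash}-{size}" page token back into its parts.
fn parse_page_token(token: &str) -> Option<(&str, u64)> {
    // rsplit_once is safe: the '-' separator cannot occur inside a hex hash.
    let (hash, size) = token.rsplit_once('-')?;
    Some((hash, size.parse().ok()?))
}

fn main() {
    let token = "0123456789abcdef000000000000000000000000000000000123456789abcdef-146";
    let (hash, size) = parse_page_token(token).unwrap();
    assert_eq!(size, 146);
    assert_eq!(hash.len(), 64);
}
```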
4 changes: 2 additions & 2 deletions nativelink-service/tests/ac_server_test.rs
@@ -120,7 +120,7 @@ async fn empty_store() -> Result<(), Box<dyn std::error::Error>> {
assert_eq!(err.code(), Code::NotFound);
assert_eq!(
err.message(),
"Key Digest(DigestInfo { size_bytes: 0, hash: \"0123456789abcdef000000000000000000000000000000000123456789abcdef\" }) not found"
"Key Digest(DigestInfo(\"0123456789abcdef000000000000000000000000000000000123456789abcdef-0\")) not found"
);
Ok(())
}
@@ -165,7 +165,7 @@ async fn single_item_wrong_digest_size() -> Result<(), Box<dyn std::error::Error
assert_eq!(err.code(), Code::NotFound);
assert_eq!(
err.message(),
"Key Digest(DigestInfo { size_bytes: 146, hash: \"0123456789abcdef000000000000000000000000000000000123456789abcdef\" }) not found"
"Key Digest(DigestInfo(\"0123456789abcdef000000000000000000000000000000000123456789abcdef-146\")) not found"
);
Ok(())
}
4 changes: 2 additions & 2 deletions nativelink-service/tests/bytestream_server_test.rs
@@ -842,11 +842,11 @@ pub async fn read_with_not_found_does_not_deadlock() -> Result<(), Error> {

let result = result_fut.await.err_tip(|| "Expected result to be ready")?;
let expected_err_str = concat!(
"status: NotFound, message: \"Key Digest(DigestInfo { size_bytes: 55, hash: \\\"0123456789abcdef000000000000000000000000000000000123456789abcdef\\\" }) not found\", details: [], metadata: MetadataMap { headers: {} }",
"status: NotFound, message: \"Key Digest(DigestInfo(\\\"0123456789abcdef000000000000000000000000000000000123456789abcdef-55\\\")) not found\", details: [], metadata: MetadataMap { headers: {} }",
);
assert_eq!(
Error::from(result.unwrap_err()),
make_err!(Code::NotFound, "{}", expected_err_str),
make_err!(Code::NotFound, "{expected_err_str}"),
"Expected error data to match"
);
}
36 changes: 6 additions & 30 deletions nativelink-service/tests/cas_server_test.rs
@@ -387,11 +387,7 @@ async fn get_tree_read_directories_without_paging() -> Result<(), Box<dyn std::e
.get_tree(Request::new(GetTreeRequest {
instance_name: INSTANCE_NAME.to_string(),
page_size: 0,
page_token: format!(
"{}-{}",
root_directory_digest_info.hash_str(),
root_directory_digest_info.size_bytes
),
page_token: format!("{root_directory_digest_info}"),
root_digest: Some(root_directory_digest_info.into()),
digest_function: digest_function::Value::Sha256.into(),
}))
@@ -442,11 +438,7 @@ async fn get_tree_read_directories_with_paging() -> Result<(), Box<dyn std::erro
.get_tree(Request::new(GetTreeRequest {
instance_name: INSTANCE_NAME.to_string(),
page_size: 2,
page_token: format!(
"{}-{}",
root_directory_digest_info.hash_str(),
root_directory_digest_info.size_bytes
),
page_token: format!("{root_directory_digest_info}"),
root_digest: Some(root_directory_digest_info.into()),
digest_function: digest_function::Value::Sha256.into(),
}))
@@ -461,22 +453,14 @@ async fn get_tree_read_directories_with_paging() -> Result<(), Box<dyn std::erro
.await,
vec![GetTreeResponse {
directories: vec![root_directory.clone(), sub_directories[0].clone()],
next_page_token: format!(
"{}-{}",
sub_directory_digest_infos[1].hash_str(),
sub_directory_digest_infos[1].size_bytes
),
next_page_token: format!("{}", sub_directory_digest_infos[1]),
}]
);
let raw_response = cas_server
.get_tree(Request::new(GetTreeRequest {
instance_name: INSTANCE_NAME.to_string(),
page_size: 2,
page_token: format!(
"{}-{}",
sub_directory_digest_infos[1].hash_str(),
sub_directory_digest_infos[1].size_bytes
),
page_token: format!("{}", sub_directory_digest_infos[1]),
root_digest: Some(root_directory_digest_info.into()),
digest_function: digest_function::Value::Sha256.into(),
}))
@@ -491,22 +475,14 @@ async fn get_tree_read_directories_with_paging() -> Result<(), Box<dyn std::erro
.await,
vec![GetTreeResponse {
directories: vec![sub_directories[1].clone(), sub_directories[2].clone()],
next_page_token: format!(
"{}-{}",
sub_directory_digest_infos[3].hash_str(),
sub_directory_digest_infos[3].size_bytes
),
next_page_token: format!("{}", sub_directory_digest_infos[3]),
}]
);
let raw_response = cas_server
.get_tree(Request::new(GetTreeRequest {
instance_name: INSTANCE_NAME.to_string(),
page_size: 2,
page_token: format!(
"{}-{}",
sub_directory_digest_infos[3].hash_str(),
sub_directory_digest_infos[3].size_bytes
),
page_token: format!("{}", sub_directory_digest_infos[3]),
root_digest: Some(root_directory_digest_info.into()),
digest_function: digest_function::Value::Sha256.into(),
}))
2 changes: 1 addition & 1 deletion nativelink-store/src/cas_utils.rs
@@ -39,7 +39,7 @@ pub const ZERO_BYTE_DIGESTS: [DigestInfo; 2] = [
#[inline]
pub fn is_zero_digest<'a>(digest: impl Into<StoreKey<'a>>) -> bool {
match digest.into() {
StoreKey::Digest(digest) => digest.size_bytes == 0 && ZERO_BYTE_DIGESTS.contains(&digest),
StoreKey::Digest(digest) => digest.size_bytes() == 0 && ZERO_BYTE_DIGESTS.contains(&digest),
_ => false,
}
}
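
The zero-digest fast path exists because clients routinely probe for the empty blob before uploading. As a standalone cross-check (using the sha2 crate directly rather than any NativeLink type), the SHA-256 entry in `ZERO_BYTE_DIGESTS` should correspond to the well-known hash of zero bytes:

```rust
use sha2::{Digest, Sha256};

fn main() {
    // SHA-256 over the empty input.
    let empty_hash = Sha256::digest(b"");
    let hex: String = empty_hash.iter().map(|b| format!("{b:02x}")).collect();
    assert_eq!(
        hex,
        "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
    );
}
```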
8 changes: 3 additions & 5 deletions nativelink-store/src/dedup_store.rs
@@ -128,9 +128,7 @@ impl DedupStore {
let digests: Vec<_> = index_entries
.entries
.into_iter()
.map(|index_entry| {
DigestInfo::new(index_entry.packed_hash, index_entry.size_bytes).into()
})
.map(StoreKey::Digest)
.collect();
let mut sum = 0;
for size in self.content_store.has_many(&digests).await? {
@@ -181,7 +179,7 @@ impl StoreDriver for DedupStore {
.map(|r| r.err_tip(|| "Failed to decode frame from fast_cdc"))
.map_ok(|frame| async move {
let hash = blake3::hash(&frame[..]).into();
let index_entry = DigestInfo::new(hash, frame.len() as i64);
let index_entry = DigestInfo::new(hash, frame.len() as u64);
if self
.content_store
.has(index_entry)
@@ -266,7 +264,7 @@ impl StoreDriver for DedupStore {
let mut entries = Vec::with_capacity(index_entries.entries.len());
for entry in index_entries.entries {
let first_byte = current_entries_sum;
let entry_size = usize::try_from(entry.size_bytes)
let entry_size = usize::try_from(entry.size_bytes())
.err_tip(|| "Failed to convert to usize in DedupStore")?;
current_entries_sum += entry_size;
// Filter any items whose end byte is before the first requested byte.
2 changes: 1 addition & 1 deletion nativelink-store/src/existence_cache_store.rs
@@ -209,7 +209,7 @@ impl<I: InstantWrapper> StoreDriver for ExistenceCacheStore<I> {
.get_part(digest, writer, offset, length)
.await;
if result.is_ok() {
let size = usize::try_from(digest.size_bytes)
let size = usize::try_from(digest.size_bytes())
.err_tip(|| "Could not convert size_bytes in ExistenceCacheStore::get_part")?;
let _ = self
.existence_cache
8 changes: 5 additions & 3 deletions nativelink-store/src/filesystem_store.rs
@@ -132,7 +132,7 @@ impl Drop for EncodedFilePath {

#[inline]
fn to_full_path_from_digest(folder: &str, digest: &DigestInfo) -> OsString {
format!("{}/{}-{}", folder, digest.hash_str(), digest.size_bytes).into()
format!("{folder}/{digest}").into()
}

pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
@@ -301,11 +301,13 @@ impl Debug for FileEntryImpl {

fn make_temp_digest(digest: &mut DigestInfo) {
static DELETE_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
digest.packed_hash[24..].clone_from_slice(
let mut hash = *digest.packed_hash();
hash[24..].clone_from_slice(
&DELETE_FILE_COUNTER
.fetch_add(1, Ordering::Relaxed)
.to_le_bytes(),
);
digest.set_packed_hash(hash);
}

impl LenEntry for FileEntryImpl {
@@ -751,7 +753,7 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
let send_eof_result = tx.send_eof();
self.update(digest.into(), rx, UploadSizeInfo::ExactSize(0))
.await
.err_tip(|| format!("Failed to create zero file for key {digest:?}"))
.err_tip(|| format!("Failed to create zero file for key {digest}"))
.merge(
send_eof_result
.err_tip(|| "Failed to send zero file EOF in filesystem store has"),
8 changes: 4 additions & 4 deletions nativelink-store/src/grpc_store.rs
@@ -565,7 +565,7 @@ impl StoreDriver for GrpcStore {
{
match missing_digests.binary_search(&digest) {
Ok(_) => *result = None,
Err(_) => *result = Some(usize::try_from(digest.size_bytes)?),
Err(_) => *result = Some(usize::try_from(digest.size_bytes())?),
}
}

@@ -589,7 +589,7 @@
&self.instance_name,
Uuid::new_v4().hyphenated().encode_lower(&mut buf),
digest.hash_str(),
digest.size_bytes,
digest.size_bytes(),
);

struct LocalState {
@@ -666,15 +666,15 @@ impl StoreDriver for GrpcStore {
}

// Shortcut for empty blobs.
if digest.size_bytes == 0 {
if digest.size_bytes() == 0 {
return writer.send_eof();
}

let resource_name = format!(
"{}/blobs/{}/{}",
&self.instance_name,
digest.hash_str(),
digest.size_bytes,
digest.size_bytes(),
);

struct LocalState<'a> {
18 changes: 9 additions & 9 deletions nativelink-store/src/shard_store.rs
@@ -93,30 +93,30 @@ impl ShardStore {
// array length. They optimize especially well when the optimizer can easily determine
// the slice length, e.g. <[u8; 4]>::try_from(&slice[4..8]).unwrap(). Array implements
// TryFrom returning.
let size_bytes = digest.size_bytes.to_le_bytes();
let size_bytes = digest.size_bytes().to_le_bytes();
0.bitxor(u32::from_le_bytes(
digest.packed_hash[0..4].try_into().unwrap(),
digest.packed_hash()[0..4].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
digest.packed_hash[4..8].try_into().unwrap(),
digest.packed_hash()[4..8].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
digest.packed_hash[8..12].try_into().unwrap(),
digest.packed_hash()[8..12].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
digest.packed_hash[12..16].try_into().unwrap(),
digest.packed_hash()[12..16].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
digest.packed_hash[16..20].try_into().unwrap(),
digest.packed_hash()[16..20].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
digest.packed_hash[20..24].try_into().unwrap(),
digest.packed_hash()[20..24].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
digest.packed_hash[24..28].try_into().unwrap(),
digest.packed_hash()[24..28].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(
digest.packed_hash[28..32].try_into().unwrap(),
digest.packed_hash()[28..32].try_into().unwrap(),
))
.bitxor(u32::from_le_bytes(size_bytes[0..4].try_into().unwrap()))
.bitxor(u32::from_le_bytes(size_bytes[4..8].try_into().unwrap()))
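
The unrolled XOR fold above keeps every `try_into` over a constant-length range so the optimizer lowers each one to a plain load, as the comment at the top of the hunk notes. A compact loop-based equivalent for reference (a sketch, not the committed code), assuming a 32-byte hash and the new `u64` size:

```rust
fn shard_value(packed_hash: &[u8; 32], size_bytes: u64) -> u32 {
    // XOR-fold the hash four bytes at a time...
    let mut acc = 0u32;
    for chunk in packed_hash.chunks_exact(4) {
        acc ^= u32::from_le_bytes(chunk.try_into().unwrap());
    }
    // ...then fold in both 4-byte halves of the little-endian size.
    let size = size_bytes.to_le_bytes();
    acc ^ u32::from_le_bytes(size[0..4].try_into().unwrap())
        ^ u32::from_le_bytes(size[4..8].try_into().unwrap())
}

fn main() {
    // The resulting value is what ShardStore matches against its shard weights.
    println!("{:#010x}", shard_value(&[0xab; 32], 146));
}
```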
12 changes: 6 additions & 6 deletions nativelink-store/src/size_partitioning_store.rs
@@ -26,7 +26,7 @@ use tokio::join;
#[derive(MetricsComponent)]
pub struct SizePartitioningStore {
#[metric(help = "Size to partition our data")]
partition_size: i64,
partition_size: u64,
#[metric(group = "lower_store")]
lower_store: Store,
#[metric(group = "upper_store")]
@@ -40,7 +40,7 @@ impl SizePartitioningStore {
upper_store: Store,
) -> Arc<Self> {
Arc::new(SizePartitioningStore {
partition_size: config.size as i64,
partition_size: config.size,
lower_store,
upper_store,
})
@@ -61,7 +61,7 @@ impl StoreDriver for SizePartitioningStore {
non_digest_sample = Some(k.borrow().into_owned());
return false;
};
digest.size_bytes < self.partition_size
digest.size_bytes() < self.partition_size
});
if let Some(non_digest) = non_digest_sample {
return Err(make_input_err!(
@@ -110,7 +110,7 @@ impl StoreDriver for SizePartitioningStore {
))
}
};
if digest.size_bytes < self.partition_size {
if digest.size_bytes() < self.partition_size {
return self.lower_store.update(digest, reader, size_info).await;
}
self.upper_store.update(digest, reader, size_info).await
@@ -131,7 +131,7 @@ impl StoreDriver for SizePartitioningStore {
))
}
};
if digest.size_bytes < self.partition_size {
if digest.size_bytes() < self.partition_size {
return self
.lower_store
.get_part(digest, writer, offset, length)
@@ -150,7 +150,7 @@ impl StoreDriver for SizePartitioningStore {
StoreKey::Digest(digest) => digest,
_ => return self,
};
if digest.size_bytes < self.partition_size {
if digest.size_bytes() < self.partition_size {
return self.lower_store.inner_store(Some(digest));
}
self.upper_store.inner_store(Some(digest))
12 changes: 6 additions & 6 deletions nativelink-store/src/verify_store.rs
@@ -61,7 +61,7 @@ impl VerifyStore {
mut tx: DropCloserWriteHalf,
mut rx: DropCloserReadHalf,
maybe_expected_digest_size: Option<u64>,
original_hash: [u8; 32],
original_hash: &[u8; 32],
mut maybe_hasher: Option<&mut D>,
) -> Result<(), Error> {
let mut sum_size: u64 = 0;
@@ -114,7 +114,8 @@ impl VerifyStore {
}
}
if let Some(hasher) = maybe_hasher.as_mut() {
let hash_result: [u8; 32] = hasher.finalize_digest().packed_hash;
let digest = hasher.finalize_digest();
let hash_result = digest.packed_hash();
if original_hash != hash_result {
self.hash_verification_failures.inc();
return Err(make_input_err!(
@@ -167,15 +168,14 @@ impl StoreDriver for VerifyStore {
));
}
};
let digest_size = u64::try_from(digest.size_bytes)
.err_tip(|| "Digest size_bytes was not convertible to usize")?;
let digest_size = digest.size_bytes();
if let UploadSizeInfo::ExactSize(expected_size) = size_info {
if self.verify_size && expected_size as u64 != digest_size {
self.size_verification_failures.inc();
return Err(make_input_err!(
"Expected size to match. Got {} but digest says {} on update",
expected_size,
digest.size_bytes
digest_size
));
}
}
@@ -203,7 +203,7 @@ impl StoreDriver for VerifyStore {
tx,
reader,
maybe_digest_size,
digest.packed_hash,
digest.packed_hash(),
hasher.as_mut(),
);

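
Passing `&[u8; 32]` lets the helper borrow the expected hash instead of copying it. For context, a standalone sketch of the verify-on-the-fly idea (using the sha2 crate in place of NativeLink's hasher abstraction, which is not shown on this page):

```rust
use sha2::{Digest, Sha256};

/// Checks that `chunks` totals exactly `expected_size` bytes and hashes to
/// `expected_hash`, failing as early as possible on a size overrun.
fn verify_stream<'a>(
    chunks: impl IntoIterator<Item = &'a [u8]>,
    expected_size: u64,
    expected_hash: &[u8; 32],
) -> Result<(), String> {
    let mut hasher = Sha256::new();
    let mut sum_size: u64 = 0;
    for chunk in chunks {
        sum_size += chunk.len() as u64;
        if sum_size > expected_size {
            // The size check fires mid-stream; no need to hash the rest.
            return Err(format!("got {sum_size}+ bytes, expected {expected_size}"));
        }
        hasher.update(chunk);
    }
    if sum_size != expected_size {
        return Err(format!("got {sum_size} bytes, expected {expected_size}"));
    }
    let actual: [u8; 32] = hasher.finalize().into();
    if &actual != expected_hash {
        return Err("hash mismatch".to_string());
    }
    Ok(())
}

fn main() {
    let data = b"hello";
    let expected: [u8; 32] = Sha256::digest(data).into();
    assert!(verify_stream([&data[..]], 5, &expected).is_ok());
    assert!(verify_stream([&data[..]], 4, &expected).is_err());
}
```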