From 2a55f1ebd0f0b8c8915af7015f12f59b56593920 Mon Sep 17 00:00:00 2001 From: Aleksandar Mladenovic <142597090+aleksdmladenovic@users.noreply.github.com> Date: Thu, 19 Sep 2024 23:24:55 -0400 Subject: [PATCH] Convert usize to u63 in Store trait APIs (#1344) APIs in `StoreLike` now uses `u62` instead of `usize` It's because `u64` expresses the size of the data, but `usize` expresses local memory limits. --- .../src/memory_awaited_action_db.rs | 2 +- .../tests/utils/mock_scheduler.rs | 10 ++-- nativelink-service/src/bytestream_server.rs | 20 ++++---- nativelink-store/src/ac_utils.rs | 7 +-- .../src/completeness_checking_store.rs | 10 ++-- nativelink-store/src/compression_store.rs | 41 ++++++++-------- nativelink-store/src/dedup_store.rs | 44 +++++++++-------- nativelink-store/src/default_store_factory.rs | 2 +- nativelink-store/src/existence_cache_store.rs | 16 +++---- nativelink-store/src/fast_slow_store.rs | 36 ++++++++------ nativelink-store/src/filesystem_store.rs | 16 +++---- nativelink-store/src/grpc_store.rs | 22 +++++---- nativelink-store/src/memory_store.rs | 21 ++++++--- nativelink-store/src/noop_store.rs | 6 +-- nativelink-store/src/redis_store.rs | 11 +++-- nativelink-store/src/ref_store.rs | 6 +-- nativelink-store/src/s3_store.rs | 36 ++++++++------ nativelink-store/src/shard_store.rs | 6 +-- .../src/size_partitioning_store.rs | 6 +-- nativelink-store/src/verify_store.rs | 8 ++-- nativelink-store/tests/ac_utils_test.rs | 2 +- .../tests/compression_store_test.rs | 4 +- nativelink-store/tests/dedup_store_test.rs | 36 ++++++++------ .../tests/existence_store_test.rs | 10 ++-- .../tests/fast_slow_store_test.rs | 26 +++++----- .../tests/filesystem_store_test.rs | 10 ++-- nativelink-store/tests/memory_store_test.rs | 6 +-- nativelink-store/tests/redis_store_test.rs | 10 ++-- nativelink-store/tests/ref_store_test.rs | 2 +- nativelink-store/tests/s3_store_test.rs | 8 ++-- nativelink-store/tests/shard_store_test.rs | 13 +++-- .../tests/size_partitioning_store_test.rs | 4 +- nativelink-store/tests/verify_store_test.rs | 10 ++-- nativelink-util/src/buf_channel.rs | 4 +- nativelink-util/src/evicting_map.rs | 24 ++++------ nativelink-util/src/store_trait.rs | 47 ++++++++++--------- nativelink-util/tests/evicting_map_test.rs | 20 ++++---- .../src/running_actions_manager.rs | 2 +- 38 files changed, 307 insertions(+), 257 deletions(-) diff --git a/nativelink-scheduler/src/memory_awaited_action_db.rs b/nativelink-scheduler/src/memory_awaited_action_db.rs index 514460aea..2c0a11f49 100644 --- a/nativelink-scheduler/src/memory_awaited_action_db.rs +++ b/nativelink-scheduler/src/memory_awaited_action_db.rs @@ -83,7 +83,7 @@ impl Drop for ClientAwaitedAction { /// why the implementation has fixed default values in it. impl LenEntry for ClientAwaitedAction { #[inline] - fn len(&self) -> usize { + fn len(&self) -> u64 { 0 } diff --git a/nativelink-scheduler/tests/utils/mock_scheduler.rs b/nativelink-scheduler/tests/utils/mock_scheduler.rs index fe0e37035..113a3d25c 100644 --- a/nativelink-scheduler/tests/utils/mock_scheduler.rs +++ b/nativelink-scheduler/tests/utils/mock_scheduler.rs @@ -17,12 +17,10 @@ use std::sync::Arc; use async_trait::async_trait; use nativelink_error::{make_input_err, Error}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; -use nativelink_util::{ - action_messages::{ActionInfo, OperationId}, - known_platform_property_provider::KnownPlatformPropertyProvider, - operation_state_manager::{ - ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter, - }, +use nativelink_util::action_messages::{ActionInfo, OperationId}; +use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; +use nativelink_util::operation_state_manager::{ + ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter, }; use tokio::sync::{mpsc, Mutex}; diff --git a/nativelink-service/src/bytestream_server.rs b/nativelink-service/src/bytestream_server.rs index bb5ab6c91..39c817f14 100644 --- a/nativelink-service/src/bytestream_server.rs +++ b/nativelink-service/src/bytestream_server.rs @@ -249,13 +249,7 @@ impl ByteStreamServer { // `store` to ensure its lifetime follows the future and not the caller. store // Bytestream always uses digest size as the actual byte size. - .update( - digest, - rx, - UploadSizeInfo::ExactSize( - usize::try_from(digest.size_bytes()).err_tip(|| "Invalid digest size")?, - ), - ) + .update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes())) .await }); Ok(ActiveStreamGuard { @@ -275,8 +269,8 @@ impl ByteStreamServer { digest: DigestInfo, read_request: ReadRequest, ) -> Result, Error> { - let read_limit = usize::try_from(read_request.read_limit) - .err_tip(|| "read_limit has is not convertible to usize")?; + let read_limit = u64::try_from(read_request.read_limit) + .err_tip(|| "Could not convert read_limit to u64")?; let (tx, rx) = make_buf_channel_pair(); @@ -300,7 +294,13 @@ impl ByteStreamServer { maybe_get_part_result: None, get_part_fut: Box::pin(async move { store - .get_part(digest, tx, read_request.read_offset as usize, read_limit) + .get_part( + digest, + tx, + u64::try_from(read_request.read_offset) + .err_tip(|| "Could not convert read_offset to u64")?, + read_limit, + ) .await }), }); diff --git a/nativelink-store/src/ac_utils.rs b/nativelink-store/src/ac_utils.rs index 2915fb55d..c17ab649d 100644 --- a/nativelink-store/src/ac_utils.rs +++ b/nativelink-store/src/ac_utils.rs @@ -50,7 +50,7 @@ pub async fn get_and_decode_digest( pub async fn get_size_and_decode_digest( store: &impl StoreLike, key: impl Into>, -) -> Result<(T, usize), Error> { +) -> Result<(T, u64), Error> { let key = key.into(); // Note: For unknown reasons we appear to be hitting: // https://github.com/rust-lang/rust/issues/92096 @@ -58,7 +58,7 @@ pub async fn get_size_and_decode_digest( // are using the store driver function here. let mut store_data_resp = store .as_store_driver_pin() - .get_part_unchunked(key.borrow(), 0, Some(MAX_ACTION_MSG_SIZE)) + .get_part_unchunked(key.borrow(), 0, Some(MAX_ACTION_MSG_SIZE as u64)) .await; if let Err(err) = &mut store_data_resp { if err.code == Code::NotFound { @@ -69,7 +69,8 @@ pub async fn get_size_and_decode_digest( } } let store_data = store_data_resp?; - let store_data_len = store_data.len(); + let store_data_len = + u64::try_from(store_data.len()).err_tip(|| "Could not convert store_data.len() to u64")?; T::decode(store_data) .err_tip_with_code(|e| { diff --git a/nativelink-store/src/completeness_checking_store.rs b/nativelink-store/src/completeness_checking_store.rs index de6343d6f..4ccb2bc50 100644 --- a/nativelink-store/src/completeness_checking_store.rs +++ b/nativelink-store/src/completeness_checking_store.rs @@ -132,12 +132,12 @@ impl CompletenessCheckingStore { async fn inner_has_with_results( &self, action_result_digests: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { // Holds shared state between the different futures. // This is how get around lifetime issues. struct State<'a> { - results: &'a mut [Option], + results: &'a mut [Option], digests_to_check: Vec>, digests_to_check_idxs: Vec, notify: Arc, @@ -342,7 +342,7 @@ impl StoreDriver for CompletenessCheckingStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { self.inner_has_with_results(keys, results).await } @@ -360,8 +360,8 @@ impl StoreDriver for CompletenessCheckingStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let results = &mut [None]; self.inner_has_with_results(&[key.borrow()], results) diff --git a/nativelink-store/src/compression_store.rs b/nativelink-store/src/compression_store.rs index fb3884015..7f7c259fb 100644 --- a/nativelink-store/src/compression_store.rs +++ b/nativelink-store/src/compression_store.rs @@ -42,7 +42,7 @@ pub const CURRENT_STREAM_FORMAT_VERSION: u8 = 1; // Default block size that will be used to slice stream into. pub const DEFAULT_BLOCK_SIZE: u32 = 64 * 1024; -const U32_SZ: usize = std::mem::size_of::(); +const U32_SZ: u64 = std::mem::size_of::() as u64; type BincodeOptions = WithOtherIntEncoding; @@ -145,25 +145,25 @@ pub struct Footer { /// lz4_flex::block::get_maximum_output_size() way over estimates, so we use the /// one provided here: https://github.com/torvalds/linux/blob/master/include/linux/lz4.h#L61 /// Local testing shows this gives quite accurate worst case given random input. -fn lz4_compress_bound(input_size: usize) -> usize { +fn lz4_compress_bound(input_size: u64) -> u64 { input_size + (input_size / 255) + 16 } struct UploadState { header: Header, footer: Footer, - max_output_size: usize, - input_max_size: usize, + max_output_size: u64, + input_max_size: u64, } impl UploadState { - pub fn new(store: &CompressionStore, upload_size: UploadSizeInfo) -> Self { + pub fn new(store: &CompressionStore, upload_size: UploadSizeInfo) -> Result { let input_max_size = match upload_size { UploadSizeInfo::ExactSize(sz) => sz, UploadSizeInfo::MaxSize(sz) => sz, }; - let max_index_count = (input_max_size / store.config.block_size as usize) + 1; + let max_index_count = (input_max_size / store.config.block_size as u64) + 1; let header = Header { version: CURRENT_STREAM_FORMAT_VERSION, @@ -177,7 +177,8 @@ impl UploadState { SliceIndex { ..Default::default() }; - max_index_count + usize::try_from(max_index_count) + .err_tip(|| "Could not convert max_index_count to usize")? ], index_count: max_index_count as u32, uncompressed_data_size: 0, // Updated later. @@ -186,22 +187,22 @@ impl UploadState { }; // This is more accurate of an estimate than what get_maximum_output_size calculates. - let max_block_size = lz4_compress_bound(store.config.block_size as usize) + U32_SZ + 1; + let max_block_size = lz4_compress_bound(store.config.block_size as u64) + U32_SZ + 1; let max_output_size = { - let header_size = store.bincode_options.serialized_size(&header).unwrap() as usize; + let header_size = store.bincode_options.serialized_size(&header).unwrap(); let max_content_size = max_block_size * max_index_count; let max_footer_size = - U32_SZ + 1 + store.bincode_options.serialized_size(&footer).unwrap() as usize; + U32_SZ + 1 + store.bincode_options.serialized_size(&footer).unwrap(); header_size + max_content_size + max_footer_size }; - Self { + Ok(Self { header, footer, max_output_size, input_max_size, - } + }) } } @@ -246,7 +247,7 @@ impl StoreDriver for CompressionStore { async fn has_with_results( self: Pin<&Self>, digests: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { self.inner_store.has_with_results(digests, results).await } @@ -257,7 +258,7 @@ impl StoreDriver for CompressionStore { mut reader: DropCloserReadHalf, upload_size: UploadSizeInfo, ) -> Result<(), Error> { - let mut output_state = UploadState::new(&self, upload_size); + let mut output_state = UploadState::new(&self, upload_size)?; let (mut tx, rx) = make_buf_channel_pair(); @@ -307,7 +308,8 @@ impl StoreDriver for CompressionStore { break; // EOF. } - received_amt += chunk.len(); + received_amt += u64::try_from(chunk.len()) + .err_tip(|| "Could not convert chunk.len() to u64")?; error_if!( received_amt > output_state.input_max_size, "Got more data than stated in compression store upload request" @@ -360,7 +362,7 @@ impl StoreDriver for CompressionStore { }, ); output_state.footer.index_count = output_state.footer.indexes.len() as u32; - output_state.footer.uncompressed_data_size = received_amt as u64; + output_state.footer.uncompressed_data_size = received_amt; { // Write Footer. let serialized_footer = self @@ -392,8 +394,8 @@ impl StoreDriver for CompressionStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { if is_zero_digest(key.borrow()) { writer @@ -402,7 +404,6 @@ impl StoreDriver for CompressionStore { return Ok(()); } - let offset = offset as u64; let (tx, mut rx) = make_buf_channel_pair(); let inner_store = self.inner_store.clone(); @@ -474,7 +475,7 @@ impl StoreDriver for CompressionStore { let mut frame_sz = chunk.get_u32_le(); let mut uncompressed_data_sz: u64 = 0; - let mut remaining_bytes_to_send: u64 = length.unwrap_or(usize::MAX) as u64; + let mut remaining_bytes_to_send: u64 = length.unwrap_or(u64::MAX); let mut chunks_count: u32 = 0; while frame_type != FOOTER_FRAME_TYPE { error_if!( diff --git a/nativelink-store/src/dedup_store.rs b/nativelink-store/src/dedup_store.rs index d497451e6..ebacad27d 100644 --- a/nativelink-store/src/dedup_store.rs +++ b/nativelink-store/src/dedup_store.rs @@ -34,9 +34,9 @@ use tracing::{event, Level}; // NOTE: If these change update the comments in `stores.rs` to reflect // the new defaults. -const DEFAULT_MIN_SIZE: usize = 64 * 1024; -const DEFAULT_NORM_SIZE: usize = 256 * 1024; -const DEFAULT_MAX_SIZE: usize = 512 * 1024; +const DEFAULT_MIN_SIZE: u64 = 64 * 1024; +const DEFAULT_NORM_SIZE: u64 = 256 * 1024; +const DEFAULT_MAX_SIZE: u64 = 512 * 1024; const DEFAULT_MAX_CONCURRENT_FETCH_PER_GET: usize = 10; #[derive(Serialize, Deserialize, PartialEq, Debug, Default, Clone)] @@ -61,37 +61,42 @@ impl DedupStore { config: &nativelink_config::stores::DedupStore, index_store: Store, content_store: Store, - ) -> Arc { + ) -> Result, Error> { let min_size = if config.min_size == 0 { DEFAULT_MIN_SIZE } else { - config.min_size as usize + config.min_size as u64 }; let normal_size = if config.normal_size == 0 { DEFAULT_NORM_SIZE } else { - config.normal_size as usize + config.normal_size as u64 }; let max_size = if config.max_size == 0 { DEFAULT_MAX_SIZE } else { - config.max_size as usize + config.max_size as u64 }; let max_concurrent_fetch_per_get = if config.max_concurrent_fetch_per_get == 0 { DEFAULT_MAX_CONCURRENT_FETCH_PER_GET } else { config.max_concurrent_fetch_per_get as usize }; - Arc::new(Self { + Ok(Arc::new(Self { index_store, content_store, - fast_cdc_decoder: FastCDC::new(min_size, normal_size, max_size), + fast_cdc_decoder: FastCDC::new( + usize::try_from(min_size).err_tip(|| "Could not convert min_size to usize")?, + usize::try_from(normal_size) + .err_tip(|| "Could not convert normal_size to usize")?, + usize::try_from(max_size).err_tip(|| "Could not convert max_size to usize")?, + ), max_concurrent_fetch_per_get, bincode_options: DefaultOptions::new().with_fixint_encoding(), - }) + })) } - async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result, Error> { + async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result, Error> { // First we need to load the index that contains where the individual parts actually // can be fetched from. let index_entries = { @@ -148,7 +153,7 @@ impl StoreDriver for DedupStore { async fn has_with_results( self: Pin<&Self>, digests: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { digests .iter() @@ -225,8 +230,8 @@ impl StoreDriver for DedupStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { // Special case for if a client tries to read zero bytes. if length == Some(0) { @@ -255,7 +260,7 @@ impl StoreDriver for DedupStore { })? }; - let mut start_byte_in_stream: usize = 0; + let mut start_byte_in_stream: u64 = 0; let entries = { if offset == 0 && length.is_none() { index_entries.entries @@ -264,8 +269,7 @@ impl StoreDriver for DedupStore { let mut entries = Vec::with_capacity(index_entries.entries.len()); for entry in index_entries.entries { let first_byte = current_entries_sum; - let entry_size = usize::try_from(entry.size_bytes()) - .err_tip(|| "Failed to convert to usize in DedupStore")?; + let entry_size = entry.size_bytes(); current_entries_sum += entry_size; // Filter any items who's end byte is before the first requested byte. if current_entries_sum <= offset { @@ -308,8 +312,10 @@ impl StoreDriver for DedupStore { // In the event any of these error, we will abort early and abandon all the rest of the // streamed data. // Note: Need to take special care to ensure we send the proper slice of data requested. - let mut bytes_to_skip = offset - start_byte_in_stream; - let mut bytes_to_send = length.unwrap_or(usize::MAX - offset); + let mut bytes_to_skip = usize::try_from(offset - start_byte_in_stream) + .err_tip(|| "Could not convert (offset - start_byte_in_stream) to usize")?; + let mut bytes_to_send = usize::try_from(length.unwrap_or(u64::MAX - offset)) + .err_tip(|| "Could not convert length to usize")?; while let Some(result) = entries_stream.next().await { let mut data = result.err_tip(|| "Inner store iterator closed early in DedupStore")?; assert!( diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 0781ecf66..4d89a675f 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -66,7 +66,7 @@ pub fn store_factory<'a>( config, store_factory(&config.index_store, store_manager, None).await?, store_factory(&config.content_store, store_manager, None).await?, - ), + )?, StoreConfig::existence_cache(config) => ExistenceCacheStore::new( config, store_factory(&config.backend, store_manager, None).await?, diff --git a/nativelink-store/src/existence_cache_store.rs b/nativelink-store/src/existence_cache_store.rs index 43b925fbf..087e37b8c 100644 --- a/nativelink-store/src/existence_cache_store.rs +++ b/nativelink-store/src/existence_cache_store.rs @@ -29,11 +29,11 @@ use nativelink_util::instant_wrapper::InstantWrapper; use nativelink_util::store_trait::{Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo}; #[derive(Clone, Debug)] -struct ExistanceItem(usize); +struct ExistanceItem(u64); impl LenEntry for ExistanceItem { #[inline] - fn len(&self) -> usize { + fn len(&self) -> u64 { self.0 } @@ -85,7 +85,7 @@ impl ExistenceCacheStore { async fn inner_has_with_results( self: Pin<&Self>, keys: &[DigestInfo], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { self.existence_cache .sizes_for_keys(keys, results, true /* peek */) @@ -151,7 +151,7 @@ impl StoreDriver for ExistenceCacheStore { async fn has_with_results( self: Pin<&Self>, digests: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { // TODO(allada) This is a bit of a hack to get around the lifetime issues with the // existence_cache. We need to convert the digests to owned values to be able to @@ -200,8 +200,8 @@ impl StoreDriver for ExistenceCacheStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let digest = key.into_digest(); let result = self @@ -209,11 +209,9 @@ impl StoreDriver for ExistenceCacheStore { .get_part(digest, writer, offset, length) .await; if result.is_ok() { - let size = usize::try_from(digest.size_bytes()) - .err_tip(|| "Could not convert size_bytes in ExistenceCacheStore::get_part")?; let _ = self .existence_cache - .insert(digest, ExistanceItem(size)) + .insert(digest, ExistanceItem(digest.size_bytes())) .await; } result diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index 3ce160e36..a6ae8ba71 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -107,21 +107,25 @@ impl FastSlowStore { // TODO(allada) This should be put into utils, as this logic is used // elsewhere in the code. pub fn calculate_range( - received_range: &Range, - send_range: &Range, - ) -> Option> { + received_range: &Range, + send_range: &Range, + ) -> Result>, Error> { // Protect against subtraction overflow. if received_range.start >= received_range.end { - return None; + return Ok(None); } let start = max(received_range.start, send_range.start); let end = min(received_range.end, send_range.end); if received_range.contains(&start) && received_range.contains(&(end - 1)) { // Offset both to the start of the received_range. - Some(start - received_range.start..end - received_range.start) + let calculated_range_start = usize::try_from(start - received_range.start) + .err_tip(|| "Could not convert (start - received_range.start) to usize")?; + let calculated_range_end = usize::try_from(end - received_range.start) + .err_tip(|| "Could not convert (end - received_range.start) to usize")?; + Ok(Some(calculated_range_start..calculated_range_end)) } else { - None + Ok(None) } } } @@ -131,7 +135,7 @@ impl StoreDriver for FastSlowStore { async fn has_with_results( self: Pin<&Self>, key: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { // If our slow store is a noop store, it'll always return a 404, // so only check the fast store in such case. @@ -282,8 +286,8 @@ impl StoreDriver for FastSlowStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { // TODO(blaise.bruer) Investigate if we should maybe ignore errors here instead of // forwarding the up. @@ -316,8 +320,8 @@ impl StoreDriver for FastSlowStore { .slow_store_hit_count .fetch_add(1, Ordering::Acquire); - let send_range = offset..length.map_or(usize::MAX, |length| length + offset); - let mut bytes_received: usize = 0; + let send_range = offset..length.map_or(u64::MAX, |length| length + offset); + let mut bytes_received: u64 = 0; let (mut fast_tx, fast_rx) = make_buf_channel_pair(); let (slow_tx, mut slow_rx) = make_buf_channel_pair(); @@ -335,19 +339,21 @@ impl StoreDriver for FastSlowStore { let fast_res = fast_tx.send_eof(); return Ok::<_, Error>((fast_res, writer_pin)); } + let output_buf_len = u64::try_from(output_buf.len()) + .err_tip(|| "Could not output_buf.len() to u64")?; self.metrics .slow_store_downloaded_bytes - .fetch_add(output_buf.len() as u64, Ordering::Acquire); + .fetch_add(output_buf_len, Ordering::Acquire); let writer_fut = if let Some(range) = Self::calculate_range( - &(bytes_received..bytes_received + output_buf.len()), + &(bytes_received..bytes_received + output_buf_len), &send_range, - ) { + )? { writer_pin.send(output_buf.slice(range)).right_future() } else { futures::future::ready(Ok(())).left_future() }; - bytes_received += output_buf.len(); + bytes_received += output_buf_len; let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut); fast_tx_res.err_tip(|| "Failed to write to fast store in fast_slow store")?; diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 9aedba07d..faadbb04c 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -312,8 +312,8 @@ fn make_temp_digest(digest: &mut DigestInfo) { impl LenEntry for FileEntryImpl { #[inline] - fn len(&self) -> usize { - self.size_on_disk() as usize + fn len(&self) -> u64 { + self.size_on_disk() } fn is_empty(&self) -> bool { @@ -729,7 +729,7 @@ impl StoreDriver for FilesystemStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { // TODO(allada) This is a bit of a hack to get around the lifetime issues with the // existence_cache. We need to convert the digests to owned values to be able to @@ -799,7 +799,7 @@ impl StoreDriver for FilesystemStore { let digest = key.into_digest(); let path = file.get_path().as_os_str().to_os_string(); let file_size = match upload_size { - UploadSizeInfo::ExactSize(size) => size as u64, + UploadSizeInfo::ExactSize(size) => size, UploadSizeInfo::MaxSize(_) => file .as_reader() .await @@ -835,8 +835,8 @@ impl StoreDriver for FilesystemStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let digest = key.into_digest(); if is_zero_digest(digest) { @@ -853,8 +853,8 @@ impl StoreDriver for FilesystemStore { self.evicting_map.get(&digest).await.ok_or_else(|| { make_err!(Code::NotFound, "{digest} not found in filesystem store") })?; - let read_limit = length.unwrap_or(usize::MAX) as u64; - let mut resumeable_temp_file = entry.read_file_part(offset as u64, read_limit).await?; + let read_limit = length.unwrap_or(u64::MAX); + let mut resumeable_temp_file = entry.read_file_part(offset, read_limit).await?; loop { let mut buf = BytesMut::with_capacity(self.read_buffer_size); diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index cbbe1a079..11d4017fe 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -469,7 +469,7 @@ impl GrpcStore { let length = length.unwrap_or(default_len).min(default_len); if length > 0 { writer - .send(value.freeze().slice(offset..(offset + length))) + .send(value.freeze().slice(offset..offset + length)) .await .err_tip(|| "Failed to write data in grpc store")?; } @@ -510,7 +510,7 @@ impl StoreDriver for GrpcStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { if matches!(self.store_type, nativelink_config::stores::StoreType::ac) { keys.iter() @@ -521,7 +521,7 @@ impl StoreDriver for GrpcStore { // hope that we detect incorrect usage. self.get_action_result_from_digest(key.borrow().into_digest()) .await?; - *result = Some(usize::MAX); + *result = Some(u64::MAX); Ok::<_, Error>(()) }) .collect::>() @@ -565,7 +565,7 @@ impl StoreDriver for GrpcStore { { match missing_digests.binary_search(&digest) { Ok(_) => *result = None, - Err(_) => *result = Some(usize::try_from(digest.size_bytes())?), + Err(_) => *result = Some(digest.size_bytes()), } } @@ -655,11 +655,16 @@ impl StoreDriver for GrpcStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let digest = key.into_digest(); if matches!(self.store_type, nativelink_config::stores::StoreType::ac) { + let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?; + let length = length + .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize")) + .transpose()?; + return self .get_action_result_as_part(digest, writer, offset, length) .await; @@ -687,8 +692,9 @@ impl StoreDriver for GrpcStore { let local_state = LocalState { resource_name, writer, - read_offset: offset as i64, - read_limit: length.unwrap_or(0) as i64, + read_offset: i64::try_from(offset).err_tip(|| "Could not convert offset to i64")?, + read_limit: i64::try_from(length.unwrap_or(0)) + .err_tip(|| "Could not convert length to i64")?, }; self.retrier diff --git a/nativelink-store/src/memory_store.rs b/nativelink-store/src/memory_store.rs index ad6ac7d8e..a3973d072 100644 --- a/nativelink-store/src/memory_store.rs +++ b/nativelink-store/src/memory_store.rs @@ -40,8 +40,8 @@ impl Debug for BytesWrapper { impl LenEntry for BytesWrapper { #[inline] - fn len(&self) -> usize { - Bytes::len(&self.0) + fn len(&self) -> u64 { + Bytes::len(&self.0) as u64 } #[inline] @@ -81,7 +81,7 @@ impl StoreDriver for MemoryStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { // TODO(allada): This is a dirty hack to get around the lifetime issues with the // evicting map. @@ -104,7 +104,7 @@ impl StoreDriver for MemoryStore { self: Pin<&Self>, range: (Bound>, Bound>), handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_), - ) -> Result { + ) -> Result { let range = ( range.0.map(|v| v.into_owned()), range.1.map(|v| v.into_owned()), @@ -144,9 +144,14 @@ impl StoreDriver for MemoryStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { + let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?; + let length = length + .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize")) + .transpose()?; + if is_zero_digest(key.borrow()) { writer .send_eof() @@ -159,7 +164,9 @@ impl StoreDriver for MemoryStore { .get(&key.borrow().into_owned()) .await .err_tip_with_code(|_| (Code::NotFound, format!("Key {key:?} not found")))?; - let default_len = value.len() - offset; + let default_len = usize::try_from(value.len()) + .err_tip(|| "Could not convert value.len() to usize")? + .saturating_sub(offset); let length = length.unwrap_or(default_len).min(default_len); if length > 0 { writer diff --git a/nativelink-store/src/noop_store.rs b/nativelink-store/src/noop_store.rs index 457e8b199..1b8def3b6 100644 --- a/nativelink-store/src/noop_store.rs +++ b/nativelink-store/src/noop_store.rs @@ -48,7 +48,7 @@ impl StoreDriver for NoopStore { async fn has_with_results( self: Pin<&Self>, _keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { results.iter_mut().for_each(|r| *r = None); Ok(()) @@ -75,8 +75,8 @@ impl StoreDriver for NoopStore { self: Pin<&Self>, _key: StoreKey<'_>, _writer: &mut DropCloserWriteHalf, - _offset: usize, - _length: Option, + _offset: u64, + _length: Option, ) -> Result<(), Error> { Err(make_err!(Code::NotFound, "Not found in noop store")) } diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 828c775a0..d4560dd22 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -199,7 +199,7 @@ impl StoreDriver for RedisStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { // TODO(caass): Optimize for the case where `keys.len() == 1` let pipeline = self.client_pool.next().pipeline(); @@ -397,9 +397,14 @@ impl StoreDriver for RedisStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { + let offset = usize::try_from(offset).err_tip(|| "Could not convert offset to usize")?; + let length = length + .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize")) + .transpose()?; + // To follow RBE spec we need to consider any digest's with // zero size to be existing. if is_zero_digest(key.borrow()) { diff --git a/nativelink-store/src/ref_store.rs b/nativelink-store/src/ref_store.rs index cb2ba7cfd..d2446d174 100644 --- a/nativelink-store/src/ref_store.rs +++ b/nativelink-store/src/ref_store.rs @@ -105,7 +105,7 @@ impl StoreDriver for RefStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { self.get_store()?.has_with_results(keys, results).await } @@ -123,8 +123,8 @@ impl StoreDriver for RefStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { self.get_store()? .get_part(key, writer, offset, length) diff --git a/nativelink-store/src/s3_store.rs b/nativelink-store/src/s3_store.rs index 4bce1227b..699e4684d 100644 --- a/nativelink-store/src/s3_store.rs +++ b/nativelink-store/src/s3_store.rs @@ -66,11 +66,11 @@ use crate::cas_utils::is_zero_digest; // S3 parts cannot be smaller than this number. See: // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html -const MIN_MULTIPART_SIZE: usize = 5 * 1024 * 1024; // 5MB. +const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB. // S3 parts cannot be larger than this number. See: // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html -const MAX_MULTIPART_SIZE: usize = 5 * 1024 * 1024 * 1024; // 5GB. +const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB. // S3 parts cannot be more than this number. See: // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html @@ -328,7 +328,7 @@ where format!("{}{}", self.key_prefix, key.as_str(),) } - async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result, Error> { + async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result, Error> { self.retrier .retry(unfold((), move |state| async move { let result = self @@ -353,7 +353,7 @@ where return Some((RetryResult::Ok(None), state)); }; if length >= 0 { - return Some((RetryResult::Ok(Some(length as usize)), state)); + return Some((RetryResult::Ok(Some(length as u64)), state)); } Some(( RetryResult::Err(make_err!( @@ -388,7 +388,7 @@ where async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { keys.iter() .zip(results.iter_mut()) @@ -427,7 +427,10 @@ where // Note(allada) If the upload size is not known, we go down the multipart upload path. // This is not very efficient, but it greatly reduces the complexity of the code. if max_size < MIN_MULTIPART_SIZE && matches!(upload_size, UploadSizeInfo::ExactSize(_)) { - reader.set_max_recent_data_size(self.max_retry_buffer_per_request); + reader.set_max_recent_data_size( + u64::try_from(self.max_retry_buffer_per_request) + .err_tip(|| "Could not convert max_retry_buffer_per_request to u64")?, + ); return self .retrier .retry(unfold(reader, move |mut reader| async move { @@ -449,7 +452,7 @@ where .content_length(sz as i64) .body(ByteStream::from_body_1_x(BodyWrapper { reader: rx, - size: sz as u64, + size: sz, })) .send() .map_ok_or_else(|e| Err(make_err!(Code::Aborted, "{e:?}")), |_| Ok(())), @@ -539,7 +542,7 @@ where // Note: Our break condition is when we reach EOF. for part_number in 1..i32::MAX { let write_buf = reader - .consume(Some(bytes_per_upload_part)) + .consume(Some(usize::try_from(bytes_per_upload_part).err_tip(|| "Could not convert bytes_per_upload_part to usize")?)) .await .err_tip(|| "Failed to read chunk in s3_store")?; if write_buf.is_empty() { @@ -589,10 +592,13 @@ where let mut upload_futures = FuturesUnordered::new(); - let mut completed_parts = Vec::with_capacity(cmp::min( - MAX_UPLOAD_PARTS, - (max_size / bytes_per_upload_part) + 1, - )); + let mut completed_parts = Vec::with_capacity( + usize::try_from(cmp::min( + MAX_UPLOAD_PARTS as u64, + (max_size / bytes_per_upload_part) + 1, + )) + .err_tip(|| "Could not convert u64 to usize")?, + ); tokio::pin!(read_stream_fut); loop { if read_stream_fut.is_terminated() && rx.is_empty() && upload_futures.is_empty() { @@ -671,8 +677,8 @@ where self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { if is_zero_digest(key.borrow()) { writer @@ -695,7 +701,7 @@ where .key(s3_path) .range(format!( "bytes={}-{}", - offset + writer.get_bytes_written() as usize, + offset + writer.get_bytes_written(), end_read_byte.map_or_else(String::new, |v| v.to_string()) )) .send() diff --git a/nativelink-store/src/shard_store.rs b/nativelink-store/src/shard_store.rs index 7dc6761c9..ae2ab82f5 100644 --- a/nativelink-store/src/shard_store.rs +++ b/nativelink-store/src/shard_store.rs @@ -144,7 +144,7 @@ impl StoreDriver for ShardStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { if keys.len() == 1 { // Hot path: It is very common to lookup only one key. @@ -212,8 +212,8 @@ impl StoreDriver for ShardStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let store = self.get_store(&key); store diff --git a/nativelink-store/src/size_partitioning_store.rs b/nativelink-store/src/size_partitioning_store.rs index 3d319beaf..a12976bbc 100644 --- a/nativelink-store/src/size_partitioning_store.rs +++ b/nativelink-store/src/size_partitioning_store.rs @@ -52,7 +52,7 @@ impl StoreDriver for SizePartitioningStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { let mut non_digest_sample = None; let (lower_digests, upper_digests): (Vec<_>, Vec<_>) = @@ -120,8 +120,8 @@ impl StoreDriver for SizePartitioningStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let digest = match key { StoreKey::Digest(digest) => digest, diff --git a/nativelink-store/src/verify_store.rs b/nativelink-store/src/verify_store.rs index 9ee668b93..e4e23ae50 100644 --- a/nativelink-store/src/verify_store.rs +++ b/nativelink-store/src/verify_store.rs @@ -148,7 +148,7 @@ impl StoreDriver for VerifyStore { async fn has_with_results( self: Pin<&Self>, digests: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { self.inner_store.has_with_results(digests, results).await } @@ -169,7 +169,7 @@ impl StoreDriver for VerifyStore { }; let digest_size = digest.size_bytes(); if let UploadSizeInfo::ExactSize(expected_size) = size_info { - if self.verify_size && expected_size as u64 != digest_size { + if self.verify_size && expected_size != digest_size { self.size_verification_failures.inc(); return Err(make_input_err!( "Expected size to match. Got {} but digest says {} on update", @@ -215,8 +215,8 @@ impl StoreDriver for VerifyStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { self.inner_store.get_part(key, writer, offset, length).await } diff --git a/nativelink-store/tests/ac_utils_test.rs b/nativelink-store/tests/ac_utils_test.rs index 462fa6a76..f4d50b46d 100644 --- a/nativelink-store/tests/ac_utils_test.rs +++ b/nativelink-store/tests/ac_utils_test.rs @@ -66,7 +66,7 @@ async fn upload_file_to_store_with_large_file() -> Result<(), Error> { .update_with_whole_file( digest, resumeable_file, - UploadSizeInfo::ExactSize(expected_data.len()), + UploadSizeInfo::ExactSize(expected_data.len() as u64), ) .await?; } diff --git a/nativelink-store/tests/compression_store_test.rs b/nativelink-store/tests/compression_store_test.rs index 479f4afd9..8742195c9 100644 --- a/nativelink-store/tests/compression_store_test.rs +++ b/nativelink-store/tests/compression_store_test.rs @@ -142,7 +142,7 @@ async fn partial_reads_test() -> Result<(), Error> { for read_slice_size in 0..(RAW_DATA.len() + 5) { for offset in 0..(RAW_DATA.len() + 5) { let store_data = store - .get_part_unchunked(digest, offset, Some(read_slice_size)) + .get_part_unchunked(digest, offset as u64, Some(read_slice_size as u64)) .await .err_tip(|| { format!("Failed to get from inner store at {offset} - {read_slice_size}") @@ -255,7 +255,7 @@ async fn sanity_check_zero_bytes_test() -> Result<(), Error> { #[nativelink_test] async fn check_header_test() -> Result<(), Error> { const BLOCK_SIZE: u32 = 150; - const MAX_SIZE_INPUT: usize = 1024 * 1024; // 1MB. + const MAX_SIZE_INPUT: u64 = 1024 * 1024; // 1MB. let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); let store_owned = CompressionStore::new( nativelink_config::stores::CompressionStore { diff --git a/nativelink-store/tests/dedup_store_test.rs b/nativelink-store/tests/dedup_store_test.rs index cc84c7985..a19c6a8ec 100644 --- a/nativelink-store/tests/dedup_store_test.rs +++ b/nativelink-store/tests/dedup_store_test.rs @@ -58,7 +58,7 @@ async fn simple_round_trip_test() -> Result<(), Error> { Store::new(MemoryStore::new( &nativelink_config::stores::MemoryStore::default(), )), // Content store. - ); + )?; let original_data = make_random_data(MEGABYTE_SZ); let digest = DigestInfo::try_new(VALID_HASH1, MEGABYTE_SZ).unwrap(); @@ -86,7 +86,7 @@ async fn check_missing_last_chunk_test() -> Result<(), Error> { &nativelink_config::stores::MemoryStore::default(), )), // Index store. Store::new(content_store.clone()), - ); + )?; let original_data = make_random_data(MEGABYTE_SZ); let digest = DigestInfo::try_new(VALID_HASH1, MEGABYTE_SZ).unwrap(); @@ -134,7 +134,7 @@ async fn fetch_part_test() -> Result<(), Error> { Store::new(MemoryStore::new( &nativelink_config::stores::MemoryStore::default(), )), // Content store. - ); + )?; const DATA_SIZE: usize = MEGABYTE_SZ / 4; let original_data = make_random_data(DATA_SIZE); @@ -147,7 +147,7 @@ async fn fetch_part_test() -> Result<(), Error> { const ONE_THIRD_SZ: usize = DATA_SIZE / 3; let rt_data = store - .get_part_unchunked(digest, ONE_THIRD_SZ, Some(ONE_THIRD_SZ)) + .get_part_unchunked(digest, ONE_THIRD_SZ as u64, Some(ONE_THIRD_SZ as u64)) .await .err_tip(|| "Failed to get_part from dedup store")?; @@ -186,7 +186,7 @@ async fn check_length_not_set_with_chunk_read_beyond_first_chunk_regression_test Store::new(MemoryStore::new( &nativelink_config::stores::MemoryStore::default(), )), // Content store. - ); + )?; const DATA_SIZE: usize = 30; let original_data = make_random_data(DATA_SIZE); @@ -200,7 +200,7 @@ async fn check_length_not_set_with_chunk_read_beyond_first_chunk_regression_test // This value must be larger than `max_size` in the config above. const START_READ_BYTE: usize = 7; let rt_data = store - .get_part_unchunked(digest, START_READ_BYTE, None) + .get_part_unchunked(digest, START_READ_BYTE as u64, None) .await .err_tip(|| "Failed to get_part from dedup store")?; @@ -238,7 +238,7 @@ async fn check_chunk_boundary_reads_test() -> Result<(), Error> { Store::new(MemoryStore::new( &nativelink_config::stores::MemoryStore::default(), )), // Content store. - ); + )?; const DATA_SIZE: usize = 30; let original_data = make_random_data(DATA_SIZE); @@ -251,11 +251,15 @@ async fn check_chunk_boundary_reads_test() -> Result<(), Error> { for offset in 0..=DATA_SIZE { for len in 0..DATA_SIZE { // If reading at DATA_SIZE, we will set len to None to check that edge case. - let maybe_len = if offset == DATA_SIZE { None } else { Some(len) }; + let maybe_len = if offset == DATA_SIZE { + None + } else { + Some(len as u64) + }; let len = if maybe_len.is_none() { DATA_SIZE } else { len }; let rt_data = store - .get_part_unchunked(digest, offset, maybe_len) + .get_part_unchunked(digest, offset as u64, maybe_len) .await .err_tip(|| "Failed to get_part from dedup store")?; @@ -276,7 +280,7 @@ async fn check_chunk_boundary_reads_test() -> Result<(), Error> { // This value must be larger than `max_size` in the config above. const START_READ_BYTE: usize = 10; let rt_data = store - .get_part_unchunked(digest, START_READ_BYTE, None) + .get_part_unchunked(digest, START_READ_BYTE as u64, None) .await .err_tip(|| "Failed to get_part from dedup store")?; @@ -309,7 +313,7 @@ async fn has_checks_content_store() -> Result<(), Error> { &make_default_config(), Store::new(index_store.clone()), Store::new(content_store.clone()), - ); + )?; const DATA_SIZE: usize = MEGABYTE_SZ / 4; let original_data = make_random_data(DATA_SIZE); @@ -323,7 +327,7 @@ async fn has_checks_content_store() -> Result<(), Error> { { // Check to ensure we our baseline `.has()` succeeds. let size_info = store.has(digest1).await.err_tip(|| "Failed to run .has")?; - assert_eq!(size_info, Some(DATA_SIZE), "Expected sizes to match"); + assert_eq!(size_info, Some(DATA_SIZE as u64), "Expected sizes to match"); } { // There will be exactly 10 entries in our content_store based on our random seed data. @@ -339,7 +343,11 @@ async fn has_checks_content_store() -> Result<(), Error> { { // Check our recently added entry is still valid. let size_info = store.has(digest2).await.err_tip(|| "Failed to run .has")?; - assert_eq!(size_info, Some(DATA2.len()), "Expected sizes to match"); + assert_eq!( + size_info, + Some(DATA2.len() as u64), + "Expected sizes to match" + ); } { // Check our first added entry is now invalid (because part of it was evicted). @@ -370,7 +378,7 @@ async fn has_with_no_existing_index_returns_none_test() -> Result<(), Error> { &make_default_config(), Store::new(index_store.clone()), Store::new(content_store.clone()), - ); + )?; const DATA_SIZE: usize = 10; let digest = DigestInfo::try_new(VALID_HASH1, DATA_SIZE).unwrap(); diff --git a/nativelink-store/tests/existence_store_test.rs b/nativelink-store/tests/existence_store_test.rs index e48a0f6a1..5fafb9d36 100644 --- a/nativelink-store/tests/existence_store_test.rs +++ b/nativelink-store/tests/existence_store_test.rs @@ -58,7 +58,7 @@ async fn simple_exist_cache_test() -> Result<(), Error> { .has(digest) .await .err_tip(|| "Failed to check store")?, - Some(VALUE.len()), + Some(VALUE.len() as u64), "Expected digest to exist in store" ); @@ -145,20 +145,20 @@ async fn ensure_has_requests_eventually_do_let_evictions_happen() -> Result<(), MockInstantWrapped::default(), ); - assert_eq!(store.has(digest).await, Ok(Some(VALUE.len()))); + assert_eq!(store.has(digest).await, Ok(Some(VALUE.len() as u64))); MockClock::advance(Duration::from_secs(3)); // Now that our existence cache has been populated, remove // it from the inner store. inner_store.remove_entry(digest.into()).await; - assert_eq!(store.has(digest).await, Ok(Some(VALUE.len()))); + assert_eq!(store.has(digest).await, Ok(Some(VALUE.len() as u64))); MockClock::advance(Duration::from_secs(3)); - assert_eq!(store.has(digest).await, Ok(Some(VALUE.len()))); + assert_eq!(store.has(digest).await, Ok(Some(VALUE.len() as u64))); MockClock::advance(Duration::from_secs(3)); - assert_eq!(store.has(digest).await, Ok(Some(VALUE.len()))); + assert_eq!(store.has(digest).await, Ok(Some(VALUE.len() as u64))); MockClock::advance(Duration::from_secs(3)); // It should have been evicted from the existence cache by now. diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index 96b3ccdc9..caaaa7344 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -113,10 +113,13 @@ async fn fetch_slow_store_puts_in_fast_store_test() -> Result<(), Error> { assert_eq!( fast_slow_store.has(digest).await, - Ok(Some(original_data.len())) + Ok(Some(original_data.len() as u64)) ); assert_eq!(fast_store.has(digest).await, Ok(None)); - assert_eq!(slow_store.has(digest).await, Ok(Some(original_data.len()))); + assert_eq!( + slow_store.has(digest).await, + Ok(Some(original_data.len() as u64)) + ); // This get() request should place the data in fast_store too. fast_slow_store.get_part_unchunked(digest, 0, None).await?; @@ -156,7 +159,8 @@ async fn partial_reads_copy_full_to_fast_store_test() -> Result<(), Error> { #[test] fn calculate_range_test() { - let test = |start_range, end_range| FastSlowStore::calculate_range(&start_range, &end_range); + let test = + |start_range, end_range| FastSlowStore::calculate_range(&start_range, &end_range).unwrap(); { // Exact match. let received_range = 0..1; @@ -244,12 +248,12 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { async fn has_with_results( self: Pin<&Self>, digests: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { if let Some(has_digest) = self.digest { for (digest, result) in digests.iter().zip(results.iter_mut()) { if *digest == has_digest.into() { - *result = Some(has_digest.size_bytes() as usize); + *result = Some(has_digest.size_bytes()); } } } @@ -281,13 +285,13 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut nativelink_util::buf_channel::DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { // Gets called in the slow store and we provide the data that's // sent to the upstream and the fast store. - let bytes = length.unwrap_or(key.into_digest().size_bytes() as usize) - offset; - let data = vec![0_u8; bytes]; + let bytes = length.unwrap_or(key.into_digest().size_bytes()) - offset; + let data = vec![0_u8; bytes as usize]; writer.send(Bytes::copy_from_slice(&data)).await?; writer.send_eof() } @@ -350,7 +354,7 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { // Drop get_part as soon as rx.drain() completes tokio::select!( res = rx.drain() => res, - res = fast_slow_store.get_part(digest, tx, 0, Some(digest.size_bytes() as usize)) => res, + res = fast_slow_store.get_part(digest, tx, 0, Some(digest.size_bytes())) => res, ) }, async move { @@ -438,7 +442,7 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> { assert_eq!( fast_slow_store.has(digest).await, - Ok(Some(data.len())), + Ok(Some(data.len() as u64)), "Expected data to exist in store" ); diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index 6e0db5b18..f2b4f1b07 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -140,7 +140,7 @@ impl FileEntry for TestFileEntry< } impl LenEntry for TestFileEntry { - fn len(&self) -> usize { + fn len(&self) -> u64 { self.inner.as_ref().unwrap().len() } @@ -272,7 +272,7 @@ async fn valid_results_after_shutdown_test() -> Result<(), Error> { assert_eq!( store.has(digest).await, - Ok(Some(VALUE1.len())), + Ok(Some(VALUE1.len() as u64)), "Expected filesystem store to have hash: {}", HASH1 ); @@ -1355,7 +1355,7 @@ async fn update_with_whole_file_closes_file() -> Result<(), Error> { } store - .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len())) + .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len() as u64)) .await?; Ok(()) } @@ -1423,7 +1423,7 @@ async fn update_with_whole_file_slow_path_when_low_file_descriptors() -> Result< } store - .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len())) + .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len() as u64)) .await?; Ok(()) } @@ -1466,7 +1466,7 @@ async fn update_with_whole_file_uses_same_inode() -> Result<(), Error> { .ino(); let result = store - .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len())) + .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len() as u64)) .await?; assert!( result.is_none(), diff --git a/nativelink-store/tests/memory_store_test.rs b/nativelink-store/tests/memory_store_test.rs index 3f03cbe5d..a829ac816 100644 --- a/nativelink-store/tests/memory_store_test.rs +++ b/nativelink-store/tests/memory_store_test.rs @@ -44,15 +44,15 @@ async fn insert_one_item_then_update() -> Result<(), Error> { // Insert dummy value into store. store .update_oneshot( - DigestInfo::try_new(VALID_HASH1, VALUE1.len())?, + DigestInfo::try_new(VALID_HASH1, VALUE1.len() as u64)?, VALUE1.into(), ) .await?; assert_eq!( store - .has(DigestInfo::try_new(VALID_HASH1, VALUE1.len())?) + .has(DigestInfo::try_new(VALID_HASH1, VALUE1.len() as u64)?) .await, - Ok(Some(VALUE1.len())), + Ok(Some(VALUE1.len() as u64)), "Expected memory store to have hash: {}", VALID_HASH1 ); diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs index 4b4df3595..dbe55cc37 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -237,7 +237,7 @@ async fn upload_and_get_data() -> Result<(), Error> { ); let result = store - .get_part_unchunked(digest, 0, Some(data.len())) + .get_part_unchunked(digest, 0, Some(data.len() as u64)) .await .unwrap(); @@ -318,7 +318,7 @@ async fn upload_and_get_data_with_prefix() -> Result<(), Error> { ); let result = store - .get_part_unchunked(digest, 0, Some(data.len())) + .get_part_unchunked(digest, 0, Some(data.len() as u64)) .await .unwrap(); @@ -462,7 +462,7 @@ async fn test_large_downloads_are_chunked() -> Result<(), Error> { ); let get_result: Bytes = store - .get_part_unchunked(digest, 0, Some(data.clone().len())) + .get_part_unchunked(digest, 0, Some(data.clone().len() as u64)) .await .unwrap(); @@ -553,7 +553,7 @@ async fn yield_between_sending_packets_in_update() -> Result<(), Error> { tokio::try_join!( async { store - .update(digest, rx, UploadSizeInfo::ExactSize(data.len())) + .update(digest, rx, UploadSizeInfo::ExactSize(data.len() as u64)) .await .unwrap(); @@ -576,7 +576,7 @@ async fn yield_between_sending_packets_in_update() -> Result<(), Error> { ); let result = store - .get_part_unchunked(digest, 0, Some(data.clone().len())) + .get_part_unchunked(digest, 0, Some(data.clone().len() as u64)) .await .unwrap(); diff --git a/nativelink-store/tests/ref_store_test.rs b/nativelink-store/tests/ref_store_test.rs index 85cea6a6d..d4d268bf2 100644 --- a/nativelink-store/tests/ref_store_test.rs +++ b/nativelink-store/tests/ref_store_test.rs @@ -64,7 +64,7 @@ async fn has_test() -> Result<(), Error> { .await; assert_eq!( has_result, - Ok(Some(VALUE1.len())), + Ok(Some(VALUE1.len() as u64)), "Expected ref store to have data in ref store : {}", VALID_HASH1 ); diff --git a/nativelink-store/tests/s3_store_test.rs b/nativelink-store/tests/s3_store_test.rs index 593e547f2..b2e1ac669 100644 --- a/nativelink-store/tests/s3_store_test.rs +++ b/nativelink-store/tests/s3_store_test.rs @@ -221,7 +221,7 @@ async fn simple_update_ac() -> Result<(), Error> { .update( DigestInfo::try_new(VALID_HASH1, AC_ENTRY_SIZE)?, rx, - UploadSizeInfo::ExactSize(CONTENT_LENGTH), + UploadSizeInfo::ExactSize(CONTENT_LENGTH as u64), ) .await }); @@ -350,8 +350,8 @@ async fn smoke_test_get_part() -> Result<(), Error> { store .get_part_unchunked( DigestInfo::try_new(VALID_HASH1, AC_ENTRY_SIZE)?, - OFFSET, - Some(LENGTH), + OFFSET as u64, + Some(LENGTH as u64), ) .await?; @@ -589,7 +589,7 @@ async fn ensure_empty_string_in_stream_works_test() -> Result<(), Error> { store.get_part_unchunked( DigestInfo::try_new(VALID_HASH1, CAS_ENTRY_SIZE)?, 0, - Some(CAS_ENTRY_SIZE), + Some(CAS_ENTRY_SIZE as u64), ) ); assert_eq!( diff --git a/nativelink-store/tests/shard_store_test.rs b/nativelink-store/tests/shard_store_test.rs index e75208fb5..54e328d38 100644 --- a/nativelink-store/tests/shard_store_test.rs +++ b/nativelink-store/tests/shard_store_test.rs @@ -106,7 +106,7 @@ async fn has_with_one_digest() -> Result<(), Error> { .update_oneshot(digest1, original_data.clone().into()) .await?; - assert_eq!(shard_store.has(digest1).await, Ok(Some(MEGABYTE_SZ))); + assert_eq!(shard_store.has(digest1).await, Ok(Some(MEGABYTE_SZ as u64))); Ok(()) } @@ -141,7 +141,7 @@ async fn has_with_many_digests_one_missing() -> Result<(), Error> { shard_store .has_many(&[digest1.into(), missing_digest.into()]) .await, - Ok(vec![Some(MEGABYTE_SZ), None]) + Ok(vec![Some(MEGABYTE_SZ as u64), None]) ); Ok(()) } @@ -165,7 +165,10 @@ async fn has_with_many_digests_both_exist() -> Result<(), Error> { shard_store .has_many(&[digest1.into(), digest2.into()]) .await, - Ok(vec![Some(original_data1.len()), Some(original_data2.len())]) + Ok(vec![ + Some(original_data1.len() as u64), + Some(original_data2.len() as u64) + ]) ); Ok(()) } @@ -253,7 +256,7 @@ async fn upload_download_has_check() -> Result<(), Error> { shard_store.get_part_unchunked(digest1, 0, None).await, Ok(original_data1.into()) ); - assert_eq!(shard_store.has(digest1).await, Ok(Some(MEGABYTE_SZ))); + assert_eq!(shard_store.has(digest1).await, Ok(Some(MEGABYTE_SZ as u64))); Ok(()) } @@ -268,7 +271,7 @@ async fn weights_send_to_proper_store() -> Result<(), Error> { .update_oneshot(digest1, original_data1.clone().into()) .await?; - assert_eq!(stores[0].has(digest1).await, Ok(Some(MEGABYTE_SZ))); + assert_eq!(stores[0].has(digest1).await, Ok(Some(MEGABYTE_SZ as u64))); assert_eq!(stores[1].has(digest1).await, Ok(None)); Ok(()) } diff --git a/nativelink-store/tests/size_partitioning_store_test.rs b/nativelink-store/tests/size_partitioning_store_test.rs index 216e75ca2..0339121ec 100644 --- a/nativelink-store/tests/size_partitioning_store_test.rs +++ b/nativelink-store/tests/size_partitioning_store_test.rs @@ -84,7 +84,7 @@ async fn has_test() -> Result<(), Error> { .await; assert_eq!( small_has_result, - Ok(Some(SMALL_VALUE.len())), + Ok(Some(SMALL_VALUE.len() as u64)), "Expected size part store to have data in ref store : {}", SMALL_HASH ); @@ -96,7 +96,7 @@ async fn has_test() -> Result<(), Error> { .await; assert_eq!( small_has_result, - Ok(Some(BIG_VALUE.len())), + Ok(Some(BIG_VALUE.len() as u64)), "Expected size part store to have data in ref store : {}", BIG_HASH ); diff --git a/nativelink-store/tests/verify_store_test.rs b/nativelink-store/tests/verify_store_test.rs index d3c24b030..8382d48ba 100644 --- a/nativelink-store/tests/verify_store_test.rs +++ b/nativelink-store/tests/verify_store_test.rs @@ -55,7 +55,7 @@ async fn verify_size_false_passes_on_update() -> Result<(), Error> { ); assert_eq!( inner_store.has(digest).await, - Ok(Some(VALUE1.len())), + Ok(Some(VALUE1.len() as u64)), "Expected data to exist in store after update" ); Ok(()) @@ -121,7 +121,7 @@ async fn verify_size_true_suceeds_on_update() -> Result<(), Error> { assert_eq!(result, Ok(()), "Expected success, got: {:?}", result); assert_eq!( inner_store.has(digest).await, - Ok(Some(VALUE1.len())), + Ok(Some(VALUE1.len() as u64)), "Expected data to exist in store after update" ); Ok(()) @@ -187,7 +187,7 @@ async fn verify_sha256_hash_true_suceeds_on_update() -> Result<(), Error> { assert_eq!(result, Ok(()), "Expected success, got: {:?}", result); assert_eq!( inner_store.has(digest).await, - Ok(Some(VALUE.len())), + Ok(Some(VALUE.len() as u64)), "Expected data to exist in store after update" ); Ok(()) @@ -256,7 +256,7 @@ async fn verify_blake3_hash_true_suceeds_on_update() -> Result<(), Error> { assert_eq!(result, Ok(()), "Expected success, got: {:?}", result); assert_eq!( inner_store.has(digest).await, - Ok(Some(VALUE.len())), + Ok(Some(VALUE.len() as u64)), "Expected data to exist in store after update" ); Ok(()) @@ -374,7 +374,7 @@ async fn verify_size_and_hash_suceeds_on_small_data() -> Result<(), Error> { assert_eq!(result, Ok(()), "Expected success, got: {:?}", result); assert_eq!( inner_store.has(digest).await, - Ok(Some(VALUE.len())), + Ok(Some(VALUE.len() as u64)), "Expected data to exist in store after update" ); Ok(()) diff --git a/nativelink-util/src/buf_channel.rs b/nativelink-util/src/buf_channel.rs index 75f8c3e95..983acab93 100644 --- a/nativelink-util/src/buf_channel.rs +++ b/nativelink-util/src/buf_channel.rs @@ -266,8 +266,8 @@ impl DropCloserReadHalf { /// Sets the maximum size of the recent_data buffer. If the number of bytes /// received exceeds this size, the recent_data buffer will be cleared and /// no longer populated. - pub fn set_max_recent_data_size(&mut self, size: usize) { - self.max_recent_data_size = size as u64; + pub fn set_max_recent_data_size(&mut self, size: u64) { + self.max_recent_data_size = size; } /// Attempts to reset the stream to before any data was received. This will diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs index 90a1d5597..09e22d897 100644 --- a/nativelink-util/src/evicting_map.rs +++ b/nativelink-util/src/evicting_map.rs @@ -45,7 +45,7 @@ struct EvictionItem { pub trait LenEntry: 'static { /// Length of referenced data. - fn len(&self) -> usize; + fn len(&self) -> u64; /// Returns `true` if `self` has zero length. fn is_empty(&self) -> bool; @@ -77,7 +77,7 @@ pub trait LenEntry: 'static { impl LenEntry for Arc { #[inline] - fn len(&self) -> usize { + fn len(&self) -> u64 { T::len(self.as_ref()) } @@ -126,13 +126,13 @@ impl State if let Some(btree) = &mut self.btree { btree.remove(key.borrow()); } - self.sum_store_size -= eviction_item.data.len() as u64; + self.sum_store_size -= eviction_item.data.len(); if replaced { self.replaced_items.inc(); - self.replaced_bytes.add(eviction_item.data.len() as u64); + self.replaced_bytes.add(eviction_item.data.len()); } else { self.evicted_items.inc(); - self.evicted_bytes.add(eviction_item.data.len() as u64); + self.evicted_bytes.add(eviction_item.data.len()); } // Note: See comment in `unref()` requring global lock of insert/remove. eviction_item.data.unref().await; @@ -210,7 +210,7 @@ where /// and return the number of items that were processed. /// The `handler` function should return `true` to continue processing the next item /// or `false` to stop processing. - pub async fn range(&self, prefix_range: impl RangeBounds, mut handler: F) -> usize + pub async fn range(&self, prefix_range: impl RangeBounds, mut handler: F) -> u64 where F: FnMut(&K, &T) -> bool, K: Borrow + Ord, @@ -300,7 +300,7 @@ where } /// Return the size of a `key`, if not found `None` is returned. - pub async fn size_for_key(&self, key: &Q) -> Option + pub async fn size_for_key(&self, key: &Q) -> Option where K: Borrow, Q: Ord + Hash + Eq + Debug, @@ -316,12 +316,8 @@ where /// If no key is found in the internal map, `None` is filled in its place. /// If `peek` is set to `true`, the items are not promoted to the front of the /// LRU cache. Note: peek may still evict, but won't promote. - pub async fn sizes_for_keys( - &self, - keys: It, - results: &mut [Option], - peek: bool, - ) where + pub async fn sizes_for_keys(&self, keys: It, results: &mut [Option], peek: bool) + where It: IntoIterator, // This may look strange, but what we are doing is saying: // * `K` must be able to borrow `Q` @@ -428,7 +424,7 @@ where ) -> Vec { let mut replaced_items = Vec::new(); for (key, data) in inserts.into_iter() { - let new_item_size = data.len() as u64; + let new_item_size = data.len(); let eviction_item = EvictionItem { seconds_since_anchor, data, diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs index def77d2a8..fcd29a66e 100644 --- a/nativelink-util/src/store_trait.rs +++ b/nativelink-util/src/store_trait.rs @@ -63,13 +63,13 @@ pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> { pub enum UploadSizeInfo { /// When the data transfer amount is known to be exact size, this enum should be used. /// The receiver store can use this to better optimize the way the data is sent or stored. - ExactSize(usize), + ExactSize(u64), /// When the data transfer amount is not known to be exact, the caller should use this enum /// to provide the maximum size that could possibly be sent. This will bypass the exact size /// checks, but still provide useful information to the underlying store about the data being /// sent that it can then use to optimize the upload process. - MaxSize(usize), + MaxSize(u64), } /// Utility to send all the data to the store from a file. @@ -382,7 +382,7 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static { fn has<'a>( &'a self, digest: impl Into>, - ) -> impl Future, Error>> + 'a { + ) -> impl Future, Error>> + 'a { self.as_store_driver_pin().has(digest.into()) } @@ -394,7 +394,7 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static { fn has_many<'a>( &'a self, digests: &'a [StoreKey<'a>], - ) -> impl Future>, Error>> + Send + 'a { + ) -> impl Future>, Error>> + Send + 'a { self.as_store_driver_pin().has_many(digests) } @@ -404,7 +404,7 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static { fn has_with_results<'a>( &'a self, digests: &'a [StoreKey<'a>], - results: &'a mut [Option], + results: &'a mut [Option], ) -> impl Future> + Send + 'a { self.as_store_driver_pin() .has_with_results(digests, results) @@ -420,7 +420,7 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static { &'a self, range: impl RangeBounds> + Send + 'b, mut handler: impl for<'c> FnMut(&'c StoreKey) -> bool + Send + Sync + 'a, - ) -> impl Future> + Send + 'a + ) -> impl Future> + Send + 'a where 'b: 'a, { @@ -490,8 +490,8 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static { &'a self, digest: impl Into>, mut writer: impl BorrowMut + Send + 'a, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> impl Future> + Send + 'a { let key = digest.into(); // Note: We need to capture `writer` just in case the caller @@ -520,8 +520,8 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static { fn get_part_unchunked<'a>( &'a self, key: impl Into>, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> impl Future> + Send + 'a { self.as_store_driver_pin() .get_part_unchunked(key.into(), offset, length) @@ -544,7 +544,7 @@ pub trait StoreDriver: { /// See: [`StoreLike::has`] for details. #[inline] - async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result, Error> { + async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result, Error> { let mut result = [None]; self.has_with_results(&[key], &mut result).await?; Ok(result[0]) @@ -555,7 +555,7 @@ pub trait StoreDriver: async fn has_many( self: Pin<&Self>, digests: &[StoreKey<'_>], - ) -> Result>, Error> { + ) -> Result>, Error> { let mut results = vec![None; digests.len()]; self.has_with_results(digests, &mut results).await?; Ok(results) @@ -565,7 +565,7 @@ pub trait StoreDriver: async fn has_with_results( self: Pin<&Self>, digests: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error>; /// See: [`StoreLike::list`] for details. @@ -573,7 +573,7 @@ pub trait StoreDriver: self: Pin<&Self>, _range: (Bound>, Bound>), _handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_), - ) -> Result { + ) -> Result { // TODO(allada) We should force all stores to implement this function instead of // providing a default implementation. Err(make_err!( @@ -623,7 +623,8 @@ pub trait StoreDriver: // that can take objects already fully in memory instead? let (mut tx, rx) = make_buf_channel_pair(); - let data_len = data.len(); + let data_len = + u64::try_from(data.len()).err_tip(|| "Could not convert data.len() to u64")?; let send_fut = async move { // Only send if we are not EOF. if !data.is_empty() { @@ -647,8 +648,8 @@ pub trait StoreDriver: self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error>; /// See: [`StoreLike::get`] for details. @@ -665,16 +666,20 @@ pub trait StoreDriver: async fn get_part_unchunked( self: Pin<&Self>, key: StoreKey<'_>, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result { + let length_usize = length + .map(|v| usize::try_from(v).err_tip(|| "Could not convert length to usize")) + .transpose()?; + // TODO(blaise.bruer) This is extremely inefficient, since we have exactly // what we need here. Maybe we could instead make a version of the stream // that can take objects already fully in memory instead? let (mut tx, mut rx) = make_buf_channel_pair(); let (data_res, get_part_res) = join!( - rx.consume(length), + rx.consume(length_usize), // We use a closure here to ensure that the `tx` is dropped when the // future is done. async move { self.get_part(key, &mut tx, offset, length).await }, @@ -704,7 +709,7 @@ pub trait StoreDriver: let mut digest_hasher = default_digest_hasher_func().hasher(); digest_hasher.update(&digest_data); - let digest_data_len = digest_data.len(); + let digest_data_len = digest_data.len() as u64; let digest_info = StoreKey::from(digest_hasher.finalize_digest()); let digest_bytes = bytes::Bytes::copy_from_slice(&digest_data); diff --git a/nativelink-util/tests/evicting_map_test.rs b/nativelink-util/tests/evicting_map_test.rs index 1e33cbc66..f02f8a21d 100644 --- a/nativelink-util/tests/evicting_map_test.rs +++ b/nativelink-util/tests/evicting_map_test.rs @@ -31,8 +31,8 @@ pub struct BytesWrapper(Bytes); impl LenEntry for BytesWrapper { #[inline] - fn len(&self) -> usize { - Bytes::len(&self.0) + fn len(&self) -> u64 { + Bytes::len(&self.0) as u64 } #[inline] @@ -152,14 +152,14 @@ async fn insert_purges_at_max_bytes() -> Result<(), Error> { evicting_map .size_for_key(&DigestInfo::try_new(HASH3, 0)?) .await, - Some(DATA.len()), + Some(DATA.len() as u64), "Expected map to have item 3" ); assert_eq!( evicting_map .size_for_key(&DigestInfo::try_new(HASH4, 0)?) .await, - Some(DATA.len()), + Some(DATA.len() as u64), "Expected map to have item 4" ); @@ -216,7 +216,7 @@ async fn insert_purges_to_low_watermark_at_max_bytes() -> Result<(), Error> { evicting_map .size_for_key(&DigestInfo::try_new(HASH4, 0)?) .await, - Some(DATA.len()), + Some(DATA.len() as u64), "Expected map to have item 4" ); @@ -263,21 +263,21 @@ async fn insert_purges_at_max_seconds() -> Result<(), Error> { evicting_map .size_for_key(&DigestInfo::try_new(HASH2, 0)?) .await, - Some(DATA.len()), + Some(DATA.len() as u64), "Expected map to have item 2" ); assert_eq!( evicting_map .size_for_key(&DigestInfo::try_new(HASH3, 0)?) .await, - Some(DATA.len()), + Some(DATA.len() as u64), "Expected map to have item 3" ); assert_eq!( evicting_map .size_for_key(&DigestInfo::try_new(HASH4, 0)?) .await, - Some(DATA.len()), + Some(DATA.len() as u64), "Expected map to have item 4" ); @@ -329,7 +329,7 @@ async fn get_refreshes_time() -> Result<(), Error> { evicting_map .size_for_key(&DigestInfo::try_new(HASH3, 0)?) .await, - Some(DATA.len()), + Some(DATA.len() as u64), "Expected map to have item 3" ); @@ -345,7 +345,7 @@ async fn unref_called_on_replace() -> Result<(), Error> { } impl LenEntry for MockEntry { - fn len(&self) -> usize { + fn len(&self) -> u64 { // Note: We are not testing this functionality. 0 } diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index 570ea22fe..ebc0c9114 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -287,7 +287,7 @@ async fn upload_file( .update_with_whole_file( digest.into(), resumeable_file, - UploadSizeInfo::ExactSize(digest.size_bytes() as usize), + UploadSizeInfo::ExactSize(digest.size_bytes()), ) .await .err_tip(|| format!("for {full_path:?}"))?;