diff --git a/nativelink-config/src/stores.rs b/nativelink-config/src/stores.rs index fd4bcfb2f..1486e30ec 100644 --- a/nativelink-config/src/stores.rs +++ b/nativelink-config/src/stores.rs @@ -763,12 +763,12 @@ pub struct S3Store { /// upload will be aborted and the client will likely receive an error. /// /// Default: 5MB. - pub max_retry_buffer_per_request: Option<usize>, + pub max_retry_buffer_per_request: Option<u32>, /// Maximum number of concurrent UploadPart requests per MultipartUpload. /// /// Default: 10. - pub multipart_max_concurrent_uploads: Option<usize>, + pub multipart_max_concurrent_uploads: Option<u64>, /// Allow unencrypted HTTP connections. Only use this for local testing. /// diff --git a/nativelink-scheduler/tests/utils/mock_scheduler.rs b/nativelink-scheduler/tests/utils/mock_scheduler.rs index fe0e37035..113a3d25c 100644 --- a/nativelink-scheduler/tests/utils/mock_scheduler.rs +++ b/nativelink-scheduler/tests/utils/mock_scheduler.rs @@ -17,12 +17,10 @@ use std::sync::Arc; use async_trait::async_trait; use nativelink_error::{make_input_err, Error}; use nativelink_metric::{MetricsComponent, RootMetricsComponent}; -use nativelink_util::{ - action_messages::{ActionInfo, OperationId}, - known_platform_property_provider::KnownPlatformPropertyProvider, - operation_state_manager::{ - ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter, - }, +use nativelink_util::action_messages::{ActionInfo, OperationId}; +use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider; +use nativelink_util::operation_state_manager::{ + ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter, }; use tokio::sync::{mpsc, Mutex}; diff --git a/nativelink-service/src/bytestream_server.rs b/nativelink-service/src/bytestream_server.rs index bb5ab6c91..39c817f14 100644 --- a/nativelink-service/src/bytestream_server.rs +++ b/nativelink-service/src/bytestream_server.rs @@ -249,13 +249,7 @@ impl ByteStreamServer { // `store` to ensure its lifetime follows the future and not the caller. store // Bytestream always uses digest size as the actual byte size. 
- .update( - digest, - rx, - UploadSizeInfo::ExactSize( - usize::try_from(digest.size_bytes()).err_tip(|| "Invalid digest size")?, - ), - ) + .update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes())) .await }); Ok(ActiveStreamGuard { @@ -275,8 +269,8 @@ impl ByteStreamServer { digest: DigestInfo, read_request: ReadRequest, ) -> Result, Error> { - let read_limit = usize::try_from(read_request.read_limit) - .err_tip(|| "read_limit has is not convertible to usize")?; + let read_limit = u64::try_from(read_request.read_limit) + .err_tip(|| "Could not convert read_limit to u64")?; let (tx, rx) = make_buf_channel_pair(); @@ -300,7 +294,13 @@ impl ByteStreamServer { maybe_get_part_result: None, get_part_fut: Box::pin(async move { store - .get_part(digest, tx, read_request.read_offset as usize, read_limit) + .get_part( + digest, + tx, + u64::try_from(read_request.read_offset) + .err_tip(|| "Could not convert read_offset to u64")?, + read_limit, + ) .await }), }); diff --git a/nativelink-store/src/ac_utils.rs b/nativelink-store/src/ac_utils.rs index 2915fb55d..c17ab649d 100644 --- a/nativelink-store/src/ac_utils.rs +++ b/nativelink-store/src/ac_utils.rs @@ -50,7 +50,7 @@ pub async fn get_and_decode_digest( pub async fn get_size_and_decode_digest( store: &impl StoreLike, key: impl Into>, -) -> Result<(T, usize), Error> { +) -> Result<(T, u64), Error> { let key = key.into(); // Note: For unknown reasons we appear to be hitting: // https://github.com/rust-lang/rust/issues/92096 @@ -58,7 +58,7 @@ pub async fn get_size_and_decode_digest( // are using the store driver function here. let mut store_data_resp = store .as_store_driver_pin() - .get_part_unchunked(key.borrow(), 0, Some(MAX_ACTION_MSG_SIZE)) + .get_part_unchunked(key.borrow(), 0, Some(MAX_ACTION_MSG_SIZE as u64)) .await; if let Err(err) = &mut store_data_resp { if err.code == Code::NotFound { @@ -69,7 +69,8 @@ pub async fn get_size_and_decode_digest( } } let store_data = store_data_resp?; - let store_data_len = store_data.len(); + let store_data_len = + u64::try_from(store_data.len()).err_tip(|| "Could not convert store_data.len() to u64")?; T::decode(store_data) .err_tip_with_code(|e| { diff --git a/nativelink-store/src/completeness_checking_store.rs b/nativelink-store/src/completeness_checking_store.rs index de6343d6f..4ccb2bc50 100644 --- a/nativelink-store/src/completeness_checking_store.rs +++ b/nativelink-store/src/completeness_checking_store.rs @@ -132,12 +132,12 @@ impl CompletenessCheckingStore { async fn inner_has_with_results( &self, action_result_digests: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { // Holds shared state between the different futures. // This is how get around lifetime issues. 
struct State<'a> { - results: &'a mut [Option], + results: &'a mut [Option], digests_to_check: Vec>, digests_to_check_idxs: Vec, notify: Arc, @@ -342,7 +342,7 @@ impl StoreDriver for CompletenessCheckingStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { self.inner_has_with_results(keys, results).await } @@ -360,8 +360,8 @@ impl StoreDriver for CompletenessCheckingStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let results = &mut [None]; self.inner_has_with_results(&[key.borrow()], results) diff --git a/nativelink-store/src/compression_store.rs b/nativelink-store/src/compression_store.rs index fb3884015..7f7c259fb 100644 --- a/nativelink-store/src/compression_store.rs +++ b/nativelink-store/src/compression_store.rs @@ -42,7 +42,7 @@ pub const CURRENT_STREAM_FORMAT_VERSION: u8 = 1; // Default block size that will be used to slice stream into. pub const DEFAULT_BLOCK_SIZE: u32 = 64 * 1024; -const U32_SZ: usize = std::mem::size_of::(); +const U32_SZ: u64 = std::mem::size_of::() as u64; type BincodeOptions = WithOtherIntEncoding; @@ -145,25 +145,25 @@ pub struct Footer { /// lz4_flex::block::get_maximum_output_size() way over estimates, so we use the /// one provided here: https://github.com/torvalds/linux/blob/master/include/linux/lz4.h#L61 /// Local testing shows this gives quite accurate worst case given random input. -fn lz4_compress_bound(input_size: usize) -> usize { +fn lz4_compress_bound(input_size: u64) -> u64 { input_size + (input_size / 255) + 16 } struct UploadState { header: Header, footer: Footer, - max_output_size: usize, - input_max_size: usize, + max_output_size: u64, + input_max_size: u64, } impl UploadState { - pub fn new(store: &CompressionStore, upload_size: UploadSizeInfo) -> Self { + pub fn new(store: &CompressionStore, upload_size: UploadSizeInfo) -> Result { let input_max_size = match upload_size { UploadSizeInfo::ExactSize(sz) => sz, UploadSizeInfo::MaxSize(sz) => sz, }; - let max_index_count = (input_max_size / store.config.block_size as usize) + 1; + let max_index_count = (input_max_size / store.config.block_size as u64) + 1; let header = Header { version: CURRENT_STREAM_FORMAT_VERSION, @@ -177,7 +177,8 @@ impl UploadState { SliceIndex { ..Default::default() }; - max_index_count + usize::try_from(max_index_count) + .err_tip(|| "Could not convert max_index_count to usize")? ], index_count: max_index_count as u32, uncompressed_data_size: 0, // Updated later. @@ -186,22 +187,22 @@ impl UploadState { }; // This is more accurate of an estimate than what get_maximum_output_size calculates. 
- let max_block_size = lz4_compress_bound(store.config.block_size as usize) + U32_SZ + 1; + let max_block_size = lz4_compress_bound(store.config.block_size as u64) + U32_SZ + 1; let max_output_size = { - let header_size = store.bincode_options.serialized_size(&header).unwrap() as usize; + let header_size = store.bincode_options.serialized_size(&header).unwrap(); let max_content_size = max_block_size * max_index_count; let max_footer_size = - U32_SZ + 1 + store.bincode_options.serialized_size(&footer).unwrap() as usize; + U32_SZ + 1 + store.bincode_options.serialized_size(&footer).unwrap(); header_size + max_content_size + max_footer_size }; - Self { + Ok(Self { header, footer, max_output_size, input_max_size, - } + }) } } @@ -246,7 +247,7 @@ impl StoreDriver for CompressionStore { async fn has_with_results( self: Pin<&Self>, digests: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { self.inner_store.has_with_results(digests, results).await } @@ -257,7 +258,7 @@ impl StoreDriver for CompressionStore { mut reader: DropCloserReadHalf, upload_size: UploadSizeInfo, ) -> Result<(), Error> { - let mut output_state = UploadState::new(&self, upload_size); + let mut output_state = UploadState::new(&self, upload_size)?; let (mut tx, rx) = make_buf_channel_pair(); @@ -307,7 +308,8 @@ impl StoreDriver for CompressionStore { break; // EOF. } - received_amt += chunk.len(); + received_amt += u64::try_from(chunk.len()) + .err_tip(|| "Could not convert chunk.len() to u64")?; error_if!( received_amt > output_state.input_max_size, "Got more data than stated in compression store upload request" @@ -360,7 +362,7 @@ impl StoreDriver for CompressionStore { }, ); output_state.footer.index_count = output_state.footer.indexes.len() as u32; - output_state.footer.uncompressed_data_size = received_amt as u64; + output_state.footer.uncompressed_data_size = received_amt; { // Write Footer. let serialized_footer = self @@ -392,8 +394,8 @@ impl StoreDriver for CompressionStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { if is_zero_digest(key.borrow()) { writer @@ -402,7 +404,6 @@ impl StoreDriver for CompressionStore { return Ok(()); } - let offset = offset as u64; let (tx, mut rx) = make_buf_channel_pair(); let inner_store = self.inner_store.clone(); @@ -474,7 +475,7 @@ impl StoreDriver for CompressionStore { let mut frame_sz = chunk.get_u32_le(); let mut uncompressed_data_sz: u64 = 0; - let mut remaining_bytes_to_send: u64 = length.unwrap_or(usize::MAX) as u64; + let mut remaining_bytes_to_send: u64 = length.unwrap_or(u64::MAX); let mut chunks_count: u32 = 0; while frame_type != FOOTER_FRAME_TYPE { error_if!( diff --git a/nativelink-store/src/dedup_store.rs b/nativelink-store/src/dedup_store.rs index d497451e6..ebacad27d 100644 --- a/nativelink-store/src/dedup_store.rs +++ b/nativelink-store/src/dedup_store.rs @@ -34,9 +34,9 @@ use tracing::{event, Level}; // NOTE: If these change update the comments in `stores.rs` to reflect // the new defaults. 
-const DEFAULT_MIN_SIZE: usize = 64 * 1024; -const DEFAULT_NORM_SIZE: usize = 256 * 1024; -const DEFAULT_MAX_SIZE: usize = 512 * 1024; +const DEFAULT_MIN_SIZE: u64 = 64 * 1024; +const DEFAULT_NORM_SIZE: u64 = 256 * 1024; +const DEFAULT_MAX_SIZE: u64 = 512 * 1024; const DEFAULT_MAX_CONCURRENT_FETCH_PER_GET: usize = 10; #[derive(Serialize, Deserialize, PartialEq, Debug, Default, Clone)] @@ -61,37 +61,42 @@ impl DedupStore { config: &nativelink_config::stores::DedupStore, index_store: Store, content_store: Store, - ) -> Arc { + ) -> Result, Error> { let min_size = if config.min_size == 0 { DEFAULT_MIN_SIZE } else { - config.min_size as usize + config.min_size as u64 }; let normal_size = if config.normal_size == 0 { DEFAULT_NORM_SIZE } else { - config.normal_size as usize + config.normal_size as u64 }; let max_size = if config.max_size == 0 { DEFAULT_MAX_SIZE } else { - config.max_size as usize + config.max_size as u64 }; let max_concurrent_fetch_per_get = if config.max_concurrent_fetch_per_get == 0 { DEFAULT_MAX_CONCURRENT_FETCH_PER_GET } else { config.max_concurrent_fetch_per_get as usize }; - Arc::new(Self { + Ok(Arc::new(Self { index_store, content_store, - fast_cdc_decoder: FastCDC::new(min_size, normal_size, max_size), + fast_cdc_decoder: FastCDC::new( + usize::try_from(min_size).err_tip(|| "Could not convert min_size to usize")?, + usize::try_from(normal_size) + .err_tip(|| "Could not convert normal_size to usize")?, + usize::try_from(max_size).err_tip(|| "Could not convert max_size to usize")?, + ), max_concurrent_fetch_per_get, bincode_options: DefaultOptions::new().with_fixint_encoding(), - }) + })) } - async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result, Error> { + async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result, Error> { // First we need to load the index that contains where the individual parts actually // can be fetched from. let index_entries = { @@ -148,7 +153,7 @@ impl StoreDriver for DedupStore { async fn has_with_results( self: Pin<&Self>, digests: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { digests .iter() @@ -225,8 +230,8 @@ impl StoreDriver for DedupStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { // Special case for if a client tries to read zero bytes. if length == Some(0) { @@ -255,7 +260,7 @@ impl StoreDriver for DedupStore { })? }; - let mut start_byte_in_stream: usize = 0; + let mut start_byte_in_stream: u64 = 0; let entries = { if offset == 0 && length.is_none() { index_entries.entries @@ -264,8 +269,7 @@ impl StoreDriver for DedupStore { let mut entries = Vec::with_capacity(index_entries.entries.len()); for entry in index_entries.entries { let first_byte = current_entries_sum; - let entry_size = usize::try_from(entry.size_bytes()) - .err_tip(|| "Failed to convert to usize in DedupStore")?; + let entry_size = entry.size_bytes(); current_entries_sum += entry_size; // Filter any items who's end byte is before the first requested byte. if current_entries_sum <= offset { @@ -308,8 +312,10 @@ impl StoreDriver for DedupStore { // In the event any of these error, we will abort early and abandon all the rest of the // streamed data. // Note: Need to take special care to ensure we send the proper slice of data requested. 
- let mut bytes_to_skip = offset - start_byte_in_stream; - let mut bytes_to_send = length.unwrap_or(usize::MAX - offset); + let mut bytes_to_skip = usize::try_from(offset - start_byte_in_stream) + .err_tip(|| "Could not convert (offset - start_byte_in_stream) to usize")?; + let mut bytes_to_send = usize::try_from(length.unwrap_or(u64::MAX - offset)) + .err_tip(|| "Could not convert length to usize")?; while let Some(result) = entries_stream.next().await { let mut data = result.err_tip(|| "Inner store iterator closed early in DedupStore")?; assert!( diff --git a/nativelink-store/src/default_store_factory.rs b/nativelink-store/src/default_store_factory.rs index 0781ecf66..4d89a675f 100644 --- a/nativelink-store/src/default_store_factory.rs +++ b/nativelink-store/src/default_store_factory.rs @@ -66,7 +66,7 @@ pub fn store_factory<'a>( config, store_factory(&config.index_store, store_manager, None).await?, store_factory(&config.content_store, store_manager, None).await?, - ), + )?, StoreConfig::existence_cache(config) => ExistenceCacheStore::new( config, store_factory(&config.backend, store_manager, None).await?, diff --git a/nativelink-store/src/existence_cache_store.rs b/nativelink-store/src/existence_cache_store.rs index 43b925fbf..c28bdd4c1 100644 --- a/nativelink-store/src/existence_cache_store.rs +++ b/nativelink-store/src/existence_cache_store.rs @@ -85,7 +85,7 @@ impl ExistenceCacheStore { async fn inner_has_with_results( self: Pin<&Self>, keys: &[DigestInfo], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { self.existence_cache .sizes_for_keys(keys, results, true /* peek */) @@ -117,7 +117,7 @@ impl ExistenceCacheStore { .iter() .zip(inner_results.iter()) .filter_map(|(key, result)| { - result.map(|size| (key.borrow().into_digest(), ExistanceItem(size))) + result.map(|size| (key.borrow().into_digest(), ExistanceItem(size as usize))) }) .collect::>(); let _ = self.existence_cache.insert_many(inserts).await; @@ -151,7 +151,7 @@ impl StoreDriver for ExistenceCacheStore { async fn has_with_results( self: Pin<&Self>, digests: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { // TODO(allada) This is a bit of a hack to get around the lifetime issues with the // existence_cache. 
We need to convert the digests to owned values to be able to @@ -187,6 +187,7 @@ impl StoreDriver for ExistenceCacheStore { let result = self.inner_store.update(digest, reader, size_info).await; if result.is_ok() { if let UploadSizeInfo::ExactSize(size) = size_info { + let size = usize::try_from(size).err_tip(|| "Could not convert size to usize")?; let _ = self .existence_cache .insert(digest, ExistanceItem(size)) @@ -200,8 +201,8 @@ impl StoreDriver for ExistenceCacheStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let digest = key.into_digest(); let result = self @@ -210,7 +211,7 @@ impl StoreDriver for ExistenceCacheStore { .await; if result.is_ok() { let size = usize::try_from(digest.size_bytes()) - .err_tip(|| "Could not convert size_bytes in ExistenceCacheStore::get_part")?; + .err_tip(|| "Could not convert digest.size_bytes() to usize")?; let _ = self .existence_cache .insert(digest, ExistanceItem(size)) diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs index 3ce160e36..3d01fd5fa 100644 --- a/nativelink-store/src/fast_slow_store.rs +++ b/nativelink-store/src/fast_slow_store.rs @@ -107,21 +107,26 @@ impl FastSlowStore { // TODO(allada) This should be put into utils, as this logic is used // elsewhere in the code. pub fn calculate_range( - received_range: &Range, - send_range: &Range, - ) -> Option> { + received_range: &Range, + send_range: &Range, + ) -> Result>, Error> { // Protect against subtraction overflow. if received_range.start >= received_range.end { - return None; + return Ok(None); } let start = max(received_range.start, send_range.start); let end = min(received_range.end, send_range.end); if received_range.contains(&start) && received_range.contains(&(end - 1)) { // Offset both to the start of the received_range. - Some(start - received_range.start..end - received_range.start) + Ok(Some( + usize::try_from(start - received_range.start) + .err_tip(|| "Could not convert (start - received_range.start) to usize")? + ..usize::try_from(end - received_range.start) + .err_tip(|| "Could not convert (end - received_range.start) to usize")?, + )) } else { - None + Ok(None) } } } @@ -131,7 +136,7 @@ impl StoreDriver for FastSlowStore { async fn has_with_results( self: Pin<&Self>, key: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { // If our slow store is a noop store, it'll always return a 404, // so only check the fast store in such case. @@ -282,8 +287,8 @@ impl StoreDriver for FastSlowStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { // TODO(blaise.bruer) Investigate if we should maybe ignore errors here instead of // forwarding the up. 
@@ -316,8 +321,8 @@ impl StoreDriver for FastSlowStore { .slow_store_hit_count .fetch_add(1, Ordering::Acquire); - let send_range = offset..length.map_or(usize::MAX, |length| length + offset); - let mut bytes_received: usize = 0; + let send_range = offset..length.map_or(u64::MAX, |length| length + offset); + let mut bytes_received: u64 = 0; let (mut fast_tx, fast_rx) = make_buf_channel_pair(); let (slow_tx, mut slow_rx) = make_buf_channel_pair(); @@ -335,19 +340,21 @@ impl StoreDriver for FastSlowStore { let fast_res = fast_tx.send_eof(); return Ok::<_, Error>((fast_res, writer_pin)); } + let output_buf_len = u64::try_from(output_buf.len()) + .err_tip(|| "Could not output_buf.len() to u64")?; self.metrics .slow_store_downloaded_bytes - .fetch_add(output_buf.len() as u64, Ordering::Acquire); + .fetch_add(output_buf_len, Ordering::Acquire); let writer_fut = if let Some(range) = Self::calculate_range( - &(bytes_received..bytes_received + output_buf.len()), + &(bytes_received..bytes_received + output_buf_len), &send_range, - ) { + )? { writer_pin.send(output_buf.slice(range)).right_future() } else { futures::future::ready(Ok(())).left_future() }; - bytes_received += output_buf.len(); + bytes_received += output_buf_len; let (fast_tx_res, writer_res) = join!(fast_tx.send(output_buf), writer_fut); fast_tx_res.err_tip(|| "Failed to write to fast store in fast_slow store")?; diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs index 9aedba07d..e62e703ce 100644 --- a/nativelink-store/src/filesystem_store.rs +++ b/nativelink-store/src/filesystem_store.rs @@ -44,7 +44,7 @@ use tracing::{event, Level}; use crate::cas_utils::is_zero_digest; // Default size to allocate memory of the buffer when reading files. -const DEFAULT_BUFF_SIZE: usize = 32 * 1024; +const DEFAULT_BUFF_SIZE: u64 = 32 * 1024; // Default block size of all major filesystems is 4KB const DEFAULT_BLOCK_SIZE: u64 = 4 * 1024; @@ -400,7 +400,7 @@ pub fn digest_from_filename(file_name: &str) -> Result { /// The number of files to read the metadata for at the same time when running /// add_files_to_cache. -const SIMULTANEOUS_METADATA_READS: usize = 200; +const SIMULTANEOUS_METADATA_READS: u64 = 200; async fn add_files_to_cache( evicting_map: &EvictingMap, SystemTime>, @@ -471,7 +471,7 @@ async fn add_files_to_cache( }; Result::<(String, SystemTime, u64), Error>::Ok((file_name, atime, metadata.len())) }) - .buffer_unordered(SIMULTANEOUS_METADATA_READS) + .buffer_unordered(SIMULTANEOUS_METADATA_READS as usize) .try_collect() .await? }; @@ -528,7 +528,7 @@ pub struct FilesystemStore { #[metric(help = "Block size of the configured filesystem")] block_size: u64, #[metric(help = "Size of the configured read buffer size")] - read_buffer_size: usize, + read_buffer_size: u64, weak_self: Weak, sleep_fn: fn(Duration) -> Sleep, rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>, @@ -577,7 +577,7 @@ impl FilesystemStore { let read_buffer_size = if config.read_buffer_size == 0 { DEFAULT_BUFF_SIZE } else { - config.read_buffer_size as usize + config.read_buffer_size as u64 }; Ok(Arc::new_cyclic(|weak_self| Self { shared_context, @@ -729,7 +729,7 @@ impl StoreDriver for FilesystemStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { // TODO(allada) This is a bit of a hack to get around the lifetime issues with the // existence_cache. 
We need to convert the digests to owned values to be able to @@ -799,7 +799,7 @@ impl StoreDriver for FilesystemStore { let digest = key.into_digest(); let path = file.get_path().as_os_str().to_os_string(); let file_size = match upload_size { - UploadSizeInfo::ExactSize(size) => size as u64, + UploadSizeInfo::ExactSize(size) => size, UploadSizeInfo::MaxSize(_) => file .as_reader() .await @@ -835,8 +835,8 @@ impl StoreDriver for FilesystemStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let digest = key.into_digest(); if is_zero_digest(digest) { @@ -853,11 +853,14 @@ impl StoreDriver for FilesystemStore { self.evicting_map.get(&digest).await.ok_or_else(|| { make_err!(Code::NotFound, "{digest} not found in filesystem store") })?; - let read_limit = length.unwrap_or(usize::MAX) as u64; - let mut resumeable_temp_file = entry.read_file_part(offset as u64, read_limit).await?; + let read_limit = length.unwrap_or(u64::MAX); + let mut resumeable_temp_file = entry.read_file_part(offset, read_limit).await?; loop { - let mut buf = BytesMut::with_capacity(self.read_buffer_size); + let mut buf = BytesMut::with_capacity( + usize::try_from(self.read_buffer_size) + .err_tip(|| "Could not convert read_buffer_size to usize")?, + ); resumeable_temp_file .as_reader() .await diff --git a/nativelink-store/src/grpc_store.rs b/nativelink-store/src/grpc_store.rs index cbbe1a079..27f760759 100644 --- a/nativelink-store/src/grpc_store.rs +++ b/nativelink-store/src/grpc_store.rs @@ -449,8 +449,8 @@ impl GrpcStore { &self, digest: DigestInfo, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let action_result = self .get_action_result_from_digest(digest) @@ -465,11 +465,18 @@ impl GrpcStore { .encode(&mut value) .err_tip(|| "Could not encode upstream action result")?; - let default_len = value.len() - offset; + let default_len = + u64::try_from(value.len()).err_tip(|| "Could not convert value.len() to u64")? - offset; let length = length.unwrap_or(default_len).min(default_len); if length > 0 { writer - .send(value.freeze().slice(offset..(offset + length))) + .send( + value.freeze().slice( + usize::try_from(offset).err_tip(|| "Could not convert offset to usize")? + ..usize::try_from(offset + length) + .err_tip(|| "Could not convert (offset + length) to usize")?, + ), + ) .await .err_tip(|| "Failed to write data in grpc store")?; } @@ -510,7 +517,7 @@ impl StoreDriver for GrpcStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { if matches!(self.store_type, nativelink_config::stores::StoreType::ac) { keys.iter() @@ -521,7 +528,7 @@ impl StoreDriver for GrpcStore { // hope that we detect incorrect usage. 
self.get_action_result_from_digest(key.borrow().into_digest()) .await?; - *result = Some(usize::MAX); + *result = Some(u64::MAX); Ok::<_, Error>(()) }) .collect::>() @@ -565,7 +572,7 @@ impl StoreDriver for GrpcStore { { match missing_digests.binary_search(&digest) { Ok(_) => *result = None, - Err(_) => *result = Some(usize::try_from(digest.size_bytes())?), + Err(_) => *result = Some(digest.size_bytes()), } } @@ -655,8 +662,8 @@ impl StoreDriver for GrpcStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let digest = key.into_digest(); if matches!(self.store_type, nativelink_config::stores::StoreType::ac) { diff --git a/nativelink-store/src/memory_store.rs b/nativelink-store/src/memory_store.rs index 26642ff7b..ff0fc8dd3 100644 --- a/nativelink-store/src/memory_store.rs +++ b/nativelink-store/src/memory_store.rs @@ -113,8 +113,8 @@ impl StoreSubscriptionItem for MemoryStoreSubscriptionItem { async fn get_part( &self, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let store = self .store @@ -177,7 +177,7 @@ impl StoreDriver for MemoryStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { // TODO(allada): This is a dirty hack to get around the lifetime issues with the // evicting map. @@ -200,7 +200,7 @@ impl StoreDriver for MemoryStore { self: Pin<&Self>, range: (Bound>, Bound>), handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_), - ) -> Result { + ) -> Result { let range = ( range.0.map(|v| v.into_owned()), range.1.map(|v| v.into_owned()), @@ -247,8 +247,8 @@ impl StoreDriver for MemoryStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { if is_zero_digest(key.borrow()) { writer @@ -262,11 +262,18 @@ impl StoreDriver for MemoryStore { .get(&key.borrow().into_owned()) .await .err_tip_with_code(|_| (Code::NotFound, format!("Key {key:?} not found")))?; - let default_len = value.len() - offset; + let default_len = + u64::try_from(value.len()).err_tip(|| "Could not convert value.len() to u64")? - offset; let length = length.unwrap_or(default_len).min(default_len); if length > 0 { writer - .send(value.0.slice(offset..(offset + length))) + .send( + value.0.slice( + usize::try_from(offset).err_tip(|| "Could not convert offset to usize")? 
+ ..usize::try_from(offset + length) + .err_tip(|| "Could not convert (offset + length) to usize")?, + ), + ) .await .err_tip(|| "Failed to write data in memory store")?; } diff --git a/nativelink-store/src/noop_store.rs b/nativelink-store/src/noop_store.rs index 457e8b199..1b8def3b6 100644 --- a/nativelink-store/src/noop_store.rs +++ b/nativelink-store/src/noop_store.rs @@ -48,7 +48,7 @@ impl StoreDriver for NoopStore { async fn has_with_results( self: Pin<&Self>, _keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { results.iter_mut().for_each(|r| *r = None); Ok(()) @@ -75,8 +75,8 @@ impl StoreDriver for NoopStore { self: Pin<&Self>, _key: StoreKey<'_>, _writer: &mut DropCloserWriteHalf, - _offset: usize, - _length: Option, + _offset: u64, + _length: Option, ) -> Result<(), Error> { Err(make_err!(Code::NotFound, "Not found in noop store")) } diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 9aa531096..be3a32eb1 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -178,7 +178,7 @@ impl StoreDriver for RedisStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { // TODO(caass): Optimize for the case where `keys.len() == 1` let pipeline = self.client_pool.next().pipeline(); @@ -334,8 +334,8 @@ impl StoreDriver for RedisStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { // To follow RBE spec we need to consider any digest's with // zero size to be existing. @@ -355,16 +355,25 @@ impl StoreDriver for RedisStore { // We want to read the data at the key from `offset` to `offset + length`. let data_start = offset; let data_end = data_start - .saturating_add(length.unwrap_or(isize::MAX as usize)) + .saturating_add(length.unwrap_or(isize::MAX as u64)) .saturating_sub(1); // And we don't ever want to read more than `READ_CHUNK_SIZE` bytes at a time, so we'll need to iterate. let mut chunk_start = data_start; - let mut chunk_end = cmp::min(data_start.saturating_add(READ_CHUNK_SIZE) - 1, data_end); + let mut chunk_end = cmp::min( + data_start.saturating_add(READ_CHUNK_SIZE as u64) - 1, + data_end, + ); loop { let chunk: Bytes = client - .getrange(encoded_key, chunk_start, chunk_end) + .getrange( + encoded_key, + usize::try_from(chunk_start) + .err_tip(|| "Could not convert chunk_start to usize")?, + usize::try_from(chunk_end) + .err_tip(|| "Could not convert chunk_end to usize")?, + ) .await .err_tip(|| "In RedisStore::get_part::getrange")?; @@ -390,7 +399,10 @@ impl StoreDriver for RedisStore { // ...and go grab the next chunk. chunk_start = chunk_end + 1; - chunk_end = cmp::min(chunk_start.saturating_add(READ_CHUNK_SIZE) - 1, data_end); + chunk_end = cmp::min( + chunk_start.saturating_add(READ_CHUNK_SIZE as u64) - 1, + data_end, + ); } // If we didn't write any data, check if the key exists, if not return a NotFound error. 
diff --git a/nativelink-store/src/ref_store.rs b/nativelink-store/src/ref_store.rs index cb2ba7cfd..d2446d174 100644 --- a/nativelink-store/src/ref_store.rs +++ b/nativelink-store/src/ref_store.rs @@ -105,7 +105,7 @@ impl StoreDriver for RefStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { self.get_store()?.has_with_results(keys, results).await } @@ -123,8 +123,8 @@ impl StoreDriver for RefStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { self.get_store()? .get_part(key, writer, offset, length) diff --git a/nativelink-store/src/s3_store.rs b/nativelink-store/src/s3_store.rs index 4bce1227b..27431268a 100644 --- a/nativelink-store/src/s3_store.rs +++ b/nativelink-store/src/s3_store.rs @@ -66,23 +66,23 @@ use crate::cas_utils::is_zero_digest; // S3 parts cannot be smaller than this number. See: // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html -const MIN_MULTIPART_SIZE: usize = 5 * 1024 * 1024; // 5MB. +const MIN_MULTIPART_SIZE: u64 = 5 * 1024 * 1024; // 5MB. // S3 parts cannot be larger than this number. See: // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html -const MAX_MULTIPART_SIZE: usize = 5 * 1024 * 1024 * 1024; // 5GB. +const MAX_MULTIPART_SIZE: u64 = 5 * 1024 * 1024 * 1024; // 5GB. // S3 parts cannot be more than this number. See: // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html -const MAX_UPLOAD_PARTS: usize = 10_000; +const MAX_UPLOAD_PARTS: u64 = 10_000; // Default max buffer size for retrying upload requests. // Note: If you change this, adjust the docs in the config. -const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: usize = 5 * 1024 * 1024; // 5MB. +const DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST: u32 = 5 * 1024 * 1024; // 5MB. // Default limit for concurrent part uploads per multipart upload. // Note: If you change this, adjust the docs in the config. 
-const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: usize = 10; +const DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS: u64 = 10; pub struct ConnectionWithPermit { connection: T, @@ -250,9 +250,9 @@ pub struct S3Store { #[metric(help = "The number of seconds to consider an object expired")] consider_expired_after_s: i64, #[metric(help = "The number of bytes to buffer for retrying requests")] - max_retry_buffer_per_request: usize, + max_retry_buffer_per_request: u64, #[metric(help = "The number of concurrent uploads allowed for multipart uploads")] - multipart_max_concurrent_uploads: usize, + multipart_max_concurrent_uploads: u64, } impl S3Store @@ -317,7 +317,8 @@ where consider_expired_after_s: i64::from(config.consider_expired_after_s), max_retry_buffer_per_request: config .max_retry_buffer_per_request - .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST), + .unwrap_or(DEFAULT_MAX_RETRY_BUFFER_PER_REQUEST) + as u64, multipart_max_concurrent_uploads: config .multipart_max_concurrent_uploads .map_or(DEFAULT_MULTIPART_MAX_CONCURRENT_UPLOADS, |v| v), @@ -328,7 +329,7 @@ where format!("{}{}", self.key_prefix, key.as_str(),) } - async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result, Error> { + async fn has(self: Pin<&Self>, digest: &StoreKey<'_>) -> Result, Error> { self.retrier .retry(unfold((), move |state| async move { let result = self @@ -353,7 +354,7 @@ where return Some((RetryResult::Ok(None), state)); }; if length >= 0 { - return Some((RetryResult::Ok(Some(length as usize)), state)); + return Some((RetryResult::Ok(Some(length as u64)), state)); } Some(( RetryResult::Err(make_err!( @@ -388,7 +389,7 @@ where async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { keys.iter() .zip(results.iter_mut()) @@ -449,7 +450,7 @@ where .content_length(sz as i64) .body(ByteStream::from_body_1_x(BodyWrapper { reader: rx, - size: sz as u64, + size: sz, })) .send() .map_ok_or_else(|e| Err(make_err!(Code::Aborted, "{e:?}")), |_| Ok(())), @@ -532,14 +533,17 @@ where let upload_parts = move || async move { // This will ensure we only have `multipart_max_concurrent_uploads` * `bytes_per_upload_part` // bytes in memory at any given time waiting to be uploaded. - let (tx, mut rx) = mpsc::channel(self.multipart_max_concurrent_uploads); + let (tx, mut rx) = mpsc::channel( + usize::try_from(self.multipart_max_concurrent_uploads) + .err_tip(|| "Could not convert multipart_max_concurrent_uploads to usize")?, + ); let read_stream_fut = async move { let retrier = &Pin::get_ref(self).retrier; // Note: Our break condition is when we reach EOF. 
for part_number in 1..i32::MAX { let write_buf = reader - .consume(Some(bytes_per_upload_part)) + .consume(Some(usize::try_from(bytes_per_upload_part).err_tip(|| "Could not convert bytes_per_upload_part to usize")?)) .await .err_tip(|| "Failed to read chunk in s3_store")?; if write_buf.is_empty() { @@ -589,10 +593,13 @@ where let mut upload_futures = FuturesUnordered::new(); - let mut completed_parts = Vec::with_capacity(cmp::min( - MAX_UPLOAD_PARTS, - (max_size / bytes_per_upload_part) + 1, - )); + let mut completed_parts = Vec::with_capacity( + usize::try_from(cmp::min( + MAX_UPLOAD_PARTS, + (max_size / bytes_per_upload_part) + 1, + )) + .err_tip(|| "Could not convert u64 to usize")?, + ); tokio::pin!(read_stream_fut); loop { if read_stream_fut.is_terminated() && rx.is_empty() && upload_futures.is_empty() { @@ -671,8 +678,8 @@ where self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { if is_zero_digest(key.borrow()) { writer @@ -695,7 +702,7 @@ where .key(s3_path) .range(format!( "bytes={}-{}", - offset + writer.get_bytes_written() as usize, + offset + writer.get_bytes_written(), end_read_byte.map_or_else(String::new, |v| v.to_string()) )) .send() diff --git a/nativelink-store/src/shard_store.rs b/nativelink-store/src/shard_store.rs index 7dc6761c9..ae2ab82f5 100644 --- a/nativelink-store/src/shard_store.rs +++ b/nativelink-store/src/shard_store.rs @@ -144,7 +144,7 @@ impl StoreDriver for ShardStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { if keys.len() == 1 { // Hot path: It is very common to lookup only one key. @@ -212,8 +212,8 @@ impl StoreDriver for ShardStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let store = self.get_store(&key); store diff --git a/nativelink-store/src/size_partitioning_store.rs b/nativelink-store/src/size_partitioning_store.rs index 3d319beaf..a12976bbc 100644 --- a/nativelink-store/src/size_partitioning_store.rs +++ b/nativelink-store/src/size_partitioning_store.rs @@ -52,7 +52,7 @@ impl StoreDriver for SizePartitioningStore { async fn has_with_results( self: Pin<&Self>, keys: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { let mut non_digest_sample = None; let (lower_digests, upper_digests): (Vec<_>, Vec<_>) = @@ -120,8 +120,8 @@ impl StoreDriver for SizePartitioningStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { let digest = match key { StoreKey::Digest(digest) => digest, diff --git a/nativelink-store/src/verify_store.rs b/nativelink-store/src/verify_store.rs index 9ee668b93..e4e23ae50 100644 --- a/nativelink-store/src/verify_store.rs +++ b/nativelink-store/src/verify_store.rs @@ -148,7 +148,7 @@ impl StoreDriver for VerifyStore { async fn has_with_results( self: Pin<&Self>, digests: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { self.inner_store.has_with_results(digests, results).await } @@ -169,7 +169,7 @@ impl StoreDriver for VerifyStore { }; let digest_size = digest.size_bytes(); if let UploadSizeInfo::ExactSize(expected_size) = size_info { - if self.verify_size && expected_size as u64 != digest_size { + 
if self.verify_size && expected_size != digest_size { self.size_verification_failures.inc(); return Err(make_input_err!( "Expected size to match. Got {} but digest says {} on update", @@ -215,8 +215,8 @@ impl StoreDriver for VerifyStore { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { self.inner_store.get_part(key, writer, offset, length).await } diff --git a/nativelink-store/tests/ac_utils_test.rs b/nativelink-store/tests/ac_utils_test.rs index 462fa6a76..7a97404a0 100644 --- a/nativelink-store/tests/ac_utils_test.rs +++ b/nativelink-store/tests/ac_utils_test.rs @@ -66,7 +66,7 @@ async fn upload_file_to_store_with_large_file() -> Result<(), Error> { .update_with_whole_file( digest, resumeable_file, - UploadSizeInfo::ExactSize(expected_data.len()), + UploadSizeInfo::ExactSize(u64::try_from(expected_data.len()).expect("Cast failed")), ) .await?; } diff --git a/nativelink-store/tests/compression_store_test.rs b/nativelink-store/tests/compression_store_test.rs index 479f4afd9..c1af28bd0 100644 --- a/nativelink-store/tests/compression_store_test.rs +++ b/nativelink-store/tests/compression_store_test.rs @@ -142,7 +142,11 @@ async fn partial_reads_test() -> Result<(), Error> { for read_slice_size in 0..(RAW_DATA.len() + 5) { for offset in 0..(RAW_DATA.len() + 5) { let store_data = store - .get_part_unchunked(digest, offset, Some(read_slice_size)) + .get_part_unchunked( + digest, + u64::try_from(offset).expect("Cast failed"), + Some(u64::try_from(read_slice_size).expect("Cast failed")), + ) .await .err_tip(|| { format!("Failed to get from inner store at {offset} - {read_slice_size}") @@ -255,7 +259,7 @@ async fn sanity_check_zero_bytes_test() -> Result<(), Error> { #[nativelink_test] async fn check_header_test() -> Result<(), Error> { const BLOCK_SIZE: u32 = 150; - const MAX_SIZE_INPUT: usize = 1024 * 1024; // 1MB. + const MAX_SIZE_INPUT: u64 = 1024 * 1024; // 1MB. let inner_store = MemoryStore::new(&nativelink_config::stores::MemoryStore::default()); let store_owned = CompressionStore::new( nativelink_config::stores::CompressionStore { diff --git a/nativelink-store/tests/dedup_store_test.rs b/nativelink-store/tests/dedup_store_test.rs index cc84c7985..5e0b87ffc 100644 --- a/nativelink-store/tests/dedup_store_test.rs +++ b/nativelink-store/tests/dedup_store_test.rs @@ -58,7 +58,7 @@ async fn simple_round_trip_test() -> Result<(), Error> { Store::new(MemoryStore::new( &nativelink_config::stores::MemoryStore::default(), )), // Content store. - ); + )?; let original_data = make_random_data(MEGABYTE_SZ); let digest = DigestInfo::try_new(VALID_HASH1, MEGABYTE_SZ).unwrap(); @@ -86,7 +86,7 @@ async fn check_missing_last_chunk_test() -> Result<(), Error> { &nativelink_config::stores::MemoryStore::default(), )), // Index store. Store::new(content_store.clone()), - ); + )?; let original_data = make_random_data(MEGABYTE_SZ); let digest = DigestInfo::try_new(VALID_HASH1, MEGABYTE_SZ).unwrap(); @@ -134,7 +134,7 @@ async fn fetch_part_test() -> Result<(), Error> { Store::new(MemoryStore::new( &nativelink_config::stores::MemoryStore::default(), )), // Content store. 
- ); + )?; const DATA_SIZE: usize = MEGABYTE_SZ / 4; let original_data = make_random_data(DATA_SIZE); @@ -147,7 +147,7 @@ async fn fetch_part_test() -> Result<(), Error> { const ONE_THIRD_SZ: usize = DATA_SIZE / 3; let rt_data = store - .get_part_unchunked(digest, ONE_THIRD_SZ, Some(ONE_THIRD_SZ)) + .get_part_unchunked(digest, ONE_THIRD_SZ as u64, Some(ONE_THIRD_SZ as u64)) .await .err_tip(|| "Failed to get_part from dedup store")?; @@ -186,7 +186,7 @@ async fn check_length_not_set_with_chunk_read_beyond_first_chunk_regression_test Store::new(MemoryStore::new( &nativelink_config::stores::MemoryStore::default(), )), // Content store. - ); + )?; const DATA_SIZE: usize = 30; let original_data = make_random_data(DATA_SIZE); @@ -200,7 +200,7 @@ async fn check_length_not_set_with_chunk_read_beyond_first_chunk_regression_test // This value must be larger than `max_size` in the config above. const START_READ_BYTE: usize = 7; let rt_data = store - .get_part_unchunked(digest, START_READ_BYTE, None) + .get_part_unchunked(digest, START_READ_BYTE as u64, None) .await .err_tip(|| "Failed to get_part from dedup store")?; @@ -238,7 +238,7 @@ async fn check_chunk_boundary_reads_test() -> Result<(), Error> { Store::new(MemoryStore::new( &nativelink_config::stores::MemoryStore::default(), )), // Content store. - ); + )?; const DATA_SIZE: usize = 30; let original_data = make_random_data(DATA_SIZE); @@ -251,11 +251,19 @@ async fn check_chunk_boundary_reads_test() -> Result<(), Error> { for offset in 0..=DATA_SIZE { for len in 0..DATA_SIZE { // If reading at DATA_SIZE, we will set len to None to check that edge case. - let maybe_len = if offset == DATA_SIZE { None } else { Some(len) }; + let maybe_len = if offset == DATA_SIZE { + None + } else { + Some(u64::try_from(len).expect("Cast failed")) + }; let len = if maybe_len.is_none() { DATA_SIZE } else { len }; let rt_data = store - .get_part_unchunked(digest, offset, maybe_len) + .get_part_unchunked( + digest, + u64::try_from(offset).expect("Cast failed"), + maybe_len, + ) .await .err_tip(|| "Failed to get_part from dedup store")?; @@ -276,7 +284,7 @@ async fn check_chunk_boundary_reads_test() -> Result<(), Error> { // This value must be larger than `max_size` in the config above. const START_READ_BYTE: usize = 10; let rt_data = store - .get_part_unchunked(digest, START_READ_BYTE, None) + .get_part_unchunked(digest, START_READ_BYTE as u64, None) .await .err_tip(|| "Failed to get_part from dedup store")?; @@ -309,7 +317,7 @@ async fn has_checks_content_store() -> Result<(), Error> { &make_default_config(), Store::new(index_store.clone()), Store::new(content_store.clone()), - ); + )?; const DATA_SIZE: usize = MEGABYTE_SZ / 4; let original_data = make_random_data(DATA_SIZE); @@ -323,7 +331,7 @@ async fn has_checks_content_store() -> Result<(), Error> { { // Check to ensure we our baseline `.has()` succeeds. let size_info = store.has(digest1).await.err_tip(|| "Failed to run .has")?; - assert_eq!(size_info, Some(DATA_SIZE), "Expected sizes to match"); + assert_eq!(size_info, Some(DATA_SIZE as u64), "Expected sizes to match"); } { // There will be exactly 10 entries in our content_store based on our random seed data. @@ -339,7 +347,11 @@ async fn has_checks_content_store() -> Result<(), Error> { { // Check our recently added entry is still valid. 
let size_info = store.has(digest2).await.err_tip(|| "Failed to run .has")?; - assert_eq!(size_info, Some(DATA2.len()), "Expected sizes to match"); + assert_eq!( + size_info, + Some(DATA2.len() as u64), + "Expected sizes to match" + ); } { // Check our first added entry is now invalid (because part of it was evicted). @@ -370,7 +382,7 @@ async fn has_with_no_existing_index_returns_none_test() -> Result<(), Error> { &make_default_config(), Store::new(index_store.clone()), Store::new(content_store.clone()), - ); + )?; const DATA_SIZE: usize = 10; let digest = DigestInfo::try_new(VALID_HASH1, DATA_SIZE).unwrap(); diff --git a/nativelink-store/tests/existence_store_test.rs b/nativelink-store/tests/existence_store_test.rs index e48a0f6a1..5fafb9d36 100644 --- a/nativelink-store/tests/existence_store_test.rs +++ b/nativelink-store/tests/existence_store_test.rs @@ -58,7 +58,7 @@ async fn simple_exist_cache_test() -> Result<(), Error> { .has(digest) .await .err_tip(|| "Failed to check store")?, - Some(VALUE.len()), + Some(VALUE.len() as u64), "Expected digest to exist in store" ); @@ -145,20 +145,20 @@ async fn ensure_has_requests_eventually_do_let_evictions_happen() -> Result<(), MockInstantWrapped::default(), ); - assert_eq!(store.has(digest).await, Ok(Some(VALUE.len()))); + assert_eq!(store.has(digest).await, Ok(Some(VALUE.len() as u64))); MockClock::advance(Duration::from_secs(3)); // Now that our existence cache has been populated, remove // it from the inner store. inner_store.remove_entry(digest.into()).await; - assert_eq!(store.has(digest).await, Ok(Some(VALUE.len()))); + assert_eq!(store.has(digest).await, Ok(Some(VALUE.len() as u64))); MockClock::advance(Duration::from_secs(3)); - assert_eq!(store.has(digest).await, Ok(Some(VALUE.len()))); + assert_eq!(store.has(digest).await, Ok(Some(VALUE.len() as u64))); MockClock::advance(Duration::from_secs(3)); - assert_eq!(store.has(digest).await, Ok(Some(VALUE.len()))); + assert_eq!(store.has(digest).await, Ok(Some(VALUE.len() as u64))); MockClock::advance(Duration::from_secs(3)); // It should have been evicted from the existence cache by now. diff --git a/nativelink-store/tests/fast_slow_store_test.rs b/nativelink-store/tests/fast_slow_store_test.rs index 96b3ccdc9..fe03977ef 100644 --- a/nativelink-store/tests/fast_slow_store_test.rs +++ b/nativelink-store/tests/fast_slow_store_test.rs @@ -113,10 +113,17 @@ async fn fetch_slow_store_puts_in_fast_store_test() -> Result<(), Error> { assert_eq!( fast_slow_store.has(digest).await, - Ok(Some(original_data.len())) + Ok(Some( + u64::try_from(original_data.len()).expect("Cast failed") + )) ); assert_eq!(fast_store.has(digest).await, Ok(None)); - assert_eq!(slow_store.has(digest).await, Ok(Some(original_data.len()))); + assert_eq!( + slow_store.has(digest).await, + Ok(Some( + u64::try_from(original_data.len()).expect("Cast failed") + )) + ); // This get() request should place the data in fast_store too. fast_slow_store.get_part_unchunked(digest, 0, None).await?; @@ -155,78 +162,79 @@ async fn partial_reads_copy_full_to_fast_store_test() -> Result<(), Error> { } #[test] -fn calculate_range_test() { +fn calculate_range_test() -> Result<(), Error> { let test = |start_range, end_range| FastSlowStore::calculate_range(&start_range, &end_range); { // Exact match. 
let received_range = 0..1; let send_range = 0..1; let expected_results = Some(0..1); - assert_eq!(test(received_range, send_range), expected_results); + assert_eq!(test(received_range, send_range)?, expected_results); } { // Minus one on received_range. let received_range = 1..4; let send_range = 1..5; let expected_results = Some(0..3); - assert_eq!(test(received_range, send_range), expected_results); + assert_eq!(test(received_range, send_range)?, expected_results); } { // Minus one on send_range. let received_range = 1..5; let send_range = 1..4; let expected_results = Some(0..3); - assert_eq!(test(received_range, send_range), expected_results); + assert_eq!(test(received_range, send_range)?, expected_results); } { // Should have already sent all data (start fence post). let received_range = 1..2; let send_range = 0..1; let expected_results = None; - assert_eq!(test(received_range, send_range), expected_results); + assert_eq!(test(received_range, send_range)?, expected_results); } { // Definiltly already sent data. let received_range = 2..3; let send_range = 0..1; let expected_results = None; - assert_eq!(test(received_range, send_range), expected_results); + assert_eq!(test(received_range, send_range)?, expected_results); } { // All data should be sent (inside range). let received_range = 3..4; let send_range = 0..100; let expected_results = Some(0..1); // Note: This is relative received_range.start. - assert_eq!(test(received_range, send_range), expected_results); + assert_eq!(test(received_range, send_range)?, expected_results); } { // Subset of received data should be sent. let received_range = 1..100; let send_range = 3..4; let expected_results = Some(2..3); // Note: This is relative received_range.start. - assert_eq!(test(received_range, send_range), expected_results); + assert_eq!(test(received_range, send_range)?, expected_results); } { // We are clearly not at the offset yet. let received_range = 0..1; let send_range = 3..4; let expected_results = None; - assert_eq!(test(received_range, send_range), expected_results); + assert_eq!(test(received_range, send_range)?, expected_results); } { // Not at offset yet (fence post). let received_range = 0..1; let send_range = 1..2; let expected_results = None; - assert_eq!(test(received_range, send_range), expected_results); + assert_eq!(test(received_range, send_range)?, expected_results); } { // Head part of the received data should be sent. 
let received_range = 1..3; let send_range = 2..5; let expected_results = Some(1..2); - assert_eq!(test(received_range, send_range), expected_results); + assert_eq!(test(received_range, send_range)?, expected_results); } + Ok(()) } #[nativelink_test] @@ -244,12 +252,12 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { async fn has_with_results( self: Pin<&Self>, digests: &[StoreKey<'_>], - results: &mut [Option], + results: &mut [Option], ) -> Result<(), Error> { if let Some(has_digest) = self.digest { for (digest, result) in digests.iter().zip(results.iter_mut()) { if *digest == has_digest.into() { - *result = Some(has_digest.size_bytes() as usize); + *result = Some(has_digest.size_bytes()); } } } @@ -281,13 +289,13 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { self: Pin<&Self>, key: StoreKey<'_>, writer: &mut nativelink_util::buf_channel::DropCloserWriteHalf, - offset: usize, - length: Option, + offset: u64, + length: Option, ) -> Result<(), Error> { // Gets called in the slow store and we provide the data that's // sent to the upstream and the fast store. - let bytes = length.unwrap_or(key.into_digest().size_bytes() as usize) - offset; - let data = vec![0_u8; bytes]; + let bytes = length.unwrap_or(key.into_digest().size_bytes()) - offset; + let data = vec![0_u8; usize::try_from(bytes).expect("Cast failed")]; writer.send(Bytes::copy_from_slice(&data)).await?; writer.send_eof() } @@ -350,7 +358,7 @@ async fn drop_on_eof_completes_store_futures() -> Result<(), Error> { // Drop get_part as soon as rx.drain() completes tokio::select!( res = rx.drain() => res, - res = fast_slow_store.get_part(digest, tx, 0, Some(digest.size_bytes() as usize)) => res, + res = fast_slow_store.get_part(digest, tx, 0, Some(digest.size_bytes())) => res, ) }, async move { @@ -438,7 +446,7 @@ async fn has_checks_fast_store_when_noop() -> Result<(), Error> { assert_eq!( fast_slow_store.has(digest).await, - Ok(Some(data.len())), + Ok(Some(u64::try_from(data.len()).expect("Cast failed"))), "Expected data to exist in store" ); diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs index 6e0db5b18..2a0252fb0 100644 --- a/nativelink-store/tests/filesystem_store_test.rs +++ b/nativelink-store/tests/filesystem_store_test.rs @@ -272,7 +272,7 @@ async fn valid_results_after_shutdown_test() -> Result<(), Error> { assert_eq!( store.has(digest).await, - Ok(Some(VALUE1.len())), + Ok(Some(VALUE1.len() as u64)), "Expected filesystem store to have hash: {}", HASH1 ); @@ -1355,7 +1355,11 @@ async fn update_with_whole_file_closes_file() -> Result<(), Error> { } store - .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len())) + .update_with_whole_file( + digest, + file, + UploadSizeInfo::ExactSize(u64::try_from(value.len()).expect("Cast failed")), + ) .await?; Ok(()) } @@ -1423,7 +1427,11 @@ async fn update_with_whole_file_slow_path_when_low_file_descriptors() -> Result< } store - .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len())) + .update_with_whole_file( + digest, + file, + UploadSizeInfo::ExactSize(u64::try_from(value.len()).expect("Cast failed")), + ) .await?; Ok(()) } @@ -1466,7 +1474,11 @@ async fn update_with_whole_file_uses_same_inode() -> Result<(), Error> { .ino(); let result = store - .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len())) + .update_with_whole_file( + digest, + file, + UploadSizeInfo::ExactSize(u64::try_from(value.len()).expect("Cast 
failed")), + ) .await?; assert!( result.is_none(), diff --git a/nativelink-store/tests/memory_store_test.rs b/nativelink-store/tests/memory_store_test.rs index b1bc32da0..42db31807 100644 --- a/nativelink-store/tests/memory_store_test.rs +++ b/nativelink-store/tests/memory_store_test.rs @@ -45,15 +45,15 @@ async fn insert_one_item_then_update() -> Result<(), Error> { // Insert dummy value into store. store .update_oneshot( - DigestInfo::try_new(VALID_HASH1, VALUE1.len())?, + DigestInfo::try_new(VALID_HASH1, VALUE1.len() as u64)?, VALUE1.into(), ) .await?; assert_eq!( store - .has(DigestInfo::try_new(VALID_HASH1, VALUE1.len())?) + .has(DigestInfo::try_new(VALID_HASH1, VALUE1.len() as u64)?) .await, - Ok(Some(VALUE1.len())), + Ok(Some(VALUE1.len() as u64)), "Expected memory store to have hash: {}", VALID_HASH1 ); diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs index c5ae01210..4262276f9 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -195,7 +195,11 @@ async fn upload_and_get_data() -> Result<(), Error> { ); let result = store - .get_part_unchunked(digest, 0, Some(data.len())) + .get_part_unchunked( + digest, + 0, + Some(u64::try_from(data.len()).expect("Cast failed")), + ) .await?; assert_eq!(result, data, "Expected redis store to have updated value",); @@ -275,7 +279,11 @@ async fn upload_and_get_data_with_prefix() -> Result<(), Error> { ); let result = store - .get_part_unchunked(digest, 0, Some(data.len())) + .get_part_unchunked( + digest, + 0, + Some(u64::try_from(data.len()).expect("Cast failed")), + ) .await?; assert_eq!(result, data, "Expected redis store to have updated value",); @@ -416,7 +424,11 @@ async fn test_large_downloads_are_chunked() -> Result<(), Error> { ); let get_result: Bytes = store - .get_part_unchunked(digest, 0, Some(data.clone().len())) + .get_part_unchunked( + digest, + 0, + Some(u64::try_from(data.clone().len()).expect("Cast failed")), + ) .await?; assert_eq!( @@ -501,7 +513,11 @@ async fn yield_between_sending_packets_in_update() -> Result<(), Error> { tx.send(data_p2).await?; tx.send_eof()?; store - .update(digest, rx, UploadSizeInfo::ExactSize(data.len())) + .update( + digest, + rx, + UploadSizeInfo::ExactSize(u64::try_from(data.len()).expect("Cast failed")), + ) .await?; let result = store.has(digest).await?; @@ -511,7 +527,11 @@ async fn yield_between_sending_packets_in_update() -> Result<(), Error> { ); let result = store - .get_part_unchunked(digest, 0, Some(data.clone().len())) + .get_part_unchunked( + digest, + 0, + Some(u64::try_from(data.clone().len()).expect("Cast failed")), + ) .await?; assert_eq!(result, data, "Expected redis store to have updated value",); diff --git a/nativelink-store/tests/ref_store_test.rs b/nativelink-store/tests/ref_store_test.rs index 85cea6a6d..d4d268bf2 100644 --- a/nativelink-store/tests/ref_store_test.rs +++ b/nativelink-store/tests/ref_store_test.rs @@ -64,7 +64,7 @@ async fn has_test() -> Result<(), Error> { .await; assert_eq!( has_result, - Ok(Some(VALUE1.len())), + Ok(Some(VALUE1.len() as u64)), "Expected ref store to have data in ref store : {}", VALID_HASH1 ); diff --git a/nativelink-store/tests/s3_store_test.rs b/nativelink-store/tests/s3_store_test.rs index 593e547f2..b2e1ac669 100644 --- a/nativelink-store/tests/s3_store_test.rs +++ b/nativelink-store/tests/s3_store_test.rs @@ -221,7 +221,7 @@ async fn simple_update_ac() -> Result<(), Error> { .update( DigestInfo::try_new(VALID_HASH1, AC_ENTRY_SIZE)?, 
                 rx,
-                UploadSizeInfo::ExactSize(CONTENT_LENGTH),
+                UploadSizeInfo::ExactSize(CONTENT_LENGTH as u64),
             )
             .await
     });
@@ -350,8 +350,8 @@ async fn smoke_test_get_part() -> Result<(), Error> {
     store
         .get_part_unchunked(
             DigestInfo::try_new(VALID_HASH1, AC_ENTRY_SIZE)?,
-            OFFSET,
-            Some(LENGTH),
+            OFFSET as u64,
+            Some(LENGTH as u64),
         )
         .await?;
 
@@ -589,7 +589,7 @@ async fn ensure_empty_string_in_stream_works_test() -> Result<(), Error> {
         store.get_part_unchunked(
             DigestInfo::try_new(VALID_HASH1, CAS_ENTRY_SIZE)?,
             0,
-            Some(CAS_ENTRY_SIZE),
+            Some(CAS_ENTRY_SIZE as u64),
         )
     );
     assert_eq!(
diff --git a/nativelink-store/tests/shard_store_test.rs b/nativelink-store/tests/shard_store_test.rs
index e75208fb5..02dc13fda 100644
--- a/nativelink-store/tests/shard_store_test.rs
+++ b/nativelink-store/tests/shard_store_test.rs
@@ -106,7 +106,7 @@ async fn has_with_one_digest() -> Result<(), Error> {
         .update_oneshot(digest1, original_data.clone().into())
         .await?;
 
-    assert_eq!(shard_store.has(digest1).await, Ok(Some(MEGABYTE_SZ)));
+    assert_eq!(shard_store.has(digest1).await, Ok(Some(MEGABYTE_SZ as u64)));
     Ok(())
 }
 
@@ -141,7 +141,7 @@ async fn has_with_many_digests_one_missing() -> Result<(), Error> {
         shard_store
             .has_many(&[digest1.into(), missing_digest.into()])
             .await,
-        Ok(vec![Some(MEGABYTE_SZ), None])
+        Ok(vec![Some(MEGABYTE_SZ as u64), None])
     );
     Ok(())
 }
@@ -165,7 +165,10 @@ async fn has_with_many_digests_both_exist() -> Result<(), Error> {
         shard_store
             .has_many(&[digest1.into(), digest2.into()])
             .await,
-        Ok(vec![Some(original_data1.len()), Some(original_data2.len())])
+        Ok(vec![
+            Some(u64::try_from(original_data1.clone().len()).expect("Cast failed")),
+            Some(u64::try_from(original_data2.clone().len()).expect("Cast failed"))
+        ])
     );
     Ok(())
 }
@@ -253,7 +256,7 @@ async fn upload_download_has_check() -> Result<(), Error> {
         shard_store.get_part_unchunked(digest1, 0, None).await,
         Ok(original_data1.into())
     );
-    assert_eq!(shard_store.has(digest1).await, Ok(Some(MEGABYTE_SZ)));
+    assert_eq!(shard_store.has(digest1).await, Ok(Some(MEGABYTE_SZ as u64)));
     Ok(())
 }
 
@@ -268,7 +271,7 @@ async fn weights_send_to_proper_store() -> Result<(), Error> {
         .update_oneshot(digest1, original_data1.clone().into())
         .await?;
 
-    assert_eq!(stores[0].has(digest1).await, Ok(Some(MEGABYTE_SZ)));
+    assert_eq!(stores[0].has(digest1).await, Ok(Some(MEGABYTE_SZ as u64)));
     assert_eq!(stores[1].has(digest1).await, Ok(None));
     Ok(())
 }
diff --git a/nativelink-store/tests/size_partitioning_store_test.rs b/nativelink-store/tests/size_partitioning_store_test.rs
index 216e75ca2..0339121ec 100644
--- a/nativelink-store/tests/size_partitioning_store_test.rs
+++ b/nativelink-store/tests/size_partitioning_store_test.rs
@@ -84,7 +84,7 @@ async fn has_test() -> Result<(), Error> {
         .await;
     assert_eq!(
         small_has_result,
-        Ok(Some(SMALL_VALUE.len())),
+        Ok(Some(SMALL_VALUE.len() as u64)),
         "Expected size part store to have data in ref store : {}",
         SMALL_HASH
     );
@@ -96,7 +96,7 @@ async fn has_test() -> Result<(), Error> {
         .await;
     assert_eq!(
         small_has_result,
-        Ok(Some(BIG_VALUE.len())),
+        Ok(Some(BIG_VALUE.len() as u64)),
         "Expected size part store to have data in ref store : {}",
         BIG_HASH
     );
diff --git a/nativelink-store/tests/verify_store_test.rs b/nativelink-store/tests/verify_store_test.rs
index d3c24b030..8382d48ba 100644
--- a/nativelink-store/tests/verify_store_test.rs
+++ b/nativelink-store/tests/verify_store_test.rs
@@ -55,7 +55,7 @@ async fn verify_size_false_passes_on_update() -> Result<(), Error> {
     );
     assert_eq!(
         inner_store.has(digest).await,
-        Ok(Some(VALUE1.len())),
+        Ok(Some(VALUE1.len() as u64)),
         "Expected data to exist in store after update"
     );
     Ok(())
@@ -121,7 +121,7 @@ async fn verify_size_true_suceeds_on_update() -> Result<(), Error> {
     assert_eq!(result, Ok(()), "Expected success, got: {:?}", result);
     assert_eq!(
         inner_store.has(digest).await,
-        Ok(Some(VALUE1.len())),
+        Ok(Some(VALUE1.len() as u64)),
         "Expected data to exist in store after update"
     );
     Ok(())
@@ -187,7 +187,7 @@ async fn verify_sha256_hash_true_suceeds_on_update() -> Result<(), Error> {
     assert_eq!(result, Ok(()), "Expected success, got: {:?}", result);
     assert_eq!(
         inner_store.has(digest).await,
-        Ok(Some(VALUE.len())),
+        Ok(Some(VALUE.len() as u64)),
         "Expected data to exist in store after update"
     );
     Ok(())
@@ -256,7 +256,7 @@ async fn verify_blake3_hash_true_suceeds_on_update() -> Result<(), Error> {
     assert_eq!(result, Ok(()), "Expected success, got: {:?}", result);
     assert_eq!(
         inner_store.has(digest).await,
-        Ok(Some(VALUE.len())),
+        Ok(Some(VALUE.len() as u64)),
         "Expected data to exist in store after update"
     );
     Ok(())
@@ -374,7 +374,7 @@ async fn verify_size_and_hash_suceeds_on_small_data() -> Result<(), Error> {
     assert_eq!(result, Ok(()), "Expected success, got: {:?}", result);
     assert_eq!(
         inner_store.has(digest).await,
-        Ok(Some(VALUE.len())),
+        Ok(Some(VALUE.len() as u64)),
         "Expected data to exist in store after update"
     );
     Ok(())
diff --git a/nativelink-util/src/buf_channel.rs b/nativelink-util/src/buf_channel.rs
index ed1f9b0ec..ba3aa05b6 100644
--- a/nativelink-util/src/buf_channel.rs
+++ b/nativelink-util/src/buf_channel.rs
@@ -268,8 +268,8 @@ impl DropCloserReadHalf {
     /// Sets the maximum size of the recent_data buffer. If the number of bytes
     /// received exceeds this size, the recent_data buffer will be cleared and
     /// no longer populated.
-    pub fn set_max_recent_data_size(&mut self, size: usize) {
-        self.max_recent_data_size = size as u64;
+    pub fn set_max_recent_data_size(&mut self, size: u64) {
+        self.max_recent_data_size = size;
     }
 
     /// Attempts to reset the stream to before any data was received. This will
diff --git a/nativelink-util/src/default_store_key_subscribe.rs b/nativelink-util/src/default_store_key_subscribe.rs
index 0638c239b..70ac89fc3 100644
--- a/nativelink-util/src/default_store_key_subscribe.rs
+++ b/nativelink-util/src/default_store_key_subscribe.rs
@@ -64,8 +64,8 @@ impl StoreSubscriptionItem for DefaultStoreSubscription
     async fn get_part(
         &self,
         writer: &mut DropCloserWriteHalf,
-        offset: usize,
-        length: Option<usize>,
+        offset: u64,
+        length: Option<u64>,
     ) -> Result<(), Error> {
         Pin::new(self.store.as_ref())
             .get_part(self.key.borrow(), writer, offset, length)
diff --git a/nativelink-util/src/evicting_map.rs b/nativelink-util/src/evicting_map.rs
index 90a1d5597..327d0a43d 100644
--- a/nativelink-util/src/evicting_map.rs
+++ b/nativelink-util/src/evicting_map.rs
@@ -210,7 +210,7 @@ where
     /// and return the number of items that were processed.
     /// The `handler` function should return `true` to continue processing the next item
     /// or `false` to stop processing.
-    pub async fn range<F, Q>(&self, prefix_range: impl RangeBounds<Q>, mut handler: F) -> usize
+    pub async fn range<F, Q>(&self, prefix_range: impl RangeBounds<Q>, mut handler: F) -> u64
     where
         F: FnMut(&K, &T) -> bool,
         K: Borrow<Q> + Ord,
@@ -300,7 +300,7 @@ where
     }
 
     /// Return the size of a `key`, if not found `None` is returned.
-    pub async fn size_for_key<Q>(&self, key: &Q) -> Option<usize>
+    pub async fn size_for_key<Q>(&self, key: &Q) -> Option<u64>
     where
         K: Borrow<Q>,
         Q: Ord + Hash + Eq + Debug,
@@ -316,12 +316,8 @@ where
     /// If no key is found in the internal map, `None` is filled in its place.
     /// If `peek` is set to `true`, the items are not promoted to the front of the
     /// LRU cache. Note: peek may still evict, but won't promote.
-    pub async fn sizes_for_keys(
-        &self,
-        keys: It,
-        results: &mut [Option<usize>],
-        peek: bool,
-    ) where
+    pub async fn sizes_for_keys(&self, keys: It, results: &mut [Option<u64>], peek: bool)
+    where
         It: IntoIterator,
         // This may look strange, but what we are doing is saying:
        // * `K` must be able to borrow `Q`
@@ -350,10 +346,10 @@ where
             // we are here.
             let should_evict = self.should_evict(lru_len, entry, 0, u64::MAX);
             if !should_evict && peek {
-                *result = Some(entry.data.len());
+                *result = Some(entry.data.len() as u64);
             } else if !should_evict && entry.data.touch().await {
                 entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
-                *result = Some(entry.data.len());
+                *result = Some(entry.data.len() as u64);
             } else {
                 *result = None;
                 if let Some((key, eviction_item)) = state.lru.pop_entry(key.borrow()) {
diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs
index 1323c6df5..5c7d67355 100644
--- a/nativelink-util/src/store_trait.rs
+++ b/nativelink-util/src/store_trait.rs
@@ -64,13 +64,13 @@ pub fn set_default_digest_size_health_check(size: usize) -> Result<(), Error> {
 pub enum UploadSizeInfo {
     /// When the data transfer amount is known to be exact size, this enum should be used.
     /// The receiver store can use this to better optimize the way the data is sent or stored.
-    ExactSize(usize),
+    ExactSize(u64),
 
     /// When the data transfer amount is not known to be exact, the caller should use this enum
     /// to provide the maximum size that could possibly be sent. This will bypass the exact size
     /// checks, but still provide useful information to the underlying store about the data being
     /// sent that it can then use to optimize the upload process.
-    MaxSize(usize),
+    MaxSize(u64),
 }
 
 /// Utility to send all the data to the store from a file.
@@ -173,8 +173,8 @@ pub trait StoreSubscriptionItem: Send + Sync + Unpin {
     async fn get_part(
         &self,
         writer: &mut DropCloserWriteHalf,
-        offset: usize,
-        length: Option<usize>,
+        offset: u64,
+        length: Option<u64>,
     ) -> Result<(), Error>;
 
     /// Same as `Store::get`, but without the key.
@@ -436,7 +436,7 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
     fn has<'a>(
         &'a self,
         digest: impl Into<StoreKey<'a>>,
-    ) -> impl Future<Output = Result<Option<usize>, Error>> + 'a {
+    ) -> impl Future<Output = Result<Option<u64>, Error>> + 'a {
         self.as_store_driver_pin().has(digest.into())
     }
 
@@ -448,7 +448,7 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
     fn has_many<'a>(
         &'a self,
         digests: &'a [StoreKey<'a>],
-    ) -> impl Future<Output = Result<Vec<Option<usize>>, Error>> + Send + 'a {
+    ) -> impl Future<Output = Result<Vec<Option<u64>>, Error>> + Send + 'a {
         self.as_store_driver_pin().has_many(digests)
     }
 
@@ -458,7 +458,7 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
     fn has_with_results<'a>(
         &'a self,
         digests: &'a [StoreKey<'a>],
-        results: &'a mut [Option<usize>],
+        results: &'a mut [Option<u64>],
     ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
         self.as_store_driver_pin()
             .has_with_results(digests, results)
@@ -474,7 +474,7 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
         &'a self,
         range: impl RangeBounds<StoreKey<'b>> + Send + 'b,
         mut handler: impl for<'c> FnMut(&'c StoreKey) -> bool + Send + Sync + 'a,
-    ) -> impl Future<Output = Result<usize, Error>> + Send + 'a
+    ) -> impl Future<Output = Result<u64, Error>> + Send + 'a
     where
         'b: 'a,
     {
@@ -544,8 +544,8 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
         &'a self,
         digest: impl Into<StoreKey<'a>>,
         mut writer: impl BorrowMut<DropCloserWriteHalf> + Send + 'a,
-        offset: usize,
-        length: Option<usize>,
+        offset: u64,
+        length: Option<u64>,
     ) -> impl Future<Output = Result<(), Error>> + Send + 'a {
         let key = digest.into();
         // Note: We need to capture `writer` just in case the caller
@@ -574,8 +574,8 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
     fn get_part_unchunked<'a>(
         &'a self,
         key: impl Into<StoreKey<'a>>,
-        offset: usize,
-        length: Option<usize>,
+        offset: u64,
+        length: Option<u64>,
     ) -> impl Future<Output = Result<Bytes, Error>> + Send + 'a {
         self.as_store_driver_pin()
             .get_part_unchunked(key.into(), offset, length)
@@ -598,7 +598,7 @@
 {
     /// See: [`StoreLike::has`] for details.
     #[inline]
-    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<usize>, Error> {
+    async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
         let mut result = [None];
         self.has_with_results(&[key], &mut result).await?;
         Ok(result[0])
@@ -609,7 +609,7 @@
     async fn has_many(
         self: Pin<&Self>,
         digests: &[StoreKey<'_>],
-    ) -> Result<Vec<Option<usize>>, Error> {
+    ) -> Result<Vec<Option<u64>>, Error> {
         let mut results = vec![None; digests.len()];
         self.has_with_results(digests, &mut results).await?;
         Ok(results)
@@ -619,7 +619,7 @@
     async fn has_with_results(
         self: Pin<&Self>,
         digests: &[StoreKey<'_>],
-        results: &mut [Option<usize>],
+        results: &mut [Option<u64>],
     ) -> Result<(), Error>;
 
     /// See: [`StoreLike::list`] for details.
@@ -627,7 +627,7 @@
         self: Pin<&Self>,
         _range: (Bound<StoreKey<'_>>, Bound<StoreKey<'_>>),
         _handler: &mut (dyn for<'a> FnMut(&'a StoreKey) -> bool + Send + Sync + '_),
-    ) -> Result<usize, Error> {
+    ) -> Result<u64, Error> {
         // TODO(allada) We should force all stores to implement this function instead of
         // providing a default implementation.
         Err(make_err!(
@@ -691,7 +691,13 @@
         };
         try_join!(
             send_fut,
-            self.update(key, rx, UploadSizeInfo::ExactSize(data_len))
+            self.update(
+                key,
+                rx,
+                UploadSizeInfo::ExactSize(
+                    u64::try_from(data_len).err_tip(|| "Could not convert data_len to u64")?
+                )
+            )
         )?;
         Ok(())
     }
@@ -701,8 +707,8 @@
         self: Pin<&Self>,
         key: StoreKey<'_>,
         writer: &mut DropCloserWriteHalf,
-        offset: usize,
-        length: Option<usize>,
+        offset: u64,
+        length: Option<u64>,
     ) -> Result<(), Error>;
 
     /// See: [`StoreLike::get`] for details.
@@ -719,8 +725,8 @@ pub trait StoreDriver:
     async fn get_part_unchunked(
         self: Pin<&Self>,
         key: StoreKey<'_>,
-        offset: usize,
-        length: Option<usize>,
+        offset: u64,
+        length: Option<u64>,
     ) -> Result<Bytes, Error> {
         // TODO(blaise.bruer) This is extremely inefficient, since we have exactly
         // what we need here. Maybe we could instead make a version of the stream
@@ -728,7 +734,7 @@
         let (mut tx, mut rx) = make_buf_channel_pair();
 
         let (data_res, get_part_res) = join!(
-            rx.consume(length),
+            rx.consume(length.map(|value| value as usize)),
             // We use a closure here to ensure that the `tx` is dropped when the
             // future is done.
             async move { self.get_part(key, &mut tx, offset, length).await },
@@ -763,7 +769,7 @@
         let mut digest_hasher = default_digest_hasher_func().hasher();
         digest_hasher.update(&digest_data);
 
-        let digest_data_len = digest_data.len();
+        let digest_data_len = digest_data.len() as u64;
         let digest_info = StoreKey::from(digest_hasher.finalize_digest());
 
         let digest_bytes = bytes::Bytes::copy_from_slice(&digest_data);
diff --git a/nativelink-util/tests/evicting_map_test.rs b/nativelink-util/tests/evicting_map_test.rs
index 1e33cbc66..0a4642aba 100644
--- a/nativelink-util/tests/evicting_map_test.rs
+++ b/nativelink-util/tests/evicting_map_test.rs
@@ -152,14 +152,14 @@ async fn insert_purges_at_max_bytes() -> Result<(), Error> {
         evicting_map
             .size_for_key(&DigestInfo::try_new(HASH3, 0)?)
             .await,
-        Some(DATA.len()),
+        Some(DATA.len() as u64),
         "Expected map to have item 3"
     );
     assert_eq!(
         evicting_map
             .size_for_key(&DigestInfo::try_new(HASH4, 0)?)
             .await,
-        Some(DATA.len()),
+        Some(DATA.len() as u64),
         "Expected map to have item 4"
     );
 
@@ -216,7 +216,7 @@ async fn insert_purges_to_low_watermark_at_max_bytes() -> Result<(), Error> {
         evicting_map
             .size_for_key(&DigestInfo::try_new(HASH4, 0)?)
             .await,
-        Some(DATA.len()),
+        Some(DATA.len() as u64),
         "Expected map to have item 4"
     );
 
@@ -263,21 +263,21 @@ async fn insert_purges_at_max_seconds() -> Result<(), Error> {
         evicting_map
             .size_for_key(&DigestInfo::try_new(HASH2, 0)?)
             .await,
-        Some(DATA.len()),
+        Some(DATA.len() as u64),
         "Expected map to have item 2"
     );
     assert_eq!(
         evicting_map
             .size_for_key(&DigestInfo::try_new(HASH3, 0)?)
             .await,
-        Some(DATA.len()),
+        Some(DATA.len() as u64),
         "Expected map to have item 3"
     );
     assert_eq!(
         evicting_map
             .size_for_key(&DigestInfo::try_new(HASH4, 0)?)
             .await,
-        Some(DATA.len()),
+        Some(DATA.len() as u64),
         "Expected map to have item 4"
     );
 
@@ -329,7 +329,7 @@ async fn get_refreshes_time() -> Result<(), Error> {
         evicting_map
             .size_for_key(&DigestInfo::try_new(HASH3, 0)?)
             .await,
-        Some(DATA.len()),
+        Some(DATA.len() as u64),
         "Expected map to have item 3"
     );
 
@@ -486,14 +486,14 @@ async fn hashes_equal_sizes_different_doesnt_override() -> Result<(), Error> {
         evicting_map
             .size_for_key(&DigestInfo::try_new(HASH1, 0)?)
             .await,
-        Some(value1.len()),
+        Some(u64::try_from(value1.len()).expect("Cast failed")),
         "HASH1/0 should exist"
     );
     assert_eq!(
         evicting_map
             .size_for_key(&DigestInfo::try_new(HASH1, 1)?)
             .await,
-        Some(value2.len()),
+        Some(u64::try_from(value2.len()).expect("Cast failed")),
         "HASH1/1 should exist"
     );
 
diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs
index 570ea22fe..ebc0c9114 100644
--- a/nativelink-worker/src/running_actions_manager.rs
+++ b/nativelink-worker/src/running_actions_manager.rs
@@ -287,7 +287,7 @@ async fn upload_file(
         .update_with_whole_file(
             digest.into(),
             resumeable_file,
-            UploadSizeInfo::ExactSize(digest.size_bytes() as usize),
+            UploadSizeInfo::ExactSize(digest.size_bytes()),
        )
        .await
        .err_tip(|| format!("for {full_path:?}"))?;
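
Reviewer note: the hunks above apply one conversion pattern throughout — byte counts and store sizes widen from `usize` to `u64`, and only the call sites that still take a `usize` (such as the buffer-consume path) narrow back down. A minimal sketch of that pattern follows; the helper names are illustrative only and do not exist in the codebase:

    // Widening: usize -> u64 is lossless on the targets Rust supports (usize is at
    // most 64 bits), so a plain cast matches the `as u64` conversions in the tests;
    // `u64::try_from(..)` is the explicit-check variant also used in the diff.
    fn byte_len_as_u64(data: &[u8]) -> u64 {
        data.len() as u64
    }

    // Narrowing: u64 -> usize can overflow on 32-bit targets, so a checked
    // conversion is the safer default when an API still expects `usize`.
    fn byte_len_as_usize(len: u64) -> Result<usize, std::num::TryFromIntError> {
        usize::try_from(len)
    }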