Convert usize to u64 in Store trait APIs (#1344)
APIs in `StoreLike` now use `u64` instead of `usize`, because `u64`
expresses the size of the data, whereas `usize` expresses local memory limits.
aleksdmladenovic authored Sep 20, 2024
1 parent 8d957a5 commit 2a55f1e
Showing 38 changed files with 307 additions and 257 deletions.
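To see the distinction the commit message draws, here is a minimal standalone sketch (not NativeLink code): a backend-reported data size always fits in `u64`, while `usize` is tied to the target's address width and should only appear, checked, at the point where bytes actually enter local memory.

```rust
// Sketch only: on a 32-bit target, `usize` is 32 bits wide, so a blob
// larger than 4 GiB is representable as a `u64` size but not as a `usize`.
fn main() {
    // A 5 GiB blob, as a store backend might report it.
    let blob_size: u64 = 5 * 1024 * 1024 * 1024;

    // Converting is only needed (and must be checked) where the value will
    // index local memory, e.g. to size a buffer.
    match usize::try_from(blob_size) {
        Ok(n) => println!("representable locally: {n} bytes"),
        Err(_) => println!("larger than this target's usize"),
    }
}
```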
2 changes: 1 addition & 1 deletion nativelink-scheduler/src/memory_awaited_action_db.rs
@@ -83,7 +83,7 @@ impl Drop for ClientAwaitedAction {
/// why the implementation has fixed default values in it.
impl LenEntry for ClientAwaitedAction {
#[inline]
fn len(&self) -> usize {
fn len(&self) -> u64 {
0
}

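A toy mirror of the shape this hunk produces, assuming (as the surrounding comment suggests) that `LenEntry::len` reports a data size for eviction accounting, hence `u64`; the trait and stub below are illustrative, not the real definitions.

```rust
// Illustrative stand-in for the LenEntry shape after the change.
trait LenEntry {
    fn len(&self) -> u64;
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

struct ClientAwaitedActionStub;

impl LenEntry for ClientAwaitedActionStub {
    // The real impl returns a fixed 0, as the hunk above shows.
    fn len(&self) -> u64 {
        0
    }
}

fn main() {
    assert!(ClientAwaitedActionStub.is_empty());
}
```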
10 changes: 4 additions & 6 deletions nativelink-scheduler/tests/utils/mock_scheduler.rs
@@ -17,12 +17,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use nativelink_error::{make_input_err, Error};
use nativelink_metric::{MetricsComponent, RootMetricsComponent};
use nativelink_util::{
action_messages::{ActionInfo, OperationId},
known_platform_property_provider::KnownPlatformPropertyProvider,
operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
},
use nativelink_util::action_messages::{ActionInfo, OperationId};
use nativelink_util::known_platform_property_provider::KnownPlatformPropertyProvider;
use nativelink_util::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
};
use tokio::sync::{mpsc, Mutex};

20 changes: 10 additions & 10 deletions nativelink-service/src/bytestream_server.rs
@@ -249,13 +249,7 @@ impl ByteStreamServer {
// `store` to ensure its lifetime follows the future and not the caller.
store
// Bytestream always uses digest size as the actual byte size.
.update(
digest,
rx,
UploadSizeInfo::ExactSize(
usize::try_from(digest.size_bytes()).err_tip(|| "Invalid digest size")?,
),
)
.update(digest, rx, UploadSizeInfo::ExactSize(digest.size_bytes()))
.await
});
Ok(ActiveStreamGuard {
@@ -275,8 +269,8 @@ impl ByteStreamServer {
digest: DigestInfo,
read_request: ReadRequest,
) -> Result<Response<ReadStream>, Error> {
let read_limit = usize::try_from(read_request.read_limit)
.err_tip(|| "read_limit has is not convertible to usize")?;
let read_limit = u64::try_from(read_request.read_limit)
.err_tip(|| "Could not convert read_limit to u64")?;

let (tx, rx) = make_buf_channel_pair();

@@ -300,7 +294,7 @@ impl ByteStreamServer {
maybe_get_part_result: None,
get_part_fut: Box::pin(async move {
store
.get_part(digest, tx, read_request.read_offset as usize, read_limit)
.get_part(
digest,
tx,
u64::try_from(read_request.read_offset)
.err_tip(|| "Could not convert read_offset to u64")?,
read_limit,
)
.await
}),
});
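The hunks above all apply one pattern: wire-level `i64` fields such as `read_offset` and `read_limit` become `u64` via `try_from`, so a negative value fails loudly instead of wrapping the way an `as` cast would. A minimal sketch, with a plain `String` error standing in for nativelink's `Error`/`err_tip` machinery:

```rust
// Hypothetical helper name; only the try_from pattern mirrors the diff.
fn to_read_offset(read_offset: i64) -> Result<u64, String> {
    u64::try_from(read_offset)
        .map_err(|_| format!("Could not convert read_offset to u64: {read_offset}"))
}

fn main() {
    assert_eq!(to_read_offset(42), Ok(42));
    // A negative offset is an input error, not a silently huge offset.
    assert!(to_read_offset(-1).is_err());
}
```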
7 changes: 4 additions & 3 deletions nativelink-store/src/ac_utils.rs
@@ -50,15 +50,15 @@ pub async fn get_and_decode_digest<T: Message + Default + 'static>(
pub async fn get_size_and_decode_digest<T: Message + Default + 'static>(
store: &impl StoreLike,
key: impl Into<StoreKey<'_>>,
) -> Result<(T, usize), Error> {
) -> Result<(T, u64), Error> {
let key = key.into();
// Note: For unknown reasons we appear to be hitting:
// https://github.com/rust-lang/rust/issues/92096
// or a smiliar issue if we try to use the non-store driver function, so we
// are using the store driver function here.
let mut store_data_resp = store
.as_store_driver_pin()
.get_part_unchunked(key.borrow(), 0, Some(MAX_ACTION_MSG_SIZE))
.get_part_unchunked(key.borrow(), 0, Some(MAX_ACTION_MSG_SIZE as u64))
.await;
if let Err(err) = &mut store_data_resp {
if err.code == Code::NotFound {
@@ -69,7 +69,8 @@ pub async fn get_size_and_decode_digest<T: Message + Default + 'static>(
}
}
let store_data = store_data_resp?;
let store_data_len = store_data.len();
let store_data_len =
u64::try_from(store_data.len()).err_tip(|| "Could not convert store_data.len() to u64")?;

T::decode(store_data)
.err_tip_with_code(|e| {
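The `u64::try_from(store_data.len())` above can look redundant, but std provides no infallible `From<usize> for u64` (a `usize`'s width is platform-defined), so the checked conversion is the portable route. A small sketch:

```rust
// Sketch: converting a buffer length (usize) to a data size (u64).
fn len_as_u64(buf: &[u8]) -> Result<u64, std::num::TryFromIntError> {
    u64::try_from(buf.len())
}

fn main() {
    let data = vec![0u8; 1024];
    assert_eq!(len_as_u64(&data).unwrap(), 1024);
}
```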
10 changes: 5 additions & 5 deletions nativelink-store/src/completeness_checking_store.rs
@@ -132,12 +132,12 @@ impl CompletenessCheckingStore {
async fn inner_has_with_results(
&self,
action_result_digests: &[StoreKey<'_>],
results: &mut [Option<usize>],
results: &mut [Option<u64>],
) -> Result<(), Error> {
// Holds shared state between the different futures.
// This is how get around lifetime issues.
struct State<'a> {
results: &'a mut [Option<usize>],
results: &'a mut [Option<u64>],
digests_to_check: Vec<StoreKey<'a>>,
digests_to_check_idxs: Vec<usize>,
notify: Arc<Notify>,
@@ -342,7 +342,7 @@ impl StoreDriver for CompletenessCheckingStore {
async fn has_with_results(
self: Pin<&Self>,
keys: &[StoreKey<'_>],
results: &mut [Option<usize>],
results: &mut [Option<u64>],
) -> Result<(), Error> {
self.inner_has_with_results(keys, results).await
}
@@ -360,8 +360,8 @@
self: Pin<&Self>,
key: StoreKey<'_>,
writer: &mut DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
offset: u64,
length: Option<u64>,
) -> Result<(), Error> {
let results = &mut [None];
self.inner_has_with_results(&[key.borrow()], results)
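For reference, a self-contained toy of the `has_with_results` contract after this change: each result slot becomes `Some(size_in_bytes)` as a `u64`, or stays `None` when the key is absent. Types are simplified and synchronous; the real trait is async and keyed by `StoreKey`.

```rust
use std::collections::HashMap;

struct ToyStore {
    sizes: HashMap<String, u64>,
}

impl ToyStore {
    // Mirrors the shape: one Option<u64> result slot per requested key.
    fn has_with_results(&self, keys: &[&str], results: &mut [Option<u64>]) {
        for (key, slot) in keys.iter().zip(results.iter_mut()) {
            *slot = self.sizes.get(*key).copied();
        }
    }
}

fn main() {
    let store = ToyStore {
        sizes: HashMap::from([("present".to_string(), 3)]),
    };
    let mut results = [None, None];
    store.has_with_results(&["present", "missing"], &mut results);
    assert_eq!(results, [Some(3), None]);
}
```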
41 changes: 21 additions & 20 deletions nativelink-store/src/compression_store.rs
@@ -42,7 +42,7 @@ pub const CURRENT_STREAM_FORMAT_VERSION: u8 = 1;
// Default block size that will be used to slice stream into.
pub const DEFAULT_BLOCK_SIZE: u32 = 64 * 1024;

const U32_SZ: usize = std::mem::size_of::<u8>();
const U32_SZ: u64 = std::mem::size_of::<u8>() as u64;

type BincodeOptions = WithOtherIntEncoding<DefaultOptions, FixintEncoding>;

@@ -145,25 +145,25 @@ pub struct Footer {
/// lz4_flex::block::get_maximum_output_size() way over estimates, so we use the
/// one provided here: https://github.com/torvalds/linux/blob/master/include/linux/lz4.h#L61
/// Local testing shows this gives quite accurate worst case given random input.
fn lz4_compress_bound(input_size: usize) -> usize {
fn lz4_compress_bound(input_size: u64) -> u64 {
input_size + (input_size / 255) + 16
}

struct UploadState {
header: Header,
footer: Footer,
max_output_size: usize,
input_max_size: usize,
max_output_size: u64,
input_max_size: u64,
}

impl UploadState {
pub fn new(store: &CompressionStore, upload_size: UploadSizeInfo) -> Self {
pub fn new(store: &CompressionStore, upload_size: UploadSizeInfo) -> Result<Self, Error> {
let input_max_size = match upload_size {
UploadSizeInfo::ExactSize(sz) => sz,
UploadSizeInfo::MaxSize(sz) => sz,
};

let max_index_count = (input_max_size / store.config.block_size as usize) + 1;
let max_index_count = (input_max_size / store.config.block_size as u64) + 1;

let header = Header {
version: CURRENT_STREAM_FORMAT_VERSION,
@@ -177,7 +177,8 @@ impl UploadState {
SliceIndex {
..Default::default()
};
max_index_count
usize::try_from(max_index_count)
.err_tip(|| "Could not convert max_index_count to usize")?
],
index_count: max_index_count as u32,
uncompressed_data_size: 0, // Updated later.
@@ -186,22 +186,22 @@
};

// This is more accurate of an estimate than what get_maximum_output_size calculates.
let max_block_size = lz4_compress_bound(store.config.block_size as usize) + U32_SZ + 1;
let max_block_size = lz4_compress_bound(store.config.block_size as u64) + U32_SZ + 1;

let max_output_size = {
let header_size = store.bincode_options.serialized_size(&header).unwrap() as usize;
let header_size = store.bincode_options.serialized_size(&header).unwrap();
let max_content_size = max_block_size * max_index_count;
let max_footer_size =
U32_SZ + 1 + store.bincode_options.serialized_size(&footer).unwrap() as usize;
U32_SZ + 1 + store.bincode_options.serialized_size(&footer).unwrap();
header_size + max_content_size + max_footer_size
};

Self {
Ok(Self {
header,
footer,
max_output_size,
input_max_size,
}
})
}
}

@@ -246,7 +247,7 @@ impl StoreDriver for CompressionStore {
async fn has_with_results(
self: Pin<&Self>,
digests: &[StoreKey<'_>],
results: &mut [Option<usize>],
results: &mut [Option<u64>],
) -> Result<(), Error> {
self.inner_store.has_with_results(digests, results).await
}
@@ -257,7 +258,7 @@
mut reader: DropCloserReadHalf,
upload_size: UploadSizeInfo,
) -> Result<(), Error> {
let mut output_state = UploadState::new(&self, upload_size);
let mut output_state = UploadState::new(&self, upload_size)?;

let (mut tx, rx) = make_buf_channel_pair();

@@ -307,7 +308,7 @@
break; // EOF.
}

received_amt += chunk.len();
received_amt += u64::try_from(chunk.len())
.err_tip(|| "Could not convert chunk.len() to u64")?;
error_if!(
received_amt > output_state.input_max_size,
"Got more data than stated in compression store upload request"
@@ -360,7 +362,7 @@
},
);
output_state.footer.index_count = output_state.footer.indexes.len() as u32;
output_state.footer.uncompressed_data_size = received_amt as u64;
output_state.footer.uncompressed_data_size = received_amt;
{
// Write Footer.
let serialized_footer = self
@@ -392,8 +394,8 @@
self: Pin<&Self>,
key: StoreKey<'_>,
writer: &mut DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
offset: u64,
length: Option<u64>,
) -> Result<(), Error> {
if is_zero_digest(key.borrow()) {
writer
Expand All @@ -402,7 +404,6 @@ impl StoreDriver for CompressionStore {
return Ok(());
}

let offset = offset as u64;
let (tx, mut rx) = make_buf_channel_pair();

let inner_store = self.inner_store.clone();
@@ -474,7 +475,7 @@
let mut frame_sz = chunk.get_u32_le();

let mut uncompressed_data_sz: u64 = 0;
let mut remaining_bytes_to_send: u64 = length.unwrap_or(usize::MAX) as u64;
let mut remaining_bytes_to_send: u64 = length.unwrap_or(u64::MAX);
let mut chunks_count: u32 = 0;
while frame_type != FOOTER_FRAME_TYPE {
error_if!(
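`lz4_compress_bound` is simple enough to sanity-check by hand, and with the `u64` signature the bound is computed in the data-size domain rather than the address-width domain. A standalone copy of the formula with a worked value, assuming the default 64 KiB block size:

```rust
// Same formula as the store's worst-case LZ4 bound, standalone.
fn lz4_compress_bound(input_size: u64) -> u64 {
    input_size + (input_size / 255) + 16
}

fn main() {
    // 64 KiB input: 65536 + 65536/255 + 16 = 65536 + 257 + 16.
    assert_eq!(lz4_compress_bound(64 * 1024), 65_809);
}
```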
44 changes: 25 additions & 19 deletions nativelink-store/src/dedup_store.rs
@@ -34,9 +34,9 @@ use tracing::{event, Level};

// NOTE: If these change update the comments in `stores.rs` to reflect
// the new defaults.
const DEFAULT_MIN_SIZE: usize = 64 * 1024;
const DEFAULT_NORM_SIZE: usize = 256 * 1024;
const DEFAULT_MAX_SIZE: usize = 512 * 1024;
const DEFAULT_MIN_SIZE: u64 = 64 * 1024;
const DEFAULT_NORM_SIZE: u64 = 256 * 1024;
const DEFAULT_MAX_SIZE: u64 = 512 * 1024;
const DEFAULT_MAX_CONCURRENT_FETCH_PER_GET: usize = 10;

#[derive(Serialize, Deserialize, PartialEq, Debug, Default, Clone)]
@@ -61,37 +61,42 @@ impl DedupStore {
config: &nativelink_config::stores::DedupStore,
index_store: Store,
content_store: Store,
) -> Arc<Self> {
) -> Result<Arc<Self>, Error> {
let min_size = if config.min_size == 0 {
DEFAULT_MIN_SIZE
} else {
config.min_size as usize
config.min_size as u64
};
let normal_size = if config.normal_size == 0 {
DEFAULT_NORM_SIZE
} else {
config.normal_size as usize
config.normal_size as u64
};
let max_size = if config.max_size == 0 {
DEFAULT_MAX_SIZE
} else {
config.max_size as usize
config.max_size as u64
};
let max_concurrent_fetch_per_get = if config.max_concurrent_fetch_per_get == 0 {
DEFAULT_MAX_CONCURRENT_FETCH_PER_GET
} else {
config.max_concurrent_fetch_per_get as usize
};
Arc::new(Self {
Ok(Arc::new(Self {
index_store,
content_store,
fast_cdc_decoder: FastCDC::new(min_size, normal_size, max_size),
fast_cdc_decoder: FastCDC::new(
usize::try_from(min_size).err_tip(|| "Could not convert min_size to usize")?,
usize::try_from(normal_size)
.err_tip(|| "Could not convert normal_size to usize")?,
usize::try_from(max_size).err_tip(|| "Could not convert max_size to usize")?,
),
max_concurrent_fetch_per_get,
bincode_options: DefaultOptions::new().with_fixint_encoding(),
})
}))
}

async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<usize>, Error> {
async fn has(self: Pin<&Self>, key: StoreKey<'_>) -> Result<Option<u64>, Error> {
// First we need to load the index that contains where the individual parts actually
// can be fetched from.
let index_entries = {
@@ -148,7 +153,7 @@ impl StoreDriver for DedupStore {
async fn has_with_results(
self: Pin<&Self>,
digests: &[StoreKey<'_>],
results: &mut [Option<usize>],
results: &mut [Option<u64>],
) -> Result<(), Error> {
digests
.iter()
@@ -225,8 +230,8 @@
self: Pin<&Self>,
key: StoreKey<'_>,
writer: &mut DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
offset: u64,
length: Option<u64>,
) -> Result<(), Error> {
// Special case for if a client tries to read zero bytes.
if length == Some(0) {
@@ -255,7 +260,7 @@
})?
};

let mut start_byte_in_stream: usize = 0;
let mut start_byte_in_stream: u64 = 0;
let entries = {
if offset == 0 && length.is_none() {
index_entries.entries
@@ -264,8 +269,7 @@
let mut entries = Vec::with_capacity(index_entries.entries.len());
for entry in index_entries.entries {
let first_byte = current_entries_sum;
let entry_size = usize::try_from(entry.size_bytes())
.err_tip(|| "Failed to convert to usize in DedupStore")?;
let entry_size = entry.size_bytes();
current_entries_sum += entry_size;
// Filter any items who's end byte is before the first requested byte.
if current_entries_sum <= offset {
@@ -308,8 +312,10 @@
// In the event any of these error, we will abort early and abandon all the rest of the
// streamed data.
// Note: Need to take special care to ensure we send the proper slice of data requested.
let mut bytes_to_skip = offset - start_byte_in_stream;
let mut bytes_to_send = length.unwrap_or(usize::MAX - offset);
let mut bytes_to_skip = usize::try_from(offset - start_byte_in_stream)
.err_tip(|| "Could not convert (offset - start_byte_in_stream) to usize")?;
let mut bytes_to_send = usize::try_from(length.unwrap_or(u64::MAX - offset))
.err_tip(|| "Could not convert length to usize")?;
while let Some(result) = entries_stream.next().await {
let mut data = result.err_tip(|| "Inner store iterator closed early in DedupStore")?;
assert!(
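The range-selection loop in the last hunk is the subtle part of this file: skip chunks that end before `offset`, stop at chunks that start past the requested window, and remember the stream byte where the kept run begins. A compact sketch over plain chunk sizes (the real code walks serialized index entries and converts the residual skip/send counts to `usize` only at the memory boundary):

```rust
// Returns (start_byte_in_stream, kept chunk sizes) for [offset, offset+len).
fn select_chunks(sizes: &[u64], offset: u64, length: Option<u64>) -> (u64, Vec<u64>) {
    let end = length.map(|len| offset + len);
    let mut start_byte_in_stream: u64 = 0;
    let mut current_entries_sum: u64 = 0;
    let mut kept = Vec::new();
    for &size in sizes {
        let first_byte = current_entries_sum;
        current_entries_sum += size;
        // Chunk ends before the requested range starts: skip it entirely.
        if current_entries_sum <= offset {
            start_byte_in_stream += size;
            continue;
        }
        // Chunk begins at or after the requested end: nothing further matters.
        if let Some(end) = end {
            if first_byte >= end {
                break;
            }
        }
        kept.push(size);
    }
    (start_byte_in_stream, kept)
}

fn main() {
    // Three 10-byte chunks, request bytes 12..17: only the middle chunk.
    assert_eq!(select_chunks(&[10, 10, 10], 12, Some(5)), (10, vec![10]));
}
```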
2 changes: 1 addition & 1 deletion nativelink-store/src/default_store_factory.rs
@@ -66,7 +66,7 @@ pub fn store_factory<'a>(
config,
store_factory(&config.index_store, store_manager, None).await?,
store_factory(&config.content_store, store_manager, None).await?,
),
)?,
StoreConfig::existence_cache(config) => ExistenceCacheStore::new(
config,
store_factory(&config.backend, store_manager, None).await?,
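The one-character `)?` here is the ripple effect of `DedupStore::new` becoming fallible, since its FastCDC sizes must now round-trip through `usize`. A minimal sketch of that constructor pattern with simplified, hypothetical names:

```rust
use std::sync::Arc;

struct ToyDedupStore {
    min_size: usize,
}

impl ToyDedupStore {
    // Fallible constructor: config sizes arrive as u64, the chunker wants usize.
    fn new(min_size: u64) -> Result<Arc<Self>, String> {
        Ok(Arc::new(Self {
            min_size: usize::try_from(min_size)
                .map_err(|_| "Could not convert min_size to usize".to_string())?,
        }))
    }
}

fn toy_factory() -> Result<Arc<ToyDedupStore>, String> {
    // The trailing `?` is what the hunk above adds at the call site.
    ToyDedupStore::new(64 * 1024)
}

fn main() {
    assert_eq!(toy_factory().unwrap().min_size, 64 * 1024);
}
```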