diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 222a6af55..9aa531096 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -109,12 +109,16 @@ impl RedisStore { /// Used for testing when determinism is required. pub fn new_from_builder_and_parts( - builder: Builder, + mut builder: Builder, pub_sub_channel: Option, temp_name_generator_fn: fn() -> String, key_prefix: String, ) -> Result { let client_pool = builder + .set_performance_config(PerformanceConfig { + broadcast_channel_capacity: 4096, + ..Default::default() + }) .build_pool(CONNECTION_POOL_SIZE) .err_tip(|| "while creating redis connection pool")?; @@ -345,31 +349,14 @@ impl StoreDriver for RedisStore { let encoded_key = self.encode_key(&key); let encoded_key = encoded_key.as_ref(); - if length == Some(0) { - // We're supposed to read 0 bytes, so just check if the key exists. - let exists = client - .exists(encoded_key) - .await - .err_tip(|| "In RedisStore::get_part::zero_exists")?; - - return match exists { - 0u8 => Err(make_err!( - Code::NotFound, - "Data not found in Redis store for digest: {key:?}" - )), - 1 => writer - .send_eof() - .err_tip(|| "Failed to write EOF in redis store get_part"), - _ => unreachable!("only checked for existence of a single key"), - }; - } - // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we // do math with indices we change them to be exclusive at the end. // We want to read the data at the key from `offset` to `offset + length`. let data_start = offset; - let data_end = data_start.saturating_add(length.unwrap_or(isize::MAX as usize)) - 1; + let data_end = data_start + .saturating_add(length.unwrap_or(isize::MAX as usize)) + .saturating_sub(1); // And we don't ever want to read more than `READ_CHUNK_SIZE` bytes at a time, so we'll need to iterate. let mut chunk_start = data_start; @@ -392,9 +379,7 @@ impl StoreDriver for RedisStore { .err_tip(|| "Failed to write data in RedisStore::get_part")?; } - return writer - .send_eof() - .err_tip(|| "Failed to write EOF in redis store get_part"); + break; // No more data to read. } // We received a full chunk's worth of data, so write it... @@ -407,6 +392,27 @@ impl StoreDriver for RedisStore { chunk_start = chunk_end + 1; chunk_end = cmp::min(chunk_start.saturating_add(READ_CHUNK_SIZE) - 1, data_end); } + + // If we didn't write any data, check if the key exists, if not return a NotFound error. + // This is required by spec. + if writer.get_bytes_written() == 0 { + // We're supposed to read 0 bytes, so just check if the key exists. + let exists = client + .exists::(encoded_key) + .await + .err_tip(|| "In RedisStore::get_part::zero_exists")?; + + if !exists { + return Err(make_err!( + Code::NotFound, + "Data not found in Redis store for digest: {key:?}" + )); + } + } + + writer + .send_eof() + .err_tip(|| "Failed to write EOF in redis store get_part") } fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs index 98adce31c..7b40d15d7 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -22,7 +22,7 @@ use fred::error::RedisError; use fred::mocks::{MockCommand, Mocks}; use fred::prelude::Builder; use fred::types::{RedisConfig, RedisValue}; -use nativelink_error::Error; +use nativelink_error::{Code, Error}; use nativelink_macro::nativelink_test; use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS; use nativelink_store::redis_store::{RedisStore, READ_CHUNK_SIZE}; @@ -518,3 +518,52 @@ async fn yield_between_sending_packets_in_update() -> Result<(), Error> { Ok(()) } + +// Regression test for: https://github.com/TraceMachina/nativelink/issues/1286 +#[nativelink_test] +async fn zero_len_items_exist_check() -> Result<(), Error> { + let mocks = Arc::new(MockRedisBackend::new()); + + let digest = DigestInfo::try_new(VALID_HASH1, 0)?; + let packed_hash_hex = format!("{}-{}", digest.hash_str(), digest.size_bytes); + let real_key = RedisValue::Bytes(packed_hash_hex.into()); + + mocks + .expect( + MockCommand { + cmd: Str::from_static("GETRANGE"), + subcommand: None, + args: vec![ + real_key.clone(), + RedisValue::Integer(0), + // We expect to be asked for data from `0..READ_CHUNK_SIZE`, but since GETRANGE is inclusive + // the actual call should be from `0..=(READ_CHUNK_SIZE - 1)`. + RedisValue::Integer(READ_CHUNK_SIZE as i64 - 1), + ], + }, + Ok(RedisValue::String(Str::from_static(""))), + ) + .expect( + MockCommand { + cmd: Str::from_static("EXISTS"), + subcommand: None, + args: vec![real_key], + }, + Ok(RedisValue::Integer(0)), + ); + + let store = { + let mut builder = Builder::default_centralized(); + builder.set_config(RedisConfig { + mocks: Some(Arc::clone(&mocks) as Arc), + ..Default::default() + }); + + RedisStore::new_from_builder_and_parts(builder, None, mock_uuid_generator, String::new())? + }; + + let result = store.get_part_unchunked(digest, 0, None).await; + assert_eq!(result.unwrap_err().code, Code::NotFound); + + Ok(()) +}