From 59020f1e9c7f103afc4a8246dc17cae9910b3121 Mon Sep 17 00:00:00 2001 From: "Nathan (Blaise) Bruer" Date: Tue, 3 Sep 2024 13:39:33 -0500 Subject: [PATCH] Fix bug in redis store when zero data stored but data does not exist (#1304) Note: This is unlikely to have been observed from the RBE API. We are supposed to check if an item exists even if it has zero bytes stored in it, but since redis is quite tricky to do this, we were only checking existance if it was a zero byte digest or if the length was zero bytes. In this case we might have a zero-byte proto that is actually valid that does not follow the CAS rules (hash of contents equals key). This is in prep for a Redis Scheduler. fixes #1286 --- nativelink-store/src/redis_store.rs | 54 ++++++++++++---------- nativelink-store/tests/redis_store_test.rs | 51 +++++++++++++++++++- 2 files changed, 80 insertions(+), 25 deletions(-) diff --git a/nativelink-store/src/redis_store.rs b/nativelink-store/src/redis_store.rs index 222a6af55..9aa531096 100644 --- a/nativelink-store/src/redis_store.rs +++ b/nativelink-store/src/redis_store.rs @@ -109,12 +109,16 @@ impl RedisStore { /// Used for testing when determinism is required. pub fn new_from_builder_and_parts( - builder: Builder, + mut builder: Builder, pub_sub_channel: Option, temp_name_generator_fn: fn() -> String, key_prefix: String, ) -> Result { let client_pool = builder + .set_performance_config(PerformanceConfig { + broadcast_channel_capacity: 4096, + ..Default::default() + }) .build_pool(CONNECTION_POOL_SIZE) .err_tip(|| "while creating redis connection pool")?; @@ -345,31 +349,14 @@ impl StoreDriver for RedisStore { let encoded_key = self.encode_key(&key); let encoded_key = encoded_key.as_ref(); - if length == Some(0) { - // We're supposed to read 0 bytes, so just check if the key exists. - let exists = client - .exists(encoded_key) - .await - .err_tip(|| "In RedisStore::get_part::zero_exists")?; - - return match exists { - 0u8 => Err(make_err!( - Code::NotFound, - "Data not found in Redis store for digest: {key:?}" - )), - 1 => writer - .send_eof() - .err_tip(|| "Failed to write EOF in redis store get_part"), - _ => unreachable!("only checked for existence of a single key"), - }; - } - // N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we // do math with indices we change them to be exclusive at the end. // We want to read the data at the key from `offset` to `offset + length`. let data_start = offset; - let data_end = data_start.saturating_add(length.unwrap_or(isize::MAX as usize)) - 1; + let data_end = data_start + .saturating_add(length.unwrap_or(isize::MAX as usize)) + .saturating_sub(1); // And we don't ever want to read more than `READ_CHUNK_SIZE` bytes at a time, so we'll need to iterate. let mut chunk_start = data_start; @@ -392,9 +379,7 @@ impl StoreDriver for RedisStore { .err_tip(|| "Failed to write data in RedisStore::get_part")?; } - return writer - .send_eof() - .err_tip(|| "Failed to write EOF in redis store get_part"); + break; // No more data to read. } // We received a full chunk's worth of data, so write it... @@ -407,6 +392,27 @@ impl StoreDriver for RedisStore { chunk_start = chunk_end + 1; chunk_end = cmp::min(chunk_start.saturating_add(READ_CHUNK_SIZE) - 1, data_end); } + + // If we didn't write any data, check if the key exists, if not return a NotFound error. + // This is required by spec. + if writer.get_bytes_written() == 0 { + // We're supposed to read 0 bytes, so just check if the key exists. + let exists = client + .exists::(encoded_key) + .await + .err_tip(|| "In RedisStore::get_part::zero_exists")?; + + if !exists { + return Err(make_err!( + Code::NotFound, + "Data not found in Redis store for digest: {key:?}" + )); + } + } + + writer + .send_eof() + .err_tip(|| "Failed to write EOF in redis store get_part") } fn inner_store(&self, _digest: Option) -> &dyn StoreDriver { diff --git a/nativelink-store/tests/redis_store_test.rs b/nativelink-store/tests/redis_store_test.rs index 98adce31c..7b40d15d7 100644 --- a/nativelink-store/tests/redis_store_test.rs +++ b/nativelink-store/tests/redis_store_test.rs @@ -22,7 +22,7 @@ use fred::error::RedisError; use fred::mocks::{MockCommand, Mocks}; use fred::prelude::Builder; use fred::types::{RedisConfig, RedisValue}; -use nativelink_error::Error; +use nativelink_error::{Code, Error}; use nativelink_macro::nativelink_test; use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS; use nativelink_store::redis_store::{RedisStore, READ_CHUNK_SIZE}; @@ -518,3 +518,52 @@ async fn yield_between_sending_packets_in_update() -> Result<(), Error> { Ok(()) } + +// Regression test for: https://github.com/TraceMachina/nativelink/issues/1286 +#[nativelink_test] +async fn zero_len_items_exist_check() -> Result<(), Error> { + let mocks = Arc::new(MockRedisBackend::new()); + + let digest = DigestInfo::try_new(VALID_HASH1, 0)?; + let packed_hash_hex = format!("{}-{}", digest.hash_str(), digest.size_bytes); + let real_key = RedisValue::Bytes(packed_hash_hex.into()); + + mocks + .expect( + MockCommand { + cmd: Str::from_static("GETRANGE"), + subcommand: None, + args: vec![ + real_key.clone(), + RedisValue::Integer(0), + // We expect to be asked for data from `0..READ_CHUNK_SIZE`, but since GETRANGE is inclusive + // the actual call should be from `0..=(READ_CHUNK_SIZE - 1)`. + RedisValue::Integer(READ_CHUNK_SIZE as i64 - 1), + ], + }, + Ok(RedisValue::String(Str::from_static(""))), + ) + .expect( + MockCommand { + cmd: Str::from_static("EXISTS"), + subcommand: None, + args: vec![real_key], + }, + Ok(RedisValue::Integer(0)), + ); + + let store = { + let mut builder = Builder::default_centralized(); + builder.set_config(RedisConfig { + mocks: Some(Arc::clone(&mocks) as Arc), + ..Default::default() + }); + + RedisStore::new_from_builder_and_parts(builder, None, mock_uuid_generator, String::new())? + }; + + let result = store.get_part_unchunked(digest, 0, None).await; + assert_eq!(result.unwrap_err().code, Code::NotFound); + + Ok(()) +}