Skip to content

Commit

Permalink
Fix bug in redis store when zero data stored but data does not exist (#…
Browse files Browse the repository at this point in the history
…1304)

Note: This is unlikely to have been observed from the RBE API.

We are supposed to check if an item exists even if it has zero bytes
stored in it, but since redis is quite tricky to do this, we were only
checking existance if it was a zero byte digest or if the length was
zero bytes. In this case we might have a zero-byte proto that is
actually valid that does not follow the CAS rules (hash of contents
equals key).

This is in prep for a Redis Scheduler.

fixes #1286
  • Loading branch information
allada authored Sep 3, 2024
1 parent 9f0e809 commit 59020f1
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 25 deletions.
54 changes: 30 additions & 24 deletions nativelink-store/src/redis_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,16 @@ impl RedisStore {

/// Used for testing when determinism is required.
pub fn new_from_builder_and_parts(
builder: Builder,
mut builder: Builder,
pub_sub_channel: Option<String>,
temp_name_generator_fn: fn() -> String,
key_prefix: String,
) -> Result<Self, Error> {
let client_pool = builder
.set_performance_config(PerformanceConfig {
broadcast_channel_capacity: 4096,
..Default::default()
})
.build_pool(CONNECTION_POOL_SIZE)
.err_tip(|| "while creating redis connection pool")?;

Expand Down Expand Up @@ -345,31 +349,14 @@ impl StoreDriver for RedisStore {
let encoded_key = self.encode_key(&key);
let encoded_key = encoded_key.as_ref();

if length == Some(0) {
// We're supposed to read 0 bytes, so just check if the key exists.
let exists = client
.exists(encoded_key)
.await
.err_tip(|| "In RedisStore::get_part::zero_exists")?;

return match exists {
0u8 => Err(make_err!(
Code::NotFound,
"Data not found in Redis store for digest: {key:?}"
)),
1 => writer
.send_eof()
.err_tip(|| "Failed to write EOF in redis store get_part"),
_ => unreachable!("only checked for existence of a single key"),
};
}

// N.B. the `-1`'s you see here are because redis GETRANGE is inclusive at both the start and end, so when we
// do math with indices we change them to be exclusive at the end.

// We want to read the data at the key from `offset` to `offset + length`.
let data_start = offset;
let data_end = data_start.saturating_add(length.unwrap_or(isize::MAX as usize)) - 1;
let data_end = data_start
.saturating_add(length.unwrap_or(isize::MAX as usize))
.saturating_sub(1);

// And we don't ever want to read more than `READ_CHUNK_SIZE` bytes at a time, so we'll need to iterate.
let mut chunk_start = data_start;
Expand All @@ -392,9 +379,7 @@ impl StoreDriver for RedisStore {
.err_tip(|| "Failed to write data in RedisStore::get_part")?;
}

return writer
.send_eof()
.err_tip(|| "Failed to write EOF in redis store get_part");
break; // No more data to read.
}

// We received a full chunk's worth of data, so write it...
Expand All @@ -407,6 +392,27 @@ impl StoreDriver for RedisStore {
chunk_start = chunk_end + 1;
chunk_end = cmp::min(chunk_start.saturating_add(READ_CHUNK_SIZE) - 1, data_end);
}

// If we didn't write any data, check if the key exists, if not return a NotFound error.
// This is required by spec.
if writer.get_bytes_written() == 0 {
// We're supposed to read 0 bytes, so just check if the key exists.
let exists = client
.exists::<bool, _>(encoded_key)
.await
.err_tip(|| "In RedisStore::get_part::zero_exists")?;

if !exists {
return Err(make_err!(
Code::NotFound,
"Data not found in Redis store for digest: {key:?}"
));
}
}

writer
.send_eof()
.err_tip(|| "Failed to write EOF in redis store get_part")
}

fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
Expand Down
51 changes: 50 additions & 1 deletion nativelink-store/tests/redis_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use fred::error::RedisError;
use fred::mocks::{MockCommand, Mocks};
use fred::prelude::Builder;
use fred::types::{RedisConfig, RedisValue};
use nativelink_error::Error;
use nativelink_error::{Code, Error};
use nativelink_macro::nativelink_test;
use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS;
use nativelink_store::redis_store::{RedisStore, READ_CHUNK_SIZE};
Expand Down Expand Up @@ -518,3 +518,52 @@ async fn yield_between_sending_packets_in_update() -> Result<(), Error> {

Ok(())
}

// Regression test for: https://github.com/TraceMachina/nativelink/issues/1286
#[nativelink_test]
async fn zero_len_items_exist_check() -> Result<(), Error> {
let mocks = Arc::new(MockRedisBackend::new());

let digest = DigestInfo::try_new(VALID_HASH1, 0)?;
let packed_hash_hex = format!("{}-{}", digest.hash_str(), digest.size_bytes);
let real_key = RedisValue::Bytes(packed_hash_hex.into());

mocks
.expect(
MockCommand {
cmd: Str::from_static("GETRANGE"),
subcommand: None,
args: vec![
real_key.clone(),
RedisValue::Integer(0),
// We expect to be asked for data from `0..READ_CHUNK_SIZE`, but since GETRANGE is inclusive
// the actual call should be from `0..=(READ_CHUNK_SIZE - 1)`.
RedisValue::Integer(READ_CHUNK_SIZE as i64 - 1),
],
},
Ok(RedisValue::String(Str::from_static(""))),
)
.expect(
MockCommand {
cmd: Str::from_static("EXISTS"),
subcommand: None,
args: vec![real_key],
},
Ok(RedisValue::Integer(0)),
);

let store = {
let mut builder = Builder::default_centralized();
builder.set_config(RedisConfig {
mocks: Some(Arc::clone(&mocks) as Arc<dyn Mocks>),
..Default::default()
});

RedisStore::new_from_builder_and_parts(builder, None, mock_uuid_generator, String::new())?
};

let result = store.get_part_unchunked(digest, 0, None).await;
assert_eq!(result.unwrap_err().code, Code::NotFound);

Ok(())
}

0 comments on commit 59020f1

Please sign in to comment.