Skip to content

Commit

Permalink
feat(storage): compare deserialized row value in sanity check (#14178)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Dec 26, 2023
1 parent b1c61b7 commit 590fd7e
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 114 deletions.
2 changes: 1 addition & 1 deletion src/storage/hummock_test/src/state_store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,7 @@ async fn test_replicated_local_hummock_storage() {
let mut local_hummock_storage = hummock_storage
.new_local(NewLocalOptions::new_replicated(
TEST_TABLE_ID,
false,
OpConsistencyLevel::Inconsistent,
TableOption {
retention_seconds: None,
},
Expand Down
10 changes: 8 additions & 2 deletions src/storage/hummock_trace/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,16 @@ impl From<TracedTableOption> for TableOption {
}
}

#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone, Copy, Hash)]
pub enum TracedOpConsistencyLevel {
Inconsistent,
ConsistentOldValue,
}

#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone, Copy, Hash)]
pub struct TracedNewLocalOptions {
pub table_id: TracedTableId,
pub is_consistent_op: bool,
pub op_consistency_level: TracedOpConsistencyLevel,
pub table_option: TracedTableOption,
pub is_replicated: bool,
}
Expand All @@ -148,7 +154,7 @@ impl TracedNewLocalOptions {
pub(crate) fn for_test(table_id: u32) -> Self {
Self {
table_id: TracedTableId { table_id },
is_consistent_op: true,
op_consistency_level: TracedOpConsistencyLevel::Inconsistent,
table_option: TracedTableOption {
retention_seconds: None,
},
Expand Down
29 changes: 16 additions & 13 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct LocalHummockStorage {
epoch: Option<u64>,

table_id: TableId,
is_consistent_op: bool,
op_consistency_level: OpConsistencyLevel,
table_option: TableOption,

instance_guard: LocalInstanceGuard,
Expand Down Expand Up @@ -300,43 +300,46 @@ impl LocalStateStore for LocalHummockStorage {
// a workaround you may call disable the check by initializing the
// state store with `is_consistent_op=false`.
KeyOp::Insert(value) => {
if ENABLE_SANITY_CHECK && self.is_consistent_op {
if ENABLE_SANITY_CHECK {
do_insert_sanity_check(
key.clone(),
value.clone(),
&key,
&value,
self,
self.epoch(),
self.table_id,
self.table_option,
&self.op_consistency_level,
)
.await?;
}
kv_pairs.push((key, StorageValue::new_put(value)));
}
KeyOp::Delete(old_value) => {
if ENABLE_SANITY_CHECK && self.is_consistent_op {
if ENABLE_SANITY_CHECK {
do_delete_sanity_check(
key.clone(),
old_value,
&key,
&old_value,
self,
self.epoch(),
self.table_id,
self.table_option,
&self.op_consistency_level,
)
.await?;
}
kv_pairs.push((key, StorageValue::new_delete()));
}
KeyOp::Update((old_value, new_value)) => {
if ENABLE_SANITY_CHECK && self.is_consistent_op {
if ENABLE_SANITY_CHECK {
do_update_sanity_check(
key.clone(),
old_value,
new_value.clone(),
&key,
&old_value,
&new_value,
self,
self.epoch(),
self.table_id,
self.table_option,
&self.op_consistency_level,
)
.await?;
}
Expand Down Expand Up @@ -530,11 +533,11 @@ impl LocalHummockStorage {
) -> Self {
let stats = hummock_version_reader.stats().clone();
Self {
mem_table: MemTable::new(option.is_consistent_op),
mem_table: MemTable::new(option.op_consistency_level.clone()),
spill_offset: 0,
epoch: None,
table_id: option.table_id,
is_consistent_op: option.is_consistent_op,
op_consistency_level: option.op_consistency_level,
table_option: option.table_option,
is_replicated: option.is_replicated,
instance_guard,
Expand Down
52 changes: 32 additions & 20 deletions src/storage/src/hummock/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use super::{HummockError, HummockResult};
use crate::error::StorageResult;
use crate::hummock::CachePolicy;
use crate::mem_table::{KeyOp, MemTableError};
use crate::store::{ReadOptions, StateStoreRead};
use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreRead};

pub fn range_overlap<R, B>(
search_key_range: &R,
Expand Down Expand Up @@ -373,13 +373,17 @@ pub(crate) const ENABLE_SANITY_CHECK: bool = cfg!(debug_assertions);

/// Make sure the key to insert should not exist in storage.
pub(crate) async fn do_insert_sanity_check(
key: TableKey<Bytes>,
value: Bytes,
key: &TableKey<Bytes>,
value: &Bytes,
inner: &impl StateStoreRead,
epoch: u64,
table_id: TableId,
table_option: TableOption,
op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
if let OpConsistencyLevel::Inconsistent = op_consistency_level {
return Ok(());
}
let read_options = ReadOptions {
retention_seconds: table_option.retention_seconds,
table_id,
Expand All @@ -390,9 +394,9 @@ pub(crate) async fn do_insert_sanity_check(

if let Some(stored_value) = stored_value {
return Err(Box::new(MemTableError::InconsistentOperation {
key,
key: key.clone(),
prev: KeyOp::Insert(stored_value),
new: KeyOp::Insert(value),
new: KeyOp::Insert(value.clone()),
})
.into());
}
Expand All @@ -401,13 +405,17 @@ pub(crate) async fn do_insert_sanity_check(

/// Make sure that the key to delete should exist in storage and the value should be matched.
pub(crate) async fn do_delete_sanity_check(
key: TableKey<Bytes>,
old_value: Bytes,
key: &TableKey<Bytes>,
old_value: &Bytes,
inner: &impl StateStoreRead,
epoch: u64,
table_id: TableId,
table_option: TableOption,
op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
let OpConsistencyLevel::ConsistentOldValue(old_value_checker) = op_consistency_level else {
return Ok(());
};
let read_options = ReadOptions {
retention_seconds: table_option.retention_seconds,
table_id,
Expand All @@ -416,17 +424,17 @@ pub(crate) async fn do_delete_sanity_check(
};
match inner.get(key.clone(), epoch, read_options).await? {
None => Err(Box::new(MemTableError::InconsistentOperation {
key,
key: key.clone(),
prev: KeyOp::Delete(Bytes::default()),
new: KeyOp::Delete(old_value),
new: KeyOp::Delete(old_value.clone()),
})
.into()),
Some(stored_value) => {
if stored_value != old_value {
if !old_value_checker(&stored_value, old_value) {
Err(Box::new(MemTableError::InconsistentOperation {
key,
key: key.clone(),
prev: KeyOp::Insert(stored_value),
new: KeyOp::Delete(old_value),
new: KeyOp::Delete(old_value.clone()),
})
.into())
} else {
Expand All @@ -438,14 +446,18 @@ pub(crate) async fn do_delete_sanity_check(

/// Make sure that the key to update should exist in storage and the value should be matched
pub(crate) async fn do_update_sanity_check(
key: TableKey<Bytes>,
old_value: Bytes,
new_value: Bytes,
key: &TableKey<Bytes>,
old_value: &Bytes,
new_value: &Bytes,
inner: &impl StateStoreRead,
epoch: u64,
table_id: TableId,
table_option: TableOption,
op_consistency_level: &OpConsistencyLevel,
) -> StorageResult<()> {
let OpConsistencyLevel::ConsistentOldValue(old_value_checker) = op_consistency_level else {
return Ok(());
};
let read_options = ReadOptions {
retention_seconds: table_option.retention_seconds,
table_id,
Expand All @@ -455,17 +467,17 @@ pub(crate) async fn do_update_sanity_check(

match inner.get(key.clone(), epoch, read_options).await? {
None => Err(Box::new(MemTableError::InconsistentOperation {
key,
key: key.clone(),
prev: KeyOp::Delete(Bytes::default()),
new: KeyOp::Update((old_value, new_value)),
new: KeyOp::Update((old_value.clone(), new_value.clone())),
})
.into()),
Some(stored_value) => {
if stored_value != old_value {
if !old_value_checker(&stored_value, old_value) {
Err(Box::new(MemTableError::InconsistentOperation {
key,
key: key.clone(),
prev: KeyOp::Insert(stored_value),
new: KeyOp::Update((old_value, new_value)),
new: KeyOp::Update((old_value.clone(), new_value.clone())),
})
.into())
} else {
Expand Down
Loading

0 comments on commit 590fd7e

Please sign in to comment.