Skip to content

Commit

Permalink
fix(consistency): enable state table sanity check only in strict cons…
Browse files Browse the repository at this point in the history
…itency mode (#16147)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored May 24, 2024
1 parent eb3f53f commit 7a16a2c
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 43 deletions.
10 changes: 5 additions & 5 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use crate::hummock::shared_buffer::shared_buffer_batch::{
};
use crate::hummock::store::version::{read_filter_for_version, HummockVersionReader};
use crate::hummock::utils::{
do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, wait_for_epoch,
ENABLE_SANITY_CHECK,
do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{
Expand Down Expand Up @@ -409,7 +409,7 @@ impl LocalStateStore for LocalHummockStorage {
// a workaround you may call disable the check by initializing the
// state store with `is_consistent_op=false`.
KeyOp::Insert(value) => {
if ENABLE_SANITY_CHECK {
if sanity_check_enabled() {
do_insert_sanity_check(
&key,
&value,
Expand All @@ -427,7 +427,7 @@ impl LocalStateStore for LocalHummockStorage {
}
}
KeyOp::Delete(old_value) => {
if ENABLE_SANITY_CHECK {
if sanity_check_enabled() {
do_delete_sanity_check(
&key,
&old_value,
Expand All @@ -445,7 +445,7 @@ impl LocalStateStore for LocalHummockStorage {
}
}
KeyOp::Update((old_value, new_value)) => {
if ENABLE_SANITY_CHECK {
if sanity_check_enabled() {
do_update_sanity_check(
&key,
&old_value,
Expand Down
13 changes: 12 additions & 1 deletion src/storage/src/hummock/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,18 @@ pub fn check_subset_preserve_order<T: Eq>(
true
}

pub(crate) const ENABLE_SANITY_CHECK: bool = cfg!(debug_assertions);
static SANITY_CHECK_ENABLED: AtomicBool = AtomicBool::new(cfg!(debug_assertions));

/// This function is intended to be called during compute node initialization if the storage
/// sanity check is not desired. This controls a global flag so only need to be called once
/// if need to disable the sanity check.
pub fn disable_sanity_check() {
SANITY_CHECK_ENABLED.store(false, AtomicOrdering::Release);
}

pub(crate) fn sanity_check_enabled() -> bool {
SANITY_CHECK_ENABLED.load(AtomicOrdering::Acquire)
}

/// Make sure the key to insert should not exist in storage.
pub(crate) async fn do_insert_sanity_check(
Expand Down
91 changes: 67 additions & 24 deletions src/storage/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::error::{StorageError, StorageResult};
use crate::hummock::iterator::{Backward, Forward, FromRustIterator, RustIteratorBuilder};
use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
use crate::hummock::utils::{
do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, ENABLE_SANITY_CHECK,
do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
};
use crate::hummock::value::HummockValue;
use crate::row_serde::value_serde::ValueRowSerde;
Expand Down Expand Up @@ -162,7 +162,6 @@ impl MemTable {
self.kv_size.add(&pk, &op);
let old_op = self.buffer.insert(pk, op);
self.sub_old_op_size(old_op, key_len);

return Ok(());
};
let entry = self.buffer.entry(pk);
Expand All @@ -176,6 +175,7 @@ impl MemTable {
Entry::Occupied(mut e) => {
let old_op = e.get_mut();
self.kv_size.sub_val(old_op);

match old_op {
KeyOp::Delete(ref mut old_op_old_value) => {
let new_op = KeyOp::Update((std::mem::take(old_op_old_value), value));
Expand All @@ -184,12 +184,24 @@ impl MemTable {
Ok(())
}
KeyOp::Insert(_) | KeyOp::Update(_) => {
Err(MemTableError::InconsistentOperation {
let new_op = KeyOp::Insert(value);
let err = MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Insert(value),
new: new_op.clone(),
};

if sanity_check_enabled() {
Err(err.into())
} else {
tracing::error!(
error = %err.as_report(),
"double insert / insert on updated, ignoring because sanity check is disabled"
);
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
.into())
}
}
}
Expand Down Expand Up @@ -220,9 +232,10 @@ impl MemTable {
Entry::Occupied(mut e) => {
let old_op = e.get_mut();
self.kv_size.sub_val(old_op);

match old_op {
KeyOp::Insert(old_op_new_value) => {
if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) {
if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
return Err(Box::new(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
Expand All @@ -232,23 +245,37 @@ impl MemTable {

self.kv_size.sub_size(key_len);
e.remove();

Ok(())
}
KeyOp::Delete(_) => Err(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Delete(old_value),
KeyOp::Delete(_) => {
let new_op = KeyOp::Delete(old_value);
let err = MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: new_op.clone(),
};

if sanity_check_enabled() {
Err(err.into())
} else {
tracing::error!(
error = %err.as_report(),
"double delete, ignoring because sanity check is disabled"
);
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
}
.into()),
KeyOp::Update((old_op_old_value, old_op_new_value)) => {
if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) {
if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
return Err(Box::new(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Delete(old_value),
}));
}

let new_op = KeyOp::Delete(std::mem::take(old_op_old_value));
self.kv_size.add_val(&new_op);
e.insert(new_op);
Expand All @@ -271,7 +298,6 @@ impl MemTable {
} = &self.op_consistency_level
else {
let key_len = std::mem::size_of::<Bytes>() + pk.len();

let op = KeyOp::Update((old_value, new_value));
self.kv_size.add(&pk, &op);
let old_op = self.buffer.insert(pk, op);
Expand All @@ -289,39 +315,56 @@ impl MemTable {
Entry::Occupied(mut e) => {
let old_op = e.get_mut();
self.kv_size.sub_val(old_op);

match old_op {
KeyOp::Insert(old_op_new_value) => {
if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) {
if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
return Err(Box::new(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Update((old_value, new_value)),
}));
}

let new_op = KeyOp::Insert(new_value);
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
KeyOp::Update((old_op_old_value, old_op_new_value)) => {
if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) {
if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
return Err(Box::new(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Update((old_value, new_value)),
}));
}

let new_op = KeyOp::Update((std::mem::take(old_op_old_value), new_value));
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
KeyOp::Delete(_) => Err(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Update((old_value, new_value)),
KeyOp::Delete(_) => {
let new_op = KeyOp::Update((old_value, new_value));
let err = MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: new_op.clone(),
};

if sanity_check_enabled() {
Err(err.into())
} else {
tracing::error!(
error = %err.as_report(),
"update on deleted, ignoring because sanity check is disabled"
);
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
}
.into()),
}
}
}
Expand Down Expand Up @@ -587,7 +630,7 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
// a workaround you may call disable the check by initializing the
// state store with `op_consistency_level=Inconsistent`.
KeyOp::Insert(value) => {
if ENABLE_SANITY_CHECK {
if sanity_check_enabled() {
do_insert_sanity_check(
&key,
&value,
Expand All @@ -602,7 +645,7 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
kv_pairs.push((key, StorageValue::new_put(value)));
}
KeyOp::Delete(old_value) => {
if ENABLE_SANITY_CHECK {
if sanity_check_enabled() {
do_delete_sanity_check(
&key,
&old_value,
Expand All @@ -617,7 +660,7 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
kv_pairs.push((key, StorageValue::new_delete()));
}
KeyOp::Update((old_value, new_value)) => {
if ENABLE_SANITY_CHECK {
if sanity_check_enabled() {
do_update_sanity_check(
&key,
&old_value,
Expand Down
11 changes: 3 additions & 8 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,8 @@ where
)
};

let state_table_op_consistency_level = if crate::consistency::insane() {
// In insane mode, we will have inconsistent operations applied on the table, even if
// our executor code do not expect that.
StateTableOpConsistencyLevel::Inconsistent
} else {
op_consistency_level
};
let op_consistency_level = match state_table_op_consistency_level {
let state_table_op_consistency_level = op_consistency_level;
let op_consistency_level = match op_consistency_level {
StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
StateTableOpConsistencyLevel::ConsistentOldValue => {
let row_serde = make_row_serde();
Expand Down Expand Up @@ -600,6 +594,7 @@ where
.await
}

/// Create a state table with distribution and without sanity check, used for unit tests.
pub async fn new_with_distribution_inconsistent_op(
store: S,
table_id: TableId,
Expand Down
8 changes: 3 additions & 5 deletions src/stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,10 @@ mod consistency {
/// Check if strict consistency is required.
pub(crate) fn enable_strict_consistency() -> bool {
let res = crate::CONFIG.try_with(|config| config.unsafe_enable_strict_consistency);
if cfg!(test) {
// use default value in tests
res.unwrap_or_else(|_| default::streaming::unsafe_enable_strict_consistency())
} else {
res.expect("streaming CONFIG is not set, which is highly probably a bug")
if res.is_err() && cfg!(not(test)) {
tracing::warn!("streaming CONFIG is not set, which is probably a bug");
}
res.unwrap_or_else(|_| default::streaming::unsafe_enable_strict_consistency())
}

/// Log an error message for breaking consistency. Must only be called in non-strict mode.
Expand Down
6 changes: 6 additions & 0 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ impl LocalStreamManager {
await_tree_config: Option<await_tree::Config>,
watermark_epoch: AtomicU64Ref,
) -> Self {
if !env.config().unsafe_enable_strict_consistency {
// If strict consistency is disabled, should disable storage sanity check.
// Since this is a special config, we have to check it here.
risingwave_storage::hummock::utils::disable_sanity_check();
}

let await_tree_reg = await_tree_config.clone().map(await_tree::Registry::new);

let (actor_op_tx, actor_op_rx) = unbounded_channel();
Expand Down

0 comments on commit 7a16a2c

Please sign in to comment.