Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consistency): enable state table sanity check only in strict consistency mode #16147

Merged
merged 5 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use crate::hummock::shared_buffer::shared_buffer_batch::{
};
use crate::hummock::store::version::{read_filter_for_version, HummockVersionReader};
use crate::hummock::utils::{
do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, wait_for_epoch,
ENABLE_SANITY_CHECK,
do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
wait_for_epoch,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{MemoryLimiter, SstableIterator};
Expand Down Expand Up @@ -321,7 +321,7 @@ impl LocalStateStore for LocalHummockStorage {
// a workaround you may call disable the check by initializing the
// state store with `is_consistent_op=false`.
KeyOp::Insert(value) => {
if ENABLE_SANITY_CHECK {
if sanity_check_enabled() {
do_insert_sanity_check(
&key,
&value,
Expand All @@ -339,7 +339,7 @@ impl LocalStateStore for LocalHummockStorage {
}
}
KeyOp::Delete(old_value) => {
if ENABLE_SANITY_CHECK {
if sanity_check_enabled() {
do_delete_sanity_check(
&key,
&old_value,
Expand All @@ -357,7 +357,7 @@ impl LocalStateStore for LocalHummockStorage {
}
}
KeyOp::Update((old_value, new_value)) => {
if ENABLE_SANITY_CHECK {
if sanity_check_enabled() {
do_update_sanity_check(
&key,
&old_value,
Expand Down
13 changes: 12 additions & 1 deletion src/storage/src/hummock/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,18 @@ pub fn check_subset_preserve_order<T: Eq>(
true
}

pub(crate) const ENABLE_SANITY_CHECK: bool = cfg!(debug_assertions);
static SANITY_CHECK_ENABLED: AtomicBool = AtomicBool::new(cfg!(debug_assertions));

/// Disables the storage sanity check by clearing the global `SANITY_CHECK_ENABLED` flag.
///
/// This function is intended to be called during compute node initialization when the storage
/// sanity check is not desired. Because it controls a global flag, calling it once is enough;
/// this function never sets the flag back to `true`.
pub fn disable_sanity_check() {
// `Release` store; the reader side (`sanity_check_enabled`) uses an `Acquire` load.
SANITY_CHECK_ENABLED.store(false, AtomicOrdering::Release);
}

/// Returns whether the storage sanity check is currently enabled.
///
/// Reads the global `SANITY_CHECK_ENABLED` flag, which is initialized to
/// `cfg!(debug_assertions)` and set to `false` by [`disable_sanity_check`].
pub(crate) fn sanity_check_enabled() -> bool {
SANITY_CHECK_ENABLED.load(AtomicOrdering::Acquire)
}

/// Make sure the key to insert should not exist in storage.
pub(crate) async fn do_insert_sanity_check(
Expand Down
91 changes: 67 additions & 24 deletions src/storage/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::error::{StorageError, StorageResult};
use crate::hummock::iterator::{FromRustIterator, RustIteratorBuilder};
use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
use crate::hummock::utils::{
do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, ENABLE_SANITY_CHECK,
do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled,
};
use crate::hummock::value::HummockValue;
use crate::row_serde::value_serde::ValueRowSerde;
Expand Down Expand Up @@ -138,7 +138,6 @@ impl MemTable {
self.kv_size.add(&pk, &op);
let old_op = self.buffer.insert(pk, op);
self.sub_old_op_size(old_op, key_len);

return Ok(());
};
let entry = self.buffer.entry(pk);
Expand All @@ -152,6 +151,7 @@ impl MemTable {
Entry::Occupied(mut e) => {
let old_op = e.get_mut();
self.kv_size.sub_val(old_op);

match old_op {
KeyOp::Delete(ref mut old_op_old_value) => {
let new_op = KeyOp::Update((std::mem::take(old_op_old_value), value));
Expand All @@ -160,12 +160,24 @@ impl MemTable {
Ok(())
}
KeyOp::Insert(_) | KeyOp::Update(_) => {
Err(MemTableError::InconsistentOperation {
let new_op = KeyOp::Insert(value);
let err = MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Insert(value),
new: new_op.clone(),
};

if sanity_check_enabled() {
Err(err.into())
} else {
tracing::error!(
error = %err.as_report(),
"double insert / insert on updated, ignoring because sanity check is disabled"
);
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
.into())
}
}
}
Expand Down Expand Up @@ -196,9 +208,10 @@ impl MemTable {
Entry::Occupied(mut e) => {
let old_op = e.get_mut();
self.kv_size.sub_val(old_op);

match old_op {
KeyOp::Insert(old_op_new_value) => {
if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) {
if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
return Err(Box::new(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
Expand All @@ -208,23 +221,37 @@ impl MemTable {

self.kv_size.sub_size(key_len);
e.remove();

Ok(())
}
KeyOp::Delete(_) => Err(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Delete(old_value),
KeyOp::Delete(_) => {
let new_op = KeyOp::Delete(old_value);
let err = MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: new_op.clone(),
};

if sanity_check_enabled() {
Err(err.into())
} else {
tracing::error!(
error = %err.as_report(),
"double delete, ignoring because sanity check is disabled"
);
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
}
.into()),
KeyOp::Update((old_op_old_value, old_op_new_value)) => {
if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) {
if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
return Err(Box::new(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Delete(old_value),
}));
}

let new_op = KeyOp::Delete(std::mem::take(old_op_old_value));
self.kv_size.add_val(&new_op);
e.insert(new_op);
Expand All @@ -247,7 +274,6 @@ impl MemTable {
} = &self.op_consistency_level
else {
let key_len = std::mem::size_of::<Bytes>() + pk.len();

let op = KeyOp::Update((old_value, new_value));
self.kv_size.add(&pk, &op);
let old_op = self.buffer.insert(pk, op);
Expand All @@ -265,39 +291,56 @@ impl MemTable {
Entry::Occupied(mut e) => {
let old_op = e.get_mut();
self.kv_size.sub_val(old_op);

match old_op {
KeyOp::Insert(old_op_new_value) => {
if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) {
if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
return Err(Box::new(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Update((old_value, new_value)),
}));
}

let new_op = KeyOp::Insert(new_value);
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
KeyOp::Update((old_op_old_value, old_op_new_value)) => {
if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) {
if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) {
return Err(Box::new(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Update((old_value, new_value)),
}));
}

let new_op = KeyOp::Update((std::mem::take(old_op_old_value), new_value));
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
KeyOp::Delete(_) => Err(MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: KeyOp::Update((old_value, new_value)),
KeyOp::Delete(_) => {
let new_op = KeyOp::Update((old_value, new_value));
let err = MemTableError::InconsistentOperation {
key: e.key().clone(),
prev: e.get().clone(),
new: new_op.clone(),
};

if sanity_check_enabled() {
Err(err.into())
} else {
tracing::error!(
error = %err.as_report(),
"update on deleted, ignoring because sanity check is disabled"
);
self.kv_size.add_val(&new_op);
e.insert(new_op);
Ok(())
}
}
.into()),
}
}
}
Expand Down Expand Up @@ -532,7 +575,7 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
// a workaround you may call disable the check by initializing the
// state store with `op_consistency_level=Inconsistent`.
KeyOp::Insert(value) => {
if ENABLE_SANITY_CHECK {
if sanity_check_enabled() {
do_insert_sanity_check(
&key,
&value,
Expand All @@ -547,7 +590,7 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
kv_pairs.push((key, StorageValue::new_put(value)));
}
KeyOp::Delete(old_value) => {
if ENABLE_SANITY_CHECK {
if sanity_check_enabled() {
do_delete_sanity_check(
&key,
&old_value,
Expand All @@ -562,7 +605,7 @@ impl<S: StateStoreWrite + StateStoreRead> LocalStateStore for MemtableLocalState
kv_pairs.push((key, StorageValue::new_delete()));
}
KeyOp::Update((old_value, new_value)) => {
if ENABLE_SANITY_CHECK {
if sanity_check_enabled() {
do_update_sanity_check(
&key,
&old_value,
Expand Down
11 changes: 3 additions & 8 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,8 @@ where
)
};

let state_table_op_consistency_level = if crate::consistency::insane() {
// In insane mode, we will have inconsistent operations applied on the table, even if
// our executor code do not expect that.
StateTableOpConsistencyLevel::Inconsistent
} else {
op_consistency_level
};
let op_consistency_level = match state_table_op_consistency_level {
let state_table_op_consistency_level = op_consistency_level;
let op_consistency_level = match op_consistency_level {
StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
StateTableOpConsistencyLevel::ConsistentOldValue => {
let row_serde = make_row_serde();
Expand Down Expand Up @@ -600,6 +594,7 @@ where
.await
}

/// Create a state table with distribution and without sanity check, used for unit tests.
pub async fn new_with_distribution_inconsistent_op(
store: S,
table_id: TableId,
Expand Down
8 changes: 3 additions & 5 deletions src/stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,10 @@ mod consistency {
/// Check if strict consistency is required.
pub(crate) fn enable_strict_consistency() -> bool {
let res = crate::CONFIG.try_with(|config| config.unsafe_enable_strict_consistency);
if cfg!(test) {
// use default value in tests
res.unwrap_or_else(|_| default::streaming::unsafe_enable_strict_consistency())
} else {
res.expect("streaming CONFIG is not set, which is highly probably a bug")
if res.is_err() && cfg!(not(test)) {
tracing::warn!("streaming CONFIG is not set, which is probably a bug");
}
res.unwrap_or_else(|_| default::streaming::unsafe_enable_strict_consistency())
}

/// Log an error message for breaking consistency. Must only be called in non-strict mode.
Expand Down
6 changes: 6 additions & 0 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ impl LocalStreamManager {
await_tree_config: Option<await_tree::Config>,
watermark_epoch: AtomicU64Ref,
) -> Self {
if !env.config().unsafe_enable_strict_consistency {
// If strict consistency is disabled, should disable storage sanity check.
// Since this is a special config, we have to check it here.
risingwave_storage::hummock::utils::disable_sanity_check();
}

let await_tree_reg = await_tree_config.clone().map(await_tree::Registry::new);

let (actor_op_tx, actor_op_rx) = unbounded_channel();
Expand Down
Loading