From 05518a45dadd9b178d2e9e26bae4d0956d68a0ec Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 20 May 2024 13:18:59 +0800 Subject: [PATCH 1/5] align state table and streaming engine in terms of inconsistent mode Signed-off-by: Richard Chien --- src/storage/src/store.rs | 1 - src/stream/src/common/table/state_table.rs | 22 +++++++++++++------- src/stream/src/executor/mview/materialize.rs | 8 +++++-- src/stream/src/lib.rs | 8 +++---- 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index f389691f94c20..1fa21fa9d8366 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -602,7 +602,6 @@ impl Eq for OpConsistencyLevel {} impl OpConsistencyLevel { pub fn update(&mut self, new_level: &OpConsistencyLevel) { - assert_ne!(self, new_level); *self = new_level.clone() } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 5be9dd96feb02..2b6f3cc7656d0 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -66,6 +66,7 @@ use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy}; use crate::cache::cache_may_stale; use crate::common::cache::{StateCache, StateCacheFiller}; use crate::common::table::state_table_cache::StateTableWatermarkCache; +use crate::consistency::enable_strict_consistency; use crate::executor::{StreamExecutorError, StreamExecutorResult}; /// This num is arbitrary and we may want to improve this choice in the future. @@ -408,12 +409,11 @@ where ) }; - let state_table_op_consistency_level = if crate::consistency::insane() { - // In insane mode, we will have inconsistent operations applied on the table, even if - // our executor code do not expect that. - StateTableOpConsistencyLevel::Inconsistent - } else { + let state_table_op_consistency_level = if enable_strict_consistency() { op_consistency_level + } else { + // disable sanity check in non-strict mode + StateTableOpConsistencyLevel::Inconsistent }; let op_consistency_level = match state_table_op_consistency_level { StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent, @@ -644,10 +644,11 @@ where Arc::from(table_columns.clone().into_boxed_slice()), ) }; - let op_consistency_level = if is_consistent_op { + let op_consistency_level = if is_consistent_op && enable_strict_consistency() { let row_serde = make_row_serde(); consistent_old_value_op(row_serde, false) } else { + // disable sanity check in non-strict mode OpConsistencyLevel::Inconsistent }; let local_state_store = store @@ -1167,8 +1168,13 @@ where assert_eq!(self.epoch(), new_epoch.prev); let switch_op_consistency_level = switch_consistent_op.map(|new_consistency_level| { assert_ne!(self.op_consistency_level, new_consistency_level); - self.op_consistency_level = new_consistency_level; - match new_consistency_level { + self.op_consistency_level = if enable_strict_consistency() { + new_consistency_level + } else { + // disable sanity check in non-strict mode + StateTableOpConsistencyLevel::Inconsistent + }; + match self.op_consistency_level { StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent, StateTableOpConsistencyLevel::ConsistentOldValue => { consistent_old_value_op(self.row_serde.clone(), false) diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index 9ad0dfbdfcea3..234d579715b4f 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -37,6 +37,7 @@ use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::{StateTableInner, StateTableOpConsistencyLevel}; +use crate::consistency::enable_strict_consistency; use crate::executor::prelude::*; /// `MaterializeExecutor` materializes changes in stream into a materialized view on storage. @@ -68,7 +69,10 @@ fn get_op_consistency_level( may_have_downstream: bool, depended_subscriptions: &HashSet, ) -> StateTableOpConsistencyLevel { - if !depended_subscriptions.is_empty() { + if !enable_strict_consistency() { + // disable sanity check in non-strict mode + StateTableOpConsistencyLevel::Inconsistent + } else if !depended_subscriptions.is_empty() { StateTableOpConsistencyLevel::LogStoreEnabled } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) { // Table with overwrite conflict behavior could disable conflict check @@ -274,7 +278,7 @@ impl MaterializeExecutor { self.state_table .commit_may_switch_consistent_op(b.epoch, op_consistency_level) .await?; - if !self.state_table.is_consistent_op() { + if enable_strict_consistency() && !self.state_table.is_consistent_op() { assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite); } diff --git a/src/stream/src/lib.rs b/src/stream/src/lib.rs index a6215d6e47aa1..51a735af0f5a7 100644 --- a/src/stream/src/lib.rs +++ b/src/stream/src/lib.rs @@ -93,12 +93,10 @@ mod consistency { /// Check if strict consistency is required. pub(crate) fn enable_strict_consistency() -> bool { let res = crate::CONFIG.try_with(|config| config.unsafe_enable_strict_consistency); - if cfg!(test) { - // use default value in tests - res.unwrap_or_else(|_| default::streaming::unsafe_enable_strict_consistency()) - } else { - res.expect("streaming CONFIG is not set, which is highly probably a bug") + if res.is_err() && cfg!(not(test)) { + tracing::warn!("streaming CONFIG is not set, which is probably a bug"); } + res.unwrap_or_else(|_| default::streaming::unsafe_enable_strict_consistency()) } /// Log an error message for breaking consistency. Must only be called in non-strict mode. From 4b5c78940354bf92897df4a0249a0d7ac986f490 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 20 May 2024 14:37:40 +0800 Subject: [PATCH 2/5] restore check in OpConsistencyLevel::update Signed-off-by: Richard Chien --- src/storage/src/store.rs | 1 + src/stream/src/common/table/state_table.rs | 15 ++++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 1fa21fa9d8366..f389691f94c20 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -602,6 +602,7 @@ impl Eq for OpConsistencyLevel {} impl OpConsistencyLevel { pub fn update(&mut self, new_level: &OpConsistencyLevel) { + assert_ne!(self, new_level); *self = new_level.clone() } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 2b6f3cc7656d0..e61b847b8bc3c 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1152,6 +1152,12 @@ where new_epoch: EpochPair, op_consistency_level: StateTableOpConsistencyLevel, ) -> StreamExecutorResult<()> { + let op_consistency_level = if enable_strict_consistency() { + op_consistency_level + } else { + // disable sanity check in non-strict mode + StateTableOpConsistencyLevel::Inconsistent + }; if self.op_consistency_level != op_consistency_level { self.commit_inner(new_epoch, Some(op_consistency_level)) .await @@ -1168,13 +1174,8 @@ where assert_eq!(self.epoch(), new_epoch.prev); let switch_op_consistency_level = switch_consistent_op.map(|new_consistency_level| { assert_ne!(self.op_consistency_level, new_consistency_level); - self.op_consistency_level = if enable_strict_consistency() { - new_consistency_level - } else { - // disable sanity check in non-strict mode - StateTableOpConsistencyLevel::Inconsistent - }; - match self.op_consistency_level { + self.op_consistency_level = new_consistency_level; + match new_consistency_level { StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent, StateTableOpConsistencyLevel::ConsistentOldValue => { consistent_old_value_op(self.row_serde.clone(), false) From 35862ef7d66488364f26be915f0128f3c3d918ed Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 20 May 2024 15:24:44 +0800 Subject: [PATCH 3/5] move strict mode check to `from_table_catalog` Signed-off-by: Richard Chien --- src/stream/src/common/table/state_table.rs | 34 +++++++++------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index e61b847b8bc3c..dde7818319d75 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -284,11 +284,16 @@ where store: S, vnodes: Option>, ) -> Self { + let consistency_level = if enable_strict_consistency() { + StateTableOpConsistencyLevel::ConsistentOldValue + } else { + StateTableOpConsistencyLevel::Inconsistent + }; Self::from_table_catalog_with_consistency_level( table_catalog, store, vnodes, - StateTableOpConsistencyLevel::ConsistentOldValue, + consistency_level, ) .await } @@ -409,13 +414,8 @@ where ) }; - let state_table_op_consistency_level = if enable_strict_consistency() { - op_consistency_level - } else { - // disable sanity check in non-strict mode - StateTableOpConsistencyLevel::Inconsistent - }; - let op_consistency_level = match state_table_op_consistency_level { + let state_table_op_consistency_level = op_consistency_level; + let op_consistency_level = match op_consistency_level { StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent, StateTableOpConsistencyLevel::ConsistentOldValue => { let row_serde = make_row_serde(); @@ -563,7 +563,7 @@ where order_types: Vec, pk_indices: Vec, ) -> Self { - Self::new_with_distribution_inner( + Self::new_with_distribution_inner_for_test( store, table_id, columns, @@ -587,7 +587,7 @@ where distribution: TableDistribution, value_indices: Option>, ) -> Self { - Self::new_with_distribution_inner( + Self::new_with_distribution_inner_for_test( store, table_id, table_columns, @@ -600,6 +600,7 @@ where .await } + /// Create a state table with distribution and without sanity check, used for unit tests. pub async fn new_with_distribution_inconsistent_op( store: S, table_id: TableId, @@ -609,7 +610,7 @@ where distribution: TableDistribution, value_indices: Option>, ) -> Self { - Self::new_with_distribution_inner( + Self::new_with_distribution_inner_for_test( store, table_id, table_columns, @@ -623,7 +624,7 @@ where } #[allow(clippy::too_many_arguments)] - async fn new_with_distribution_inner( + async fn new_with_distribution_inner_for_test( store: S, table_id: TableId, table_columns: Vec, @@ -644,11 +645,10 @@ where Arc::from(table_columns.clone().into_boxed_slice()), ) }; - let op_consistency_level = if is_consistent_op && enable_strict_consistency() { + let op_consistency_level = if is_consistent_op { let row_serde = make_row_serde(); consistent_old_value_op(row_serde, false) } else { - // disable sanity check in non-strict mode OpConsistencyLevel::Inconsistent }; let local_state_store = store @@ -1152,12 +1152,6 @@ where new_epoch: EpochPair, op_consistency_level: StateTableOpConsistencyLevel, ) -> StreamExecutorResult<()> { - let op_consistency_level = if enable_strict_consistency() { - op_consistency_level - } else { - // disable sanity check in non-strict mode - StateTableOpConsistencyLevel::Inconsistent - }; if self.op_consistency_level != op_consistency_level { self.commit_inner(new_epoch, Some(op_consistency_level)) .await From 6ca2d1aca5c1e074b7a830c3f10a99319f0f1d7d Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 23 May 2024 18:13:38 +0800 Subject: [PATCH 4/5] overwrite SANITY_CHECK_ENABLED for non-strict consistency mode Signed-off-by: Richard Chien --- .../hummock/store/local_hummock_storage.rs | 10 +- src/storage/src/hummock/utils.rs | 10 +- src/storage/src/mem_table.rs | 105 +++++++++++++----- src/stream/src/common/table/state_table.rs | 16 +-- src/stream/src/executor/mview/materialize.rs | 8 +- src/stream/src/task/stream_manager.rs | 6 + 6 files changed, 105 insertions(+), 50 deletions(-) diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index ae104c076a440..5a579fbd44ff8 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -39,8 +39,8 @@ use crate::hummock::shared_buffer::shared_buffer_batch::{ }; use crate::hummock::store::version::{read_filter_for_version, HummockVersionReader}; use crate::hummock::utils::{ - do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, wait_for_epoch, - ENABLE_SANITY_CHECK, + do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled, + wait_for_epoch, }; use crate::hummock::write_limiter::WriteLimiterRef; use crate::hummock::{MemoryLimiter, SstableIterator}; @@ -321,7 +321,7 @@ impl LocalStateStore for LocalHummockStorage { // a workaround you may call disable the check by initializing the // state store with `is_consistent_op=false`. KeyOp::Insert(value) => { - if ENABLE_SANITY_CHECK { + if sanity_check_enabled() { do_insert_sanity_check( &key, &value, @@ -339,7 +339,7 @@ impl LocalStateStore for LocalHummockStorage { } } KeyOp::Delete(old_value) => { - if ENABLE_SANITY_CHECK { + if sanity_check_enabled() { do_delete_sanity_check( &key, &old_value, @@ -357,7 +357,7 @@ impl LocalStateStore for LocalHummockStorage { } } KeyOp::Update((old_value, new_value)) => { - if ENABLE_SANITY_CHECK { + if sanity_check_enabled() { do_update_sanity_check( &key, &old_value, diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index a65f272e4fdb9..8d82b40a7a3e7 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -389,7 +389,15 @@ pub fn check_subset_preserve_order( true } -pub(crate) const ENABLE_SANITY_CHECK: bool = cfg!(debug_assertions); +static SANITY_CHECK_ENABLED: AtomicBool = AtomicBool::new(cfg!(debug_assertions)); + +pub fn set_sanity_check_enabled(enabled: bool) { + SANITY_CHECK_ENABLED.store(enabled, AtomicOrdering::Release); +} + +pub(crate) fn sanity_check_enabled() -> bool { + SANITY_CHECK_ENABLED.load(AtomicOrdering::Acquire) +} /// Make sure the key to insert should not exist in storage. pub(crate) async fn do_insert_sanity_check( diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index bc7b43b1806ea..cb2c1e738c0d2 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -38,7 +38,7 @@ use crate::error::{StorageError, StorageResult}; use crate::hummock::iterator::{FromRustIterator, RustIteratorBuilder}; use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId}; use crate::hummock::utils::{ - do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, ENABLE_SANITY_CHECK, + do_delete_sanity_check, do_insert_sanity_check, do_update_sanity_check, sanity_check_enabled, }; use crate::hummock::value::HummockValue; use crate::row_serde::value_serde::ValueRowSerde; @@ -138,7 +138,6 @@ impl MemTable { self.kv_size.add(&pk, &op); let old_op = self.buffer.insert(pk, op); self.sub_old_op_size(old_op, key_len); - return Ok(()); }; let entry = self.buffer.entry(pk); @@ -151,21 +150,36 @@ impl MemTable { } Entry::Occupied(mut e) => { let old_op = e.get_mut(); - self.kv_size.sub_val(old_op); + let old_op_size = old_op.estimated_size(); + match old_op { KeyOp::Delete(ref mut old_op_old_value) => { let new_op = KeyOp::Update((std::mem::take(old_op_old_value), value)); + self.kv_size.sub_size(old_op_size); self.kv_size.add_val(&new_op); e.insert(new_op); Ok(()) } KeyOp::Insert(_) | KeyOp::Update(_) => { - Err(MemTableError::InconsistentOperation { + let new_op = KeyOp::Insert(value); + let err = MemTableError::InconsistentOperation { key: e.key().clone(), prev: e.get().clone(), - new: KeyOp::Insert(value), + new: new_op.clone(), + }; + + if sanity_check_enabled() { + Err(err.into()) + } else { + tracing::error!( + error = %err.as_report(), + "double insert / insert on updated, ignoring because sanity check is disabled" + ); + self.kv_size.sub_size(old_op_size); + self.kv_size.add_val(&new_op); + e.insert(new_op); + Ok(()) } - .into()) } } } @@ -195,10 +209,11 @@ impl MemTable { } Entry::Occupied(mut e) => { let old_op = e.get_mut(); - self.kv_size.sub_val(old_op); + let old_op_size = old_op.estimated_size(); + match old_op { KeyOp::Insert(old_op_new_value) => { - if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) { + if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) { return Err(Box::new(MemTableError::InconsistentOperation { key: e.key().clone(), prev: e.get().clone(), @@ -206,26 +221,43 @@ impl MemTable { })); } + self.kv_size.sub_size(old_op_size); self.kv_size.sub_size(key_len); e.remove(); - Ok(()) } - KeyOp::Delete(_) => Err(MemTableError::InconsistentOperation { - key: e.key().clone(), - prev: e.get().clone(), - new: KeyOp::Delete(old_value), + KeyOp::Delete(_) => { + let new_op = KeyOp::Delete(old_value); + let err = MemTableError::InconsistentOperation { + key: e.key().clone(), + prev: e.get().clone(), + new: new_op.clone(), + }; + + if sanity_check_enabled() { + Err(err.into()) + } else { + tracing::error!( + error = %err.as_report(), + "double delete, ignoring because sanity check is disabled" + ); + self.kv_size.sub_size(old_op_size); + self.kv_size.add_val(&new_op); + e.insert(new_op); + Ok(()) + } } - .into()), KeyOp::Update((old_op_old_value, old_op_new_value)) => { - if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) { + if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) { return Err(Box::new(MemTableError::InconsistentOperation { key: e.key().clone(), prev: e.get().clone(), new: KeyOp::Delete(old_value), })); } + let new_op = KeyOp::Delete(std::mem::take(old_op_old_value)); + self.kv_size.sub_size(old_op_size); self.kv_size.add_val(&new_op); e.insert(new_op); Ok(()) @@ -247,7 +279,6 @@ impl MemTable { } = &self.op_consistency_level else { let key_len = std::mem::size_of::() + pk.len(); - let op = KeyOp::Update((old_value, new_value)); self.kv_size.add(&pk, &op); let old_op = self.buffer.insert(pk, op); @@ -264,40 +295,60 @@ impl MemTable { } Entry::Occupied(mut e) => { let old_op = e.get_mut(); - self.kv_size.sub_val(old_op); + let old_op_size = old_op.estimated_size(); + match old_op { KeyOp::Insert(old_op_new_value) => { - if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) { + if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) { return Err(Box::new(MemTableError::InconsistentOperation { key: e.key().clone(), prev: e.get().clone(), new: KeyOp::Update((old_value, new_value)), })); } + let new_op = KeyOp::Insert(new_value); + self.kv_size.sub_size(old_op_size); self.kv_size.add_val(&new_op); e.insert(new_op); Ok(()) } KeyOp::Update((old_op_old_value, old_op_new_value)) => { - if ENABLE_SANITY_CHECK && !value_checker(old_op_new_value, &old_value) { + if sanity_check_enabled() && !value_checker(old_op_new_value, &old_value) { return Err(Box::new(MemTableError::InconsistentOperation { key: e.key().clone(), prev: e.get().clone(), new: KeyOp::Update((old_value, new_value)), })); } + let new_op = KeyOp::Update((std::mem::take(old_op_old_value), new_value)); + self.kv_size.sub_size(old_op_size); self.kv_size.add_val(&new_op); e.insert(new_op); Ok(()) } - KeyOp::Delete(_) => Err(MemTableError::InconsistentOperation { - key: e.key().clone(), - prev: e.get().clone(), - new: KeyOp::Update((old_value, new_value)), + KeyOp::Delete(_) => { + let new_op = KeyOp::Update((old_value, new_value)); + let err = MemTableError::InconsistentOperation { + key: e.key().clone(), + prev: e.get().clone(), + new: new_op.clone(), + }; + + if sanity_check_enabled() { + Err(err.into()) + } else { + tracing::error!( + error = %err.as_report(), + "update on deleted, ignoring because sanity check is disabled" + ); + self.kv_size.sub_size(old_op_size); + self.kv_size.add_val(&new_op); + e.insert(new_op); + Ok(()) + } } - .into()), } } } @@ -532,7 +583,7 @@ impl LocalStateStore for MemtableLocalState // a workaround you may call disable the check by initializing the // state store with `op_consistency_level=Inconsistent`. KeyOp::Insert(value) => { - if ENABLE_SANITY_CHECK { + if sanity_check_enabled() { do_insert_sanity_check( &key, &value, @@ -547,7 +598,7 @@ impl LocalStateStore for MemtableLocalState kv_pairs.push((key, StorageValue::new_put(value))); } KeyOp::Delete(old_value) => { - if ENABLE_SANITY_CHECK { + if sanity_check_enabled() { do_delete_sanity_check( &key, &old_value, @@ -562,7 +613,7 @@ impl LocalStateStore for MemtableLocalState kv_pairs.push((key, StorageValue::new_delete())); } KeyOp::Update((old_value, new_value)) => { - if ENABLE_SANITY_CHECK { + if sanity_check_enabled() { do_update_sanity_check( &key, &old_value, diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index dde7818319d75..1d1a5e2a6047f 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -66,7 +66,6 @@ use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy}; use crate::cache::cache_may_stale; use crate::common::cache::{StateCache, StateCacheFiller}; use crate::common::table::state_table_cache::StateTableWatermarkCache; -use crate::consistency::enable_strict_consistency; use crate::executor::{StreamExecutorError, StreamExecutorResult}; /// This num is arbitrary and we may want to improve this choice in the future. @@ -284,16 +283,11 @@ where store: S, vnodes: Option>, ) -> Self { - let consistency_level = if enable_strict_consistency() { - StateTableOpConsistencyLevel::ConsistentOldValue - } else { - StateTableOpConsistencyLevel::Inconsistent - }; Self::from_table_catalog_with_consistency_level( table_catalog, store, vnodes, - consistency_level, + StateTableOpConsistencyLevel::ConsistentOldValue, ) .await } @@ -563,7 +557,7 @@ where order_types: Vec, pk_indices: Vec, ) -> Self { - Self::new_with_distribution_inner_for_test( + Self::new_with_distribution_inner( store, table_id, columns, @@ -587,7 +581,7 @@ where distribution: TableDistribution, value_indices: Option>, ) -> Self { - Self::new_with_distribution_inner_for_test( + Self::new_with_distribution_inner( store, table_id, table_columns, @@ -610,7 +604,7 @@ where distribution: TableDistribution, value_indices: Option>, ) -> Self { - Self::new_with_distribution_inner_for_test( + Self::new_with_distribution_inner( store, table_id, table_columns, @@ -624,7 +618,7 @@ where } #[allow(clippy::too_many_arguments)] - async fn new_with_distribution_inner_for_test( + async fn new_with_distribution_inner( store: S, table_id: TableId, table_columns: Vec, diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index 234d579715b4f..9ad0dfbdfcea3 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -37,7 +37,6 @@ use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; use crate::common::table::state_table::{StateTableInner, StateTableOpConsistencyLevel}; -use crate::consistency::enable_strict_consistency; use crate::executor::prelude::*; /// `MaterializeExecutor` materializes changes in stream into a materialized view on storage. @@ -69,10 +68,7 @@ fn get_op_consistency_level( may_have_downstream: bool, depended_subscriptions: &HashSet, ) -> StateTableOpConsistencyLevel { - if !enable_strict_consistency() { - // disable sanity check in non-strict mode - StateTableOpConsistencyLevel::Inconsistent - } else if !depended_subscriptions.is_empty() { + if !depended_subscriptions.is_empty() { StateTableOpConsistencyLevel::LogStoreEnabled } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) { // Table with overwrite conflict behavior could disable conflict check @@ -278,7 +274,7 @@ impl MaterializeExecutor { self.state_table .commit_may_switch_consistent_op(b.epoch, op_consistency_level) .await?; - if enable_strict_consistency() && !self.state_table.is_consistent_op() { + if !self.state_table.is_consistent_op() { assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite); } diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 346f200362b2d..c1ff395beda11 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -173,6 +173,12 @@ impl LocalStreamManager { await_tree_config: Option, watermark_epoch: AtomicU64Ref, ) -> Self { + if !env.config().unsafe_enable_strict_consistency { + // If strict consistency is disabled, should disable state table sanity check. + // Since this is a special config, we have to check it here. + risingwave_storage::hummock::utils::set_sanity_check_enabled(false); + } + let await_tree_reg = await_tree_config.clone().map(await_tree::Registry::new); let (actor_op_tx, actor_op_rx) = unbounded_channel(); From 28ee34650ae9f1c0e72e64c9e30e9f382fb9a4d0 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 24 May 2024 17:08:40 +0800 Subject: [PATCH 5/5] resolve review comments Signed-off-by: Richard Chien --- src/storage/src/hummock/utils.rs | 7 +++++-- src/storage/src/mem_table.rs | 14 +++----------- src/stream/src/task/stream_manager.rs | 4 ++-- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 8d82b40a7a3e7..4d61e7cd33674 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -391,8 +391,11 @@ pub fn check_subset_preserve_order( static SANITY_CHECK_ENABLED: AtomicBool = AtomicBool::new(cfg!(debug_assertions)); -pub fn set_sanity_check_enabled(enabled: bool) { - SANITY_CHECK_ENABLED.store(enabled, AtomicOrdering::Release); +/// This function is intended to be called during compute node initialization if the storage +/// sanity check is not desired. This controls a global flag so only need to be called once +/// if need to disable the sanity check. +pub fn disable_sanity_check() { + SANITY_CHECK_ENABLED.store(false, AtomicOrdering::Release); } pub(crate) fn sanity_check_enabled() -> bool { diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index cb2c1e738c0d2..7f067fc4df67e 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -150,12 +150,11 @@ impl MemTable { } Entry::Occupied(mut e) => { let old_op = e.get_mut(); - let old_op_size = old_op.estimated_size(); + self.kv_size.sub_val(old_op); match old_op { KeyOp::Delete(ref mut old_op_old_value) => { let new_op = KeyOp::Update((std::mem::take(old_op_old_value), value)); - self.kv_size.sub_size(old_op_size); self.kv_size.add_val(&new_op); e.insert(new_op); Ok(()) @@ -175,7 +174,6 @@ impl MemTable { error = %err.as_report(), "double insert / insert on updated, ignoring because sanity check is disabled" ); - self.kv_size.sub_size(old_op_size); self.kv_size.add_val(&new_op); e.insert(new_op); Ok(()) @@ -209,7 +207,7 @@ impl MemTable { } Entry::Occupied(mut e) => { let old_op = e.get_mut(); - let old_op_size = old_op.estimated_size(); + self.kv_size.sub_val(old_op); match old_op { KeyOp::Insert(old_op_new_value) => { @@ -221,7 +219,6 @@ impl MemTable { })); } - self.kv_size.sub_size(old_op_size); self.kv_size.sub_size(key_len); e.remove(); Ok(()) @@ -241,7 +238,6 @@ impl MemTable { error = %err.as_report(), "double delete, ignoring because sanity check is disabled" ); - self.kv_size.sub_size(old_op_size); self.kv_size.add_val(&new_op); e.insert(new_op); Ok(()) @@ -257,7 +253,6 @@ impl MemTable { } let new_op = KeyOp::Delete(std::mem::take(old_op_old_value)); - self.kv_size.sub_size(old_op_size); self.kv_size.add_val(&new_op); e.insert(new_op); Ok(()) @@ -295,7 +290,7 @@ impl MemTable { } Entry::Occupied(mut e) => { let old_op = e.get_mut(); - let old_op_size = old_op.estimated_size(); + self.kv_size.sub_val(old_op); match old_op { KeyOp::Insert(old_op_new_value) => { @@ -308,7 +303,6 @@ impl MemTable { } let new_op = KeyOp::Insert(new_value); - self.kv_size.sub_size(old_op_size); self.kv_size.add_val(&new_op); e.insert(new_op); Ok(()) @@ -323,7 +317,6 @@ impl MemTable { } let new_op = KeyOp::Update((std::mem::take(old_op_old_value), new_value)); - self.kv_size.sub_size(old_op_size); self.kv_size.add_val(&new_op); e.insert(new_op); Ok(()) @@ -343,7 +336,6 @@ impl MemTable { error = %err.as_report(), "update on deleted, ignoring because sanity check is disabled" ); - self.kv_size.sub_size(old_op_size); self.kv_size.add_val(&new_op); e.insert(new_op); Ok(()) diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index c1ff395beda11..88f9d3e27d2ea 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -174,9 +174,9 @@ impl LocalStreamManager { watermark_epoch: AtomicU64Ref, ) -> Self { if !env.config().unsafe_enable_strict_consistency { - // If strict consistency is disabled, should disable state table sanity check. + // If strict consistency is disabled, should disable storage sanity check. // Since this is a special config, we have to check it here. - risingwave_storage::hummock::utils::set_sanity_check_enabled(false); + risingwave_storage::hummock::utils::disable_sanity_check(); } let await_tree_reg = await_tree_config.clone().map(await_tree::Registry::new);