diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 9babe12b9054b..c59b027b44bc4 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1381,7 +1381,7 @@ async fn test_replicated_local_hummock_storage() { let mut local_hummock_storage = hummock_storage .new_local(NewLocalOptions::new_replicated( TEST_TABLE_ID, - false, + OpConsistencyLevel::Inconsistent, TableOption { retention_seconds: None, }, diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index dfa3ba46ac4e7..046d90a7e18d3 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -135,10 +135,16 @@ impl From for TableOption { } } +#[derive(Encode, Decode, PartialEq, Eq, Debug, Clone, Copy, Hash)] +pub enum TracedOpConsistencyLevel { + Inconsistent, + ConsistentOldValue, +} + #[derive(Encode, Decode, PartialEq, Eq, Debug, Clone, Copy, Hash)] pub struct TracedNewLocalOptions { pub table_id: TracedTableId, - pub is_consistent_op: bool, + pub op_consistency_level: TracedOpConsistencyLevel, pub table_option: TracedTableOption, pub is_replicated: bool, } @@ -148,7 +154,7 @@ impl TracedNewLocalOptions { pub(crate) fn for_test(table_id: u32) -> Self { Self { table_id: TracedTableId { table_id }, - is_consistent_op: true, + op_consistency_level: TracedOpConsistencyLevel::Inconsistent, table_option: TracedTableOption { retention_seconds: None, }, diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 72c24cc5b1cb7..8e489a5ebd975 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -58,7 +58,7 @@ pub struct LocalHummockStorage { epoch: Option, table_id: TableId, - is_consistent_op: bool, + op_consistency_level: OpConsistencyLevel, table_option: TableOption, instance_guard: LocalInstanceGuard, @@ -300,43 +300,46 @@ impl LocalStateStore for LocalHummockStorage { // a workaround you may call disable the check by initializing the // state store with `is_consistent_op=false`. KeyOp::Insert(value) => { - if ENABLE_SANITY_CHECK && self.is_consistent_op { + if ENABLE_SANITY_CHECK { do_insert_sanity_check( - key.clone(), - value.clone(), + &key, + &value, self, self.epoch(), self.table_id, self.table_option, + &self.op_consistency_level, ) .await?; } kv_pairs.push((key, StorageValue::new_put(value))); } KeyOp::Delete(old_value) => { - if ENABLE_SANITY_CHECK && self.is_consistent_op { + if ENABLE_SANITY_CHECK { do_delete_sanity_check( - key.clone(), - old_value, + &key, + &old_value, self, self.epoch(), self.table_id, self.table_option, + &self.op_consistency_level, ) .await?; } kv_pairs.push((key, StorageValue::new_delete())); } KeyOp::Update((old_value, new_value)) => { - if ENABLE_SANITY_CHECK && self.is_consistent_op { + if ENABLE_SANITY_CHECK { do_update_sanity_check( - key.clone(), - old_value, - new_value.clone(), + &key, + &old_value, + &new_value, self, self.epoch(), self.table_id, self.table_option, + &self.op_consistency_level, ) .await?; } @@ -530,11 +533,11 @@ impl LocalHummockStorage { ) -> Self { let stats = hummock_version_reader.stats().clone(); Self { - mem_table: MemTable::new(option.is_consistent_op), + mem_table: MemTable::new(option.op_consistency_level.clone()), spill_offset: 0, epoch: None, table_id: option.table_id, - is_consistent_op: option.is_consistent_op, + op_consistency_level: option.op_consistency_level, table_option: option.table_option, is_replicated: option.is_replicated, instance_guard, diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 9edfa8431f2e8..c32c0a02128ce 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -36,7 +36,7 @@ use super::{HummockError, HummockResult}; use crate::error::StorageResult; use crate::hummock::CachePolicy; use crate::mem_table::{KeyOp, MemTableError}; -use crate::store::{ReadOptions, StateStoreRead}; +use crate::store::{OpConsistencyLevel, ReadOptions, StateStoreRead}; pub fn range_overlap( search_key_range: &R, @@ -373,13 +373,17 @@ pub(crate) const ENABLE_SANITY_CHECK: bool = cfg!(debug_assertions); /// Make sure the key to insert should not exist in storage. pub(crate) async fn do_insert_sanity_check( - key: TableKey, - value: Bytes, + key: &TableKey, + value: &Bytes, inner: &impl StateStoreRead, epoch: u64, table_id: TableId, table_option: TableOption, + op_consistency_level: &OpConsistencyLevel, ) -> StorageResult<()> { + if let OpConsistencyLevel::Inconsistent = op_consistency_level { + return Ok(()); + } let read_options = ReadOptions { retention_seconds: table_option.retention_seconds, table_id, @@ -390,9 +394,9 @@ pub(crate) async fn do_insert_sanity_check( if let Some(stored_value) = stored_value { return Err(Box::new(MemTableError::InconsistentOperation { - key, + key: key.clone(), prev: KeyOp::Insert(stored_value), - new: KeyOp::Insert(value), + new: KeyOp::Insert(value.clone()), }) .into()); } @@ -401,13 +405,17 @@ pub(crate) async fn do_insert_sanity_check( /// Make sure that the key to delete should exist in storage and the value should be matched. pub(crate) async fn do_delete_sanity_check( - key: TableKey, - old_value: Bytes, + key: &TableKey, + old_value: &Bytes, inner: &impl StateStoreRead, epoch: u64, table_id: TableId, table_option: TableOption, + op_consistency_level: &OpConsistencyLevel, ) -> StorageResult<()> { + let OpConsistencyLevel::ConsistentOldValue(old_value_checker) = op_consistency_level else { + return Ok(()); + }; let read_options = ReadOptions { retention_seconds: table_option.retention_seconds, table_id, @@ -416,17 +424,17 @@ pub(crate) async fn do_delete_sanity_check( }; match inner.get(key.clone(), epoch, read_options).await? { None => Err(Box::new(MemTableError::InconsistentOperation { - key, + key: key.clone(), prev: KeyOp::Delete(Bytes::default()), - new: KeyOp::Delete(old_value), + new: KeyOp::Delete(old_value.clone()), }) .into()), Some(stored_value) => { - if stored_value != old_value { + if !old_value_checker(&stored_value, old_value) { Err(Box::new(MemTableError::InconsistentOperation { - key, + key: key.clone(), prev: KeyOp::Insert(stored_value), - new: KeyOp::Delete(old_value), + new: KeyOp::Delete(old_value.clone()), }) .into()) } else { @@ -438,14 +446,18 @@ pub(crate) async fn do_delete_sanity_check( /// Make sure that the key to update should exist in storage and the value should be matched pub(crate) async fn do_update_sanity_check( - key: TableKey, - old_value: Bytes, - new_value: Bytes, + key: &TableKey, + old_value: &Bytes, + new_value: &Bytes, inner: &impl StateStoreRead, epoch: u64, table_id: TableId, table_option: TableOption, + op_consistency_level: &OpConsistencyLevel, ) -> StorageResult<()> { + let OpConsistencyLevel::ConsistentOldValue(old_value_checker) = op_consistency_level else { + return Ok(()); + }; let read_options = ReadOptions { retention_seconds: table_option.retention_seconds, table_id, @@ -455,17 +467,17 @@ pub(crate) async fn do_update_sanity_check( match inner.get(key.clone(), epoch, read_options).await? { None => Err(Box::new(MemTableError::InconsistentOperation { - key, + key: key.clone(), prev: KeyOp::Delete(Bytes::default()), - new: KeyOp::Update((old_value, new_value)), + new: KeyOp::Update((old_value.clone(), new_value.clone())), }) .into()), Some(stored_value) => { - if stored_value != old_value { + if !old_value_checker(&stored_value, old_value) { Err(Box::new(MemTableError::InconsistentOperation { - key, + key: key.clone(), prev: KeyOp::Insert(stored_value), - new: KeyOp::Update((old_value, new_value)), + new: KeyOp::Update((old_value.clone(), new_value.clone())), }) .into()) } else { diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index eca66bfba2f74..6adf9a7ef5d7e 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -55,7 +55,7 @@ pub enum KeyOp { #[derive(Clone)] pub struct MemTable { pub(crate) buffer: MemTableStore, - pub(crate) is_consistent_op: bool, + pub(crate) op_consistency_level: OpConsistencyLevel, pub(crate) kv_size: KvSize, } @@ -108,17 +108,17 @@ impl RustIteratorBuilder for MemTableIteratorBuilder { pub type MemTableHummockIterator<'a> = FromRustIterator<'a, MemTableIteratorBuilder>; impl MemTable { - pub fn new(is_consistent_op: bool) -> Self { + pub fn new(op_consistency_level: OpConsistencyLevel) -> Self { Self { buffer: BTreeMap::new(), - is_consistent_op, + op_consistency_level, kv_size: KvSize::new(), } } pub fn drain(&mut self) -> Self { self.kv_size.set(0); - std::mem::replace(self, Self::new(self.is_consistent_op)) + std::mem::replace(self, Self::new(self.op_consistency_level.clone())) } pub fn is_dirty(&self) -> bool { @@ -127,7 +127,7 @@ impl MemTable { /// write methods pub fn insert(&mut self, pk: TableKey, value: Bytes) -> Result<()> { - if !self.is_consistent_op { + if let OpConsistencyLevel::Inconsistent = &self.op_consistency_level { let key_len = std::mem::size_of::() + pk.len(); let insert_value = KeyOp::Insert(value); self.kv_size.add(&pk, &insert_value); @@ -135,7 +135,7 @@ impl MemTable { self.sub_origin_size(origin_value, key_len); return Ok(()); - } + }; let entry = self.buffer.entry(pk); match entry { Entry::Vacant(e) => { @@ -170,13 +170,14 @@ impl MemTable { pub fn delete(&mut self, pk: TableKey, old_value: Bytes) -> Result<()> { let key_len = std::mem::size_of::() + pk.len(); - if !self.is_consistent_op { + let OpConsistencyLevel::ConsistentOldValue(value_checker) = &self.op_consistency_level + else { let delete_value = KeyOp::Delete(old_value); self.kv_size.add(&pk, &delete_value); let origin_value = self.buffer.insert(pk, delete_value); self.sub_origin_size(origin_value, key_len); return Ok(()); - } + }; let entry = self.buffer.entry(pk); match entry { Entry::Vacant(e) => { @@ -190,7 +191,7 @@ impl MemTable { self.kv_size.sub_val(origin_value); match origin_value { KeyOp::Insert(original_value) => { - if ENABLE_SANITY_CHECK && original_value != &old_value { + if ENABLE_SANITY_CHECK && !value_checker(original_value, &old_value) { return Err(Box::new(MemTableError::InconsistentOperation { key: e.key().clone(), prev: e.get().clone(), @@ -234,7 +235,8 @@ impl MemTable { old_value: Bytes, new_value: Bytes, ) -> Result<()> { - if !self.is_consistent_op { + let OpConsistencyLevel::ConsistentOldValue(value_checker) = &self.op_consistency_level + else { let key_len = std::mem::size_of::() + pk.len(); let update_value = KeyOp::Update((old_value, new_value)); @@ -242,7 +244,7 @@ impl MemTable { let origin_value = self.buffer.insert(pk, update_value); self.sub_origin_size(origin_value, key_len); return Ok(()); - } + }; let entry = self.buffer.entry(pk); match entry { Entry::Vacant(e) => { @@ -256,7 +258,7 @@ impl MemTable { self.kv_size.sub_val(origin_value); match origin_value { KeyOp::Insert(original_new_value) => { - if ENABLE_SANITY_CHECK && original_new_value != &old_value { + if ENABLE_SANITY_CHECK && !value_checker(original_new_value, &old_value) { return Err(Box::new(MemTableError::InconsistentOperation { key: e.key().clone(), prev: e.get().clone(), @@ -269,7 +271,7 @@ impl MemTable { Ok(()) } KeyOp::Update((origin_old_value, original_new_value)) => { - if ENABLE_SANITY_CHECK && original_new_value != &old_value { + if ENABLE_SANITY_CHECK && !value_checker(original_new_value, &old_value) { return Err(Box::new(MemTableError::InconsistentOperation { key: e.key().clone(), prev: e.get().clone(), @@ -427,7 +429,7 @@ pub struct MemtableLocalStateStore { epoch: Option, table_id: TableId, - is_consistent_op: bool, + op_consistency_level: OpConsistencyLevel, table_option: TableOption, } @@ -435,10 +437,10 @@ impl MemtableLocalStateStore { pub fn new(inner: S, option: NewLocalOptions) -> Self { Self { inner, - mem_table: MemTable::new(option.is_consistent_op), + mem_table: MemTable::new(option.op_consistency_level.clone()), epoch: None, table_id: option.table_id, - is_consistent_op: option.is_consistent_op, + op_consistency_level: option.op_consistency_level, table_option: option.table_option, } } @@ -525,45 +527,48 @@ impl LocalStateStore for MemtableLocalState match key_op { // Currently, some executors do not strictly comply with these semantics. As // a workaround you may call disable the check by initializing the - // state store with `is_consistent_op=false`. + // state store with `op_consistency_level=Inconsistent`. KeyOp::Insert(value) => { - if ENABLE_SANITY_CHECK && self.is_consistent_op { + if ENABLE_SANITY_CHECK { do_insert_sanity_check( - key.clone(), - value.clone(), + &key, + &value, &self.inner, self.epoch(), self.table_id, self.table_option, + &self.op_consistency_level, ) .await?; } kv_pairs.push((key, StorageValue::new_put(value))); } KeyOp::Delete(old_value) => { - if ENABLE_SANITY_CHECK && self.is_consistent_op { + if ENABLE_SANITY_CHECK { do_delete_sanity_check( - key.clone(), - old_value, + &key, + &old_value, &self.inner, self.epoch(), self.table_id, self.table_option, + &self.op_consistency_level, ) .await?; } kv_pairs.push((key, StorageValue::new_delete())); } KeyOp::Update((old_value, new_value)) => { - if ENABLE_SANITY_CHECK && self.is_consistent_op { + if ENABLE_SANITY_CHECK { do_update_sanity_check( - key.clone(), - old_value, - new_value.clone(), + &key, + &old_value, + &new_value, &self.inner, self.epoch(), self.table_id, self.table_option, + &self.op_consistency_level, ) .await?; } @@ -637,10 +642,13 @@ mod tests { use crate::hummock::iterator::HummockIterator; use crate::hummock::value::HummockValue; use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator}; + use crate::store::{OpConsistencyLevel, CHECK_BYTES_EQUAL}; #[tokio::test] async fn test_mem_table_memory_size() { - let mut mem_table = MemTable::new(true); + let mut mem_table = MemTable::new(OpConsistencyLevel::ConsistentOldValue( + CHECK_BYTES_EQUAL.clone(), + )); assert_eq!(mem_table.kv_size.size(), 0); mem_table @@ -750,7 +758,7 @@ mod tests { #[tokio::test] async fn test_mem_table_memory_size_not_consistent_op() { - let mut mem_table = MemTable::new(false); + let mut mem_table = MemTable::new(OpConsistencyLevel::Inconsistent); assert_eq!(mem_table.kv_size.size(), 0); mem_table @@ -833,7 +841,9 @@ mod tests { let mut test_data = ordered_test_data.clone(); test_data.shuffle(&mut rng); - let mut mem_table = MemTable::new(true); + let mut mem_table = MemTable::new(OpConsistencyLevel::ConsistentOldValue( + CHECK_BYTES_EQUAL.clone(), + )); for (key, op) in test_data { match op { KeyOp::Insert(value) => { diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index cf5211d7069e5..cbdd719e486d3 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::default::Default; use std::future::Future; use std::ops::Bound; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use bytes::Bytes; use futures::{Stream, StreamExt, TryStreamExt}; @@ -30,8 +30,8 @@ use risingwave_hummock_sdk::table_watermark::{ }; use risingwave_hummock_sdk::{HummockReadEpoch, LocalSstableInfo}; use risingwave_hummock_trace::{ - TracedInitOptions, TracedNewLocalOptions, TracedPrefetchOptions, TracedReadOptions, - TracedSealCurrentEpochOptions, TracedWriteOptions, + TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions, + TracedReadOptions, TracedSealCurrentEpochOptions, TracedWriteOptions, }; use crate::error::{StorageError, StorageResult}; @@ -397,6 +397,18 @@ impl From for WriteOptions { } } +pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync; + +pub static CHECK_BYTES_EQUAL: LazyLock> = + LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second)); + +#[derive(Default, Clone)] +pub enum OpConsistencyLevel { + #[default] + Inconsistent, + ConsistentOldValue(Arc), +} + #[derive(Clone, Default)] pub struct NewLocalOptions { pub table_id: TableId, @@ -407,7 +419,7 @@ pub struct NewLocalOptions { /// /// 2. The old value passed from /// `update` and `delete` should match the original stored value. - pub is_consistent_op: bool, + pub op_consistency_level: OpConsistencyLevel, pub table_option: TableOption, /// Indicate if this is replicated. If it is, we should not @@ -419,7 +431,12 @@ impl From for NewLocalOptions { fn from(value: TracedNewLocalOptions) -> Self { Self { table_id: value.table_id.into(), - is_consistent_op: value.is_consistent_op, + op_consistency_level: match value.op_consistency_level { + TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent, + TracedOpConsistencyLevel::ConsistentOldValue => { + OpConsistencyLevel::ConsistentOldValue(CHECK_BYTES_EQUAL.clone()) + } + }, table_option: value.table_option.into(), is_replicated: value.is_replicated, } @@ -430,7 +447,12 @@ impl From for TracedNewLocalOptions { fn from(value: NewLocalOptions) -> Self { Self { table_id: value.table_id.into(), - is_consistent_op: value.is_consistent_op, + op_consistency_level: match value.op_consistency_level { + OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent, + OpConsistencyLevel::ConsistentOldValue(_) => { + TracedOpConsistencyLevel::ConsistentOldValue + } + }, table_option: value.table_option.into(), is_replicated: value.is_replicated, } @@ -438,10 +460,14 @@ impl From for TracedNewLocalOptions { } impl NewLocalOptions { - pub fn new(table_id: TableId, is_consistent_op: bool, table_option: TableOption) -> Self { + pub fn new( + table_id: TableId, + op_consistency_level: OpConsistencyLevel, + table_option: TableOption, + ) -> Self { NewLocalOptions { table_id, - is_consistent_op, + op_consistency_level, table_option, is_replicated: false, } @@ -449,12 +475,12 @@ impl NewLocalOptions { pub fn new_replicated( table_id: TableId, - is_consistent_op: bool, + op_consistency_level: OpConsistencyLevel, table_option: TableOption, ) -> Self { NewLocalOptions { table_id, - is_consistent_op, + op_consistency_level, table_option, is_replicated: true, } @@ -463,7 +489,7 @@ impl NewLocalOptions { pub fn for_test(table_id: TableId) -> Self { Self { table_id, - is_consistent_op: false, + op_consistency_level: OpConsistencyLevel::Inconsistent, table_option: TableOption { retention_seconds: None, }, diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 74358f7944c3b..ca6137db47a70 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -20,7 +20,7 @@ use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntCounter}; use risingwave_connector::sink::log_store::LogStoreFactory; use risingwave_connector::sink::{SinkParam, SinkWriterParam}; use risingwave_pb::catalog::Table; -use risingwave_storage::store::NewLocalOptions; +use risingwave_storage::store::{NewLocalOptions, OpConsistencyLevel}; use risingwave_storage::StateStore; use tokio::sync::watch; @@ -237,7 +237,7 @@ impl LogStoreFactory for KvLogStoreFactory { table_id: TableId { table_id: self.table_catalog.id, }, - is_consistent_op: false, + op_consistency_level: OpConsistencyLevel::Inconsistent, table_option: TableOption { retention_seconds: None, }, diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index d887684686977..7126b67147ed8 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -54,8 +54,8 @@ use risingwave_storage::row_serde::row_serde_util::{ }; use risingwave_storage::row_serde::value_serde::ValueRowSerde; use risingwave_storage::store::{ - InitOptions, LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, - SealCurrentEpochOptions, StateStoreIterItemStream, + InitOptions, LocalStateStore, NewLocalOptions, OpConsistencyLevel, PrefetchOptions, + ReadOptions, SealCurrentEpochOptions, StateStoreIterItemStream, }; use risingwave_storage::table::merge_sort::merge_sort; use risingwave_storage::table::{compute_chunk_vnode, compute_vnode, Distribution, KeyedRow}; @@ -212,6 +212,34 @@ where } } +fn consistent_old_value_op(row_serde: impl ValueRowSerde) -> OpConsistencyLevel { + OpConsistencyLevel::ConsistentOldValue(Arc::new(move |first: &Bytes, second: &Bytes| { + if first == second { + return true; + } + let first = match row_serde.deserialize(first) { + Ok(rows) => rows, + Err(e) => { + error!(err = %e, value = ?first, "fail to deserialize serialized value"); + return false; + } + }; + let second = match row_serde.deserialize(second) { + Ok(rows) => rows, + Err(e) => { + error!(err = %e, value = ?second, "fail to deserialize serialized value"); + return false; + } + }; + if first != second { + error!(first = ?first, second = ?second, "sanity check fail"); + false + } else { + true + } + })) +} + // initialize // FIXME(kwannoel): Enforce that none of the constructors here // should be used by replicated state table. @@ -294,14 +322,6 @@ where .collect() }; - let table_option = TableOption::build_table_option(table_catalog.get_properties()); - let new_local_options = if IS_REPLICATED { - NewLocalOptions::new_replicated(table_id, is_consistent_op, table_option) - } else { - NewLocalOptions::new(table_id, is_consistent_op, table_option) - }; - let local_state_store = store.new_local(new_local_options).await; - let pk_data_types = pk_indices .iter() .map(|i| table_columns[*i].data_type.clone()) @@ -334,10 +354,29 @@ where }; let prefix_hint_len = table_catalog.read_prefix_len_hint as usize; - let row_serde = SD::new( - Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)), - Arc::from(table_columns.into_boxed_slice()), - ); + let make_row_serde = || { + SD::new( + Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)), + Arc::from(table_columns.clone().into_boxed_slice()), + ) + }; + + let op_consistency_level = if is_consistent_op { + let row_serde = make_row_serde(); + consistent_old_value_op(row_serde) + } else { + OpConsistencyLevel::Inconsistent + }; + + let table_option = TableOption::build_table_option(table_catalog.get_properties()); + let new_local_options = if IS_REPLICATED { + NewLocalOptions::new_replicated(table_id, op_consistency_level, table_option) + } else { + NewLocalOptions::new(table_id, op_consistency_level, table_option) + }; + let local_state_store = store.new_local(new_local_options).await; + + let row_serde = make_row_serde(); // If state table has versioning, that means it supports // Schema change. In that case, the row encoding should be column aware as well. @@ -527,13 +566,31 @@ where value_indices: Option>, is_consistent_op: bool, ) -> Self { + let make_row_serde = || { + SD::new( + Arc::from( + value_indices + .clone() + .unwrap_or_else(|| (0..table_columns.len()).collect_vec()) + .into_boxed_slice(), + ), + Arc::from(table_columns.clone().into_boxed_slice()), + ) + }; + let op_consistency_level = if is_consistent_op { + let row_serde = make_row_serde(); + consistent_old_value_op(row_serde) + } else { + OpConsistencyLevel::Inconsistent + }; let local_state_store = store .new_local(NewLocalOptions::new( table_id, - is_consistent_op, + op_consistency_level, TableOption::default(), )) .await; + let row_serde = make_row_serde(); let data_types: Vec = table_columns .iter() .map(|col| col.data_type.clone()) @@ -553,15 +610,7 @@ where table_id, local_store: local_state_store, pk_serde, - row_serde: SD::new( - Arc::from( - value_indices - .clone() - .unwrap_or_else(|| (0..table_columns.len()).collect_vec()) - .into_boxed_slice(), - ), - Arc::from(table_columns.into_boxed_slice()), - ), + row_serde, pk_indices, dist_key_in_pk_indices, prefix_hint_len: 0, @@ -909,7 +958,7 @@ where /// Update a row without giving old value. /// - /// `is_consistent_op` should be set to false. + /// `op_consistency_level` should be set to `Inconsistent`. pub fn update_without_old_value(&mut self, new_value: impl Row) { let new_pk = (&new_value).project(self.pk_indices()); let new_key_bytes = diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index a3ec5c36a5eb1..2db6047f716b1 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -85,16 +85,7 @@ impl MaterializeExecutor { ) -> Self { let arrange_key_indices: Vec = arrange_key.iter().map(|k| k.column_index).collect(); - let state_table = if table_catalog.version.is_some() { - // TODO: If we do some `Delete` after schema change, we cannot ensure the encoded value - // with the new version of serializer is the same as the old one, even if they can be - // decoded into the same value. The table is now performing consistency check on the raw - // bytes, so we need to turn off the check here. We may turn it on if we can compare the - // decoded row. - StateTableInner::from_table_catalog_inconsistent_op(table_catalog, store, vnodes).await - } else { - StateTableInner::from_table_catalog(table_catalog, store, vnodes).await - }; + let state_table = StateTableInner::from_table_catalog(table_catalog, store, vnodes).await; let metrics_info = MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");