diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 3a629658c6f8e..66aeb270dc910 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -165,7 +165,7 @@ pub struct StateTableInner< /// 2. Computing output pk indices to used them for backfill state. output_indices: Vec, - is_consistent_op: bool, + op_consistency_level: StateTableOpConsistencyLevel, } /// `StateTable` will use `BasicSerde` as default @@ -218,7 +218,10 @@ where } } -fn consistent_old_value_op(row_serde: impl ValueRowSerde) -> OpConsistencyLevel { +fn consistent_old_value_op( + row_serde: impl ValueRowSerde, + is_log_store: bool, +) -> OpConsistencyLevel { OpConsistencyLevel::ConsistentOldValue { check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| { if first == second { @@ -245,10 +248,23 @@ fn consistent_old_value_op(row_serde: impl ValueRowSerde) -> OpConsistencyLevel true } }), - is_log_store: false, + is_log_store, } } +#[derive(Eq, PartialEq, Copy, Clone, Debug)] +pub enum StateTableOpConsistencyLevel { + /// Op is inconsistent + Inconsistent, + /// Op is consistent. + /// - Insert op should ensure that the key does not exist previously + /// - Delete and Update op should ensure that the key exists and the previous value matches the passed old value + ConsistentOldValue, + /// The requirement on operation consistency is the same as `ConsistentOldValue`. + /// The difference is that in `LogStoreEnabled`, the state table should also flush and store the old value. + LogStoreEnabled, +} + // initialize // FIXME(kwannoel): Enforce that none of the constructors here // should be used by replicated state table. 
@@ -266,7 +282,13 @@ where store: S, vnodes: Option>, ) -> Self { - Self::from_table_catalog_inner(table_catalog, store, vnodes, true, vec![]).await + Self::from_table_catalog_with_consistency_level( + table_catalog, + store, + vnodes, + StateTableOpConsistencyLevel::ConsistentOldValue, + ) + .await } /// Create state table from table catalog and store with sanity check disabled. @@ -275,7 +297,23 @@ where store: S, vnodes: Option>, ) -> Self { - Self::from_table_catalog_inner(table_catalog, store, vnodes, false, vec![]).await + Self::from_table_catalog_with_consistency_level( + table_catalog, + store, + vnodes, + StateTableOpConsistencyLevel::Inconsistent, + ) + .await + } + + pub async fn from_table_catalog_with_consistency_level( + table_catalog: &Table, + store: S, + vnodes: Option>, + consistency_level: StateTableOpConsistencyLevel, + ) -> Self { + Self::from_table_catalog_inner(table_catalog, store, vnodes, consistency_level, vec![]) + .await } /// Create state table from table catalog and store. @@ -283,7 +321,7 @@ where table_catalog: &Table, store: S, vnodes: Option>, - is_consistent_op: bool, + op_consistency_level: StateTableOpConsistencyLevel, output_column_ids: Vec, ) -> Self { let table_id = TableId::new(table_catalog.id); @@ -369,18 +407,23 @@ where ) }; - let is_consistent_op = if crate::consistency::insane() { + let state_table_op_consistency_level = if crate::consistency::insane() { // In insane mode, we will have inconsistent operations applied on the table, even if // our executor code do not expect that. 
- false + StateTableOpConsistencyLevel::Inconsistent } else { - is_consistent_op + op_consistency_level }; - let op_consistency_level = if is_consistent_op { - let row_serde = make_row_serde(); - consistent_old_value_op(row_serde) - } else { - OpConsistencyLevel::Inconsistent + let op_consistency_level = match state_table_op_consistency_level { + StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent, + StateTableOpConsistencyLevel::ConsistentOldValue => { + let row_serde = make_row_serde(); + consistent_old_value_op(row_serde, false) + } + StateTableOpConsistencyLevel::LogStoreEnabled => { + let row_serde = make_row_serde(); + consistent_old_value_op(row_serde, true) + } }; let table_option = TableOption::new(table_catalog.retention_seconds); @@ -466,7 +509,7 @@ where data_types, output_indices, i2o_mapping, - is_consistent_op, + op_consistency_level: state_table_op_consistency_level, } } @@ -602,7 +645,7 @@ where }; let op_consistency_level = if is_consistent_op { let row_serde = make_row_serde(); - consistent_old_value_op(row_serde) + consistent_old_value_op(row_serde, false) } else { OpConsistencyLevel::Inconsistent }; @@ -648,7 +691,11 @@ where data_types, output_indices: vec![], i2o_mapping: ColIndexMapping::new(vec![], 0), - is_consistent_op, + op_consistency_level: if is_consistent_op { + StateTableOpConsistencyLevel::ConsistentOldValue + } else { + StateTableOpConsistencyLevel::Inconsistent + }, } } @@ -711,7 +758,11 @@ where } pub fn is_consistent_op(&self) -> bool { - self.is_consistent_op + matches!( + self.op_consistency_level, + StateTableOpConsistencyLevel::ConsistentOldValue + | StateTableOpConsistencyLevel::LogStoreEnabled + ) } } @@ -728,7 +779,14 @@ where vnodes: Option>, output_column_ids: Vec, ) -> Self { - Self::from_table_catalog_inner(table_catalog, store, vnodes, false, output_column_ids).await + Self::from_table_catalog_inner( + table_catalog, + store, + vnodes, + StateTableOpConsistencyLevel::Inconsistent, + 
output_column_ids, + ) + .await } } @@ -1084,22 +1142,39 @@ where } pub async fn commit(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> { - self.commit_with_switch_consistent_op(new_epoch, None).await + self.commit_inner(new_epoch, None).await } - pub async fn commit_with_switch_consistent_op( + pub async fn commit_may_switch_consistent_op( &mut self, new_epoch: EpochPair, - switch_consistent_op: Option, + op_consistency_level: StateTableOpConsistencyLevel, + ) -> StreamExecutorResult<()> { + if self.op_consistency_level != op_consistency_level { + self.commit_inner(new_epoch, Some(op_consistency_level)) + .await + } else { + self.commit_inner(new_epoch, None).await + } + } + + async fn commit_inner( + &mut self, + new_epoch: EpochPair, + switch_consistent_op: Option, ) -> StreamExecutorResult<()> { assert_eq!(self.epoch(), new_epoch.prev); - let switch_op_consistency_level = switch_consistent_op.map(|enable_consistent_op| { - assert_ne!(self.is_consistent_op, enable_consistent_op); - self.is_consistent_op = enable_consistent_op; - if enable_consistent_op { - consistent_old_value_op(self.row_serde.clone()) - } else { - OpConsistencyLevel::Inconsistent + let switch_op_consistency_level = switch_consistent_op.map(|new_consistency_level| { + assert_ne!(self.op_consistency_level, new_consistency_level); + self.op_consistency_level = new_consistency_level; + match new_consistency_level { + StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent, + StateTableOpConsistencyLevel::ConsistentOldValue => { + consistent_old_value_op(self.row_serde.clone(), false) + } + StateTableOpConsistencyLevel::LogStoreEnabled => { + consistent_old_value_op(self.row_serde.clone(), true) + } } }); trace!( diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index d876fbe6c33b5..750d299b55e84 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -49,7 +49,7 @@ pub struct ActorContext { pub 
streaming_metrics: Arc, - pub dispatch_num: usize, + pub initial_dispatch_num: usize, } pub type ActorContextRef = Arc; @@ -65,7 +65,7 @@ impl ActorContext { total_mem_val: Arc::new(TrAdder::new()), streaming_metrics: Arc::new(StreamingMetrics::unused()), // Set 1 for test to enable sanity check on table - dispatch_num: 1, + initial_dispatch_num: 1, }) } @@ -73,7 +73,7 @@ impl ActorContext { stream_actor: &PbStreamActor, total_mem_val: Arc>, streaming_metrics: Arc, - dispatch_num: usize, + initial_dispatch_num: usize, ) -> ActorContextRef { Arc::new(Self { id: stream_actor.actor_id, @@ -83,7 +83,7 @@ impl ActorContext { last_mem_val: Arc::new(0.into()), total_mem_val, streaming_metrics, - dispatch_num, + initial_dispatch_num, }) } diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index 29a3e131fd2f9..ede0448f23887 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -36,7 +36,7 @@ use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew use crate::cache::{new_unbounded, ManagedLruCache}; use crate::common::metrics::MetricsInfo; -use crate::common::table::state_table::StateTableInner; +use crate::common::table::state_table::{StateTableInner, StateTableOpConsistencyLevel}; use crate::executor::prelude::*; use crate::executor::{AddMutation, UpdateMutation}; @@ -58,6 +58,21 @@ pub struct MaterializeExecutor { conflict_behavior: ConflictBehavior, version_column_index: Option, + + may_have_downstream: bool, +} + +fn get_op_consistency_level( + conflict_behavior: ConflictBehavior, + may_have_downstream: bool, +) -> StateTableOpConsistencyLevel { + if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) { + // Table with overwrite conflict behavior could disable conflict check + // if no downstream mv depends on it, so we use an inconsistent_op to skip sanity check as well. 
+ StateTableOpConsistencyLevel::Inconsistent + } else { + StateTableOpConsistencyLevel::ConsistentOldValue + } } impl MaterializeExecutor { @@ -90,16 +105,16 @@ impl MaterializeExecutor { ); let arrange_key_indices: Vec = arrange_key.iter().map(|k| k.column_index).collect(); - + let may_have_downstream = actor_context.initial_dispatch_num != 0; + let op_consistency_level = get_op_consistency_level(conflict_behavior, may_have_downstream); // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle. - let state_table = if matches!(conflict_behavior, ConflictBehavior::Overwrite) - && actor_context.dispatch_num == 0 - { - // Table with overwrite conflict behavior could disable conflict check if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well. - StateTableInner::from_table_catalog_inconsistent_op(table_catalog, store, vnodes).await - } else { - StateTableInner::from_table_catalog(table_catalog, store, vnodes).await - }; + let state_table = StateTableInner::from_table_catalog_with_consistency_level( + table_catalog, + store, + vnodes, + op_consistency_level, + ) + .await; let metrics_info = MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize"); @@ -117,6 +132,7 @@ impl MaterializeExecutor { ), conflict_behavior, version_column_index, + may_have_downstream, } } @@ -223,17 +239,20 @@ impl MaterializeExecutor { } } Message::Barrier(b) => { - let mutation = b.mutation.clone(); + let mutation = b.mutation.as_deref(); // If a downstream mv depends on the current table, we need to do conflict check again. 
- if !self.state_table.is_consistent_op() + if !self.may_have_downstream && Self::new_downstream_created(mutation, self.actor_context.id) { + self.may_have_downstream = true; + } + let op_consistency_level = + get_op_consistency_level(self.conflict_behavior, self.may_have_downstream); + self.state_table + .commit_may_switch_consistent_op(b.epoch, op_consistency_level) + .await?; + if !self.state_table.is_consistent_op() { assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite); - self.state_table - .commit_with_switch_consistent_op(b.epoch, Some(true)) - .await?; - } else { - self.state_table.commit(b.epoch).await?; } // Update the vnode bitmap for the state table if asked. @@ -252,8 +271,8 @@ impl MaterializeExecutor { } } - fn new_downstream_created(mutation: Option>, actor_id: ActorId) -> bool { - let Some(mutation) = mutation.as_deref() else { + fn new_downstream_created(mutation: Option<&Mutation>, actor_id: ActorId) -> bool { + let Some(mutation) = mutation else { return false; }; match mutation { @@ -330,6 +349,7 @@ impl MaterializeExecutor { ), conflict_behavior, version_column_index: None, + may_have_downstream: true, } } }