diff --git a/proto/catalog.proto b/proto/catalog.proto index 7aecf69943435..b60f8f33a1c19 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -239,6 +239,11 @@ message Table { optional uint64 initialized_at_epoch = 28; optional uint64 created_at_epoch = 29; + // This field is introduced in v1.2.0. It is used to indicate whether the table should use + // watermark_cache to avoid state cleaning as a performance optimization. + // In older versions we can just initialize without it. + bool cleaned_by_watermark = 30; + // Per-table catalog version, used by schema change. `None` for internal tables and tests. // Not to be confused with the global catalog version for notification service. TableVersion version = 100; diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index a828bb430591a..fd842a1e87681 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -146,6 +146,9 @@ pub struct TableCatalog { pub created_at_epoch: Option, pub initialized_at_epoch: Option, + + /// Indicate whether to use watermark cache for state table. + pub cleaned_by_watermark: bool, } #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] @@ -233,6 +236,11 @@ impl TableCatalog { self } + pub fn with_cleaned_by_watermark(mut self, cleaned_by_watermark: bool) -> Self { + self.cleaned_by_watermark = cleaned_by_watermark; + self + } + pub fn conflict_behavior(&self) -> ConflictBehavior { self.conflict_behavior } @@ -392,6 +400,7 @@ impl TableCatalog { cardinality: Some(self.cardinality.to_protobuf()), initialized_at_epoch: self.initialized_at_epoch.map(|epoch| epoch.0), created_at_epoch: self.created_at_epoch.map(|epoch| epoch.0), + cleaned_by_watermark: self.cleaned_by_watermark, } } @@ -497,6 +506,7 @@ impl From for TableCatalog { .unwrap_or_else(Cardinality::unknown), created_at_epoch: tb.created_at_epoch.map(Epoch::from), initialized_at_epoch: tb.initialized_at_epoch.map(Epoch::from), + cleaned_by_watermark: matches!(tb.cleaned_by_watermark, true), } } } @@ -586,6 +596,7 @@ mod tests { dist_key_in_pk: vec![], cardinality: None, created_at_epoch: None, + cleaned_by_watermark: false, } .into(); @@ -639,6 +650,7 @@ mod tests { cardinality: Cardinality::unknown(), created_at_epoch: None, initialized_at_epoch: None, + cleaned_by_watermark: false, } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs index 40c15e2c2086e..491f747266546 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs @@ -85,6 +85,16 @@ impl_plan_tree_node_for_binary! { StreamDynamicFilter } impl StreamNode for StreamDynamicFilter { fn to_stream_prost_body(&self, state: &mut BuildFragmentGraphState) -> NodeBody { use generic::dynamic_filter::*; + let cleaned_by_watermark = if let Some(condition) = self.core.predicate().as_expr_unless_true() + && let Some((_input_expr, cmp, _now_expr)) = condition.as_now_comparison_cond() { + // Only GT / GTE will undergo state cleaning. + // As such, we only need cache for these cases. + matches!(cmp, ExprType::GreaterThan | ExprType::GreaterThanOrEqual) + // NOTE(kwannoel): `now_expr` is checked within `as_now_comparison_cond`. + // so we don't need to check it here. + } else { + false + }; let condition = self .core .predicate() @@ -92,7 +102,8 @@ impl StreamNode for StreamDynamicFilter { .map(|x| x.to_expr_proto()); let left_index = self.core.left_index(); let left_table = infer_left_internal_table_catalog(&self.base, left_index) - .with_id(state.gen_table_id_wrapped()); + .with_id(state.gen_table_id_wrapped()) + .with_cleaned_by_watermark(cleaned_by_watermark); let right = self.right(); let right_table = infer_right_internal_table_catalog(right.plan_base()) .with_id(state.gen_table_id_wrapped()); diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 5e5789e2b0de2..66be991bae95c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -221,6 +221,7 @@ impl StreamMaterialize { cardinality, created_at_epoch: None, initialized_at_epoch: None, + cleaned_by_watermark: false, }) } diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index e3ec3ef50e538..1f97b28d89e49 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -176,6 +176,7 @@ impl TableCatalogBuilder { cardinality: Cardinality::unknown(), // TODO(card): cardinality of internal table created_at_epoch: None, initialized_at_epoch: None, + cleaned_by_watermark: false, } } diff --git a/src/storage/src/filter_key_extractor.rs b/src/storage/src/filter_key_extractor.rs index f11f9d8d7c3f1..77dd68a198854 100644 --- a/src/storage/src/filter_key_extractor.rs +++ b/src/storage/src/filter_key_extractor.rs @@ -505,6 +505,7 @@ mod tests { dist_key_in_pk: vec![], cardinality: None, created_at_epoch: None, + cleaned_by_watermark: false, } } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 3142f381231b7..508ef2536c0f6 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -146,7 +146,6 @@ pub type ReplicatedStateTable = StateTableInner; /// It will reduce state cleaning overhead. pub type WatermarkCacheStateTable = StateTableInner; - pub type WatermarkCacheParameterizedStateTable = StateTableInner; diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index c3ce5691d6a9b..4a9ce3bd565f8 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -39,11 +39,11 @@ use super::monitor::StreamingMetrics; use super::{ ActorContextRef, BoxedExecutor, BoxedMessageStream, Executor, Message, PkIndices, PkIndicesRef, }; -use crate::common::table::state_table::StateTable; +use crate::common::table::state_table::{StateTable, WatermarkCacheParameterizedStateTable}; use crate::common::StreamChunkBuilder; use crate::executor::expect_first_barrier_from_aligned_stream; -pub struct DynamicFilterExecutor { +pub struct DynamicFilterExecutor { ctx: ActorContextRef, source_l: Option, source_r: Option, @@ -51,7 +51,7 @@ pub struct DynamicFilterExecutor { pk_indices: PkIndices, identity: String, comparator: ExprNodeType, - left_table: StateTable, + left_table: WatermarkCacheParameterizedStateTable, right_table: StateTable, schema: Schema, metrics: Arc, @@ -59,7 +59,7 @@ pub struct DynamicFilterExecutor { chunk_size: usize, } -impl DynamicFilterExecutor { +impl DynamicFilterExecutor { #[allow(clippy::too_many_arguments)] pub fn new( ctx: ActorContextRef, @@ -69,7 +69,7 @@ impl DynamicFilterExecutor { pk_indices: PkIndices, executor_id: u64, comparator: ExprNodeType, - state_table_l: StateTable, + state_table_l: WatermarkCacheParameterizedStateTable, state_table_r: StateTable, metrics: Arc, chunk_size: usize, @@ -464,7 +464,9 @@ impl DynamicFilterExecutor { } } -impl Executor for DynamicFilterExecutor { +impl Executor + for DynamicFilterExecutor +{ fn execute(self: Box) -> BoxedMessageStream { self.into_stream().boxed() } @@ -536,7 +538,7 @@ mod tests { let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![0]); let (tx_r, source_r) = MockSource::channel(schema, vec![]); - let executor = DynamicFilterExecutor::::new( + let executor = DynamicFilterExecutor::::new( ActorContext::create(123), Box::new(source_l), Box::new(source_r), diff --git a/src/stream/src/from_proto/dynamic_filter.rs b/src/stream/src/from_proto/dynamic_filter.rs index 999dece6dcdfa..23341c157524a 100644 --- a/src/stream/src/from_proto/dynamic_filter.rs +++ b/src/stream/src/from_proto/dynamic_filter.rs @@ -20,7 +20,7 @@ use risingwave_pb::expr::expr_node::Type::{ use risingwave_pb::stream_plan::DynamicFilterNode; use super::*; -use crate::common::table::state_table::StateTable; +use crate::common::table::state_table::{StateTable, WatermarkCacheStateTable}; use crate::executor::DynamicFilterExecutor; pub struct DynamicFilterExecutorBuilder; @@ -56,30 +56,48 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { ); } - // TODO: use consistent operation for dynamic filter - let state_table_l = StateTable::from_table_catalog_inconsistent_op( - node.get_left_table()?, - store.clone(), - Some(vnodes), - ) - .await; - let state_table_r = - StateTable::from_table_catalog_inconsistent_op(node.get_right_table()?, store, None) - .await; + StateTable::from_table_catalog(node.get_right_table()?, store.clone(), None).await; - Ok(Box::new(DynamicFilterExecutor::new( - params.actor_context, - source_l, - source_r, - key_l, - params.pk_indices, - params.executor_id, - comparator, - state_table_l, - state_table_r, - params.executor_stats, - params.env.config().developer.chunk_size, - ))) + let left_table = node.get_left_table()?; + if left_table.get_cleaned_by_watermark() { + let state_table_l = WatermarkCacheStateTable::from_table_catalog( + node.get_left_table()?, + store, + Some(vnodes), + ) + .await; + + Ok(Box::new(DynamicFilterExecutor::new( + params.actor_context, + source_l, + source_r, + key_l, + params.pk_indices, + params.executor_id, + comparator, + state_table_l, + state_table_r, + params.executor_stats, + params.env.config().developer.chunk_size, + ))) + } else { + let state_table_l = + StateTable::from_table_catalog(node.get_left_table()?, store, Some(vnodes)).await; + + Ok(Box::new(DynamicFilterExecutor::new( + params.actor_context, + source_l, + source_r, + key_l, + params.pk_indices, + params.executor_id, + comparator, + state_table_l, + state_table_r, + params.executor_stats, + params.env.config().developer.chunk_size, + ))) + } } } diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 979e4c4825c0f..8aecfec7585ce 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -145,6 +145,7 @@ async fn compaction_test( dist_key_in_pk: vec![], cardinality: None, created_at_epoch: None, + cleaned_by_watermark: false, }; let mut delete_range_table = delete_key_table.clone(); delete_range_table.id = 2;