From 88f1c27e1566c4a5b0464cd5dd60c5deab46f361 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 1 Aug 2024 23:30:11 +0800 Subject: [PATCH] fix(dyn-filter): filter left changes according to watermark before writing state table (#17816) Signed-off-by: Richard Chien Co-authored-by: Patrick Huang --- proto/stream_plan.proto | 2 +- src/common/src/util/stream_graph_visitor.rs | 7 +- .../testdata/output/temporal_filter.yaml | 24 +-- .../plan_node/stream_dynamic_filter.rs | 19 +- src/storage/src/store.rs | 1 + src/stream/src/common/table/mod.rs | 1 - src/stream/src/common/table/state_table.rs | 148 +++++++------ .../src/common/table/test_state_table.rs | 8 +- src/stream/src/common/table/watermark.rs | 60 ------ src/stream/src/executor/dynamic_filter.rs | 201 +++++++++--------- src/stream/src/executor/hash_agg.rs | 2 +- src/stream/src/executor/join/hash_join.rs | 4 +- src/stream/src/executor/sort_buffer.rs | 4 +- src/stream/src/executor/top_n/group_top_n.rs | 3 +- .../executor/top_n/group_top_n_appendonly.rs | 3 +- src/stream/src/executor/top_n/top_n_state.rs | 4 +- src/stream/src/from_proto/dynamic_filter.rs | 4 - 17 files changed, 205 insertions(+), 290 deletions(-) delete mode 100644 src/stream/src/common/table/watermark.rs diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 3469851c566b..51bb66289e60 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -480,7 +480,7 @@ message DynamicFilterNode { // When the condition changes, we will tell downstream to insert the LHS records which now match the condition. // If this is false, we need to store RHS records which match the condition in the internal table. // When the condition changes, we will tell downstream to delete the LHS records which now no longer match the condition. - bool condition_always_relax = 5; + bool condition_always_relax = 5 [deprecated = true]; } // Delta join with two indexes. This is a pseudo plan node generated on frontend. On meta diff --git a/src/common/src/util/stream_graph_visitor.rs b/src/common/src/util/stream_graph_visitor.rs index 553ede90bcb7..88e4b032a6db 100644 --- a/src/common/src/util/stream_graph_visitor.rs +++ b/src/common/src/util/stream_graph_visitor.rs @@ -133,12 +133,7 @@ pub fn visit_stream_node_tables_inner( optional!(node.memo_table, "TemporalJoinMemo"); } NodeBody::DynamicFilter(node) => { - if node.condition_always_relax { - always!(node.left_table, "DynamicFilterLeftNotSatisfy"); - } else { - always!(node.left_table, "DynamicFilterLeft"); - } - + always!(node.left_table, "DynamicFilterLeft"); always!(node.right_table, "DynamicFilterRight"); } diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml index edc3bb6c364c..514a56f7dff6 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml @@ -42,7 +42,7 @@ select * from t1 where now() - interval '15 minutes' > ts; stream_plan: |- StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck } - └─StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true } + └─StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], cleaned_by_watermark: true } ├─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: Broadcast } └─StreamProject { exprs: [SubtractWithTimeZone(now, '00:15:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } @@ -50,8 +50,7 @@ stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true } - ├── tables: [ DynamicFilterLeftNotSatisfy: 0, DynamicFilterRight: 1 ] + └── StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], cleaned_by_watermark: true } { tables: [ DynamicFilterLeft: 0, DynamicFilterRight: 1 ] } ├── StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } │ ├── tables: [ StreamScan: 2 ] │ ├── Upstream @@ -141,7 +140,7 @@ select * from t1 where now() - interval '15 minutes' > ts; stream_plan: |- StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck } - └─StreamDynamicFilter [append_only] { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true } + └─StreamDynamicFilter [append_only] { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], cleaned_by_watermark: true } ├─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: Broadcast } └─StreamProject { exprs: [SubtractWithTimeZone(now, '00:15:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } @@ -149,8 +148,8 @@ stream_dist_plan: |+ Fragment 0 StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck } { tables: [ Materialize: 4294967294 ] } - └── StreamDynamicFilter [append_only] { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true } - ├── tables: [ DynamicFilterLeftNotSatisfy: 0, DynamicFilterRight: 1 ] + └── StreamDynamicFilter [append_only] { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], cleaned_by_watermark: true } + ├── tables: [ DynamicFilterLeft: 0, DynamicFilterRight: 1 ] ├── StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } │ ├── tables: [ StreamScan: 2 ] │ ├── Upstream @@ -184,7 +183,7 @@ stream_plan: |- StreamMaterialize { columns: [ts, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck, watermark_columns: [ts] } └─StreamDynamicFilter { predicate: (t1.ts >= $expr2), output_watermarks: [t1.ts], output: [t1.ts, t1._row_id], cleaned_by_watermark: true } - ├─StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true } + ├─StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], cleaned_by_watermark: true } │ ├─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } │ └─StreamExchange { dist: Broadcast } │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } @@ -198,8 +197,7 @@ ├── tables: [ Materialize: 4294967294 ] └── StreamDynamicFilter { predicate: (t1.ts >= $expr2), output_watermarks: [t1.ts], output: [t1.ts, t1._row_id], cleaned_by_watermark: true } ├── tables: [ DynamicFilterLeft: 0, DynamicFilterRight: 1 ] - ├── StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], condition_always_relax: true } - │ ├── tables: [ DynamicFilterLeftNotSatisfy: 2, DynamicFilterRight: 3 ] + ├── StreamDynamicFilter { predicate: (t1.ts < $expr1), output: [t1.ts, t1._row_id], cleaned_by_watermark: true } { tables: [ DynamicFilterLeft: 2, DynamicFilterRight: 3 ] } │ ├── StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } │ │ ├── tables: [ StreamScan: 4 ] │ │ ├── Upstream @@ -248,7 +246,7 @@ └─StreamHashJoin { type: Inner, predicate: t1.a = t2.b, output: [t1.a, t1.ta, t2.b, t2.tb, t1._row_id, t2._row_id] } ├─StreamExchange { dist: HashShard(t1.a) } │ └─StreamDynamicFilter { predicate: (t1.ta >= $expr2), output_watermarks: [t1.ta], output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true } - │ ├─StreamDynamicFilter { predicate: (t1.ta < $expr1), output: [t1.a, t1.ta, t1._row_id], condition_always_relax: true } + │ ├─StreamDynamicFilter { predicate: (t1.ta < $expr1), output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true } │ │ ├─StreamTableScan { table: t1, columns: [t1.a, t1.ta, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } │ │ └─StreamExchange { dist: Broadcast } │ │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } @@ -279,7 +277,7 @@ │ └─StreamTableScan { table: t2, columns: [t2.b, t2.tb, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) } └─StreamExchange { dist: HashShard(t1.a) } └─StreamDynamicFilter { predicate: (t1.ta >= $expr2), output_watermarks: [t1.ta], output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true } - ├─StreamDynamicFilter { predicate: (t1.ta < $expr1), output: [t1.a, t1.ta, t1._row_id], condition_always_relax: true } + ├─StreamDynamicFilter { predicate: (t1.ta < $expr1), output: [t1.a, t1.ta, t1._row_id], cleaned_by_watermark: true } │ ├─StreamTableScan { table: t1, columns: [t1.a, t1.ta, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } │ └─StreamExchange { dist: Broadcast } │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } @@ -308,7 +306,7 @@ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.ta, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) } └─StreamExchange { dist: HashShard(t2.b) } └─StreamDynamicFilter { predicate: (t2.tb >= $expr2), output_watermarks: [t2.tb], output: [t2.b, t2.tb, t2._row_id], cleaned_by_watermark: true } - ├─StreamDynamicFilter { predicate: (t2.tb < $expr1), output: [t2.b, t2.tb, t2._row_id], condition_always_relax: true } + ├─StreamDynamicFilter { predicate: (t2.tb < $expr1), output: [t2.b, t2.tb, t2._row_id], cleaned_by_watermark: true } │ ├─StreamTableScan { table: t2, columns: [t2.b, t2.tb, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) } │ └─StreamExchange { dist: Broadcast } │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] } @@ -416,7 +414,7 @@ └─StreamUnion { all: true } ├─StreamExchange { dist: HashShard(t._row_id, $src, 0:Int32) } │ └─StreamProject { exprs: [t.t, t.a, $src, t._row_id, 0:Int32] } - │ └─StreamDynamicFilter { predicate: (t.t < $expr2), output: [t.t, t.a, t._row_id, $src], condition_always_relax: true } + │ └─StreamDynamicFilter { predicate: (t.t < $expr2), output: [t.t, t.a, t._row_id, $src], cleaned_by_watermark: true } │ ├─StreamFilter { predicate: Not((t.a > 1:Int32)) } │ │ └─StreamShare { id: 13 } │ │ └─StreamUnion { all: true } diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs index 1d01650d68a0..02fb426905c7 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs @@ -36,7 +36,6 @@ pub struct StreamDynamicFilter { pub base: PlanBase, core: generic::DynamicFilter, cleaned_by_watermark: bool, - condition_always_relax: bool, } impl StreamDynamicFilter { @@ -67,7 +66,6 @@ impl StreamDynamicFilter { base, core, cleaned_by_watermark, - condition_always_relax, } } @@ -100,7 +98,13 @@ impl StreamDynamicFilter { // downstream. See `derive_watermark_columns`. true } - _ => false, + ExprType::LessThan | ExprType::LessThanOrEqual => { + // For <= and <, watermark on rhs means all rows older than the watermark should already be emitted, + // and future lhs inputs should be directly passed to downstream without any state table operation. + // So, the state table can be cleaned up. + true + } + _ => unreachable!(), } } else { false @@ -124,12 +128,6 @@ impl Distill for StreamDynamicFilter { Pretty::display(&self.cleaned_by_watermark), )); } - if self.condition_always_relax { - vec.push(( - "condition_always_relax", - Pretty::display(&self.condition_always_relax), - )); - } childless_record( plan_node_name!( "StreamDynamicFilter", @@ -172,12 +170,13 @@ impl StreamNode for StreamDynamicFilter { let right = self.right(); let right_table = infer_right_internal_table_catalog(right.plan_base()) .with_id(state.gen_table_id_wrapped()); + #[allow(deprecated)] NodeBody::DynamicFilter(DynamicFilterNode { left_key: left_index as u32, condition, left_table: Some(left_table.to_internal_table_prost()), right_table: Some(right_table.to_internal_table_prost()), - condition_always_relax: self.condition_always_relax, + condition_always_relax: false, // deprecated }) } } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index fbff01658944..15d60f6546f9 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -403,6 +403,7 @@ pub trait LocalStateStore: StaticSendSync { read_options: ReadOptions, ) -> impl Future>> + Send + '_; + /// Get last persisted watermark for a given vnode. fn get_table_watermark(&self, vnode: VirtualNode) -> Option; /// Inserts a key-value entry associated with a given `epoch` into the state store. diff --git a/src/stream/src/common/table/mod.rs b/src/stream/src/common/table/mod.rs index 2fe40ab1cf6a..6a130cbd255a 100644 --- a/src/stream/src/common/table/mod.rs +++ b/src/stream/src/common/table/mod.rs @@ -14,7 +14,6 @@ pub mod state_table; mod state_table_cache; -mod watermark; #[cfg(test)] pub mod test_state_table; diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 070c0fad8349..6e4c083c33d4 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -64,21 +64,16 @@ use risingwave_storage::StateStore; use thiserror_ext::AsReport; use tracing::{trace, Instrument}; -use super::watermark::{WatermarkBufferByEpoch, WatermarkBufferStrategy}; use crate::cache::cache_may_stale; use crate::common::state_cache::{StateCache, StateCacheFiller}; use crate::common::table::state_table_cache::StateTableWatermarkCache; use crate::executor::{StreamExecutorError, StreamExecutorResult}; -/// This num is arbitrary and we may want to improve this choice in the future. -const STATE_CLEANING_PERIOD_EPOCH: usize = 300; /// Mostly watermark operators will have inserts (append-only). /// So this number should not need to be very large. /// But we may want to improve this choice in the future. const WATERMARK_CACHE_ENTRIES: usize = 16; -type DefaultWatermarkBufferStrategy = WatermarkBufferByEpoch; - /// This macro is used to mark a point where we want to randomly discard the operation and early /// return, only in insane mode. macro_rules! insane_mode_discard_point { @@ -97,12 +92,10 @@ pub struct StateTableInner< S, SD = BasicSerde, const IS_REPLICATED: bool = false, - W = DefaultWatermarkBufferStrategy, const USE_WATERMARK_CACHE: bool = false, > where S: StateStore, SD: ValueRowSerde, - W: WatermarkBufferStrategy, { /// Id for this table. table_id: TableId, @@ -138,15 +131,11 @@ pub struct StateTableInner< value_indices: Option>, - /// Strategy to buffer watermark for lazy state cleaning. - watermark_buffer_strategy: W, - /// State cleaning watermark. Old states will be cleaned under this watermark when committing. - state_clean_watermark: Option, - - /// Watermark of the last committed state cleaning. - prev_cleaned_watermark: Option, - - /// Watermark cache + /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing. + pending_watermark: Option, + /// Last committed watermark for state cleaning. Will be restored on state table recovery. + committed_watermark: Option, + /// Cache for the top-N primary keys for reducing unnecessary range deletion. watermark_cache: StateTableWatermarkCache, /// Data Types @@ -176,17 +165,15 @@ pub type StateTable = StateTableInner; pub type ReplicatedStateTable = StateTableInner; /// `WatermarkCacheStateTable` caches the watermark column. /// It will reduce state cleaning overhead. -pub type WatermarkCacheStateTable = - StateTableInner; +pub type WatermarkCacheStateTable = StateTableInner; pub type WatermarkCacheParameterizedStateTable = - StateTableInner; + StateTableInner; // initialize -impl StateTableInner +impl StateTableInner where S: StateStore, SD: ValueRowSerde, - W: WatermarkBufferStrategy, { /// get the newest epoch of the state store and panic if the `init_epoch()` has never be called /// async interface only used for replicated state table, @@ -197,12 +184,10 @@ where } // initialize -impl - StateTableInner +impl StateTableInner where S: StateStore, SD: ValueRowSerde, - W: WatermarkBufferStrategy, { /// get the newest epoch of the state store and panic if the `init_epoch()` has never be called /// No need to `wait_for_epoch`, so it should complete immediately. @@ -270,12 +255,11 @@ pub enum StateTableOpConsistencyLevel { // FIXME(kwannoel): Enforce that none of the constructors here // should be used by replicated state table. // Apart from from_table_catalog_inner. -impl - StateTableInner +impl + StateTableInner where S: StateStore, SD: ValueRowSerde, - W: WatermarkBufferStrategy, { /// Create state table from table catalog and store. /// @@ -453,6 +437,36 @@ where row_serde.kind().is_column_aware() ); + // Restore persisted table watermark. + let prefix_deser = if pk_indices.is_empty() { + None + } else { + Some(pk_serde.prefix(1)) + }; + let max_watermark_of_vnodes = distribution + .vnodes() + .iter_vnodes() + .filter_map(|vnode| local_state_store.get_table_watermark(vnode)) + .max(); + let committed_watermark = if let Some(deser) = prefix_deser + && let Some(max_watermark) = max_watermark_of_vnodes + { + let deserialized = deser + .deserialize(&max_watermark) + .ok() + .and_then(|row| row[0].clone()); + if deserialized.is_none() { + tracing::error!( + vnodes = ?distribution.vnodes(), + watermark = ?max_watermark, + "Failed to deserialize persisted watermark from state store.", + ); + } + deserialized + } else { + None + }; + let watermark_cache = if USE_WATERMARK_CACHE { StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES) } else { @@ -499,9 +513,8 @@ where prefix_hint_len, table_option, value_indices, - watermark_buffer_strategy: W::default(), - state_clean_watermark: None, - prev_cleaned_watermark: None, + pending_watermark: None, + committed_watermark, watermark_cache, data_types, output_indices, @@ -666,11 +679,15 @@ where .collect(); let pk_serde = OrderedRowSerde::new(pk_data_types, order_types); + // TODO: let's not restore watermark in unit tests for now, to avoid complexity. + let committed_watermark = None; + let watermark_cache = if USE_WATERMARK_CACHE { StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES) } else { StateTableWatermarkCache::new(0) }; + Self { table_id, local_store: local_state_store, @@ -682,9 +699,8 @@ where prefix_hint_len: 0, table_option: Default::default(), value_indices, - watermark_buffer_strategy: W::default(), - state_clean_watermark: None, - prev_cleaned_watermark: None, + pending_watermark: None, + committed_watermark, watermark_cache, data_types, output_indices: vec![], @@ -752,7 +768,7 @@ where } fn is_dirty(&self) -> bool { - self.local_store.is_dirty() || self.state_clean_watermark.is_some() + self.local_store.is_dirty() || self.pending_watermark.is_some() } pub fn is_consistent_op(&self) -> bool { @@ -764,11 +780,10 @@ where } } -impl StateTableInner +impl StateTableInner where S: StateStore, SD: ValueRowSerde, - W: WatermarkBufferStrategy, { /// Create replicated state table from table catalog with output indices pub async fn from_table_catalog_with_output_column_ids( @@ -789,13 +804,8 @@ where } // point get -impl< - S, - SD, - const IS_REPLICATED: bool, - W: WatermarkBufferStrategy, - const USE_WATERMARK_CACHE: bool, - > StateTableInner +impl + StateTableInner where S: StateStore, SD: ValueRowSerde, @@ -895,7 +905,7 @@ where let cache_may_stale = cache_may_stale(self.vnodes(), &new_vnodes); if cache_may_stale { - self.state_clean_watermark = None; + self.pending_watermark = None; if USE_WATERMARK_CACHE { self.watermark_cache.clear(); } @@ -909,13 +919,8 @@ where } // write -impl< - S, - SD, - const IS_REPLICATED: bool, - W: WatermarkBufferStrategy, - const USE_WATERMARK_CACHE: bool, - > StateTableInner +impl + StateTableInner where S: StateStore, SD: ValueRowSerde, @@ -1131,12 +1136,15 @@ where /// # Arguments /// /// * `watermark` - Latest watermark received. - /// * `eager_cleaning` - Whether to clean up the state table eagerly. - pub fn update_watermark(&mut self, watermark: ScalarImpl, eager_cleaning: bool) { + pub fn update_watermark(&mut self, watermark: ScalarImpl) { trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark"); - if self.watermark_buffer_strategy.apply() || eager_cleaning { - self.state_clean_watermark = Some(watermark); - } + self.pending_watermark = Some(watermark); + } + + /// Get the committed watermark of the state table. Watermarks should be fed into the state + /// table through `update_watermark` method. + pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> { + self.committed_watermark.as_ref() } pub async fn commit(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> { @@ -1180,9 +1188,7 @@ where epoch = ?self.epoch(), "commit state table" ); - // Tick the watermark buffer here because state table is expected to be committed once - // per epoch. - self.watermark_buffer_strategy.tick(); + if !self.is_dirty() { // If the state table is not modified, go fast path. self.local_store.seal_current_epoch( @@ -1201,7 +1207,7 @@ where // Refresh watermark cache if it is out of sync. if USE_WATERMARK_CACHE && !self.watermark_cache.is_synced() { - if let Some(ref watermark) = self.prev_cleaned_watermark { + if let Some(ref watermark) = self.committed_watermark { let range: (Bound>, Bound>) = (Included(once(Some(watermark.clone()))), Unbounded); // NOTE(kwannoel): We buffer `pks` before inserting into watermark cache @@ -1256,7 +1262,7 @@ where next_epoch: u64, switch_op_consistency_level: Option, ) -> StreamExecutorResult<()> { - let watermark = self.state_clean_watermark.take(); + let watermark = self.pending_watermark.take(); watermark.as_ref().inspect(|watermark| { trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning"); }); @@ -1328,7 +1334,7 @@ where )); } } - self.prev_cleaned_watermark = watermark; + self.committed_watermark = watermark; // Clear the watermark cache and force a resync. // TODO(kwannoel): This can be further optimized: @@ -1368,13 +1374,8 @@ impl<'a, T> KeyedRowStream<'a> for T where } // Iterator functions -impl< - S, - SD, - const IS_REPLICATED: bool, - W: WatermarkBufferStrategy, - const USE_WATERMARK_CACHE: bool, - > StateTableInner +impl + StateTableInner where S: StateStore, SD: ValueRowSerde, @@ -1575,13 +1576,8 @@ where } } -impl< - S, - SD, - const IS_REPLICATED: bool, - W: WatermarkBufferStrategy, - const USE_WATERMARK_CACHE: bool, - > StateTableInner +impl + StateTableInner where S: StateStore, SD: ValueRowSerde, diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs index d6e9c9bed5b9..098548c21ac9 100644 --- a/src/stream/src/common/table/test_state_table.rs +++ b/src/stream/src/common/table/test_state_table.rs @@ -1399,7 +1399,7 @@ async fn test_state_table_watermark_cache_ignore_null() { assert_eq!(cache.len(), 0); let watermark = Timestamptz::from_secs(2500).unwrap().to_scalar_value(); - state_table.update_watermark(watermark, true); + state_table.update_watermark(watermark); epoch.inc_for_test(); test_env @@ -1486,7 +1486,7 @@ async fn test_state_table_watermark_cache_write_chunk() { assert_eq!(cache.len(), 0); let watermark = Timestamptz::from_secs(0).unwrap().to_scalar_value(); - state_table.update_watermark(watermark, true); + state_table.update_watermark(watermark); epoch.inc_for_test(); test_env @@ -1598,7 +1598,7 @@ async fn test_state_table_watermark_cache_write_chunk() { // Should not cleanup anything. let watermark = Timestamptz::from_secs(2500).unwrap().to_scalar_value(); - state_table.update_watermark(watermark, true); + state_table.update_watermark(watermark); epoch.inc_for_test(); test_env @@ -1701,7 +1701,7 @@ async fn test_state_table_watermark_cache_refill() { assert_eq!(cache.len(), 0); let watermark = Timestamptz::from_secs(2500).unwrap().to_scalar_value(); - state_table.update_watermark(watermark, true); + state_table.update_watermark(watermark); epoch.inc_for_test(); test_env diff --git a/src/stream/src/common/table/watermark.rs b/src/stream/src/common/table/watermark.rs deleted file mode 100644 index 3e618873ec65..000000000000 --- a/src/stream/src/common/table/watermark.rs +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/// Strategy to decide how to buffer the watermarks, used for state cleaning. -pub trait WatermarkBufferStrategy: Default { - /// Trigger when a epoch is committed. - fn tick(&mut self); - - /// Whether to clear the buffer. - /// - /// Returns true to indicate that the buffer should be cleared and the strategy states reset. - fn apply(&mut self) -> bool; -} - -/// No buffer, apply watermark to memory immediately. -/// Use the strategy when you want to apply the watermark immediately. -#[derive(Default, Debug)] -pub struct WatermarkNoBuffer; - -impl WatermarkBufferStrategy for WatermarkNoBuffer { - fn tick(&mut self) {} - - fn apply(&mut self) -> bool { - true - } -} - -/// Buffer the watermark by a epoch period. -/// The strategy reduced the delete-range calls to storage. -#[derive(Default, Debug)] -pub struct WatermarkBufferByEpoch { - /// number of epochs since the last time we did state cleaning by watermark. - buffered_epochs_cnt: usize, -} - -impl WatermarkBufferStrategy for WatermarkBufferByEpoch { - fn tick(&mut self) { - self.buffered_epochs_cnt += 1; - } - - fn apply(&mut self) -> bool { - if self.buffered_epochs_cnt >= PERIOD { - self.buffered_epochs_cnt = 0; - true - } else { - false - } - } -} diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index 65530f816295..ff44399b22a3 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -16,7 +16,6 @@ use std::ops::Bound::{self, *}; use futures::stream; use risingwave_common::array::{Array, ArrayImpl, Op}; -use risingwave_common::bail; use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::row::{self, once, OwnedRow as RowData}; @@ -52,10 +51,6 @@ pub struct DynamicFilterExecutor metrics: Arc, /// The maximum size of the chunk produced by executor at a time. chunk_size: usize, - /// If the right side's change always make the condition more relaxed. - /// In other words, make more record in the left side satisfy the condition. - /// In that case, there are only records which does not satisfy the condition in the table. - condition_always_relax: bool, cleaned_by_watermark: bool, } @@ -73,7 +68,6 @@ impl DynamicFilterExecutor, metrics: Arc, chunk_size: usize, - condition_always_relax: bool, cleaned_by_watermark: bool, ) -> Self { Self { @@ -88,7 +82,6 @@ impl DynamicFilterExecutor DynamicFilterExecutor, + below_watermark_condition: Option, ) -> Result<(Vec, Bitmap), StreamExecutorError> { let mut new_ops = Vec::with_capacity(chunk.capacity()); let mut new_visibility = BitmapBuilder::with_capacity(chunk.capacity()); @@ -108,6 +102,12 @@ impl DynamicFilterExecutor DynamicFilterExecutor { @@ -162,9 +172,16 @@ impl DynamicFilterExecutor= and >, watermark on rhs means that any future changes that satisfy the + // filter condition should >= state clean watermark, and those below the watermark + // will never satisfy the filter condition. + // 2. For < and <=, watermark on rhs means that any future changes that are below + // the watermark will always satisfy the filter condition, so those changes should + // always be directly sent to downstream without any state table maintenance. + if below_watermark { continue; } @@ -274,11 +291,13 @@ impl DynamicFilterExecutor DynamicFilterExecutor DynamicFilterExecutor DynamicFilterExecutor DynamicFilterExecutor (MessageSender, MessageSender, BoxedMessageStream) { - let mem_state = MemoryStateStore::new(); - create_executor_inner(comparator, mem_state, false).await - } - - async fn create_executor_inner( - comparator: PbExprNodeType, - mem_state: MemoryStateStore, - always_relax: bool, - ) -> (MessageSender, MessageSender, BoxedMessageStream) { - let (mem_state_l, mem_state_r) = create_in_memory_state_table(mem_state).await; + let (mem_state_l, mem_state_r) = create_in_memory_state_table(store).await; let schema = Schema { fields: vec![Field::unnamed(DataType::Int64)], }; @@ -572,8 +584,7 @@ mod tests { mem_state_r, Arc::new(StreamingMetrics::unused()), 1024, - always_relax, - false, + cleaned_by_watermark, ); (tx_l, tx_r, executor.boxed().execute()) } @@ -608,9 +619,9 @@ mod tests { " I + 4", ); - let mem_state = MemoryStateStore::new(); + let mem_store = MemoryStateStore::new(); let (mut tx_l, mut tx_r, mut dynamic_filter) = - create_executor_inner(PbExprNodeType::GreaterThan, mem_state.clone(), false).await; + create_executor(PbExprNodeType::GreaterThan, mem_store.clone(), false).await; // push the init barrier for left and right tx_l.push_barrier(test_epoch(1), false); @@ -633,7 +644,7 @@ mod tests { // Recover executor from state store let (mut tx_l, mut tx_r, mut dynamic_filter) = - create_executor_inner(PbExprNodeType::GreaterThan, mem_state.clone(), false).await; + create_executor(PbExprNodeType::GreaterThan, mem_store.clone(), false).await; // push the recovery barrier for left and right tx_l.push_barrier(test_epoch(2), false); @@ -680,7 +691,7 @@ mod tests { // Recover executor from state store let (mut tx_l, mut tx_r, mut dynamic_filter) = - create_executor_inner(PbExprNodeType::GreaterThan, mem_state.clone(), false).await; + create_executor(PbExprNodeType::GreaterThan, mem_store.clone(), false).await; // push recovery barrier tx_l.push_barrier(test_epoch(3), false); @@ -765,8 +776,9 @@ mod tests { " I + 4", ); + let mem_store = MemoryStateStore::new(); let (mut tx_l, mut tx_r, mut dynamic_filter) = - create_executor(PbExprNodeType::GreaterThan).await; + create_executor(PbExprNodeType::GreaterThan, mem_store, false).await; // push the init barrier for left and right tx_l.push_barrier(test_epoch(1), false); @@ -871,8 +883,9 @@ mod tests { " I + 5", ); + let mem_store = MemoryStateStore::new(); let (mut tx_l, mut tx_r, mut dynamic_filter) = - create_executor(PbExprNodeType::GreaterThanOrEqual).await; + create_executor(PbExprNodeType::GreaterThanOrEqual, mem_store, false).await; // push the init barrier for left and right tx_l.push_barrier(test_epoch(1), false); @@ -977,8 +990,9 @@ mod tests { " I + 1", ); + let mem_store = MemoryStateStore::new(); let (mut tx_l, mut tx_r, mut dynamic_filter) = - create_executor(PbExprNodeType::LessThan).await; + create_executor(PbExprNodeType::LessThan, mem_store, false).await; // push the init barrier for left and right tx_l.push_barrier(test_epoch(1), false); @@ -1083,8 +1097,9 @@ mod tests { " I + 0", ); + let mem_store = MemoryStateStore::new(); let (mut tx_l, mut tx_r, mut dynamic_filter) = - create_executor(PbExprNodeType::LessThanOrEqual).await; + create_executor(PbExprNodeType::LessThanOrEqual, mem_store, false).await; // push the init barrier for left and right tx_l.push_barrier(test_epoch(1), false); @@ -1176,9 +1191,10 @@ mod tests { } #[tokio::test] - async fn test_dynamic_filter_always_relax() -> StreamExecutorResult<()> { + async fn test_dynamic_filter_state_cleaning() -> StreamExecutorResult<()> { let chunk_l1 = StreamChunk::from_pretty( " I + + 1 + 2 + 3 + 4 @@ -1194,17 +1210,19 @@ mod tests { " I + 2", ); + let watermark_r1 = 2; let chunk_r2 = StreamChunk::from_pretty( " I + 5", ); + let watermark_r2 = 5; - let mem_state = MemoryStateStore::new(); + let mem_store = MemoryStateStore::new(); let (mut tx_l, mut tx_r, mut dynamic_filter) = - create_executor_inner(PbExprNodeType::LessThanOrEqual, mem_state.clone(), true).await; + create_executor(PbExprNodeType::LessThanOrEqual, mem_store.clone(), true).await; let column_descs = ColumnDesc::unnamed(ColumnId::new(0), DataType::Int64); let table = StorageTable::for_test( - mem_state.clone(), + mem_store.clone(), TableId::new(0), vec![column_descs], vec![OrderType::ascending()], @@ -1217,69 +1235,48 @@ mod tests { tx_r.push_barrier(test_epoch(1), false); dynamic_filter.next_unwrap_ready_barrier()?; - // push the 1st right chunk + // push the 1st set of messages + tx_l.push_chunk(chunk_l1); tx_r.push_chunk(chunk_r1); - - // push the init barrier for left and right + tx_r.push_int64_watermark(0, watermark_r1); tx_l.push_barrier(test_epoch(2), false); tx_r.push_barrier(test_epoch(2), false); - // Get the barrier - dynamic_filter.next_unwrap_ready_barrier()?; - - // push the 1st left chunk - tx_l.push_chunk(chunk_l1); - - let chunk = dynamic_filter.next_unwrap_ready_chunk()?.compact(); + let chunk = dynamic_filter.expect_chunk().await; assert_eq!( - chunk, + chunk.compact(), StreamChunk::from_pretty( " I + + 1 + 2" ) ); - // push the init barrier for left and right - tx_l.push_barrier(test_epoch(3), false); - tx_r.push_barrier(test_epoch(3), false); - - // Get the barrier - dynamic_filter.next_unwrap_ready_barrier()?; + dynamic_filter.expect_barrier().await; - assert!(!in_table(&table, 2).await); + assert!(!in_table(&table, 1).await); // `1` should be cleaned because it's less than watermark + assert!(in_table(&table, 2).await); assert!(in_table(&table, 3).await); assert!(in_table(&table, 4).await); - // push the 2nd left chunk + // push the 2nd set of messages tx_l.push_chunk(chunk_l2); - let chunk = dynamic_filter.next_unwrap_ready_chunk()?.compact(); + tx_r.push_chunk(chunk_r2); + tx_r.push_int64_watermark(0, watermark_r2); + tx_l.push_barrier(test_epoch(3), false); + tx_r.push_barrier(test_epoch(3), false); + + let chunk = dynamic_filter.expect_chunk().await; assert_eq!( - chunk, + chunk.compact(), StreamChunk::from_pretty( + // the two rows are directly sent to the output cuz they satisfy the condition of previously committed rhs " I + 1 - 2" ) ); - // push the init barrier for left and right - tx_l.push_barrier(test_epoch(4), false); - tx_r.push_barrier(test_epoch(4), false); - // Get the barrier - dynamic_filter.next_unwrap_ready_barrier()?; - - assert!(!in_table(&table, 2).await); - assert!(!in_table(&table, 2).await); - assert!(!in_table(&table, 3).await); - assert!(in_table(&table, 4).await); - - // push the 2nd right chunk - tx_r.push_chunk(chunk_r2); - - // push the init barrier for left and right - tx_l.push_barrier(test_epoch(5), false); - tx_r.push_barrier(test_epoch(5), false); - - let chunk = dynamic_filter.next_unwrap_ready_chunk()?.compact(); + let chunk = dynamic_filter.expect_chunk().await; assert_eq!( chunk, StreamChunk::from_pretty( @@ -1289,17 +1286,13 @@ mod tests { ) ); - // Get the barrier - dynamic_filter.next_unwrap_ready_barrier()?; - tx_l.push_barrier(test_epoch(6), false); - tx_r.push_barrier(test_epoch(6), false); - // Get the barrier - dynamic_filter.next_unwrap_ready_barrier()?; - // This part test need change the `DefaultWatermarkBufferStrategy` to `super::watermark::WatermarkNoBuffer` - // clean is the Bound::Exclude - // TODO: https://github.com/risingwavelabs/risingwave/issues/14014 - // assert!(!in_table(&table, 4).await); - // assert!(in_table(&table, 5).await); + dynamic_filter.expect_barrier().await; + + assert!(!in_table(&table, 2).await); + assert!(!in_table(&table, 3).await); + assert!(!in_table(&table, 4).await); + assert!(in_table(&table, 5).await); + Ok(()) } } diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index 41ab0ddb12c5..6c6a99f4c242 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -489,7 +489,7 @@ impl HashAggExecutor { if let Some(watermark) = window_watermark { // Update watermark of state tables, for state cleaning. this.all_state_tables_mut() - .for_each(|table| table.update_watermark(watermark.clone(), false)); + .for_each(|table| table.update_watermark(watermark.clone())); } // Flush distinct dedup state. diff --git a/src/stream/src/executor/join/hash_join.rs b/src/stream/src/executor/join/hash_join.rs index 8feed4a13b24..10e9e26f784c 100644 --- a/src/stream/src/executor/join/hash_join.rs +++ b/src/stream/src/executor/join/hash_join.rs @@ -310,8 +310,8 @@ impl JoinHashMap { pub fn update_watermark(&mut self, watermark: ScalarImpl) { // TODO: remove data in cache. - self.state.table.update_watermark(watermark.clone(), false); - self.degree_state.table.update_watermark(watermark, false); + self.state.table.update_watermark(watermark.clone()); + self.degree_state.table.update_watermark(watermark); } /// Take the state for the given `key` out of the hash table and return it. One **MUST** call diff --git a/src/stream/src/executor/sort_buffer.rs b/src/stream/src/executor/sort_buffer.rs index c2dfcea07684..b3d658f3af4a 100644 --- a/src/stream/src/executor/sort_buffer.rs +++ b/src/stream/src/executor/sort_buffer.rs @@ -177,8 +177,8 @@ impl SortBuffer { } // TODO(rc): Need something like `table.range_delete()`. Here we call - // `update_watermark(watermark, true)` as an alternative to `range_delete((..watermark))`. - buffer_table.update_watermark(watermark, true); + // `update_watermark(watermark)` as an alternative to `range_delete((..watermark))`. + buffer_table.update_watermark(watermark); } #[try_stream(ok = OwnedRow, error = StreamExecutorError)] diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index b690dce70057..0b65dcaa4682 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -239,8 +239,7 @@ where async fn handle_watermark(&mut self, watermark: Watermark) -> Option { if watermark.col_idx == self.group_by[0] { - self.managed_state - .update_watermark(watermark.val.clone(), false); + self.managed_state.update_watermark(watermark.val.clone()); Some(watermark) } else { None diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs index 83ef8df772fc..2f9bd2d54702 100644 --- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs @@ -208,8 +208,7 @@ where async fn handle_watermark(&mut self, watermark: Watermark) -> Option { if watermark.col_idx == self.group_by[0] { - self.managed_state - .update_watermark(watermark.val.clone(), false); + self.managed_state.update_watermark(watermark.val.clone()); Some(watermark) } else { None diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index 58a0c732f803..5e6600209c74 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -77,8 +77,8 @@ impl ManagedTopNState { } /// Update watermark for the managed state table. - pub fn update_watermark(&mut self, watermark: ScalarImpl, eager_cleaning: bool) { - self.state_table.update_watermark(watermark, eager_cleaning) + pub fn update_watermark(&mut self, watermark: ScalarImpl) { + self.state_table.update_watermark(watermark) } pub fn insert(&mut self, value: impl Row) { diff --git a/src/stream/src/from_proto/dynamic_filter.rs b/src/stream/src/from_proto/dynamic_filter.rs index c6ccd7038254..dbccf78e2aaf 100644 --- a/src/stream/src/from_proto/dynamic_filter.rs +++ b/src/stream/src/from_proto/dynamic_filter.rs @@ -50,8 +50,6 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { ); } - let condition_always_relax = node.get_condition_always_relax(); - let state_table_r = StateTable::from_table_catalog(node.get_right_table()?, store.clone(), None).await; @@ -75,7 +73,6 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { state_table_r, params.executor_stats, params.env.config().developer.chunk_size, - condition_always_relax, cleaned_by_watermark, ) .boxed() @@ -95,7 +92,6 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { state_table_r, params.executor_stats, params.env.config().developer.chunk_size, - condition_always_relax, cleaned_by_watermark, ) .boxed()