From ba3594e640c61564e33374896ca594b1ec02f507 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 4 Jul 2024 16:57:16 +0800 Subject: [PATCH 1/6] add `columns_monotonicity` field for PlanNode Signed-off-by: Richard Chien --- src/frontend/src/optimizer/plan_node/mod.rs | 6 +++- .../src/optimizer/plan_node/plan_base.rs | 20 ++++++++++- .../src/optimizer/plan_node/stream.rs | 2 ++ .../src/optimizer/property/monotonicity.rs | 34 +++++++++++++++++++ 4 files changed, 60 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index b5062398270e5..ee2b16265e7aa 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -49,7 +49,7 @@ use self::batch::BatchPlanRef; use self::generic::{GenericPlanRef, PhysicalPlanRef}; use self::stream::StreamPlanRef; use self::utils::Distill; -use super::property::{Distribution, FunctionalDependencySet, Order}; +use super::property::{Distribution, FunctionalDependencySet, MonotonicityMap, Order}; use crate::error::{ErrorCode, Result}; use crate::optimizer::ExpressionSimplifyRewriter; use crate::session::current::notice_to_user; @@ -609,6 +609,10 @@ impl StreamPlanRef for PlanRef { fn watermark_columns(&self) -> &FixedBitSet { self.plan_base().watermark_columns() } + + fn columns_monotonicity(&self) -> &MonotonicityMap { + self.plan_base().columns_monotonicity() + } } /// Allow access to all fields defined in [`BatchPlanRef`] for the type-erased plan node. diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs index 12fba475241c5..f4e1f93d90158 100644 --- a/src/frontend/src/optimizer/plan_node/plan_base.rs +++ b/src/frontend/src/optimizer/plan_node/plan_base.rs @@ -58,7 +58,9 @@ pub struct StreamExtra { emit_on_window_close: bool, /// The watermark column indices of the `PlanNode`'s output. There could be watermark output from /// this stream operator. 
- watermark_columns: FixedBitSet, + watermark_columns: FixedBitSet, // TODO(): use `column_monotonicity` instead + /// The monotonicity of columns in the output. + columns_monotonicity: MonotonicityMap, } impl GetPhysicalCommon for StreamExtra { @@ -168,6 +170,10 @@ impl stream::StreamPlanRef for PlanBase { fn watermark_columns(&self) -> &FixedBitSet { &self.extra.watermark_columns } + + fn columns_monotonicity(&self) -> &MonotonicityMap { + &self.extra.columns_monotonicity + } } impl batch::BatchPlanRef for PlanBase { @@ -222,6 +228,7 @@ impl PlanBase { append_only: bool, emit_on_window_close: bool, watermark_columns: FixedBitSet, + columns_monotonicity: MonotonicityMap, ) -> Self { let id = ctx.next_plan_node_id(); assert_eq!(watermark_columns.len(), schema.len()); @@ -236,6 +243,7 @@ impl PlanBase { append_only, emit_on_window_close, watermark_columns, + columns_monotonicity, }, } } @@ -246,6 +254,7 @@ impl PlanBase { append_only: bool, emit_on_window_close: bool, watermark_columns: FixedBitSet, + columns_monotonicity: MonotonicityMap, ) -> Self { Self::new_stream( core.ctx(), @@ -256,6 +265,7 @@ impl PlanBase { append_only, emit_on_window_close, watermark_columns, + columns_monotonicity, ) } } @@ -383,6 +393,10 @@ impl<'a> PlanBaseRef<'a> { dispatch_plan_base!(self, [Stream], StreamPlanRef::watermark_columns) } + pub(super) fn columns_monotonicity(self) -> &'a MonotonicityMap { + dispatch_plan_base!(self, [Stream], StreamPlanRef::columns_monotonicity) + } + pub(super) fn order(self) -> &'a Order { dispatch_plan_base!(self, [Batch], BatchPlanRef::order) } @@ -428,6 +442,10 @@ impl StreamPlanRef for PlanBaseRef<'_> { fn watermark_columns(&self) -> &FixedBitSet { (*self).watermark_columns() } + + fn columns_monotonicity(&self) -> &MonotonicityMap { + (*self).columns_monotonicity() + } } impl BatchPlanRef for PlanBaseRef<'_> { diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 
42a599ccd60b0..e2df99d13d9f4 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -15,6 +15,7 @@ use fixedbitset::FixedBitSet; use super::generic::PhysicalPlanRef; +use crate::optimizer::property::MonotonicityMap; /// A subtrait of [`PhysicalPlanRef`] for stream plans. /// @@ -29,6 +30,7 @@ pub trait StreamPlanRef: PhysicalPlanRef { fn append_only(&self) -> bool; fn emit_on_window_close(&self) -> bool; fn watermark_columns(&self) -> &FixedBitSet; + fn columns_monotonicity(&self) -> &MonotonicityMap; } /// Prelude for stream plan nodes. diff --git a/src/frontend/src/optimizer/property/monotonicity.rs b/src/frontend/src/optimizer/property/monotonicity.rs index 87f74c25b83f4..0dfc1c47ef43d 100644 --- a/src/frontend/src/optimizer/property/monotonicity.rs +++ b/src/frontend/src/optimizer/property/monotonicity.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::BTreeMap; +use std::ops::{Index, IndexMut}; + use enum_as_inner::EnumAsInner; use risingwave_common::types::DataType; use risingwave_pb::expr::expr_node::Type as ExprType; @@ -271,3 +274,34 @@ impl MonotonicityAnalyzer { Inherent(Unknown) } } + +#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)] +pub struct MonotonicityMap(BTreeMap<usize, Monotonicity>); + +impl MonotonicityMap { + pub fn new() -> Self { + MonotonicityMap(BTreeMap::new()) + } + + pub fn insert(&mut self, idx: usize, monotonicity: Monotonicity) { + self.0.insert(idx, monotonicity); + } + + pub fn get(&self, idx: usize) -> Monotonicity { + self.0.get(&idx).copied().unwrap_or(Monotonicity::Unknown) + } +} + +impl Index<usize> for MonotonicityMap { + type Output = Monotonicity; + + fn index(&self, idx: usize) -> &Self::Output { + self.0.get(&idx).unwrap_or(&Monotonicity::Unknown) + } +} + +impl IndexMut<usize> for MonotonicityMap { + fn index_mut(&mut self, idx: usize) -> &mut Self::Output { + self.0.entry(idx).or_insert(Monotonicity::Unknown) + } +} From 01fda0041b00bc27f717e34f667eed6a5460cedc Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 9 Jul 2024 16:25:33 +0800 Subject: [PATCH 2/6] derive monotonicity in each stream node Signed-off-by: Richard Chien --- .../optimizer/plan_node/generic/cdc_scan.rs | 6 ++++- .../src/optimizer/plan_node/logical_source.rs | 1 + .../src/optimizer/plan_node/plan_base.rs | 2 +- .../plan_node/stream_cdc_table_scan.rs | 1 + .../optimizer/plan_node/stream_changelog.rs | 1 + .../src/optimizer/plan_node/stream_dedup.rs | 1 + .../optimizer/plan_node/stream_delta_join.rs | 1 + .../src/optimizer/plan_node/stream_dml.rs | 1 + .../plan_node/stream_dynamic_filter.rs | 1 + .../plan_node/stream_eowc_over_window.rs | 1 + .../optimizer/plan_node/stream_exchange.rs | 2 ++ .../src/optimizer/plan_node/stream_expand.rs | 1 + .../src/optimizer/plan_node/stream_filter.rs | 1 + .../optimizer/plan_node/stream_fs_fetch.rs | 1 + .../optimizer/plan_node/stream_group_topn.rs | 1 + 
.../optimizer/plan_node/stream_hash_agg.rs | 1 + .../optimizer/plan_node/stream_hash_join.rs | 1 + .../optimizer/plan_node/stream_hop_window.rs | 1 + .../optimizer/plan_node/stream_materialize.rs | 1 + .../src/optimizer/plan_node/stream_now.rs | 7 +++++- .../optimizer/plan_node/stream_over_window.rs | 1 + .../src/optimizer/plan_node/stream_project.rs | 13 ++++++---- .../optimizer/plan_node/stream_project_set.rs | 9 +++---- .../optimizer/plan_node/stream_row_id_gen.rs | 1 + .../src/optimizer/plan_node/stream_share.rs | 1 + .../optimizer/plan_node/stream_simple_agg.rs | 9 ++++++- .../src/optimizer/plan_node/stream_sort.rs | 8 ++++++ .../src/optimizer/plan_node/stream_source.rs | 1 + .../optimizer/plan_node/stream_source_scan.rs | 1 + .../plan_node/stream_stateless_simple_agg.rs | 1 + .../optimizer/plan_node/stream_table_scan.rs | 1 + .../plan_node/stream_temporal_join.rs | 7 ++++++ .../src/optimizer/plan_node/stream_topn.rs | 9 ++++++- .../src/optimizer/plan_node/stream_union.rs | 2 ++ .../src/optimizer/plan_node/stream_values.rs | 1 + .../plan_node/stream_watermark_filter.rs | 2 ++ .../src/optimizer/property/monotonicity.rs | 25 ++++++++++++++++--- .../src/utils/column_index_mapping.rs | 13 +++++++++- 38 files changed, 119 insertions(+), 19 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs b/src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs index 2d7d708291e47..ff1018de2e633 100644 --- a/src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs @@ -33,7 +33,7 @@ use crate::catalog::ColumnId; use crate::error::Result; use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::optimizer_context::OptimizerContextRef; -use crate::optimizer::property::FunctionalDependencySet; +use crate::optimizer::property::{FunctionalDependencySet, MonotonicityMap}; use crate::WithOptions; /// [`CdcScan`] reads rows of a table from an external upstream database @@ -125,6 
+125,10 @@ impl CdcScan { FixedBitSet::with_capacity(self.get_table_columns().len()) } + pub fn columns_monotonicity(&self) -> MonotonicityMap { + MonotonicityMap::new() + } + pub(crate) fn column_names_with_table_prefix(&self) -> Vec { self.output_col_idx .iter() diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 918db2919e626..d22f26184bc08 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -229,6 +229,7 @@ impl LogicalSource { true, // `list` will keep listing all objects, it must be append-only false, FixedBitSet::with_capacity(logical_source.column_catalog.len()), + Default::default(), ), core: logical_source, } diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs index f4e1f93d90158..02c85858967f8 100644 --- a/src/frontend/src/optimizer/plan_node/plan_base.rs +++ b/src/frontend/src/optimizer/plan_node/plan_base.rs @@ -58,7 +58,7 @@ pub struct StreamExtra { emit_on_window_close: bool, /// The watermark column indices of the `PlanNode`'s output. There could be watermark output from /// this stream operator. - watermark_columns: FixedBitSet, // TODO(): use `column_monotonicity` instead + watermark_columns: FixedBitSet, /// The monotonicity of columns in the output. 
columns_monotonicity: MonotonicityMap, } diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs index 9fe3347171453..a7aef5195ea5a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs @@ -50,6 +50,7 @@ impl StreamCdcTableScan { core.append_only(), false, core.watermark_columns(), + core.columns_monotonicity(), ); Self { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_changelog.rs b/src/frontend/src/optimizer/plan_node/stream_changelog.rs index b02c5eeb0c355..d305e9a315814 100644 --- a/src/frontend/src/optimizer/plan_node/stream_changelog.rs +++ b/src/frontend/src/optimizer/plan_node/stream_changelog.rs @@ -48,6 +48,7 @@ impl StreamChangeLog { true, input.emit_on_window_close(), watermark_columns, + Default::default(), ); StreamChangeLog { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_dedup.rs b/src/frontend/src/optimizer/plan_node/stream_dedup.rs index b31415c125507..d642d0f9e7ee0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dedup.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dedup.rs @@ -44,6 +44,7 @@ impl StreamDedup { true, input.emit_on_window_close(), input.watermark_columns().clone(), + input.columns_monotonicity().clone(), ); StreamDedup { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index 7a99c8f7955b8..ab79b368b2373 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -76,6 +76,7 @@ impl StreamDeltaJoin { append_only, false, // TODO(rc): derive EOWC property from input watermark_columns, + Default::default(), ); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs 
index e777041718291..244853c66eb1d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dml.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs @@ -41,6 +41,7 @@ impl StreamDml { append_only, false, // TODO(rc): decide EOWC property FixedBitSet::with_capacity(input.schema().len()), // no watermark if dml is allowed + Default::default(), ); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs index a6c31b9197eb8..69386bdb8079c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs @@ -77,6 +77,7 @@ impl StreamDynamicFilter { out_append_only, false, // TODO(rc): decide EOWC property Self::derive_watermark_columns(&core), + Default::default(), ); let cleaned_by_watermark = Self::cleaned_by_watermark(&core); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs index 78cb0e3b9d605..af37ddeb79f7f 100644 --- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs @@ -58,6 +58,7 @@ impl StreamEowcOverWindow { true, true, watermark_columns, + input.columns_monotonicity().clone(), ); StreamEowcOverWindow { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 964accc0f69a4..9fc2d3d748097 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -44,6 +44,7 @@ impl StreamExchange { input.append_only(), input.emit_on_window_close(), input.watermark_columns().clone(), + input.columns_monotonicity().clone(), ); StreamExchange { base, @@ -64,6 +65,7 @@ impl StreamExchange { input.append_only(), input.emit_on_window_close(), 
input.watermark_columns().clone(), + input.columns_monotonicity().clone(), ); StreamExchange { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs index 4f38e95cdfea4..16f89459a1c69 100644 --- a/src/frontend/src/optimizer/plan_node/stream_expand.rs +++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs @@ -52,6 +52,7 @@ impl StreamExpand { input.append_only(), input.emit_on_window_close(), watermark_columns, + Default::default(), ); StreamExpand { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_filter.rs b/src/frontend/src/optimizer/plan_node/stream_filter.rs index 586b5ac8a84b9..0a3126ffe7180 100644 --- a/src/frontend/src/optimizer/plan_node/stream_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_filter.rs @@ -42,6 +42,7 @@ impl StreamFilter { input.append_only(), input.emit_on_window_close(), input.watermark_columns().clone(), + input.columns_monotonicity().clone(), ); StreamFilter { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs index c574941236729..19ee7c3037d09 100644 --- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs +++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs @@ -55,6 +55,7 @@ impl StreamFsFetch { source.catalog.as_ref().map_or(true, |s| s.append_only), false, FixedBitSet::with_capacity(source.column_catalog.len()), + Default::default(), ); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index 0cd8edc996c89..cab663ada96a4 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -79,6 +79,7 @@ impl StreamGroupTopN { // TODO: https://github.com/risingwavelabs/risingwave/issues/8348 false, watermark_columns, + Default::default(), ); StreamGroupTopN { base, 
diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs index eb69d5c259bb6..55b25860c0f53 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -93,6 +93,7 @@ impl StreamHashAgg { emit_on_window_close, // in EOWC mode, we produce append only output emit_on_window_close, watermark_columns, + Default::default(), ); StreamHashAgg { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs index b803fccef7b07..5cf381eef1e72 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -196,6 +196,7 @@ impl StreamHashJoin { append_only, false, // TODO(rc): derive EOWC property from input watermark_columns, + Default::default(), ); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index a94dfbe788f88..f7284f0d8e8f3 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -62,6 +62,7 @@ impl StreamHopWindow { input.append_only(), input.emit_on_window_close(), internal2output.rewrite_bitset(&watermark_columns), + Default::default(), // hop window start/end jumps, so monotonicity is not propagated ); Self { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index e5a2496916adc..865dc71191b46 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -59,6 +59,7 @@ impl StreamMaterialize { input.append_only(), input.emit_on_window_close(), input.watermark_columns().clone(), + input.columns_monotonicity().clone(), ); Self { base, input, table } } 
diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs index 22a0d2c5fb0fb..9ec80d15bac30 100644 --- a/src/frontend/src/optimizer/plan_node/stream_now.rs +++ b/src/frontend/src/optimizer/plan_node/stream_now.rs @@ -26,7 +26,7 @@ use super::utils::{childless_record, Distill, TableCatalogBuilder}; use super::{generic, ExprRewritable, PlanBase, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::column_names_pretty; -use crate::optimizer::property::Distribution; +use crate::optimizer::property::{Distribution, Monotonicity, MonotonicityMap}; use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -39,12 +39,17 @@ impl StreamNow { pub fn new(core: generic::Now) -> Self { let mut watermark_columns = FixedBitSet::with_capacity(1); watermark_columns.set(0, true); + + let mut columns_monotonicity = MonotonicityMap::new(); + columns_monotonicity.insert(0, Monotonicity::NonDecreasing); + let base = PlanBase::new_stream_with_core( &core, Distribution::Single, core.mode.is_generate_series(), // append only core.mode.is_generate_series(), // emit on window close watermark_columns, + columns_monotonicity, ); Self { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_over_window.rs index be6c63bcb50de..42d864d78bdaf 100644 --- a/src/frontend/src/optimizer/plan_node/stream_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_over_window.rs @@ -45,6 +45,7 @@ impl StreamOverWindow { false, // general over window cannot be append-only false, watermark_columns, + Default::default(), ); StreamOverWindow { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index e5828a3267064..ef879627c66bd 100644 --- 
a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -22,7 +22,7 @@ use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::property::{analyze_monotonicity, monotonicity_variants}; +use crate::optimizer::property::{analyze_monotonicity, monotonicity_variants, MonotonicityMap}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; @@ -82,19 +82,21 @@ impl StreamProject { let mut watermark_derivations = vec![]; let mut nondecreasing_exprs = vec![]; let mut out_watermark_columns = FixedBitSet::with_capacity(core.exprs.len()); + let mut out_monotonicity_map = MonotonicityMap::new(); for (expr_idx, expr) in core.exprs.iter().enumerate() { use monotonicity_variants::*; match analyze_monotonicity(expr) { - Inherent(Constant) => { - // XXX(rc): we can produce one watermark on each recovery for this case. 
- } Inherent(monotonicity) => { - if monotonicity.is_non_decreasing() { + out_monotonicity_map.insert(expr_idx, monotonicity); + if monotonicity.is_non_decreasing() && !monotonicity.is_constant() { + // TODO(rc): maybe we should also derive watermark for constant later nondecreasing_exprs.push(expr_idx); // to produce watermarks out_watermark_columns.insert(expr_idx); } } FollowingInput(input_idx) => { + let in_monotonicity = input.columns_monotonicity()[input_idx]; + out_monotonicity_map.insert(expr_idx, in_monotonicity); if input.watermark_columns().contains(input_idx) { watermark_derivations.push((input_idx, expr_idx)); // to propagate watermarks out_watermark_columns.insert(expr_idx); @@ -111,6 +113,7 @@ impl StreamProject { input.append_only(), input.emit_on_window_close(), out_watermark_columns, + out_monotonicity_map, ); StreamProject { diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs index 4630e1c62c831..7a5d629255e01 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs @@ -22,7 +22,7 @@ use super::utils::impl_distill_by_unit; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::property::{analyze_monotonicity, monotonicity_variants}; +use crate::optimizer::property::{analyze_monotonicity, monotonicity_variants, MonotonicityMap}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; @@ -53,11 +53,9 @@ impl StreamProjectSet { use monotonicity_variants::*; match analyze_monotonicity(expr) { - Inherent(Constant) => { - // XXX(rc): we can produce one watermark on each recovery for this case. 
- } Inherent(monotonicity) => { - if monotonicity.is_non_decreasing() { + if monotonicity.is_non_decreasing() && !monotonicity.is_constant() { + // TODO(rc): maybe we should also derive watermark for constant later // FIXME(rc): we need to check expr is not table function nondecreasing_exprs.push(expr_idx); // to produce watermarks out_watermark_columns.insert(out_expr_idx); @@ -81,6 +79,7 @@ impl StreamProjectSet { input.append_only(), input.emit_on_window_close(), out_watermark_columns, + MonotonicityMap::new(), ); StreamProjectSet { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs index bf4bafeed26d0..36b96f4dad36e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs +++ b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs @@ -50,6 +50,7 @@ impl StreamRowIdGen { input.append_only(), input.emit_on_window_close(), input.watermark_columns().clone(), + input.columns_monotonicity().clone(), ); Self { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index 5bf575f622bce..b082d82b022d6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -44,6 +44,7 @@ impl StreamShare { input.append_only(), input.emit_on_window_close(), input.watermark_columns().clone(), + input.columns_monotonicity().clone(), ) }; diff --git a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs index 28a377ca04cd2..604f7ab9b7c8c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs @@ -50,7 +50,14 @@ impl StreamSimpleAgg { let watermark_columns = FixedBitSet::with_capacity(core.output_len()); // Simple agg executor might change the append-only behavior of the stream. 
- let base = PlanBase::new_stream_with_core(&core, dist, false, false, watermark_columns); + let base = PlanBase::new_stream_with_core( + &core, + dist, + false, + false, + watermark_columns, + Default::default(), + ); StreamSimpleAgg { base, core, diff --git a/src/frontend/src/optimizer/plan_node/stream_sort.rs b/src/frontend/src/optimizer/plan_node/stream_sort.rs index 6b45f8fd35a6a..c4acd275f1236 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sort.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sort.rs @@ -24,6 +24,7 @@ use super::stream::prelude::*; use super::utils::{childless_record, Distill, TableCatalogBuilder}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::property::{Monotonicity, MonotonicityMap}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::TableCatalog; @@ -53,8 +54,14 @@ impl StreamEowcSort { let stream_key = input.stream_key().map(|v| v.to_vec()); let fd_set = input.functional_dependency().clone(); let dist = input.distribution().clone(); + let mut watermark_columns = FixedBitSet::with_capacity(input.schema().len()); watermark_columns.insert(sort_column_index); + + // StreamEowcSort makes the sorting watermark column non-decreasing + let mut columns_monotonicity = MonotonicityMap::new(); + columns_monotonicity.insert(sort_column_index, Monotonicity::NonDecreasing); + let base = PlanBase::new_stream( input.ctx(), schema, @@ -64,6 +71,7 @@ impl StreamEowcSort { true, true, watermark_columns, + columns_monotonicity, ); Self { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index 7b0703aa8436d..723705b6655f1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -64,6 +64,7 @@ impl StreamSource { core.catalog.as_ref().map_or(true, |s| 
s.append_only), false, FixedBitSet::with_capacity(core.column_catalog.len()), + Default::default(), ); Self { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs index b947cee641d4b..4191a70f715a3 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs @@ -75,6 +75,7 @@ impl StreamSourceScan { core.catalog.as_ref().map_or(true, |s| s.append_only), false, FixedBitSet::with_capacity(core.column_catalog.len()), + Default::default(), ); Self { base, core } diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs index 8ce0997b7fe1f..8de79b4f7f3ae 100644 --- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs @@ -57,6 +57,7 @@ impl StreamStatelessSimpleAgg { input.append_only(), input.emit_on_window_close(), watermark_columns, + Default::default(), ); StreamStatelessSimpleAgg { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 1e93514f6c0f7..91a8674eff85d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -74,6 +74,7 @@ impl StreamTableScan { core.append_only(), false, core.watermark_columns(), + Default::default(), ); Self { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index f94dbba36cb79..7f50d4b27e625 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -69,12 +69,19 @@ impl StreamTemporalJoin { 
.rewrite_bitset(core.left.watermark_columns()), ); + let columns_monotonicity = core.i2o_col_mapping().rewrite_monotonicity_map( + &core + .l2i_col_mapping() + .rewrite_monotonicity_map(core.left.columns_monotonicity()), + ); + let base = PlanBase::new_stream_with_core( &core, dist, append_only, false, // TODO(rc): derive EOWC property from input watermark_columns, + columns_monotonicity, ); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index 9581c07ff297b..cc32e9d9c4c86 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -42,7 +42,14 @@ impl StreamTopN { }; let watermark_columns = FixedBitSet::with_capacity(input.schema().len()); - let base = PlanBase::new_stream_with_core(&core, dist, false, false, watermark_columns); + let base = PlanBase::new_stream_with_core( + &core, + dist, + false, + false, + watermark_columns, + Default::default(), + ); StreamTopN { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_union.rs b/src/frontend/src/optimizer/plan_node/stream_union.rs index 1c269ec0c5ad2..934710d3913b8 100644 --- a/src/frontend/src/optimizer/plan_node/stream_union.rs +++ b/src/frontend/src/optimizer/plan_node/stream_union.rs @@ -46,6 +46,7 @@ impl StreamUnion { pub fn new_with_dist(core: generic::Union, dist: Distribution) -> Self { let inputs = &core.inputs; + // FIXME(rc): is this even correct?? 
let watermark_columns = inputs.iter().fold( { let mut bitset = FixedBitSet::with_capacity(core.schema().len()); @@ -61,6 +62,7 @@ impl StreamUnion { inputs.iter().all(|x| x.append_only()), inputs.iter().all(|x| x.emit_on_window_close()), watermark_columns, + Default::default(), ); StreamUnion { base, core } diff --git a/src/frontend/src/optimizer/plan_node/stream_values.rs b/src/frontend/src/optimizer/plan_node/stream_values.rs index 05cb1659b96ef..42d38254803ca 100644 --- a/src/frontend/src/optimizer/plan_node/stream_values.rs +++ b/src/frontend/src/optimizer/plan_node/stream_values.rs @@ -48,6 +48,7 @@ impl StreamValues { true, false, FixedBitSet::with_capacity(logical.schema().len()), + Default::default(), ); Self { base, logical } } diff --git a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs index ffb08776b3fe5..7ea0dbaf6dd95 100644 --- a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs @@ -49,6 +49,8 @@ impl StreamWatermarkFilter { input.append_only(), false, // TODO(rc): decide EOWC property watermark_columns, + // watermark filter preserves input order and hence monotonicity + input.columns_monotonicity().clone(), ); Self::with_base(base, input, watermark_descs) } diff --git a/src/frontend/src/optimizer/property/monotonicity.rs b/src/frontend/src/optimizer/property/monotonicity.rs index 0dfc1c47ef43d..f28cb493fa7ab 100644 --- a/src/frontend/src/optimizer/property/monotonicity.rs +++ b/src/frontend/src/optimizer/property/monotonicity.rs @@ -284,11 +284,15 @@ impl MonotonicityMap { } pub fn insert(&mut self, idx: usize, monotonicity: Monotonicity) { - self.0.insert(idx, monotonicity); + if monotonicity != Monotonicity::Unknown { + self.0.insert(idx, monotonicity); + } } - pub fn get(&self, idx: usize) -> Monotonicity { - self.0.get(&idx).copied().unwrap_or(Monotonicity::Unknown) + pub fn 
iter(&self) -> impl Iterator + '_ { + self.0 + .iter() + .map(|(idx, monotonicity)| (*idx, *monotonicity)) } } @@ -305,3 +309,18 @@ impl IndexMut for MonotonicityMap { self.0.entry(idx).or_insert(Monotonicity::Unknown) } } + +impl IntoIterator for MonotonicityMap { + type IntoIter = std::collections::btree_map::IntoIter; + type Item = (usize, Monotonicity); + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl FromIterator<(usize, Monotonicity)> for MonotonicityMap { + fn from_iter>(iter: T) -> Self { + MonotonicityMap(iter.into_iter().collect()) + } +} diff --git a/src/frontend/src/utils/column_index_mapping.rs b/src/frontend/src/utils/column_index_mapping.rs index 08343eb9f09ce..4a7a729eb9b72 100644 --- a/src/frontend/src/utils/column_index_mapping.rs +++ b/src/frontend/src/utils/column_index_mapping.rs @@ -20,7 +20,8 @@ use risingwave_common::util::sort_util::ColumnOrder; use crate::expr::{Expr, ExprImpl, ExprRewriter, InputRef}; use crate::optimizer::property::{ - Distribution, FunctionalDependency, FunctionalDependencySet, Order, RequiredDist, + Distribution, FunctionalDependency, FunctionalDependencySet, MonotonicityMap, Order, + RequiredDist, }; /// Extension trait for [`ColIndexMapping`] to rewrite frontend structures. 
@@ -186,6 +187,16 @@ impl ColIndexMapping { } ret } + + pub fn rewrite_monotonicity_map(&self, map: &MonotonicityMap) -> MonotonicityMap { + let mut new_map = MonotonicityMap::new(); + for (i, monotonicity) in map.iter() { + if let Some(mapped_i) = self.try_map(i) { + new_map.insert(mapped_i, monotonicity); + } + } + new_map + } } impl ExprRewriter for ColIndexMapping { From 33324636e221804d914a73b02d657d88fafc99ce Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 16 Jul 2024 15:40:43 +0800 Subject: [PATCH 3/6] update Signed-off-by: Richard Chien --- .../src/optimizer/plan_node/logical_source.rs | 4 ++-- .../optimizer/plan_node/stream_changelog.rs | 3 ++- .../optimizer/plan_node/stream_delta_join.rs | 4 ++-- .../src/optimizer/plan_node/stream_dml.rs | 3 ++- .../plan_node/stream_dynamic_filter.rs | 4 ++-- .../optimizer/plan_node/stream_exchange.rs | 4 ++-- .../src/optimizer/plan_node/stream_expand.rs | 4 ++-- .../optimizer/plan_node/stream_fs_fetch.rs | 4 ++-- .../optimizer/plan_node/stream_group_topn.rs | 4 ++-- .../optimizer/plan_node/stream_hash_agg.rs | 3 ++- .../optimizer/plan_node/stream_hash_join.rs | 4 ++-- .../optimizer/plan_node/stream_hop_window.rs | 3 ++- .../optimizer/plan_node/stream_over_window.rs | 3 ++- .../optimizer/plan_node/stream_project_set.rs | 2 +- .../optimizer/plan_node/stream_simple_agg.rs | 4 ++-- .../src/optimizer/plan_node/stream_source.rs | 4 ++-- .../plan_node/stream_stateless_simple_agg.rs | 4 ++-- .../optimizer/plan_node/stream_table_scan.rs | 4 ++-- .../src/optimizer/plan_node/stream_topn.rs | 4 ++-- .../src/optimizer/plan_node/stream_union.rs | 5 ++--- .../src/optimizer/plan_node/stream_values.rs | 4 ++-- .../src/optimizer/property/monotonicity.rs | 20 ++++++++++++++++++- 22 files changed, 60 insertions(+), 38 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index d22f26184bc08..50024b4274e77 100644 --- 
a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -44,7 +44,7 @@ use crate::optimizer::plan_node::{ ToStreamContext, }; use crate::optimizer::property::Distribution::HashShard; -use crate::optimizer::property::{Distribution, Order, RequiredDist}; +use crate::optimizer::property::{Distribution, MonotonicityMap, Order, RequiredDist}; use crate::utils::{ColIndexMapping, Condition, IndexRewriter}; /// `LogicalSource` returns contents of a table or other equivalent object @@ -229,7 +229,7 @@ impl LogicalSource { true, // `list` will keep listing all objects, it must be append-only false, FixedBitSet::with_capacity(logical_source.column_catalog.len()), - Default::default(), + MonotonicityMap::new(), ), core: logical_source, } diff --git a/src/frontend/src/optimizer/plan_node/stream_changelog.rs b/src/frontend/src/optimizer/plan_node/stream_changelog.rs index d305e9a315814..34bfdec281815 100644 --- a/src/frontend/src/optimizer/plan_node/stream_changelog.rs +++ b/src/frontend/src/optimizer/plan_node/stream_changelog.rs @@ -20,6 +20,7 @@ use super::stream::prelude::PhysicalPlanRef; use super::stream::StreamPlanRef; use super::utils::impl_distill_by_unit; use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamNode}; +use crate::optimizer::property::MonotonicityMap; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::PlanRef; @@ -48,7 +49,7 @@ impl StreamChangeLog { true, input.emit_on_window_close(), watermark_columns, - Default::default(), + MonotonicityMap::new(), // TODO: derive monotonicity ); StreamChangeLog { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index ab79b368b2373..f53d4331ae617 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -27,7 +27,7 @@ use crate::expr::{Expr, 
ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay, TryToStreamPb}; -use crate::optimizer::property::Distribution; +use crate::optimizer::property::{Distribution, MonotonicityMap}; use crate::scheduler::SchedulerResult; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; @@ -76,7 +76,7 @@ impl StreamDeltaJoin { append_only, false, // TODO(rc): derive EOWC property from input watermark_columns, - Default::default(), + MonotonicityMap::new(), // TODO: derive monotonicity ); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs index 244853c66eb1d..7b671efa24c23 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dml.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs @@ -21,6 +21,7 @@ use super::stream::prelude::*; use super::utils::{childless_record, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::property::MonotonicityMap; use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -41,7 +42,7 @@ impl StreamDml { append_only, false, // TODO(rc): decide EOWC property FixedBitSet::with_capacity(input.schema().len()), // no watermark if dml is allowed - Default::default(), + MonotonicityMap::new(), // TODO: derive monotonicity ); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs index 69386bdb8079c..f32bd63753d2b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs @@ -27,7 +27,7 @@ use super::{generic, ExprRewritable, 
PlanTreeNodeUnary}; use crate::expr::Expr; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::{PlanBase, PlanTreeNodeBinary, StreamNode}; -use crate::optimizer::property::Distribution; +use crate::optimizer::property::{Distribution, MonotonicityMap}; use crate::optimizer::PlanRef; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -77,7 +77,7 @@ impl StreamDynamicFilter { out_append_only, false, // TODO(rc): decide EOWC property Self::derive_watermark_columns(&core), - Default::default(), + MonotonicityMap::new(), // TODO: derive monotonicity ); let cleaned_by_watermark = Self::cleaned_by_watermark(&core); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 9fc2d3d748097..802f2e3d227c1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -20,7 +20,7 @@ use super::stream::prelude::*; use super::utils::{childless_record, plan_node_name, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::property::{Distribution, DistributionDisplay}; +use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap}; use crate::stream_fragmenter::BuildFragmentGraphState; /// `StreamExchange` imposes a particular distribution on its input @@ -44,7 +44,7 @@ impl StreamExchange { input.append_only(), input.emit_on_window_close(), input.watermark_columns().clone(), - input.columns_monotonicity().clone(), + MonotonicityMap::new(), // we lost monotonicity information when shuffling ); StreamExchange { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs index 16f89459a1c69..fa0268a46fcf5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_expand.rs +++ 
b/src/frontend/src/optimizer/plan_node/stream_expand.rs @@ -21,7 +21,7 @@ use super::stream::prelude::*; use super::utils::impl_distill_by_unit; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::property::Distribution; +use crate::optimizer::property::{Distribution, MonotonicityMap}; use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -52,7 +52,7 @@ impl StreamExpand { input.append_only(), input.emit_on_window_close(), watermark_columns, - Default::default(), + MonotonicityMap::new(), ); StreamExpand { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs index 19ee7c3037d09..08516631dc75b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs +++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs @@ -26,7 +26,7 @@ use crate::catalog::source_catalog::SourceCatalog; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::{childless_record, Distill}; use crate::optimizer::plan_node::{generic, ExprRewritable, StreamNode}; -use crate::optimizer::property::Distribution; +use crate::optimizer::property::{Distribution, MonotonicityMap}; use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -55,7 +55,7 @@ impl StreamFsFetch { source.catalog.as_ref().map_or(true, |s| s.append_only), false, FixedBitSet::with_capacity(source.column_catalog.len()), - Default::default(), + MonotonicityMap::new(), // TODO: derive monotonicity ); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index cab663ada96a4..b9230270e634e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ 
b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -22,7 +22,7 @@ use super::utils::{plan_node_name, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::generic::GenericPlanNode; -use crate::optimizer::property::Order; +use crate::optimizer::property::{MonotonicityMap, Order}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::PlanRef; @@ -79,7 +79,7 @@ impl StreamGroupTopN { // TODO: https://github.com/risingwavelabs/risingwave/issues/8348 false, watermark_columns, - Default::default(), + MonotonicityMap::new(), // TODO: derive monotonicity ); StreamGroupTopN { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs index 55b25860c0f53..2dfad775ecc6e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -24,6 +24,7 @@ use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::error::{ErrorCode, Result}; use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::property::MonotonicityMap; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, IndexSet}; @@ -93,7 +94,7 @@ impl StreamHashAgg { emit_on_window_close, // in EOWC mode, we produce append only output emit_on_window_close, watermark_columns, - Default::default(), + MonotonicityMap::new(), // TODO: derive monotonicity ); StreamHashAgg { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs index 5cf381eef1e72..cbce1e1caf45a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ 
b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -30,7 +30,7 @@ use crate::expr::{Expr, ExprDisplay, ExprRewriter, ExprVisitor, InequalityInputP use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay}; -use crate::optimizer::property::Distribution; +use crate::optimizer::property::{Distribution, MonotonicityMap}; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; @@ -196,7 +196,7 @@ impl StreamHashJoin { append_only, false, // TODO(rc): derive EOWC property from input watermark_columns, - Default::default(), + MonotonicityMap::new(), // TODO: derive monotonicity ); Self { diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index f7284f0d8e8f3..4a50387c50be0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -21,6 +21,7 @@ use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::property::MonotonicityMap; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; @@ -62,7 +63,7 @@ impl StreamHopWindow { input.append_only(), input.emit_on_window_close(), internal2output.rewrite_bitset(&watermark_columns), - Default::default(), // hop window start/end jumps, so monotonicity is not propagated + MonotonicityMap::new(), /* hop window start/end jumps, so monotonicity is not propagated */ ); Self { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_over_window.rs 
b/src/frontend/src/optimizer/plan_node/stream_over_window.rs index 42d864d78bdaf..6b0beaa9f99cc 100644 --- a/src/frontend/src/optimizer/plan_node/stream_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_over_window.rs @@ -23,6 +23,7 @@ use super::stream::prelude::*; use super::utils::{impl_distill_by_unit, TableCatalogBuilder}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::property::MonotonicityMap; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::TableCatalog; @@ -45,7 +46,7 @@ impl StreamOverWindow { false, // general over window cannot be append-only false, watermark_columns, - Default::default(), + MonotonicityMap::new(), // TODO: derive monotonicity ); StreamOverWindow { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs index 7a5d629255e01..5735c4b9d5644 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs @@ -79,7 +79,7 @@ impl StreamProjectSet { input.append_only(), input.emit_on_window_close(), out_watermark_columns, - MonotonicityMap::new(), + MonotonicityMap::new(), // TODO: derive monotonicity ); StreamProjectSet { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs index 604f7ab9b7c8c..6ecaa4c308f5e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs @@ -23,7 +23,7 @@ use super::utils::{childless_record, plan_node_name, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use 
crate::optimizer::property::Distribution; +use crate::optimizer::property::{Distribution, MonotonicityMap}; use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -56,7 +56,7 @@ impl StreamSimpleAgg { false, false, watermark_columns, - Default::default(), + MonotonicityMap::new(), ); StreamSimpleAgg { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index 723705b6655f1..980df7911c7f3 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -29,7 +29,7 @@ use super::{generic, ExprRewritable, PlanBase, StreamNode}; use crate::catalog::source_catalog::SourceCatalog; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::column_names_pretty; -use crate::optimizer::property::Distribution; +use crate::optimizer::property::{Distribution, MonotonicityMap}; use crate::stream_fragmenter::BuildFragmentGraphState; /// [`StreamSource`] represents a table/connector source at the very beginning of the graph. 
@@ -64,7 +64,7 @@ impl StreamSource { core.catalog.as_ref().map_or(true, |s| s.append_only), false, FixedBitSet::with_capacity(core.column_catalog.len()), - Default::default(), + MonotonicityMap::new(), ); Self { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs index 8de79b4f7f3ae..93c56efad3d5f 100644 --- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs @@ -22,7 +22,7 @@ use super::utils::impl_distill_by_unit; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::property::RequiredDist; +use crate::optimizer::property::{MonotonicityMap, RequiredDist}; use crate::stream_fragmenter::BuildFragmentGraphState; /// Streaming stateless simple agg. 
@@ -57,7 +57,7 @@ impl StreamStatelessSimpleAgg { input.append_only(), input.emit_on_window_close(), watermark_columns, - Default::default(), + MonotonicityMap::new(), ); StreamStatelessSimpleAgg { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 91a8674eff85d..2255194dbee64 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -31,7 +31,7 @@ use crate::catalog::ColumnId; use crate::expr::{ExprRewriter, ExprVisitor, FunctionCall}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder}; -use crate::optimizer::property::{Distribution, DistributionDisplay}; +use crate::optimizer::property::{Distribution, DistributionDisplay, MonotonicityMap}; use crate::scheduler::SchedulerResult; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::TableCatalog; @@ -74,7 +74,7 @@ impl StreamTableScan { core.append_only(), false, core.watermark_columns(), - Default::default(), + MonotonicityMap::new(), ); Self { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index cc32e9d9c4c86..80ca9141033c5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -21,7 +21,7 @@ use super::stream::prelude::*; use super::utils::{plan_node_name, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::property::{Distribution, Order}; +use crate::optimizer::property::{Distribution, MonotonicityMap, Order}; use crate::stream_fragmenter::BuildFragmentGraphState; /// `StreamTopN` implements [`super::LogicalTopN`] to find the top N elements with a heap @@ 
-48,7 +48,7 @@ impl StreamTopN { false, false, watermark_columns, - Default::default(), + MonotonicityMap::new(), ); StreamTopN { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_union.rs b/src/frontend/src/optimizer/plan_node/stream_union.rs index 934710d3913b8..2e424fc0604b6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_union.rs +++ b/src/frontend/src/optimizer/plan_node/stream_union.rs @@ -25,7 +25,7 @@ use super::{generic, ExprRewritable, PlanRef}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::generic::GenericPlanNode; use crate::optimizer::plan_node::{PlanBase, PlanTreeNode, StreamNode}; -use crate::optimizer::property::Distribution; +use crate::optimizer::property::{Distribution, MonotonicityMap}; use crate::stream_fragmenter::BuildFragmentGraphState; /// `StreamUnion` implements [`super::LogicalUnion`] @@ -46,7 +46,6 @@ impl StreamUnion { pub fn new_with_dist(core: generic::Union, dist: Distribution) -> Self { let inputs = &core.inputs; - // FIXME(rc): is this even correct?? 
let watermark_columns = inputs.iter().fold( { let mut bitset = FixedBitSet::with_capacity(core.schema().len()); @@ -62,7 +61,7 @@ impl StreamUnion { inputs.iter().all(|x| x.append_only()), inputs.iter().all(|x| x.emit_on_window_close()), watermark_columns, - Default::default(), + MonotonicityMap::new(), ); StreamUnion { base, core } diff --git a/src/frontend/src/optimizer/plan_node/stream_values.rs b/src/frontend/src/optimizer/plan_node/stream_values.rs index 42d38254803ca..0a71c208c32ee 100644 --- a/src/frontend/src/optimizer/plan_node/stream_values.rs +++ b/src/frontend/src/optimizer/plan_node/stream_values.rs @@ -23,7 +23,7 @@ use super::utils::{childless_record, Distill}; use super::{ExprRewritable, LogicalValues, PlanBase, StreamNode}; use crate::expr::{Expr, ExprImpl, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::property::Distribution; +use crate::optimizer::property::{Distribution, MonotonicityMap}; use crate::stream_fragmenter::BuildFragmentGraphState; /// `StreamValues` implements `LogicalValues.to_stream()` @@ -48,7 +48,7 @@ impl StreamValues { true, false, FixedBitSet::with_capacity(logical.schema().len()), - Default::default(), + MonotonicityMap::new(), ); Self { base, logical } } diff --git a/src/frontend/src/optimizer/property/monotonicity.rs b/src/frontend/src/optimizer/property/monotonicity.rs index f28cb493fa7ab..66ebdc02cf825 100644 --- a/src/frontend/src/optimizer/property/monotonicity.rs +++ b/src/frontend/src/optimizer/property/monotonicity.rs @@ -46,7 +46,7 @@ impl MonotonicityDerivation { } /// Represents the monotonicity of a column. `NULL`s are considered largest when analyzing monotonicity. 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum Monotonicity { Constant, NonDecreasing, @@ -55,6 +55,24 @@ pub enum Monotonicity { } impl Monotonicity { + pub fn is_constant(self) -> bool { + matches!(self, Monotonicity::Constant) + } + + pub fn is_non_decreasing(self) -> bool { + // we don't use `EnumAsInner` here because we need to include `Constant` + matches!(self, Monotonicity::NonDecreasing | Monotonicity::Constant) + } + + pub fn is_non_increasing(self) -> bool { + // similar to `is_non_decreasing` + matches!(self, Monotonicity::NonIncreasing | Monotonicity::Constant) + } + + pub fn is_unknown(self) -> bool { + matches!(self, Monotonicity::Unknown) + } + pub fn inverse(self) -> Self { use Monotonicity::*; match self { From 15a38e7ec1ce9ff3d21bd56db51418878d96c541 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 16 Jul 2024 17:47:22 +0800 Subject: [PATCH 4/6] update Signed-off-by: Richard Chien --- src/frontend/src/optimizer/plan_node/stream_source_scan.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs index 4191a70f715a3..47689c1038790 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs @@ -32,7 +32,7 @@ use crate::catalog::source_catalog::SourceCatalog; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::utils::{childless_record, Distill}; use crate::optimizer::plan_node::{generic, ExprRewritable, StreamNode}; -use crate::optimizer::property::Distribution; +use crate::optimizer::property::{Distribution, MonotonicityMap}; use crate::scheduler::SchedulerResult; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::{Explain, TableCatalog}; @@ -75,7 +75,7 @@ impl StreamSourceScan { 
core.catalog.as_ref().map_or(true, |s| s.append_only), false, FixedBitSet::with_capacity(core.column_catalog.len()), - Default::default(), + MonotonicityMap::new(), ); Self { base, core } From 27444b5d23fa2fcfb7d3bb50c1dda0e200bb8659 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 17 Jul 2024 14:46:31 +0800 Subject: [PATCH 5/6] add comment for monotonicity Signed-off-by: Richard Chien --- .../src/optimizer/property/monotonicity.rs | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/frontend/src/optimizer/property/monotonicity.rs b/src/frontend/src/optimizer/property/monotonicity.rs index 66ebdc02cf825..d3091ca029db3 100644 --- a/src/frontend/src/optimizer/property/monotonicity.rs +++ b/src/frontend/src/optimizer/property/monotonicity.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::collections::BTreeMap; -use std::ops::{Index, IndexMut}; +use std::ops::Index; use enum_as_inner::EnumAsInner; use risingwave_common::types::DataType; @@ -45,7 +45,28 @@ impl MonotonicityDerivation { } } -/// Represents the monotonicity of a column. `NULL`s are considered largest when analyzing monotonicity. +/// Represents the monotonicity of a column. +/// +/// Monotonicity is a property of the output column of a stream node that describes the order +/// of the values in the column. One [`Monotonicity`] value is associated with one column, so +/// each stream node should have a [`MonotonicityMap`] to describe the monotonicity of all its +/// output columns. +/// +/// For an operator that yields an append-only stream, the monotonicity being `NonDecreasing` means +/// that it will never yield a row smaller than any previously yielded row. +/// +/// For an operator that yields a non-append-only stream, the monotonicity being `NonDecreasing` means +/// that it will never yield a change that has a smaller value than any previously yielded change, +/// ignoring the `Op`.
So if such an operator yields a `NonDecreasing` column, `Delete` and `UpdateDelete`s +/// can only happen on the last emitted row (or last rows with the same value on the column). This +/// is especially useful for the `StreamNow` operator with `UpdateCurrent` mode, in which case only +/// one output row is actively maintained and the value is non-decreasing. +/// +/// The monotonicity property is considered in the default order type, i.e., ASC NULLS LAST. This means +/// that `NULL`s are considered largest when analyzing monotonicity. +/// +/// For distributed operators, the monotonicity describes the property of the output column of +/// each shard of the operator. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum Monotonicity { Constant, @@ -293,6 +314,7 @@ impl MonotonicityAnalyzer { } } +/// A map from column index to its monotonicity. #[derive(Debug, Default, Clone, PartialEq, Eq, Hash)] pub struct MonotonicityMap(BTreeMap); @@ -322,12 +344,6 @@ impl Index for MonotonicityMap { } } -impl IndexMut for MonotonicityMap { - fn index_mut(&mut self, idx: usize) -> &mut Self::Output { - self.0.entry(idx).or_insert(Monotonicity::Unknown) - } -} - impl IntoIterator for MonotonicityMap { type IntoIter = std::collections::btree_map::IntoIter; type Item = (usize, Monotonicity); From 0141a2b8c46283ee20b31dc31700de63e118e123 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 17 Jul 2024 14:46:42 +0800 Subject: [PATCH 6/6] fix monotonicity for StreamEowcOverWindow Signed-off-by: Richard Chien --- .../src/optimizer/plan_node/stream_eowc_over_window.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs index af37ddeb79f7f..4d134df37799b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs @@ -23,6 +23,7 @@ use
super::stream::prelude::*; use super::utils::{impl_distill_by_unit, TableCatalogBuilder}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::property::MonotonicityMap; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::TableCatalog; @@ -58,7 +59,8 @@ impl StreamEowcOverWindow { true, true, watermark_columns, - input.columns_monotonicity().clone(), + // we cannot derive monotonicity for any column for the same reason as watermark columns + MonotonicityMap::new(), ); StreamEowcOverWindow { base, core } }