From 043bb8a28abd60dd44f45c698179620e4750cf92 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 16 Jul 2024 15:59:28 +0800 Subject: [PATCH] some clean up for stream nodes' constructors Signed-off-by: Richard Chien --- .../optimizer/plan_node/stream_changelog.rs | 5 ++-- .../optimizer/plan_node/stream_delta_join.rs | 1 + .../src/optimizer/plan_node/stream_expand.rs | 8 ++--- .../optimizer/plan_node/stream_group_topn.rs | 2 ++ .../optimizer/plan_node/stream_hop_window.rs | 20 ++++++------- .../src/optimizer/plan_node/stream_project.rs | 26 +++++++++-------- .../optimizer/plan_node/stream_project_set.rs | 29 +++++++++++-------- src/frontend/src/optimizer/plan_node/utils.rs | 28 ++++++++---------- 8 files changed, 60 insertions(+), 59 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_changelog.rs b/src/frontend/src/optimizer/plan_node/stream_changelog.rs index 0ee696c58067a..b02c5eeb0c355 100644 --- a/src/frontend/src/optimizer/plan_node/stream_changelog.rs +++ b/src/frontend/src/optimizer/plan_node/stream_changelog.rs @@ -33,12 +33,13 @@ impl StreamChangeLog { pub fn new(core: generic::ChangeLog) -> Self { let input = core.input.clone(); let dist = input.distribution().clone(); + let input_len = input.schema().len(); // Filter executor won't change the append-only behavior of the stream. let mut watermark_columns = input.watermark_columns().clone(); if core.need_op { - watermark_columns.grow(input.watermark_columns().len() + 2); + watermark_columns.grow(input_len + 2); } else { - watermark_columns.grow(input.watermark_columns().len() + 1); + watermark_columns.grow(input_len + 1); } let base = PlanBase::new_stream_with_core( &core, diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index 49257676bc004..7a99c8f7955b8 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -68,6 +68,7 @@ impl StreamDeltaJoin { let watermark_columns = from_left.bitand(&from_right); core.i2o_col_mapping().rewrite_bitset(&watermark_columns) }; + // TODO: derive from input let base = PlanBase::new_stream_with_core( &core, diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs index 5eefede3469c0..4f38e95cdfea4 100644 --- a/src/frontend/src/optimizer/plan_node/stream_expand.rs +++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs @@ -33,6 +33,7 @@ pub struct StreamExpand { impl StreamExpand { pub fn new(core: generic::Expand) -> Self { let input = core.input.clone(); + let input_len = input.schema().len(); let dist = match input.distribution() { Distribution::Single => Distribution::Single, @@ -43,12 +44,7 @@ impl StreamExpand { }; let mut watermark_columns = FixedBitSet::with_capacity(core.output_len()); - watermark_columns.extend( - input - .watermark_columns() - .ones() - .map(|idx| idx + input.schema().len()), - ); + watermark_columns.extend(input.watermark_columns().ones().map(|idx| idx + input_len)); let base = PlanBase::new_stream_with_core( &core, diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index 8500e24b0fd9f..0cd8edc996c89 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -42,6 +42,8 @@ impl StreamGroupTopN { let input = &core.input; let schema = input.schema().clone(); + // FIXME(rc): Actually only watermark messages on the first group-by column are propagated + // acccoring to the current GroupTopN implementation. This should be fixed. let watermark_columns = if input.append_only() { input.watermark_columns().clone() } else { diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index 0cdddf77ed0e5..a94dfbe788f88 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -13,7 +13,6 @@ // limitations under the License. use pretty_xmlish::XmlNode; -use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::HopWindowNode; @@ -41,29 +40,28 @@ impl StreamHopWindow { window_end_exprs: Vec, ) -> Self { let input = core.input.clone(); - let i2o = core.i2o_col_mapping(); - let dist = i2o.rewrite_provided_distribution(input.distribution()); + let dist = core + .i2o_col_mapping() + .rewrite_provided_distribution(input.distribution()); - let mut watermark_columns = input.watermark_columns().clone(); + let input2internal = core.input2internal_col_mapping(); + let internal2output = core.internal2output_col_mapping(); + + let mut watermark_columns = input2internal.rewrite_bitset(input.watermark_columns()); watermark_columns.grow(core.internal_column_num()); - if watermark_columns.contains(core.time_col.index) { + if input.watermark_columns().contains(core.time_col.index) { // Watermark on `time_col` indicates watermark on both `window_start` and `window_end`. watermark_columns.insert(core.internal_window_start_col_idx()); watermark_columns.insert(core.internal_window_end_col_idx()); } - let watermark_columns = ColIndexMapping::with_remaining_columns( - &core.output_indices, - core.internal_column_num(), - ) - .rewrite_bitset(&watermark_columns); let base = PlanBase::new_stream_with_core( &core, dist, input.append_only(), input.emit_on_window_close(), - watermark_columns, + internal2output.rewrite_bitset(&watermark_columns), ); Self { base, diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index eae1bd5a34d5e..e5828a3267064 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -81,24 +81,26 @@ impl StreamProject { let mut watermark_derivations = vec![]; let mut nondecreasing_exprs = vec![]; - let mut watermark_columns = FixedBitSet::with_capacity(core.exprs.len()); + let mut out_watermark_columns = FixedBitSet::with_capacity(core.exprs.len()); for (expr_idx, expr) in core.exprs.iter().enumerate() { use monotonicity_variants::*; match analyze_monotonicity(expr) { + Inherent(Constant) => { + // XXX(rc): we can produce one watermark on each recovery for this case. + } + Inherent(monotonicity) => { + if monotonicity.is_non_decreasing() { + nondecreasing_exprs.push(expr_idx); // to produce watermarks + out_watermark_columns.insert(expr_idx); + } + } FollowingInput(input_idx) => { if input.watermark_columns().contains(input_idx) { - watermark_derivations.push((input_idx, expr_idx)); - watermark_columns.insert(expr_idx); + watermark_derivations.push((input_idx, expr_idx)); // to propagate watermarks + out_watermark_columns.insert(expr_idx); } } - Inherent(NonDecreasing) => { - nondecreasing_exprs.push(expr_idx); - watermark_columns.insert(expr_idx); - } - Inherent(Constant) => { - // XXX(rc): we can produce one watermark on each recovery for this case. - } - Inherent(_) | _FollowingInputInversely(_) => {} + _FollowingInputInversely(_) => {} } } // Project executor won't change the append-only behavior of the stream, so it depends on @@ -108,7 +110,7 @@ impl StreamProject { distribution, input.append_only(), input.emit_on_window_close(), - watermark_columns, + out_watermark_columns, ); StreamProject { diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs index b65d4e8da0b53..4630e1c62c831 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs @@ -47,24 +47,29 @@ impl StreamProjectSet { let mut watermark_derivations = vec![]; let mut nondecreasing_exprs = vec![]; - let mut watermark_columns = FixedBitSet::with_capacity(core.output_len()); + let mut out_watermark_columns = FixedBitSet::with_capacity(core.output_len()); for (expr_idx, expr) in core.select_list.iter().enumerate() { + let out_expr_idx = expr_idx + 1; + use monotonicity_variants::*; match analyze_monotonicity(expr) { + Inherent(Constant) => { + // XXX(rc): we can produce one watermark on each recovery for this case. + } + Inherent(monotonicity) => { + if monotonicity.is_non_decreasing() { + // FIXME(rc): we need to check expr is not table function + nondecreasing_exprs.push(expr_idx); // to produce watermarks + out_watermark_columns.insert(out_expr_idx); + } + } FollowingInput(input_idx) => { if input.watermark_columns().contains(input_idx) { - watermark_derivations.push((input_idx, expr_idx)); - watermark_columns.insert(expr_idx + 1); + watermark_derivations.push((input_idx, expr_idx)); // to propagate watermarks + out_watermark_columns.insert(out_expr_idx); } } - Inherent(NonDecreasing) => { - nondecreasing_exprs.push(expr_idx); - watermark_columns.insert(expr_idx + 1); - } - Inherent(Constant) => { - // XXX(rc): we can produce one watermark on each recovery for this case. - } - Inherent(_) | _FollowingInputInversely(_) => {} + _FollowingInputInversely(_) => {} } } @@ -75,7 +80,7 @@ impl StreamProjectSet { distribution, input.append_only(), input.emit_on_window_close(), - watermark_columns, + out_watermark_columns, ); StreamProjectSet { base, diff --git a/src/frontend/src/optimizer/plan_node/utils.rs b/src/frontend/src/optimizer/plan_node/utils.rs index 288b0957db19a..155381ab4310e 100644 --- a/src/frontend/src/optimizer/plan_node/utils.rs +++ b/src/frontend/src/optimizer/plan_node/utils.rs @@ -106,11 +106,6 @@ impl TableCatalogBuilder { self.value_indices = Some(value_indices); } - #[allow(dead_code)] - pub fn set_watermark_columns(&mut self, watermark_columns: FixedBitSet) { - self.watermark_columns = Some(watermark_columns); - } - pub fn set_dist_key_in_pk(&mut self, dist_key_in_pk: Vec) { self.dist_key_in_pk = Some(dist_key_in_pk); } @@ -236,21 +231,22 @@ pub(crate) fn watermark_pretty<'a>( watermark_columns: &FixedBitSet, schema: &Schema, ) -> Option> { - if watermark_columns.count_ones(..) > 0 { - Some(watermark_fields_pretty(watermark_columns.ones(), schema)) - } else { - None - } + iter_fields_pretty(watermark_columns.ones(), schema) } -pub(crate) fn watermark_fields_pretty<'a>( - watermark_columns: impl Iterator, + +pub(crate) fn iter_fields_pretty<'a>( + columns: impl Iterator, schema: &Schema, -) -> Pretty<'a> { - let arr = watermark_columns +) -> Option> { + let arr = columns .map(|idx| FieldDisplay(schema.fields.get(idx).unwrap())) .map(|d| Pretty::display(&d)) - .collect(); - Pretty::Array(arr) + .collect::>(); + if arr.is_empty() { + None + } else { + Some(Pretty::Array(arr)) + } } #[derive(Clone, Copy)]