From fa66cbda67c9996821bb4f1d7d03026fada0a712 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 20 Oct 2023 06:48:14 -0500 Subject: [PATCH] refactor(optimizer): clean up dead code for optimizer v2 (#12973) Signed-off-by: Bugen Zhao --- src/frontend/src/optimizer/mod.rs | 1 - src/frontend/src/optimizer/plan_node/batch.rs | 5 + .../src/optimizer/plan_node/generic/join.rs | 78 +- .../src/optimizer/plan_node/logical_agg.rs | 1 - .../src/optimizer/plan_node/logical_join.rs | 1 - src/frontend/src/optimizer/plan_node/mod.rs | 3 - .../src/optimizer/plan_node/plan_base.rs | 2 + .../src/optimizer/plan_node/plan_tree_node.rs | 41 - .../optimizer/plan_node/plan_tree_node_v2.rs | 124 --- .../src/optimizer/plan_node/stream.rs | 772 +----------------- .../src/optimizer/plan_node/stream_dedup.rs | 2 +- .../optimizer/plan_node/stream_delta_join.rs | 1 - .../src/optimizer/plan_node/stream_derive.rs | 630 -------------- .../plan_node/stream_eowc_over_window.rs | 1 - .../optimizer/plan_node/stream_exchange.rs | 1 - .../src/optimizer/plan_node/stream_expand.rs | 1 - .../src/optimizer/plan_node/stream_filter.rs | 1 - .../optimizer/plan_node/stream_hash_join.rs | 6 +- .../optimizer/plan_node/stream_hop_window.rs | 1 - .../src/optimizer/plan_node/stream_project.rs | 1 - .../optimizer/plan_node/stream_project_set.rs | 1 - .../optimizer/plan_node/stream_row_id_gen.rs | 1 - .../plan_node/stream_stateless_simple_agg.rs | 1 - .../plan_node/stream_temporal_join.rs | 1 - .../src/optimizer/plan_node/stream_union.rs | 1 - .../src/optimizer/property/distribution.rs | 18 - 26 files changed, 94 insertions(+), 1602 deletions(-) delete mode 100644 src/frontend/src/optimizer/plan_node/plan_tree_node_v2.rs delete mode 100644 src/frontend/src/optimizer/plan_node/stream_derive.rs diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index aaab8ebde3dc..b4238f57b1f5 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -57,7 +57,6 @@ use self::plan_visitor::{has_batch_exchange, CardinalityVisitor}; use self::property::{Cardinality, RequiredDist}; use self::rule::*; use crate::catalog::table_catalog::{TableType, TableVersion}; -use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::{ BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, }; diff --git a/src/frontend/src/optimizer/plan_node/batch.rs b/src/frontend/src/optimizer/plan_node/batch.rs index 2ac1e278f7d8..d62a85095d21 100644 --- a/src/frontend/src/optimizer/plan_node/batch.rs +++ b/src/frontend/src/optimizer/plan_node/batch.rs @@ -15,6 +15,11 @@ use super::generic::GenericPlanRef; use crate::optimizer::property::Order; +/// A subtrait of [`GenericPlanRef`] for batch plans. +/// +/// Due to the lack of refactoring, all plan nodes currently implement this trait +/// through [`super::PlanBase`]. One may still use this trait as a bound for +/// expecting a batch plan, in contrast to [`GenericPlanRef`]. pub trait BatchPlanRef: GenericPlanRef { fn order(&self) -> &Order; } diff --git a/src/frontend/src/optimizer/plan_node/generic/join.rs b/src/frontend/src/optimizer/plan_node/generic/join.rs index 95bee8413f56..87c03cc14c8c 100644 --- a/src/frontend/src/optimizer/plan_node/generic/join.rs +++ b/src/frontend/src/optimizer/plan_node/generic/join.rs @@ -12,15 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use itertools::EitherOrBoth; -use risingwave_common::catalog::Schema; +use itertools::{EitherOrBoth, Itertools}; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::types::DataType; +use risingwave_common::util::sort_util::OrderType; use risingwave_pb::plan_common::JoinType; use super::{EqJoinPredicate, GenericPlanNode, GenericPlanRef}; use crate::expr::ExprRewriter; use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::plan_node::stream; +use crate::optimizer::plan_node::utils::TableCatalogBuilder; use crate::optimizer::property::FunctionalDependencySet; use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition}; +use crate::TableCatalog; /// [`Join`] combines two relations according to some condition. /// @@ -65,6 +70,75 @@ impl Join { } } +impl Join { + /// Return stream hash join internal table catalog and degree table catalog. + pub fn infer_internal_and_degree_table_catalog( + input: &PlanRef, + join_key_indices: Vec, + dk_indices_in_jk: Vec, + ) -> (TableCatalog, TableCatalog, Vec) { + let schema = input.schema(); + + let internal_table_dist_keys = dk_indices_in_jk + .iter() + .map(|idx| join_key_indices[*idx]) + .collect_vec(); + + let degree_table_dist_keys = dk_indices_in_jk.clone(); + + // The pk of hash join internal and degree table should be join_key + input_pk. + let join_key_len = join_key_indices.len(); + let mut pk_indices = join_key_indices; + + // dedup the pk in dist key.. + let mut deduped_input_pk_indices = vec![]; + for input_pk_idx in input.stream_key().unwrap() { + if !pk_indices.contains(input_pk_idx) + && !deduped_input_pk_indices.contains(input_pk_idx) + { + deduped_input_pk_indices.push(*input_pk_idx); + } + } + + pk_indices.extend(deduped_input_pk_indices.clone()); + + // Build internal table + let mut internal_table_catalog_builder = + TableCatalogBuilder::new(input.ctx().with_options().internal_table_subset()); + let internal_columns_fields = schema.fields().to_vec(); + + internal_columns_fields.iter().for_each(|field| { + internal_table_catalog_builder.add_column(field); + }); + pk_indices.iter().for_each(|idx| { + internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending()) + }); + + // Build degree table. + let mut degree_table_catalog_builder = + TableCatalogBuilder::new(input.ctx().with_options().internal_table_subset()); + + let degree_column_field = Field::with_name(DataType::Int64, "_degree"); + + pk_indices.iter().enumerate().for_each(|(order_idx, idx)| { + degree_table_catalog_builder.add_column(&internal_columns_fields[*idx]); + degree_table_catalog_builder.add_order_column(order_idx, OrderType::ascending()); + }); + degree_table_catalog_builder.add_column(°ree_column_field); + degree_table_catalog_builder + .set_value_indices(vec![degree_table_catalog_builder.columns().len() - 1]); + + internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone()); + degree_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk); + + ( + internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len), + degree_table_catalog_builder.build(degree_table_dist_keys, join_key_len), + deduped_input_pk_indices, + ) + } +} + impl GenericPlanNode for Join { fn schema(&self) -> Schema { let left_schema = self.left.schema(); diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index 0ab848eb29f6..cffe453879bd 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -31,7 +31,6 @@ use crate::expr::{ WindowFunction, }; use crate::optimizer::plan_node::generic::GenericPlanNode; -use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::{ gen_filter_and_pushdown, BatchSortAgg, ColumnPruningContext, LogicalDedup, LogicalProject, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index e76a99fd15e2..cfc49a1da335 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -31,7 +31,6 @@ use super::{ }; use crate::expr::{CollectInputRef, Expr, ExprImpl, ExprRewriter, ExprType, InputRef}; use crate::optimizer::plan_node::generic::DynamicFilter; -use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{ BatchHashJoin, BatchLookupJoin, BatchNestedLoopJoin, ColumnPruningContext, EqJoinPredicate, diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 0a9e2a5dee51..188787c93b8c 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -617,8 +617,6 @@ impl dyn PlanNode { } mod plan_base; -#[macro_use] -mod plan_tree_node_v2; pub use plan_base::*; #[macro_use] mod plan_tree_node; @@ -641,7 +639,6 @@ pub use merge_eq_nodes::*; pub mod batch; pub mod generic; pub mod stream; -pub mod stream_derive; pub use generic::{PlanAggCall, PlanAggCallDisplay}; diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs index 22e239a7369d..e9a5bf26885b 100644 --- a/src/frontend/src/optimizer/plan_node/plan_base.rs +++ b/src/frontend/src/optimizer/plan_node/plan_base.rs @@ -85,11 +85,13 @@ impl stream::StreamPlanRef for PlanBase { self.emit_on_window_close } } + impl batch::BatchPlanRef for PlanBase { fn order(&self) -> &Order { &self.order } } + impl PlanBase { pub fn new_logical( ctx: OptimizerContextRef, diff --git a/src/frontend/src/optimizer/plan_node/plan_tree_node.rs b/src/frontend/src/optimizer/plan_node/plan_tree_node.rs index e1435a6b7b20..0c46d91f7a56 100644 --- a/src/frontend/src/optimizer/plan_node/plan_tree_node.rs +++ b/src/frontend/src/optimizer/plan_node/plan_tree_node.rs @@ -109,19 +109,6 @@ macro_rules! impl_plan_tree_node_for_leaf { self.clone().into() } } - - impl crate::optimizer::plan_node::plan_tree_node_v2::PlanTreeNodeV2 for $leaf_node_type { - type PlanRef = crate::optimizer::PlanRef; - - fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> { - smallvec::smallvec![] - } - - fn clone_with_inputs(&self, mut inputs: impl Iterator) -> Self { - assert!(inputs.next().is_none(), "expect exactly no input"); - self.clone() - } - } }; } @@ -141,20 +128,6 @@ macro_rules! impl_plan_tree_node_for_unary { self.clone_with_input(inputs[0].clone()).into() } } - - impl crate::optimizer::plan_node::plan_tree_node_v2::PlanTreeNodeV2 for $unary_node_type { - type PlanRef = crate::optimizer::PlanRef; - - fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> { - smallvec::smallvec![self.input()] - } - - fn clone_with_inputs(&self, mut inputs: impl Iterator) -> Self { - let input = inputs.next().expect("expect exactly 1 input"); - assert!(inputs.next().is_none(), "expect exactly 1 input"); - self.clone_with_input(input).into() - } - } }; } @@ -174,19 +147,5 @@ macro_rules! impl_plan_tree_node_for_binary { .into() } } - impl crate::optimizer::plan_node::plan_tree_node_v2::PlanTreeNodeV2 for $binary_node_type { - type PlanRef = crate::optimizer::PlanRef; - - fn inputs(&self) -> smallvec::SmallVec<[crate::optimizer::PlanRef; 2]> { - smallvec::smallvec![self.left(), self.right()] - } - - fn clone_with_inputs(&self, mut inputs: impl Iterator) -> Self { - let left = inputs.next().expect("expect exactly 2 input"); - let right = inputs.next().expect("expect exactly 2 input"); - assert!(inputs.next().is_none(), "expect exactly 2 input"); - self.clone_with_left_right(left, right).into() - } - } }; } diff --git a/src/frontend/src/optimizer/plan_node/plan_tree_node_v2.rs b/src/frontend/src/optimizer/plan_node/plan_tree_node_v2.rs deleted file mode 100644 index e598c7dd61ca..000000000000 --- a/src/frontend/src/optimizer/plan_node/plan_tree_node_v2.rs +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use smallvec::SmallVec; - -pub trait PlanTreeNodeV2 { - type PlanRef; - - fn inputs(&self) -> SmallVec<[Self::PlanRef; 2]>; - fn clone_with_inputs(&self, inputs: impl Iterator) -> Self; -} - -macro_rules! impl_plan_tree_node_v2_for_stream_leaf_node { - ($node_type:ident) => { - impl crate::optimizer::plan_node::plan_tree_node_v2::PlanTreeNodeV2 for $node_type { - type PlanRef = crate::optimizer::plan_node::stream::PlanRef; - - fn inputs(&self) -> smallvec::SmallVec<[Self::PlanRef; 2]> { - smallvec::smallvec![] - } - - fn clone_with_inputs(&self, mut inputs: impl Iterator) -> Self { - assert!(inputs.next().is_none(), "expect exactly no input"); - self.clone() - } - } - }; -} - -macro_rules! impl_plan_tree_node_v2_for_stream_unary_node { - ($node_type:ident, $input_field:ident) => { - impl crate::optimizer::plan_node::plan_tree_node_v2::PlanTreeNodeV2 for $node_type { - type PlanRef = crate::optimizer::plan_node::stream::PlanRef; - - fn inputs(&self) -> smallvec::SmallVec<[Self::PlanRef; 2]> { - smallvec::smallvec![self.$input_field.clone()] - } - - fn clone_with_inputs(&self, mut inputs: impl Iterator) -> Self { - let mut new = self.clone(); - new.$input_field = inputs.next().expect("expect exactly 1 input"); - assert!(inputs.next().is_none(), "expect exactly 1 input"); - new.clone() - } - } - }; -} - -// macro_rules! impl_plan_tree_node_v2_for_stream_binary_node { -// ($node_type:ident, $first_input_field:ident, $second_input_field:ident) => { -// impl crate::optimizer::plan_node::plan_tree_node_v2::PlanTreeNodeV2 for $node_type { -// type PlanRef = crate::optimizer::plan_node::stream::PlanRef; - -// fn inputs(&self) -> smallvec::SmallVec<[Self::PlanRef; 2]> { -// smallvec::smallvec![ -// self.$first_input_field.clone(), -// self.$second_input_field.clone() -// ] -// } - -// fn clone_with_inputs(&self, mut inputs: impl Iterator) -> Self -// { let mut new = self.clone(); -// new.$first_input_field = inputs.next().expect("expect exactly 2 input"); -// new.$second_input_field = inputs.next().expect("expect exactly 2 input"); -// assert!(inputs.next().is_none(), "expect exactly 2 input"); -// new.clone() -// } -// } -// }; -// } - -macro_rules! impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating { - ($node_type:ident, $core_field:ident, $input_field:ident) => { - impl crate::optimizer::plan_node::plan_tree_node_v2::PlanTreeNodeV2 for $node_type { - type PlanRef = crate::optimizer::plan_node::stream::PlanRef; - - fn inputs(&self) -> smallvec::SmallVec<[Self::PlanRef; 2]> { - smallvec::smallvec![self.$core_field.$input_field.clone()] - } - - fn clone_with_inputs(&self, mut inputs: impl Iterator) -> Self { - let mut new = self.clone(); - new.$core_field.$input_field = inputs.next().expect("expect exactly 1 input"); - assert!(inputs.next().is_none(), "expect exactly 1 input"); - new.clone() - } - } - }; -} - -macro_rules! impl_plan_tree_node_v2_for_stream_binary_node_with_core_delegating { - ($node_type:ident, $core_field:ident, $first_input_field:ident, $second_input_field:ident) => { - impl crate::optimizer::plan_node::plan_tree_node_v2::PlanTreeNodeV2 for $node_type { - type PlanRef = crate::optimizer::plan_node::stream::PlanRef; - - fn inputs(&self) -> smallvec::SmallVec<[Self::PlanRef; 2]> { - smallvec::smallvec![ - self.$core_field.$first_input_field.clone(), - self.$core_field.$second_input_field.clone() - ] - } - - fn clone_with_inputs(&self, mut inputs: impl Iterator) -> Self { - let mut new = self.clone(); - new.$core_field.$first_input_field = inputs.next().expect("expect exactly 2 input"); - new.$core_field.$second_input_field = - inputs.next().expect("expect exactly 2 input"); - assert!(inputs.next().is_none(), "expect exactly 2 input"); - new.clone() - } - } - }; -} diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 36b2c77d22ae..2edf997bf91f 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -12,774 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use educe::Educe; -use generic::PlanAggCall; -use itertools::Itertools; -use pb::stream_node as pb_node; -use risingwave_common::catalog::{ColumnDesc, Field, Schema}; -use risingwave_common::types::DataType; -use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; -use risingwave_connector::sink::catalog::desc::SinkDesc; -use risingwave_pb::stream_plan as pb; -use smallvec::SmallVec; - -use super::generic::{GenericPlanNode, GenericPlanRef}; -use super::utils::TableCatalogBuilder; -use super::{generic, EqJoinPredicate, PlanNodeId}; -use crate::expr::{Expr, ExprImpl}; -use crate::optimizer::optimizer_context::OptimizerContextRef; -use crate::optimizer::plan_node::plan_tree_node_v2::PlanTreeNodeV2; -use crate::optimizer::property::{Distribution, FunctionalDependencySet}; -use crate::stream_fragmenter::BuildFragmentGraphState; -use crate::TableCatalog; - -macro_rules! impl_node { -($base:ident, $($t:ident),*) => { - #[derive(Debug, Clone, PartialEq, Eq, Hash)] - pub enum Node { - $($t(Box<$t>),)* - } - pub type PlanOwned = ($base, Node); - pub type PlanRef = std::rc::Rc; - $( - impl From<$t> for PlanRef { - fn from(o: $t) -> PlanRef { - std::rc::Rc::new((o.to_stream_base(), Node::$t(Box::new(o)))) - } - } - )* - impl PlanTreeNodeV2 for PlanRef { - type PlanRef = PlanRef; - - fn inputs(&self) -> SmallVec<[Self::PlanRef; 2]> { - match &self.1 { - $(Node::$t(inner) => inner.inputs(),)* - } - } - fn clone_with_inputs(&self, inputs: impl Iterator) -> Self { - match &self.1 { - $(Node::$t(inner) => inner.clone_with_inputs(inputs).into(),)* - } - } - - } -}; -} - -pub trait StreamPlanNode: GenericPlanNode { - fn distribution(&self) -> Distribution; - fn append_only(&self) -> bool; - fn emit_on_window_close(&self) -> bool; - fn to_stream_base(&self) -> PlanBase { - let ctx = self.ctx(); - PlanBase { - id: ctx.next_plan_node_id(), - ctx, - schema: self.schema(), - stream_key: self.stream_key(), - dist: self.distribution(), - append_only: self.append_only(), - emit_on_window_close: self.emit_on_window_close(), - } - } -} +use super::generic::GenericPlanRef; +use crate::optimizer::property::Distribution; +/// A subtrait of [`GenericPlanRef`] for stream plans. +/// +/// Due to the lack of refactoring, all plan nodes currently implement this trait +/// through [`super::PlanBase`]. One may still use this trait as a bound for +/// expecting a stream plan, in contrast to [`GenericPlanRef`]. pub trait StreamPlanRef: GenericPlanRef { fn distribution(&self) -> &Distribution; fn append_only(&self) -> bool; fn emit_on_window_close(&self) -> bool; } - -impl generic::GenericPlanRef for PlanRef { - fn schema(&self) -> &Schema { - &self.0.schema - } - - fn stream_key(&self) -> Option<&[usize]> { - self.0.stream_key.as_deref() - } - - fn ctx(&self) -> OptimizerContextRef { - self.0.ctx.clone() - } - - fn functional_dependency(&self) -> &FunctionalDependencySet { - self.0.functional_dependency() - } -} - -impl generic::GenericPlanRef for PlanBase { - fn schema(&self) -> &Schema { - &self.schema - } - - fn stream_key(&self) -> Option<&[usize]> { - self.stream_key.as_deref() - } - - fn ctx(&self) -> OptimizerContextRef { - self.ctx.clone() - } - - fn functional_dependency(&self) -> &FunctionalDependencySet { - todo!() - } -} - -impl StreamPlanRef for PlanBase { - fn distribution(&self) -> &Distribution { - &self.dist - } - - fn append_only(&self) -> bool { - self.append_only - } - - fn emit_on_window_close(&self) -> bool { - self.emit_on_window_close - } -} - -impl StreamPlanRef for PlanRef { - fn distribution(&self) -> &Distribution { - &self.0.dist - } - - fn append_only(&self) -> bool { - self.0.append_only - } - - fn emit_on_window_close(&self) -> bool { - self.0.emit_on_window_close - } -} - -/// Implements [`generic::Join`] with delta join. It requires its two -/// inputs to be indexes. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct DeltaJoin { - pub core: generic::Join, - - /// The join condition must be equivalent to `logical.on`, but separated into equal and - /// non-equal parts to facilitate execution later - pub eq_join_predicate: EqJoinPredicate, -} -impl_plan_tree_node_v2_for_stream_binary_node_with_core_delegating!(DeltaJoin, core, left, right); - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct DynamicFilter { - pub core: generic::DynamicFilter, -} -impl_plan_tree_node_v2_for_stream_binary_node_with_core_delegating!( - DynamicFilter, - core, - left, - right -); -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Exchange { - pub dist: Distribution, - pub input: PlanRef, -} -impl_plan_tree_node_v2_for_stream_unary_node!(Exchange, input); - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Expand { - pub core: generic::Expand, -} -impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(Expand, core, input); - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Filter { - pub core: generic::Filter, -} -impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(Filter, core, input); - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct SimpleAgg { - pub core: generic::Agg, - /// The index of `count(*)` in `agg_calls`. - row_count_idx: usize, -} -impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(SimpleAgg, core, input); - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct GroupTopN { - pub core: generic::TopN, - /// an optional column index which is the vnode of each row computed by the input's consistent - /// hash distribution - pub vnode_col_idx: Option, -} -impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(GroupTopN, core, input); - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct HashAgg { - pub core: generic::Agg, - /// An optional column index which is the vnode of each row computed by the input's consistent - /// hash distribution. - vnode_col_idx: Option, - /// The index of `count(*)` in `agg_calls`. - row_count_idx: usize, - /// Whether to emit output only when the window is closed by watermark. - emit_on_window_close: bool, - /// The watermark column that Emit-On-Window-Close behavior is based on. - window_col_idx: Option, -} -impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(HashAgg, core, input); - -/// Implements [`generic::Join`] with hash table. It builds a hash table -/// from inner (right-side) relation and probes with data from outer (left-side) relation to -/// get output rows. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct HashJoin { - pub core: generic::Join, - - /// The join condition must be equivalent to `logical.on`, but separated into equal and - /// non-equal parts to facilitate execution later - pub eq_join_predicate: EqJoinPredicate, - - /// Whether can optimize for append-only stream. - /// It is true if input of both side is append-only - pub is_append_only: bool, -} -impl_plan_tree_node_v2_for_stream_binary_node_with_core_delegating!(HashJoin, core, left, right); - -impl HashJoin { - /// Return hash join internal table catalog and degree table catalog. - pub fn infer_internal_and_degree_table_catalog( - input: &impl StreamPlanRef, - join_key_indices: Vec, - dk_indices_in_jk: Vec, - ) -> (TableCatalog, TableCatalog, Vec) { - let schema = input.schema(); - - let internal_table_dist_keys = dk_indices_in_jk - .iter() - .map(|idx| join_key_indices[*idx]) - .collect_vec(); - - let degree_table_dist_keys = dk_indices_in_jk.clone(); - - // The pk of hash join internal and degree table should be join_key + input_pk. - let join_key_len = join_key_indices.len(); - let mut pk_indices = join_key_indices; - - // dedup the pk in dist key.. - let mut deduped_input_pk_indices = vec![]; - for input_pk_idx in input.stream_key().unwrap() { - if !pk_indices.contains(input_pk_idx) - && !deduped_input_pk_indices.contains(input_pk_idx) - { - deduped_input_pk_indices.push(*input_pk_idx); - } - } - - pk_indices.extend(deduped_input_pk_indices.clone()); - - // Build internal table - let mut internal_table_catalog_builder = - TableCatalogBuilder::new(input.ctx().with_options().internal_table_subset()); - let internal_columns_fields = schema.fields().to_vec(); - - internal_columns_fields.iter().for_each(|field| { - internal_table_catalog_builder.add_column(field); - }); - pk_indices.iter().for_each(|idx| { - internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending()) - }); - - // Build degree table. - let mut degree_table_catalog_builder = - TableCatalogBuilder::new(input.ctx().with_options().internal_table_subset()); - - let degree_column_field = Field::with_name(DataType::Int64, "_degree"); - - pk_indices.iter().enumerate().for_each(|(order_idx, idx)| { - degree_table_catalog_builder.add_column(&internal_columns_fields[*idx]); - degree_table_catalog_builder.add_order_column(order_idx, OrderType::ascending()); - }); - degree_table_catalog_builder.add_column(°ree_column_field); - degree_table_catalog_builder - .set_value_indices(vec![degree_table_catalog_builder.columns().len() - 1]); - - internal_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk.clone()); - degree_table_catalog_builder.set_dist_key_in_pk(dk_indices_in_jk); - - ( - internal_table_catalog_builder.build(internal_table_dist_keys, join_key_len), - degree_table_catalog_builder.build(degree_table_dist_keys, join_key_len), - deduped_input_pk_indices, - ) - } -} - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct HopWindow { - pub core: generic::HopWindow, - window_start_exprs: Vec, - window_end_exprs: Vec, -} -impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(HopWindow, core, input); - -/// [`IndexScan`] is a virtual plan node to represent a stream table scan. It will be converted -/// to chain + merge node (for upstream materialize) + batch table scan when converting to `MView` -/// creation request. Compared with [`TableScan`], it will reorder columns, and the chain node -/// doesn't allow rearrange. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct IndexScan { - pub core: generic::Scan, - pub batch_plan_id: PlanNodeId, -} -impl_plan_tree_node_v2_for_stream_leaf_node!(IndexScan); - -/// Stateless simple agg. -/// -/// Should only be used for stateless agg, including `sum`, `count` and *append-only* `min`/`max`. -/// -/// The output of `StatelessSimpleAgg` doesn't have pk columns, so the result can only be used by -/// `SimpleAgg` with `ManagedValueState`s. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct StatelessSimpleAgg { - pub core: generic::Agg, -} -impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(StatelessSimpleAgg, core, input); - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Materialize { - /// Child of Materialize plan - pub input: PlanRef, - pub table: TableCatalog, -} -impl_plan_tree_node_v2_for_stream_unary_node!(Materialize, input); - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct ProjectSet { - pub core: generic::ProjectSet, -} -impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(ProjectSet, core, input); - -/// `Project` implements [`super::LogicalProject`] to evaluate specified expressions on input -/// rows. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Project { - pub core: generic::Project, - watermark_derivations: Vec<(usize, usize)>, - merge_chunk: bool, - nondecreasing_exprs: Vec, -} -impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(Project, core, input); - -/// [`Sink`] represents a table/connector sink at the very end of the graph. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Sink { - pub input: PlanRef, - pub sink_desc: SinkDesc, -} -impl_plan_tree_node_v2_for_stream_unary_node!(Sink, input); -/// [`Source`] represents a table/connector source at the very beginning of the graph. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct Source { - pub core: generic::Source, -} -impl_plan_tree_node_v2_for_stream_leaf_node!(Source); - -/// `TableScan` is a virtual plan node to represent a stream table scan. It will be converted -/// to chain + merge node (for upstream materialize) + batch table scan when converting to `MView` -/// creation request. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct TableScan { - pub core: generic::Scan, - pub batch_plan_id: PlanNodeId, -} -impl_plan_tree_node_v2_for_stream_leaf_node!(TableScan); - -/// `TopN` implements [`super::LogicalTopN`] to find the top N elements with a heap -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct TopN { - pub core: generic::TopN, -} -impl_plan_tree_node_v2_for_stream_unary_node_with_core_delegating!(TopN, core, input); - -#[derive(Clone, Debug, Educe)] -#[educe(PartialEq, Eq, Hash)] -pub struct PlanBase { - #[educe(PartialEq(ignore))] - #[educe(Hash(ignore))] - pub id: PlanNodeId, - #[educe(PartialEq(ignore))] - #[educe(Hash(ignore))] - pub ctx: OptimizerContextRef, - pub schema: Schema, - pub stream_key: Option>, - #[educe(PartialEq(ignore))] - #[educe(Hash(ignore))] - pub dist: Distribution, - pub append_only: bool, - pub emit_on_window_close: bool, -} - -impl_node!( - PlanBase, - Exchange, - DynamicFilter, - DeltaJoin, - Expand, - Filter, - SimpleAgg, - GroupTopN, - HashAgg, - HashJoin, - HopWindow, - IndexScan, - StatelessSimpleAgg, - Materialize, - ProjectSet, - Project, - Sink, - Source, - TableScan, - TopN -); - -use pb_node::PbNodeBody; -#[allow(dead_code)] -pub fn to_stream_prost_body( - (base, core): &PlanOwned, - state: &mut BuildFragmentGraphState, -) -> PbNodeBody { - use pb::*; - match core { - Node::TableScan(_) => todo!(), - Node::IndexScan(_) => todo!(), - // ^ need standalone implementations - Node::Exchange(_) => PbNodeBody::Exchange(ExchangeNode { - strategy: Some(DispatchStrategy { - r#type: match &base.dist { - Distribution::HashShard(_) => DispatcherType::Hash, - Distribution::Single => DispatcherType::Simple, - Distribution::Broadcast => DispatcherType::Broadcast, - _ => panic!("Do not allow Any or AnyShard in serialization process"), - } as i32, - dist_key_indices: match &base.dist { - Distribution::HashShard(keys) => keys.iter().map(|&num| num as u32).collect(), - _ => vec![], - }, - output_indices: (0..base.schema().len() as u32).collect(), - }), - }), - Node::DynamicFilter(me) => { - use generic::dynamic_filter::*; - let me = &me.core; - let condition = me - .predicate() - .as_expr_unless_true() - .map(|x| x.to_expr_proto()); - let left_table = infer_left_internal_table_catalog(base, me.left_index()) - .with_id(state.gen_table_id_wrapped()); - let right_table = infer_right_internal_table_catalog(&me.right().0) - .with_id(state.gen_table_id_wrapped()); - PbNodeBody::DynamicFilter(DynamicFilterNode { - left_key: me.left_index() as u32, - condition, - left_table: Some(left_table.to_internal_table_prost()), - right_table: Some(right_table.to_internal_table_prost()), - }) - } - Node::DeltaJoin(me) => { - let (_, left_node) = &*me.core.left; - let (_, right_node) = &*me.core.right; - fn cast(node: &Node) -> &IndexScan { - match node { - Node::IndexScan(scan) => scan, - _ => unreachable!(), - } - } - let left_table = cast(left_node); - let right_table = cast(right_node); - let left_table_desc = &*left_table.core.table_desc; - let right_table_desc = &*right_table.core.table_desc; - - // TODO: add a separate delta join node in proto, or move fragmenter to frontend so that - // we don't need an intermediate representation. - PbNodeBody::DeltaIndexJoin(DeltaIndexJoinNode { - join_type: me.core.join_type as i32, - left_key: me - .eq_join_predicate - .left_eq_indexes() - .iter() - .map(|v| *v as i32) - .collect(), - right_key: me - .eq_join_predicate - .right_eq_indexes() - .iter() - .map(|v| *v as i32) - .collect(), - condition: me - .eq_join_predicate - .other_cond() - .as_expr_unless_true() - .map(|x| x.to_expr_proto()), - left_table_id: left_table_desc.table_id.table_id(), - right_table_id: right_table_desc.table_id.table_id(), - left_info: Some(ArrangementInfo { - arrange_key_orders: left_table_desc.arrange_key_orders_protobuf(), - column_descs: left_table - .core - .column_descs() - .iter() - .map(ColumnDesc::to_protobuf) - .collect(), - table_desc: Some(left_table_desc.to_protobuf()), - }), - right_info: Some(ArrangementInfo { - arrange_key_orders: right_table_desc.arrange_key_orders_protobuf(), - column_descs: right_table - .core - .column_descs() - .iter() - .map(ColumnDesc::to_protobuf) - .collect(), - table_desc: Some(right_table_desc.to_protobuf()), - }), - output_indices: me.core.output_indices.iter().map(|&x| x as u32).collect(), - }) - } - Node::Expand(me) => { - use pb::expand_node::Subset; - - let me = &me.core; - PbNodeBody::Expand(ExpandNode { - column_subsets: me - .column_subsets - .iter() - .map(|subset| { - let column_indices = subset.iter().map(|&key| key as u32).collect(); - Subset { column_indices } - }) - .collect(), - }) - } - Node::Filter(me) => { - let me = &me.core; - PbNodeBody::Filter(FilterNode { - search_condition: Some(ExprImpl::from(me.predicate.clone()).to_expr_proto()), - }) - } - Node::SimpleAgg(me) => { - let intermediate_state_table = me.core.infer_intermediate_state_table(base, None, None); - let agg_states = me.core.infer_stream_agg_state(base, None, None); - let distinct_dedup_tables = me.core.infer_distinct_dedup_tables(base, None, None); - - PbNodeBody::SimpleAgg(SimpleAggNode { - agg_calls: me - .core - .agg_calls - .iter() - .map(PlanAggCall::to_protobuf) - .collect(), - row_count_index: me.row_count_idx as u32, - distribution_key: base - .dist - .dist_column_indices() - .iter() - .map(|&idx| idx as u32) - .collect(), - is_append_only: me.core.input.0.append_only, - agg_call_states: agg_states - .into_iter() - .map(|s| s.into_prost(state)) - .collect(), - intermediate_state_table: Some( - intermediate_state_table - .with_id(state.gen_table_id_wrapped()) - .to_internal_table_prost(), - ), - distinct_dedup_tables: distinct_dedup_tables - .into_iter() - .map(|(key_idx, table)| (key_idx as u32, table.to_internal_table_prost())) - .collect(), - }) - } - Node::GroupTopN(me) => { - let input = &me.core.input.0; - let table = me - .core - .infer_internal_table_catalog( - input.schema(), - input.ctx(), - input.stream_key().unwrap(), - me.vnode_col_idx, - ) - .with_id(state.gen_table_id_wrapped()); - let group_topn_node = GroupTopNNode { - limit: me.core.limit_attr.limit(), - offset: me.core.offset, - with_ties: me.core.limit_attr.with_ties(), - group_key: me.core.group_key.iter().map(|idx| *idx as u32).collect(), - table: Some(table.to_internal_table_prost()), - order_by: me.core.order.to_protobuf(), - }; - - PbNodeBody::GroupTopN(group_topn_node) - } - Node::HashAgg(me) => { - let intermediate_state_table = - me.core - .infer_intermediate_state_table(base, me.vnode_col_idx, me.window_col_idx); - let agg_states = - me.core - .infer_stream_agg_state(base, me.vnode_col_idx, me.window_col_idx); - let distinct_dedup_tables = - me.core - .infer_distinct_dedup_tables(base, me.vnode_col_idx, me.window_col_idx); - - PbNodeBody::HashAgg(HashAggNode { - group_key: me.core.group_key.indices().map(|idx| idx as u32).collect(), - agg_calls: me - .core - .agg_calls - .iter() - .map(PlanAggCall::to_protobuf) - .collect(), - row_count_index: me.row_count_idx as u32, - is_append_only: me.core.input.0.append_only, - agg_call_states: agg_states - .into_iter() - .map(|s| s.into_prost(state)) - .collect(), - intermediate_state_table: Some( - intermediate_state_table - .with_id(state.gen_table_id_wrapped()) - .to_internal_table_prost(), - ), - distinct_dedup_tables: distinct_dedup_tables - .into_iter() - .map(|(key_idx, table)| (key_idx as u32, table.to_internal_table_prost())) - .collect(), - emit_on_window_close: me.emit_on_window_close(), - }) - } - Node::HashJoin(_) => { - unreachable!(); - } - Node::HopWindow(me) => { - let window_start_exprs = me - .window_start_exprs - .clone() - .iter() - .map(|x| x.to_expr_proto()) - .collect(); - let window_end_exprs = me - .window_end_exprs - .clone() - .iter() - .map(|x| x.to_expr_proto()) - .collect(); - let me = &me.core; - PbNodeBody::HopWindow(HopWindowNode { - time_col: me.time_col.index() as _, - window_slide: Some(me.window_slide.into()), - window_size: Some(me.window_size.into()), - output_indices: me.output_indices.iter().map(|&x| x as u32).collect(), - window_start_exprs, - window_end_exprs, - }) - } - Node::StatelessSimpleAgg(me) => { - let me = &me.core; - PbNodeBody::StatelessSimpleAgg(SimpleAggNode { - agg_calls: me.agg_calls.iter().map(PlanAggCall::to_protobuf).collect(), - row_count_index: u32::MAX, // this is not used - distribution_key: base - .dist - .dist_column_indices() - .iter() - .map(|&idx| idx as u32) - .collect(), - agg_call_states: vec![], - intermediate_state_table: None, - is_append_only: me.input.0.append_only, - distinct_dedup_tables: Default::default(), - }) - } - Node::Materialize(me) => { - PbNodeBody::Materialize(MaterializeNode { - // We don't need table id for materialize node in frontend. The id will be generated - // on meta catalog service. - table_id: 0, - column_orders: me.table.pk().iter().map(ColumnOrder::to_protobuf).collect(), - table: Some(me.table.to_internal_table_prost()), - }) - } - Node::ProjectSet(_) => { - unreachable!() - } - Node::Project(me) => PbNodeBody::Project(ProjectNode { - select_list: me.core.exprs.iter().map(|x| x.to_expr_proto()).collect(), - watermark_input_cols: me - .watermark_derivations - .iter() - .map(|(x, _)| *x as u32) - .collect(), - watermark_output_cols: me - .watermark_derivations - .iter() - .map(|(_, y)| *y as u32) - .collect(), - nondecreasing_exprs: me.nondecreasing_exprs.iter().map(|i| *i as u32).collect(), - }), - Node::Sink(me) => PbNodeBody::Sink(SinkNode { - sink_desc: Some(me.sink_desc.to_proto()), - table: None, // TODO: Refactor sink to have a generic core. - log_store_type: SinkLogStoreType::InMemoryLogStore as i32, - }), - Node::Source(me) => { - // TODO(kwannoel): Is branch used, seems to be a duplicate of stream_source? - let rate_limit = me.ctx().session_ctx().config().get_streaming_rate_limit(); - let me = &me.core.catalog; - let source_inner = me.as_ref().map(|me| StreamSource { - source_id: me.id, - source_name: me.name.clone(), - state_table: Some( - generic::Source::infer_internal_table_catalog() - .with_id(state.gen_table_id_wrapped()) - .to_internal_table_prost(), - ), - info: Some(me.info.clone()), - row_id_index: me.row_id_index.map(|index| index as _), - columns: me.columns.iter().map(|c| c.to_protobuf()).collect(), - properties: me.properties.clone().into_iter().collect(), - rate_limit, - }); - PbNodeBody::Source(SourceNode { source_inner }) - } - Node::TopN(me) => { - let input = &me.core.input.0; - let me = &me.core; - let topn_node = TopNNode { - limit: me.limit_attr.limit(), - offset: me.offset, - with_ties: me.limit_attr.with_ties(), - table: Some( - me.infer_internal_table_catalog( - input.schema(), - input.ctx(), - input - .stream_key() - .expect("should always have a stream key in the stream plan but not"), - None, - ) - .with_id(state.gen_table_id_wrapped()) - .to_internal_table_prost(), - ), - order_by: me.order.to_protobuf(), - }; - if me.input.0.append_only { - PbNodeBody::AppendOnlyTopN(topn_node) - } else { - PbNodeBody::TopN(topn_node) - } - } - } -} diff --git a/src/frontend/src/optimizer/plan_node/stream_dedup.rs b/src/frontend/src/optimizer/plan_node/stream_dedup.rs index 44acf722eae6..847616629355 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dedup.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dedup.rs @@ -18,9 +18,9 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::DedupNode; use super::generic::{self, GenericPlanNode, GenericPlanRef}; +use super::stream::StreamPlanRef; use super::utils::{impl_distill_by_unit, TableCatalogBuilder}; use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode}; -use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::PlanRef; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::TableCatalog; diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index ede1ae68b082..580c3563bfa0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -24,7 +24,6 @@ use super::generic::{self}; use super::utils::{childless_record, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode}; use crate::expr::{Expr, ExprRewriter}; -use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{EqJoinPredicate, EqJoinPredicateDisplay}; use crate::optimizer::property::Distribution; diff --git a/src/frontend/src/optimizer/plan_node/stream_derive.rs b/src/frontend/src/optimizer/plan_node/stream_derive.rs deleted file mode 100644 index f3da2b1b6a1d..000000000000 --- a/src/frontend/src/optimizer/plan_node/stream_derive.rs +++ /dev/null @@ -1,630 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use risingwave_common::catalog::Schema; - -use super::generic::GenericPlanNode; -use super::stream::*; -use crate::optimizer::optimizer_context::OptimizerContextRef; -use crate::optimizer::property::{Distribution, FunctionalDependencySet}; -use crate::utils::ColIndexMappingRewriteExt; - -impl GenericPlanNode for DynamicFilter { - fn schema(&self) -> Schema { - todo!("new plan node derivation") - } - - fn stream_key(&self) -> Option> { - todo!("new plan node derivation") - } - - fn ctx(&self) -> OptimizerContextRef { - todo!("new plan node derivation") - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - todo!("new plan node derivation") - } -} -impl StreamPlanNode for DynamicFilter { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for Exchange { - fn schema(&self) -> Schema { - todo!("new plan node derivation") - } - - fn stream_key(&self) -> Option> { - todo!("new plan node derivation") - } - - fn ctx(&self) -> OptimizerContextRef { - todo!("new plan node derivation") - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - todo!("new plan node derivation") - } -} - -impl StreamPlanNode for Exchange { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for DeltaJoin { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for DeltaJoin { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for Expand { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for Expand { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for Filter { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for Filter { - fn distribution(&self) -> Distribution { - self.core.input.distribution().clone() - } - - fn append_only(&self) -> bool { - self.core.input.append_only() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for SimpleAgg { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for SimpleAgg { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for GroupTopN { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for GroupTopN { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for HashAgg { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for HashAgg { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for HashJoin { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for HashJoin { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for HopWindow { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for HopWindow { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for IndexScan { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for IndexScan { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for StatelessSimpleAgg { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for StatelessSimpleAgg { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for Materialize { - fn schema(&self) -> Schema { - todo!("new plan node derivation") - } - - fn stream_key(&self) -> Option> { - todo!("new plan node derivation") - } - - fn ctx(&self) -> OptimizerContextRef { - todo!("new plan node derivation") - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - todo!("new plan node derivation") - } -} - -impl StreamPlanNode for Materialize { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for ProjectSet { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for ProjectSet { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for Project { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for Project { - fn distribution(&self) -> Distribution { - self.core - .i2o_col_mapping() - .rewrite_provided_distribution(self.core.input.distribution()) - } - - fn append_only(&self) -> bool { - self.core.input.append_only() - } - - fn emit_on_window_close(&self) -> bool { - self.core.input.emit_on_window_close() - } -} - -impl GenericPlanNode for Sink { - fn schema(&self) -> Schema { - todo!("new plan node derivation") - } - - fn stream_key(&self) -> Option> { - todo!("new plan node derivation") - } - - fn ctx(&self) -> OptimizerContextRef { - todo!("new plan node derivation") - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - todo!("new plan node derivation") - } -} - -impl StreamPlanNode for Sink { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for Source { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for Source { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for TableScan { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for TableScan { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} - -impl GenericPlanNode for TopN { - fn schema(&self) -> Schema { - self.core.schema() - } - - fn stream_key(&self) -> Option> { - self.core.stream_key() - } - - fn ctx(&self) -> OptimizerContextRef { - self.core.ctx() - } - - fn functional_dependency(&self) -> FunctionalDependencySet { - self.core.functional_dependency() - } -} - -impl StreamPlanNode for TopN { - fn distribution(&self) -> Distribution { - todo!() - } - - fn append_only(&self) -> bool { - todo!() - } - - fn emit_on_window_close(&self) -> bool { - todo!() - } -} diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs index 427c0de4c89c..797091338915 100644 --- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs @@ -21,7 +21,6 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::generic::{self, PlanWindowFunction}; use super::utils::{impl_distill_by_unit, TableCatalogBuilder}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; -use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::TableCatalog; diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 45bcccff7a6c..0fa1713bf448 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -16,7 +16,6 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode}; -use super::stream::StreamPlanRef; use super::utils::{childless_record, plan_node_name, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::{Distribution, DistributionDisplay}; diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs index 619bc98fc640..c7a59b1f847f 100644 --- a/src/frontend/src/optimizer/plan_node/stream_expand.rs +++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs @@ -17,7 +17,6 @@ use risingwave_pb::stream_plan::expand_node::Subset; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::ExpandNode; -use super::stream::StreamPlanRef; use super::utils::impl_distill_by_unit; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::Distribution; diff --git a/src/frontend/src/optimizer/plan_node/stream_filter.rs b/src/frontend/src/optimizer/plan_node/stream_filter.rs index 285a8086f5c8..ed4d506b47ae 100644 --- a/src/frontend/src/optimizer/plan_node/stream_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_filter.rs @@ -15,7 +15,6 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::FilterNode; -use super::stream::StreamPlanRef; use super::utils::impl_distill_by_unit; use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{Expr, ExprImpl, ExprRewriter}; diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs index ea0925cf33bb..989de9f8757e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -20,6 +20,7 @@ use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DeltaExpression, HashJoinNode, PbInequalityPair}; +use super::generic::Join; use super::utils::{childless_record, plan_node_name, watermark_pretty, Distill}; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode, @@ -366,15 +367,14 @@ impl StreamNode for StreamHashJoin { let dk_indices_in_jk = self.derive_dist_key_in_join_key(); - use super::stream::HashJoin; let (left_table, left_degree_table, left_deduped_input_pk_indices) = - HashJoin::infer_internal_and_degree_table_catalog( + Join::infer_internal_and_degree_table_catalog( self.left().plan_base(), left_jk_indices, dk_indices_in_jk.clone(), ); let (right_table, right_degree_table, right_deduped_input_pk_indices) = - HashJoin::infer_internal_and_degree_table_catalog( + Join::infer_internal_and_degree_table_catalog( self.right().plan_base(), right_jk_indices, dk_indices_in_jk, diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index ec18ee258877..3780a6cda3f5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -17,7 +17,6 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::HopWindowNode; -use super::stream::StreamPlanRef; use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{Expr, ExprImpl, ExprRewriter}; diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index 3a159a957af4..8e1b30eaafad 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -17,7 +17,6 @@ use pretty_xmlish::XmlNode; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::ProjectNode; -use super::stream::StreamPlanRef; use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{try_derive_watermark, Expr, ExprImpl, ExprRewriter, WatermarkDerivation}; diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs index 619fec1f80d1..97c4b70433cb 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs @@ -17,7 +17,6 @@ use itertools::Itertools; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::ProjectSetNode; -use super::stream::StreamPlanRef; use super::utils::impl_distill_by_unit; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{try_derive_watermark, ExprRewriter, WatermarkDerivation}; diff --git a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs index c85bd10dc163..083cb877cd4d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs +++ b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs @@ -17,7 +17,6 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::utils::{childless_record, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; -use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs index 639b6c5782bb..296c58944fac 100644 --- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs @@ -20,7 +20,6 @@ use super::generic::{self, PlanAggCall}; use super::utils::impl_distill_by_unit; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::ExprRewriter; -use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::property::RequiredDist; use crate::stream_fragmenter::BuildFragmentGraphState; diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index f9fb325b8af8..eb883103fe51 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -22,7 +22,6 @@ use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode}; use crate::expr::{Expr, ExprRewriter}; use crate::optimizer::plan_node::plan_tree_node::PlanTreeNodeUnary; -use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{ EqJoinPredicate, EqJoinPredicateDisplay, StreamExchange, StreamTableScan, diff --git a/src/frontend/src/optimizer/plan_node/stream_union.rs b/src/frontend/src/optimizer/plan_node/stream_union.rs index 74e8dbcf5c48..1d259115b5ce 100644 --- a/src/frontend/src/optimizer/plan_node/stream_union.rs +++ b/src/frontend/src/optimizer/plan_node/stream_union.rs @@ -22,7 +22,6 @@ use risingwave_pb::stream_plan::UnionNode; use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanRef}; use crate::optimizer::plan_node::generic::GenericPlanNode; -use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::{PlanBase, PlanTreeNode, StreamNode}; use crate::stream_fragmenter::BuildFragmentGraphState; diff --git a/src/frontend/src/optimizer/property/distribution.rs b/src/frontend/src/optimizer/property/distribution.rs index c4b09bd910c5..4fcaf959eac8 100644 --- a/src/frontend/src/optimizer/property/distribution.rs +++ b/src/frontend/src/optimizer/property/distribution.rs @@ -59,7 +59,6 @@ use risingwave_pb::batch_plan::ExchangeInfo; use super::super::plan_node::*; use crate::catalog::catalog_service::CatalogReader; use crate::catalog::FragmentId; -use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::property::Order; use crate::optimizer::PlanRef; use crate::scheduler::worker_node_manager::WorkerNodeSelector; @@ -314,23 +313,6 @@ impl RequiredDist { } } - #[allow(dead_code)] - pub fn enforce_stream_if_not_satisfies( - &self, - plan: stream::PlanRef, - ) -> Result { - if !plan.distribution().satisfies(self) { - // FIXME(st1page); - Ok(stream::Exchange { - dist: self.to_dist(), - input: plan, - } - .into()) - } else { - Ok(plan) - } - } - /// check if the distribution satisfies other required distribution pub fn satisfies(&self, required: &RequiredDist) -> bool { match self {