From 636d235ea54f56f60ce48d537569e6c124705d3d Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Fri, 20 Oct 2023 18:50:59 +0800 Subject: [PATCH] more type exercises Signed-off-by: Bugen Zhao --- .../src/optimizer/plan_node/batch_limit.rs | 1 + .../optimizer/plan_node/batch_over_window.rs | 1 + .../src/optimizer/plan_node/batch_project.rs | 1 + .../src/optimizer/plan_node/batch_seq_scan.rs | 3 +- .../src/optimizer/plan_node/batch_source.rs | 1 + .../src/optimizer/plan_node/batch_update.rs | 1 + .../src/optimizer/plan_node/generic/mod.rs | 3 +- .../src/optimizer/plan_node/logical_apply.rs | 4 +- .../src/optimizer/plan_node/logical_expand.rs | 1 + .../src/optimizer/plan_node/logical_filter.rs | 1 + .../optimizer/plan_node/logical_hop_window.rs | 2 +- .../optimizer/plan_node/logical_multi_join.rs | 1 + .../src/optimizer/plan_node/logical_update.rs | 1 + .../src/optimizer/plan_node/merge_eq_nodes.rs | 1 + src/frontend/src/optimizer/plan_node/mod.rs | 38 +++- .../src/optimizer/plan_node/plan_base.rs | 170 +++++++++--------- .../src/optimizer/plan_node/stream_dml.rs | 1 + .../plan_node/stream_eowc_over_window.rs | 2 +- .../optimizer/plan_node/stream_exchange.rs | 2 +- .../optimizer/plan_node/stream_group_topn.rs | 2 +- .../optimizer/plan_node/stream_hash_agg.rs | 2 +- .../optimizer/plan_node/stream_hop_window.rs | 1 + .../src/optimizer/plan_node/stream_project.rs | 1 + .../src/optimizer/plan_node/stream_share.rs | 2 + .../optimizer/plan_node/stream_simple_agg.rs | 3 +- .../src/optimizer/plan_node/stream_sort.rs | 2 +- .../plan_node/stream_stateless_simple_agg.rs | 1 + .../optimizer/plan_node/stream_table_scan.rs | 2 + .../src/optimizer/plan_node/stream_topn.rs | 3 +- .../src/optimizer/plan_node/stream_union.rs | 1 + .../src/optimizer/plan_node/stream_values.rs | 1 + .../optimizer/plan_rewriter/plan_cloner.rs | 1 + .../plan_rewriter/share_source_rewriter.rs | 1 + .../plan_visitor/share_parent_counter.rs | 1 + .../rule/always_false_filter_rule.rs | 1 + .../rule/apply_join_transpose_rule.rs | 1 + .../optimizer/rule/index_selection_rule.rs | 1 + .../rule/left_deep_tree_join_ordering_rule.rs | 1 + .../optimizer/rule/merge_multijoin_rule.rs | 1 + .../optimizer/rule/min_max_on_index_rule.rs | 2 +- .../rule/over_window_to_topn_rule.rs | 1 + .../rule/pull_up_correlated_predicate_rule.rs | 1 + .../rule/trivial_project_to_values_rule.rs | 1 + .../rule/union_input_values_merge_rule.rs | 1 + 44 files changed, 166 insertions(+), 104 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/batch_limit.rs b/src/frontend/src/optimizer/plan_node/batch_limit.rs index c395f443e57d8..c6e8091705443 100644 --- a/src/frontend/src/optimizer/plan_node/batch_limit.rs +++ b/src/frontend/src/optimizer/plan_node/batch_limit.rs @@ -16,6 +16,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::LimitNode; +use super::generic::PhysicalPlanRef; use super::utils::impl_distill_by_unit; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, diff --git a/src/frontend/src/optimizer/plan_node/batch_over_window.rs b/src/frontend/src/optimizer/plan_node/batch_over_window.rs index 9de951ae22eef..ba2ec93053406 100644 --- a/src/frontend/src/optimizer/plan_node/batch_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/batch_over_window.rs @@ -17,6 +17,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::SortOverWindowNode; +use super::batch::BatchPlanRef; use super::generic::PlanWindowFunction; use super::utils::impl_distill_by_unit; use super::{ diff --git a/src/frontend/src/optimizer/plan_node/batch_project.rs b/src/frontend/src/optimizer/plan_node/batch_project.rs index d82970605db50..34ee2c6de174c 100644 --- a/src/frontend/src/optimizer/plan_node/batch_project.rs +++ b/src/frontend/src/optimizer/plan_node/batch_project.rs @@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::ProjectNode; use risingwave_pb::expr::ExprNode; +use super::generic::GenericPlanRef; use super::utils::{childless_record, Distill}; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, diff --git a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs index 353254eb84b16..4785e64b91562 100644 --- a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs @@ -24,7 +24,8 @@ use risingwave_pb::batch_plan::row_seq_scan_node::ChunkSize; use risingwave_pb::batch_plan::{RowSeqScanNode, SysRowSeqScanNode}; use risingwave_pb::plan_common::PbColumnDesc; -use super::generic::GenericPlanRef; +use super::batch::BatchPlanRef; +use super::generic::{GenericPlanRef, PhysicalPlanRef}; use super::utils::{childless_record, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch}; use crate::catalog::ColumnId; diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs index e9d18c4a09135..e956892e78a5f 100644 --- a/src/frontend/src/optimizer/plan_node/batch_source.rs +++ b/src/frontend/src/optimizer/plan_node/batch_source.rs @@ -19,6 +19,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::SourceNode; +use super::generic::GenericPlanRef; use super::utils::{childless_record, column_names_pretty, Distill}; use super::{ generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, diff --git a/src/frontend/src/optimizer/plan_node/batch_update.rs b/src/frontend/src/optimizer/plan_node/batch_update.rs index 19bb60b9aa1d8..49c2d57b138c1 100644 --- a/src/frontend/src/optimizer/plan_node/batch_update.rs +++ b/src/frontend/src/optimizer/plan_node/batch_update.rs @@ -18,6 +18,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::UpdateNode; +use super::generic::GenericPlanRef; use super::utils::impl_distill_by_unit; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index ca7205b3ac324..aec59c90bcc4e 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -18,7 +18,7 @@ use std::hash::Hash; use pretty_xmlish::XmlNode; use risingwave_common::catalog::Schema; -use super::{stream, EqJoinPredicate}; +use super::{stream, EqJoinPredicate, PlanNodeId}; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::property::{Distribution, FunctionalDependencySet}; @@ -85,6 +85,7 @@ macro_rules! impl_distill_unit_from_fields { pub(super) use impl_distill_unit_from_fields; pub trait GenericPlanRef: Eq + Hash { + fn id(&self) -> PlanNodeId; fn schema(&self) -> &Schema; fn stream_key(&self) -> Option<&[usize]>; fn functional_dependency(&self) -> &FunctionalDependencySet; diff --git a/src/frontend/src/optimizer/plan_node/logical_apply.rs b/src/frontend/src/optimizer/plan_node/logical_apply.rs index 7640f093fc933..b398ce7494f61 100644 --- a/src/frontend/src/optimizer/plan_node/logical_apply.rs +++ b/src/frontend/src/optimizer/plan_node/logical_apply.rs @@ -18,7 +18,9 @@ use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_pb::plan_common::JoinType; -use super::generic::{self, push_down_into_join, push_down_join_condition, GenericPlanNode}; +use super::generic::{ + self, push_down_into_join, push_down_join_condition, GenericPlanNode, GenericPlanRef, +}; use super::utils::{childless_record, Distill}; use super::{ ColPrunable, LogicalJoin, LogicalProject, PlanBase, PlanRef, PlanTreeNodeBinary, diff --git a/src/frontend/src/optimizer/plan_node/logical_expand.rs b/src/frontend/src/optimizer/plan_node/logical_expand.rs index 5cc823cb1e1b7..d1f3b666feef5 100644 --- a/src/frontend/src/optimizer/plan_node/logical_expand.rs +++ b/src/frontend/src/optimizer/plan_node/logical_expand.rs @@ -15,6 +15,7 @@ use itertools::Itertools; use risingwave_common::error::Result; +use super::generic::GenericPlanRef; use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchExpand, ColPrunable, ExprRewritable, PlanBase, PlanRef, diff --git a/src/frontend/src/optimizer/plan_node/logical_filter.rs b/src/frontend/src/optimizer/plan_node/logical_filter.rs index 4d5ab47c8fa1d..a62b91aac5277 100644 --- a/src/frontend/src/optimizer/plan_node/logical_filter.rs +++ b/src/frontend/src/optimizer/plan_node/logical_filter.rs @@ -18,6 +18,7 @@ use risingwave_common::bail; use risingwave_common::error::Result; use risingwave_common::types::DataType; +use super::generic::GenericPlanRef; use super::utils::impl_distill_by_unit; use super::{ generic, ColPrunable, ExprRewritable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index ac193f068ba0d..da2ec2138c3d1 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -17,7 +17,7 @@ use itertools::Itertools; use risingwave_common::error::Result; use risingwave_common::types::Interval; -use super::generic::GenericPlanNode; +use super::generic::{GenericPlanNode, GenericPlanRef}; use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchHopWindow, ColPrunable, ExprRewritable, LogicalFilter, diff --git a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs index dc4d627f3f37b..9b740abd7718e 100644 --- a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs @@ -863,6 +863,7 @@ mod test { use super::*; use crate::expr::{FunctionCall, InputRef}; use crate::optimizer::optimizer_context::OptimizerContext; + use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::LogicalValues; use crate::optimizer::property::FunctionalDependency; #[tokio::test] diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs index 80e4f350d8edb..1dbe1d3d3c5c9 100644 --- a/src/frontend/src/optimizer/plan_node/logical_update.rs +++ b/src/frontend/src/optimizer/plan_node/logical_update.rs @@ -15,6 +15,7 @@ use risingwave_common::catalog::TableVersionId; use risingwave_common::error::Result; +use super::generic::GenericPlanRef; use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchUpdate, ColPrunable, ExprRewritable, LogicalProject, diff --git a/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs b/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs index 73f82e86aa260..9f2e8d94634be 100644 --- a/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs +++ b/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::hash::Hash; +use super::generic::GenericPlanRef; use super::{EndoPlan, LogicalShare, PlanNodeId, PlanRef, PlanTreeNodeUnary, VisitPlan}; use crate::optimizer::plan_visitor; use crate::utils::{Endo, Visit}; diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 75113a631d8de..b133f2d140233 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -419,7 +419,28 @@ impl PlanTreeNode for PlanRef { } } -impl GenericPlanRef for PlanRef { +impl PlanNodeMeta for PlanRef { + fn node_type(&self) -> PlanNodeType { + self.0.node_type() + } + + fn plan_base(&self) -> &PlanBase { + self.0.plan_base() + } + + fn convention(&self) -> Convention { + self.0.convention() + } +} + +impl

GenericPlanRef for P +where + P: PlanNodeMeta + Eq + Hash, +{ + fn id(&self) -> PlanNodeId { + self.plan_base().id() + } + fn schema(&self) -> &Schema { self.plan_base().schema() } @@ -437,13 +458,19 @@ impl GenericPlanRef for PlanRef { } } -impl PhysicalPlanRef for PlanRef { +impl

PhysicalPlanRef for P +where + P: PlanNodeMeta + Eq + Hash, +{ fn distribution(&self) -> &Distribution { self.plan_base().distribution() } } -impl StreamPlanRef for PlanRef { +impl

StreamPlanRef for P +where + P: PlanNodeMeta + Eq + Hash, +{ fn append_only(&self) -> bool { self.plan_base().append_only() } @@ -457,7 +484,10 @@ impl StreamPlanRef for PlanRef { } } -impl BatchPlanRef for PlanRef { +impl

BatchPlanRef for P +where + P: PlanNodeMeta + Eq + Hash, +{ fn order(&self) -> &Order { self.plan_base().order() } diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs index 31dc16123be5f..2ccbb1b59aeeb 100644 --- a/src/frontend/src/optimizer/plan_node/plan_base.rs +++ b/src/frontend/src/optimizer/plan_node/plan_base.rs @@ -14,15 +14,63 @@ use educe::Educe; use fixedbitset::FixedBitSet; -use paste::paste; use risingwave_common::catalog::Schema; use super::generic::GenericPlanNode; use super::*; -use crate::for_all_plan_nodes; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order}; +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct StreamExtra { + /// The append-only property of the PlanNode's output is a stream-only property. Append-only + /// means the stream contains only insert operation. + append_only: bool, + /// Whether the output is emitted on window close. + emit_on_window_close: bool, + /// The watermark column indices of the PlanNode's output. There could be watermark output from + /// this stream operator. + watermark_columns: FixedBitSet, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct BatchExtra { + /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect + /// correctness, but insert unnecessary sort in plan + order: Order, +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +enum PhysicalExtraInner { + Stream(StreamExtra), + Batch(BatchExtra), +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct PhysicalExtra { + /// The distribution property of the PlanNode's output, store an `Distribution::any()` here + /// will not affect correctness, but insert unnecessary exchange in plan + dist: Distribution, + + inner: PhysicalExtraInner, +} + +impl PhysicalExtra { + fn stream(&self) -> &StreamExtra { + match &self.inner { + PhysicalExtraInner::Stream(extra) => extra, + _ => panic!("access stream properties from batch plan node"), + } + } + + fn batch(&self) -> &BatchExtra { + match &self.inner { + PhysicalExtraInner::Batch(extra) => extra, + _ => panic!("access batch properties from stream plan node"), + } + } +} + /// the common fields of all nodes, please make a field named `base` in /// every planNode and correctly value it when construct the planNode. /// @@ -47,35 +95,28 @@ pub struct PlanBase { stream_key: Option>, functional_dependency: FunctionalDependencySet, - // -- physical-only fields -- - /// The distribution property of the PlanNode's output, store an `Distribution::any()` here - /// will not affect correctness, but insert unnecessary exchange in plan - dist: Distribution, - - // -- batch-only fields -- - /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect - /// correctness, but insert unnecessary sort in plan - order: Order, - - // -- stream-only fields -- - /// The append-only property of the PlanNode's output is a stream-only property. Append-only - /// means the stream contains only insert operation. - append_only: bool, - /// Whether the output is emitted on window close. - emit_on_window_close: bool, - /// The watermark column indices of the PlanNode's output. There could be watermark output from - /// this stream operator. - watermark_columns: FixedBitSet, + physical_extra: Option, } impl PlanBase { - /// The unique id of the `PlanNode` in this optimizer context. - pub fn id(&self) -> PlanNodeId { - self.id + fn physical_extra(&self) -> &PhysicalExtra { + self.physical_extra + .as_ref() + .expect("access physical properties from logical plan node") + } + + fn physical_extra_mut(&mut self) -> &mut PhysicalExtra { + self.physical_extra + .as_mut() + .expect("access physical properties from logical plan node") } } impl generic::GenericPlanRef for PlanBase { + fn id(&self) -> PlanNodeId { + self.id + } + fn schema(&self) -> &Schema { &self.schema } @@ -95,27 +136,27 @@ impl generic::GenericPlanRef for PlanBase { impl generic::PhysicalPlanRef for PlanBase { fn distribution(&self) -> &Distribution { - &self.dist + &self.physical_extra().dist } } impl stream::StreamPlanRef for PlanBase { fn append_only(&self) -> bool { - self.append_only + self.physical_extra().stream().append_only } fn emit_on_window_close(&self) -> bool { - self.emit_on_window_close + self.physical_extra().stream().emit_on_window_close } fn watermark_columns(&self) -> &FixedBitSet { - &self.watermark_columns + &self.physical_extra().stream().watermark_columns } } impl batch::BatchPlanRef for PlanBase { fn order(&self) -> &Order { - &self.order + &self.physical_extra().batch().order } } @@ -127,19 +168,13 @@ impl PlanBase { functional_dependency: FunctionalDependencySet, ) -> Self { let id = ctx.next_plan_node_id(); - let watermark_columns = FixedBitSet::with_capacity(schema.len()); Self { id, ctx, schema, stream_key, - dist: Distribution::Single, - order: Order::any(), - // Logical plan node won't touch `append_only` field - append_only: true, - emit_on_window_close: false, functional_dependency, - watermark_columns, + physical_extra: None, } } @@ -168,13 +203,16 @@ impl PlanBase { id, ctx, schema, - dist, - order: Order::any(), stream_key, - append_only, - emit_on_window_close, functional_dependency, - watermark_columns, + physical_extra: Some(PhysicalExtra { + dist, + inner: PhysicalExtraInner::Stream(StreamExtra { + append_only, + emit_on_window_close, + watermark_columns, + }), + }), } } @@ -205,19 +243,16 @@ impl PlanBase { ) -> Self { let id = ctx.next_plan_node_id(); let functional_dependency = FunctionalDependencySet::new(schema.len()); - let watermark_columns = FixedBitSet::with_capacity(schema.len()); Self { id, ctx, schema, - dist, - order, stream_key: None, - // Batch plan node won't touch `append_only` field - append_only: true, - emit_on_window_close: false, // TODO(rc): batch EOWC support? functional_dependency, - watermark_columns, + physical_extra: Some(PhysicalExtra { + dist, + inner: PhysicalExtraInner::Batch(BatchExtra { order }), + }), } } @@ -237,7 +272,7 @@ impl PlanBase { pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self { let mut new = self.clone(); - new.dist = dist; + new.physical_extra_mut().dist = dist; new } } @@ -249,40 +284,3 @@ impl PlanBase { &mut self.functional_dependency } } - -macro_rules! impl_base_delegate { - ($( { $convention:ident, $name:ident }),*) => { - $(paste! { - impl [<$convention $name>] { - pub fn id(&self) -> PlanNodeId { - self.plan_base().id - } - pub fn ctx(&self) -> OptimizerContextRef { - self.plan_base().ctx() - } - pub fn schema(&self) -> &Schema { - &self.plan_base().schema - } - pub fn stream_key(&self) -> Option<&[usize]> { - self.plan_base().stream_key() - } - pub fn order(&self) -> &Order { - &self.plan_base().order - } - pub fn distribution(&self) -> &Distribution { - &self.plan_base().dist - } - pub fn append_only(&self) -> bool { - self.plan_base().append_only - } - pub fn emit_on_window_close(&self) -> bool { - self.plan_base().emit_on_window_close - } - pub fn functional_dependency(&self) -> &FunctionalDependencySet { - &self.plan_base().functional_dependency - } - } - })* - } -} -for_all_plan_nodes! { impl_base_delegate } diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs index c9f969384c3a4..9b000974786e4 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dml.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs @@ -17,6 +17,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ColumnDesc, INITIAL_TABLE_VERSION_ID}; use risingwave_pb::stream_plan::stream_node::PbNodeBody; +use super::stream::StreamPlanRef; use super::utils::{childless_record, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::stream_fragmenter::BuildFragmentGraphState; diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs index 922fd1e59812d..4e8aff5bba262 100644 --- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs @@ -18,7 +18,7 @@ use fixedbitset::FixedBitSet; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use super::generic::{self, PlanWindowFunction}; +use super::generic::{self, GenericPlanRef, PlanWindowFunction}; use super::utils::{impl_distill_by_unit, TableCatalogBuilder}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::stream_fragmenter::BuildFragmentGraphState; diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 6166a5553cc9d..99e6c3c5161a1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -16,7 +16,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode}; -use super::generic::PhysicalPlanRef; +use super::generic::{GenericPlanRef, PhysicalPlanRef}; use super::stream::StreamPlanRef; use super::utils::{childless_record, plan_node_name, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index 79240a27d3fd4..03bbafbcb2274 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -16,7 +16,7 @@ use fixedbitset::FixedBitSet; use pretty_xmlish::XmlNode; use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use super::generic::{DistillUnit, TopNLimit}; +use super::generic::{DistillUnit, GenericPlanRef, TopNLimit}; use super::stream::StreamPlanRef; use super::utils::{plan_node_name, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode}; diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs index 5959da98609e0..78c3d758a5a47 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -18,7 +18,7 @@ use pretty_xmlish::XmlNode; use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use super::generic::{self, PlanAggCall}; +use super::generic::{self, GenericPlanRef, PlanAggCall}; use super::utils::{childless_record, plan_node_name, watermark_pretty, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::ExprRewriter; diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index e39f536172e19..26822b6a5cb53 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -17,6 +17,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::HopWindowNode; +use super::generic::GenericPlanRef; use super::stream::StreamPlanRef; use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index 50fef2f1d937d..3648cc21d17a0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -17,6 +17,7 @@ use pretty_xmlish::XmlNode; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::ProjectNode; +use super::generic::GenericPlanRef; use super::stream::StreamPlanRef; use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index 06de3eb9f602c..fb53ffea364d2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -16,6 +16,8 @@ use pretty_xmlish::XmlNode; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::PbStreamNode; +use super::generic::GenericPlanRef; +use super::stream::StreamPlanRef; use super::utils::Distill; use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamExchange, StreamNode}; use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode}; diff --git a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs index 9471d2333c5e6..ad1e80910c6b2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs @@ -50,8 +50,7 @@ impl StreamSimpleAgg { let watermark_columns = FixedBitSet::with_capacity(logical.output_len()); // Simple agg executor might change the append-only behavior of the stream. - let base = - PlanBase::new_stream_with_core(&logical, dist, false, false, watermark_columns); + let base = PlanBase::new_stream_with_core(&logical, dist, false, false, watermark_columns); StreamSimpleAgg { base, logical, diff --git a/src/frontend/src/optimizer/plan_node/stream_sort.rs b/src/frontend/src/optimizer/plan_node/stream_sort.rs index 0ca0429ad8bff..41a56a0fd5df2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sort.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sort.rs @@ -20,7 +20,7 @@ use risingwave_common::catalog::FieldDisplay; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use super::generic::PhysicalPlanRef; +use super::generic::{GenericPlanRef, PhysicalPlanRef}; use super::stream::StreamPlanRef; use super::utils::{childless_record, Distill, TableCatalogBuilder}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs index 0046bca258bf9..3e8f560267ecc 100644 --- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs @@ -20,6 +20,7 @@ use super::generic::{self, PlanAggCall}; use super::utils::impl_distill_by_unit; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::ExprRewriter; +use crate::optimizer::plan_node::generic::PhysicalPlanRef; use crate::optimizer::property::RequiredDist; use crate::stream_fragmenter::BuildFragmentGraphState; diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 21a02581b9aee..6d8e75ebec6e6 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -24,11 +24,13 @@ use risingwave_common::util::sort_util::OrderType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::{ChainType, PbStreamNode}; +use super::generic::PhysicalPlanRef; use super::utils::{childless_record, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode}; use crate::catalog::ColumnId; use crate::expr::{ExprRewriter, FunctionCall}; use crate::optimizer::plan_node::generic::GenericPlanRef; +use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder}; use crate::optimizer::property::{Distribution, DistributionDisplay}; use crate::stream_fragmenter::BuildFragmentGraphState; diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index 969d747941a76..20a0fe01b47a9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -40,8 +40,7 @@ impl StreamTopN { }; let watermark_columns = FixedBitSet::with_capacity(input.schema().len()); - let base = - PlanBase::new_stream_with_core(&logical, dist, false, false, watermark_columns); + let base = PlanBase::new_stream_with_core(&logical, dist, false, false, watermark_columns); StreamTopN { base, logical } } diff --git a/src/frontend/src/optimizer/plan_node/stream_union.rs b/src/frontend/src/optimizer/plan_node/stream_union.rs index 9348ccf132a59..f5ce62f334519 100644 --- a/src/frontend/src/optimizer/plan_node/stream_union.rs +++ b/src/frontend/src/optimizer/plan_node/stream_union.rs @@ -19,6 +19,7 @@ use pretty_xmlish::XmlNode; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::UnionNode; +use super::generic::GenericPlanRef; use super::stream::StreamPlanRef; use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanRef}; diff --git a/src/frontend/src/optimizer/plan_node/stream_values.rs b/src/frontend/src/optimizer/plan_node/stream_values.rs index fb0b844411f63..f8cc5db851159 100644 --- a/src/frontend/src/optimizer/plan_node/stream_values.rs +++ b/src/frontend/src/optimizer/plan_node/stream_values.rs @@ -18,6 +18,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::values_node::ExprTuple; use risingwave_pb::stream_plan::ValuesNode; +use super::generic::GenericPlanRef; use super::utils::{childless_record, Distill}; use super::{ExprRewritable, LogicalValues, PlanBase, StreamNode}; use crate::expr::{Expr, ExprImpl}; diff --git a/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs b/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs index f30f3d9fa4966..7e53b903ac962 100644 --- a/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs +++ b/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use itertools::Itertools; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNode, StreamShare}; use crate::optimizer::PlanRewriter; use crate::PlanRef; diff --git a/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs b/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs index db7ecd92c7aa5..b1042c3c3f98f 100644 --- a/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs +++ b/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs @@ -17,6 +17,7 @@ use std::collections::{HashMap, HashSet}; use itertools::Itertools; use crate::catalog::SourceId; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{ LogicalShare, LogicalSource, PlanNodeId, PlanTreeNode, StreamShare, }; diff --git a/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs b/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs index 2b21f9d806e72..907fa183c7a99 100644 --- a/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs +++ b/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use super::{DefaultBehavior, DefaultValue}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNodeUnary}; use crate::optimizer::plan_visitor::PlanVisitor; diff --git a/src/frontend/src/optimizer/rule/always_false_filter_rule.rs b/src/frontend/src/optimizer/rule/always_false_filter_rule.rs index 02165232372e4..eeba7d9f3be3b 100644 --- a/src/frontend/src/optimizer/rule/always_false_filter_rule.rs +++ b/src/frontend/src/optimizer/rule/always_false_filter_rule.rs @@ -15,6 +15,7 @@ use risingwave_common::types::ScalarImpl; use super::Rule; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{LogicalFilter, LogicalValues}; use crate::PlanRef; diff --git a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs index 66579248a76f9..7ac121692c81d 100644 --- a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs +++ b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs @@ -23,6 +23,7 @@ use crate::expr::{ CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef, }; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNodeBinary}; use crate::optimizer::plan_visitor::{ExprCorrelatedIdFinder, PlanCorrelatedIdFinder}; use crate::optimizer::rule::apply_offset_rewriter::ApplyCorrelatedIndicesConverter; diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs index 9103d1bc906bc..323cc59ef3558 100644 --- a/src/frontend/src/optimizer/rule/index_selection_rule.rs +++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs @@ -66,6 +66,7 @@ use crate::expr::{ FunctionCall, InputRef, }; use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{ generic, ColumnPruningContext, LogicalJoin, LogicalScan, LogicalUnion, PlanTreeNode, PlanTreeNodeBinary, PredicatePushdown, PredicatePushdownContext, diff --git a/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs index dcbb6f7b015ee..bd2db0ac67cca 100644 --- a/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs +++ b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs @@ -47,6 +47,7 @@ mod tests { use super::*; use crate::expr::{ExprImpl, FunctionCall, InputRef}; use crate::optimizer::optimizer_context::OptimizerContext; + use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::utils::Condition; #[tokio::test] diff --git a/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs b/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs index c496a906400ae..8682db8491a1d 100644 --- a/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs +++ b/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs @@ -46,6 +46,7 @@ mod tests { use super::*; use crate::expr::{ExprImpl, FunctionCall, InputRef}; use crate::optimizer::optimizer_context::OptimizerContext; + use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::utils::Condition; #[tokio::test] diff --git a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs index ea8386bc227f8..c32ae40531cd0 100644 --- a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs +++ b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs @@ -27,7 +27,7 @@ use risingwave_expr::aggregate::AggKind; use super::{BoxedRule, Rule}; use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef}; -use crate::optimizer::plan_node::generic::Agg; +use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef}; use crate::optimizer::plan_node::{ LogicalAgg, LogicalFilter, LogicalScan, LogicalTopN, PlanAggCall, PlanTreeNodeUnary, }; diff --git a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs index dfb6963c7fb4f..93637d3ba8193 100644 --- a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs +++ b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs @@ -18,6 +18,7 @@ use risingwave_expr::window_function::WindowFuncKind; use super::Rule; use crate::expr::{collect_input_refs, ExprImpl, ExprType}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{LogicalFilter, LogicalTopN, PlanTreeNodeUnary}; use crate::optimizer::property::Order; use crate::planner::LIMIT_ALL_COUNT; diff --git a/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs b/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs index dc5f9c2bc9aba..f34146ba80050 100644 --- a/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs +++ b/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs @@ -18,6 +18,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use super::super::plan_node::*; use super::{BoxedRule, Rule}; use crate::expr::{CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, InputRef}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_visitor::{PlanCorrelatedIdFinder, PlanVisitor}; use crate::optimizer::PlanRef; use crate::utils::Condition; diff --git a/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs b/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs index 9759739490fe6..a13bef3baa9d9 100644 --- a/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs +++ b/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs @@ -13,6 +13,7 @@ // limitations under the License. use super::{BoxedRule, Rule}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{LogicalValues, PlanTreeNodeUnary}; use crate::optimizer::plan_visitor::{LogicalCardinalityExt, SideEffectVisitor}; use crate::optimizer::{PlanRef, PlanVisitor}; diff --git a/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs index 8119b8847b600..7b83c017ab781 100644 --- a/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs +++ b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs @@ -13,6 +13,7 @@ // limitations under the License. use super::{BoxedRule, Rule}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::LogicalValues; use crate::optimizer::{PlanRef, PlanTreeNode};