diff --git a/src/frontend/src/optimizer/plan_node/batch_limit.rs b/src/frontend/src/optimizer/plan_node/batch_limit.rs
index c395f443e57d8..c6e8091705443 100644
--- a/src/frontend/src/optimizer/plan_node/batch_limit.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_limit.rs
@@ -16,6 +16,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::LimitNode;
+use super::generic::PhysicalPlanRef;
use super::utils::impl_distill_by_unit;
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
diff --git a/src/frontend/src/optimizer/plan_node/batch_over_window.rs b/src/frontend/src/optimizer/plan_node/batch_over_window.rs
index 9de951ae22eef..ba2ec93053406 100644
--- a/src/frontend/src/optimizer/plan_node/batch_over_window.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_over_window.rs
@@ -17,6 +17,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SortOverWindowNode;
+use super::batch::BatchPlanRef;
use super::generic::PlanWindowFunction;
use super::utils::impl_distill_by_unit;
use super::{
diff --git a/src/frontend/src/optimizer/plan_node/batch_project.rs b/src/frontend/src/optimizer/plan_node/batch_project.rs
index d82970605db50..34ee2c6de174c 100644
--- a/src/frontend/src/optimizer/plan_node/batch_project.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_project.rs
@@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::ProjectNode;
use risingwave_pb::expr::ExprNode;
+use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
diff --git a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
index 353254eb84b16..4785e64b91562 100644
--- a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
@@ -24,7 +24,8 @@ use risingwave_pb::batch_plan::row_seq_scan_node::ChunkSize;
use risingwave_pb::batch_plan::{RowSeqScanNode, SysRowSeqScanNode};
use risingwave_pb::plan_common::PbColumnDesc;
-use super::generic::GenericPlanRef;
+use super::batch::BatchPlanRef;
+use super::generic::{GenericPlanRef, PhysicalPlanRef};
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch};
use crate::catalog::ColumnId;
diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs
index e9d18c4a09135..e956892e78a5f 100644
--- a/src/frontend/src/optimizer/plan_node/batch_source.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -19,6 +19,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SourceNode;
+use super::generic::GenericPlanRef;
use super::utils::{childless_record, column_names_pretty, Distill};
use super::{
generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch,
diff --git a/src/frontend/src/optimizer/plan_node/batch_update.rs b/src/frontend/src/optimizer/plan_node/batch_update.rs
index 19bb60b9aa1d8..49c2d57b138c1 100644
--- a/src/frontend/src/optimizer/plan_node/batch_update.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_update.rs
@@ -18,6 +18,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::UpdateNode;
+use super::generic::GenericPlanRef;
use super::utils::impl_distill_by_unit;
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs
index ca7205b3ac324..aec59c90bcc4e 100644
--- a/src/frontend/src/optimizer/plan_node/generic/mod.rs
+++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs
@@ -18,7 +18,7 @@ use std::hash::Hash;
use pretty_xmlish::XmlNode;
use risingwave_common::catalog::Schema;
-use super::{stream, EqJoinPredicate};
+use super::{stream, EqJoinPredicate, PlanNodeId};
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::{Distribution, FunctionalDependencySet};
@@ -85,6 +85,7 @@ macro_rules! impl_distill_unit_from_fields {
pub(super) use impl_distill_unit_from_fields;
pub trait GenericPlanRef: Eq + Hash {
+ fn id(&self) -> PlanNodeId;
fn schema(&self) -> &Schema;
fn stream_key(&self) -> Option<&[usize]>;
fn functional_dependency(&self) -> &FunctionalDependencySet;
diff --git a/src/frontend/src/optimizer/plan_node/logical_apply.rs b/src/frontend/src/optimizer/plan_node/logical_apply.rs
index 7640f093fc933..b398ce7494f61 100644
--- a/src/frontend/src/optimizer/plan_node/logical_apply.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_apply.rs
@@ -18,7 +18,9 @@ use risingwave_common::catalog::Schema;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_pb::plan_common::JoinType;
-use super::generic::{self, push_down_into_join, push_down_join_condition, GenericPlanNode};
+use super::generic::{
+ self, push_down_into_join, push_down_join_condition, GenericPlanNode, GenericPlanRef,
+};
use super::utils::{childless_record, Distill};
use super::{
ColPrunable, LogicalJoin, LogicalProject, PlanBase, PlanRef, PlanTreeNodeBinary,
diff --git a/src/frontend/src/optimizer/plan_node/logical_expand.rs b/src/frontend/src/optimizer/plan_node/logical_expand.rs
index 5cc823cb1e1b7..d1f3b666feef5 100644
--- a/src/frontend/src/optimizer/plan_node/logical_expand.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_expand.rs
@@ -15,6 +15,7 @@
use itertools::Itertools;
use risingwave_common::error::Result;
+use super::generic::GenericPlanRef;
use super::utils::impl_distill_by_unit;
use super::{
gen_filter_and_pushdown, generic, BatchExpand, ColPrunable, ExprRewritable, PlanBase, PlanRef,
diff --git a/src/frontend/src/optimizer/plan_node/logical_filter.rs b/src/frontend/src/optimizer/plan_node/logical_filter.rs
index 4d5ab47c8fa1d..a62b91aac5277 100644
--- a/src/frontend/src/optimizer/plan_node/logical_filter.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_filter.rs
@@ -18,6 +18,7 @@ use risingwave_common::bail;
use risingwave_common::error::Result;
use risingwave_common::types::DataType;
+use super::generic::GenericPlanRef;
use super::utils::impl_distill_by_unit;
use super::{
generic, ColPrunable, ExprRewritable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary,
diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs
index ac193f068ba0d..da2ec2138c3d1 100644
--- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs
@@ -17,7 +17,7 @@ use itertools::Itertools;
use risingwave_common::error::Result;
use risingwave_common::types::Interval;
-use super::generic::GenericPlanNode;
+use super::generic::{GenericPlanNode, GenericPlanRef};
use super::utils::impl_distill_by_unit;
use super::{
gen_filter_and_pushdown, generic, BatchHopWindow, ColPrunable, ExprRewritable, LogicalFilter,
diff --git a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs
index dc4d627f3f37b..9b740abd7718e 100644
--- a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs
@@ -863,6 +863,7 @@ mod test {
use super::*;
use crate::expr::{FunctionCall, InputRef};
use crate::optimizer::optimizer_context::OptimizerContext;
+ use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::LogicalValues;
use crate::optimizer::property::FunctionalDependency;
#[tokio::test]
diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs
index 80e4f350d8edb..1dbe1d3d3c5c9 100644
--- a/src/frontend/src/optimizer/plan_node/logical_update.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_update.rs
@@ -15,6 +15,7 @@
use risingwave_common::catalog::TableVersionId;
use risingwave_common::error::Result;
+use super::generic::GenericPlanRef;
use super::utils::impl_distill_by_unit;
use super::{
gen_filter_and_pushdown, generic, BatchUpdate, ColPrunable, ExprRewritable, LogicalProject,
diff --git a/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs b/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
index 73f82e86aa260..9f2e8d94634be 100644
--- a/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
+++ b/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::hash::Hash;
+use super::generic::GenericPlanRef;
use super::{EndoPlan, LogicalShare, PlanNodeId, PlanRef, PlanTreeNodeUnary, VisitPlan};
use crate::optimizer::plan_visitor;
use crate::utils::{Endo, Visit};
diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs
index 75113a631d8de..b133f2d140233 100644
--- a/src/frontend/src/optimizer/plan_node/mod.rs
+++ b/src/frontend/src/optimizer/plan_node/mod.rs
@@ -419,7 +419,28 @@ impl PlanTreeNode for PlanRef {
}
}
-impl GenericPlanRef for PlanRef {
+impl PlanNodeMeta for PlanRef {
+ fn node_type(&self) -> PlanNodeType {
+ self.0.node_type()
+ }
+
+ fn plan_base(&self) -> &PlanBase {
+ self.0.plan_base()
+ }
+
+ fn convention(&self) -> Convention {
+ self.0.convention()
+ }
+}
+
+impl
GenericPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
+ fn id(&self) -> PlanNodeId {
+ self.plan_base().id()
+ }
+
fn schema(&self) -> &Schema {
self.plan_base().schema()
}
@@ -437,13 +458,19 @@ impl GenericPlanRef for PlanRef {
}
}
-impl PhysicalPlanRef for PlanRef {
+impl
PhysicalPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
fn distribution(&self) -> &Distribution {
self.plan_base().distribution()
}
}
-impl StreamPlanRef for PlanRef {
+impl
StreamPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
fn append_only(&self) -> bool {
self.plan_base().append_only()
}
@@ -457,7 +484,10 @@ impl StreamPlanRef for PlanRef {
}
}
-impl BatchPlanRef for PlanRef {
+impl
BatchPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
fn order(&self) -> &Order {
self.plan_base().order()
}
diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs
index 31dc16123be5f..2ccbb1b59aeeb 100644
--- a/src/frontend/src/optimizer/plan_node/plan_base.rs
+++ b/src/frontend/src/optimizer/plan_node/plan_base.rs
@@ -14,15 +14,63 @@
use educe::Educe;
use fixedbitset::FixedBitSet;
-use paste::paste;
use risingwave_common::catalog::Schema;
use super::generic::GenericPlanNode;
use super::*;
-use crate::for_all_plan_nodes;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order};
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct StreamExtra {
+ /// The append-only property of the PlanNode's output is a stream-only property. Append-only
+ /// means the stream contains only insert operation.
+ append_only: bool,
+ /// Whether the output is emitted on window close.
+ emit_on_window_close: bool,
+ /// The watermark column indices of the PlanNode's output. There could be watermark output from
+ /// this stream operator.
+ watermark_columns: FixedBitSet,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct BatchExtra {
+ /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect
+ /// correctness, but insert unnecessary sort in plan
+ order: Order,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+enum PhysicalExtraInner {
+ Stream(StreamExtra),
+ Batch(BatchExtra),
+}
+
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct PhysicalExtra {
+ /// The distribution property of the PlanNode's output, store an `Distribution::any()` here
+ /// will not affect correctness, but insert unnecessary exchange in plan
+ dist: Distribution,
+
+ inner: PhysicalExtraInner,
+}
+
+impl PhysicalExtra {
+ fn stream(&self) -> &StreamExtra {
+ match &self.inner {
+ PhysicalExtraInner::Stream(extra) => extra,
+ _ => panic!("access stream properties from batch plan node"),
+ }
+ }
+
+ fn batch(&self) -> &BatchExtra {
+ match &self.inner {
+ PhysicalExtraInner::Batch(extra) => extra,
+ _ => panic!("access batch properties from stream plan node"),
+ }
+ }
+}
+
/// the common fields of all nodes, please make a field named `base` in
/// every planNode and correctly value it when construct the planNode.
///
@@ -47,35 +95,28 @@ pub struct PlanBase {
stream_key: Option>,
functional_dependency: FunctionalDependencySet,
- // -- physical-only fields --
- /// The distribution property of the PlanNode's output, store an `Distribution::any()` here
- /// will not affect correctness, but insert unnecessary exchange in plan
- dist: Distribution,
-
- // -- batch-only fields --
- /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect
- /// correctness, but insert unnecessary sort in plan
- order: Order,
-
- // -- stream-only fields --
- /// The append-only property of the PlanNode's output is a stream-only property. Append-only
- /// means the stream contains only insert operation.
- append_only: bool,
- /// Whether the output is emitted on window close.
- emit_on_window_close: bool,
- /// The watermark column indices of the PlanNode's output. There could be watermark output from
- /// this stream operator.
- watermark_columns: FixedBitSet,
+ physical_extra: Option,
}
impl PlanBase {
- /// The unique id of the `PlanNode` in this optimizer context.
- pub fn id(&self) -> PlanNodeId {
- self.id
+ fn physical_extra(&self) -> &PhysicalExtra {
+ self.physical_extra
+ .as_ref()
+ .expect("access physical properties from logical plan node")
+ }
+
+ fn physical_extra_mut(&mut self) -> &mut PhysicalExtra {
+ self.physical_extra
+ .as_mut()
+ .expect("access physical properties from logical plan node")
}
}
impl generic::GenericPlanRef for PlanBase {
+ fn id(&self) -> PlanNodeId {
+ self.id
+ }
+
fn schema(&self) -> &Schema {
&self.schema
}
@@ -95,27 +136,27 @@ impl generic::GenericPlanRef for PlanBase {
impl generic::PhysicalPlanRef for PlanBase {
fn distribution(&self) -> &Distribution {
- &self.dist
+ &self.physical_extra().dist
}
}
impl stream::StreamPlanRef for PlanBase {
fn append_only(&self) -> bool {
- self.append_only
+ self.physical_extra().stream().append_only
}
fn emit_on_window_close(&self) -> bool {
- self.emit_on_window_close
+ self.physical_extra().stream().emit_on_window_close
}
fn watermark_columns(&self) -> &FixedBitSet {
- &self.watermark_columns
+ &self.physical_extra().stream().watermark_columns
}
}
impl batch::BatchPlanRef for PlanBase {
fn order(&self) -> &Order {
- &self.order
+ &self.physical_extra().batch().order
}
}
@@ -127,19 +168,13 @@ impl PlanBase {
functional_dependency: FunctionalDependencySet,
) -> Self {
let id = ctx.next_plan_node_id();
- let watermark_columns = FixedBitSet::with_capacity(schema.len());
Self {
id,
ctx,
schema,
stream_key,
- dist: Distribution::Single,
- order: Order::any(),
- // Logical plan node won't touch `append_only` field
- append_only: true,
- emit_on_window_close: false,
functional_dependency,
- watermark_columns,
+ physical_extra: None,
}
}
@@ -168,13 +203,16 @@ impl PlanBase {
id,
ctx,
schema,
- dist,
- order: Order::any(),
stream_key,
- append_only,
- emit_on_window_close,
functional_dependency,
- watermark_columns,
+ physical_extra: Some(PhysicalExtra {
+ dist,
+ inner: PhysicalExtraInner::Stream(StreamExtra {
+ append_only,
+ emit_on_window_close,
+ watermark_columns,
+ }),
+ }),
}
}
@@ -205,19 +243,16 @@ impl PlanBase {
) -> Self {
let id = ctx.next_plan_node_id();
let functional_dependency = FunctionalDependencySet::new(schema.len());
- let watermark_columns = FixedBitSet::with_capacity(schema.len());
Self {
id,
ctx,
schema,
- dist,
- order,
stream_key: None,
- // Batch plan node won't touch `append_only` field
- append_only: true,
- emit_on_window_close: false, // TODO(rc): batch EOWC support?
functional_dependency,
- watermark_columns,
+ physical_extra: Some(PhysicalExtra {
+ dist,
+ inner: PhysicalExtraInner::Batch(BatchExtra { order }),
+ }),
}
}
@@ -237,7 +272,7 @@ impl PlanBase {
pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self {
let mut new = self.clone();
- new.dist = dist;
+ new.physical_extra_mut().dist = dist;
new
}
}
@@ -249,40 +284,3 @@ impl PlanBase {
&mut self.functional_dependency
}
}
-
-macro_rules! impl_base_delegate {
- ($( { $convention:ident, $name:ident }),*) => {
- $(paste! {
- impl [<$convention $name>] {
- pub fn id(&self) -> PlanNodeId {
- self.plan_base().id
- }
- pub fn ctx(&self) -> OptimizerContextRef {
- self.plan_base().ctx()
- }
- pub fn schema(&self) -> &Schema {
- &self.plan_base().schema
- }
- pub fn stream_key(&self) -> Option<&[usize]> {
- self.plan_base().stream_key()
- }
- pub fn order(&self) -> &Order {
- &self.plan_base().order
- }
- pub fn distribution(&self) -> &Distribution {
- &self.plan_base().dist
- }
- pub fn append_only(&self) -> bool {
- self.plan_base().append_only
- }
- pub fn emit_on_window_close(&self) -> bool {
- self.plan_base().emit_on_window_close
- }
- pub fn functional_dependency(&self) -> &FunctionalDependencySet {
- &self.plan_base().functional_dependency
- }
- }
- })*
- }
-}
-for_all_plan_nodes! { impl_base_delegate }
diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs
index c9f969384c3a4..9b000974786e4 100644
--- a/src/frontend/src/optimizer/plan_node/stream_dml.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs
@@ -17,6 +17,7 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnDesc, INITIAL_TABLE_VERSION_ID};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;
diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs
index 922fd1e59812d..4e8aff5bba262 100644
--- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs
@@ -18,7 +18,7 @@ use fixedbitset::FixedBitSet;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use super::generic::{self, PlanWindowFunction};
+use super::generic::{self, GenericPlanRef, PlanWindowFunction};
use super::utils::{impl_distill_by_unit, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;
diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs
index 6166a5553cc9d..99e6c3c5161a1 100644
--- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs
@@ -16,7 +16,7 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode};
-use super::generic::PhysicalPlanRef;
+use super::generic::{GenericPlanRef, PhysicalPlanRef};
use super::stream::StreamPlanRef;
use super::utils::{childless_record, plan_node_name, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
index 79240a27d3fd4..03bbafbcb2274 100644
--- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
@@ -16,7 +16,7 @@ use fixedbitset::FixedBitSet;
use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use super::generic::{DistillUnit, TopNLimit};
+use super::generic::{DistillUnit, GenericPlanRef, TopNLimit};
use super::stream::StreamPlanRef;
use super::utils::{plan_node_name, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode};
diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
index 5959da98609e0..78c3d758a5a47 100644
--- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
@@ -18,7 +18,7 @@ use pretty_xmlish::XmlNode;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use super::generic::{self, PlanAggCall};
+use super::generic::{self, GenericPlanRef, PlanAggCall};
use super::utils::{childless_record, plan_node_name, watermark_pretty, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs
index e39f536172e19..26822b6a5cb53 100644
--- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs
@@ -17,6 +17,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::HopWindowNode;
+use super::generic::GenericPlanRef;
use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs
index 50fef2f1d937d..3648cc21d17a0 100644
--- a/src/frontend/src/optimizer/plan_node/stream_project.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_project.rs
@@ -17,6 +17,7 @@ use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::ProjectNode;
+use super::generic::GenericPlanRef;
use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs
index 06de3eb9f602c..fb53ffea364d2 100644
--- a/src/frontend/src/optimizer/plan_node/stream_share.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_share.rs
@@ -16,6 +16,8 @@ use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::PbStreamNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::Distill;
use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamExchange, StreamNode};
use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode};
diff --git a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
index 9471d2333c5e6..ad1e80910c6b2 100644
--- a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
@@ -50,8 +50,7 @@ impl StreamSimpleAgg {
let watermark_columns = FixedBitSet::with_capacity(logical.output_len());
// Simple agg executor might change the append-only behavior of the stream.
- let base =
- PlanBase::new_stream_with_core(&logical, dist, false, false, watermark_columns);
+ let base = PlanBase::new_stream_with_core(&logical, dist, false, false, watermark_columns);
StreamSimpleAgg {
base,
logical,
diff --git a/src/frontend/src/optimizer/plan_node/stream_sort.rs b/src/frontend/src/optimizer/plan_node/stream_sort.rs
index 0ca0429ad8bff..41a56a0fd5df2 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sort.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sort.rs
@@ -20,7 +20,7 @@ use risingwave_common::catalog::FieldDisplay;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use super::generic::PhysicalPlanRef;
+use super::generic::{GenericPlanRef, PhysicalPlanRef};
use super::stream::StreamPlanRef;
use super::utils::{childless_record, Distill, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs
index 0046bca258bf9..3e8f560267ecc 100644
--- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs
@@ -20,6 +20,7 @@ use super::generic::{self, PlanAggCall};
use super::utils::impl_distill_by_unit;
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
+use crate::optimizer::plan_node::generic::PhysicalPlanRef;
use crate::optimizer::property::RequiredDist;
use crate::stream_fragmenter::BuildFragmentGraphState;
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index 21a02581b9aee..6d8e75ebec6e6 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -24,11 +24,13 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ChainType, PbStreamNode};
+use super::generic::PhysicalPlanRef;
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode};
use crate::catalog::ColumnId;
use crate::expr::{ExprRewriter, FunctionCall};
use crate::optimizer::plan_node::generic::GenericPlanRef;
+use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;
diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs
index 969d747941a76..20a0fe01b47a9 100644
--- a/src/frontend/src/optimizer/plan_node/stream_topn.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs
@@ -40,8 +40,7 @@ impl StreamTopN {
};
let watermark_columns = FixedBitSet::with_capacity(input.schema().len());
- let base =
- PlanBase::new_stream_with_core(&logical, dist, false, false, watermark_columns);
+ let base = PlanBase::new_stream_with_core(&logical, dist, false, false, watermark_columns);
StreamTopN { base, logical }
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_union.rs b/src/frontend/src/optimizer/plan_node/stream_union.rs
index 9348ccf132a59..f5ce62f334519 100644
--- a/src/frontend/src/optimizer/plan_node/stream_union.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_union.rs
@@ -19,6 +19,7 @@ use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::UnionNode;
+use super::generic::GenericPlanRef;
use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanRef};
diff --git a/src/frontend/src/optimizer/plan_node/stream_values.rs b/src/frontend/src/optimizer/plan_node/stream_values.rs
index fb0b844411f63..f8cc5db851159 100644
--- a/src/frontend/src/optimizer/plan_node/stream_values.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_values.rs
@@ -18,6 +18,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::values_node::ExprTuple;
use risingwave_pb::stream_plan::ValuesNode;
+use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, LogicalValues, PlanBase, StreamNode};
use crate::expr::{Expr, ExprImpl};
diff --git a/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs b/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs
index f30f3d9fa4966..7e53b903ac962 100644
--- a/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs
+++ b/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
use itertools::Itertools;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNode, StreamShare};
use crate::optimizer::PlanRewriter;
use crate::PlanRef;
diff --git a/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs b/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs
index db7ecd92c7aa5..b1042c3c3f98f 100644
--- a/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs
+++ b/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs
@@ -17,6 +17,7 @@ use std::collections::{HashMap, HashSet};
use itertools::Itertools;
use crate::catalog::SourceId;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
LogicalShare, LogicalSource, PlanNodeId, PlanTreeNode, StreamShare,
};
diff --git a/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs b/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs
index 2b21f9d806e72..907fa183c7a99 100644
--- a/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs
+++ b/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use super::{DefaultBehavior, DefaultValue};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNodeUnary};
use crate::optimizer::plan_visitor::PlanVisitor;
diff --git a/src/frontend/src/optimizer/rule/always_false_filter_rule.rs b/src/frontend/src/optimizer/rule/always_false_filter_rule.rs
index 02165232372e4..eeba7d9f3be3b 100644
--- a/src/frontend/src/optimizer/rule/always_false_filter_rule.rs
+++ b/src/frontend/src/optimizer/rule/always_false_filter_rule.rs
@@ -15,6 +15,7 @@
use risingwave_common::types::ScalarImpl;
use super::Rule;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalFilter, LogicalValues};
use crate::PlanRef;
diff --git a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
index 66579248a76f9..7ac121692c81d 100644
--- a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
+++ b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
@@ -23,6 +23,7 @@ use crate::expr::{
CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall,
InputRef,
};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNodeBinary};
use crate::optimizer::plan_visitor::{ExprCorrelatedIdFinder, PlanCorrelatedIdFinder};
use crate::optimizer::rule::apply_offset_rewriter::ApplyCorrelatedIndicesConverter;
diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs
index 9103d1bc906bc..323cc59ef3558 100644
--- a/src/frontend/src/optimizer/rule/index_selection_rule.rs
+++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs
@@ -66,6 +66,7 @@ use crate::expr::{
FunctionCall, InputRef,
};
use crate::optimizer::optimizer_context::OptimizerContextRef;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
generic, ColumnPruningContext, LogicalJoin, LogicalScan, LogicalUnion, PlanTreeNode,
PlanTreeNodeBinary, PredicatePushdown, PredicatePushdownContext,
diff --git a/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs
index dcbb6f7b015ee..bd2db0ac67cca 100644
--- a/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs
+++ b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs
@@ -47,6 +47,7 @@ mod tests {
use super::*;
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::optimizer_context::OptimizerContext;
+ use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::utils::Condition;
#[tokio::test]
diff --git a/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs b/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs
index c496a906400ae..8682db8491a1d 100644
--- a/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs
+++ b/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs
@@ -46,6 +46,7 @@ mod tests {
use super::*;
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::optimizer_context::OptimizerContext;
+ use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::utils::Condition;
#[tokio::test]
diff --git a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
index ea8386bc227f8..c32ae40531cd0 100644
--- a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
+++ b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
@@ -27,7 +27,7 @@ use risingwave_expr::aggregate::AggKind;
use super::{BoxedRule, Rule};
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
-use crate::optimizer::plan_node::generic::Agg;
+use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
use crate::optimizer::plan_node::{
LogicalAgg, LogicalFilter, LogicalScan, LogicalTopN, PlanAggCall, PlanTreeNodeUnary,
};
diff --git a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
index dfb6963c7fb4f..93637d3ba8193 100644
--- a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
+++ b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
@@ -18,6 +18,7 @@ use risingwave_expr::window_function::WindowFuncKind;
use super::Rule;
use crate::expr::{collect_input_refs, ExprImpl, ExprType};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalFilter, LogicalTopN, PlanTreeNodeUnary};
use crate::optimizer::property::Order;
use crate::planner::LIMIT_ALL_COUNT;
diff --git a/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs b/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs
index dc5f9c2bc9aba..f34146ba80050 100644
--- a/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs
+++ b/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs
@@ -18,6 +18,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use super::super::plan_node::*;
use super::{BoxedRule, Rule};
use crate::expr::{CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, InputRef};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_visitor::{PlanCorrelatedIdFinder, PlanVisitor};
use crate::optimizer::PlanRef;
use crate::utils::Condition;
diff --git a/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs b/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs
index 9759739490fe6..a13bef3baa9d9 100644
--- a/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs
+++ b/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs
@@ -13,6 +13,7 @@
// limitations under the License.
use super::{BoxedRule, Rule};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalValues, PlanTreeNodeUnary};
use crate::optimizer::plan_visitor::{LogicalCardinalityExt, SideEffectVisitor};
use crate::optimizer::{PlanRef, PlanVisitor};
diff --git a/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs
index 8119b8847b600..7b83c017ab781 100644
--- a/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs
+++ b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs
@@ -13,6 +13,7 @@
// limitations under the License.
use super::{BoxedRule, Rule};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::LogicalValues;
use crate::optimizer::{PlanRef, PlanTreeNode};