diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 8d13badedeb4b..f16ebfb0c792c 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -433,6 +433,7 @@ impl PlanNodeMeta for PlanRef { } } +/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`]. impl

GenericPlanRef for P where P: PlanNodeMeta + Eq + Hash, @@ -458,6 +459,8 @@ where } } +/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`]. +// TODO: further constrain the convention to be `Stream` or `Batch`. impl

PhysicalPlanRef for P where P: PlanNodeMeta + Eq + Hash, @@ -467,6 +470,8 @@ where } } +/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`]. +// TODO: further constrain the convention to be `Stream`. impl

StreamPlanRef for P where P: PlanNodeMeta + Eq + Hash, @@ -484,6 +489,8 @@ where } } +/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`]. +// TODO: further constrain the convention to be `Batch`. impl

BatchPlanRef for P where P: PlanNodeMeta + Eq + Hash, diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs index 2ccbb1b59aeeb..51b1aa5f41141 100644 --- a/src/frontend/src/optimizer/plan_node/plan_base.rs +++ b/src/frontend/src/optimizer/plan_node/plan_base.rs @@ -21,8 +21,20 @@ use super::*; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order}; +/// Common extra fields for physical plan nodes. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct PhysicalCommonExtra { + /// The distribution property of the PlanNode's output, store an `Distribution::any()` here + /// will not affect correctness, but insert unnecessary exchange in plan + dist: Distribution, +} + +/// Extra fields for stream plan nodes. #[derive(Clone, Debug, PartialEq, Eq, Hash)] struct StreamExtra { + /// Common fields for physical plan nodes. + physical: PhysicalCommonExtra, + /// The append-only property of the PlanNode's output is a stream-only property. Append-only /// means the stream contains only insert operation. append_only: bool, @@ -33,39 +45,49 @@ struct StreamExtra { watermark_columns: FixedBitSet, } +/// Extra fields for batch plan nodes. #[derive(Clone, Debug, PartialEq, Eq, Hash)] struct BatchExtra { + /// Common fields for physical plan nodes. + physical: PhysicalCommonExtra, + /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect /// correctness, but insert unnecessary sort in plan order: Order, } +/// Extra fields for physical plan nodes. #[derive(Clone, Debug, PartialEq, Eq, Hash)] -enum PhysicalExtraInner { +enum PhysicalExtra { Stream(StreamExtra), Batch(BatchExtra), } -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -struct PhysicalExtra { - /// The distribution property of the PlanNode's output, store an `Distribution::any()` here - /// will not affect correctness, but insert unnecessary exchange in plan - dist: Distribution, +impl PhysicalExtra { + fn common(&self) -> &PhysicalCommonExtra { + match self { + PhysicalExtra::Stream(stream) => &stream.physical, + PhysicalExtra::Batch(batch) => &batch.physical, + } + } - inner: PhysicalExtraInner, -} + fn common_mut(&mut self) -> &mut PhysicalCommonExtra { + match self { + PhysicalExtra::Stream(stream) => &mut stream.physical, + PhysicalExtra::Batch(batch) => &mut batch.physical, + } + } -impl PhysicalExtra { fn stream(&self) -> &StreamExtra { - match &self.inner { - PhysicalExtraInner::Stream(extra) => extra, + match self { + PhysicalExtra::Stream(extra) => extra, _ => panic!("access stream properties from batch plan node"), } } fn batch(&self) -> &BatchExtra { - match &self.inner { - PhysicalExtraInner::Batch(extra) => extra, + match self { + PhysicalExtra::Batch(extra) => extra, _ => panic!("access batch properties from stream plan node"), } } @@ -95,6 +117,7 @@ pub struct PlanBase { stream_key: Option>, functional_dependency: FunctionalDependencySet, + /// Extra fields if the plan node is physical. physical_extra: Option, } @@ -136,7 +159,7 @@ impl generic::GenericPlanRef for PlanBase { impl generic::PhysicalPlanRef for PlanBase { fn distribution(&self) -> &Distribution { - &self.physical_extra().dist + &self.physical_extra().common().dist } } @@ -205,14 +228,14 @@ impl PlanBase { schema, stream_key, functional_dependency, - physical_extra: Some(PhysicalExtra { - dist, - inner: PhysicalExtraInner::Stream(StreamExtra { + physical_extra: Some(PhysicalExtra::Stream({ + StreamExtra { + physical: PhysicalCommonExtra { dist }, append_only, emit_on_window_close, watermark_columns, - }), - }), + } + })), } } @@ -249,10 +272,12 @@ impl PlanBase { schema, stream_key: None, functional_dependency, - physical_extra: Some(PhysicalExtra { - dist, - inner: PhysicalExtraInner::Batch(BatchExtra { order }), - }), + physical_extra: Some(PhysicalExtra::Batch({ + BatchExtra { + physical: PhysicalCommonExtra { dist }, + order, + } + })), } } @@ -270,9 +295,12 @@ impl PlanBase { new } + /// Clone the plan node with a new distribution. + /// + /// Panics if the plan node is not physical. pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self { let mut new = self.clone(); - new.physical_extra_mut().dist = dist; + new.physical_extra_mut().common_mut().dist = dist; new } }