Skip to content

Commit

Permalink
refine docs and extra structs
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Oct 23, 2023
1 parent 1315a58 commit 6266b96
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 24 deletions.
7 changes: 7 additions & 0 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ impl PlanNodeMeta for PlanRef {
}
}

/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
impl<P> GenericPlanRef for P
where
P: PlanNodeMeta + Eq + Hash,
Expand All @@ -458,6 +459,8 @@ where
}
}

/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
// TODO: further constrain the convention to be `Stream` or `Batch`.
impl<P> PhysicalPlanRef for P
where
P: PlanNodeMeta + Eq + Hash,
Expand All @@ -467,6 +470,8 @@ where
}
}

/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
// TODO: further constrain the convention to be `Stream`.
impl<P> StreamPlanRef for P
where
P: PlanNodeMeta + Eq + Hash,
Expand All @@ -484,6 +489,8 @@ where
}
}

/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
// TODO: further constrain the convention to be `Batch`.
impl<P> BatchPlanRef for P
where
P: PlanNodeMeta + Eq + Hash,
Expand Down
76 changes: 52 additions & 24 deletions src/frontend/src/optimizer/plan_node/plan_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,20 @@ use super::*;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order};

/// Common extra fields for physical plan nodes.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct PhysicalCommonExtra {
/// The distribution property of the PlanNode's output, store an `Distribution::any()` here
/// will not affect correctness, but insert unnecessary exchange in plan
dist: Distribution,
}

/// Extra fields for stream plan nodes.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct StreamExtra {
/// Common fields for physical plan nodes.
physical: PhysicalCommonExtra,

/// The append-only property of the PlanNode's output is a stream-only property. Append-only
/// means the stream contains only insert operation.
append_only: bool,
Expand All @@ -33,39 +45,49 @@ struct StreamExtra {
watermark_columns: FixedBitSet,
}

/// Extra fields for batch plan nodes.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct BatchExtra {
/// Common fields for physical plan nodes.
physical: PhysicalCommonExtra,

/// The order property of the PlanNode's output, store an `&Order::any()` here will not affect
/// correctness, but insert unnecessary sort in plan
order: Order,
}

/// Extra fields for physical plan nodes.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum PhysicalExtraInner {
enum PhysicalExtra {
Stream(StreamExtra),
Batch(BatchExtra),
}

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct PhysicalExtra {
/// The distribution property of the PlanNode's output, store an `Distribution::any()` here
/// will not affect correctness, but insert unnecessary exchange in plan
dist: Distribution,
impl PhysicalExtra {
fn common(&self) -> &PhysicalCommonExtra {
match self {
PhysicalExtra::Stream(stream) => &stream.physical,
PhysicalExtra::Batch(batch) => &batch.physical,
}
}

inner: PhysicalExtraInner,
}
fn common_mut(&mut self) -> &mut PhysicalCommonExtra {
match self {
PhysicalExtra::Stream(stream) => &mut stream.physical,
PhysicalExtra::Batch(batch) => &mut batch.physical,
}
}

impl PhysicalExtra {
fn stream(&self) -> &StreamExtra {
match &self.inner {
PhysicalExtraInner::Stream(extra) => extra,
match self {
PhysicalExtra::Stream(extra) => extra,
_ => panic!("access stream properties from batch plan node"),
}
}

fn batch(&self) -> &BatchExtra {
match &self.inner {
PhysicalExtraInner::Batch(extra) => extra,
match self {
PhysicalExtra::Batch(extra) => extra,
_ => panic!("access batch properties from stream plan node"),
}
}
Expand Down Expand Up @@ -95,6 +117,7 @@ pub struct PlanBase {
stream_key: Option<Vec<usize>>,
functional_dependency: FunctionalDependencySet,

/// Extra fields if the plan node is physical.
physical_extra: Option<PhysicalExtra>,
}

Expand Down Expand Up @@ -136,7 +159,7 @@ impl generic::GenericPlanRef for PlanBase {

impl generic::PhysicalPlanRef for PlanBase {
fn distribution(&self) -> &Distribution {
&self.physical_extra().dist
&self.physical_extra().common().dist
}
}

Expand Down Expand Up @@ -205,14 +228,14 @@ impl PlanBase {
schema,
stream_key,
functional_dependency,
physical_extra: Some(PhysicalExtra {
dist,
inner: PhysicalExtraInner::Stream(StreamExtra {
physical_extra: Some(PhysicalExtra::Stream({
StreamExtra {
physical: PhysicalCommonExtra { dist },
append_only,
emit_on_window_close,
watermark_columns,
}),
}),
}
})),
}
}

Expand Down Expand Up @@ -249,10 +272,12 @@ impl PlanBase {
schema,
stream_key: None,
functional_dependency,
physical_extra: Some(PhysicalExtra {
dist,
inner: PhysicalExtraInner::Batch(BatchExtra { order }),
}),
physical_extra: Some(PhysicalExtra::Batch({
BatchExtra {
physical: PhysicalCommonExtra { dist },
order,
}
})),
}
}

Expand All @@ -270,9 +295,12 @@ impl PlanBase {
new
}

/// Clone the plan node with a new distribution.
///
/// Panics if the plan node is not physical.
pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self {
let mut new = self.clone();
new.physical_extra_mut().dist = dist;
new.physical_extra_mut().common_mut().dist = dist;
new
}
}
Expand Down

0 comments on commit 6266b96

Please sign in to comment.