Skip to content

Commit

Permalink
Update optimizer/mod.rs & optimizer/plan_node/stream_union.rs. Create…
Browse files Browse the repository at this point in the history
… stream_plan var & handle union_inputs. Add new constructor to StreamUnion.
  • Loading branch information
shanicky committed Nov 13, 2023
1 parent 97a2613 commit 531c202
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 15 deletions.
44 changes: 30 additions & 14 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ use crate::optimizer::plan_node::{
};
use crate::optimizer::plan_visitor::TemporalJoinValidator;
use crate::optimizer::property::Distribution;
use crate::optimizer::property::Distribution::HashShard;
use crate::utils::ColIndexMappingRewriteExt;
use crate::WithOptions;

Expand Down Expand Up @@ -512,7 +511,7 @@ impl PlanRoot {
.map(|c| c.column_desc.clone())
.collect();

let mut stream_plan = if with_external_source {
let union_inputs = if with_external_source {
let mut external_source_node = stream_plan;
external_source_node = inject_project_if_needed(&columns, external_source_node)?;
external_source_node = match kind {
Expand Down Expand Up @@ -544,12 +543,7 @@ impl PlanRoot {
column_descs,
)?;

StreamUnion::new(Union {
all: true,
inputs: vec![external_source_node, dml_node],
source_col: None,
})
.into()
vec![external_source_node, dml_node]
} else {
let dml_node = inject_dml_node(
&columns,
Expand All @@ -560,13 +554,35 @@ impl PlanRoot {
column_descs,
)?;

StreamUnion::new(Union {
vec![dml_node]
};

let dists = union_inputs
.iter()
.map(|input| input.distribution())
.dedup()
.collect_vec();

let dist = match &dists[..] {
&[Distribution::SomeShard, Distribution::HashShard(_)]
| &[Distribution::HashShard(_), Distribution::SomeShard] => Distribution::SomeShard,
&[dist @ Distribution::SomeShard] | &[dist @ Distribution::HashShard(_)] => {
dist.clone()
}
_ => {
unreachable!()
}
};

let mut stream_plan = StreamUnion::new_with_dist(
Union {
all: true,
inputs: vec![dml_node],
inputs: union_inputs,
source_col: None,
})
.into()
};
},
dist.clone(),
)
.into();

// Add WatermarkFilter node.
if !watermark_descs.is_empty() {
Expand All @@ -583,7 +599,7 @@ impl PlanRoot {
stream_plan = StreamRowIdGen::new_with_dist(
stream_plan,
row_id_index,
HashShard(vec![row_id_index]),
Distribution::HashShard(vec![row_id_index]),
)
.into();
}
Expand Down
11 changes: 10 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanRef};
use crate::optimizer::plan_node::generic::GenericPlanNode;
use crate::optimizer::plan_node::{PlanBase, PlanTreeNode, StreamNode};
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;

/// `StreamUnion` implements [`super::LogicalUnion`]
Expand All @@ -40,6 +41,12 @@ impl StreamUnion {
let inputs = &core.inputs;
let dist = inputs[0].distribution().clone();
assert!(inputs.iter().all(|input| *input.distribution() == dist));
Self::new_with_dist(core, dist)
}

pub fn new_with_dist(core: generic::Union<PlanRef>, dist: Distribution) -> Self {
let inputs = &core.inputs;

let watermark_columns = inputs.iter().fold(
{
let mut bitset = FixedBitSet::with_capacity(core.schema().len());
Expand All @@ -56,6 +63,7 @@ impl StreamUnion {
inputs.iter().all(|x| x.emit_on_window_close()),
watermark_columns,
);

StreamUnion { base, core }
}
}
Expand All @@ -78,7 +86,8 @@ impl PlanTreeNode for StreamUnion {
fn clone_with_inputs(&self, inputs: &[crate::optimizer::PlanRef]) -> PlanRef {
let mut new = self.core.clone();
new.inputs = inputs.to_vec();
Self::new(new).into()
let dist = self.distribution().clone();
Self::new_with_dist(new, dist).into()
}
}

Expand Down

0 comments on commit 531c202

Please sign in to comment.