Skip to content

Commit

Permalink
refactor(optimizer): change stream key to option (#12524)
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page authored and stdrc committed Oct 16, 2023
1 parent cab8135 commit e2936bc
Show file tree
Hide file tree
Showing 53 changed files with 182 additions and 148 deletions.
7 changes: 6 additions & 1 deletion src/frontend/src/optimizer/plan_node/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ pub(crate) fn derive_pk(
columns: &[ColumnCatalog],
) -> (Vec<ColumnOrder>, Vec<usize>) {
// Note(congyi): avoid pk duplication
let stream_key = input.stream_key().iter().copied().unique().collect_vec();
let stream_key = input
.expect_stream_key()
.iter()
.copied()
.unique()
.collect_vec();
let schema = input.schema();

// Assert the uniqueness of column names and IDs, including hidden columns.
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
window_col_idx: Option<usize>,
) -> Vec<AggCallState> {
let in_fields = self.input.schema().fields().to_vec();
let in_pks = self.input.stream_key().to_vec();
let in_pks = self.input.stream_key().unwrap().to_vec();
let in_append_only = self.input.append_only();
let in_dist_key = self.input.distribution().dist_column_indices().to_vec();

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Delete<PlanRef> {

fn stream_key(&self) -> Option<Vec<usize>> {
if self.returning {
Some(self.input.stream_key().to_vec())
Some(self.input.stream_key()?.to_vec())
} else {
Some(vec![])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for DynamicFilter<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.left.stream_key().to_vec())
Some(self.left.stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down Expand Up @@ -151,7 +151,7 @@ pub fn infer_left_internal_table_catalog(
let mut pk_indices = vec![left_key_index];
let read_prefix_len_hint = pk_indices.len();

for i in me.stream_key() {
for i in me.stream_key().unwrap() {
if *i != left_key_index {
pk_indices.push(*i);
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/except.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Except<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.inputs[0].stream_key().to_vec())
Some(self.inputs[0].stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Expand<PlanRef> {
let input_schema_len = self.input.schema().len();
let mut pk_indices = self
.input
.stream_key()
.stream_key()?
.iter()
.map(|&pk| pk + input_schema_len)
.collect_vec();
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Filter<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.input.stream_key().to_vec())
Some(self.input.stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
} else {
let mut pk = self
.input
.stream_key()
.stream_key()?
.iter()
.filter_map(|&pk_idx| self.output_indices.iter().position(|&idx| idx == pk_idx))
.collect_vec();
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/intersect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Intersect<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.inputs[0].stream_key().to_vec())
Some(self.inputs[0].stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
fn stream_key(&self) -> Option<Vec<usize>> {
let _left_len = self.left.schema().len();
let _right_len = self.right.schema().len();
let left_pk = self.left.stream_key();
let right_pk = self.right.stream_key();
let left_pk = self.left.stream_key()?;
let right_pk = self.right.stream_key()?;
let l2i = self.l2i_col_mapping();
let r2i = self.r2i_col_mapping();
let full_out_col_num = self.internal_column_num();
Expand All @@ -110,7 +110,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {

// NOTE(st1page): add join keys in the pk_indices a work around before we really have stream
// key.
pk_indices.and_then(|mut pk_indices| {
pk_indices.and_then(|mut pk_indices: Vec<usize>| {
let left_len = self.left.schema().len();
let right_len = self.right.schema().len();
let eq_predicate = EqJoinPredicate::create(left_len, right_len, self.on.clone());
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Limit<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.input.stream_key().to_vec())
Some(self.input.stream_key()?.to_vec())
}
}
impl<PlanRef> Limit<PlanRef> {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub(super) use impl_distill_unit_from_fields;

pub trait GenericPlanRef: Eq + Hash {
fn schema(&self) -> &Schema;
fn stream_key(&self) -> &[usize];
fn stream_key(&self) -> Option<&[usize]>;
fn functional_dependency(&self) -> &FunctionalDependencySet;
fn ctx(&self) -> OptimizerContextRef;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for OverWindow<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
let mut output_pk = self.input.stream_key().to_vec();
let mut output_pk = self.input.stream_key()?.to_vec();
for part_key_idx in self
.window_functions
.iter()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Project<PlanRef> {
fn stream_key(&self) -> Option<Vec<usize>> {
let i2o = self.i2o_col_mapping();
self.input
.stream_key()
.stream_key()?
.iter()
.map(|pk_col| i2o.try_map(*pk_col))
.collect::<Option<Vec<_>>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for ProjectSet<PlanRef> {
let i2o = self.i2o_col_mapping();
let mut pk = self
.input
.stream_key()
.stream_key()?
.iter()
.map(|pk_col| i2o.try_map(*pk_col))
.collect::<Option<Vec<_>>>()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Share<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.input.borrow().stream_key().to_vec())
Some(self.input.borrow().stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
&self,
schema: &Schema,
ctx: OptimizerContextRef,
stream_key: &[usize],
input_stream_key: &[usize],
vnode_col_idx: Option<usize>,
) -> TableCatalog {
let columns_fields = schema.fields().to_vec();
Expand Down Expand Up @@ -71,7 +71,7 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
order_cols.insert(order.column_index);
});

stream_key.iter().for_each(|idx| {
input_stream_key.iter().for_each(|idx| {
if !order_cols.contains(idx) {
internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending());
order_cols.insert(*idx);
Expand Down Expand Up @@ -176,7 +176,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for TopN<PlanRef> {
if self.limit_attr.max_one_row() {
Some(self.group_key.clone())
} else {
let mut pk = self.input.stream_key().to_vec();
let mut pk = self.input.stream_key()?.to_vec();
for i in &self.group_key {
if !pk.contains(i) {
pk.push(*i);
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Union<PlanRef> {
// Union all its inputs pks + source_col if exists
let mut pk_indices = vec![];
for input in &self.inputs {
for pk in input.stream_key() {
for pk in input.stream_key()? {
if !pk_indices.contains(pk) {
pk_indices.push(*pk);
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Update<PlanRef> {

fn stream_key(&self) -> Option<Vec<usize>> {
if self.returning {
Some(self.input.stream_key().to_vec())
Some(self.input.stream_key()?.to_vec())
} else {
Some(vec![])
}
Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,11 @@ impl LogicalAgg {
// so it obeys consistent hash strategy via [`Distribution::HashShard`].
let stream_input =
if *input_dist == Distribution::SomeShard && self.core.must_try_two_phase_agg() {
RequiredDist::shard_by_key(stream_input.schema().len(), stream_input.stream_key())
.enforce_if_not_satisfies(stream_input, &Order::any())?
RequiredDist::shard_by_key(
stream_input.schema().len(),
stream_input.expect_stream_key(),
)
.enforce_if_not_satisfies(stream_input, &Order::any())?
} else {
stream_input
};
Expand Down
13 changes: 5 additions & 8 deletions src/frontend/src/optimizer/plan_node/logical_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,13 @@ impl LogicalApply {
let ctx = left.ctx();
let join_core = generic::Join::with_full_output(left, right, join_type, on);
let schema = join_core.schema();
let pk_indices = join_core.stream_key();
let (functional_dependency, pk_indices) = match pk_indices {
Some(pk_indices) => (
FunctionalDependencySet::with_key(schema.len(), &pk_indices),
pk_indices,
),
None => (FunctionalDependencySet::new(schema.len()), vec![]),
let stream_key = join_core.stream_key();
let functional_dependency = match &stream_key {
Some(stream_key) => FunctionalDependencySet::with_key(schema.len(), stream_key),
None => FunctionalDependencySet::new(schema.len()),
};
let (left, right, on, join_type, _output_indices) = join_core.decompose();
let base = PlanBase::new_logical(ctx, schema, pk_indices, functional_dependency);
let base = PlanBase::new_logical(ctx, schema, stream_key, functional_dependency);
LogicalApply {
base,
left,
Expand Down
7 changes: 1 addition & 6 deletions src/frontend/src/optimizer/plan_node/logical_dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,8 @@ pub struct LogicalDedup {

impl LogicalDedup {
pub fn new(input: PlanRef, dedup_cols: Vec<usize>) -> Self {
let base = PlanBase::new_logical(
input.ctx(),
input.schema().clone(),
dedup_cols.clone(),
input.functional_dependency().clone(),
);
let core = generic::Dedup { input, dedup_cols };
let base = PlanBase::new_logical_with_core(&core);
LogicalDedup { base, core }
}

Expand Down
15 changes: 2 additions & 13 deletions src/frontend/src/optimizer/plan_node/logical_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,23 +63,12 @@ impl LogicalHopWindow {
output_indices,
};

let _schema = core.schema();
let _pk_indices = core.stream_key();
let ctx = core.ctx();

// NOTE(st1page): add join keys in the pk_indices a work around before we really have stream
// key.
// let pk_indices = match pk_indices {
// Some(pk_indices) if functional_dependency.is_key(&pk_indices) => {
// functional_dependency.minimize_key(&pk_indices)
// }
// _ => pk_indices.unwrap_or_default(),
// };

let base = PlanBase::new_logical(
ctx,
core.schema(),
core.stream_key().unwrap_or_default(),
core.stream_key(),
core.functional_dependency(),
);

Expand Down Expand Up @@ -348,7 +337,7 @@ impl ToStream for LogicalHopWindow {
let i2o = self.core.i2o_col_mapping();
output_indices.extend(
input
.stream_key()
.expect_stream_key()
.iter()
.cloned()
.filter(|i| i2o.try_map(*i).is_none()),
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1395,14 +1395,14 @@ impl ToStream for LogicalJoin {

// Add missing pk indices to the logical join
let mut left_to_add = left
.stream_key()
.expect_stream_key()
.iter()
.cloned()
.filter(|i| l2o.try_map(*i).is_none())
.collect_vec();

let mut right_to_add = right
.stream_key()
.expect_stream_key()
.iter()
.filter(|&&i| r2o.try_map(i).is_none())
.map(|&i| i + left_len)
Expand Down Expand Up @@ -1464,13 +1464,13 @@ impl ToStream for LogicalJoin {
.composite(&join_with_pk.core.i2o_col_mapping());
let left_right_stream_keys = join_with_pk
.left()
.stream_key()
.expect_stream_key()
.iter()
.map(|i| l2o.map(*i))
.chain(
join_with_pk
.right()
.stream_key()
.expect_stream_key()
.iter()
.map(|i| r2o.map(*i)),
)
Expand Down
33 changes: 20 additions & 13 deletions src/frontend/src/optimizer/plan_node/logical_multi_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,19 +244,7 @@ impl LogicalMultiJoin {
.collect_vec()
};

let pk_indices = {
let mut pk_indices = vec![];
for (i, input_pk) in inputs.iter().map(|input| input.stream_key()).enumerate() {
for input_pk_idx in input_pk {
pk_indices.push(inner_i2o_mappings[i].map(*input_pk_idx));
}
}
pk_indices
.into_iter()
.map(|col_idx| inner2output.try_map(col_idx))
.collect::<Option<Vec<_>>>()
.unwrap_or_default()
};
let pk_indices = Self::derive_stream_key(&inputs, &inner_i2o_mappings, &inner2output);
let functional_dependency = {
let mut fd_set = FunctionalDependencySet::new(tot_col_num);
let mut column_cnt: usize = 0;
Expand Down Expand Up @@ -303,6 +291,25 @@ impl LogicalMultiJoin {
}
}

fn derive_stream_key(
inputs: &[PlanRef],
inner_i2o_mappings: &[ColIndexMapping],
inner2output: &ColIndexMapping,
) -> Option<Vec<usize>> {
// TODO(st1page): add JOIN key
let mut pk_indices = vec![];
for (i, input) in inputs.iter().enumerate() {
let input_stream_key = input.stream_key()?;
for input_pk_idx in input_stream_key {
pk_indices.push(inner_i2o_mappings[i].map(*input_pk_idx));
}
}
pk_indices
.into_iter()
.map(|col_idx| inner2output.try_map(col_idx))
.collect::<Option<Vec<_>>>()
}

/// Get a reference to the logical join's on.
pub fn on(&self) -> &Condition {
&self.on
Expand Down
7 changes: 6 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ impl LogicalNow {
sub_fields: vec![],
type_name: String::default(),
}]);
let base = PlanBase::new_logical(ctx, schema, vec![], FunctionalDependencySet::default());
let base = PlanBase::new_logical(
ctx,
schema,
Some(vec![]),
FunctionalDependencySet::default(),
);
Self { base }
}
}
Expand Down
Loading

0 comments on commit e2936bc

Please sign in to comment.