Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(optimizer): change stream key to option #12524

Merged
merged 14 commits into from
Oct 10, 2023
8 changes: 7 additions & 1 deletion src/frontend/src/optimizer/plan_node/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use super::PlanRef;
use crate::optimizer::property::Order;


st1page marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) fn derive_columns(
input_schema: &Schema,
out_names: Vec<String>,
Expand Down Expand Up @@ -82,7 +83,12 @@ pub(crate) fn derive_pk(
columns: &[ColumnCatalog],
) -> (Vec<ColumnOrder>, Vec<usize>) {
// Note(congyi): avoid pk duplication
let stream_key = input.stream_key().iter().copied().unique().collect_vec();
let stream_key = input
.expect_stream_key()
.iter()
.copied()
.unique()
.collect_vec();
let schema = input.schema();

// Assert the uniqueness of column names and IDs, including hidden columns.
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
window_col_idx: Option<usize>,
) -> Vec<AggCallState> {
let in_fields = self.input.schema().fields().to_vec();
let in_pks = self.input.stream_key().to_vec();
let in_pks = self.input.stream_key().unwrap().to_vec();
let in_append_only = self.input.append_only();
let in_dist_key = self.input.distribution().dist_column_indices().to_vec();

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Delete<PlanRef> {

fn stream_key(&self) -> Option<Vec<usize>> {
if self.returning {
Some(self.input.stream_key().to_vec())
Some(self.input.stream_key()?.to_vec())
} else {
Some(vec![])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for DynamicFilter<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.left.stream_key().to_vec())
Some(self.left.stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down Expand Up @@ -151,7 +151,7 @@ pub fn infer_left_internal_table_catalog(
let mut pk_indices = vec![left_key_index];
let read_prefix_len_hint = pk_indices.len();

for i in me.stream_key() {
for i in me.stream_key().unwrap() {
if *i != left_key_index {
pk_indices.push(*i);
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/except.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Except<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.inputs[0].stream_key().to_vec())
Some(self.inputs[0].stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Expand<PlanRef> {
let input_schema_len = self.input.schema().len();
let mut pk_indices = self
.input
.stream_key()
.stream_key()?
.iter()
.map(|&pk| pk + input_schema_len)
.collect_vec();
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Filter<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.input.stream_key().to_vec())
Some(self.input.stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
} else {
let mut pk = self
.input
.stream_key()
.stream_key()?
.iter()
.filter_map(|&pk_idx| self.output_indices.iter().position(|&idx| idx == pk_idx))
.collect_vec();
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/intersect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Intersect<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.inputs[0].stream_key().to_vec())
Some(self.inputs[0].stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
fn stream_key(&self) -> Option<Vec<usize>> {
let _left_len = self.left.schema().len();
let _right_len = self.right.schema().len();
let left_pk = self.left.stream_key();
let right_pk = self.right.stream_key();
let left_pk = self.left.stream_key()?;
let right_pk = self.right.stream_key()?;
let l2i = self.l2i_col_mapping();
let r2i = self.r2i_col_mapping();
let full_out_col_num = self.internal_column_num();
Expand All @@ -110,7 +110,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {

// NOTE(st1page): add join keys in the pk_indices a work around before we really have stream
// key.
pk_indices.and_then(|mut pk_indices| {
pk_indices.and_then(|mut pk_indices: Vec<usize>| {
let left_len = self.left.schema().len();
let right_len = self.right.schema().len();
let eq_predicate = EqJoinPredicate::create(left_len, right_len, self.on.clone());
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Limit<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.input.stream_key().to_vec())
Some(self.input.stream_key()?.to_vec())
}
}
impl<PlanRef> Limit<PlanRef> {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub(super) use impl_distill_unit_from_fields;

pub trait GenericPlanRef: Eq + Hash {
fn schema(&self) -> &Schema;
fn stream_key(&self) -> &[usize];
fn stream_key(&self) -> Option<&[usize]>;
fn functional_dependency(&self) -> &FunctionalDependencySet;
fn ctx(&self) -> OptimizerContextRef;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for OverWindow<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
let mut output_pk = self.input.stream_key().to_vec();
let mut output_pk = self.input.stream_key()?.to_vec();
for part_key_idx in self
.window_functions
.iter()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Project<PlanRef> {
fn stream_key(&self) -> Option<Vec<usize>> {
let i2o = self.i2o_col_mapping();
self.input
.stream_key()
.stream_key()?
.iter()
.map(|pk_col| i2o.try_map(*pk_col))
.collect::<Option<Vec<_>>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for ProjectSet<PlanRef> {
let i2o = self.i2o_col_mapping();
let mut pk = self
.input
.stream_key()
.stream_key()?
.iter()
.map(|pk_col| i2o.try_map(*pk_col))
.collect::<Option<Vec<_>>>()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Share<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.input.borrow().stream_key().to_vec())
Some(self.input.borrow().stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
&self,
schema: &Schema,
ctx: OptimizerContextRef,
stream_key: &[usize],
input_stream_key: &[usize],
vnode_col_idx: Option<usize>,
) -> TableCatalog {
let columns_fields = schema.fields().to_vec();
Expand Down Expand Up @@ -71,7 +71,7 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
order_cols.insert(order.column_index);
});

stream_key.iter().for_each(|idx| {
input_stream_key.iter().for_each(|idx| {
if !order_cols.contains(idx) {
internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending());
order_cols.insert(*idx);
Expand Down Expand Up @@ -176,7 +176,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for TopN<PlanRef> {
if self.limit_attr.max_one_row() {
Some(self.group_key.clone())
} else {
let mut pk = self.input.stream_key().to_vec();
let mut pk = self.input.stream_key()?.to_vec();
for i in &self.group_key {
if !pk.contains(i) {
pk.push(*i);
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Union<PlanRef> {
// Union all its inputs pks + source_col if exists
let mut pk_indices = vec![];
for input in &self.inputs {
for pk in input.stream_key() {
for pk in input.stream_key()? {
if !pk_indices.contains(pk) {
pk_indices.push(*pk);
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Update<PlanRef> {

fn stream_key(&self) -> Option<Vec<usize>> {
if self.returning {
Some(self.input.stream_key().to_vec())
Some(self.input.stream_key()?.to_vec())
} else {
Some(vec![])
}
Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::utils::{
ColIndexMapping, ColIndexMappingRewriteExt, Condition, GroupBy, IndexSet, Substitute,
};


/// `LogicalAgg` groups input data by their group key and computes aggregation functions.
///
/// It corresponds to the `GROUP BY` operator in a SQL query statement together with the aggregate
Expand Down Expand Up @@ -222,8 +223,11 @@ impl LogicalAgg {
// so it obeys consistent hash strategy via [`Distribution::HashShard`].
let stream_input =
if *input_dist == Distribution::SomeShard && self.core.must_try_two_phase_agg() {
RequiredDist::shard_by_key(stream_input.schema().len(), stream_input.stream_key())
.enforce_if_not_satisfies(stream_input, &Order::any())?
RequiredDist::shard_by_key(
stream_input.schema().len(),
stream_input.expect_stream_key(),
)
.enforce_if_not_satisfies(stream_input, &Order::any())?
} else {
stream_input
};
Expand Down
13 changes: 5 additions & 8 deletions src/frontend/src/optimizer/plan_node/logical_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,13 @@ impl LogicalApply {
let ctx = left.ctx();
let join_core = generic::Join::with_full_output(left, right, join_type, on);
let schema = join_core.schema();
let pk_indices = join_core.stream_key();
let (functional_dependency, pk_indices) = match pk_indices {
Some(pk_indices) => (
FunctionalDependencySet::with_key(schema.len(), &pk_indices),
pk_indices,
),
None => (FunctionalDependencySet::new(schema.len()), vec![]),
let stream_key = join_core.stream_key();
let functional_dependency = match &stream_key {
Some(stream_key) => FunctionalDependencySet::with_key(schema.len(), stream_key),
None => FunctionalDependencySet::new(schema.len()),
};
let (left, right, on, join_type, _output_indices) = join_core.decompose();
let base = PlanBase::new_logical(ctx, schema, pk_indices, functional_dependency);
let base = PlanBase::new_logical(ctx, schema, stream_key, functional_dependency);
LogicalApply {
base,
left,
Expand Down
7 changes: 1 addition & 6 deletions src/frontend/src/optimizer/plan_node/logical_dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,8 @@ pub struct LogicalDedup {

impl LogicalDedup {
pub fn new(input: PlanRef, dedup_cols: Vec<usize>) -> Self {
let base = PlanBase::new_logical(
input.ctx(),
input.schema().clone(),
dedup_cols.clone(),
input.functional_dependency().clone(),
);
let core = generic::Dedup { input, dedup_cols };
let base = PlanBase::new_logical_with_core(&core);
LogicalDedup { base, core }
}

Expand Down
16 changes: 3 additions & 13 deletions src/frontend/src/optimizer/plan_node/logical_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::optimizer::plan_node::{
};
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};


/// `LogicalHopWindow` implements Hop Table Function.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogicalHopWindow {
Expand Down Expand Up @@ -63,23 +64,12 @@ impl LogicalHopWindow {
output_indices,
};

let _schema = core.schema();
let _pk_indices = core.stream_key();
let ctx = core.ctx();

// NOTE(st1page): add join keys in the pk_indices a work around before we really have stream
// key.
// let pk_indices = match pk_indices {
// Some(pk_indices) if functional_dependency.is_key(&pk_indices) => {
// functional_dependency.minimize_key(&pk_indices)
// }
// _ => pk_indices.unwrap_or_default(),
// };

let base = PlanBase::new_logical(
ctx,
core.schema(),
core.stream_key().unwrap_or_default(),
core.stream_key(),
core.functional_dependency(),
);

Expand Down Expand Up @@ -344,7 +334,7 @@ impl ToStream for LogicalHopWindow {
let i2o = self.core.i2o_col_mapping();
output_indices.extend(
input
.stream_key()
.expect_stream_key()
.iter()
.cloned()
.filter(|i| i2o.try_map(*i).is_none()),
Expand Down
9 changes: 5 additions & 4 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::optimizer::plan_visitor::LogicalCardinalityExt;
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition, ConditionDisplay};


/// `LogicalJoin` combines two relations according to some condition.
///
/// Each output row has fields from the left and right inputs. The set of output rows is a subset
Expand Down Expand Up @@ -1395,14 +1396,14 @@ impl ToStream for LogicalJoin {

// Add missing pk indices to the logical join
let mut left_to_add = left
.stream_key()
.expect_stream_key()
.iter()
.cloned()
.filter(|i| l2o.try_map(*i).is_none())
.collect_vec();

let mut right_to_add = right
.stream_key()
.expect_stream_key()
.iter()
.filter(|&&i| r2o.try_map(i).is_none())
.map(|&i| i + left_len)
Expand Down Expand Up @@ -1464,13 +1465,13 @@ impl ToStream for LogicalJoin {
.composite(&join_with_pk.core.i2o_col_mapping());
let left_right_stream_keys = join_with_pk
.left()
.stream_key()
.expect_stream_key()
.iter()
.map(|i| l2o.map(*i))
.chain(
join_with_pk
.right()
.stream_key()
.expect_stream_key()
.iter()
.map(|i| r2o.map(*i)),
)
Expand Down
Loading
Loading