Skip to content

Commit

Permalink
change implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page committed Sep 25, 2023
1 parent ea3ea6b commit 284cfdb
Show file tree
Hide file tree
Showing 52 changed files with 251 additions and 132 deletions.
12 changes: 11 additions & 1 deletion src/frontend/src/optimizer/plan_node/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};

use super::PlanRef;
use crate::optimizer::property::Order;
use crate::Explain;

pub(crate) fn derive_columns(
input_schema: &Schema,
Expand Down Expand Up @@ -82,7 +83,16 @@ pub(crate) fn derive_pk(
columns: &[ColumnCatalog],
) -> (Vec<ColumnOrder>, Vec<usize>) {
// Note(congyi): avoid pk duplication
let stream_key = input.stream_key().iter().copied().unique().collect_vec();
let stream_key = input
.stream_key()
.expect(&format!(
"should always have a stream key on the top of the stream plan but not, plan: {}",
input.explain_to_string()
))
.iter()
.copied()
.unique()
.collect_vec();
let schema = input.schema();

// Assert the uniqueness of column names and IDs, including hidden columns.
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
window_col_idx: Option<usize>,
) -> Vec<AggCallState> {
let in_fields = self.input.schema().fields().to_vec();
let in_pks = self.input.stream_key().to_vec();
let in_pks = self.input.stream_key().unwrap().to_vec();
let in_append_only = self.input.append_only();
let in_dist_key = self.input.distribution().dist_column_indices().to_vec();

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Delete<PlanRef> {

fn stream_key(&self) -> Option<Vec<usize>> {
if self.returning {
Some(self.input.stream_key().to_vec())
Some(self.input.stream_key()?.to_vec())
} else {
Some(vec![])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for DynamicFilter<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.left.stream_key().to_vec())
Some(self.left.stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down Expand Up @@ -151,7 +151,7 @@ pub fn infer_left_internal_table_catalog(
let mut pk_indices = vec![left_key_index];
let read_prefix_len_hint = pk_indices.len();

for i in me.stream_key() {
for i in me.stream_key().unwrap() {
if *i != left_key_index {
pk_indices.push(*i);
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/except.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Except<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.inputs[0].stream_key().to_vec())
Some(self.inputs[0].stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Expand<PlanRef> {
let input_schema_len = self.input.schema().len();
let mut pk_indices = self
.input
.stream_key()
.stream_key()?
.iter()
.map(|&pk| pk + input_schema_len)
.collect_vec();
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Filter<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.input.stream_key().to_vec())
Some(self.input.stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
} else {
let mut pk = self
.input
.stream_key()
.stream_key()?
.iter()
.filter_map(|&pk_idx| self.output_indices.iter().position(|&idx| idx == pk_idx))
.collect_vec();
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/intersect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Intersect<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.inputs[0].stream_key().to_vec())
Some(self.inputs[0].stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
fn stream_key(&self) -> Option<Vec<usize>> {
let _left_len = self.left.schema().len();
let _right_len = self.right.schema().len();
let left_pk = self.left.stream_key();
let right_pk = self.right.stream_key();
let left_pk = self.left.stream_key()?;
let right_pk = self.right.stream_key()?;
let l2i = self.l2i_col_mapping();
let r2i = self.r2i_col_mapping();
let full_out_col_num = self.internal_column_num();
Expand All @@ -110,7 +110,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {

// NOTE(st1page): add join keys in the pk_indices a work around before we really have stream
// key.
pk_indices.and_then(|mut pk_indices| {
pk_indices.and_then(|mut pk_indices: Vec<usize>| {
let left_len = self.left.schema().len();
let right_len = self.right.schema().len();
let eq_predicate = EqJoinPredicate::create(left_len, right_len, self.on.clone());
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Limit<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.input.stream_key().to_vec())
Some(self.input.stream_key()?.to_vec())
}
}
impl<PlanRef> Limit<PlanRef> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for OverWindow<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
let mut output_pk = self.input.stream_key().to_vec();
let mut output_pk = self.input.stream_key()?.to_vec();
for part_key_idx in self
.window_functions
.iter()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Project<PlanRef> {
fn stream_key(&self) -> Option<Vec<usize>> {
let i2o = self.i2o_col_mapping();
self.input
.stream_key()
.stream_key()?
.iter()
.map(|pk_col| i2o.try_map(*pk_col))
.collect::<Option<Vec<_>>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for ProjectSet<PlanRef> {
let i2o = self.i2o_col_mapping();
let mut pk = self
.input
.stream_key()
.stream_key()?
.iter()
.map(|pk_col| i2o.try_map(*pk_col))
.collect::<Option<Vec<_>>>()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Share<PlanRef> {
}

fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.input.borrow().stream_key().to_vec())
Some(self.input.borrow().stream_key()?.to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
&self,
schema: &Schema,
ctx: OptimizerContextRef,
stream_key: &[usize],
input_stream_key: &[usize],
vnode_col_idx: Option<usize>,
) -> TableCatalog {
let columns_fields = schema.fields().to_vec();
Expand Down Expand Up @@ -71,7 +71,7 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
order_cols.insert(order.column_index);
});

stream_key.iter().for_each(|idx| {
input_stream_key.iter().for_each(|idx| {
if !order_cols.contains(idx) {
internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending());
order_cols.insert(*idx);
Expand Down Expand Up @@ -176,7 +176,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for TopN<PlanRef> {
if self.limit_attr.max_one_row() {
Some(self.group_key.clone())
} else {
let mut pk = self.input.stream_key().to_vec();
let mut pk = self.input.stream_key()?.to_vec();
for i in &self.group_key {
if !pk.contains(i) {
pk.push(*i);
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Union<PlanRef> {
// Union all its inputs pks + source_col if exists
let mut pk_indices = vec![];
for input in &self.inputs {
for pk in input.stream_key() {
for pk in input.stream_key()? {
if !pk_indices.contains(pk) {
pk_indices.push(*pk);
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Update<PlanRef> {

fn stream_key(&self) -> Option<Vec<usize>> {
if self.returning {
Some(self.input.stream_key().to_vec())
Some(self.input.stream_key()?.to_vec())
} else {
Some(vec![])
}
Expand Down
11 changes: 9 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::utils::{
ColIndexMapping, ColIndexMappingRewriteExt, Condition, GroupBy, IndexSet, Substitute,
};
use crate::Explain;

/// `LogicalAgg` groups input data by their group key and computes aggregation functions.
///
Expand Down Expand Up @@ -222,8 +223,14 @@ impl LogicalAgg {
// so it obeys consistent hash strategy via [`Distribution::HashShard`].
let stream_input =
if *input_dist == Distribution::SomeShard && self.core.must_try_two_phase_agg() {
RequiredDist::shard_by_key(stream_input.schema().len(), stream_input.stream_key())
.enforce_if_not_satisfies(stream_input, &Order::any())?
RequiredDist::shard_by_key(
stream_input.schema().len(),
stream_input.stream_key().expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
stream_input.explain_to_string()
)),
)
.enforce_if_not_satisfies(stream_input, &Order::any())?
} else {
stream_input
};
Expand Down
13 changes: 5 additions & 8 deletions src/frontend/src/optimizer/plan_node/logical_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,13 @@ impl LogicalApply {
let ctx = left.ctx();
let join_core = generic::Join::with_full_output(left, right, join_type, on);
let schema = join_core.schema();
let pk_indices = join_core.stream_key();
let (functional_dependency, pk_indices) = match pk_indices {
Some(pk_indices) => (
FunctionalDependencySet::with_key(schema.len(), &pk_indices),
pk_indices,
),
None => (FunctionalDependencySet::new(schema.len()), vec![]),
let stream_key = join_core.stream_key();
let functional_dependency = match &stream_key {
Some(stream_key) => FunctionalDependencySet::with_key(schema.len(), &stream_key),
None => FunctionalDependencySet::new(schema.len()),
};
let (left, right, on, join_type, _output_indices) = join_core.decompose();
let base = PlanBase::new_logical(ctx, schema, pk_indices, functional_dependency);
let base = PlanBase::new_logical(ctx, schema, stream_key, functional_dependency);
LogicalApply {
base,
left,
Expand Down
7 changes: 1 addition & 6 deletions src/frontend/src/optimizer/plan_node/logical_dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,8 @@ pub struct LogicalDedup {

impl LogicalDedup {
pub fn new(input: PlanRef, dedup_cols: Vec<usize>) -> Self {
let base = PlanBase::new_logical(
input.ctx(),
input.schema().clone(),
dedup_cols.clone(),
input.functional_dependency().clone(),
);
let core = generic::Dedup { input, dedup_cols };
let base = PlanBase::new_logical_with_core(&core);
LogicalDedup { base, core }
}

Expand Down
18 changes: 6 additions & 12 deletions src/frontend/src/optimizer/plan_node/logical_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
};
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition};
use crate::Explain;

/// `LogicalHopWindow` implements Hop Table Function.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -63,23 +64,12 @@ impl LogicalHopWindow {
output_indices,
};

let _schema = core.schema();
let _pk_indices = core.stream_key();
let ctx = core.ctx();

// NOTE(st1page): add join keys in the pk_indices a work around before we really have stream
// key.
// let pk_indices = match pk_indices {
// Some(pk_indices) if functional_dependency.is_key(&pk_indices) => {
// functional_dependency.minimize_key(&pk_indices)
// }
// _ => pk_indices.unwrap_or_default(),
// };

let base = PlanBase::new_logical(
ctx,
core.schema(),
core.stream_key().unwrap_or_default(),
core.stream_key(),
core.functional_dependency(),
);

Expand Down Expand Up @@ -345,6 +335,10 @@ impl ToStream for LogicalHopWindow {
output_indices.extend(
input
.stream_key()
.expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
input.explain_to_string()
))
.iter()
.cloned()
.filter(|i| i2o.try_map(*i).is_none()),
Expand Down
18 changes: 17 additions & 1 deletion src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::optimizer::plan_node::{
use crate::optimizer::plan_visitor::LogicalCardinalityExt;
use crate::optimizer::property::{Distribution, Order, RequiredDist};
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, Condition, ConditionDisplay};
use crate::Explain;

/// `LogicalJoin` combines two relations according to some condition.
///
Expand Down Expand Up @@ -1396,13 +1397,21 @@ impl ToStream for LogicalJoin {
// Add missing pk indices to the logical join
let mut left_to_add = left
.stream_key()
.expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
left.explain_to_string()
))
.iter()
.cloned()
.filter(|i| l2o.try_map(*i).is_none())
.collect_vec();

let mut right_to_add = right
.stream_key()
.expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
right.explain_to_string()
))
.iter()
.filter(|&&i| r2o.try_map(i).is_none())
.map(|&i| i + left_len)
Expand Down Expand Up @@ -1465,12 +1474,19 @@ impl ToStream for LogicalJoin {
let left_right_stream_keys = join_with_pk
.left()
.stream_key()
.expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
left.explain_to_string()
))
.iter()
.map(|i| l2o.map(*i))
.chain(
join_with_pk
.right()
.stream_key()
.stream_key() .expect(&format!(
"should always have a stream key in the stream plan but not, sub plan: {}",
join_with_pk.right().explain_to_string()
))
.iter()
.map(|i| r2o.map(*i)),
)
Expand Down
Loading

0 comments on commit 284cfdb

Please sign in to comment.