Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(optimizer): rename logical pk to stream key #12516

Merged
merged 2 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub(crate) fn derive_pk(
columns: &[ColumnCatalog],
) -> (Vec<ColumnOrder>, Vec<usize>) {
// Note(congyi): avoid pk duplication
let stream_key = input.logical_pk().iter().copied().unique().collect_vec();
let stream_key = input.stream_key().iter().copied().unique().collect_vec();
let schema = input.schema();

// Assert the uniqueness of column names and IDs, including hidden columns.
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Agg<PlanRef> {
Schema { fields }
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
Some((0..self.group_key.len()).collect())
}

Expand Down Expand Up @@ -344,7 +344,7 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
window_col_idx: Option<usize>,
) -> Vec<AggCallState> {
let in_fields = self.input.schema().fields().to_vec();
let in_pks = self.input.logical_pk().to_vec();
let in_pks = self.input.stream_key().to_vec();
let in_append_only = self.input.append_only();
let in_dist_key = self.input.distribution().dist_column_indices().to_vec();

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Dedup<PlanRef> {
self.input.schema().clone()
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.dedup_cols.clone())
}

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Delete<PlanRef> {
}
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
if self.returning {
Some(self.input.logical_pk().to_vec())
Some(self.input.stream_key().to_vec())
} else {
Some(vec![])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for DynamicFilter<PlanRef> {
self.left.schema().clone()
}

fn logical_pk(&self) -> Option<Vec<usize>> {
Some(self.left.logical_pk().to_vec())
fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.left.stream_key().to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down Expand Up @@ -151,7 +151,7 @@ pub fn infer_left_internal_table_catalog(
let mut pk_indices = vec![left_key_index];
let read_prefix_len_hint = pk_indices.len();

for i in me.logical_pk() {
for i in me.stream_key() {
if *i != left_key_index {
pk_indices.push(*i);
}
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/except.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Except<PlanRef> {
self.inputs[0].schema().clone()
}

fn logical_pk(&self) -> Option<Vec<usize>> {
Some(self.inputs[0].logical_pk().to_vec())
fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.inputs[0].stream_key().to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Expand<PlanRef> {
Schema::new(fields)
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
let input_schema_len = self.input.schema().len();
let mut pk_indices = self
.input
.logical_pk()
.stream_key()
.iter()
.map(|&pk| pk + input_schema_len)
.collect_vec();
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Filter<PlanRef> {
self.input.schema().clone()
}

fn logical_pk(&self) -> Option<Vec<usize>> {
Some(self.input.logical_pk().to_vec())
fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.input.stream_key().to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
.collect()
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
let window_start_index = self
.output_indices
.iter()
Expand All @@ -77,7 +77,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for HopWindow<PlanRef> {
} else {
let mut pk = self
.input
.logical_pk()
.stream_key()
.iter()
.filter_map(|&pk_idx| self.output_indices.iter().position(|&idx| idx == pk_idx))
.collect_vec();
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Insert<PlanRef> {
}
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
None
}

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/intersect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Intersect<PlanRef> {
self.inputs[0].schema().clone()
}

fn logical_pk(&self) -> Option<Vec<usize>> {
Some(self.inputs[0].logical_pk().to_vec())
fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.inputs[0].stream_key().to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/generic/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Join<PlanRef> {
Schema { fields }
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
let _left_len = self.left.schema().len();
let _right_len = self.right.schema().len();
let left_pk = self.left.logical_pk();
let right_pk = self.right.logical_pk();
let left_pk = self.left.stream_key();
let right_pk = self.right.stream_key();
let l2i = self.l2i_col_mapping();
let r2i = self.r2i_col_mapping();
let full_out_col_num = self.internal_column_num();
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Limit<PlanRef> {
self.input.functional_dependency().clone()
}

fn logical_pk(&self) -> Option<Vec<usize>> {
Some(self.input.logical_pk().to_vec())
fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.input.stream_key().to_vec())
}
}
impl<PlanRef> Limit<PlanRef> {
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,22 @@ pub(super) use impl_distill_unit_from_fields;

pub trait GenericPlanRef: Eq + Hash {
fn schema(&self) -> &Schema;
fn logical_pk(&self) -> &[usize];
fn stream_key(&self) -> &[usize];
fn functional_dependency(&self) -> &FunctionalDependencySet;
fn ctx(&self) -> OptimizerContextRef;
}

pub trait GenericPlanNode {
/// return (schema, `logical_pk`, fds)
/// return (schema, `stream_key`, fds)
fn logical_properties(&self) -> (Schema, Option<Vec<usize>>, FunctionalDependencySet) {
(
self.schema(),
self.logical_pk(),
self.stream_key(),
self.functional_dependency(),
)
}
fn functional_dependency(&self) -> FunctionalDependencySet;
fn schema(&self) -> Schema;
fn logical_pk(&self) -> Option<Vec<usize>>;
fn stream_key(&self) -> Option<Vec<usize>>;
fn ctx(&self) -> OptimizerContextRef;
}
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/over_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for OverWindow<PlanRef> {
schema
}

fn logical_pk(&self) -> Option<Vec<usize>> {
let mut output_pk = self.input.logical_pk().to_vec();
fn stream_key(&self) -> Option<Vec<usize>> {
let mut output_pk = self.input.stream_key().to_vec();
for part_key_idx in self
.window_functions
.iter()
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Project<PlanRef> {
Schema { fields }
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
let i2o = self.i2o_col_mapping();
self.input
.logical_pk()
.stream_key()
.iter()
.map(|pk_col| i2o.try_map(*pk_col))
.collect::<Option<Vec<_>>>()
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for ProjectSet<PlanRef> {
Schema { fields }
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
let i2o = self.i2o_col_mapping();
let mut pk = self
.input
.logical_pk()
.stream_key()
.iter()
.map(|pk_col| i2o.try_map(*pk_col))
.collect::<Option<Vec<_>>>()
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ impl GenericPlanNode for Scan {
Schema { fields }
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
let id_to_op_idx = Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_desc);
self.table_desc
.stream_key
Expand All @@ -325,7 +325,7 @@ impl GenericPlanNode for Scan {
}

fn functional_dependency(&self) -> FunctionalDependencySet {
let pk_indices = self.logical_pk();
let pk_indices = self.stream_key();
let col_num = self.output_col_idx.len();
match &pk_indices {
Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices),
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Share<PlanRef> {
self.input.borrow().schema().clone()
}

fn logical_pk(&self) -> Option<Vec<usize>> {
Some(self.input.borrow().logical_pk().to_vec())
fn stream_key(&self) -> Option<Vec<usize>> {
Some(self.input.borrow().stream_key().to_vec())
}

fn ctx(&self) -> OptimizerContextRef {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl GenericPlanNode for Source {
Schema { fields }
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
self.row_id_index.map(|idx| vec![idx])
}

Expand All @@ -69,7 +69,7 @@ impl GenericPlanNode for Source {
}

fn functional_dependency(&self) -> FunctionalDependencySet {
let pk_indices = self.logical_pk();
let pk_indices = self.stream_key();
match pk_indices {
Some(pk_indices) => {
FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices)
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for TopN<PlanRef> {
self.input.schema().clone()
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
// We can use the group key as the stream key when there is at most one record for each
// value of the group key.
if self.limit_attr.max_one_row() {
Some(self.group_key.clone())
} else {
let mut pk = self.input.logical_pk().to_vec();
let mut pk = self.input.stream_key().to_vec();
for i in &self.group_key {
if !pk.contains(i) {
pk.push(*i);
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Union<PlanRef> {
self.inputs[0].schema().clone()
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
// Union all its inputs pks + source_col if exists
let mut pk_indices = vec![];
for input in &self.inputs {
for pk in input.logical_pk() {
for pk in input.stream_key() {
if !pk_indices.contains(pk) {
pk_indices.push(*pk);
}
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Update<PlanRef> {
}
}

fn logical_pk(&self) -> Option<Vec<usize>> {
fn stream_key(&self) -> Option<Vec<usize>> {
if self.returning {
Some(self.input.logical_pk().to_vec())
Some(self.input.stream_key().to_vec())
} else {
Some(vec![])
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl LogicalAgg {
// so it obeys consistent hash strategy via [`Distribution::HashShard`].
let stream_input =
if *input_dist == Distribution::SomeShard && self.core.must_try_two_phase_agg() {
RequiredDist::shard_by_key(stream_input.schema().len(), stream_input.logical_pk())
RequiredDist::shard_by_key(stream_input.schema().len(), stream_input.stream_key())
.enforce_if_not_satisfies(stream_input, &Order::any())?
} else {
stream_input
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl LogicalApply {
let ctx = left.ctx();
let join_core = generic::Join::with_full_output(left, right, join_type, on);
let schema = join_core.schema();
let pk_indices = join_core.logical_pk();
let pk_indices = join_core.stream_key();
let (functional_dependency, pk_indices) = match pk_indices {
Some(pk_indices) => (
FunctionalDependencySet::with_key(schema.len(), &pk_indices),
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl LogicalHopWindow {
};

let _schema = core.schema();
let _pk_indices = core.logical_pk();
let _pk_indices = core.stream_key();
let ctx = core.ctx();

// NOTE(st1page): add join keys in the pk_indices a work around before we really have stream
Expand All @@ -79,7 +79,7 @@ impl LogicalHopWindow {
let base = PlanBase::new_logical(
ctx,
core.schema(),
core.logical_pk().unwrap_or_default(),
core.stream_key().unwrap_or_default(),
core.functional_dependency(),
);

Expand Down Expand Up @@ -344,7 +344,7 @@ impl ToStream for LogicalHopWindow {
let i2o = self.core.i2o_col_mapping();
output_indices.extend(
input
.logical_pk()
.stream_key()
.iter()
.cloned()
.filter(|i| i2o.try_map(*i).is_none()),
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1395,14 +1395,14 @@ impl ToStream for LogicalJoin {

// Add missing pk indices to the logical join
let mut left_to_add = left
.logical_pk()
.stream_key()
.iter()
.cloned()
.filter(|i| l2o.try_map(*i).is_none())
.collect_vec();

let mut right_to_add = right
.logical_pk()
.stream_key()
.iter()
.filter(|&&i| r2o.try_map(i).is_none())
.map(|&i| i + left_len)
Expand Down Expand Up @@ -1464,13 +1464,13 @@ impl ToStream for LogicalJoin {
.composite(&join_with_pk.core.i2o_col_mapping());
let left_right_stream_keys = join_with_pk
.left()
.logical_pk()
.stream_key()
.iter()
.map(|i| l2o.map(*i))
.chain(
join_with_pk
.right()
.logical_pk()
.stream_key()
.iter()
.map(|i| r2o.map(*i)),
)
Expand Down
Loading
Loading