From 92da5f057f4d55d95d321a87f55f0f85a27f6683 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 25 Sep 2023 16:13:07 +0800 Subject: [PATCH 1/2] rename --- .../src/optimizer/plan_node/generic/agg.rs | 4 +- .../src/optimizer/plan_node/generic/dedup.rs | 2 +- .../src/optimizer/plan_node/generic/delete.rs | 4 +- .../plan_node/generic/dynamic_filter.rs | 6 +- .../src/optimizer/plan_node/generic/except.rs | 4 +- .../src/optimizer/plan_node/generic/expand.rs | 4 +- .../src/optimizer/plan_node/generic/filter.rs | 4 +- .../optimizer/plan_node/generic/hop_window.rs | 4 +- .../src/optimizer/plan_node/generic/insert.rs | 2 +- .../optimizer/plan_node/generic/intersect.rs | 4 +- .../src/optimizer/plan_node/generic/join.rs | 6 +- .../src/optimizer/plan_node/generic/limit.rs | 4 +- .../src/optimizer/plan_node/generic/mod.rs | 6 +- .../plan_node/generic/over_window.rs | 4 +- .../optimizer/plan_node/generic/project.rs | 4 +- .../plan_node/generic/project_set.rs | 4 +- .../src/optimizer/plan_node/generic/scan.rs | 4 +- .../src/optimizer/plan_node/generic/share.rs | 4 +- .../src/optimizer/plan_node/generic/source.rs | 4 +- .../src/optimizer/plan_node/generic/top_n.rs | 4 +- .../src/optimizer/plan_node/generic/union.rs | 4 +- .../src/optimizer/plan_node/generic/update.rs | 4 +- .../src/optimizer/plan_node/logical_agg.rs | 2 +- .../src/optimizer/plan_node/logical_apply.rs | 2 +- .../optimizer/plan_node/logical_hop_window.rs | 4 +- .../src/optimizer/plan_node/logical_join.rs | 8 +-- .../optimizer/plan_node/logical_project.rs | 2 +- .../plan_node/logical_project_set.rs | 2 +- .../src/optimizer/plan_node/logical_scan.rs | 2 +- .../src/optimizer/plan_node/logical_union.rs | 10 +-- src/frontend/src/optimizer/plan_node/mod.rs | 6 +- .../src/optimizer/plan_node/plan_base.rs | 20 +++--- .../src/optimizer/plan_node/stream.rs | 12 ++-- .../src/optimizer/plan_node/stream_derive.rs | 68 +++++++++---------- .../optimizer/plan_node/stream_materialize.rs | 2 +- .../optimizer/plan_node/stream_table_scan.rs | 2 +- .../src/optimizer/plan_node/stream_topn.rs | 2 +- 37 files changed, 117 insertions(+), 117 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index 86aee0c01a7c1..b345c0b5f0206 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -195,7 +195,7 @@ impl GenericPlanNode for Agg { Schema { fields } } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { Some((0..self.group_key.len()).collect()) } @@ -344,7 +344,7 @@ impl Agg { window_col_idx: Option, ) -> Vec { let in_fields = self.input.schema().fields().to_vec(); - let in_pks = self.input.logical_pk().to_vec(); + let in_pks = self.input.stream_key().to_vec(); let in_append_only = self.input.append_only(); let in_dist_key = self.input.distribution().dist_column_indices().to_vec(); diff --git a/src/frontend/src/optimizer/plan_node/generic/dedup.rs b/src/frontend/src/optimizer/plan_node/generic/dedup.rs index 89bfe14b2e9d3..bcec1b41ff7da 100644 --- a/src/frontend/src/optimizer/plan_node/generic/dedup.rs +++ b/src/frontend/src/optimizer/plan_node/generic/dedup.rs @@ -50,7 +50,7 @@ impl GenericPlanNode for Dedup { self.input.schema().clone() } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { Some(self.dedup_cols.clone()) } diff --git a/src/frontend/src/optimizer/plan_node/generic/delete.rs b/src/frontend/src/optimizer/plan_node/generic/delete.rs index 5d178b654acec..23ab1caf6f442 100644 --- a/src/frontend/src/optimizer/plan_node/generic/delete.rs +++ b/src/frontend/src/optimizer/plan_node/generic/delete.rs @@ -60,9 +60,9 @@ impl GenericPlanNode for Delete { } } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { if self.returning { - Some(self.input.logical_pk().to_vec()) + Some(self.input.stream_key().to_vec()) } else { Some(vec![]) } diff --git a/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs index aa8afc1779be8..1e45d8f710af2 100644 --- a/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs @@ -58,8 +58,8 @@ impl GenericPlanNode for DynamicFilter { self.left.schema().clone() } - fn logical_pk(&self) -> Option> { - Some(self.left.logical_pk().to_vec()) + fn stream_key(&self) -> Option> { + Some(self.left.stream_key().to_vec()) } fn ctx(&self) -> OptimizerContextRef { @@ -151,7 +151,7 @@ pub fn infer_left_internal_table_catalog( let mut pk_indices = vec![left_key_index]; let read_prefix_len_hint = pk_indices.len(); - for i in me.logical_pk() { + for i in me.stream_key() { if *i != left_key_index { pk_indices.push(*i); } diff --git a/src/frontend/src/optimizer/plan_node/generic/except.rs b/src/frontend/src/optimizer/plan_node/generic/except.rs index 3721db69eefb8..7dc99c8290210 100644 --- a/src/frontend/src/optimizer/plan_node/generic/except.rs +++ b/src/frontend/src/optimizer/plan_node/generic/except.rs @@ -33,8 +33,8 @@ impl GenericPlanNode for Except { self.inputs[0].schema().clone() } - fn logical_pk(&self) -> Option> { - Some(self.inputs[0].logical_pk().to_vec()) + fn stream_key(&self) -> Option> { + Some(self.inputs[0].stream_key().to_vec()) } fn ctx(&self) -> OptimizerContextRef { diff --git a/src/frontend/src/optimizer/plan_node/generic/expand.rs b/src/frontend/src/optimizer/plan_node/generic/expand.rs index d78bd4c112028..55624b04baa99 100644 --- a/src/frontend/src/optimizer/plan_node/generic/expand.rs +++ b/src/frontend/src/optimizer/plan_node/generic/expand.rs @@ -57,11 +57,11 @@ impl GenericPlanNode for Expand { Schema::new(fields) } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { let input_schema_len = self.input.schema().len(); let mut pk_indices = self .input - .logical_pk() + .stream_key() .iter() .map(|&pk| pk + input_schema_len) .collect_vec(); diff --git a/src/frontend/src/optimizer/plan_node/generic/filter.rs b/src/frontend/src/optimizer/plan_node/generic/filter.rs index 2f6542cb19a9a..bac03135c4214 100644 --- a/src/frontend/src/optimizer/plan_node/generic/filter.rs +++ b/src/frontend/src/optimizer/plan_node/generic/filter.rs @@ -53,8 +53,8 @@ impl GenericPlanNode for Filter { self.input.schema().clone() } - fn logical_pk(&self) -> Option> { - Some(self.input.logical_pk().to_vec()) + fn stream_key(&self) -> Option> { + Some(self.input.stream_key().to_vec()) } fn ctx(&self) -> OptimizerContextRef { diff --git a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs index 131e7c6f1455e..37d9401ee194d 100644 --- a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs @@ -63,7 +63,7 @@ impl GenericPlanNode for HopWindow { .collect() } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { let window_start_index = self .output_indices .iter() @@ -77,7 +77,7 @@ impl GenericPlanNode for HopWindow { } else { let mut pk = self .input - .logical_pk() + .stream_key() .iter() .filter_map(|&pk_idx| self.output_indices.iter().position(|&idx| idx == pk_idx)) .collect_vec(); diff --git a/src/frontend/src/optimizer/plan_node/generic/insert.rs b/src/frontend/src/optimizer/plan_node/generic/insert.rs index c5bfeb725ff83..727e0296e0c83 100644 --- a/src/frontend/src/optimizer/plan_node/generic/insert.rs +++ b/src/frontend/src/optimizer/plan_node/generic/insert.rs @@ -61,7 +61,7 @@ impl GenericPlanNode for Insert { } } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { None } diff --git a/src/frontend/src/optimizer/plan_node/generic/intersect.rs b/src/frontend/src/optimizer/plan_node/generic/intersect.rs index 3b781eeb37fbd..84c34e0b8d507 100644 --- a/src/frontend/src/optimizer/plan_node/generic/intersect.rs +++ b/src/frontend/src/optimizer/plan_node/generic/intersect.rs @@ -32,8 +32,8 @@ impl GenericPlanNode for Intersect { self.inputs[0].schema().clone() } - fn logical_pk(&self) -> Option> { - Some(self.inputs[0].logical_pk().to_vec()) + fn stream_key(&self) -> Option> { + Some(self.inputs[0].stream_key().to_vec()) } fn ctx(&self) -> OptimizerContextRef { diff --git a/src/frontend/src/optimizer/plan_node/generic/join.rs b/src/frontend/src/optimizer/plan_node/generic/join.rs index 47c6b66286d98..d4cc84376599f 100644 --- a/src/frontend/src/optimizer/plan_node/generic/join.rs +++ b/src/frontend/src/optimizer/plan_node/generic/join.rs @@ -90,11 +90,11 @@ impl GenericPlanNode for Join { Schema { fields } } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { let _left_len = self.left.schema().len(); let _right_len = self.right.schema().len(); - let left_pk = self.left.logical_pk(); - let right_pk = self.right.logical_pk(); + let left_pk = self.left.stream_key(); + let right_pk = self.right.stream_key(); let l2i = self.l2i_col_mapping(); let r2i = self.r2i_col_mapping(); let full_out_col_num = self.internal_column_num(); diff --git a/src/frontend/src/optimizer/plan_node/generic/limit.rs b/src/frontend/src/optimizer/plan_node/generic/limit.rs index 060fdf47cbda4..2773ea325285e 100644 --- a/src/frontend/src/optimizer/plan_node/generic/limit.rs +++ b/src/frontend/src/optimizer/plan_node/generic/limit.rs @@ -42,8 +42,8 @@ impl GenericPlanNode for Limit { self.input.functional_dependency().clone() } - fn logical_pk(&self) -> Option> { - Some(self.input.logical_pk().to_vec()) + fn stream_key(&self) -> Option> { + Some(self.input.stream_key().to_vec()) } } impl Limit { diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 032eaa40fcda2..d5d2b2ea2d448 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -86,7 +86,7 @@ pub(super) use impl_distill_unit_from_fields; pub trait GenericPlanRef: Eq + Hash { fn schema(&self) -> &Schema; - fn logical_pk(&self) -> &[usize]; + fn stream_key(&self) -> &[usize]; fn functional_dependency(&self) -> &FunctionalDependencySet; fn ctx(&self) -> OptimizerContextRef; } @@ -96,12 +96,12 @@ pub trait GenericPlanNode { fn logical_properties(&self) -> (Schema, Option>, FunctionalDependencySet) { ( self.schema(), - self.logical_pk(), + self.stream_key(), self.functional_dependency(), ) } fn functional_dependency(&self) -> FunctionalDependencySet; fn schema(&self) -> Schema; - fn logical_pk(&self) -> Option>; + fn stream_key(&self) -> Option>; fn ctx(&self) -> OptimizerContextRef; } diff --git a/src/frontend/src/optimizer/plan_node/generic/over_window.rs b/src/frontend/src/optimizer/plan_node/generic/over_window.rs index c148711698a24..cf7bc028ff82f 100644 --- a/src/frontend/src/optimizer/plan_node/generic/over_window.rs +++ b/src/frontend/src/optimizer/plan_node/generic/over_window.rs @@ -218,8 +218,8 @@ impl GenericPlanNode for OverWindow { schema } - fn logical_pk(&self) -> Option> { - let mut output_pk = self.input.logical_pk().to_vec(); + fn stream_key(&self) -> Option> { + let mut output_pk = self.input.stream_key().to_vec(); for part_key_idx in self .window_functions .iter() diff --git a/src/frontend/src/optimizer/plan_node/generic/project.rs b/src/frontend/src/optimizer/plan_node/generic/project.rs index 1d35332f0e709..ca1dc7ae4468f 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project.rs @@ -97,10 +97,10 @@ impl GenericPlanNode for Project { Schema { fields } } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { let i2o = self.i2o_col_mapping(); self.input - .logical_pk() + .stream_key() .iter() .map(|pk_col| i2o.try_map(*pk_col)) .collect::>>() diff --git a/src/frontend/src/optimizer/plan_node/generic/project_set.rs b/src/frontend/src/optimizer/plan_node/generic/project_set.rs index e159927f7bfda..3e5d5585f782c 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project_set.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project_set.rs @@ -88,11 +88,11 @@ impl GenericPlanNode for ProjectSet { Schema { fields } } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { let i2o = self.i2o_col_mapping(); let mut pk = self .input - .logical_pk() + .stream_key() .iter() .map(|pk_col| i2o.try_map(*pk_col)) .collect::>>() diff --git a/src/frontend/src/optimizer/plan_node/generic/scan.rs b/src/frontend/src/optimizer/plan_node/generic/scan.rs index 526a677e279b5..b7de99d11096b 100644 --- a/src/frontend/src/optimizer/plan_node/generic/scan.rs +++ b/src/frontend/src/optimizer/plan_node/generic/scan.rs @@ -307,7 +307,7 @@ impl GenericPlanNode for Scan { Schema { fields } } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { let id_to_op_idx = Self::get_id_to_op_idx_mapping(&self.output_col_idx, &self.table_desc); self.table_desc .stream_key @@ -325,7 +325,7 @@ impl GenericPlanNode for Scan { } fn functional_dependency(&self) -> FunctionalDependencySet { - let pk_indices = self.logical_pk(); + let pk_indices = self.stream_key(); let col_num = self.output_col_idx.len(); match &pk_indices { Some(pk_indices) => FunctionalDependencySet::with_key(col_num, pk_indices), diff --git a/src/frontend/src/optimizer/plan_node/generic/share.rs b/src/frontend/src/optimizer/plan_node/generic/share.rs index c22a46357fa66..d9c32b6a28f6f 100644 --- a/src/frontend/src/optimizer/plan_node/generic/share.rs +++ b/src/frontend/src/optimizer/plan_node/generic/share.rs @@ -43,8 +43,8 @@ impl GenericPlanNode for Share { self.input.borrow().schema().clone() } - fn logical_pk(&self) -> Option> { - Some(self.input.borrow().logical_pk().to_vec()) + fn stream_key(&self) -> Option> { + Some(self.input.borrow().stream_key().to_vec()) } fn ctx(&self) -> OptimizerContextRef { diff --git a/src/frontend/src/optimizer/plan_node/generic/source.rs b/src/frontend/src/optimizer/plan_node/generic/source.rs index 4a4f092110dee..4d508cc37894e 100644 --- a/src/frontend/src/optimizer/plan_node/generic/source.rs +++ b/src/frontend/src/optimizer/plan_node/generic/source.rs @@ -60,7 +60,7 @@ impl GenericPlanNode for Source { Schema { fields } } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { self.row_id_index.map(|idx| vec![idx]) } @@ -69,7 +69,7 @@ impl GenericPlanNode for Source { } fn functional_dependency(&self) -> FunctionalDependencySet { - let pk_indices = self.logical_pk(); + let pk_indices = self.stream_key(); match pk_indices { Some(pk_indices) => { FunctionalDependencySet::with_key(self.column_catalog.len(), &pk_indices) diff --git a/src/frontend/src/optimizer/plan_node/generic/top_n.rs b/src/frontend/src/optimizer/plan_node/generic/top_n.rs index 990e37219d19b..dcca35f5a5d9f 100644 --- a/src/frontend/src/optimizer/plan_node/generic/top_n.rs +++ b/src/frontend/src/optimizer/plan_node/generic/top_n.rs @@ -170,13 +170,13 @@ impl GenericPlanNode for TopN { self.input.schema().clone() } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { // We can use the group key as the stream key when there is at most one record for each // value of the group key. if self.limit_attr.max_one_row() { Some(self.group_key.clone()) } else { - let mut pk = self.input.logical_pk().to_vec(); + let mut pk = self.input.stream_key().to_vec(); for i in &self.group_key { if !pk.contains(i) { pk.push(*i); diff --git a/src/frontend/src/optimizer/plan_node/generic/union.rs b/src/frontend/src/optimizer/plan_node/generic/union.rs index 91f10eac749f0..b8a1d9e2ec5e3 100644 --- a/src/frontend/src/optimizer/plan_node/generic/union.rs +++ b/src/frontend/src/optimizer/plan_node/generic/union.rs @@ -36,11 +36,11 @@ impl GenericPlanNode for Union { self.inputs[0].schema().clone() } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { // Union all its inputs pks + source_col if exists let mut pk_indices = vec![]; for input in &self.inputs { - for pk in input.logical_pk() { + for pk in input.stream_key() { if !pk_indices.contains(pk) { pk_indices.push(*pk); } diff --git a/src/frontend/src/optimizer/plan_node/generic/update.rs b/src/frontend/src/optimizer/plan_node/generic/update.rs index 0943d770daaef..f832806e7cf9a 100644 --- a/src/frontend/src/optimizer/plan_node/generic/update.rs +++ b/src/frontend/src/optimizer/plan_node/generic/update.rs @@ -62,9 +62,9 @@ impl GenericPlanNode for Update { } } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { if self.returning { - Some(self.input.logical_pk().to_vec()) + Some(self.input.stream_key().to_vec()) } else { Some(vec![]) } diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index a2099b7d33f81..e24f060fd9e8a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -222,7 +222,7 @@ impl LogicalAgg { // so it obeys consistent hash strategy via [`Distribution::HashShard`]. let stream_input = if *input_dist == Distribution::SomeShard && self.core.must_try_two_phase_agg() { - RequiredDist::shard_by_key(stream_input.schema().len(), stream_input.logical_pk()) + RequiredDist::shard_by_key(stream_input.schema().len(), stream_input.stream_key()) .enforce_if_not_satisfies(stream_input, &Order::any())? } else { stream_input diff --git a/src/frontend/src/optimizer/plan_node/logical_apply.rs b/src/frontend/src/optimizer/plan_node/logical_apply.rs index 0ea21532458fe..3ab29bf4ff4d9 100644 --- a/src/frontend/src/optimizer/plan_node/logical_apply.rs +++ b/src/frontend/src/optimizer/plan_node/logical_apply.rs @@ -86,7 +86,7 @@ impl LogicalApply { let ctx = left.ctx(); let join_core = generic::Join::with_full_output(left, right, join_type, on); let schema = join_core.schema(); - let pk_indices = join_core.logical_pk(); + let pk_indices = join_core.stream_key(); let (functional_dependency, pk_indices) = match pk_indices { Some(pk_indices) => ( FunctionalDependencySet::with_key(schema.len(), &pk_indices), diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index 3905daaaf9f85..cd80b3a20ff01 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -64,7 +64,7 @@ impl LogicalHopWindow { }; let _schema = core.schema(); - let _pk_indices = core.logical_pk(); + let _pk_indices = core.stream_key(); let ctx = core.ctx(); // NOTE(st1page): add join keys in the pk_indices a work around before we really have stream @@ -79,7 +79,7 @@ impl LogicalHopWindow { let base = PlanBase::new_logical( ctx, core.schema(), - core.logical_pk().unwrap_or_default(), + core.stream_key().unwrap_or_default(), core.functional_dependency(), ); diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index 640b31170c546..a9e398350f8d4 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -1395,14 +1395,14 @@ impl ToStream for LogicalJoin { // Add missing pk indices to the logical join let mut left_to_add = left - .logical_pk() + .stream_key() .iter() .cloned() .filter(|i| l2o.try_map(*i).is_none()) .collect_vec(); let mut right_to_add = right - .logical_pk() + .stream_key() .iter() .filter(|&&i| r2o.try_map(i).is_none()) .map(|&i| i + left_len) @@ -1464,13 +1464,13 @@ impl ToStream for LogicalJoin { .composite(&join_with_pk.core.i2o_col_mapping()); let left_right_stream_keys = join_with_pk .left() - .logical_pk() + .stream_key() .iter() .map(|i| l2o.map(*i)) .chain( join_with_pk .right() - .logical_pk() + .stream_key() .iter() .map(|i| r2o.map(*i)), ) diff --git a/src/frontend/src/optimizer/plan_node/logical_project.rs b/src/frontend/src/optimizer/plan_node/logical_project.rs index 6e1f097e3c7d2..0eea08b6ac3ad 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project.rs @@ -264,7 +264,7 @@ impl ToStream for LogicalProject { let (proj, out_col_change) = self.rewrite_with_input(input.clone(), input_col_change); // Add missing columns of input_pk into the select list. - let input_pk = input.logical_pk(); + let input_pk = input.stream_key(); let i2o = proj.i2o_col_mapping(); let col_need_to_add = input_pk .iter() diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index 12100631ca666..50ad246b5bb09 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -363,7 +363,7 @@ impl ToStream for LogicalProjectSet { self.rewrite_with_input(input.clone(), input_col_change); // Add missing columns of input_pk into the select list. - let input_pk = input.logical_pk(); + let input_pk = input.stream_key(); let i2o = self.core.i2o_col_mapping(); let col_need_to_add = input_pk .iter() diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index e671f7412c661..8ee3e6504f185 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -549,7 +549,7 @@ impl ToStream for LogicalScan { None.into(), ))); } - match self.base.logical_pk.is_empty() { + match self.base.stream_key.is_empty() { true => { let mut col_ids = HashSet::new(); diff --git a/src/frontend/src/optimizer/plan_node/logical_union.rs b/src/frontend/src/optimizer/plan_node/logical_union.rs index 38ef55405693b..2538450606772 100644 --- a/src/frontend/src/optimizer/plan_node/logical_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_union.rs @@ -139,7 +139,7 @@ impl ToBatch for LogicalUnion { impl ToStream for LogicalUnion { fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { // TODO: use round robin distribution instead of using hash distribution of all inputs. - let dist = RequiredDist::hash_shard(self.base.logical_pk()); + let dist = RequiredDist::hash_shard(self.base.stream_key()); let new_inputs: Result> = self .inputs() .iter() @@ -174,7 +174,7 @@ impl ToStream for LogicalUnion { .map(|x| col_index_mapping.map(x)) .collect_vec(); new_input - .logical_pk() + .stream_key() .iter() .all(|x| original_schema_new_pos.contains(x)) }); @@ -223,7 +223,7 @@ impl ToStream for LogicalUnion { .iter() .flat_map(|(new_input, _)| { new_input - .logical_pk() + .stream_key() .iter() .map(|x| new_input.schema().fields[*x].data_type()) }) @@ -234,7 +234,7 @@ impl ToStream for LogicalUnion { .collect_vec(); let input_pk_lens = rewrites .iter() - .map(|(new_input, _)| new_input.logical_pk().len()) + .map(|(new_input, _)| new_input.stream_key().len()) .collect_vec(); let mut input_pk_offsets = vec![0]; for (i, len) in input_pk_lens.into_iter().enumerate() { @@ -258,7 +258,7 @@ impl ToStream for LogicalUnion { .collect_vec(); // input1_pk + input2_pk + ... let mut input_pks = input_pk_nulls.clone(); - for (j, pk_idx) in new_input.logical_pk().iter().enumerate() { + for (j, pk_idx) in new_input.stream_key().iter().enumerate() { input_pks[input_pk_offsets[i] + j] = ExprImpl::InputRef( InputRef::new( *pk_idx, diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 926cf85048f3e..4c464f02484bf 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -435,8 +435,8 @@ impl GenericPlanRef for PlanRef { &self.plan_base().schema } - fn logical_pk(&self) -> &[usize] { - &self.plan_base().logical_pk + fn stream_key(&self) -> &[usize] { + &self.plan_base().stream_key } fn ctx(&self) -> OptimizerContextRef { @@ -515,7 +515,7 @@ impl dyn PlanNode { } pub fn logical_pk(&self) -> &[usize] { - &self.plan_base().logical_pk + &self.plan_base().stream_key } pub fn order(&self) -> &Order { diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs index 41dd857282fbe..2a8c651cc49e3 100644 --- a/src/frontend/src/optimizer/plan_node/plan_base.rs +++ b/src/frontend/src/optimizer/plan_node/plan_base.rs @@ -36,7 +36,7 @@ pub struct PlanBase { pub ctx: OptimizerContextRef, pub schema: Schema, /// the pk indices of the PlanNode's output, a empty logical_pk vec means there is no pk - pub logical_pk: Vec, + pub stream_key: Vec, /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect /// correctness, but insert unnecessary sort in plan pub order: Order, @@ -59,8 +59,8 @@ impl generic::GenericPlanRef for PlanBase { &self.schema } - fn logical_pk(&self) -> &[usize] { - &self.logical_pk + fn stream_key(&self) -> &[usize] { + &self.stream_key } fn ctx(&self) -> OptimizerContextRef { @@ -103,7 +103,7 @@ impl PlanBase { id, ctx, schema, - logical_pk, + stream_key: logical_pk, dist: Distribution::Single, order: Order::any(), // Logical plan node won't touch `append_only` field @@ -118,7 +118,7 @@ impl PlanBase { Self::new_logical( node.ctx(), node.schema(), - node.logical_pk().unwrap_or_default(), + node.stream_key().unwrap_or_default(), node.functional_dependency(), ) } @@ -133,7 +133,7 @@ impl PlanBase { Self::new_stream( logical.ctx(), logical.schema(), - logical.logical_pk().unwrap_or_default().to_vec(), + logical.stream_key().unwrap_or_default().to_vec(), logical.functional_dependency(), dist, append_only, @@ -160,7 +160,7 @@ impl PlanBase { schema, dist, order: Order::any(), - logical_pk, + stream_key: logical_pk, append_only, emit_on_window_close, functional_dependency, @@ -191,7 +191,7 @@ impl PlanBase { schema, dist, order, - logical_pk: vec![], + stream_key: vec![], // Batch plan node won't touch `append_only` field append_only: true, emit_on_window_close: false, // TODO(rc): batch EOWC support? @@ -204,7 +204,7 @@ impl PlanBase { PlanBase::new_stream( plan_node.ctx(), plan_node.schema().clone(), - plan_node.logical_pk().to_vec(), + plan_node.stream_key().to_vec(), plan_node.functional_dependency().clone(), plan_node.distribution().clone(), plan_node.append_only(), @@ -234,7 +234,7 @@ macro_rules! impl_base_delegate { &self.plan_base().schema } pub fn logical_pk(&self) -> &[usize] { - &self.plan_base().logical_pk + &self.plan_base().stream_key } pub fn order(&self) -> &Order { &self.plan_base().order diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 828f509351b37..c10101c8e3a12 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -76,7 +76,7 @@ pub trait StreamPlanNode: GenericPlanNode { id: ctx.next_plan_node_id(), ctx, schema: self.schema(), - logical_pk: self.logical_pk().unwrap_or_default(), + logical_pk: self.stream_key().unwrap_or_default(), dist: self.distribution(), append_only: self.append_only(), emit_on_window_close: self.emit_on_window_close(), @@ -95,7 +95,7 @@ impl generic::GenericPlanRef for PlanRef { &self.0.schema } - fn logical_pk(&self) -> &[usize] { + fn stream_key(&self) -> &[usize] { &self.0.logical_pk } @@ -113,7 +113,7 @@ impl generic::GenericPlanRef for PlanBase { &self.schema } - fn logical_pk(&self) -> &[usize] { + fn stream_key(&self) -> &[usize] { &self.logical_pk } @@ -266,7 +266,7 @@ impl HashJoin { // dedup the pk in dist key.. let mut deduped_input_pk_indices = vec![]; - for input_pk_idx in input.logical_pk() { + for input_pk_idx in input.stream_key() { if !pk_indices.contains(input_pk_idx) && !deduped_input_pk_indices.contains(input_pk_idx) { @@ -608,7 +608,7 @@ pub fn to_stream_prost_body( .infer_internal_table_catalog( input.schema(), input.ctx(), - input.logical_pk(), + input.stream_key(), me.vnode_col_idx, ) .with_id(state.gen_table_id_wrapped()); @@ -765,7 +765,7 @@ pub fn to_stream_prost_body( me.infer_internal_table_catalog( input.schema(), input.ctx(), - input.logical_pk(), + input.stream_key(), None, ) .with_id(state.gen_table_id_wrapped()) diff --git a/src/frontend/src/optimizer/plan_node/stream_derive.rs b/src/frontend/src/optimizer/plan_node/stream_derive.rs index 404ca04a39fab..f3da2b1b6a1dd 100644 --- a/src/frontend/src/optimizer/plan_node/stream_derive.rs +++ b/src/frontend/src/optimizer/plan_node/stream_derive.rs @@ -25,7 +25,7 @@ impl GenericPlanNode for DynamicFilter { todo!("new plan node derivation") } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { todo!("new plan node derivation") } @@ -56,7 +56,7 @@ impl GenericPlanNode for Exchange { todo!("new plan node derivation") } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { todo!("new plan node derivation") } @@ -88,8 +88,8 @@ impl GenericPlanNode for DeltaJoin { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -120,8 +120,8 @@ impl GenericPlanNode for Expand { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -152,8 +152,8 @@ impl GenericPlanNode for Filter { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -184,8 +184,8 @@ impl GenericPlanNode for SimpleAgg { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -216,8 +216,8 @@ impl GenericPlanNode for GroupTopN { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -248,8 +248,8 @@ impl GenericPlanNode for HashAgg { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -280,8 +280,8 @@ impl GenericPlanNode for HashJoin { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -312,8 +312,8 @@ impl GenericPlanNode for HopWindow { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -344,8 +344,8 @@ impl GenericPlanNode for IndexScan { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -376,8 +376,8 @@ impl GenericPlanNode for StatelessSimpleAgg { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -408,7 +408,7 @@ impl GenericPlanNode for Materialize { todo!("new plan node derivation") } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { todo!("new plan node derivation") } @@ -440,8 +440,8 @@ impl GenericPlanNode for ProjectSet { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -472,8 +472,8 @@ impl GenericPlanNode for Project { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -506,7 +506,7 @@ impl GenericPlanNode for Sink { todo!("new plan node derivation") } - fn logical_pk(&self) -> Option> { + fn stream_key(&self) -> Option> { todo!("new plan node derivation") } @@ -538,8 +538,8 @@ impl GenericPlanNode for Source { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -570,8 +570,8 @@ impl GenericPlanNode for TableScan { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -602,8 +602,8 @@ impl GenericPlanNode for TopN { self.core.schema() } - fn logical_pk(&self) -> Option> { - self.core.logical_pk() + fn stream_key(&self) -> Option> { + self.core.stream_key() } fn ctx(&self) -> OptimizerContextRef { diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 66be991bae95c..37ae226cbe21a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -292,7 +292,7 @@ impl PlanTreeNodeUnary for StreamMaterialize { assert_eq!(a.type_name, b.type_name); assert_eq!(a.sub_fields, b.sub_fields); }); - assert_eq!(new.plan_base().logical_pk, self.plan_base().logical_pk); + assert_eq!(new.plan_base().stream_key, self.plan_base().stream_key); new } } diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 131b58795cbe4..fd542c7fbb579 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -227,7 +227,7 @@ impl StreamTableScan { pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> PbStreamNode { use risingwave_pb::stream_plan::*; - let stream_key = self.base.logical_pk.iter().map(|x| *x as u32).collect_vec(); + let stream_key = self.base.stream_key.iter().map(|x| *x as u32).collect_vec(); // The required columns from the table (both scan and upstream). let upstream_column_ids = match self.chain_type { diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index 83ab3820bd376..d5f9de6309781 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -43,7 +43,7 @@ impl StreamTopN { let mut base = PlanBase::new_stream_with_logical(&logical, dist, false, false, watermark_columns); if let Some(stream_key) = stream_key { - base.logical_pk = stream_key; + base.stream_key = stream_key; } StreamTopN { base, logical } } From fd4f89f60aebbd4891e07f7f6b56a85430daa5e0 Mon Sep 17 00:00:00 2001 From: st1page <1245835950@qq.com> Date: Mon, 25 Sep 2023 16:49:57 +0800 Subject: [PATCH 2/2] rename --- src/frontend/src/optimizer/plan_node/derive.rs | 2 +- src/frontend/src/optimizer/plan_node/generic/mod.rs | 2 +- .../src/optimizer/plan_node/logical_hop_window.rs | 2 +- .../src/optimizer/plan_node/logical_multi_join.rs | 2 +- src/frontend/src/optimizer/plan_node/logical_topn.rs | 2 +- src/frontend/src/optimizer/plan_node/mod.rs | 4 ++-- src/frontend/src/optimizer/plan_node/plan_base.rs | 12 ++++++------ src/frontend/src/optimizer/plan_node/stream.rs | 8 ++++---- src/frontend/src/optimizer/plan_node/stream_dml.rs | 2 +- .../optimizer/plan_node/stream_eowc_over_window.rs | 2 +- .../src/optimizer/plan_node/stream_exchange.rs | 4 ++-- .../src/optimizer/plan_node/stream_group_topn.rs | 2 +- .../src/optimizer/plan_node/stream_materialize.rs | 2 +- .../src/optimizer/plan_node/stream_over_window.rs | 2 +- .../src/optimizer/plan_node/stream_row_id_gen.rs | 2 +- src/frontend/src/optimizer/plan_node/stream_share.rs | 2 +- src/frontend/src/optimizer/plan_node/stream_sink.rs | 2 +- src/frontend/src/optimizer/plan_node/stream_sort.rs | 6 +++--- .../src/optimizer/plan_node/stream_table_scan.rs | 2 +- src/frontend/src/optimizer/plan_node/stream_topn.rs | 4 ++-- .../src/optimizer/plan_node/stream_values.rs | 2 +- .../optimizer/plan_node/stream_watermark_filter.rs | 2 +- .../optimizer/plan_visitor/cardinality_visitor.rs | 2 +- 23 files changed, 36 insertions(+), 36 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/derive.rs b/src/frontend/src/optimizer/plan_node/derive.rs index fe2bd3e2da325..7153f8e6a8943 100644 --- a/src/frontend/src/optimizer/plan_node/derive.rs +++ b/src/frontend/src/optimizer/plan_node/derive.rs @@ -82,7 +82,7 @@ pub(crate) fn derive_pk( columns: &[ColumnCatalog], ) -> (Vec, Vec) { // Note(congyi): avoid pk duplication - let stream_key = input.logical_pk().iter().copied().unique().collect_vec(); + let stream_key = input.stream_key().iter().copied().unique().collect_vec(); let schema = input.schema(); // Assert the uniqueness of column names and IDs, including hidden columns. diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index d5d2b2ea2d448..cc02763f26f98 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -92,7 +92,7 @@ pub trait GenericPlanRef: Eq + Hash { } pub trait GenericPlanNode { - /// return (schema, `logical_pk`, fds) + /// return (schema, `stream_key`, fds) fn logical_properties(&self) -> (Schema, Option>, FunctionalDependencySet) { ( self.schema(), diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index cd80b3a20ff01..73fd3730b0c4c 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -344,7 +344,7 @@ impl ToStream for LogicalHopWindow { let i2o = self.core.i2o_col_mapping(); output_indices.extend( input - .logical_pk() + .stream_key() .iter() .cloned() .filter(|i| i2o.try_map(*i).is_none()), diff --git a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs index b3d61cd495fb9..2a74d227dc86c 100644 --- a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs @@ -246,7 +246,7 @@ impl LogicalMultiJoin { let pk_indices = { let mut pk_indices = vec![]; - for (i, input_pk) in inputs.iter().map(|input| input.logical_pk()).enumerate() { + for (i, input_pk) in inputs.iter().map(|input| input.stream_key()).enumerate() { for input_pk_idx in input_pk { pk_indices.push(inner_i2o_mappings[i].map(*input_pk_idx)); } diff --git a/src/frontend/src/optimizer/plan_node/logical_topn.rs b/src/frontend/src/optimizer/plan_node/logical_topn.rs index 8d9d446c0900f..1e4c6113f2ee7 100644 --- a/src/frontend/src/optimizer/plan_node/logical_topn.rs +++ b/src/frontend/src/optimizer/plan_node/logical_topn.rs @@ -177,7 +177,7 @@ impl LogicalTopN { ); // TODO(st1page): solve it - let global_top_n = StreamTopN::with_stream_key(global_top_n, self.logical_pk().to_vec()); + let global_top_n = StreamTopN::with_stream_key(global_top_n, self.stream_key().to_vec()); // use another projection to remove the column we added before. exprs.pop(); diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 4c464f02484bf..189ab9a0f1a6d 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -514,7 +514,7 @@ impl dyn PlanNode { &self.plan_base().schema } - pub fn logical_pk(&self) -> &[usize] { + pub fn stream_key(&self) -> &[usize] { &self.plan_base().stream_key } @@ -566,7 +566,7 @@ impl dyn PlanNode { identity: self.explain_myself_to_string(), node_body: node, operator_id: self.id().0 as _, - stream_key: self.logical_pk().iter().map(|x| *x as u32).collect(), + stream_key: self.stream_key().iter().map(|x| *x as u32).collect(), fields: self.schema().to_prost(), append_only: self.append_only(), } diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs index 2a8c651cc49e3..1ca7f513cb3ab 100644 --- a/src/frontend/src/optimizer/plan_node/plan_base.rs +++ b/src/frontend/src/optimizer/plan_node/plan_base.rs @@ -35,7 +35,7 @@ pub struct PlanBase { #[educe(Hash(ignore))] pub ctx: OptimizerContextRef, pub schema: Schema, - /// the pk indices of the PlanNode's output, a empty logical_pk vec means there is no pk + /// the pk indices of the PlanNode's output, a empty stream key vec means there is no stream key pub stream_key: Vec, /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect /// correctness, but insert unnecessary sort in plan @@ -94,7 +94,7 @@ impl PlanBase { pub fn new_logical( ctx: OptimizerContextRef, schema: Schema, - logical_pk: Vec, + stream_key: Vec, functional_dependency: FunctionalDependencySet, ) -> Self { let id = ctx.next_plan_node_id(); @@ -103,7 +103,7 @@ impl PlanBase { id, ctx, schema, - stream_key: logical_pk, + stream_key, dist: Distribution::Single, order: Order::any(), // Logical plan node won't touch `append_only` field @@ -145,7 +145,7 @@ impl PlanBase { pub fn new_stream( ctx: OptimizerContextRef, schema: Schema, - logical_pk: Vec, + stream_key: Vec, functional_dependency: FunctionalDependencySet, dist: Distribution, append_only: bool, @@ -160,7 +160,7 @@ impl PlanBase { schema, dist, order: Order::any(), - stream_key: logical_pk, + stream_key, append_only, emit_on_window_close, functional_dependency, @@ -233,7 +233,7 @@ macro_rules! impl_base_delegate { pub fn schema(&self) -> &Schema { &self.plan_base().schema } - pub fn logical_pk(&self) -> &[usize] { + pub fn stream_key(&self) -> &[usize] { &self.plan_base().stream_key } pub fn order(&self) -> &Order { diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index c10101c8e3a12..598318ed22d0c 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -76,7 +76,7 @@ pub trait StreamPlanNode: GenericPlanNode { id: ctx.next_plan_node_id(), ctx, schema: self.schema(), - logical_pk: self.stream_key().unwrap_or_default(), + stream_key: self.stream_key().unwrap_or_default(), dist: self.distribution(), append_only: self.append_only(), emit_on_window_close: self.emit_on_window_close(), @@ -96,7 +96,7 @@ impl generic::GenericPlanRef for PlanRef { } fn stream_key(&self) -> &[usize] { - &self.0.logical_pk + &self.0.stream_key } fn ctx(&self) -> OptimizerContextRef { @@ -114,7 +114,7 @@ impl generic::GenericPlanRef for PlanBase { } fn stream_key(&self) -> &[usize] { - &self.logical_pk + &self.stream_key } fn ctx(&self) -> OptimizerContextRef { @@ -410,7 +410,7 @@ pub struct PlanBase { #[educe(Hash(ignore))] pub ctx: OptimizerContextRef, pub schema: Schema, - pub logical_pk: Vec, + pub stream_key: Vec, #[educe(PartialEq(ignore))] #[educe(Hash(ignore))] pub dist: Distribution, diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs index c576d5e2d83d3..febf550049265 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dml.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs @@ -33,7 +33,7 @@ impl StreamDml { let base = PlanBase::new_stream( input.ctx(), input.schema().clone(), - input.logical_pk().to_vec(), + input.stream_key().to_vec(), input.functional_dependency().clone(), input.distribution().clone(), append_only, diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs index dea3d0eb49889..6b13b88a38bf2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs @@ -101,7 +101,7 @@ impl StreamEowcOverWindow { tbl_builder.add_order_column(order_key_index, OrderType::ascending()); order_cols.insert(order_key_index); } - for idx in self.logical.input.logical_pk() { + for idx in self.logical.input.stream_key() { if !order_cols.contains(idx) { tbl_builder.add_order_column(*idx, OrderType::ascending()); order_cols.insert(*idx); diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index b5da1804aee71..b70b127a5522c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -37,7 +37,7 @@ impl StreamExchange { let base = PlanBase::new_stream( input.ctx(), input.schema().clone(), - input.logical_pk().to_vec(), + input.stream_key().to_vec(), input.functional_dependency().clone(), dist, input.append_only(), @@ -53,7 +53,7 @@ impl StreamExchange { pub fn new_no_shuffle(input: PlanRef) -> Self { let ctx = input.ctx(); - let pk_indices = input.logical_pk().to_vec(); + let pk_indices = input.stream_key().to_vec(); // Dispatch executor won't change the append-only behavior of the stream. let base = PlanBase::new_stream( ctx, diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index 761807cba1ae7..93209aa0b8800 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -93,7 +93,7 @@ impl StreamNode for StreamGroupTopN { .infer_internal_table_catalog( input.schema(), input.ctx(), - input.logical_pk(), + input.stream_key(), self.vnode_col_idx, ) .with_id(state.gen_table_id_wrapped()); diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 37ae226cbe21a..4b85612678f94 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -140,7 +140,7 @@ impl StreamMaterialize { TableType::MaterializedView => { assert_matches!(user_distributed_by, RequiredDist::Any); // ensure the same pk will not shuffle to different node - RequiredDist::shard_by_key(input.schema().len(), input.logical_pk()) + RequiredDist::shard_by_key(input.schema().len(), input.stream_key()) } TableType::Index => { assert_matches!( diff --git a/src/frontend/src/optimizer/plan_node/stream_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_over_window.rs index b07c75a1f261e..e77145bac748d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_over_window.rs @@ -68,7 +68,7 @@ impl StreamOverWindow { tbl_builder.add_order_column(o.column_index, o.order_type); } } - for &idx in self.logical.input.logical_pk() { + for &idx in self.logical.input.stream_key() { if order_cols.insert(idx) { tbl_builder.add_order_column(idx, OrderType::ascending()); } diff --git a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs index 1562306825dfd..a2a5f834400a5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs +++ b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs @@ -40,7 +40,7 @@ impl StreamRowIdGen { let base = PlanBase::new_stream( input.ctx(), input.schema().clone(), - input.logical_pk().to_vec(), + input.stream_key().to_vec(), input.functional_dependency().clone(), distribution, input.append_only(), diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index 93e8030d49566..795cdcbfda06b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -96,7 +96,7 @@ impl StreamShare { identity: self.distill_to_string(), node_body: Some(node_body), operator_id: self.id().0 as _, - stream_key: self.logical_pk().iter().map(|x| *x as u32).collect(), + stream_key: self.stream_key().iter().map(|x| *x as u32).collect(), fields: self.schema().to_prost(), append_only: self.append_only(), }; diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 60e89abf3f5c9..4b9250c4b23f5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -138,7 +138,7 @@ impl StreamSink { } _ => { assert_matches!(user_distributed_by, RequiredDist::Any); - RequiredDist::shard_by_key(input.schema().len(), input.logical_pk()) + RequiredDist::shard_by_key(input.schema().len(), input.stream_key()) } } } diff --git a/src/frontend/src/optimizer/plan_node/stream_sort.rs b/src/frontend/src/optimizer/plan_node/stream_sort.rs index ea130a18af721..9ade7a165500a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sort.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sort.rs @@ -48,7 +48,7 @@ impl StreamEowcSort { assert!(input.watermark_columns().contains(sort_column_index)); let schema = input.schema().clone(); - let logical_pk = input.logical_pk().to_vec(); + let stream_key = input.stream_key().to_vec(); let fd_set = input.functional_dependency().clone(); let dist = input.distribution().clone(); let mut watermark_columns = FixedBitSet::with_capacity(input.schema().len()); @@ -56,7 +56,7 @@ impl StreamEowcSort { let base = PlanBase::new_stream( input.ctx(), schema, - logical_pk, + stream_key, fd_set, dist, true, @@ -92,7 +92,7 @@ impl StreamEowcSort { } } - for idx in self.input.logical_pk() { + for idx in self.input.stream_key() { if !order_cols.contains(idx) { tbl_builder.add_order_column(*idx, OrderType::ascending()); order_cols.insert(*idx); diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index fd542c7fbb579..bc841a3efd5cf 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -202,7 +202,7 @@ impl Distill for StreamTableScan { if verbose { let pk = IndicesDisplay { - indices: self.logical_pk(), + indices: self.stream_key(), schema: &self.base.schema, }; vec.push(("pk", pk.distill())); diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index d5f9de6309781..f9be4149ea06b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -86,7 +86,7 @@ impl PlanTreeNodeUnary for StreamTopN { fn clone_with_input(&self, input: PlanRef) -> Self { let mut logical = self.logical.clone(); logical.input = input; - Self::new_inner(logical, Some(self.logical_pk().to_vec())) + Self::new_inner(logical, Some(self.stream_key().to_vec())) } } @@ -106,7 +106,7 @@ impl StreamNode for StreamTopN { .infer_internal_table_catalog( input.schema(), input.ctx(), - input.logical_pk(), + input.stream_key(), None, ) .with_id(state.gen_table_id_wrapped()) diff --git a/src/frontend/src/optimizer/plan_node/stream_values.rs b/src/frontend/src/optimizer/plan_node/stream_values.rs index f4ad303cdf2b6..1f764ba37c305 100644 --- a/src/frontend/src/optimizer/plan_node/stream_values.rs +++ b/src/frontend/src/optimizer/plan_node/stream_values.rs @@ -40,7 +40,7 @@ impl StreamValues { let base = PlanBase::new_stream( ctx, logical.schema().clone(), - logical.logical_pk().to_vec(), + logical.stream_key().to_vec(), logical.functional_dependency().clone(), Distribution::Single, true, diff --git a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs index 0ca939a207926..2897d67ec5dea 100644 --- a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs @@ -43,7 +43,7 @@ impl StreamWatermarkFilter { let base = PlanBase::new_stream( input.ctx(), input.schema().clone(), - input.logical_pk().to_vec(), + input.stream_key().to_vec(), input.functional_dependency().clone(), input.distribution().clone(), input.append_only(), diff --git a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs index e563dcf2b41c3..e32385be59d7b 100644 --- a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs +++ b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs @@ -36,7 +36,7 @@ impl CardinalityVisitor { input_card: Cardinality, eq_set: HashSet, ) -> Cardinality { - let mut unique_keys: Vec> = vec![input.logical_pk().iter().copied().collect()]; + let mut unique_keys: Vec> = vec![input.stream_key().iter().copied().collect()]; // We don't have UNIQUE key now. So we hack here to support some complex queries on // system tables.