diff --git a/src/frontend/src/optimizer/plan_node/derive.rs b/src/frontend/src/optimizer/plan_node/derive.rs index 7153f8e6a8943..92ea32c73592c 100644 --- a/src/frontend/src/optimizer/plan_node/derive.rs +++ b/src/frontend/src/optimizer/plan_node/derive.rs @@ -82,7 +82,12 @@ pub(crate) fn derive_pk( columns: &[ColumnCatalog], ) -> (Vec, Vec) { // Note(congyi): avoid pk duplication - let stream_key = input.stream_key().iter().copied().unique().collect_vec(); + let stream_key = input + .expect_stream_key() + .iter() + .copied() + .unique() + .collect_vec(); let schema = input.schema(); // Assert the uniqueness of column names and IDs, including hidden columns. diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index cdc59256439a1..4c35f2540c99f 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -343,7 +343,7 @@ impl Agg { window_col_idx: Option, ) -> Vec { let in_fields = self.input.schema().fields().to_vec(); - let in_pks = self.input.stream_key().to_vec(); + let in_pks = self.input.stream_key().unwrap().to_vec(); let in_append_only = self.input.append_only(); let in_dist_key = self.input.distribution().dist_column_indices().to_vec(); diff --git a/src/frontend/src/optimizer/plan_node/generic/delete.rs b/src/frontend/src/optimizer/plan_node/generic/delete.rs index 23ab1caf6f442..26952dd1c4031 100644 --- a/src/frontend/src/optimizer/plan_node/generic/delete.rs +++ b/src/frontend/src/optimizer/plan_node/generic/delete.rs @@ -62,7 +62,7 @@ impl GenericPlanNode for Delete { fn stream_key(&self) -> Option> { if self.returning { - Some(self.input.stream_key().to_vec()) + Some(self.input.stream_key()?.to_vec()) } else { Some(vec![]) } diff --git a/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs index 1e45d8f710af2..85ffd922c43e7 100644 --- a/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/generic/dynamic_filter.rs @@ -59,7 +59,7 @@ impl GenericPlanNode for DynamicFilter { } fn stream_key(&self) -> Option> { - Some(self.left.stream_key().to_vec()) + Some(self.left.stream_key()?.to_vec()) } fn ctx(&self) -> OptimizerContextRef { @@ -151,7 +151,7 @@ pub fn infer_left_internal_table_catalog( let mut pk_indices = vec![left_key_index]; let read_prefix_len_hint = pk_indices.len(); - for i in me.stream_key() { + for i in me.stream_key().unwrap() { if *i != left_key_index { pk_indices.push(*i); } diff --git a/src/frontend/src/optimizer/plan_node/generic/except.rs b/src/frontend/src/optimizer/plan_node/generic/except.rs index 7dc99c8290210..a49802e8d9155 100644 --- a/src/frontend/src/optimizer/plan_node/generic/except.rs +++ b/src/frontend/src/optimizer/plan_node/generic/except.rs @@ -34,7 +34,7 @@ impl GenericPlanNode for Except { } fn stream_key(&self) -> Option> { - Some(self.inputs[0].stream_key().to_vec()) + Some(self.inputs[0].stream_key()?.to_vec()) } fn ctx(&self) -> OptimizerContextRef { diff --git a/src/frontend/src/optimizer/plan_node/generic/expand.rs b/src/frontend/src/optimizer/plan_node/generic/expand.rs index 55624b04baa99..ba7c52aa814ef 100644 --- a/src/frontend/src/optimizer/plan_node/generic/expand.rs +++ b/src/frontend/src/optimizer/plan_node/generic/expand.rs @@ -61,7 +61,7 @@ impl GenericPlanNode for Expand { let input_schema_len = self.input.schema().len(); let mut pk_indices = self .input - .stream_key() + .stream_key()? .iter() .map(|&pk| pk + input_schema_len) .collect_vec(); diff --git a/src/frontend/src/optimizer/plan_node/generic/filter.rs b/src/frontend/src/optimizer/plan_node/generic/filter.rs index bac03135c4214..5b09e504121c0 100644 --- a/src/frontend/src/optimizer/plan_node/generic/filter.rs +++ b/src/frontend/src/optimizer/plan_node/generic/filter.rs @@ -54,7 +54,7 @@ impl GenericPlanNode for Filter { } fn stream_key(&self) -> Option> { - Some(self.input.stream_key().to_vec()) + Some(self.input.stream_key()?.to_vec()) } fn ctx(&self) -> OptimizerContextRef { diff --git a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs index 37d9401ee194d..9bd0dec4b70cc 100644 --- a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs @@ -77,7 +77,7 @@ impl GenericPlanNode for HopWindow { } else { let mut pk = self .input - .stream_key() + .stream_key()? .iter() .filter_map(|&pk_idx| self.output_indices.iter().position(|&idx| idx == pk_idx)) .collect_vec(); diff --git a/src/frontend/src/optimizer/plan_node/generic/intersect.rs b/src/frontend/src/optimizer/plan_node/generic/intersect.rs index 84c34e0b8d507..c0db320dbf537 100644 --- a/src/frontend/src/optimizer/plan_node/generic/intersect.rs +++ b/src/frontend/src/optimizer/plan_node/generic/intersect.rs @@ -33,7 +33,7 @@ impl GenericPlanNode for Intersect { } fn stream_key(&self) -> Option> { - Some(self.inputs[0].stream_key().to_vec()) + Some(self.inputs[0].stream_key()?.to_vec()) } fn ctx(&self) -> OptimizerContextRef { diff --git a/src/frontend/src/optimizer/plan_node/generic/join.rs b/src/frontend/src/optimizer/plan_node/generic/join.rs index d4cc84376599f..9429e29b968ea 100644 --- a/src/frontend/src/optimizer/plan_node/generic/join.rs +++ b/src/frontend/src/optimizer/plan_node/generic/join.rs @@ -93,8 +93,8 @@ impl GenericPlanNode for Join { fn stream_key(&self) -> Option> { let _left_len = self.left.schema().len(); let _right_len = self.right.schema().len(); - let left_pk = self.left.stream_key(); - let right_pk = self.right.stream_key(); + let left_pk = self.left.stream_key()?; + let right_pk = self.right.stream_key()?; let l2i = self.l2i_col_mapping(); let r2i = self.r2i_col_mapping(); let full_out_col_num = self.internal_column_num(); @@ -110,7 +110,7 @@ impl GenericPlanNode for Join { // NOTE(st1page): add join keys in the pk_indices a work around before we really have stream // key. - pk_indices.and_then(|mut pk_indices| { + pk_indices.and_then(|mut pk_indices: Vec| { let left_len = self.left.schema().len(); let right_len = self.right.schema().len(); let eq_predicate = EqJoinPredicate::create(left_len, right_len, self.on.clone()); diff --git a/src/frontend/src/optimizer/plan_node/generic/limit.rs b/src/frontend/src/optimizer/plan_node/generic/limit.rs index 2773ea325285e..b4ac1ef7821e2 100644 --- a/src/frontend/src/optimizer/plan_node/generic/limit.rs +++ b/src/frontend/src/optimizer/plan_node/generic/limit.rs @@ -43,7 +43,7 @@ impl GenericPlanNode for Limit { } fn stream_key(&self) -> Option> { - Some(self.input.stream_key().to_vec()) + Some(self.input.stream_key()?.to_vec()) } } impl Limit { diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index cc02763f26f98..49038500b4301 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -86,7 +86,7 @@ pub(super) use impl_distill_unit_from_fields; pub trait GenericPlanRef: Eq + Hash { fn schema(&self) -> &Schema; - fn stream_key(&self) -> &[usize]; + fn stream_key(&self) -> Option<&[usize]>; fn functional_dependency(&self) -> &FunctionalDependencySet; fn ctx(&self) -> OptimizerContextRef; } diff --git a/src/frontend/src/optimizer/plan_node/generic/over_window.rs b/src/frontend/src/optimizer/plan_node/generic/over_window.rs index cf7bc028ff82f..96e60184fbcca 100644 --- a/src/frontend/src/optimizer/plan_node/generic/over_window.rs +++ b/src/frontend/src/optimizer/plan_node/generic/over_window.rs @@ -219,7 +219,7 @@ impl GenericPlanNode for OverWindow { } fn stream_key(&self) -> Option> { - let mut output_pk = self.input.stream_key().to_vec(); + let mut output_pk = self.input.stream_key()?.to_vec(); for part_key_idx in self .window_functions .iter() diff --git a/src/frontend/src/optimizer/plan_node/generic/project.rs b/src/frontend/src/optimizer/plan_node/generic/project.rs index ca1dc7ae4468f..d8b6988af4391 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project.rs @@ -100,7 +100,7 @@ impl GenericPlanNode for Project { fn stream_key(&self) -> Option> { let i2o = self.i2o_col_mapping(); self.input - .stream_key() + .stream_key()? .iter() .map(|pk_col| i2o.try_map(*pk_col)) .collect::>>() diff --git a/src/frontend/src/optimizer/plan_node/generic/project_set.rs b/src/frontend/src/optimizer/plan_node/generic/project_set.rs index 3e5d5585f782c..fef26d1b32993 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project_set.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project_set.rs @@ -92,7 +92,7 @@ impl GenericPlanNode for ProjectSet { let i2o = self.i2o_col_mapping(); let mut pk = self .input - .stream_key() + .stream_key()? .iter() .map(|pk_col| i2o.try_map(*pk_col)) .collect::>>() diff --git a/src/frontend/src/optimizer/plan_node/generic/share.rs b/src/frontend/src/optimizer/plan_node/generic/share.rs index d9c32b6a28f6f..838a02c07f2e1 100644 --- a/src/frontend/src/optimizer/plan_node/generic/share.rs +++ b/src/frontend/src/optimizer/plan_node/generic/share.rs @@ -44,7 +44,7 @@ impl GenericPlanNode for Share { } fn stream_key(&self) -> Option> { - Some(self.input.borrow().stream_key().to_vec()) + Some(self.input.borrow().stream_key()?.to_vec()) } fn ctx(&self) -> OptimizerContextRef { diff --git a/src/frontend/src/optimizer/plan_node/generic/top_n.rs b/src/frontend/src/optimizer/plan_node/generic/top_n.rs index dcca35f5a5d9f..f0d04ab933a78 100644 --- a/src/frontend/src/optimizer/plan_node/generic/top_n.rs +++ b/src/frontend/src/optimizer/plan_node/generic/top_n.rs @@ -42,7 +42,7 @@ impl TopN { &self, schema: &Schema, ctx: OptimizerContextRef, - stream_key: &[usize], + input_stream_key: &[usize], vnode_col_idx: Option, ) -> TableCatalog { let columns_fields = schema.fields().to_vec(); @@ -71,7 +71,7 @@ impl TopN { order_cols.insert(order.column_index); }); - stream_key.iter().for_each(|idx| { + input_stream_key.iter().for_each(|idx| { if !order_cols.contains(idx) { internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending()); order_cols.insert(*idx); @@ -176,7 +176,7 @@ impl GenericPlanNode for TopN { if self.limit_attr.max_one_row() { Some(self.group_key.clone()) } else { - let mut pk = self.input.stream_key().to_vec(); + let mut pk = self.input.stream_key()?.to_vec(); for i in &self.group_key { if !pk.contains(i) { pk.push(*i); diff --git a/src/frontend/src/optimizer/plan_node/generic/union.rs b/src/frontend/src/optimizer/plan_node/generic/union.rs index b8a1d9e2ec5e3..bc736eed4e153 100644 --- a/src/frontend/src/optimizer/plan_node/generic/union.rs +++ b/src/frontend/src/optimizer/plan_node/generic/union.rs @@ -40,7 +40,7 @@ impl GenericPlanNode for Union { // Union all its inputs pks + source_col if exists let mut pk_indices = vec![]; for input in &self.inputs { - for pk in input.stream_key() { + for pk in input.stream_key()? { if !pk_indices.contains(pk) { pk_indices.push(*pk); } diff --git a/src/frontend/src/optimizer/plan_node/generic/update.rs b/src/frontend/src/optimizer/plan_node/generic/update.rs index f832806e7cf9a..16d73afe62003 100644 --- a/src/frontend/src/optimizer/plan_node/generic/update.rs +++ b/src/frontend/src/optimizer/plan_node/generic/update.rs @@ -64,7 +64,7 @@ impl GenericPlanNode for Update { fn stream_key(&self) -> Option> { if self.returning { - Some(self.input.stream_key().to_vec()) + Some(self.input.stream_key()?.to_vec()) } else { Some(vec![]) } diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index 7f86551acb397..0ab848eb29f64 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -223,8 +223,11 @@ impl LogicalAgg { // so it obeys consistent hash strategy via [`Distribution::HashShard`]. let stream_input = if *input_dist == Distribution::SomeShard && self.core.must_try_two_phase_agg() { - RequiredDist::shard_by_key(stream_input.schema().len(), stream_input.stream_key()) - .enforce_if_not_satisfies(stream_input, &Order::any())? + RequiredDist::shard_by_key( + stream_input.schema().len(), + stream_input.expect_stream_key(), + ) + .enforce_if_not_satisfies(stream_input, &Order::any())? } else { stream_input }; diff --git a/src/frontend/src/optimizer/plan_node/logical_apply.rs b/src/frontend/src/optimizer/plan_node/logical_apply.rs index 3ab29bf4ff4d9..7640f093fc933 100644 --- a/src/frontend/src/optimizer/plan_node/logical_apply.rs +++ b/src/frontend/src/optimizer/plan_node/logical_apply.rs @@ -86,16 +86,13 @@ impl LogicalApply { let ctx = left.ctx(); let join_core = generic::Join::with_full_output(left, right, join_type, on); let schema = join_core.schema(); - let pk_indices = join_core.stream_key(); - let (functional_dependency, pk_indices) = match pk_indices { - Some(pk_indices) => ( - FunctionalDependencySet::with_key(schema.len(), &pk_indices), - pk_indices, - ), - None => (FunctionalDependencySet::new(schema.len()), vec![]), + let stream_key = join_core.stream_key(); + let functional_dependency = match &stream_key { + Some(stream_key) => FunctionalDependencySet::with_key(schema.len(), stream_key), + None => FunctionalDependencySet::new(schema.len()), }; let (left, right, on, join_type, _output_indices) = join_core.decompose(); - let base = PlanBase::new_logical(ctx, schema, pk_indices, functional_dependency); + let base = PlanBase::new_logical(ctx, schema, stream_key, functional_dependency); LogicalApply { base, left, diff --git a/src/frontend/src/optimizer/plan_node/logical_dedup.rs b/src/frontend/src/optimizer/plan_node/logical_dedup.rs index f070d51847fee..dd46f9af9be1d 100644 --- a/src/frontend/src/optimizer/plan_node/logical_dedup.rs +++ b/src/frontend/src/optimizer/plan_node/logical_dedup.rs @@ -38,13 +38,8 @@ pub struct LogicalDedup { impl LogicalDedup { pub fn new(input: PlanRef, dedup_cols: Vec) -> Self { - let base = PlanBase::new_logical( - input.ctx(), - input.schema().clone(), - dedup_cols.clone(), - input.functional_dependency().clone(), - ); let core = generic::Dedup { input, dedup_cols }; + let base = PlanBase::new_logical_with_core(&core); LogicalDedup { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index d07504701ab4d..e4bd65efa647c 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -63,23 +63,12 @@ impl LogicalHopWindow { output_indices, }; - let _schema = core.schema(); - let _pk_indices = core.stream_key(); let ctx = core.ctx(); - // NOTE(st1page): add join keys in the pk_indices a work around before we really have stream - // key. - // let pk_indices = match pk_indices { - // Some(pk_indices) if functional_dependency.is_key(&pk_indices) => { - // functional_dependency.minimize_key(&pk_indices) - // } - // _ => pk_indices.unwrap_or_default(), - // }; - let base = PlanBase::new_logical( ctx, core.schema(), - core.stream_key().unwrap_or_default(), + core.stream_key(), core.functional_dependency(), ); @@ -348,7 +337,7 @@ impl ToStream for LogicalHopWindow { let i2o = self.core.i2o_col_mapping(); output_indices.extend( input - .stream_key() + .expect_stream_key() .iter() .cloned() .filter(|i| i2o.try_map(*i).is_none()), diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index 21aef3277a9de..e76a99fd15e21 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -1395,14 +1395,14 @@ impl ToStream for LogicalJoin { // Add missing pk indices to the logical join let mut left_to_add = left - .stream_key() + .expect_stream_key() .iter() .cloned() .filter(|i| l2o.try_map(*i).is_none()) .collect_vec(); let mut right_to_add = right - .stream_key() + .expect_stream_key() .iter() .filter(|&&i| r2o.try_map(i).is_none()) .map(|&i| i + left_len) @@ -1464,13 +1464,13 @@ impl ToStream for LogicalJoin { .composite(&join_with_pk.core.i2o_col_mapping()); let left_right_stream_keys = join_with_pk .left() - .stream_key() + .expect_stream_key() .iter() .map(|i| l2o.map(*i)) .chain( join_with_pk .right() - .stream_key() + .expect_stream_key() .iter() .map(|i| r2o.map(*i)), ) diff --git a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs index 2a74d227dc86c..1c0253ab7aafd 100644 --- a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs @@ -244,19 +244,7 @@ impl LogicalMultiJoin { .collect_vec() }; - let pk_indices = { - let mut pk_indices = vec![]; - for (i, input_pk) in inputs.iter().map(|input| input.stream_key()).enumerate() { - for input_pk_idx in input_pk { - pk_indices.push(inner_i2o_mappings[i].map(*input_pk_idx)); - } - } - pk_indices - .into_iter() - .map(|col_idx| inner2output.try_map(col_idx)) - .collect::>>() - .unwrap_or_default() - }; + let pk_indices = Self::derive_stream_key(&inputs, &inner_i2o_mappings, &inner2output); let functional_dependency = { let mut fd_set = FunctionalDependencySet::new(tot_col_num); let mut column_cnt: usize = 0; @@ -303,6 +291,25 @@ impl LogicalMultiJoin { } } + fn derive_stream_key( + inputs: &[PlanRef], + inner_i2o_mappings: &[ColIndexMapping], + inner2output: &ColIndexMapping, + ) -> Option> { + // TODO(st1page): add JOIN key + let mut pk_indices = vec![]; + for (i, input) in inputs.iter().enumerate() { + let input_stream_key = input.stream_key()?; + for input_pk_idx in input_stream_key { + pk_indices.push(inner_i2o_mappings[i].map(*input_pk_idx)); + } + } + pk_indices + .into_iter() + .map(|col_idx| inner2output.try_map(col_idx)) + .collect::>>() + } + /// Get a reference to the logical join's on. pub fn on(&self) -> &Condition { &self.on diff --git a/src/frontend/src/optimizer/plan_node/logical_now.rs b/src/frontend/src/optimizer/plan_node/logical_now.rs index 4a26ef6304541..2792c4848e3b3 100644 --- a/src/frontend/src/optimizer/plan_node/logical_now.rs +++ b/src/frontend/src/optimizer/plan_node/logical_now.rs @@ -41,7 +41,12 @@ impl LogicalNow { sub_fields: vec![], type_name: String::default(), }]); - let base = PlanBase::new_logical(ctx, schema, vec![], FunctionalDependencySet::default()); + let base = PlanBase::new_logical( + ctx, + schema, + Some(vec![]), + FunctionalDependencySet::default(), + ); Self { base } } } diff --git a/src/frontend/src/optimizer/plan_node/logical_project.rs b/src/frontend/src/optimizer/plan_node/logical_project.rs index 0eea08b6ac3ad..f3bb51cc7f971 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project.rs @@ -264,7 +264,7 @@ impl ToStream for LogicalProject { let (proj, out_col_change) = self.rewrite_with_input(input.clone(), input_col_change); // Add missing columns of input_pk into the select list. - let input_pk = input.stream_key(); + let input_pk = input.expect_stream_key(); let i2o = proj.i2o_col_mapping(); let col_need_to_add = input_pk .iter() diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index 50ad246b5bb09..cba907eeeb379 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -363,7 +363,7 @@ impl ToStream for LogicalProjectSet { self.rewrite_with_input(input.clone(), input_col_change); // Add missing columns of input_pk into the select list. - let input_pk = input.stream_key(); + let input_pk = input.expect_stream_key(); let i2o = self.core.i2o_col_mapping(); let col_need_to_add = input_pk .iter() diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index 8ee3e6504f185..6307558882023 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -549,7 +549,7 @@ impl ToStream for LogicalScan { None.into(), ))); } - match self.base.stream_key.is_empty() { + match self.base.stream_key.is_none() { true => { let mut col_ids = HashSet::new(); diff --git a/src/frontend/src/optimizer/plan_node/logical_table_function.rs b/src/frontend/src/optimizer/plan_node/logical_table_function.rs index c42a51aeb9024..15d510cc1c6fd 100644 --- a/src/frontend/src/optimizer/plan_node/logical_table_function.rs +++ b/src/frontend/src/optimizer/plan_node/logical_table_function.rs @@ -64,7 +64,7 @@ impl LogicalTableFunction { .push(Field::with_name(DataType::Int64, "ordinality")); } let functional_dependency = FunctionalDependencySet::new(schema.len()); - let base = PlanBase::new_logical(ctx, schema, vec![], functional_dependency); + let base = PlanBase::new_logical(ctx, schema, None, functional_dependency); Self { base, table_function, diff --git a/src/frontend/src/optimizer/plan_node/logical_topn.rs b/src/frontend/src/optimizer/plan_node/logical_topn.rs index 1e4c6113f2ee7..bd585910cb09e 100644 --- a/src/frontend/src/optimizer/plan_node/logical_topn.rs +++ b/src/frontend/src/optimizer/plan_node/logical_topn.rs @@ -177,7 +177,8 @@ impl LogicalTopN { ); // TODO(st1page): solve it - let global_top_n = StreamTopN::with_stream_key(global_top_n, self.stream_key().to_vec()); + let global_top_n = + StreamTopN::with_stream_key(global_top_n, self.stream_key().map(|v| v.to_vec())); // use another projection to remove the column we added before. exprs.pop(); diff --git a/src/frontend/src/optimizer/plan_node/logical_union.rs b/src/frontend/src/optimizer/plan_node/logical_union.rs index 2538450606772..10371fda3c2b0 100644 --- a/src/frontend/src/optimizer/plan_node/logical_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_union.rs @@ -28,6 +28,7 @@ use crate::optimizer::plan_node::{ }; use crate::optimizer::property::RequiredDist; use crate::utils::{ColIndexMapping, Condition}; +use crate::Explain; /// `LogicalUnion` returns the union of the rows of its inputs. /// If `all` is false, it needs to eliminate duplicates. @@ -139,7 +140,12 @@ impl ToBatch for LogicalUnion { impl ToStream for LogicalUnion { fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { // TODO: use round robin distribution instead of using hash distribution of all inputs. - let dist = RequiredDist::hash_shard(self.base.stream_key()); + let dist = RequiredDist::hash_shard(self.base.stream_key().unwrap_or_else(|| { + panic!( + "should always have a stream key in the stream plan but not, sub plan: {}", + PlanRef::from(self.clone()).explain_to_string() + ) + })); let new_inputs: Result> = self .inputs() .iter() @@ -174,7 +180,7 @@ impl ToStream for LogicalUnion { .map(|x| col_index_mapping.map(x)) .collect_vec(); new_input - .stream_key() + .expect_stream_key() .iter() .all(|x| original_schema_new_pos.contains(x)) }); @@ -223,7 +229,7 @@ impl ToStream for LogicalUnion { .iter() .flat_map(|(new_input, _)| { new_input - .stream_key() + .expect_stream_key() .iter() .map(|x| new_input.schema().fields[*x].data_type()) }) @@ -234,7 +240,7 @@ impl ToStream for LogicalUnion { .collect_vec(); let input_pk_lens = rewrites .iter() - .map(|(new_input, _)| new_input.stream_key().len()) + .map(|(new_input, _)| new_input.expect_stream_key().len()) .collect_vec(); let mut input_pk_offsets = vec![0]; for (i, len) in input_pk_lens.into_iter().enumerate() { @@ -258,7 +264,7 @@ impl ToStream for LogicalUnion { .collect_vec(); // input1_pk + input2_pk + ... let mut input_pks = input_pk_nulls.clone(); - for (j, pk_idx) in new_input.stream_key().iter().enumerate() { + for (j, pk_idx) in new_input.expect_stream_key().iter().enumerate() { input_pks[input_pk_offsets[i] + j] = ExprImpl::InputRef( InputRef::new( *pk_idx, diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs index 0c903559b4e2b..80e4f350d8edb 100644 --- a/src/frontend/src/optimizer/plan_node/logical_update.rs +++ b/src/frontend/src/optimizer/plan_node/logical_update.rs @@ -12,11 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::vec; - -use risingwave_common::catalog::{Field, Schema, TableVersionId}; +use risingwave_common::catalog::TableVersionId; use risingwave_common::error::Result; -use risingwave_common::types::DataType; use super::utils::impl_distill_by_unit; use super::{ @@ -28,7 +25,6 @@ use crate::expr::{ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; -use crate::optimizer::property::FunctionalDependencySet; use crate::utils::{ColIndexMapping, Condition}; /// [`LogicalUpdate`] iterates on input relation, set some columns, and inject update records into @@ -43,14 +39,7 @@ pub struct LogicalUpdate { impl From> for LogicalUpdate { fn from(core: generic::Update) -> Self { - let ctx = core.input.ctx(); - let schema = if core.returning { - core.input.schema().clone() - } else { - Schema::new(vec![Field::unnamed(DataType::Int64)]) - }; - let fd_set = FunctionalDependencySet::new(schema.len()); - let base = PlanBase::new_logical(ctx, schema, vec![], fd_set); + let base = PlanBase::new_logical_with_core(&core); Self { base, core } } } diff --git a/src/frontend/src/optimizer/plan_node/logical_values.rs b/src/frontend/src/optimizer/plan_node/logical_values.rs index e6f8dc59c63ff..c6a3d2ac0564e 100644 --- a/src/frontend/src/optimizer/plan_node/logical_values.rs +++ b/src/frontend/src/optimizer/plan_node/logical_values.rs @@ -50,7 +50,7 @@ impl LogicalValues { } } let functional_dependency = FunctionalDependencySet::new(schema.len()); - let base = PlanBase::new_logical(ctx, schema, vec![], functional_dependency); + let base = PlanBase::new_logical(ctx, schema, None, functional_dependency); Self { rows: rows.into(), base, @@ -70,7 +70,7 @@ impl LogicalValues { } } let functional_dependency = FunctionalDependencySet::new(schema.len()); - let base = PlanBase::new_logical(ctx, schema, vec![pk_index], functional_dependency); + let base = PlanBase::new_logical(ctx, schema, Some(vec![pk_index]), functional_dependency); Self { rows: rows.into(), base, diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 189ab9a0f1a6d..a4ec6191dedf3 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -218,6 +218,15 @@ impl RewriteExprsRecursive for PlanRef { } impl PlanRef { + pub fn expect_stream_key(&self) -> &[usize] { + self.stream_key().unwrap_or_else(|| { + panic!( + "a stream key is expected but not exist, plan: {}", + self.explain_to_string() + ) + }) + } + fn prune_col_inner(&self, required_cols: &[usize], ctx: &mut ColumnPruningContext) -> PlanRef { if let Some(logical_share) = self.as_logical_share() { // Check the share cache first. If cache exists, it means this is the second round of @@ -435,8 +444,8 @@ impl GenericPlanRef for PlanRef { &self.plan_base().schema } - fn stream_key(&self) -> &[usize] { - &self.plan_base().stream_key + fn stream_key(&self) -> Option<&[usize]> { + self.plan_base().stream_key() } fn ctx(&self) -> OptimizerContextRef { @@ -514,8 +523,8 @@ impl dyn PlanNode { &self.plan_base().schema } - pub fn stream_key(&self) -> &[usize] { - &self.plan_base().stream_key + pub fn stream_key(&self) -> Option<&[usize]> { + self.plan_base().stream_key() } pub fn order(&self) -> &Order { @@ -566,7 +575,12 @@ impl dyn PlanNode { identity: self.explain_myself_to_string(), node_body: node, operator_id: self.id().0 as _, - stream_key: self.stream_key().iter().map(|x| *x as u32).collect(), + stream_key: self + .stream_key() + .unwrap_or_default() + .iter() + .map(|x| *x as u32) + .collect(), fields: self.schema().to_prost(), append_only: self.append_only(), } diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs index 1ca7f513cb3ab..22e239a7369d5 100644 --- a/src/frontend/src/optimizer/plan_node/plan_base.rs +++ b/src/frontend/src/optimizer/plan_node/plan_base.rs @@ -36,7 +36,7 @@ pub struct PlanBase { pub ctx: OptimizerContextRef, pub schema: Schema, /// the pk indices of the PlanNode's output, a empty stream key vec means there is no stream key - pub stream_key: Vec, + pub stream_key: Option>, /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect /// correctness, but insert unnecessary sort in plan pub order: Order, @@ -59,8 +59,8 @@ impl generic::GenericPlanRef for PlanBase { &self.schema } - fn stream_key(&self) -> &[usize] { - &self.stream_key + fn stream_key(&self) -> Option<&[usize]> { + self.stream_key.as_deref() } fn ctx(&self) -> OptimizerContextRef { @@ -94,7 +94,7 @@ impl PlanBase { pub fn new_logical( ctx: OptimizerContextRef, schema: Schema, - stream_key: Vec, + stream_key: Option>, functional_dependency: FunctionalDependencySet, ) -> Self { let id = ctx.next_plan_node_id(); @@ -118,7 +118,7 @@ impl PlanBase { Self::new_logical( node.ctx(), node.schema(), - node.stream_key().unwrap_or_default(), + node.stream_key(), node.functional_dependency(), ) } @@ -133,7 +133,7 @@ impl PlanBase { Self::new_stream( logical.ctx(), logical.schema(), - logical.stream_key().unwrap_or_default().to_vec(), + logical.stream_key(), logical.functional_dependency(), dist, append_only, @@ -145,7 +145,7 @@ impl PlanBase { pub fn new_stream( ctx: OptimizerContextRef, schema: Schema, - stream_key: Vec, + stream_key: Option>, functional_dependency: FunctionalDependencySet, dist: Distribution, append_only: bool, @@ -191,7 +191,7 @@ impl PlanBase { schema, dist, order, - stream_key: vec![], + stream_key: None, // Batch plan node won't touch `append_only` field append_only: true, emit_on_window_close: false, // TODO(rc): batch EOWC support? @@ -204,7 +204,7 @@ impl PlanBase { PlanBase::new_stream( plan_node.ctx(), plan_node.schema().clone(), - plan_node.stream_key().to_vec(), + plan_node.stream_key().map(|v| v.to_vec()), plan_node.functional_dependency().clone(), plan_node.distribution().clone(), plan_node.append_only(), @@ -233,8 +233,8 @@ macro_rules! impl_base_delegate { pub fn schema(&self) -> &Schema { &self.plan_base().schema } - pub fn stream_key(&self) -> &[usize] { - &self.plan_base().stream_key + pub fn stream_key(&self) -> Option<&[usize]> { + self.plan_base().stream_key() } pub fn order(&self) -> &Order { &self.plan_base().order diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 598318ed22d0c..36b2c77d22aed 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -76,7 +76,7 @@ pub trait StreamPlanNode: GenericPlanNode { id: ctx.next_plan_node_id(), ctx, schema: self.schema(), - stream_key: self.stream_key().unwrap_or_default(), + stream_key: self.stream_key(), dist: self.distribution(), append_only: self.append_only(), emit_on_window_close: self.emit_on_window_close(), @@ -95,8 +95,8 @@ impl generic::GenericPlanRef for PlanRef { &self.0.schema } - fn stream_key(&self) -> &[usize] { - &self.0.stream_key + fn stream_key(&self) -> Option<&[usize]> { + self.0.stream_key.as_deref() } fn ctx(&self) -> OptimizerContextRef { @@ -113,8 +113,8 @@ impl generic::GenericPlanRef for PlanBase { &self.schema } - fn stream_key(&self) -> &[usize] { - &self.stream_key + fn stream_key(&self) -> Option<&[usize]> { + self.stream_key.as_deref() } fn ctx(&self) -> OptimizerContextRef { @@ -266,7 +266,7 @@ impl HashJoin { // dedup the pk in dist key.. let mut deduped_input_pk_indices = vec![]; - for input_pk_idx in input.stream_key() { + for input_pk_idx in input.stream_key().unwrap() { if !pk_indices.contains(input_pk_idx) && !deduped_input_pk_indices.contains(input_pk_idx) { @@ -410,7 +410,7 @@ pub struct PlanBase { #[educe(Hash(ignore))] pub ctx: OptimizerContextRef, pub schema: Schema, - pub stream_key: Vec, + pub stream_key: Option>, #[educe(PartialEq(ignore))] #[educe(Hash(ignore))] pub dist: Distribution, @@ -608,7 +608,7 @@ pub fn to_stream_prost_body( .infer_internal_table_catalog( input.schema(), input.ctx(), - input.stream_key(), + input.stream_key().unwrap(), me.vnode_col_idx, ) .with_id(state.gen_table_id_wrapped()); @@ -765,7 +765,9 @@ pub fn to_stream_prost_body( me.infer_internal_table_catalog( input.schema(), input.ctx(), - input.stream_key(), + input + .stream_key() + .expect("should always have a stream key in the stream plan but not"), None, ) .with_id(state.gen_table_id_wrapped()) diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs index febf550049265..c9f969384c3a4 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dml.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs @@ -33,7 +33,7 @@ impl StreamDml { let base = PlanBase::new_stream( input.ctx(), input.schema().clone(), - input.stream_key().to_vec(), + input.stream_key().map(|v| v.to_vec()), input.functional_dependency().clone(), input.distribution().clone(), append_only, diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs index 6b13b88a38bf2..427c0de4c89c1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs @@ -101,7 +101,7 @@ impl StreamEowcOverWindow { tbl_builder.add_order_column(order_key_index, OrderType::ascending()); order_cols.insert(order_key_index); } - for idx in self.logical.input.stream_key() { + for idx in self.logical.input.expect_stream_key() { if !order_cols.contains(idx) { tbl_builder.add_order_column(*idx, OrderType::ascending()); order_cols.insert(*idx); diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index b70b127a5522c..45bcccff7a6cf 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -37,7 +37,7 @@ impl StreamExchange { let base = PlanBase::new_stream( input.ctx(), input.schema().clone(), - input.stream_key().to_vec(), + input.stream_key().map(|v| v.to_vec()), input.functional_dependency().clone(), dist, input.append_only(), @@ -53,12 +53,11 @@ impl StreamExchange { pub fn new_no_shuffle(input: PlanRef) -> Self { let ctx = input.ctx(); - let pk_indices = input.stream_key().to_vec(); // Dispatch executor won't change the append-only behavior of the stream. let base = PlanBase::new_stream( ctx, input.schema().clone(), - pk_indices, + input.stream_key().map(|v| v.to_vec()), input.functional_dependency().clone(), input.distribution().clone(), input.append_only(), diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index 93209aa0b8800..2248a49223641 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -93,7 +93,7 @@ impl StreamNode for StreamGroupTopN { .infer_internal_table_catalog( input.schema(), input.ctx(), - input.stream_key(), + input.expect_stream_key(), self.vnode_col_idx, ) .with_id(state.gen_table_id_wrapped()); diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index 7c77773f9ecc7..b14be1a371a2a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -149,7 +149,7 @@ impl StreamMaterialize { TableType::MaterializedView => { assert_matches!(user_distributed_by, RequiredDist::Any); // ensure the same pk will not shuffle to different node - RequiredDist::shard_by_key(input.schema().len(), input.stream_key()) + RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()) } TableType::Index => { assert_matches!( diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs index da1f04f2a2698..9eb0a0e0f143e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_now.rs +++ b/src/frontend/src/optimizer/plan_node/stream_now.rs @@ -46,7 +46,7 @@ impl StreamNow { let base = PlanBase::new_stream( ctx, schema, - vec![], + Some(vec![]), FunctionalDependencySet::default(), Distribution::Single, false, diff --git a/src/frontend/src/optimizer/plan_node/stream_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_over_window.rs index e77145bac748d..d3a89129b4b82 100644 --- a/src/frontend/src/optimizer/plan_node/stream_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_over_window.rs @@ -68,7 +68,7 @@ impl StreamOverWindow { tbl_builder.add_order_column(o.column_index, o.order_type); } } - for &idx in self.logical.input.stream_key() { + for &idx in self.logical.input.expect_stream_key() { if order_cols.insert(idx) { tbl_builder.add_order_column(idx, OrderType::ascending()); } diff --git a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs index a2a5f834400a5..c85bd10dc1632 100644 --- a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs +++ b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs @@ -40,7 +40,7 @@ impl StreamRowIdGen { let base = PlanBase::new_stream( input.ctx(), input.schema().clone(), - input.stream_key().to_vec(), + input.stream_key().map(|v| v.to_vec()), input.functional_dependency().clone(), distribution, input.append_only(), diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index 795cdcbfda06b..969527fa69702 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -20,6 +20,7 @@ use super::utils::Distill; use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamExchange, StreamNode}; use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode}; use crate::stream_fragmenter::BuildFragmentGraphState; +use crate::Explain; /// `StreamShare` will be translated into an `ExchangeNode` based on its distribution finally. #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -96,7 +97,13 @@ impl StreamShare { identity: self.distill_to_string(), node_body: Some(node_body), operator_id: self.id().0 as _, - stream_key: self.stream_key().iter().map(|x| *x as u32).collect(), + stream_key: self + .stream_key() + .unwrap_or_else(|| panic!("should always have a stream key in the stream plan but not, sub plan: {}", + PlanRef::from(self.clone()).explain_to_string())) + .iter() + .map(|x| *x as u32) + .collect(), fields: self.schema().to_prost(), append_only: self.append_only(), }; diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 563a067adaac5..a51380d630331 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -163,7 +163,7 @@ impl StreamSink { } _ => { assert_matches!(user_distributed_by, RequiredDist::Any); - RequiredDist::shard_by_key(input.schema().len(), input.stream_key()) + RequiredDist::shard_by_key(input.schema().len(), input.expect_stream_key()) } } } diff --git a/src/frontend/src/optimizer/plan_node/stream_sort.rs b/src/frontend/src/optimizer/plan_node/stream_sort.rs index 9ade7a165500a..b82d71068d817 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sort.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sort.rs @@ -48,7 +48,7 @@ impl StreamEowcSort { assert!(input.watermark_columns().contains(sort_column_index)); let schema = input.schema().clone(); - let stream_key = input.stream_key().to_vec(); + let stream_key = input.stream_key().map(|v| v.to_vec()); let fd_set = input.functional_dependency().clone(); let dist = input.distribution().clone(); let mut watermark_columns = FixedBitSet::with_capacity(input.schema().len()); @@ -92,7 +92,7 @@ impl StreamEowcSort { } } - for idx in self.input.stream_key() { + for idx in self.input.expect_stream_key() { if !order_cols.contains(idx) { tbl_builder.add_order_column(*idx, OrderType::ascending()); order_cols.insert(*idx); diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 7e4e988900b3c..dfe1243e881db 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -32,7 +32,7 @@ use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder}; use crate::optimizer::property::{Distribution, DistributionDisplay}; use crate::stream_fragmenter::BuildFragmentGraphState; -use crate::TableCatalog; +use crate::{Explain, TableCatalog}; /// `StreamTableScan` is a virtual plan node to represent a stream table scan. It will be converted /// to chain + merge node (for upstream materialize) + batch table scan when converting to `MView` @@ -202,7 +202,7 @@ impl Distill for StreamTableScan { if verbose { let pk = IndicesDisplay { - indices: self.stream_key(), + indices: self.stream_key().unwrap_or_default(), schema: &self.base.schema, }; vec.push(("pk", pk.distill())); @@ -227,7 +227,17 @@ impl StreamTableScan { pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> PbStreamNode { use risingwave_pb::stream_plan::*; - let stream_key = self.base.stream_key.iter().map(|x| *x as u32).collect_vec(); + let stream_key = self + .stream_key() + .unwrap_or_else(|| { + panic!( + "should always have a stream key in the stream plan but not, sub plan: {}", + PlanRef::from(self.clone()).explain_to_string() + ) + }) + .iter() + .map(|x| *x as u32) + .collect_vec(); // The required columns from the table (both scan and upstream). let upstream_column_ids = match self.chain_type { diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index f9be4149ea06b..a99dc920dbbaa 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -30,7 +30,7 @@ pub struct StreamTopN { } impl StreamTopN { - fn new_inner(logical: generic::TopN, stream_key: Option>) -> Self { + fn new_inner(logical: generic::TopN, stream_key: Option>>) -> Self { assert!(logical.group_key.is_empty()); assert!(logical.limit_attr.limit() > 0); let input = &logical.input; @@ -52,7 +52,10 @@ impl StreamTopN { Self::new_inner(logical, None) } - pub fn with_stream_key(logical: generic::TopN, stream_key: Vec) -> Self { + pub fn with_stream_key( + logical: generic::TopN, + stream_key: Option>, + ) -> Self { Self::new_inner(logical, Some(stream_key)) } @@ -86,7 +89,7 @@ impl PlanTreeNodeUnary for StreamTopN { fn clone_with_input(&self, input: PlanRef) -> Self { let mut logical = self.logical.clone(); logical.input = input; - Self::new_inner(logical, Some(self.stream_key().to_vec())) + Self::new_inner(logical, Some(self.stream_key().map(|v| v.to_vec()))) } } @@ -106,7 +109,7 @@ impl StreamNode for StreamTopN { .infer_internal_table_catalog( input.schema(), input.ctx(), - input.stream_key(), + input.expect_stream_key(), None, ) .with_id(state.gen_table_id_wrapped()) diff --git a/src/frontend/src/optimizer/plan_node/stream_values.rs b/src/frontend/src/optimizer/plan_node/stream_values.rs index 1f764ba37c305..fb0b844411f63 100644 --- a/src/frontend/src/optimizer/plan_node/stream_values.rs +++ b/src/frontend/src/optimizer/plan_node/stream_values.rs @@ -40,7 +40,7 @@ impl StreamValues { let base = PlanBase::new_stream( ctx, logical.schema().clone(), - logical.stream_key().to_vec(), + logical.stream_key().map(|v| v.to_vec()), logical.functional_dependency().clone(), Distribution::Single, true, diff --git a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs index 2897d67ec5dea..ed5a946603ee4 100644 --- a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs @@ -43,7 +43,7 @@ impl StreamWatermarkFilter { let base = PlanBase::new_stream( input.ctx(), input.schema().clone(), - input.stream_key().to_vec(), + input.stream_key().map(|v| v.to_vec()), input.functional_dependency().clone(), input.distribution().clone(), input.append_only(), diff --git a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs index e32385be59d7b..71b7b17d9ab4e 100644 --- a/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs +++ b/src/frontend/src/optimizer/plan_visitor/cardinality_visitor.rs @@ -36,11 +36,14 @@ impl CardinalityVisitor { input_card: Cardinality, eq_set: HashSet, ) -> Cardinality { - let mut unique_keys: Vec> = vec![input.stream_key().iter().copied().collect()]; - + let mut unique_keys: Vec> = if let Some(stream_key) = input.stream_key() { + vec![stream_key.iter().copied().collect()] + } else { + vec![] + }; // We don't have UNIQUE key now. So we hack here to support some complex queries on // system tables. - // TODO(card): remove this after we have UNIQUE key. + // TODO(card): remove this after we have UNIQUE key. https://github.com/risingwavelabs/risingwave/issues/12514 if let Some(scan) = input.as_logical_scan() && scan.is_sys_table() && scan.table_name() == PG_NAMESPACE_TABLE_NAME @@ -55,7 +58,6 @@ impl CardinalityVisitor { if unique_keys .iter() - .filter(|unique_key| !unique_key.is_empty()) .any(|unique_key| eq_set.is_superset(unique_key)) { input_card.min(0..=1)