diff --git a/src/common/src/util/column_index_mapping.rs b/src/common/src/util/column_index_mapping.rs index 49030a06c5001..2c12dc47efb11 100644 --- a/src/common/src/util/column_index_mapping.rs +++ b/src/common/src/util/column_index_mapping.rs @@ -32,19 +32,9 @@ pub struct ColIndexMapping { } impl ColIndexMapping { - /// Create a partial mapping which maps the subscripts range `(0..map.len())` to the - /// corresponding element. **This method is not recommended**, please use `with_target_size` instead, see for more information** - pub fn without_target_size(map: Vec>) -> Self { - let target_size = match map.iter().filter_map(|x| *x).max_by_key(|x| *x) { - Some(target_max) => target_max + 1, - None => 0, - }; - Self { target_size, map } - } - /// Create a partial mapping which maps from the subscripts range `(0..map.len())` to /// `(0..target_size)`. Each subscript is mapped to the corresponding element. - pub fn with_target_size(map: Vec>, target_size: usize) -> Self { + pub fn new(map: Vec>, target_size: usize) -> Self { if let Some(target_max) = map.iter().filter_map(|x| *x).max_by_key(|x| *x) { assert!(target_max < target_size) }; @@ -69,7 +59,7 @@ impl ColIndexMapping { pub fn identity(size: usize) -> Self { let map = (0..size).map(Some).collect(); - Self::without_target_size(map) + Self::new(map, size) } pub fn is_identity(&self) -> bool { @@ -90,12 +80,12 @@ impl ColIndexMapping { let map = (0..source_size) .map(|i| if i < target_size { Some(i) } else { None }) .collect(); - Self::with_target_size(map, target_size) + Self::new(map, target_size) } pub fn empty(source_size: usize, target_size: usize) -> Self { let map = vec![None; source_size]; - Self::with_target_size(map, target_size) + Self::new(map, target_size) } /// Create a partial mapping which maps range `(0..source_num)` to range @@ -134,7 +124,7 @@ impl ColIndexMapping { }) .collect_vec(); let target_size = usize::try_from(source_num as isize + offset).unwrap(); - Self::with_target_size(map, target_size) + Self::new(map, target_size) } /// Maps the smallest index to 0, the next smallest to 1, and so on. @@ -159,7 +149,7 @@ impl ColIndexMapping { for (tar, &src) in cols.iter().enumerate() { map[src] = Some(tar); } - Self::without_target_size(map) + Self::new(map, cols.len()) } // TODO(yuchao): isn't this the same as `with_remaining_columns`? @@ -170,7 +160,7 @@ impl ColIndexMapping { map[src] = Some(tar); } } - Self::without_target_size(map) + Self::new(map, cols.len()) } /// Remove the given columns, and maps the remaining columns to a consecutive range starting @@ -205,7 +195,7 @@ impl ColIndexMapping { for target in &mut map { *target = target.and_then(|index| following.try_map(index)); } - Self::with_target_size(map, following.target_size()) + Self::new(map, following.target_size()) } pub fn clone_with_offset(&self, offset: usize) -> Self { @@ -213,7 +203,7 @@ impl ColIndexMapping { for target in &mut map { *target = target.and_then(|index| index.checked_add(offset)); } - Self::with_target_size(map, self.target_size() + offset) + Self::new(map, self.target_size() + offset) } /// Union two mapping, the result mapping `target_size` and source size will be the max size @@ -236,7 +226,7 @@ impl ColIndexMapping { assert_eq!(map[src], None); map[src] = Some(dst); } - Self::with_target_size(map, target_size) + Self::new(map, target_size) } /// Inverse the mapping. If a target corresponds to more than one source, return `None`. @@ -249,7 +239,7 @@ impl ColIndexMapping { } map[dst] = Some(src); } - Some(Self::with_target_size(map, self.source_size())) + Some(Self::new(map, self.source_size())) } /// return iter of (src, dst) order by src diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs index 18313e0458a04..1b3babc41ceaf 100644 --- a/src/frontend/src/handler/alter_table_column.rs +++ b/src/frontend/src/handler/alter_table_column.rs @@ -240,7 +240,7 @@ pub async fn handle_alter_table_column( }; // Calculate the mapping from the original columns to the new columns. - let col_index_mapping = ColIndexMapping::with_target_size( + let col_index_mapping = ColIndexMapping::new( original_catalog .columns() .iter() diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 4004748a2f4f9..20da006dcc992 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -378,7 +378,7 @@ impl PlanRoot { } let plan = LogicalProject::with_out_col_idx(plan, output_indices.into_iter()); - let out_col_change = ColIndexMapping::with_target_size(map, target_size); + let out_col_change = ColIndexMapping::new(map, target_size); (plan.into(), out_col_change) } }; diff --git a/src/frontend/src/optimizer/plan_node/eq_join_predicate.rs b/src/frontend/src/optimizer/plan_node/eq_join_predicate.rs index 0811bc0531a25..2c714bea0f7de 100644 --- a/src/frontend/src/optimizer/plan_node/eq_join_predicate.rs +++ b/src/frontend/src/optimizer/plan_node/eq_join_predicate.rs @@ -238,7 +238,7 @@ impl EqJoinPredicate { for (left, right, _) in self.eq_keys() { map[right.index - left_cols_num] = Some(left.index); } - ColIndexMapping::with_target_size(map, left_cols_num) + ColIndexMapping::new(map, left_cols_num) } /// return the eq columns index mapping from left inputs to right inputs @@ -251,7 +251,7 @@ impl EqJoinPredicate { for (left, right, _) in self.eq_keys() { map[left.index] = Some(right.index - left_cols_num); } - ColIndexMapping::with_target_size(map, right_cols_num) + ColIndexMapping::new(map, right_cols_num) } /// Reorder the `eq_keys` according to the `reorder_idx`. diff --git a/src/frontend/src/optimizer/plan_node/generic/agg.rs b/src/frontend/src/optimizer/plan_node/generic/agg.rs index c7a0ea4f39389..18ac5525182f3 100644 --- a/src/frontend/src/optimizer/plan_node/generic/agg.rs +++ b/src/frontend/src/optimizer/plan_node/generic/agg.rs @@ -74,7 +74,7 @@ impl Agg { for (i, key) in self.group_key.indices().enumerate() { map[i] = Some(key); } - ColIndexMapping::with_target_size(map, self.input.schema().len()) + ColIndexMapping::new(map, self.input.schema().len()) } /// get the Mapping of columnIndex from input column index to out column index @@ -83,7 +83,7 @@ impl Agg { for (i, key) in self.group_key.indices().enumerate() { map[key] = Some(i); } - ColIndexMapping::with_target_size(map, self.output_len()) + ColIndexMapping::new(map, self.output_len()) } fn two_phase_agg_forced(&self) -> bool { diff --git a/src/frontend/src/optimizer/plan_node/generic/expand.rs b/src/frontend/src/optimizer/plan_node/generic/expand.rs index d346677ac2d62..426589d3983e6 100644 --- a/src/frontend/src/optimizer/plan_node/generic/expand.rs +++ b/src/frontend/src/optimizer/plan_node/generic/expand.rs @@ -134,7 +134,7 @@ impl Expand { let map = (0..input_len) .map(|source| Some(source + input_len)) .collect_vec(); - ColIndexMapping::with_target_size(map, self.output_len()) + ColIndexMapping::new(map, self.output_len()) } pub fn o2i_col_mapping(&self) -> ColIndexMapping { diff --git a/src/frontend/src/optimizer/plan_node/generic/join.rs b/src/frontend/src/optimizer/plan_node/generic/join.rs index 2536cee984558..ee8c43b388684 100644 --- a/src/frontend/src/optimizer/plan_node/generic/join.rs +++ b/src/frontend/src/optimizer/plan_node/generic/join.rs @@ -431,7 +431,7 @@ impl Join { pub fn o2i_col_mapping(&self) -> ColIndexMapping { // If output_indices = [0, 0, 1], we should use it as `o2i_col_mapping` directly. // If we use `self.i2o_col_mapping().inverse()`, we will lose the first 0. - ColIndexMapping::with_target_size( + ColIndexMapping::new( self.output_indices.iter().map(|x| Some(*x)).collect(), self.internal_column_num(), ) diff --git a/src/frontend/src/optimizer/plan_node/generic/project.rs b/src/frontend/src/optimizer/plan_node/generic/project.rs index d8b6988af4391..e58ac40e918a8 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project.rs @@ -210,7 +210,7 @@ impl Project { map[i] = Some(input.index()) } } - ColIndexMapping::with_target_size(map, input_len) + ColIndexMapping::new(map, input_len) } /// get the Mapping of columnIndex from input column index to output column index,if a input @@ -224,7 +224,7 @@ impl Project { map[input.index()] = Some(i) } } - ColIndexMapping::with_target_size(map, exprs.len()) + ColIndexMapping::new(map, exprs.len()) } pub fn is_all_inputref(&self) -> bool { diff --git a/src/frontend/src/optimizer/plan_node/generic/project_set.rs b/src/frontend/src/optimizer/plan_node/generic/project_set.rs index fef26d1b32993..195b420802076 100644 --- a/src/frontend/src/optimizer/plan_node/generic/project_set.rs +++ b/src/frontend/src/optimizer/plan_node/generic/project_set.rs @@ -122,7 +122,7 @@ impl ProjectSet { map[1 + i] = Some(input.index()) } } - ColIndexMapping::with_target_size(map, input_len) + ColIndexMapping::new(map, input_len) } /// Gets the Mapping of columnIndex from input column index to output column index,if a input @@ -135,7 +135,7 @@ impl ProjectSet { map[input.index()] = Some(1 + i) } } - ColIndexMapping::with_target_size(map, 1 + self.select_list.len()) + ColIndexMapping::new(map, 1 + self.select_list.len()) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index 0aed32abec40e..b0e04f0598778 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -1214,7 +1214,7 @@ impl ToStream for LogicalAgg { let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?; let (agg, out_col_change) = self.rewrite_with_input(input, input_col_change); let (map, _) = out_col_change.into_parts(); - let out_col_change = ColIndexMapping::with_target_size(map, agg.schema().len()); + let out_col_change = ColIndexMapping::new(map, agg.schema().len()); Ok((agg.into(), out_col_change)) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_expand.rs b/src/frontend/src/optimizer/plan_node/logical_expand.rs index 054621ef967bf..77388ee33cf4a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_expand.rs +++ b/src/frontend/src/optimizer/plan_node/logical_expand.rs @@ -121,10 +121,7 @@ impl PlanTreeNodeUnary for LogicalExpand { let expand = Self::new(input, column_subsets); let output_col_num = expand.schema().len(); - ( - expand, - ColIndexMapping::with_target_size(mapping, output_col_num), - ) + (expand, ColIndexMapping::new(mapping, output_col_num)) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index a928481230d3c..c594ededa40cf 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -518,8 +518,7 @@ impl PlanTreeNodeBinary for LogicalJoin { *i += left.schema().len(); } map.append(&mut right_map); - let mut mapping = - ColIndexMapping::with_target_size(map, left.schema().len() + right.schema().len()); + let mut mapping = ColIndexMapping::new(map, left.schema().len() + right.schema().len()); let new_output_indices = self .output_indices() diff --git a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs index 819f84e963cfd..0bbe59dd70754 100644 --- a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs @@ -87,7 +87,7 @@ impl LogicalMultiJoinBuilder { /// add a predicate above the plan, so they will be rewritten from the `output_indices` to the /// input indices pub fn add_predicate_above(&mut self, exprs: impl Iterator) { - let mut mapping = ColIndexMapping::with_target_size( + let mut mapping = ColIndexMapping::new( self.output_indices.iter().map(|i| Some(*i)).collect(), self.tot_input_col_num, ); @@ -240,7 +240,7 @@ impl LogicalMultiJoin { i2o_maps .into_iter() - .map(|map| ColIndexMapping::with_target_size(map, tot_col_num)) + .map(|map| ColIndexMapping::new(map, tot_col_num)) .collect_vec() }; diff --git a/src/frontend/src/optimizer/plan_node/logical_now.rs b/src/frontend/src/optimizer/plan_node/logical_now.rs index c13a0c93a6e70..9e2d586c0fd2a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_now.rs +++ b/src/frontend/src/optimizer/plan_node/logical_now.rs @@ -81,10 +81,7 @@ impl ToStream for LogicalNow { &self, _ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { - Ok(( - self.clone().into(), - ColIndexMapping::with_target_size(vec![Some(0)], 1), - )) + Ok((self.clone().into(), ColIndexMapping::new(vec![Some(0)], 1))) } /// `to_stream` is equivalent to `to_stream_with_dist_required(RequiredDist::Any)` diff --git a/src/frontend/src/optimizer/plan_node/logical_project.rs b/src/frontend/src/optimizer/plan_node/logical_project.rs index 59a5509ebcd70..aee136e57ca9c 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project.rs @@ -284,7 +284,7 @@ impl ToStream for LogicalProject { // But the target size of `out_col_change` should be the same as the length of the new // schema. let (map, _) = out_col_change.into_parts(); - let out_col_change = ColIndexMapping::with_target_size(map, proj.base.schema().len()); + let out_col_change = ColIndexMapping::new(map, proj.base.schema().len()); Ok((proj.into(), out_col_change)) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index fc894713c2f43..727c641d52974 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -385,7 +385,7 @@ impl ToStream for LogicalProjectSet { // But the target size of `out_col_change` should be the same as the length of the new // schema. let (map, _) = out_col_change.into_parts(); - let out_col_change = ColIndexMapping::with_target_size(map, project_set.schema().len()); + let out_col_change = ColIndexMapping::new(map, project_set.schema().len()); Ok((project_set.into(), out_col_change)) } diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index 200302ae70f97..c9f5494b86be6 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -233,7 +233,7 @@ impl LogicalScan { } let mut inverse_mapping = { - let mapping = ColIndexMapping::with_target_size( + let mapping = ColIndexMapping::new( self.required_col_idx().iter().map(|i| Some(*i)).collect(), self.table_desc().columns.len(), ); @@ -242,7 +242,7 @@ impl LogicalScan { for (src, dst) in mapping.mapping_pairs() { inverse_map[dst] = Some(src); } - ColIndexMapping::with_target_size(inverse_map, mapping.source_size()) + ColIndexMapping::new(inverse_map, mapping.source_size()) }; predicate = predicate.rewrite_expr(&mut inverse_mapping); @@ -412,7 +412,7 @@ impl PredicatePushdown for LogicalScan { .conjunctions .extract_if(|expr| expr.count_nows() > 0 || HasCorrelated {}.visit_expr(expr)) .collect(); - let predicate = predicate.rewrite_expr(&mut ColIndexMapping::with_target_size( + let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new( self.output_col_idx().iter().map(|i| Some(*i)).collect(), self.table_desc().columns.len(), )); diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 01166e74f1359..cac051957b0a5 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -127,7 +127,7 @@ impl LogicalSource { mapping[idx] = None; } } - ColIndexMapping::with_target_size(mapping, columns.len()) + ColIndexMapping::new(mapping, columns.len()) }; let mut rewriter = IndexRewriter::new(col_mapping); diff --git a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs index 7ac121692c81d..98363f2aba213 100644 --- a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs +++ b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs @@ -356,7 +356,7 @@ impl ApplyJoinTransposeRule { correlated_indices, false, ); - let output_indices: Vec<_> = { + let (output_indices, target_size) = { let (apply_left_len, join_right_len) = match apply_join_type { JoinType::LeftSemi | JoinType::LeftAnti => (apply_left_len, 0), JoinType::RightSemi | JoinType::RightAnti => (0, join.right().schema().len()), @@ -368,14 +368,19 @@ impl ApplyJoinTransposeRule { join_left_len + apply_left_len..join_left_len + apply_left_len + join_right_len, ); - match join.join_type() { + let output_indices: Vec<_> = match join.join_type() { JoinType::LeftSemi | JoinType::LeftAnti => left_iter.collect(), JoinType::RightSemi | JoinType::RightAnti => right_iter.collect(), _ => left_iter.chain(right_iter).collect(), - } + }; + + let target_size = join_left_len + apply_left_len + join_right_len; + (output_indices, target_size) }; - let mut output_indices_mapping = - ColIndexMapping::without_target_size(output_indices.iter().map(|x| Some(*x)).collect()); + let mut output_indices_mapping = ColIndexMapping::new( + output_indices.iter().map(|x| Some(*x)).collect(), + target_size, + ); let new_join = LogicalJoin::new( join.left(), new_join_right, @@ -518,7 +523,7 @@ impl ApplyJoinTransposeRule { false, ); - let output_indices: Vec<_> = { + let (output_indices, target_size) = { let (apply_left_len, join_right_len) = match apply_join_type { JoinType::LeftSemi | JoinType::LeftAnti => (apply_left_len, 0), JoinType::RightSemi | JoinType::RightAnti => (0, join.right().schema().len()), @@ -529,11 +534,14 @@ impl ApplyJoinTransposeRule { let right_iter = join_left_len + apply_left_len * 2 ..join_left_len + apply_left_len * 2 + join_right_len; - match join.join_type() { + let output_indices: Vec<_> = match join.join_type() { JoinType::LeftSemi | JoinType::LeftAnti => left_iter.collect(), JoinType::RightSemi | JoinType::RightAnti => right_iter.collect(), _ => left_iter.chain(right_iter).collect(), - } + }; + + let target_size = join_left_len + apply_left_len * 2 + join_right_len; + (output_indices, target_size) }; let new_join = LogicalJoin::new( new_join_left, @@ -548,8 +556,9 @@ impl ApplyJoinTransposeRule { new_join.into() } JoinType::Inner | JoinType::LeftOuter | JoinType::RightOuter | JoinType::FullOuter => { - let mut output_indices_mapping = ColIndexMapping::without_target_size( + let mut output_indices_mapping = ColIndexMapping::new( output_indices.iter().map(|x| Some(*x)).collect(), + target_size, ); // Leave other condition for predicate push down to deal with LogicalFilter::create( diff --git a/src/frontend/src/optimizer/rule/apply_offset_rewriter.rs b/src/frontend/src/optimizer/rule/apply_offset_rewriter.rs index 4a18f1ba37e44..559ace9d5dbe0 100644 --- a/src/frontend/src/optimizer/rule/apply_offset_rewriter.rs +++ b/src/frontend/src/optimizer/rule/apply_offset_rewriter.rs @@ -74,13 +74,18 @@ pub struct ApplyCorrelatedIndicesConverter {} impl ApplyCorrelatedIndicesConverter { pub fn convert_to_index_mapping(correlated_indices: &[usize]) -> ColIndexMapping { // Inverse anyway. - let col_mapping = ColIndexMapping::without_target_size( + let target_size = match correlated_indices.iter().max_by_key(|&&x| x) { + Some(target_max) => target_max + 1, + None => 0, + }; + let col_mapping = ColIndexMapping::new( correlated_indices.iter().copied().map(Some).collect_vec(), + target_size, ); let mut map = vec![None; col_mapping.target_size()]; for (src, dst) in col_mapping.mapping_pairs() { map[dst] = Some(src); } - ColIndexMapping::with_target_size(map, col_mapping.source_size()) + ColIndexMapping::new(map, col_mapping.source_size()) } } diff --git a/src/frontend/src/optimizer/rule/push_calculation_of_join_rule.rs b/src/frontend/src/optimizer/rule/push_calculation_of_join_rule.rs index ce5004b069a31..a4cdb78990a77 100644 --- a/src/frontend/src/optimizer/rule/push_calculation_of_join_rule.rs +++ b/src/frontend/src/optimizer/rule/push_calculation_of_join_rule.rs @@ -63,7 +63,7 @@ impl Rule for PushCalculationOfJoinRule { ) .map(Some) .collect_vec(); - ColIndexMapping::with_target_size(map, new_internal_col_num) + ColIndexMapping::new(map, new_internal_col_num) }; let (mut exprs, new_output_indices) = Self::remap_exprs_and_output_indices(exprs, output_indices, &mut col_index_mapping); @@ -82,7 +82,7 @@ impl Rule for PushCalculationOfJoinRule { .map(|i| i + left_col_num + left_exprs_non_input_ref.len()) .map(Some) .collect_vec(); - ColIndexMapping::with_target_size(map, new_internal_col_num) + ColIndexMapping::new(map, new_internal_col_num) }; // replace chosen function calls. for (((index_of_func_call, ty), left_expr), right_expr) in indices_and_ty_of_func_calls diff --git a/src/frontend/src/optimizer/rule/translate_apply_rule.rs b/src/frontend/src/optimizer/rule/translate_apply_rule.rs index aee6a2a06e1a9..ec6f58a2422d9 100644 --- a/src/frontend/src/optimizer/rule/translate_apply_rule.rs +++ b/src/frontend/src/optimizer/rule/translate_apply_rule.rs @@ -59,8 +59,7 @@ impl Rule for TranslateApplyRule { let apply_left_len = left.schema().len(); let correlated_indices = apply.correlated_indices(); - let mut index_mapping = - ColIndexMapping::with_target_size(vec![None; apply_left_len], apply_left_len); + let mut index_mapping = ColIndexMapping::new(vec![None; apply_left_len], apply_left_len); let mut data_types = HashMap::new(); let mut index = 0;