Skip to content

Commit

Permalink
refactor(optimizer): remove some ColIndexMapping::without_tar_size us…
Browse files Browse the repository at this point in the history
…age (#13134)

Co-authored-by: Dylan Chen <[email protected]>
  • Loading branch information
st1page and chenzl25 authored Oct 30, 2023
1 parent e392db0 commit 67056a5
Show file tree
Hide file tree
Showing 22 changed files with 63 additions and 67 deletions.
32 changes: 11 additions & 21 deletions src/common/src/util/column_index_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,9 @@ pub struct ColIndexMapping {
}

impl ColIndexMapping {
/// Create a partial mapping which maps the subscripts range `(0..map.len())` to the
/// corresponding element. **This method is not recommended**, please use `with_target_size` instead, see <https://github.com/risingwavelabs/risingwave/issues/7234> for more information**
pub fn without_target_size(map: Vec<Option<usize>>) -> Self {
let target_size = match map.iter().filter_map(|x| *x).max_by_key(|x| *x) {
Some(target_max) => target_max + 1,
None => 0,
};
Self { target_size, map }
}

/// Create a partial mapping which maps from the subscripts range `(0..map.len())` to
/// `(0..target_size)`. Each subscript is mapped to the corresponding element.
pub fn with_target_size(map: Vec<Option<usize>>, target_size: usize) -> Self {
pub fn new(map: Vec<Option<usize>>, target_size: usize) -> Self {
if let Some(target_max) = map.iter().filter_map(|x| *x).max_by_key(|x| *x) {
assert!(target_max < target_size)
};
Expand All @@ -69,7 +59,7 @@ impl ColIndexMapping {

pub fn identity(size: usize) -> Self {
let map = (0..size).map(Some).collect();
Self::without_target_size(map)
Self::new(map, size)
}

pub fn is_identity(&self) -> bool {
Expand All @@ -90,12 +80,12 @@ impl ColIndexMapping {
let map = (0..source_size)
.map(|i| if i < target_size { Some(i) } else { None })
.collect();
Self::with_target_size(map, target_size)
Self::new(map, target_size)
}

pub fn empty(source_size: usize, target_size: usize) -> Self {
let map = vec![None; source_size];
Self::with_target_size(map, target_size)
Self::new(map, target_size)
}

/// Create a partial mapping which maps range `(0..source_num)` to range
Expand Down Expand Up @@ -134,7 +124,7 @@ impl ColIndexMapping {
})
.collect_vec();
let target_size = usize::try_from(source_num as isize + offset).unwrap();
Self::with_target_size(map, target_size)
Self::new(map, target_size)
}

/// Maps the smallest index to 0, the next smallest to 1, and so on.
Expand All @@ -159,7 +149,7 @@ impl ColIndexMapping {
for (tar, &src) in cols.iter().enumerate() {
map[src] = Some(tar);
}
Self::without_target_size(map)
Self::new(map, cols.len())
}

// TODO(yuchao): isn't this the same as `with_remaining_columns`?
Expand All @@ -170,7 +160,7 @@ impl ColIndexMapping {
map[src] = Some(tar);
}
}
Self::without_target_size(map)
Self::new(map, cols.len())
}

/// Remove the given columns, and maps the remaining columns to a consecutive range starting
Expand Down Expand Up @@ -205,15 +195,15 @@ impl ColIndexMapping {
for target in &mut map {
*target = target.and_then(|index| following.try_map(index));
}
Self::with_target_size(map, following.target_size())
Self::new(map, following.target_size())
}

pub fn clone_with_offset(&self, offset: usize) -> Self {
let mut map = self.map.clone();
for target in &mut map {
*target = target.and_then(|index| index.checked_add(offset));
}
Self::with_target_size(map, self.target_size() + offset)
Self::new(map, self.target_size() + offset)
}

/// Union two mapping, the result mapping `target_size` and source size will be the max size
Expand All @@ -236,7 +226,7 @@ impl ColIndexMapping {
assert_eq!(map[src], None);
map[src] = Some(dst);
}
Self::with_target_size(map, target_size)
Self::new(map, target_size)
}

/// Inverse the mapping. If a target corresponds to more than one source, return `None`.
Expand All @@ -249,7 +239,7 @@ impl ColIndexMapping {
}
map[dst] = Some(src);
}
Some(Self::with_target_size(map, self.source_size()))
Some(Self::new(map, self.source_size()))
}

/// return iter of (src, dst) order by src
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ pub async fn handle_alter_table_column(
};

// Calculate the mapping from the original columns to the new columns.
let col_index_mapping = ColIndexMapping::with_target_size(
let col_index_mapping = ColIndexMapping::new(
original_catalog
.columns()
.iter()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ impl PlanRoot {
}
let plan =
LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
let out_col_change = ColIndexMapping::with_target_size(map, target_size);
let out_col_change = ColIndexMapping::new(map, target_size);
(plan.into(), out_col_change)
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/eq_join_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl EqJoinPredicate {
for (left, right, _) in self.eq_keys() {
map[right.index - left_cols_num] = Some(left.index);
}
ColIndexMapping::with_target_size(map, left_cols_num)
ColIndexMapping::new(map, left_cols_num)
}

/// return the eq columns index mapping from left inputs to right inputs
Expand All @@ -251,7 +251,7 @@ impl EqJoinPredicate {
for (left, right, _) in self.eq_keys() {
map[left.index] = Some(right.index - left_cols_num);
}
ColIndexMapping::with_target_size(map, right_cols_num)
ColIndexMapping::new(map, right_cols_num)
}

/// Reorder the `eq_keys` according to the `reorder_idx`.
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl<PlanRef: GenericPlanRef> Agg<PlanRef> {
for (i, key) in self.group_key.indices().enumerate() {
map[i] = Some(key);
}
ColIndexMapping::with_target_size(map, self.input.schema().len())
ColIndexMapping::new(map, self.input.schema().len())
}

/// get the Mapping of columnIndex from input column index to out column index
Expand All @@ -83,7 +83,7 @@ impl<PlanRef: GenericPlanRef> Agg<PlanRef> {
for (i, key) in self.group_key.indices().enumerate() {
map[key] = Some(i);
}
ColIndexMapping::with_target_size(map, self.output_len())
ColIndexMapping::new(map, self.output_len())
}

fn two_phase_agg_forced(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl<PlanRef: GenericPlanRef> Expand<PlanRef> {
let map = (0..input_len)
.map(|source| Some(source + input_len))
.collect_vec();
ColIndexMapping::with_target_size(map, self.output_len())
ColIndexMapping::new(map, self.output_len())
}

pub fn o2i_col_mapping(&self) -> ColIndexMapping {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ impl<PlanRef: GenericPlanRef> Join<PlanRef> {
pub fn o2i_col_mapping(&self) -> ColIndexMapping {
// If output_indices = [0, 0, 1], we should use it as `o2i_col_mapping` directly.
// If we use `self.i2o_col_mapping().inverse()`, we will lose the first 0.
ColIndexMapping::with_target_size(
ColIndexMapping::new(
self.output_indices.iter().map(|x| Some(*x)).collect(),
self.internal_column_num(),
)
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl<PlanRef: GenericPlanRef> Project<PlanRef> {
map[i] = Some(input.index())
}
}
ColIndexMapping::with_target_size(map, input_len)
ColIndexMapping::new(map, input_len)
}

/// get the Mapping of columnIndex from input column index to output column index,if a input
Expand All @@ -224,7 +224,7 @@ impl<PlanRef: GenericPlanRef> Project<PlanRef> {
map[input.index()] = Some(i)
}
}
ColIndexMapping::with_target_size(map, exprs.len())
ColIndexMapping::new(map, exprs.len())
}

pub fn is_all_inputref(&self) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl<PlanRef: GenericPlanRef> ProjectSet<PlanRef> {
map[1 + i] = Some(input.index())
}
}
ColIndexMapping::with_target_size(map, input_len)
ColIndexMapping::new(map, input_len)
}

/// Gets the Mapping of columnIndex from input column index to output column index,if a input
Expand All @@ -135,7 +135,7 @@ impl<PlanRef: GenericPlanRef> ProjectSet<PlanRef> {
map[input.index()] = Some(1 + i)
}
}
ColIndexMapping::with_target_size(map, 1 + self.select_list.len())
ColIndexMapping::new(map, 1 + self.select_list.len())
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,7 @@ impl ToStream for LogicalAgg {
let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
let (agg, out_col_change) = self.rewrite_with_input(input, input_col_change);
let (map, _) = out_col_change.into_parts();
let out_col_change = ColIndexMapping::with_target_size(map, agg.schema().len());
let out_col_change = ColIndexMapping::new(map, agg.schema().len());
Ok((agg.into(), out_col_change))
}
}
Expand Down
5 changes: 1 addition & 4 deletions src/frontend/src/optimizer/plan_node/logical_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,7 @@ impl PlanTreeNodeUnary for LogicalExpand {

let expand = Self::new(input, column_subsets);
let output_col_num = expand.schema().len();
(
expand,
ColIndexMapping::with_target_size(mapping, output_col_num),
)
(expand, ColIndexMapping::new(mapping, output_col_num))
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,7 @@ impl PlanTreeNodeBinary for LogicalJoin {
*i += left.schema().len();
}
map.append(&mut right_map);
let mut mapping =
ColIndexMapping::with_target_size(map, left.schema().len() + right.schema().len());
let mut mapping = ColIndexMapping::new(map, left.schema().len() + right.schema().len());

let new_output_indices = self
.output_indices()
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_multi_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl LogicalMultiJoinBuilder {
/// add a predicate above the plan, so they will be rewritten from the `output_indices` to the
/// input indices
pub fn add_predicate_above(&mut self, exprs: impl Iterator<Item = ExprImpl>) {
let mut mapping = ColIndexMapping::with_target_size(
let mut mapping = ColIndexMapping::new(
self.output_indices.iter().map(|i| Some(*i)).collect(),
self.tot_input_col_num,
);
Expand Down Expand Up @@ -240,7 +240,7 @@ impl LogicalMultiJoin {

i2o_maps
.into_iter()
.map(|map| ColIndexMapping::with_target_size(map, tot_col_num))
.map(|map| ColIndexMapping::new(map, tot_col_num))
.collect_vec()
};

Expand Down
5 changes: 1 addition & 4 deletions src/frontend/src/optimizer/plan_node/logical_now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,7 @@ impl ToStream for LogicalNow {
&self,
_ctx: &mut RewriteStreamContext,
) -> Result<(PlanRef, ColIndexMapping)> {
Ok((
self.clone().into(),
ColIndexMapping::with_target_size(vec![Some(0)], 1),
))
Ok((self.clone().into(), ColIndexMapping::new(vec![Some(0)], 1)))
}

/// `to_stream` is equivalent to `to_stream_with_dist_required(RequiredDist::Any)`
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl ToStream for LogicalProject {
// But the target size of `out_col_change` should be the same as the length of the new
// schema.
let (map, _) = out_col_change.into_parts();
let out_col_change = ColIndexMapping::with_target_size(map, proj.base.schema().len());
let out_col_change = ColIndexMapping::new(map, proj.base.schema().len());
Ok((proj.into(), out_col_change))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ impl ToStream for LogicalProjectSet {
// But the target size of `out_col_change` should be the same as the length of the new
// schema.
let (map, _) = out_col_change.into_parts();
let out_col_change = ColIndexMapping::with_target_size(map, project_set.schema().len());
let out_col_change = ColIndexMapping::new(map, project_set.schema().len());
Ok((project_set.into(), out_col_change))
}

Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl LogicalScan {
}

let mut inverse_mapping = {
let mapping = ColIndexMapping::with_target_size(
let mapping = ColIndexMapping::new(
self.required_col_idx().iter().map(|i| Some(*i)).collect(),
self.table_desc().columns.len(),
);
Expand All @@ -242,7 +242,7 @@ impl LogicalScan {
for (src, dst) in mapping.mapping_pairs() {
inverse_map[dst] = Some(src);
}
ColIndexMapping::with_target_size(inverse_map, mapping.source_size())
ColIndexMapping::new(inverse_map, mapping.source_size())
};

predicate = predicate.rewrite_expr(&mut inverse_mapping);
Expand Down Expand Up @@ -412,7 +412,7 @@ impl PredicatePushdown for LogicalScan {
.conjunctions
.extract_if(|expr| expr.count_nows() > 0 || HasCorrelated {}.visit_expr(expr))
.collect();
let predicate = predicate.rewrite_expr(&mut ColIndexMapping::with_target_size(
let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
self.output_col_idx().iter().map(|i| Some(*i)).collect(),
self.table_desc().columns.len(),
));
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl LogicalSource {
mapping[idx] = None;
}
}
ColIndexMapping::with_target_size(mapping, columns.len())
ColIndexMapping::new(mapping, columns.len())
};

let mut rewriter = IndexRewriter::new(col_mapping);
Expand Down
Loading

0 comments on commit 67056a5

Please sign in to comment.