Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(optimizer): remove some ColIndexMapping::without_tar_size usage #13134

Merged
merged 4 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 11 additions & 21 deletions src/common/src/util/column_index_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,9 @@ pub struct ColIndexMapping {
}

impl ColIndexMapping {
/// Create a partial mapping which maps the subscripts range `(0..map.len())` to the
/// corresponding element. **This method is not recommended**, please use `with_target_size` instead, see <https://github.com/risingwavelabs/risingwave/issues/7234> for more information**
pub fn without_target_size(map: Vec<Option<usize>>) -> Self {
let target_size = match map.iter().filter_map(|x| *x).max_by_key(|x| *x) {
Some(target_max) => target_max + 1,
None => 0,
};
Self { target_size, map }
}

/// Create a partial mapping which maps from the subscripts range `(0..map.len())` to
/// `(0..target_size)`. Each subscript is mapped to the corresponding element.
pub fn with_target_size(map: Vec<Option<usize>>, target_size: usize) -> Self {
pub fn new(map: Vec<Option<usize>>, target_size: usize) -> Self {
if let Some(target_max) = map.iter().filter_map(|x| *x).max_by_key(|x| *x) {
assert!(target_max < target_size)
};
Expand All @@ -69,7 +59,7 @@ impl ColIndexMapping {

pub fn identity(size: usize) -> Self {
let map = (0..size).map(Some).collect();
Self::without_target_size(map)
Self::new(map, size)
}

pub fn is_identity(&self) -> bool {
Expand All @@ -90,12 +80,12 @@ impl ColIndexMapping {
let map = (0..source_size)
.map(|i| if i < target_size { Some(i) } else { None })
.collect();
Self::with_target_size(map, target_size)
Self::new(map, target_size)
}

pub fn empty(source_size: usize, target_size: usize) -> Self {
let map = vec![None; source_size];
Self::with_target_size(map, target_size)
Self::new(map, target_size)
}

/// Create a partial mapping which maps range `(0..source_num)` to range
Expand Down Expand Up @@ -134,7 +124,7 @@ impl ColIndexMapping {
})
.collect_vec();
let target_size = usize::try_from(source_num as isize + offset).unwrap();
Self::with_target_size(map, target_size)
Self::new(map, target_size)
}

/// Maps the smallest index to 0, the next smallest to 1, and so on.
Expand All @@ -159,7 +149,7 @@ impl ColIndexMapping {
for (tar, &src) in cols.iter().enumerate() {
map[src] = Some(tar);
}
Self::without_target_size(map)
Self::new(map, cols.len())
}

// TODO(yuchao): isn't this the same as `with_remaining_columns`?
Expand All @@ -170,7 +160,7 @@ impl ColIndexMapping {
map[src] = Some(tar);
}
}
Self::without_target_size(map)
Self::new(map, cols.len())
}

/// Remove the given columns, and maps the remaining columns to a consecutive range starting
Expand Down Expand Up @@ -205,15 +195,15 @@ impl ColIndexMapping {
for target in &mut map {
*target = target.and_then(|index| following.try_map(index));
}
Self::with_target_size(map, following.target_size())
Self::new(map, following.target_size())
}

pub fn clone_with_offset(&self, offset: usize) -> Self {
let mut map = self.map.clone();
for target in &mut map {
*target = target.and_then(|index| index.checked_add(offset));
}
Self::with_target_size(map, self.target_size() + offset)
Self::new(map, self.target_size() + offset)
}

/// Union two mapping, the result mapping `target_size` and source size will be the max size
Expand All @@ -236,7 +226,7 @@ impl ColIndexMapping {
assert_eq!(map[src], None);
map[src] = Some(dst);
}
Self::with_target_size(map, target_size)
Self::new(map, target_size)
}

/// Inverse the mapping. If a target corresponds to more than one source, return `None`.
Expand All @@ -249,7 +239,7 @@ impl ColIndexMapping {
}
map[dst] = Some(src);
}
Some(Self::with_target_size(map, self.source_size()))
Some(Self::new(map, self.source_size()))
}

/// return iter of (src, dst) order by src
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/alter_table_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ pub async fn handle_alter_table_column(
};

// Calculate the mapping from the original columns to the new columns.
let col_index_mapping = ColIndexMapping::with_target_size(
let col_index_mapping = ColIndexMapping::new(
original_catalog
.columns()
.iter()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ impl PlanRoot {
}
let plan =
LogicalProject::with_out_col_idx(plan, output_indices.into_iter());
let out_col_change = ColIndexMapping::with_target_size(map, target_size);
let out_col_change = ColIndexMapping::new(map, target_size);
(plan.into(), out_col_change)
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/eq_join_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl EqJoinPredicate {
for (left, right, _) in self.eq_keys() {
map[right.index - left_cols_num] = Some(left.index);
}
ColIndexMapping::with_target_size(map, left_cols_num)
ColIndexMapping::new(map, left_cols_num)
}

/// return the eq columns index mapping from left inputs to right inputs
Expand All @@ -251,7 +251,7 @@ impl EqJoinPredicate {
for (left, right, _) in self.eq_keys() {
map[left.index] = Some(right.index - left_cols_num);
}
ColIndexMapping::with_target_size(map, right_cols_num)
ColIndexMapping::new(map, right_cols_num)
}

/// Reorder the `eq_keys` according to the `reorder_idx`.
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl<PlanRef: GenericPlanRef> Agg<PlanRef> {
for (i, key) in self.group_key.indices().enumerate() {
map[i] = Some(key);
}
ColIndexMapping::with_target_size(map, self.input.schema().len())
ColIndexMapping::new(map, self.input.schema().len())
}

/// get the Mapping of columnIndex from input column index to out column index
Expand All @@ -83,7 +83,7 @@ impl<PlanRef: GenericPlanRef> Agg<PlanRef> {
for (i, key) in self.group_key.indices().enumerate() {
map[key] = Some(i);
}
ColIndexMapping::with_target_size(map, self.output_len())
ColIndexMapping::new(map, self.output_len())
}

fn two_phase_agg_forced(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl<PlanRef: GenericPlanRef> Expand<PlanRef> {
let map = (0..input_len)
.map(|source| Some(source + input_len))
.collect_vec();
ColIndexMapping::with_target_size(map, self.output_len())
ColIndexMapping::new(map, self.output_len())
}

pub fn o2i_col_mapping(&self) -> ColIndexMapping {
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ impl<PlanRef: GenericPlanRef> Join<PlanRef> {
pub fn o2i_col_mapping(&self) -> ColIndexMapping {
// If output_indices = [0, 0, 1], we should use it as `o2i_col_mapping` directly.
// If we use `self.i2o_col_mapping().inverse()`, we will lose the first 0.
ColIndexMapping::with_target_size(
ColIndexMapping::new(
self.output_indices.iter().map(|x| Some(*x)).collect(),
self.internal_column_num(),
)
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl<PlanRef: GenericPlanRef> Project<PlanRef> {
map[i] = Some(input.index())
}
}
ColIndexMapping::with_target_size(map, input_len)
ColIndexMapping::new(map, input_len)
}

/// get the Mapping of columnIndex from input column index to output column index,if a input
Expand All @@ -224,7 +224,7 @@ impl<PlanRef: GenericPlanRef> Project<PlanRef> {
map[input.index()] = Some(i)
}
}
ColIndexMapping::with_target_size(map, exprs.len())
ColIndexMapping::new(map, exprs.len())
}

pub fn is_all_inputref(&self) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/generic/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl<PlanRef: GenericPlanRef> ProjectSet<PlanRef> {
map[1 + i] = Some(input.index())
}
}
ColIndexMapping::with_target_size(map, input_len)
ColIndexMapping::new(map, input_len)
}

/// Gets the Mapping of columnIndex from input column index to output column index,if a input
Expand All @@ -135,7 +135,7 @@ impl<PlanRef: GenericPlanRef> ProjectSet<PlanRef> {
map[input.index()] = Some(1 + i)
}
}
ColIndexMapping::with_target_size(map, 1 + self.select_list.len())
ColIndexMapping::new(map, 1 + self.select_list.len())
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,7 @@ impl ToStream for LogicalAgg {
let (input, input_col_change) = self.input().logical_rewrite_for_stream(ctx)?;
let (agg, out_col_change) = self.rewrite_with_input(input, input_col_change);
let (map, _) = out_col_change.into_parts();
let out_col_change = ColIndexMapping::with_target_size(map, agg.schema().len());
let out_col_change = ColIndexMapping::new(map, agg.schema().len());
Ok((agg.into(), out_col_change))
}
}
Expand Down
5 changes: 1 addition & 4 deletions src/frontend/src/optimizer/plan_node/logical_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,7 @@ impl PlanTreeNodeUnary for LogicalExpand {

let expand = Self::new(input, column_subsets);
let output_col_num = expand.schema().len();
(
expand,
ColIndexMapping::with_target_size(mapping, output_col_num),
)
(expand, ColIndexMapping::new(mapping, output_col_num))
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,7 @@ impl PlanTreeNodeBinary for LogicalJoin {
*i += left.schema().len();
}
map.append(&mut right_map);
let mut mapping =
ColIndexMapping::with_target_size(map, left.schema().len() + right.schema().len());
let mut mapping = ColIndexMapping::new(map, left.schema().len() + right.schema().len());

let new_output_indices = self
.output_indices()
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_multi_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl LogicalMultiJoinBuilder {
/// add a predicate above the plan, so they will be rewritten from the `output_indices` to the
/// input indices
pub fn add_predicate_above(&mut self, exprs: impl Iterator<Item = ExprImpl>) {
let mut mapping = ColIndexMapping::with_target_size(
let mut mapping = ColIndexMapping::new(
self.output_indices.iter().map(|i| Some(*i)).collect(),
self.tot_input_col_num,
);
Expand Down Expand Up @@ -240,7 +240,7 @@ impl LogicalMultiJoin {

i2o_maps
.into_iter()
.map(|map| ColIndexMapping::with_target_size(map, tot_col_num))
.map(|map| ColIndexMapping::new(map, tot_col_num))
.collect_vec()
};

Expand Down
5 changes: 1 addition & 4 deletions src/frontend/src/optimizer/plan_node/logical_now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,7 @@ impl ToStream for LogicalNow {
&self,
_ctx: &mut RewriteStreamContext,
) -> Result<(PlanRef, ColIndexMapping)> {
Ok((
self.clone().into(),
ColIndexMapping::with_target_size(vec![Some(0)], 1),
))
Ok((self.clone().into(), ColIndexMapping::new(vec![Some(0)], 1)))
}

/// `to_stream` is equivalent to `to_stream_with_dist_required(RequiredDist::Any)`
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl ToStream for LogicalProject {
// But the target size of `out_col_change` should be the same as the length of the new
// schema.
let (map, _) = out_col_change.into_parts();
let out_col_change = ColIndexMapping::with_target_size(map, proj.base.schema().len());
let out_col_change = ColIndexMapping::new(map, proj.base.schema().len());
Ok((proj.into(), out_col_change))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ impl ToStream for LogicalProjectSet {
// But the target size of `out_col_change` should be the same as the length of the new
// schema.
let (map, _) = out_col_change.into_parts();
let out_col_change = ColIndexMapping::with_target_size(map, project_set.schema().len());
let out_col_change = ColIndexMapping::new(map, project_set.schema().len());
Ok((project_set.into(), out_col_change))
}

Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl LogicalScan {
}

let mut inverse_mapping = {
let mapping = ColIndexMapping::with_target_size(
let mapping = ColIndexMapping::new(
self.required_col_idx().iter().map(|i| Some(*i)).collect(),
self.table_desc().columns.len(),
);
Expand All @@ -242,7 +242,7 @@ impl LogicalScan {
for (src, dst) in mapping.mapping_pairs() {
inverse_map[dst] = Some(src);
}
ColIndexMapping::with_target_size(inverse_map, mapping.source_size())
ColIndexMapping::new(inverse_map, mapping.source_size())
};

predicate = predicate.rewrite_expr(&mut inverse_mapping);
Expand Down Expand Up @@ -412,7 +412,7 @@ impl PredicatePushdown for LogicalScan {
.conjunctions
.extract_if(|expr| expr.count_nows() > 0 || HasCorrelated {}.visit_expr(expr))
.collect();
let predicate = predicate.rewrite_expr(&mut ColIndexMapping::with_target_size(
let predicate = predicate.rewrite_expr(&mut ColIndexMapping::new(
self.output_col_idx().iter().map(|i| Some(*i)).collect(),
self.table_desc().columns.len(),
));
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl LogicalSource {
mapping[idx] = None;
}
}
ColIndexMapping::with_target_size(mapping, columns.len())
ColIndexMapping::new(mapping, columns.len())
};

let mut rewriter = IndexRewriter::new(col_mapping);
Expand Down
Loading
Loading