Skip to content

Commit

Permalink
change agg decompose
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page committed Sep 12, 2023
1 parent b3944af commit 2613419
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,12 +554,13 @@ impl<PlanRef: stream::StreamPlanRef> Agg<PlanRef> {
.collect()
}

pub fn decompose(self) -> (Vec<PlanAggCall>, IndexSet, Vec<IndexSet>, PlanRef) {
pub fn decompose(self) -> (Vec<PlanAggCall>, IndexSet, Vec<IndexSet>, PlanRef, bool) {
(
self.agg_calls,
self.group_key,
self.grouping_sets,
self.input,
self.enable_two_phase,
)
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ impl LogicalAgg {
&self.core.grouping_sets
}

pub fn decompose(self) -> (Vec<PlanAggCall>, IndexSet, Vec<IndexSet>, PlanRef) {
pub fn decompose(self) -> (Vec<PlanAggCall>, IndexSet, Vec<IndexSet>, PlanRef, bool) {
self.core.decompose()
}

Expand Down
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/rule/agg_project_merge_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use itertools::Itertools;

use super::super::plan_node::*;
use super::{BoxedRule, Rule};
use crate::optimizer::plan_node::generic::Agg;
use crate::utils::IndexSet;

/// Merge [`LogicalAgg`] <- [`LogicalProject`] to [`LogicalAgg`].
Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/optimizer/rule/apply_agg_transpose_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ impl Rule for ApplyAggTransposeRule {
apply.clone().decompose();
assert_eq!(join_type, JoinType::Inner);
let agg: &LogicalAgg = right.as_logical_agg()?;
let (mut agg_calls, agg_group_key, grouping_sets, agg_input) = agg.clone().decompose();
let (mut agg_calls, agg_group_key, grouping_sets, agg_input, enable_two_phase) =
agg.clone().decompose();
assert!(grouping_sets.is_empty());
let is_scalar_agg = agg_group_key.is_empty();
let apply_left_len = left.schema().len();
Expand Down Expand Up @@ -147,7 +148,9 @@ impl Rule for ApplyAggTransposeRule {
}
let mut group_keys: IndexSet = (0..apply_left_len).collect();
group_keys.extend(agg_group_key.indices().map(|key| key + apply_left_len));
Agg::new(agg_calls, group_keys, node).into()
Agg::new(agg_calls, group_keys, node)
.with_enable_two_phase(enable_two_phase)
.into()
};

let filter = LogicalFilter::create(group_agg, on);
Expand Down
9 changes: 7 additions & 2 deletions src/frontend/src/optimizer/rule/distinct_agg_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ pub struct DistinctAggRule {
impl Rule for DistinctAggRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let agg: &LogicalAgg = plan.as_logical_agg()?;
let (mut agg_calls, mut agg_group_keys, grouping_sets, input) = agg.clone().decompose();
let (mut agg_calls, mut agg_group_keys, grouping_sets, input, enable_two_phase) =
agg.clone().decompose();
assert!(grouping_sets.is_empty());

if agg_calls.iter().all(|c| !c.distinct) {
Expand Down Expand Up @@ -84,6 +85,7 @@ impl Rule for DistinctAggRule {
agg_calls,
flag_values,
has_expand,
enable_two_phase,
))
}
}
Expand Down Expand Up @@ -246,6 +248,7 @@ impl DistinctAggRule {
mut agg_calls: Vec<PlanAggCall>,
flag_values: Vec<usize>,
has_expand: bool,
enable_two_phase: bool,
) -> PlanRef {
// the index of `flag` in schema of the middle `LogicalAgg`, if has `Expand`.
let pos_of_flag = mid_agg.group_key.len() - 1;
Expand Down Expand Up @@ -322,6 +325,8 @@ impl DistinctAggRule {
}
});

Agg::new(agg_calls, final_agg_group_keys, mid_agg.into()).into()
Agg::new(agg_calls, final_agg_group_keys, mid_agg.into())
.with_enable_two_phase(enable_two_phase)
.into()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl Rule for GroupingSetsToExpandRule {
return None;
}
let agg = Self::prune_column_for_agg(agg);
let (agg_calls, mut group_keys, grouping_sets, input) = agg.decompose();
let (agg_calls, mut group_keys, grouping_sets, input, enable_two_phase) = agg.decompose();

let flag_col_idx = group_keys.len();
let input_schema_len = input.schema().len();
Expand Down Expand Up @@ -159,7 +159,8 @@ impl Rule for GroupingSetsToExpandRule {
}
}

let new_agg = Agg::new(new_agg_calls, group_keys, expand);
let new_agg =
Agg::new(new_agg_calls, group_keys, expand).with_enable_two_phase(enable_two_phase);
let project_exprs = (0..flag_col_idx)
.map(|i| {
ExprImpl::InputRef(
Expand Down

0 comments on commit 2613419

Please sign in to comment.