Skip to content

Commit

Permalink
add AggCallMergeRule
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Oct 8, 2023
1 parent 437d36e commit aaa94d7
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 6 deletions.
19 changes: 13 additions & 6 deletions src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,7 @@ static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Convert Distinct Aggregation",
vec![
UnionToDistinctRule::create(),
DistinctAggRule::create(true),
AggGroupBySimplifyRule::create(),
],
vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
ApplyOrder::TopDown,
)
});
Expand All @@ -255,12 +251,19 @@ static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::n
vec![
UnionToDistinctRule::create(),
DistinctAggRule::create(false),
AggGroupBySimplifyRule::create(),
],
ApplyOrder::TopDown,
)
});

static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Simplify Aggregation",
vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()],
ApplyOrder::TopDown,
)
});

static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Join Commute".to_string(),
Expand Down Expand Up @@ -564,6 +567,8 @@ impl LogicalOptimizer {
plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)
};

plan = plan.optimize_by_rules(&SIMPLIFY_AGG);

plan = plan.optimize_by_rules(&JOIN_COMMUTE);

// Do a final column pruning and predicate pushing down to clean up the plan.
Expand Down Expand Up @@ -636,6 +641,8 @@ impl LogicalOptimizer {
// Convert distinct aggregates.
plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH);

plan = plan.optimize_by_rules(&SIMPLIFY_AGG);

plan = plan.optimize_by_rules(&JOIN_COMMUTE);

// Do a final column pruning and predicate pushing down to clean up the plan.
Expand Down
54 changes: 54 additions & 0 deletions src/frontend/src/optimizer/rule/agg_call_merge_rule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::{BoxedRule, Rule};
use crate::optimizer::plan_node::generic::Agg;
use crate::optimizer::plan_node::{LogicalProject, PlanTreeNodeUnary};
use crate::PlanRef;

pub struct AggCallMergeRule {}

impl Rule for AggCallMergeRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let Some(agg) = plan.as_logical_agg() else {
return None;
};

let calls = agg.agg_calls();
let mut new_calls = Vec::with_capacity(calls.len());
let mut out_fields = (0..agg.group_key().len()).collect::<Vec<_>>();
out_fields.extend(calls.iter().map(|call| {
let pos = new_calls.iter().position(|c| c == call).unwrap_or_else(|| {
let pos = new_calls.len();
new_calls.push(call.clone());
pos
});
agg.group_key().len() + pos
}));

if calls.len() == new_calls.len() {
// no change
None
} else {
let new_agg = Agg::new(new_calls, agg.group_key().clone(), agg.input()).into();
Some(LogicalProject::with_out_col_idx(new_agg, out_fields.into_iter()).into())
}
}
}

impl AggCallMergeRule {
pub fn create() -> BoxedRule {
Box::new(Self {})
}
}
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ mod agg_group_by_simplify_rule;
pub use agg_group_by_simplify_rule::*;
mod apply_hop_window_transpose_rule;
pub use apply_hop_window_transpose_rule::*;
mod agg_call_merge_rule;
pub use agg_call_merge_rule::*;

#[macro_export]
macro_rules! for_all_rules {
Expand Down Expand Up @@ -212,6 +214,7 @@ macro_rules! for_all_rules {
, { ExpandToProjectRule }
, { AggGroupBySimplifyRule }
, { ApplyHopWindowTransposeRule }
, { AggCallMergeRule }
}
};
}
Expand Down

0 comments on commit aaa94d7

Please sign in to comment.