Skip to content

Commit

Permalink
perf(agg): merge common agg calls (#12683)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Oct 10, 2023
1 parent 615f873 commit 826db77
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 31 deletions.
18 changes: 9 additions & 9 deletions src/frontend/planner_test/tests/testdata/output/nexmark.yaml

Large diffs are not rendered by default.

26 changes: 13 additions & 13 deletions src/frontend/planner_test/tests/testdata/output/nexmark_source.yaml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -1280,7 +1280,7 @@
BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [$expr2], aggs: [sum0(count) filter((flag = 0:Int64)), sum0(count filter(($expr3 < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr3 >= 1000000:Int32))) filter((flag = 0:Int64)), count($expr4) filter((flag = 1:Int64)), count($expr4) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr4) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr4) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr5) filter((flag = 2:Int64)), count($expr5) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr5) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr5) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] }
└─BatchExchange { order: [], dist: HashShard($expr2) }
└─BatchHashAgg { group_key: [$expr2, $expr4, $expr5, flag], aggs: [count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32)), count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32)), count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32))] }
└─BatchHashAgg { group_key: [$expr2, $expr4, $expr5, flag], aggs: [count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32))] }
└─BatchExchange { order: [], dist: HashShard($expr2, $expr4, $expr5, flag) }
└─BatchExpand { column_subsets: [[$expr2], [$expr2, $expr4], [$expr2, $expr5]] }
└─BatchProject { exprs: [ToChar($expr1, 'yyyy-MM-dd':Varchar) as $expr2, Field(bid, 2:Int32) as $expr3, Field(bid, 1:Int32) as $expr4, Field(bid, 0:Int32) as $expr5] }
Expand Down Expand Up @@ -1360,7 +1360,7 @@
BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [$expr2, $expr3], aggs: [max(max($expr4)) filter((flag = 0:Int64)), sum0(count) filter((flag = 0:Int64)), sum0(count filter(($expr5 < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr5 >= 1000000:Int32))) filter((flag = 0:Int64)), count($expr6) filter((flag = 1:Int64)), count($expr6) filter((count filter(($expr5 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr6) filter((count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr6) filter((count filter(($expr5 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr7) filter((flag = 2:Int64)), count($expr7) filter((count filter(($expr5 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr7) filter((count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr7) filter((count filter(($expr5 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] }
└─BatchExchange { order: [], dist: HashShard($expr2, $expr3) }
└─BatchHashAgg { group_key: [$expr2, $expr3, $expr6, $expr7, flag], aggs: [max($expr4), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32)), count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32)), count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32))] }
└─BatchHashAgg { group_key: [$expr2, $expr3, $expr6, $expr7, flag], aggs: [max($expr4), count, count filter(($expr5 < 10000:Int32)), count filter(($expr5 >= 10000:Int32) AND ($expr5 < 1000000:Int32)), count filter(($expr5 >= 1000000:Int32))] }
└─BatchExchange { order: [], dist: HashShard($expr2, $expr3, $expr6, $expr7, flag) }
└─BatchExpand { column_subsets: [[$expr2, $expr3, $expr4], [$expr2, $expr3, $expr6], [$expr2, $expr3, $expr7]] }
└─BatchProject { exprs: [Field(bid, 3:Int32) as $expr2, ToChar($expr1, 'yyyy-MM-dd':Varchar) as $expr3, ToChar($expr1, 'HH:mm':Varchar) as $expr4, Field(bid, 2:Int32) as $expr5, Field(bid, 1:Int32) as $expr6, Field(bid, 0:Int32) as $expr7] }
Expand Down
19 changes: 13 additions & 6 deletions src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,7 @@ static PUSH_CALC_OF_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
static CONVERT_DISTINCT_AGG_FOR_STREAM: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Convert Distinct Aggregation",
vec![
UnionToDistinctRule::create(),
DistinctAggRule::create(true),
AggGroupBySimplifyRule::create(),
],
vec![UnionToDistinctRule::create(), DistinctAggRule::create(true)],
ApplyOrder::TopDown,
)
});
Expand All @@ -255,12 +251,19 @@ static CONVERT_DISTINCT_AGG_FOR_BATCH: LazyLock<OptimizationStage> = LazyLock::n
vec![
UnionToDistinctRule::create(),
DistinctAggRule::create(false),
AggGroupBySimplifyRule::create(),
],
ApplyOrder::TopDown,
)
});

/// Optimization stage named "Simplify Aggregation".
///
/// Bundles `AggGroupBySimplifyRule` and `AggCallMergeRule` (in that order)
/// and registers them with `ApplyOrder::TopDown`.
static SIMPLIFY_AGG: LazyLock<OptimizationStage> = LazyLock::new(|| {
    let rules = vec![AggGroupBySimplifyRule::create(), AggCallMergeRule::create()];
    OptimizationStage::new("Simplify Aggregation", rules, ApplyOrder::TopDown)
});

static JOIN_COMMUTE: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Join Commute".to_string(),
Expand Down Expand Up @@ -564,6 +567,8 @@ impl LogicalOptimizer {
plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_STREAM)
};

plan = plan.optimize_by_rules(&SIMPLIFY_AGG);

plan = plan.optimize_by_rules(&JOIN_COMMUTE);

// Do a final column pruning and predicate pushing down to clean up the plan.
Expand Down Expand Up @@ -636,6 +641,8 @@ impl LogicalOptimizer {
// Convert distinct aggregates.
plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH);

plan = plan.optimize_by_rules(&SIMPLIFY_AGG);

plan = plan.optimize_by_rules(&JOIN_COMMUTE);

// Do a final column pruning and predicate pushing down to clean up the plan.
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<PlanRef: GenericPlanRef> Agg<PlanRef> {
self.ctx().session_ctx().config().get_force_two_phase_agg()
}

fn two_phase_agg_enabled(&self) -> bool {
pub fn two_phase_agg_enabled(&self) -> bool {
self.enable_two_phase
}

Expand Down
57 changes: 57 additions & 0 deletions src/frontend/src/optimizer/rule/agg_call_merge_rule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::{BoxedRule, Rule};
use crate::optimizer::plan_node::generic::Agg;
use crate::optimizer::plan_node::{LogicalProject, PlanTreeNodeUnary};
use crate::PlanRef;

/// Optimizer rule that merges duplicated aggregate function calls in a `LogicalAgg`.
///
/// When the same call appears more than once, the aggregation is rebuilt with each
/// distinct call kept only once, and a `LogicalProject` is placed on top so that the
/// output columns (group keys followed by one column per original call) keep the
/// schema of the original plan.
pub struct AggCallMergeRule {}

impl Rule for AggCallMergeRule {
    /// Deduplicates the aggregate calls of a `LogicalAgg`.
    ///
    /// Returns `None` when `plan` is not a `LogicalAgg` or when all of its calls are
    /// already distinct. Otherwise returns a `LogicalProject` over a new aggregation
    /// that holds each distinct call once; the projection maps every original output
    /// column to the matching column of the new aggregation.
    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
        let agg = plan.as_logical_agg()?;

        let calls = agg.agg_calls();
        let group_key_len = agg.group_key().len();

        // Distinct calls, in order of first appearance.
        let mut merged_calls = Vec::with_capacity(calls.len());
        // Group-key columns pass through unchanged; call columns are appended below.
        let mut out_fields = (0..group_key_len).collect::<Vec<_>>();

        for call in calls {
            let slot = match merged_calls.iter().position(|c| c == call) {
                Some(existing) => existing,
                None => {
                    merged_calls.push(call.clone());
                    merged_calls.len() - 1
                }
            };
            // Call outputs sit right after the group-key columns.
            out_fields.push(group_key_len + slot);
        }

        if merged_calls.len() == calls.len() {
            // Nothing was merged, so there is no change to report.
            return None;
        }

        let merged_agg = Agg::new(merged_calls, agg.group_key().clone(), agg.input())
            .with_enable_two_phase(agg.core().two_phase_agg_enabled())
            .into();
        let project = LogicalProject::with_out_col_idx(merged_agg, out_fields.into_iter());
        Some(project.into())
    }
}

impl AggCallMergeRule {
pub fn create() -> BoxedRule {
Box::new(Self {})
}
}
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ mod agg_group_by_simplify_rule;
pub use agg_group_by_simplify_rule::*;
mod apply_hop_window_transpose_rule;
pub use apply_hop_window_transpose_rule::*;
mod agg_call_merge_rule;
pub use agg_call_merge_rule::*;

#[macro_export]
macro_rules! for_all_rules {
Expand Down Expand Up @@ -212,6 +214,7 @@ macro_rules! for_all_rules {
, { ExpandToProjectRule }
, { AggGroupBySimplifyRule }
, { ApplyHopWindowTransposeRule }
, { AggCallMergeRule }
}
};
}
Expand Down

0 comments on commit 826db77

Please sign in to comment.