Skip to content

Commit

Permalink
feat(optimizer): merge chaining OverWindows with same partition by …
Browse files Browse the repository at this point in the history
…and order by (#13281)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Nov 7, 2023
1 parent 04999f0 commit 875875d
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,24 @@
- optimized_logical_plan_for_stream
- stream_plan
- batch_plan
- id: TopN among multiple window function calls, some not TopN
sql: |
create table t (x int, y int, z int);
select r2, r3
from (
select
*,
row_number() over (partition by x order by y) r1,
row_number() over (partition by x, y order by z) r2,
rank() over (partition by x, y order by z) r3
from t
) Q
where Q.r1 < 10;
expected_outputs:
- logical_plan
- optimized_logical_plan_for_stream
- stream_plan
- batch_plan

# TopN on nexmark schema
- id: create_bid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1132,7 +1132,7 @@
JOIN side_input FOR SYSTEM_TIME AS OF PROCTIME() S
ON mod(B.auction, 10000) = S.key
sink_plan: |-
StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10040(hidden), side_input.key(hidden)] }
StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10042(hidden), side_input.key(hidden)] }
└─StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] }
├─StreamExchange { dist: HashShard($expr1) }
│ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,27 +385,21 @@
└─LogicalScan { table: t, columns: [t.x, t._row_id] }
optimized_logical_plan_for_stream: |-
LogicalProject { exprs: [row_number, rank, dense_rank] }
└─LogicalOverWindow { window_functions: [dense_rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalOverWindow { window_functions: [rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalScan { table: t, columns: [t.x] }
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), dense_rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalScan { table: t, columns: [t.x] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [row_number, rank, dense_rank] }
└─BatchOverWindow { window_functions: [dense_rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─BatchOverWindow { window_functions: [rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─BatchExchange { order: [t.x ASC, t.x ASC], dist: HashShard(t.x) }
└─BatchSort { order: [t.x ASC, t.x ASC] }
└─BatchScan { table: t, columns: [t.x], distribution: SomeShard }
└─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), dense_rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─BatchExchange { order: [t.x ASC, t.x ASC], dist: HashShard(t.x) }
└─BatchSort { order: [t.x ASC, t.x ASC] }
└─BatchScan { table: t, columns: [t.x], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [row_number, rank, dense_rank, t._row_id(hidden), t.x(hidden)], stream_key: [t._row_id, t.x], pk_columns: [t._row_id, t.x], pk_conflict: NoCheck }
└─StreamProject { exprs: [row_number, rank, dense_rank, t._row_id, t.x] }
└─StreamOverWindow { window_functions: [dense_rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─StreamOverWindow { window_functions: [rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─StreamExchange { dist: HashShard(t.x) }
└─StreamTableScan { table: t, columns: [t.x, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), dense_rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─StreamExchange { dist: HashShard(t.x) }
└─StreamTableScan { table: t, columns: [t.x, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- id: row_number with valid over clause
sql: |
create table t(x int, y int);
Expand Down Expand Up @@ -801,6 +795,46 @@
└─StreamGroupTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] }
└─StreamExchange { dist: HashShard(t.x) }
└─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- id: TopN among multiple window function calls, some not TopN
sql: |
create table t (x int, y int, z int);
select r2, r3
from (
select
*,
row_number() over (partition by x order by y) r1,
row_number() over (partition by x, y order by z) r2,
rank() over (partition by x, y order by z) r3
from t
) Q
where Q.r1 < 10;
logical_plan: |-
LogicalProject { exprs: [row_number, rank] }
└─LogicalFilter { predicate: (row_number < 10:Int32) }
└─LogicalProject { exprs: [t.x, t.y, t.z, row_number, row_number, rank] }
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalProject { exprs: [t.x, t.y, t.z, t._row_id] }
└─LogicalScan { table: t, columns: [t.x, t.y, t.z, t._row_id] }
optimized_logical_plan_for_stream: |-
LogicalProject { exprs: [row_number, rank] }
└─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─LogicalTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] }
└─LogicalScan { table: t, columns: [t.x, t.y, t.z] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [row_number, rank] }
└─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─BatchSort { order: [t.x ASC, t.y ASC, t.z ASC] }
└─BatchGroupTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] }
└─BatchExchange { order: [], dist: HashShard(t.x) }
└─BatchScan { table: t, columns: [t.x, t.y, t.z], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [r2, r3, t.x(hidden), t._row_id(hidden), t.y(hidden)], stream_key: [t.x, t._row_id, t.y], pk_columns: [t.x, t._row_id, t.y], pk_conflict: NoCheck }
└─StreamProject { exprs: [row_number, rank, t.x, t._row_id, t.y] }
└─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] }
└─StreamGroupTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] }
└─StreamExchange { dist: HashShard(t.x) }
└─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- id: create_bid
sql: |
/*
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,14 @@ static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
)
});

static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Merge Over Window",
vec![OverWindowMergeRule::create()],
ApplyOrder::TopDown,
)
});

static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Rewrite Like Expr",
Expand Down Expand Up @@ -567,6 +575,7 @@ impl LogicalOptimizer {
// optimized to TopN.
plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW);
plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW);

let force_split_distinct_agg = ctx.session_ctx().config().get_force_split_distinct_agg();
// TODO: better naming of the OptimizationStage
Expand Down Expand Up @@ -647,6 +656,7 @@ impl LogicalOptimizer {
// optimized to TopN.
plan = Self::predicate_pushdown(plan, explain_trace, &ctx);
plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW);
plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW);

// Convert distinct aggregates.
plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH);
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_over_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,14 @@ impl LogicalOverWindow {
&self.core.window_functions
}

pub fn partition_key_indices(&self) -> Vec<usize> {
self.core.partition_key_indices()
}

pub fn order_key(&self) -> &[ColumnOrder] {
self.core.order_key()
}

#[must_use]
fn rewrite_with_input_and_window(
&self,
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub trait Description {

pub(super) type BoxedRule = Box<dyn Rule>;

mod over_window_merge_rule;
pub use over_window_merge_rule::*;
mod project_join_merge_rule;
pub use project_join_merge_rule::*;
mod project_eliminate_rule;
Expand Down Expand Up @@ -180,6 +182,7 @@ macro_rules! for_all_rules {
, { OverWindowToTopNRule }
, { OverWindowToAggAndJoinRule }
, { OverWindowSplitRule }
, { OverWindowMergeRule }
, { JoinCommuteRule }
, { UnionToDistinctRule }
, { AggProjectMergeRule }
Expand Down
63 changes: 63 additions & 0 deletions src/frontend/src/optimizer/rule/over_window_merge_rule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::Rule;
use crate::optimizer::plan_node::{LogicalOverWindow, PlanTreeNodeUnary};
use crate::PlanRef;

/// Merge chaining `LogicalOverWindow`s with same `PARTITION BY` and `ORDER BY`.
/// Should be applied after `OverWindowSplitRule`.
pub struct OverWindowMergeRule;

impl OverWindowMergeRule {
pub fn create() -> Box<dyn Rule> {
Box::new(OverWindowMergeRule)
}
}

impl Rule for OverWindowMergeRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let over_window = plan.as_logical_over_window()?;
let mut window_functions_rev = over_window
.window_functions()
.iter()
.rev()
.cloned()
.collect::<Vec<_>>();
let partition_key = over_window.partition_key_indices();
let order_key = over_window.order_key();

let mut curr = plan.clone();
let mut curr_input = over_window.input();
while let Some(input_over_window) = curr_input.as_logical_over_window() {
if input_over_window.partition_key_indices() != partition_key
|| input_over_window.order_key() != order_key
{
// cannot merge `OverWindow`s with different partition key or order key
break;
}
window_functions_rev.extend(input_over_window.window_functions().iter().rev().cloned());
curr = curr_input.clone();
curr_input = input_over_window.input();
}

if curr.as_logical_over_window().unwrap() == over_window {
// unchanged
return None;
}

let window_functions = window_functions_rev.into_iter().rev().collect::<Vec<_>>();
Some(LogicalOverWindow::new(window_functions, curr_input).into())
}
}

0 comments on commit 875875d

Please sign in to comment.