From 875875dae50c58a7935caa2724f1dccce7bf6247 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 7 Nov 2023 15:46:29 +0800 Subject: [PATCH] feat(optimizer): merge chaining `OverWindow`s with same partition by and order by (#13281) Signed-off-by: Richard Chien --- .../testdata/input/over_window_function.yaml | 18 ++++++ .../tests/testdata/output/nexmark.yaml | 2 +- .../testdata/output/over_window_function.yaml | 64 ++++++++++++++----- .../src/optimizer/logical_optimization.rs | 10 +++ .../plan_node/logical_over_window.rs | 8 +++ src/frontend/src/optimizer/rule/mod.rs | 3 + .../optimizer/rule/over_window_merge_rule.rs | 63 ++++++++++++++++++ 7 files changed, 152 insertions(+), 16 deletions(-) create mode 100644 src/frontend/src/optimizer/rule/over_window_merge_rule.rs diff --git a/src/frontend/planner_test/tests/testdata/input/over_window_function.yaml b/src/frontend/planner_test/tests/testdata/input/over_window_function.yaml index 33c00048abc60..ff7a157d84434 100644 --- a/src/frontend/planner_test/tests/testdata/input/over_window_function.yaml +++ b/src/frontend/planner_test/tests/testdata/input/over_window_function.yaml @@ -370,6 +370,24 @@ - optimized_logical_plan_for_stream - stream_plan - batch_plan +- id: TopN among multiple window function calls, some not TopN + sql: | + create table t (x int, y int, z int); + select r2, r3 + from ( + select + *, + row_number() over (partition by x order by y) r1, + row_number() over (partition by x, y order by z) r2, + rank() over (partition by x, y order by z) r3 + from t + ) Q + where Q.r1 < 10; + expected_outputs: + - logical_plan + - optimized_logical_plan_for_stream + - stream_plan + - batch_plan # TopN on nexmark schema - id: create_bid diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index 637b7b3ddecc7..1399ee708ee9c 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ 
b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -1132,7 +1132,7 @@ JOIN side_input FOR SYSTEM_TIME AS OF PROCTIME() S ON mod(B.auction, 10000) = S.key sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10040(hidden), side_input.key(hidden)] } + StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10042(hidden), side_input.key(hidden)] } └─StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├─StreamExchange { dist: HashShard($expr1) } │ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] } diff --git a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml index c54f2a9458e44..54926222e976b 100644 --- a/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml +++ b/src/frontend/planner_test/tests/testdata/output/over_window_function.yaml @@ -385,27 +385,21 @@ └─LogicalScan { table: t, columns: [t.x, t._row_id] } optimized_logical_plan_for_stream: |- LogicalProject { exprs: [row_number, rank, dense_rank] } - └─LogicalOverWindow { window_functions: [dense_rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─LogicalOverWindow { window_functions: [rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─LogicalScan { table: t, columns: [t.x] } + └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), dense_rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─LogicalScan { table: t, columns: [t.x] } batch_plan: |- BatchExchange { order: [], dist: Single } └─BatchProject { exprs: [row_number, rank, dense_rank] } - └─BatchOverWindow { window_functions: [dense_rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─BatchOverWindow { window_functions: [rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─BatchExchange { order: [t.x ASC, t.x ASC], dist: HashShard(t.x) } - └─BatchSort { order: [t.x ASC, t.x ASC] } - └─BatchScan { table: t, columns: [t.x], distribution: SomeShard } + └─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), dense_rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─BatchExchange { order: [t.x ASC, t.x ASC], dist: HashShard(t.x) } + └─BatchSort { order: [t.x ASC, t.x ASC] } + └─BatchScan { table: t, columns: [t.x], distribution: SomeShard } stream_plan: |- StreamMaterialize { columns: [row_number, rank, dense_rank, t._row_id(hidden), t.x(hidden)], stream_key: [t._row_id, t.x], pk_columns: [t._row_id, t.x], pk_conflict: NoCheck } └─StreamProject { exprs: [row_number, rank, dense_rank, t._row_id, t.x] } - └─StreamOverWindow { window_functions: [dense_rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─StreamOverWindow { window_functions: [rank() 
OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } - └─StreamExchange { dist: HashShard(t.x) } - └─StreamTableScan { table: t, columns: [t.x, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), dense_rank() OVER(PARTITION BY t.x ORDER BY t.x ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─StreamExchange { dist: HashShard(t.x) } + └─StreamTableScan { table: t, columns: [t.x, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } - id: row_number with valid over clause sql: | create table t(x int, y int); @@ -801,6 +795,46 @@ └─StreamGroupTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] } └─StreamExchange { dist: HashShard(t.x) } └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } +- id: TopN among multiple window function calls, some not TopN + sql: | + create table t (x int, y int, z int); + select r2, r3 + from ( + select + *, + row_number() over (partition by x order by y) r1, + row_number() over (partition by x, y order by z) r2, + rank() over (partition by x, y order by z) r3 + from t + ) Q + where Q.r1 < 10; + logical_plan: |- + LogicalProject { exprs: [row_number, rank] } + └─LogicalFilter { predicate: (row_number < 10:Int32) } + └─LogicalProject { exprs: [t.x, t.y, t.z, row_number, row_number, rank] } + └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x ORDER BY t.y ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─LogicalProject { exprs: [t.x, t.y, t.z, t._row_id] } + └─LogicalScan { table: t, columns: [t.x, t.y, t.z, t._row_id] } + optimized_logical_plan_for_stream: |- + LogicalProject { exprs: [row_number, rank] } + └─LogicalOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─LogicalTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] } + └─LogicalScan { table: t, columns: [t.x, t.y, t.z] } + batch_plan: |- + BatchExchange { order: [], dist: Single } + └─BatchProject { exprs: [row_number, rank] } + └─BatchOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─BatchSort { order: [t.x ASC, t.y ASC, t.z ASC] } + └─BatchGroupTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] } + └─BatchExchange { order: [], dist: HashShard(t.x) } + └─BatchScan { table: t, columns: [t.x, t.y, t.z], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [r2, r3, t.x(hidden), t._row_id(hidden), t.y(hidden)], stream_key: [t.x, t._row_id, t.y], pk_columns: [t.x, t._row_id, t.y], pk_conflict: NoCheck } + └─StreamProject { exprs: [row_number, rank, t.x, t._row_id, t.y] } + └─StreamOverWindow { window_functions: [row_number() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), rank() OVER(PARTITION BY t.x, t.y ORDER BY t.z ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)] } + └─StreamGroupTopN { order: [t.y ASC], limit: 9, offset: 0, group_key: [t.x] } + └─StreamExchange { dist: 
HashShard(t.x) } + └─StreamTableScan { table: t, columns: [t.x, t.y, t.z, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) } - id: create_bid sql: | /* diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index ce52e12486e94..f8d9e6f78c737 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -325,6 +325,14 @@ static CONVERT_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| { ) }); +static MERGE_OVER_WINDOW: LazyLock<OptimizationStage> = LazyLock::new(|| { + OptimizationStage::new( + "Merge Over Window", + vec![OverWindowMergeRule::create()], + ApplyOrder::TopDown, + ) +}); + static REWRITE_LIKE_EXPR: LazyLock<OptimizationStage> = LazyLock::new(|| { OptimizationStage::new( "Rewrite Like Expr", @@ -567,6 +575,7 @@ impl LogicalOptimizer { // optimized to TopN. plan = Self::predicate_pushdown(plan, explain_trace, &ctx); plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW); + plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW); let force_split_distinct_agg = ctx.session_ctx().config().get_force_split_distinct_agg(); // TODO: better naming of the OptimizationStage @@ -647,6 +656,7 @@ impl LogicalOptimizer { // optimized to TopN. plan = Self::predicate_pushdown(plan, explain_trace, &ctx); plan = plan.optimize_by_rules(&CONVERT_OVER_WINDOW); + plan = plan.optimize_by_rules(&MERGE_OVER_WINDOW); // Convert distinct aggregates. 
plan = plan.optimize_by_rules(&CONVERT_DISTINCT_AGG_FOR_BATCH); diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index b9e58f9c9d6eb..665cee6f178a0 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -522,6 +522,14 @@ impl LogicalOverWindow { &self.core.window_functions } + pub fn partition_key_indices(&self) -> Vec<usize> { + self.core.partition_key_indices() + } + + pub fn order_key(&self) -> &[ColumnOrder] { + self.core.order_key() + } + #[must_use] fn rewrite_with_input_and_window( &self, diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index 723f8436d39e5..a82575735a638 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -29,6 +29,8 @@ pub trait Description { pub(super) type BoxedRule = Box<dyn Rule>; +mod over_window_merge_rule; +pub use over_window_merge_rule::*; mod project_join_merge_rule; pub use project_join_merge_rule::*; mod project_eliminate_rule; @@ -180,6 +182,7 @@ macro_rules! for_all_rules { , { OverWindowToTopNRule } , { OverWindowToAggAndJoinRule } , { OverWindowSplitRule } + , { OverWindowMergeRule } , { JoinCommuteRule } , { UnionToDistinctRule } , { AggProjectMergeRule } diff --git a/src/frontend/src/optimizer/rule/over_window_merge_rule.rs b/src/frontend/src/optimizer/rule/over_window_merge_rule.rs new file mode 100644 index 0000000000000..e856fd8cdd073 --- /dev/null +++ b/src/frontend/src/optimizer/rule/over_window_merge_rule.rs @@ -0,0 +1,63 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::Rule;
+use crate::optimizer::plan_node::{LogicalOverWindow, PlanTreeNodeUnary};
+use crate::PlanRef;
+
+/// Merge chaining `LogicalOverWindow`s with same `PARTITION BY` and `ORDER BY` into one node.
+/// Should be applied after `OverWindowSplitRule`.
+pub struct OverWindowMergeRule;
+
+impl OverWindowMergeRule {
+    pub fn create() -> Box<dyn Rule> {
+        Box::new(OverWindowMergeRule)
+    }
+}
+
+impl Rule for OverWindowMergeRule {
+    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
+        let over_window = plan.as_logical_over_window()?;
+        let mut window_functions_rev = over_window
+            .window_functions()
+            .iter()
+            .rev()
+            .cloned()
+            .collect::<Vec<_>>();
+        let partition_key = over_window.partition_key_indices();
+        let order_key = over_window.order_key();
+
+        let mut curr = plan.clone();
+        let mut curr_input = over_window.input();
+        while let Some(input_over_window) = curr_input.as_logical_over_window() {
+            if input_over_window.partition_key_indices() != partition_key
+                || input_over_window.order_key() != order_key
+            {
+                // keys differ, so this input cannot be merged and the chain ends here
+                break;
+            }
+            window_functions_rev.extend(input_over_window.window_functions().iter().rev().cloned());
+            curr = curr_input.clone();
+            curr_input = input_over_window.input();
+        }
+
+        if curr.as_logical_over_window().unwrap() == over_window {
+            // no input `OverWindow` was merged, so the rule does not apply
+            return None;
+        }
+
+        let window_functions = window_functions_rev.into_iter().rev().collect::<Vec<_>>();
+        Some(LogicalOverWindow::new(window_functions, curr_input).into())
+    }
+}