From aad4119c278d0a814761ef98d08cedd399465779 Mon Sep 17 00:00:00 2001
From: Dylan
Date: Wed, 27 Sep 2023 13:46:15 +0800
Subject: [PATCH] feat(optimizer): support apply hop window transpose rule (#12338)

---
 .../subquery_with_hop_window.slt.part         | 19 ++++
 .../tests/testdata/input/subquery.yaml        |  7 ++
 .../tests/testdata/output/subquery.yaml       | 24 +++++
 .../src/optimizer/logical_optimization.rs     |  1 +
 .../optimizer/plan_node/logical_hop_window.rs |  4 +
 .../src/optimizer/plan_node/logical_join.rs   |  2 +-
 .../rule/apply_hop_window_transpose_rule.rs   | 93 +++++++++++++++++++
 .../rule/apply_join_transpose_rule.rs         |  2 +-
 .../rule/cross_join_eliminate_rule.rs         |  2 +-
 src/frontend/src/optimizer/rule/mod.rs        |  3 +
 10 files changed, 154 insertions(+), 3 deletions(-)
 create mode 100644 e2e_test/batch/subquery/subquery_with_hop_window.slt.part
 create mode 100644 src/frontend/src/optimizer/rule/apply_hop_window_transpose_rule.rs

diff --git a/e2e_test/batch/subquery/subquery_with_hop_window.slt.part b/e2e_test/batch/subquery/subquery_with_hop_window.slt.part
new file mode 100644
index 000000000000..d4dba50fa37c
--- /dev/null
+++ b/e2e_test/batch/subquery/subquery_with_hop_window.slt.part
@@ -0,0 +1,19 @@
+statement ok
+SET RW_IMPLICIT_FLUSH TO true;
+
+statement ok
+create table t1 (k int primary key, ts timestamp);
+
+statement ok
+insert into t1 values (1, '2021-01-01 10:15:00');
+
+query IITTT rowsort
+select * from (select 1 as col union select 2) u , lateral(select * from hop(t1, ts, interval '10' minute, interval '30' minute) where col = k);
+----
+1 1 2021-01-01 10:15:00 2021-01-01 09:50:00 2021-01-01 10:20:00
+1 1 2021-01-01 10:15:00 2021-01-01 10:00:00 2021-01-01 10:30:00
+1 1 2021-01-01 10:15:00 2021-01-01 10:10:00 2021-01-01 10:40:00
+
+statement ok
+drop table t1;
+
diff --git a/src/frontend/planner_test/tests/testdata/input/subquery.yaml b/src/frontend/planner_test/tests/testdata/input/subquery.yaml
index e17087cd071f..b1d58c5e422e 100644
--- a/src/frontend/planner_test/tests/testdata/input/subquery.yaml
+++ b/src/frontend/planner_test/tests/testdata/input/subquery.yaml
@@ -284,3 +284,10 @@
   expected_outputs:
   - batch_plan
   - stream_plan
+- name: test hop window subquery 1
+  sql: |
+    create table t1 (k int primary key, ts timestamp);
+    select * from (select 1 as col union select 2) u , lateral(select * from hop(t1, ts, interval '10' minute, interval '30' minute) where col = k);
+  expected_outputs:
+  - batch_plan
+  - stream_plan
diff --git a/src/frontend/planner_test/tests/testdata/output/subquery.yaml b/src/frontend/planner_test/tests/testdata/output/subquery.yaml
index ca6375cfaace..64116534a99b 100644
--- a/src/frontend/planner_test/tests/testdata/output/subquery.yaml
+++ b/src/frontend/planner_test/tests/testdata/output/subquery.yaml
@@ -833,3 +833,27 @@
             └─StreamProject { exprs: [rows.correlated_col, rows.k, rows.v, rows._row_id] }
               └─StreamFilter { predicate: IsNotNull(rows.correlated_col) }
                 └─StreamTableScan { table: rows, columns: [rows.k, rows.v, rows.correlated_col, rows._row_id], pk: [rows._row_id], dist: UpstreamHashShard(rows._row_id) }
+- name: test hop window subquery 1
+  sql: |
+    create table t1 (k int primary key, ts timestamp);
+    select * from (select 1 as col union select 2) u , lateral(select * from hop(t1, ts, interval '10' minute, interval '30' minute) where col = k);
+  batch_plan: |-
+    BatchHopWindow { time_col: t1.ts, slide: 00:10:00, size: 00:30:00, output: all }
+    └─BatchExchange { order: [], dist: Single }
+      └─BatchFilter { predicate: IsNotNull(t1.ts) }
+        └─BatchLookupJoin { type: Inner, predicate: 1:Int32 = t1.k AND IsNotNull(t1.ts), output: all }
+          └─BatchExchange { order: [], dist: UpstreamHashShard(1:Int32) }
+            └─BatchHashAgg { group_key: [1:Int32], aggs: [] }
+              └─BatchExchange { order: [], dist: HashShard(1:Int32) }
+                └─BatchValues { rows: [[1:Int32], [2:Int32]] }
+  stream_plan: |-
+    StreamMaterialize { columns: [col, k, ts, window_start, window_end], stream_key: [col, k, window_start, window_end], pk_columns: [col, k, window_start, window_end], pk_conflict: NoCheck }
+    └─StreamHashJoin { type: Inner, predicate: 1:Int32 = t1.k, output: all }
+      ├─StreamAppendOnlyDedup { dedup_cols: [1:Int32] }
+      │ └─StreamExchange { dist: HashShard(1:Int32) }
+      │   └─StreamProject { exprs: [1:Int32] }
+      │     └─StreamValues { rows: [[1:Int32, 0:Int64], [2:Int32, 1:Int64]] }
+      └─StreamExchange { dist: HashShard(t1.k) }
+        └─StreamHopWindow { time_col: t1.ts, slide: 00:10:00, size: 00:30:00, output: all }
+          └─StreamFilter { predicate: IsNotNull(t1.ts) }
+            └─StreamTableScan { table: t1, columns: [t1.k, t1.ts], pk: [t1.k], dist: UpstreamHashShard(t1.k) }
diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs
index 7a59c968326f..f5b363904bab 100644
--- a/src/frontend/src/optimizer/logical_optimization.rs
+++ b/src/frontend/src/optimizer/logical_optimization.rs
@@ -189,6 +189,7 @@ static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock
             ApplyUnionTransposeRule::create(),
             ApplyOverWindowTransposeRule::create(),
             ApplyExpandTransposeRule::create(),
+            ApplyHopWindowTransposeRule::create(),
             CrossJoinEliminateRule::create(),
             ApplyShareEliminateRule::create(),
         ],
diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs
index 73fd3730b0c4..d07504701ab4 100644
--- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs
@@ -90,6 +90,10 @@ impl LogicalHopWindow {
         self.core.into_parts()
     }
 
+    pub fn output_indices_are_trivial(&self) -> bool {
+        self.output_indices() == &(0..self.core.internal_column_num()).collect_vec()
+    }
+
     /// used for binder and planner. The function will add a filter operator to ignore records with
     /// NULL time value.
     pub fn create(
diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs
index a9e398350f8d..21aef3277a9d 100644
--- a/src/frontend/src/optimizer/plan_node/logical_join.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_join.rs
@@ -184,7 +184,7 @@ impl LogicalJoin {
         self.core.is_full_out()
     }
 
-    pub fn output_indices_is_trivial(&self) -> bool {
+    pub fn output_indices_are_trivial(&self) -> bool {
         self.output_indices() == &(0..self.internal_column_num()).collect_vec()
     }
 
diff --git a/src/frontend/src/optimizer/rule/apply_hop_window_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_hop_window_transpose_rule.rs
new file mode 100644
index 000000000000..bf332fee2841
--- /dev/null
+++ b/src/frontend/src/optimizer/rule/apply_hop_window_transpose_rule.rs
@@ -0,0 +1,93 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_pb::plan_common::JoinType;
+
+use super::{BoxedRule, Rule};
+use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalHopWindow};
+use crate::optimizer::PlanRef;
+use crate::utils::Condition;
+
+/// Transpose `LogicalApply` and `LogicalHopWindow`.
+///
+/// Before:
+///
+/// ```text
+///     LogicalApply
+///    /            \
+///  Domain      LogicalHopWindow
+///                  |
+///                Input
+/// ```
+///
+/// After:
+///
+/// ```text
+///    LogicalHopWindow
+///          |
+///    LogicalApply
+///    /            \
+///  Domain        Input
+/// ```
+pub struct ApplyHopWindowTransposeRule {}
+impl Rule for ApplyHopWindowTransposeRule {
+    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
+        let apply: &LogicalApply = plan.as_logical_apply()?;
+        let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
+            apply.clone().decompose();
+        let hop_window: &LogicalHopWindow = right.as_logical_hop_window()?;
+        assert_eq!(join_type, JoinType::Inner);
+
+        if !hop_window.output_indices_are_trivial() {
+            return None;
+        }
+
+        let (hop_window_input, time_col, window_slide, window_size, window_offset, _output_indices) =
+            hop_window.clone().into_parts();
+
+        let apply_left_len = left.schema().len() as isize;
+
+        if max_one_row {
+            return None;
+        }
+
+        let new_apply = LogicalApply::new(
+            left,
+            hop_window_input,
+            JoinType::Inner,
+            Condition::true_cond(),
+            correlated_id,
+            correlated_indices,
+            false,
+        )
+        .into();
+
+        let new_hop_window = LogicalHopWindow::create(
+            new_apply,
+            time_col.clone_with_offset(apply_left_len),
+            window_slide,
+            window_size,
+            window_offset,
+        );
+
+        let filter = LogicalFilter::create(new_hop_window, on);
+        Some(filter)
+    }
+}
+
+impl ApplyHopWindowTransposeRule {
+    pub fn create() -> BoxedRule {
+        Box::new(ApplyHopWindowTransposeRule {})
+    }
+}
diff --git a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
index 089a66f0ad08..66579248a76f 100644
--- a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
+++ b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
@@ -122,7 +122,7 @@ impl Rule for ApplyJoinTransposeRule {
         }
 
         assert!(
-            join.output_indices_is_trivial(),
+            join.output_indices_are_trivial(),
             "ApplyJoinTransposeRule requires the join containing no output indices, so make sure ProjectJoinSeparateRule is always applied before this rule"
         );
 
diff --git a/src/frontend/src/optimizer/rule/cross_join_eliminate_rule.rs b/src/frontend/src/optimizer/rule/cross_join_eliminate_rule.rs
index 336ec74fa909..ddc8ff3efc2b 100644
--- a/src/frontend/src/optimizer/rule/cross_join_eliminate_rule.rs
+++ b/src/frontend/src/optimizer/rule/cross_join_eliminate_rule.rs
@@ -44,7 +44,7 @@ impl Rule for CrossJoinEliminateRule {
             && join_type == JoinType::Inner
             && values.rows().len() == 1 // one row
             && values.rows()[0].is_empty() // no columns
-            && join.output_indices_is_trivial()
+            && join.output_indices_are_trivial()
         {
             Some(left)
         } else {
diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs
index 388a8ea632c8..137528fd6b41 100644
--- a/src/frontend/src/optimizer/rule/mod.rs
+++ b/src/frontend/src/optimizer/rule/mod.rs
@@ -146,6 +146,8 @@ mod expand_to_project_rule;
 pub use expand_to_project_rule::*;
 mod agg_group_by_simplify_rule;
 pub use agg_group_by_simplify_rule::*;
+mod apply_hop_window_transpose_rule;
+pub use apply_hop_window_transpose_rule::*;
 
 #[macro_export]
 macro_rules! for_all_rules {
@@ -209,6 +211,7 @@ macro_rules! for_all_rules {
             , { ApplyExpandTransposeRule }
             , { ExpandToProjectRule }
             , { AggGroupBySimplifyRule }
+            , { ApplyHopWindowTransposeRule }
         }
     };
 }