Skip to content

Commit

Permalink
feat(optimizer): support apply hop window transpose rule (#12338)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Sep 27, 2023
1 parent 4ca3a9a commit aad4119
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 3 deletions.
19 changes: 19 additions & 0 deletions e2e_test/batch/subquery/subquery_with_hop_window.slt.part
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t1 (k int primary key, ts timestamp);

statement ok
insert into t1 values (1, '2021-01-01 10:15:00');

query IITTT rowsort
select * from (select 1 as col union select 2) u , lateral(select * from hop(t1, ts, interval '10' minute, interval '30' minute) where col = k);
----
1 1 2021-01-01 10:15:00 2021-01-01 09:50:00 2021-01-01 10:20:00
1 1 2021-01-01 10:15:00 2021-01-01 10:00:00 2021-01-01 10:30:00
1 1 2021-01-01 10:15:00 2021-01-01 10:10:00 2021-01-01 10:40:00

statement ok
drop table t1;

7 changes: 7 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/subquery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,10 @@
expected_outputs:
- batch_plan
- stream_plan
- name: test hop window subquery 1
sql: |
create table t1 (k int primary key, ts timestamp);
select * from (select 1 as col union select 2) u , lateral(select * from hop(t1, ts, interval '10' minute, interval '30' minute) where col = k);
expected_outputs:
- batch_plan
- stream_plan
24 changes: 24 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/subquery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -833,3 +833,27 @@
└─StreamProject { exprs: [rows.correlated_col, rows.k, rows.v, rows._row_id] }
└─StreamFilter { predicate: IsNotNull(rows.correlated_col) }
└─StreamTableScan { table: rows, columns: [rows.k, rows.v, rows.correlated_col, rows._row_id], pk: [rows._row_id], dist: UpstreamHashShard(rows._row_id) }
- name: test hop window subquery 1
sql: |
create table t1 (k int primary key, ts timestamp);
select * from (select 1 as col union select 2) u , lateral(select * from hop(t1, ts, interval '10' minute, interval '30' minute) where col = k);
batch_plan: |-
BatchHopWindow { time_col: t1.ts, slide: 00:10:00, size: 00:30:00, output: all }
└─BatchExchange { order: [], dist: Single }
└─BatchFilter { predicate: IsNotNull(t1.ts) }
└─BatchLookupJoin { type: Inner, predicate: 1:Int32 = t1.k AND IsNotNull(t1.ts), output: all }
└─BatchExchange { order: [], dist: UpstreamHashShard(1:Int32) }
└─BatchHashAgg { group_key: [1:Int32], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(1:Int32) }
└─BatchValues { rows: [[1:Int32], [2:Int32]] }
stream_plan: |-
StreamMaterialize { columns: [col, k, ts, window_start, window_end], stream_key: [col, k, window_start, window_end], pk_columns: [col, k, window_start, window_end], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: 1:Int32 = t1.k, output: all }
├─StreamAppendOnlyDedup { dedup_cols: [1:Int32] }
│ └─StreamExchange { dist: HashShard(1:Int32) }
│ └─StreamProject { exprs: [1:Int32] }
│ └─StreamValues { rows: [[1:Int32, 0:Int64], [2:Int32, 1:Int64]] }
└─StreamExchange { dist: HashShard(t1.k) }
└─StreamHopWindow { time_col: t1.ts, slide: 00:10:00, size: 00:30:00, output: all }
└─StreamFilter { predicate: IsNotNull(t1.ts) }
└─StreamTableScan { table: t1, columns: [t1.k, t1.ts], pk: [t1.k], dist: UpstreamHashShard(t1.k) }
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ static GENERAL_UNNESTING_PUSH_DOWN_APPLY: LazyLock<OptimizationStage> = LazyLock
ApplyUnionTransposeRule::create(),
ApplyOverWindowTransposeRule::create(),
ApplyExpandTransposeRule::create(),
ApplyHopWindowTransposeRule::create(),
CrossJoinEliminateRule::create(),
ApplyShareEliminateRule::create(),
],
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ impl LogicalHopWindow {
self.core.into_parts()
}

pub fn output_indices_are_trivial(&self) -> bool {
self.output_indices() == &(0..self.core.internal_column_num()).collect_vec()
}

/// used for binder and planner. The function will add a filter operator to ignore records with
/// NULL time value. <https://github.com/risingwavelabs/risingwave/issues/8130>
pub fn create(
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/logical_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl LogicalJoin {
self.core.is_full_out()
}

pub fn output_indices_is_trivial(&self) -> bool {
pub fn output_indices_are_trivial(&self) -> bool {
self.output_indices() == &(0..self.internal_column_num()).collect_vec()
}

Expand Down
93 changes: 93 additions & 0 deletions src/frontend/src/optimizer/rule/apply_hop_window_transpose_rule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::plan_common::JoinType;

use super::{BoxedRule, Rule};
use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalHopWindow};
use crate::optimizer::PlanRef;
use crate::utils::Condition;

/// Transpose `LogicalApply` and `LogicalHopWindow`.
///
/// Before:
///
/// ```text
/// LogicalApply
/// / \
/// Domain LogicalHopWindow
/// |
/// Input
/// ```
///
/// After:
///
/// ```text
/// LogicalHopWindow
/// |
/// LogicalApply
/// / \
/// Domain Input
/// ```
pub struct ApplyHopWindowTransposeRule {}
impl Rule for ApplyHopWindowTransposeRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let apply: &LogicalApply = plan.as_logical_apply()?;
let (left, right, on, join_type, correlated_id, correlated_indices, max_one_row) =
apply.clone().decompose();
let hop_window: &LogicalHopWindow = right.as_logical_hop_window()?;
assert_eq!(join_type, JoinType::Inner);

if !hop_window.output_indices_are_trivial() {
return None;
}

let (hop_window_input, time_col, window_slide, window_size, window_offset, _output_indices) =
hop_window.clone().into_parts();

let apply_left_len = left.schema().len() as isize;

if max_one_row {
return None;
}

let new_apply = LogicalApply::new(
left,
hop_window_input,
JoinType::Inner,
Condition::true_cond(),
correlated_id,
correlated_indices,
false,
)
.into();

let new_hop_window = LogicalHopWindow::create(
new_apply,
time_col.clone_with_offset(apply_left_len),
window_slide,
window_size,
window_offset,
);

let filter = LogicalFilter::create(new_hop_window, on);
Some(filter)
}
}

impl ApplyHopWindowTransposeRule {
pub fn create() -> BoxedRule {
Box::new(ApplyHopWindowTransposeRule {})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Rule for ApplyJoinTransposeRule {
}

assert!(
join.output_indices_is_trivial(),
join.output_indices_are_trivial(),
"ApplyJoinTransposeRule requires the join containing no output indices, so make sure ProjectJoinSeparateRule is always applied before this rule"
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Rule for CrossJoinEliminateRule {
&& join_type == JoinType::Inner
&& values.rows().len() == 1 // one row
&& values.rows()[0].is_empty() // no columns
&& join.output_indices_is_trivial()
&& join.output_indices_are_trivial()
{
Some(left)
} else {
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ mod expand_to_project_rule;
pub use expand_to_project_rule::*;
mod agg_group_by_simplify_rule;
pub use agg_group_by_simplify_rule::*;
mod apply_hop_window_transpose_rule;
pub use apply_hop_window_transpose_rule::*;

#[macro_export]
macro_rules! for_all_rules {
Expand Down Expand Up @@ -209,6 +211,7 @@ macro_rules! for_all_rules {
, { ApplyExpandTransposeRule }
, { ExpandToProjectRule }
, { AggGroupBySimplifyRule }
, { ApplyHopWindowTransposeRule }
}
};
}
Expand Down

0 comments on commit aad4119

Please sign in to comment.