Skip to content

Commit

Permalink
feat(optimizer): support split now or rule (#14127)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Dec 22, 2023
1 parent 29e62e6 commit adaf0a7
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,21 @@
select id1, a1, id2, v1 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where v1 > now();
expected_outputs:
- stream_plan
- name: Temporal filter with or predicate
sql: |
create table t1 (ts timestamp with time zone);
select * from t1 where ts + interval '1 hour' > now() or ts > ' 2023-12-18 00:00:00+00:00';
expected_outputs:
- stream_plan
- name: Temporal filter with or is null
sql: |
create table t1 (ts timestamp with time zone);
select * from t1 where ts + interval '1 hour' > now() or ts is null;
expected_outputs:
- stream_plan
- name: Temporal filter with or predicate
sql: |
create table t1 (ts timestamp with time zone);
select * from t1 where ts + interval '1 hour' > now() or ts > ' 2023-12-18 00:00:00+00:00' or ts is null;
expected_outputs:
- stream_plan
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,72 @@
│ └─StreamTableScan { table: version, columns: [version.id2], pk: [version.id2], dist: UpstreamHashShard(version.id2) }
└─StreamExchange { dist: Broadcast }
└─StreamNow { output: [now] }
- name: Temporal filter with or predicate
sql: |
create table t1 (ts timestamp with time zone);
select * from t1 where ts + interval '1 hour' > now() or ts > ' 2023-12-18 00:00:00+00:00';
stream_plan: |-
StreamMaterialize { columns: [ts, t1._row_id(hidden), $src(hidden)], stream_key: [t1._row_id, $src], pk_columns: [t1._row_id, $src], pk_conflict: NoCheck }
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(t1._row_id, 0:Int32) }
│ └─StreamProject { exprs: [t1.ts, t1._row_id, 0:Int32] }
│ └─StreamDynamicFilter { predicate: ($expr1 > now), output_watermarks: [$expr1], output: [t1.ts, $expr1, t1._row_id], cleaned_by_watermark: true }
│ ├─StreamProject { exprs: [t1.ts, AddWithTimeZone(t1.ts, '01:00:00':Interval, 'UTC':Varchar) as $expr1, t1._row_id] }
│ │ └─StreamFilter { predicate: Not((t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) }
│ │ └─StreamShare { id: 2 }
│ │ └─StreamFilter { predicate: (Not((t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) OR (t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) }
│ │ └─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamNow { output: [now] }
└─StreamExchange { dist: HashShard(t1._row_id, 1:Int32) }
└─StreamProject { exprs: [t1.ts, t1._row_id, 1:Int32] }
└─StreamFilter { predicate: (t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz) }
└─StreamShare { id: 2 }
└─StreamFilter { predicate: (Not((t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) OR (t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) }
└─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
- name: Temporal filter with or is null
sql: |
create table t1 (ts timestamp with time zone);
select * from t1 where ts + interval '1 hour' > now() or ts is null;
stream_plan: |-
StreamMaterialize { columns: [ts, t1._row_id(hidden), $src(hidden)], stream_key: [t1._row_id, $src], pk_columns: [t1._row_id, $src], pk_conflict: NoCheck }
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(t1._row_id, 0:Int32) }
│ └─StreamProject { exprs: [t1.ts, t1._row_id, 0:Int32] }
│ └─StreamDynamicFilter { predicate: ($expr1 > now), output_watermarks: [$expr1], output: [t1.ts, $expr1, t1._row_id], cleaned_by_watermark: true }
│ ├─StreamProject { exprs: [t1.ts, AddWithTimeZone(t1.ts, '01:00:00':Interval, 'UTC':Varchar) as $expr1, t1._row_id] }
│ │ └─StreamFilter { predicate: Not(IsNull(t1.ts)) }
│ │ └─StreamShare { id: 2 }
│ │ └─StreamFilter { predicate: (Not(IsNull(t1.ts)) OR IsNull(t1.ts)) }
│ │ └─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamNow { output: [now] }
└─StreamExchange { dist: HashShard(t1._row_id, 1:Int32) }
└─StreamProject { exprs: [t1.ts, t1._row_id, 1:Int32] }
└─StreamFilter { predicate: IsNull(t1.ts) }
└─StreamShare { id: 2 }
└─StreamFilter { predicate: (Not(IsNull(t1.ts)) OR IsNull(t1.ts)) }
└─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
- name: Temporal filter with or predicate
sql: |
create table t1 (ts timestamp with time zone);
select * from t1 where ts + interval '1 hour' > now() or ts > ' 2023-12-18 00:00:00+00:00' or ts is null;
stream_plan: |-
StreamMaterialize { columns: [ts, t1._row_id(hidden), $src(hidden)], stream_key: [t1._row_id, $src], pk_columns: [t1._row_id, $src], pk_conflict: NoCheck }
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(t1._row_id, 0:Int32) }
│ └─StreamProject { exprs: [t1.ts, t1._row_id, 0:Int32] }
│ └─StreamDynamicFilter { predicate: ($expr1 > now), output_watermarks: [$expr1], output: [t1.ts, $expr1, t1._row_id], cleaned_by_watermark: true }
│ ├─StreamProject { exprs: [t1.ts, AddWithTimeZone(t1.ts, '01:00:00':Interval, 'UTC':Varchar) as $expr1, t1._row_id] }
│ │ └─StreamFilter { predicate: Not((t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) AND Not(IsNull(t1.ts)) }
│ │ └─StreamShare { id: 2 }
│ │ └─StreamFilter { predicate: (((Not((t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) AND Not(IsNull(t1.ts))) OR (t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) OR IsNull(t1.ts)) }
│ │ └─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamNow { output: [now] }
└─StreamExchange { dist: HashShard(t1._row_id, 1:Int32) }
└─StreamProject { exprs: [t1.ts, t1._row_id, 1:Int32] }
└─StreamFilter { predicate: ((t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz) OR IsNull(t1.ts)) }
└─StreamShare { id: 2 }
└─StreamFilter { predicate: (((Not((t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) AND Not(IsNull(t1.ts))) OR (t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) OR IsNull(t1.ts)) }
└─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(||
static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Push down filter with now into a left semijoin",
vec![FilterWithNowToJoinRule::create()],
vec![SplitNowOrRule::create(), FilterWithNowToJoinRule::create()],
ApplyOrder::TopDown,
)
});
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub use top_n_on_index_rule::*;
mod stream;
pub use stream::bushy_tree_join_ordering_rule::*;
pub use stream::filter_with_now_to_join_rule::*;
pub use stream::split_now_or_rule::*;
pub use stream::stream_project_merge_rule::*;
mod trivial_project_to_values_rule;
pub use trivial_project_to_values_rule::*;
Expand Down Expand Up @@ -189,6 +190,7 @@ macro_rules! for_all_rules {
, { AggProjectMergeRule }
, { UnionMergeRule }
, { DagToTreeRule }
, { SplitNowOrRule }
, { FilterWithNowToJoinRule }
, { TopNOnIndexRule }
, { TrivialProjectToValuesRule }
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/rule/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@

pub(crate) mod bushy_tree_join_ordering_rule;
pub(crate) mod filter_with_now_to_join_rule;
pub(crate) mod split_now_or_rule;
pub(crate) mod stream_project_merge_rule;
102 changes: 102 additions & 0 deletions src/frontend/src/optimizer/rule/stream/split_now_or_rule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::DataType;

use crate::expr::{ExprImpl, ExprType, FunctionCall};
use crate::optimizer::plan_node::{LogicalFilter, LogicalShare, LogicalUnion, PlanTreeNodeUnary};
use crate::optimizer::rule::{BoxedRule, Rule};
use crate::optimizer::PlanRef;

/// Convert `LogicalFilter` with now or others predicates to a `UNION ALL`
///
/// Before:
/// ```text
/// `LogicalFilter`
/// now() or others
/// |
/// Input
/// ```
///
/// After:
/// ```text
/// `LogicalUnionAll`
/// / \
/// `LogicalFilter` `LogicalFilter`
/// now() & !others others
/// | |
/// \ /
/// `LogicalShare`
/// |
/// Input
/// ```text
pub struct SplitNowOrRule {}
impl Rule for SplitNowOrRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let filter: &LogicalFilter = plan.as_logical_filter()?;
let input = filter.input();

if filter.predicate().conjunctions.len() != 1 {
return None;
}

let disjunctions = filter.predicate().conjunctions[0].as_or_disjunctions()?;

if disjunctions.len() < 2 {
return None;
}

let (mut now, others): (Vec<ExprImpl>, Vec<ExprImpl>) =
disjunctions.into_iter().partition(|x| x.count_nows() != 0);

// Only support now in one arm of disjunctions
if now.len() != 1 {
return None;
}

// A or B or C ... or Z
// =>
// + A & !B & !C ... &!Z
// + B | C ... | Z

let mut arm1 = now.pop().unwrap();
for pred in &others {
let not_pred: ExprImpl =
FunctionCall::new_unchecked(ExprType::Not, vec![pred.clone()], DataType::Boolean)
.into();
arm1 =
FunctionCall::new_unchecked(ExprType::And, vec![arm1, not_pred], DataType::Boolean)
.into();
}

let arm2 = others
.into_iter()
.reduce(|a, b| {
FunctionCall::new_unchecked(ExprType::Or, vec![a, b], DataType::Boolean).into()
})
.unwrap();

let share = LogicalShare::create(input);
let filter1 = LogicalFilter::create_with_expr(share.clone(), arm1);
let filter2 = LogicalFilter::create_with_expr(share.clone(), arm2);
let union_all = LogicalUnion::create(true, vec![filter1, filter2]);
Some(union_all)
}
}

impl SplitNowOrRule {
pub fn create() -> BoxedRule {
Box::new(SplitNowOrRule {})
}
}

0 comments on commit adaf0a7

Please sign in to comment.