Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(optimizer): support logical filter expression simplify rule #15275

Merged
merged 47 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
dfbbac9
add expression rule
xzhseh Feb 23, 2024
59116bb
add optimization rule to stream
xzhseh Feb 23, 2024
89b1522
change to general filter
xzhseh Feb 23, 2024
1419878
debug statement
xzhseh Feb 23, 2024
750628e
debug statement
xzhseh Feb 23, 2024
e105c32
print out predicate
xzhseh Feb 23, 2024
b4dbe30
add basic simplify rule
xzhseh Feb 26, 2024
abd271d
remove debug statements
xzhseh Feb 26, 2024
e6c3988
fix check
xzhseh Feb 26, 2024
c88a311
add comment
xzhseh Feb 26, 2024
4741bea
update planner test
xzhseh Feb 26, 2024
d9f793f
tiny update
xzhseh Feb 26, 2024
03a8486
update planner test
xzhseh Feb 26, 2024
614f4fb
Merge branch 'main' into xzhseh/expression-simplify-rule
xzhseh Feb 27, 2024
1945eb6
move simplify process to condition.rs
xzhseh Feb 27, 2024
dca9cd2
fix check
xzhseh Feb 27, 2024
e9386d1
IsNotNull / IsNull special check
xzhseh Feb 27, 2024
beb65c4
extract all columns from underlying expr
xzhseh Feb 27, 2024
4365181
remove debug statements
xzhseh Feb 27, 2024
9e9813e
update planner test
xzhseh Feb 27, 2024
2913312
update comment
xzhseh Feb 27, 2024
8b49c79
fix check
xzhseh Feb 27, 2024
63874da
fix check
xzhseh Feb 28, 2024
3dc893d
update comment
xzhseh Feb 29, 2024
f3abdab
Merge branch 'main' into xzhseh/expression-simplify-rule
xzhseh Mar 1, 2024
92ce016
bring back simplification rule
xzhseh Mar 1, 2024
d86d335
update comment
xzhseh Mar 1, 2024
a3a168a
add is_null check at the beginning
xzhseh Mar 1, 2024
73a2741
add rule
xzhseh Mar 1, 2024
f41dd2f
update rule
xzhseh Mar 13, 2024
21c5aa7
update planner tests
xzhseh Mar 13, 2024
aea6d9c
Merge branch 'main' into xzhseh/expression-simplify-rule
xzhseh Mar 13, 2024
ddd2385
update format
xzhseh Mar 13, 2024
c80e99a
update comment
xzhseh Mar 14, 2024
fe1ef5f
fix typo
xzhseh Mar 14, 2024
3480649
add more functions that will never return null
xzhseh Mar 15, 2024
6d92d4a
update fmt
xzhseh Mar 15, 2024
3fe23cd
rewrite logical filter
xzhseh Mar 15, 2024
f6f6c60
make the rule general
xzhseh Mar 15, 2024
d99a286
update format
xzhseh Mar 15, 2024
cd6367e
remove stream rule
xzhseh Mar 15, 2024
c27f430
update the specific situation that will be simplified by rule
xzhseh Mar 18, 2024
39d11fe
Merge branch 'main' into xzhseh/expression-simplify-rule
xzhseh Mar 18, 2024
92b8c7e
update planner tests
xzhseh Mar 18, 2024
6b7e788
Merge branch 'main' into xzhseh/expression-simplify-rule
xzhseh Mar 19, 2024
60fe355
Merge branch 'main' into xzhseh/expression-simplify-rule
xzhseh Mar 19, 2024
755d743
Merge branch 'main' into xzhseh/expression-simplify-rule
xzhseh Mar 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -383,17 +383,15 @@
│ └─StreamDynamicFilter { predicate: ($expr1 > now), output_watermarks: [$expr1], output: [t1.ts, $expr1, t1._row_id], cleaned_by_watermark: true }
│ ├─StreamProject { exprs: [t1.ts, AddWithTimeZone(t1.ts, '01:00:00':Interval, 'UTC':Varchar) as $expr1, t1._row_id] }
│ │ └─StreamFilter { predicate: Not((t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) }
│ │ └─StreamShare { id: 2 }
│ │ └─StreamFilter { predicate: (Not((t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) OR (t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) }
xzhseh marked this conversation as resolved.
Show resolved Hide resolved
│ │ └─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
│ │ └─StreamShare { id: 1 }
│ │ └─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamNow { output: [now] }
└─StreamExchange { dist: HashShard(t1._row_id, 1:Int32) }
└─StreamProject { exprs: [t1.ts, t1._row_id, 1:Int32] }
└─StreamFilter { predicate: (t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz) }
└─StreamShare { id: 2 }
└─StreamFilter { predicate: (Not((t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) OR (t1.ts > '2023-12-18 00:00:00+00:00':Timestamptz)) }
└─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamShare { id: 1 }
└─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
- name: Temporal filter with or is null
sql: |
create table t1 (ts timestamp with time zone);
Expand All @@ -406,17 +404,15 @@
│ └─StreamDynamicFilter { predicate: ($expr1 > now), output_watermarks: [$expr1], output: [t1.ts, $expr1, t1._row_id], cleaned_by_watermark: true }
│ ├─StreamProject { exprs: [t1.ts, AddWithTimeZone(t1.ts, '01:00:00':Interval, 'UTC':Varchar) as $expr1, t1._row_id] }
│ │ └─StreamFilter { predicate: Not(IsNull(t1.ts)) }
│ │ └─StreamShare { id: 2 }
│ │ └─StreamFilter { predicate: (Not(IsNull(t1.ts)) OR IsNull(t1.ts)) }
│ │ └─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
│ │ └─StreamShare { id: 1 }
│ │ └─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamNow { output: [now] }
└─StreamExchange { dist: HashShard(t1._row_id, 1:Int32) }
└─StreamProject { exprs: [t1.ts, t1._row_id, 1:Int32] }
└─StreamFilter { predicate: IsNull(t1.ts) }
└─StreamShare { id: 2 }
└─StreamFilter { predicate: (Not(IsNull(t1.ts)) OR IsNull(t1.ts)) }
└─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamShare { id: 1 }
└─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
- name: Temporal filter with or predicate
sql: |
create table t1 (ts timestamp with time zone);
Expand Down Expand Up @@ -458,7 +454,7 @@
│ │ │ └─StreamDynamicFilter { predicate: (t.t > $expr1), output_watermarks: [t.t], output: [t.t, t.a, t._row_id], cleaned_by_watermark: true }
│ │ │ ├─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND Not(IsNull(t.t)) AND Not((t.a < 1:Int32)) }
│ │ │ │ └─StreamShare { id: 2 }
│ │ │ │ └─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) }
│ │ │ │ └─StreamFilter { predicate: (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) }
│ │ │ │ └─StreamTableScan { table: t, columns: [t.t, t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
│ │ │ └─StreamExchange { dist: Broadcast }
│ │ │ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
Expand All @@ -467,7 +463,7 @@
│ │ └─StreamProject { exprs: [t.t, t.a, t._row_id, 1:Int32] }
│ │ └─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND (IsNull(t.t) OR (t.a < 1:Int32)) }
│ │ └─StreamShare { id: 2 }
│ │ └─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) }
│ │ └─StreamFilter { predicate: (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) }
│ │ └─StreamTableScan { table: t, columns: [t.t, t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] }
Expand All @@ -482,7 +478,7 @@
│ └─StreamDynamicFilter { predicate: (t.t > $expr1), output_watermarks: [t.t], output: [t.t, t.a, t._row_id], cleaned_by_watermark: true }
│ ├─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND Not(IsNull(t.t)) AND Not((t.a < 1:Int32)) }
│ │ └─StreamShare { id: 2 }
│ │ └─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) }
│ │ └─StreamFilter { predicate: (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) }
│ │ └─StreamTableScan { table: t, columns: [t.t, t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
│ └─StreamExchange { dist: Broadcast }
│ └─StreamProject { exprs: [SubtractWithTimeZone(now, '01:00:00':Interval, 'UTC':Varchar) as $expr1], output_watermarks: [$expr1] }
Expand All @@ -491,5 +487,5 @@
└─StreamProject { exprs: [t.t, t.a, t._row_id, 1:Int32] }
└─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND (IsNull(t.t) OR (t.a < 1:Int32)) }
└─StreamShare { id: 2 }
└─StreamFilter { predicate: (Not((t.a > 1:Int32)) OR (t.a > 1:Int32)) AND (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) }
└─StreamFilter { predicate: (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) }
└─StreamTableScan { table: t, columns: [t.t, t.a, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
12 changes: 12 additions & 0 deletions src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,14 @@ static COMMON_SUB_EXPR_EXTRACT: LazyLock<OptimizationStage> = LazyLock::new(|| {
)
});

/// Optimization stage named "Simplify Filter Expression": applies
/// `SimplifyFilterExpressionRule` as its only rule, using `ApplyOrder::TopDown`.
static SIMPLIFY_FILTER_EXPRESSION: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Simplify Filter Expression",
vec![SimplifyFilterExpressionRule::create()],
ApplyOrder::TopDown,
)
});

impl LogicalOptimizer {
pub fn predicate_pushdown(
plan: PlanRef,
Expand Down Expand Up @@ -624,6 +632,8 @@ impl LogicalOptimizer {

plan = plan.optimize_by_rules(&COMMON_SUB_EXPR_EXTRACT);

plan = plan.optimize_by_rules(&SIMPLIFY_FILTER_EXPRESSION);

#[cfg(debug_assertions)]
InputRefValidator.validate(plan.clone());

Expand Down Expand Up @@ -720,6 +730,8 @@ impl LogicalOptimizer {

plan = plan.optimize_by_rules(&DAG_TO_TREE);

plan = plan.optimize_by_rules(&SIMPLIFY_FILTER_EXPRESSION);

#[cfg(debug_assertions)]
InputRefValidator.validate(plan.clone());

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/expr_rewritable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::*;
use crate::expr::ExprRewriter;

/// Rewrites expressions in a `PlanRef`. Due to `Share` operator,
/// the `ExprRewriter` needs to be idempotent i.e. applying it more than once
/// the `ExprRewriter` needs to be idempotent i.e., applying it more than once
/// to the same `ExprImpl` will be a noop on subsequent applications.
/// `rewrite_exprs` should only return a plan with the given node modified.
/// To rewrite recursively, call `rewrite_exprs_recursive` on [`RewriteExprsRecursive`].
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub use top_n_on_index_rule::*;
mod stream;
pub use stream::bushy_tree_join_ordering_rule::*;
pub use stream::filter_with_now_to_join_rule::*;
pub use stream::simplify_filter_expression_rule::*;
pub use stream::split_now_and_rule::*;
pub use stream::split_now_or_rule::*;
pub use stream::stream_project_merge_rule::*;
Expand Down Expand Up @@ -204,6 +205,7 @@ macro_rules! for_all_rules {
, { AlwaysFalseFilterRule }
, { BushyTreeJoinOrderingRule }
, { StreamProjectMergeRule }
, { SimplifyFilterExpressionRule }
, { JoinProjectTransposeRule }
, { LimitPushDownRule }
, { PullUpHopRule }
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/rule/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

pub(crate) mod bushy_tree_join_ordering_rule;
pub(crate) mod filter_with_now_to_join_rule;
pub(crate) mod simplify_filter_expression_rule;
pub(crate) mod split_now_and_rule;
pub(crate) mod split_now_or_rule;
pub(crate) mod stream_project_merge_rule;
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::DataType;

use crate::expr::{
Expr, ExprImpl, ExprRewriter, ExprType,
};
use crate::optimizer::plan_node::{
ExprRewritable, LogicalFilter, LogicalShare, PlanTreeNodeUnary,
};
use crate::optimizer::rule::{BoxedRule, Rule};
use crate::optimizer::PlanRef;

/// Optimizer rule that simplifies the expressions of the plan node sitting
/// directly below a `LogicalShare` which is itself the input of a `LogicalFilter`.
pub struct SimplifyFilterExpressionRule {}

impl Rule for SimplifyFilterExpressionRule {
    /// Matches the shape `LogicalFilter -> LogicalShare -> <input>` and runs
    /// [`SimplifyFilterExpressionRewriter`] over the expressions of `<input>`.
    ///
    /// Returns `None` when `plan` is not a `LogicalFilter` or when the filter's
    /// input is not a `LogicalShare`. Otherwise a filter carrying the original
    /// predicate is rebuilt on top of the share and returned, whether or not
    /// any expression was actually simplified.
    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
        let filter: &LogicalFilter = plan.as_logical_filter()?;
        let filter_input = filter.input();
        let share: &LogicalShare = filter_input.as_logical_share()?;

        // Only the node right below the share has its expressions rewritten.
        let simplified_input = share
            .input()
            .rewrite_exprs(&mut SimplifyFilterExpressionRewriter {});
        // NOTE(review): `replace_input` is called through a shared reference,
        // so it presumably swaps the share's input in place; confirm.
        share.replace_input(simplified_input);

        let new_input: PlanRef = share.clone().into();
        Some(LogicalFilter::create(new_input, filter.predicate().clone()))
    }
}

impl SimplifyFilterExpressionRule {
pub fn create() -> BoxedRule {
Box::new(SimplifyFilterExpressionRule {})
}
}

/// Returns `true` if `Not (e)` and `(e)` appear together, i.e. one of the two
/// expressions is exactly the single-argument `Not` of the other.
///
/// Both operands must be function calls; any other kind of expression on
/// either side yields `false`. The check is symmetric, so it also holds when
/// both operands are `Not` calls (e.g. `Not(a)` paired with `Not(Not(a))`),
/// regardless of which side carries the extra negation.
fn check_pattern(e1: ExprImpl, e2: ExprImpl) -> bool {
    // `true` iff `negated` is a one-argument `Not` whose argument equals `plain`.
    fn is_negation_of(negated: &ExprImpl, plain: &ExprImpl) -> bool {
        let ExprImpl::FunctionCall(func) = negated else {
            return false;
        };
        func.func_type() == ExprType::Not
            && func.inputs().len() == 1
            && func.inputs()[0] == *plain
    }

    // Only pairs of function calls are considered.
    if !matches!(e1, ExprImpl::FunctionCall(_)) || !matches!(e2, ExprImpl::FunctionCall(_)) {
        return false;
    }

    // Try both directions so the result does not depend on operand order.
    is_negation_of(&e1, &e2) || is_negation_of(&e2, &e1)
}

/// Expression rewriter that folds a trivially decidable `OR` / `AND` of an
/// expression and its own negation into a boolean literal.
struct SimplifyFilterExpressionRewriter {}

impl ExprRewriter for SimplifyFilterExpressionRewriter {
    /// The pattern we aim to optimize, e.g.,
    /// 1. (NOT (e)) OR (e) => True | (NOT (e)) AND (e) => False
    /// 2. (NOT (e1) AND NOT (e2)) OR (e1 OR e2) => True # TODO
    ///
    /// Only the expression handed in is inspected: it must be a two-argument
    /// `Or` / `And` call whose operands satisfy `check_pattern`. Every other
    /// expression is returned untouched.
    ///
    /// NOTE(review): under SQL three-valued logic `(NOT e) OR e` evaluates to
    /// NULL when `e` is NULL, so folding it to `true` may let rows through that
    /// the original predicate rejected; confirm nullability is handled.
    ///
    /// # Panics
    ///
    /// Panics if an `Or` / `And` call does not have return type `Boolean`.
    fn rewrite_expr(&mut self, expr: ExprImpl) -> ExprImpl {
        // `Some(value)` when the expression folds to the literal `value`.
        let folded = match &expr {
            ExprImpl::FunctionCall(call)
                if matches!(call.func_type(), ExprType::Or | ExprType::And) =>
            {
                assert_eq!(call.return_type(), DataType::Boolean);
                // Currently just optimize the first rule
                match call.inputs() {
                    [lhs, rhs] if check_pattern(lhs.clone(), rhs.clone()) => {
                        // `Or` folds to `true`, `And` folds to `false`.
                        Some(call.func_type() == ExprType::Or)
                    }
                    _ => None,
                }
            }
            _ => None,
        };

        match folded {
            Some(value) => ExprImpl::literal_bool(value),
            None => expr,
        }
    }
}
Loading