Skip to content

Commit

Permalink
bring back simplification rule
Browse files Browse the repository at this point in the history
  • Loading branch information
xzhseh committed Mar 1, 2024
1 parent f3abdab commit 92ce016
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 133 deletions.
131 changes: 0 additions & 131 deletions src/frontend/src/expr/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,137 +605,6 @@ impl WatermarkAnalyzer {
}
}

/// The pattern we aim to optimize, e.g.,
/// 1. (NOT (e)) OR (e) => True | (NOT (e)) AND (e) => False
/// TODO(Zihao): 2. (NOT (e1) AND NOT (e2)) OR (e1 OR e2) => True
pub fn simplify_stream_filter_expression(expr: ExprImpl) -> ExprImpl {
let mut rewriter = SimplifyFilterExpressionRewriter {};
rewriter.rewrite_expr(expr)
}

/// Checks whether `e1` and `e2` form the pair `e` / `NOT (e)`.
///
/// Both arguments must be function calls and at least one of them must be a `Not` call with
/// exactly one input; otherwise nothing matches. When `e1` is not a `Not` call, `e2` is treated
/// as the negation and `e1` as `e`. In every other case (including when both are `Not` calls)
/// `e1` is treated as the negation and `e2` as `e`.
///
/// Returns `(matched, guard)`:
/// - `matched` is `true` when the operand of the negation equals the other argument and the
///   null guard described below could be constructed.
/// - `guard` is `IsNotNull(c1) AND ... AND IsNotNull(cn)` over every `InputRef` found in `e`
///   outside of `IsNull` / `IsNotNull` calls, or `None` when `e` references no such column.
///   It is always `None` when `matched` is `false`.
///
/// NOTE(review): the guard assumes `e` evaluates to NULL exactly when one of the collected
/// columns is NULL. That does not look guaranteed when `e` mixes `IsNull` / `IsNotNull` (or
/// other NULL-tolerant functions) with plain column references -- confirm before extending.
fn check_pattern(e1: ExprImpl, e2: ExprImpl) -> (bool, Option<ExprImpl>) {
    fn is_null_or_not_null(func_type: ExprType) -> bool {
        matches!(func_type, ExprType::IsNull | ExprType::IsNotNull)
    }

    /// Collects every `InputRef` reachable from `expr` through function-call inputs.
    /// Sub-trees rooted at `IsNull( ... )` or `IsNotNull( ... )` are skipped entirely.
    fn extract_column(expr: &ExprImpl, columns: &mut Vec<ExprImpl>) {
        match expr {
            ExprImpl::FunctionCall(func_call) => {
                if is_null_or_not_null(func_call.func_type()) {
                    return;
                }
                for sub_expr in func_call.inputs() {
                    extract_column(sub_expr, columns);
                }
            }
            // Only the column reference itself is cloned, never the surrounding tree.
            ExprImpl::InputRef(_) => columns.push(expr.clone()),
            _ => (),
        }
    }

    /// Builds the null guard for `expr`.
    ///
    /// - `Some(None)`: `expr` references no column, so no guard is needed.
    /// - `Some(Some(guard))`: the `IsNotNull` checks of all columns, chained with two-input
    ///   `AND` calls.
    /// - `None`: one of the function calls could not be constructed. This is kept distinct
    ///   from "no column" so that the caller never mistakes a failure for "always true".
    fn try_wrap_inner_expression(expr: &ExprImpl) -> Option<Option<ExprImpl>> {
        let mut columns = vec![];
        extract_column(expr, &mut columns);

        let mut guard: Option<ExprImpl> = None;
        for column in columns {
            let not_null: ExprImpl = FunctionCall::new(ExprType::IsNotNull, vec![column])
                .ok()?
                .into();
            guard = Some(match guard {
                None => not_null,
                // Chain pairwise so that the result does not rely on `And` accepting more
                // than two inputs.
                Some(acc) => FunctionCall::new(ExprType::And, vec![acc, not_null])
                    .ok()?
                    .into(),
            });
        }
        Some(guard)
    }

    let ExprImpl::FunctionCall(e1_func) = &e1 else {
        return (false, None);
    };
    let ExprImpl::FunctionCall(e2_func) = &e2 else {
        return (false, None);
    };
    if e1_func.func_type() != ExprType::Not && e2_func.func_type() != ExprType::Not {
        return (false, None);
    }

    // Decide which side plays the role of `e` and which one is `NOT (e)`.
    let (candidate, negation) = if e1_func.func_type() != ExprType::Not {
        (&e1, e2_func)
    } else {
        (&e2, e1_func)
    };
    let [negated] = negation.inputs() else {
        return (false, None);
    };
    if candidate != negated {
        return (false, None);
    }

    // Build the guard only once the pattern is known to match; if it cannot be built the
    // expression is reported as not optimizable instead of being treated as always true.
    match try_wrap_inner_expression(candidate) {
        Some(guard) => (true, guard),
        None => (false, None),
    }
}

/// Stateless rewriter that folds `e OR NOT (e)` / `e AND NOT (e)` at the root of an expression.
struct SimplifyFilterExpressionRewriter {}

impl ExprRewriter for SimplifyFilterExpressionRewriter {
    /// Rewrites `expr` when it is a two-input `Or` / `And` call whose inputs are recognized by
    /// `check_pattern`; every other expression is returned unchanged.
    ///
    /// - `Or` becomes the null guard returned by `check_pattern`, or literal `true` when there
    ///   is no guard.
    /// - `And` becomes literal `false`.
    ///
    /// Only the root of `expr` is inspected; this override does not descend into inputs.
    ///
    /// # Panics
    /// Panics if an `Or` / `And` call does not have return type `DataType::Boolean`.
    fn rewrite_expr(&mut self, expr: ExprImpl) -> ExprImpl {
        // Match on a borrow: there is no need to deep-clone the expression just to inspect it.
        let ExprImpl::FunctionCall(func_call) = &expr else {
            return expr;
        };
        let func_type = func_call.func_type();
        if func_type != ExprType::Or && func_type != ExprType::And {
            return expr;
        }
        assert_eq!(func_call.return_type(), DataType::Boolean);
        // Currently just optimize the first rule, which needs exactly two operands
        let [lhs, rhs] = func_call.inputs() else {
            return expr;
        };
        let (optimizable_flag, guard) = check_pattern(lhs.clone(), rhs.clone());
        if !optimizable_flag {
            return expr;
        }
        match func_type {
            ExprType::Or => guard.unwrap_or_else(|| ExprImpl::literal_bool(true)),
            // `AND` will always be false, no matter the underlying columns are null or not
            ExprType::And => ExprImpl::literal_bool(false),
            _ => expr,
        }
    }
}

#[cfg(test)]
mod tests {
use risingwave_common::types::{DataType, ScalarImpl};
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ pub use stream::filter_with_now_to_join_rule::*;
pub use stream::split_now_and_rule::*;
pub use stream::split_now_or_rule::*;
pub use stream::stream_project_merge_rule::*;
pub use stream::stream_filter_expression_simplify_rule::*;
mod trivial_project_to_values_rule;
pub use trivial_project_to_values_rule::*;
mod union_input_values_merge_rule;
Expand Down Expand Up @@ -204,6 +205,7 @@ macro_rules! for_all_rules {
, { AlwaysFalseFilterRule }
, { BushyTreeJoinOrderingRule }
, { StreamProjectMergeRule }
, { StreamFilterExpressionSimplifyRule }
, { JoinProjectTransposeRule }
, { LimitPushDownRule }
, { PullUpHopRule }
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/rule/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ pub(crate) mod filter_with_now_to_join_rule;
pub(crate) mod split_now_and_rule;
pub(crate) mod split_now_or_rule;
pub(crate) mod stream_project_merge_rule;
pub(crate) mod stream_filter_expression_simplify_rule;
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_connector::source::DataType;

use crate::expr::{
Expr, ExprImpl, ExprRewriter, FunctionCall,
};
use crate::expr::ExprType;
use crate::optimizer::plan_node::{ExprRewritable, LogicalFilter, LogicalShare, PlanTreeNodeUnary};
use crate::optimizer::rule::Rule;
use crate::optimizer::PlanRef;

/// Optimizer rule that simplifies the expressions of the plan shared below a filter.
pub struct StreamFilterExpressionSimplifyRule {}

impl Rule for StreamFilterExpressionSimplifyRule {
    /// Matches a `LogicalFilter` whose input is a `LogicalShare`. The expressions of the share's
    /// input are rewritten with `StreamFilterExpressionSimplifyRewriter`, the result is wrapped
    /// in a new `LogicalShare`, and the original filter predicate is placed back on top.
    /// Returns `None` for any other plan shape.
    ///
    /// The pattern we aim to optimize, e.g.,
    /// 1. (NOT (e)) OR (e) => True | (NOT (e)) AND (e) => False
    /// TODO(Zihao): 2. (NOT (e1) AND NOT (e2)) OR (e1 OR e2) => True
    ///
    /// NOTE(review): a fresh `LogicalShare` is created here instead of updating the matched
    /// one; presumably other consumers of the original share keep the un-simplified input --
    /// confirm this is intended. The rule also returns `Some` even when nothing was rewritten.
    fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
        let filter: &LogicalFilter = plan.as_logical_filter()?;
        let filter_input = filter.input();
        let share: &LogicalShare = filter_input.as_logical_share()?;

        // The rewriter is stateless, so a temporary instance is enough.
        let simplified_input = share
            .input()
            .rewrite_exprs(&mut StreamFilterExpressionSimplifyRewriter {});

        Some(LogicalFilter::create(
            LogicalShare::new(simplified_input).into(),
            filter.predicate().clone(),
        ))
    }
}

/// Checks whether `e1` and `e2` form the pair `e` / `NOT (e)`.
///
/// Both arguments must be function calls and at least one of them must be a `Not` call with
/// exactly one input; otherwise nothing matches. When `e1` is not a `Not` call, `e2` is treated
/// as the negation and `e1` as `e`. In every other case (including when both are `Not` calls)
/// `e1` is treated as the negation and `e2` as `e`.
///
/// Returns `(matched, guard)`:
/// - `matched` is `true` when the operand of the negation equals the other argument and the
///   null guard described below could be constructed.
/// - `guard` is `IsNotNull(c1) AND ... AND IsNotNull(cn)` over every `InputRef` found in `e`
///   outside of `IsNull` / `IsNotNull` calls, or `None` when `e` references no such column.
///   It is always `None` when `matched` is `false`.
///
/// NOTE(review): the guard assumes `e` evaluates to NULL exactly when one of the collected
/// columns is NULL. That does not look guaranteed when `e` mixes `IsNull` / `IsNotNull` (or
/// other NULL-tolerant functions) with plain column references -- confirm before extending.
fn check_pattern(e1: ExprImpl, e2: ExprImpl) -> (bool, Option<ExprImpl>) {
    fn is_null_or_not_null(func_type: ExprType) -> bool {
        matches!(func_type, ExprType::IsNull | ExprType::IsNotNull)
    }

    /// Collects every `InputRef` reachable from `expr` through function-call inputs.
    /// Sub-trees rooted at `IsNull( ... )` or `IsNotNull( ... )` are skipped entirely.
    fn extract_column(expr: &ExprImpl, columns: &mut Vec<ExprImpl>) {
        match expr {
            ExprImpl::FunctionCall(func_call) => {
                if is_null_or_not_null(func_call.func_type()) {
                    return;
                }
                for sub_expr in func_call.inputs() {
                    extract_column(sub_expr, columns);
                }
            }
            // Only the column reference itself is cloned, never the surrounding tree.
            ExprImpl::InputRef(_) => columns.push(expr.clone()),
            _ => (),
        }
    }

    /// Builds the null guard for `expr`.
    ///
    /// - `Some(None)`: `expr` references no column, so no guard is needed.
    /// - `Some(Some(guard))`: the `IsNotNull` checks of all columns, chained with two-input
    ///   `AND` calls.
    /// - `None`: one of the function calls could not be constructed. This is kept distinct
    ///   from "no column" so that the caller never mistakes a failure for "always true".
    fn try_wrap_inner_expression(expr: &ExprImpl) -> Option<Option<ExprImpl>> {
        let mut columns = vec![];
        extract_column(expr, &mut columns);

        let mut guard: Option<ExprImpl> = None;
        for column in columns {
            let not_null: ExprImpl = FunctionCall::new(ExprType::IsNotNull, vec![column])
                .ok()?
                .into();
            guard = Some(match guard {
                None => not_null,
                // Chain pairwise so that the result does not rely on `And` accepting more
                // than two inputs.
                Some(acc) => FunctionCall::new(ExprType::And, vec![acc, not_null])
                    .ok()?
                    .into(),
            });
        }
        Some(guard)
    }

    let ExprImpl::FunctionCall(e1_func) = &e1 else {
        return (false, None);
    };
    let ExprImpl::FunctionCall(e2_func) = &e2 else {
        return (false, None);
    };
    if e1_func.func_type() != ExprType::Not && e2_func.func_type() != ExprType::Not {
        return (false, None);
    }

    // Decide which side plays the role of `e` and which one is `NOT (e)`.
    let (candidate, negation) = if e1_func.func_type() != ExprType::Not {
        (&e1, e2_func)
    } else {
        (&e2, e1_func)
    };
    let [negated] = negation.inputs() else {
        return (false, None);
    };
    if candidate != negated {
        return (false, None);
    }

    // Build the guard only once the pattern is known to match; if it cannot be built the
    // expression is reported as not optimizable instead of being treated as always true.
    match try_wrap_inner_expression(candidate) {
        Some(guard) => (true, guard),
        None => (false, None),
    }
}

/// Stateless rewriter that folds `e OR NOT (e)` / `e AND NOT (e)` at the root of an expression.
struct StreamFilterExpressionSimplifyRewriter {}

impl ExprRewriter for StreamFilterExpressionSimplifyRewriter {
    /// Rewrites `expr` when it is a two-input `Or` / `And` call whose inputs are recognized by
    /// `check_pattern`; every other expression is returned unchanged.
    ///
    /// - `Or` becomes the null guard returned by `check_pattern`, or literal `true` when there
    ///   is no guard.
    /// - `And` becomes literal `false`.
    ///
    /// Only the root of `expr` is inspected; this override does not descend into inputs.
    ///
    /// # Panics
    /// Panics if an `Or` / `And` call does not have return type `DataType::Boolean`.
    fn rewrite_expr(&mut self, expr: ExprImpl) -> ExprImpl {
        // Match on a borrow: there is no need to deep-clone the expression just to inspect it.
        let ExprImpl::FunctionCall(func_call) = &expr else {
            return expr;
        };
        let func_type = func_call.func_type();
        if func_type != ExprType::Or && func_type != ExprType::And {
            return expr;
        }
        assert_eq!(func_call.return_type(), DataType::Boolean);
        // Currently just optimize the first rule, which needs exactly two operands
        let [lhs, rhs] = func_call.inputs() else {
            return expr;
        };
        let (optimizable_flag, guard) = check_pattern(lhs.clone(), rhs.clone());
        if !optimizable_flag {
            return expr;
        }
        match func_type {
            ExprType::Or => guard.unwrap_or_else(|| ExprImpl::literal_bool(true)),
            // `AND` will always be false, no matter the underlying columns are null or not
            ExprType::And => ExprImpl::literal_bool(false),
            _ => expr,
        }
    }
}
3 changes: 1 addition & 2 deletions src/frontend/src/utils/condition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_common::util::scan_range::{is_full_range, ScanRange};
use crate::error::Result;
use crate::expr::{
collect_input_refs, factorization_expr, fold_boolean_constant, push_down_not,
simplify_stream_filter_expression, to_conjunctions, try_get_bool_constant, ExprDisplay,
to_conjunctions, try_get_bool_constant, ExprDisplay,
ExprImpl, ExprMutator, ExprRewriter, ExprType, ExprVisitor, FunctionCall, InequalityInputPair,
InputRef,
};
Expand Down Expand Up @@ -850,7 +850,6 @@ impl Condition {
.into_iter()
.map(push_down_not)
.map(fold_boolean_constant)
.map(simplify_stream_filter_expression)
.flat_map(to_conjunctions)
.collect();
let mut res: Vec<ExprImpl> = Vec::new();
Expand Down

0 comments on commit 92ce016

Please sign in to comment.