Skip to content

Commit

Permalink
feat: support multiple temporal filter with or (#14382)
Browse files Browse the repository at this point in the history
  • Loading branch information
st1page authored Jan 5, 2024
1 parent ed4101f commit 7954da3
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,11 @@
sql: |
create table t1 (ts timestamp with time zone);
select * from t1 where ts + interval '1 hour' > now() or ts > ' 2023-12-18 00:00:00+00:00' or ts is null;
expected_outputs:
- stream_plan
- name: Many Temporal filter with or predicate
sql: |
create table t (t timestamp with time zone, a int);
select * from t where (t > NOW() - INTERVAL '1 hour' OR t is NULL OR a < 1) AND (t < NOW() - INTERVAL '1 hour' OR a > 1);
expected_outputs:
- stream_plan
123 changes: 79 additions & 44 deletions src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion src/frontend/src/optimizer/logical_optimization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,11 @@ static BUSHY_TREE_JOIN_ORDERING: LazyLock<OptimizationStage> = LazyLock::new(||
static FILTER_WITH_NOW_TO_JOIN: LazyLock<OptimizationStage> = LazyLock::new(|| {
OptimizationStage::new(
"Push down filter with now into a left semijoin",
vec![SplitNowOrRule::create(), FilterWithNowToJoinRule::create()],
vec![
SplitNowAndRule::create(),
SplitNowOrRule::create(),
FilterWithNowToJoinRule::create(),
],
ApplyOrder::TopDown,
)
});
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub use top_n_on_index_rule::*;
mod stream;
pub use stream::bushy_tree_join_ordering_rule::*;
pub use stream::filter_with_now_to_join_rule::*;
pub use stream::split_now_and_rule::*;
pub use stream::split_now_or_rule::*;
pub use stream::stream_project_merge_rule::*;
mod trivial_project_to_values_rule;
Expand Down Expand Up @@ -190,6 +191,7 @@ macro_rules! for_all_rules {
, { AggProjectMergeRule }
, { UnionMergeRule }
, { DagToTreeRule }
, { SplitNowAndRule }
, { SplitNowOrRule }
, { FilterWithNowToJoinRule }
, { TopNOnIndexRule }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use risingwave_common::types::DataType;
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::plan_common::JoinType;

use crate::expr::{
Expand Down Expand Up @@ -55,11 +54,6 @@ impl Rule for FilterWithNowToJoinRule {
}
});

// We want to put `input_expr >/>= now_expr` before `input_expr </<= now_expr` as the former
// will introduce a watermark that can reduce state (since `now_expr` is monotonically
// increasing)
now_filters.sort_by_key(|l| rank_cmp(l.func_type()));

// Ignore no now filter
if now_filters.is_empty() {
return None;
Expand Down Expand Up @@ -98,14 +92,6 @@ impl FilterWithNowToJoinRule {
}
}

fn rank_cmp(cmp: Type) -> u8 {
match cmp {
Type::GreaterThan | Type::GreaterThanOrEqual => 0,
Type::LessThan | Type::LessThanOrEqual => 1,
_ => 2,
}
}

struct NowAsInputRef {
index: usize,
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/rule/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@

pub(crate) mod bushy_tree_join_ordering_rule;
pub(crate) mod filter_with_now_to_join_rule;
pub(crate) mod split_now_and_rule;
pub(crate) mod split_now_or_rule;
pub(crate) mod stream_project_merge_rule;
84 changes: 84 additions & 0 deletions src/frontend/src/optimizer/rule/stream/split_now_and_rule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::optimizer::plan_node::{LogicalFilter, PlanTreeNodeUnary};
use crate::optimizer::rule::{BoxedRule, Rule};
use crate::optimizer::PlanRef;
use crate::utils::Condition;

/// Split `LogicalFilter` with many AND conjunctions with now into multiple `LogicalFilter`, prepared for `SplitNowOrRule`
///
/// Before:
/// ```text
/// `LogicalFilter`
/// (now() or c11 or c12 ..) and (now() or c21 or c22 ...) and .. and other exprs
/// |
/// Input
/// ```
///
/// After:
/// ```text
/// `LogicalFilter`(now() or c11 or c12 ..)
/// |
/// `LogicalFilter`(now() or c21 or c22 ...)
/// |
/// ......
/// |
/// `LogicalFilter` other exprs
/// |
/// Input
/// ```
pub struct SplitNowAndRule {}
impl Rule for SplitNowAndRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let filter: &LogicalFilter = plan.as_logical_filter()?;
let input = filter.input();
if filter.predicate().conjunctions.len() == 1 {
return None;
}

if filter
.predicate()
.conjunctions
.iter()
.all(|e| e.count_nows() == 0)
{
return None;
}

let [with_now, others] =
filter
.predicate()
.clone()
.group_by::<_, 2>(|e| if e.count_nows() > 0 { 0 } else { 1 });

let mut plan = LogicalFilter::create(input, others);
for e in with_now {
plan = LogicalFilter::new(
plan,
Condition {
conjunctions: vec![e],
},
)
.into();
}
Some(plan)
}
}

impl SplitNowAndRule {
pub fn create() -> BoxedRule {
Box::new(SplitNowAndRule {})
}
}

0 comments on commit 7954da3

Please sign in to comment.