Skip to content

Commit

Permalink
feat(frontend): Reorder cmp expressions and require now() lower bou…
Browse files Browse the repository at this point in the history
…nd for `TemporalFilter` (#7497)

Reorder cmp expressions and require now lower bound for `TemporalFilter`

Approved-By: soundOfDestiny

Co-Authored-By: jon-chuang <[email protected]>
Co-Authored-By: Liang Zhao <[email protected]>
  • Loading branch information
3 people authored Jan 20, 2023
1 parent 2e28fd3 commit da83a65
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 49 deletions.
37 changes: 0 additions & 37 deletions src/frontend/planner_test/tests/testdata/dynamic_filter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,40 +182,3 @@
└─StreamHashAgg { group_key: [Vnode(t2._row_id)], aggs: [count, max(t2.v2)] }
└─StreamProject { exprs: [t2.v2, t2._row_id, Vnode(t2._row_id)] }
└─StreamTableScan { table: t2, columns: [t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: Temporal filter works on complex columns on LHS
sql: |
create table t1 (ts timestamp with time zone);
select * from t1 where ts + interval '1 hour' > now();
stream_plan: |
StreamMaterialize { columns: [ts, t1._row_id(hidden)], pk_columns: [t1._row_id] }
└─StreamProject { exprs: [t1.ts, t1._row_id] }
└─StreamDynamicFilter { predicate: ((t1.ts + '01:00:00':Interval) > now), output: [t1.ts, (t1.ts + '01:00:00':Interval), t1._row_id] }
├─StreamProject { exprs: [t1.ts, (t1.ts + '01:00:00':Interval), t1._row_id] }
| └─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamNow { output: [now] }
- name: Temporal filter works on complex columns on LHS (part 2)
sql: |
create table t1 (ts timestamp with time zone, time_to_live interval);
select * from t1 where ts + time_to_live * 1.5 > now();
stream_plan: |
StreamMaterialize { columns: [ts, time_to_live, t1._row_id(hidden)], pk_columns: [t1._row_id] }
└─StreamProject { exprs: [t1.ts, t1.time_to_live, t1._row_id] }
└─StreamDynamicFilter { predicate: ((t1.ts + (t1.time_to_live * 1.5:Decimal)) > now), output: [t1.ts, t1.time_to_live, (t1.ts + (t1.time_to_live * 1.5:Decimal)), t1._row_id] }
├─StreamProject { exprs: [t1.ts, t1.time_to_live, (t1.ts + (t1.time_to_live * 1.5:Decimal)), t1._row_id] }
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.time_to_live, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamNow { output: [now] }
- name: Temporal filter works on complex columns on LHS (part 2, flipped)
sql: |
create table t1 (ts timestamp with time zone, additional_time_to_live interval);
select * from t1 where now() - interval '15 minutes' > ts + additional_time_to_live * 1.5;
stream_plan: |
StreamMaterialize { columns: [ts, additional_time_to_live, t1._row_id(hidden)], pk_columns: [t1._row_id] }
└─StreamProject { exprs: [t1.ts, t1.additional_time_to_live, t1._row_id] }
└─StreamDynamicFilter { predicate: ((t1.ts + (t1.additional_time_to_live * 1.5:Decimal)) < (now - '00:15:00':Interval)), output: [t1.ts, t1.additional_time_to_live, (t1.ts + (t1.additional_time_to_live * 1.5:Decimal)), t1._row_id] }
├─StreamProject { exprs: [t1.ts, t1.additional_time_to_live, (t1.ts + (t1.additional_time_to_live * 1.5:Decimal)), t1._row_id] }
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.additional_time_to_live, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [(now - '00:15:00':Interval)], watermark_columns: [(now - '00:15:00':Interval)] }
└─StreamNow { output: [now] }
58 changes: 58 additions & 0 deletions src/frontend/planner_test/tests/testdata/temporal_filter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- name: Temporal filter works on complex columns on LHS
sql: |
create table t1 (ts timestamp with time zone);
select * from t1 where ts + interval '1 hour' > now();
stream_plan: |
StreamMaterialize { columns: [ts, t1._row_id(hidden)], pk_columns: [t1._row_id] }
└─StreamProject { exprs: [t1.ts, t1._row_id] }
└─StreamDynamicFilter { predicate: ((t1.ts + '01:00:00':Interval) > now), output: [t1.ts, (t1.ts + '01:00:00':Interval), t1._row_id] }
├─StreamProject { exprs: [t1.ts, (t1.ts + '01:00:00':Interval), t1._row_id] }
| └─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamNow { output: [now] }
- name: Temporal filter works on complex columns on LHS (part 2)
sql: |
create table t1 (ts timestamp with time zone, time_to_live interval);
select * from t1 where ts + time_to_live * 1.5 > now();
stream_plan: |
StreamMaterialize { columns: [ts, time_to_live, t1._row_id(hidden)], pk_columns: [t1._row_id] }
└─StreamProject { exprs: [t1.ts, t1.time_to_live, t1._row_id] }
└─StreamDynamicFilter { predicate: ((t1.ts + (t1.time_to_live * 1.5:Decimal)) > now), output: [t1.ts, t1.time_to_live, (t1.ts + (t1.time_to_live * 1.5:Decimal)), t1._row_id] }
├─StreamProject { exprs: [t1.ts, t1.time_to_live, (t1.ts + (t1.time_to_live * 1.5:Decimal)), t1._row_id] }
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.time_to_live, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamNow { output: [now] }
- name: Temporal filter works on complex columns on LHS (part 2, flipped)
sql: |
create table t1 (ts timestamp with time zone, additional_time_to_live interval);
select * from t1 where now() - interval '15 minutes' < ts + additional_time_to_live * 1.5;
stream_plan: |
StreamMaterialize { columns: [ts, additional_time_to_live, t1._row_id(hidden)], pk_columns: [t1._row_id] }
└─StreamProject { exprs: [t1.ts, t1.additional_time_to_live, t1._row_id] }
└─StreamDynamicFilter { predicate: ((t1.ts + (t1.additional_time_to_live * 1.5:Decimal)) > (now - '00:15:00':Interval)), output: [t1.ts, t1.additional_time_to_live, (t1.ts + (t1.additional_time_to_live * 1.5:Decimal)), t1._row_id] }
├─StreamProject { exprs: [t1.ts, t1.additional_time_to_live, (t1.ts + (t1.additional_time_to_live * 1.5:Decimal)), t1._row_id] }
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.additional_time_to_live, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [(now - '00:15:00':Interval)], watermark_columns: [(now - '00:15:00':Interval)] }
└─StreamNow { output: [now] }
- name: Temporal filter fails without `now()` in lower bound
sql: |-
create table t1 (ts timestamp with time zone);
select * from t1 where now() - interval '15 minutes' > ts;
stream_error: 'internal error: All `now()` exprs were valid, but the condition must have at least one now expr as a lower bound.'
- name: Temporal filter reorders now expressions correctly
sql: |
create table t1 (ts timestamp with time zone);
select * from t1 where ts < now() - interval '1 hour' and ts >= now() - interval '2 hour';
stream_plan: |
StreamMaterialize { columns: [ts, t1._row_id(hidden)], pk_columns: [t1._row_id] }
└─StreamDynamicFilter { predicate: (t1.ts < (now - '01:00:00':Interval)), output: [t1.ts, t1._row_id] }
├─StreamDynamicFilter { predicate: (t1.ts >= (now - '02:00:00':Interval)), output: [t1.ts, t1._row_id] }
| ├─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
| └─StreamExchange { dist: Broadcast }
| └─StreamProject { exprs: [(now - '02:00:00':Interval)], watermark_columns: [(now - '02:00:00':Interval)] }
| └─StreamNow { output: [now] }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [(now - '01:00:00':Interval)], watermark_columns: [(now - '01:00:00':Interval)] }
└─StreamNow { output: [now] }
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/logical_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl ToStream for LogicalFilter {
if predicate
.conjunctions
.iter()
.any(|expr| expr.as_now_comparison_cond().is_none())
.any(|expr| expr.count_nows() > 0 && expr.as_now_comparison_cond().is_none())
{
bail!(
"Conditions containing now must be of the form `input_expr cmp now() [+- const_expr]` or \
Expand All @@ -216,7 +216,7 @@ impl ToStream for LogicalFilter {
);
}
bail!(
"Valid now exprs should have been pushed down into left semi join a `Now` operator as the RHS input"
"All `now()` exprs were valid, but the condition must have at least one now expr as a lower bound."
);
}
let new_logical = self.clone_with_input(new_input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use risingwave_common::types::DataType;
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::plan_common::JoinType;

use crate::expr::{ExprRewriter, FunctionCall, InputRef};
use crate::expr::{try_derive_watermark, ExprRewriter, FunctionCall, InputRef};
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalFilter, LogicalJoin, LogicalNow};
use crate::optimizer::rule::{BoxedRule, Rule};
Expand All @@ -29,7 +29,6 @@ pub struct FilterWithNowToJoinRule {}
impl Rule for FilterWithNowToJoinRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let filter: &LogicalFilter = plan.as_logical_filter()?;
// if filter.predicate().conjunctions.iter().(|)

let lhs_len = filter.base.schema().len();

Expand All @@ -38,21 +37,33 @@ impl Rule for FilterWithNowToJoinRule {

let mut rewriter = NowAsInputRef::new(lhs_len);

// If the `now` is not a valid dynamic filter expression,
// If the `now` is not a valid dynamic filter expression, we will not push it down.
filter.predicate().conjunctions.iter().for_each(|expr| {
if let Some((input_expr, cmp, now_expr)) = expr.as_now_comparison_cond() {
let now_expr = rewriter.rewrite_expr(now_expr);
now_filters.push(
FunctionCall::new(cmp, vec![input_expr, now_expr])
.unwrap()
.into(),
);

// as a sanity check, ensure that this expression will derive a watermark
// on the output of the now executor
debug_assert_eq!(try_derive_watermark(&now_expr), Some(lhs_len));

now_filters.push(FunctionCall::new(cmp, vec![input_expr, now_expr]).unwrap());
} else {
remainder.push(expr.clone());
}
});

if now_filters.is_empty() {
// We want to put `input_expr >/>= now_expr` before `input_expr </<= now_expr` as the former
// will introduce a watermark that can reduce state (since `now_expr` is monotonically
// increasing)
now_filters.sort_by_key(|l| rank_cmp(l.get_expr_type()));

// Ignore no now filter & forbid now filters that do not create a watermark
if now_filters.is_empty()
|| !matches!(
now_filters[0].get_expr_type(),
Type::GreaterThan | Type::GreaterThanOrEqual
)
{
return None;
}
let mut new_plan = plan.inputs()[0].clone();
Expand All @@ -63,7 +74,7 @@ impl Rule for FilterWithNowToJoinRule {
LogicalNow::new(plan.ctx()).into(),
JoinType::LeftSemi,
Condition {
conjunctions: vec![now_filter],
conjunctions: vec![now_filter.into()],
},
)
.into()
Expand All @@ -89,6 +100,14 @@ impl FilterWithNowToJoinRule {
}
}

fn rank_cmp(cmp: Type) -> u8 {
match cmp {
Type::GreaterThan | Type::GreaterThanOrEqual => 0,
Type::LessThan | Type::LessThanOrEqual => 1,
_ => 2,
}
}

struct NowAsInputRef {
index: usize,
}
Expand Down

0 comments on commit da83a65

Please sign in to comment.