Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(temporal-filter): support more now expressions in temporal filter pattern #17745

Merged
merged 3 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions src/frontend/planner_test/tests/testdata/input/temporal_filter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,96 +3,96 @@
create table t1 (ts timestamp with time zone);
select * from t1 where ts + interval '1 hour' > now();
expected_outputs:
- stream_plan
- stream_plan
- name: Temporal filter works on complex columns on LHS (part 2)
sql: |
create table t1 (ts timestamp with time zone, time_to_live interval);
select * from t1 where ts + time_to_live * 1.5 > now();
expected_outputs:
- stream_plan
- stream_plan
- name: Temporal filter works on complex columns on LHS (part 2, flipped)
sql: |
create table t1 (ts timestamp with time zone, additional_time_to_live interval);
select * from t1 where now() - interval '15 minutes' < ts + additional_time_to_live * 1.5;
expected_outputs:
- stream_plan
- stream_plan
- name: Temporal filter with `now()` in upper bound
sql: |-
create table t1 (ts timestamp with time zone);
select * from t1 where now() - interval '15 minutes' > ts;
expected_outputs:
- stream_plan
- stream_dist_plan
- stream_plan
- stream_dist_plan
- name: Temporal filter with equal condition
sql: |-
create table t1 (ts timestamp with time zone);
select * from t1 where date_trunc('week', now()) = date_trunc('week',ts);
expected_outputs:
- stream_plan
- stream_dist_plan
- stream_plan
- stream_dist_plan
- name: Temporal filter with `now()` in upper bound on append only table
sql: |-
create table t1 (ts timestamp with time zone) APPEND ONLY;
select * from t1 where now() - interval '15 minutes' > ts;
expected_outputs:
- stream_plan
- stream_dist_plan
- stream_plan
- stream_dist_plan
- name: Temporal filter reorders now expressions correctly
sql: |
create table t1 (ts timestamp with time zone);
select * from t1 where ts < now() - interval '1 hour' and ts >= now() - interval '2 hour';
expected_outputs:
- stream_plan
- stream_dist_plan
- stream_plan
- stream_dist_plan
- name: Temporal filter in on clause for inner join's left side
sql: |
create table t1 (a int, ta timestamp with time zone);
create table t2 (b int, tb timestamp with time zone);
select * from t1 join t2 on a = b AND ta < now() - interval '1 hour' and ta >= now() - interval '2 hour';
expected_outputs:
- stream_plan
- stream_plan
- name: Temporal filter in on clause for left join's left side
sql: |
create table t1 (a int, ta timestamp with time zone);
create table t2 (b int, tb timestamp with time zone);
select * from t1 left join t2 on a = b AND ta < now() - interval '1 hour' and ta >= now() - interval '2 hour';
expected_outputs:
- stream_error
- stream_error
- name: Temporal filter in on clause for right join's left side
sql: |
create table t1 (a int, ta timestamp with time zone);
create table t2 (b int, tb timestamp with time zone);
select * from t1 right join t2 on a = b AND ta < now() - interval '1 hour' and ta >= now() - interval '2 hour';
expected_outputs:
- stream_plan
- stream_plan
- name: Temporal filter in on clause for full join's left side
sql: |
create table t1 (a int, ta timestamp with time zone);
create table t2 (b int, tb timestamp with time zone);
select * from t1 full join t2 on a = b AND ta < now() - interval '1 hour' and ta >= now() - interval '2 hour';
expected_outputs:
- stream_error
- stream_error
- name: Temporal filter in on clause for left join's right side
sql: |
create table t1 (a int, ta timestamp with time zone);
create table t2 (b int, tb timestamp with time zone);
select * from t1 left join t2 on a = b AND tb < now() - interval '1 hour' and tb >= now() - interval '2 hour';
expected_outputs:
- stream_plan
- stream_plan
- name: Temporal filter in on clause for right join's right side
sql: |
create table t1 (a int, ta timestamp with time zone);
create table t2 (b int, tb timestamp with time zone);
select * from t1 right join t2 on a = b AND tb < now() - interval '1 hour' and tb >= now() - interval '2 hour';
expected_outputs:
- stream_error
- stream_error
- name: Temporal filter after temporal join
sql: |
create table stream(id1 int, a1 int, b1 int, v1 timestamp with time zone) APPEND ONLY;
create table version(id2 int, a2 int, b2 int, primary key (id2));
select id1, a1, id2, v1 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where v1 > now();
expected_outputs:
- stream_plan
- stream_plan
- name: Temporal filter with or predicate
sql: |
create table t1 (ts timestamp with time zone);
Expand All @@ -116,4 +116,16 @@
create table t (t timestamp with time zone, a int);
select * from t where (t > NOW() - INTERVAL '1 hour' OR t is NULL OR a < 1) AND (t < NOW() - INTERVAL '1 hour' OR a > 1);
expected_outputs:
- stream_plan
- stream_plan
- name: Non-trivial now expression
sql: |
create table t (ts timestamp with time zone, a int);
select * from t where ts + interval '1 hour' > date_trunc('day', now());
expected_outputs:
- stream_plan
- name: Non-trivial now expression 2
sql: |
create table t (ts timestamp with time zone, a int);
select * from t where ts + interval '1 hour' > date_trunc('day', ('2024-07-18 00:00:00+00:00'::timestamptz - ('2024-07-18 00:00:00+00:00'::timestamptz - now())));
expected_outputs:
- stream_plan
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,29 @@
└─StreamShare { id: 2 }
└─StreamFilter { predicate: IsNotNull(t.a) AND (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) }
└─StreamTableScan { table: t, columns: [t.t, t.a, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
- name: Non-trivial now expression
sql: |
create table t (ts timestamp with time zone, a int);
select * from t where ts + interval '1 hour' > date_trunc('day', now());
stream_plan: |-
StreamMaterialize { columns: [ts, a, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.ts, t.a, t._row_id] }
└─StreamDynamicFilter { predicate: ($expr1 > $expr2), output_watermarks: [$expr1], output: [t.ts, t.a, $expr1, t._row_id], cleaned_by_watermark: true }
├─StreamProject { exprs: [t.ts, t.a, AddWithTimeZone(t.ts, '01:00:00':Interval, 'UTC':Varchar) as $expr1, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.ts, t.a, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [DateTrunc('day':Varchar, now, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] }
└─StreamNow { output: [now] }
- name: Non-trivial now expression 2
sql: |
create table t (ts timestamp with time zone, a int);
select * from t where ts + interval '1 hour' > date_trunc('day', ('2024-07-18 00:00:00+00:00'::timestamptz - ('2024-07-18 00:00:00+00:00'::timestamptz - now())));
stream_plan: |-
StreamMaterialize { columns: [ts, a, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck }
└─StreamProject { exprs: [t.ts, t.a, t._row_id] }
└─StreamDynamicFilter { predicate: ($expr1 > $expr2), output: [t.ts, t.a, $expr1, t._row_id] }
├─StreamProject { exprs: [t.ts, t.a, AddWithTimeZone(t.ts, '01:00:00':Interval, 'UTC':Varchar) as $expr1, t._row_id] }
│ └─StreamTableScan { table: t, columns: [t.ts, t.a, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: Broadcast }
└─StreamProject { exprs: [DateTrunc('day':Varchar, SubtractWithTimeZone('2024-07-18 00:00:00+00:00':Timestamptz, ('2024-07-18 00:00:00+00:00':Timestamptz - now), 'UTC':Varchar), 'UTC':Varchar) as $expr2] }
└─StreamNow { output: [now] }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add more tests, such as:

  • xxx < now_expression
  • xxx = now_expression
  • a non-monotonic now expression such as
create table t (ts timestamp with time zone, c text, i int);
explain create materialized view mv as select * from t where i > extract(hour from now()) ;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have many tests for xxx < now_expression and xxx = now_expression. I just added a test with a non-monotonic now expression to cover the failing case.

48 changes: 11 additions & 37 deletions src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,45 +768,36 @@ impl ExprImpl {
}
}

/// Accepts expressions of the form `input_expr cmp now() [+- const_expr]` or
/// `now() [+- const_expr] cmp input_expr`, where `input_expr` contains an
/// `InputRef` and contains no `now()`.
/// Accepts expressions of the form `input_expr cmp now_expr` or `now_expr cmp input_expr`,
/// where `input_expr` contains an `InputRef` and contains no `now()`, and `now_expr`
/// contains a `now()` but no `InputRef`.
///
/// Canonicalizes to the first ordering and returns `(input_expr, cmp, now_expr)`
pub fn as_now_comparison_cond(&self) -> Option<(ExprImpl, ExprType, ExprImpl)> {
if let ExprImpl::FunctionCall(function_call) = self {
match function_call.func_type() {
ty @ (ExprType::LessThan
ty @ (ExprType::Equal
| ExprType::LessThan
| ExprType::LessThanOrEqual
| ExprType::GreaterThan
| ExprType::GreaterThanOrEqual) => {
let (_, op1, op2) = function_call.clone().decompose_as_binary();
if op1.count_nows() == 0
if !op1.has_now()
&& op1.has_input_ref()
&& op2.count_nows() > 0
&& op2.is_now_offset()
&& op2.has_now()
&& !op2.has_input_ref()
{
Some((op1, ty, op2))
} else if op2.count_nows() == 0
} else if op1.has_now()
&& !op1.has_input_ref()
&& !op2.has_now()
&& op2.has_input_ref()
&& op1.count_nows() > 0
&& op1.is_now_offset()
{
Some((op2, Self::reverse_comparison(ty), op1))
} else {
None
}
}
ty @ ExprType::Equal => {
let (_, op1, op2) = function_call.clone().decompose_as_binary();
if op1.count_nows() == 0 && op1.has_input_ref() && op2.count_nows() > 0 {
Some((op1, ty, op2))
} else if op2.count_nows() == 0 && op2.has_input_ref() && op1.count_nows() > 0 {
Some((op2, Self::reverse_comparison(ty), op1))
} else {
None
}
}
_ => None,
}
} else {
Expand Down Expand Up @@ -862,23 +853,6 @@ impl ExprImpl {
}
}

/// Checks if expr is of the form `now() [+- const_expr]`
///
/// Returns `true` for a bare `ExprImpl::Now`, or for an `Add`/`Subtract` function call
/// whose first operand is itself of this form and whose second operand satisfies
/// `is_const()`. Every other expression yields `false`.
fn is_now_offset(&self) -> bool {
if let ExprImpl::Now(_) = self {
// Base case: a bare `now()`.
true
} else if let ExprImpl::FunctionCall(f) = self {
match f.func_type() {
ExprType::Add | ExprType::Subtract => {
// Recurse on the first operand so chained offsets such as
// `now() - c1 + c2` are accepted; the second operand must be constant.
let (_, lhs, rhs) = f.clone().decompose_as_binary();
lhs.is_now_offset() && rhs.is_const()
}
// Any other function call is rejected, even if it wraps `now()`.
_ => false,
}
} else {
false
}
}

/// Returns the `InputRef` and offset of a predicate if it matches
/// the form `InputRef [+- const_expr]`, else returns None.
fn as_input_offset(&self) -> Option<(usize, Option<(ExprType, ExprImpl)>)> {
Expand Down
Loading