diff --git a/src/frontend/planner_test/tests/testdata/input/temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/input/temporal_filter.yaml index 6bd62c1ce4d61..ce8fc00e15496 100644 --- a/src/frontend/planner_test/tests/testdata/input/temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/input/temporal_filter.yaml @@ -3,96 +3,96 @@ create table t1 (ts timestamp with time zone); select * from t1 where ts + interval '1 hour' > now(); expected_outputs: - - stream_plan + - stream_plan - name: Temporal filter works on complex columns on LHS (part 2) sql: | create table t1 (ts timestamp with time zone, time_to_live interval); select * from t1 where ts + time_to_live * 1.5 > now(); expected_outputs: - - stream_plan + - stream_plan - name: Temporal filter works on complex columns on LHS (part 2, flipped) sql: | create table t1 (ts timestamp with time zone, additional_time_to_live interval); select * from t1 where now() - interval '15 minutes' < ts + additional_time_to_live * 1.5; expected_outputs: - - stream_plan + - stream_plan - name: Temporal filter with `now()` in upper bound sql: |- create table t1 (ts timestamp with time zone); select * from t1 where now() - interval '15 minutes' > ts; expected_outputs: - - stream_plan - - stream_dist_plan + - stream_plan + - stream_dist_plan - name: Temporal filter with equal condition sql: |- create table t1 (ts timestamp with time zone); - select * from t1 where date_trunc('week', now()) = date_trunc('week',ts); + select * from t1 where date_trunc('week', now()) = date_trunc('week',ts); expected_outputs: - - stream_plan - - stream_dist_plan + - stream_plan + - stream_dist_plan - name: Temporal filter with `now()` in upper bound on append only table sql: |- create table t1 (ts timestamp with time zone) APPEND ONLY; select * from t1 where now() - interval '15 minutes' > ts; expected_outputs: - - stream_plan - - stream_dist_plan + - stream_plan + - stream_dist_plan - name: Temporal filter reorders now expressions correctly sql: | create table t1 (ts timestamp with time zone); select * from t1 where ts < now() - interval '1 hour' and ts >= now() - interval '2 hour'; expected_outputs: - - stream_plan - - stream_dist_plan + - stream_plan + - stream_dist_plan - name: Temporal filter in on clause for inner join's left side sql: | create table t1 (a int, ta timestamp with time zone); create table t2 (b int, tb timestamp with time zone); select * from t1 join t2 on a = b AND ta < now() - interval '1 hour' and ta >= now() - interval '2 hour'; expected_outputs: - - stream_plan + - stream_plan - name: Temporal filter in on clause for left join's left side sql: | create table t1 (a int, ta timestamp with time zone); create table t2 (b int, tb timestamp with time zone); select * from t1 left join t2 on a = b AND ta < now() - interval '1 hour' and ta >= now() - interval '2 hour'; expected_outputs: - - stream_error + - stream_error - name: Temporal filter in on clause for right join's left side sql: | create table t1 (a int, ta timestamp with time zone); create table t2 (b int, tb timestamp with time zone); select * from t1 right join t2 on a = b AND ta < now() - interval '1 hour' and ta >= now() - interval '2 hour'; expected_outputs: - - stream_plan + - stream_plan - name: Temporal filter in on clause for full join's left side sql: | create table t1 (a int, ta timestamp with time zone); create table t2 (b int, tb timestamp with time zone); select * from t1 full join t2 on a = b AND ta < now() - interval '1 hour' and ta >= now() - interval '2 hour'; expected_outputs: - - stream_error + - stream_error - name: Temporal filter in on clause for left join's right side sql: | create table t1 (a int, ta timestamp with time zone); create table t2 (b int, tb timestamp with time zone); select * from t1 left join t2 on a = b AND tb < now() - interval '1 hour' and tb >= now() - interval '2 hour'; expected_outputs: - - stream_plan + - stream_plan - name: Temporal filter in on clause for right join's right side sql: | create table t1 (a int, ta timestamp with time zone); create table t2 (b int, tb timestamp with time zone); select * from t1 right join t2 on a = b AND tb < now() - interval '1 hour' and tb >= now() - interval '2 hour'; expected_outputs: - - stream_error + - stream_error - name: Temporal filter after temporal join sql: | create table stream(id1 int, a1 int, b1 int, v1 timestamp with time zone) APPEND ONLY; create table version(id2 int, a2 int, b2 int, primary key (id2)); select id1, a1, id2, v1 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where v1 > now(); expected_outputs: - - stream_plan + - stream_plan - name: Temporal filter with or predicate sql: | create table t1 (ts timestamp with time zone); @@ -116,4 +116,22 @@ create table t (t timestamp with time zone, a int); select * from t where (t > NOW() - INTERVAL '1 hour' OR t is NULL OR a < 1) AND (t < NOW() - INTERVAL '1 hour' OR a > 1); expected_outputs: - - stream_plan \ No newline at end of file + - stream_plan +- name: Non-trivial now expression + sql: | + create table t (ts timestamp with time zone, a int); + select * from t where ts + interval '1 hour' > date_trunc('day', now()); + expected_outputs: + - stream_plan +- name: Non-trivial now expression 2 + sql: | + create table t (ts timestamp with time zone, a int); + select * from t where ts + interval '1 hour' > date_trunc('day', ('2024-07-18 00:00:00+00:00'::timestamptz - ('2024-07-18 00:00:00+00:00'::timestamptz - now()))); + expected_outputs: + - stream_plan +- name: Non-monotonic now expression + sql: | + create table t (ts timestamp with time zone, a int); + select * from t where a > extract(hour from now()); + expected_outputs: + - stream_error diff --git a/src/frontend/planner_test/tests/testdata/output/expr.yaml b/src/frontend/planner_test/tests/testdata/output/expr.yaml index f88f7c4d69b76..d24d0f0eeba18 100644 --- a/src/frontend/planner_test/tests/testdata/output/expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/expr.yaml @@ -520,7 +520,7 @@ sql: | create table t (v1 timestamp with time zone, v2 timestamp with time zone); select * from t where v1 >= now() or v2 >= now(); - stream_error: Conditions containing now must be of the form `input_expr cmp now() [+- const_expr]` or `now() [+- const_expr] cmp input_expr`, where `input_expr` references a column and contains no `now()`. + stream_error: Conditions containing now must be in the form of `input_expr cmp now_expr` or `now_expr cmp input_expr`, where `input_expr` references a column and contains no `now()`, and `now_expr` is a non-decreasing expression contains `now()`. - name: now inside HAVING clause sql: | create table t (v1 timestamp with time zone, v2 int); diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml index 7bbd43ce3c35c..edc3bb6c364c2 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml @@ -81,7 +81,7 @@ - name: Temporal filter with equal condition sql: |- create table t1 (ts timestamp with time zone); - select * from t1 where date_trunc('week', now()) = date_trunc('week',ts); + select * from t1 where date_trunc('week', now()) = date_trunc('week',ts); stream_plan: |- StreamMaterialize { columns: [ts, t1._row_id(hidden), $expr1(hidden)], stream_key: [t1._row_id, $expr1], pk_columns: [t1._row_id, $expr1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(t1._row_id, $expr1) } @@ -460,3 +460,34 @@ └─StreamShare { id: 2 } └─StreamFilter { predicate: IsNotNull(t.a) AND (((Not(IsNull(t.t)) AND Not((t.a < 1:Int32))) OR IsNull(t.t)) OR (t.a < 1:Int32)) } └─StreamTableScan { table: t, columns: [t.t, t.a, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } +- name: Non-trivial now expression + sql: | + create table t (ts timestamp with time zone, a int); + select * from t where ts + interval '1 hour' > date_trunc('day', now()); + stream_plan: |- + StreamMaterialize { columns: [ts, a, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.ts, t.a, t._row_id] } + └─StreamDynamicFilter { predicate: ($expr1 > $expr2), output_watermarks: [$expr1], output: [t.ts, t.a, $expr1, t._row_id], cleaned_by_watermark: true } + ├─StreamProject { exprs: [t.ts, t.a, AddWithTimeZone(t.ts, '01:00:00':Interval, 'UTC':Varchar) as $expr1, t._row_id] } + │ └─StreamTableScan { table: t, columns: [t.ts, t.a, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamExchange { dist: Broadcast } + └─StreamProject { exprs: [DateTrunc('day':Varchar, now, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } + └─StreamNow { output: [now] } +- name: Non-trivial now expression 2 + sql: | + create table t (ts timestamp with time zone, a int); + select * from t where ts + interval '1 hour' > date_trunc('day', ('2024-07-18 00:00:00+00:00'::timestamptz - ('2024-07-18 00:00:00+00:00'::timestamptz - now()))); + stream_plan: |- + StreamMaterialize { columns: [ts, a, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck } + └─StreamProject { exprs: [t.ts, t.a, t._row_id] } + └─StreamDynamicFilter { predicate: ($expr1 > $expr2), output: [t.ts, t.a, $expr1, t._row_id] } + ├─StreamProject { exprs: [t.ts, t.a, AddWithTimeZone(t.ts, '01:00:00':Interval, 'UTC':Varchar) as $expr1, t._row_id] } + │ └─StreamTableScan { table: t, columns: [t.ts, t.a, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } + └─StreamExchange { dist: Broadcast } + └─StreamProject { exprs: [DateTrunc('day':Varchar, SubtractWithTimeZone('2024-07-18 00:00:00+00:00':Timestamptz, ('2024-07-18 00:00:00+00:00':Timestamptz - now), 'UTC':Varchar), 'UTC':Varchar) as $expr2] } + └─StreamNow { output: [now] } +- name: Non-monotonic now expression + sql: | + create table t (ts timestamp with time zone, a int); + select * from t where a > extract(hour from now()); + stream_error: Conditions containing now must be in the form of `input_expr cmp now_expr` or `now_expr cmp input_expr`, where `input_expr` references a column and contains no `now()`, and `now_expr` is a non-decreasing expression contains `now()`. diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index 19ffbaed92e28..444740c9400fc 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -768,45 +768,36 @@ impl ExprImpl { } } - /// Accepts expressions of the form `input_expr cmp now() [+- const_expr]` or - /// `now() [+- const_expr] cmp input_expr`, where `input_expr` contains an - /// `InputRef` and contains no `now()`. + /// Accepts expressions of the form `input_expr cmp now_expr` or `now_expr cmp input_expr`, + /// where `input_expr` contains an `InputRef` and contains no `now()`, and `now_expr` + /// contains a `now()` but no `InputRef`. /// /// Canonicalizes to the first ordering and returns `(input_expr, cmp, now_expr)` pub fn as_now_comparison_cond(&self) -> Option<(ExprImpl, ExprType, ExprImpl)> { if let ExprImpl::FunctionCall(function_call) = self { match function_call.func_type() { - ty @ (ExprType::LessThan + ty @ (ExprType::Equal + | ExprType::LessThan | ExprType::LessThanOrEqual | ExprType::GreaterThan | ExprType::GreaterThanOrEqual) => { let (_, op1, op2) = function_call.clone().decompose_as_binary(); - if op1.count_nows() == 0 + if !op1.has_now() && op1.has_input_ref() - && op2.count_nows() > 0 - && op2.is_now_offset() + && op2.has_now() + && !op2.has_input_ref() { Some((op1, ty, op2)) - } else if op2.count_nows() == 0 + } else if op1.has_now() + && !op1.has_input_ref() + && !op2.has_now() && op2.has_input_ref() - && op1.count_nows() > 0 - && op1.is_now_offset() { Some((op2, Self::reverse_comparison(ty), op1)) } else { None } } - ty @ ExprType::Equal => { - let (_, op1, op2) = function_call.clone().decompose_as_binary(); - if op1.count_nows() == 0 && op1.has_input_ref() && op2.count_nows() > 0 { - Some((op1, ty, op2)) - } else if op2.count_nows() == 0 && op2.has_input_ref() && op1.count_nows() > 0 { - Some((op2, Self::reverse_comparison(ty), op1)) - } else { - None - } - } _ => None, } } else { @@ -862,23 +853,6 @@ impl ExprImpl { } } - /// Checks if expr is of the form `now() [+- const_expr]` - fn is_now_offset(&self) -> bool { - if let ExprImpl::Now(_) = self { - true - } else if let ExprImpl::FunctionCall(f) = self { - match f.func_type() { - ExprType::Add | ExprType::Subtract => { - let (_, lhs, rhs) = f.clone().decompose_as_binary(); - lhs.is_now_offset() && rhs.is_const() - } - _ => false, - } - } else { - false - } - } - /// Returns the `InputRef` and offset of a predicate if it matches /// the form `InputRef [+- const_expr]`, else returns None. fn as_input_offset(&self) -> Option<(usize, Option<(ExprType, ExprImpl)>)> { diff --git a/src/frontend/src/optimizer/plan_node/logical_filter.rs b/src/frontend/src/optimizer/plan_node/logical_filter.rs index 04cc2cb12a689..25062ee0eebc7 100644 --- a/src/frontend/src/optimizer/plan_node/logical_filter.rs +++ b/src/frontend/src/optimizer/plan_node/logical_filter.rs @@ -197,24 +197,11 @@ impl ToStream for LogicalFilter { let new_input = self.input().to_stream(ctx)?; let predicate = self.predicate(); - let has_now = predicate - .conjunctions - .iter() - .any(|cond| cond.count_nows() > 0); - if has_now { - if predicate - .conjunctions - .iter() - .any(|expr| expr.count_nows() > 0 && expr.as_now_comparison_cond().is_none()) - { - bail!( - "Conditions containing now must be of the form `input_expr cmp now() [+- const_expr]` or \ - `now() [+- const_expr] cmp input_expr`, where `input_expr` references a column \ - and contains no `now()`." - ); - } + if predicate.conjunctions.iter().any(|cond| cond.has_now()) { bail!( - "All `now()` exprs were valid, but the condition must have at least one now expr as a lower bound." + "Conditions containing now must be in the form of `input_expr cmp now_expr` or \ + `now_expr cmp input_expr`, where `input_expr` references a column and contains \ + no `now()`, and `now_expr` is a non-decreasing expression contains `now()`." ); } let mut new_logical = self.core.clone();