diff --git a/src/frontend/planner_test/tests/testdata/input/temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/input/temporal_filter.yaml index 5022b7fb9bb6..8df9d78869f0 100644 --- a/src/frontend/planner_test/tests/testdata/input/temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/input/temporal_filter.yaml @@ -23,6 +23,13 @@ expected_outputs: - stream_plan - stream_dist_plan +- name: Temporal filter with equal condition + sql: |- + create table t1 (ts timestamp with time zone); + select * from t1 where date_trunc('week', now()) = date_trunc('week',ts); + expected_outputs: + - stream_plan + - stream_dist_plan - name: Temporal filter with `now()` in upper bound on append only table sql: |- create table t1 (ts timestamp with time zone) APPEND ONLY; diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml index 3bfdc5cdc559..a8799aba8a4f 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml @@ -90,6 +90,75 @@ ├── distribution key: [ 1 ] └── read pk prefix len hint: 1 +- name: Temporal filter with equal condition + sql: |- + create table t1 (ts timestamp with time zone); + select * from t1 where date_trunc('week', now()) = date_trunc('week',ts); + stream_plan: |- + StreamMaterialize { columns: [ts, t1._row_id(hidden), $expr1(hidden)], stream_key: [t1._row_id, $expr1], pk_columns: [t1._row_id, $expr1], pk_conflict: NoCheck } + └─StreamExchange { dist: HashShard(t1._row_id, $expr1) } + └─StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr2, output: [t1.ts, t1._row_id, $expr1] } + ├─StreamExchange { dist: HashShard($expr1) } + │ └─StreamProject { exprs: [t1.ts, DateTrunc('week':Varchar, t1.ts, 'UTC':Varchar) as $expr1, t1._row_id] } + │ └─StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } + └─StreamExchange { dist: HashShard($expr2) } + └─StreamProject { exprs: [DateTrunc('week':Varchar, now, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } + └─StreamNow { output: [now] } + stream_dist_plan: |+ + Fragment 0 + StreamMaterialize { columns: [ts, t1._row_id(hidden), $expr1(hidden)], stream_key: [t1._row_id, $expr1], pk_columns: [t1._row_id, $expr1], pk_conflict: NoCheck } + ├── materialized table: 4294967294 + └── StreamExchange Hash([1, 2]) from 1 + + Fragment 1 + StreamHashJoin { type: LeftSemi, predicate: $expr1 = $expr2, output: [t1.ts, t1._row_id, $expr1] } + ├── left table: 0 + ├── right table: 2 + ├── left degree table: 1 + ├── right degree table: 3 + ├── StreamExchange Hash([1]) from 2 + └── StreamExchange Hash([0]) from 3 + + Fragment 2 + StreamProject { exprs: [t1.ts, DateTrunc('week':Varchar, t1.ts, 'UTC':Varchar) as $expr1, t1._row_id] } + └── StreamTableScan { table: t1, columns: [t1.ts, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } { state table: 4 } + ├── Upstream + └── BatchPlanNode + + Fragment 3 + StreamProject { exprs: [DateTrunc('week':Varchar, now, 'UTC':Varchar) as $expr2], output_watermarks: [$expr2] } + └── StreamNow { output: [now] } { state table: 5 } + + Table 0 + ├── columns: [ t1_ts, $expr1, t1__row_id ] + ├── primary key: [ $1 ASC, $2 ASC ] + ├── value indices: [ 0, 1, 2 ] + ├── distribution key: [ 1 ] + └── read pk prefix len hint: 1 + + Table 1 { columns: [ $expr1, t1__row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + + Table 2 { columns: [ $expr2 ], primary key: [ $0 ASC ], value indices: [ 0 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + + Table 3 { columns: [ $expr2, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 } + + Table 4 + ├── columns: [ vnode, _row_id, t1_backfill_finished, t1_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 5 { columns: [ now ], primary key: [], value indices: [ 0 ], distribution key: [], read pk prefix len hint: 0 } + + Table 4294967294 + ├── columns: [ ts, t1._row_id, $expr1 ] + ├── primary key: [ $1 ASC, $2 ASC ] + ├── value indices: [ 0, 1, 2 ] + ├── distribution key: [ 1, 2 ] + └── read pk prefix len hint: 2 + - name: Temporal filter with `now()` in upper bound on append only table sql: |- create table t1 (ts timestamp with time zone) APPEND ONLY; diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index ecf256fe78de..6f2de8d5dc64 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -723,6 +723,16 @@ impl ExprImpl { None } } + ty @ ExprType::Equal => { + let (_, op1, op2) = function_call.clone().decompose_as_binary(); + if op1.count_nows() == 0 && op1.has_input_ref() && op2.count_nows() > 0 { + Some((op1, ty, op2)) + } else if op2.count_nows() == 0 && op2.has_input_ref() && op1.count_nows() > 0 { + Some((op2, Self::reverse_comparison(ty), op1)) + } else { + None + } + } _ => None, } } else { diff --git a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs index 0a0e42ccf159..567234a169e2 100644 --- a/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs +++ b/src/frontend/src/optimizer/rule/stream/filter_with_now_to_join_rule.rs @@ -44,14 +44,12 @@ impl Rule for FilterWithNowToJoinRule { if let Some((input_expr, cmp, now_expr)) = expr.as_now_comparison_cond() { let now_expr = rewriter.rewrite_expr(now_expr); - // as a sanity check, ensure that this expression will derive a watermark - // on the output of the now executor - debug_assert_eq!( - try_derive_watermark(&now_expr), - WatermarkDerivation::Watermark(lhs_len) - ); - - now_filters.push(FunctionCall::new(cmp, vec![input_expr, now_expr]).unwrap()); + // ensure that this expression will derive a watermark + if try_derive_watermark(&now_expr) != WatermarkDerivation::Watermark(lhs_len) { + remainder.push(expr.clone()); + } else { + now_filters.push(FunctionCall::new(cmp, vec![input_expr, now_expr]).unwrap()); + } } else { remainder.push(expr.clone()); }