diff --git a/e2e_test/source/basic/kafka_batch.slt b/e2e_test/source/basic/kafka_batch.slt index 8d8d454c7c977..a81acf49518b5 100644 --- a/e2e_test/source/basic/kafka_batch.slt +++ b/e2e_test/source/basic/kafka_batch.slt @@ -92,6 +92,25 @@ select * from s1 where _rw_kafka_timestamp > '1977-01-01 00:00:00+00:00' 3 333 4 4444 +query IT rowsort +select * from s1 where _rw_kafka_timestamp > '1977-01-01 00:00:00' +---- +1 1 +2 22 +3 333 +4 4444 + +query IT rowsort +select * from s1 where _rw_kafka_timestamp > TO_TIMESTAMP('1977-01-01 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US') +---- +1 1 +2 22 +3 333 +4 4444 + +statement error expected format +select * from s1 where _rw_kafka_timestamp > 'abc' + query IT select * from s1 where _rw_kafka_timestamp > '2045-01-01 0:00:00+00:00' ---- @@ -201,4 +220,4 @@ statement ok drop table s8 statement ok -drop source s9 \ No newline at end of file +drop source s9 diff --git a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml index f54f6a837343f..e2f7dc9bde0dd 100644 --- a/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/cse_expr.yaml @@ -84,5 +84,5 @@ sql: | with t(v, arr) as (select 1, array[2, 3]) select v < all(arr), v < some(arr) from t; batch_plan: |- - BatchProject { exprs: [All((1:Int32 < $expr10060)) as $expr1, Some((1:Int32 < $expr10060)) as $expr2] } + BatchProject { exprs: [All((1:Int32 < $expr10064)) as $expr1, Some((1:Int32 < $expr10064)) as $expr2] } └─BatchValues { rows: [[1:Int32, ARRAY[2, 3]:List(Int32)]] } diff --git a/src/frontend/planner_test/tests/testdata/output/explain.yaml b/src/frontend/planner_test/tests/testdata/output/explain.yaml index 157736e53b21d..1662624eea519 100644 --- a/src/frontend/planner_test/tests/testdata/output/explain.yaml +++ b/src/frontend/planner_test/tests/testdata/output/explain.yaml @@ -70,7 +70,7 @@ "stages": { "0": { "root": { - "plan_node_id": 10036, + "plan_node_id": 10038, "plan_node_type": "BatchValues", "schema": [ { diff --git a/src/frontend/planner_test/tests/testdata/output/expr.yaml b/src/frontend/planner_test/tests/testdata/output/expr.yaml index 9492491c08aaa..3effdc191eab0 100644 --- a/src/frontend/planner_test/tests/testdata/output/expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/expr.yaml @@ -450,7 +450,7 @@ └─LogicalProject { exprs: [Array(1:Int32) as $expr1] } └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } batch_plan: |- - BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10039, ARRAY[2]:List(Int32)))) as $expr1] } + BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10042, ARRAY[2]:List(Int32)))) as $expr1] } └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all } ├─BatchValues { rows: [[]] } └─BatchValues { rows: [[ARRAY[1]:List(Int32)]] } @@ -473,7 +473,7 @@ └─LogicalProject { exprs: [Array(1:Int32) as $expr1] } └─LogicalValues { rows: [[]], schema: Schema { fields: [] } } batch_plan: |- - BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10039, ARRAY[2]:List(Int32)))) as $expr1] } + BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10042, ARRAY[2]:List(Int32)))) as $expr1] } └─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all } ├─BatchValues { rows: [[]] } └─BatchValues { rows: [[ARRAY[1]:List(Int32)]] } @@ -497,7 +497,7 @@ select * from t where v1 >= now() - INTERVAL '2' SECOND; logical_plan: |- LogicalProject { exprs: [t.v1] } - └─LogicalFilter { predicate: (t.v1 >= (Now - '00:00:02':Interval)) } + └─LogicalFilter { predicate: (t.v1 >= SubtractWithTimeZone(Now, '00:00:02':Interval, 'UTC':Varchar)) } └─LogicalScan { table: t, columns: [t.v1, t._row_id] } stream_plan: |- StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck, watermark_columns: [v1] } diff --git a/src/frontend/planner_test/tests/testdata/output/predicate_pushdown.yaml b/src/frontend/planner_test/tests/testdata/output/predicate_pushdown.yaml index ae37459ef7bed..9424cce324607 100644 --- a/src/frontend/planner_test/tests/testdata/output/predicate_pushdown.yaml +++ b/src/frontend/planner_test/tests/testdata/output/predicate_pushdown.yaml @@ -256,8 +256,8 @@ select * from t1 cross join t2 where v1 = v2 and v1 > now() + '1 hr'; optimized_logical_plan_for_batch: |- LogicalJoin { type: Inner, on: (t1.v1 = t2.v2), output: all } - ├─LogicalScan { table: t1, columns: [t1.v1], predicate: (t1.v1 > ('2021-04-01 00:00:00+00:00':Timestamptz + '01:00:00':Interval)) } - └─LogicalScan { table: t2, columns: [t2.v2], predicate: (t2.v2 > ('2021-04-01 00:00:00+00:00':Timestamptz + '01:00:00':Interval)) } + ├─LogicalScan { table: t1, columns: [t1.v1], predicate: (t1.v1 > AddWithTimeZone('2021-04-01 00:00:00+00:00':Timestamptz, '01:00:00':Interval, 'UTC':Varchar)) } + └─LogicalScan { table: t2, columns: [t2.v2], predicate: (t2.v2 > AddWithTimeZone('2021-04-01 00:00:00+00:00':Timestamptz, '01:00:00':Interval, 'UTC':Varchar)) } stream_plan: |- StreamMaterialize { columns: [v1, v2, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, v1], pk_columns: [t1._row_id, t2._row_id, v1], pk_conflict: NoCheck } └─StreamHashJoin { type: Inner, predicate: t1.v1 = t2.v2, output: [t1.v1, t2.v2, t1._row_id, t2._row_id] } @@ -277,7 +277,7 @@ optimized_logical_plan_for_batch: |- LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) AND (t1.v1 > $expr1), output: [t1.v1, t2.v2, t2.v3] } ├─LogicalScan { table: t1, columns: [t1.v1] } - └─LogicalProject { exprs: [t2.v2, t2.v3, ('2021-04-01 00:00:00+00:00':Timestamptz + t2.v3) as $expr1] } + └─LogicalProject { exprs: [t2.v2, t2.v3, AddWithTimeZone('2021-04-01 00:00:00+00:00':Timestamptz, t2.v3, 'UTC':Varchar) as $expr1] } └─LogicalScan { table: t2, columns: [t2.v2, t2.v3] } stream_error: 'internal error: Conditions containing now must be of the form `input_expr cmp now() [+- const_expr]` or `now() [+- const_expr] cmp input_expr`, where `input_expr` references a column and contains no `now()`.' - name: now() in complex cmp expr pushed onto join ON clause results in dynamic filter @@ -288,14 +288,14 @@ optimized_logical_plan_for_batch: |- LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) AND (t1.v1 > $expr1), output: [t1.v1, t2.v2, t2.v3] } ├─LogicalScan { table: t1, columns: [t1.v1] } - └─LogicalProject { exprs: [t2.v2, t2.v3, ('2021-04-01 00:00:00+00:00':Timestamptz + t2.v3) as $expr1] } + └─LogicalProject { exprs: [t2.v2, t2.v3, AddWithTimeZone('2021-04-01 00:00:00+00:00':Timestamptz, t2.v3, 'UTC':Varchar) as $expr1] } └─LogicalScan { table: t2, columns: [t2.v2, t2.v3] } stream_error: 'internal error: Conditions containing now must be of the form `input_expr cmp now() [+- const_expr]` or `now() [+- const_expr] cmp input_expr`, where `input_expr` references a column and contains no `now()`.' - name: now() does not get pushed to scan, but others do sql: | create table t1(v1 timestamp with time zone, v2 int); select * from t1 where v1 > now() + '30 min' and v2 > 5; - optimized_logical_plan_for_batch: 'LogicalScan { table: t1, columns: [t1.v1, t1.v2], predicate: (t1.v1 > (''2021-04-01 00:00:00+00:00'':Timestamptz + ''00:30:00'':Interval)) AND (t1.v2 > 5:Int32) }' + optimized_logical_plan_for_batch: 'LogicalScan { table: t1, columns: [t1.v1, t1.v2], predicate: (t1.v1 > AddWithTimeZone(''2021-04-01 00:00:00+00:00'':Timestamptz, ''00:30:00'':Interval, ''UTC'':Varchar)) AND (t1.v2 > 5:Int32) }' stream_plan: |- StreamMaterialize { columns: [v1, v2, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck, watermark_columns: [v1] } └─StreamDynamicFilter { predicate: (t1.v1 > $expr1), output_watermarks: [t1.v1], output: [t1.v1, t1.v2, t1._row_id], cleaned_by_watermark: true } diff --git a/src/frontend/planner_test/tests/testdata/output/types.yaml b/src/frontend/planner_test/tests/testdata/output/types.yaml index 4f4cff47b8c11..dbc7d1e20a191 100644 --- a/src/frontend/planner_test/tests/testdata/output/types.yaml +++ b/src/frontend/planner_test/tests/testdata/output/types.yaml @@ -15,7 +15,7 @@ - sql: values ('1'::float(53)); logical_plan: 'LogicalValues { rows: [[1:Float64]], schema: Schema { fields: [*VALUES*_0.column_0:Float64] } }' - sql: values (''::timestamp with time zone); - logical_plan: 'LogicalValues { rows: [['''':Varchar::Timestamptz]], schema: Schema { fields: [*VALUES*_0.column_0:Timestamptz] } }' + logical_plan: 'LogicalValues { rows: [[CastWithTimeZone('''':Varchar, ''UTC'':Varchar)]], schema: Schema { fields: [*VALUES*_0.column_0:Timestamptz] } }' - sql: values (''::time with time zone); binder_error: |- Bind error: failed to bind expression: CAST('' AS TIME WITH TIME ZONE) diff --git a/src/frontend/planner_test/tests/testdata/output/watermark.yaml b/src/frontend/planner_test/tests/testdata/output/watermark.yaml index d1916a33192c6..b5abee5d22608 100644 --- a/src/frontend/planner_test/tests/testdata/output/watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/watermark.yaml @@ -4,7 +4,7 @@ create source t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) with (connector = 'kinesis') FORMAT PLAIN ENCODE JSON; select t.v1 - INTERVAL '2' SECOND as v1 from t; logical_plan: |- - LogicalProject { exprs: [(v1 - '00:00:02':Interval) as $expr1] } + LogicalProject { exprs: [SubtractWithTimeZone(v1, '00:00:02':Interval, 'UTC':Varchar) as $expr1] } └─LogicalSource { source: t, columns: [v1, _row_id], time_range: (Unbounded, Unbounded) } stream_plan: |- StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] } @@ -103,7 +103,7 @@ select t1.ts as t1_ts, t1.v1 as t1_v1, t1.v2 as t1_v2, t2.ts as t2_ts, t2.v1 as t2_v1, t2.v2 as t2_v2 from t1 left outer join t2 on (t1.v1 = t2.v1 and (t1.ts >= t2.ts + INTERVAL '1' SECOND) and (t2.ts >= t1.ts + INTERVAL '1' SECOND)); logical_plan: |- LogicalProject { exprs: [t1.ts, t1.v1, t1.v2, t2.ts, t2.v1, t2.v2] } - └─LogicalJoin { type: LeftOuter, on: (t1.v1 = t2.v1) AND (t1.ts >= (t2.ts + '00:00:01':Interval)) AND (t2.ts >= (t1.ts + '00:00:01':Interval)), output: all } + └─LogicalJoin { type: LeftOuter, on: (t1.v1 = t2.v1) AND (t1.ts >= AddWithTimeZone(t2.ts, '00:00:01':Interval, 'UTC':Varchar)) AND (t2.ts >= AddWithTimeZone(t1.ts, '00:00:01':Interval, 'UTC':Varchar)), output: all } ├─LogicalScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id] } └─LogicalScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id] } stream_plan: |- @@ -122,7 +122,7 @@ select t1.ts as t1_ts, t1.v1 as t1_v1, t1.v2 as t1_v2, t2.ts as t2_ts, t2.v1 as t2_v1, t2.v2 as t2_v2 from t1 join t2 on (t1.v1 = t2.v1 and (t1.ts >= t2.ts + INTERVAL '1' SECOND) and (t2.ts >= t1.ts + INTERVAL '1' SECOND)); logical_plan: |- LogicalProject { exprs: [t1.ts, t1.v1, t1.v2, t2.ts, t2.v1, t2.v2] } - └─LogicalJoin { type: Inner, on: (t1.v1 = t2.v1) AND (t1.ts >= (t2.ts + '00:00:01':Interval)) AND (t2.ts >= (t1.ts + '00:00:01':Interval)), output: all } + └─LogicalJoin { type: Inner, on: (t1.v1 = t2.v1) AND (t1.ts >= AddWithTimeZone(t2.ts, '00:00:01':Interval, 'UTC':Varchar)) AND (t2.ts >= AddWithTimeZone(t1.ts, '00:00:01':Interval, 'UTC':Varchar)), output: all } ├─LogicalScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id] } └─LogicalScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id] } stream_plan: |- diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index 0af30ccb364f8..756ef33c3ed98 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -789,11 +789,16 @@ impl ExprImpl { if let ExprImpl::Now(_) = self { true } else if let ExprImpl::FunctionCall(f) = self { + // TODO: `now() + interval '1' month` shall not be accepted as const offset match f.func_type() { ExprType::Add | ExprType::Subtract => { let (_, lhs, rhs) = f.clone().decompose_as_binary(); lhs.is_now_offset() && rhs.is_const() } + ExprType::AddWithTimeZone | ExprType::SubtractWithTimeZone => { + let args = f.inputs(); + args[0].is_now_offset() && args[1].is_const() + } _ => false, } } else { diff --git a/src/frontend/src/expr/session_timezone.rs b/src/frontend/src/expr/session_timezone.rs index 545eb1427f99b..0630dc38504b0 100644 --- a/src/frontend/src/expr/session_timezone.rs +++ b/src/frontend/src/expr/session_timezone.rs @@ -21,6 +21,8 @@ use crate::expr::{Expr, ExprImpl}; /// `SessionTimezone` will be used to resolve session /// timezone-dependent casts, comparisons or arithmetic. +/// +/// This rewrite is idempotent as required by [`crate::optimizer::plan_node::ExprRewritable`]. pub struct SessionTimezone { timezone: String, /// Whether or not the session timezone was used @@ -242,6 +244,8 @@ impl SessionTimezone { new_inputs.push(ExprImpl::literal_varchar(self.timezone())); Some(FunctionCall::new(func_type, new_inputs).unwrap().into()) } + // When adding new rules, make sure to match arity and types of all args + // so that the rewrite is idempotent. _ => None, } } diff --git a/src/frontend/src/expr/utils.rs b/src/frontend/src/expr/utils.rs index d07287b08dbe2..88c29c143a6f0 100644 --- a/src/frontend/src/expr/utils.rs +++ b/src/frontend/src/expr/utils.rs @@ -535,10 +535,10 @@ impl WatermarkAnalyzer { ExprImpl::Literal(lit) => lit.get_data().as_ref().map(|s| s.as_utf8()), _ => return WatermarkDerivation::None, }; - let interval = match &func_call.inputs()[1] { - ExprImpl::Literal(lit) => lit.get_data().as_ref().map(|s| s.as_interval()), - _ => return WatermarkDerivation::None, + let Some(Ok(interval)) = &func_call.inputs()[1].try_fold_const() else { + return WatermarkDerivation::None; }; + let interval = interval.as_ref().map(|s| s.as_interval()); // null zone or null interval is treated same as const `interval '1' second`, to be // consistent with other match arms. let zone_without_dst = time_zone.map_or(true, |s| s.eq_ignore_ascii_case("UTC")); diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index aaab8ebde3dcd..fcb3957120145 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -97,6 +97,10 @@ impl PlanRoot { assert_eq!(input_schema.fields().len(), out_fields.len()); assert_eq!(out_fields.count_ones(..), out_names.len()); + // Inline session timezone at the very beginning so that (const) exprs can be evaluated. + let ctx = plan.ctx(); + let plan = inline_session_timezone_in_exprs(ctx.clone(), plan); + Self { plan, required_dist, @@ -177,8 +181,6 @@ impl PlanRoot { } let ctx = plan.ctx(); - // Inline session timezone mainly for rewriting now() - plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?; // Convert to physical plan node plan = plan.to_batch_with_order_required(&self.required_order)?; @@ -193,8 +195,8 @@ impl PlanRoot { ApplyOrder::BottomUp, )); - // Inline session timezone - plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?; + // Inline session timezone near the end so that exprs inserted by optimizer are handled. + plan = inline_session_timezone_in_exprs(ctx.clone(), plan); if ctx.is_explain_trace() { ctx.trace("Inline Session Timezone:"); @@ -314,8 +316,8 @@ impl PlanRoot { )); } - // Inline session timezone - plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?; + // Inline session timezone near the end so that exprs inserted by optimizer are handled. + plan = inline_session_timezone_in_exprs(ctx.clone(), plan); if ctx.is_explain_trace() { ctx.trace("Inline session timezone:"); @@ -488,7 +490,8 @@ impl PlanRoot { RequiredDist::ShardByKey(bitset) }; - let stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?; + // Inline session timezone near the end so that exprs inserted by optimizer are handled. + let stream_plan = inline_session_timezone_in_exprs(context, stream_plan); StreamMaterialize::create_for_table( stream_plan, @@ -593,9 +596,8 @@ fn const_eval_exprs(plan: PlanRef) -> Result<PlanRef> { Ok(plan) } -fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> Result<PlanRef> { - let plan = plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()); - Ok(plan) +fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> PlanRef { + plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()) } fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bool) -> bool { diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 4ac006887ef55..d2dbcf4787eb0 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -369,8 +369,7 @@ fn expr_to_kafka_timestamp_range( match &expr { ExprImpl::FunctionCall(function_call) => { - if let Some((timestampz_literal, reverse)) = extract_timestampz_literal(&expr).unwrap() - { + if let Ok(Some((timestampz_literal, reverse))) = extract_timestampz_literal(&expr) { match function_call.func_type() { ExprType::GreaterThan => { if reverse {