Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(optimizer): allow fold_const on timezone-dependent exprs in logical plan #12633

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion e2e_test/source/basic/kafka_batch.slt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,25 @@ select * from s1 where _rw_kafka_timestamp > '1977-01-01 00:00:00+00:00'
3 333
4 4444

query IT rowsort
select * from s1 where _rw_kafka_timestamp > '1977-01-01 00:00:00'
----
1 1
2 22
3 333
4 4444

query IT rowsort
select * from s1 where _rw_kafka_timestamp > TO_TIMESTAMP('1977-01-01 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')
----
1 1
2 22
3 333
4 4444

statement error expected format
select * from s1 where _rw_kafka_timestamp > 'abc'

query IT
select * from s1 where _rw_kafka_timestamp > '2045-01-01 0:00:00+00:00'
----
Expand Down Expand Up @@ -201,4 +220,4 @@ statement ok
drop table s8

statement ok
drop source s9
drop source s9
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,5 @@
sql: |
with t(v, arr) as (select 1, array[2, 3]) select v < all(arr), v < some(arr) from t;
batch_plan: |-
BatchProject { exprs: [All((1:Int32 < $expr10060)) as $expr1, Some((1:Int32 < $expr10060)) as $expr2] }
BatchProject { exprs: [All((1:Int32 < $expr10064)) as $expr1, Some((1:Int32 < $expr10064)) as $expr2] }
└─BatchValues { rows: [[1:Int32, ARRAY[2, 3]:List(Int32)]] }
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
"stages": {
"0": {
"root": {
"plan_node_id": 10036,
"plan_node_id": 10038,
"plan_node_type": "BatchValues",
"schema": [
{
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/planner_test/tests/testdata/output/expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@
└─LogicalProject { exprs: [Array(1:Int32) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |-
BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10039, ARRAY[2]:List(Int32)))) as $expr1] }
BatchProject { exprs: [Some((1:Int32 < ArrayCat($expr10042, ARRAY[2]:List(Int32)))) as $expr1] }
└─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchValues { rows: [[]] }
└─BatchValues { rows: [[ARRAY[1]:List(Int32)]] }
Expand All @@ -473,7 +473,7 @@
└─LogicalProject { exprs: [Array(1:Int32) as $expr1] }
└─LogicalValues { rows: [[]], schema: Schema { fields: [] } }
batch_plan: |-
BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10039, ARRAY[2]:List(Int32)))) as $expr1] }
BatchProject { exprs: [All((1:Int32 < ArrayCat($expr10042, ARRAY[2]:List(Int32)))) as $expr1] }
└─BatchNestedLoopJoin { type: LeftOuter, predicate: true, output: all }
├─BatchValues { rows: [[]] }
└─BatchValues { rows: [[ARRAY[1]:List(Int32)]] }
Expand All @@ -497,7 +497,7 @@
select * from t where v1 >= now() - INTERVAL '2' SECOND;
logical_plan: |-
LogicalProject { exprs: [t.v1] }
└─LogicalFilter { predicate: (t.v1 >= (Now - '00:00:02':Interval)) }
└─LogicalFilter { predicate: (t.v1 >= SubtractWithTimeZone(Now, '00:00:02':Interval, 'UTC':Varchar)) }
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [v1, t._row_id(hidden)], stream_key: [t._row_id], pk_columns: [t._row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@
select * from t1 cross join t2 where v1 = v2 and v1 > now() + '1 hr';
optimized_logical_plan_for_batch: |-
LogicalJoin { type: Inner, on: (t1.v1 = t2.v2), output: all }
├─LogicalScan { table: t1, columns: [t1.v1], predicate: (t1.v1 > ('2021-04-01 00:00:00+00:00':Timestamptz + '01:00:00':Interval)) }
└─LogicalScan { table: t2, columns: [t2.v2], predicate: (t2.v2 > ('2021-04-01 00:00:00+00:00':Timestamptz + '01:00:00':Interval)) }
├─LogicalScan { table: t1, columns: [t1.v1], predicate: (t1.v1 > AddWithTimeZone('2021-04-01 00:00:00+00:00':Timestamptz, '01:00:00':Interval, 'UTC':Varchar)) }
└─LogicalScan { table: t2, columns: [t2.v2], predicate: (t2.v2 > AddWithTimeZone('2021-04-01 00:00:00+00:00':Timestamptz, '01:00:00':Interval, 'UTC':Varchar)) }
stream_plan: |-
StreamMaterialize { columns: [v1, v2, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, v1], pk_columns: [t1._row_id, t2._row_id, v1], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: t1.v1 = t2.v2, output: [t1.v1, t2.v2, t1._row_id, t2._row_id] }
Expand All @@ -277,7 +277,7 @@
optimized_logical_plan_for_batch: |-
LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) AND (t1.v1 > $expr1), output: [t1.v1, t2.v2, t2.v3] }
├─LogicalScan { table: t1, columns: [t1.v1] }
└─LogicalProject { exprs: [t2.v2, t2.v3, ('2021-04-01 00:00:00+00:00':Timestamptz + t2.v3) as $expr1] }
└─LogicalProject { exprs: [t2.v2, t2.v3, AddWithTimeZone('2021-04-01 00:00:00+00:00':Timestamptz, t2.v3, 'UTC':Varchar) as $expr1] }
└─LogicalScan { table: t2, columns: [t2.v2, t2.v3] }
stream_error: 'internal error: Conditions containing now must be of the form `input_expr cmp now() [+- const_expr]` or `now() [+- const_expr] cmp input_expr`, where `input_expr` references a column and contains no `now()`.'
- name: now() in complex cmp expr pushed onto join ON clause results in dynamic filter
Expand All @@ -288,14 +288,14 @@
optimized_logical_plan_for_batch: |-
LogicalJoin { type: Inner, on: (t1.v1 = t2.v2) AND (t1.v1 > $expr1), output: [t1.v1, t2.v2, t2.v3] }
├─LogicalScan { table: t1, columns: [t1.v1] }
└─LogicalProject { exprs: [t2.v2, t2.v3, ('2021-04-01 00:00:00+00:00':Timestamptz + t2.v3) as $expr1] }
└─LogicalProject { exprs: [t2.v2, t2.v3, AddWithTimeZone('2021-04-01 00:00:00+00:00':Timestamptz, t2.v3, 'UTC':Varchar) as $expr1] }
└─LogicalScan { table: t2, columns: [t2.v2, t2.v3] }
stream_error: 'internal error: Conditions containing now must be of the form `input_expr cmp now() [+- const_expr]` or `now() [+- const_expr] cmp input_expr`, where `input_expr` references a column and contains no `now()`.'
- name: now() does not get pushed to scan, but others do
sql: |
create table t1(v1 timestamp with time zone, v2 int);
select * from t1 where v1 > now() + '30 min' and v2 > 5;
optimized_logical_plan_for_batch: 'LogicalScan { table: t1, columns: [t1.v1, t1.v2], predicate: (t1.v1 > (''2021-04-01 00:00:00+00:00'':Timestamptz + ''00:30:00'':Interval)) AND (t1.v2 > 5:Int32) }'
optimized_logical_plan_for_batch: 'LogicalScan { table: t1, columns: [t1.v1, t1.v2], predicate: (t1.v1 > AddWithTimeZone(''2021-04-01 00:00:00+00:00'':Timestamptz, ''00:30:00'':Interval, ''UTC'':Varchar)) AND (t1.v2 > 5:Int32) }'
stream_plan: |-
StreamMaterialize { columns: [v1, v2, t1._row_id(hidden)], stream_key: [t1._row_id], pk_columns: [t1._row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
└─StreamDynamicFilter { predicate: (t1.v1 > $expr1), output_watermarks: [t1.v1], output: [t1.v1, t1.v2, t1._row_id], cleaned_by_watermark: true }
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/output/types.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
- sql: values ('1'::float(53));
logical_plan: 'LogicalValues { rows: [[1:Float64]], schema: Schema { fields: [*VALUES*_0.column_0:Float64] } }'
- sql: values (''::timestamp with time zone);
logical_plan: 'LogicalValues { rows: [['''':Varchar::Timestamptz]], schema: Schema { fields: [*VALUES*_0.column_0:Timestamptz] } }'
logical_plan: 'LogicalValues { rows: [[CastWithTimeZone('''':Varchar, ''UTC'':Varchar)]], schema: Schema { fields: [*VALUES*_0.column_0:Timestamptz] } }'
- sql: values (''::time with time zone);
binder_error: |-
Bind error: failed to bind expression: CAST('' AS TIME WITH TIME ZONE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
create source t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) with (connector = 'kinesis') FORMAT PLAIN ENCODE JSON;
select t.v1 - INTERVAL '2' SECOND as v1 from t;
logical_plan: |-
LogicalProject { exprs: [(v1 - '00:00:02':Interval) as $expr1] }
LogicalProject { exprs: [SubtractWithTimeZone(v1, '00:00:02':Interval, 'UTC':Varchar) as $expr1] }
└─LogicalSource { source: t, columns: [v1, _row_id], time_range: (Unbounded, Unbounded) }
stream_plan: |-
StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck, watermark_columns: [v1] }
Expand Down Expand Up @@ -103,7 +103,7 @@
select t1.ts as t1_ts, t1.v1 as t1_v1, t1.v2 as t1_v2, t2.ts as t2_ts, t2.v1 as t2_v1, t2.v2 as t2_v2 from t1 left outer join t2 on (t1.v1 = t2.v1 and (t1.ts >= t2.ts + INTERVAL '1' SECOND) and (t2.ts >= t1.ts + INTERVAL '1' SECOND));
logical_plan: |-
LogicalProject { exprs: [t1.ts, t1.v1, t1.v2, t2.ts, t2.v1, t2.v2] }
└─LogicalJoin { type: LeftOuter, on: (t1.v1 = t2.v1) AND (t1.ts >= (t2.ts + '00:00:01':Interval)) AND (t2.ts >= (t1.ts + '00:00:01':Interval)), output: all }
└─LogicalJoin { type: LeftOuter, on: (t1.v1 = t2.v1) AND (t1.ts >= AddWithTimeZone(t2.ts, '00:00:01':Interval, 'UTC':Varchar)) AND (t2.ts >= AddWithTimeZone(t1.ts, '00:00:01':Interval, 'UTC':Varchar)), output: all }
├─LogicalScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id] }
└─LogicalScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id] }
stream_plan: |-
Expand All @@ -122,7 +122,7 @@
select t1.ts as t1_ts, t1.v1 as t1_v1, t1.v2 as t1_v2, t2.ts as t2_ts, t2.v1 as t2_v1, t2.v2 as t2_v2 from t1 join t2 on (t1.v1 = t2.v1 and (t1.ts >= t2.ts + INTERVAL '1' SECOND) and (t2.ts >= t1.ts + INTERVAL '1' SECOND));
logical_plan: |-
LogicalProject { exprs: [t1.ts, t1.v1, t1.v2, t2.ts, t2.v1, t2.v2] }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v1) AND (t1.ts >= (t2.ts + '00:00:01':Interval)) AND (t2.ts >= (t1.ts + '00:00:01':Interval)), output: all }
└─LogicalJoin { type: Inner, on: (t1.v1 = t2.v1) AND (t1.ts >= AddWithTimeZone(t2.ts, '00:00:01':Interval, 'UTC':Varchar)) AND (t2.ts >= AddWithTimeZone(t1.ts, '00:00:01':Interval, 'UTC':Varchar)), output: all }
├─LogicalScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id] }
└─LogicalScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id] }
stream_plan: |-
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,11 +789,16 @@ impl ExprImpl {
if let ExprImpl::Now(_) = self {
true
} else if let ExprImpl::FunctionCall(f) = self {
// TODO: `now() + interval '1' month` shall not be accepted as const offset
match f.func_type() {
ExprType::Add | ExprType::Subtract => {
let (_, lhs, rhs) = f.clone().decompose_as_binary();
lhs.is_now_offset() && rhs.is_const()
}
ExprType::AddWithTimeZone | ExprType::SubtractWithTimeZone => {
let args = f.inputs();
args[0].is_now_offset() && args[1].is_const()
}
Comment on lines +798 to +801
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation is too weak. The following optimizer rule actually requires is_now_offset to be stricter than WatermarkAnalyzer. This was true before the PR but is broken.

if let Some((input_expr, cmp, now_expr)) = expr.as_now_comparison_cond() {
let now_expr = rewriter.rewrite_expr(now_expr);
// as a sanity check, ensure that this expression will derive a watermark
// on the output of the now executor
debug_assert_eq!(
try_derive_watermark(&now_expr),
WatermarkDerivation::Watermark(lhs_len)
);

For example, interval '1' month is allowed by is_now_offset but rejected by WatermarkAnalyzer. So the following query (inspired by previous temporal_filter.slt failure in ci) would fail the debug_assert above:

create table t1 (v1 timestamp);
create materialized view mv1 as select v1 from t1 where v1 between now() and now() + interval '1 month';

It is not hard to update implementation here to keep that invariant. But I prefer to fix the problem without touching too many different components.

_ => false,
}
} else {
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/expr/session_timezone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use crate::expr::{Expr, ExprImpl};

/// `SessionTimezone` will be used to resolve session
/// timezone-dependent casts, comparisons or arithmetic.
///
/// This rewrite is idempotent as required by [`crate::optimizer::plan_node::ExprRewritable`].
pub struct SessionTimezone {
timezone: String,
/// Whether or not the session timezone was used
Expand Down Expand Up @@ -242,6 +244,8 @@ impl SessionTimezone {
new_inputs.push(ExprImpl::literal_varchar(self.timezone()));
Some(FunctionCall::new(func_type, new_inputs).unwrap().into())
}
// When adding new rules, make sure to match arity and types of all args
// so that the rewrite is idempotent.
_ => None,
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/expr/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,10 +535,10 @@ impl WatermarkAnalyzer {
ExprImpl::Literal(lit) => lit.get_data().as_ref().map(|s| s.as_utf8()),
_ => return WatermarkDerivation::None,
};
let interval = match &func_call.inputs()[1] {
ExprImpl::Literal(lit) => lit.get_data().as_ref().map(|s| s.as_interval()),
_ => return WatermarkDerivation::None,
let Some(Ok(interval)) = &func_call.inputs()[1].try_fold_const() else {
return WatermarkDerivation::None;
};
let interval = interval.as_ref().map(|s| s.as_interval());
// null zone or null interval is treated same as const `interval '1' second`, to be
// consistent with other match arms.
let zone_without_dst = time_zone.map_or(true, |s| s.eq_ignore_ascii_case("UTC"));
Expand Down
22 changes: 12 additions & 10 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ impl PlanRoot {
assert_eq!(input_schema.fields().len(), out_fields.len());
assert_eq!(out_fields.count_ones(..), out_names.len());

// Inline session timezone at the very beginning so that (const) exprs can be evaluated.
let ctx = plan.ctx();
let plan = inline_session_timezone_in_exprs(ctx.clone(), plan);

Self {
plan,
required_dist,
Expand Down Expand Up @@ -177,8 +181,6 @@ impl PlanRoot {
}

let ctx = plan.ctx();
// Inline session timezone mainly for rewriting now()
plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;

// Convert to physical plan node
plan = plan.to_batch_with_order_required(&self.required_order)?;
Expand All @@ -193,8 +195,8 @@ impl PlanRoot {
ApplyOrder::BottomUp,
));

// Inline session timezone
plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
// Inline session timezone near the end so that exprs inserted by optimizer are handled.
plan = inline_session_timezone_in_exprs(ctx.clone(), plan);

if ctx.is_explain_trace() {
ctx.trace("Inline Session Timezone:");
Expand Down Expand Up @@ -314,8 +316,8 @@ impl PlanRoot {
));
}

// Inline session timezone
plan = inline_session_timezone_in_exprs(ctx.clone(), plan)?;
// Inline session timezone near the end so that exprs inserted by optimizer are handled.
plan = inline_session_timezone_in_exprs(ctx.clone(), plan);

if ctx.is_explain_trace() {
ctx.trace("Inline session timezone:");
Expand Down Expand Up @@ -488,7 +490,8 @@ impl PlanRoot {
RequiredDist::ShardByKey(bitset)
};

let stream_plan = inline_session_timezone_in_exprs(context, stream_plan)?;
// Inline session timezone near the end so that exprs inserted by optimizer are handled.
let stream_plan = inline_session_timezone_in_exprs(context, stream_plan);

StreamMaterialize::create_for_table(
stream_plan,
Expand Down Expand Up @@ -593,9 +596,8 @@ fn const_eval_exprs(plan: PlanRef) -> Result<PlanRef> {
Ok(plan)
}

fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> Result<PlanRef> {
let plan = plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut());
Ok(plan)
fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> PlanRef {
plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut())
}

fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bool) -> bool {
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/optimizer/plan_node/logical_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,7 @@ fn expr_to_kafka_timestamp_range(

match &expr {
ExprImpl::FunctionCall(function_call) => {
if let Some((timestampz_literal, reverse)) = extract_timestampz_literal(&expr).unwrap()
{
if let Ok(Some((timestampz_literal, reverse))) = extract_timestampz_literal(&expr) {
Copy link
Contributor Author

@xiangjinwu xiangjinwu Oct 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another bug fixed: _rw_kafka_timestamp >= 'aa'::timestamptz shall not unwrap()

match function_call.func_type() {
ExprType::GreaterThan => {
if reverse {
Expand Down
Loading