From d5c5008b6d1dcf8b91c3190177088a9e30528c8a Mon Sep 17 00:00:00 2001 From: xiangjinwu <17769960+xiangjinwu@users.noreply.github.com> Date: Thu, 2 Nov 2023 19:32:05 +0800 Subject: [PATCH] fix(optimizer): impl ExprRewritable for StreamValues (#13217) --- e2e_test/streaming/values.slt | 11 +++++++++++ .../tests/testdata/output/array.yaml | 4 ++-- .../tests/testdata/output/basic_query.yaml | 2 +- .../tests/testdata/output/expr.yaml | 2 +- .../tests/testdata/output/with_ordinality.yaml | 2 +- .../src/optimizer/plan_node/stream_values.rs | 17 ++++++++++++++++- 6 files changed, 32 insertions(+), 6 deletions(-) diff --git a/e2e_test/streaming/values.slt b/e2e_test/streaming/values.slt index 9f4f6e0a36261..2cc5c701a4cec 100644 --- a/e2e_test/streaming/values.slt +++ b/e2e_test/streaming/values.slt @@ -75,3 +75,14 @@ NULL statement ok drop materialized view mv; + +statement ok +create materialized view mv as select '2020-01-01'::timestamptz;; + +query I +select * from mv; +---- +2020-01-01 00:00:00+00:00 + +statement ok +drop materialized view mv; diff --git a/src/frontend/planner_test/tests/testdata/output/array.yaml b/src/frontend/planner_test/tests/testdata/output/array.yaml index be957047c6d1a..6631e9a7510b1 100644 --- a/src/frontend/planner_test/tests/testdata/output/array.yaml +++ b/src/frontend/planner_test/tests/testdata/output/array.yaml @@ -56,7 +56,7 @@ batch_plan: 'BatchValues { rows: [[ARRAY[]:List(Timestamptz)]] }' stream_plan: |- StreamMaterialize { columns: [array, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } - └─StreamValues { rows: [[Array, 0:Int64]] } + └─StreamValues { rows: [[ARRAY[]:List(Timestamptz), 0:Int64]] } - sql: | select ARRAY[]::STRUCT[]; logical_plan: |- @@ -65,7 +65,7 @@ batch_plan: 'BatchValues { rows: [[ARRAY[]:List(Struct(StructType { field_names: ["f1"], field_types: [Int32] }))]] }' stream_plan: |- StreamMaterialize { columns: [array, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } - └─StreamValues { rows: [[Array, 0:Int64]] } + └─StreamValues { rows: [[ARRAY[]:List(Struct(StructType { field_names: ["f1"], field_types: [Int32] })), 0:Int64]] } - sql: | select array_cat(array[66], array[123]); logical_plan: |- diff --git a/src/frontend/planner_test/tests/testdata/output/basic_query.yaml b/src/frontend/planner_test/tests/testdata/output/basic_query.yaml index ce6724dc91c37..ddddfdfbc8602 100644 --- a/src/frontend/planner_test/tests/testdata/output/basic_query.yaml +++ b/src/frontend/planner_test/tests/testdata/output/basic_query.yaml @@ -3,7 +3,7 @@ batch_plan: 'BatchValues { rows: [[11:Int32, 22:Int32], [36:Int32, 44:Int32]] }' stream_plan: |- StreamMaterialize { columns: [*VALUES*_0.column_0, *VALUES*_0.column_1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } - └─StreamValues { rows: [[11:Int32, 22:Int32, 0:Int64], [(33:Int32 + (1:Int32 + 2:Int32)), 44:Int32, 1:Int64]] } + └─StreamValues { rows: [[11:Int32, 22:Int32, 0:Int64], [36:Int32, 44:Int32, 1:Int64]] } - sql: select * from t binder_error: 'Catalog error: table or source not found: t' - sql: | diff --git a/src/frontend/planner_test/tests/testdata/output/expr.yaml b/src/frontend/planner_test/tests/testdata/output/expr.yaml index 020c6a3548ef5..f16eae977a33c 100644 --- a/src/frontend/planner_test/tests/testdata/output/expr.yaml +++ b/src/frontend/planner_test/tests/testdata/output/expr.yaml @@ -600,7 +600,7 @@ batch_plan: 'BatchValues { rows: [[1:Int32]] }' stream_plan: |- StreamMaterialize { columns: [abs, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } - └─StreamValues { rows: [[Abs(-1:Int32), 0:Int64]] } + └─StreamValues { rows: [[1:Int32, 0:Int64]] } - sql: | select * from range(1,2); batch_plan: |- diff --git a/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml index e614b0c20a36b..5a56058765fbb 100644 --- a/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml +++ b/src/frontend/planner_test/tests/testdata/output/with_ordinality.yaml @@ -190,7 +190,7 @@ batch_plan: 'BatchValues { rows: [[1:Int32, 1:Int64]] }' stream_plan: |- StreamMaterialize { columns: [abs, ordinality, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck } - └─StreamValues { rows: [[Abs(1:Int32), 1:Int64, 0:Int64]] } + └─StreamValues { rows: [[1:Int32, 1:Int64, 0:Int64]] } - sql: | create table t(x int , arr int[]); select * from t, abs(x) WITH ORDINALITY; diff --git a/src/frontend/src/optimizer/plan_node/stream_values.rs b/src/frontend/src/optimizer/plan_node/stream_values.rs index 3ae19a3112c5e..51cd507db260d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_values.rs +++ b/src/frontend/src/optimizer/plan_node/stream_values.rs @@ -88,4 +88,19 @@ impl StreamNode for StreamValues { } } -impl ExprRewritable for StreamValues {} +impl ExprRewritable for StreamValues { + fn has_rewritable_expr(&self) -> bool { + true + } + + fn rewrite_exprs(&self, r: &mut dyn crate::expr::ExprRewriter) -> crate::PlanRef { + Self::new( + self.logical + .rewrite_exprs(r) + .as_logical_values() + .unwrap() + .clone(), + ) + .into() + } +}