Skip to content

Commit

Permalink
fix(optimizer): impl ExprRewritable for StreamValues (#13217)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Nov 2, 2023
1 parent 2ce7c9f commit d5c5008
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 6 deletions.
11 changes: 11 additions & 0 deletions e2e_test/streaming/values.slt
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,14 @@ NULL

statement ok
drop materialized view mv;

statement ok
create materialized view mv as select '2020-01-01'::timestamptz;;

query I
select * from mv;
----
2020-01-01 00:00:00+00:00

statement ok
drop materialized view mv;
4 changes: 2 additions & 2 deletions src/frontend/planner_test/tests/testdata/output/array.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
batch_plan: 'BatchValues { rows: [[ARRAY[]:List(Timestamptz)]] }'
stream_plan: |-
StreamMaterialize { columns: [array, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamValues { rows: [[Array, 0:Int64]] }
└─StreamValues { rows: [[ARRAY[]:List(Timestamptz), 0:Int64]] }
- sql: |
select ARRAY[]::STRUCT<f1 INT>[];
logical_plan: |-
Expand All @@ -65,7 +65,7 @@
batch_plan: 'BatchValues { rows: [[ARRAY[]:List(Struct(StructType { field_names: ["f1"], field_types: [Int32] }))]] }'
stream_plan: |-
StreamMaterialize { columns: [array, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamValues { rows: [[Array, 0:Int64]] }
└─StreamValues { rows: [[ARRAY[]:List(Struct(StructType { field_names: ["f1"], field_types: [Int32] })), 0:Int64]] }
- sql: |
select array_cat(array[66], array[123]);
logical_plan: |-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
batch_plan: 'BatchValues { rows: [[11:Int32, 22:Int32], [36:Int32, 44:Int32]] }'
stream_plan: |-
StreamMaterialize { columns: [*VALUES*_0.column_0, *VALUES*_0.column_1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamValues { rows: [[11:Int32, 22:Int32, 0:Int64], [(33:Int32 + (1:Int32 + 2:Int32)), 44:Int32, 1:Int64]] }
└─StreamValues { rows: [[11:Int32, 22:Int32, 0:Int64], [36:Int32, 44:Int32, 1:Int64]] }
- sql: select * from t
binder_error: 'Catalog error: table or source not found: t'
- sql: |
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/planner_test/tests/testdata/output/expr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@
batch_plan: 'BatchValues { rows: [[1:Int32]] }'
stream_plan: |-
StreamMaterialize { columns: [abs, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamValues { rows: [[Abs(-1:Int32), 0:Int64]] }
└─StreamValues { rows: [[1:Int32, 0:Int64]] }
- sql: |
select * from range(1,2);
batch_plan: |-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@
batch_plan: 'BatchValues { rows: [[1:Int32, 1:Int64]] }'
stream_plan: |-
StreamMaterialize { columns: [abs, ordinality, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
└─StreamValues { rows: [[Abs(1:Int32), 1:Int64, 0:Int64]] }
└─StreamValues { rows: [[1:Int32, 1:Int64, 0:Int64]] }
- sql: |
create table t(x int , arr int[]);
select * from t, abs(x) WITH ORDINALITY;
Expand Down
17 changes: 16 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,19 @@ impl StreamNode for StreamValues {
}
}

impl ExprRewritable for StreamValues {}
impl ExprRewritable for StreamValues {
fn has_rewritable_expr(&self) -> bool {
true
}

fn rewrite_exprs(&self, r: &mut dyn crate::expr::ExprRewriter) -> crate::PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_values()
.unwrap()
.clone(),
)
.into()
}
}

0 comments on commit d5c5008

Please sign in to comment.