diff --git a/src/frontend/planner_test/tests/testdata/project.yaml b/src/frontend/planner_test/tests/testdata/project.yaml index 6de27ff36553..9fe50fb7a38b 100644 --- a/src/frontend/planner_test/tests/testdata/project.yaml +++ b/src/frontend/planner_test/tests/testdata/project.yaml @@ -3,9 +3,6 @@ select 1 as k, 2 as v; batch_plan: | BatchValues { rows: [[1:Int32, 2:Int32]] } -- name: Project over union all of nested project over union all, now, and simple agg - sql: | - create table t(v int); - select 'abc', 1, 1.4 as k from (select 1 as k, 2 from (select 1, 2 union all select 3, 4) union all select * from (select 3, 4) union all select 100, 200 from (select now(), now() - interval '1 hour') union all select count(*)::int, sum(v)::int from t); +- sql: select 'abc', 1, 1.4 as k from (select 1 as k, 2 from (select 1, 2 union all select 3, 4) union all select * from (select 3, 4) union all select 100, 200 from (select now(), now() - interval '1 hour')); batch_plan: | - BatchValues { rows: [['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal]] } + BatchValues { rows: [['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal]] } diff --git a/src/frontend/planner_test/tests/testdata/union.yaml b/src/frontend/planner_test/tests/testdata/union.yaml index 1b221281526b..b904226c0f06 100644 --- a/src/frontend/planner_test/tests/testdata/union.yaml +++ b/src/frontend/planner_test/tests/testdata/union.yaml @@ -206,46 +206,22 @@ - sql: | select 1 union all select 1 optimized_logical_plan: | - LogicalUnion { all: true } - ├─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [1:Int32:Int32] } } - └─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [1:Int32:Int32] } } + LogicalValues { rows: [[1:Int32], [1:Int32]], schema: Schema { fields: [1:Int32:Int32] } } batch_plan: | - BatchUnion { all: true } - ├─BatchValues { rows: [[1:Int32]] } - └─BatchValues { rows: [[1:Int32]] } + BatchValues { rows: [[1:Int32], [1:Int32]] } - sql: | select 1 union all select 2 union all select 3 union all select 4 union all select 5 optimized_logical_plan: | - LogicalUnion { all: true } - ├─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [1:Int32:Int32] } } - ├─LogicalValues { rows: [[2:Int32]], schema: Schema { fields: [2:Int32:Int32] } } - ├─LogicalValues { rows: [[3:Int32]], schema: Schema { fields: [3:Int32:Int32] } } - ├─LogicalValues { rows: [[4:Int32]], schema: Schema { fields: [4:Int32:Int32] } } - └─LogicalValues { rows: [[5:Int32]], schema: Schema { fields: [5:Int32:Int32] } } + LogicalValues { rows: [[1:Int32], [2:Int32], [3:Int32], [4:Int32], [5:Int32]], schema: Schema { fields: [1:Int32:Int32] } } batch_plan: | - BatchUnion { all: true } - ├─BatchValues { rows: [[1:Int32]] } - ├─BatchValues { rows: [[2:Int32]] } - ├─BatchValues { rows: [[3:Int32]] } - ├─BatchValues { rows: [[4:Int32]] } - └─BatchValues { rows: [[5:Int32]] } + BatchValues { rows: [[1:Int32], [2:Int32], [3:Int32], [4:Int32], [5:Int32]] } - sql: | - select 1 union select 2 union select 3 union select 4 union select 5 + select 1 union select 2 union select 3 union select 4 union select 5 union select 5 optimized_logical_plan: | LogicalAgg { group_key: [1:Int32], aggs: [] } - └─LogicalUnion { all: true } - ├─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [1:Int32:Int32] } } - ├─LogicalValues { rows: [[2:Int32]], schema: Schema { fields: [2:Int32:Int32] } } - ├─LogicalValues { rows: [[3:Int32]], schema: Schema { fields: [3:Int32:Int32] } } - ├─LogicalValues { rows: [[4:Int32]], schema: Schema { fields: [4:Int32:Int32] } } - └─LogicalValues { rows: [[5:Int32]], schema: Schema { fields: [5:Int32:Int32] } } + └─LogicalValues { rows: [[1:Int32], [2:Int32], [3:Int32], [4:Int32], [5:Int32], [5:Int32]], schema: Schema { fields: [1:Int32:Int32] } } batch_plan: | BatchExchange { order: [], dist: Single } └─BatchHashAgg { group_key: [1:Int32], aggs: [] } └─BatchExchange { order: [], dist: HashShard(1:Int32) } - └─BatchUnion { all: true } - ├─BatchValues { rows: [[1:Int32]] } - ├─BatchValues { rows: [[2:Int32]] } - ├─BatchValues { rows: [[3:Int32]] } - ├─BatchValues { rows: [[4:Int32]] } - └─BatchValues { rows: [[5:Int32]] } + └─BatchValues { rows: [[1:Int32], [2:Int32], [3:Int32], [4:Int32], [5:Int32], [5:Int32]] } diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index 181d82a7decb..caf076f64dfd 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -408,6 +408,7 @@ impl PlanRoot { ProjectMergeRule::create(), ProjectEliminateRule::create(), TrivialProjectToValuesRule::create(), + UnionInputValuesMergeRule::create(), // project-join merge should be applied after merge // eliminate and to values ProjectJoinMergeRule::create(), @@ -424,6 +425,7 @@ impl PlanRoot { ProjectMergeRule::create(), ProjectEliminateRule::create(), TrivialProjectToValuesRule::create(), + UnionInputValuesMergeRule::create(), ], ApplyOrder::TopDown, ); diff --git a/src/frontend/src/optimizer/rule/mod.rs b/src/frontend/src/optimizer/rule/mod.rs index de39488b5536..7b91e0af4b4a 100644 --- a/src/frontend/src/optimizer/rule/mod.rs +++ b/src/frontend/src/optimizer/rule/mod.rs @@ -87,6 +87,8 @@ mod stream; pub use stream::filter_with_now_to_join_rule::*; mod trivial_project_to_values_rule; pub use trivial_project_to_values_rule::*; +mod union_input_values_merge_rule; +pub use union_input_values_merge_rule::*; #[macro_export] macro_rules! for_all_rules { @@ -121,6 +123,7 @@ macro_rules! for_all_rules { ,{FilterWithNowToJoinRule} ,{TopNOnIndexRule} ,{TrivialProjectToValuesRule} + ,{UnionInputValuesMergeRule} } }; } diff --git a/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs new file mode 100644 index 000000000000..8119b8847b60 --- /dev/null +++ b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs @@ -0,0 +1,40 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::{BoxedRule, Rule}; +use crate::optimizer::plan_node::LogicalValues; +use crate::optimizer::{PlanRef, PlanTreeNode}; + +pub struct UnionInputValuesMergeRule {} +impl Rule for UnionInputValuesMergeRule { + fn apply(&self, plan: PlanRef) -> Option { + let union = plan.as_logical_union()?; + // !union.all() is already handled by [`UnionToDistinctRule`] + if !union.all() { + return None; + } + + let mut rows = vec![]; + for v in union.inputs() { + rows.extend_from_slice(v.as_logical_values()?.rows()); + } + Some(LogicalValues::new(rows, union.schema().clone(), union.ctx()).into()) + } +} + +impl UnionInputValuesMergeRule { + pub fn create() -> BoxedRule { + Box::new(UnionInputValuesMergeRule {}) + } +}