Skip to content

Commit

Permalink
feat(frontend): Merge LogicalUnion all with LogicalValues inputs …
Browse files Browse the repository at this point in the history
…into `LogicalValues` (#7923)

Merge `LogicalUnion` all with `LogicalValues` inputs into `LogicalValues`

Depends on: #7912

Approved-By: xxchan
Approved-By: st1page

Co-Authored-By: jon-chuang <[email protected]>
Co-Authored-By: jon-chuang <[email protected]>
  • Loading branch information
jon-chuang and jon-chuang authored Feb 20, 2023
1 parent 40da822 commit 1c94d22
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 36 deletions.
7 changes: 2 additions & 5 deletions src/frontend/planner_test/tests/testdata/project.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
select 1 as k, 2 as v;
batch_plan: |
BatchValues { rows: [[1:Int32, 2:Int32]] }
- name: Project over union all of nested project over union all, now, and simple agg
sql: |
create table t(v int);
select 'abc', 1, 1.4 as k from (select 1 as k, 2 from (select 1, 2 union all select 3, 4) union all select * from (select 3, 4) union all select 100, 200 from (select now(), now() - interval '1 hour') union all select count(*)::int, sum(v)::int from t);
- sql: select 'abc', 1, 1.4 as k from (select 1 as k, 2 from (select 1, 2 union all select 3, 4) union all select * from (select 3, 4) union all select 100, 200 from (select now(), now() - interval '1 hour'));
batch_plan: |
BatchValues { rows: [['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal]] }
BatchValues { rows: [['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal], ['abc':Varchar, 1:Int32, 1.4:Decimal]] }
38 changes: 7 additions & 31 deletions src/frontend/planner_test/tests/testdata/union.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -206,46 +206,22 @@
- sql: |
select 1 union all select 1
optimized_logical_plan: |
LogicalUnion { all: true }
├─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [1:Int32:Int32] } }
└─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [1:Int32:Int32] } }
LogicalValues { rows: [[1:Int32], [1:Int32]], schema: Schema { fields: [1:Int32:Int32] } }
batch_plan: |
BatchUnion { all: true }
├─BatchValues { rows: [[1:Int32]] }
└─BatchValues { rows: [[1:Int32]] }
BatchValues { rows: [[1:Int32], [1:Int32]] }
- sql: |
select 1 union all select 2 union all select 3 union all select 4 union all select 5
optimized_logical_plan: |
LogicalUnion { all: true }
├─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [1:Int32:Int32] } }
├─LogicalValues { rows: [[2:Int32]], schema: Schema { fields: [2:Int32:Int32] } }
├─LogicalValues { rows: [[3:Int32]], schema: Schema { fields: [3:Int32:Int32] } }
├─LogicalValues { rows: [[4:Int32]], schema: Schema { fields: [4:Int32:Int32] } }
└─LogicalValues { rows: [[5:Int32]], schema: Schema { fields: [5:Int32:Int32] } }
LogicalValues { rows: [[1:Int32], [2:Int32], [3:Int32], [4:Int32], [5:Int32]], schema: Schema { fields: [1:Int32:Int32] } }
batch_plan: |
BatchUnion { all: true }
├─BatchValues { rows: [[1:Int32]] }
├─BatchValues { rows: [[2:Int32]] }
├─BatchValues { rows: [[3:Int32]] }
├─BatchValues { rows: [[4:Int32]] }
└─BatchValues { rows: [[5:Int32]] }
BatchValues { rows: [[1:Int32], [2:Int32], [3:Int32], [4:Int32], [5:Int32]] }
- sql: |
select 1 union select 2 union select 3 union select 4 union select 5
select 1 union select 2 union select 3 union select 4 union select 5 union select 5
optimized_logical_plan: |
LogicalAgg { group_key: [1:Int32], aggs: [] }
└─LogicalUnion { all: true }
├─LogicalValues { rows: [[1:Int32]], schema: Schema { fields: [1:Int32:Int32] } }
├─LogicalValues { rows: [[2:Int32]], schema: Schema { fields: [2:Int32:Int32] } }
├─LogicalValues { rows: [[3:Int32]], schema: Schema { fields: [3:Int32:Int32] } }
├─LogicalValues { rows: [[4:Int32]], schema: Schema { fields: [4:Int32:Int32] } }
└─LogicalValues { rows: [[5:Int32]], schema: Schema { fields: [5:Int32:Int32] } }
└─LogicalValues { rows: [[1:Int32], [2:Int32], [3:Int32], [4:Int32], [5:Int32], [5:Int32]], schema: Schema { fields: [1:Int32:Int32] } }
batch_plan: |
BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [1:Int32], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(1:Int32) }
└─BatchUnion { all: true }
├─BatchValues { rows: [[1:Int32]] }
├─BatchValues { rows: [[2:Int32]] }
├─BatchValues { rows: [[3:Int32]] }
├─BatchValues { rows: [[4:Int32]] }
└─BatchValues { rows: [[5:Int32]] }
└─BatchValues { rows: [[1:Int32], [2:Int32], [3:Int32], [4:Int32], [5:Int32], [5:Int32]] }
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ impl PlanRoot {
ProjectMergeRule::create(),
ProjectEliminateRule::create(),
TrivialProjectToValuesRule::create(),
UnionInputValuesMergeRule::create(),
// project-join merge should be applied after merge
// eliminate and to values
ProjectJoinMergeRule::create(),
Expand All @@ -424,6 +425,7 @@ impl PlanRoot {
ProjectMergeRule::create(),
ProjectEliminateRule::create(),
TrivialProjectToValuesRule::create(),
UnionInputValuesMergeRule::create(),
],
ApplyOrder::TopDown,
);
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/rule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ mod stream;
pub use stream::filter_with_now_to_join_rule::*;
mod trivial_project_to_values_rule;
pub use trivial_project_to_values_rule::*;
mod union_input_values_merge_rule;
pub use union_input_values_merge_rule::*;

#[macro_export]
macro_rules! for_all_rules {
Expand Down Expand Up @@ -121,6 +123,7 @@ macro_rules! for_all_rules {
,{FilterWithNowToJoinRule}
,{TopNOnIndexRule}
,{TrivialProjectToValuesRule}
,{UnionInputValuesMergeRule}
}
};
}
Expand Down
40 changes: 40 additions & 0 deletions src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::{BoxedRule, Rule};
use crate::optimizer::plan_node::LogicalValues;
use crate::optimizer::{PlanRef, PlanTreeNode};

pub struct UnionInputValuesMergeRule {}
impl Rule for UnionInputValuesMergeRule {
fn apply(&self, plan: PlanRef) -> Option<PlanRef> {
let union = plan.as_logical_union()?;
// !union.all() is already handled by [`UnionToDistinctRule`]
if !union.all() {
return None;
}

let mut rows = vec![];
for v in union.inputs() {
rows.extend_from_slice(v.as_logical_values()?.rows());
}
Some(LogicalValues::new(rows, union.schema().clone(), union.ctx()).into())
}
}

impl UnionInputValuesMergeRule {
pub fn create() -> BoxedRule {
Box::new(UnionInputValuesMergeRule {})
}
}

0 comments on commit 1c94d22

Please sign in to comment.