Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(optimizer): support agg group by simplify rule #12349

Merged
merged 2 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -923,3 +923,9 @@
SELECT count(DISTINCT i) FROM integers;
expected_outputs:
- batch_plan
- sql: |
CREATE TABLE t(id int primary key, a int, b int);
SELECT count(*) FROM t group by a, id, b;
expected_outputs:
- batch_plan
- stream_plan
41 changes: 27 additions & 14 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1395,21 +1395,21 @@
sq_1.col_2;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [max(max(lineitem.l_commitdate))] }
└─BatchHashAgg { group_key: [lineitem.l_commitdate], aggs: [max(max(lineitem.l_commitdate))] }
└─BatchExchange { order: [], dist: HashShard(lineitem.l_commitdate) }
└─BatchHashAgg { group_key: [lineitem.l_commitdate], aggs: [max(lineitem.l_commitdate)] }
└─BatchHashAgg { group_key: [lineitem.l_orderkey, lineitem.l_tax, lineitem.l_commitdate, lineitem.l_shipinstruct], aggs: [] }
└─BatchScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_tax, lineitem.l_commitdate, lineitem.l_shipinstruct], distribution: UpstreamHashShard(lineitem.l_orderkey) }
└─BatchProject { exprs: [max(max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))))] }
└─BatchHashAgg { group_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], aggs: [max(max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))))] }
└─BatchExchange { order: [], dist: HashShard(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))) }
└─BatchHashAgg { group_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], aggs: [max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)))] }
└─BatchSortAgg { group_key: [lineitem.l_orderkey], aggs: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))] }
└─BatchScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_commitdate], distribution: UpstreamHashShard(lineitem.l_orderkey) }
stream_plan: |-
StreamMaterialize { columns: [col_0, lineitem.l_commitdate(hidden)], stream_key: [lineitem.l_commitdate], pk_columns: [lineitem.l_commitdate], pk_conflict: NoCheck }
└─StreamProject { exprs: [max(max(lineitem.l_commitdate)), lineitem.l_commitdate] }
└─StreamHashAgg { group_key: [lineitem.l_commitdate], aggs: [max(max(lineitem.l_commitdate)), count] }
└─StreamExchange { dist: HashShard(lineitem.l_commitdate) }
└─StreamHashAgg { group_key: [lineitem.l_commitdate, $expr1], aggs: [max(lineitem.l_commitdate), count] }
└─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_tax, lineitem.l_commitdate, lineitem.l_shipinstruct, Vnode(lineitem.l_orderkey) as $expr1] }
└─StreamHashAgg { group_key: [lineitem.l_orderkey, lineitem.l_tax, lineitem.l_commitdate, lineitem.l_shipinstruct], aggs: [count] }
└─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_tax, lineitem.l_commitdate, lineitem.l_shipinstruct], pk: [lineitem.l_orderkey], dist: UpstreamHashShard(lineitem.l_orderkey) }
StreamMaterialize { columns: [col_0, first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))(hidden)], stream_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], pk_columns: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], pk_conflict: NoCheck }
└─StreamProject { exprs: [max(max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)))), first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))] }
└─StreamHashAgg { group_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))], aggs: [max(max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)))), count] }
└─StreamExchange { dist: HashShard(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))) }
└─StreamHashAgg { group_key: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)), $expr1], aggs: [max(first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC))), count] }
└─StreamProject { exprs: [lineitem.l_orderkey, first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)), Vnode(lineitem.l_orderkey) as $expr1] }
└─StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [first_value(lineitem.l_commitdate order_by(lineitem.l_commitdate ASC)), count] }
└─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_commitdate], pk: [lineitem.l_orderkey], dist: UpstreamHashShard(lineitem.l_orderkey) }
- name: two phase agg on hop window input should use two phase agg
sql: |
SET QUERY_MODE TO DISTRIBUTED;
Expand Down Expand Up @@ -1636,3 +1636,16 @@
└─BatchHashAgg { group_key: [integers.i], aggs: [] }
└─BatchExchange { order: [], dist: HashShard(integers.i) }
└─BatchScan { table: integers, columns: [integers.i], distribution: SomeShard }
- sql: |
CREATE TABLE t(id int primary key, a int, b int);
SELECT count(*) FROM t group by a, id, b;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchProject { exprs: [count] }
└─BatchSortAgg { group_key: [t.id], aggs: [count] }
└─BatchScan { table: t, columns: [t.id], distribution: UpstreamHashShard(t.id) }
stream_plan: |-
StreamMaterialize { columns: [count, t.id(hidden)], stream_key: [t.id], pk_columns: [t.id], pk_conflict: NoCheck }
└─StreamProject { exprs: [count, t.id] }
└─StreamHashAgg { group_key: [t.id], aggs: [count] }
└─StreamTableScan { table: t, columns: [t.id], pk: [t.id], dist: UpstreamHashShard(t.id) }
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@
select t2.c, t2.d, count(distinct t.a) from t join t2 on t.a = t2.c group by t2.c, t2.d;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [t2.c, t2.d], aggs: [count(t.a)] }
└─BatchExchange { order: [], dist: HashShard(t2.c, t2.d) }
└─BatchHashAgg { group_key: [t2.c, t2.d, t.a], aggs: [] }
└─BatchHashAgg { group_key: [first_value(t2.c order_by(t2.c ASC))], aggs: [first_value(first_value(t2.d order_by(t2.d ASC)) order_by(first_value(t2.d order_by(t2.d ASC)) ASC)), count(t.a)] }
└─BatchExchange { order: [], dist: HashShard(first_value(t2.c order_by(t2.c ASC))) }
└─BatchHashAgg { group_key: [t.a], aggs: [first_value(t2.c order_by(t2.c ASC)), first_value(t2.d order_by(t2.d ASC))] }
└─BatchLookupJoin { type: Inner, predicate: t.a = t2.c, output: [t2.c, t2.d, t.a] }
└─BatchExchange { order: [], dist: UpstreamHashShard(t.a) }
└─BatchScan { table: t, columns: [t.a], distribution: SomeShard }
Expand Down
96 changes: 66 additions & 30 deletions src/frontend/planner_test/tests/testdata/output/except.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,85 +108,121 @@
create table t2 (a int, b numeric, c bigint, primary key(a));
select * from t1 except select * from t2;
optimized_logical_plan_for_batch: |-
LogicalAgg { group_key: [t1.a, t1.b, t1.c], aggs: [] }
LogicalAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] }
└─LogicalJoin { type: LeftAnti, on: IsNotDistinctFrom(t1.a, t2.a) AND IsNotDistinctFrom(t1.b, t2.b) AND IsNotDistinctFrom(t1.c, t2.c), output: all }
├─LogicalScan { table: t1, columns: [t1.a, t1.b, t1.c] }
└─LogicalScan { table: t2, columns: [t2.a, t2.b, t2.c] }
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [] }
└─BatchHashAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] }
└─BatchLookupJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
└─BatchExchange { order: [], dist: UpstreamHashShard(t1.a) }
└─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: UpstreamHashShard(t1.a) }
stream_plan: |-
StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck }
└─StreamProject { exprs: [t1.a, t1.b, t1.c] }
└─StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] }
└─StreamHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
├─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) }
│ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) }
└─StreamExchange { dist: HashShard(t2.a, t2.b, t2.c) }
└─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) }
StreamMaterialize { columns: [a, b, c], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck }
└─StreamProject { exprs: [t1.a, first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] }
└─StreamHashAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC)), count] }
└─StreamExchange { dist: HashShard(t1.a) }
└─StreamHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
├─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) }
│ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) }
└─StreamExchange { dist: HashShard(t2.a, t2.b, t2.c) }
└─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [a, b, c], stream_key: [a, b, c], pk_columns: [a, b, c], pk_conflict: NoCheck } { materialized table: 4294967294 }
└── StreamProject { exprs: [t1.a, t1.b, t1.c] }
└── StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } { intermediate state table: 0, state tables: [], distinct tables: [] }
└── StreamHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
├── left table: 1
├── right table: 3
├── left degree table: 2
├── right degree table: 4
├── StreamExchange Hash([0, 1, 2]) from 1
└── StreamExchange Hash([0, 1, 2]) from 2
StreamMaterialize { columns: [a, b, c], stream_key: [a], pk_columns: [a], pk_conflict: NoCheck } { materialized table: 4294967294 }
└── StreamProject { exprs: [t1.a, first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC))] }
└── StreamHashAgg { group_key: [t1.a], aggs: [first_value(t1.b order_by(t1.b ASC)), first_value(t1.c order_by(t1.c ASC)), count] }
├── intermediate state table: 2
├── state tables: [ 0, 1 ]
├── distinct tables: []
└── StreamExchange Hash([0]) from 1

Fragment 1
Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 5 }
StreamHashJoin { type: LeftAnti, predicate: t1.a IS NOT DISTINCT FROM t2.a AND t1.b IS NOT DISTINCT FROM t2.b AND t1.c IS NOT DISTINCT FROM t2.c, output: all }
├── left table: 3
├── right table: 5
├── left degree table: 4
├── right degree table: 6
├── StreamExchange Hash([0, 1, 2]) from 2
└── StreamExchange Hash([0, 1, 2]) from 3

Fragment 2
Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 7 }
├── Upstream
└── BatchPlanNode

Fragment 2
Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) } { state table: 6 }
Fragment 3
Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.a], dist: UpstreamHashShard(t2.a) } { state table: 8 }
├── Upstream
└── BatchPlanNode

Table 0 { columns: [ t1_a, t1_b, t1_c, count ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 }
Table 0
├── columns: [ t1_a, t1_b, t1_c ]
├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
├── value indices: [ 0, 1, 2 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 1 { columns: [ t1_a, t1_b, t1_c ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 }
Table 1
├── columns: [ t1_a, t1_c, t1_b ]
├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
├── value indices: [ 0, 1, 2 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 2
├── columns: [ t1_a, first_value(t1_b order_by(t1_b ASC)), first_value(t1_c order_by(t1_c ASC)), count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2, 3 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 3
├── columns: [ t1_a, t1_b, t1_c ]
├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
├── value indices: [ 0, 1, 2 ]
├── distribution key: [ 0, 1, 2 ]
└── read pk prefix len hint: 3

Table 4
├── columns: [ t1_a, t1_b, t1_c, _degree ]
├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
├── value indices: [ 3 ]
├── distribution key: [ 0, 1, 2 ]
└── read pk prefix len hint: 3

Table 3 { columns: [ t2_a, t2_b, t2_c ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 }
Table 5
├── columns: [ t2_a, t2_b, t2_c ]
├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
├── value indices: [ 0, 1, 2 ]
├── distribution key: [ 0, 1, 2 ]
└── read pk prefix len hint: 3

Table 4
Table 6
├── columns: [ t2_a, t2_b, t2_c, _degree ]
├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
├── value indices: [ 3 ]
├── distribution key: [ 0, 1, 2 ]
└── read pk prefix len hint: 3

Table 5
Table 7
├── columns: [ vnode, a, t1_backfill_finished ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2 ]
├── distribution key: [ 0 ]
├── read pk prefix len hint: 1
└── vnode column idx: 0

Table 6
Table 8
├── columns: [ vnode, a, t2_backfill_finished ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2 ]
├── distribution key: [ 0 ]
├── read pk prefix len hint: 1
└── vnode column idx: 0

Table 4294967294 { columns: [ a, b, c ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 }
Table 4294967294 { columns: [ a, b, c ], primary key: [ $0 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }

- sql: |
create table t1 (a int, b numeric, c bigint);
Expand Down
Loading
Loading