Skip to content

Commit

Permalink
feat(optimizer): support PullUpCorrelatedPredicateAggRule (#15026)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored and yezizp2012 committed Feb 7, 2024
1 parent 67ae88a commit abc633e
Show file tree
Hide file tree
Showing 7 changed files with 716 additions and 699 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,30 @@
AS max_sale_customer;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: Inner, predicate: salesperson.id IS NOT DISTINCT FROM salesperson.id AND all_sales.amount = max(all_sales.amount), output: [salesperson.name, max(all_sales.amount), all_sales.customer_name] }
├─BatchHashJoin { type: Inner, predicate: salesperson.id = all_sales.salesperson_id, output: [salesperson.id, salesperson.name, all_sales.customer_name, all_sales.amount] }
│ ├─BatchExchange { order: [], dist: HashShard(salesperson.id) }
│ │ └─BatchScan { table: salesperson, columns: [salesperson.id, salesperson.name], distribution: SomeShard }
│ └─BatchExchange { order: [], dist: HashShard(all_sales.salesperson_id) }
│ └─BatchScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.customer_name, all_sales.amount], distribution: SomeShard }
└─BatchHashAgg { group_key: [salesperson.id], aggs: [max(all_sales.amount)] }
└─BatchHashJoin { type: LeftOuter, predicate: salesperson.id IS NOT DISTINCT FROM all_sales.salesperson_id, output: [salesperson.id, all_sales.amount] }
├─BatchHashAgg { group_key: [salesperson.id], aggs: [] }
│ └─BatchExchange { order: [], dist: HashShard(salesperson.id) }
│ └─BatchScan { table: salesperson, columns: [salesperson.id], distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(all_sales.salesperson_id) }
└─BatchFilter { predicate: IsNotNull(all_sales.salesperson_id) }
└─BatchScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.amount], distribution: SomeShard }
└─BatchHashJoin { type: Inner, predicate: salesperson.id = all_sales.salesperson_id AND max(all_sales.amount) = all_sales.amount, output: [salesperson.name, max(all_sales.amount), all_sales.customer_name] }
├─BatchExchange { order: [], dist: HashShard(salesperson.id, max(all_sales.amount)) }
│ └─BatchHashJoin { type: Inner, predicate: salesperson.id = all_sales.salesperson_id, output: [salesperson.id, salesperson.name, max(all_sales.amount)] }
│ ├─BatchExchange { order: [], dist: HashShard(salesperson.id) }
│ │ └─BatchScan { table: salesperson, columns: [salesperson.id, salesperson.name], distribution: SomeShard }
│ └─BatchHashAgg { group_key: [all_sales.salesperson_id], aggs: [max(all_sales.amount)] }
│ └─BatchExchange { order: [], dist: HashShard(all_sales.salesperson_id) }
│ └─BatchScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.amount], distribution: SomeShard }
└─BatchExchange { order: [], dist: HashShard(all_sales.salesperson_id, all_sales.amount) }
└─BatchScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.customer_name, all_sales.amount], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), all_sales._row_id(hidden), salesperson.id(hidden), all_sales.amount(hidden), salesperson.id#1(hidden)], stream_key: [salesperson._row_id, all_sales._row_id, salesperson.id, all_sales.amount], pk_columns: [salesperson._row_id, all_sales._row_id, salesperson.id, all_sales.amount], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(salesperson._row_id, all_sales._row_id, salesperson.id, all_sales.amount) }
└─StreamHashJoin { type: Inner, predicate: salesperson.id IS NOT DISTINCT FROM salesperson.id AND all_sales.amount = max(all_sales.amount), output: [salesperson.name, max(all_sales.amount), all_sales.customer_name, salesperson._row_id, all_sales._row_id, salesperson.id, all_sales.amount, salesperson.id] }
├─StreamHashJoin { type: Inner, predicate: salesperson.id = all_sales.salesperson_id, output: [salesperson.id, salesperson.name, all_sales.customer_name, all_sales.amount, salesperson._row_id, all_sales._row_id] }
│ ├─StreamExchange { dist: HashShard(salesperson.id) }
│ │ └─StreamTableScan { table: salesperson, columns: [salesperson.id, salesperson.name, salesperson._row_id], pk: [salesperson._row_id], dist: UpstreamHashShard(salesperson._row_id) }
│ └─StreamExchange { dist: HashShard(all_sales.salesperson_id) }
│ └─StreamTableScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.customer_name, all_sales.amount, all_sales._row_id], pk: [all_sales._row_id], dist: UpstreamHashShard(all_sales._row_id) }
└─StreamProject { exprs: [salesperson.id, max(all_sales.amount)] }
└─StreamHashAgg { group_key: [salesperson.id], aggs: [max(all_sales.amount), count] }
└─StreamHashJoin { type: LeftOuter, predicate: salesperson.id IS NOT DISTINCT FROM all_sales.salesperson_id, output: [salesperson.id, all_sales.amount, all_sales._row_id] }
├─StreamProject { exprs: [salesperson.id] }
│ └─StreamHashAgg { group_key: [salesperson.id], aggs: [count] }
│ └─StreamExchange { dist: HashShard(salesperson.id) }
│ └─StreamTableScan { table: salesperson, columns: [salesperson.id, salesperson._row_id], pk: [salesperson._row_id], dist: UpstreamHashShard(salesperson._row_id) }
└─StreamExchange { dist: HashShard(all_sales.salesperson_id) }
└─StreamFilter { predicate: IsNotNull(all_sales.salesperson_id) }
└─StreamTableScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.amount, all_sales._row_id], pk: [all_sales._row_id], dist: UpstreamHashShard(all_sales._row_id) }
StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), salesperson.id(hidden), all_sales._row_id(hidden)], stream_key: [salesperson._row_id, salesperson.id, all_sales._row_id, amount], pk_columns: [salesperson._row_id, salesperson.id, all_sales._row_id, amount], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(max(all_sales.amount), salesperson._row_id, salesperson.id, all_sales._row_id) }
└─StreamHashJoin { type: Inner, predicate: salesperson.id = all_sales.salesperson_id AND max(all_sales.amount) = all_sales.amount, output: [salesperson.name, max(all_sales.amount), all_sales.customer_name, salesperson._row_id, salesperson.id, all_sales._row_id] }
├─StreamExchange { dist: HashShard(salesperson.id, max(all_sales.amount)) }
│ └─StreamHashJoin { type: Inner, predicate: salesperson.id = all_sales.salesperson_id, output: [salesperson.id, salesperson.name, max(all_sales.amount), salesperson._row_id, all_sales.salesperson_id] }
│ ├─StreamExchange { dist: HashShard(salesperson.id) }
│ │ └─StreamTableScan { table: salesperson, columns: [salesperson.id, salesperson.name, salesperson._row_id], pk: [salesperson._row_id], dist: UpstreamHashShard(salesperson._row_id) }
│ └─StreamProject { exprs: [all_sales.salesperson_id, max(all_sales.amount)] }
│ └─StreamHashAgg { group_key: [all_sales.salesperson_id], aggs: [max(all_sales.amount), count] }
│ └─StreamExchange { dist: HashShard(all_sales.salesperson_id) }
│ └─StreamTableScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.amount, all_sales._row_id], pk: [all_sales._row_id], dist: UpstreamHashShard(all_sales._row_id) }
└─StreamExchange { dist: HashShard(all_sales.salesperson_id, all_sales.amount) }
└─StreamTableScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.customer_name, all_sales.amount, all_sales._row_id], pk: [all_sales._row_id], dist: UpstreamHashShard(all_sales._row_id) }
- name: lateral join 2
sql: |
create table all_sales (salesperson_id int, customer_name varchar, amount int );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,12 @@
└─LogicalFilter { predicate: (CorrelatedInputRef { index: 1, correlated_id: 1 } = t2.y) AND (t2.y = 1000:Int32) }
└─LogicalScan { table: t2, columns: [t2.x, t2.y, t2._row_id] }
optimized_logical_plan_for_batch: |-
LogicalJoin { type: Inner, on: IsNotDistinctFrom(t1.y, t1.y) AND ($expr1 > $expr2), output: [t1.x, t1.y] }
LogicalJoin { type: Inner, on: (t1.y = t2.y) AND ($expr1 > $expr2), output: [t1.x, t1.y] }
├─LogicalProject { exprs: [t1.x, t1.y, t1.x::Decimal as $expr1] }
│ └─LogicalScan { table: t1, columns: [t1.x, t1.y] }
└─LogicalProject { exprs: [t1.y, (1.5:Decimal * min(t2.x)::Decimal) as $expr2] }
└─LogicalAgg { group_key: [t1.y], aggs: [min(t2.x)] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(t1.y, t2.y), output: [t1.y, t2.x] }
├─LogicalAgg { group_key: [t1.y], aggs: [] }
│ └─LogicalScan { table: t1, columns: [t1.y] }
└─LogicalProject { exprs: [t2.y, t2.x] }
└─LogicalScan { table: t2, columns: [t2.x, t2.y], predicate: IsNotNull(t2.y) AND (t2.y = 1000:Int32) }
└─LogicalProject { exprs: [(1.5:Decimal * min(t2.x)::Decimal) as $expr2, t2.y] }
└─LogicalAgg { group_key: [t2.y], aggs: [min(t2.x)] }
└─LogicalScan { table: t2, columns: [t2.x, t2.y], predicate: (t2.y = 1000:Int32) }
- sql: |
create table t1(x int, y int);
create table t2(x int, y int);
Expand Down Expand Up @@ -95,16 +91,12 @@
└─LogicalFilter { predicate: (CorrelatedInputRef { index: 1, correlated_id: 1 } = t2.y) }
└─LogicalScan { table: t2, columns: [t2.x, t2.y, t2._row_id] }
optimized_logical_plan_for_batch: |-
LogicalJoin { type: Inner, on: IsNotDistinctFrom(t1.y, t1.y) AND ($expr1 > $expr2), output: [t1.x, t1.y] }
LogicalJoin { type: Inner, on: (t1.y = t2.y) AND ($expr1 > $expr2), output: [t1.x, t1.y] }
├─LogicalProject { exprs: [t1.x, t1.y, t1.x::Decimal as $expr1] }
│ └─LogicalScan { table: t1, columns: [t1.x, t1.y] }
└─LogicalProject { exprs: [t1.y, (1.5:Decimal * min(t2.x)::Decimal) as $expr2] }
└─LogicalAgg { group_key: [t1.y], aggs: [min(t2.x)] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(t1.y, t2.y), output: [t1.y, t2.x] }
├─LogicalAgg { group_key: [t1.y], aggs: [] }
│ └─LogicalScan { table: t1, columns: [t1.y] }
└─LogicalProject { exprs: [t2.y, t2.x] }
└─LogicalScan { table: t2, columns: [t2.x, t2.y], predicate: IsNotNull(t2.y) }
└─LogicalProject { exprs: [(1.5:Decimal * min(t2.x)::Decimal) as $expr2, t2.y] }
└─LogicalAgg { group_key: [t2.y], aggs: [min(t2.x)] }
└─LogicalScan { table: t2, columns: [t2.x, t2.y] }
- sql: |
create table t1(x int, y int);
create table t2(x int, y int);
Expand Down Expand Up @@ -518,14 +510,11 @@
select count(*) from a where a.y = (select string_agg(x, ',' order by x) from b where b.z = a.z);
optimized_logical_plan_for_batch: |-
LogicalAgg { aggs: [count] }
└─LogicalJoin { type: Inner, on: IsNotDistinctFrom(a.z, a.z) AND (a.y = string_agg(b.x, ',':Varchar order_by(b.x ASC))), output: [] }
└─LogicalJoin { type: Inner, on: (a.z = b.z) AND (a.y = string_agg(b.x, ',':Varchar order_by(b.x ASC))), output: [] }
├─LogicalScan { table: a, columns: [a.y, a.z] }
└─LogicalAgg { group_key: [a.z], aggs: [string_agg(b.x, ',':Varchar order_by(b.x ASC))] }
└─LogicalJoin { type: LeftOuter, on: IsNotDistinctFrom(a.z, b.z), output: [a.z, b.x, ',':Varchar] }
├─LogicalAgg { group_key: [a.z], aggs: [] }
│ └─LogicalScan { table: a, columns: [a.z] }
└─LogicalProject { exprs: [b.z, b.x, ',':Varchar] }
└─LogicalScan { table: b, columns: [b.x, b.z], predicate: IsNotNull(b.z) }
└─LogicalAgg { group_key: [b.z], aggs: [string_agg(b.x, ',':Varchar order_by(b.x ASC))] }
└─LogicalProject { exprs: [b.x, ',':Varchar, b.z] }
└─LogicalScan { table: b, columns: [b.x, b.z] }
- sql: |
create table a(x int, y int, z int);
create table b(x int, y int, z int);
Expand Down Expand Up @@ -978,35 +967,23 @@
select Array(select c from t2 where b = d) arr from t1;
batch_plan: |-
BatchExchange { order: [], dist: Single }
└─BatchHashJoin { type: LeftOuter, predicate: t1.b IS NOT DISTINCT FROM t1.b, output: [array_agg(t2.c)] }
└─BatchHashJoin { type: LeftOuter, predicate: t1.b = t2.d, output: [array_agg(t2.c)] }
├─BatchExchange { order: [], dist: HashShard(t1.b) }
│ └─BatchScan { table: t1, columns: [t1.b], distribution: SomeShard }
└─BatchHashAgg { group_key: [t1.b], aggs: [array_agg(t2.c)] }
└─BatchHashJoin { type: LeftOuter, predicate: t1.b IS NOT DISTINCT FROM t2.d, output: [t1.b, t2.c] }
├─BatchHashAgg { group_key: [t1.b], aggs: [] }
│ └─BatchExchange { order: [], dist: HashShard(t1.b) }
│ └─BatchScan { table: t1, columns: [t1.b], distribution: SomeShard }
└─BatchProject { exprs: [array_agg(t2.c), t2.d] }
└─BatchHashAgg { group_key: [t2.d], aggs: [array_agg(t2.c)] }
└─BatchExchange { order: [], dist: HashShard(t2.d) }
└─BatchProject { exprs: [t2.d, t2.c] }
└─BatchFilter { predicate: IsNotNull(t2.d) }
└─BatchScan { table: t2, columns: [t2.c, t2.d], distribution: SomeShard }
└─BatchScan { table: t2, columns: [t2.c, t2.d], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [arr, t1._row_id(hidden), t1.b(hidden), t1.b#1(hidden)], stream_key: [t1._row_id, t1.b], pk_columns: [t1._row_id, t1.b], pk_conflict: NoCheck }
StreamMaterialize { columns: [arr, t1._row_id(hidden), t1.b(hidden), t2.d(hidden)], stream_key: [t1._row_id, t1.b], pk_columns: [t1._row_id, t1.b], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(t1._row_id, t1.b) }
└─StreamHashJoin { type: LeftOuter, predicate: t1.b IS NOT DISTINCT FROM t1.b, output: [array_agg(t2.c), t1._row_id, t1.b, t1.b] }
└─StreamHashJoin { type: LeftOuter, predicate: t1.b = t2.d, output: [array_agg(t2.c), t1._row_id, t1.b, t2.d] }
├─StreamExchange { dist: HashShard(t1.b) }
│ └─StreamTableScan { table: t1, columns: [t1.b, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamProject { exprs: [t1.b, array_agg(t2.c)] }
└─StreamHashAgg { group_key: [t1.b], aggs: [array_agg(t2.c), count] }
└─StreamHashJoin { type: LeftOuter, predicate: t1.b IS NOT DISTINCT FROM t2.d, output: [t1.b, t2.c, t2._row_id] }
├─StreamProject { exprs: [t1.b] }
│ └─StreamHashAgg { group_key: [t1.b], aggs: [count] }
│ └─StreamExchange { dist: HashShard(t1.b) }
│ └─StreamTableScan { table: t1, columns: [t1.b, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.d) }
└─StreamProject { exprs: [t2.d, t2.c, t2._row_id] }
└─StreamFilter { predicate: IsNotNull(t2.d) }
└─StreamTableScan { table: t2, columns: [t2.c, t2.d, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
└─StreamProject { exprs: [array_agg(t2.c), t2.d] }
└─StreamHashAgg { group_key: [t2.d], aggs: [array_agg(t2.c), count] }
└─StreamExchange { dist: HashShard(t2.d) }
└─StreamTableScan { table: t2, columns: [t2.c, t2.d, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: correlated array subquery \du
sql: |
SELECT r.rolname, r.rolsuper, r.rolinherit,
Expand All @@ -1025,29 +1002,16 @@
BatchExchange { order: [rw_users.name ASC], dist: Single }
└─BatchProject { exprs: [rw_users.name, rw_users.is_super, true:Boolean, rw_users.create_user, rw_users.create_db, rw_users.can_login, -1:Int32, null:Timestamptz, array_agg(rw_users.name), true:Boolean, true:Boolean] }
└─BatchSort { order: [rw_users.name ASC] }
└─BatchHashJoin { type: LeftOuter, predicate: rw_users.id IS NOT DISTINCT FROM rw_users.id, output: all }
└─BatchHashJoin { type: LeftOuter, predicate: rw_users.id = null:Int32, output: all }
├─BatchExchange { order: [], dist: HashShard(rw_users.id) }
│ └─BatchFilter { predicate: Not(RegexpEq(rw_users.name, '^pg_':Varchar)) }
│ └─BatchScan { table: rw_users, columns: [rw_users.id, rw_users.name, rw_users.is_super, rw_users.create_db, rw_users.create_user, rw_users.can_login], distribution: Single }
└─BatchHashAgg { group_key: [rw_users.id], aggs: [array_agg(rw_users.name)] }
└─BatchHashJoin { type: LeftOuter, predicate: rw_users.id IS NOT DISTINCT FROM rw_users.id, output: [rw_users.id, rw_users.name] }
├─BatchHashAgg { group_key: [rw_users.id], aggs: [] }
│ └─BatchExchange { order: [], dist: HashShard(rw_users.id) }
│ └─BatchProject { exprs: [rw_users.id] }
│ └─BatchFilter { predicate: Not(RegexpEq(rw_users.name, '^pg_':Varchar)) }
│ └─BatchScan { table: rw_users, columns: [rw_users.id, rw_users.name], distribution: Single }
└─BatchExchange { order: [], dist: HashShard(rw_users.id) }
└─BatchHashJoin { type: Inner, predicate: null:Int32 = rw_users.id, output: [rw_users.id, rw_users.name] }
└─BatchProject { exprs: [array_agg(rw_users.name), null:Int32] }
└─BatchHashAgg { group_key: [null:Int32], aggs: [array_agg(rw_users.name)] }
└─BatchExchange { order: [], dist: HashShard(null:Int32) }
└─BatchHashJoin { type: Inner, predicate: null:Int32 = rw_users.id, output: [rw_users.name, null:Int32] }
├─BatchExchange { order: [], dist: HashShard(null:Int32) }
│ └─BatchProject { exprs: [rw_users.id, null:Int32] }
│ └─BatchNestedLoopJoin { type: Inner, predicate: true, output: all }
│ ├─BatchExchange { order: [], dist: Single }
│ │ └─BatchHashAgg { group_key: [rw_users.id], aggs: [] }
│ │ └─BatchExchange { order: [], dist: HashShard(rw_users.id) }
│ │ └─BatchProject { exprs: [rw_users.id] }
│ │ └─BatchFilter { predicate: (null:Int32 = rw_users.id) AND Not(RegexpEq(rw_users.name, '^pg_':Varchar)) }
│ │ └─BatchScan { table: rw_users, columns: [rw_users.id, rw_users.name], distribution: Single }
│ └─BatchValues { rows: [] }
│ └─BatchValues { rows: [] }
└─BatchExchange { order: [], dist: HashShard(rw_users.id) }
└─BatchScan { table: rw_users, columns: [rw_users.id, rw_users.name], distribution: Single }
- name: correlated array subquery (issue 14423)
Expand Down
Loading

0 comments on commit abc633e

Please sign in to comment.