feat(optimizer): optimize join stream key #12831

Merged: 3 commits, Oct 14, 2023
@@ -12,7 +12,7 @@
└─StreamSimpleAgg { aggs: [sum0(count), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [count] }
└─StreamHashJoin { type: Inner, predicate: t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id, output: [t._row_id, t._row_id, t.id, t._row_id, t._row_id, t.id, t.id, t.id, t._row_id, t._row_id, t.id, t._row_id, t._row_id, t.id, t.id] }
└─StreamHashJoin { type: Inner, predicate: t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id, output: [t._row_id, t._row_id, t.id, t._row_id, t._row_id, t.id, t.id, t.id, t._row_id, t._row_id, t.id, t._row_id, t._row_id, t.id] }
├─StreamExchange { dist: HashShard(t.id) }
│ └─StreamHashJoin { type: Inner, predicate: t.id = t.id AND t.id = t.id AND t.id = t.id AND t.id = t.id, output: [t.id, t.id, t.id, t.id, t._row_id, t._row_id, t._row_id, t._row_id] }
│ ├─StreamExchange { dist: HashShard(t.id) }
410 changes: 203 additions & 207 deletions src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml

Large diffs are not rendered by default.

@@ -33,8 +33,8 @@
/* should generate delta join plan, and stream index scan */
select * from a join b on a.a1 = b.b1 ;
stream_plan: |-
StreamMaterialize { columns: [a1, a2, b1, b2], stream_key: [a1, b1], pk_columns: [a1, b1], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(a.a1, b.b1) }
StreamMaterialize { columns: [a1, a2, b1, b2], stream_key: [a1], pk_columns: [a1], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(a.a1) }
└─StreamDeltaJoin { type: Inner, predicate: a.a1 = b.b1, output: all }
├─StreamTableScan { table: a, columns: [a.a1, a.a2], pk: [a.a1], dist: UpstreamHashShard(a.a1) }
└─StreamTableScan { table: b, columns: [b.b1, b.b2], pk: [b.b1], dist: UpstreamHashShard(b.b1) }
@@ -962,7 +962,7 @@
└─BatchExchange { order: [], dist: HashShard(a.k1) }
└─BatchScan { table: a, columns: [a.k1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), a.k1(hidden)], stream_key: [ak1.a._row_id, a.k1, ak1.k1], pk_columns: [ak1.a._row_id, a.k1, ak1.k1], pk_conflict: NoCheck }
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), a.k1(hidden)], stream_key: [ak1.a._row_id, ak1.k1], pk_columns: [ak1.a._row_id, ak1.k1], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: ak1.k1 = a.k1, output: [ak1.v, count, ak1.a._row_id, ak1.k1, a.k1] }
├─StreamExchange { dist: HashShard(ak1.k1) }
│ └─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
@@ -971,9 +971,13 @@
└─StreamTableScan { table: a, columns: [a.k1, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), a.k1(hidden)], stream_key: [ak1.a._row_id, a.k1, ak1.k1], pk_columns: [ak1.a._row_id, a.k1, ak1.k1], pk_conflict: NoCheck }
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), a.k1(hidden)], stream_key: [ak1.a._row_id, ak1.k1], pk_columns: [ak1.a._row_id, ak1.k1], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamHashJoin { type: Inner, predicate: ak1.k1 = a.k1, output: [ak1.v, count, ak1.a._row_id, ak1.k1, a.k1] } { left table: 0, right table: 2, left degree table: 1, right degree table: 3 }
└── StreamHashJoin { type: Inner, predicate: ak1.k1 = a.k1, output: [ak1.v, count, ak1.a._row_id, ak1.k1, a.k1] }
├── left table: 0
├── right table: 2
├── left degree table: 1
├── right degree table: 3
├── StreamExchange Hash([0]) from 1
└── StreamHashAgg { group_key: [a.k1], aggs: [count] } { intermediate state table: 5, state tables: [], distinct tables: [] }
└── StreamExchange Hash([0]) from 2
@@ -1014,7 +1018,12 @@
├── read pk prefix len hint: 1
└── vnode column idx: 0

Table 4294967294 { columns: [ v, bv, ak1.a._row_id, ak1.k1, a.k1 ], primary key: [ $2 ASC, $4 ASC, $3 ASC ], value indices: [ 0, 1, 2, 3, 4 ], distribution key: [ 3 ], read pk prefix len hint: 3 }
Table 4294967294
├── columns: [ v, bv, ak1.a._row_id, ak1.k1, a.k1 ]
├── primary key: [ $2 ASC, $3 ASC ]
├── value indices: [ 0, 1, 2, 3, 4 ]
├── distribution key: [ 3 ]
└── read pk prefix len hint: 2

- id: aggk1_join_Ak1_onk1
before:
@@ -1146,7 +1155,7 @@
└─BatchExchange { order: [], dist: HashShard(b.k1) }
└─BatchScan { table: b, columns: [b.k1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [num, bv, a.k1(hidden), b.k1(hidden)], stream_key: [a.k1, b.k1], pk_columns: [a.k1, b.k1], pk_conflict: NoCheck }
StreamMaterialize { columns: [num, bv, a.k1(hidden), b.k1(hidden)], stream_key: [a.k1], pk_columns: [a.k1], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: a.k1 = b.k1, output: [count, count, a.k1, b.k1] }
├─StreamHashAgg { group_key: [a.k1], aggs: [count] }
│ └─StreamExchange { dist: HashShard(a.k1) }
@@ -1156,7 +1165,7 @@
└─StreamTableScan { table: b, columns: [b.k1, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [num, bv, a.k1(hidden), b.k1(hidden)], stream_key: [a.k1, b.k1], pk_columns: [a.k1, b.k1], pk_conflict: NoCheck }
StreamMaterialize { columns: [num, bv, a.k1(hidden), b.k1(hidden)], stream_key: [a.k1], pk_columns: [a.k1], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamHashJoin { type: Inner, predicate: a.k1 = b.k1, output: [count, count, a.k1, b.k1] }
├── left table: 0
@@ -1178,15 +1187,40 @@
├── Upstream
└── BatchPlanNode

Table 0 { columns: [ a_k1, count ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 0
├── columns: [ a_k1, count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0, 1 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 1 { columns: [ a_k1, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 1
├── columns: [ a_k1, _degree ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 2 { columns: [ b_k1, count ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 2
├── columns: [ b_k1, count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 0, 1 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 3 { columns: [ b_k1, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 3
├── columns: [ b_k1, _degree ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 4 { columns: [ a_k1, count ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 4
├── columns: [ a_k1, count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 5
├── columns: [ vnode, _row_id, a_backfill_finished, a_row_count ]
@@ -1196,7 +1230,12 @@
├── read pk prefix len hint: 1
└── vnode column idx: 0

Table 6 { columns: [ b_k1, count ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 6
├── columns: [ b_k1, count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1

Table 7
├── columns: [ vnode, _row_id, b_backfill_finished, b_row_count ]
@@ -1208,10 +1247,10 @@

Table 4294967294
├── columns: [ num, bv, a.k1, b.k1 ]
├── primary key: [ $2 ASC, $3 ASC ]
├── primary key: [ $2 ASC ]
├── value indices: [ 0, 1, 2, 3 ]
├── distribution key: [ 2 ]
└── read pk prefix len hint: 2
└── read pk prefix len hint: 1

- sql: |
create table t1 (row_id int, uid int, v int, created_at timestamp);
@@ -41,7 +41,7 @@
└─BatchFilter { predicate: IsNotNull(all_sales.salesperson_id) }
└─BatchScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.amount], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), all_sales._row_id(hidden), salesperson.id(hidden), all_sales.amount(hidden), salesperson.id#1(hidden)], stream_key: [salesperson._row_id, all_sales._row_id, salesperson.id, salesperson.id#1, all_sales.amount], pk_columns: [salesperson._row_id, all_sales._row_id, salesperson.id, salesperson.id#1, all_sales.amount], pk_conflict: NoCheck }
StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), all_sales._row_id(hidden), salesperson.id(hidden), all_sales.amount(hidden), salesperson.id#1(hidden)], stream_key: [salesperson._row_id, all_sales._row_id, salesperson.id, all_sales.amount], pk_columns: [salesperson._row_id, all_sales._row_id, salesperson.id, all_sales.amount], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: salesperson.id IS NOT DISTINCT FROM salesperson.id AND all_sales.amount = max(all_sales.amount), output: [salesperson.name, max(all_sales.amount), all_sales.customer_name, salesperson._row_id, all_sales._row_id, salesperson.id, all_sales.amount, salesperson.id] }
├─StreamHashJoin { type: Inner, predicate: salesperson.id = all_sales.salesperson_id, output: [salesperson.id, salesperson.name, all_sales.customer_name, all_sales.amount, salesperson._row_id, all_sales._row_id] }
│ ├─StreamExchange { dist: HashShard(salesperson.id) }
@@ -86,7 +86,7 @@
└─BatchFilter { predicate: IsNotNull(all_sales.salesperson_id) }
└─BatchScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.customer_name, all_sales.amount], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), salesperson.id(hidden), all_sales.salesperson_id(hidden)], stream_key: [salesperson._row_id, all_sales.salesperson_id, salesperson.id], pk_columns: [salesperson._row_id, all_sales.salesperson_id, salesperson.id], pk_conflict: NoCheck }
StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), salesperson.id(hidden), all_sales.salesperson_id(hidden)], stream_key: [salesperson._row_id, salesperson.id], pk_columns: [salesperson._row_id, salesperson.id], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: salesperson.id IS NOT DISTINCT FROM all_sales.salesperson_id, output: [salesperson.name, all_sales.amount, all_sales.customer_name, salesperson._row_id, salesperson.id, all_sales.salesperson_id] }
├─StreamExchange { dist: HashShard(salesperson.id) }
│ └─StreamTableScan { table: salesperson, columns: [salesperson.id, salesperson.name, salesperson._row_id], pk: [salesperson._row_id], dist: UpstreamHashShard(salesperson._row_id) }
@@ -123,7 +123,7 @@
└─BatchFilter { predicate: IsNotNull(all_sales.salesperson_id) }
└─BatchScan { table: all_sales, columns: [all_sales.salesperson_id, all_sales.customer_name, all_sales.amount], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), salesperson.id(hidden), all_sales.salesperson_id(hidden)], stream_key: [salesperson._row_id, all_sales.salesperson_id, salesperson.id], pk_columns: [salesperson._row_id, all_sales.salesperson_id, salesperson.id], pk_conflict: NoCheck }
StreamMaterialize { columns: [name, amount, customer_name, salesperson._row_id(hidden), salesperson.id(hidden), all_sales.salesperson_id(hidden)], stream_key: [salesperson._row_id, salesperson.id], pk_columns: [salesperson._row_id, salesperson.id], pk_conflict: NoCheck }
└─StreamHashJoin { type: LeftOuter, predicate: salesperson.id IS NOT DISTINCT FROM all_sales.salesperson_id, output: [salesperson.name, all_sales.amount, all_sales.customer_name, salesperson._row_id, salesperson.id, all_sales.salesperson_id] }
├─StreamExchange { dist: HashShard(salesperson.id) }
│ └─StreamTableScan { table: salesperson, columns: [salesperson.id, salesperson.name, salesperson._row_id], pk: [salesperson._row_id], dist: UpstreamHashShard(salesperson._row_id) }
@@ -164,7 +164,7 @@
└─BatchExchange { order: [], dist: HashShard(t.arr) }
└─BatchScan { table: t, columns: [t.arr], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [x, arr, unnest, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, t.arr, projected_row_id, arr], pk_columns: [t._row_id, t.arr, projected_row_id, arr], pk_conflict: NoCheck }
StreamMaterialize { columns: [x, arr, unnest, t._row_id(hidden), t.arr(hidden), projected_row_id(hidden)], stream_key: [t._row_id, projected_row_id, arr], pk_columns: [t._row_id, projected_row_id, arr], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: t.arr IS NOT DISTINCT FROM t.arr, output: [t.x, t.arr, Unnest($0), t._row_id, t.arr, projected_row_id] }
├─StreamExchange { dist: HashShard(t.arr) }
│ └─StreamTableScan { table: t, columns: [t.x, t.arr, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
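
Note on reading the hunks above: in every changed `StreamMaterialize` line, the column dropped from `stream_key` is a right-side join column that the predicate equates (via `=` or `IS NOT DISTINCT FROM`) with a left-side column still present in the key. The sketch below is one way to express that pattern. It is inferred from the expected plans only and is not the code this PR adds; the function name `dedup_join_stream_key` and the representation of columns as output indices are assumptions made for illustration, and it ignores any join-type-specific conditions the optimizer may apply.

```rust
/// Hypothetical helper for illustration; not RisingWave's actual API.
///
/// `stream_key`: output column indices that form the join's stream key.
/// `eq_pairs`:   `(left_col, right_col)` output indices equated by the
///               join predicate.
///
/// A right-side key column is treated as redundant when the left-side
/// column it is equated with is also part of the key. The relative order
/// of the remaining columns is preserved.
fn dedup_join_stream_key(stream_key: &[usize], eq_pairs: &[(usize, usize)]) -> Vec<usize> {
    stream_key
        .iter()
        .copied()
        .filter(|&col| {
            // Drop `col` if some pair equates it (as the right side) with a
            // left-side column that already appears in the key.
            !eq_pairs
                .iter()
                .any(|&(left, right)| right == col && stream_key.contains(&left))
        })
        .collect()
}
```

Applied to the `ak1.k1 = a.k1` plan, where the columns are `[v, bv, ak1.a._row_id, ak1.k1, a.k1]`, the old key `[2, 4, 3]` with the pair `(3, 4)` yields `[2, 3]`, which agrees with the new `primary key: [ $2 ASC, $3 ASC ]` and `read pk prefix len hint: 2`.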