Skip to content

Commit

Permalink
feat(optimizer): change stream join mv distribution key (#13022)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Oct 24, 2023
1 parent 447e142 commit e818508
Show file tree
Hide file tree
Showing 32 changed files with 1,882 additions and 1,659 deletions.
11 changes: 6 additions & 5 deletions src/frontend/planner_test/tests/testdata/output/append_only.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
select t1.v1 as id, v2, v3 from t1 join t2 on t1.v1=t2.v1;
stream_plan: |-
StreamMaterialize { columns: [id, v2, v3, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, id], pk_columns: [t1._row_id, t2._row_id, id], pk_conflict: NoCheck }
└─StreamHashJoin [append_only] { type: Inner, predicate: t1.v1 = t2.v1, output: [t1.v1, t1.v2, t2.v3, t1._row_id, t2._row_id] }
├─StreamExchange { dist: HashShard(t1.v1) }
│ └─StreamTableScan { table: t1, columns: [t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.v1) }
└─StreamTableScan { table: t2, columns: [t2.v1, t2.v3, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
└─StreamExchange { dist: HashShard(t1.v1, t1._row_id, t2._row_id) }
└─StreamHashJoin [append_only] { type: Inner, predicate: t1.v1 = t2.v1, output: [t1.v1, t1.v2, t2.v3, t1._row_id, t2._row_id] }
├─StreamExchange { dist: HashShard(t1.v1) }
│ └─StreamTableScan { table: t1, columns: [t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.v1) }
└─StreamTableScan { table: t2, columns: [t2.v1, t2.v3, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- sql: |
create table t1 (v1 int, v2 int) append only;
select v1 from t1 order by v1 limit 3 offset 3;
Expand Down
13 changes: 7 additions & 6 deletions src/frontend/planner_test/tests/testdata/output/basic_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,10 @@
└─BatchValues { rows: [] }
stream_plan: |-
StreamMaterialize { columns: [v, t._row_id(hidden), t._row_id#1(hidden)], stream_key: [t._row_id, t._row_id#1, v], pk_columns: [t._row_id, t._row_id#1, v], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: t.v = t.v, output: [t.v, t._row_id, t._row_id] }
├─StreamExchange { dist: HashShard(t.v) }
│ └─StreamTableScan { table: t, columns: [t.v, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: HashShard(t.v) }
└─StreamFilter { predicate: false:Boolean }
└─StreamTableScan { table: t, columns: [t.v, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: HashShard(t.v, t._row_id, t._row_id) }
└─StreamHashJoin { type: Inner, predicate: t.v = t.v, output: [t.v, t._row_id, t._row_id] }
├─StreamExchange { dist: HashShard(t.v) }
│ └─StreamTableScan { table: t, columns: [t.v, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: HashShard(t.v) }
└─StreamFilter { predicate: false:Boolean }
└─StreamTableScan { table: t, columns: [t.v, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
302 changes: 157 additions & 145 deletions src/frontend/planner_test/tests/testdata/output/ch_benchmark.yaml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
└─LogicalScan { table: t1, columns: [t1.v1, t1.v2, t1._row_id] }
stream_plan: |-
StreamMaterialize { columns: [v3, v4, v1, t2._row_id(hidden), t1._row_id(hidden)], stream_key: [t2._row_id, t1._row_id, v3], pk_columns: [t2._row_id, t1._row_id, v3], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: t2.v3 = t1.v1, output: [t2.v3, t2.v4, t1.v1, t2._row_id, t1._row_id] }
├─StreamExchange { dist: HashShard(t2.v3) }
│ └─StreamTableScan { table: t2, columns: [t2.v3, t2.v4, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
└─StreamExchange { dist: HashShard(t1.v1) }
└─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.v3, t2._row_id, t1._row_id) }
└─StreamHashJoin { type: Inner, predicate: t2.v3 = t1.v1, output: [t2.v3, t2.v4, t1.v1, t2._row_id, t1._row_id] }
├─StreamExchange { dist: HashShard(t2.v3) }
│ └─StreamTableScan { table: t2, columns: [t2.v3, t2.v4, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
└─StreamExchange { dist: HashShard(t1.v1) }
└─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
- sql: |
create table t1 (v1 int, v2 int);
create table t2 (v3 int, v4 int);
Expand Down Expand Up @@ -79,8 +80,9 @@
└─LogicalValues { rows: [['cn':Varchar, 'China':Varchar], ['us':Varchar, 'United States':Varchar]], schema: Schema { fields: [*VALUES*_0.column_0:Varchar, *VALUES*_0.column_1:Varchar] } }
stream_plan: |-
StreamMaterialize { columns: [v, c, abbr, real, t._row_id(hidden), _row_id(hidden)], stream_key: [t._row_id, _row_id, c], pk_columns: [t._row_id, _row_id, c], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: t.c = *VALUES*_0.column_0, output: [t.v, t.c, *VALUES*_0.column_0, *VALUES*_0.column_1, t._row_id, _row_id] }
├─StreamExchange { dist: HashShard(t.c) }
│ └─StreamTableScan { table: t, columns: [t.v, t.c, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: HashShard(*VALUES*_0.column_0) }
└─StreamValues { rows: [['cn':Varchar, 'China':Varchar, 0:Int64], ['us':Varchar, 'United States':Varchar, 1:Int64]] }
└─StreamExchange { dist: HashShard(t.c, t._row_id, _row_id) }
└─StreamHashJoin { type: Inner, predicate: t.c = *VALUES*_0.column_0, output: [t.v, t.c, *VALUES*_0.column_0, *VALUES*_0.column_1, t._row_id, _row_id] }
├─StreamExchange { dist: HashShard(t.c) }
│ └─StreamTableScan { table: t, columns: [t.v, t.c, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
└─StreamExchange { dist: HashShard(*VALUES*_0.column_0) }
└─StreamValues { rows: [['cn':Varchar, 'China':Varchar, 0:Int64], ['us':Varchar, 'United States':Varchar, 1:Int64]] }
Original file line number Diff line number Diff line change
Expand Up @@ -963,31 +963,35 @@
└─BatchScan { table: a, columns: [a.k1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), a.k1(hidden)], stream_key: [ak1.a._row_id, ak1.k1], pk_columns: [ak1.a._row_id, ak1.k1], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: ak1.k1 = a.k1, output: [ak1.v, count, ak1.a._row_id, ak1.k1, a.k1] }
├─StreamExchange { dist: HashShard(ak1.k1) }
│ └─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamHashAgg { group_key: [a.k1], aggs: [count] }
└─StreamExchange { dist: HashShard(a.k1) }
└─StreamTableScan { table: a, columns: [a.k1, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
└─StreamExchange { dist: HashShard(ak1.a._row_id, ak1.k1) }
└─StreamHashJoin { type: Inner, predicate: ak1.k1 = a.k1, output: [ak1.v, count, ak1.a._row_id, ak1.k1, a.k1] }
├─StreamExchange { dist: HashShard(ak1.k1) }
│ └─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamHashAgg { group_key: [a.k1], aggs: [count] }
└─StreamExchange { dist: HashShard(a.k1) }
└─StreamTableScan { table: a, columns: [a.k1, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [v, bv, ak1.a._row_id(hidden), ak1.k1(hidden), a.k1(hidden)], stream_key: [ak1.a._row_id, ak1.k1], pk_columns: [ak1.a._row_id, ak1.k1], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamHashJoin { type: Inner, predicate: ak1.k1 = a.k1, output: [ak1.v, count, ak1.a._row_id, ak1.k1, a.k1] }
├── left table: 0
├── right table: 2
├── left degree table: 1
├── right degree table: 3
├── StreamExchange Hash([0]) from 1
└── StreamHashAgg { group_key: [a.k1], aggs: [count] } { intermediate state table: 5, state tables: [], distinct tables: [] }
└── StreamExchange Hash([0]) from 2
└── StreamExchange Hash([2, 3]) from 1
Fragment 1
StreamHashJoin { type: Inner, predicate: ak1.k1 = a.k1, output: [ak1.v, count, ak1.a._row_id, ak1.k1, a.k1] }
├── left table: 0
├── right table: 2
├── left degree table: 1
├── right degree table: 3
├── StreamExchange Hash([0]) from 2
└── StreamHashAgg { group_key: [a.k1], aggs: [count] } { intermediate state table: 5, state tables: [], distinct tables: [] }
└── StreamExchange Hash([0]) from 3
Fragment 2
Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } { state table: 4 }
├── Upstream
└── BatchPlanNode
Fragment 2
Fragment 3
Chain { table: a, columns: [a.k1, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) } { state table: 6 }
├── Upstream
└── BatchPlanNode
Expand Down Expand Up @@ -1022,7 +1026,7 @@
├── columns: [ v, bv, ak1.a._row_id, ak1.k1, a.k1 ]
├── primary key: [ $2 ASC, $3 ASC ]
├── value indices: [ 0, 1, 2, 3, 4 ]
├── distribution key: [ 3 ]
├── distribution key: [ 2, 3 ]
└── read pk prefix len hint: 2
- id: aggk1_join_Ak1_onk1
Expand Down Expand Up @@ -1054,31 +1058,35 @@
└─BatchScan { table: a, columns: [a.k1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [v, bv, a.k1(hidden), ak1.a._row_id(hidden)], stream_key: [a.k1, ak1.a._row_id], pk_columns: [a.k1, ak1.a._row_id], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: a.k1 = ak1.k1, output: [ak1.v, count, a.k1, ak1.a._row_id] }
├─StreamHashAgg { group_key: [a.k1], aggs: [count] }
│ └─StreamExchange { dist: HashShard(a.k1) }
│ └─StreamTableScan { table: a, columns: [a.k1, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
└─StreamExchange { dist: HashShard(ak1.k1) }
└─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
└─StreamExchange { dist: HashShard(a.k1, ak1.a._row_id) }
└─StreamHashJoin { type: Inner, predicate: a.k1 = ak1.k1, output: [ak1.v, count, a.k1, ak1.a._row_id] }
├─StreamHashAgg { group_key: [a.k1], aggs: [count] }
│ └─StreamExchange { dist: HashShard(a.k1) }
│ └─StreamTableScan { table: a, columns: [a.k1, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
└─StreamExchange { dist: HashShard(ak1.k1) }
└─StreamTableScan { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [v, bv, a.k1(hidden), ak1.a._row_id(hidden)], stream_key: [a.k1, ak1.a._row_id], pk_columns: [a.k1, ak1.a._row_id], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamHashJoin { type: Inner, predicate: a.k1 = ak1.k1, output: [ak1.v, count, a.k1, ak1.a._row_id] }
├── left table: 0
├── right table: 2
├── left degree table: 1
├── right degree table: 3
├── StreamHashAgg { group_key: [a.k1], aggs: [count] } { intermediate state table: 4, state tables: [], distinct tables: [] }
│ └── StreamExchange Hash([0]) from 1
└── StreamExchange Hash([0]) from 2
└── StreamExchange Hash([2, 3]) from 1
Fragment 1
StreamHashJoin { type: Inner, predicate: a.k1 = ak1.k1, output: [ak1.v, count, a.k1, ak1.a._row_id] }
├── left table: 0
├── right table: 2
├── left degree table: 1
├── right degree table: 3
├── StreamHashAgg { group_key: [a.k1], aggs: [count] } { intermediate state table: 4, state tables: [], distinct tables: [] }
│ └── StreamExchange Hash([0]) from 2
└── StreamExchange Hash([0]) from 3
Fragment 2
Chain { table: a, columns: [a.k1, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) } { state table: 5 }
├── Upstream
└── BatchPlanNode
Fragment 2
Fragment 3
Chain { table: ak1, columns: [ak1.k1, ak1.v, ak1.a._row_id], pk: [ak1.a._row_id], dist: UpstreamHashShard(ak1.k1) } { state table: 6 }
├── Upstream
└── BatchPlanNode
Expand Down Expand Up @@ -1113,7 +1121,7 @@
├── columns: [ v, bv, a.k1, ak1.a._row_id ]
├── primary key: [ $2 ASC, $3 ASC ]
├── value indices: [ 0, 1, 2, 3 ]
├── distribution key: [ 2 ]
├── distribution key: [ 2, 3 ]
└── read pk prefix len hint: 2
- id: aggk1_join_aggk1_onk1
Expand Down Expand Up @@ -1156,33 +1164,37 @@
└─BatchScan { table: b, columns: [b.k1], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [num, bv, a.k1(hidden), b.k1(hidden)], stream_key: [a.k1], pk_columns: [a.k1], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: a.k1 = b.k1, output: [count, count, a.k1, b.k1] }
├─StreamHashAgg { group_key: [a.k1], aggs: [count] }
│ └─StreamExchange { dist: HashShard(a.k1) }
│ └─StreamTableScan { table: a, columns: [a.k1, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
└─StreamHashAgg { group_key: [b.k1], aggs: [count] }
└─StreamExchange { dist: HashShard(b.k1) }
└─StreamTableScan { table: b, columns: [b.k1, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
└─StreamExchange { dist: HashShard(a.k1) }
└─StreamHashJoin { type: Inner, predicate: a.k1 = b.k1, output: [count, count, a.k1, b.k1] }
├─StreamHashAgg { group_key: [a.k1], aggs: [count] }
│ └─StreamExchange { dist: HashShard(a.k1) }
│ └─StreamTableScan { table: a, columns: [a.k1, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) }
└─StreamHashAgg { group_key: [b.k1], aggs: [count] }
└─StreamExchange { dist: HashShard(b.k1) }
└─StreamTableScan { table: b, columns: [b.k1, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [num, bv, a.k1(hidden), b.k1(hidden)], stream_key: [a.k1], pk_columns: [a.k1], pk_conflict: NoCheck }
├── materialized table: 4294967294
└── StreamHashJoin { type: Inner, predicate: a.k1 = b.k1, output: [count, count, a.k1, b.k1] }
├── left table: 0
├── right table: 2
├── left degree table: 1
├── right degree table: 3
├── StreamHashAgg { group_key: [a.k1], aggs: [count] } { intermediate state table: 4, state tables: [], distinct tables: [] }
│ └── StreamExchange Hash([0]) from 1
└── StreamHashAgg { group_key: [b.k1], aggs: [count] } { intermediate state table: 6, state tables: [], distinct tables: [] }
└── StreamExchange Hash([0]) from 2
└── StreamExchange Hash([2]) from 1
Fragment 1
StreamHashJoin { type: Inner, predicate: a.k1 = b.k1, output: [count, count, a.k1, b.k1] }
├── left table: 0
├── right table: 2
├── left degree table: 1
├── right degree table: 3
├── StreamHashAgg { group_key: [a.k1], aggs: [count] } { intermediate state table: 4, state tables: [], distinct tables: [] }
│ └── StreamExchange Hash([0]) from 2
└── StreamHashAgg { group_key: [b.k1], aggs: [count] } { intermediate state table: 6, state tables: [], distinct tables: [] }
└── StreamExchange Hash([0]) from 3
Fragment 2
Chain { table: a, columns: [a.k1, a._row_id], pk: [a._row_id], dist: UpstreamHashShard(a._row_id) } { state table: 5 }
├── Upstream
└── BatchPlanNode
Fragment 2
Fragment 3
Chain { table: b, columns: [b.k1, b._row_id], pk: [b._row_id], dist: UpstreamHashShard(b._row_id) } { state table: 7 }
├── Upstream
└── BatchPlanNode
Expand Down
21 changes: 11 additions & 10 deletions src/frontend/planner_test/tests/testdata/output/dynamic_filter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,17 @@
└─LogicalScan { table: t2, columns: [t2.v2] }
stream_plan: |-
StreamMaterialize { columns: [v1, max, t1._row_id(hidden)], stream_key: [t1._row_id, v1], pk_columns: [t1._row_id, v1], pk_conflict: NoCheck }
└─StreamHashJoin { type: Inner, predicate: t1.v1 = max(max(t2.v2)), output: [t1.v1, max(max(t2.v2)), t1._row_id] }
├─StreamExchange { dist: HashShard(t1.v1) }
│ └─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(max(max(t2.v2))) }
└─StreamProject { exprs: [max(max(t2.v2))] }
└─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [$expr1], aggs: [max(t2.v2), count] }
└─StreamProject { exprs: [t2.v2, t2._row_id, Vnode(t2._row_id) as $expr1] }
└─StreamTableScan { table: t2, columns: [t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
└─StreamExchange { dist: HashShard(t1.v1, t1._row_id) }
└─StreamHashJoin { type: Inner, predicate: t1.v1 = max(max(t2.v2)), output: [t1.v1, max(max(t2.v2)), t1._row_id] }
├─StreamExchange { dist: HashShard(t1.v1) }
│ └─StreamTableScan { table: t1, columns: [t1.v1, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(max(max(t2.v2))) }
└─StreamProject { exprs: [max(max(t2.v2))] }
└─StreamSimpleAgg { aggs: [max(max(t2.v2)), count] }
└─StreamExchange { dist: Single }
└─StreamHashAgg { group_key: [$expr1], aggs: [max(t2.v2), count] }
└─StreamProject { exprs: [t2.v2, t2._row_id, Vnode(t2._row_id) as $expr1] }
└─StreamTableScan { table: t2, columns: [t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: Dynamic filter join on unequal types
sql: |
create table t1 (v1 int);
Expand Down
Loading

0 comments on commit e818508

Please sign in to comment.