diff --git a/src/frontend/planner_test/tests/testdata/input/union.yaml b/src/frontend/planner_test/tests/testdata/input/union.yaml index 2d7a005d12e21..8775d4f9d36f2 100644 --- a/src/frontend/planner_test/tests/testdata/input/union.yaml +++ b/src/frontend/planner_test/tests/testdata/input/union.yaml @@ -53,3 +53,45 @@ expected_outputs: - batch_plan - optimized_logical_plan_for_batch +- name: test merged union stream key (2 columns, row_id + src_col) + sql: | + create table t1 (a int, b numeric, c bigint); + create table t2 (a int, b numeric, c bigint); + create table t3 (a int, b numeric, c bigint); + create table t4 (a int, b numeric, c bigint); + create table t5 (a int, b numeric, c bigint); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + expected_outputs: + - batch_plan + - stream_plan + - stream_dist_plan +- name: test merged union stream key (5 columns, row_id + src_col + a + b + c) + sql: | + create table t1 (a int, b numeric, c bigint, primary key (a)); + create table t2 (a int, b numeric, c bigint, primary key (b)); + create table t3 (a int, b numeric, c bigint, primary key (c)); + create table t4 (a int, b numeric, c bigint); + create table t5 (a int, b numeric, c bigint, primary key (a, b)); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + expected_outputs: + - stream_dist_plan +- name: test merged union stream key (4 columns, row_id + src_col + a + b) + sql: | + create table t1 (a int, b numeric, c bigint, primary key (a)); + create table t2 (a int, b numeric, c bigint, primary key (b)); + create table t3 (a int, b numeric, c bigint); + create table t4 (a int, b numeric, c bigint); + create table t5 (a int, b numeric, c bigint, primary key (a, b)); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + expected_outputs: + - stream_dist_plan +- name: test merged union stream key (3 columns, src_col + a + b) + sql: | + create table t1 (a int, b numeric, c bigint, primary key (a)); + create table t2 (a int, b numeric, c bigint, primary key (b)); + create table t3 (a int, b numeric, c bigint, primary key (b)); + create table t4 (a int, b numeric, c bigint, primary key (b, a)); + create table t5 (a int, b numeric, c bigint, primary key (a, b)); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + expected_outputs: + - stream_dist_plan diff --git a/src/frontend/planner_test/tests/testdata/output/share.yaml b/src/frontend/planner_test/tests/testdata/output/share.yaml index 4495a3deeaaf9..2815b00784b1d 100644 --- a/src/frontend/planner_test/tests/testdata/output/share.yaml +++ b/src/frontend/planner_test/tests/testdata/output/share.yaml @@ -150,7 +150,7 @@ create table t(a int, b int); with cte as (select count(*) from t) select * from cte union all select * from cte; stream_plan: |- - StreamMaterialize { columns: [count, 0:Int32(hidden)], stream_key: [0:Int32], pk_columns: [0:Int32], pk_conflict: NoCheck } + StreamMaterialize { columns: [count, $src(hidden)], stream_key: [$src], pk_columns: [$src], pk_conflict: NoCheck } └─StreamUnion { all: true } ├─StreamExchange { dist: HashShard(0:Int32) } │ └─StreamProject { exprs: [sum0(count), 0:Int32] } @@ -173,7 +173,7 @@ create table t(a int, b int); with cte as (select count(*) from t) select * from cte union all select * from cte; stream_plan: |- - StreamMaterialize { columns: [count, 0:Int32(hidden)], stream_key: [0:Int32], pk_columns: [0:Int32], pk_conflict: NoCheck } + StreamMaterialize { columns: [count, $src(hidden)], stream_key: [$src], pk_columns: [$src], pk_conflict: NoCheck } └─StreamUnion { all: true } ├─StreamExchange { dist: HashShard(0:Int32) } │ └─StreamProject { exprs: [sum0(count), 0:Int32] } diff --git a/src/frontend/planner_test/tests/testdata/output/union.yaml b/src/frontend/planner_test/tests/testdata/output/union.yaml index bc4c1fd4f1d50..14e7b7e65cb70 100644 --- a/src/frontend/planner_test/tests/testdata/output/union.yaml +++ b/src/frontend/planner_test/tests/testdata/output/union.yaml @@ -10,39 +10,56 @@ └─BatchExchange { order: [], dist: Single } └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard } stream_plan: |- - StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), null:Serial(hidden), 0:Int32(hidden)], stream_key: [t1._row_id, null:Serial, 0:Int32], pk_columns: [t1._row_id, null:Serial, 0:Int32], pk_conflict: NoCheck } + StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), $src(hidden)], stream_key: [t1._row_id, $src], pk_columns: [t1._row_id, $src], pk_conflict: NoCheck } └─StreamUnion { all: true } - ├─StreamExchange { dist: HashShard(t1._row_id, null:Serial, 0:Int32) } - │ └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Serial, 0:Int32] } + ├─StreamExchange { dist: HashShard(t1._row_id, 0:Int32) } + │ └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, 0:Int32] } │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - └─StreamExchange { dist: HashShard(null:Serial, t2._row_id, 1:Int32) } - └─StreamProject { exprs: [t2.a, t2.b, t2.c, null:Serial, t2._row_id, 1:Int32] } + └─StreamExchange { dist: HashShard(t2._row_id, 1:Int32) } + └─StreamProject { exprs: [t2.a, t2.b, t2.c, t2._row_id, 1:Int32] } └─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } stream_dist_plan: |+ Fragment 0 - StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), null:Serial(hidden), 0:Int32(hidden)], stream_key: [t1._row_id, null:Serial, 0:Int32], pk_columns: [t1._row_id, null:Serial, 0:Int32], pk_conflict: NoCheck } + StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), $src(hidden)], stream_key: [t1._row_id, $src], pk_columns: [t1._row_id, $src], pk_conflict: NoCheck } ├── materialized table: 4294967294 └── StreamUnion { all: true } - ├── StreamExchange Hash([3, 4, 5]) from 1 - └── StreamExchange Hash([3, 4, 5]) from 2 + ├── StreamExchange Hash([3, 4]) from 1 + └── StreamExchange Hash([3, 4]) from 2 Fragment 1 - StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Serial, 0:Int32] } + StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, 0:Int32] } └── Chain { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } { state table: 0 } ├── Upstream └── BatchPlanNode Fragment 2 - StreamProject { exprs: [t2.a, t2.b, t2.c, null:Serial, t2._row_id, 1:Int32] } + StreamProject { exprs: [t2.a, t2.b, t2.c, t2._row_id, 1:Int32] } └── Chain { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } { state table: 1 } ├── Upstream └── BatchPlanNode - Table 0 { columns: [ vnode, _row_id, t1_backfill_finished, t1_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + Table 0 + ├── columns: [ vnode, _row_id, t1_backfill_finished, t1_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 - Table 1 { columns: [ vnode, _row_id, t2_backfill_finished, t2_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + Table 1 + ├── columns: [ vnode, _row_id, t2_backfill_finished, t2_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 - Table 4294967294 { columns: [ a, b, c, t1._row_id, null:Serial, 0:Int32 ], primary key: [ $3 ASC, $4 ASC, $5 ASC ], value indices: [ 0, 1, 2, 3, 4, 5 ], distribution key: [ 3, 4, 5 ], read pk prefix len hint: 3 } + Table 4294967294 + ├── columns: [ a, b, c, t1._row_id, $src ] + ├── primary key: [ $3 ASC, $4 ASC ] + ├── value indices: [ 0, 1, 2, 3, 4 ] + ├── distribution key: [ 3, 4 ] + └── read pk prefix len hint: 2 - sql: | create table t1 (a int, b numeric, c bigint); @@ -68,11 +85,11 @@ └─StreamHashAgg { group_key: [t1.a, t1.b, t1.c], aggs: [count] } └─StreamExchange { dist: HashShard(t1.a, t1.b, t1.c) } └─StreamUnion { all: true } - ├─StreamExchange { dist: HashShard(t1._row_id, null:Serial, 0:Int32) } - │ └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Serial, 0:Int32] } + ├─StreamExchange { dist: HashShard(t1._row_id, 0:Int32) } + │ └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, 0:Int32] } │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - └─StreamExchange { dist: HashShard(null:Serial, t2._row_id, 1:Int32) } - └─StreamProject { exprs: [t2.a, t2.b, t2.c, null:Serial, t2._row_id, 1:Int32] } + └─StreamExchange { dist: HashShard(t2._row_id, 1:Int32) } + └─StreamProject { exprs: [t2.a, t2.b, t2.c, t2._row_id, 1:Int32] } └─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } stream_dist_plan: |+ Fragment 0 @@ -87,18 +104,18 @@ Fragment 1 StreamUnion { all: true } - ├── StreamExchange Hash([3, 4, 5]) from 2 - └── StreamExchange Hash([3, 4, 5]) from 3 + ├── StreamExchange Hash([3, 4]) from 2 + └── StreamExchange Hash([3, 4]) from 3 Fragment 2 - StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Serial, 0:Int32] } + StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, 0:Int32] } └── Chain { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } ├── state table: 1 ├── Upstream └── BatchPlanNode Fragment 3 - StreamProject { exprs: [t2.a, t2.b, t2.c, null:Serial, t2._row_id, 1:Int32] } + StreamProject { exprs: [t2.a, t2.b, t2.c, t2._row_id, 1:Int32] } └── Chain { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } ├── state table: 2 ├── Upstream @@ -292,3 +309,347 @@ └─BatchHashAgg { group_key: [1:Int32], aggs: [] } └─BatchExchange { order: [], dist: HashShard(1:Int32) } └─BatchValues { rows: [[1:Int32], [2:Int32], [3:Int32], [4:Int32], [5:Int32], [5:Int32]] } +- name: test merged union stream key (2 columns, row_id + src_col) + sql: | + create table t1 (a int, b numeric, c bigint); + create table t2 (a int, b numeric, c bigint); + create table t3 (a int, b numeric, c bigint); + create table t4 (a int, b numeric, c bigint); + create table t5 (a int, b numeric, c bigint); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + batch_plan: |- + BatchUnion { all: true } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: SomeShard } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t3, columns: [t3.a, t3.b, t3.c], distribution: SomeShard } + ├─BatchExchange { order: [], dist: Single } + │ └─BatchScan { table: t4, columns: [t4.a, t4.b, t4.c], distribution: SomeShard } + └─BatchExchange { order: [], dist: Single } + └─BatchScan { table: t5, columns: [t5.a, t5.b, t5.c], distribution: SomeShard } + stream_plan: |- + StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), $src(hidden)], stream_key: [t1._row_id, $src], pk_columns: [t1._row_id, $src], pk_conflict: NoCheck } + └─StreamUnion { all: true } + ├─StreamExchange { dist: HashShard(t1._row_id, 0:Int32) } + │ └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, 0:Int32] } + │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } + ├─StreamExchange { dist: HashShard(t2._row_id, 1:Int32) } + │ └─StreamProject { exprs: [t2.a, t2.b, t2.c, t2._row_id, 1:Int32] } + │ └─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } + ├─StreamExchange { dist: HashShard(t3._row_id, 2:Int32) } + │ └─StreamProject { exprs: [t3.a, t3.b, t3.c, t3._row_id, 2:Int32] } + │ └─StreamTableScan { table: t3, columns: [t3.a, t3.b, t3.c, t3._row_id], pk: [t3._row_id], dist: UpstreamHashShard(t3._row_id) } + ├─StreamExchange { dist: HashShard(t4._row_id, 3:Int32) } + │ └─StreamProject { exprs: [t4.a, t4.b, t4.c, t4._row_id, 3:Int32] } + │ └─StreamTableScan { table: t4, columns: [t4.a, t4.b, t4.c, t4._row_id], pk: [t4._row_id], dist: UpstreamHashShard(t4._row_id) } + └─StreamExchange { dist: HashShard(t5._row_id, 4:Int32) } + └─StreamProject { exprs: [t5.a, t5.b, t5.c, t5._row_id, 4:Int32] } + └─StreamTableScan { table: t5, columns: [t5.a, t5.b, t5.c, t5._row_id], pk: [t5._row_id], dist: UpstreamHashShard(t5._row_id) } + stream_dist_plan: |+ + Fragment 0 + StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), $src(hidden)], stream_key: [t1._row_id, $src], pk_columns: [t1._row_id, $src], pk_conflict: NoCheck } + ├── materialized table: 4294967294 + └── StreamUnion { all: true } + ├── StreamExchange Hash([3, 4]) from 1 + ├── StreamExchange Hash([3, 4]) from 2 + ├── StreamExchange Hash([3, 4]) from 3 + ├── StreamExchange Hash([3, 4]) from 4 + └── StreamExchange Hash([3, 4]) from 5 + + Fragment 1 + StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, 0:Int32] } + └── Chain { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } { state table: 0 } + ├── Upstream + └── BatchPlanNode + + Fragment 2 + StreamProject { exprs: [t2.a, t2.b, t2.c, t2._row_id, 1:Int32] } + └── Chain { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } { state table: 1 } + ├── Upstream + └── BatchPlanNode + + Fragment 3 + StreamProject { exprs: [t3.a, t3.b, t3.c, t3._row_id, 2:Int32] } + └── Chain { table: t3, columns: [t3.a, t3.b, t3.c, t3._row_id], pk: [t3._row_id], dist: UpstreamHashShard(t3._row_id) } { state table: 2 } + ├── Upstream + └── BatchPlanNode + + Fragment 4 + StreamProject { exprs: [t4.a, t4.b, t4.c, t4._row_id, 3:Int32] } + └── Chain { table: t4, columns: [t4.a, t4.b, t4.c, t4._row_id], pk: [t4._row_id], dist: UpstreamHashShard(t4._row_id) } { state table: 3 } + ├── Upstream + └── BatchPlanNode + + Fragment 5 + StreamProject { exprs: [t5.a, t5.b, t5.c, t5._row_id, 4:Int32] } + └── Chain { table: t5, columns: [t5.a, t5.b, t5.c, t5._row_id], pk: [t5._row_id], dist: UpstreamHashShard(t5._row_id) } { state table: 4 } + ├── Upstream + └── BatchPlanNode + + Table 0 + ├── columns: [ vnode, _row_id, t1_backfill_finished, t1_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 1 + ├── columns: [ vnode, _row_id, t2_backfill_finished, t2_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 2 + ├── columns: [ vnode, _row_id, t3_backfill_finished, t3_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 3 + ├── columns: [ vnode, _row_id, t4_backfill_finished, t4_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 4 + ├── columns: [ vnode, _row_id, t5_backfill_finished, t5_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 4294967294 + ├── columns: [ a, b, c, t1._row_id, $src ] + ├── primary key: [ $3 ASC, $4 ASC ] + ├── value indices: [ 0, 1, 2, 3, 4 ] + ├── distribution key: [ 3, 4 ] + └── read pk prefix len hint: 2 + +- name: test merged union stream key (5 columns, row_id + src_col + a + b + c) + sql: | + create table t1 (a int, b numeric, c bigint, primary key (a)); + create table t2 (a int, b numeric, c bigint, primary key (b)); + create table t3 (a int, b numeric, c bigint, primary key (c)); + create table t4 (a int, b numeric, c bigint); + create table t5 (a int, b numeric, c bigint, primary key (a, b)); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + stream_dist_plan: |+ + Fragment 0 + StreamMaterialize { columns: [a, b, c, t1.a(hidden), null:Int64(hidden), null:Decimal(hidden), null:Serial(hidden), $src(hidden)], stream_key: [t1.a, null:Decimal, null:Int64, null:Serial, $src], pk_columns: [t1.a, null:Decimal, null:Int64, null:Serial, $src], pk_conflict: NoCheck } + ├── materialized table: 4294967294 + └── StreamUnion { all: true } + ├── StreamExchange Hash([3, 5, 4, 6, 7]) from 1 + ├── StreamExchange Hash([3, 5, 4, 6, 7]) from 2 + ├── StreamExchange Hash([3, 5, 4, 6, 7]) from 3 + ├── StreamExchange Hash([3, 5, 4, 6, 7]) from 4 + └── StreamExchange Hash([3, 5, 4, 6, 7]) from 5 + + Fragment 1 + StreamProject { exprs: [t1.a, t1.b, t1.c, t1.a, null:Int64, null:Decimal, null:Serial, 0:Int32] } + └── Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 0 } + ├── Upstream + └── BatchPlanNode + + Fragment 2 + StreamProject { exprs: [t2.a, t2.b, t2.c, null:Int32, null:Int64, t2.b, null:Serial, 1:Int32] } + └── Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.b], dist: UpstreamHashShard(t2.b) } { state table: 1 } + ├── Upstream + └── BatchPlanNode + + Fragment 3 + StreamProject { exprs: [t3.a, t3.b, t3.c, null:Int32, t3.c, null:Decimal, null:Serial, 2:Int32] } + └── Chain { table: t3, columns: [t3.a, t3.b, t3.c], pk: [t3.c], dist: UpstreamHashShard(t3.c) } { state table: 2 } + ├── Upstream + └── BatchPlanNode + + Fragment 4 + StreamProject { exprs: [t4.a, t4.b, t4.c, null:Int32, null:Int64, null:Decimal, t4._row_id, 3:Int32] } + └── Chain { table: t4, columns: [t4.a, t4.b, t4.c, t4._row_id], pk: [t4._row_id], dist: UpstreamHashShard(t4._row_id) } { state table: 3 } + ├── Upstream + └── BatchPlanNode + + Fragment 5 + StreamProject { exprs: [t5.a, t5.b, t5.c, t5.a, null:Int64, t5.b, null:Serial, 4:Int32] } + └── Chain { table: t5, columns: [t5.a, t5.b, t5.c], pk: [t5.a, t5.b], dist: UpstreamHashShard(t5.a, t5.b) } { state table: 4 } + ├── Upstream + └── BatchPlanNode + + Table 0 { columns: [ vnode, a, t1_backfill_finished, t1_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 1 { columns: [ vnode, b, t2_backfill_finished, t2_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 2 { columns: [ vnode, c, t3_backfill_finished, t3_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 3 { columns: [ vnode, _row_id, t4_backfill_finished, t4_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 4 { columns: [ vnode, a, b, t5_backfill_finished, t5_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3, 4 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 4294967294 { columns: [ a, b, c, t1.a, null:Int64, null:Decimal, null:Serial, $src ], primary key: [ $3 ASC, $5 ASC, $4 ASC, $6 ASC, $7 ASC ], value indices: [ 0, 1, 2, 3, 4, 5, 6, 7 ], distribution key: [ 3, 5, 4, 6, 7 ], read pk prefix len hint: 5 } + +- name: test merged union stream key (4 columns, row_id + src_col + a + b) + sql: | + create table t1 (a int, b numeric, c bigint, primary key (a)); + create table t2 (a int, b numeric, c bigint, primary key (b)); + create table t3 (a int, b numeric, c bigint); + create table t4 (a int, b numeric, c bigint); + create table t5 (a int, b numeric, c bigint, primary key (a, b)); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + stream_dist_plan: |+ + Fragment 0 + StreamMaterialize { columns: [a, b, c, t1.a(hidden), null:Decimal(hidden), null:Serial(hidden), $src(hidden)], stream_key: [t1.a, null:Decimal, null:Serial, $src], pk_columns: [t1.a, null:Decimal, null:Serial, $src], pk_conflict: NoCheck } + ├── materialized table: 4294967294 + └── StreamUnion { all: true } + ├── StreamExchange Hash([3, 4, 5, 6]) from 1 + ├── StreamExchange Hash([3, 4, 5, 6]) from 2 + ├── StreamExchange Hash([3, 4, 5, 6]) from 3 + ├── StreamExchange Hash([3, 4, 5, 6]) from 4 + └── StreamExchange Hash([3, 4, 5, 6]) from 5 + + Fragment 1 + StreamProject { exprs: [t1.a, t1.b, t1.c, t1.a, null:Decimal, null:Serial, 0:Int32] } + └── Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 0 } + ├── Upstream + └── BatchPlanNode + + Fragment 2 + StreamProject { exprs: [t2.a, t2.b, t2.c, null:Int32, t2.b, null:Serial, 1:Int32] } + └── Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.b], dist: UpstreamHashShard(t2.b) } { state table: 1 } + ├── Upstream + └── BatchPlanNode + + Fragment 3 + StreamProject { exprs: [t3.a, t3.b, t3.c, null:Int32, null:Decimal, t3._row_id, 2:Int32] } + └── Chain { table: t3, columns: [t3.a, t3.b, t3.c, t3._row_id], pk: [t3._row_id], dist: UpstreamHashShard(t3._row_id) } { state table: 2 } + ├── Upstream + └── BatchPlanNode + + Fragment 4 + StreamProject { exprs: [t4.a, t4.b, t4.c, null:Int32, null:Decimal, t4._row_id, 3:Int32] } + └── Chain { table: t4, columns: [t4.a, t4.b, t4.c, t4._row_id], pk: [t4._row_id], dist: UpstreamHashShard(t4._row_id) } { state table: 3 } + ├── Upstream + └── BatchPlanNode + + Fragment 5 + StreamProject { exprs: [t5.a, t5.b, t5.c, t5.a, t5.b, null:Serial, 4:Int32] } + └── Chain { table: t5, columns: [t5.a, t5.b, t5.c], pk: [t5.a, t5.b], dist: UpstreamHashShard(t5.a, t5.b) } { state table: 4 } + ├── Upstream + └── BatchPlanNode + + Table 0 { columns: [ vnode, a, t1_backfill_finished, t1_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 1 { columns: [ vnode, b, t2_backfill_finished, t2_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 2 { columns: [ vnode, _row_id, t3_backfill_finished, t3_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 3 { columns: [ vnode, _row_id, t4_backfill_finished, t4_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 4 { columns: [ vnode, a, b, t5_backfill_finished, t5_row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3, 4 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 } + + Table 4294967294 { columns: [ a, b, c, t1.a, null:Decimal, null:Serial, $src ], primary key: [ $3 ASC, $4 ASC, $5 ASC, $6 ASC ], value indices: [ 0, 1, 2, 3, 4, 5, 6 ], distribution key: [ 3, 4, 5, 6 ], read pk prefix len hint: 4 } + +- name: test merged union stream key (3 columns, src_col + a + b) + sql: | + create table t1 (a int, b numeric, c bigint, primary key (a)); + create table t2 (a int, b numeric, c bigint, primary key (b)); + create table t3 (a int, b numeric, c bigint, primary key (b)); + create table t4 (a int, b numeric, c bigint, primary key (b, a)); + create table t5 (a int, b numeric, c bigint, primary key (a, b)); + select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5; + stream_dist_plan: |+ + Fragment 0 + StreamMaterialize { columns: [a, b, c, $src(hidden)], stream_key: [a, b, $src], pk_columns: [a, b, $src], pk_conflict: NoCheck } + ├── materialized table: 4294967294 + └── StreamUnion { all: true } + ├── StreamExchange Hash([0, 1, 3]) from 1 + ├── StreamExchange Hash([0, 1, 3]) from 2 + ├── StreamExchange Hash([0, 1, 3]) from 3 + ├── StreamExchange Hash([0, 1, 3]) from 4 + └── StreamExchange Hash([0, 1, 3]) from 5 + + Fragment 1 + StreamProject { exprs: [t1.a, t1.b, t1.c, 0:Int32] } + └── Chain { table: t1, columns: [t1.a, t1.b, t1.c], pk: [t1.a], dist: UpstreamHashShard(t1.a) } { state table: 0 } + ├── Upstream + └── BatchPlanNode + + Fragment 2 + StreamProject { exprs: [t2.a, t2.b, t2.c, 1:Int32] } + └── Chain { table: t2, columns: [t2.a, t2.b, t2.c], pk: [t2.b], dist: UpstreamHashShard(t2.b) } { state table: 1 } + ├── Upstream + └── BatchPlanNode + + Fragment 3 + StreamProject { exprs: [t3.a, t3.b, t3.c, 2:Int32] } + └── Chain { table: t3, columns: [t3.a, t3.b, t3.c], pk: [t3.b], dist: UpstreamHashShard(t3.b) } { state table: 2 } + ├── Upstream + └── BatchPlanNode + + Fragment 4 + StreamProject { exprs: [t4.a, t4.b, t4.c, 3:Int32] } + └── Chain { table: t4, columns: [t4.a, t4.b, t4.c], pk: [t4.b, t4.a], dist: UpstreamHashShard(t4.a, t4.b) } { state table: 3 } + ├── Upstream + └── BatchPlanNode + + Fragment 5 + StreamProject { exprs: [t5.a, t5.b, t5.c, 4:Int32] } + └── Chain { table: t5, columns: [t5.a, t5.b, t5.c], pk: [t5.a, t5.b], dist: UpstreamHashShard(t5.a, t5.b) } { state table: 4 } + ├── Upstream + └── BatchPlanNode + + Table 0 + ├── columns: [ vnode, a, t1_backfill_finished, t1_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 1 + ├── columns: [ vnode, b, t2_backfill_finished, t2_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 2 + ├── columns: [ vnode, b, t3_backfill_finished, t3_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 3 + ├── columns: [ vnode, b, a, t4_backfill_finished, t4_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3, 4 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 4 + ├── columns: [ vnode, a, b, t5_backfill_finished, t5_row_count ] + ├── primary key: [ $0 ASC ] + ├── value indices: [ 1, 2, 3, 4 ] + ├── distribution key: [ 0 ] + ├── read pk prefix len hint: 1 + └── vnode column idx: 0 + + Table 4294967294 + ├── columns: [ a, b, c, $src ] + ├── primary key: [ $0 ASC, $1 ASC, $3 ASC ] + ├── value indices: [ 0, 1, 2, 3 ] + ├── distribution key: [ 0, 1, 3 ] + └── read pk prefix len hint: 3 + diff --git a/src/frontend/planner_test/tests/testdata/output/watermark.yaml b/src/frontend/planner_test/tests/testdata/output/watermark.yaml index d1916a33192c6..e4ef42b121528 100644 --- a/src/frontend/planner_test/tests/testdata/output/watermark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/watermark.yaml @@ -140,13 +140,13 @@ create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only; select * from t1 Union all select * from t2; stream_plan: |- - StreamMaterialize { columns: [ts, v1, v2, t1._row_id(hidden), null:Serial(hidden), 0:Int32(hidden)], stream_key: [t1._row_id, null:Serial, 0:Int32], pk_columns: [t1._row_id, null:Serial, 0:Int32], pk_conflict: NoCheck, watermark_columns: [ts] } + StreamMaterialize { columns: [ts, v1, v2, t1._row_id(hidden), $src(hidden)], stream_key: [t1._row_id, $src], pk_columns: [t1._row_id, $src], pk_conflict: NoCheck, watermark_columns: [ts] } └─StreamUnion { all: true, output_watermarks: [t1.ts] } - ├─StreamExchange { dist: HashShard(t1._row_id, null:Serial, 0:Int32) } - │ └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, t1._row_id, null:Serial, 0:Int32], output_watermarks: [t1.ts] } + ├─StreamExchange { dist: HashShard(t1._row_id, 0:Int32) } + │ └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, t1._row_id, 0:Int32], output_watermarks: [t1.ts] } │ └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - └─StreamExchange { dist: HashShard(null:Serial, t2._row_id, 1:Int32) } - └─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, null:Serial, t2._row_id, 1:Int32], output_watermarks: [t2.ts] } + └─StreamExchange { dist: HashShard(t2._row_id, 1:Int32) } + └─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, t2._row_id, 1:Int32], output_watermarks: [t2.ts] } └─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } - name: union sql: | @@ -159,11 +159,11 @@ └─StreamExchange { dist: HashShard(t1.ts, t1.v1, t1.v2) } └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2], output_watermarks: [t1.ts] } └─StreamUnion { all: true, output_watermarks: [t1.ts] } - ├─StreamExchange { dist: HashShard(t1._row_id, null:Serial, 0:Int32) } - │ └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, t1._row_id, null:Serial, 0:Int32], output_watermarks: [t1.ts] } + ├─StreamExchange { dist: HashShard(t1._row_id, 0:Int32) } + │ └─StreamProject { exprs: [t1.ts, t1.v1, t1.v2, t1._row_id, 0:Int32], output_watermarks: [t1.ts] } │ └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) } - └─StreamExchange { dist: HashShard(null:Serial, t2._row_id, 1:Int32) } - └─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, null:Serial, t2._row_id, 1:Int32], output_watermarks: [t2.ts] } + └─StreamExchange { dist: HashShard(t2._row_id, 1:Int32) } + └─StreamProject { exprs: [t2.ts, t2.v1, t2.v2, t2._row_id, 1:Int32], output_watermarks: [t2.ts] } └─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) } - name: tumble sql: | diff --git a/src/frontend/src/optimizer/plan_node/generic/union.rs b/src/frontend/src/optimizer/plan_node/generic/union.rs index bc736eed4e153..3e6a5b9b9bab6 100644 --- a/src/frontend/src/optimizer/plan_node/generic/union.rs +++ b/src/frontend/src/optimizer/plan_node/generic/union.rs @@ -33,7 +33,13 @@ pub struct Union { impl GenericPlanNode for Union { fn schema(&self) -> Schema { - self.inputs[0].schema().clone() + let mut schema = self.inputs[0].schema().clone(); + if let Some(source_col) = self.source_col { + schema.fields[source_col].name = "$src".to_string(); + schema + } else { + schema + } } fn stream_key(&self) -> Option> { diff --git a/src/frontend/src/optimizer/plan_node/logical_union.rs b/src/frontend/src/optimizer/plan_node/logical_union.rs index 10371fda3c2b0..51e4e620cf4ca 100644 --- a/src/frontend/src/optimizer/plan_node/logical_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_union.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp::max; +use std::collections::BTreeMap; + use itertools::Itertools; use risingwave_common::catalog::Schema; use risingwave_common::error::Result; @@ -174,7 +177,7 @@ impl ToStream for LogicalUnion { rewrites.push(input.logical_rewrite_for_stream(ctx)?); } - let original_schema_contain_all_input_pks = + let original_schema_contain_all_input_stream_keys = rewrites.iter().all(|(new_input, col_index_mapping)| { let original_schema_new_pos = (0..original_schema_len) .map(|x| col_index_mapping.map(x)) @@ -185,7 +188,7 @@ impl ToStream for LogicalUnion { .all(|x| original_schema_new_pos.contains(x)) }); - if original_schema_contain_all_input_pks { + if original_schema_contain_all_input_stream_keys { // Add one more column at the end of the original schema to identify the record came // from which input. [original_schema + source_col] let new_inputs = rewrites @@ -223,29 +226,45 @@ impl ToStream for LogicalUnion { Ok((new_union.into(), out_col_change)) } else { // In order to ensure all inputs have the same schema for new union, we construct new - // schema like that: [original_schema + input1_pk + input2_pk + ... + - // source_col] - let input_pk_types = rewrites - .iter() - .flat_map(|(new_input, _)| { - new_input - .expect_stream_key() - .iter() - .map(|x| new_input.schema().fields[*x].data_type()) - }) - .collect_vec(); - let input_pk_nulls = input_pk_types + // schema like that: [original_schema + merged_stream_key + source_col] + // where merged_stream_key is merged by the types of each input stream key. + // If all inputs have the same stream key column types, we have a small merged_stream_key. Otherwise, we will have a large merged_stream_key. + + let (merged_stream_key_types, types_offset) = { + let mut max_types_counter = BTreeMap::default(); + for (new_input, _) in &rewrites { + let mut types_counter = BTreeMap::default(); + for x in new_input.expect_stream_key() { + types_counter + .entry(new_input.schema().fields[*x].data_type()) + .and_modify(|x| *x += 1) + .or_insert(1); + } + for (key, val) in types_counter { + max_types_counter + .entry(key) + .and_modify(|x| *x = max(*x, val)) + .or_insert(val); + } + } + + let mut merged_stream_key_types = vec![]; + let mut types_offset = BTreeMap::default(); + let mut offset = 0; + for (key, val) in max_types_counter { + let _ = types_offset.insert(key.clone(), offset); + offset += val; + merged_stream_key_types.extend(std::iter::repeat(key.clone()).take(val)); + } + + (merged_stream_key_types, types_offset) + }; + + let input_stream_key_nulls = merged_stream_key_types .iter() .map(|t| ExprImpl::Literal(Literal::new(None, t.clone()).into())) .collect_vec(); - let input_pk_lens = rewrites - .iter() - .map(|(new_input, _)| new_input.expect_stream_key().len()) - .collect_vec(); - let mut input_pk_offsets = vec![0]; - for (i, len) in input_pk_lens.into_iter().enumerate() { - input_pk_offsets.push(input_pk_offsets[i] + len) - } + let new_inputs = rewrites .into_iter() .enumerate() @@ -262,18 +281,22 @@ impl ToStream for LogicalUnion { ) }) .collect_vec(); - // input1_pk + input2_pk + ... - let mut input_pks = input_pk_nulls.clone(); - for (j, pk_idx) in new_input.expect_stream_key().iter().enumerate() { - input_pks[input_pk_offsets[i] + j] = ExprImpl::InputRef( - InputRef::new( - *pk_idx, - new_input.schema().fields[*pk_idx].data_type.clone(), - ) - .into(), - ); + // merged_stream_key + let mut input_stream_keys = input_stream_key_nulls.clone(); + let mut types_counter = BTreeMap::default(); + for stream_key_idx in new_input.expect_stream_key() { + let data_type = + new_input.schema().fields[*stream_key_idx].data_type.clone(); + let count = *types_counter + .entry(data_type.clone()) + .and_modify(|x| *x += 1) + .or_insert(1); + let type_start_offset = *types_offset.get(&data_type).unwrap(); + + input_stream_keys[type_start_offset + count - 1] = + ExprImpl::InputRef(InputRef::new(*stream_key_idx, data_type).into()); } - exprs.extend(input_pks); + exprs.extend(input_stream_keys); // source_col exprs.push(ExprImpl::Literal( Literal::new(Some((i as i32).to_scalar_value()), DataType::Int32).into(), @@ -285,7 +308,7 @@ impl ToStream for LogicalUnion { let new_union = LogicalUnion::new_with_source_col( self.all(), new_inputs, - Some(original_schema_len + input_pk_types.len()), + Some(original_schema_len + merged_stream_key_types.len()), ); // We have already used project to map rewrite input to the origin schema, so we can use // identity with the new schema len.