Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support CORRESPONDING specification in set operations #17891

Merged
merged 12 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 46 additions & 12 deletions e2e_test/batch/basic/union.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,33 @@ statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t1 (v1 int, v2 bigint);
create table t1 (v1 int, v2 bigint, v4 int);

statement ok
create table t2 (v1 int, v3 int);
create table t2 (v1 int, v3 int, v4 int);

statement ok
insert into t1 values(1, 2);
insert into t1 values(1, 2, 3);

statement ok
insert into t2 values(1, 2);
insert into t2 values(1, 2, 3);

query II
query III
select * from t1 union select * from t2
----
1 2
1 2 3

query II
query III
select * from t1 union all select * from t2
----
1 2
1 2
1 2 3
1 2 3

query II
query III
select * from t1 union all select * from t2 order by v1
----
1 2
1 2
1 2 3
1 2 3

statement error
select * from t1 union all select * from t2 order by v1 + 1
Expand Down Expand Up @@ -69,9 +69,43 @@ NULL
statement error
select null union all select null select union 1

query II
select * from t1 union all corresponding select * from t2 order by v1
----
1 3
1 3

query II
select * from t1 union corresponding select v4, v3 as v1 from t2 order by v1
----
1 3
2 3

query II
select * from t1 union all corresponding by (v4, v1) select * from t2
----
3 1
3 1

query II
select * from t1 union corresponding by (v4) select * from t2
----
3

statement error Invalid input syntax: Every <column name> in the <corresponding column list> shall be a <column name> of both left and right side. Missing column: `vxx`
select * from t1 union corresponding by (vxx) select * from t2

statement ok
create table txx (vxx int);

statement error Invalid input syntax: When CORRESPONDING is specified, at least one column of the left side shall have a <column name> that is the <column name> of some column of the right side
select * from t1 union corresponding select * from txx

statement ok
drop table t1;

statement ok
drop table t2;

statement ok
drop table txx;
98 changes: 79 additions & 19 deletions e2e_test/streaming/union.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@ statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t1 (v1 int, v2 int);
create table t1 (v1 int, v2 int, v4 int);

statement ok
create table t2 (v1 int, v3 int);
create table t2 (v1 int, v3 int, v4 int);

statement ok
create materialized view v as select * from t1 union all select * from t2;

statement ok
create materialized view v2 as select * from t1 union select * from t2;

statement ok
create materialized view v3 as select * from t1 union all corresponding select * from t2;

statement ok
create materialized view v4 as select * from t1 union corresponding by (v4, v1) select * from t2;

query II
select * from v;
----
Expand All @@ -22,64 +28,118 @@ select * from v2;
----

statement ok
insert into t1 values(1, 2);
insert into t1 values(1, 2, 3);

query II
query III
select * from v;
----
1 2
1 2 3

query II
query III
select * from v2;
----
1 2
1 2 3

query II
select * from v3;
----
1 3

query II
select * from v4;
----
3 1

statement ok
insert into t2 values(1, 2);
insert into t2 values(1, 2, 3);


query II
query III
select * from v;
----
1 2
1 2
1 2 3
1 2 3

query II
query III
select * from v2;
----
1 2
1 2 3

query II
select * from v3;
----
1 3
1 3

query II
select * from v4;
----
3 1

statement ok
delete from t1 where v1 = 1;

query II
query III
select * from v;
----
1 2
1 2 3

query II
query III
select * from v2;
----
1 2
1 2 3

query II
select * from v3;
----
1 3

query II
select * from v4;
----
3 1

statement ok
delete from t2 where v1 = 1;

query II
query III
select * from v;
----

query II
query III
select * from v2;
----

query II
select * from v3;
----

query II
select * from v4;
----


statement ok
drop materialized view v;

statement ok
drop materialized view v2;

statement ok
drop materialized view v3;

statement ok
drop materialized view v4;

statement error Invalid input syntax: Every <column name> in the <corresponding column list> shall be a <column name> of both left and right side. Missing column: `vxx`
create materialized view v5 as select * from t1 union corresponding by (vxx, v1) select * from t2

statement ok
create table txx (vxx int);

statement error Invalid input syntax: When CORRESPONDING is specified, at least one column of the left side shall have a <column name> that is the <column name> of some column of the right side
create materialized view v5 as select * from t1 union corresponding select * from txx

statement ok
drop table t1;

Expand Down
20 changes: 20 additions & 0 deletions src/frontend/planner_test/tests/testdata/input/union.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,23 @@
select * from t1 union all select * from t2 union all select * from t3 union all select * from t4 union all select * from t5;
expected_outputs:
- stream_dist_plan

- name: test corresponding union
sql: |
create table t1 (a int, b numeric, c bigint);
create table t2 (a int, b numeric, y bigint);
create table t3 (x int, b numeric, c bigint);
select * from t1 union corresponding select * from t2 union all corresponding by (b) select * from t3;
expected_outputs:
- batch_plan
- stream_plan
- stream_dist_plan

- name: test corresponding union error
sql: |
create table t1 (a int, b numeric, c bigint);
create table t2 (a int, b numeric, y bigint);
create table t3 (x int, b numeric, c bigint);
select * from t1 union corresponding select * from t2 union all corresponding by (c) select * from t3;
expected_outputs:
- binder_error
97 changes: 97 additions & 0 deletions src/frontend/planner_test/tests/testdata/output/union.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -639,3 +639,100 @@
├── distribution key: [ 0, 1, 3 ]
└── read pk prefix len hint: 3

- name: test corresponding union
sql: |
create table t1 (a int, b numeric, c bigint);
create table t2 (a int, b numeric, y bigint);
create table t3 (x int, b numeric, c bigint);
select * from t1 union corresponding select * from t2 union all corresponding by (b) select * from t3;
batch_plan: |-
BatchUnion { all: true }
├─BatchExchange { order: [], dist: Single }
│ └─BatchProject { exprs: [t1.b] }
│ └─BatchHashAgg { group_key: [t1.a, t1.b], aggs: [] }
│ └─BatchExchange { order: [], dist: HashShard(t1.a, t1.b) }
│ └─BatchUnion { all: true }
│ ├─BatchExchange { order: [], dist: Single }
│ │ └─BatchScan { table: t1, columns: [t1.a, t1.b], distribution: SomeShard }
│ └─BatchExchange { order: [], dist: Single }
│ └─BatchScan { table: t2, columns: [t2.a, t2.b], distribution: SomeShard }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t3, columns: [t3.b], distribution: SomeShard }
stream_plan: |-
StreamMaterialize { columns: [b, t1.a(hidden), t1.b(hidden), null:Serial(hidden), $src(hidden)], stream_key: [t1.a, t1.b, null:Serial, $src], pk_columns: [t1.a, t1.b, null:Serial, $src], pk_conflict: NoCheck }
└─StreamUnion { all: true }
├─StreamExchange { dist: HashShard(t1.a, t1.b, null:Serial, 0:Int32) }
│ └─StreamProject { exprs: [t1.b, t1.a, t1.b, null:Serial, 0:Int32], noop_update_hint: true }
│ └─StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] }
│ └─StreamExchange { dist: HashShard(t1.a, t1.b) }
│ └─StreamUnion { all: true }
│ ├─StreamExchange { dist: HashShard(t1._row_id, 0:Int32) }
│ │ └─StreamProject { exprs: [t1.a, t1.b, t1._row_id, 0:Int32] }
│ │ └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
│ └─StreamExchange { dist: HashShard(t2._row_id, 1:Int32) }
│ └─StreamProject { exprs: [t2.a, t2.b, t2._row_id, 1:Int32] }
│ └─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
└─StreamExchange { dist: HashShard(null:Int32, null:Decimal, t3._row_id, 1:Int32) }
└─StreamProject { exprs: [t3.b, null:Int32, null:Decimal, t3._row_id, 1:Int32] }
└─StreamTableScan { table: t3, columns: [t3.b, t3._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t3._row_id], pk: [_row_id], dist: UpstreamHashShard(t3._row_id) }
stream_dist_plan: |+
Fragment 0
StreamMaterialize { columns: [b, t1.a(hidden), t1.b(hidden), null:Serial(hidden), $src(hidden)], stream_key: [t1.a, t1.b, null:Serial, $src], pk_columns: [t1.a, t1.b, null:Serial, $src], pk_conflict: NoCheck }
├── tables: [ Materialize: 4294967294 ]
└── StreamUnion { all: true }
├── StreamExchange Hash([1, 2, 3, 4]) from 1
└── StreamExchange Hash([1, 2, 3, 4]) from 5

Fragment 1
StreamProject { exprs: [t1.b, t1.a, t1.b, null:Serial, 0:Int32], noop_update_hint: true }
└── StreamHashAgg { group_key: [t1.a, t1.b], aggs: [count] } { tables: [ HashAggState: 0 ] }
└── StreamExchange Hash([0, 1]) from 2

Fragment 2
StreamUnion { all: true }
├── StreamExchange Hash([2, 3]) from 3
└── StreamExchange Hash([2, 3]) from 4

Fragment 3
StreamProject { exprs: [t1.a, t1.b, t1._row_id, 0:Int32] }
└── StreamTableScan { table: t1, columns: [t1.a, t1.b, t1._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t1._row_id], pk: [_row_id], dist: UpstreamHashShard(t1._row_id) }
├── tables: [ StreamScan: 1 ]
├── Upstream
└── BatchPlanNode

Fragment 4
StreamProject { exprs: [t2.a, t2.b, t2._row_id, 1:Int32] }
└── StreamTableScan { table: t2, columns: [t2.a, t2.b, t2._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t2._row_id], pk: [_row_id], dist: UpstreamHashShard(t2._row_id) }
├── tables: [ StreamScan: 2 ]
├── Upstream
└── BatchPlanNode

Fragment 5
StreamProject { exprs: [t3.b, null:Int32, null:Decimal, t3._row_id, 1:Int32] }
└── StreamTableScan { table: t3, columns: [t3.b, t3._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t3._row_id], pk: [_row_id], dist: UpstreamHashShard(t3._row_id) }
├── tables: [ StreamScan: 3 ]
├── Upstream
└── BatchPlanNode

Table 0 { columns: [ t1_a, t1_b, count ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0, 1 ], read pk prefix len hint: 2 }

Table 1 { columns: [ vnode, _row_id, backfill_finished, row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 }

Table 2 { columns: [ vnode, _row_id, backfill_finished, row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 }

Table 3 { columns: [ vnode, _row_id, backfill_finished, row_count ], primary key: [ $0 ASC ], value indices: [ 1, 2, 3 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 }

Table 4294967294
├── columns: [ b, t1.a, t1.b, null:Serial, $src ]
├── primary key: [ $1 ASC, $2 ASC, $3 ASC, $4 ASC ]
├── value indices: [ 0, 1, 2, 3, 4 ]
├── distribution key: [ 1, 2, 3, 4 ]
└── read pk prefix len hint: 4

- name: test corresponding union error
sql: |
create table t1 (a int, b numeric, c bigint);
create table t2 (a int, b numeric, y bigint);
create table t3 (x int, b numeric, c bigint);
select * from t1 union corresponding select * from t2 union all corresponding by (c) select * from t3;
binder_error: 'Invalid input syntax: Every <column name> in the <corresponding column list> shall be a <column name> of both left and right side. Missing column: `c`'
Loading
Loading