Skip to content

Commit

Permalink
feat: Experimentally introducing sink into table (#13185)
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky authored Dec 9, 2023
1 parent 5596207 commit 19f4254
Show file tree
Hide file tree
Showing 50 changed files with 2,036 additions and 435 deletions.
365 changes: 365 additions & 0 deletions e2e_test/sink/sink_into_table.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,365 @@
# Session setup for the whole file: turn on RW_IMPLICIT_FLUSH.
# NOTE(review): presumably this makes each DML statement below wait until its
# effect is visible to later queries, which is why most inserts/updates in this
# file are not followed by an explicit `flush` — confirm against RisingWave docs.
statement ok
SET RW_IMPLICIT_FLUSH TO true;

# Case: only a single sink into a given table is supported for now.
# Creates one sink into m_simple, then checks that a second sink into the same
# table and an `alter table` on that table are both rejected as not implemented.

statement ok
create table t_simple (v1 int, v2 int);

statement ok
create table m_simple (v1 int primary key, v2 int);

# First sink into m_simple is accepted.
statement ok
create sink s_simple_1 into m_simple as select v1, v2 from t_simple;

# A second sink into the same target table must fail.
statement error Feature is not yet implemented: create sink into table with incoming sinks
create sink s_simple_2 into m_simple as select v1, v2 from t_simple;

# Altering a table that has an incoming sink must fail as well.
statement error Feature is not yet implemented: alter table with incoming sinks
alter table m_simple add column v3 int;

# Cleanup: the sink first, then the source table and the target table.
statement ok
drop sink s_simple_1;

statement ok
drop table t_simple;

statement ok
drop table m_simple;

# Case: drop the target table while a sink into it still exists.
# No `drop sink` is issued in this scenario; dropping m_simple directly is
# expected to succeed, and the source table can be dropped afterwards.
# NOTE(review): presumably the sink is removed together with the target table;
# this scenario does not assert that explicitly.
statement ok
create table t_simple (v1 int, v2 int);

statement ok
create table m_simple (v1 int primary key, v2 int);

statement ok
create sink s_simple_1 into m_simple as select v1, v2 from t_simple;

# Drop the target table with the sink still attached.
statement ok
drop table m_simple;

statement ok
drop table t_simple;

# Case: target table with row_id as primary key (no primary key is declared in
# its DDL). The target has a defaulted column (v3 default 1000) and a generated
# column (v4 = v1 + v2); the sink's select list supplies only v1 and v2.
statement ok
create table t_s1 (v1 int, v2 int);

# Seed the source table before the sink exists.
statement ok
insert into t_s1 values (1, 11), (2, 12), (3, 13);

statement ok
create table t_row_id_as_primary_key (v1 int, v2 int, v3 int default 1000, v4 int as v1 + v2);

# Without append-only options, the sink is rejected for this target.
statement error Only append-only sinks can sink to a table without primary keys.
create sink s1 into t_row_id_as_primary_key as select v1, v2 from t_s1;

# The same sink is accepted once it is declared append-only with force_append_only.
statement ok
create sink s1 into t_row_id_as_primary_key as select v1, v2 from t_s1 with (type = 'append-only', force_append_only = 'true');

statement ok
flush;

# Rows that were already in t_s1 appear in the target; v3 takes its default
# (1000) and v4 is computed as v1 + v2.
query IIII rowsort
select * from t_row_id_as_primary_key;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16

# Rows inserted into the source after sink creation reach the target too.
statement ok
insert into t_s1 values (4, 14), (5, 15), (6, 16);

query IIII rowsort
select * from t_row_id_as_primary_key;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22

# The target still accepts direct inserts next to the sink; the omitted v3 and
# v4 are filled from the default and the generated expression.
statement ok
insert into t_row_id_as_primary_key values (100, 100);

query IIII
select * from t_row_id_as_primary_key order by v1;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22
100 100 1000 200

# Append-only check, part 1: an upstream update adds rows carrying the new v2
# (10) to the target, while the rows with the old values stay in place.
statement ok
update t_s1 set v2 = 10 where v1 > 3;

query IIII
select * from t_row_id_as_primary_key order by v1, v2;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 10 1000 14
4 14 1000 18
5 10 1000 15
5 15 1000 20
6 10 1000 16
6 16 1000 22
100 100 1000 200

# Append-only check, part 2: deleting every upstream row removes nothing from
# the target (the expected result is unchanged from the previous query).
statement ok
delete from t_s1;

query IIII
select * from t_row_id_as_primary_key order by v1,v2;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 10 1000 14
4 14 1000 18
5 10 1000 15
5 15 1000 20
6 10 1000 16
6 16 1000 22
100 100 1000 200

# Cleanup: the sink first, then the target table and the source table.
statement ok
drop sink s1;

statement ok
drop table t_row_id_as_primary_key;

statement ok
drop table t_s1;


# Case: target table declared `append only`.
# Mirrors the row_id scenario: same columns (v3 default 1000, v4 = v1 + v2),
# same sink shape, and the same expected results.

statement ok
create table t_s2 (v1 int, v2 int);

# Seed the source table before the sink exists.
statement ok
insert into t_s2 values (1, 11), (2, 12), (3, 13);

statement ok
create table t_append_only (v1 int, v2 int, v3 int default 1000, v4 int as v1 + v2) append only;

# Without append-only options, the sink is rejected for this target.
statement error Only append-only sinks can sink to a table without primary keys.
create sink s2 into t_append_only as select v1, v2 from t_s2;

# The same sink is accepted once it is declared append-only with force_append_only.
statement ok
create sink s2 into t_append_only as select v1, v2 from t_s2 with (type = 'append-only', force_append_only = 'true');

statement ok
flush;

# Rows that were already in t_s2 appear in the target; v3 takes its default
# (1000) and v4 is computed as v1 + v2.
query IIII rowsort
select * from t_append_only;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16

# Rows inserted into the source after sink creation reach the target too.
statement ok
insert into t_s2 values (4, 14), (5, 15), (6, 16);

query IIII rowsort
select * from t_append_only;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22

# The target still accepts direct inserts next to the sink.
statement ok
insert into t_append_only values (100, 100);

query IIII
select * from t_append_only order by v1;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22
100 100 1000 200

# Append-only check, part 1: an upstream update adds rows carrying the new v2
# (10) to the target, while the rows with the old values stay in place.
statement ok
update t_s2 set v2 = 10 where v1 > 3;

query IIII
select * from t_append_only order by v1, v2;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 10 1000 14
4 14 1000 18
5 10 1000 15
5 15 1000 20
6 10 1000 16
6 16 1000 22
100 100 1000 200

# Append-only check, part 2: deleting every upstream row removes nothing from
# the target (the expected result is unchanged from the previous query).
statement ok
delete from t_s2;

query IIII
select * from t_append_only order by v1,v2;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 10 1000 14
4 14 1000 18
5 10 1000 15
5 15 1000 20
6 10 1000 16
6 16 1000 22
100 100 1000 200

# Cleanup: the sink first, then the target table and the source table.
statement ok
drop sink s2;

statement ok
drop table t_append_only;

statement ok
drop table t_s2;


# Case: target table with a declared primary key (v1).
# A plain sink (no append-only options) is accepted here, and upstream
# inserts, deletes and updates are all expected to be reflected in the target.

statement ok
create table t_s3 (v1 int, v2 int);

# Seed the source table before the sink exists.
statement ok
insert into t_s3 values (1, 11), (2, 12), (3, 13);

statement ok
create table t_primary_key (v1 int primary key, v2 int, v3 int default 1000, v4 int as v1 + v2);

statement ok
create sink s3 into t_primary_key as select v1, v2 from t_s3;

statement ok
flush;

# Rows that were already in t_s3 appear in the target; v3 takes its default
# (1000) and v4 is computed as v1 + v2.
query IIII rowsort
select * from t_primary_key;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16

# Upstream inserts propagate.
statement ok
insert into t_s3 values (4, 14), (5, 15), (6, 16);

query IIII rowsort
select * from t_primary_key;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22

# Upstream deletes propagate: rows 4..6 disappear from the target.
statement ok
delete from t_s3 where v1 > 3;

query IIII rowsort
select * from t_primary_key;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16

# Upstream updates propagate: v2 becomes 111 and v4 is recomputed to 112.
statement ok
update t_s3 set v2 = 111 where v1 = 1;

query IIII rowsort
select * from t_primary_key;
----
1 111 1000 112
2 12 1000 14
3 13 1000 16

# The target still accepts direct inserts next to the sink.
statement ok
insert into t_primary_key values (100, 100);

query IIII
select * from t_primary_key order by v1;
----
1 111 1000 112
2 12 1000 14
3 13 1000 16
100 100 1000 200

# The directly inserted row (100, 100) must survive an upstream delete whose
# predicate would also match it.
# NOTE(review): at this point t_s3 only holds v1 in (1, 2, 3) — rows 4..6 were
# deleted earlier in this scenario — so this delete matches no upstream rows.
statement ok
delete from t_s3 where v1 > 3;

query IIII
select * from t_primary_key order by v1;
----
1 111 1000 112
2 12 1000 14
3 13 1000 16
100 100 1000 200

# Cleanup: the sink first, then the target table and the source table.
statement ok
drop sink s3;

statement ok
drop table t_primary_key;

statement ok
drop table t_s3;

# Case: cycle check.
# Builds the chain t_a -> (s_a) -> t_b -> (s_b) -> t_c, then verifies that a
# sink from t_c back into t_a, which would close the loop, is rejected.

statement ok
create table t_a(v int primary key);

statement ok
create table t_b(v int primary key);

statement ok
create table t_c(v int primary key);

# t_a feeds t_b.
statement ok
create sink s_a into t_b as select v from t_a;

# t_b feeds t_c.
statement ok
create sink s_b into t_c as select v from t_b;

# t_c feeding t_a would complete the cycle and must fail.
statement error Creating such a sink will result in circular dependency
create sink s_c into t_a as select v from t_c;

# Cleanup: only s_a and s_b exist (s_c was never created); drop them before the tables.
statement ok
drop sink s_a;

statement ok
drop sink s_b;

statement ok
drop table t_a;

statement ok
drop table t_b;

statement ok
drop table t_c;
6 changes: 6 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ message Sink {
string sink_from_name = 18;
StreamJobStatus stream_job_status = 19;
SinkFormatDesc format_desc = 20;

// Target table id (only applicable for table sink)
optional uint32 target_table = 21;
}

message Connection {
Expand Down Expand Up @@ -287,6 +290,9 @@ message Table {
// This field is used to store the description set by the `comment on` clause.
optional string description = 33;

// This field is used to mark the sinks into this table.
repeated uint32 incoming_sinks = 34;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
TableVersion version = 100;
Expand Down
1 change: 1 addition & 0 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ message SinkParam {
string db_name = 5;
string sink_from_name = 6;
catalog.SinkFormatDesc format_desc = 7;
optional uint32 target_table = 8;
}

enum SinkPayloadFormat {
Expand Down
Loading

0 comments on commit 19f4254

Please sign in to comment.