Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Experimentally introducing sink into table #13185

Merged
merged 42 commits into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
8691d95
Update `fill_cdc_mysql_server_id` to conditionally add server ID if n…
shanicky Nov 8, 2023
43e3953
Added modules and commented code blocks in `create_table.rs`. Importe…
shanicky Sep 11, 2023
c6facb3
Code cleanup: Removal of unnecessary println! statements
shanicky Sep 19, 2023
8480a2f
Added modules and commented code blocks in `create_table.rs`. Importe…
shanicky Sep 11, 2023
0021be9
Added new Table variant, commented out println, and modified ddl_serv…
shanicky Sep 21, 2023
4f8a4f1
Merge message types, modify imports, add new functions, and modify st…
shanicky Sep 22, 2023
1465ee4
Add `ReplaceTableChange` to `ddl_service.proto` and update `CreateSin…
shanicky Oct 13, 2023
85e7264
Code changes: Module relocation, print statement removal, comment add…
shanicky Nov 6, 2023
b040e3b
Added "incoming_sinks" field to TableCatalogBuilder struct
shanicky Nov 6, 2023
20f3c59
Add empty vector to incoming_sinks in table_catalog.rs, remove return…
shanicky Nov 7, 2023
9cb1d77
Refactor imports and functions, remove unused code, add new functiona…
shanicky Nov 7, 2023
dbbfc72
Update code with various changes: single sink, specific attributes, e…
shanicky Nov 7, 2023
339aae0
Remove `println!` statements in code.
shanicky Nov 7, 2023
7bb87eb
Code changes: remove unused argument, modify function names and logic…
shanicky Nov 8, 2023
30443a9
Import Arc, SessionImpl, add fetch_table_catalog_for_alter, refactor …
shanicky Nov 9, 2023
06945e7
Update `sink_into_name` to `target_table`, rename `generate_table` to…
shanicky Nov 10, 2023
25f9de6
Modified `SinkCatalog`'s `target_table` field, changed `external.rs`,…
shanicky Nov 10, 2023
53f711d
Add new tables, sinks, drops and cycle check.
shanicky Nov 10, 2023
cf60c6e
correct handle incoming sink
shanicky Nov 10, 2023
32871d7
Fix import, create Arc instances in handle functions
shanicky Nov 13, 2023
f3dfae1
Error handling implemented, mutable reference acquired, function remo…
shanicky Nov 16, 2023
5f13596
Code changes: import `PbReplaceTablePlan`, `ReplaceTablePlan`, remove…
shanicky Nov 21, 2023
f324cf7
Updated `catalog.proto` and other files, added `target_table` field a…
shanicky Nov 23, 2023
9558987
Refactoring: Remove Barrier from stream_plan.proto, refactor import i…
shanicky Nov 23, 2023
a25d941
Add comment explaining affected_table_change field in CreateSinkRequest.
shanicky Nov 23, 2023
cf07820
Modified error message and removed functions in alter_table_column.rs…
shanicky Nov 23, 2023
6a6ac5a
Modifies `impl PlanRoot` and adds `with_upstreams` method.
shanicky Nov 24, 2023
f09f448
Refactor imports and remove unused ones
shanicky Nov 25, 2023
62976a9
fix default column
shanicky Nov 27, 2023
c1764bf
Modify `sink_into_table.slt` and `CatalogManager` in `catalog`.
shanicky Nov 27, 2023
2f23be8
add default value column for sink into table
shanicky Nov 27, 2023
ab18fb2
move check cycle to meta
shanicky Nov 27, 2023
4807201
fix conflict
shanicky Dec 5, 2023
2911f85
Updated error messages, added imports, modified functions, and fixed …
shanicky Dec 5, 2023
877a00d
Code changes: remove `ColIndexMapping` usage, replace with `None`, up…
shanicky Dec 5, 2023
da0899d
move cycle check back to fe
shanicky Dec 7, 2023
4252e6d
huge: use AddAndUpdate mutation
shanicky Dec 7, 2023
36049dd
Refactored return types and statements.
shanicky Dec 7, 2023
4dcc103
big change
shanicky Dec 7, 2023
703cfd6
fix conflict
shanicky Dec 7, 2023
910b473
Added comments and new struct for sink plan in handler files
shanicky Dec 8, 2023
0a05844
Add comment about dropping sink into table not compatible with cascad…
shanicky Dec 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
365 changes: 365 additions & 0 deletions e2e_test/sink/sink_into_table.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,365 @@
statement ok
shanicky marked this conversation as resolved.
Show resolved Hide resolved
SET RW_IMPLICIT_FLUSH TO true;

# currently, we only support single sink into table

statement ok
create table t_simple (v1 int, v2 int);

statement ok
create table m_simple (v1 int primary key, v2 int);

statement ok
create sink s_simple_1 into m_simple as select v1, v2 from t_simple;

statement error Feature is not yet implemented: create sink into table with incoming sinks
create sink s_simple_2 into m_simple as select v1, v2 from t_simple;

# and we can't alter a table with incoming sinks
statement error Feature is not yet implemented: alter table with incoming sinks
alter table m_simple add column v3 int;

statement ok
drop sink s_simple_1;

statement ok
drop table t_simple;
shanicky marked this conversation as resolved.
Show resolved Hide resolved

statement ok
drop table m_simple;

# drop table with associated sink
statement ok
create table t_simple (v1 int, v2 int);

statement ok
create table m_simple (v1 int primary key, v2 int);

statement ok
create sink s_simple_1 into m_simple as select v1, v2 from t_simple;

statement ok
drop table m_simple;

statement ok
drop table t_simple;

# target table with row_id as primary key
statement ok
create table t_s1 (v1 int, v2 int);

statement ok
insert into t_s1 values (1, 11), (2, 12), (3, 13);

statement ok
create table t_row_id_as_primary_key (v1 int, v2 int, v3 int default 1000, v4 int as v1 + v2);

statement error Only append-only sinks can sink to a table without primary keys.
create sink s1 into t_row_id_as_primary_key as select v1, v2 from t_s1;

statement ok
create sink s1 into t_row_id_as_primary_key as select v1, v2 from t_s1 with (type = 'append-only', force_append_only = 'true');

statement ok
flush;

query IIII rowsort
select * from t_row_id_as_primary_key;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16

statement ok
insert into t_s1 values (4, 14), (5, 15), (6, 16);

query IIII rowsort
select * from t_row_id_as_primary_key;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22

statement ok
insert into t_row_id_as_primary_key values (100, 100);

query IIII
select * from t_row_id_as_primary_key order by v1;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22
100 100 1000 200

# test append only
statement ok
update t_s1 set v2 = 10 where v1 > 3;

query IIII
select * from t_row_id_as_primary_key order by v1, v2;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 10 1000 14
4 14 1000 18
5 10 1000 15
5 15 1000 20
6 10 1000 16
6 16 1000 22
100 100 1000 200

statement ok
delete from t_s1;

query IIII
select * from t_row_id_as_primary_key order by v1,v2;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 10 1000 14
4 14 1000 18
5 10 1000 15
5 15 1000 20
6 10 1000 16
6 16 1000 22
100 100 1000 200

statement ok
drop sink s1;

statement ok
drop table t_row_id_as_primary_key;

statement ok
drop table t_s1;


# target table with append only

statement ok
create table t_s2 (v1 int, v2 int);

statement ok
insert into t_s2 values (1, 11), (2, 12), (3, 13);

statement ok
create table t_append_only (v1 int, v2 int, v3 int default 1000, v4 int as v1 + v2) append only;

statement error Only append-only sinks can sink to a table without primary keys.
create sink s2 into t_append_only as select v1, v2 from t_s2;

statement ok
create sink s2 into t_append_only as select v1, v2 from t_s2 with (type = 'append-only', force_append_only = 'true');

statement ok
flush;

query IIII rowsort
select * from t_append_only;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16

statement ok
insert into t_s2 values (4, 14), (5, 15), (6, 16);

query IIII rowsort
select * from t_append_only;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22

statement ok
insert into t_append_only values (100, 100);

query IIII
select * from t_append_only order by v1;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22
100 100 1000 200

# test append only
statement ok
update t_s2 set v2 = 10 where v1 > 3;

query IIII
select * from t_append_only order by v1, v2;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 10 1000 14
4 14 1000 18
5 10 1000 15
5 15 1000 20
6 10 1000 16
6 16 1000 22
100 100 1000 200

statement ok
delete from t_s2;

query IIII
select * from t_append_only order by v1,v2;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 10 1000 14
4 14 1000 18
5 10 1000 15
5 15 1000 20
6 10 1000 16
6 16 1000 22
100 100 1000 200

statement ok
drop sink s2;

statement ok
drop table t_append_only;

statement ok
drop table t_s2;


# target table with primary key

statement ok
create table t_s3 (v1 int, v2 int);

statement ok
insert into t_s3 values (1, 11), (2, 12), (3, 13);

statement ok
create table t_primary_key (v1 int primary key, v2 int, v3 int default 1000, v4 int as v1 + v2);

statement ok
create sink s3 into t_primary_key as select v1, v2 from t_s3;

statement ok
flush;

query IIII rowsort
select * from t_primary_key;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16

statement ok
insert into t_s3 values (4, 14), (5, 15), (6, 16);

query IIII rowsort
select * from t_primary_key;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16
4 14 1000 18
5 15 1000 20
6 16 1000 22

statement ok
delete from t_s3 where v1 > 3;

query IIII rowsort
select * from t_primary_key;
----
1 11 1000 12
2 12 1000 14
3 13 1000 16

statement ok
update t_s3 set v2 = 111 where v1 = 1;

query IIII rowsort
select * from t_primary_key;
----
1 111 1000 112
2 12 1000 14
3 13 1000 16

statement ok
insert into t_primary_key values (100, 100);

query IIII
select * from t_primary_key order by v1;
----
1 111 1000 112
2 12 1000 14
3 13 1000 16
100 100 1000 200

statement ok
delete from t_s3 where v1 > 3;

query IIII
select * from t_primary_key order by v1;
----
1 111 1000 112
2 12 1000 14
3 13 1000 16
100 100 1000 200

statement ok
drop sink s3;

statement ok
drop table t_primary_key;

statement ok
drop table t_s3;

# cycle check

statement ok
create table t_a(v int primary key);

statement ok
create table t_b(v int primary key);

statement ok
create table t_c(v int primary key);

statement ok
create sink s_a into t_b as select v from t_a;

statement ok
create sink s_b into t_c as select v from t_b;

statement error Creating such a sink will result in circular dependency
create sink s_c into t_a as select v from t_c;

statement ok
drop sink s_a;

statement ok
drop sink s_b;

statement ok
drop table t_a;

statement ok
drop table t_b;

statement ok
drop table t_c;
6 changes: 6 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ message Sink {
string sink_from_name = 18;
StreamJobStatus stream_job_status = 19;
SinkFormatDesc format_desc = 20;

// Target table id (only applicable for table sink)
optional uint32 target_table = 21;
shanicky marked this conversation as resolved.
Show resolved Hide resolved
}

message Connection {
Expand Down Expand Up @@ -287,6 +290,9 @@ message Table {
// This field is used to store the description set by the `comment on` clause.
optional string description = 33;

// This field is used to mark the the sink into this table.
repeated uint32 incoming_sinks = 34;

// Per-table catalog version, used by schema change. `None` for internal tables and tests.
// Not to be confused with the global catalog version for notification service.
TableVersion version = 100;
Expand Down
1 change: 1 addition & 0 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ message SinkParam {
string db_name = 5;
string sink_from_name = 6;
catalog.SinkFormatDesc format_desc = 7;
optional uint32 target_table = 8;
}

enum SinkPayloadFormat {
Expand Down
Loading
Loading