feat: introduce include clause to add additional connector columns #13707

Merged: 60 commits from tab/include-opts into main, Dec 20, 2023 (diff below shown from 51 commits)

Commits
648dce1 stash (tabVersion, Nov 29, 2023)
16f451a Merge remote-tracking branch 'origin' into tab/include-opts (tabVersion, Nov 29, 2023)
5b65a4e fix compilation (xxchan, Nov 29, 2023)
37d8192 stash (tabVersion, Nov 29, 2023)
05147aa Merge branch 'tab/include-opts' of https://github.com/singularity-dat… (tabVersion, Nov 29, 2023)
d7c39c4 feat: Refactor source and table creation handling (tabVersion, Nov 29, 2023)
922fa1c add new field in ColumnDesc (tabVersion, Nov 30, 2023)
7b268cc bind_source_pk (tabVersion, Nov 30, 2023)
842d722 add license (tabVersion, Nov 30, 2023)
bbf74fb Merge branch 'main' into tab/include-opts (tabVersion, Dec 2, 2023)
c31f624 feat: Implement additional column validation for various formats (tabVersion, Dec 4, 2023)
981a74c stash (tabVersion, Dec 4, 2023)
90c1d92 Merge remote-tracking branch 'origin' into tab/include-opts (tabVersion, Dec 4, 2023)
ebfe20b format (tabVersion, Dec 4, 2023)
30c5768 stash (tabVersion, Dec 5, 2023)
0ce99cc fix (tabVersion, Dec 5, 2023)
c67f138 compatible with prev version (tabVersion, Dec 6, 2023)
cc94cc8 Merge remote-tracking branch 'origin' into tab/include-opts (tabVersion, Dec 6, 2023)
f4c4e44 merge fix (tabVersion, Dec 6, 2023)
4f6bca2 change parser trait, stash (tabVersion, Dec 7, 2023)
2d0ad74 rerun (tabVersion, Dec 7, 2023)
6fb5922 stash (tabVersion, Dec 10, 2023)
383e844 Revert "change parser trait, stash" (tabVersion, Dec 10, 2023)
64e6656 fix (tabVersion, Dec 10, 2023)
b2cafab stash (tabVersion, Dec 10, 2023)
ac03436 refactor: Refactor and standardize the `access_field` function and im… (tabVersion, Dec 11, 2023)
fece45b Refactor parsing logic and imports in connector code (tabVersion, Dec 11, 2023)
74c3f86 stash (tabVersion, Dec 11, 2023)
67a14e5 Merge branch 'main' into tab/include-opts (tabVersion, Dec 12, 2023)
1a9653f fix (tabVersion, Dec 12, 2023)
f45a0a9 fix (tabVersion, Dec 12, 2023)
7650a9f fix e2e (tabVersion, Dec 13, 2023)
104a478 fix e2e (tabVersion, Dec 13, 2023)
4926b84 format (tabVersion, Dec 13, 2023)
00b1ae8 fix test (tabVersion, Dec 13, 2023)
33c1354 fix test (tabVersion, Dec 14, 2023)
32f13eb Merge branch 'main' into tab/include-opts (tabVersion, Dec 14, 2023)
c71f49b pre delete json parser (tabVersion, Dec 14, 2023)
30f9516 remove unwrap array for kafka (tabVersion, Dec 15, 2023)
4c05e90 feat: Refactor handling of exempted connectors and addition columns (tabVersion, Dec 15, 2023)
8c2ca18 fix (tabVersion, Dec 15, 2023)
0377f84 fix (tabVersion, Dec 15, 2023)
131faf5 fix (tabVersion, Dec 15, 2023)
285e3a2 fix (tabVersion, Dec 16, 2023)
6c797b7 fix (tabVersion, Dec 16, 2023)
4f8600c format (tabVersion, Dec 16, 2023)
ec06a13 fix broker addr (tabVersion, Dec 16, 2023)
3c9b563 refactor: Refactor primary key definitions in test database (tabVersion, Dec 18, 2023)
bdaabc4 Merge remote-tracking branch 'origin' into tab/include-opts (tabVersion, Dec 18, 2023)
4c70a6b fix (tabVersion, Dec 18, 2023)
9904da0 remove legacy avro behavior (tabVersion, Dec 18, 2023)
a4c6ba6 refactor: Refactor additional columns handling and imports (tabVersion, Dec 19, 2023)
4da8dbf change additionalColumn_type to Normal rather than Unspecified (tabVersion, Dec 19, 2023)
d38e038 add version for column_desc (tabVersion, Dec 19, 2023)
334f6c0 Merge remote-tracking branch 'origin' into tab/include-opts (tabVersion, Dec 19, 2023)
be6d28e resolve comments (tabVersion, Dec 19, 2023)
c2c3642 resolve comments (tabVersion, Dec 19, 2023)
412e4bc refactor: Refactor error messages and handling for source creation wi… (tabVersion, Dec 19, 2023)
8e72517 fix (tabVersion, Dec 19, 2023)
46df0fc Merge branch 'main' into tab/include-opts (tabVersion, Dec 20, 2023)
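
The file changes below are mostly e2e tests migrated to the new syntax. As a minimal sketch of the clause this PR introduces, pieced together from those diffs (the table and topic names here are hypothetical, not taken from the changeset):

CREATE TABLE t (
    v1 INT,
    primary key (rw_key)
)
INCLUDE KEY AS rw_key
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'message_queue:29092',
    topic = 'some_topic')
FORMAT UPSERT ENCODE JSON;

INCLUDE KEY AS <name> surfaces the Kafka message key as an extra column; for FORMAT UPSERT that column must also be declared as the primary key, which the new test file below asserts.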
4 changes: 3 additions & 1 deletion e2e_test/sink/kafka/avro.slt
@@ -1,5 +1,7 @@
 statement ok
-create table from_kafka with (
+create table from_kafka ( primary key (some_key) )
+include key as some_key
+with (
 connector = 'kafka',
 topic = 'test-rw-sink-upsert-avro',
 properties.bootstrap.server = 'message_queue:29092')
63 changes: 63 additions & 0 deletions e2e_test/source/basic/inlcude_key_as.slt
@@ -0,0 +1,63 @@
# upsert format must have a pk
statement error
CREATE TABLE upsert_students_default_key (
"ID" INT,
"firstName" VARCHAR,
"lastName" VARCHAR,
age INT,
height REAL,
weight REAL
)
INCLUDE KEY AS rw_key
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_json')
FORMAT UPSERT ENCODE JSON

# upsert format pk must be the key column
statement error
CREATE TABLE upsert_students_default_key (
"ID" INT primary key,
"firstName" VARCHAR,
"lastName" VARCHAR,
age INT,
height REAL,
weight REAL
)
INCLUDE KEY AS rw_key
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_json')
FORMAT UPSERT ENCODE JSON

statement ok
CREATE TABLE upsert_students_default_key (
"ID" INT,
"firstName" VARCHAR,
"lastName" VARCHAR,
age INT,
height REAL,
weight REAL,
)
INCLUDE KEY AS rw_key
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'upsert_json')
FORMAT PLAIN ENCODE JSON

statement ok
select * from upsert_students_default_key;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 3s

query I
select count(rw_key) from upsert_students_default_key
----
15

statement ok
drop table upsert_students_default_key
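
The two error cases above pin down the rule this PR enforces: with FORMAT UPSERT, the primary key must be the column introduced by INCLUDE KEY, not a payload column. A minimal passing counterpart would look like the following sketch (table name hypothetical; the kafka.slt diff below exercises the same pattern in full):

statement ok
CREATE TABLE upsert_students_keyed (
    "ID" INT,
    "firstName" VARCHAR,
    primary key (rw_key)
)
INCLUDE KEY AS rw_key
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'message_queue:29092',
    topic = 'upsert_json')
FORMAT UPSERT ENCODE JSON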
10 changes: 7 additions & 3 deletions e2e_test/source/basic/kafka.slt
@@ -415,8 +415,10 @@ CREATE TABLE upsert_students_default_key (
 "lastName" VARCHAR,
 age INT,
 height REAL,
-weight REAL
+weight REAL,
+primary key (rw_key)
 )
+INCLUDE KEY AS rw_key
 WITH (
 connector = 'kafka',
 properties.bootstrap.server = 'message_queue:29092',
@@ -425,13 +427,15 @@ FORMAT UPSERT ENCODE JSON
 
 statement ok
 CREATE TABLE upsert_students (
-"ID" INT PRIMARY KEY,
+"ID" INT,
 "firstName" VARCHAR,
 "lastName" VARCHAR,
 age INT,
 height REAL,
-weight REAL
+weight REAL,
+primary key (rw_key)
 )
+INCLUDE KEY AS rw_key
 WITH (
 connector = 'kafka',
 properties.bootstrap.server = 'message_queue:29092',
20 changes: 6 additions & 14 deletions e2e_test/source/basic/nosim_kafka.slt
@@ -3,26 +3,17 @@
 
 # If we cannot extract key schema, use message key as varchar primary key
 statement ok
-CREATE TABLE upsert_avro_json_default_key ()
+CREATE TABLE upsert_avro_json_default_key ( primary key (rw_key) )
+INCLUDE KEY AS rw_key
 WITH (
 connector = 'kafka',
 properties.bootstrap.server = 'message_queue:29092',
 topic = 'upsert_avro_json')
 FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');
 
-
-# key schema should be a subset of value schema
-statement error
-CREATE TABLE upsert_student_key_not_subset_of_value_avro_json ()
-WITH (
-connector = 'kafka',
-properties.bootstrap.server = 'message_queue:29092',
-topic = 'upsert_student_key_not_subset_of_value_avro_json')
-FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081');
-
-
 statement ok
-CREATE TABLE upsert_student_avro_json ()
+CREATE TABLE upsert_student_avro_json ( primary key (rw_key) )
+INCLUDE KEY AS rw_key
 WITH (
 connector = 'kafka',
 properties.bootstrap.server = 'message_queue:29092',
@@ -68,7 +59,8 @@ CREATE TABLE kafka_json_schema_plain with (
 ) FORMAT PLAIN ENCODE JSON (schema.registry = 'http://schemaregistry:8082');
 
 statement ok
-CREATE TABLE kafka_json_schema_upsert (PRIMARY KEY(id))
+CREATE TABLE kafka_json_schema_upsert (PRIMARY KEY(rw_key))
+INCLUDE KEY AS rw_key
 with (
 connector = 'kafka',
 kafka.topic = 'kafka_upsert_json_schema',
47 changes: 5 additions & 42 deletions e2e_test/source/basic/old_row_format_syntax/kafka.slt
@@ -368,33 +368,20 @@ WITH (
 topic = 'debezium_mongo_json_customers_no_schema_field')
 ROW FORMAT DEBEZIUM_MONGO_JSON
 
-statement ok
-CREATE TABLE upsert_students_default_key (
-"ID" INT,
-"firstName" VARCHAR,
-"lastName" VARCHAR,
-age INT,
-height REAL,
-weight REAL
-)
-WITH (
-connector = 'kafka',
-properties.bootstrap.server = 'message_queue:29092',
-topic = 'upsert_json')
-ROW FORMAT UPSERT_JSON
-
 statement ok
 CREATE TABLE upsert_students (
-"ID" INT PRIMARY KEY,
+"ID" INT,
 "firstName" VARCHAR,
 "lastName" VARCHAR,
 age INT,
 height REAL,
-weight REAL
+weight REAL,
+primary key (rw_key)
 )
+INCLUDE KEY AS rw_key
 WITH (
 connector = 'kafka',
-properties.bootstrap.server = 'message_queue:29092',
+properties.bootstrap.server = 'message_queue:29092',
 topic = 'upsert_json')
 ROW FORMAT UPSERT_JSON
 
@@ -682,27 +669,6 @@ ORDER BY
 6 Leah Davis 18 5.7 140
 9 Jacob Anderson 20 5.8 155
 
-query II
-SELECT
-"ID",
-"firstName",
-"lastName",
-"age",
-"height",
-"weight"
-FROM
-upsert_students_default_key
-ORDER BY
-"ID";
-----
-1 Ethan Martinez 18 6.1 180
-2 Emily Jackson 19 5.4 110
-3 Noah Thompson 21 6.3 195
-4 Emma Brown 20 5.3 130
-5 Michael Williams 22 6.2 190
-6 Leah Davis 18 5.7 140
-9 Jacob Anderson 20 5.8 155
-
 query II
 select
 L_ORDERKEY,
@@ -791,8 +757,5 @@ DROP TABLE mongo_customers_no_schema_field;
 statement ok
 DROP TABLE upsert_students;
 
-statement ok
-DROP TABLE upsert_students_default_key;
-
 statement ok
 drop table dbz_ignore_case_json;
138 changes: 0 additions & 138 deletions e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt

This file was deleted.

4 changes: 3 additions & 1 deletion e2e_test/source/basic/schema_registry.slt
@@ -47,7 +47,9 @@ create table t1 () with (
 );
 
 statement ok
-create table t1 () with (
+create table t1 (primary key(rw_key))
+INCLUDE KEY AS rw_key
+with (
 connector = 'kafka',
 topic = 'upsert_avro_json-topic-record',
 properties.bootstrap.server = 'message_queue:29092'
14 changes: 14 additions & 0 deletions proto/plan_common.proto
@@ -15,6 +15,16 @@ message Field {
   string name = 2;
 }
 
+enum AdditionalColumnType {
+  UNSPECIFIED = 0;
+  KEY = 1;
+  TIMESTAMP = 2;
+  PARTITION = 3;
+  OFFSET = 4;
+  HEADER = 5;
+  FILENAME = 6;
+}
+
 message ColumnDesc {
   data.DataType column_type = 1;
   int32 column_id = 2;
@@ -40,6 +50,10 @@
 
   // This field is used to store the description set by the `comment on` clause.
   optional string description = 8;
+
+  // This field is used to represent the connector-spec additional column type.
+  // UNSPECIFIED or unset for normal column.
+  AdditionalColumnType additional_column_type = 9;
 }
 
 message ColumnCatalog {
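
The AdditionalColumnType enum covers more connector metadata than just the key. The e2e tests in this changeset only exercise the KEY variant, so the multi-column form below is an assumption about how the clause generalizes to the other variants, not something shown in these diffs:

CREATE TABLE t (v1 INT)
INCLUDE KEY AS msg_key
INCLUDE PARTITION AS msg_partition
INCLUDE OFFSET AS msg_offset
INCLUDE TIMESTAMP AS msg_timestamp
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'message_queue:29092',
    topic = 'some_topic')
FORMAT PLAIN ENCODE JSON;

Each included column would record its variant in ColumnDesc.additional_column_type (field 9 above), while ordinary columns leave it UNSPECIFIED; commit 4da8dbf later changes that default to a Normal variant.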