-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: allow configure other additional columns for connectors #14215
Changes from 21 commits
79807e6
54d60b8
3a89125
9b4dd37
a98bcf4
3f96e4d
7107acf
587fb35
64552ed
b1572b4
512b8d3
dbaa602
dcd79dd
77b5b65
134f571
600300a
1df7136
09c654c
671f365
b60ba68
a719ee4
ce7adff
5c62b5a
c6ad345
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -48,9 +48,25 @@ WITH ( | |||||||||
topic = 'upsert_json') | ||||||||||
FORMAT PLAIN ENCODE JSON | ||||||||||
|
||||||||||
statement ok | ||||||||||
create table additional_columns (a int) | ||||||||||
include key as key_col | ||||||||||
include partition as partition_col | ||||||||||
include offset as offset_col | ||||||||||
include timestamp as timestamp_col | ||||||||||
include header as header_col | ||||||||||
tabVersion marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
WITH ( | ||||||||||
connector = 'kafka', | ||||||||||
properties.bootstrap.server = 'message_queue:29092', | ||||||||||
topic = 'kafka_additional_columns') | ||||||||||
FORMAT PLAIN ENCODE JSON | ||||||||||
|
||||||||||
statement ok | ||||||||||
select * from upsert_students_default_key; | ||||||||||
|
||||||||||
statement ok | ||||||||||
select * from additional_columns; | ||||||||||
st1page marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
|
||||||||||
# Wait enough time to ensure SourceExecutor consumes all Kafka data. | ||||||||||
sleep 3s | ||||||||||
|
||||||||||
|
@@ -59,5 +75,31 @@ select count(rw_key) from upsert_students_default_key | |||||||||
---- | ||||||||||
15 | ||||||||||
|
||||||||||
query I | ||||||||||
SELECT count(*) | ||||||||||
FROM additional_columns | ||||||||||
WHERE key_col IS NOT NULL | ||||||||||
AND partition_col IS NOT NULL | ||||||||||
AND offset_col IS NOT NULL | ||||||||||
AND timestamp_col IS NOT NULL | ||||||||||
AND header_col IS NOT NULL | ||||||||||
---- | ||||||||||
101 | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do we know what count to expect here 🤔 Where can I find the input data There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
it will generate message like
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May mention this in comment to avoid confusion |
||||||||||
|
||||||||||
# the input data is from scripts/source/prepare_ci_kafka.sh | ||||||||||
# ``` | ||||||||||
# for i in {0..100}; do echo "key$i:{\"a\": $i}" | ${KCAT_BIN} -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done | ||||||||||
# ``` | ||||||||||
# The command generates 101 messages with key `key0` to `key100` and value `{"a": 0}` to `{"a": 100}`, with fixed headers `header1=v1` and `header2=v2`. | ||||||||||
|
||||||||||
query TT | ||||||||||
SELECT (header_col[1]).key AS key, (header_col[1]).value::text AS value | ||||||||||
FROM additional_columns limit 1; | ||||||||||
---- | ||||||||||
header1 \x7631 | ||||||||||
tabVersion marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
|
||||||||||
statement ok | ||||||||||
drop table upsert_students_default_key | ||||||||||
|
||||||||||
statement ok | ||||||||||
drop table additional_columns |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why to change this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think @wangrunji0408 does this, to support some API in madsim.