feat: allow configure other additional columns for connectors #14215

Merged · 24 commits · Jan 10, 2024
19 changes: 15 additions & 4 deletions Cargo.lock

(generated file; diff not rendered)

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -107,7 +107,7 @@ aws-types = "1"
etcd-client = { package = "madsim-etcd-client", version = "0.4" }
futures-async-stream = "0.2.9"
hytra = "0.1"
-rdkafka = { package = "madsim-rdkafka", version = "0.3.0", features = [
+rdkafka = { package = "madsim-rdkafka", version = "0.3.1", features = [
Member
Why change this?

Contributor Author
I think @wangrunji0408 did this, to support some APIs in madsim.

"cmake-build",
] }
hashbrown = { version = "0.14.0", features = [
10 changes: 8 additions & 2 deletions e2e_test/s3/json_file.py
@@ -25,7 +25,9 @@ def do_test(client, config, N, prefix):
name TEXT,
sex int,
mark int,
-) WITH (
+)
+include file as file_col
+WITH (
connector = 's3',
match_pattern = '{prefix}*.json',
s3.region_name = '{config['S3_REGION']}',
@@ -60,14 +62,18 @@ def do_test(client, config, N, prefix):
cur.execute(
'select count(*), sum(id), sum(sex), sum(mark) from s3_test_jsonfile')
result = cur.fetchone()

print(result)

assert result[0] == total_row
assert result[1] == int(((N - 1) * N / 2))
assert result[2] == int(N / 2)
assert result[3] == 0

cur.execute('select file_col from s3_test_jsonfile')
result = cur.fetchone()
file_col = result[0]
print(file_col)
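    # file_col comes from the new `include file as file_col` clause and should name the source object for each row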

cur.execute('drop table s3_test_jsonfile')

cur.close()
42 changes: 42 additions & 0 deletions e2e_test/source/basic/inlcude_key_as.slt
@@ -48,9 +48,25 @@ WITH (
topic = 'upsert_json')
FORMAT PLAIN ENCODE JSON

statement ok
create table additional_columns (a int)
include key as key_col
include partition as partition_col
include offset as offset_col
include timestamp as timestamp_col
include header as header_col
WITH (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'kafka_additional_columns')
FORMAT PLAIN ENCODE JSON

statement ok
select * from upsert_students_default_key;

statement ok
select * from additional_columns;

# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 3s

@@ -59,5 +75,31 @@ select count(rw_key) from upsert_students_default_key
----
15

query I
SELECT count(*)
FROM additional_columns
WHERE key_col IS NOT NULL
AND partition_col IS NOT NULL
AND offset_col IS NOT NULL
AND timestamp_col IS NOT NULL
AND header_col IS NOT NULL
----
101
Contributor
How do we know what count to expect here 🤔
Where can I find the input data?

Contributor Author

for i in {0..100}; do echo "key$i:{\"a\": $i}" | ${KCAT_BIN} -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done

It will generate messages like:

key     payload     header
key1    {"a": 1}    [(header1, v1), (header2, v2)]
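
A minimal sketch of how those messages surface through the new columns (illustrative only, not part of the PR; it assumes the Postgres-style `convert_from` is available for the bytea key):

query TI
select convert_from(key_col, 'utf8') as key, a from additional_columns order by a limit 1;
----
key0 0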

Member
Maybe mention this in a comment to avoid confusion:

# the input data is from scripts/source/prepare_ci_kafka.sh
# ```
# for i in {0..100}; do echo "key$i:{\"a\": $i}" | ${KCAT_BIN} -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done
# ```
# The command generates 101 messages with key `key0` to `key100` and value `{"a": 0}` to `{"a": 100}`, with fixed headers `header1=v1` and `header2=v2`.

query TT
SELECT (header_col[1]).key AS key, (header_col[1]).value::text AS value
FROM additional_columns limit 1;
----
header1 \x7631
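
`\x7631` is simply the bytea rendering of the UTF-8 string `v1`. A hedged sketch of decoding it back to text (illustrative, not part of the PR; assumes the Postgres-style `convert_from` function is available):

query TT
SELECT (header_col[1]).key AS key, convert_from((header_col[1]).value, 'utf8') AS value
FROM additional_columns limit 1;
----
header1 v1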

statement ok
drop table upsert_students_default_key

statement ok
drop table additional_columns
24 changes: 23 additions & 1 deletion e2e_test/source/basic/kafka_batch.slt
@@ -76,6 +76,16 @@ create source s9 (id bytea) with (
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE BYTES

statement ok
create source s10 (v1 int, v2 varchar)
include timestamp as some_ts
with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON

query IT rowsort
select * from s1
----
@@ -219,8 +229,20 @@ select id from s9 order by id
\x6b6b
\x776561776566776566

# query from a kafka timestamp column with alias
query IT rowsort
select v1, v2 from s10 where some_ts > '1977-01-01 00:00:00+00:00'
----
1 1
2 22
3 333
4 4444
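
All four rows pass the 1977 cutoff because Kafka stamps each message with its produce time. A complementary sanity check (hypothetical, not in the PR): no message should predate the cutoff.

query I
select count(*) from s10 where some_ts <= '1977-01-01 00:00:00+00:00'
----
0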

statement ok
drop table s8

statement ok
drop source s9

statement ok
drop source s10
4 changes: 4 additions & 0 deletions scripts/source/prepare_ci_kafka.sh
@@ -74,6 +74,10 @@ for filename in $kafka_data_files; do
) &
done

# test additional columns: produce messages with headers
ADDI_COLUMN_TOPIC="kafka_additional_columns"
for i in {0..100}; do echo "key$i:{\"a\": $i}" | ${KCAT_BIN} -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done
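# each message gets key "key$i", JSON payload {"a": $i}, and two fixed headers (header1=v1, header2=v2);
# the e2e test e2e_test/source/basic/inlcude_key_as.slt expects exactly these 101 messages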

# write schema with name strategy

## topic: upsert_avro_json-record, key subject: string, value subject: CPLM.OBJ_ATTRIBUTE_VALUE
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -34,6 +34,7 @@ aws-sdk-s3 = { workspace = true }
aws-smithy-http = { workspace = true }
aws-smithy-runtime-api = { workspace = true }
aws-smithy-types = { workspace = true }
aws-smithy-types-convert = { version = "0.60.1", features = ["convert-chrono"] }
aws-types = { workspace = true }
base64 = "0.21"
byteorder = "1"