Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
tabversion committed Nov 7, 2024
1 parent 6e4d8d6 commit aa93bb1
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 2 deletions.
1 change: 1 addition & 0 deletions ci/scripts/e2e-kafka-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rpk topic create test-rw-sink-upsert-schema
rpk topic create test-rw-sink-debezium
rpk topic create test-rw-sink-without-snapshot
rpk topic create test-rw-sink-text-key-id
rpk topic create test-rw-sink-bytes-key-id

sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt'
sleep 2
Expand Down
92 changes: 91 additions & 1 deletion e2e_test/sink/kafka/create_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ create sink invalid_pk_column from t_kafka with (

### Test sink with key encode ###

statement error sink key encode unsupported: JSON, only TEXT supported
statement error sink key encode unsupported: JSON, only TEXT and BYTES supported
create sink sink_text_error from t_kafka with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
Expand Down Expand Up @@ -188,6 +188,55 @@ format plain encode json (
force_append_only='true'
) key encode text ;

statement error sink key encode unsupported: JSON, only TEXT and BYTES supported
create sink sink_bytes_error as (
select int8send(id) as id_bytes, * from t_kafka
) with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-bytes-key-id',
primary_key = 'id_bytes')
format plain encode json (
force_append_only='true'
) key encode json;

statement error
# The key encode is BYTES, but the primary key has 2 columns. The key encode BYTES requires the primary key to be a single column
create sink sink_bytes_error as (
select int8send(id) as id_bytes, '\x1234'::bytea as other_bytea, * from t_kafka
) with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-bytes-key-id',
primary_key = 'id_bytes, other_bytea')
format plain encode json (
force_append_only='true'
) key encode json;

statement error key encode bytes only works with kafka connector, but found kinesis
create sink sink_bytes_json as (
select int8send(id) as id_bytes, * from t_kafka
) with (
connector = 'kinesis',
topic = 'topic',
properties.bootstrap.server = 'message_queue:29092'
)
format plain encode json (
force_append_only='true'
) key encode bytes;

statement ok
create sink sink_bytes_json as (
select int8send(id) as id_bytes, * from t_kafka
) with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-bytes-key-id',
primary_key = 'id_bytes')
format plain encode json (
force_append_only='true'
) key encode bytes;

statement ok
create table t_sink_text_id (id int)
include key as rw_key
Expand All @@ -197,6 +246,15 @@ with (
topic = 'test-rw-sink-text-key-id',
) format plain encode json;

statement ok
create table t_sink_bytea_id (id int)
include key as rw_key
with (
connector = 'kafka',
properties.bootstrap.server = 'message_queue:29092',
topic = 'test-rw-sink-bytes-key-id',
) format plain encode json;

#======

statement ok
Expand Down Expand Up @@ -244,11 +302,43 @@ select rw_key from t_sink_text_id order by rw_key
\x36
\x37

query T
select rw_key from t_sink_bytea_id order by rw_key
----
\x0000000000000001
\x0000000000000002
\x0000000000000003
\x0000000000000004
\x0000000000000005
\x0000000000000006
\x0000000000000007


statement ok
insert into t_kafka values
(8, 'lv7Eq3g8hx', 194, 19036, 28641, 13652.073, 993.408963466774, '2023-04-13 13:52:09.356742', '\xDEADBABE', '04:00:00.1234', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
(9, 'nwRq4zejSQ', 10028, 20090, 24837, 20699.559, 11615.276406159757, '2023-04-13 12:40:42.487742', '\xDEADBABE', '05:01:00.123456', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'),
(10, '0oVqRIHqkb', 26951, 20674, 20674, 19387.238, 9042.404483827515, '2023-04-13 16:40:58.888742', '\x00', '05:01:00.1234567', '1970-01-01', '00:00:01.123456', '1970-01-01 00:00:00.123456'::timestamptz, '{}');

query T
select rw_key from t_sink_bytea_id order by rw_key
----
\x0000000000000001
\x0000000000000002
\x0000000000000003
\x0000000000000004
\x0000000000000005
\x0000000000000006
\x0000000000000007
\x0000000000000008
\x0000000000000009
\x000000000000000a

statement ok
drop table t_sink_text_id;

statement ok
drop table t_sink_bytea_id;

statement ok
drop sink t_sink_bytea_id;
2 changes: 1 addition & 1 deletion src/connector/src/sink/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,10 @@ impl TryFrom<PbSinkFormatDesc> for SinkFormatDesc {
}
};
let key_encode = match &value.key_encode() {
E::Bytes => Some(SinkEncode::Bytes),
E::Text => Some(SinkEncode::Text),
E::Unspecified => None,
encode @ (E::Avro
| E::Bytes
| E::Csv
| E::Json
| E::Protobuf
Expand Down

0 comments on commit aa93bb1

Please sign in to comment.