diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh index d7b7821f1765d..360691897b9d3 100755 --- a/ci/scripts/e2e-kafka-sink-test.sh +++ b/ci/scripts/e2e-kafka-sink-test.sh @@ -11,6 +11,7 @@ rpk topic create test-rw-sink-upsert-schema rpk topic create test-rw-sink-debezium rpk topic create test-rw-sink-without-snapshot rpk topic create test-rw-sink-text-key-id +rpk topic create test-rw-sink-bytes-key-id sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt' sleep 2 diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt index 7f589e4a4b231..e9cb950315172 100644 --- a/e2e_test/sink/kafka/create_sink.slt +++ b/e2e_test/sink/kafka/create_sink.slt @@ -146,7 +146,7 @@ create sink invalid_pk_column from t_kafka with ( ### Test sink with key encode ### -statement error sink key encode unsupported: JSON, only TEXT supported +statement error sink key encode unsupported: JSON, only TEXT and BYTES supported create sink sink_text_error from t_kafka with ( connector = 'kafka', properties.bootstrap.server = 'message_queue:29092', @@ -188,6 +188,55 @@ format plain encode json ( force_append_only='true' ) key encode text ; +statement error sink key encode unsupported: JSON, only TEXT and BYTES supported +create sink sink_bytes_error as ( + select int8send(id) as id_bytes, * from t_kafka +) with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-bytes-key-id', + primary_key = 'id_bytes') +format plain encode json ( + force_append_only='true' +) key encode json; + +statement error +# The key encode is BYTES, but the primary key has 2 columns. The key encode BYTES requires the primary key to be a single column +create sink sink_bytes_error as ( + select int8send(id) as id_bytes, '\x1234'::bytea as other_bytea, * from t_kafka +) with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-bytes-key-id', + primary_key = 'id_bytes, other_bytea') +format plain encode json ( + force_append_only='true' +) key encode json; + +statement error key encode bytes only works with kafka connector, but found kinesis +create sink sink_bytes_json as ( + select int8send(id) as id_bytes, * from t_kafka +) with ( + connector = 'kinesis', + topic = 'topic', + properties.bootstrap.server = 'message_queue:29092' +) +format plain encode json ( + force_append_only='true' +) key encode bytes; + +statement ok +create sink sink_bytes_json as ( + select int8send(id) as id_bytes, * from t_kafka +) with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-bytes-key-id', + primary_key = 'id_bytes') +format plain encode json ( + force_append_only='true' +) key encode bytes; + statement ok create table t_sink_text_id (id int) include key as rw_key @@ -197,6 +246,15 @@ with ( topic = 'test-rw-sink-text-key-id', ) format plain encode json; +statement ok +create table t_sink_bytea_id (id int) +include key as rw_key +with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-bytes-key-id', +) format plain encode json; + #====== statement ok @@ -244,11 +302,43 @@ select rw_key from t_sink_text_id order by rw_key \x36 \x37 +query T +select rw_key from t_sink_bytea_id order by rw_key +---- +\x0000000000000001 +\x0000000000000002 +\x0000000000000003 +\x0000000000000004 +\x0000000000000005 +\x0000000000000006 +\x0000000000000007 + + statement ok insert into t_kafka values (8, 'lv7Eq3g8hx', 194, 19036, 28641, 13652.073, 993.408963466774, '2023-04-13 13:52:09.356742', '\xDEADBABE', '04:00:00.1234', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'), (9, 'nwRq4zejSQ', 10028, 20090, 24837, 20699.559, 11615.276406159757, '2023-04-13 12:40:42.487742', '\xDEADBABE', '05:01:00.123456', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'), (10, '0oVqRIHqkb', 26951, 20674, 20674, 19387.238, 9042.404483827515, '2023-04-13 16:40:58.888742', '\x00', '05:01:00.1234567', '1970-01-01', '00:00:01.123456', '1970-01-01 00:00:00.123456'::timestamptz, '{}'); +query T +select rw_key from t_sink_bytea_id order by rw_key +---- + \x0000000000000001 + \x0000000000000002 + \x0000000000000003 + \x0000000000000004 + \x0000000000000005 + \x0000000000000006 + \x0000000000000007 + \x0000000000000008 + \x0000000000000009 + \x000000000000000a + statement ok drop table t_sink_text_id; + +statement ok +drop table t_sink_bytea_id; + +statement ok +drop sink t_sink_bytea_id; diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 4b6327364f632..e3620fe5a5be6 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -263,10 +263,10 @@ impl TryFrom for SinkFormatDesc { } }; let key_encode = match &value.key_encode() { + E::Bytes => Some(SinkEncode::Bytes), E::Text => Some(SinkEncode::Text), E::Unspecified => None, encode @ (E::Avro - | E::Bytes | E::Csv | E::Json | E::Protobuf