Skip to content

Commit

Permalink
feat(sink): support format plain encode avro for kafka (#17216)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Jun 14, 2024
1 parent 74ca765 commit ac7abd9
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 2 deletions.
2 changes: 1 addition & 1 deletion ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/my
echo "--- preparing postgresql"

# set up PG sink destination
apt-get -y install postgresql-client
apt-get -y install postgresql-client jq
export PGPASSWORD=postgres
psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';"
createdb -h db -U postgres test
Expand Down
118 changes: 118 additions & 0 deletions e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,126 @@ create sink sink0 from into_kafka with (
format upsert encode avro (
schema.registry = 'http://schemaregistry:8082');

# Create the topic and register the Avro *value* schema that all three
# plain-encode sinks below will write against.
system ok
rpk topic create test-rw-sink-plain-avro

system ok
jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value/versions'
{
"type": "record",
"name": "Simple",
"fields": [
{
"name": "int32_field",
"type": ["null", "int"]
},
{
"name": "string_field",
"type": ["null", "string"]
}
]
}
EOF

# Source table reads everything back from the topic; `include key as raw_key`
# exposes the raw message-key bytes so the query below can distinguish which
# sink produced each message.
statement ok
create table from_kafka_plain
include key as raw_key
with (
connector = 'kafka',
topic = 'test-rw-sink-plain-avro',
properties.bootstrap.server = 'message_queue:29092')
format plain encode avro (
schema.registry = 'http://schemaregistry:8082');

# Sink 1: no primary_key -> messages are produced without a key
# (shows up as NULL raw_key in the verification query).
statement ok
create sink sink_plain_key_none as select int32_field, string_field from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-plain-avro',
properties.bootstrap.server = 'message_queue:29092')
format plain encode avro (
force_append_only = true,
schema.registry = 'http://schemaregistry:8082');

# Sink 2: `key encode text` -> the key is the textual form of int32_field
# (e.g. 22 -> bytes \x3232, i.e. ASCII "22").
statement ok
create sink sink_plain_key_text as select int32_field, string_field from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-plain-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field')
format plain encode avro (
force_append_only = true,
schema.registry = 'http://schemaregistry:8082')
key encode text;

# Register the *key* schema so the Avro-keyed sink below can encode its key.
system ok
jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key/versions'
{
"type": "record",
"name": "Key",
"fields": [
{
"name": "int32_field",
"type": ["null", "int"]
}
]
}
EOF

# Sink 3: default (Avro) key encode -> the key is Avro-encoded and carries the
# 5-byte confluent wire-format header, which the query strips before comparing.
statement ok
create sink sink_plain_key_avro as select int32_field, string_field from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-plain-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field')
format plain encode avro (
force_append_only = true,
schema.registry = 'http://schemaregistry:8082');

# Give the sinks time to flush to Kafka and the source table time to ingest.
sleep 2s

# Each of the two upstream rows should appear three times — once per sink —
# distinguishable by key form: Avro-encoded (\x02..), text ("22"/"11"), or NULL.
query ITT
select
int32_field,
string_field,
case when octet_length(raw_key) > 5 and substring(raw_key from 1 for 1) = '\x00'
then substring(raw_key from 6) -- skip indeterministic confluent schema registry header
else raw_key end
from from_kafka_plain order by string_field, raw_key;
----
22 Rising \x022c
22 Rising \x3232
22 Rising NULL
11 Wave \x0216
11 Wave \x3131
11 Wave NULL

# Teardown: drop sinks before the table they read from.
statement ok
drop sink sink_plain_key_avro;

statement ok
drop sink sink_plain_key_text;

statement ok
drop sink sink_plain_key_none;

statement ok
drop table from_kafka_plain;

# Schema registry requires a soft delete before a permanent (hard) delete.
system ok
curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key'

system ok
curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key?permanent=true'

system ok
curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value'

system ok
curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value?permanent=true'

system ok
rpk topic delete test-rw-sink-plain-avro

query TTTRRIITTTTTTTT
select
bool_field,
Expand Down
10 changes: 10 additions & 0 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ pub enum SinkFormatterImpl {
// append-only
AppendOnlyJson(AppendOnlyFormatter<JsonEncoder, JsonEncoder>),
AppendOnlyTextJson(AppendOnlyFormatter<TextEncoder, JsonEncoder>),
AppendOnlyAvro(AppendOnlyFormatter<AvroEncoder, AvroEncoder>),
AppendOnlyTextAvro(AppendOnlyFormatter<TextEncoder, AvroEncoder>),
AppendOnlyProto(AppendOnlyFormatter<JsonEncoder, ProtoEncoder>),
AppendOnlyTextProto(AppendOnlyFormatter<TextEncoder, ProtoEncoder>),
AppendOnlyTemplate(AppendOnlyFormatter<TemplateEncoder, TemplateEncoder>),
Expand Down Expand Up @@ -335,6 +337,10 @@ impl SinkFormatterImpl {
Impl::AppendOnlyTextJson(build(p).await?)
}
(F::AppendOnly, E::Json, None) => Impl::AppendOnlyJson(build(p).await?),
(F::AppendOnly, E::Avro, Some(E::Text)) => {
Impl::AppendOnlyTextAvro(build(p).await?)
}
(F::AppendOnly, E::Avro, None) => Impl::AppendOnlyAvro(build(p).await?),
(F::AppendOnly, E::Protobuf, Some(E::Text)) => {
Impl::AppendOnlyTextProto(build(p).await?)
}
Expand Down Expand Up @@ -381,6 +387,8 @@ macro_rules! dispatch_sink_formatter_impl {
match $impl {
SinkFormatterImpl::AppendOnlyJson($name) => $body,
SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
SinkFormatterImpl::AppendOnlyAvro($name) => $body,
SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
SinkFormatterImpl::AppendOnlyProto($name) => $body,
SinkFormatterImpl::AppendOnlyTextProto($name) => $body,

Expand All @@ -403,6 +411,8 @@ macro_rules! dispatch_sink_formatter_str_key_impl {
match $impl {
SinkFormatterImpl::AppendOnlyJson($name) => $body,
SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
SinkFormatterImpl::AppendOnlyAvro(_) => unreachable!(),
SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
SinkFormatterImpl::AppendOnlyProto($name) => $body,
SinkFormatterImpl::AppendOnlyTextProto($name) => $body,

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
Format::Plain => vec![Encode::Json],
),
KafkaSink::SINK_NAME => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Protobuf],
Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
Format::Upsert => vec![Encode::Json, Encode::Avro],
Format::Debezium => vec![Encode::Json],
),
Expand Down

0 comments on commit ac7abd9

Please sign in to comment.