Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): support format plain encode avro for kafka #17216

Merged
merged 2 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/my
echo "--- preparing postgresql"

# set up PG sink destination
apt-get -y install postgresql-client
apt-get -y install postgresql-client jq
export PGPASSWORD=postgres
psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';"
createdb -h db -U postgres test
Expand Down
118 changes: 118 additions & 0 deletions e2e_test/sink/kafka/avro.slt
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,126 @@ create sink sink0 from into_kafka with (
format upsert encode avro (
schema.registry = 'http://schemaregistry:8082');

# --- FORMAT PLAIN ENCODE AVRO sink tests ---
# Create the Kafka topic that the three plain-avro sinks below write to
# and the table `from_kafka_plain` reads back from.
system ok
rpk topic create test-rw-sink-plain-avro

# Register the Avro *value* schema for the topic.
# jq wraps the raw Avro record schema into the {"schema": "<escaped json>"}
# payload that the schema registry REST API expects on POST /subjects/.../versions.
system ok
jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value/versions'
{
"type": "record",
"name": "Simple",
"fields": [
{
"name": "int32_field",
"type": ["null", "int"]
},
{
"name": "string_field",
"type": ["null", "string"]
}
]
}
EOF

# Source table for verification: reads the topic back with the message key
# exposed as `raw_key`, so each sink's key encoding can be inspected.
statement ok
create table from_kafka_plain
include key as raw_key
with (
connector = 'kafka',
topic = 'test-rw-sink-plain-avro',
properties.bootstrap.server = 'message_queue:29092')
format plain encode avro (
schema.registry = 'http://schemaregistry:8082');

# Sink 1: no primary_key and no key encode configured.
statement ok
create sink sink_plain_key_none as select int32_field, string_field from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-plain-avro',
properties.bootstrap.server = 'message_queue:29092')
format plain encode avro (
force_append_only = true,
schema.registry = 'http://schemaregistry:8082');

# Sink 2: primary_key with KEY ENCODE TEXT (key is the textual int32_field).
statement ok
create sink sink_plain_key_text as select int32_field, string_field from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-plain-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field')
format plain encode avro (
force_append_only = true,
schema.registry = 'http://schemaregistry:8082')
key encode text;

# Register the Avro *key* schema under the "-key" subject before creating the
# sink that encodes its key as Avro (sink 3 below).
system ok
jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key/versions'
{
"type": "record",
"name": "Key",
"fields": [
{
"name": "int32_field",
"type": ["null", "int"]
}
]
}
EOF

# Sink 3: primary_key with the default (Avro) key encoding, using the key
# schema registered above.
statement ok
create sink sink_plain_key_avro as select int32_field, string_field from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-plain-avro',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'int32_field')
format plain encode avro (
force_append_only = true,
schema.registry = 'http://schemaregistry:8082');

# Give the sinks time to flush to Kafka before reading back.
sleep 2s

# Each source row appears three times, once per sink, distinguished by key:
# an Avro-encoded key (confluent-framed, header stripped by the CASE below),
# a text key (e.g. \x3232 is ASCII "22"), and a NULL key from sink_plain_key_none.
query ITT
select
int32_field,
string_field,
case when octet_length(raw_key) > 5 and substring(raw_key from 1 for 1) = '\x00'
then substring(raw_key from 6) -- skip nondeterministic confluent schema registry header (1 magic byte + 4-byte schema id)
else raw_key end
from from_kafka_plain order by string_field, raw_key;
----
22 Rising \x022c
22 Rising \x3232
22 Rising NULL
11 Wave \x0216
11 Wave \x3131
11 Wave NULL

# --- teardown: drop sinks before the table they select from ---
statement ok
drop sink sink_plain_key_avro;

statement ok
drop sink sink_plain_key_text;

statement ok
drop sink sink_plain_key_none;

statement ok
drop table from_kafka_plain;

# Schema registry subjects need a soft DELETE followed by a permanent one
# (?permanent=true) to be fully removed.
system ok
curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key'

system ok
curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key?permanent=true'

system ok
curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value'

system ok
curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value?permanent=true'

system ok
rpk topic delete test-rw-sink-plain-avro

query TTTRRIITTTTTTTT
select
bool_field,
Expand Down
10 changes: 10 additions & 0 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ pub enum SinkFormatterImpl {
// append-only
AppendOnlyJson(AppendOnlyFormatter<JsonEncoder, JsonEncoder>),
AppendOnlyTextJson(AppendOnlyFormatter<TextEncoder, JsonEncoder>),
AppendOnlyAvro(AppendOnlyFormatter<AvroEncoder, AvroEncoder>),
AppendOnlyTextAvro(AppendOnlyFormatter<TextEncoder, AvroEncoder>),
AppendOnlyProto(AppendOnlyFormatter<JsonEncoder, ProtoEncoder>),
AppendOnlyTextProto(AppendOnlyFormatter<TextEncoder, ProtoEncoder>),
AppendOnlyTemplate(AppendOnlyFormatter<TemplateEncoder, TemplateEncoder>),
Expand Down Expand Up @@ -335,6 +337,10 @@ impl SinkFormatterImpl {
Impl::AppendOnlyTextJson(build(p).await?)
}
(F::AppendOnly, E::Json, None) => Impl::AppendOnlyJson(build(p).await?),
(F::AppendOnly, E::Avro, Some(E::Text)) => {
Impl::AppendOnlyTextAvro(build(p).await?)
}
(F::AppendOnly, E::Avro, None) => Impl::AppendOnlyAvro(build(p).await?),
(F::AppendOnly, E::Protobuf, Some(E::Text)) => {
Impl::AppendOnlyTextProto(build(p).await?)
}
Expand Down Expand Up @@ -381,6 +387,8 @@ macro_rules! dispatch_sink_formatter_impl {
match $impl {
SinkFormatterImpl::AppendOnlyJson($name) => $body,
SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
SinkFormatterImpl::AppendOnlyAvro($name) => $body,
SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
SinkFormatterImpl::AppendOnlyProto($name) => $body,
SinkFormatterImpl::AppendOnlyTextProto($name) => $body,

Expand All @@ -403,6 +411,8 @@ macro_rules! dispatch_sink_formatter_str_key_impl {
match $impl {
SinkFormatterImpl::AppendOnlyJson($name) => $body,
SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
SinkFormatterImpl::AppendOnlyAvro(_) => unreachable!(),
SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
SinkFormatterImpl::AppendOnlyProto($name) => $body,
SinkFormatterImpl::AppendOnlyTextProto($name) => $body,

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
Format::Plain => vec![Encode::Json],
),
KafkaSink::SINK_NAME => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Protobuf],
Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
Format::Upsert => vec![Encode::Json, Encode::Avro],
Format::Debezium => vec![Encode::Json],
),
Expand Down
Loading