diff --git a/ci/scripts/e2e-sink-test.sh b/ci/scripts/e2e-sink-test.sh
index 5125ddedfdf8e..eaee563c7a992 100755
--- a/ci/scripts/e2e-sink-test.sh
+++ b/ci/scripts/e2e-sink-test.sh
@@ -45,7 +45,7 @@ mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/my
 echo "--- preparing postgresql"
 
 # set up PG sink destination
-apt-get -y install postgresql-client
+apt-get -y install postgresql-client jq
 export PGPASSWORD=postgres
 psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';"
 createdb -h db -U postgres test
diff --git a/e2e_test/sink/kafka/avro.slt b/e2e_test/sink/kafka/avro.slt
index 1cf27b811d9be..9bb9e3395a34e 100644
--- a/e2e_test/sink/kafka/avro.slt
+++ b/e2e_test/sink/kafka/avro.slt
@@ -42,8 +42,126 @@ create sink sink0 from into_kafka with (
 format upsert encode avro (
   schema.registry = 'http://schemaregistry:8082');
 
+system ok
+rpk topic create test-rw-sink-plain-avro
+
+system ok
+jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value/versions'
+{
+  "type": "record",
+  "name": "Simple",
+  "fields": [
+    {
+      "name": "int32_field",
+      "type": ["null", "int"]
+    },
+    {
+      "name": "string_field",
+      "type": ["null", "string"]
+    }
+  ]
+}
+EOF
+
+statement ok
+create table from_kafka_plain
+include key as raw_key
+with (
+  connector = 'kafka',
+  topic = 'test-rw-sink-plain-avro',
+  properties.bootstrap.server = 'message_queue:29092')
+format plain encode avro (
+  schema.registry = 'http://schemaregistry:8082');
+
+statement ok
+create sink sink_plain_key_none as select int32_field, string_field from into_kafka with (
+  connector = 'kafka',
+  topic = 'test-rw-sink-plain-avro',
+  properties.bootstrap.server = 'message_queue:29092')
+format plain encode avro (
+  force_append_only = true,
+  schema.registry = 'http://schemaregistry:8082');
+
+statement ok
+create sink sink_plain_key_text as select int32_field, string_field from into_kafka with (
+  connector = 'kafka',
+  topic = 'test-rw-sink-plain-avro',
+  properties.bootstrap.server = 'message_queue:29092',
+  primary_key = 'int32_field')
+format plain encode avro (
+  force_append_only = true,
+  schema.registry = 'http://schemaregistry:8082')
+key encode text;
+
+system ok
+jq '{"schema": tojson}' << EOF | curl -X POST -H 'content-type: application/json' -d @- 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key/versions'
+{
+  "type": "record",
+  "name": "Key",
+  "fields": [
+    {
+      "name": "int32_field",
+      "type": ["null", "int"]
+    }
+  ]
+}
+EOF
+
+statement ok
+create sink sink_plain_key_avro as select int32_field, string_field from into_kafka with (
+  connector = 'kafka',
+  topic = 'test-rw-sink-plain-avro',
+  properties.bootstrap.server = 'message_queue:29092',
+  primary_key = 'int32_field')
+format plain encode avro (
+  force_append_only = true,
+  schema.registry = 'http://schemaregistry:8082');
+
 sleep 2s
 
+query ITT
+select
+  int32_field,
+  string_field,
+  case when octet_length(raw_key) > 5 and substring(raw_key from 1 for 1) = '\x00'
+    then substring(raw_key from 6) -- skip nondeterministic confluent schema registry header
+    else raw_key end
+from from_kafka_plain order by string_field, raw_key;
+----
+22 Rising \x022c
+22 Rising \x3232
+22 Rising NULL
+11 Wave \x0216
+11 Wave \x3131
+11 Wave NULL
+
+statement ok
+drop sink sink_plain_key_avro;
+
+statement ok
+drop sink sink_plain_key_text;
+
+statement ok
+drop sink sink_plain_key_none;
+
+statement ok
+drop table from_kafka_plain;
+
+system ok
+curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key'
+
+system ok
+curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-key?permanent=true'
+
+system ok
+curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value'
+
+system ok
+curl -X DELETE 'http://schemaregistry:8082/subjects/test-rw-sink-plain-avro-value?permanent=true'
+
+system ok
+rpk topic delete test-rw-sink-plain-avro
+
 query TTTRRIITTTTTTTT
 select
   bool_field,
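
Note on the expected rows above (explanatory aside, not part of the patch): the three sinks differ only in how the Kafka message key is produced. sink_plain_key_none writes no key at all (the NULL rows); sink_plain_key_text writes the primary key as plain text, so key "22" arrives as the ASCII bytes \x3232; sink_plain_key_avro writes the key as Avro with Confluent schema-registry framing, i.e. a 5-byte header (magic byte 0x00 plus a 4-byte big-endian schema id) followed by the Avro binary body, which is why the query strips the first 5 bytes when the key is longer than 5 bytes and starts with \x00. The standalone Rust sketch below reproduces those bytes; it assumes standard Avro binary (zig-zag varint) encoding and uses a made-up schema id of 1, since the real id is assigned by the registry at run time.

// Avro zig-zag varint encoding, i.e. how Avro serializes `int`/`long` values.
fn zigzag(n: i64) -> Vec<u8> {
    let mut z = ((n << 1) ^ (n >> 63)) as u64;
    let mut out = Vec::new();
    loop {
        let byte = (z & 0x7f) as u8;
        z >>= 7;
        if z == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80);
    }
    out
}

fn main() {
    // Key record {"int32_field": 22} under the union ["null", "int"]:
    // branch index 1 (the "int" branch), then the zig-zag-encoded value.
    let mut avro_key = zigzag(1); // branch index -> 0x02
    avro_key.extend(zigzag(22)); // value -> 0x2c
    assert_eq!(avro_key, [0x02, 0x2c]); // `\x022c` in the expected rows

    // Confluent framing written by sink_plain_key_avro: magic 0x00 plus a
    // 4-byte big-endian schema id (1 is a placeholder; the registry assigns it).
    let schema_id: u32 = 1;
    let mut framed = vec![0x00u8];
    framed.extend_from_slice(&schema_id.to_be_bytes());
    framed.extend_from_slice(&avro_key);
    assert!(framed.len() > 5 && framed[0] == 0x00); // the query's guard
    assert_eq!(&framed[5..], [0x02, 0x2c]); // substring(raw_key from 6)

    // sink_plain_key_text writes the text key instead: "22" = 0x32 0x32.
    assert_eq!("22".as_bytes(), [0x32, 0x32]); // `\x3232` in the expected rows
}
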
diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs
index 6239a55d862f4..4628a925da98d 100644
--- a/src/connector/src/sink/formatter/mod.rs
+++ b/src/connector/src/sink/formatter/mod.rs
@@ -73,6 +73,8 @@ pub enum SinkFormatterImpl {
     // append-only
     AppendOnlyJson(AppendOnlyFormatter<JsonEncoder, JsonEncoder>),
     AppendOnlyTextJson(AppendOnlyFormatter<TextEncoder, JsonEncoder>),
+    AppendOnlyAvro(AppendOnlyFormatter<AvroEncoder, AvroEncoder>),
+    AppendOnlyTextAvro(AppendOnlyFormatter<TextEncoder, AvroEncoder>),
     AppendOnlyProto(AppendOnlyFormatter<JsonEncoder, ProtoEncoder>),
     AppendOnlyTextProto(AppendOnlyFormatter<TextEncoder, ProtoEncoder>),
     AppendOnlyTemplate(AppendOnlyFormatter<TemplateEncoder, TemplateEncoder>),
@@ -335,6 +337,10 @@ impl SinkFormatterImpl {
                 Impl::AppendOnlyTextJson(build(p).await?)
             }
             (F::AppendOnly, E::Json, None) => Impl::AppendOnlyJson(build(p).await?),
+            (F::AppendOnly, E::Avro, Some(E::Text)) => {
+                Impl::AppendOnlyTextAvro(build(p).await?)
+            }
+            (F::AppendOnly, E::Avro, None) => Impl::AppendOnlyAvro(build(p).await?),
             (F::AppendOnly, E::Protobuf, Some(E::Text)) => {
                 Impl::AppendOnlyTextProto(build(p).await?)
             }
@@ -381,6 +387,8 @@ macro_rules! dispatch_sink_formatter_impl {
         match $impl {
             SinkFormatterImpl::AppendOnlyJson($name) => $body,
             SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
+            SinkFormatterImpl::AppendOnlyAvro($name) => $body,
+            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
             SinkFormatterImpl::AppendOnlyProto($name) => $body,
             SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
 
@@ -403,6 +411,8 @@ macro_rules! dispatch_sink_formatter_str_key_impl {
         match $impl {
             SinkFormatterImpl::AppendOnlyJson($name) => $body,
             SinkFormatterImpl::AppendOnlyTextJson($name) => $body,
+            SinkFormatterImpl::AppendOnlyAvro(_) => unreachable!(),
+            SinkFormatterImpl::AppendOnlyTextAvro($name) => $body,
             SinkFormatterImpl::AppendOnlyProto($name) => $body,
             SinkFormatterImpl::AppendOnlyTextProto($name) => $body,
 
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index ce85c864e196e..f6b6e5f563d2d 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -816,7 +816,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
                 vec![Encode::Json],
             ),
             KafkaSink::SINK_NAME => hashmap!(
-                Format::Plain => vec![Encode::Json, Encode::Protobuf],
+                Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                 Format::Upsert => vec![Encode::Json, Encode::Avro],
                 Format::Debezium => vec![Encode::Json],
             ),
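
Closing note (explanatory aside, not part of the patch): CONNECTORS_COMPATIBLE_FORMATS is the per-connector allow-list the frontend consults when validating a sink's FORMAT ... ENCODE ... clause, so adding Encode::Avro under Format::Plain is what actually enables `format plain encode avro` for Kafka. Relatedly, the `AppendOnlyAvro(_) => unreachable!()` arm in dispatch_sink_formatter_str_key_impl reflects that an Avro-encoded key is schema-registry-framed binary, not a string: sinks that need string keys must pair Avro values with `key encode text`, i.e. the AppendOnlyTextAvro variant. Below is a minimal, self-contained sketch of the allow-list check, with simplified stand-in enums (the real table is keyed by connector name and built with the hashmap! macro shown in the hunk).

use std::collections::HashMap;

// Simplified stand-ins for the Format/Encode enums used in create_sink.rs.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Format { Plain, Upsert, Debezium }
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Encode { Json, Avro, Protobuf }

// Kafka's entry as of this diff: Plain now also accepts Avro.
fn kafka_compatible_formats() -> HashMap<Format, Vec<Encode>> {
    HashMap::from([
        (Format::Plain, vec![Encode::Json, Encode::Avro, Encode::Protobuf]),
        (Format::Upsert, vec![Encode::Json, Encode::Avro]),
        (Format::Debezium, vec![Encode::Json]),
    ])
}

// Shape of the check performed when a CREATE SINK statement is validated.
fn is_compatible(format: Format, encode: Encode) -> bool {
    kafka_compatible_formats()
        .get(&format)
        .is_some_and(|encodes| encodes.contains(&encode))
}

fn main() {
    assert!(is_compatible(Format::Plain, Encode::Avro)); // newly allowed
    assert!(is_compatible(Format::Upsert, Encode::Avro)); // already allowed
    assert!(!is_compatible(Format::Debezium, Encode::Avro)); // still rejected
}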