From c6ed9bb6deb333452e311c3e8ba028ae1ab0f847 Mon Sep 17 00:00:00 2001 From: xiangjinwu <17769960+xiangjinwu@users.noreply.github.com> Date: Tue, 13 Aug 2024 17:20:27 +0800 Subject: [PATCH] feat(sink): allow upsert protobuf with text key (#18024) --- e2e_test/sink/kafka/protobuf.slt | 60 +++++++++++++++++++++++++ src/connector/src/sink/formatter/mod.rs | 7 +++ src/frontend/src/handler/create_sink.rs | 2 +- 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/e2e_test/sink/kafka/protobuf.slt b/e2e_test/sink/kafka/protobuf.slt index 61a91435567da..0abd242e3c79d 100644 --- a/e2e_test/sink/kafka/protobuf.slt +++ b/e2e_test/sink/kafka/protobuf.slt @@ -28,6 +28,18 @@ format plain encode protobuf ( schema.registry = 'http://schemaregistry:8082', message = 'test.package.MessageH.MessageI'); +system ok +rpk topic create test-rw-sink-upsert-protobuf + +statement ok +create table from_kafka_raw (kafka_value bytea) +include key as kafka_key +with ( + connector = 'kafka', + topic = 'test-rw-sink-upsert-protobuf', + properties.bootstrap.server = 'message_queue:29092') +format plain encode bytes; + statement ok create table into_kafka ( bool_field bool, @@ -84,6 +96,40 @@ format plain encode protobuf ( schema.registry = 'http://schemaregistry:8082', message = 'test.package.MessageH.MessageI'); +statement error +create sink sink_upsert from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-upsert-protobuf', + properties.bootstrap.server = 'message_queue:29092', + primary_key = 'string_field') +format upsert encode protobuf ( + schema.location = 'file:///risingwave/proto-recursive', + message = 'recursive.AllTypes'); +---- +db error: ERROR: Failed to run the query + +Caused by these errors (recent errors listed first): + 1: gRPC request to meta service failed: Internal error + 2: failed to validate sink + 3: config error + 4: sink format/encode/key_encode unsupported: Upsert Protobuf None + + +statement ok +create sink sink_upsert from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-upsert-protobuf', + properties.bootstrap.server = 'message_queue:29092', + primary_key = 'string_field') +format upsert encode protobuf ( + schema.location = 'file:///risingwave/proto-recursive', + message = 'recursive.AllTypes') +key encode text; + +# Shall be ignored by force_append_only sinks but processed by upsert sinks. +statement ok +delete from into_kafka where bool_field; + sleep 2s query TTTRRIIIIIITTTI @@ -119,6 +165,11 @@ select field_i from from_kafka_csr_nested order by 1; 13 24 +query T +select convert_from(kafka_key, 'utf-8') from from_kafka_raw where kafka_value is null; +---- +Rising + statement error No such file create sink sink_err from into_kafka with ( connector = 'kafka', @@ -149,6 +200,9 @@ format plain encode protobuf ( schema.location = 's3:///risingwave/proto-recursive', message = 'recursive.AllTypes'); +statement ok +drop sink sink_upsert; + statement ok drop sink sink_csr_nested; @@ -161,5 +215,11 @@ drop sink sink0; statement ok drop table into_kafka; +statement ok +drop table from_kafka_raw; + +system ok +rpk topic delete test-rw-sink-upsert-protobuf + statement ok drop table from_kafka; diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index b2e93cba763ea..6da8a1e0d2008 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -85,6 +85,10 @@ pub enum SinkFormatterImpl { UpsertTextJson(UpsertFormatter), UpsertAvro(UpsertFormatter), UpsertTextAvro(UpsertFormatter), + // `UpsertFormatter` is intentionally left out + // to avoid using `ProtoEncoder` as key: + // + UpsertTextProto(UpsertFormatter), UpsertTemplate(UpsertFormatter), UpsertTextTemplate(UpsertFormatter), // debezium @@ -356,6 +360,7 @@ impl SinkFormatterImpl { (F::Upsert, E::Json, None) => Impl::UpsertJson(build(p).await?), (F::Upsert, E::Avro, Some(E::Text)) => Impl::UpsertTextAvro(build(p).await?), (F::Upsert, E::Avro, None) => Impl::UpsertAvro(build(p).await?), + (F::Upsert, E::Protobuf, Some(E::Text)) => Impl::UpsertTextProto(build(p).await?), (F::Upsert, E::Template, Some(E::Text)) => { Impl::UpsertTextTemplate(build(p).await?) } @@ -399,6 +404,7 @@ macro_rules! dispatch_sink_formatter_impl { SinkFormatterImpl::UpsertTextJson($name) => $body, SinkFormatterImpl::UpsertAvro($name) => $body, SinkFormatterImpl::UpsertTextAvro($name) => $body, + SinkFormatterImpl::UpsertTextProto($name) => $body, SinkFormatterImpl::DebeziumJson($name) => $body, SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body, SinkFormatterImpl::AppendOnlyTemplate($name) => $body, @@ -423,6 +429,7 @@ macro_rules! dispatch_sink_formatter_str_key_impl { SinkFormatterImpl::UpsertTextJson($name) => $body, SinkFormatterImpl::UpsertAvro(_) => unreachable!(), SinkFormatterImpl::UpsertTextAvro($name) => $body, + SinkFormatterImpl::UpsertTextProto($name) => $body, SinkFormatterImpl::DebeziumJson($name) => $body, SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body, SinkFormatterImpl::AppendOnlyTemplate($name) => $body, diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index d7e71d2e7b41e..834f92906efa2 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -882,7 +882,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock hashmap!( Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf], - Format::Upsert => vec![Encode::Json, Encode::Avro], + Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf], Format::Debezium => vec![Encode::Json], ), KinesisSink::SINK_NAME => hashmap!(