Skip to content

Commit

Permalink
feat(sink): allow upsert protobuf with text key (#18024)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Aug 13, 2024
1 parent 1df6b80 commit c6ed9bb
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 1 deletion.
60 changes: 60 additions & 0 deletions e2e_test/sink/kafka/protobuf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ format plain encode protobuf (
schema.registry = 'http://schemaregistry:8082',
message = 'test.package.MessageH.MessageI');

system ok
rpk topic create test-rw-sink-upsert-protobuf

statement ok
create table from_kafka_raw (kafka_value bytea)
include key as kafka_key
with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-protobuf',
properties.bootstrap.server = 'message_queue:29092')
format plain encode bytes;

statement ok
create table into_kafka (
bool_field bool,
Expand Down Expand Up @@ -84,6 +96,40 @@ format plain encode protobuf (
schema.registry = 'http://schemaregistry:8082',
message = 'test.package.MessageH.MessageI');

statement error
create sink sink_upsert from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-protobuf',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'string_field')
format upsert encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: gRPC request to meta service failed: Internal error
2: failed to validate sink
3: config error
4: sink format/encode/key_encode unsupported: Upsert Protobuf None


statement ok
create sink sink_upsert from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-upsert-protobuf',
properties.bootstrap.server = 'message_queue:29092',
primary_key = 'string_field')
format upsert encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes')
key encode text;

# Shall be ignored by force_append_only sinks but processed by upsert sinks.
statement ok
delete from into_kafka where bool_field;

sleep 2s

query TTTRRIIIIIITTTI
Expand Down Expand Up @@ -119,6 +165,11 @@ select field_i from from_kafka_csr_nested order by 1;
13
24

query T
select convert_from(kafka_key, 'utf-8') from from_kafka_raw where kafka_value is null;
----
Rising

statement error No such file
create sink sink_err from into_kafka with (
connector = 'kafka',
Expand Down Expand Up @@ -149,6 +200,9 @@ format plain encode protobuf (
schema.location = 's3:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

statement ok
drop sink sink_upsert;

statement ok
drop sink sink_csr_nested;

Expand All @@ -161,5 +215,11 @@ drop sink sink0;
statement ok
drop table into_kafka;

statement ok
drop table from_kafka_raw;

system ok
rpk topic delete test-rw-sink-upsert-protobuf

statement ok
drop table from_kafka;
7 changes: 7 additions & 0 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ pub enum SinkFormatterImpl {
UpsertTextJson(UpsertFormatter<TextEncoder, JsonEncoder>),
UpsertAvro(UpsertFormatter<AvroEncoder, AvroEncoder>),
UpsertTextAvro(UpsertFormatter<TextEncoder, AvroEncoder>),
// `UpsertFormatter<ProtoEncoder, ProtoEncoder>` is intentionally left out
// to avoid using `ProtoEncoder` as key:
// <https://docs.confluent.io/platform/7.7/control-center/topics/schema.html#c3-schemas-best-practices-key-value-pairs>
UpsertTextProto(UpsertFormatter<TextEncoder, ProtoEncoder>),
UpsertTemplate(UpsertFormatter<TemplateEncoder, TemplateEncoder>),
UpsertTextTemplate(UpsertFormatter<TextEncoder, TemplateEncoder>),
// debezium
Expand Down Expand Up @@ -356,6 +360,7 @@ impl SinkFormatterImpl {
(F::Upsert, E::Json, None) => Impl::UpsertJson(build(p).await?),
(F::Upsert, E::Avro, Some(E::Text)) => Impl::UpsertTextAvro(build(p).await?),
(F::Upsert, E::Avro, None) => Impl::UpsertAvro(build(p).await?),
(F::Upsert, E::Protobuf, Some(E::Text)) => Impl::UpsertTextProto(build(p).await?),
(F::Upsert, E::Template, Some(E::Text)) => {
Impl::UpsertTextTemplate(build(p).await?)
}
Expand Down Expand Up @@ -399,6 +404,7 @@ macro_rules! dispatch_sink_formatter_impl {
SinkFormatterImpl::UpsertTextJson($name) => $body,
SinkFormatterImpl::UpsertAvro($name) => $body,
SinkFormatterImpl::UpsertTextAvro($name) => $body,
SinkFormatterImpl::UpsertTextProto($name) => $body,
SinkFormatterImpl::DebeziumJson($name) => $body,
SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,
SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
Expand All @@ -423,6 +429,7 @@ macro_rules! dispatch_sink_formatter_str_key_impl {
SinkFormatterImpl::UpsertTextJson($name) => $body,
SinkFormatterImpl::UpsertAvro(_) => unreachable!(),
SinkFormatterImpl::UpsertTextAvro($name) => $body,
SinkFormatterImpl::UpsertTextProto($name) => $body,
SinkFormatterImpl::DebeziumJson($name) => $body,
SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body,
SinkFormatterImpl::AppendOnlyTemplate($name) => $body,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
),
KafkaSink::SINK_NAME => hashmap!(
Format::Plain => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
Format::Upsert => vec![Encode::Json, Encode::Avro],
Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
Format::Debezium => vec![Encode::Json],
),
KinesisSink::SINK_NAME => hashmap!(
Expand Down

0 comments on commit c6ed9bb

Please sign in to comment.