From c186a8fc26decb8fc8f37cf4ad9e63bb2afd81ed Mon Sep 17 00:00:00 2001
From: Bohan Zhang
Date: Mon, 15 Jul 2024 16:06:55 +0800
Subject: [PATCH] fix: enable upsert protobuf combination (#17624)

Signed-off-by: tabVersion
---
 .../source_inline/kafka/protobuf/basic.slt  | 23 +++++++++++++++++++
 e2e_test/source_inline/kafka/protobuf/pb.py |  2 ++
 src/frontend/src/handler/create_source.rs   |  7 ++++--
 3 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/e2e_test/source_inline/kafka/protobuf/basic.slt b/e2e_test/source_inline/kafka/protobuf/basic.slt
index 0eae891d04bcd..44153949e79e8 100644
--- a/e2e_test/source_inline/kafka/protobuf/basic.slt
+++ b/e2e_test/source_inline/kafka/protobuf/basic.slt
@@ -33,6 +33,22 @@ FORMAT plain ENCODE protobuf(
   message = 'test.User'
 );
+
+# for upsert protobuf source
+# NOTE: the key part is in json format and rw only read it as bytes
+statement ok
+create table sr_pb_upsert (primary key (rw_key))
+include
+  key as rw_key
+with (
+  ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+  topic = 'sr_pb_test',
+  scan.startup.mode = 'earliest')
+FORMAT upsert ENCODE protobuf(
+  schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}',
+  message = 'test.User'
+  );
+
 
 # Wait for source
 sleep 2s
 
@@ -50,9 +66,16 @@ select min(id), max(id), max((sc).file_name) from sr_pb_test;
 ----
 0 19 source/context_019.proto
 
+query TT
+select convert_from(min(rw_key), 'UTF-8'), convert_from(max(rw_key), 'UTF-8') from sr_pb_upsert;
+----
+{"id": 0} {"id": 9}
+
 statement ok
 drop table sr_pb_test;
 
 statement ok
 drop table sr_pb_test_bk;
 
+statement ok
+drop table sr_pb_upsert;
diff --git a/e2e_test/source_inline/kafka/protobuf/pb.py b/e2e_test/source_inline/kafka/protobuf/pb.py
index 4cab50f899e50..d78db1b536b9f 100644
--- a/e2e_test/source_inline/kafka/protobuf/pb.py
+++ b/e2e_test/source_inline/kafka/protobuf/pb.py
@@ -1,4 +1,5 @@
 import sys
+import json
 import importlib
 from google.protobuf.source_context_pb2 import SourceContext
 from confluent_kafka import Producer
@@ -55,6 +56,7 @@ def send_to_kafka(
         producer.produce(
             topic=topic,
             partition=0,
+            key=json.dumps({"id": i}),  # RisingWave does not handle key schema, so we use json
             value=serializer(user, SerializationContext(topic, MessageField.VALUE)),
             on_delivery=delivery_report,
         )
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 3d3c32958e31c..aac3649d808cf 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -798,7 +798,10 @@ pub(crate) async fn bind_source_pk(
 
         // For all Upsert formats, we only accept one and only key column as primary key.
         // Additional KEY columns must be set in this case and must be primary key.
-        (Format::Upsert, encode @ Encode::Json | encode @ Encode::Avro) => {
+        (
+            Format::Upsert,
+            encode @ Encode::Json | encode @ Encode::Avro | encode @ Encode::Protobuf,
+        ) => {
             if let Some(ref key_column_name) = include_key_column_name
                 && sql_defined_pk
             {
@@ -993,7 +996,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, Vec<Encode>>>> =
         convert_args!(hashmap!(
                 KAFKA_CONNECTOR => hashmap!(
                     Format::Plain => vec![Encode::Json, Encode::Protobuf, Encode::Avro, Encode::Bytes, Encode::Csv],
-                    Format::Upsert => vec![Encode::Json, Encode::Avro],
+                    Format::Upsert => vec![Encode::Json, Encode::Avro, Encode::Protobuf],
                     Format::Debezium => vec![Encode::Json, Encode::Avro],
                     Format::Maxwell => vec![Encode::Json],
                     Format::Canal => vec![Encode::Json],
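
Usage sketch (reviewer note, not part of the diff): with this patch applied,
the FORMAT UPSERT ENCODE PROTOBUF combination binds in bind_source_pk instead
of being rejected. A minimal DDL sketch follows, assuming a local Kafka broker,
a hypothetical topic 'user_events', and a schema registry at
http://localhost:8081; all three are placeholder values, adapt them to your
deployment:

    -- placeholders: topic, broker address, and registry URL are assumptions
    CREATE TABLE user_events (PRIMARY KEY (rw_key))
    INCLUDE KEY AS rw_key
    WITH (
        connector = 'kafka',
        topic = 'user_events',                          -- assumed topic name
        properties.bootstrap.server = 'localhost:9092', -- assumed broker
        scan.startup.mode = 'earliest')
    FORMAT UPSERT ENCODE PROTOBUF (
        schema.registry = 'http://localhost:8081',      -- assumed registry URL
        message = 'test.User');

As in the slt test above, the Kafka key is not decoded against any schema:
RisingWave ingests it as raw bytes through INCLUDE KEY, and that single
included column must be declared as the primary key for upsert semantics.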