From 4b03d0064f7755010f159cbc9eef4a67a3864ece Mon Sep 17 00:00:00 2001
From: xiangjinwu <17769960+xiangjinwu@users.noreply.github.com>
Date: Wed, 13 Mar 2024 13:25:50 +0800
Subject: [PATCH] feat(sink): support `encode protobuf` with confluent schema
 registry (#15546)

---
 ci/scripts/e2e-kafka-sink-test.sh        | 10 ++-
 e2e_test/sink/kafka/protobuf.slt         | 56 ++++++++++++
 e2e_test/sink/kafka/register_schema.py   | 16 ++--
 src/connector/src/lib.rs                 |  1 +
 src/connector/src/schema/protobuf.rs     | 49 +++++++++--
 src/connector/src/sink/encoder/mod.rs    |  2 +-
 src/connector/src/sink/encoder/proto.rs  | 85 +++++++++++++++++--
 src/connector/src/sink/formatter/mod.rs  | 19 +++--
 .../src/test_data/test-index-array.proto | 39 +++++++++
 9 files changed, 251 insertions(+), 26 deletions(-)
 create mode 100644 src/connector/src/test_data/test-index-array.proto

diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh
index 1a319975f32ca..b1d1f19c8f54d 100755
--- a/ci/scripts/e2e-kafka-sink-test.sh
+++ b/ci/scripts/e2e-kafka-sink-test.sh
@@ -143,14 +143,22 @@ sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt'
 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1
 
 # test different encoding
+echo "preparing confluent schema registry"
+python3 -m pip install requests confluent-kafka
+
 echo "testing protobuf"
 cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --create > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --create > /dev/null 2>&1
+python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-a-value' src/connector/src/test_data/test-index-array.proto
+python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-append-only-protobuf-csr-hi-value' src/connector/src/test_data/test-index-array.proto
 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-hi --delete > /dev/null 2>&1
+./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf-csr-a --delete > /dev/null 2>&1
 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1
 
 echo "testing avro"
-python3 -m pip install requests confluent-kafka
 python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc
 python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field'
 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1
diff --git a/e2e_test/sink/kafka/protobuf.slt b/e2e_test/sink/kafka/protobuf.slt
index 15e24bde59e94..0c74cc8a0b369 100644
--- a/e2e_test/sink/kafka/protobuf.slt
+++ b/e2e_test/sink/kafka/protobuf.slt
@@ -7,6 +7,24 @@ format plain encode protobuf (
   schema.location = 'file:///risingwave/proto-recursive',
   message = 'recursive.AllTypes');
 
+statement ok
+create table from_kafka_csr_trivial with (
+  connector = 'kafka',
+  topic = 'test-rw-sink-append-only-protobuf-csr-a',
+  properties.bootstrap.server = 'message_queue:29092')
+format plain encode protobuf (
+  schema.registry = 'http://message_queue:8081',
+  message = 'test.package.MessageA');
+
+statement ok
+create table from_kafka_csr_nested with (
+  connector = 'kafka',
+  topic = 'test-rw-sink-append-only-protobuf-csr-hi',
+  properties.bootstrap.server = 'message_queue:29092')
+format plain encode protobuf (
+  schema.registry = 'http://message_queue:8081',
+  message = 'test.package.MessageH.MessageI');
+
 statement ok
 create table into_kafka (
   bool_field bool,
@@ -43,6 +61,26 @@ format plain encode protobuf (
   schema.location = 'file:///risingwave/proto-recursive',
   message = 'recursive.AllTypes');
 
+statement ok
+create sink sink_csr_trivial as select string_field as field_a from into_kafka with (
+  connector = 'kafka',
+  topic = 'test-rw-sink-append-only-protobuf-csr-a',
+  properties.bootstrap.server = 'message_queue:29092')
+format plain encode protobuf (
+  force_append_only = true,
+  schema.registry = 'http://message_queue:8081',
+  message = 'test.package.MessageA');
+
+statement ok
+create sink sink_csr_nested as select sint32_field as field_i from into_kafka with (
+  connector = 'kafka',
+  topic = 'test-rw-sink-append-only-protobuf-csr-hi',
+  properties.bootstrap.server = 'message_queue:29092')
+format plain encode protobuf (
+  force_append_only = true,
+  schema.registry = 'http://message_queue:8081',
+  message = 'test.package.MessageH.MessageI');
+
 sleep 2s
 
 query TTTRRIIIIIITTTI
@@ -66,6 +104,18 @@ select
 t Rising \x6130 3.5 4.25 22 23 24 0 26 27 (1,"") {4,0,4} (1136239445,0) 42
 f Wave \x5a4446 1.5 0 11 12 13 14 15 16 (4,foo) {} (0,0) 0
 
+query T
+select field_a from from_kafka_csr_trivial order by 1;
+----
+Rising
+Wave
+
+query I
+select field_i from from_kafka_csr_nested order by 1;
+----
+13
+24
+
 statement error No such file
 create sink sink_err from into_kafka with (
   connector = 'kafka',
@@ -96,6 +146,12 @@ format plain encode protobuf (
   schema.location = 's3:///risingwave/proto-recursive',
   message = 'recursive.AllTypes');
 
+statement ok
+drop sink sink_csr_nested;
+
+statement ok
+drop sink sink_csr_trivial;
+
 statement ok
 drop sink sink0;
diff --git a/e2e_test/sink/kafka/register_schema.py b/e2e_test/sink/kafka/register_schema.py
index 2606e07bcb89b..7385725ca8d34 100644
--- a/e2e_test/sink/kafka/register_schema.py
+++ b/e2e_test/sink/kafka/register_schema.py
@@ -5,7 +5,8 @@ def main():
     url = sys.argv[1]
     subject = sys.argv[2]
-    with open(sys.argv[3]) as f:
+    local_path = sys.argv[3]
+    with open(local_path) as f:
         schema_str = f.read()
     if 4 < len(sys.argv):
         keys = sys.argv[4].split(',')
@@ -14,11 +15,16 @@ def main():
 
     client = SchemaRegistryClient({"url": url})
 
-    if keys:
-        schema_str = select_keys(schema_str, keys)
+    if local_path.endswith('.avsc'):
+        if keys:
+            schema_str = select_keys(schema_str, keys)
+        else:
+            schema_str = remove_unsupported(schema_str)
+        schema = Schema(schema_str, 'AVRO')
+    elif local_path.endswith('.proto'):
+        schema = Schema(schema_str, 'PROTOBUF')
     else:
-        schema_str = remove_unsupported(schema_str)
-    schema = Schema(schema_str, 'AVRO')
+        raise ValueError('{} shall end with .avsc or .proto'.format(local_path))
 
     client.register_schema(subject, schema)
diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs
index 3ebf2475f7866..fb38f2db00c4f 100644
--- a/src/connector/src/lib.rs
+++ b/src/connector/src/lib.rs
@@ -14,6 +14,7 @@
 
 #![expect(dead_code)]
 #![allow(clippy::derive_partial_eq_without_eq)]
+#![feature(array_chunks)]
 #![feature(coroutines)]
 #![feature(proc_macro_hygiene)]
 #![feature(stmt_expr_attributes)]
diff --git a/src/connector/src/schema/protobuf.rs b/src/connector/src/schema/protobuf.rs
index 6229e2e3e223a..d35d17bc4ab12 100644
--- a/src/connector/src/schema/protobuf.rs
+++ b/src/connector/src/schema/protobuf.rs
@@ -17,11 +17,11 @@ use std::collections::BTreeMap;
 use itertools::Itertools as _;
 use prost_reflect::{DescriptorPool, FileDescriptor, MessageDescriptor};
 
-use super::loader::LoadedSchema;
+use super::loader::{LoadedSchema, SchemaLoader};
 use super::schema_registry::Subject;
 use super::{
     invalid_option_error, InvalidOptionError, SchemaFetchError, MESSAGE_NAME_KEY,
-    SCHEMA_LOCATION_KEY,
+    SCHEMA_LOCATION_KEY, SCHEMA_REGISTRY_KEY,
 };
 use crate::common::AwsAuthProps;
 use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties};
@@ -29,16 +29,34 @@ use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties
 /// `aws_auth_props` is only required when reading `s3://` URL.
 pub async fn fetch_descriptor(
     format_options: &BTreeMap<String, String>,
+    topic: &str,
     aws_auth_props: Option<&AwsAuthProps>,
-) -> Result<MessageDescriptor, SchemaFetchError> {
-    let row_schema_location = format_options
-        .get(SCHEMA_LOCATION_KEY)
-        .ok_or_else(|| invalid_option_error!("{SCHEMA_LOCATION_KEY} required"))?
-        .clone();
+) -> Result<(MessageDescriptor, Option<i32>), SchemaFetchError> {
     let message_name = format_options
         .get(MESSAGE_NAME_KEY)
         .ok_or_else(|| invalid_option_error!("{MESSAGE_NAME_KEY} required"))?
         .clone();
+    let schema_location = format_options.get(SCHEMA_LOCATION_KEY);
+    let schema_registry = format_options.get(SCHEMA_REGISTRY_KEY);
+    let row_schema_location = match (schema_location, schema_registry) {
+        (Some(_), Some(_)) => {
+            return Err(invalid_option_error!(
+                "cannot use {SCHEMA_LOCATION_KEY} and {SCHEMA_REGISTRY_KEY} together"
+            )
+            .into())
+        }
+        (None, None) => {
+            return Err(invalid_option_error!(
+                "requires one of {SCHEMA_LOCATION_KEY} or {SCHEMA_REGISTRY_KEY}"
+            )
+            .into())
+        }
+        (None, Some(_)) => {
+            let (md, sid) = fetch_from_registry(&message_name, format_options, topic).await?;
+            return Ok((md, Some(sid)));
+        }
+        (Some(url), None) => url.clone(),
+    };
 
     if row_schema_location.starts_with("s3") && aws_auth_props.is_none() {
         return Err(invalid_option_error!("s3 URL not supported yet").into());
@@ -59,7 +77,22 @@ pub async fn fetch_descriptor(
     let conf = ProtobufParserConfig::new(enc)
         .await
         .map_err(SchemaFetchError::YetToMigrate)?;
-    Ok(conf.message_descriptor)
+    Ok((conf.message_descriptor, None))
+}
+
+pub async fn fetch_from_registry(
+    message_name: &str,
+    format_options: &BTreeMap<String, String>,
+    topic: &str,
+) -> Result<(MessageDescriptor, i32), SchemaFetchError> {
+    let loader = SchemaLoader::from_format_options(topic, format_options)?;
+
+    let (vid, vpb) = loader.load_val_schema::<FileDescriptor>().await?;
+
+    Ok((
+        vpb.parent_pool().get_message_by_name(message_name).unwrap(),
+        vid,
+    ))
 }
 
 impl LoadedSchema for FileDescriptor {
diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs
index 3254447e27077..34dc4c8886448 100644
--- a/src/connector/src/sink/encoder/mod.rs
+++ b/src/connector/src/sink/encoder/mod.rs
@@ -27,7 +27,7 @@ pub mod template;
 
 pub use avro::{AvroEncoder, AvroHeader};
 pub use json::JsonEncoder;
-pub use proto::ProtoEncoder;
+pub use proto::{ProtoEncoder, ProtoHeader};
 
 /// Encode a row of a relation into
 /// * an object in json
diff --git a/src/connector/src/sink/encoder/proto.rs b/src/connector/src/sink/encoder/proto.rs
index 1415cacb437a9..a5f1090dbafaf 100644
--- a/src/connector/src/sink/encoder/proto.rs
+++ b/src/connector/src/sink/encoder/proto.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use bytes::Bytes;
+use bytes::{BufMut, Bytes};
 use prost::Message;
 use prost_reflect::{
     DynamicMessage, FieldDescriptor, Kind, MessageDescriptor, ReflectMessage, Value,
@@ -30,6 +30,17 @@ pub struct ProtoEncoder {
     schema: Schema,
     col_indices: Option<Vec<usize>>,
     descriptor: MessageDescriptor,
+    header: ProtoHeader,
+}
+
+#[derive(Debug, Clone, Copy)]
+pub enum ProtoHeader {
+    None,
+    /// <https://docs.confluent.io/platform/7.6/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
+    ///
+    /// * 00
+    /// * 4-byte big-endian schema ID
+    ConfluentSchemaRegistry(i32),
 }
 
 impl ProtoEncoder {
@@ -37,6 +48,7 @@ impl ProtoEncoder {
         schema: Schema,
         col_indices: Option<Vec<usize>>,
         descriptor: MessageDescriptor,
+        header: ProtoHeader,
     ) -> SinkResult<Self> {
         match &col_indices {
             Some(col_indices) => validate_fields(
@@ -59,12 +71,18 @@ impl ProtoEncoder {
             schema,
             col_indices,
             descriptor,
+            header,
         })
     }
 }
 
+pub struct ProtoEncoded {
+    message: DynamicMessage,
+    header: ProtoHeader,
+}
+
 impl RowEncoder for ProtoEncoder {
-    type Output = DynamicMessage;
+    type Output = ProtoEncoded;
 
     fn schema(&self) -> &Schema {
         &self.schema
@@ -87,12 +105,68 @@ impl RowEncoder for ProtoEncoder {
             &self.descriptor,
         )
         .map_err(Into::into)
+        .map(|m| ProtoEncoded {
+            message: m,
+            header: self.header,
+        })
     }
 }
 
-impl SerTo<Vec<u8>> for DynamicMessage {
+impl SerTo<Vec<u8>> for ProtoEncoded {
     fn ser_to(self) -> SinkResult<Vec<u8>> {
-        Ok(self.encode_to_vec())
+        let mut buf = Vec::new();
+        match self.header {
+            ProtoHeader::None => { /* noop */ }
+            ProtoHeader::ConfluentSchemaRegistry(schema_id) => {
+                buf.reserve(1 + 4);
+                buf.put_u8(0);
+                buf.put_i32(schema_id);
+                MessageIndexes::from(self.message.descriptor()).encode(&mut buf);
+            }
+        }
+        self.message.encode(&mut buf).unwrap();
+        Ok(buf)
+    }
+}
+
+struct MessageIndexes(Vec<i32>);
+
+impl MessageIndexes {
+    fn from(desc: MessageDescriptor) -> Self {
+        // https://github.com/protocolbuffers/protobuf/blob/v25.1/src/google/protobuf/descriptor.proto
+        // https://docs.rs/prost-reflect/0.12.0/src/prost_reflect/descriptor/tag.rs.html
+        // https://docs.rs/prost-reflect/0.12.0/src/prost_reflect/descriptor/build/visit.rs.html#125
+        // `FileDescriptorProto` field #4 is `repeated DescriptorProto message_type`
+        const TAG_FILE_MESSAGE: i32 = 4;
+        // `DescriptorProto` field #3 is `repeated DescriptorProto nested_type`
+        const TAG_MESSAGE_NESTED: i32 = 3;
+
+        let mut indexes = vec![];
+        let mut path = desc.path().array_chunks();
+        let &[tag, idx] = path.next().unwrap();
+        assert_eq!(tag, TAG_FILE_MESSAGE);
+        indexes.push(idx);
+        for &[tag, idx] in path {
+            assert_eq!(tag, TAG_MESSAGE_NESTED);
+            indexes.push(idx);
+        }
+        Self(indexes)
+    }
+
+    fn zig_i32(value: i32, buf: &mut impl BufMut) {
+        let unsigned = ((value << 1) ^ (value >> 31)) as u32 as u64;
+        prost::encoding::encode_varint(unsigned, buf);
+    }
+
+    fn encode(&self, buf: &mut impl BufMut) {
+        if self.0 == [0] {
+            buf.put_u8(0);
+            return;
+        }
+        Self::zig_i32(self.0.len().try_into().unwrap(), buf);
+        for &idx in &self.0 {
+            Self::zig_i32(idx, buf);
+        }
     }
 }
 
@@ -367,7 +441,8 @@ mod tests {
             Some(ScalarImpl::Timestamptz(Timestamptz::from_micros(3))),
         ]);
 
-        let encoder = ProtoEncoder::new(schema, None, descriptor.clone()).unwrap();
+        let encoder =
+            ProtoEncoder::new(schema, None, descriptor.clone(), ProtoHeader::None).unwrap();
         let m = encoder.encode(row).unwrap();
         let encoded: Vec<u8> = m.ser_to().unwrap();
         assert_eq!(
diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs
index cb36bda793709..d923d337a3ffb 100644
--- a/src/connector/src/sink/formatter/mod.rs
+++ b/src/connector/src/sink/formatter/mod.rs
@@ -33,7 +33,7 @@ use super::encoder::{
 };
 use super::redis::{KEY_FORMAT, VALUE_FORMAT};
 use crate::sink::encoder::{
-    AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, TimestampHandlingMode,
+    AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, ProtoHeader, TimestampHandlingMode,
 };
 
 /// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format,
@@ -123,11 +123,18 @@ impl SinkFormatterImpl {
             }
             SinkEncode::Protobuf => {
                 // By passing `None` as `aws_auth_props`, reading from `s3://` not supported yet.
-                let descriptor =
-                    crate::schema::protobuf::fetch_descriptor(&format_desc.options, None)
-                        .await
-                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
-                let val_encoder = ProtoEncoder::new(schema, None, descriptor)?;
+                let (descriptor, sid) = crate::schema::protobuf::fetch_descriptor(
+                    &format_desc.options,
+                    topic,
+                    None,
+                )
+                .await
+                .map_err(|e| SinkError::Config(anyhow!(e)))?;
+                let header = match sid {
+                    None => ProtoHeader::None,
+                    Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid),
+                };
+                let val_encoder = ProtoEncoder::new(schema, None, descriptor, header)?;
                 let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder);
                 Ok(SinkFormatterImpl::AppendOnlyProto(formatter))
             }
diff --git a/src/connector/src/test_data/test-index-array.proto b/src/connector/src/test_data/test-index-array.proto
new file mode 100644
index 0000000000000..f5119819ce3cb
--- /dev/null
+++ b/src/connector/src/test_data/test-index-array.proto
@@ -0,0 +1,39 @@
+// Example taken from https://docs.confluent.io/platform/7.6/schema-registry/fundamentals/serdes-develop/index.html#wire-format
+// `test.package.MessageH.MessageI` `[1, 0]` `2, 1, 0` 0x040200
+// `test.package.MessageA.MessageE.MessageG` `[0, 2, 1]` `3, 0, 2, 1` 0x06000402
+// `test.package.MessageA` `[0]` `1, 0`/`0` 0x00
+
+syntax = "proto3";
+package test.package;
+
+message MessageA {
+  string field_a = 1;
+
+  message MessageB {
+    double field_b = 1;
+
+    message MessageC {
+      sint32 field_c = 1;
+    }
+  }
+  message MessageD {
+    sint32 field_d = 1;
+  }
+  message MessageE {
+    string field_e = 1;
+
+    message MessageF {
+      double field_f = 1;
+    }
+    message MessageG {
+      sint32 field_g = 1;
+    }
+  }
+}
+message MessageH {
+  double field_h = 1;
+
+  message MessageI {
+    sint32 field_i = 1;
+  }
+}
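
The header written by the `SerTo<Vec<u8>>` impl for `ProtoEncoded` above is the Confluent wire format: a zero magic byte, the 4-byte big-endian schema id, the zigzag-varint message indexes, then the encoded protobuf message. Below is a self-contained sketch of that framing under the same assumptions (the `bytes` and `prost` crates); `frame` and the example values are illustrative, not RisingWave API.

use bytes::BufMut;

// Zigzag-encode an i32 and append it as a varint, mirroring the patch's
// `MessageIndexes::zig_i32`.
fn zig_i32(value: i32, buf: &mut impl BufMut) {
    let unsigned = ((value << 1) ^ (value >> 31)) as u32 as u64;
    prost::encoding::encode_varint(unsigned, buf);
}

// Wrap an already-encoded protobuf payload in the Confluent wire format:
// magic byte 0x00, 4-byte big-endian schema id, zigzag-varint message
// indexes, then the message bytes.
fn frame(schema_id: i32, indexes: &[i32], payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(1 + 4 + payload.len());
    buf.put_u8(0);
    buf.put_i32(schema_id); // `BufMut::put_i32` writes big-endian
    if indexes == [0] {
        // Single-byte shortcut for the first top-level message in the schema.
        buf.put_u8(0);
    } else {
        zig_i32(indexes.len() as i32, &mut buf);
        for &idx in indexes {
            zig_i32(idx, &mut buf);
        }
    }
    buf.extend_from_slice(payload);
    buf
}

fn main() {
    // `test.package.MessageH.MessageI` is the second top-level message's
    // first nested message, i.e. indexes [1, 0] -> bytes 04 02 00, matching
    // the comment in test-index-array.proto.
    let framed = frame(42, &[1, 0], b"");
    assert_eq!(framed, [0x00, 0x00, 0x00, 0x00, 0x2a, 0x04, 0x02, 0x00]);
}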
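
The indexes themselves come from `MessageDescriptor::path()`, which alternates (field tag, index) pairs into the file descriptor: tag 4 selects `FileDescriptorProto.message_type`, and tag 3 selects `DescriptorProto.nested_type` at each further nesting level. The sketch below is a stable-Rust rendering of the same derivation (`MessageIndexes::from` in the patch uses the nightly `array_chunks` feature enabled in lib.rs); `message_indexes` is an illustrative name.

// Turn a descriptor path of (tag, index) pairs into message indexes.
fn message_indexes(path: &[i32]) -> Vec<i32> {
    // `FileDescriptorProto` field #4 is `repeated DescriptorProto message_type`
    const TAG_FILE_MESSAGE: i32 = 4;
    // `DescriptorProto` field #3 is `repeated DescriptorProto nested_type`
    const TAG_MESSAGE_NESTED: i32 = 3;

    path.chunks_exact(2)
        .enumerate()
        .map(|(level, pair)| {
            let expected = if level == 0 {
                TAG_FILE_MESSAGE
            } else {
                TAG_MESSAGE_NESTED
            };
            assert_eq!(pair[0], expected);
            pair[1]
        })
        .collect()
}

fn main() {
    // MessageH is message_type[1]; MessageI is its nested_type[0].
    assert_eq!(message_indexes(&[4, 1, 3, 0]), [1, 0]);
    // MessageA.MessageE.MessageG: message_type[0].nested_type[2].nested_type[1].
    assert_eq!(message_indexes(&[4, 0, 3, 2, 3, 1]), [0, 2, 1]);
}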
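
For completeness, the reader side of the same framing: splitting a record back into schema id, message indexes, and payload, the way a registry-aware consumer would before looking up the schema. A sketch under the same assumptions (the `prost` crate); `parse_header` and `unzig` are illustrative names, not code from this patch.

use prost::encoding::decode_varint;

// Zigzag-decode a varint value back to i32.
fn unzig(n: u64) -> i32 {
    ((n >> 1) as i32) ^ -((n & 1) as i32)
}

// Split a Confluent-framed record into (schema id, message indexes, payload).
fn parse_header(mut buf: &[u8]) -> (i32, Vec<i32>, Vec<u8>) {
    assert_eq!(buf[0], 0, "missing magic byte");
    let schema_id = i32::from_be_bytes(buf[1..5].try_into().unwrap());
    buf = &buf[5..];
    let count = unzig(decode_varint(&mut buf).unwrap());
    let indexes = if count == 0 {
        vec![0] // the single-byte shortcut stands for [0]
    } else {
        (0..count)
            .map(|_| unzig(decode_varint(&mut buf).unwrap()))
            .collect()
    };
    (schema_id, indexes, buf.to_vec())
}

Together with `frame` above this round-trips: `parse_header(&frame(42, &[1, 0], b"msg"))` yields `(42, vec![1, 0], b"msg".to_vec())`.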