diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh index 1e01903ec2042..06ef185f46e8b 100755 --- a/ci/scripts/e2e-kafka-sink-test.sh +++ b/ci/scripts/e2e-kafka-sink-test.sh @@ -131,3 +131,10 @@ sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt' ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --delete > /dev/null 2>&1 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --delete > /dev/null 2>&1 ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1 + +# test different encoding +echo "testing protobuf" +cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1 +sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt' +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1 diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 364455d045668..ee74834dd5163 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -82,7 +82,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 15 + timeout_in_minutes: 17 retry: *auto-retry - label: "end-to-end test (parallel)" diff --git a/e2e_test/sink/kafka/protobuf.slt b/e2e_test/sink/kafka/protobuf.slt new file mode 100644 index 0000000000000..f69c4a9d07110 --- /dev/null +++ b/e2e_test/sink/kafka/protobuf.slt @@ -0,0 +1,97 @@ +statement ok +create table from_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-append-only-protobuf', + properties.bootstrap.server = '127.0.0.1:29092') +format plain encode protobuf ( + schema.location = 'file:///risingwave/proto-recursive', + message = 'recursive.AllTypes'); + +statement ok +create table into_kafka ( + bool_field bool, + string_field varchar, + bytes_field bytea, + float_field real, + double_field double precision, + int32_field int, + int64_field bigint, + sint32_field int, + sint64_field bigint, + sfixed32_field int, + sfixed64_field bigint, + nested_message_field struct, + repeated_int_field int[], + timestamp_field timestamptz, + oneof_int32 int); + +statement ok +insert into into_kafka values + (true, 'Rising', 'a0', 3.5, 4.25, 22, 23, 24, null, 26, 27, row(1, ''), array[4, 0, 4], '2006-01-02 15:04:05-07:00', 42), + (false, 'Wave', 'ZDF', 1.5, null, 11, 12, 13, 14, 15, 16, row(4, 'foo'), null, null, null); + +statement ok +flush; + +statement ok +create sink sink0 from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-append-only-protobuf', + properties.bootstrap.server = '127.0.0.1:29092') +format plain encode protobuf ( + force_append_only = true, + schema.location = 'file:///risingwave/proto-recursive', + message = 'recursive.AllTypes'); + +sleep 2s + +query TTTRRIIIIIITTTI +select + bool_field, + string_field, + bytes_field, + float_field, + double_field, + int32_field, + int64_field, + sint32_field, + sint64_field, + sfixed32_field, + sfixed64_field, + nested_message_field, + repeated_int_field, + timestamp_field, + oneof_int32 from from_kafka; +---- +t Rising \x6130 3.5 4.25 22 23 24 0 26 27 (1,) {4,0,4} (1136239445,0) 42 +f Wave \x5a4446 1.5 0 11 12 13 14 15 16 (4,foo) {} (0,0) 0 + +statement error failed to read file +create sink sink_err from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-append-only-protobuf', + properties.bootstrap.server = '127.0.0.1:29092') +format plain encode protobuf ( + force_append_only = true, + schema.location = 'file:///risingwave/proto-recursiv', + message = 'recursive.AllTypes'); + +statement error encode extra_column error: field not in proto +create sink sink_err as select 1 as extra_column with ( + connector = 'kafka', + topic = 'test-rw-sink-append-only-protobuf', + properties.bootstrap.server = '127.0.0.1:29092') +format plain encode protobuf ( + force_append_only = true, + schema.location = 'file:///risingwave/proto-recursive', + message = 'recursive.AllTypes'); + +statement error s3 URL not supported yet +create sink sink_err from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-append-only-protobuf', + properties.bootstrap.server = '127.0.0.1:29092') +format plain encode protobuf ( + force_append_only = true, + schema.location = 's3:///risingwave/proto-recursive', + message = 'recursive.AllTypes'); diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 8201e10731a28..8ccf62486ce65 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -46,6 +46,7 @@ pub mod error; mod macros; pub mod parser; +pub mod schema; pub mod sink; pub mod source; diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index b07a7b214d2c9..cc8dc0734bdfe 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -73,7 +73,7 @@ impl ProtobufAccessBuilder { #[derive(Debug, Clone)] pub struct ProtobufParserConfig { confluent_wire_type: bool, - message_descriptor: MessageDescriptor, + pub(crate) message_descriptor: MessageDescriptor, } impl ProtobufParserConfig { diff --git a/src/connector/src/schema/mod.rs b/src/connector/src/schema/mod.rs new file mode 100644 index 0000000000000..fff2576389cfd --- /dev/null +++ b/src/connector/src/schema/mod.rs @@ -0,0 +1,21 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod protobuf; + +const MESSAGE_NAME_KEY: &str = "message"; +const SCHEMA_LOCATION_KEY: &str = "schema.location"; + +#[derive(Debug)] +pub struct SchemaFetchError(pub String); diff --git a/src/connector/src/schema/protobuf.rs b/src/connector/src/schema/protobuf.rs new file mode 100644 index 0000000000000..d312f3d0d73eb --- /dev/null +++ b/src/connector/src/schema/protobuf.rs @@ -0,0 +1,57 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use prost_reflect::MessageDescriptor; + +use super::{SchemaFetchError, MESSAGE_NAME_KEY, SCHEMA_LOCATION_KEY}; +use crate::aws_auth::AwsAuthProps; +use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties}; + +/// `aws_auth_props` is only required when reading `s3://` URL. +pub async fn fetch_descriptor( + format_options: &BTreeMap, + aws_auth_props: Option<&AwsAuthProps>, +) -> Result { + let row_schema_location = format_options + .get(SCHEMA_LOCATION_KEY) + .ok_or_else(|| SchemaFetchError(format!("{SCHEMA_LOCATION_KEY} required")))? + .clone(); + let message_name = format_options + .get(MESSAGE_NAME_KEY) + .ok_or_else(|| SchemaFetchError(format!("{MESSAGE_NAME_KEY} required")))? + .clone(); + + if row_schema_location.starts_with("s3") && aws_auth_props.is_none() { + return Err(SchemaFetchError("s3 URL not supported yet".into())); + } + + let enc = EncodingProperties::Protobuf(ProtobufProperties { + use_schema_registry: false, + row_schema_location, + message_name, + aws_auth_props: aws_auth_props.cloned(), + // name_strategy, topic, key_message_name, enable_upsert, client_config + ..Default::default() + }); + // Ideally, we should extract the schema loading logic from source parser to this place, + // and call this in both source and sink. + // But right now this function calls into source parser for its schema loading functionality. + // This reversed dependency will be fixed when we support schema registry. + let conf = ProtobufParserConfig::new(enc) + .await + .map_err(|e| SchemaFetchError(e.to_string()))?; + Ok(conf.message_descriptor) +} diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 44c8ed6f3ea29..5a1c7379795d5 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -28,7 +28,7 @@ pub use upsert::UpsertFormatter; use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc}; use super::encoder::KafkaConnectParams; -use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode}; +use crate::sink::encoder::{JsonEncoder, ProtoEncoder, TimestampHandlingMode}; /// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format, /// for example append-only, upsert or debezium. @@ -61,27 +61,28 @@ macro_rules! tri { }; } -#[expect(clippy::enum_variant_names)] pub enum SinkFormatterImpl { AppendOnlyJson(AppendOnlyFormatter), + AppendOnlyProto(AppendOnlyFormatter), UpsertJson(UpsertFormatter), DebeziumJson(DebeziumJsonFormatter), } impl SinkFormatterImpl { - pub fn new( + pub async fn new( format_desc: &SinkFormatDesc, schema: Schema, pk_indices: Vec, db_name: String, sink_from_name: String, ) -> Result { - if format_desc.encode != SinkEncode::Json { - return Err(SinkError::Config(anyhow!( - "sink encode unsupported: {:?}", + let err_unsupported = || { + Err(SinkError::Config(anyhow!( + "sink format/encode unsupported: {:?} {:?}", + format_desc.format, format_desc.encode, - ))); - } + ))) + }; match format_desc.format { SinkFormat::AppendOnly => { @@ -92,12 +93,32 @@ impl SinkFormatterImpl { TimestampHandlingMode::Milli, ) }); - let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli); - let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder); - Ok(SinkFormatterImpl::AppendOnlyJson(formatter)) + match format_desc.encode { + SinkEncode::Json => { + let val_encoder = + JsonEncoder::new(schema, None, TimestampHandlingMode::Milli); + let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder); + Ok(SinkFormatterImpl::AppendOnlyJson(formatter)) + } + SinkEncode::Protobuf => { + // By passing `None` as `aws_auth_props`, reading from `s3://` not supported yet. + let descriptor = + crate::schema::protobuf::fetch_descriptor(&format_desc.options, None) + .await + .map_err(|e| SinkError::Config(anyhow!("{e:?}")))?; + let val_encoder = ProtoEncoder::new(schema, None, descriptor)?; + let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder); + Ok(SinkFormatterImpl::AppendOnlyProto(formatter)) + } + SinkEncode::Avro => err_unsupported(), + } } SinkFormat::Debezium => { + if format_desc.encode != SinkEncode::Json { + return err_unsupported(); + } + Ok(SinkFormatterImpl::DebeziumJson(DebeziumJsonFormatter::new( schema, pk_indices, @@ -107,6 +128,10 @@ impl SinkFormatterImpl { ))) } SinkFormat::Upsert => { + if format_desc.encode != SinkEncode::Json { + return err_unsupported(); + } + let mut key_encoder = JsonEncoder::new( schema.clone(), Some(pk_indices), @@ -146,6 +171,7 @@ macro_rules! dispatch_sink_formatter_impl { ($impl:expr, $name:ident, $body:expr) => { match $impl { SinkFormatterImpl::AppendOnlyJson($name) => $body, + SinkFormatterImpl::AppendOnlyProto($name) => $body, SinkFormatterImpl::UpsertJson($name) => $body, SinkFormatterImpl::DebeziumJson($name) => $body, } diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index ef4efe88e43f8..a204a8d121706 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -314,7 +314,8 @@ impl Sink for KafkaSink { self.pk_indices.clone(), self.db_name.clone(), self.sink_from_name.clone(), - )?; + ) + .await?; KafkaLogSinker::new(self.config.clone(), formatter).await } @@ -326,6 +327,15 @@ impl Sink for KafkaSink { self.format_desc.format ))); } + // Check for formatter constructor error, before it is too late for error reporting. + SinkFormatterImpl::new( + &self.format_desc, + self.schema.clone(), + self.pk_indices.clone(), + self.db_name.clone(), + self.sink_from_name.clone(), + ) + .await?; // Try Kafka connection. // There is no such interface for kafka producer to validate a connection diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index 741bc1bb1db14..dd8518af39948 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -79,6 +79,15 @@ impl Sink for KinesisSink { "kinesis sink requires partition key (please define in `primary_key` field)", ))); } + // Check for formatter constructor error, before it is too late for error reporting. + SinkFormatterImpl::new( + &self.format_desc, + self.schema.clone(), + self.pk_indices.clone(), + self.db_name.clone(), + self.sink_from_name.clone(), + ) + .await?; // check reachability let client = self.config.common.build_client().await?; @@ -145,7 +154,8 @@ impl KinesisSinkWriter { sink_from_name: String, ) -> Result { let formatter = - SinkFormatterImpl::new(format_desc, schema, pk_indices, db_name, sink_from_name)?; + SinkFormatterImpl::new(format_desc, schema, pk_indices, db_name, sink_from_name) + .await?; let client = config .common .build_client() diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index 16514da4c4d23..f980b2ad9f9b1 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -180,6 +180,15 @@ impl Sink for PulsarSink { self.format_desc.format ))); } + // Check for formatter constructor error, before it is too late for error reporting. + SinkFormatterImpl::new( + &self.format_desc, + self.schema.clone(), + self.downstream_pk.clone(), + self.db_name.clone(), + self.sink_from_name.clone(), + ) + .await?; // Validate pulsar connection. let pulsar = self.config.common.build_client().await?; @@ -211,7 +220,8 @@ impl PulsarSinkWriter { sink_from_name: String, ) -> Result { let formatter = - SinkFormatterImpl::new(format_desc, schema, downstream_pk, db_name, sink_from_name)?; + SinkFormatterImpl::new(format_desc, schema, downstream_pk, db_name, sink_from_name) + .await?; let pulsar = config.common.build_client().await?; let producer = build_pulsar_producer(&pulsar, &config).await?; Ok(Self { diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 80335d278c8d4..32279dd4e70eb 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -266,7 +266,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock hashmap!( - Format::Plain => vec![Encode::Json], + Format::Plain => vec![Encode::Json, Encode::Protobuf], Format::Upsert => vec![Encode::Json], Format::Debezium => vec![Encode::Json], ),