feat(sink): enable kafka sink with format plain encode protobuf (#1…
xiangjinwu authored Oct 21, 2023
1 parent fa66cbd commit ccef1c5
Showing 12 changed files with 256 additions and 17 deletions.
7 changes: 7 additions & 0 deletions ci/scripts/e2e-kafka-sink-test.sh
@@ -131,3 +131,10 @@ sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/drop_sink.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-upsert --delete > /dev/null 2>&1
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-debezium --delete > /dev/null 2>&1

# test different encodings
echo "testing protobuf"
cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1
sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt'
./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
@@ -82,7 +82,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
timeout_in_minutes: 17
retry: *auto-retry

- label: "end-to-end test (parallel)"
97 changes: 97 additions & 0 deletions e2e_test/sink/kafka/protobuf.slt
@@ -0,0 +1,97 @@
statement ok
create table from_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf',
properties.bootstrap.server = '127.0.0.1:29092')
format plain encode protobuf (
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

statement ok
create table into_kafka (
bool_field bool,
string_field varchar,
bytes_field bytea,
float_field real,
double_field double precision,
int32_field int,
int64_field bigint,
sint32_field int,
sint64_field bigint,
sfixed32_field int,
sfixed64_field bigint,
nested_message_field struct<id int, name varchar>,
repeated_int_field int[],
timestamp_field timestamptz,
oneof_int32 int);

statement ok
insert into into_kafka values
(true, 'Rising', 'a0', 3.5, 4.25, 22, 23, 24, null, 26, 27, row(1, ''), array[4, 0, 4], '2006-01-02 15:04:05-07:00', 42),
(false, 'Wave', 'ZDF', 1.5, null, 11, 12, 13, 14, 15, 16, row(4, 'foo'), null, null, null);

statement ok
flush;

statement ok
create sink sink0 from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf',
properties.bootstrap.server = '127.0.0.1:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

sleep 2s

query TTTRRIIIIIITTTI
select
bool_field,
string_field,
bytes_field,
float_field,
double_field,
int32_field,
int64_field,
sint32_field,
sint64_field,
sfixed32_field,
sfixed64_field,
nested_message_field,
repeated_int_field,
timestamp_field,
oneof_int32 from from_kafka;
----
t Rising \x6130 3.5 4.25 22 23 24 0 26 27 (1,) {4,0,4} (1136239445,0) 42
f Wave \x5a4446 1.5 0 11 12 13 14 15 16 (4,foo) {} (0,0) 0

statement error failed to read file
create sink sink_err from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf',
properties.bootstrap.server = '127.0.0.1:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 'file:///risingwave/proto-recursiv',
message = 'recursive.AllTypes');

statement error encode extra_column error: field not in proto
create sink sink_err as select 1 as extra_column with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf',
properties.bootstrap.server = '127.0.0.1:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 'file:///risingwave/proto-recursive',
message = 'recursive.AllTypes');

statement error s3 URL not supported yet
create sink sink_err from into_kafka with (
connector = 'kafka',
topic = 'test-rw-sink-append-only-protobuf',
properties.bootstrap.server = '127.0.0.1:29092')
format plain encode protobuf (
force_append_only = true,
schema.location = 's3:///risingwave/proto-recursive',
message = 'recursive.AllTypes');
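
For reference, the test above loads a pre-compiled descriptor file (`recursive.pb`) whose `recursive.AllTypes` message matches the `into_kafka` columns. The .proto source itself is not part of this diff; a minimal sketch inferred from the column list might look like the following (field numbers, the nested message name, and the oneof wrapper name are assumptions, not taken from the repository):

syntax = "proto3";
package recursive;

import "google/protobuf/timestamp.proto";

message AllTypes {
  bool bool_field = 1;
  string string_field = 2;
  bytes bytes_field = 3;
  float float_field = 4;
  double double_field = 5;
  int32 int32_field = 6;
  int64 int64_field = 7;
  sint32 sint32_field = 8;
  sint64 sint64_field = 9;
  sfixed32 sfixed32_field = 10;
  sfixed64 sfixed64_field = 11;

  message Nested {
    int32 id = 1;
    string name = 2;
  }
  Nested nested_message_field = 12;

  repeated int32 repeated_int_field = 13;
  google.protobuf.Timestamp timestamp_field = 14;

  // The test only exercises an int32 arm of some oneof.
  oneof test_oneof {
    int32 oneof_int32 = 15;
  }
}
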
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
Expand Up @@ -46,6 +46,7 @@ pub mod error;
mod macros;

pub mod parser;
pub mod schema;
pub mod sink;
pub mod source;

2 changes: 1 addition & 1 deletion src/connector/src/parser/protobuf/parser.rs
@@ -73,7 +73,7 @@ impl ProtobufAccessBuilder {
#[derive(Debug, Clone)]
pub struct ProtobufParserConfig {
confluent_wire_type: bool,
message_descriptor: MessageDescriptor,
pub(crate) message_descriptor: MessageDescriptor,
}

impl ProtobufParserConfig {
21 changes: 21 additions & 0 deletions src/connector/src/schema/mod.rs
@@ -0,0 +1,21 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod protobuf;

const MESSAGE_NAME_KEY: &str = "message";
const SCHEMA_LOCATION_KEY: &str = "schema.location";

#[derive(Debug)]
pub struct SchemaFetchError(pub String);
57 changes: 57 additions & 0 deletions src/connector/src/schema/protobuf.rs
@@ -0,0 +1,57 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use prost_reflect::MessageDescriptor;

use super::{SchemaFetchError, MESSAGE_NAME_KEY, SCHEMA_LOCATION_KEY};
use crate::aws_auth::AwsAuthProps;
use crate::parser::{EncodingProperties, ProtobufParserConfig, ProtobufProperties};

/// `aws_auth_props` is only required when reading an `s3://` URL.
pub async fn fetch_descriptor(
format_options: &BTreeMap<String, String>,
aws_auth_props: Option<&AwsAuthProps>,
) -> Result<MessageDescriptor, SchemaFetchError> {
let row_schema_location = format_options
.get(SCHEMA_LOCATION_KEY)
.ok_or_else(|| SchemaFetchError(format!("{SCHEMA_LOCATION_KEY} required")))?
.clone();
let message_name = format_options
.get(MESSAGE_NAME_KEY)
.ok_or_else(|| SchemaFetchError(format!("{MESSAGE_NAME_KEY} required")))?
.clone();

if row_schema_location.starts_with("s3") && aws_auth_props.is_none() {
return Err(SchemaFetchError("s3 URL not supported yet".into()));
}

let enc = EncodingProperties::Protobuf(ProtobufProperties {
use_schema_registry: false,
row_schema_location,
message_name,
aws_auth_props: aws_auth_props.cloned(),
// name_strategy, topic, key_message_name, enable_upsert, client_config
..Default::default()
});
// Ideally, we should extract the schema loading logic from the source parser to this place,
// and call it from both source and sink.
// But right now this function calls into the source parser for its schema loading functionality.
// This reversed dependency will be fixed when we support the schema registry.
let conf = ProtobufParserConfig::new(enc)
.await
.map_err(|e| SchemaFetchError(e.to_string()))?;
Ok(conf.message_descriptor)
}
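
A hypothetical caller sketch (not part of this commit) showing how `fetch_descriptor` is used; the option keys mirror the SQL clause `format plain encode protobuf (schema.location = ..., message = ...)`, and the `load_all_types` helper name is invented:

use std::collections::BTreeMap;

use crate::schema::protobuf::fetch_descriptor;
use crate::schema::SchemaFetchError;

async fn load_all_types() -> Result<(), SchemaFetchError> {
    let mut options = BTreeMap::new();
    options.insert(
        "schema.location".to_owned(),
        "file:///risingwave/proto-recursive".to_owned(),
    );
    options.insert("message".to_owned(), "recursive.AllTypes".to_owned());

    // Pass `None` for `aws_auth_props`: any `s3://` location is rejected up front.
    let descriptor = fetch_descriptor(&options, None).await?;
    assert_eq!(descriptor.full_name(), "recursive.AllTypes");
    Ok(())
}
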
48 changes: 37 additions & 11 deletions src/connector/src/sink/formatter/mod.rs
@@ -28,7 +28,7 @@ pub use upsert::UpsertFormatter;

use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc};
use super::encoder::KafkaConnectParams;
use crate::sink::encoder::{JsonEncoder, TimestampHandlingMode};
use crate::sink::encoder::{JsonEncoder, ProtoEncoder, TimestampHandlingMode};

/// Transforms a `StreamChunk` into a sequence of key-value pairs according to a specific format,
/// for example append-only, upsert or debezium.
@@ -61,27 +61,28 @@ macro_rules! tri {
};
}

#[expect(clippy::enum_variant_names)]
pub enum SinkFormatterImpl {
AppendOnlyJson(AppendOnlyFormatter<JsonEncoder, JsonEncoder>),
AppendOnlyProto(AppendOnlyFormatter<JsonEncoder, ProtoEncoder>),
UpsertJson(UpsertFormatter<JsonEncoder, JsonEncoder>),
DebeziumJson(DebeziumJsonFormatter),
}

impl SinkFormatterImpl {
pub fn new(
pub async fn new(
format_desc: &SinkFormatDesc,
schema: Schema,
pk_indices: Vec<usize>,
db_name: String,
sink_from_name: String,
) -> Result<Self> {
if format_desc.encode != SinkEncode::Json {
return Err(SinkError::Config(anyhow!(
"sink encode unsupported: {:?}",
let err_unsupported = || {
Err(SinkError::Config(anyhow!(
"sink format/encode unsupported: {:?} {:?}",
format_desc.format,
format_desc.encode,
)));
}
)))
};

match format_desc.format {
SinkFormat::AppendOnly => {
@@ -92,12 +93,32 @@ impl SinkFormatterImpl {
TimestampHandlingMode::Milli,
)
});
let val_encoder = JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);

let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::AppendOnlyJson(formatter))
match format_desc.encode {
SinkEncode::Json => {
let val_encoder =
JsonEncoder::new(schema, None, TimestampHandlingMode::Milli);
let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::AppendOnlyJson(formatter))
}
SinkEncode::Protobuf => {
// By passing `None` as `aws_auth_props`, reading from `s3://` is not supported yet.
let descriptor =
crate::schema::protobuf::fetch_descriptor(&format_desc.options, None)
.await
.map_err(|e| SinkError::Config(anyhow!("{e:?}")))?;
let val_encoder = ProtoEncoder::new(schema, None, descriptor)?;
let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder);
Ok(SinkFormatterImpl::AppendOnlyProto(formatter))
}
SinkEncode::Avro => err_unsupported(),
}
}
SinkFormat::Debezium => {
if format_desc.encode != SinkEncode::Json {
return err_unsupported();
}

Ok(SinkFormatterImpl::DebeziumJson(DebeziumJsonFormatter::new(
schema,
pk_indices,
Expand All @@ -107,6 +128,10 @@ impl SinkFormatterImpl {
)))
}
SinkFormat::Upsert => {
if format_desc.encode != SinkEncode::Json {
return err_unsupported();
}

let mut key_encoder = JsonEncoder::new(
schema.clone(),
Some(pk_indices),
@@ -146,6 +171,7 @@ macro_rules! dispatch_sink_formatter_impl {
($impl:expr, $name:ident, $body:expr) => {
match $impl {
SinkFormatterImpl::AppendOnlyJson($name) => $body,
SinkFormatterImpl::AppendOnlyProto($name) => $body,
SinkFormatterImpl::UpsertJson($name) => $body,
SinkFormatterImpl::DebeziumJson($name) => $body,
}
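Because the macro expands the same body once per variant, adding the `AppendOnlyProto` arm is all that existing call sites need to handle the protobuf encoder. A hypothetical call site (the identifiers `formatter`, `chunk`, and `send_chunk` are stand-ins, not names from this commit) could look like:

dispatch_sink_formatter_impl!(&formatter, f, {
    // In each arm `f` is bound to the concrete formatter (JSON or protobuf),
    // so this body is monomorphized per variant.
    send_chunk(f, &chunk).await?
});
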
12 changes: 11 additions & 1 deletion src/connector/src/sink/kafka.rs
@@ -314,7 +314,8 @@ impl Sink for KafkaSink {
self.pk_indices.clone(),
self.db_name.clone(),
self.sink_from_name.clone(),
)?;
)
.await?;
KafkaLogSinker::new(self.config.clone(), formatter).await
}

@@ -326,6 +327,15 @@ impl Sink for KafkaSink {
self.format_desc.format
)));
}
// Check for formatter constructor errors before it is too late to report them.
SinkFormatterImpl::new(
&self.format_desc,
self.schema.clone(),
self.pk_indices.clone(),
self.db_name.clone(),
self.sink_from_name.clone(),
)
.await?;

// Try Kafka connection.
// There is no such interface for kafka producer to validate a connection
12 changes: 11 additions & 1 deletion src/connector/src/sink/kinesis.rs
@@ -79,6 +79,15 @@ impl Sink for KinesisSink {
"kinesis sink requires partition key (please define in `primary_key` field)",
)));
}
// Check for formatter constructor errors before it is too late to report them.
SinkFormatterImpl::new(
&self.format_desc,
self.schema.clone(),
self.pk_indices.clone(),
self.db_name.clone(),
self.sink_from_name.clone(),
)
.await?;

// check reachability
let client = self.config.common.build_client().await?;
@@ -145,7 +154,8 @@ impl KinesisSinkWriter {
sink_from_name: String,
) -> Result<Self> {
let formatter =
SinkFormatterImpl::new(format_desc, schema, pk_indices, db_name, sink_from_name)?;
SinkFormatterImpl::new(format_desc, schema, pk_indices, db_name, sink_from_name)
.await?;
let client = config
.common
.build_client()
12 changes: 11 additions & 1 deletion src/connector/src/sink/pulsar.rs
@@ -180,6 +180,15 @@ impl Sink for PulsarSink {
self.format_desc.format
)));
}
// Check for formatter constructor errors before it is too late to report them.
SinkFormatterImpl::new(
&self.format_desc,
self.schema.clone(),
self.downstream_pk.clone(),
self.db_name.clone(),
self.sink_from_name.clone(),
)
.await?;

// Validate pulsar connection.
let pulsar = self.config.common.build_client().await?;
@@ -211,7 +220,8 @@ impl PulsarSinkWriter {
sink_from_name: String,
) -> Result<Self> {
let formatter =
SinkFormatterImpl::new(format_desc, schema, downstream_pk, db_name, sink_from_name)?;
SinkFormatterImpl::new(format_desc, schema, downstream_pk, db_name, sink_from_name)
.await?;
let pulsar = config.common.build_client().await?;
let producer = build_pulsar_producer(&pulsar, &config).await?;
Ok(Self {