diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh index 71a91f2d8fba9..d51482a912235 100755 --- a/ci/scripts/e2e-kafka-sink-test.sh +++ b/ci/scripts/e2e-kafka-sink-test.sh @@ -138,3 +138,11 @@ cp src/connector/src/test_data/proto_recursive/recursive.pb ./proto-recursive ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --create > /dev/null 2>&1 sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/protobuf.slt' ./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-append-only-protobuf --delete > /dev/null 2>&1 + +echo "testing avro" +python3 -m pip install requests confluent-kafka +python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-value' src/connector/src/test_data/all-types.avsc +python3 e2e_test/sink/kafka/register_schema.py 'http://message_queue:8081' 'test-rw-sink-upsert-avro-key' src/connector/src/test_data/all-types.avsc 'string_field,int32_field' +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --create > /dev/null 2>&1 +sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/avro.slt' +./.risingwave/bin/kafka/bin/kafka-topics.sh --bootstrap-server message_queue:29092 --topic test-rw-sink-upsert-avro --delete > /dev/null 2>&1 diff --git a/e2e_test/sink/kafka/avro.slt b/e2e_test/sink/kafka/avro.slt new file mode 100644 index 0000000000000..e1b09e3608e37 --- /dev/null +++ b/e2e_test/sink/kafka/avro.slt @@ -0,0 +1,110 @@ +statement ok +create table from_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-upsert-avro', + properties.bootstrap.server = 'message_queue:29092') +format upsert encode avro ( + schema.registry = 'http://message_queue:8081'); + +statement ok +create table into_kafka ( + bool_field bool, + string_field varchar, + bytes_field bytea, + float_field real, + double_field 
double precision, + int32_field int, + int64_field bigint, + record_field struct, + array_field int[][], + timestamp_micros_field timestamptz, + timestamp_millis_field timestamptz, + date_field date, + time_micros_field time, + time_millis_field time); + +statement ok +insert into into_kafka values + (true, 'Rising', 'a0', 3.5, 4.25, 22, 23, null, array[array[null, 3], null, array[7, null, 2]], '2006-01-02 15:04:05-07:00', null, null, '12:34:56.123456', null), + (false, 'Wave', 'ZDF', 1.5, null, 11, 12, row(null::int, 'foo'), null, null, '2006-01-02 15:04:05-07:00', '2021-04-01', null, '23:45:16.654321'); + +statement ok +flush; + +statement ok +create sink sink0 from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-upsert-avro', + properties.bootstrap.server = 'message_queue:29092', + primary_key = 'int32_field,string_field') +format upsert encode avro ( + schema.registry = 'http://message_queue:8081'); + +sleep 2s + +query TTTRRIITTTTTTTT +select + bool_field, + string_field, + bytes_field, + float_field, + double_field, + int32_field, + int64_field, + record_field, + array_field, + timestamp_micros_field, + timestamp_millis_field, + date_field, + time_micros_field, + time_millis_field from from_kafka; +---- +t Rising \x6130 3.5 4.25 22 23 NULL {{NULL,3},NULL,{7,NULL,2}} 2006-01-02 22:04:05+00:00 NULL NULL 12:34:56.123456 NULL +f Wave \x5a4446 1.5 NULL 11 12 (NULL,foo) NULL NULL 2006-01-02 22:04:05+00:00 2021-04-01 NULL 23:45:16.654 + +statement error SchemaFetchError +create sink sink_err from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-upsert-avro-err', + properties.bootstrap.server = 'message_queue:29092', + primary_key = 'int32_field,string_field') +format upsert encode avro ( + schema.registry = 'http://message_queue:8081'); + +statement error encode extra_column error: field not in avro +create sink sink_err as select 1 as extra_column, * from into_kafka with ( + connector = 'kafka', + topic = 'test-rw-sink-upsert-avro', 
"""Register an Avro schema file under a Confluent Schema Registry subject.

Usage:
    register_schema.py <registry-url> <subject> <schema-file> [key1,key2,...]

Without the optional key list, the schema is registered after dropping the
top-level fields listed in ``_UNSUPPORTED_FIELDS``.  With a key list, only the
named top-level fields are kept (in the order given), which produces the key
schema for an upsert sink.
"""
import json
import sys

# Top-level fields removed from the value schema before registration.
# NOTE(review): presumably these are the fields the Avro sink encoder cannot
# handle yet -- confirm against the encoder before changing this set.
_UNSUPPORTED_FIELDS = frozenset({'unsupported', 'mon_day_sec_field'})


def _parse_keys(raw):
    """Split a comma-separated key list, ignoring blanks and padding."""
    return [part.strip() for part in raw.split(',') if part.strip()]


def main():
    """Read the command line, transform the schema and register it.

    Exits with a usage message (non-zero status) when fewer than three
    positional arguments are supplied.
    """
    if len(sys.argv) < 4:
        sys.exit(
            f"usage: {sys.argv[0]} <registry-url> <subject> <schema-file> "
            "[key-field,...]"
        )

    # Imported here rather than at module level so that the pure helper
    # functions below stay importable without the confluent-kafka package.
    from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

    url, subject, schema_path = sys.argv[1:4]
    with open(schema_path, encoding='utf-8') as f:
        schema_str = f.read()
    keys = _parse_keys(sys.argv[4]) if len(sys.argv) > 4 else []

    if keys:
        schema_str = select_keys(schema_str, keys)
    else:
        schema_str = remove_unsupported(schema_str)

    client = SchemaRegistryClient({"url": url})
    client.register_schema(subject, Schema(schema_str, 'AVRO'))


def select_fields(schema_str, f):
    """Apply ``f`` to the top-level field list of a record schema.

    Args:
        schema_str: Avro schema as a JSON string.
        f: Callable that receives the list of field dicts and returns the
            list of field dicts to keep.

    Returns:
        The re-serialized schema when the root is a JSON object whose
        ``type`` is ``"record"``; otherwise ``schema_str`` unchanged.
    """
    root = json.loads(schema_str)
    # `.get` so that an object without a "type" entry is passed through
    # untouched instead of failing here with a KeyError.
    if not isinstance(root, dict) or root.get('type') != 'record':
        return schema_str
    root['fields'] = f(root['fields'])
    return json.dumps(root)


def remove_unsupported(schema_str):
    """Return the schema without the fields named in ``_UNSUPPORTED_FIELDS``."""
    def keep_supported(fields):
        return [fld for fld in fields if fld['name'] not in _UNSUPPORTED_FIELDS]
    return select_fields(schema_str, keep_supported)


def select_keys(schema_str, keys):
    """Return the schema restricted to ``keys``, in the order of ``keys``.

    Raises:
        KeyError: If any requested key is not a top-level field; the message
            lists every missing name.
    """
    def process(fields):
        by_name = {fld['name']: fld for fld in fields}
        missing = [k for k in keys if k not in by_name]
        if missing:
            raise KeyError(f"key field(s) not found in schema: {missing}")
        return [by_name[k] for k in keys]
    return select_fields(schema_str, process)


if __name__ == '__main__':
    main()
+ +use std::collections::BTreeMap; +use std::sync::Arc; + +use apache_avro::Schema as AvroSchema; +use risingwave_pb::catalog::PbSchemaRegistryNameStrategy; + +use super::schema_registry::{ + get_subject_by_strategy, handle_sr_list, name_strategy_from_str, Client, ConfluentSchema, + SchemaRegistryAuth, +}; +use super::{ + SchemaFetchError, KEY_MESSAGE_NAME_KEY, MESSAGE_NAME_KEY, NAME_STRATEGY_KEY, + SCHEMA_REGISTRY_KEY, +}; + +pub struct SchemaWithId { + pub schema: Arc, + pub id: i32, +} + +impl TryFrom for SchemaWithId { + type Error = SchemaFetchError; + + fn try_from(fetched: ConfluentSchema) -> Result { + let parsed = + AvroSchema::parse_str(&fetched.content).map_err(|e| SchemaFetchError(e.to_string()))?; + Ok(Self { + schema: Arc::new(parsed), + id: fetched.id, + }) + } +} + +/// Schema registry only +pub async fn fetch_schema( + format_options: &BTreeMap, + topic: &str, +) -> Result<(SchemaWithId, SchemaWithId), SchemaFetchError> { + let schema_location = format_options + .get(SCHEMA_REGISTRY_KEY) + .ok_or_else(|| SchemaFetchError(format!("{SCHEMA_REGISTRY_KEY} required")))? + .clone(); + let client_config = format_options.into(); + let name_strategy = format_options + .get(NAME_STRATEGY_KEY) + .map(|s| { + name_strategy_from_str(s) + .ok_or_else(|| SchemaFetchError(format!("unrecognized strategy {s}"))) + }) + .transpose()? 
+ .unwrap_or_default(); + let key_record_name = format_options + .get(KEY_MESSAGE_NAME_KEY) + .map(std::ops::Deref::deref); + let val_record_name = format_options + .get(MESSAGE_NAME_KEY) + .map(std::ops::Deref::deref); + + let (key_schema, val_schema) = fetch_schema_inner( + &schema_location, + &client_config, + &name_strategy, + topic, + key_record_name, + val_record_name, + ) + .await + .map_err(|e| SchemaFetchError(e.to_string()))?; + + Ok((key_schema.try_into()?, val_schema.try_into()?)) +} + +async fn fetch_schema_inner( + schema_location: &str, + client_config: &SchemaRegistryAuth, + name_strategy: &PbSchemaRegistryNameStrategy, + topic: &str, + key_record_name: Option<&str>, + val_record_name: Option<&str>, +) -> Result<(ConfluentSchema, ConfluentSchema), risingwave_common::error::RwError> { + let urls = handle_sr_list(schema_location)?; + let client = Client::new(urls, client_config)?; + + let key_subject = get_subject_by_strategy(name_strategy, topic, key_record_name, true)?; + let key_schema = client.get_schema_by_subject(&key_subject).await?; + + let val_subject = get_subject_by_strategy(name_strategy, topic, val_record_name, false)?; + let val_schema = client.get_schema_by_subject(&val_subject).await?; + + Ok((key_schema, val_schema)) +} diff --git a/src/connector/src/schema/mod.rs b/src/connector/src/schema/mod.rs index 3c8694fc8f359..75a521a50ec55 100644 --- a/src/connector/src/schema/mod.rs +++ b/src/connector/src/schema/mod.rs @@ -12,11 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+pub mod avro; pub mod protobuf; pub mod schema_registry; const MESSAGE_NAME_KEY: &str = "message"; +const KEY_MESSAGE_NAME_KEY: &str = "key.message"; const SCHEMA_LOCATION_KEY: &str = "schema.location"; +const SCHEMA_REGISTRY_KEY: &str = "schema.registry"; +const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy"; #[derive(Debug)] pub struct SchemaFetchError(pub String); diff --git a/src/connector/src/schema/schema_registry/client.rs b/src/connector/src/schema/schema_registry/client.rs index 591478752d926..6d5778cd367c4 100644 --- a/src/connector/src/schema/schema_registry/client.rs +++ b/src/connector/src/schema/schema_registry/client.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Debug; use std::sync::Arc; @@ -43,6 +43,18 @@ impl From<&HashMap> for SchemaRegistryAuth { } } +impl From<&BTreeMap> for SchemaRegistryAuth { + fn from(props: &BTreeMap) -> Self { + const SCHEMA_REGISTRY_USERNAME: &str = "schema.registry.username"; + const SCHEMA_REGISTRY_PASSWORD: &str = "schema.registry.password"; + + SchemaRegistryAuth { + username: props.get(SCHEMA_REGISTRY_USERNAME).cloned(), + password: props.get(SCHEMA_REGISTRY_PASSWORD).cloned(), + } + } +} + /// An client for communication with schema registry #[derive(Debug)] pub struct Client { @@ -123,7 +135,9 @@ impl Client { Err(RwError::from(ProtocolError(format!( "all request confluent registry all timeout, req path {:?}, urls {:?}, err: {:?}", - path, self.url, errs + path, + self.url, + errs.iter().map(|e| e.to_string()).collect_vec() )))) } diff --git a/src/connector/src/sink/encoder/avro.rs b/src/connector/src/sink/encoder/avro.rs index fc2db75eb4c38..7aff29dbb43b7 100644 --- a/src/connector/src/sink/encoder/avro.rs +++ b/src/connector/src/sink/encoder/avro.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use 
apache_avro::schema::Schema as AvroSchema; use apache_avro::types::{Record, Value}; -use apache_avro::Writer; use risingwave_common::catalog::Schema; use risingwave_common::row::Row; use risingwave_common::types::{DataType, DatumRef, ScalarRefImpl, StructType}; @@ -30,6 +29,28 @@ pub struct AvroEncoder { schema: Schema, col_indices: Option>, avro_schema: Arc, + header: AvroHeader, +} + +#[derive(Debug, Clone, Copy)] +pub enum AvroHeader { + None, + /// + /// + /// * C3 01 + /// * 8-byte little-endian CRC-64-AVRO fingerprint + SingleObject, + /// + /// + /// * 4F 62 6A 01 + /// * schema + /// * 16-byte random sync marker + ContainerFile, + /// + /// + /// * 00 + /// * 4-byte big-endian schema ID + ConfluentSchemaRegistry(i32), } impl AvroEncoder { @@ -37,6 +58,7 @@ impl AvroEncoder { schema: Schema, col_indices: Option>, avro_schema: Arc, + header: AvroHeader, ) -> SinkResult { match &col_indices { Some(col_indices) => validate_fields( @@ -59,12 +81,19 @@ impl AvroEncoder { schema, col_indices, avro_schema, + header, }) } } +pub struct AvroEncoded { + value: Value, + schema: Arc, + header: AvroHeader, +} + impl RowEncoder for AvroEncoder { - type Output = (Value, Arc); + type Output = AvroEncoded; fn schema(&self) -> &Schema { &self.schema @@ -86,16 +115,32 @@ impl RowEncoder for AvroEncoder { }), &self.avro_schema, )?; - Ok((record.into(), self.avro_schema.clone())) + Ok(AvroEncoded { + value: record.into(), + schema: self.avro_schema.clone(), + header: self.header, + }) } } -impl SerTo> for (Value, Arc) { +impl SerTo> for AvroEncoded { fn ser_to(self) -> SinkResult> { - let mut w = Writer::new(&self.1, Vec::new()); - w.append(self.0) - .and_then(|_| w.into_inner()) - .map_err(|e| crate::sink::SinkError::Encode(e.to_string())) + use bytes::BufMut as _; + + let AvroHeader::ConfluentSchemaRegistry(schema_id) = self.header else { + return Err(crate::sink::SinkError::Encode(format!( + "{:?} unsupported yet", + self.header + ))); + }; + let raw = 
apache_avro::to_avro_datum(&self.schema, self.value) + .map_err(|e| crate::sink::SinkError::Encode(e.to_string()))?; + let mut buf = Vec::with_capacity(1 + 4 + raw.len()); + buf.put_u8(0); + buf.put_i32(schema_id); + buf.put_slice(&raw); + + Ok(buf) } } @@ -616,8 +661,16 @@ mod tests { .unwrap(); let mut record = Record::new(&avro_schema).unwrap(); record.put("f0", Value::String("2".into())); - let res: SinkResult> = (Value::from(record), Arc::new(avro_schema)).ser_to(); - assert_eq!(res.unwrap_err().to_string(), "Encode error: Value does not match schema: Reason: Unsupported value-schema combination"); + let res: SinkResult> = AvroEncoded { + value: Value::from(record), + schema: Arc::new(avro_schema), + header: AvroHeader::ConfluentSchemaRegistry(42), + } + .ser_to(); + assert_eq!( + res.unwrap_err().to_string(), + "Encode error: Value does not match schema" + ); } #[test] @@ -634,6 +687,7 @@ mod tests { ) .unwrap(); let avro_schema = Arc::new(avro_schema); + let header = AvroHeader::None; let schema = Schema::new(vec![ Field::with_name(DataType::Int64, "opt"), @@ -643,10 +697,10 @@ mod tests { Some(ScalarImpl::Int64(31)), Some(ScalarImpl::Int32(15)), ]); - let encoder = AvroEncoder::new(schema, None, avro_schema.clone()).unwrap(); + let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap(); let actual = encoder.encode(row).unwrap(); assert_eq!( - actual.0, + actual.value, Value::Record(vec![ ("req".into(), Value::Int(15)), ("opt".into(), Value::Union(1, Value::Long(31).into())), @@ -655,10 +709,10 @@ mod tests { let schema = Schema::new(vec![Field::with_name(DataType::Int32, "req")]); let row = OwnedRow::new(vec![Some(ScalarImpl::Int32(15))]); - let encoder = AvroEncoder::new(schema, None, avro_schema.clone()).unwrap(); + let encoder = AvroEncoder::new(schema, None, avro_schema.clone(), header).unwrap(); let actual = encoder.encode(row).unwrap(); assert_eq!( - actual.0, + actual.value, Value::Record(vec![ ("req".into(), Value::Int(15)), 
("opt".into(), Value::Union(0, Value::Null.into())), @@ -666,7 +720,7 @@ mod tests { ); let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]); - let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone()) else { + let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else { panic!() }; assert_eq!( @@ -679,7 +733,7 @@ mod tests { Field::with_name(DataType::Int32, "req"), Field::with_name(DataType::Varchar, "extra"), ]); - let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone()) else { + let Err(err) = AvroEncoder::new(schema, None, avro_schema.clone(), header) else { panic!() }; assert_eq!( @@ -689,7 +743,7 @@ mod tests { let avro_schema = AvroSchema::parse_str(r#"["null", "long"]"#).unwrap(); let schema = Schema::new(vec![Field::with_name(DataType::Int64, "opt")]); - let Err(err) = AvroEncoder::new(schema, None, avro_schema.into()) else { + let Err(err) = AvroEncoder::new(schema, None, avro_schema.into(), header) else { panic!() }; assert_eq!( @@ -843,7 +897,7 @@ mod tests { /// The encoder is not using these buggy calls and is already tested above. 
#[test] fn test_encode_avro_lib_bug() { - use apache_avro::Reader; + use apache_avro::{Reader, Writer}; // a record with 2 optional int fields let avro_schema = AvroSchema::parse_str( diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index 83b2ab4f09df0..b55fd534d5eb3 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -25,7 +25,7 @@ mod json; mod proto; pub mod template; -pub use avro::AvroEncoder; +pub use avro::{AvroEncoder, AvroHeader}; pub use json::JsonEncoder; pub use proto::ProtoEncoder; diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 17cb708292890..1e165268300fa 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -30,7 +30,9 @@ use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc}; use super::encoder::template::TemplateEncoder; use super::encoder::KafkaConnectParams; use super::redis::{KEY_FORMAT, VALUE_FORMAT}; -use crate::sink::encoder::{JsonEncoder, ProtoEncoder, TimestampHandlingMode}; +use crate::sink::encoder::{ + AvroEncoder, AvroHeader, JsonEncoder, ProtoEncoder, TimestampHandlingMode, +}; /// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format, /// for example append-only, upsert or debezium. 
@@ -67,6 +69,7 @@ pub enum SinkFormatterImpl { AppendOnlyJson(AppendOnlyFormatter), AppendOnlyProto(AppendOnlyFormatter), UpsertJson(UpsertFormatter), + UpsertAvro(UpsertFormatter), DebeziumJson(DebeziumJsonFormatter), AppendOnlyTemplate(AppendOnlyFormatter), UpsertTemplate(UpsertFormatter), @@ -79,6 +82,7 @@ impl SinkFormatterImpl { pk_indices: Vec, db_name: String, sink_from_name: String, + topic: &str, ) -> Result { let err_unsupported = || { Err(SinkError::Config(anyhow!( @@ -211,7 +215,27 @@ impl SinkFormatterImpl { val_encoder, ))) } - _ => err_unsupported(), + SinkEncode::Avro => { + let (key_schema, val_schema) = + crate::schema::avro::fetch_schema(&format_desc.options, topic) + .await + .map_err(|e| SinkError::Config(anyhow!("{e:?}")))?; + let key_encoder = AvroEncoder::new( + schema.clone(), + Some(pk_indices), + key_schema.schema, + AvroHeader::ConfluentSchemaRegistry(key_schema.id), + )?; + let val_encoder = AvroEncoder::new( + schema.clone(), + None, + val_schema.schema, + AvroHeader::ConfluentSchemaRegistry(val_schema.id), + )?; + let formatter = UpsertFormatter::new(key_encoder, val_encoder); + Ok(SinkFormatterImpl::UpsertAvro(formatter)) + } + SinkEncode::Protobuf => err_unsupported(), } } } @@ -225,6 +249,22 @@ macro_rules! dispatch_sink_formatter_impl { SinkFormatterImpl::AppendOnlyJson($name) => $body, SinkFormatterImpl::AppendOnlyProto($name) => $body, SinkFormatterImpl::UpsertJson($name) => $body, + SinkFormatterImpl::UpsertAvro($name) => $body, + SinkFormatterImpl::DebeziumJson($name) => $body, + SinkFormatterImpl::AppendOnlyTemplate($name) => $body, + SinkFormatterImpl::UpsertTemplate($name) => $body, + } + }; +} + +#[macro_export] +macro_rules! 
dispatch_sink_formatter_str_key_impl { + ($impl:expr, $name:ident, $body:expr) => { + match $impl { + SinkFormatterImpl::AppendOnlyJson($name) => $body, + SinkFormatterImpl::AppendOnlyProto($name) => $body, + SinkFormatterImpl::UpsertJson($name) => $body, + SinkFormatterImpl::UpsertAvro(_) => unreachable!(), SinkFormatterImpl::DebeziumJson($name) => $body, SinkFormatterImpl::AppendOnlyTemplate($name) => $body, SinkFormatterImpl::UpsertTemplate($name) => $body, diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index f77b2b0a88c36..07709f182dc47 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -312,6 +312,7 @@ impl Sink for KafkaSink { self.pk_indices.clone(), self.db_name.clone(), self.sink_from_name.clone(), + &self.config.common.topic, ) .await?; let max_delivery_buffer_size = (self @@ -343,6 +344,7 @@ impl Sink for KafkaSink { self.pk_indices.clone(), self.db_name.clone(), self.sink_from_name.clone(), + &self.config.common.topic, ) .await?; diff --git a/src/connector/src/sink/kinesis.rs b/src/connector/src/sink/kinesis.rs index 605edde3b1eb0..03e044ad37b91 100644 --- a/src/connector/src/sink/kinesis.rs +++ b/src/connector/src/sink/kinesis.rs @@ -29,7 +29,7 @@ use tokio_retry::Retry; use super::catalog::SinkFormatDesc; use super::SinkParam; use crate::common::KinesisCommon; -use crate::dispatch_sink_formatter_impl; +use crate::dispatch_sink_formatter_str_key_impl; use crate::sink::catalog::desc::SinkDesc; use crate::sink::formatter::SinkFormatterImpl; use crate::sink::log_store::DeliveryFutureManagerAddFuture; @@ -94,6 +94,7 @@ impl Sink for KinesisSink { self.pk_indices.clone(), self.db_name.clone(), self.sink_from_name.clone(), + &self.config.common.stream_name, ) .await?; @@ -161,9 +162,15 @@ impl KinesisSinkWriter { db_name: String, sink_from_name: String, ) -> Result { - let formatter = - SinkFormatterImpl::new(format_desc, schema, pk_indices, db_name, sink_from_name) - .await?; + let formatter 
= SinkFormatterImpl::new( + format_desc, + schema, + pk_indices, + db_name, + sink_from_name, + &config.common.stream_name, + ) + .await?; let client = config .common .build_client() @@ -228,7 +235,7 @@ impl AsyncTruncateSinkWriter for KinesisSinkWriter { chunk: StreamChunk, _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { - dispatch_sink_formatter_impl!( + dispatch_sink_formatter_str_key_impl!( &self.formatter, formatter, self.payload_writer.write_chunk(chunk, formatter).await diff --git a/src/connector/src/sink/pulsar.rs b/src/connector/src/sink/pulsar.rs index 9eb57c1ae0771..04da204ef79e7 100644 --- a/src/connector/src/sink/pulsar.rs +++ b/src/connector/src/sink/pulsar.rs @@ -36,7 +36,7 @@ use crate::sink::writer::{ AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, FormattedSink, }; use crate::sink::{DummySinkCommitCoordinator, Result}; -use crate::{deserialize_duration_from_string, dispatch_sink_formatter_impl}; +use crate::{deserialize_duration_from_string, dispatch_sink_formatter_str_key_impl}; pub const PULSAR_SINK: &str = "pulsar"; @@ -194,6 +194,7 @@ impl Sink for PulsarSink { self.downstream_pk.clone(), self.db_name.clone(), self.sink_from_name.clone(), + &self.config.common.topic, ) .await?; @@ -237,9 +238,15 @@ impl PulsarSinkWriter { db_name: String, sink_from_name: String, ) -> Result { - let formatter = - SinkFormatterImpl::new(format_desc, schema, downstream_pk, db_name, sink_from_name) - .await?; + let formatter = SinkFormatterImpl::new( + format_desc, + schema, + downstream_pk, + db_name, + sink_from_name, + &config.common.topic, + ) + .await?; let pulsar = config.common.build_client().await?; let producer = build_pulsar_producer(&pulsar, &config).await?; Ok(Self { @@ -322,7 +329,7 @@ impl AsyncTruncateSinkWriter for PulsarSinkWriter { chunk: StreamChunk, add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { - 
dispatch_sink_formatter_impl!(&self.formatter, formatter, { + dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, { let mut payload_writer = PulsarPayloadWriter { producer: &mut self.producer, add_future, diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index af3ec3b981620..910582b9662b7 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -28,7 +28,7 @@ use super::encoder::template::TemplateEncoder; use super::formatter::SinkFormatterImpl; use super::writer::FormattedSink; use super::{SinkError, SinkParam}; -use crate::dispatch_sink_formatter_impl; +use crate::dispatch_sink_formatter_str_key_impl; use crate::sink::log_store::DeliveryFutureManagerAddFuture; use crate::sink::writer::{ AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriter, AsyncTruncateSinkWriterExt, @@ -224,6 +224,7 @@ impl RedisSinkWriter { pk_indices.clone(), db_name, sink_from_name, + "NO_TOPIC", ) .await?; @@ -248,6 +249,7 @@ impl RedisSinkWriter { pk_indices.clone(), "d1".to_string(), "t1".to_string(), + "NO_TOPIC", ) .await?; Ok(Self { @@ -266,7 +268,7 @@ impl AsyncTruncateSinkWriter for RedisSinkWriter { chunk: StreamChunk, _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { - dispatch_sink_formatter_impl!(&self.formatter, formatter, { + dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, { self.payload_writer.write_chunk(chunk, formatter).await }) } diff --git a/src/connector/src/test_data/all-types.avsc b/src/connector/src/test_data/all-types.avsc new file mode 100644 index 0000000000000..3fea69bbef4ca --- /dev/null +++ b/src/connector/src/test_data/all-types.avsc @@ -0,0 +1,69 @@ +{ + "type": "record", + "name": "AllTypes", + "fields": [ + {"name": "bool_field", "type": ["null", "boolean"]}, + {"name": "string_field", "type": ["null", "string"]}, + {"name": "bytes_field", "type": ["null", "bytes"]}, + {"name": "float_field", "type": ["null", "float"]}, + {"name": 
"double_field", "type": ["null", "double"]}, + {"name": "int32_field", "type": ["null", "int"]}, + {"name": "int64_field", "type": ["null", "long"]}, + {"name": "record_field", "type": ["null", { + "type": "record", + "name": "Nested", + "fields": [ + {"name": "id", "type": ["null", "int"]}, + {"name": "name", "type": ["null", "string"]} + ] + }]}, + {"name": "array_field", "type": ["null", { + "type": "array", + "items": ["null", { + "type": "array", + "items": ["null", "int"] + }] + }]}, + {"name": "timestamp_micros_field", "type": ["null", {"type": "long", "logicalType": "timestamp-micros"}]}, + {"name": "timestamp_millis_field", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}]}, + {"name": "date_field", "type": ["null", {"type": "int", "logicalType": "date"}]}, + {"name": "time_micros_field", "type": ["null", {"type": "long", "logicalType": "time-micros"}]}, + {"name": "time_millis_field", "type": ["null", {"type": "int", "logicalType": "time-millis"}]}, + {"name": "mon_day_sec_field", "type": ["null", { + "type": "fixed", + "name": "Duration", + "size": 12, + "logicalType": "duration" + }]}, + {"name": "unsupported", "type": ["null", { + "type": "record", + "name": "Unsupported", + "fields": [ + {"name": "enum_field", "type": ["null", { + "type": "enum", + "name": "Suit", + "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"] + }]}, + {"name": "map_field", "type": ["null", { + "type": "map", + "values": ["null", "string"] + }]}, + {"name": "union_field", "type": ["null", "string", "double", "boolean"]}, + {"name": "fixed_field", "type": ["null", { + "type": "fixed", + "name": "Int256", + "size": 32 + }]}, + {"name": "decimal_field", "type": ["null", { + "type": "bytes", + "logicalType": "decimal", + "precision": 38, + "scale": 10 + }]}, + {"name": "uuid_field", "type": ["null", {"type": "string", "logicalType": "uuid"}]}, + {"name": "local_micros_field", "type": ["null", {"type": "long", "logicalType": "local-timestamp-micros"}]}, + 
{"name": "local_millis_field", "type": ["null", {"type": "long", "logicalType": "local-timestamp-millis"}]} + ] + }]} + ] +} diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index ddb1d697b856d..85cbc7b171e20 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -269,7 +269,7 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock hashmap!( Format::Plain => vec![Encode::Json, Encode::Protobuf], - Format::Upsert => vec![Encode::Json], + Format::Upsert => vec![Encode::Json, Encode::Avro], Format::Debezium => vec![Encode::Json], ), KinesisSink::SINK_NAME => hashmap!(