From e0274600177c97edc5d425311088325eb3b8de3d Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Thu, 2 May 2024 16:12:57 +0800 Subject: [PATCH 1/7] FromBuilder for JsonEncoder & ProtoEncoder --- src/connector/src/sink/formatter/mod.rs | 84 ++++++++++++++++--------- 1 file changed, 53 insertions(+), 31 deletions(-) diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index d923d337a3ffb..05807c0b2ba9f 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -77,6 +77,44 @@ pub enum SinkFormatterImpl { UpsertTemplate(UpsertFormatter), } +struct Builder<'a> { + format_desc: &'a SinkFormatDesc, + schema: Schema, + db_name: String, + sink_from_name: String, + topic: &'a str, +} + +impl JsonEncoder { + async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { + let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?; + Ok(JsonEncoder::new( + b.schema.clone(), + pk_indices, + DateHandlingMode::FromCe, + TimestampHandlingMode::Milli, + timestamptz_mode, + TimeHandlingMode::Milli, + )) + } +} +impl ProtoEncoder { + async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { + // TODO: const generic + assert!(pk_indices.is_none()); + // By passing `None` as `aws_auth_props`, reading from `s3://` not supported yet. + let (descriptor, sid) = + crate::schema::protobuf::fetch_descriptor(&b.format_desc.options, b.topic, None) + .await + .map_err(|e| SinkError::Config(anyhow!(e)))?; + let header = match sid { + None => ProtoHeader::None, + Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid), + }; + ProtoEncoder::new(b.schema.clone(), None, descriptor, header) + } +} + impl SinkFormatterImpl { pub async fn new( format_desc: &SinkFormatDesc, @@ -94,47 +132,31 @@ impl SinkFormatterImpl { ))) }; let timestamptz_mode = TimestamptzHandlingMode::from_options(&format_desc.options)?; + let builder = Builder { + format_desc, + schema: schema.clone(), + db_name: db_name.clone(), + sink_from_name: sink_from_name.clone(), + topic, + }; match format_desc.format { SinkFormat::AppendOnly => { - let key_encoder = (!pk_indices.is_empty()).then(|| { - JsonEncoder::new( - schema.clone(), - Some(pk_indices.clone()), - DateHandlingMode::FromCe, - TimestampHandlingMode::Milli, - timestamptz_mode, - TimeHandlingMode::Milli, - ) - }); + let key_encoder = match pk_indices.is_empty() { + true => None, + false => { + Some(JsonEncoder::from_builder(&builder, Some(pk_indices.clone())).await?) + } + }; match format_desc.encode { SinkEncode::Json => { - let val_encoder = JsonEncoder::new( - schema, - None, - DateHandlingMode::FromCe, - TimestampHandlingMode::Milli, - timestamptz_mode, - TimeHandlingMode::Milli, - ); + let val_encoder = JsonEncoder::from_builder(&builder, None).await?; let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder); Ok(SinkFormatterImpl::AppendOnlyJson(formatter)) } SinkEncode::Protobuf => { - // By passing `None` as `aws_auth_props`, reading from `s3://` not supported yet. - let (descriptor, sid) = crate::schema::protobuf::fetch_descriptor( - &format_desc.options, - topic, - None, - ) - .await - .map_err(|e| SinkError::Config(anyhow!(e)))?; - let header = match sid { - None => ProtoHeader::None, - Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid), - }; - let val_encoder = ProtoEncoder::new(schema, None, descriptor, header)?; + let val_encoder = ProtoEncoder::from_builder(&builder, None).await?; let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder); Ok(SinkFormatterImpl::AppendOnlyProto(formatter)) } From 37330d7a84a34a7fbc431a21e017a6db8ce75568 Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Thu, 2 May 2024 16:14:11 +0800 Subject: [PATCH 2/7] AppendOnlyFormatter::from_builder --- src/connector/src/sink/formatter/mod.rs | 93 ++++++++++++------------- 1 file changed, 46 insertions(+), 47 deletions(-) diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 05807c0b2ba9f..d0a4eb6b06d83 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -77,15 +77,18 @@ pub enum SinkFormatterImpl { UpsertTemplate(UpsertFormatter), } -struct Builder<'a> { +pub struct Builder<'a> { format_desc: &'a SinkFormatDesc, schema: Schema, db_name: String, sink_from_name: String, topic: &'a str, } +pub trait FromBuilder: Sized { + async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result; +} -impl JsonEncoder { +impl FromBuilder for JsonEncoder { async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?; Ok(JsonEncoder::new( @@ -98,7 +101,7 @@ impl JsonEncoder { )) } } -impl ProtoEncoder { +impl FromBuilder for ProtoEncoder { async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { // TODO: const generic assert!(pk_indices.is_none()); @@ -114,6 +117,34 @@ impl ProtoEncoder { ProtoEncoder::new(b.schema.clone(), None, descriptor, header) } } +impl FromBuilder for TemplateEncoder { + async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { + let option_name = match pk_indices { + Some(_) => KEY_FORMAT, + None => VALUE_FORMAT, + }; + let template = b.format_desc.options.get(option_name).ok_or_else(|| { + SinkError::Config(anyhow!( + "Cannot find '{option_name}',please set it or use JSON" + )) + })?; + Ok(TemplateEncoder::new( + b.schema.clone(), + pk_indices, + template.clone(), + )) + } +} +impl AppendOnlyFormatter { + async fn from_builder(b: &Builder<'_>, pk_indices: Vec) -> Result { + let key_encoder = match pk_indices.is_empty() { + true => None, + false => Some(KE::from_builder(b, Some(pk_indices.clone())).await?), + }; + let val_encoder = VE::from_builder(b, None).await?; + Ok(AppendOnlyFormatter::new(key_encoder, val_encoder)) + } +} impl SinkFormatterImpl { pub async fn new( @@ -141,50 +172,18 @@ impl SinkFormatterImpl { }; match format_desc.format { - SinkFormat::AppendOnly => { - let key_encoder = match pk_indices.is_empty() { - true => None, - false => { - Some(JsonEncoder::from_builder(&builder, Some(pk_indices.clone())).await?) - } - }; - - match format_desc.encode { - SinkEncode::Json => { - let val_encoder = JsonEncoder::from_builder(&builder, None).await?; - let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder); - Ok(SinkFormatterImpl::AppendOnlyJson(formatter)) - } - SinkEncode::Protobuf => { - let val_encoder = ProtoEncoder::from_builder(&builder, None).await?; - let formatter = AppendOnlyFormatter::new(key_encoder, val_encoder); - Ok(SinkFormatterImpl::AppendOnlyProto(formatter)) - } - SinkEncode::Avro => err_unsupported(), - SinkEncode::Template => { - let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| { - SinkError::Config(anyhow!( - "Cannot find 'key_format',please set it or use JSON" - )) - })?; - let value_format = - format_desc.options.get(VALUE_FORMAT).ok_or_else(|| { - SinkError::Config(anyhow!( - "Cannot find 'redis_value_format',please set it or use JSON" - )) - })?; - let key_encoder = TemplateEncoder::new( - schema.clone(), - Some(pk_indices), - key_format.clone(), - ); - let val_encoder = TemplateEncoder::new(schema, None, value_format.clone()); - Ok(SinkFormatterImpl::AppendOnlyTemplate( - AppendOnlyFormatter::new(Some(key_encoder), val_encoder), - )) - } - } - } + SinkFormat::AppendOnly => match format_desc.encode { + SinkEncode::Json => Ok(SinkFormatterImpl::AppendOnlyJson( + AppendOnlyFormatter::from_builder(&builder, pk_indices).await?, + )), + SinkEncode::Protobuf => Ok(SinkFormatterImpl::AppendOnlyProto( + AppendOnlyFormatter::from_builder(&builder, pk_indices).await?, + )), + SinkEncode::Avro => err_unsupported(), + SinkEncode::Template => Ok(SinkFormatterImpl::AppendOnlyTemplate( + AppendOnlyFormatter::from_builder(&builder, pk_indices).await?, + )), + }, SinkFormat::Debezium => { if format_desc.encode != SinkEncode::Json { return err_unsupported(); From 1d3c736fb06cda471d420c7c42a8f2608439ec89 Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Thu, 2 May 2024 16:37:29 +0800 Subject: [PATCH 3/7] schemas.enable and AvroEncoder::from_builder --- src/connector/src/sink/formatter/mod.rs | 126 ++++++++++-------------- 1 file changed, 54 insertions(+), 72 deletions(-) diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index d0a4eb6b06d83..a431dd4787dc2 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -91,14 +91,33 @@ pub trait FromBuilder: Sized { impl FromBuilder for JsonEncoder { async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?; - Ok(JsonEncoder::new( + let encoder = JsonEncoder::new( b.schema.clone(), pk_indices, DateHandlingMode::FromCe, TimestampHandlingMode::Milli, timestamptz_mode, TimeHandlingMode::Milli, - )) + ); + let encoder = if let Some(s) = b.format_desc.options.get("schemas.enable") { + match s.to_lowercase().parse::() { + Ok(true) => { + let kafka_connect = KafkaConnectParams { + schema_name: format!("{}.{}", b.db_name, b.sink_from_name), + }; + encoder.with_kafka_connect(kafka_connect) + } + Ok(false) => encoder, + _ => { + return Err(SinkError::Config(anyhow!( + "schemas.enable is expected to be `true` or `false`, got {s}", + ))); + } + } + } else { + encoder + }; + Ok(encoder) } } impl FromBuilder for ProtoEncoder { @@ -117,6 +136,30 @@ impl FromBuilder for ProtoEncoder { ProtoEncoder::new(b.schema.clone(), None, descriptor, header) } } +impl FromBuilder for AvroEncoder { + async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { + let loader = + crate::schema::SchemaLoader::from_format_options(b.topic, &b.format_desc.options) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + + let (schema_id, avro) = match pk_indices { + Some(_) => loader + .load_key_schema() + .await + .map_err(|e| SinkError::Config(anyhow!(e)))?, + None => loader + .load_val_schema() + .await + .map_err(|e| SinkError::Config(anyhow!(e)))?, + }; + AvroEncoder::new( + b.schema.clone(), + pk_indices, + std::sync::Arc::new(avro), + AvroHeader::ConfluentSchemaRegistry(schema_id), + ) + } +} impl FromBuilder for TemplateEncoder { async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { let option_name = match pk_indices { @@ -162,7 +205,6 @@ impl SinkFormatterImpl { format_desc.encode, ))) }; - let timestamptz_mode = TimestamptzHandlingMode::from_options(&format_desc.options)?; let builder = Builder { format_desc, schema: schema.clone(), @@ -200,87 +242,27 @@ impl SinkFormatterImpl { SinkFormat::Upsert => { match format_desc.encode { SinkEncode::Json => { - let mut key_encoder = JsonEncoder::new( - schema.clone(), - Some(pk_indices), - DateHandlingMode::FromCe, - TimestampHandlingMode::Milli, - timestamptz_mode, - TimeHandlingMode::Milli, - ); - let mut val_encoder = JsonEncoder::new( - schema, - None, - DateHandlingMode::FromCe, - TimestampHandlingMode::Milli, - timestamptz_mode, - TimeHandlingMode::Milli, - ); - - if let Some(s) = format_desc.options.get("schemas.enable") { - match s.to_lowercase().parse::() { - Ok(true) => { - let kafka_connect = KafkaConnectParams { - schema_name: format!("{}.{}", db_name, sink_from_name), - }; - key_encoder = - key_encoder.with_kafka_connect(kafka_connect.clone()); - val_encoder = val_encoder.with_kafka_connect(kafka_connect); - } - Ok(false) => {} - _ => { - return Err(SinkError::Config(anyhow!( - "schemas.enable is expected to be `true` or `false`, got {}", - s - ))); - } - } - }; + let key_encoder = + JsonEncoder::from_builder(&builder, Some(pk_indices)).await?; + let val_encoder = JsonEncoder::from_builder(&builder, None).await?; // Initialize the upsert_stream let formatter = UpsertFormatter::new(key_encoder, val_encoder); Ok(SinkFormatterImpl::UpsertJson(formatter)) } SinkEncode::Template => { - let key_format = format_desc.options.get(KEY_FORMAT).ok_or_else(|| { - SinkError::Config(anyhow!( - "Cannot find 'key_format',please set it or use JSON" - )) - })?; - let value_format = - format_desc.options.get(VALUE_FORMAT).ok_or_else(|| { - SinkError::Config(anyhow!( - "Cannot find 'redis_value_format',please set it or use JSON" - )) - })?; - let key_encoder = TemplateEncoder::new( - schema.clone(), - Some(pk_indices), - key_format.clone(), - ); - let val_encoder = TemplateEncoder::new(schema, None, value_format.clone()); + let key_encoder = + TemplateEncoder::from_builder(&builder, Some(pk_indices)).await?; + let val_encoder = TemplateEncoder::from_builder(&builder, None).await?; Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new( key_encoder, val_encoder, ))) } SinkEncode::Avro => { - let (key_schema, val_schema) = - crate::schema::avro::fetch_schema(&format_desc.options, topic) - .await - .map_err(|e| SinkError::Config(anyhow!(e)))?; - let key_encoder = AvroEncoder::new( - schema.clone(), - Some(pk_indices), - key_schema.schema, - AvroHeader::ConfluentSchemaRegistry(key_schema.id), - )?; - let val_encoder = AvroEncoder::new( - schema.clone(), - None, - val_schema.schema, - AvroHeader::ConfluentSchemaRegistry(val_schema.id), - )?; + let key_encoder = + AvroEncoder::from_builder(&builder, Some(pk_indices)).await?; + let val_encoder = AvroEncoder::from_builder(&builder, None).await?; let formatter = UpsertFormatter::new(key_encoder, val_encoder); Ok(SinkFormatterImpl::UpsertAvro(formatter)) } From bf85847fab1012a35661e2d2930ca1770bff342e Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Thu, 2 May 2024 16:47:54 +0800 Subject: [PATCH 4/7] FormatFromBuilder for Upsert and Debezium --- src/connector/src/sink/formatter/mod.rs | 82 +++++++++++++------------ 1 file changed, 43 insertions(+), 39 deletions(-) diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index a431dd4787dc2..64ed9c4b6f95a 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -77,14 +77,14 @@ pub enum SinkFormatterImpl { UpsertTemplate(UpsertFormatter), } -pub struct Builder<'a> { +struct Builder<'a> { format_desc: &'a SinkFormatDesc, schema: Schema, db_name: String, sink_from_name: String, topic: &'a str, } -pub trait FromBuilder: Sized { +trait FromBuilder: Sized { async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result; } @@ -178,7 +178,11 @@ impl FromBuilder for TemplateEncoder { )) } } -impl AppendOnlyFormatter { +trait FormatFromBuilder: Sized { + async fn from_builder(b: &Builder<'_>, pk_indices: Vec) -> Result; +} + +impl FormatFromBuilder for AppendOnlyFormatter { async fn from_builder(b: &Builder<'_>, pk_indices: Vec) -> Result { let key_encoder = match pk_indices.is_empty() { true => None, @@ -188,6 +192,26 @@ impl AppendOnlyFormatter { Ok(AppendOnlyFormatter::new(key_encoder, val_encoder)) } } +impl FormatFromBuilder for UpsertFormatter { + async fn from_builder(b: &Builder<'_>, pk_indices: Vec) -> Result { + let key_encoder = KE::from_builder(b, Some(pk_indices.clone())).await?; + let val_encoder = VE::from_builder(b, None).await?; + Ok(UpsertFormatter::new(key_encoder, val_encoder)) + } +} +impl FormatFromBuilder for DebeziumJsonFormatter { + async fn from_builder(b: &Builder<'_>, pk_indices: Vec) -> Result { + assert_eq!(b.format_desc.encode, SinkEncode::Json); + + Ok(DebeziumJsonFormatter::new( + b.schema.clone(), + pk_indices, + b.db_name.clone(), + b.sink_from_name.clone(), + DebeziumAdapterOpts::default(), + )) + } +} impl SinkFormatterImpl { pub async fn new( @@ -231,44 +255,24 @@ impl SinkFormatterImpl { return err_unsupported(); } - Ok(SinkFormatterImpl::DebeziumJson(DebeziumJsonFormatter::new( - schema, - pk_indices, - db_name, - sink_from_name, - DebeziumAdapterOpts::default(), - ))) + Ok(SinkFormatterImpl::DebeziumJson( + DebeziumJsonFormatter::from_builder(&builder, pk_indices).await?, + )) } - SinkFormat::Upsert => { - match format_desc.encode { - SinkEncode::Json => { - let key_encoder = - JsonEncoder::from_builder(&builder, Some(pk_indices)).await?; - let val_encoder = JsonEncoder::from_builder(&builder, None).await?; - - // Initialize the upsert_stream - let formatter = UpsertFormatter::new(key_encoder, val_encoder); - Ok(SinkFormatterImpl::UpsertJson(formatter)) - } - SinkEncode::Template => { - let key_encoder = - TemplateEncoder::from_builder(&builder, Some(pk_indices)).await?; - let val_encoder = TemplateEncoder::from_builder(&builder, None).await?; - Ok(SinkFormatterImpl::UpsertTemplate(UpsertFormatter::new( - key_encoder, - val_encoder, - ))) - } - SinkEncode::Avro => { - let key_encoder = - AvroEncoder::from_builder(&builder, Some(pk_indices)).await?; - let val_encoder = AvroEncoder::from_builder(&builder, None).await?; - let formatter = UpsertFormatter::new(key_encoder, val_encoder); - Ok(SinkFormatterImpl::UpsertAvro(formatter)) - } - SinkEncode::Protobuf => err_unsupported(), + SinkFormat::Upsert => match format_desc.encode { + SinkEncode::Json => { + let formatter = UpsertFormatter::from_builder(&builder, pk_indices).await?; + Ok(SinkFormatterImpl::UpsertJson(formatter)) } - } + SinkEncode::Template => Ok(SinkFormatterImpl::UpsertTemplate( + UpsertFormatter::from_builder(&builder, pk_indices).await?, + )), + SinkEncode::Avro => { + let formatter = UpsertFormatter::from_builder(&builder, pk_indices).await?; + Ok(SinkFormatterImpl::UpsertAvro(formatter)) + } + SinkEncode::Protobuf => err_unsupported(), + }, } } } From a0e71b7b24497ef29bd146b5a563c3e4fe3b7fae Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Thu, 2 May 2024 17:27:17 +0800 Subject: [PATCH 5/7] infer from enum wrapper --- src/connector/src/sink/formatter/mod.rs | 76 ++++++++++++------------- 1 file changed, 35 insertions(+), 41 deletions(-) diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 64ed9c4b6f95a..b7af29c617dda 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -212,6 +212,13 @@ impl FormatFromBuilder for DebeziumJsonFormatter { )) } } +async fn build(f: F, b: &Builder<'_>, pk_indices: Vec) -> Result +where + T: FormatFromBuilder, + F: FnOnce(T) -> SinkFormatterImpl, +{ + T::from_builder(b, pk_indices).await.map(f) +} impl SinkFormatterImpl { pub async fn new( @@ -222,13 +229,6 @@ impl SinkFormatterImpl { sink_from_name: String, topic: &str, ) -> Result { - let err_unsupported = || { - Err(SinkError::Config(anyhow!( - "sink format/encode unsupported: {:?} {:?}", - format_desc.format, - format_desc.encode, - ))) - }; let builder = Builder { format_desc, schema: schema.clone(), @@ -237,42 +237,36 @@ impl SinkFormatterImpl { topic, }; - match format_desc.format { - SinkFormat::AppendOnly => match format_desc.encode { - SinkEncode::Json => Ok(SinkFormatterImpl::AppendOnlyJson( - AppendOnlyFormatter::from_builder(&builder, pk_indices).await?, - )), - SinkEncode::Protobuf => Ok(SinkFormatterImpl::AppendOnlyProto( - AppendOnlyFormatter::from_builder(&builder, pk_indices).await?, - )), - SinkEncode::Avro => err_unsupported(), - SinkEncode::Template => Ok(SinkFormatterImpl::AppendOnlyTemplate( - AppendOnlyFormatter::from_builder(&builder, pk_indices).await?, - )), - }, - SinkFormat::Debezium => { - if format_desc.encode != SinkEncode::Json { - return err_unsupported(); - } + match (&format_desc.format, &format_desc.encode) { + (SinkFormat::AppendOnly, SinkEncode::Json) => { + build(SinkFormatterImpl::AppendOnlyJson, &builder, pk_indices).await + } + (SinkFormat::AppendOnly, SinkEncode::Protobuf) => { + build(SinkFormatterImpl::AppendOnlyProto, &builder, pk_indices).await + } + (SinkFormat::AppendOnly, SinkEncode::Template) => { + build(SinkFormatterImpl::AppendOnlyTemplate, &builder, pk_indices).await + } - Ok(SinkFormatterImpl::DebeziumJson( - DebeziumJsonFormatter::from_builder(&builder, pk_indices).await?, - )) + (SinkFormat::Upsert, SinkEncode::Json) => { + build(SinkFormatterImpl::UpsertJson, &builder, pk_indices).await } - SinkFormat::Upsert => match format_desc.encode { - SinkEncode::Json => { - let formatter = UpsertFormatter::from_builder(&builder, pk_indices).await?; - Ok(SinkFormatterImpl::UpsertJson(formatter)) - } - SinkEncode::Template => Ok(SinkFormatterImpl::UpsertTemplate( - UpsertFormatter::from_builder(&builder, pk_indices).await?, - )), - SinkEncode::Avro => { - let formatter = UpsertFormatter::from_builder(&builder, pk_indices).await?; - Ok(SinkFormatterImpl::UpsertAvro(formatter)) - } - SinkEncode::Protobuf => err_unsupported(), - }, + (SinkFormat::Upsert, SinkEncode::Avro) => { + build(SinkFormatterImpl::UpsertAvro, &builder, pk_indices).await + } + (SinkFormat::Upsert, SinkEncode::Template) => { + build(SinkFormatterImpl::UpsertTemplate, &builder, pk_indices).await + } + (SinkFormat::Debezium, SinkEncode::Json) => { + build(SinkFormatterImpl::DebeziumJson, &builder, pk_indices).await + } + (SinkFormat::AppendOnly, SinkEncode::Avro) + | (SinkFormat::Upsert, SinkEncode::Protobuf) + | (SinkFormat::Debezium, _) => Err(SinkError::Config(anyhow!( + "sink format/encode unsupported: {:?} {:?}", + format_desc.format, + format_desc.encode, + ))), } } } From 8cccf2231781b7374d1164f493e7978c17aef48a Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Fri, 3 May 2024 14:52:51 +0800 Subject: [PATCH 6/7] cleanup (more to come) --- src/connector/src/schema/avro.rs | 32 +-------- src/connector/src/sink/formatter/mod.rs | 95 ++++++++++++------------- 2 files changed, 46 insertions(+), 81 deletions(-) diff --git a/src/connector/src/schema/avro.rs b/src/connector/src/schema/avro.rs index 22c5fb4acadd1..557953dc5b342 100644 --- a/src/connector/src/schema/avro.rs +++ b/src/connector/src/schema/avro.rs @@ -12,42 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; -use std::sync::Arc; - use apache_avro::Schema as AvroSchema; -use super::loader::{LoadedSchema, SchemaLoader}; +use super::loader::LoadedSchema; use super::schema_registry::Subject; use super::SchemaFetchError; -pub struct SchemaWithId { - pub schema: Arc, - pub id: i32, -} - -/// Schema registry only -pub async fn fetch_schema( - format_options: &BTreeMap, - topic: &str, -) -> Result<(SchemaWithId, SchemaWithId), SchemaFetchError> { - let loader = SchemaLoader::from_format_options(topic, format_options)?; - - let (key_id, key_avro) = loader.load_key_schema().await?; - let (val_id, val_avro) = loader.load_val_schema().await?; - - Ok(( - SchemaWithId { - id: key_id, - schema: Arc::new(key_avro), - }, - SchemaWithId { - id: val_id, - schema: Arc::new(val_avro), - }, - )) -} - impl LoadedSchema for AvroSchema { fn compile(primary: Subject, _: Vec) -> Result { AvroSchema::parse_str(&primary.schema.content) diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index b7af29c617dda..8ff4a6b253757 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -84,6 +84,10 @@ struct Builder<'a> { sink_from_name: String, topic: &'a str, } +struct FormatterParams<'a> { + builder: Builder<'a>, + pk_indices: Vec, +} trait FromBuilder: Sized { async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result; } @@ -179,45 +183,45 @@ impl FromBuilder for TemplateEncoder { } } trait FormatFromBuilder: Sized { - async fn from_builder(b: &Builder<'_>, pk_indices: Vec) -> Result; + async fn from_builder(b: FormatterParams<'_>) -> Result; } impl FormatFromBuilder for AppendOnlyFormatter { - async fn from_builder(b: &Builder<'_>, pk_indices: Vec) -> Result { - let key_encoder = match pk_indices.is_empty() { + async fn from_builder(b: FormatterParams<'_>) -> Result { + let key_encoder = match b.pk_indices.is_empty() { true => None, - false => Some(KE::from_builder(b, Some(pk_indices.clone())).await?), + false => Some(KE::from_builder(&b.builder, Some(b.pk_indices)).await?), }; - let val_encoder = VE::from_builder(b, None).await?; + let val_encoder = VE::from_builder(&b.builder, None).await?; Ok(AppendOnlyFormatter::new(key_encoder, val_encoder)) } } impl FormatFromBuilder for UpsertFormatter { - async fn from_builder(b: &Builder<'_>, pk_indices: Vec) -> Result { - let key_encoder = KE::from_builder(b, Some(pk_indices.clone())).await?; - let val_encoder = VE::from_builder(b, None).await?; + async fn from_builder(b: FormatterParams<'_>) -> Result { + let key_encoder = KE::from_builder(&b.builder, Some(b.pk_indices)).await?; + let val_encoder = VE::from_builder(&b.builder, None).await?; Ok(UpsertFormatter::new(key_encoder, val_encoder)) } } impl FormatFromBuilder for DebeziumJsonFormatter { - async fn from_builder(b: &Builder<'_>, pk_indices: Vec) -> Result { - assert_eq!(b.format_desc.encode, SinkEncode::Json); + async fn from_builder(b: FormatterParams<'_>) -> Result { + assert_eq!(b.builder.format_desc.encode, SinkEncode::Json); Ok(DebeziumJsonFormatter::new( - b.schema.clone(), - pk_indices, - b.db_name.clone(), - b.sink_from_name.clone(), + b.builder.schema, + b.pk_indices, + b.builder.db_name, + b.builder.sink_from_name, DebeziumAdapterOpts::default(), )) } } -async fn build(f: F, b: &Builder<'_>, pk_indices: Vec) -> Result +async fn build(f: F, p: FormatterParams<'_>) -> Result where T: FormatFromBuilder, F: FnOnce(T) -> SinkFormatterImpl, { - T::from_builder(b, pk_indices).await.map(f) + T::from_builder(p).await.map(f) } impl SinkFormatterImpl { @@ -229,44 +233,35 @@ impl SinkFormatterImpl { sink_from_name: String, topic: &str, ) -> Result { - let builder = Builder { - format_desc, - schema: schema.clone(), - db_name: db_name.clone(), - sink_from_name: sink_from_name.clone(), - topic, + use {SinkEncode as E, SinkFormat as F, SinkFormatterImpl as Impl}; + let p = FormatterParams { + builder: Builder { + format_desc, + schema, + db_name, + sink_from_name, + topic, + }, + pk_indices, }; match (&format_desc.format, &format_desc.encode) { - (SinkFormat::AppendOnly, SinkEncode::Json) => { - build(SinkFormatterImpl::AppendOnlyJson, &builder, pk_indices).await - } - (SinkFormat::AppendOnly, SinkEncode::Protobuf) => { - build(SinkFormatterImpl::AppendOnlyProto, &builder, pk_indices).await - } - (SinkFormat::AppendOnly, SinkEncode::Template) => { - build(SinkFormatterImpl::AppendOnlyTemplate, &builder, pk_indices).await - } - - (SinkFormat::Upsert, SinkEncode::Json) => { - build(SinkFormatterImpl::UpsertJson, &builder, pk_indices).await - } - (SinkFormat::Upsert, SinkEncode::Avro) => { - build(SinkFormatterImpl::UpsertAvro, &builder, pk_indices).await - } - (SinkFormat::Upsert, SinkEncode::Template) => { - build(SinkFormatterImpl::UpsertTemplate, &builder, pk_indices).await - } - (SinkFormat::Debezium, SinkEncode::Json) => { - build(SinkFormatterImpl::DebeziumJson, &builder, pk_indices).await + (F::AppendOnly, E::Json) => build(Impl::AppendOnlyJson, p).await, + (F::AppendOnly, E::Protobuf) => build(Impl::AppendOnlyProto, p).await, + (F::AppendOnly, E::Template) => build(Impl::AppendOnlyTemplate, p).await, + (F::Upsert, E::Json) => build(Impl::UpsertJson, p).await, + (F::Upsert, E::Avro) => build(Impl::UpsertAvro, p).await, + (F::Upsert, E::Template) => build(Impl::UpsertTemplate, p).await, + (F::Debezium, E::Json) => build(Impl::DebeziumJson, p).await, + (F::AppendOnly, E::Avro) + | (F::Upsert, E::Protobuf) + | (F::Debezium, E::Avro | E::Protobuf | E::Template) => { + Err(SinkError::Config(anyhow!( + "sink format/encode unsupported: {:?} {:?}", + format_desc.format, + format_desc.encode, + ))) } - (SinkFormat::AppendOnly, SinkEncode::Avro) - | (SinkFormat::Upsert, SinkEncode::Protobuf) - | (SinkFormat::Debezium, _) => Err(SinkError::Config(anyhow!( - "sink format/encode unsupported: {:?} {:?}", - format_desc.format, - format_desc.encode, - ))), } } } From da8df3da7863d67bc1363d195396a6e0c0195c35 Mon Sep 17 00:00:00 2001 From: Xiangjin Date: Sat, 4 May 2024 11:00:22 +0800 Subject: [PATCH 7/7] minor refactors --- src/connector/src/sink/formatter/mod.rs | 134 ++++++++++++++---------- 1 file changed, 79 insertions(+), 55 deletions(-) diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 8ff4a6b253757..8d6f1eb8ab463 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -77,26 +77,31 @@ pub enum SinkFormatterImpl { UpsertTemplate(UpsertFormatter), } -struct Builder<'a> { +#[derive(Debug, Clone)] +pub struct EncoderParams<'a> { format_desc: &'a SinkFormatDesc, schema: Schema, db_name: String, sink_from_name: String, topic: &'a str, } -struct FormatterParams<'a> { - builder: Builder<'a>, - pk_indices: Vec, -} -trait FromBuilder: Sized { - async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result; + +/// Each encoder shall be able to be built from parameters. +/// +/// This is not part of `RowEncoder` trait, because that one is about how an encoder completes its +/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this +/// one is about how different encoders can be selected from a common SQL interface. +pub trait EncoderBuild: Sized { + /// Pass `pk_indices: None` for value/payload and `Some` for key. Certain encoder builds + /// differently when used as key vs value. + async fn build(params: EncoderParams<'_>, pk_indices: Option>) -> Result; } -impl FromBuilder for JsonEncoder { - async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { +impl EncoderBuild for JsonEncoder { + async fn build(b: EncoderParams<'_>, pk_indices: Option>) -> Result { let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?; let encoder = JsonEncoder::new( - b.schema.clone(), + b.schema, pk_indices, DateHandlingMode::FromCe, TimestampHandlingMode::Milli, @@ -124,9 +129,10 @@ impl FromBuilder for JsonEncoder { Ok(encoder) } } -impl FromBuilder for ProtoEncoder { - async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { - // TODO: const generic + +impl EncoderBuild for ProtoEncoder { + async fn build(b: EncoderParams<'_>, pk_indices: Option>) -> Result { + // TODO: better to be a compile-time assert assert!(pk_indices.is_none()); // By passing `None` as `aws_auth_props`, reading from `s3://` not supported yet. let (descriptor, sid) = @@ -137,11 +143,12 @@ impl FromBuilder for ProtoEncoder { None => ProtoHeader::None, Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid), }; - ProtoEncoder::new(b.schema.clone(), None, descriptor, header) + ProtoEncoder::new(b.schema, None, descriptor, header) } } -impl FromBuilder for AvroEncoder { - async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { + +impl EncoderBuild for AvroEncoder { + async fn build(b: EncoderParams<'_>, pk_indices: Option>) -> Result { let loader = crate::schema::SchemaLoader::from_format_options(b.topic, &b.format_desc.options) .map_err(|e| SinkError::Config(anyhow!(e)))?; @@ -157,15 +164,16 @@ impl FromBuilder for AvroEncoder { .map_err(|e| SinkError::Config(anyhow!(e)))?, }; AvroEncoder::new( - b.schema.clone(), + b.schema, pk_indices, std::sync::Arc::new(avro), AvroHeader::ConfluentSchemaRegistry(schema_id), ) } } -impl FromBuilder for TemplateEncoder { - async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { + +impl EncoderBuild for TemplateEncoder { + async fn build(b: EncoderParams<'_>, pk_indices: Option>) -> Result { let option_name = match pk_indices { Some(_) => KEY_FORMAT, None => VALUE_FORMAT, @@ -175,36 +183,45 @@ impl FromBuilder for TemplateEncoder { "Cannot find '{option_name}',please set it or use JSON" )) })?; - Ok(TemplateEncoder::new( - b.schema.clone(), - pk_indices, - template.clone(), - )) + Ok(TemplateEncoder::new(b.schema, pk_indices, template.clone())) } } -trait FormatFromBuilder: Sized { - async fn from_builder(b: FormatterParams<'_>) -> Result; + +struct FormatterParams<'a> { + builder: EncoderParams<'a>, + pk_indices: Vec, +} + +/// Each formatter shall be able to be built from parameters. +/// +/// This is not part of `SinkFormatter` trait, because that is about how a formatter completes its +/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this +/// one is about how different formatters can be selected from a common SQL interface. +trait FormatterBuild: Sized { + async fn build(b: FormatterParams<'_>) -> Result; } -impl FormatFromBuilder for AppendOnlyFormatter { - async fn from_builder(b: FormatterParams<'_>) -> Result { +impl FormatterBuild for AppendOnlyFormatter { + async fn build(b: FormatterParams<'_>) -> Result { let key_encoder = match b.pk_indices.is_empty() { true => None, - false => Some(KE::from_builder(&b.builder, Some(b.pk_indices)).await?), + false => Some(KE::build(b.builder.clone(), Some(b.pk_indices)).await?), }; - let val_encoder = VE::from_builder(&b.builder, None).await?; + let val_encoder = VE::build(b.builder, None).await?; Ok(AppendOnlyFormatter::new(key_encoder, val_encoder)) } } -impl FormatFromBuilder for UpsertFormatter { - async fn from_builder(b: FormatterParams<'_>) -> Result { - let key_encoder = KE::from_builder(&b.builder, Some(b.pk_indices)).await?; - let val_encoder = VE::from_builder(&b.builder, None).await?; + +impl FormatterBuild for UpsertFormatter { + async fn build(b: FormatterParams<'_>) -> Result { + let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices)).await?; + let val_encoder = VE::build(b.builder, None).await?; Ok(UpsertFormatter::new(key_encoder, val_encoder)) } } -impl FormatFromBuilder for DebeziumJsonFormatter { - async fn from_builder(b: FormatterParams<'_>) -> Result { + +impl FormatterBuild for DebeziumJsonFormatter { + async fn build(b: FormatterParams<'_>) -> Result { assert_eq!(b.builder.format_desc.encode, SinkEncode::Json); Ok(DebeziumJsonFormatter::new( @@ -216,13 +233,6 @@ impl FormatFromBuilder for DebeziumJsonFormatter { )) } } -async fn build(f: F, p: FormatterParams<'_>) -> Result -where - T: FormatFromBuilder, - F: FnOnce(T) -> SinkFormatterImpl, -{ - T::from_builder(p).await.map(f) -} impl SinkFormatterImpl { pub async fn new( @@ -235,7 +245,7 @@ impl SinkFormatterImpl { ) -> Result { use {SinkEncode as E, SinkFormat as F, SinkFormatterImpl as Impl}; let p = FormatterParams { - builder: Builder { + builder: EncoderParams { format_desc, schema, db_name, @@ -245,24 +255,38 @@ impl SinkFormatterImpl { pk_indices, }; - match (&format_desc.format, &format_desc.encode) { - (F::AppendOnly, E::Json) => build(Impl::AppendOnlyJson, p).await, - (F::AppendOnly, E::Protobuf) => build(Impl::AppendOnlyProto, p).await, - (F::AppendOnly, E::Template) => build(Impl::AppendOnlyTemplate, p).await, - (F::Upsert, E::Json) => build(Impl::UpsertJson, p).await, - (F::Upsert, E::Avro) => build(Impl::UpsertAvro, p).await, - (F::Upsert, E::Template) => build(Impl::UpsertTemplate, p).await, - (F::Debezium, E::Json) => build(Impl::DebeziumJson, p).await, + // When defining `SinkFormatterImpl` we already linked each variant (eg `AppendOnlyJson`) to + // an instantiation (eg `AppendOnlyFormatter`) that implements the + // trait `FormatterBuild`. + // + // Here we just need to match a `(format, encode)` to a variant, and rustc shall be able to + // find the corresponding instantiation. + + // However, + // `Impl::AppendOnlyJson(FormatterBuild::build(p).await?)` + // fails to be inferred without the following dummy wrapper. + async fn build(p: FormatterParams<'_>) -> Result { + T::build(p).await + } + + Ok(match (&format_desc.format, &format_desc.encode) { + (F::AppendOnly, E::Json) => Impl::AppendOnlyJson(build(p).await?), + (F::AppendOnly, E::Protobuf) => Impl::AppendOnlyProto(build(p).await?), + (F::AppendOnly, E::Template) => Impl::AppendOnlyTemplate(build(p).await?), + (F::Upsert, E::Json) => Impl::UpsertJson(build(p).await?), + (F::Upsert, E::Avro) => Impl::UpsertAvro(build(p).await?), + (F::Upsert, E::Template) => Impl::UpsertTemplate(build(p).await?), + (F::Debezium, E::Json) => Impl::DebeziumJson(build(p).await?), (F::AppendOnly, E::Avro) | (F::Upsert, E::Protobuf) | (F::Debezium, E::Avro | E::Protobuf | E::Template) => { - Err(SinkError::Config(anyhow!( + return Err(SinkError::Config(anyhow!( "sink format/encode unsupported: {:?} {:?}", format_desc.format, format_desc.encode, - ))) + ))); } - } + }) } }