diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index 8ff4a6b253757..8d6f1eb8ab463 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -77,26 +77,31 @@ pub enum SinkFormatterImpl { UpsertTemplate(UpsertFormatter), } -struct Builder<'a> { +#[derive(Debug, Clone)] +pub struct EncoderParams<'a> { format_desc: &'a SinkFormatDesc, schema: Schema, db_name: String, sink_from_name: String, topic: &'a str, } -struct FormatterParams<'a> { - builder: Builder<'a>, - pk_indices: Vec, -} -trait FromBuilder: Sized { - async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result; + +/// Each encoder shall be able to be built from parameters. +/// +/// This is not part of `RowEncoder` trait, because that one is about how an encoder completes its +/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this +/// one is about how different encoders can be selected from a common SQL interface. +pub trait EncoderBuild: Sized { + /// Pass `pk_indices: None` for value/payload and `Some` for key. Certain encoder builds + /// differently when used as key vs value. + async fn build(params: EncoderParams<'_>, pk_indices: Option>) -> Result; } -impl FromBuilder for JsonEncoder { - async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { +impl EncoderBuild for JsonEncoder { + async fn build(b: EncoderParams<'_>, pk_indices: Option>) -> Result { let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?; let encoder = JsonEncoder::new( - b.schema.clone(), + b.schema, pk_indices, DateHandlingMode::FromCe, TimestampHandlingMode::Milli, @@ -124,9 +129,10 @@ impl FromBuilder for JsonEncoder { Ok(encoder) } } -impl FromBuilder for ProtoEncoder { - async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { - // TODO: const generic + +impl EncoderBuild for ProtoEncoder { + async fn build(b: EncoderParams<'_>, pk_indices: Option>) -> Result { + // TODO: better to be a compile-time assert assert!(pk_indices.is_none()); // By passing `None` as `aws_auth_props`, reading from `s3://` not supported yet. let (descriptor, sid) = @@ -137,11 +143,12 @@ impl FromBuilder for ProtoEncoder { None => ProtoHeader::None, Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid), }; - ProtoEncoder::new(b.schema.clone(), None, descriptor, header) + ProtoEncoder::new(b.schema, None, descriptor, header) } } -impl FromBuilder for AvroEncoder { - async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { + +impl EncoderBuild for AvroEncoder { + async fn build(b: EncoderParams<'_>, pk_indices: Option>) -> Result { let loader = crate::schema::SchemaLoader::from_format_options(b.topic, &b.format_desc.options) .map_err(|e| SinkError::Config(anyhow!(e)))?; @@ -157,15 +164,16 @@ impl FromBuilder for AvroEncoder { .map_err(|e| SinkError::Config(anyhow!(e)))?, }; AvroEncoder::new( - b.schema.clone(), + b.schema, pk_indices, std::sync::Arc::new(avro), AvroHeader::ConfluentSchemaRegistry(schema_id), ) } } -impl FromBuilder for TemplateEncoder { - async fn from_builder(b: &Builder<'_>, pk_indices: Option>) -> Result { + +impl EncoderBuild for TemplateEncoder { + async fn build(b: EncoderParams<'_>, pk_indices: Option>) -> Result { let option_name = match pk_indices { Some(_) => KEY_FORMAT, None => VALUE_FORMAT, @@ -175,36 +183,45 @@ impl FromBuilder for TemplateEncoder { "Cannot find '{option_name}',please set it or use JSON" )) })?; - Ok(TemplateEncoder::new( - b.schema.clone(), - pk_indices, - template.clone(), - )) + Ok(TemplateEncoder::new(b.schema, pk_indices, template.clone())) } } -trait FormatFromBuilder: Sized { - async fn from_builder(b: FormatterParams<'_>) -> Result; + +struct FormatterParams<'a> { + builder: EncoderParams<'a>, + pk_indices: Vec, +} + +/// Each formatter shall be able to be built from parameters. +/// +/// This is not part of `SinkFormatter` trait, because that is about how a formatter completes its +/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this +/// one is about how different formatters can be selected from a common SQL interface. +trait FormatterBuild: Sized { + async fn build(b: FormatterParams<'_>) -> Result; } -impl FormatFromBuilder for AppendOnlyFormatter { - async fn from_builder(b: FormatterParams<'_>) -> Result { +impl FormatterBuild for AppendOnlyFormatter { + async fn build(b: FormatterParams<'_>) -> Result { let key_encoder = match b.pk_indices.is_empty() { true => None, - false => Some(KE::from_builder(&b.builder, Some(b.pk_indices)).await?), + false => Some(KE::build(b.builder.clone(), Some(b.pk_indices)).await?), }; - let val_encoder = VE::from_builder(&b.builder, None).await?; + let val_encoder = VE::build(b.builder, None).await?; Ok(AppendOnlyFormatter::new(key_encoder, val_encoder)) } } -impl FormatFromBuilder for UpsertFormatter { - async fn from_builder(b: FormatterParams<'_>) -> Result { - let key_encoder = KE::from_builder(&b.builder, Some(b.pk_indices)).await?; - let val_encoder = VE::from_builder(&b.builder, None).await?; + +impl FormatterBuild for UpsertFormatter { + async fn build(b: FormatterParams<'_>) -> Result { + let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices)).await?; + let val_encoder = VE::build(b.builder, None).await?; Ok(UpsertFormatter::new(key_encoder, val_encoder)) } } -impl FormatFromBuilder for DebeziumJsonFormatter { - async fn from_builder(b: FormatterParams<'_>) -> Result { + +impl FormatterBuild for DebeziumJsonFormatter { + async fn build(b: FormatterParams<'_>) -> Result { assert_eq!(b.builder.format_desc.encode, SinkEncode::Json); Ok(DebeziumJsonFormatter::new( @@ -216,13 +233,6 @@ impl FormatFromBuilder for DebeziumJsonFormatter { )) } } -async fn build(f: F, p: FormatterParams<'_>) -> Result -where - T: FormatFromBuilder, - F: FnOnce(T) -> SinkFormatterImpl, -{ - T::from_builder(p).await.map(f) -} impl SinkFormatterImpl { pub async fn new( @@ -235,7 +245,7 @@ impl SinkFormatterImpl { ) -> Result { use {SinkEncode as E, SinkFormat as F, SinkFormatterImpl as Impl}; let p = FormatterParams { - builder: Builder { + builder: EncoderParams { format_desc, schema, db_name, @@ -245,24 +255,38 @@ impl SinkFormatterImpl { pk_indices, }; - match (&format_desc.format, &format_desc.encode) { - (F::AppendOnly, E::Json) => build(Impl::AppendOnlyJson, p).await, - (F::AppendOnly, E::Protobuf) => build(Impl::AppendOnlyProto, p).await, - (F::AppendOnly, E::Template) => build(Impl::AppendOnlyTemplate, p).await, - (F::Upsert, E::Json) => build(Impl::UpsertJson, p).await, - (F::Upsert, E::Avro) => build(Impl::UpsertAvro, p).await, - (F::Upsert, E::Template) => build(Impl::UpsertTemplate, p).await, - (F::Debezium, E::Json) => build(Impl::DebeziumJson, p).await, + // When defining `SinkFormatterImpl` we already linked each variant (eg `AppendOnlyJson`) to + // an instantiation (eg `AppendOnlyFormatter`) that implements the + // trait `FormatterBuild`. + // + // Here we just need to match a `(format, encode)` to a variant, and rustc shall be able to + // find the corresponding instantiation. + + // However, + // `Impl::AppendOnlyJson(FormatterBuild::build(p).await?)` + // fails to be inferred without the following dummy wrapper. + async fn build(p: FormatterParams<'_>) -> Result { + T::build(p).await + } + + Ok(match (&format_desc.format, &format_desc.encode) { + (F::AppendOnly, E::Json) => Impl::AppendOnlyJson(build(p).await?), + (F::AppendOnly, E::Protobuf) => Impl::AppendOnlyProto(build(p).await?), + (F::AppendOnly, E::Template) => Impl::AppendOnlyTemplate(build(p).await?), + (F::Upsert, E::Json) => Impl::UpsertJson(build(p).await?), + (F::Upsert, E::Avro) => Impl::UpsertAvro(build(p).await?), + (F::Upsert, E::Template) => Impl::UpsertTemplate(build(p).await?), + (F::Debezium, E::Json) => Impl::DebeziumJson(build(p).await?), (F::AppendOnly, E::Avro) | (F::Upsert, E::Protobuf) | (F::Debezium, E::Avro | E::Protobuf | E::Template) => { - Err(SinkError::Config(anyhow!( + return Err(SinkError::Config(anyhow!( "sink format/encode unsupported: {:?} {:?}", format_desc.format, format_desc.encode, - ))) + ))); } - } + }) } }