Skip to content

Commit

Permalink
minor refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu committed May 6, 2024
1 parent 8cccf22 commit da8df3d
Showing 1 changed file with 79 additions and 55 deletions.
134 changes: 79 additions & 55 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,26 +77,31 @@ pub enum SinkFormatterImpl {
UpsertTemplate(UpsertFormatter<TemplateEncoder, TemplateEncoder>),
}

struct Builder<'a> {
/// Parameters shared by every encoder build.
///
/// Derives `Clone` because the formatter builds hand one copy to the key encoder and move the
/// original into the value encoder.
#[derive(Debug, Clone)]
pub struct EncoderParams<'a> {
/// Sink format description; encoder builds read `options` from it.
format_desc: &'a SinkFormatDesc,
/// Schema passed on to the encoder constructors.
schema: Schema,
db_name: String,
sink_from_name: String,
/// Passed to the schema loader when building the Avro encoder.
topic: &'a str,
}
struct FormatterParams<'a> {
builder: Builder<'a>,
pk_indices: Vec<usize>,
}
trait FromBuilder: Sized {
async fn from_builder(b: &Builder<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self>;

/// Each encoder shall be able to be built from parameters.
///
/// This is not part of `RowEncoder` trait, because that one is about how an encoder completes its
/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this
/// one is about how different encoders can be selected from a common SQL interface.
pub trait EncoderBuild: Sized {
/// Pass `pk_indices: None` for value/payload and `Some` for key. Certain encoders build
/// differently when used as key vs. value.
async fn build(params: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self>;
}

impl FromBuilder for JsonEncoder {
async fn from_builder(b: &Builder<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
impl EncoderBuild for JsonEncoder {
async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
let timestamptz_mode = TimestamptzHandlingMode::from_options(&b.format_desc.options)?;
let encoder = JsonEncoder::new(
b.schema.clone(),
b.schema,
pk_indices,
DateHandlingMode::FromCe,
TimestampHandlingMode::Milli,
Expand Down Expand Up @@ -124,9 +129,10 @@ impl FromBuilder for JsonEncoder {
Ok(encoder)
}
}
impl FromBuilder for ProtoEncoder {
async fn from_builder(b: &Builder<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
// TODO: const generic

impl EncoderBuild for ProtoEncoder {
async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
// TODO: better to be a compile-time assert
assert!(pk_indices.is_none());
// By passing `None` as `aws_auth_props`, reading from `s3://` is not supported yet.
let (descriptor, sid) =
Expand All @@ -137,11 +143,12 @@ impl FromBuilder for ProtoEncoder {
None => ProtoHeader::None,
Some(sid) => ProtoHeader::ConfluentSchemaRegistry(sid),
};
ProtoEncoder::new(b.schema.clone(), None, descriptor, header)
ProtoEncoder::new(b.schema, None, descriptor, header)
}
}
impl FromBuilder for AvroEncoder {
async fn from_builder(b: &Builder<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {

impl EncoderBuild for AvroEncoder {
async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
let loader =
crate::schema::SchemaLoader::from_format_options(b.topic, &b.format_desc.options)
.map_err(|e| SinkError::Config(anyhow!(e)))?;
Expand All @@ -157,15 +164,16 @@ impl FromBuilder for AvroEncoder {
.map_err(|e| SinkError::Config(anyhow!(e)))?,
};
AvroEncoder::new(
b.schema.clone(),
b.schema,
pk_indices,
std::sync::Arc::new(avro),
AvroHeader::ConfluentSchemaRegistry(schema_id),
)
}
}
impl FromBuilder for TemplateEncoder {
async fn from_builder(b: &Builder<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {

impl EncoderBuild for TemplateEncoder {
async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
let option_name = match pk_indices {
Some(_) => KEY_FORMAT,
None => VALUE_FORMAT,
Expand All @@ -175,36 +183,45 @@ impl FromBuilder for TemplateEncoder {
"Cannot find '{option_name}',please set it or use JSON"
))
})?;
Ok(TemplateEncoder::new(
b.schema.clone(),
pk_indices,
template.clone(),
))
Ok(TemplateEncoder::new(b.schema, pk_indices, template.clone()))
}
}
trait FormatFromBuilder: Sized {
async fn from_builder(b: FormatterParams<'_>) -> Result<Self>;

/// Inputs for building a formatter: the shared encoder parameters plus the primary-key column
/// indices that are handed to the key encoder.
struct FormatterParams<'a> {
builder: EncoderParams<'a>,
pk_indices: Vec<usize>,
}

/// Each formatter shall be able to be built from parameters.
///
/// This is not part of `SinkFormatter` trait, because that is about how a formatter completes its
/// own job as a self-contained unit, with a custom `new` asking for only necessary info; while this
/// one is about how different formatters can be selected from a common SQL interface.
trait FormatterBuild: Sized {
/// Builds the formatter, taking the parameters by value.
async fn build(b: FormatterParams<'_>) -> Result<Self>;
}

impl<KE: FromBuilder, VE: FromBuilder> FormatFromBuilder for AppendOnlyFormatter<KE, VE> {
async fn from_builder(b: FormatterParams<'_>) -> Result<Self> {
impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for AppendOnlyFormatter<KE, VE> {
async fn build(b: FormatterParams<'_>) -> Result<Self> {
let key_encoder = match b.pk_indices.is_empty() {
true => None,
false => Some(KE::from_builder(&b.builder, Some(b.pk_indices)).await?),
false => Some(KE::build(b.builder.clone(), Some(b.pk_indices)).await?),
};
let val_encoder = VE::from_builder(&b.builder, None).await?;
let val_encoder = VE::build(b.builder, None).await?;
Ok(AppendOnlyFormatter::new(key_encoder, val_encoder))
}
}
impl<KE: FromBuilder, VE: FromBuilder> FormatFromBuilder for UpsertFormatter<KE, VE> {
async fn from_builder(b: FormatterParams<'_>) -> Result<Self> {
let key_encoder = KE::from_builder(&b.builder, Some(b.pk_indices)).await?;
let val_encoder = VE::from_builder(&b.builder, None).await?;

impl<KE: EncoderBuild, VE: EncoderBuild> FormatterBuild for UpsertFormatter<KE, VE> {
async fn build(b: FormatterParams<'_>) -> Result<Self> {
let key_encoder = KE::build(b.builder.clone(), Some(b.pk_indices)).await?;
let val_encoder = VE::build(b.builder, None).await?;
Ok(UpsertFormatter::new(key_encoder, val_encoder))
}
}
impl FormatFromBuilder for DebeziumJsonFormatter {
async fn from_builder(b: FormatterParams<'_>) -> Result<Self> {

impl FormatterBuild for DebeziumJsonFormatter {
async fn build(b: FormatterParams<'_>) -> Result<Self> {
assert_eq!(b.builder.format_desc.encode, SinkEncode::Json);

Ok(DebeziumJsonFormatter::new(
Expand All @@ -216,13 +233,6 @@ impl FormatFromBuilder for DebeziumJsonFormatter {
))
}
}
/// Builds a `T` from `p` via `FormatFromBuilder::from_builder` and maps the successful result
/// through `f` to obtain a `SinkFormatterImpl`; an error is returned unchanged.
async fn build<T, F>(f: F, p: FormatterParams<'_>) -> Result<SinkFormatterImpl>
where
T: FormatFromBuilder,
F: FnOnce(T) -> SinkFormatterImpl,
{
T::from_builder(p).await.map(f)
}

impl SinkFormatterImpl {
pub async fn new(
Expand All @@ -235,7 +245,7 @@ impl SinkFormatterImpl {
) -> Result<Self> {
use {SinkEncode as E, SinkFormat as F, SinkFormatterImpl as Impl};
let p = FormatterParams {
builder: Builder {
builder: EncoderParams {
format_desc,
schema,
db_name,
Expand All @@ -245,24 +255,38 @@ impl SinkFormatterImpl {
pk_indices,
};

match (&format_desc.format, &format_desc.encode) {
(F::AppendOnly, E::Json) => build(Impl::AppendOnlyJson, p).await,
(F::AppendOnly, E::Protobuf) => build(Impl::AppendOnlyProto, p).await,
(F::AppendOnly, E::Template) => build(Impl::AppendOnlyTemplate, p).await,
(F::Upsert, E::Json) => build(Impl::UpsertJson, p).await,
(F::Upsert, E::Avro) => build(Impl::UpsertAvro, p).await,
(F::Upsert, E::Template) => build(Impl::UpsertTemplate, p).await,
(F::Debezium, E::Json) => build(Impl::DebeziumJson, p).await,
// When defining `SinkFormatterImpl` we already linked each variant (eg `AppendOnlyJson`) to
// an instantiation (eg `AppendOnlyFormatter<JsonEncoder, JsonEncoder>`) that implements the
// trait `FormatterBuild`.
//
// Here we just need to match a `(format, encode)` to a variant, and rustc shall be able to
// find the corresponding instantiation.

// However, writing
// `Impl::AppendOnlyJson(FormatterBuild::build(p).await?)`
// directly fails type inference, so the thin wrapper below is needed. Its `T` is determined
// by the enum variant each call is wrapped in.
async fn build<T: FormatterBuild>(p: FormatterParams<'_>) -> Result<T> {
T::build(p).await
}

Ok(match (&format_desc.format, &format_desc.encode) {
(F::AppendOnly, E::Json) => Impl::AppendOnlyJson(build(p).await?),
(F::AppendOnly, E::Protobuf) => Impl::AppendOnlyProto(build(p).await?),
(F::AppendOnly, E::Template) => Impl::AppendOnlyTemplate(build(p).await?),
(F::Upsert, E::Json) => Impl::UpsertJson(build(p).await?),
(F::Upsert, E::Avro) => Impl::UpsertAvro(build(p).await?),
(F::Upsert, E::Template) => Impl::UpsertTemplate(build(p).await?),
(F::Debezium, E::Json) => Impl::DebeziumJson(build(p).await?),
(F::AppendOnly, E::Avro)
| (F::Upsert, E::Protobuf)
| (F::Debezium, E::Avro | E::Protobuf | E::Template) => {
Err(SinkError::Config(anyhow!(
return Err(SinkError::Config(anyhow!(
"sink format/encode unsupported: {:?} {:?}",
format_desc.format,
format_desc.encode,
)))
)));
}
}
})
}
}

Expand Down

0 comments on commit da8df3d

Please sign in to comment.