Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
tabVersion committed May 6, 2024
1 parent 15924d4 commit 313216c
Showing 1 changed file with 73 additions and 19 deletions.
92 changes: 73 additions & 19 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub trait SinkFormatter {
fn format_chunk(
&self,
chunk: &StreamChunk,
) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>>;
) -> impl Iterator<Item=Result<(Option<Self::K>, Option<Self::V>)>>;
}

/// `tri!` in generators yield `Err` and return `()`
Expand Down Expand Up @@ -157,6 +157,29 @@ impl EncoderBuild for ProtoEncoder {
}
}

impl EncoderBuild for TextEncoder {
async fn build(
params: EncoderParams<'_>,
pk_indices: Option<Vec<usize>>,
) -> crate::sink::Result<Self> {
if pk_indices.is_none() {
Err(SinkError::Config(anyhow!(
"TextEncoder requires primary key columns to be specified"
)))
} else {
let pk_indices = pk_indices.unwrap();
if pk_indices.len() != 1 {
return Err(SinkError::Config(anyhow!(
"TextEncoder requires exactly one primary key column, but got {} columns",
pk_indices.len()
)));
}

Ok(Self::new(params.schema, Some(pk_indices)))
}
}
}

impl EncoderBuild for AvroEncoder {
async fn build(b: EncoderParams<'_>, pk_indices: Option<Vec<usize>>) -> Result<Self> {
let loader =
Expand Down Expand Up @@ -279,24 +302,55 @@ impl SinkFormatterImpl {
T::build(p).await
}

Ok(match (&format_desc.format, &format_desc.encode) {
(F::AppendOnly, E::Json) => Impl::AppendOnlyJson(build(p).await?),
(F::AppendOnly, E::Protobuf) => Impl::AppendOnlyProto(build(p).await?),
(F::AppendOnly, E::Template) => Impl::AppendOnlyTemplate(build(p).await?),
(F::Upsert, E::Json) => Impl::UpsertJson(build(p).await?),
(F::Upsert, E::Avro) => Impl::UpsertAvro(build(p).await?),
(F::Upsert, E::Template) => Impl::UpsertTemplate(build(p).await?),
(F::Debezium, E::Json) => Impl::DebeziumJson(build(p).await?),
(F::AppendOnly, E::Avro)
| (F::Upsert, E::Protobuf)
| (F::Debezium, E::Avro | E::Protobuf | E::Template) => {
return Err(SinkError::Config(anyhow!(
"sink format/encode unsupported: {:?} {:?}",
format_desc.format,
format_desc.encode,
)));
}
})
Ok(
match (
&format_desc.format,
&format_desc.encode,
&format_desc.key_encode,
) {
(F::AppendOnly, E::Json, Some(E::Text)) => {
Impl::AppendOnlyTextJson(build(p).await?)
}
(F::AppendOnly, E::Json, None | Some(_)) => Impl::AppendOnlyJson(build(p).await?),
(F::AppendOnly, E::Protobuf, Some(E::Text)) => {
Impl::AppendOnlyTextProto(build(p).await?)
}
(F::AppendOnly, E::Protobuf, None | Some(_)) => {
Impl::AppendOnlyProto(build(p).await?)
}
(F::AppendOnly, E::Template, Some(E::Text)) => {
Impl::AppendOnlyTextTemplate(build(p).await?)
}
(F::AppendOnly, E::Template, None | Some(_)) => {
Impl::AppendOnlyTemplate(build(p).await?)
}
(F::Upsert, E::Json, Some(E::Text)) => Impl::UpsertTextJson(build(p).await?),
(F::Upsert, E::Json, None | Some(_)) => Impl::UpsertJson(build(p).await?),
(F::Upsert, E::Avro, Some(E::Text)) => Impl::UpsertTextAvro(build(p).await?),
(F::Upsert, E::Avro, None | Some(_)) => Impl::UpsertAvro(build(p).await?),
(F::Upsert, E::Template, Some(E::Text)) => {
Impl::UpsertTextTemplate(build(p).await?)
}
(F::Upsert, E::Template, None | Some(_)) => Impl::UpsertTemplate(build(p).await?),
(F::Debezium, E::Json, None) => Impl::DebeziumJson(build(p).await?),
(F::AppendOnly | F::Upsert, E::Text, _) => {
return Err(SinkError::Config(anyhow!(
"ENCODE TEXT is only valid as key encode."
)));
}
(F::AppendOnly, E::Avro, _)
| (F::Upsert, E::Protobuf, _)
| (F::Debezium, E::Json, Some(_))
| (F::Debezium, E::Avro | E::Protobuf | E::Template | E::Text, _) => {
return Err(SinkError::Config(anyhow!(
"sink format/encode/key_encode unsupported: {:?} {:?} {:?}",
format_desc.format,
format_desc.encode,
format_desc.key_encode
)));
}
},
)
}
}

Expand Down

0 comments on commit 313216c

Please sign in to comment.