From 313216c9b5df27fb653b47db7a21fa4092ad2ddb Mon Sep 17 00:00:00 2001 From: tabVersion Date: Mon, 6 May 2024 20:03:12 +0100 Subject: [PATCH] fix --- src/connector/src/sink/formatter/mod.rs | 92 ++++++++++++++++++++----- 1 file changed, 73 insertions(+), 19 deletions(-) diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index b386d449ba5d8..b3f2735471c61 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -50,7 +50,7 @@ pub trait SinkFormatter { fn format_chunk( &self, chunk: &StreamChunk, - ) -> impl Iterator, Option)>>; + ) -> impl Iterator, Option)>>; } /// `tri!` in generators yield `Err` and return `()` @@ -157,6 +157,29 @@ impl EncoderBuild for ProtoEncoder { } } +impl EncoderBuild for TextEncoder { + async fn build( + params: EncoderParams<'_>, + pk_indices: Option>, + ) -> crate::sink::Result { + if pk_indices.is_none() { + Err(SinkError::Config(anyhow!( + "TextEncoder requires primary key columns to be specified" + ))) + } else { + let pk_indices = pk_indices.unwrap(); + if pk_indices.len() != 1 { + return Err(SinkError::Config(anyhow!( + "TextEncoder requires exactly one primary key column, but got {} columns", + pk_indices.len() + ))); + } + + Ok(Self::new(params.schema, Some(pk_indices))) + } + } +} + impl EncoderBuild for AvroEncoder { async fn build(b: EncoderParams<'_>, pk_indices: Option>) -> Result { let loader = @@ -279,24 +302,55 @@ impl SinkFormatterImpl { T::build(p).await } - Ok(match (&format_desc.format, &format_desc.encode) { - (F::AppendOnly, E::Json) => Impl::AppendOnlyJson(build(p).await?), - (F::AppendOnly, E::Protobuf) => Impl::AppendOnlyProto(build(p).await?), - (F::AppendOnly, E::Template) => Impl::AppendOnlyTemplate(build(p).await?), - (F::Upsert, E::Json) => Impl::UpsertJson(build(p).await?), - (F::Upsert, E::Avro) => Impl::UpsertAvro(build(p).await?), - (F::Upsert, E::Template) => Impl::UpsertTemplate(build(p).await?), - (F::Debezium, E::Json) => Impl::DebeziumJson(build(p).await?), - (F::AppendOnly, E::Avro) - | (F::Upsert, E::Protobuf) - | (F::Debezium, E::Avro | E::Protobuf | E::Template) => { - return Err(SinkError::Config(anyhow!( - "sink format/encode unsupported: {:?} {:?}", - format_desc.format, - format_desc.encode, - ))); - } - }) + Ok( + match ( + &format_desc.format, + &format_desc.encode, + &format_desc.key_encode, + ) { + (F::AppendOnly, E::Json, Some(E::Text)) => { + Impl::AppendOnlyTextJson(build(p).await?) + } + (F::AppendOnly, E::Json, None | Some(_)) => Impl::AppendOnlyJson(build(p).await?), + (F::AppendOnly, E::Protobuf, Some(E::Text)) => { + Impl::AppendOnlyTextProto(build(p).await?) + } + (F::AppendOnly, E::Protobuf, None | Some(_)) => { + Impl::AppendOnlyProto(build(p).await?) + } + (F::AppendOnly, E::Template, Some(E::Text)) => { + Impl::AppendOnlyTextTemplate(build(p).await?) + } + (F::AppendOnly, E::Template, None | Some(_)) => { + Impl::AppendOnlyTemplate(build(p).await?) + } + (F::Upsert, E::Json, Some(E::Text)) => Impl::UpsertTextJson(build(p).await?), + (F::Upsert, E::Json, None | Some(_)) => Impl::UpsertJson(build(p).await?), + (F::Upsert, E::Avro, Some(E::Text)) => Impl::UpsertTextAvro(build(p).await?), + (F::Upsert, E::Avro, None | Some(_)) => Impl::UpsertAvro(build(p).await?), + (F::Upsert, E::Template, Some(E::Text)) => { + Impl::UpsertTextTemplate(build(p).await?) + } + (F::Upsert, E::Template, None | Some(_)) => Impl::UpsertTemplate(build(p).await?), + (F::Debezium, E::Json, None) => Impl::DebeziumJson(build(p).await?), + (F::AppendOnly | F::Upsert, E::Text, _) => { + return Err(SinkError::Config(anyhow!( + "ENCODE TEXT is only valid as key encode." + ))); + } + (F::AppendOnly, E::Avro, _) + | (F::Upsert, E::Protobuf, _) + | (F::Debezium, E::Json, Some(_)) + | (F::Debezium, E::Avro | E::Protobuf | E::Template | E::Text, _) => { + return Err(SinkError::Config(anyhow!( + "sink format/encode/key_encode unsupported: {:?} {:?} {:?}", + format_desc.format, + format_desc.encode, + format_desc.key_encode + ))); + } + }, + ) } }