diff --git a/Cargo.lock b/Cargo.lock index 0667152e8fddd..41bad70cdd31d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11629,8 +11629,7 @@ dependencies = [ ] [[package]] - -name = "risingwave_meta_model_migration" +name = "risingwave_meta_model" version = "2.1.0-rc.2" dependencies = [ "prost 0.13.1", @@ -11643,7 +11642,7 @@ dependencies = [ ] [[package]] -name = "risingwave_meta_model_v2" +name = "risingwave_meta_model_migration" version = "2.1.0-rc.2" dependencies = [ "async-std", diff --git a/ci/scripts/e2e-kafka-sink-test.sh b/ci/scripts/e2e-kafka-sink-test.sh index e9dc660a95e67..7318893e51040 100755 --- a/ci/scripts/e2e-kafka-sink-test.sh +++ b/ci/scripts/e2e-kafka-sink-test.sh @@ -11,6 +11,7 @@ rpk topic create test-rw-sink-upsert-schema rpk topic create test-rw-sink-debezium rpk topic create test-rw-sink-without-snapshot rpk topic create test-rw-sink-text-key-id +rpk topic create test-rw-sink-bytes-key-id sqllogictest -p 4566 -d dev 'e2e_test/sink/kafka/create_sink.slt' sleep 2 diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt index 338465c471af9..835c7ab169ce4 100644 --- a/e2e_test/sink/kafka/create_sink.slt +++ b/e2e_test/sink/kafka/create_sink.slt @@ -188,7 +188,7 @@ create sink invalid_pk_column from t_kafka with ( ### Test sink with key encode ### -statement error sink key encode unsupported: JSON, only TEXT supported +statement error sink key encode unsupported: JSON, only TEXT and BYTES supported create sink sink_text_error from t_kafka with ( connector = 'kafka', properties.bootstrap.server = 'message_queue:29092', @@ -198,8 +198,7 @@ format plain encode json ( force_append_only='true' ) key encode json ; -statement error -# The key encode is TEXT, but the primary key has 2 columns. The key encode TEXT requires the primary key to be a single column.s +statement error KEY ENCODE TEXT expects only one primary key, but got 2 create sink sink_text_error from t_kafka with ( connector = 'kafka', properties.bootstrap.server = 'message_queue:29092', @@ -230,6 +229,54 @@ format plain encode json ( force_append_only='true' ) key encode text ; +statement error sink key encode unsupported: JSON, only TEXT and BYTES supported +create sink sink_bytes_error as ( + select int8send(id) as id_bytes, * from t_kafka +) with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-bytes-key-id', + primary_key = 'id_bytes') +format plain encode json ( + force_append_only='true' +) key encode json; + +statement error KEY ENCODE BYTES expects only one primary key, but got 2 +create sink sink_bytes_error as ( + select int8send(id) as id_bytes, '\x1234'::bytea as other_bytea, * from t_kafka +) with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-bytes-key-id', + primary_key = 'id_bytes, other_bytea') +format plain encode json ( + force_append_only='true' +) key encode bytes; + +statement error key encode bytes only works with kafka connector, but found kinesis +create sink sink_bytes_json as ( + select int8send(id) as id_bytes, * from t_kafka +) with ( + connector = 'kinesis', + topic = 'topic', + properties.bootstrap.server = 'message_queue:29092' +) +format plain encode json ( + force_append_only='true' +) key encode bytes; + +statement ok +create sink sink_bytes_json as ( + select int8send(id) as id_bytes, * from t_kafka +) with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-bytes-key-id', + primary_key = 'id_bytes') +format plain encode json ( + force_append_only='true' +) key encode bytes; + statement ok create table t_sink_text_id (id int) include key as rw_key @@ -239,6 +286,15 @@ with ( topic = 'test-rw-sink-text-key-id', ) format plain encode json; +statement ok +create table t_sink_bytea_id (id int) +include key as rw_key +with ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'test-rw-sink-bytes-key-id', +) format plain encode json; + #====== statement ok @@ -286,6 +342,18 @@ select rw_key from t_sink_text_id order by rw_key \x36 \x37 +query T +select rw_key from t_sink_bytea_id order by rw_key +---- +\x0000000000000001 +\x0000000000000002 +\x0000000000000003 +\x0000000000000004 +\x0000000000000005 +\x0000000000000006 +\x0000000000000007 + + statement ok insert into t_kafka values (8, 'lv7Eq3g8hx', 194, 19036, 28641, 13652.073, 993.408963466774, '2023-04-13 13:52:09.356742', '\xDEADBABE', '04:00:00.1234', '1970-01-01', '00:00:01.123456', '0001-01-01 00:00:00.123456'::timestamptz, '{}'), @@ -294,3 +362,9 @@ insert into t_kafka values statement ok drop table t_sink_text_id; + +statement ok +drop table t_sink_bytea_id; + +statement ok +drop sink sink_bytes_json; diff --git a/src/connector/src/sink/catalog/mod.rs b/src/connector/src/sink/catalog/mod.rs index 23f34eab97417..e3620fe5a5be6 100644 --- a/src/connector/src/sink/catalog/mod.rs +++ b/src/connector/src/sink/catalog/mod.rs @@ -149,6 +149,7 @@ pub enum SinkEncode { Template, Parquet, Text, + Bytes, } impl Display for SinkEncode { @@ -205,6 +206,7 @@ impl SinkFormatDesc { SinkEncode::Template => E::Template, SinkEncode::Parquet => E::Parquet, SinkEncode::Text => E::Text, + SinkEncode::Bytes => E::Bytes, }; let encode = mapping_encode(&self.encode); @@ -261,10 +263,10 @@ impl TryFrom for SinkFormatDesc { } }; let key_encode = match &value.key_encode() { + E::Bytes => Some(SinkEncode::Bytes), E::Text => Some(SinkEncode::Text), E::Unspecified => None, encode @ (E::Avro - | E::Bytes | E::Csv | E::Json | E::Protobuf diff --git a/src/connector/src/sink/encoder/bytes.rs b/src/connector/src/sink/encoder/bytes.rs new file mode 100644 index 0000000000000..a80a6f9dbf1fe --- /dev/null +++ b/src/connector/src/sink/encoder/bytes.rs @@ -0,0 +1,101 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::catalog::Schema; +use risingwave_common::types::DataType; + +use super::RowEncoder; + +pub struct BytesEncoder { + pub schema: Schema, + // the column must contain only one element + pub col_index: usize, +} + +impl BytesEncoder { + pub fn new(schema: Schema, col_index: usize) -> Self { + Self { schema, col_index } + } +} + +impl RowEncoder for BytesEncoder { + type Output = Vec; + + fn schema(&self) -> &risingwave_common::catalog::Schema { + &self.schema + } + + fn col_indices(&self) -> Option<&[usize]> { + Some(std::slice::from_ref(&self.col_index)) + } + + fn encode_cols( + &self, + row: impl risingwave_common::row::Row, + col_indices: impl Iterator, + ) -> crate::sink::Result { + // It is guaranteed by the caller that col_indices contains only one element + let mut result = Vec::new(); + for col_index in col_indices { + let datum = row.datum_at(col_index); + let data_type = &self.schema.fields[col_index].data_type; + if data_type == &DataType::Bytea { + if let Some(scalar_impl) = datum { + result = scalar_impl.into_bytea().to_vec(); + } else { + result = vec![]; + } + } else { + return Err(crate::sink::SinkError::Encode(format!( + "Unsupported data type: expected bytea, got {}", + data_type + ))); + } + } + + Ok(result) + } +} + +#[cfg(test)] +mod test { + use risingwave_common::catalog::Field; + use risingwave_common::row::OwnedRow; + use risingwave_common::types::ScalarImpl; + + use super::*; + + #[test] + fn test_bytes_encoder_ser_bytes() { + let schema = Schema::new(vec![Field::with_name(DataType::Bytea, "col1")]); + let encoder = BytesEncoder::new(schema, 0); + + let row = OwnedRow::new(vec![Some(ScalarImpl::Bytea(b"some_bytes".to_vec().into()))]); + assert_eq!( + encoder.encode_cols(&row, std::iter::once(0)).unwrap(), + b"some_bytes".to_vec() + ); + + let row = OwnedRow::new(vec![None]); + assert_eq!( + encoder.encode_cols(&row, std::iter::once(0)).unwrap(), + Vec::::new() + ); + + let schema = Schema::new(vec![Field::with_name(DataType::Int16, "col1")]); + let encoder = BytesEncoder::new(schema, 0); + let row = OwnedRow::new(vec![Some(ScalarImpl::Int16(123))]); + assert!(encoder.encode_cols(&row, std::iter::once(0)).is_err()); + } +} diff --git a/src/connector/src/sink/encoder/mod.rs b/src/connector/src/sink/encoder/mod.rs index dc00a89143c57..16f1b45c01ed7 100644 --- a/src/connector/src/sink/encoder/mod.rs +++ b/src/connector/src/sink/encoder/mod.rs @@ -22,6 +22,7 @@ use crate::sink::Result; mod avro; mod bson; +pub mod bytes; mod json; mod proto; pub mod template; diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index bb0a41f63c33d..5c247c311a034 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -14,6 +14,7 @@ use anyhow::{anyhow, Context}; use risingwave_common::array::StreamChunk; +use risingwave_common::catalog::Field; use crate::sink::{Result, SinkError}; @@ -28,6 +29,7 @@ use risingwave_common::types::DataType; pub use upsert::UpsertFormatter; use super::catalog::{SinkEncode, SinkFormat, SinkFormatDesc}; +use super::encoder::bytes::BytesEncoder; use super::encoder::template::TemplateEncoder; use super::encoder::text::TextEncoder; use super::encoder::{ @@ -74,23 +76,31 @@ pub enum SinkFormatterImpl { // append-only AppendOnlyJson(AppendOnlyFormatter), AppendOnlyTextJson(AppendOnlyFormatter), + AppendOnlyBytesJson(AppendOnlyFormatter), AppendOnlyAvro(AppendOnlyFormatter), AppendOnlyTextAvro(AppendOnlyFormatter), + AppendOnlyBytesAvro(AppendOnlyFormatter), AppendOnlyProto(AppendOnlyFormatter), AppendOnlyTextProto(AppendOnlyFormatter), + AppendOnlyBytesProto(AppendOnlyFormatter), AppendOnlyTemplate(AppendOnlyFormatter), AppendOnlyTextTemplate(AppendOnlyFormatter), + AppendOnlyBytesTemplate(AppendOnlyFormatter), // upsert UpsertJson(UpsertFormatter), UpsertTextJson(UpsertFormatter), + UpsertBytesJson(UpsertFormatter), UpsertAvro(UpsertFormatter), UpsertTextAvro(UpsertFormatter), + UpsertBytesAvro(UpsertFormatter), // `UpsertFormatter` is intentionally left out // to avoid using `ProtoEncoder` as key: // UpsertTextProto(UpsertFormatter), + UpsertBytesProto(UpsertFormatter), UpsertTemplate(UpsertFormatter), UpsertTextTemplate(UpsertFormatter), + UpsertBytesTemplate(UpsertFormatter), // debezium DebeziumJson(DebeziumJsonFormatter), } @@ -167,27 +177,54 @@ impl EncoderBuild for ProtoEncoder { } } -impl EncoderBuild for TextEncoder { +fn ensure_only_one_pk<'a>( + data_type_name: &'a str, + params: &'a EncoderParams<'_>, + pk_indices: &'a Option>, +) -> Result<(usize, &'a Field)> { + let Some(pk_indices) = pk_indices else { + return Err(SinkError::Config(anyhow!( + "{}Encoder requires primary key columns to be specified", + data_type_name + ))); + }; + if pk_indices.len() != 1 { + return Err(SinkError::Config(anyhow!( + "KEY ENCODE {} expects only one primary key, but got {}", + data_type_name, + pk_indices.len(), + ))); + } + + let schema_ref = params.schema.fields().get(pk_indices[0]).ok_or_else(|| { + SinkError::Config(anyhow!( + "The primary key column index {} is out of bounds in schema {:?}", + pk_indices[0], + params.schema + )) + })?; + + Ok((pk_indices[0], schema_ref)) +} + +impl EncoderBuild for BytesEncoder { async fn build(params: EncoderParams<'_>, pk_indices: Option>) -> Result { - let Some(pk_indices) = pk_indices else { - return Err(SinkError::Config(anyhow!( - "TextEncoder requires primary key columns to be specified" - ))); - }; - if pk_indices.len() != 1 { - return Err(SinkError::Config(anyhow!( - "The key encode is TEXT, but the primary key has {} columns. The key encode TEXT requires the primary key to be a single column", - pk_indices.len() - ))); + let (pk_index, schema_ref) = ensure_only_one_pk("BYTES", ¶ms, &pk_indices)?; + if let DataType::Bytea = schema_ref.data_type() { + Ok(BytesEncoder::new(params.schema, pk_index)) + } else { + Err(SinkError::Config(anyhow!( + "The key encode is BYTES, but the primary key column {} has type {}", + schema_ref.name, + schema_ref.data_type + ))) } + } +} - let schema_ref = params.schema.fields().get(pk_indices[0]).ok_or_else(|| { - SinkError::Config(anyhow!( - "The primary key column index {} is out of bounds in schema {:?}", - pk_indices[0], - params.schema - )) - })?; +impl EncoderBuild for TextEncoder { + async fn build(params: EncoderParams<'_>, pk_indices: Option>) -> Result { + let (pk_index, schema_ref) = ensure_only_one_pk("TEXT", ¶ms, &pk_indices)?; match &schema_ref.data_type() { DataType::Varchar | DataType::Boolean @@ -208,7 +245,7 @@ impl EncoderBuild for TextEncoder { } } - Ok(Self::new(params.schema, pk_indices[0])) + Ok(Self::new(params.schema, pk_index)) } } @@ -347,27 +384,51 @@ impl SinkFormatterImpl { (F::AppendOnly, E::Json, Some(E::Text)) => { Impl::AppendOnlyTextJson(build(p).await?) } + (F::AppendOnly, E::Json, Some(E::Bytes)) => { + Impl::AppendOnlyBytesJson(build(p).await?) + } (F::AppendOnly, E::Json, None) => Impl::AppendOnlyJson(build(p).await?), (F::AppendOnly, E::Avro, Some(E::Text)) => { Impl::AppendOnlyTextAvro(build(p).await?) } + (F::AppendOnly, E::Avro, Some(E::Bytes)) => { + Impl::AppendOnlyBytesAvro(build(p).await?) + } (F::AppendOnly, E::Avro, None) => Impl::AppendOnlyAvro(build(p).await?), (F::AppendOnly, E::Protobuf, Some(E::Text)) => { Impl::AppendOnlyTextProto(build(p).await?) } + (F::AppendOnly, E::Protobuf, Some(E::Bytes)) => { + Impl::AppendOnlyBytesProto(build(p).await?) + } (F::AppendOnly, E::Protobuf, None) => Impl::AppendOnlyProto(build(p).await?), (F::AppendOnly, E::Template, Some(E::Text)) => { Impl::AppendOnlyTextTemplate(build(p).await?) } + (F::AppendOnly, E::Template, Some(E::Bytes)) => { + Impl::AppendOnlyBytesTemplate(build(p).await?) + } (F::AppendOnly, E::Template, None) => Impl::AppendOnlyTemplate(build(p).await?), (F::Upsert, E::Json, Some(E::Text)) => Impl::UpsertTextJson(build(p).await?), + (F::Upsert, E::Json, Some(E::Bytes)) => { + Impl::UpsertBytesJson(build(p).await?) + } (F::Upsert, E::Json, None) => Impl::UpsertJson(build(p).await?), (F::Upsert, E::Avro, Some(E::Text)) => Impl::UpsertTextAvro(build(p).await?), + (F::Upsert, E::Avro, Some(E::Bytes)) => { + Impl::UpsertBytesAvro(build(p).await?) + } (F::Upsert, E::Avro, None) => Impl::UpsertAvro(build(p).await?), (F::Upsert, E::Protobuf, Some(E::Text)) => Impl::UpsertTextProto(build(p).await?), + (F::Upsert, E::Protobuf, Some(E::Bytes)) => { + Impl::UpsertBytesProto(build(p).await?) + } (F::Upsert, E::Template, Some(E::Text)) => { Impl::UpsertTextTemplate(build(p).await?) } + (F::Upsert, E::Template, Some(E::Bytes)) => { + Impl::UpsertBytesTemplate(build(p).await?) + } (F::Upsert, E::Template, None) => Impl::UpsertTemplate(build(p).await?), (F::Debezium, E::Json, None) => Impl::DebeziumJson(build(p).await?), (F::AppendOnly | F::Upsert, E::Text, _) => { @@ -379,6 +440,9 @@ impl SinkFormatterImpl { | (F::Upsert, E::Protobuf, _) | (F::Debezium, E::Json, Some(_)) | (F::Debezium, E::Avro | E::Protobuf | E::Template | E::Text, _) + | (F::AppendOnly, E::Bytes, _) + | (F::Upsert, E::Bytes, _) + | (F::Debezium, E::Bytes, _) | (_, E::Parquet, _) | (_, _, Some(E::Parquet)) | (F::AppendOnly | F::Upsert, _, Some(E::Template) | Some(E::Json) | Some(E::Avro) | Some(E::Protobuf)) // reject other encode as key encode @@ -395,52 +459,80 @@ impl SinkFormatterImpl { } } +/// Macro to dispatch formatting implementation for all supported sink formatter types. +/// Used when the message key can be either bytes or string. +/// +/// Takes a formatter implementation ($impl), binds it to a name ($name), +/// and executes the provided code block ($body) with that binding. #[macro_export] macro_rules! dispatch_sink_formatter_impl { ($impl:expr, $name:ident, $body:expr) => { match $impl { SinkFormatterImpl::AppendOnlyJson($name) => $body, + SinkFormatterImpl::AppendOnlyBytesJson($name) => $body, SinkFormatterImpl::AppendOnlyTextJson($name) => $body, SinkFormatterImpl::AppendOnlyAvro($name) => $body, SinkFormatterImpl::AppendOnlyTextAvro($name) => $body, + SinkFormatterImpl::AppendOnlyBytesAvro($name) => $body, SinkFormatterImpl::AppendOnlyProto($name) => $body, SinkFormatterImpl::AppendOnlyTextProto($name) => $body, + SinkFormatterImpl::AppendOnlyBytesProto($name) => $body, SinkFormatterImpl::UpsertJson($name) => $body, + SinkFormatterImpl::UpsertBytesJson($name) => $body, SinkFormatterImpl::UpsertTextJson($name) => $body, SinkFormatterImpl::UpsertAvro($name) => $body, SinkFormatterImpl::UpsertTextAvro($name) => $body, + SinkFormatterImpl::UpsertBytesAvro($name) => $body, SinkFormatterImpl::UpsertTextProto($name) => $body, + SinkFormatterImpl::UpsertBytesProto($name) => $body, SinkFormatterImpl::DebeziumJson($name) => $body, SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body, SinkFormatterImpl::AppendOnlyTemplate($name) => $body, SinkFormatterImpl::UpsertTextTemplate($name) => $body, SinkFormatterImpl::UpsertTemplate($name) => $body, + SinkFormatterImpl::AppendOnlyBytesTemplate($name) => $body, + SinkFormatterImpl::UpsertBytesTemplate($name) => $body, } }; } +/// Macro to dispatch formatting implementation for sink formatters that require string keys. +/// Used when the message key must be a string (excludes some Avro and bytes implementations). +/// +/// Similar to `dispatch_sink_formatter_impl`, but excludes certain formatter types +/// that don't support string keys (e.g., `AppendOnlyAvro`, `UpsertAvro`). +/// These cases are marked as unreachable!() since they should never occur +/// in contexts requiring string keys. #[macro_export] macro_rules! dispatch_sink_formatter_str_key_impl { ($impl:expr, $name:ident, $body:expr) => { match $impl { SinkFormatterImpl::AppendOnlyJson($name) => $body, + SinkFormatterImpl::AppendOnlyBytesJson(_) => unreachable!(), SinkFormatterImpl::AppendOnlyTextJson($name) => $body, SinkFormatterImpl::AppendOnlyAvro(_) => unreachable!(), SinkFormatterImpl::AppendOnlyTextAvro($name) => $body, + SinkFormatterImpl::AppendOnlyBytesAvro(_) => unreachable!(), SinkFormatterImpl::AppendOnlyProto($name) => $body, SinkFormatterImpl::AppendOnlyTextProto($name) => $body, + SinkFormatterImpl::AppendOnlyBytesProto(_) => unreachable!(), SinkFormatterImpl::UpsertJson($name) => $body, SinkFormatterImpl::UpsertTextJson($name) => $body, SinkFormatterImpl::UpsertAvro(_) => unreachable!(), SinkFormatterImpl::UpsertTextAvro($name) => $body, + SinkFormatterImpl::UpsertBytesAvro(_) => unreachable!(), SinkFormatterImpl::UpsertTextProto($name) => $body, + SinkFormatterImpl::UpsertBytesProto(_) => unreachable!(), SinkFormatterImpl::DebeziumJson($name) => $body, SinkFormatterImpl::AppendOnlyTextTemplate($name) => $body, SinkFormatterImpl::AppendOnlyTemplate($name) => $body, SinkFormatterImpl::UpsertTextTemplate($name) => $body, + SinkFormatterImpl::UpsertBytesJson(_) => unreachable!(), SinkFormatterImpl::UpsertTemplate($name) => $body, + SinkFormatterImpl::AppendOnlyBytesTemplate(_) => unreachable!(), + SinkFormatterImpl::UpsertBytesTemplate(_) => unreachable!(), } }; } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index fb35c5efc2e99..92b4cc3352785 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -30,6 +30,7 @@ use risingwave_common::types::DataType; use risingwave_common::{bail, catalog}; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK}; +use risingwave_connector::sink::kafka::KAFKA_SINK; use risingwave_connector::sink::{ CONNECTOR_TYPE_KEY, SINK_TYPE_OPTION, SINK_USER_FORCE_APPEND_ONLY_OPTION, SINK_WITHOUT_BACKFILL, }; @@ -840,13 +841,15 @@ fn bind_sink_format_desc(session: &SessionImpl, value: ConnectorSchema) -> Resul let mut key_encode = None; if let Some(encode) = value.key_encode { - if encode == E::Text { - key_encode = Some(SinkEncode::Text); - } else { - return Err(ErrorCode::BindError(format!( - "sink key encode unsupported: {encode}, only TEXT supported" - )) - .into()); + match encode { + E::Text => key_encode = Some(SinkEncode::Text), + E::Bytes => key_encode = Some(SinkEncode::Bytes), + _ => { + return Err(ErrorCode::BindError(format!( + "sink key encode unsupported: {encode}, only TEXT and BYTES supported" + )) + .into()) + } } } @@ -951,6 +954,19 @@ pub fn validate_compatibility(connector: &str, format_desc: &ConnectorSchema) -> )) .into()); } + + // only allow Kafka connector work with `bytes` as key encode + if let Some(encode) = &format_desc.key_encode + && connector != KAFKA_SINK + && matches!(encode, Encode::Bytes) + { + return Err(ErrorCode::BindError(format!( + "key encode bytes only works with kafka connector, but found {}", + connector + )) + .into()); + } + Ok(()) }