From 80ab6bb1042785ee29c1f454bcf6c064f805ea1e Mon Sep 17 00:00:00 2001 From: tabversion Date: Thu, 7 Nov 2024 18:56:11 +0800 Subject: [PATCH] fix --- e2e_test/sink/kafka/create_sink.slt | 8 +++---- src/connector/src/sink/formatter/mod.rs | 3 +-- src/frontend/src/handler/create_sink.rs | 28 ++++++++++++------------- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/e2e_test/sink/kafka/create_sink.slt b/e2e_test/sink/kafka/create_sink.slt index 9764090d8d4f..e7b5dc3a0401 100644 --- a/e2e_test/sink/kafka/create_sink.slt +++ b/e2e_test/sink/kafka/create_sink.slt @@ -156,8 +156,7 @@ format plain encode json ( force_append_only='true' ) key encode json ; -statement error -# The key encode is TEXT, but the primary key has 2 columns. The key encode TEXT requires the primary key to be a single column.s +statement error KEY ENCODE TEXT expects only one primary key, but got 2 create sink sink_text_error from t_kafka with ( connector = 'kafka', properties.bootstrap.server = 'message_queue:29092', @@ -200,8 +199,7 @@ format plain encode json ( force_append_only='true' ) key encode json; -statement error -# The key encode is BYTES, but the primary key has 2 columns. The key encode BYTES requires the primary key to be a single column +statement error KEY ENCODE BYTES expects only one primary key, but got 2 create sink sink_bytes_error as ( select int8send(id) as id_bytes, '\x1234'::bytea as other_bytea, * from t_kafka ) with ( @@ -211,7 +209,7 @@ create sink sink_bytes_error as ( primary_key = 'id_bytes, other_bytea') format plain encode json ( force_append_only='true' -) key encode json; +) key encode bytes; statement error key encode bytes only works with kafka connector, but found kinesis create sink sink_bytes_json as ( diff --git a/src/connector/src/sink/formatter/mod.rs b/src/connector/src/sink/formatter/mod.rs index ff8b640325fe..0fd6a61343a8 100644 --- a/src/connector/src/sink/formatter/mod.rs +++ b/src/connector/src/sink/formatter/mod.rs @@ -191,10 +191,9 @@ fn ensure_only_one_pk<'a>( }; if pk_indices.len() != 1 { return Err(SinkError::Config(anyhow!( - "The key encode is {}, but the primary key has {} columns. The key encode {} requires the primary key to be a single column", + "KEY ENCODE {} expects only one primary key, but got {}", data_type_name, pk_indices.len(), - data_type_name ))); } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 76107ba621fb..a9966e3acb86 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -26,7 +26,7 @@ use risingwave_common::catalog::{ColumnCatalog, DatabaseId, Schema, SchemaId, Ta use risingwave_common::secret::LocalSecretManager; use risingwave_common::types::DataType; use risingwave_common::{bail, catalog}; -use risingwave_connector::sink::catalog::{SinkCatalog, SinkEncode, SinkFormatDesc, SinkType}; +use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK}; use risingwave_connector::sink::kafka::KAFKA_SINK; use risingwave_connector::sink::{ @@ -182,19 +182,6 @@ pub async fn gen_sink_plan( }, }; - // only allow Kafka connector work with `bytes` as key encode - if let Some(format_desc) = &format_desc - && let Some(encode) = &format_desc.key_encode - && connector != KAFKA_SINK - && matches!(encode, SinkEncode::Bytes) - { - return Err(ErrorCode::BindError(format!( - "key encode bytes only works with kafka connector, but found {}", - connector - )) - .into()); - } - let definition = context.normalized_sql().to_owned(); let mut plan_root = Planner::new(context.into()).plan_query(bound)?; if let Some(col_names) = &col_names { @@ -963,6 +950,19 @@ pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions )) .into()); } + + // only allow Kafka connector work with `bytes` as key encode + if let Some(encode) = &format_desc.key_encode + && connector != KAFKA_SINK + && matches!(encode, Encode::Bytes) + { + return Err(ErrorCode::BindError(format!( + "key encode bytes only works with kafka connector, but found {}", + connector + )) + .into()); + } + Ok(()) }