Commit

fix
tabversion committed Nov 7, 2024
1 parent 89fa7dd commit 80ab6bb
Showing 3 changed files with 18 additions and 21 deletions.
8 changes: 3 additions & 5 deletions e2e_test/sink/kafka/create_sink.slt
@@ -156,8 +156,7 @@ format plain encode json (
   force_append_only='true'
 ) key encode json ;
 
-statement error
-# The key encode is TEXT, but the primary key has 2 columns. The key encode TEXT requires the primary key to be a single column.s
+statement error KEY ENCODE TEXT expects only one primary key, but got 2
 create sink sink_text_error from t_kafka with (
   connector = 'kafka',
   properties.bootstrap.server = 'message_queue:29092',
@@ -200,8 +199,7 @@ format plain encode json (
   force_append_only='true'
 ) key encode json;
 
-statement error
-# The key encode is BYTES, but the primary key has 2 columns. The key encode BYTES requires the primary key to be a single column
+statement error KEY ENCODE BYTES expects only one primary key, but got 2
 create sink sink_bytes_error as (
   select int8send(id) as id_bytes, '\x1234'::bytea as other_bytea, * from t_kafka
 ) with (
@@ -211,7 +209,7 @@ create sink sink_bytes_error as (
   primary_key = 'id_bytes, other_bytea')
 format plain encode json (
   force_append_only='true'
-) key encode json;
+) key encode bytes;
 
 statement error key encode bytes only works with kafka connector, but found kinesis
 create sink sink_bytes_json as (
3 changes: 1 addition & 2 deletions src/connector/src/sink/formatter/mod.rs
@@ -191,10 +191,9 @@ fn ensure_only_one_pk<'a>(
     };
     if pk_indices.len() != 1 {
         return Err(SinkError::Config(anyhow!(
-            "The key encode is {}, but the primary key has {} columns. The key encode {} requires the primary key to be a single column",
+            "KEY ENCODE {} expects only one primary key, but got {}",
             data_type_name,
             pk_indices.len(),
-            data_type_name
         )));
     }
 
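For context, here is a minimal sketch of the check behind the new message. The signature and the SinkError stand-in below are simplified assumptions for illustration, not the exact definitions in formatter/mod.rs:

use anyhow::anyhow;

// Simplified stand-in for RisingWave's SinkError; only a Config variant is needed here.
#[derive(Debug)]
enum SinkError {
    Config(anyhow::Error),
}

// Hypothetical reduced signature: reject any key encode paired with more than one
// primary-key column, using the commit's new error wording.
fn ensure_only_one_pk(data_type_name: &str, pk_indices: &[usize]) -> Result<(), SinkError> {
    if pk_indices.len() != 1 {
        return Err(SinkError::Config(anyhow!(
            "KEY ENCODE {} expects only one primary key, but got {}",
            data_type_name,
            pk_indices.len(),
        )));
    }
    Ok(())
}

With this wording, a TEXT key encode over a two-column primary key fails with "KEY ENCODE TEXT expects only one primary key, but got 2", which is the exact string the updated .slt expectations match on.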
28 changes: 14 additions & 14 deletions src/frontend/src/handler/create_sink.rs
@@ -26,7 +26,7 @@ use risingwave_common::catalog::{ColumnCatalog, DatabaseId, Schema, SchemaId, Ta
 use risingwave_common::secret::LocalSecretManager;
 use risingwave_common::types::DataType;
 use risingwave_common::{bail, catalog};
-use risingwave_connector::sink::catalog::{SinkCatalog, SinkEncode, SinkFormatDesc, SinkType};
+use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType};
 use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK};
 use risingwave_connector::sink::kafka::KAFKA_SINK;
 use risingwave_connector::sink::{
@@ -182,19 +182,6 @@ pub async fn gen_sink_plan(
         },
     };
 
-    // only allow Kafka connector work with `bytes` as key encode
-    if let Some(format_desc) = &format_desc
-        && let Some(encode) = &format_desc.key_encode
-        && connector != KAFKA_SINK
-        && matches!(encode, SinkEncode::Bytes)
-    {
-        return Err(ErrorCode::BindError(format!(
-            "key encode bytes only works with kafka connector, but found {}",
-            connector
-        ))
-        .into());
-    }
-
     let definition = context.normalized_sql().to_owned();
     let mut plan_root = Planner::new(context.into()).plan_query(bound)?;
     if let Some(col_names) = &col_names {
@@ -963,6 +950,19 @@ pub fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions
         ))
         .into());
     }
+
+    // only allow Kafka connector work with `bytes` as key encode
+    if let Some(encode) = &format_desc.key_encode
+        && connector != KAFKA_SINK
+        && matches!(encode, Encode::Bytes)
+    {
+        return Err(ErrorCode::BindError(format!(
+            "key encode bytes only works with kafka connector, but found {}",
+            connector
+        ))
+        .into());
+    }
+
     Ok(())
 }
 
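For context, a sketch of where the bytes-only-with-Kafka guard now runs: it is part of connector/format validation, keyed on the connector name, and no longer needs the bound plan from gen_sink_plan. The constant value and the reduced types below are assumptions for illustration (the real code uses the frontend's Encode, FormatEncodeOptions, ErrorCode, and the KAFKA_SINK constant from the kafka sink module):

// Assumed value, mirroring risingwave_connector::sink::kafka::KAFKA_SINK.
const KAFKA_SINK: &str = "kafka";

// Simplified stand-ins for the frontend types named in the diff.
#[allow(dead_code)]
enum Encode {
    Bytes,
    Json,
    Text,
}

struct FormatEncodeOptions {
    key_encode: Option<Encode>,
}

// Hypothetical reduced form of validate_compatibility, showing only the relocated
// check and written with nested ifs instead of a let-chain.
fn validate_compatibility(connector: &str, format_desc: &FormatEncodeOptions) -> Result<(), String> {
    if let Some(encode) = &format_desc.key_encode {
        if connector != KAFKA_SINK && matches!(encode, Encode::Bytes) {
            return Err(format!(
                "key encode bytes only works with kafka connector, but found {}",
                connector
            ));
        }
    }
    Ok(())
}

Running the check here means a non-Kafka sink (for example kinesis) is rejected as soon as the FORMAT ... ENCODE clause is validated, which is what the "but found kinesis" e2e expectation above exercises.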
