diff --git a/e2e_test/source/basic/inlcude_key_as.slt b/e2e_test/source/basic/inlcude_key_as.slt index fb1faccbae94d..53f3a160d0a07 100644 --- a/e2e_test/source/basic/inlcude_key_as.slt +++ b/e2e_test/source/basic/inlcude_key_as.slt @@ -42,7 +42,16 @@ FORMAT UPSERT ENCODE JSON db error: ERROR: Failed to run the query Caused by: - Protocol error: INCLUDE KEY clause must be set for FORMAT UPSERT ENCODE Json + Bind error: can't CREATE SOURCE with FORMAT UPSERT + +Hint: use CREATE TABLE instead + +Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE JSON (...) # upsert format must have a pk @@ -64,7 +73,14 @@ FORMAT UPSERT ENCODE JSON db error: ERROR: Failed to run the query Caused by: - Protocol error: Primary key must be specified to rw_key when creating source with FORMAT UPSERT ENCODE Json + Protocol error: Primary key must be specified to rw_key + +Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE JSON (...) # upsert format pk must be the key column @@ -86,7 +102,14 @@ FORMAT UPSERT ENCODE JSON db error: ERROR: Failed to run the query Caused by: - Protocol error: upsert JSON's key column rw_key not match with sql defined primary key ID + Protocol error: Only ID can be used as primary key + +Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE JSON (...) statement error @@ -108,7 +131,16 @@ FORMAT UPSERT ENCODE JSON db error: ERROR: Failed to run the query Caused by: - Invalid input syntax: Source does not support PRIMARY KEY constraint, please use "CREATE TABLE" instead + Bind error: can't CREATE SOURCE with FORMAT UPSERT + +Hint: use CREATE TABLE instead + +Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE JSON (...) statement ok diff --git a/e2e_test/source_inline/kafka/avro/name_strategy.slt b/e2e_test/source_inline/kafka/avro/name_strategy.slt index 09bd171bafe37..e48be738886f4 100644 --- a/e2e_test/source_inline/kafka/avro/name_strategy.slt +++ b/e2e_test/source_inline/kafka/avro/name_strategy.slt @@ -27,7 +27,6 @@ create source s1 () with ( system ok python3 scripts/source/schema_registry_producer.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" e2e_test/source_inline/kafka/avro/upsert_avro_json "topic" "avro" -# If we cannot extract key schema, use message key as varchar primary key statement ok CREATE TABLE t_topic ( primary key (rw_key) ) INCLUDE KEY AS rw_key diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 1f458bbbc09b5..41c6841c7990f 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -750,6 +750,18 @@ pub(crate) fn bind_all_columns( } } +fn hint_upsert(encode: &Encode) -> String { + return format!( + r#"Hint: For FORMAT UPSERT ENCODE {encode:}, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE {encode:} (...) +"# + ); +} + /// Bind column from source. Add key column to table columns if necessary. /// Return `pk_names`. pub(crate) async fn bind_source_pk( @@ -760,7 +772,7 @@ pub(crate) async fn bind_source_pk( with_properties: &WithOptions, ) -> Result> { let sql_defined_pk = !sql_defined_pk_names.is_empty(); - let key_column_name: Option = { + let include_key_column_name: Option = { // iter columns to check if contains additional columns from key part // return the key column names if exists columns.iter().find_map(|catalog| { @@ -793,34 +805,36 @@ pub(crate) async fn bind_source_pk( // For all Upsert formats, we only accept one and only key column as primary key. // Additional KEY columns must be set in this case and must be primary key. (Format::Upsert, encode @ Encode::Json | encode @ Encode::Avro) => { - if let Some(ref key_column_name) = key_column_name + if let Some(ref key_column_name) = include_key_column_name && sql_defined_pk { - if sql_defined_pk_names.len() != 1 { - return Err(RwError::from(ProtocolError(format!( - "upsert {:?} supports only one primary key column ({}).", - encode, key_column_name - )))); - } + // pk is set. check if it's valid + // the column name have been converted to real value in `handle_addition_columns` // so we don't ignore ascii case here - if !key_column_name.eq(sql_defined_pk_names[0].as_str()) { + if sql_defined_pk_names.len() != 1 + || !key_column_name.eq(sql_defined_pk_names[0].as_str()) + { return Err(RwError::from(ProtocolError(format!( - "upsert {}'s key column {} not match with sql defined primary key {}", - encode, key_column_name, sql_defined_pk_names[0] + "Only {} can be used as primary key\n\n{}", + sql_defined_pk_names[0], + hint_upsert(&encode) )))); } sql_defined_pk_names } else { - return if key_column_name.is_none() { + // pk not set, or even key not included + return if include_key_column_name.is_none() { Err(RwError::from(ProtocolError(format!( - "INCLUDE KEY clause must be set for FORMAT UPSERT ENCODE {:?}", - encode + "INCLUDE KEY clause not set\n\n{}", + hint_upsert(&encode) )))) } else { Err(RwError::from(ProtocolError(format!( - "Primary key must be specified to {} when creating source with FORMAT UPSERT ENCODE {:?}", - key_column_name.unwrap(), encode)))) + "Primary key must be specified to {}\n\n{}", + include_key_column_name.unwrap(), + hint_upsert(&encode) + )))) }; } } @@ -1340,8 +1354,9 @@ pub fn bind_connector_props( } Ok(WithOptions::new(with_properties)) } + #[allow(clippy::too_many_arguments)] -pub async fn bind_create_source( +pub async fn bind_create_source_or_table_with_connector( handler_args: HandlerArgs, full_name: ObjectName, source_schema: ConnectorSchema, @@ -1364,9 +1379,25 @@ pub async fn bind_create_source( session.get_database_and_schema_id_for_create(schema_name.clone())?; if !is_create_source && with_properties.is_iceberg_connector() { - return Err( - ErrorCode::BindError("can't create table with iceberg connector".to_string()).into(), - ); + return Err(ErrorCode::BindError( + "can't CREATE TABLE with iceberg connector\n\nHint: use CREATE SOURCE instead" + .to_string(), + ) + .into()); + } + if is_create_source { + match source_schema.format { + Format::Upsert => { + return Err(ErrorCode::BindError(format!( + "can't CREATE SOURCE with FORMAT UPSERT\n\nHint: use CREATE TABLE instead\n\n{}", + hint_upsert(&source_schema.row_encode) + )) + .into()); + } + _ => { + // TODO: enhance error message for other formats + } + } } ensure_table_constraints_supported(&constraints)?; @@ -1522,7 +1553,7 @@ pub async fn handle_create_source( } let mut col_id_gen = ColumnIdGenerator::new_initial(); - let (source_catalog, database_id, schema_id) = bind_create_source( + let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector( handler_args.clone(), stmt.source_name, source_schema, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index c542762702053..de81115e628f1 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -55,8 +55,8 @@ use crate::catalog::{check_valid_column_name, ColumnId, DatabaseId, SchemaId}; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{Expr, ExprImpl, ExprRewriter, InlineNowProcTime}; use crate::handler::create_source::{ - bind_columns_from_source, bind_connector_props, bind_create_source, bind_source_watermark, - handle_addition_columns, UPSTREAM_SOURCE_KEY, + bind_columns_from_source, bind_connector_props, bind_create_source_or_table_with_connector, + bind_source_watermark, handle_addition_columns, UPSTREAM_SOURCE_KEY, }; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind}; @@ -483,7 +483,7 @@ pub(crate) async fn gen_create_table_plan_with_source( let (columns_from_resolve_source, source_info) = bind_columns_from_source(session, &source_schema, &with_properties).await?; - let (source_catalog, database_id, schema_id) = bind_create_source( + let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector( handler_args.clone(), table_name, source_schema, diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 0c92209471450..facedadeb5bc0 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -94,11 +94,16 @@ pub struct CreateSourceStatement { pub include_column_options: IncludeOption, } +/// FORMAT means how to get the operation(Insert/Delete) from the input. +/// +/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed. #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum Format { + /// The format is the same with RisingWave's internal representation. + /// Used internally for schema change Native, - // Keyword::NONE + /// for self-explanatory sources like iceberg, they have their own format, and should not be specified by user. None, // Keyword::DEBEZIUM Debezium, @@ -143,8 +148,8 @@ impl Format { "CANAL" => Format::Canal, "PLAIN" => Format::Plain, "UPSERT" => Format::Upsert, - "NATIVE" => Format::Native, // used internally for schema change - "NONE" => Format::None, // used by iceberg + "NATIVE" => Format::Native, + "NONE" => Format::None, _ => parser_err!( "expected CANAL | PROTOBUF | DEBEZIUM | MAXWELL | PLAIN | NATIVE | NONE after FORMAT" ), @@ -152,6 +157,7 @@ impl Format { } } +/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed. #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum Encode { @@ -160,8 +166,11 @@ pub enum Encode { Protobuf, // Keyword::PROTOBUF Json, // Keyword::JSON Bytes, // Keyword::BYTES - None, // Keyword::None - Text, // Keyword::TEXT + /// for self-explanatory sources like iceberg, they have their own format, and should not be specified by user. + None, + Text, // Keyword::TEXT + /// The encode is the same with RisingWave's internal representation. + /// Used internally for schema change Native, Template, } @@ -197,8 +206,8 @@ impl Encode { "PROTOBUF" => Encode::Protobuf, "JSON" => Encode::Json, "TEMPLATE" => Encode::Template, - "NATIVE" => Encode::Native, // used internally for schema change - "NONE" => Encode::None, // used by iceberg + "NATIVE" => Encode::Native, + "NONE" => Encode::None, _ => parser_err!( "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | NONE after Encode" ),