From 6ec29a9ab4e51ebae66f995042a2df48355c9ccb Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 21 Jun 2024 14:55:44 +0800 Subject: [PATCH 01/10] move test Signed-off-by: xxchan --- .../inlcude_key_as.slt => source_inline/kafka/include_key_as.slt} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename e2e_test/{source/basic/inlcude_key_as.slt => source_inline/kafka/include_key_as.slt} (100%) diff --git a/e2e_test/source/basic/inlcude_key_as.slt b/e2e_test/source_inline/kafka/include_key_as.slt similarity index 100% rename from e2e_test/source/basic/inlcude_key_as.slt rename to e2e_test/source_inline/kafka/include_key_as.slt From 0b37f4ba53eca847e8c7bbda3ffb0d0326ad3461 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 21 Jun 2024 16:29:08 +0800 Subject: [PATCH 02/10] modernize test Signed-off-by: xxchan --- Makefile.toml | 3 +- .../source_inline/kafka/include_key_as.slt | 108 +++++++++++++----- scripts/source/prepare_ci_kafka.sh | 4 - 3 files changed, 81 insertions(+), 34 deletions(-) diff --git a/Makefile.toml b/Makefile.toml index 1e618b1aed6ba..ca7e5ce10fb5f 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1294,7 +1294,7 @@ echo "All processes has exited." [tasks.slt] category = "RiseDev - Test - SQLLogicTest" -install_crate = { version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ +install_crate = { min_version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ "--help", ], install_command = "binstall" } dependencies = ["check-and-load-risedev-env-file"] @@ -1308,6 +1308,7 @@ SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}" SLT_DB = "dev" PATH = "${PWD}/e2e_test/commands:${PATH}" RUST_BACKTRACE = "0" # slt backtrace is useless +RUST_LOG = "sqllogictest=warn,sqllogictest::system_command=info" [tasks.slt-clean] category = "RiseDev - Test - SQLLogicTest" diff --git a/e2e_test/source_inline/kafka/include_key_as.slt b/e2e_test/source_inline/kafka/include_key_as.slt index c971e1ac074d0..7b7cc9dfb6feb 100644 --- a/e2e_test/source_inline/kafka/include_key_as.slt +++ b/e2e_test/source_inline/kafka/include_key_as.slt @@ -1,3 +1,30 @@ +control substitution on + +system ok +rpk topic delete 'test_include_key' || true; + +system ok +rpk topic create 'test_include_key' + +system ok +cat < Date: Fri, 21 Jun 2024 16:34:34 +0800 Subject: [PATCH 03/10] add more error test Signed-off-by: xxchan --- .../source_inline/kafka/include_key_as.slt | 103 +++++++++++++++--- 1 file changed, 86 insertions(+), 17 deletions(-) diff --git a/e2e_test/source_inline/kafka/include_key_as.slt b/e2e_test/source_inline/kafka/include_key_as.slt index 7b7cc9dfb6feb..fb1faccbae94d 100644 --- a/e2e_test/source_inline/kafka/include_key_as.slt +++ b/e2e_test/source_inline/kafka/include_key_as.slt @@ -25,9 +25,29 @@ cat < Date: Fri, 21 Jun 2024 18:01:59 +0800 Subject: [PATCH 04/10] enhance error message for FORMAT UPSERT Signed-off-by: xxchan --- .../kafka/avro/name_strategy.slt | 1 - .../source_inline/kafka/include_key_as.slt | 40 +++++++++- src/frontend/src/handler/create_source.rs | 73 +++++++++++++------ src/frontend/src/handler/create_table.rs | 6 +- src/sqlparser/src/ast/statement.rs | 23 ++++-- 5 files changed, 107 insertions(+), 36 deletions(-) diff --git a/e2e_test/source_inline/kafka/avro/name_strategy.slt b/e2e_test/source_inline/kafka/avro/name_strategy.slt index 09bd171bafe37..e48be738886f4 100644 --- a/e2e_test/source_inline/kafka/avro/name_strategy.slt +++ b/e2e_test/source_inline/kafka/avro/name_strategy.slt @@ -27,7 +27,6 @@ create source s1 () with ( system ok python3 scripts/source/schema_registry_producer.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" e2e_test/source_inline/kafka/avro/upsert_avro_json "topic" "avro" -# If we cannot extract key schema, use message key as varchar primary key statement ok CREATE TABLE t_topic ( primary key (rw_key) ) INCLUDE KEY AS rw_key diff --git a/e2e_test/source_inline/kafka/include_key_as.slt b/e2e_test/source_inline/kafka/include_key_as.slt index fb1faccbae94d..53f3a160d0a07 100644 --- a/e2e_test/source_inline/kafka/include_key_as.slt +++ b/e2e_test/source_inline/kafka/include_key_as.slt @@ -42,7 +42,16 @@ FORMAT UPSERT ENCODE JSON db error: ERROR: Failed to run the query Caused by: - Protocol error: INCLUDE KEY clause must be set for FORMAT UPSERT ENCODE Json + Bind error: can't CREATE SOURCE with FORMAT UPSERT + +Hint: use CREATE TABLE instead + +Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE JSON (...) # upsert format must have a pk @@ -64,7 +73,14 @@ FORMAT UPSERT ENCODE JSON db error: ERROR: Failed to run the query Caused by: - Protocol error: Primary key must be specified to rw_key when creating source with FORMAT UPSERT ENCODE Json + Protocol error: Primary key must be specified to rw_key + +Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE JSON (...) # upsert format pk must be the key column @@ -86,7 +102,14 @@ FORMAT UPSERT ENCODE JSON db error: ERROR: Failed to run the query Caused by: - Protocol error: upsert JSON's key column rw_key not match with sql defined primary key ID + Protocol error: Only ID can be used as primary key + +Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE JSON (...) statement error @@ -108,7 +131,16 @@ FORMAT UPSERT ENCODE JSON db error: ERROR: Failed to run the query Caused by: - Invalid input syntax: Source does not support PRIMARY KEY constraint, please use "CREATE TABLE" instead + Bind error: can't CREATE SOURCE with FORMAT UPSERT + +Hint: use CREATE TABLE instead + +Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE JSON (...) statement ok diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index e5ead9d71a19b..89eeaa382f24d 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -750,6 +750,18 @@ pub(crate) fn bind_all_columns( } } +fn hint_upsert(encode: &Encode) -> String { + return format!( + r#"Hint: For FORMAT UPSERT ENCODE {encode:}, INCLUDE KEY must be specified and the key column must be used as primary key. +example: + CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) + INCLUDE KEY [AS ] + WITH (...) + FORMAT UPSERT ENCODE {encode:} (...) +"# + ); +} + /// Bind column from source. Add key column to table columns if necessary. /// Return `pk_names`. pub(crate) async fn bind_source_pk( @@ -760,7 +772,7 @@ pub(crate) async fn bind_source_pk( with_properties: &WithOptions, ) -> Result> { let sql_defined_pk = !sql_defined_pk_names.is_empty(); - let key_column_name: Option = { + let include_key_column_name: Option = { // iter columns to check if contains additional columns from key part // return the key column names if exists columns.iter().find_map(|catalog| { @@ -793,34 +805,36 @@ pub(crate) async fn bind_source_pk( // For all Upsert formats, we only accept one and only key column as primary key. // Additional KEY columns must be set in this case and must be primary key. (Format::Upsert, encode @ Encode::Json | encode @ Encode::Avro) => { - if let Some(ref key_column_name) = key_column_name + if let Some(ref key_column_name) = include_key_column_name && sql_defined_pk { - if sql_defined_pk_names.len() != 1 { - return Err(RwError::from(ProtocolError(format!( - "upsert {:?} supports only one primary key column ({}).", - encode, key_column_name - )))); - } + // pk is set. check if it's valid + // the column name have been converted to real value in `handle_addition_columns` // so we don't ignore ascii case here - if !key_column_name.eq(sql_defined_pk_names[0].as_str()) { + if sql_defined_pk_names.len() != 1 + || !key_column_name.eq(sql_defined_pk_names[0].as_str()) + { return Err(RwError::from(ProtocolError(format!( - "upsert {}'s key column {} not match with sql defined primary key {}", - encode, key_column_name, sql_defined_pk_names[0] + "Only {} can be used as primary key\n\n{}", + sql_defined_pk_names[0], + hint_upsert(&encode) )))); } sql_defined_pk_names } else { - return if key_column_name.is_none() { + // pk not set, or even key not included + return if include_key_column_name.is_none() { Err(RwError::from(ProtocolError(format!( - "INCLUDE KEY clause must be set for FORMAT UPSERT ENCODE {:?}", - encode + "INCLUDE KEY clause not set\n\n{}", + hint_upsert(&encode) )))) } else { Err(RwError::from(ProtocolError(format!( - "Primary key must be specified to {} when creating source with FORMAT UPSERT ENCODE {:?}", - key_column_name.unwrap(), encode)))) + "Primary key must be specified to {}\n\n{}", + include_key_column_name.unwrap(), + hint_upsert(&encode) + )))) }; } } @@ -1340,8 +1354,9 @@ pub fn bind_connector_props( } Ok(WithOptions::new(with_properties)) } + #[allow(clippy::too_many_arguments)] -pub async fn bind_create_source( +pub async fn bind_create_source_or_table_with_connector( handler_args: HandlerArgs, full_name: ObjectName, source_schema: ConnectorSchema, @@ -1364,9 +1379,25 @@ pub async fn bind_create_source( session.get_database_and_schema_id_for_create(schema_name.clone())?; if !is_create_source && with_properties.is_iceberg_connector() { - return Err( - ErrorCode::BindError("can't create table with iceberg connector".to_string()).into(), - ); + return Err(ErrorCode::BindError( + "can't CREATE TABLE with iceberg connector\n\nHint: use CREATE SOURCE instead" + .to_string(), + ) + .into()); + } + if is_create_source { + match source_schema.format { + Format::Upsert => { + return Err(ErrorCode::BindError(format!( + "can't CREATE SOURCE with FORMAT UPSERT\n\nHint: use CREATE TABLE instead\n\n{}", + hint_upsert(&source_schema.row_encode) + )) + .into()); + } + _ => { + // TODO: enhance error message for other formats + } + } } ensure_table_constraints_supported(&constraints)?; @@ -1522,7 +1553,7 @@ pub async fn handle_create_source( } let mut col_id_gen = ColumnIdGenerator::new_initial(); - let (source_catalog, database_id, schema_id) = bind_create_source( + let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector( handler_args.clone(), stmt.source_name, source_schema, diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index c542762702053..de81115e628f1 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -55,8 +55,8 @@ use crate::catalog::{check_valid_column_name, ColumnId, DatabaseId, SchemaId}; use crate::error::{ErrorCode, Result, RwError}; use crate::expr::{Expr, ExprImpl, ExprRewriter, InlineNowProcTime}; use crate::handler::create_source::{ - bind_columns_from_source, bind_connector_props, bind_create_source, bind_source_watermark, - handle_addition_columns, UPSTREAM_SOURCE_KEY, + bind_columns_from_source, bind_connector_props, bind_create_source_or_table_with_connector, + bind_source_watermark, handle_addition_columns, UPSTREAM_SOURCE_KEY, }; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind}; @@ -483,7 +483,7 @@ pub(crate) async fn gen_create_table_plan_with_source( let (columns_from_resolve_source, source_info) = bind_columns_from_source(session, &source_schema, &with_properties).await?; - let (source_catalog, database_id, schema_id) = bind_create_source( + let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector( handler_args.clone(), table_name, source_schema, diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 0c92209471450..facedadeb5bc0 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -94,11 +94,16 @@ pub struct CreateSourceStatement { pub include_column_options: IncludeOption, } +/// FORMAT means how to get the operation(Insert/Delete) from the input. +/// +/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed. #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum Format { + /// The format is the same with RisingWave's internal representation. + /// Used internally for schema change Native, - // Keyword::NONE + /// for self-explanatory sources like iceberg, they have their own format, and should not be specified by user. None, // Keyword::DEBEZIUM Debezium, @@ -143,8 +148,8 @@ impl Format { "CANAL" => Format::Canal, "PLAIN" => Format::Plain, "UPSERT" => Format::Upsert, - "NATIVE" => Format::Native, // used internally for schema change - "NONE" => Format::None, // used by iceberg + "NATIVE" => Format::Native, + "NONE" => Format::None, _ => parser_err!( "expected CANAL | PROTOBUF | DEBEZIUM | MAXWELL | PLAIN | NATIVE | NONE after FORMAT" ), @@ -152,6 +157,7 @@ impl Format { } } +/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed. #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum Encode { @@ -160,8 +166,11 @@ pub enum Encode { Protobuf, // Keyword::PROTOBUF Json, // Keyword::JSON Bytes, // Keyword::BYTES - None, // Keyword::None - Text, // Keyword::TEXT + /// for self-explanatory sources like iceberg, they have their own format, and should not be specified by user. + None, + Text, // Keyword::TEXT + /// The encode is the same with RisingWave's internal representation. + /// Used internally for schema change Native, Template, } @@ -197,8 +206,8 @@ impl Encode { "PROTOBUF" => Encode::Protobuf, "JSON" => Encode::Json, "TEMPLATE" => Encode::Template, - "NATIVE" => Encode::Native, // used internally for schema change - "NONE" => Encode::None, // used by iceberg + "NATIVE" => Encode::Native, + "NONE" => Encode::None, _ => parser_err!( "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | NONE after Encode" ), From 0fbfcd7bf922b755483d20fcc44820c708d8cd59 Mon Sep 17 00:00:00 2001 From: xxchan Date: Fri, 21 Jun 2024 18:21:25 +0800 Subject: [PATCH 05/10] refine test Signed-off-by: xxchan --- .../source_inline/kafka/include_key_as.slt | 22 +++++++++++++++ src/frontend/src/handler/create_source.rs | 28 ++----------------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/e2e_test/source_inline/kafka/include_key_as.slt b/e2e_test/source_inline/kafka/include_key_as.slt index 53f3a160d0a07..f80c2e29a9831 100644 --- a/e2e_test/source_inline/kafka/include_key_as.slt +++ b/e2e_test/source_inline/kafka/include_key_as.slt @@ -197,6 +197,28 @@ WITH ( topic = 'test_additional_columns') FORMAT PLAIN ENCODE JSON +# header with varchar type & non-exist header key +statement error +create table additional_columns_1 (a int) +include key as key_col +include partition as partition_col +include offset as offset_col +include timestamp 'header1' as timestamp_col +include header 'header1' as header_col_1 +include header 'header2' as header_col_2 +include header 'header2' varchar as header_col_3 +include header 'header3' as header_col_4 +WITH ( + ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON}, + topic = 'test_additional_columns') +FORMAT PLAIN ENCODE JSON +---- +db error: ERROR: Failed to run the query + +Caused by: + Protocol error: Only header column can have inner field, but got "timestamp" + + # header with varchar type & non-exist header key statement ok create table additional_columns_1 (a int) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 89eeaa382f24d..ebcc2f0c93769 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -818,7 +818,7 @@ pub(crate) async fn bind_source_pk( return Err(RwError::from(ProtocolError(format!( "Only {} can be used as primary key\n\n{}", sql_defined_pk_names[0], - hint_upsert(&encode) + hint_upsert(encode) )))); } sql_defined_pk_names @@ -827,13 +827,13 @@ pub(crate) async fn bind_source_pk( return if include_key_column_name.is_none() { Err(RwError::from(ProtocolError(format!( "INCLUDE KEY clause not set\n\n{}", - hint_upsert(&encode) + hint_upsert(encode) )))) } else { Err(RwError::from(ProtocolError(format!( "Primary key must be specified to {}\n\n{}", include_key_column_name.unwrap(), - hint_upsert(&encode) + hint_upsert(encode) )))) }; } @@ -1841,28 +1841,6 @@ pub mod tests { }; assert_eq!(columns, expect_columns); - // test derive include column for format upsert - let sql = "CREATE SOURCE s1 (v1 int) with (connector = 'kafka') format upsert encode json" - .to_string(); - match frontend.run_sql(sql).await { - Err(e) => { - assert_eq!( - e.to_string(), - "Protocol error: INCLUDE KEY clause must be set for FORMAT UPSERT ENCODE Json" - ) - } - _ => unreachable!(), - } - - let sql = "CREATE SOURCE s2 (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format upsert encode json" - .to_string(); - match frontend.run_sql(sql).await { - Err(e) => { - assert_eq!(e.to_string(), "Protocol error: Primary key must be specified to _rw_kafka_key when creating source with FORMAT UPSERT ENCODE Json") - } - _ => unreachable!(), - } - let sql = "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json" .to_string(); From 111dba6422e831017e5c47012ccb0397b73e86c1 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 24 Jun 2024 09:36:38 +0800 Subject: [PATCH 06/10] fix test Signed-off-by: xxchan --- e2e_test/source_inline/kafka/include_key_as.slt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/e2e_test/source_inline/kafka/include_key_as.slt b/e2e_test/source_inline/kafka/include_key_as.slt index f80c2e29a9831..8d95c7edf25d4 100644 --- a/e2e_test/source_inline/kafka/include_key_as.slt +++ b/e2e_test/source_inline/kafka/include_key_as.slt @@ -182,8 +182,9 @@ rpk topic delete 'test_additional_columns' || true; system ok rpk topic create 'test_additional_columns' +# Note: `sh` doesn't have {..} brace expansion system ok -for i in {0..100}; do echo "key\$i:{\\"a\\": \$i}" | rpk topic produce 'test_additional_columns' -f "%k:%v\\n" -H "header1=v1" -H "header2=v2"; done +bash -c 'for i in {0..100}; do echo "key\$i:{\\"a\\": \$i}" | rpk topic produce test_additional_columns -f "%k:%v\\n" -H "header1=v1" -H "header2=v2"; done' statement ok create table additional_columns (a int) From bb39fd0eb7ea3442ada3140566eee65ad262d4cc Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 24 Jun 2024 12:24:55 +0800 Subject: [PATCH 07/10] minor Signed-off-by: xxchan --- Makefile.toml | 2 +- ci/Dockerfile | 4 ++-- ci/build-ci-image.sh | 2 +- ci/docker-compose.yml | 10 +++++----- src/frontend/src/handler/create_source.rs | 4 ++-- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/Makefile.toml b/Makefile.toml index ca7e5ce10fb5f..cdbc49a490ff5 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -1294,7 +1294,7 @@ echo "All processes has exited." [tasks.slt] category = "RiseDev - Test - SQLLogicTest" -install_crate = { min_version = "0.20.1", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ +install_crate = { min_version = "0.20.6", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [ "--help", ], install_command = "binstall" } dependencies = ["check-and-load-risedev-env-file"] diff --git a/ci/Dockerfile b/ci/Dockerfile index 1e98d02fa522d..04e2154a93b17 100644 --- a/ci/Dockerfile +++ b/ci/Dockerfile @@ -70,14 +70,14 @@ ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash RUN cargo binstall -y --no-symlinks cargo-llvm-cov cargo-nextest cargo-hakari cargo-sort cargo-cache cargo-audit \ cargo-make@0.37.9 \ - sqllogictest-bin@0.20.4 \ + sqllogictest-bin@0.20.6 \ sccache@0.7.4 \ && cargo cache -a \ && rm -rf "/root/.cargo/registry/index" \ && rm -rf "/root/.cargo/registry/cache" \ && rm -rf "/root/.cargo/git/db" RUN cargo install cargo-dylint@3.1.0 dylint-link@3.1.0 -RUN cargo uninstall cargo-binstall cargo-cache +RUN cargo uninstall cargo-cache # install risedev COPY < String { - return format!( + format!( r#"Hint: For FORMAT UPSERT ENCODE {encode:}, INCLUDE KEY must be specified and the key column must be used as primary key. example: CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) @@ -759,7 +759,7 @@ example: WITH (...) FORMAT UPSERT ENCODE {encode:} (...) "# - ); + ) } /// Bind column from source. Add key column to table columns if necessary. From 2e02b736d526d21a256d14664764a8f55a33be8c Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 24 Jun 2024 12:43:21 +0800 Subject: [PATCH 08/10] fix Signed-off-by: xxchan --- e2e_test/source_inline/kafka/include_key_as.slt | 2 +- src/frontend/src/handler/create_source.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/e2e_test/source_inline/kafka/include_key_as.slt b/e2e_test/source_inline/kafka/include_key_as.slt index 8d95c7edf25d4..332e15bd54b03 100644 --- a/e2e_test/source_inline/kafka/include_key_as.slt +++ b/e2e_test/source_inline/kafka/include_key_as.slt @@ -102,7 +102,7 @@ FORMAT UPSERT ENCODE JSON db error: ERROR: Failed to run the query Caused by: - Protocol error: Only ID can be used as primary key + Protocol error: Only "rw_key" can be used as primary key Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key. example: diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 94fce63e99404..e790755c7b3c2 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -816,8 +816,8 @@ pub(crate) async fn bind_source_pk( || !key_column_name.eq(sql_defined_pk_names[0].as_str()) { return Err(RwError::from(ProtocolError(format!( - "Only {} can be used as primary key\n\n{}", - sql_defined_pk_names[0], + "Only \"{}\" can be used as primary key\n\n{}", + key_column_name, hint_upsert(encode) )))); } From 641dbb92ec9179166f7d9825e1f4eac13c0f4110 Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 24 Jun 2024 13:33:01 +0800 Subject: [PATCH 09/10] Update src/frontend/src/handler/create_source.rs Co-authored-by: Bugen Zhao --- src/frontend/src/handler/create_source.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index e790755c7b3c2..5e6f1fc500c06 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -751,7 +751,7 @@ pub(crate) fn bind_all_columns( } fn hint_upsert(encode: &Encode) -> String { - format!( + format_args!( r#"Hint: For FORMAT UPSERT ENCODE {encode:}, INCLUDE KEY must be specified and the key column must be used as primary key. example: CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) From d04bed5413896cae5df5b5499976a2e178872f1a Mon Sep 17 00:00:00 2001 From: xxchan Date: Mon, 24 Jun 2024 14:06:46 +0800 Subject: [PATCH 10/10] nit Signed-off-by: xxchan --- src/frontend/src/handler/create_source.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 5e6f1fc500c06..4c5a8abdec005 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -750,8 +750,9 @@ pub(crate) fn bind_all_columns( } } +/// TODO: perhaps put the hint in notice is better. The error message format might be not that reliable. fn hint_upsert(encode: &Encode) -> String { - format_args!( + format!( r#"Hint: For FORMAT UPSERT ENCODE {encode:}, INCLUDE KEY must be specified and the key column must be used as primary key. example: CREATE TABLE ( PRIMARY KEY ([rw_key | ]) ) @@ -824,15 +825,15 @@ pub(crate) async fn bind_source_pk( sql_defined_pk_names } else { // pk not set, or even key not included - return if include_key_column_name.is_none() { + return if let Some(include_key_column_name) = include_key_column_name { Err(RwError::from(ProtocolError(format!( - "INCLUDE KEY clause not set\n\n{}", + "Primary key must be specified to {}\n\n{}", + include_key_column_name, hint_upsert(encode) )))) } else { Err(RwError::from(ProtocolError(format!( - "Primary key must be specified to {}\n\n{}", - include_key_column_name.unwrap(), + "INCLUDE KEY clause not set\n\n{}", hint_upsert(encode) )))) };