enhance error message for FORMAT UPSERT
Signed-off-by: xxchan <[email protected]>
xxchan committed Jun 21, 2024
1 parent 307bf21 commit 828b5fa
Showing 5 changed files with 107 additions and 36 deletions.
1 change: 0 additions & 1 deletion e2e_test/source_inline/kafka/avro/name_strategy.slt
@@ -27,7 +27,6 @@ create source s1 () with (
 system ok
 python3 scripts/source/schema_registry_producer.py "${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}" "${RISEDEV_SCHEMA_REGISTRY_URL}" e2e_test/source_inline/kafka/avro/upsert_avro_json "topic" "avro"
 
-# If we cannot extract key schema, use message key as varchar primary key
 statement ok
 CREATE TABLE t_topic ( primary key (rw_key) )
 INCLUDE KEY AS rw_key
40 changes: 36 additions & 4 deletions e2e_test/source_inline/kafka/include_key_as.slt
@@ -42,7 +42,16 @@ FORMAT UPSERT ENCODE JSON
 db error: ERROR: Failed to run the query
 
 Caused by:
-  Protocol error: INCLUDE KEY clause must be set for FORMAT UPSERT ENCODE Json
+  Bind error: can't CREATE SOURCE with FORMAT UPSERT
+
+Hint: use CREATE TABLE instead
+
+Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key.
+example:
+    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
+    INCLUDE KEY [AS <key_name>]
+    WITH (...)
+    FORMAT UPSERT ENCODE JSON (...)
 
 
 # upsert format must have a pk
@@ -64,7 +73,14 @@ FORMAT UPSERT ENCODE JSON
 db error: ERROR: Failed to run the query
 
 Caused by:
-  Protocol error: Primary key must be specified to rw_key when creating source with FORMAT UPSERT ENCODE Json
+  Protocol error: Primary key must be specified to rw_key
+
+Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key.
+example:
+    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
+    INCLUDE KEY [AS <key_name>]
+    WITH (...)
+    FORMAT UPSERT ENCODE JSON (...)
 
 
 # upsert format pk must be the key column
@@ -86,7 +102,14 @@ FORMAT UPSERT ENCODE JSON
 db error: ERROR: Failed to run the query
 
 Caused by:
-  Protocol error: upsert JSON's key column rw_key not match with sql defined primary key ID
+  Protocol error: Only ID can be used as primary key
+
+Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key.
+example:
+    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
+    INCLUDE KEY [AS <key_name>]
+    WITH (...)
+    FORMAT UPSERT ENCODE JSON (...)
 
 
 statement error
@@ -108,7 +131,16 @@ FORMAT UPSERT ENCODE JSON
 db error: ERROR: Failed to run the query
 
 Caused by:
-  Invalid input syntax: Source does not support PRIMARY KEY constraint, please use "CREATE TABLE" instead
+  Bind error: can't CREATE SOURCE with FORMAT UPSERT
+
+Hint: use CREATE TABLE instead
+
+Hint: For FORMAT UPSERT ENCODE JSON, INCLUDE KEY must be specified and the key column must be used as primary key.
+example:
+    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
+    INCLUDE KEY [AS <key_name>]
+    WITH (...)
+    FORMAT UPSERT ENCODE JSON (...)
 
 
 statement ok
73 changes: 52 additions & 21 deletions src/frontend/src/handler/create_source.rs
@@ -750,6 +750,18 @@ pub(crate) fn bind_all_columns(
     }
 }
 
+fn hint_upsert(encode: &Encode) -> String {
+    return format!(
+        r#"Hint: For FORMAT UPSERT ENCODE {encode:}, INCLUDE KEY must be specified and the key column must be used as primary key.
+example:
+    CREATE TABLE <table_name> ( PRIMARY KEY ([rw_key | <key_name>]) )
+    INCLUDE KEY [AS <key_name>]
+    WITH (...)
+    FORMAT UPSERT ENCODE {encode:} (...)
+"#
+    );
+}
+
 /// Bind column from source. Add key column to table columns if necessary.
 /// Return `pk_names`.
 pub(crate) async fn bind_source_pk(
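Since `hint_upsert` just renders a fixed template around the encode name, its output is easy to pin down. A hypothetical unit test (not part of this commit) sketching the rendered hint; it assumes `Encode::Json` displays as `JSON`, which matches the expected slt output above:

#[cfg(test)]
mod hint_upsert_tests {
    use super::*;

    #[test]
    fn hint_names_encode_and_shows_template() {
        // Assumes `Encode::Json` renders as "JSON" via `Display`,
        // consistent with the slt expectations above.
        let hint = hint_upsert(&Encode::Json);
        assert!(hint.starts_with("Hint: For FORMAT UPSERT ENCODE JSON"));
        assert!(hint.contains("INCLUDE KEY [AS <key_name>]"));
        assert!(hint.contains("FORMAT UPSERT ENCODE JSON (...)"));
    }
}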
@@ -760,7 +772,7 @@ pub(crate) async fn bind_source_pk(
     with_properties: &WithOptions,
 ) -> Result<Vec<String>> {
     let sql_defined_pk = !sql_defined_pk_names.is_empty();
-    let key_column_name: Option<String> = {
+    let include_key_column_name: Option<String> = {
         // iter columns to check if contains additional columns from key part
         // return the key column names if exists
         columns.iter().find_map(|catalog| {
@@ -793,34 +805,36 @@ pub(crate) async fn bind_source_pk(
         // For all Upsert formats, we only accept one and only key column as primary key.
         // Additional KEY columns must be set in this case and must be primary key.
         (Format::Upsert, encode @ Encode::Json | encode @ Encode::Avro) => {
-            if let Some(ref key_column_name) = key_column_name
+            if let Some(ref key_column_name) = include_key_column_name
                 && sql_defined_pk
             {
-                if sql_defined_pk_names.len() != 1 {
-                    return Err(RwError::from(ProtocolError(format!(
-                        "upsert {:?} supports only one primary key column ({}).",
-                        encode, key_column_name
-                    ))));
-                }
+                // pk is set. check if it's valid
+
                 // the column name have been converted to real value in `handle_addition_columns`
                 // so we don't ignore ascii case here
-                if !key_column_name.eq(sql_defined_pk_names[0].as_str()) {
+                if sql_defined_pk_names.len() != 1
+                    || !key_column_name.eq(sql_defined_pk_names[0].as_str())
+                {
                     return Err(RwError::from(ProtocolError(format!(
-                        "upsert {}'s key column {} not match with sql defined primary key {}",
-                        encode, key_column_name, sql_defined_pk_names[0]
+                        "Only {} can be used as primary key\n\n{}",
+                        sql_defined_pk_names[0],
+                        hint_upsert(&encode)
                     ))));
                 }
                 sql_defined_pk_names
             } else {
-                return if key_column_name.is_none() {
+                // pk not set, or even key not included
+                return if include_key_column_name.is_none() {
                     Err(RwError::from(ProtocolError(format!(
-                        "INCLUDE KEY clause must be set for FORMAT UPSERT ENCODE {:?}",
-                        encode
+                        "INCLUDE KEY clause not set\n\n{}",
+                        hint_upsert(&encode)
                     ))))
                 } else {
                     Err(RwError::from(ProtocolError(format!(
-                        "Primary key must be specified to {} when creating source with FORMAT UPSERT ENCODE {:?}",
-                        key_column_name.unwrap(), encode))))
+                        "Primary key must be specified to {}\n\n{}",
+                        include_key_column_name.unwrap(),
+                        hint_upsert(&encode)
+                    ))))
                 };
             }
         }
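Taken together, the rewritten branch distinguishes three failure modes before accepting the pk. A self-contained sketch of that decision logic, using a hypothetical helper with simplified types and the hints omitted (the real checks live in `bind_source_pk` above):

// Illustrative only: mirrors the three error paths above.
fn check_upsert_pk(include_key: Option<&str>, pk_names: &[String]) -> Result<(), String> {
    match include_key {
        // No INCLUDE KEY clause at all.
        None => Err("INCLUDE KEY clause not set".into()),
        // Key included, but no PRIMARY KEY declared in SQL.
        Some(key) if pk_names.is_empty() => {
            Err(format!("Primary key must be specified to {key}"))
        }
        // PRIMARY KEY declared, but not exactly the included key column.
        Some(key) if pk_names.len() != 1 || pk_names[0] != key => {
            Err(format!("Only {} can be used as primary key", pk_names[0]))
        }
        Some(_) => Ok(()),
    }
}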
@@ -1340,8 +1354,9 @@ pub fn bind_connector_props(
     }
     Ok(WithOptions::new(with_properties))
 }
+
 #[allow(clippy::too_many_arguments)]
-pub async fn bind_create_source(
+pub async fn bind_create_source_or_table_with_connector(
     handler_args: HandlerArgs,
     full_name: ObjectName,
     source_schema: ConnectorSchema,
@@ -1364,9 +1379,25 @@ pub async fn bind_create_source_or_table_with_connector(
         session.get_database_and_schema_id_for_create(schema_name.clone())?;
 
     if !is_create_source && with_properties.is_iceberg_connector() {
-        return Err(
-            ErrorCode::BindError("can't create table with iceberg connector".to_string()).into(),
-        );
+        return Err(ErrorCode::BindError(
+            "can't CREATE TABLE with iceberg connector\n\nHint: use CREATE SOURCE instead"
+                .to_string(),
+        )
+        .into());
     }
+    if is_create_source {
+        match source_schema.format {
+            Format::Upsert => {
+                return Err(ErrorCode::BindError(format!(
+                    "can't CREATE SOURCE with FORMAT UPSERT\n\nHint: use CREATE TABLE instead\n\n{}",
+                    hint_upsert(&source_schema.row_encode)
+                ))
+                .into());
+            }
+            _ => {
+                // TODO: enhance error message for other formats
+            }
+        }
+    }
 
     ensure_table_constraints_supported(&constraints)?;
@@ -1522,7 +1553,7 @@ pub async fn handle_create_source(
     }
     let mut col_id_gen = ColumnIdGenerator::new_initial();
 
-    let (source_catalog, database_id, schema_id) = bind_create_source(
+    let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector(
         handler_args.clone(),
         stmt.source_name,
         source_schema,
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_table.rs
@@ -55,8 +55,8 @@ use crate::catalog::{check_valid_column_name, ColumnId, DatabaseId, SchemaId};
 use crate::error::{ErrorCode, Result, RwError};
 use crate::expr::{Expr, ExprImpl, ExprRewriter, InlineNowProcTime};
 use crate::handler::create_source::{
-    bind_columns_from_source, bind_connector_props, bind_create_source, bind_source_watermark,
-    handle_addition_columns, UPSTREAM_SOURCE_KEY,
+    bind_columns_from_source, bind_connector_props, bind_create_source_or_table_with_connector,
+    bind_source_watermark, handle_addition_columns, UPSTREAM_SOURCE_KEY,
 };
 use crate::handler::HandlerArgs;
 use crate::optimizer::plan_node::generic::{CdcScanOptions, SourceNodeKind};
@@ -483,7 +483,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
     let (columns_from_resolve_source, source_info) =
         bind_columns_from_source(session, &source_schema, &with_properties).await?;
 
-    let (source_catalog, database_id, schema_id) = bind_create_source(
+    let (source_catalog, database_id, schema_id) = bind_create_source_or_table_with_connector(
         handler_args.clone(),
         table_name,
         source_schema,
23 changes: 16 additions & 7 deletions src/sqlparser/src/ast/statement.rs
@@ -94,11 +94,16 @@ pub struct CreateSourceStatement {
     pub include_column_options: IncludeOption,
 }
 
+/// FORMAT means how to get the operation(Insert/Delete) from the input.
+///
+/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub enum Format {
+    /// The format is the same with RisingWave's internal representation.
+    /// Used internally for schema change
     Native,
-    // Keyword::NONE
+    /// for self-explanatory sources like iceberg, they have their own format, and should not be specified by user.
     None,
     // Keyword::DEBEZIUM
     Debezium,
@@ -143,15 +148,16 @@ impl Format {
             "CANAL" => Format::Canal,
             "PLAIN" => Format::Plain,
             "UPSERT" => Format::Upsert,
-            "NATIVE" => Format::Native, // used internally for schema change
-            "NONE" => Format::None,     // used by iceberg
+            "NATIVE" => Format::Native,
+            "NONE" => Format::None,
             _ => parser_err!(
                 "expected CANAL | PROTOBUF | DEBEZIUM | MAXWELL | PLAIN | NATIVE | NONE after FORMAT"
             ),
         })
     }
 }
 
+/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed.
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub enum Encode {
@@ -160,8 +166,11 @@ pub enum Encode {
     Protobuf, // Keyword::PROTOBUF
     Json,     // Keyword::JSON
     Bytes,    // Keyword::BYTES
-    None,     // Keyword::None
-    Text,     // Keyword::TEXT
+    /// for self-explanatory sources like iceberg, they have their own format, and should not be specified by user.
+    None,
+    Text, // Keyword::TEXT
+    /// The encode is the same with RisingWave's internal representation.
+    /// Used internally for schema change
     Native,
     Template,
 }
@@ -197,8 +206,8 @@ impl Encode {
             "PROTOBUF" => Encode::Protobuf,
             "JSON" => Encode::Json,
             "TEMPLATE" => Encode::Template,
-            "NATIVE" => Encode::Native, // used internally for schema change
-            "NONE" => Encode::None,     // used by iceberg
+            "NATIVE" => Encode::Native,
+            "NONE" => Encode::None,
             _ => parser_err!(
                 "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | NONE after Encode"
             ),
