diff --git a/e2e_test/source/basic/ddl.slt b/e2e_test/source/basic/ddl.slt index c1941d4697ffa..c6c1c0590d558 100644 --- a/e2e_test/source/basic/ddl.slt +++ b/e2e_test/source/basic/ddl.slt @@ -28,7 +28,7 @@ create source invalid_startup_timestamp ( properties.bootstrap.server = 'message_queue:29092' ) FORMAT PLAIN ENCODE JSON; -statement error db error: ERROR: QueryError: Invalid input syntax: schema definition is required for ENCODE JSON +statement error db error: ERROR: QueryError: Protocol error: Schema definition is required, either from SQL or schema registry. create source invalid_schema_definition with ( connector = 'kafka', diff --git a/e2e_test/source/basic/nosim_kafka.slt b/e2e_test/source/basic/nosim_kafka.slt index 945f60e732fb8..beafb02cb96ba 100644 --- a/e2e_test/source/basic/nosim_kafka.slt +++ b/e2e_test/source/basic/nosim_kafka.slt @@ -30,26 +30,17 @@ WITH ( FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081'); -statement ok -CREATE TABLE upsert_avro_json ( - PRIMARY KEY("ID") -) -WITH ( -connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', -topic = 'upsert_avro_json') -FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081'); +# TODO: Uncomment this when we add test data kafka key with format `"ID":id` +# statement ok +# CREATE TABLE upsert_avro_json ( +# PRIMARY KEY("ID") +# ) +# WITH ( +# connector = 'kafka', +# properties.bootstrap.server = 'message_queue:29092', +# topic = 'upsert_avro_json') +# FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081'); -# Just ignore the kafka key, it works -statement ok -CREATE TABLE upsert_avro_json2 ( - PRIMARY KEY("ID") -) -WITH ( - connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', - topic = 'upsert_avro_json') -FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081'); statement ok CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with ( @@ -89,7 +80,7 @@ statement ok flush; # Wait enough time to ensure SourceExecutor consumes all Kafka data. -sleep 5s +sleep 8s query II SELECT @@ -104,33 +95,18 @@ delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -query II -SELECT - * -FROM - upsert_avro_json -ORDER BY - "ID"; ----- -update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z - - -query II -SELECT - * -FROM - upsert_avro_json2 -ORDER BY - "ID"; ----- -update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z - +# query II +# SELECT +# * +# FROM +# upsert_avro_json +# ORDER BY +# "ID"; +# ---- +# update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z +# delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z +# delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z +# delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z query II SELECT @@ -172,12 +148,8 @@ select * from kafka_json_schema_upsert order by id statement ok DROP TABLE upsert_avro_json_default_key; -statement ok -DROP TABLE upsert_avro_json; - - -statement ok -DROP TABLE upsert_avro_json2; +# statement ok +# DROP TABLE upsert_avro_json; statement ok DROP TABLE upsert_student_avro_json; diff --git a/e2e_test/source/basic/old_row_format_syntax/ddl.slt b/e2e_test/source/basic/old_row_format_syntax/ddl.slt index d0a8cd9ba08ea..6d1290463beeb 100644 --- a/e2e_test/source/basic/old_row_format_syntax/ddl.slt +++ b/e2e_test/source/basic/old_row_format_syntax/ddl.slt @@ -28,7 +28,7 @@ create source invalid_startup_timestamp ( properties.bootstrap.server = 'message_queue:29092' ) ROW FORMAT JSON; -statement error db error: ERROR: QueryError: Invalid input syntax: schema definition is required for ENCODE JSON +statement error db error: ERROR: QueryError: Protocol error: Schema definition is required, either from SQL or schema registry. create source invalid_schema_definition with ( connector = 'kafka', diff --git a/e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt b/e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt index 582aff7d958fb..37e2ef2266ff0 100644 --- a/e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt +++ b/e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt @@ -33,28 +33,17 @@ ROW FORMAT UPSERT_AVRO row schema location confluent schema registry 'http://message_queue:8081' -statement ok -CREATE TABLE upsert_avro_json ( - PRIMARY KEY("ID") -) -WITH ( -connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', -topic = 'upsert_avro_json') -ROW FORMAT UPSERT_AVRO -row schema location confluent schema registry 'http://message_queue:8081' - -# Just ignore the kafka key, it works -statement ok -CREATE TABLE upsert_avro_json2 ( - PRIMARY KEY("ID") -) -WITH ( - connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', - topic = 'upsert_avro_json') -ROW FORMAT UPSERT_AVRO -row schema location confluent schema registry 'http://message_queue:8081' +# TODO: Uncomment this when we add test data kafka key with format `"ID":id` +# statement ok +# CREATE TABLE upsert_avro_json ( +# PRIMARY KEY("ID") +# ) +# WITH ( +# connector = 'kafka', +# properties.bootstrap.server = 'message_queue:29092', +# topic = 'upsert_avro_json') +# ROW FORMAT UPSERT_AVRO +# row schema location confluent schema registry 'http://message_queue:8081' statement ok CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with ( @@ -65,7 +54,6 @@ CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with ( ) ROW FORMAT DEBEZIUM_AVRO ROW SCHEMA LOCATION CONFLUENT SCHEMA REGISTRY 'http://message_queue:8081'; - statement ok CREATE TABLE debezium_compact (PRIMARY KEY(order_id)) with ( connector = 'kafka', @@ -78,7 +66,7 @@ statement ok flush; # Wait enough time to ensure SourceExecutor consumes all Kafka data. -sleep 5s +sleep 10s query II SELECT @@ -93,32 +81,18 @@ delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -query II -SELECT - * -FROM - upsert_avro_json -ORDER BY - "ID"; ----- -update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z - - -query II -SELECT - * -FROM - upsert_avro_json2 -ORDER BY - "ID"; ----- -update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z +# query II +# SELECT +# * +# FROM +# upsert_avro_json +# ORDER BY +# "ID"; +# ---- +# update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z +# delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z +# delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z +# delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z query II @@ -150,12 +124,9 @@ select count(*) from debezium_compact; statement ok DROP TABLE upsert_avro_json_default_key; -statement ok -DROP TABLE upsert_avro_json; - +# statement ok +# DROP TABLE upsert_avro_json; -statement ok -DROP TABLE upsert_avro_json2; statement ok DROP TABLE upsert_student_avro_json; diff --git a/proto/catalog.proto b/proto/catalog.proto index c966b7bbe5eb0..4f421305c4eea 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -55,7 +55,8 @@ message StreamSourceInfo { string proto_message_name = 4; int32 csv_delimiter = 5; bool csv_has_header = 6; - string upsert_avro_primary_key = 7; + reserved 7; + reserved "upsert_avro_primary_key"; // deprecated plan_common.FormatType format = 8; plan_common.EncodeType row_encode = 9; SchemaRegistryNameStrategy name_strategy = 10; diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index e02bcca7fe2ae..61c55decfc040 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -108,7 +108,6 @@ pub struct AvroParserConfig { pub schema: Arc, pub key_schema: Option>, pub schema_resolver: Option>, - pub upsert_primary_key_column_name: Option, } impl AvroParserConfig { @@ -120,12 +119,7 @@ impl AvroParserConfig { if avro_config.use_schema_registry { let client = Client::new(url, &avro_config.client_config)?; let resolver = ConfluentSchemaResolver::new(client); - let upsert_primary_key_column_name = - if enable_upsert && !avro_config.upsert_primary_key.is_empty() { - Some(avro_config.upsert_primary_key.clone()) - } else { - None - }; + let subject_key = if enable_upsert { Some(get_subject_by_strategy( &avro_config.name_strategy, @@ -157,7 +151,6 @@ impl AvroParserConfig { None }, schema_resolver: Some(Arc::new(resolver)), - upsert_primary_key_column_name, }) } else { if enable_upsert { @@ -184,7 +177,6 @@ impl AvroParserConfig { schema: Arc::new(schema), key_schema: None, schema_resolver: None, - upsert_primary_key_column_name: None, }) } } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 7362497bb2dcb..1fff3f2447d73 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -786,7 +786,6 @@ impl SpecificParserConfig { pub struct AvroProperties { pub use_schema_registry: bool, pub row_schema_location: String, - pub upsert_primary_key: String, pub client_config: SchemaRegistryAuth, pub aws_auth_props: Option, pub topic: String, @@ -887,7 +886,6 @@ impl SpecificParserConfig { .unwrap(), use_schema_registry: info.use_schema_registry, row_schema_location: info.row_schema_location.clone(), - upsert_primary_key: info.upsert_avro_primary_key.clone(), ..Default::default() }; if format == SourceFormat::Upsert { diff --git a/src/connector/src/parser/upsert_parser.rs b/src/connector/src/parser/upsert_parser.rs index 214775851103b..f9ce0caa7e254 100644 --- a/src/connector/src/parser/upsert_parser.rs +++ b/src/connector/src/parser/upsert_parser.rs @@ -34,7 +34,6 @@ pub struct UpsertParser { payload_builder: AccessBuilderImpl, pub(crate) rw_columns: Vec, source_ctx: SourceContextRef, - avro_primary_key_column_name: Option, } async fn build_accessor_builder( @@ -68,23 +67,18 @@ impl UpsertParser { rw_columns: Vec, source_ctx: SourceContextRef, ) -> Result { - let mut avro_primary_key_column_name = None; - let key_builder: AccessBuilderImpl; // check whether columns has `DEFAULT_KEY_COLUMN_NAME`, if so, the key accessor should be // bytes - if check_rw_default_key(&rw_columns) { - key_builder = AccessBuilderImpl::Bytes(BytesAccessBuilder::new( - EncodingProperties::Bytes(BytesProperties { + let key_builder = if check_rw_default_key(&rw_columns) { + AccessBuilderImpl::Bytes(BytesAccessBuilder::new(EncodingProperties::Bytes( + BytesProperties { column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()), - }), - )?); + }, + ))?) } else { - if let EncodingProperties::Avro(config) = &props.encoding_config { - avro_primary_key_column_name = Some(config.upsert_primary_key.clone()) - } let (key_config, key_type) = extract_key_config!(props); - key_builder = build_accessor_builder(key_config, key_type).await?; - } + build_accessor_builder(key_config, key_type).await? + }; let payload_builder = build_accessor_builder(props.encoding_config, EncodingType::Value).await?; Ok(Self { @@ -92,7 +86,6 @@ impl UpsertParser { payload_builder, rw_columns, source_ctx, - avro_primary_key_column_name, }) } @@ -113,9 +106,6 @@ impl UpsertParser { row_op = row_op.with_value(self.payload_builder.generate_accessor(data).await?); change_event_op = ChangeEventOperation::Upsert; } - if let Some(primary_key_name) = &self.avro_primary_key_column_name { - row_op = row_op.with_key_as_column_name(primary_key_name); - } apply_row_operation_on_stream_chunk_writer_with_op(row_op, &mut writer, change_event_op) .map_err(Into::into) diff --git a/src/frontend/planner_test/tests/testdata/output/create_source.yaml b/src/frontend/planner_test/tests/testdata/output/create_source.yaml index d1d6c314595b8..50178ccb1c6e5 100644 --- a/src/frontend/planner_test/tests/testdata/output/create_source.yaml +++ b/src/frontend/planner_test/tests/testdata/output/create_source.yaml @@ -10,7 +10,7 @@ - id: create_source_without_schema_in_json sql: | create source s with(connector='kafka') FORMAT PLAIN ENCODE JSON; - planner_error: 'Invalid input syntax: schema definition is required for ENCODE JSON' + planner_error: 'Protocol error: Schema definition is required, either from SQL or schema registry.' - id: csv_delimiter_tab sql: | explain create table s0 (v1 int, v2 varchar) with ( diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 85d867d538e7f..e4081fbee4fcf 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -226,7 +226,7 @@ pub async fn handle_create_sink( } /// Transforms the (format, encode, options) from sqlparser AST into an internal struct `SinkFormatDesc`. -/// This is an analogy to (part of) [`crate::handler::create_source::try_bind_columns_from_source`] +/// This is an analogy to (part of) [`crate::handler::create_source::bind_columns_from_source`] /// which transforms sqlparser AST `SourceSchemaV2` into `StreamSourceInfo`. fn bind_sink_format_desc(value: ConnectorSchema) -> Result { use risingwave_connector::sink::catalog::{SinkEncode, SinkFormat}; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 0ce3e32ed584e..8bca367351641 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -45,8 +45,8 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::plan_common::{EncodeType, FormatType}; use risingwave_sqlparser::ast::{ - self, get_delimiter, AstString, AvroSchema, ColumnDef, ColumnOption, ConnectorSchema, - CreateSourceStatement, DebeziumAvroSchema, Encode, Format, ProtobufSchema, SourceWatermark, + get_delimiter, AstString, AvroSchema, ColumnDef, ConnectorSchema, CreateSourceStatement, + DebeziumAvroSchema, Encode, Format, ProtobufSchema, SourceWatermark, }; use super::RwPgResponse; @@ -54,7 +54,7 @@ use crate::binder::Binder; use crate::catalog::ColumnId; use crate::expr::Expr; use crate::handler::create_table::{ - bind_pk_names, bind_pk_on_relation, bind_sql_column_constraints, bind_sql_columns, + bind_pk_on_relation, bind_sql_column_constraints, bind_sql_columns, bind_sql_pk_names, ensure_table_constraints_supported, ColumnIdGenerator, }; use crate::handler::util::{get_connector, is_kafka_connector}; @@ -110,45 +110,35 @@ async fn extract_avro_table_schema( .collect_vec()) } -/// Map an Avro schema to a relational schema. And extract primary key columns. -async fn extract_upsert_avro_table_schema( +/// Extract Avro primary key columns. +async fn extract_upsert_avro_table_pk_columns( info: &StreamSourceInfo, with_properties: &HashMap, -) -> Result<(Vec, Vec)> { +) -> Result>> { let parser_config = SpecificParserConfig::new(info, with_properties)?; let conf = AvroParserConfig::new(parser_config.encoding_config).await?; let vec_column_desc = conf.map_to_columns()?; - let mut vec_column_catalog = vec_column_desc - .clone() - .into_iter() - .map(|col| ColumnCatalog { - column_desc: col.into(), - is_hidden: false, - }) - .collect_vec(); - // For upsert avro, if we can't extract pk from schema, use message key as primary key - let pks = if let Ok(pk_desc) = conf.extract_pks() { - pk_desc - .into_iter() - .map(|desc| { - vec_column_desc - .iter() - .find(|x| x.name == desc.name) - .ok_or_else(|| { - RwError::from(ErrorCode::InternalError(format!( - "Can not found primary key column {} in value schema", - desc.name - ))) - }) - }) - .map_ok(|desc| desc.name.clone()) - .collect::>>()? - } else { - add_upsert_default_key_column(&mut vec_column_catalog); - vec![DEFAULT_KEY_COLUMN_NAME.into()] - }; - Ok((vec_column_catalog, pks)) + conf.extract_pks() + .ok() + .map(|pk_desc| { + pk_desc + .into_iter() + .map(|desc| { + vec_column_desc + .iter() + .find(|x| x.name == desc.name) + .ok_or_else(|| { + RwError::from(ErrorCode::InternalError(format!( + "Can not found primary key column {} in value schema", + desc.name + ))) + }) + }) + .map_ok(|desc| desc.name.clone()) + .collect::>>() + }) + .transpose() } async fn extract_debezium_avro_table_pk_columns( @@ -208,11 +198,7 @@ async fn extract_protobuf_table_schema( fn non_generated_sql_columns(columns: &[ColumnDef]) -> Vec { columns .iter() - .filter(|c| { - c.options - .iter() - .all(|option| !matches!(option.option, ColumnOption::GeneratedColumns(_))) - }) + .filter(|c| !c.is_generated()) .cloned() .collect() } @@ -275,19 +261,15 @@ fn get_name_strategy_or_default(name_strategy: Option) -> Result for more information. -/// return `(columns, pk_names, source info)` -pub(crate) async fn try_bind_columns_from_source( +/// return `(columns, source info)` +pub(crate) async fn bind_columns_from_source( source_schema: &ConnectorSchema, - sql_defined_pk_names: Vec, - sql_defined_columns: &[ColumnDef], with_properties: &HashMap, -) -> Result<(Option>, Vec, StreamSourceInfo)> { +) -> Result<(Option>, StreamSourceInfo)> { const MESSAGE_NAME_KEY: &str = "message"; const KEY_MESSAGE_NAME_KEY: &str = "key.message"; const NAME_STRATEGY_KEY: &str = "schema.registry.name.strategy"; - let sql_defined_pk = !sql_defined_pk_names.is_empty(); - let sql_defined_schema = !sql_defined_columns.is_empty(); let is_kafka: bool = is_kafka_connector(with_properties); let mut options = WithOptions::try_from(source_schema.row_options())?.into_inner(); @@ -314,7 +296,6 @@ pub(crate) async fn try_bind_columns_from_source( let res = match (&source_schema.format, &source_schema.row_encode) { (Format::Native, Encode::Native) => ( None, - sql_defined_pk_names, StreamSourceInfo { format: FormatType::Native as i32, row_encode: EncodeType::Native as i32, @@ -322,10 +303,6 @@ pub(crate) async fn try_bind_columns_from_source( }, ), (Format::Plain, Encode::Protobuf) => { - if sql_defined_schema { - return Err(RwError::from(ProtocolError( - "User-defined schema is not allowed with FORMAT PLAIN ENCODE PROTOBUF. Please refer to https://www.risingwave.dev/docs/current/sql-create-source/#protobuf for more information.".to_string()))); - }; let (row_schema_location, use_schema_registry) = get_schema_location(&mut options)?; let protobuf_schema = ProtobufSchema { message_name: consume_string_from_options(&mut options, MESSAGE_NAME_KEY)?, @@ -340,7 +317,6 @@ pub(crate) async fn try_bind_columns_from_source( extract_protobuf_table_schema(&protobuf_schema, with_properties.clone()) .await?, ), - sql_defined_pk_names, StreamSourceInfo { format: FormatType::Plain as i32, row_encode: EncodeType::Protobuf as i32, @@ -357,19 +333,8 @@ pub(crate) async fn try_bind_columns_from_source( } (Format::Plain, Encode::Json) => { let schema_config = get_json_schema_location(&mut options)?; - if schema_config.is_some() && sql_defined_schema { - return Err(RwError::from(ProtocolError( - "User-defined schema is not allowed with schema registry.".to_string(), - ))); - } - if schema_config.is_none() && sql_defined_columns.is_empty() { - return Err(RwError::from(InvalidInputSyntax( - "schema definition is required for ENCODE JSON".to_owned(), - ))); - } ( extract_json_table_schema(&schema_config, with_properties).await?, - sql_defined_pk_names, StreamSourceInfo { format: FormatType::Plain as i32, row_encode: EncodeType::Json as i32, @@ -384,10 +349,7 @@ pub(crate) async fn try_bind_columns_from_source( row_schema_location, use_schema_registry, }; - if sql_defined_schema { - return Err(RwError::from(ProtocolError( - "User-defined schema is not allowed with FORMAT PLAIN ENCODE AVRO. Please refer to https://www.risingwave.dev/docs/current/sql-create-source/#avro for more information.".to_string()))); - } + let key_message_name = get_key_message_name(&mut options); let message_name = try_consume_string_from_options(&mut options, MESSAGE_NAME_KEY); let name_strategy = @@ -405,7 +367,6 @@ pub(crate) async fn try_bind_columns_from_source( }; ( Some(extract_avro_table_schema(&stream_source_info, with_properties).await?), - sql_defined_pk_names, stream_source_info, ) } @@ -425,7 +386,6 @@ pub(crate) async fn try_bind_columns_from_source( } ( None, - sql_defined_pk_names, StreamSourceInfo { format: FormatType::Plain as i32, row_encode: EncodeType::Csv as i32, @@ -435,48 +395,20 @@ pub(crate) async fn try_bind_columns_from_source( }, ) } - (Format::Plain, Encode::Bytes) => { - if !sql_defined_schema || sql_defined_columns.len() != 1 { - return Err(RwError::from(ProtocolError( - "BYTES format only accepts one column".to_string(), - ))); - } - - match sql_defined_columns[0].data_type { - Some(ast::DataType::Bytea) => {} - _ => { - return Err(RwError::from(ProtocolError( - "BYTES format only accepts BYTEA type".to_string(), - ))) - } - } - - ( - None, - sql_defined_pk_names, - StreamSourceInfo { - format: FormatType::Plain as i32, - row_encode: EncodeType::Bytes as i32, - ..Default::default() - }, - ) - } + (Format::Plain, Encode::Bytes) => ( + None, + StreamSourceInfo { + format: FormatType::Plain as i32, + row_encode: EncodeType::Bytes as i32, + ..Default::default() + }, + ), (Format::Upsert, Encode::Json) => { let schema_config = get_json_schema_location(&mut options)?; let columns = extract_json_table_schema(&schema_config, with_properties).await?; - let (columns, pk_names) = if !sql_defined_pk { - let mut columns = match columns { - None => bind_sql_columns(sql_defined_columns)?, - Some(columns) => columns, - }; - add_upsert_default_key_column(&mut columns); - (Some(columns), vec![DEFAULT_KEY_COLUMN_NAME.into()]) - } else { - (columns, sql_defined_pk_names) - }; + ( columns, - pk_names, StreamSourceInfo { format: FormatType::Upsert as i32, row_encode: EncodeType::Json as i32, @@ -491,10 +423,6 @@ pub(crate) async fn try_bind_columns_from_source( row_schema_location, use_schema_registry, }; - if sql_defined_schema { - return Err(RwError::from(ProtocolError( - "User-defined schema is not allowed with row format upsert avro. Please refer to https://www.risingwave.dev/docs/current/sql-create-source/#avro for more information.".to_string()))); - } let name_strategy = get_sr_name_strategy_check(&mut options, avro_schema.use_schema_registry)? @@ -502,57 +430,25 @@ pub(crate) async fn try_bind_columns_from_source( let key_message_name = get_key_message_name(&mut options); let message_name = try_consume_string_from_options(&mut options, MESSAGE_NAME_KEY); - if sql_defined_pk { - if sql_defined_pk_names.len() != 1 { - return Err(RwError::from(ProtocolError( - "upsert avro supports only one primary key column.".to_string(), - ))); - } - let upsert_avro_primary_key = sql_defined_pk_names[0].clone(); - - let stream_source_info = StreamSourceInfo { - key_message_name, - format: FormatType::Upsert as i32, - row_encode: EncodeType::Avro as i32, - row_schema_location: avro_schema.row_schema_location.0.clone(), - use_schema_registry: avro_schema.use_schema_registry, - proto_message_name: message_name.unwrap_or(AstString("".into())).0, - upsert_avro_primary_key, - name_strategy, - ..Default::default() - }; - let columns = - extract_avro_table_schema(&stream_source_info, with_properties).await?; + let stream_source_info = StreamSourceInfo { + key_message_name, + format: FormatType::Upsert as i32, + row_encode: EncodeType::Avro as i32, + row_schema_location: avro_schema.row_schema_location.0.clone(), + use_schema_registry: avro_schema.use_schema_registry, + proto_message_name: message_name.unwrap_or(AstString("".into())).0, + name_strategy, + ..Default::default() + }; + let columns = extract_avro_table_schema(&stream_source_info, with_properties).await?; - (Some(columns), sql_defined_pk_names, stream_source_info) - } else { - let stream_source_info = StreamSourceInfo { - format: FormatType::Upsert as i32, - row_encode: EncodeType::Avro as i32, - row_schema_location: avro_schema.row_schema_location.0.clone(), - use_schema_registry: avro_schema.use_schema_registry, - proto_message_name: message_name.unwrap_or(AstString("".into())).0, - name_strategy, - key_message_name, - ..Default::default() - }; - let (columns, pk_from_avro) = - extract_upsert_avro_table_schema(&stream_source_info, with_properties).await?; - (Some(columns), pk_from_avro, stream_source_info) - } + (Some(columns), stream_source_info) } (Format::Debezium, Encode::Json) => { - if !sql_defined_pk { - return Err(RwError::from(ProtocolError( - "Primary key must be specified when creating source with format debezium." - .to_string(), - ))); - } let schema_config = get_json_schema_location(&mut options)?; ( extract_json_table_schema(&schema_config, with_properties).await?, - sql_defined_pk_names, StreamSourceInfo { format: FormatType::Debezium as i32, row_encode: EncodeType::Json as i32, @@ -571,11 +467,6 @@ pub(crate) async fn try_bind_columns_from_source( let avro_schema = DebeziumAvroSchema { row_schema_location, }; - if sql_defined_schema { - return Err(RwError::from(ProtocolError( - "User-defined schema is not allowed with row format debezium avro.".to_string(), - ))); - } // no need to check whether works schema registry because debezium avro always work with // schema registry @@ -597,121 +488,22 @@ pub(crate) async fn try_bind_columns_from_source( let full_columns = extract_debezium_avro_table_schema(&stream_source_info, with_properties).await?; - let pk_names = if sql_defined_pk { - sql_defined_pk_names - } else { - let pk_names = - extract_debezium_avro_table_pk_columns(&stream_source_info, with_properties) - .await?; - // extract pk(s) from schema registry - for pk_name in &pk_names { - full_columns - .iter() - .find(|c: &&ColumnCatalog| c.name().eq(pk_name)) - .ok_or_else(|| { - RwError::from(ProtocolError(format!( - "avro's key column {} not exists in avro's row schema", - pk_name - ))) - })?; - } - pk_names - }; - (Some(full_columns), pk_names, stream_source_info) - } - (Format::DebeziumMongo, Encode::Json) => { - let mut columns = vec![ - ColumnCatalog { - column_desc: ColumnDesc { - data_type: DataType::Varchar, - column_id: 0.into(), - name: "_id".to_string(), - field_descs: vec![], - type_name: "".to_string(), - generated_or_default_column: None, - description: None, - }, - is_hidden: false, - }, - ColumnCatalog { - column_desc: ColumnDesc { - data_type: DataType::Jsonb, - column_id: 0.into(), - name: "payload".to_string(), - field_descs: vec![], - type_name: "".to_string(), - generated_or_default_column: None, - description: None, - }, - is_hidden: false, - }, - ]; - if sql_defined_schema { - let non_generated_sql_defined_columns = - non_generated_sql_columns(sql_defined_columns); - if non_generated_sql_defined_columns.len() != 2 - && non_generated_sql_defined_columns[0].name.real_value() != columns[0].name() - && non_generated_sql_defined_columns[1].name.real_value() != columns[1].name() - { - return Err(RwError::from(ProtocolError( - "the not generated columns of the source with row format DebeziumMongoJson - must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)." - .to_string(), - ))); - } - if let Some(key_data_type) = &non_generated_sql_defined_columns[0].data_type { - let key_data_type = bind_data_type(key_data_type)?; - match key_data_type { - DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => { - columns[0].column_desc.data_type = key_data_type; - } - _ => { - return Err(RwError::from(ProtocolError( - "the `_id` column of the source with row format DebeziumMongoJson - must be [Jsonb | Varchar | Int32 | Int64]" - .to_string(), - ))); - } - } - } - if let Some(value_data_type) = &non_generated_sql_defined_columns[1].data_type { - if !matches!(bind_data_type(value_data_type)?, DataType::Jsonb) { - return Err(RwError::from(ProtocolError( - "the `payload` column of the source with row format DebeziumMongoJson - must be Jsonb datatype" - .to_string(), - ))); - } - } - } - let pk_names = if sql_defined_pk { - sql_defined_pk_names - } else { - vec!["_id".to_string()] - }; - ( - Some(columns), - pk_names, - StreamSourceInfo { - format: FormatType::DebeziumMongo as i32, - row_encode: EncodeType::Json as i32, - ..Default::default() - }, - ) + (Some(full_columns), stream_source_info) } + (Format::DebeziumMongo, Encode::Json) => ( + None, + StreamSourceInfo { + format: FormatType::DebeziumMongo as i32, + row_encode: EncodeType::Json as i32, + ..Default::default() + }, + ), (Format::Maxwell, Encode::Json) => { - if !sql_defined_pk { - return Err(RwError::from(ProtocolError( - "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON." - .to_string(), - ))); - } let schema_config = get_json_schema_location(&mut options)?; ( extract_json_table_schema(&schema_config, with_properties).await?, - sql_defined_pk_names, StreamSourceInfo { format: FormatType::Maxwell as i32, row_encode: EncodeType::Json as i32, @@ -722,16 +514,9 @@ pub(crate) async fn try_bind_columns_from_source( } (Format::Canal, Encode::Json) => { - if !sql_defined_pk { - return Err(RwError::from(ProtocolError( - "Primary key must be specified when creating source with row format cannal_json." - .to_string(), - ))); - } let schema_config = get_json_schema_location(&mut options)?; ( extract_json_table_schema(&schema_config, with_properties).await?, - sql_defined_pk_names, StreamSourceInfo { format: FormatType::Canal as i32, row_encode: EncodeType::Json as i32, @@ -762,6 +547,225 @@ pub(crate) async fn try_bind_columns_from_source( Ok(res) } +/// Bind columns from both source and sql defined. +pub(crate) fn bind_all_columns( + source_schema: &ConnectorSchema, + cols_from_source: Option>, + cols_from_sql: Vec, + col_defs_from_sql: &[ColumnDef], +) -> Result> { + if let Some(cols_from_source) = cols_from_source { + if cols_from_sql.is_empty() { + Ok(cols_from_source) + } else { + // TODO(yuhao): https://github.com/risingwavelabs/risingwave/issues/12209 + Err(RwError::from(ProtocolError( + format!("User-defined schema from SQL is not allowed with FORMAT {} ENCODE {}. \ + Please refer to https://www.risingwave.dev/docs/current/sql-create-source/ for more information.", source_schema.format, source_schema.row_encode)))) + } + } else { + // FIXME(yuhao): cols_from_sql should be None is no `()` is given. + if cols_from_sql.is_empty() { + return Err(RwError::from(ProtocolError( + "Schema definition is required, either from SQL or schema registry.".to_string(), + ))); + } + match (&source_schema.format, &source_schema.row_encode) { + (Format::DebeziumMongo, Encode::Json) => { + let mut columns = vec![ + ColumnCatalog { + column_desc: ColumnDesc { + data_type: DataType::Varchar, + column_id: 0.into(), + name: "_id".to_string(), + field_descs: vec![], + type_name: "".to_string(), + generated_or_default_column: None, + description: None, + }, + is_hidden: false, + }, + ColumnCatalog { + column_desc: ColumnDesc { + data_type: DataType::Jsonb, + column_id: 0.into(), + name: "payload".to_string(), + field_descs: vec![], + type_name: "".to_string(), + generated_or_default_column: None, + description: None, + }, + is_hidden: false, + }, + ]; + let non_generated_sql_defined_columns = + non_generated_sql_columns(col_defs_from_sql); + if non_generated_sql_defined_columns.len() != 2 + || non_generated_sql_defined_columns[0].name.real_value() != columns[0].name() + || non_generated_sql_defined_columns[1].name.real_value() != columns[1].name() + { + return Err(RwError::from(ProtocolError( + "the not generated columns of the source with row format DebeziumMongoJson + must be (_id [Jsonb | Varchar | Int32 | Int64], payload jsonb)." + .to_string(), + ))); + } + // ok to unwrap since it was checked at `bind_sql_columns` + let key_data_type = bind_data_type( + non_generated_sql_defined_columns[0] + .data_type + .as_ref() + .unwrap(), + )?; + match key_data_type { + DataType::Jsonb | DataType::Varchar | DataType::Int32 | DataType::Int64 => { + columns[0].column_desc.data_type = key_data_type.clone(); + } + _ => { + return Err(RwError::from(ProtocolError( + "the `_id` column of the source with row format DebeziumMongoJson + must be [Jsonb | Varchar | Int32 | Int64]" + .to_string(), + ))); + } + } + + // ok to unwrap since it was checked at `bind_sql_columns` + let value_data_type = bind_data_type( + non_generated_sql_defined_columns[1] + .data_type + .as_ref() + .unwrap(), + )?; + if !matches!(value_data_type, DataType::Jsonb) { + return Err(RwError::from(ProtocolError( + "the `payload` column of the source with row format DebeziumMongoJson + must be Jsonb datatype" + .to_string(), + ))); + } + Ok(columns) + } + (Format::Plain, Encode::Bytes) => { + if cols_from_sql.len() != 1 || cols_from_sql[0].data_type() != &DataType::Bytea { + return Err(RwError::from(ProtocolError( + "ENCODE BYTES only accepts one BYTEA type column".to_string(), + ))); + } + Ok(cols_from_sql) + } + (_, _) => Ok(cols_from_sql), + } + } +} + +/// Bind column from source. Add key column to table columns if necessary. +/// Return (columns, pks) +pub(crate) async fn bind_source_pk( + source_schema: &ConnectorSchema, + source_info: &StreamSourceInfo, + columns: &mut Vec, + sql_defined_pk_names: Vec, + with_properties: &HashMap, +) -> Result> { + let sql_defined_pk = !sql_defined_pk_names.is_empty(); + + let res = match (&source_schema.format, &source_schema.row_encode) { + (Format::Native, Encode::Native) | (Format::Plain, _) => sql_defined_pk_names, + (Format::Upsert, Encode::Json) => { + if sql_defined_pk { + sql_defined_pk_names + } else { + add_upsert_default_key_column(columns); + vec![DEFAULT_KEY_COLUMN_NAME.into()] + } + } + (Format::Upsert, Encode::Avro) => { + if sql_defined_pk { + if sql_defined_pk_names.len() != 1 { + return Err(RwError::from(ProtocolError( + "upsert avro supports only one primary key column.".to_string(), + ))); + } + sql_defined_pk_names + } else if let Some(extracted_pk_names) = + extract_upsert_avro_table_pk_columns(source_info, with_properties).await? + { + extracted_pk_names + } else { + // For upsert avro, if we can't extract pk from schema, use message key as primary key + add_upsert_default_key_column(columns); + vec![DEFAULT_KEY_COLUMN_NAME.into()] + } + } + + (Format::Debezium, Encode::Json) => { + if !sql_defined_pk { + return Err(RwError::from(ProtocolError( + "Primary key must be specified when creating source with FORMAT DEBEZIUM." + .to_string(), + ))); + } + sql_defined_pk_names + } + (Format::Debezium, Encode::Avro) => { + if sql_defined_pk { + sql_defined_pk_names + } else { + let pk_names = + extract_debezium_avro_table_pk_columns(source_info, with_properties).await?; + // extract pk(s) from schema registry + for pk_name in &pk_names { + columns + .iter() + .find(|c: &&ColumnCatalog| c.name().eq(pk_name)) + .ok_or_else(|| { + RwError::from(ProtocolError(format!( + "avro's key column {} not exists in avro's row schema", + pk_name + ))) + })?; + } + pk_names + } + } + (Format::DebeziumMongo, Encode::Json) => { + if sql_defined_pk { + sql_defined_pk_names + } else { + vec!["_id".to_string()] + } + } + + (Format::Maxwell, Encode::Json) => { + if !sql_defined_pk { + return Err(RwError::from(ProtocolError( + "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON." + .to_string(), + ))); + } + sql_defined_pk_names + } + + (Format::Canal, Encode::Json) => { + if !sql_defined_pk { + return Err(RwError::from(ProtocolError( + "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON." + .to_string(), + ))); + } + sql_defined_pk_names + } + (format, encoding) => { + return Err(RwError::from(ProtocolError(format!( + "Unknown combination {:?} {:?}", + format, encoding + )))); + } + }; + Ok(res) +} + // Add a hidden column `_rw_kafka_timestamp` to each message from Kafka source. fn check_and_add_timestamp_column( with_properties: &HashMap, @@ -1070,14 +1074,26 @@ pub async fn handle_create_source( validate_compatibility(&source_schema, &mut with_properties)?; ensure_table_constraints_supported(&stmt.constraints)?; - let pk_names = bind_pk_names(&stmt.columns, &stmt.constraints)?; + let sql_pk_names = bind_sql_pk_names(&stmt.columns, &stmt.constraints)?; - let (columns_from_resolve_source, pk_names, source_info) = - try_bind_columns_from_source(&source_schema, pk_names, &stmt.columns, &with_properties) - .await?; + let (columns_from_resolve_source, source_info) = + bind_columns_from_source(&source_schema, &with_properties).await?; let columns_from_sql = bind_sql_columns(&stmt.columns)?; - let mut columns = columns_from_resolve_source.unwrap_or(columns_from_sql); + let mut columns = bind_all_columns( + &source_schema, + columns_from_resolve_source, + columns_from_sql, + &stmt.columns, + )?; + let pk_names = bind_source_pk( + &source_schema, + &source_info, + &mut columns, + sql_pk_names, + &with_properties, + ) + .await?; check_and_add_timestamp_column(&with_properties, &mut columns); diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index e412658cb712f..efa4278bcbc99 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -43,8 +43,8 @@ use crate::catalog::table_catalog::TableVersion; use crate::catalog::{check_valid_column_name, CatalogError, ColumnId}; use crate::expr::{Expr, ExprImpl, ExprRewriter, InlineNowProcTime}; use crate::handler::create_source::{ - bind_source_watermark, check_source_schema, try_bind_columns_from_source, - validate_compatibility, UPSTREAM_SOURCE_KEY, + bind_all_columns, bind_columns_from_source, bind_source_pk, bind_source_watermark, + check_source_schema, validate_compatibility, UPSTREAM_SOURCE_KEY, }; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::LogicalSource; @@ -346,7 +346,7 @@ pub fn ensure_table_constraints_supported(table_constraints: &[TableConstraint]) Ok(()) } -pub fn bind_pk_names( +pub fn bind_sql_pk_names( columns_defs: &[ColumnDef], table_constraints: &[TableConstraint], ) -> Result> { @@ -457,13 +457,27 @@ pub(crate) async fn gen_create_table_plan_with_source( validate_compatibility(&source_schema, &mut properties)?; ensure_table_constraints_supported(&constraints)?; - let pk_names = bind_pk_names(&column_defs, &constraints)?; - let (columns_from_resolve_source, pk_names, mut source_info) = - try_bind_columns_from_source(&source_schema, pk_names, &column_defs, &properties).await?; + let sql_pk_names = bind_sql_pk_names(&column_defs, &constraints)?; + + let (columns_from_resolve_source, mut source_info) = + bind_columns_from_source(&source_schema, &properties).await?; let columns_from_sql = bind_sql_columns(&column_defs)?; - let mut columns = columns_from_resolve_source.unwrap_or(columns_from_sql); + let mut columns = bind_all_columns( + &source_schema, + columns_from_resolve_source, + columns_from_sql, + &column_defs, + )?; + let pk_names = bind_source_pk( + &source_schema, + &source_info, + &mut columns, + sql_pk_names, + &properties, + ) + .await?; for c in &mut columns { c.column_desc.column_id = col_id_gen.generate(c.name()) @@ -605,7 +619,7 @@ pub(crate) fn gen_create_table_plan_without_bind( version: Option, ) -> Result<(PlanRef, Option, PbTable)> { ensure_table_constraints_supported(&constraints)?; - let pk_names = bind_pk_names(&column_defs, &constraints)?; + let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; let (mut columns, pk_column_ids, row_id_index) = bind_pk_on_relation(columns, pk_names)?; let watermark_descs = bind_source_watermark( @@ -976,7 +990,7 @@ mod tests { c.column_desc.column_id = col_id_gen.generate(c.name()) } ensure_table_constraints_supported(&constraints)?; - let pk_names = bind_pk_names(&column_defs, &constraints)?; + let pk_names = bind_sql_pk_names(&column_defs, &constraints)?; let (_, pk_column_ids, _) = bind_pk_on_relation(columns, pk_names)?; Ok(pk_column_ids) })(); diff --git a/src/sqlparser/src/ast/ddl.rs b/src/sqlparser/src/ast/ddl.rs index 8cdc4ac2aa865..ae497d777c4d0 100644 --- a/src/sqlparser/src/ast/ddl.rs +++ b/src/sqlparser/src/ast/ddl.rs @@ -363,6 +363,12 @@ impl ColumnDef { options, } } + + pub fn is_generated(&self) -> bool { + self.options + .iter() + .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_))) + } } impl fmt::Display for ColumnDef {