diff --git a/e2e_test/sink/kafka/avro.slt b/e2e_test/sink/kafka/avro.slt index a9040c93f793f..45ecf302f0ddd 100644 --- a/e2e_test/sink/kafka/avro.slt +++ b/e2e_test/sink/kafka/avro.slt @@ -1,5 +1,7 @@ statement ok -create table from_kafka with ( +create table from_kafka ( primary key (some_key) ) +include key as some_key +with ( connector = 'kafka', topic = 'test-rw-sink-upsert-avro', properties.bootstrap.server = 'message_queue:29092') diff --git a/e2e_test/source/basic/inlcude_key_as.slt b/e2e_test/source/basic/inlcude_key_as.slt new file mode 100644 index 0000000000000..d7780d4916376 --- /dev/null +++ b/e2e_test/source/basic/inlcude_key_as.slt @@ -0,0 +1,63 @@ +# upsert format must have a pk +statement error +CREATE TABLE upsert_students_default_key ( + "ID" INT, + "firstName" VARCHAR, + "lastName" VARCHAR, + age INT, + height REAL, + weight REAL +) +INCLUDE KEY AS rw_key +WITH ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'upsert_json') +FORMAT UPSERT ENCODE JSON + +# upsert format pk must be the key column +statement error +CREATE TABLE upsert_students_default_key ( + "ID" INT primary key, + "firstName" VARCHAR, + "lastName" VARCHAR, + age INT, + height REAL, + weight REAL +) +INCLUDE KEY AS rw_key +WITH ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'upsert_json') +FORMAT UPSERT ENCODE JSON + +statement ok +CREATE TABLE upsert_students_default_key ( + "ID" INT, + "firstName" VARCHAR, + "lastName" VARCHAR, + age INT, + height REAL, + weight REAL, +) +INCLUDE KEY AS rw_key +WITH ( + connector = 'kafka', + properties.bootstrap.server = 'message_queue:29092', + topic = 'upsert_json') +FORMAT PLAIN ENCODE JSON + +statement ok +select * from upsert_students_default_key; + +# Wait enough time to ensure SourceExecutor consumes all Kafka data. 
+sleep 3s + +query I +select count(rw_key) from upsert_students_default_key +---- +15 + +statement ok +drop table upsert_students_default_key diff --git a/e2e_test/source/basic/kafka.slt b/e2e_test/source/basic/kafka.slt index b4782d36ced80..ab7616084db5f 100644 --- a/e2e_test/source/basic/kafka.slt +++ b/e2e_test/source/basic/kafka.slt @@ -415,8 +415,10 @@ CREATE TABLE upsert_students_default_key ( "lastName" VARCHAR, age INT, height REAL, - weight REAL + weight REAL, + primary key (rw_key) ) +INCLUDE KEY AS rw_key WITH ( connector = 'kafka', properties.bootstrap.server = 'message_queue:29092', @@ -425,13 +427,15 @@ FORMAT UPSERT ENCODE JSON statement ok CREATE TABLE upsert_students ( - "ID" INT PRIMARY KEY, + "ID" INT, "firstName" VARCHAR, "lastName" VARCHAR, age INT, height REAL, - weight REAL + weight REAL, + primary key (rw_key) ) +INCLUDE KEY AS rw_key WITH ( connector = 'kafka', properties.bootstrap.server = 'message_queue:29092', diff --git a/e2e_test/source/basic/nosim_kafka.slt b/e2e_test/source/basic/nosim_kafka.slt index 333d9b5909ee3..3e251009df1dd 100644 --- a/e2e_test/source/basic/nosim_kafka.slt +++ b/e2e_test/source/basic/nosim_kafka.slt @@ -3,26 +3,17 @@ # If we cannot extract key schema, use message key as varchar primary key statement ok -CREATE TABLE upsert_avro_json_default_key () +CREATE TABLE upsert_avro_json_default_key ( primary key (rw_key) ) +INCLUDE KEY AS rw_key WITH ( connector = 'kafka', properties.bootstrap.server = 'message_queue:29092', topic = 'upsert_avro_json') FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081'); - -# key schema should be a subset of value schema -statement error -CREATE TABLE upsert_student_key_not_subset_of_value_avro_json () -WITH ( - connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', - topic = 'upsert_student_key_not_subset_of_value_avro_json') -FORMAT UPSERT ENCODE AVRO (schema.registry = 'http://message_queue:8081'); - - statement ok -CREATE TABLE upsert_student_avro_json () +CREATE TABLE upsert_student_avro_json ( primary key (rw_key) ) +INCLUDE KEY AS rw_key WITH ( connector = 'kafka', properties.bootstrap.server = 'message_queue:29092', @@ -68,7 +59,8 @@ CREATE TABLE kafka_json_schema_plain with ( ) FORMAT PLAIN ENCODE JSON (schema.registry = 'http://schemaregistry:8082'); statement ok -CREATE TABLE kafka_json_schema_upsert (PRIMARY KEY(id)) +CREATE TABLE kafka_json_schema_upsert (PRIMARY KEY(rw_key)) +INCLUDE KEY AS rw_key with ( connector = 'kafka', kafka.topic = 'kafka_upsert_json_schema', diff --git a/e2e_test/source/basic/old_row_format_syntax/kafka.slt b/e2e_test/source/basic/old_row_format_syntax/kafka.slt index 77b8b759b8624..8e8c42a6e41f3 100644 --- a/e2e_test/source/basic/old_row_format_syntax/kafka.slt +++ b/e2e_test/source/basic/old_row_format_syntax/kafka.slt @@ -368,33 +368,20 @@ WITH ( topic = 'debezium_mongo_json_customers_no_schema_field') ROW FORMAT DEBEZIUM_MONGO_JSON -statement ok -CREATE TABLE upsert_students_default_key ( - "ID" INT, - "firstName" VARCHAR, - "lastName" VARCHAR, - age INT, - height REAL, - weight REAL -) -WITH ( - connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', - topic = 'upsert_json') -ROW FORMAT UPSERT_JSON - statement ok CREATE TABLE upsert_students ( - "ID" INT PRIMARY KEY, + "ID" INT, "firstName" VARCHAR, "lastName" VARCHAR, age INT, height REAL, - weight REAL + weight REAL, + primary key (rw_key) ) +INCLUDE KEY AS rw_key WITH ( connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', + 
properties.bootstrap.server = 'message_queue:29092', topic = 'upsert_json') ROW FORMAT UPSERT_JSON @@ -682,27 +669,6 @@ ORDER BY 6 Leah Davis 18 5.7 140 9 Jacob Anderson 20 5.8 155 -query II -SELECT - "ID", - "firstName", - "lastName", - "age", - "height", - "weight" -FROM - upsert_students_default_key -ORDER BY - "ID"; ----- -1 Ethan Martinez 18 6.1 180 -2 Emily Jackson 19 5.4 110 -3 Noah Thompson 21 6.3 195 -4 Emma Brown 20 5.3 130 -5 Michael Williams 22 6.2 190 -6 Leah Davis 18 5.7 140 -9 Jacob Anderson 20 5.8 155 - query II select L_ORDERKEY, @@ -791,8 +757,5 @@ DROP TABLE mongo_customers_no_schema_field; statement ok DROP TABLE upsert_students; -statement ok -DROP TABLE upsert_students_default_key; - statement ok drop table dbz_ignore_case_json; diff --git a/e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt b/e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt deleted file mode 100644 index eaeb70ae5bad9..0000000000000 --- a/e2e_test/source/basic/old_row_format_syntax/nosim_kafka.slt +++ /dev/null @@ -1,138 +0,0 @@ -# Start with nosim to avoid running in deterministic test - - -# If we cannot extract key schema, use message key as varchar primary key -statement ok -CREATE TABLE upsert_avro_json_default_key () -WITH ( - connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', - topic = 'upsert_avro_json') -ROW FORMAT UPSERT_AVRO -row schema location confluent schema registry 'http://message_queue:8081' - - -# key schema should be a subset of value schema -statement error -CREATE TABLE upsert_student_key_not_subset_of_value_avro_json () -WITH ( - connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', - topic = 'upsert_student_key_not_subset_of_value_avro_json') -ROW FORMAT UPSERT_AVRO -row schema location confluent schema registry 'http://message_queue:8081' - - -statement ok -CREATE TABLE upsert_student_avro_json () -WITH ( - connector = 'kafka', - properties.bootstrap.server = 'message_queue:29092', - topic = 'upsert_student_avro_json') -ROW FORMAT UPSERT_AVRO -row schema location confluent schema registry 'http://message_queue:8081' - - -# TODO: Uncomment this when we add test data kafka key with format `"ID":id` -# statement ok -# CREATE TABLE upsert_avro_json ( -# PRIMARY KEY("ID") -# ) -# WITH ( -# connector = 'kafka', -# properties.bootstrap.server = 'message_queue:29092', -# topic = 'upsert_avro_json') -# ROW FORMAT UPSERT_AVRO -# row schema location confluent schema registry 'http://message_queue:8081' - -statement ok -CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with ( - connector = 'kafka', - kafka.topic = 'debezium_non_compact_avro_json', - kafka.brokers = 'message_queue:29092', - kafka.scan.startup.mode = 'earliest' -) ROW FORMAT DEBEZIUM_AVRO ROW SCHEMA LOCATION CONFLUENT SCHEMA REGISTRY 'http://message_queue:8081'; - - -statement ok -CREATE TABLE debezium_compact (PRIMARY KEY(order_id)) with ( - connector = 'kafka', - kafka.topic = 'debezium_compact_avro_json', - kafka.brokers = 'message_queue:29092', - kafka.scan.startup.mode = 'earliest' -) ROW FORMAT DEBEZIUM_AVRO ROW SCHEMA LOCATION CONFLUENT SCHEMA REGISTRY 'http://message_queue:8081'; - -statement ok -flush; - -# Wait enough time to ensure SourceExecutor consumes all Kafka data. 
-sleep 10s - -query II -SELECT - op_type, "ID", "CLASS_ID", "ITEM_ID", "ATTR_ID", "ATTR_VALUE", "ORG_ID", "UNIT_INFO", "UPD_TIME" -FROM - upsert_avro_json_default_key -ORDER BY - "ID"; ----- -update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z - -# query II -# SELECT -# * -# FROM -# upsert_avro_json -# ORDER BY -# "ID"; -# ---- -# update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -# delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -# delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z -# delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z - - -query II -SELECT - "ID", "firstName", "lastName", "age", "height", "weight" -FROM - upsert_student_avro_json -ORDER BY - "ID"; ----- -1 Ethan Martinez 18 6.1 180 -2 Emily Jackson 19 5.4 110 -3 Noah Thompson 21 6.3 195 -4 Emma Brown 20 5.3 130 -5 Michael Williams 22 6.2 190 -6 Leah Davis 18 5.7 140 -9 Jacob Anderson 20 5.8 155 - -query I -select count(*) from debezium_non_compact; ----- -2 - -query I -select count(*) from debezium_compact; ----- -2 - -statement ok -DROP TABLE upsert_avro_json_default_key; - -# statement ok -# DROP TABLE upsert_avro_json; - - -statement ok -DROP TABLE upsert_student_avro_json; - -statement ok -DROP TABLE debezium_non_compact; - -statement ok -DROP TABLE debezium_compact; \ No newline at end of file diff --git a/e2e_test/source/basic/schema_registry.slt b/e2e_test/source/basic/schema_registry.slt index 650d493ef5003..76f867b2b1d0e 100644 --- a/e2e_test/source/basic/schema_registry.slt +++ b/e2e_test/source/basic/schema_registry.slt @@ -47,7 +47,9 @@ create table t1 () with ( ); statement ok -create table t1 () with ( +create table t1 (primary key(rw_key)) +INCLUDE KEY AS rw_key +with ( connector = 'kafka', topic = 'upsert_avro_json-topic-record', properties.bootstrap.server = 'message_queue:29092' diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 6a5d8e26541e4..932d813c87388 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -15,6 +15,27 @@ message Field { string name = 2; } +enum AdditionalColumnType { + UNSPECIFIED = 0; + KEY = 1; + TIMESTAMP = 2; + PARTITION = 3; + OFFSET = 4; + HEADER = 5; + FILENAME = 6; + NORMAL = 7; +} + +enum ColumnDescVersion { + COLUMN_DESC_VERSION_UNSPECIFIED = 0; + // Introduced in https://github.com/risingwavelabs/risingwave/pull/13707#discussion_r1429947537, + // in case DEFAULT_KEY_COLUMN_NAME changes + COLUMN_DESC_VERSION_PR_13707 = 1; + + // for test only + COLUMN_DESC_VERSION_MAX = 2147483647; +} + message ColumnDesc { data.DataType column_type = 1; int32 column_id = 2; @@ -40,6 +61,12 @@ message ColumnDesc { // This field is used to store the description set by the `comment on` clause. optional string description = 8; + + // This field is used to represent the connector-spec additional column type. + // UNSPECIFIED or unset for normal column. 
+ AdditionalColumnType additional_column_type = 9; + + ColumnDescVersion version = 10; } message ColumnCatalog { diff --git a/scripts/source/test_data/kafka_1_partition_mv_topic.1 b/scripts/source/test_data/kafka_1_partition_mv_topic.1 index 08bd5e7820f74..a11f9eb3c1c5a 100644 --- a/scripts/source/test_data/kafka_1_partition_mv_topic.1 +++ b/scripts/source/test_data/kafka_1_partition_mv_topic.1 @@ -15,4 +15,6 @@ {"v1":7,"v2":"name0"} {"v1":0,"v2":"name9"} {"v1":3,"v2":"name2"} -[{"v1":7,"v2":"name5"},{"v1":1,"v2":"name7"},{"v1":3,"v2":"name9"}] +{"v1":7,"v2":"name5"} +{"v1":1,"v2":"name7"} +{"v1":3,"v2":"name9"} diff --git a/scripts/source/test_data/kafka_1_partition_topic.1 b/scripts/source/test_data/kafka_1_partition_topic.1 index 0334f4d057aa7..3fb1b528d5fa2 100644 --- a/scripts/source/test_data/kafka_1_partition_topic.1 +++ b/scripts/source/test_data/kafka_1_partition_topic.1 @@ -1,3 +1,4 @@ {"v1": 1, "v2": "1"} {"v1": 2, "v2": "22"} -[{"v1": 3, "v2": "333"},{"v1": 4, "v2": "4444"}] \ No newline at end of file +{"v1": 3, "v2": "333"} +{"v1": 4, "v2": "4444"} \ No newline at end of file diff --git a/scripts/source/test_data/kafka_3_partition_topic.3 b/scripts/source/test_data/kafka_3_partition_topic.3 index 0334f4d057aa7..3fb1b528d5fa2 100644 --- a/scripts/source/test_data/kafka_3_partition_topic.3 +++ b/scripts/source/test_data/kafka_3_partition_topic.3 @@ -1,3 +1,4 @@ {"v1": 1, "v2": "1"} {"v1": 2, "v2": "22"} -[{"v1": 3, "v2": "333"},{"v1": 4, "v2": "4444"}] \ No newline at end of file +{"v1": 3, "v2": "333"} +{"v1": 4, "v2": "4444"} \ No newline at end of file diff --git a/scripts/source/test_data/kafka_4_partition_topic.4 b/scripts/source/test_data/kafka_4_partition_topic.4 index 0334f4d057aa7..3fb1b528d5fa2 100644 --- a/scripts/source/test_data/kafka_4_partition_topic.4 +++ b/scripts/source/test_data/kafka_4_partition_topic.4 @@ -1,3 +1,4 @@ {"v1": 1, "v2": "1"} {"v1": 2, "v2": "22"} -[{"v1": 3, "v2": "333"},{"v1": 4, "v2": "4444"}] \ No newline at end of file +{"v1": 3, "v2": "333"} +{"v1": 4, "v2": "4444"} \ No newline at end of file diff --git a/scripts/source/test_data/kafka_4_partition_topic_with_100_message.4 b/scripts/source/test_data/kafka_4_partition_topic_with_100_message.4 index cc1ad14663da2..aae7a4687ca55 100644 --- a/scripts/source/test_data/kafka_4_partition_topic_with_100_message.4 +++ b/scripts/source/test_data/kafka_4_partition_topic_with_100_message.4 @@ -94,4 +94,7 @@ {"v1": 93, "v2": "QE53BJ", "v3": [93, 93, 93], "v4": {"v5": 93, "v6": 94}} {"v1": 94, "v2": "9Q7W89", "v3": [94, 94, 94], "v4": {"v5": 94, "v6": 95}} {"v1": 95, "v2": "VGDBS1", "v3": [95, 95, 95], "v4": {"v5": 95, "v6": 96}} -[{"v1": 96, "v2": "KK6WEX", "v3": [96, 96, 96], "v4": {"v5": 96, "v6": 97}},{"v1": 97, "v2": "XRTK3Y", "v3": [97, 97, 97], "v4": {"v5": 97, "v6": 98}},{"v1": 98, "v2": "ZQ2TCL", "v3": [98, 98, 98], "v4": {"v5": 98, "v6": 99}},{"v1": 99, "v2": "15UCX7", "v3": [99, 99, 99], "v4": {"v5": 99, "v6": 100}}] \ No newline at end of file +{"v1": 96, "v2": "KK6WEX", "v3": [96, 96, 96], "v4": {"v5": 96, "v6": 97}} +{"v1": 97, "v2": "XRTK3Y", "v3": [97, 97, 97], "v4": {"v5": 97, "v6": 98}} +{"v1": 98, "v2": "ZQ2TCL", "v3": [98, 98, 98], "v4": {"v5": 98, "v6": 99}} +{"v1": 99, "v2": "15UCX7", "v3": [99, 99, 99], "v4": {"v5": 99, "v6": 100}} \ No newline at end of file diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs index 68c1618073169..3780aa1f83eb8 100644 --- a/src/common/src/catalog/column.rs +++ b/src/common/src/catalog/column.rs @@ -17,7 +17,9 @@ use 
std::borrow::Cow; use itertools::Itertools; use risingwave_pb::expr::ExprNode; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; -use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc}; +use risingwave_pb::plan_common::{ + AdditionalColumnType, ColumnDescVersion, PbColumnCatalog, PbColumnDesc, +}; use super::row_id_column_desc; use crate::catalog::{cdc_table_name_column_desc, offset_column_desc, Field, ROW_ID_COLUMN_ID}; @@ -101,6 +103,8 @@ pub struct ColumnDesc { pub type_name: String, pub generated_or_default_column: Option, pub description: Option, + pub additional_column_type: AdditionalColumnType, + pub version: ColumnDescVersion, } impl ColumnDesc { @@ -113,6 +117,8 @@ impl ColumnDesc { type_name: String::new(), generated_or_default_column: None, description: None, + additional_column_type: AdditionalColumnType::Normal, + version: ColumnDescVersion::Pr13707, } } @@ -125,6 +131,27 @@ impl ColumnDesc { type_name: String::new(), generated_or_default_column: None, description: None, + additional_column_type: AdditionalColumnType::Normal, + version: ColumnDescVersion::Pr13707, + } + } + + pub fn named_with_additional_column( + name: impl Into, + column_id: ColumnId, + data_type: DataType, + additional_column_type: AdditionalColumnType, + ) -> ColumnDesc { + ColumnDesc { + data_type, + column_id, + name: name.into(), + field_descs: vec![], + type_name: String::new(), + generated_or_default_column: None, + description: None, + additional_column_type, + version: ColumnDescVersion::Pr13707, } } @@ -143,6 +170,8 @@ impl ColumnDesc { type_name: self.type_name.clone(), generated_or_default_column: self.generated_or_default_column.clone(), description: self.description.clone(), + additional_column_type: self.additional_column_type as i32, + version: self.version as i32, } } @@ -169,6 +198,8 @@ impl ColumnDesc { type_name: "".to_string(), generated_or_default_column: None, description: None, + additional_column_type: AdditionalColumnType::Normal, + version: ColumnDescVersion::Pr13707, } } @@ -190,6 +221,8 @@ impl ColumnDesc { type_name: type_name.to_string(), generated_or_default_column: None, description: None, + additional_column_type: AdditionalColumnType::Normal, + version: ColumnDescVersion::Pr13707, } } @@ -206,6 +239,8 @@ impl ColumnDesc { type_name: field.type_name.clone(), description: None, generated_or_default_column: None, + additional_column_type: AdditionalColumnType::Normal, + version: ColumnDescVersion::Pr13707, } } @@ -230,6 +265,8 @@ impl ColumnDesc { impl From for ColumnDesc { fn from(prost: PbColumnDesc) -> Self { + let additional_column_type = prost.additional_column_type(); + let version = prost.version(); let field_descs: Vec = prost .field_descs .into_iter() @@ -243,6 +280,8 @@ impl From for ColumnDesc { field_descs, generated_or_default_column: prost.generated_or_default_column, description: prost.description.clone(), + additional_column_type, + version, } } } @@ -263,6 +302,8 @@ impl From<&ColumnDesc> for PbColumnDesc { type_name: c.type_name.clone(), generated_or_default_column: c.generated_or_default_column.clone(), description: c.description.clone(), + additional_column_type: c.additional_column_type as i32, + version: c.version as i32, } } } diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 54ad7b9d2fd6c..eac1f3350bdb4 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -29,6 +29,7 @@ pub use internal_table::*; use parse_display::Display; pub use physical_table::*; use 
risingwave_pb::catalog::HandleConflictBehavior as PbHandleConflictBehavior; +use risingwave_pb::plan_common::ColumnDescVersion; pub use schema::{test_utils as schema_test_utils, Field, FieldDisplay, Schema}; use thiserror_ext::AsReport; @@ -74,8 +75,16 @@ pub const RW_RESERVED_COLUMN_NAME_PREFIX: &str = "_rw_"; // When there is no primary key specified while creating source, will use the // the message key as primary key in `BYTEA` type with this name. +// Note: the field has version to track, please refer to `default_key_column_name_version_mapping` pub const DEFAULT_KEY_COLUMN_NAME: &str = "_rw_key"; +pub fn default_key_column_name_version_mapping(version: &ColumnDescVersion) -> &str { + match version { + ColumnDescVersion::Unspecified => DEFAULT_KEY_COLUMN_NAME, + _ => DEFAULT_KEY_COLUMN_NAME, + } +} + /// For kafka source, we attach a hidden column [`KAFKA_TIMESTAMP_COLUMN_NAME`] to it, so that we /// can limit the timestamp range when querying it directly with batch query. The column type is /// [`DataType::Timestamptz`]. For more details, please refer to diff --git a/src/common/src/catalog/test_utils.rs b/src/common/src/catalog/test_utils.rs index 6b524edb92430..34af52530912f 100644 --- a/src/common/src/catalog/test_utils.rs +++ b/src/common/src/catalog/test_utils.rs @@ -15,7 +15,7 @@ use itertools::Itertools; use risingwave_pb::data::data_type::TypeName; use risingwave_pb::data::DataType; -use risingwave_pb::plan_common::ColumnDesc; +use risingwave_pb::plan_common::{AdditionalColumnType, ColumnDesc, ColumnDescVersion}; pub trait ColumnDescTestExt { /// Create a [`ColumnDesc`] with the given name and type. @@ -35,6 +35,8 @@ impl ColumnDescTestExt for ColumnDesc { column_type: Some(data_type), column_id, name: name.to_string(), + additional_column_type: AdditionalColumnType::Normal as i32, + version: ColumnDescVersion::Pr13707 as i32, ..Default::default() } } @@ -58,6 +60,8 @@ impl ColumnDescTestExt for ColumnDesc { field_descs: fields, generated_or_default_column: None, description: None, + additional_column_type: AdditionalColumnType::Normal as i32, + version: ColumnDescVersion::Pr13707 as i32, } } } diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs new file mode 100644 index 0000000000000..767a656b63f44 --- /dev/null +++ b/src/connector/src/parser/additional_columns.rs @@ -0,0 +1,161 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; +use risingwave_common::types::DataType; +use risingwave_pb::plan_common::AdditionalColumnType; + +use crate::source::{ + KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, +}; + +pub type CompatibleAdditionalColumnsFn = + Box ColumnCatalog + Send + Sync + 'static>; + +pub fn get_connector_compatible_additional_columns( + connector_name: &str, +) -> Option> { + let compatible_columns = match connector_name { + KAFKA_CONNECTOR => kafka_compatible_column_vec(), + PULSAR_CONNECTOR => pulsar_compatible_column_vec(), + KINESIS_CONNECTOR => kinesis_compatible_column_vec(), + OPENDAL_S3_CONNECTOR | S3_CONNECTOR => s3_compatible_column_column_vec(), + _ => return None, + }; + Some(compatible_columns) +} + +fn kafka_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColumnsFn)> { + vec![ + ( + "key", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Bytea, + AdditionalColumnType::Key, + ), + is_hidden: false, + } + }), + ), + ( + "timestamp", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Timestamptz, + AdditionalColumnType::Timestamp, + ), + is_hidden: false, + } + }), + ), + ( + "partition", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Int64, + AdditionalColumnType::Partition, + ), + is_hidden: false, + } + }), + ), + ( + "offset", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Int64, + AdditionalColumnType::Offset, + ), + is_hidden: false, + } + }), + ), + // Todo(tabVersion): add header column desc + // ( + // "header", + // Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + // ColumnCatalog { + // column_desc: ColumnDesc::named(name, id, DataType::List( + // + // )), + // is_hidden: false, + // } + // }), + // ), + ] +} + +fn pulsar_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColumnsFn)> { + vec![( + "key", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Bytea, + AdditionalColumnType::Key, + ), + is_hidden: false, + } + }), + )] +} + +fn kinesis_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColumnsFn)> { + vec![( + "key", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Bytea, + AdditionalColumnType::Key, + ), + is_hidden: false, + } + }), + )] +} + +fn s3_compatible_column_column_vec() -> Vec<(&'static str, CompatibleAdditionalColumnsFn)> { + vec![( + "file", + Box::new(|id: ColumnId, name: &str| -> ColumnCatalog { + ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + name, + id, + DataType::Varchar, + AdditionalColumnType::Filename, + ), + is_hidden: false, + } + }), + )] +} diff --git a/src/connector/src/parser/avro/parser.rs b/src/connector/src/parser/avro/parser.rs index 10473a031e89c..9429ce157f1e8 100644 --- a/src/connector/src/parser/avro/parser.rs +++ b/src/connector/src/parser/avro/parser.rs @@ -305,6 +305,7 @@ mod test { let conf = 
new_avro_conf_from_local(file_name).await?; Ok(PlainParser { + key_builder: None, payload_builder: AccessBuilderImpl::Avro(AvroAccessBuilder::new( conf, EncodingType::Value, @@ -332,7 +333,10 @@ mod test { let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 1); { let writer = builder.row_writer(); - parser.parse_inner(input_data, writer).await.unwrap(); + parser + .parse_inner(None, Some(input_data), writer) + .await + .unwrap(); } let chunk = builder.finish(); let (op, row) = chunk.rows().next().unwrap(); diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs index 46800e4fb2004..1e5873e051753 100644 --- a/src/connector/src/parser/avro/util.rs +++ b/src/connector/src/parser/avro/util.rs @@ -18,7 +18,7 @@ use apache_avro::schema::{DecimalSchema, RecordSchema, Schema}; use itertools::Itertools; use risingwave_common::log::LogSuppresser; use risingwave_common::types::{DataType, Decimal}; -use risingwave_pb::plan_common::ColumnDesc; +use risingwave_pb::plan_common::{AdditionalColumnType, ColumnDesc, ColumnDescVersion}; pub fn avro_schema_to_column_descs(schema: &Schema) -> anyhow::Result> { if let Schema::Record(RecordSchema { fields, .. }) = schema { @@ -61,6 +61,8 @@ fn avro_field_to_column_desc( type_name: schema_name.to_string(), generated_or_default_column: None, description: None, + additional_column_type: AdditionalColumnType::Normal as i32, + version: ColumnDescVersion::Pr13707 as i32, }) } _ => { @@ -69,6 +71,8 @@ fn avro_field_to_column_desc( column_type: Some(data_type.to_protobuf()), column_id: *index, name: name.to_owned(), + additional_column_type: AdditionalColumnType::Normal as i32, + version: ColumnDescVersion::Pr13707 as i32, ..Default::default() }) } diff --git a/src/connector/src/parser/bytes_parser.rs b/src/connector/src/parser/bytes_parser.rs index 2a0b2f1b90f2a..b80836af1de82 100644 --- a/src/connector/src/parser/bytes_parser.rs +++ b/src/connector/src/parser/bytes_parser.rs @@ -74,7 +74,10 @@ mod tests { for payload in get_payload() { let writer = builder.row_writer(); - parser.parse_inner(payload, writer).await.unwrap(); + parser + .parse_inner(None, Some(payload), writer) + .await + .unwrap(); } let chunk = builder.finish(); diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index 9ee966456799b..3de1c45bb7615 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -501,6 +501,8 @@ mod tests { // postgres-specific data-type mapping tests mod test3_postgres { + use risingwave_pb::plan_common::AdditionalColumnType; + use super::*; use crate::source::SourceColumnType; @@ -564,6 +566,7 @@ mod tests { fields: vec![], column_type: SourceColumnType::Normal, is_pk: false, + additional_column_type: AdditionalColumnType::Normal, }, SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)), SourceColumnDesc::simple("o_char", DataType::Varchar, ColumnId::from(9)), diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs index 9707459798ac5..481c9b570acbe 100644 --- a/src/connector/src/parser/json_parser.rs +++ b/src/connector/src/parser/json_parser.rs @@ -45,7 +45,11 @@ pub struct JsonAccessBuilder { impl AccessBuilder for JsonAccessBuilder { #[allow(clippy::unused_async)] async fn generate_accessor(&mut self, payload: Vec) -> Result> { - self.value = Some(payload); + if payload.is_empty() { + self.value = Some("{}".into()); + } else 
{ + self.value = Some(payload); + } let value = simd_json::to_borrowed_value( &mut self.value.as_mut().unwrap()[self.payload_start_idx..], ) @@ -205,6 +209,7 @@ mod tests { use risingwave_common::row::Row; use risingwave_common::test_prelude::StreamChunkTestExt; use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum}; + use risingwave_pb::plan_common::AdditionalColumnType; use super::JsonParser; use crate::parser::upsert_parser::UpsertParser; @@ -212,6 +217,7 @@ mod tests { EncodingProperties, JsonProperties, ProtocolProperties, SourceColumnDesc, SourceStreamChunkBuilder, SpecificParserConfig, }; + use crate::source::SourceColumnType; fn get_payload() -> Vec> { vec![ @@ -573,9 +579,19 @@ mod tests { (r#"{"a":2}"#, r#""#), ] .to_vec(); + let key_column_desc = SourceColumnDesc { + name: "rw_key".into(), + data_type: DataType::Bytea, + column_id: 2.into(), + fields: vec![], + column_type: SourceColumnType::Normal, + is_pk: true, + additional_column_type: AdditionalColumnType::Key, + }; let descs = vec![ SourceColumnDesc::simple("a", DataType::Int32, 0.into()), SourceColumnDesc::simple("b", DataType::Int32, 1.into()), + key_column_desc, ]; let props = SpecificParserConfig { key_encoding_config: None, @@ -589,7 +605,7 @@ mod tests { .unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); for item in items { - let test = parser + parser .parse_inner( Some(item.0.as_bytes().to_vec()), if !item.1.is_empty() { @@ -599,13 +615,21 @@ mod tests { }, builder.row_writer(), ) - .await; - println!("{:?}", test); + .await + .unwrap(); } let chunk = builder.finish(); - let mut rows = chunk.rows(); + // expected chunk + // +---+---+---+------------------+ + // | + | 1 | 2 | \x7b2261223a317d | + // | + | 1 | 3 | \x7b2261223a317d | + // | + | 2 | 2 | \x7b2261223a327d | + // | - | | | \x7b2261223a327d | + // +---+---+---+------------------+ + + let mut rows = chunk.rows(); { let (op, row) = rows.next().unwrap(); assert_eq!(op, Op::Insert); @@ -634,10 +658,7 @@ mod tests { { let (op, row) = rows.next().unwrap(); assert_eq!(op, Op::Delete); - assert_eq!( - row.datum_at(0).to_owned_datum(), - (Some(ScalarImpl::Int32(2))) - ); + assert_eq!(row.datum_at(0).to_owned_datum(), (None)); } } } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 6848bc26c7f0e..448c98ec571ae 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use std::fmt::Debug; use std::sync::LazyLock; +use anyhow::anyhow; use auto_enums::auto_enum; pub use avro::AvroParserConfig; pub use canal::*; @@ -35,6 +36,7 @@ use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::catalog::{ SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy, StreamSourceInfo, }; +use risingwave_pb::plan_common::AdditionalColumnType; use tracing_futures::Instrument; use self::avro::AvroAccessBuilder; @@ -47,6 +49,7 @@ use self::upsert_parser::UpsertParser; use self::util::get_kafka_topic; use crate::common::AwsAuthProps; use crate::parser::maxwell::MaxwellParser; +use crate::parser::unified::AccessError; use crate::schema::schema_registry::SchemaRegistryAuth; use crate::source::monitor::GLOBAL_SOURCE_METRICS; use crate::source::{ @@ -55,6 +58,7 @@ use crate::source::{ StreamChunkWithState, }; +pub mod additional_columns; mod avro; mod bytes_parser; mod canal; @@ -64,7 +68,7 @@ mod debezium; mod json_parser; mod maxwell; mod mysql; -mod plain_parser; +pub mod plain_parser; mod protobuf; mod unified; mod 
upsert_parser; @@ -312,37 +316,70 @@ impl SourceStreamChunkRowWriter<'_> { mut f: impl FnMut(&SourceColumnDesc) -> AccessResult, ) -> AccessResult<()> { let mut wrapped_f = |desc: &SourceColumnDesc| { - if let Some(meta_value) = - (self.row_meta.as_ref()).and_then(|row_meta| row_meta.value_for_column(desc)) - { - // For meta columns, fill in the meta data. - Ok(A::output_for(meta_value)) - } else { - // For normal columns, call the user provided closure. - match f(desc) { - Ok(output) => Ok(output), - - // Throw error for failed access to primary key columns. - Err(e) if desc.is_pk => Err(e), - // Ignore error for other columns and fill in `NULL` instead. - Err(error) => { - // TODO: figure out a way to fill in not-null default value if user specifies one - // TODO: decide whether the error should not be ignored (e.g., even not a valid Debezium message) - // TODO: not using tracing span to provide `split_id` and `offset` due to performance concern, - // see #13105 - static LOG_SUPPERSSER: LazyLock = - LazyLock::new(LogSuppresser::default); - if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { - tracing::warn!( - %error, - split_id = self.row_meta.as_ref().map(|m| m.split_id), - offset = self.row_meta.as_ref().map(|m| m.offset), - column = desc.name, - suppressed_count, - "failed to parse non-pk column, padding with `NULL`" - ); + match (&desc.column_type, &desc.additional_column_type) { + (&SourceColumnType::Offset | &SourceColumnType::RowId, _) => { + // SourceColumnType is for CDC source only. + Ok(A::output_for( + self.row_meta + .as_ref() + .and_then(|row_meta| row_meta.value_for_column(desc)) + .unwrap(), // handled all match cases in internal match, unwrap is safe + )) + } + (&SourceColumnType::Meta, _) + if matches!( + &self.row_meta.map(|ele| ele.meta), + &Some(SourceMeta::Kafka(_) | SourceMeta::DebeziumCdc(_)) + ) => + { + // SourceColumnType is for CDC source only. + return Ok(A::output_for( + self.row_meta + .as_ref() + .and_then(|row_meta| row_meta.value_for_column(desc)) + .unwrap(), // handled all match cases in internal match, unwrap is safe + )); + } + ( + _, + &AdditionalColumnType::Timestamp + | &AdditionalColumnType::Partition + | &AdditionalColumnType::Filename + | &AdditionalColumnType::Offset + | &AdditionalColumnType::Header, + // AdditionalColumnType::Unspecified and AdditionalColumnType::Normal is means it comes from message payload + // AdditionalColumnType::Key is processed in normal process, together with Unspecified ones + ) => Err(AccessError::Other(anyhow!( + "Column type {:?} not implemented yet", + &desc.additional_column_type + ))), + (_, _) => { + // For normal columns, call the user provided closure. + match f(desc) { + Ok(output) => Ok(output), + + // Throw error for failed access to primary key columns. + Err(e) if desc.is_pk => Err(e), + // Ignore error for other columns and fill in `NULL` instead. 
+ Err(error) => { + // TODO: figure out a way to fill in not-null default value if user specifies one + // TODO: decide whether the error should not be ignored (e.g., even not a valid Debezium message) + // TODO: not using tracing span to provide `split_id` and `offset` due to performance concern, + // see #13105 + static LOG_SUPPERSSER: LazyLock = + LazyLock::new(LogSuppresser::default); + if let Ok(suppressed_count) = LOG_SUPPERSSER.check() { + tracing::warn!( + %error, + split_id = self.row_meta.as_ref().map(|m| m.split_id), + offset = self.row_meta.as_ref().map(|m| m.offset), + column = desc.name, + suppressed_count, + "failed to parse non-pk column, padding with `NULL`" + ); + } + Ok(A::output_for(Datum::None)) } - Ok(A::output_for(Datum::None)) } } } @@ -754,9 +791,6 @@ impl ByteStreamSourceParserImpl { let protocol = &parser_config.specific.protocol_config; let encode = &parser_config.specific.encoding_config; match (protocol, encode) { - (ProtocolProperties::Plain, EncodingProperties::Json(_)) => { - JsonParser::new(parser_config.specific, rw_columns, source_ctx).map(Self::Json) - } (ProtocolProperties::Plain, EncodingProperties::Csv(config)) => { CsvParser::new(rw_columns, *config, source_ctx).map(Self::Csv) } diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index defb7ef54a1e6..c02a373a082eb 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -13,19 +13,23 @@ // limitations under the License. use risingwave_common::error::ErrorCode::ProtocolError; -use risingwave_common::error::{ErrorCode, Result, RwError}; +use risingwave_common::error::{Result, RwError}; -use super::unified::util::apply_row_accessor_on_stream_chunk_writer; use super::{ AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType, SourceStreamChunkRowWriter, SpecificParserConfig, }; -use crate::only_parse_payload; -use crate::parser::ParserFormat; +use crate::parser::bytes_parser::BytesAccessBuilder; +use crate::parser::unified::upsert::UpsertChangeEvent; +use crate::parser::unified::util::apply_row_operation_on_stream_chunk_writer_with_op; +use crate::parser::unified::{AccessImpl, ChangeEventOperation}; +use crate::parser::upsert_parser::get_key_column_name; +use crate::parser::{BytesProperties, ParserFormat}; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; #[derive(Debug)] pub struct PlainParser { + pub key_builder: Option, pub payload_builder: AccessBuilderImpl, pub(crate) rw_columns: Vec, pub source_ctx: SourceContextRef, @@ -37,8 +41,19 @@ impl PlainParser { rw_columns: Vec, source_ctx: SourceContextRef, ) -> Result { + let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) { + Some(AccessBuilderImpl::Bytes(BytesAccessBuilder::new( + EncodingProperties::Bytes(BytesProperties { + column_name: Some(key_column_name), + }), + )?)) + } else { + None + }; + let payload_builder = match props.encoding_config { - EncodingProperties::Protobuf(_) + EncodingProperties::Json(_) + | EncodingProperties::Protobuf(_) | EncodingProperties::Avro(_) | EncodingProperties::Bytes(_) => { AccessBuilderImpl::new_default(props.encoding_config, EncodingType::Value).await? 
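The hunks above rewire `PlainParser` so a `FORMAT PLAIN` source can carry an `INCLUDE KEY AS` column: the new optional `key_builder` materializes the raw message key as BYTEA, while the payload (which may itself be absent) keeps flowing through the value access builder. Below is a minimal standalone sketch of that routing with toy types only; it does not use the crate's real `AccessBuilderImpl`, `SourceColumnDesc`, or `SourceStreamChunkRowWriter`, and payload decoding (JSON/Avro/...) is elided.

```rust
// Standalone sketch of the key/payload routing that parse_inner performs for FORMAT PLAIN.
// All types here are simplified stand-ins, not the connector crate's real ones.

#[derive(Debug, Clone, Copy)]
enum AdditionalColumnType {
    Normal,
    Key,
}

struct ToyColumn {
    name: &'static str,
    additional_column_type: AdditionalColumnType,
}

/// Fill one row: Key columns read the raw message key (BYTEA), everything else
/// is taken from the payload accessor (decoding elided here).
fn fill_row(
    columns: &[ToyColumn],
    key: Option<&[u8]>,
    payload: Option<&str>,
) -> Vec<(&'static str, Option<String>)> {
    columns
        .iter()
        .map(|col| {
            let value = match col.additional_column_type {
                AdditionalColumnType::Key => key.map(|k| format!("{:?}", k)),
                AdditionalColumnType::Normal => payload.map(str::to_owned),
            };
            (col.name, value)
        })
        .collect()
}

fn main() {
    let columns = [
        ToyColumn { name: "v1", additional_column_type: AdditionalColumnType::Normal },
        ToyColumn { name: "rw_key", additional_column_type: AdditionalColumnType::Key },
    ];
    // Both key and payload are optional for FORMAT PLAIN; a missing part just yields NULLs.
    println!("{:?}", fill_row(&columns, Some(b"id-1"), Some(r#"{"v1": 1}"#)));
    println!("{:?}", fill_row(&columns, None, Some(r#"{"v1": 2}"#)));
}
```

In the real parser the value side is wrapped in an `UpsertChangeEvent` with `ChangeEventOperation::Upsert`, so the plain and upsert paths share the same row-writing code while plain always inserts.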
@@ -50,6 +65,7 @@ impl PlainParser { } }; Ok(Self { + key_builder, payload_builder, rw_columns, source_ctx, @@ -58,12 +74,26 @@ impl PlainParser { pub async fn parse_inner( &mut self, - payload: Vec, + key: Option>, + payload: Option>, mut writer: SourceStreamChunkRowWriter<'_>, ) -> Result<()> { - let accessor = self.payload_builder.generate_accessor(payload).await?; + // reuse upsert component but always insert + let mut row_op: UpsertChangeEvent, AccessImpl<'_, '_>> = + UpsertChangeEvent::default(); + let change_event_op = ChangeEventOperation::Upsert; + + if let Some(data) = key && let Some(key_builder) = self.key_builder.as_mut() { + // key is optional in format plain + row_op = row_op.with_key(key_builder.generate_accessor(data).await?); + } + if let Some(data) = payload { + // the data part also can be an empty vec + row_op = row_op.with_value(self.payload_builder.generate_accessor(data).await?); + } - apply_row_accessor_on_stream_chunk_writer(accessor, &mut writer).map_err(Into::into) + apply_row_operation_on_stream_chunk_writer_with_op(row_op, &mut writer, change_event_op) + .map_err(Into::into) } } @@ -82,10 +112,10 @@ impl ByteStreamSourceParser for PlainParser { async fn parse_one<'a>( &'a mut self, - _key: Option>, + key: Option>, payload: Option>, writer: SourceStreamChunkRowWriter<'a>, ) -> Result<()> { - only_parse_payload!(self, payload, writer) + self.parse_inner(key, payload, writer).await } } diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs index bd661acf9861f..4ab09ae19d79e 100644 --- a/src/connector/src/parser/protobuf/parser.rs +++ b/src/connector/src/parser/protobuf/parser.rs @@ -25,7 +25,7 @@ use risingwave_common::error::ErrorCode::{InternalError, ProtocolError}; use risingwave_common::error::{Result, RwError}; use risingwave_common::try_match_expand; use risingwave_common::types::{DataType, Datum, Decimal, JsonbVal, ScalarImpl, F32, F64}; -use risingwave_pb::plan_common::ColumnDesc; +use risingwave_pb::plan_common::{AdditionalColumnType, ColumnDesc, ColumnDescVersion}; use super::schema_resolver::*; use crate::aws_utils::load_file_descriptor_from_s3; @@ -213,6 +213,8 @@ impl ProtobufParserConfig { type_name: m.full_name().to_string(), generated_or_default_column: None, description: None, + additional_column_type: AdditionalColumnType::Normal as i32, + version: ColumnDescVersion::Pr13707 as i32, }) } else { *index += 1; @@ -220,6 +222,8 @@ impl ProtobufParserConfig { column_id: *index, name: field_descriptor.name().to_string(), column_type: Some(field_type.to_protobuf()), + additional_column_type: AdditionalColumnType::Normal as i32, + version: ColumnDescVersion::Pr13707 as i32, ..Default::default() }) } diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index e392e31e3644d..8011bc0a42361 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -16,6 +16,7 @@ use risingwave_common::types::{DataType, Datum, ScalarImpl}; use super::{Access, AccessError, ChangeEvent, ChangeEventOperation}; use crate::parser::TransactionControl; +use crate::source::SourceColumnDesc; pub struct DebeziumChangeEvent { value_accessor: Option, @@ -89,20 +90,16 @@ impl ChangeEvent for DebeziumChangeEvent where A: Access, { - fn access_field( - &self, - name: &str, - type_expected: &risingwave_common::types::DataType, - ) -> super::AccessResult { + fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult { match 
self.op()? { ChangeEventOperation::Delete => { if let Some(va) = self.value_accessor.as_ref() { - va.access(&[BEFORE, name], Some(type_expected)) + va.access(&[BEFORE, &desc.name], Some(&desc.data_type)) } else { self.key_accessor .as_ref() .unwrap() - .access(&[name], Some(type_expected)) + .access(&[&desc.name], Some(&desc.data_type)) } } @@ -111,7 +108,7 @@ where .value_accessor .as_ref() .unwrap() - .access(&[AFTER, name], Some(type_expected)), + .access(&[AFTER, &desc.name], Some(&desc.data_type)), } } diff --git a/src/connector/src/parser/unified/maxwell.rs b/src/connector/src/parser/unified/maxwell.rs index 1ccb83353f03d..7303d082d916c 100644 --- a/src/connector/src/parser/unified/maxwell.rs +++ b/src/connector/src/parser/unified/maxwell.rs @@ -16,6 +16,7 @@ use risingwave_common::types::{DataType, ScalarImpl}; use super::{Access, ChangeEvent}; use crate::parser::unified::ChangeEventOperation; +use crate::source::SourceColumnDesc; pub const MAXWELL_INSERT_OP: &str = "insert"; pub const MAXWELL_UPDATE_OP: &str = "update"; @@ -48,8 +49,8 @@ where }) } - fn access_field(&self, name: &str, type_expected: &DataType) -> super::AccessResult { + fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult { const DATA: &str = "data"; - self.0.access(&[DATA, name], Some(type_expected)) + self.0.access(&[DATA, &desc.name], Some(&desc.data_type)) } } diff --git a/src/connector/src/parser/unified/mod.rs b/src/connector/src/parser/unified/mod.rs index f99fd0d2769f8..8c8abcdd0f30c 100644 --- a/src/connector/src/parser/unified/mod.rs +++ b/src/connector/src/parser/unified/mod.rs @@ -22,6 +22,7 @@ use self::avro::AvroAccess; use self::bytes::BytesAccess; use self::json::JsonAccess; use self::protobuf::ProtobufAccess; +use crate::source::SourceColumnDesc; pub mod avro; pub mod bytes; @@ -69,7 +70,7 @@ pub trait ChangeEvent { /// Access the operation type. fn op(&self) -> std::result::Result; /// Access the field after the operation. - fn access_field(&self, name: &str, type_expected: &DataType) -> AccessResult; + fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult; } impl ChangeEvent for (ChangeEventOperation, A) @@ -80,8 +81,8 @@ where Ok(self.0) } - fn access_field(&self, name: &str, type_expected: &DataType) -> AccessResult { - self.1.access(&[name], Some(type_expected)) + fn access_field(&self, desc: &SourceColumnDesc) -> AccessResult { + self.1.access(&[desc.name.as_str()], Some(&desc.data_type)) } } diff --git a/src/connector/src/parser/unified/upsert.rs b/src/connector/src/parser/unified/upsert.rs index 2697d4bdf8151..20de7fed43b66 100644 --- a/src/connector/src/parser/unified/upsert.rs +++ b/src/connector/src/parser/unified/upsert.rs @@ -13,16 +13,18 @@ // limitations under the License. use risingwave_common::types::DataType; +use risingwave_pb::plan_common::AdditionalColumnType; use super::{Access, ChangeEvent, ChangeEventOperation}; use crate::parser::unified::AccessError; +use crate::source::SourceColumnDesc; /// `UpsertAccess` wraps a key-value message format into an upsert source. /// A key accessor and a value accessor are required. 
pub struct UpsertChangeEvent { key_accessor: Option, value_accessor: Option, - key_as_column_name: Option, + key_column_name: Option, } impl Default for UpsertChangeEvent { @@ -30,7 +32,7 @@ impl Default for UpsertChangeEvent { Self { key_accessor: None, value_accessor: None, - key_as_column_name: None, + key_column_name: None, } } } @@ -52,8 +54,8 @@ impl UpsertChangeEvent { self } - pub fn with_key_as_column_name(mut self, name: impl ToString) -> Self { - self.key_as_column_name = Some(name.to_string()); + pub fn with_key_column_name(mut self, name: impl ToString) -> Self { + self.key_column_name = Some(name.to_string()); self } } @@ -102,24 +104,21 @@ where } } - fn access_field(&self, name: &str, type_expected: &DataType) -> super::AccessResult { - // access value firstly - match self.access(&["value", name], Some(type_expected)) { - Err(AccessError::Undefined { .. }) => (), // fallthrough - other => return other, - }; - - match self.access(&["key", name], Some(type_expected)) { - Err(AccessError::Undefined { .. }) => (), // fallthrough - other => return other, - }; - - if let Some(key_as_column_name) = &self.key_as_column_name - && name == key_as_column_name - { - return self.access(&["key"], Some(type_expected)); + fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult { + match desc.additional_column_type { + AdditionalColumnType::Key => { + if let Some(key_as_column_name) = &self.key_column_name + && &desc.name == key_as_column_name + { + self.access(&["key"], Some(&desc.data_type)) + } else { + self.access(&["key", &desc.name], Some(&desc.data_type)) + } + } + AdditionalColumnType::Unspecified | AdditionalColumnType::Normal => { + self.access(&["value", &desc.name], Some(&desc.data_type)) + } + _ => unreachable!(), } - - Ok(None) } } diff --git a/src/connector/src/parser/unified/util.rs b/src/connector/src/parser/unified/util.rs index 92cf5da3ac81c..c25892835df70 100644 --- a/src/connector/src/parser/unified/util.rs +++ b/src/connector/src/parser/unified/util.rs @@ -24,7 +24,7 @@ pub fn apply_row_operation_on_stream_chunk_writer_with_op( writer: &mut SourceStreamChunkRowWriter<'_>, op: ChangeEventOperation, ) -> AccessResult<()> { - let f = |column: &SourceColumnDesc| row_op.access_field(&column.name, &column.data_type); + let f = |column: &SourceColumnDesc| row_op.access_field(column); match op { ChangeEventOperation::Upsert => writer.insert(f), ChangeEventOperation::Delete => writer.delete(f), diff --git a/src/connector/src/parser/upsert_parser.rs b/src/connector/src/parser/upsert_parser.rs index 71210b9e4b8f8..10027bc10bbb6 100644 --- a/src/connector/src/parser/upsert_parser.rs +++ b/src/connector/src/parser/upsert_parser.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use risingwave_common::catalog::DEFAULT_KEY_COLUMN_NAME; use risingwave_common::error::ErrorCode::ProtocolError; use risingwave_common::error::{Result, RwError}; +use risingwave_pb::plan_common::AdditionalColumnType; use super::bytes_parser::BytesAccessBuilder; use super::unified::upsert::UpsertChangeEvent; @@ -24,7 +24,6 @@ use super::{ AccessBuilderImpl, ByteStreamSourceParser, BytesProperties, EncodingProperties, EncodingType, SourceStreamChunkRowWriter, SpecificParserConfig, }; -use crate::extract_key_config; use crate::parser::ParserFormat; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef}; @@ -52,13 +51,14 @@ async fn build_accessor_builder( } } -fn check_rw_default_key(columns: &Vec) -> bool { - for col in columns { - if col.name.starts_with(DEFAULT_KEY_COLUMN_NAME) { - return true; +pub fn get_key_column_name(columns: &[SourceColumnDesc]) -> Option { + columns.iter().find_map(|column| { + if column.additional_column_type == AdditionalColumnType::Key { + Some(column.name.clone()) + } else { + None } - } - false + }) } impl UpsertParser { @@ -67,17 +67,18 @@ impl UpsertParser { rw_columns: Vec, source_ctx: SourceContextRef, ) -> Result { - // check whether columns has `DEFAULT_KEY_COLUMN_NAME`, if so, the key accessor should be + // check whether columns has Key as AdditionalColumnType, if so, the key accessor should be // bytes - let key_builder = if check_rw_default_key(&rw_columns) { + let key_builder = if let Some(key_column_name) = get_key_column_name(&rw_columns) { + // later: if key column has other type other than bytes, build other accessor. + // For now, all key columns are bytes AccessBuilderImpl::Bytes(BytesAccessBuilder::new(EncodingProperties::Bytes( BytesProperties { - column_name: Some(DEFAULT_KEY_COLUMN_NAME.into()), + column_name: Some(key_column_name), }, ))?) } else { - let (key_config, key_type) = extract_key_config!(props); - build_accessor_builder(key_config, key_type).await? + unreachable!("format upsert must have key column") }; let payload_builder = build_accessor_builder(props.encoding_config, EncodingType::Value).await?; diff --git a/src/connector/src/source/manager.rs b/src/connector/src/source/manager.rs index c766f05099810..e818bca9b32c8 100644 --- a/src/connector/src/source/manager.rs +++ b/src/connector/src/source/manager.rs @@ -19,6 +19,7 @@ use risingwave_common::catalog::{ TABLE_NAME_COLUMN_NAME, }; use risingwave_common::types::DataType; +use risingwave_pb::plan_common::{AdditionalColumnType, ColumnDescVersion}; /// `SourceColumnDesc` is used to describe a column in the Source and is used as the column /// counterpart in `StreamScan` @@ -32,6 +33,11 @@ pub struct SourceColumnDesc { // `is_pk` is used to indicate whether the column is part of the primary key columns. pub is_pk: bool, + + // `additional_column_type` and `column_type` are orthogonal + // `additional_column_type` is used to indicate the column is from which part of the message + // `column_type` is used to indicate the type of the column, only used in cdc scenario + pub additional_column_type: AdditionalColumnType, } /// `SourceColumnType` is used to indicate the type of a column emitted by the Source. 
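As the comment in the hunk above notes, `column_type` and `additional_column_type` are orthogonal: the former marks RisingWave-internal columns (row id, offset, CDC meta), the latter records which part of the connector message a column is taken from. The sketch below is a rough, self-contained decision table in the spirit of the dispatch added to `SourceStreamChunkRowWriter` earlier in this diff; the enums are stand-ins and the Kafka/Debezium special-casing of meta columns is simplified away.

```rust
// Rough decision table: internal columns come from row metadata, the reserved connector
// columns are not parsed yet, and key/normal columns go through the regular access path.
// Simplified stand-ins, not the real enums from risingwave_connector / risingwave_pb.

#[derive(Debug, Clone, Copy)]
enum SourceColumnType { Normal, RowId, Meta, Offset }

#[derive(Debug, Clone, Copy)]
enum AdditionalColumnType { Normal, Key, Timestamp, Partition, Offset, Header, Filename }

#[derive(Debug, PartialEq)]
enum ValueSource { RowMeta, KeyOrPayload, NotImplementedYet }

fn value_source(col: SourceColumnType, add: AdditionalColumnType) -> ValueSource {
    match (col, add) {
        // Row id, offset and (for Kafka/CDC) meta columns are filled from row metadata.
        (SourceColumnType::RowId | SourceColumnType::Offset | SourceColumnType::Meta, _) => {
            ValueSource::RowMeta
        }
        // Connector columns reserved by this change but not yet produced by the parsers.
        (
            _,
            AdditionalColumnType::Timestamp
            | AdditionalColumnType::Partition
            | AdditionalColumnType::Offset
            | AdditionalColumnType::Header
            | AdditionalColumnType::Filename,
        ) => ValueSource::NotImplementedYet,
        // The key column and ordinary payload columns use the normal accessor closure.
        (_, AdditionalColumnType::Key | AdditionalColumnType::Normal) => {
            ValueSource::KeyOrPayload
        }
    }
}

fn main() {
    assert_eq!(
        value_source(SourceColumnType::Normal, AdditionalColumnType::Key),
        ValueSource::KeyOrPayload
    );
    assert_eq!(
        value_source(SourceColumnType::Offset, AdditionalColumnType::Normal),
        ValueSource::RowMeta
    );
}
```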
@@ -81,6 +87,7 @@ impl SourceColumnDesc { fields: vec![], column_type: SourceColumnType::Normal, is_pk: false, + additional_column_type: AdditionalColumnType::Normal, } } @@ -112,6 +119,7 @@ impl From<&ColumnDesc> for SourceColumnDesc { fields: c.field_descs.clone(), column_type, is_pk: false, + additional_column_type: c.additional_column_type, } } } @@ -126,6 +134,8 @@ impl From<&SourceColumnDesc> for ColumnDesc { type_name: "".to_string(), generated_or_default_column: None, description: None, + additional_column_type: s.additional_column_type, + version: ColumnDescVersion::Pr13707, } } } diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs index ca62c76789873..9273754797ea6 100644 --- a/src/connector/src/source/mod.rs +++ b/src/connector/src/source/mod.rs @@ -39,6 +39,9 @@ pub mod test_source; pub use manager::{SourceColumnDesc, SourceColumnType}; pub use mock_external_table::MockExternalTableReader; +pub use crate::parser::additional_columns::{ + get_connector_compatible_additional_columns, CompatibleAdditionalColumnsFn, +}; pub use crate::source::filesystem::opendal_source::{GCS_CONNECTOR, OPENDAL_S3_CONNECTOR}; pub use crate::source::filesystem::S3_CONNECTOR; pub use crate::source::nexmark::NEXMARK_CONNECTOR; diff --git a/src/frontend/planner_test/src/lib.rs b/src/frontend/planner_test/src/lib.rs index 341d6c697c725..8e9f953f0ab7b 100644 --- a/src/frontend/planner_test/src/lib.rs +++ b/src/frontend/planner_test/src/lib.rs @@ -424,6 +424,7 @@ impl TestCase { source_watermarks, append_only, cdc_table_info, + include_column_options, .. } => { let source_schema = source_schema.map(|schema| schema.into_v2_with_warning()); @@ -438,6 +439,7 @@ impl TestCase { source_watermarks, append_only, cdc_table_info, + include_column_options, ) .await?; } diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs index 93df1a8a14ea4..2dfe1abbb26b3 100644 --- a/src/frontend/src/binder/expr/mod.rs +++ b/src/frontend/src/binder/expr/mod.rs @@ -18,6 +18,7 @@ use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::types::DataType; use risingwave_common::util::iter_util::zip_eq_fast; use risingwave_common::{bail_not_implemented, not_implemented}; +use risingwave_pb::plan_common::{AdditionalColumnType, ColumnDescVersion}; use risingwave_sqlparser::ast::{ Array, BinaryOperator, DataType as AstDataType, Expr, Function, JsonPredicateType, ObjectName, Query, StructField, TrimWhereField, UnaryOperator, @@ -575,6 +576,8 @@ pub fn bind_struct_field(column_def: &StructField) -> Result { type_name: "".to_string(), generated_or_default_column: None, description: None, + additional_column_type: AdditionalColumnType::Normal, + version: ColumnDescVersion::Pr13707, }) } diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index 63fd9e5496919..9622ea3cfcedd 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -588,7 +588,9 @@ mod tests { use risingwave_common::types::*; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::catalog::{PbStreamJobStatus, PbTable}; - use risingwave_pb::plan_common::{PbColumnCatalog, PbColumnDesc}; + use risingwave_pb::plan_common::{ + AdditionalColumnType, ColumnDescVersion, PbColumnCatalog, PbColumnDesc, + }; use super::*; use crate::catalog::table_catalog::{TableCatalog, TableType}; @@ -681,6 +683,8 @@ mod tests { type_name: ".test.Country".to_string(), description: None, generated_or_default_column: 
None, + additional_column_type: AdditionalColumnType::Normal, + version: ColumnDescVersion::Pr13707, }, is_hidden: false } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index a04627cf2b5ad..8a9ef0edfbc38 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -21,8 +21,8 @@ use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; use risingwave_common::catalog::{ - is_column_ids_dedup, ColumnCatalog, ColumnDesc, TableId, DEFAULT_KEY_COLUMN_NAME, - INITIAL_SOURCE_VERSION_ID, KAFKA_TIMESTAMP_COLUMN_NAME, + is_column_ids_dedup, ColumnCatalog, ColumnDesc, TableId, INITIAL_SOURCE_VERSION_ID, + KAFKA_TIMESTAMP_COLUMN_NAME, }; use risingwave_common::error::ErrorCode::{self, InvalidInputSyntax, ProtocolError}; use risingwave_common::error::{Result, RwError}; @@ -43,17 +43,18 @@ use risingwave_connector::source::external::CdcTableType; use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType}; use risingwave_connector::source::test_source::TEST_CONNECTOR; use risingwave_connector::source::{ - GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, - NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR, + get_connector_compatible_additional_columns, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, + KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, + PULSAR_CONNECTOR, S3_CONNECTOR, }; use risingwave_pb::catalog::{ PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc, }; -use risingwave_pb::plan_common::{EncodeType, FormatType}; +use risingwave_pb::plan_common::{AdditionalColumnType, EncodeType, FormatType}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_sqlparser::ast::{ get_delimiter, AstString, AvroSchema, ColumnDef, ConnectorSchema, CreateSourceStatement, - DebeziumAvroSchema, Encode, Format, ProtobufSchema, SourceWatermark, + DebeziumAvroSchema, Encode, Format, Ident, ProtobufSchema, SourceWatermark, }; use super::RwPgResponse; @@ -147,37 +148,6 @@ async fn extract_avro_table_schema( .collect_vec()) } -/// Extract Avro primary key columns. 
-async fn extract_upsert_avro_table_pk_columns( - info: &StreamSourceInfo, - with_properties: &HashMap, -) -> Result>> { - let parser_config = SpecificParserConfig::new(info, with_properties)?; - let conf = AvroParserConfig::new(parser_config.encoding_config).await?; - let vec_column_desc = conf.map_to_columns()?; - - conf.extract_pks() - .ok() - .map(|pk_desc| { - pk_desc - .into_iter() - .map(|desc| { - vec_column_desc - .iter() - .find(|x| x.name == desc.name) - .ok_or_else(|| { - RwError::from(ErrorCode::InternalError(format!( - "Can not found primary key column {} in value schema", - desc.name - ))) - }) - }) - .map_ok(|desc| desc.name.clone()) - .collect::>>() - }) - .transpose() -} - async fn extract_debezium_avro_table_pk_columns( info: &StreamSourceInfo, with_properties: &HashMap, @@ -696,6 +666,65 @@ pub(crate) async fn bind_columns_from_source( Ok(res) } +/// add connector-spec columns to the end of column catalog +pub fn handle_addition_columns( + with_properties: &HashMap, + mut additional_columns: Vec<(Ident, Option)>, + columns: &mut Vec, +) -> Result<()> { + let connector_name = get_connector(with_properties).unwrap(); // there must be a connector in source + + let addition_col_list = + match get_connector_compatible_additional_columns(connector_name.as_str()) { + Some(cols) => cols, + // early return if there are no accepted additional columns for the connector + None => { + return if additional_columns.is_empty() { + Ok(()) + } else { + Err(RwError::from(ProtocolError(format!( + "Connector {} accepts no additional column but got {:?}", + connector_name, additional_columns + )))) + } + } + }; + let gen_default_column_name = |connector_name: &str, addi_column_name: &str| { + format!("_rw_{}_{}", connector_name, addi_column_name) + }; + + let latest_col_id: ColumnId = columns + .iter() + .map(|col| col.column_desc.column_id) + .max() + .unwrap(); // there must be at least one column in the column catalog + + for (col_name, gen_column_catalog_fn) in addition_col_list { + // always insert in spec order + if let Some(idx) = additional_columns + .iter() + .position(|(col, _)| col.real_value().eq_ignore_ascii_case(col_name)) + { + let (_, alias) = additional_columns.remove(idx); + columns.push(gen_column_catalog_fn( + latest_col_id.next(), + alias + .map(|alias| alias.real_value()) + .unwrap_or_else(|| gen_default_column_name(connector_name.as_str(), col_name)) + .as_str(), + )) + } + } + if !additional_columns.is_empty() { + return Err(RwError::from(ProtocolError(format!( + "Unknown additional columns {:?}", + additional_columns + )))); + } + + Ok(()) +} + /// Bind columns from both source and sql defined. 
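The matching logic in `handle_addition_columns` above is easiest to see in isolation. The sketch below models only the algorithm — walk the connector's accepted columns in their declared order, consume matching INCLUDE entries case-insensitively, fall back to the `_rw_{connector}_{column}` default name when no alias is given, and reject leftovers — using plain strings instead of `ColumnCatalog`. It is an illustration of the resolution rules, not the implementation.

fn default_additional_column_name(connector: &str, column: &str) -> String {
    format!("_rw_{connector}_{column}")
}

// `accepted` models the connector registry; `requested` models the parsed
// `INCLUDE <column> [AS <alias>]` entries.
fn resolve_include_options(
    connector: &str,
    accepted: &[&str],
    mut requested: Vec<(String, Option<String>)>,
) -> Result<Vec<String>, String> {
    let mut resolved = Vec::new();
    // Walk the registry in its declared order so generated columns are appended
    // deterministically, independent of the INCLUDE order in the DDL.
    for accepted_name in accepted {
        if let Some(idx) = requested
            .iter()
            .position(|(col, _)| col.eq_ignore_ascii_case(accepted_name))
        {
            let (_, alias) = requested.remove(idx);
            resolved.push(
                alias.unwrap_or_else(|| default_additional_column_name(connector, accepted_name)),
            );
        }
    }
    // Anything left over was not accepted by this connector.
    if !requested.is_empty() {
        return Err(format!("unknown additional columns {requested:?}"));
    }
    Ok(resolved)
}

// With an illustrative registry accepted = ["key", "timestamp"] and connector "kafka",
// `INCLUDE timestamp INCLUDE key AS msg_key` resolves to ["msg_key", "_rw_kafka_timestamp"].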
pub(crate) fn bind_all_columns( source_schema: &ConnectorSchema, @@ -797,42 +826,77 @@ pub(crate) fn bind_all_columns( pub(crate) async fn bind_source_pk( source_schema: &ConnectorSchema, source_info: &StreamSourceInfo, - columns: &mut Vec, + columns: &mut [ColumnCatalog], sql_defined_pk_names: Vec, with_properties: &HashMap, ) -> Result> { let sql_defined_pk = !sql_defined_pk_names.is_empty(); + let key_column_name: Option = { + // iter columns to check if contains additional columns from key part + // return the key column names if exists + columns.iter().find_map(|catalog| { + if catalog.column_desc.additional_column_type == AdditionalColumnType::Key { + Some(catalog.name().to_string()) + } else { + None + } + }) + }; + let additional_column_names = columns + .iter() + .filter_map(|col| { + if (col.column_desc.additional_column_type != AdditionalColumnType::Unspecified) + && (col.column_desc.additional_column_type != AdditionalColumnType::Normal) + { + Some(col.name().to_string()) + } else { + None + } + }) + .collect_vec(); let res = match (&source_schema.format, &source_schema.row_encode) { (Format::Native, Encode::Native) | (Format::Plain, _) => sql_defined_pk_names, - (Format::Upsert, Encode::Json) => { - if sql_defined_pk { - sql_defined_pk_names - } else { - add_default_key_column(columns); - vec![DEFAULT_KEY_COLUMN_NAME.into()] - } - } - (Format::Upsert, Encode::Avro) => { - if sql_defined_pk { + + // For all Upsert formats, we only accept one and only key column as primary key. + // Additional KEY columns must be set in this case and must be primary key. + (Format::Upsert, encode @ Encode::Json | encode @ Encode::Avro) => { + if let Some(ref key_column_name) = key_column_name && sql_defined_pk { if sql_defined_pk_names.len() != 1 { return Err(RwError::from(ProtocolError( - "upsert avro supports only one primary key column.".to_string(), + format!("upsert {:?} supports only one primary key column ({}).", encode, key_column_name) ))); } + // the column name have been converted to real value in `handle_addition_columns` + // so we don't ignore ascii case here + if !key_column_name.eq(sql_defined_pk_names[0].as_str()) { + return Err(RwError::from(ProtocolError(format!( + "upsert {}'s key column {} not match with sql defined primary key {}", encode, + key_column_name, sql_defined_pk_names[0] + )))); + } sql_defined_pk_names - } else if let Some(extracted_pk_names) = - extract_upsert_avro_table_pk_columns(source_info, with_properties).await? - { - extracted_pk_names } else { - // For upsert avro, if we can't extract pk from schema, use message key as primary key - add_default_key_column(columns); - vec![DEFAULT_KEY_COLUMN_NAME.into()] + return if key_column_name.is_none() { + Err( + RwError::from(ProtocolError(format!("INCLUDE KEY clause must be set for FORMAT UPSERT ENCODE {:?}", encode) + )) + ) + } else { + Err(RwError::from(ProtocolError(format!( + "Primary key must be specified to {} when creating source with FORMAT UPSERT ENCODE {:?}", + key_column_name.unwrap(), encode)))) + } } } (Format::Debezium, Encode::Json) => { + if !additional_column_names.is_empty() { + return Err(RwError::from(ProtocolError(format!( + "FORMAT DEBEZIUM forbids additional columns, but got {:?}", + additional_column_names + )))); + } if !sql_defined_pk { return Err(RwError::from(ProtocolError( "Primary key must be specified when creating source with FORMAT DEBEZIUM." 
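The upsert branch of `bind_source_pk` above now enforces three rules: an `INCLUDE KEY AS ...` column must exist, exactly one primary key must be declared, and that primary key must be the key column. A condensed, self-contained model of those checks (error strings paraphrased; the real code returns `RwError::from(ProtocolError(..))`):

fn check_upsert_pk_sketch(
    key_column: Option<&str>,
    sql_defined_pk: &[String],
) -> Result<Vec<String>, String> {
    // Rule 1: FORMAT UPSERT requires the key column introduced by INCLUDE KEY.
    let Some(key_column) = key_column else {
        return Err("INCLUDE KEY clause must be set for FORMAT UPSERT".into());
    };
    // Rule 2: a primary key must be declared, and only one column is allowed.
    if sql_defined_pk.is_empty() {
        return Err(format!(
            "primary key must be specified to {key_column} for FORMAT UPSERT"
        ));
    }
    if sql_defined_pk.len() != 1 {
        return Err("FORMAT UPSERT supports only one primary key column".into());
    }
    // Rule 3: the declared primary key must be exactly the INCLUDE KEY column
    // (names were already normalized while handling the INCLUDE options).
    if sql_defined_pk[0] != key_column {
        return Err(format!(
            "key column {key_column} does not match the declared primary key {}",
            sql_defined_pk[0]
        ));
    }
    Ok(sql_defined_pk.to_vec())
}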
@@ -842,6 +906,12 @@ pub(crate) async fn bind_source_pk( sql_defined_pk_names } (Format::Debezium, Encode::Avro) => { + if !additional_column_names.is_empty() { + return Err(RwError::from(ProtocolError(format!( + "FORMAT DEBEZIUM forbids additional columns, but got {:?}", + additional_column_names + )))); + } if sql_defined_pk { sql_defined_pk_names } else { @@ -863,6 +933,12 @@ pub(crate) async fn bind_source_pk( } } (Format::DebeziumMongo, Encode::Json) => { + if !additional_column_names.is_empty() { + return Err(RwError::from(ProtocolError(format!( + "FORMAT DEBEZIUMMONGO forbids additional columns, but got {:?}", + additional_column_names + )))); + } if sql_defined_pk { sql_defined_pk_names } else { @@ -871,6 +947,12 @@ pub(crate) async fn bind_source_pk( } (Format::Maxwell, Encode::Json) => { + if !additional_column_names.is_empty() { + return Err(RwError::from(ProtocolError(format!( + "FORMAT MAXWELL forbids additional columns, but got {:?}", + additional_column_names + )))); + } if !sql_defined_pk { return Err(RwError::from(ProtocolError( "Primary key must be specified when creating source with FORMAT MAXWELL ENCODE JSON." @@ -881,6 +963,12 @@ pub(crate) async fn bind_source_pk( } (Format::Canal, Encode::Json) => { + if !additional_column_names.is_empty() { + return Err(RwError::from(ProtocolError(format!( + "FORMAT CANAL forbids additional columns, but got {:?}", + additional_column_names + )))); + } if !sql_defined_pk { return Err(RwError::from(ProtocolError( "Primary key must be specified when creating source with FORMAT CANAL ENCODE JSON." @@ -918,18 +1006,6 @@ fn check_and_add_timestamp_column( } } -fn add_default_key_column(columns: &mut Vec) { - let column = ColumnCatalog { - column_desc: ColumnDesc::named( - DEFAULT_KEY_COLUMN_NAME, - (columns.len() as i32).into(), - DataType::Bytea, - ), - is_hidden: false, - }; - columns.push(column); -} - pub(super) fn bind_source_watermark( session: &SessionImpl, name: String, @@ -1236,6 +1312,8 @@ pub async fn handle_create_source( columns_from_sql, &stmt.columns, )?; + // add additional columns before bind pk, because `format upsert` requires the key column + handle_addition_columns(&with_properties, stmt.include_column_options, &mut columns)?; let pk_names = bind_source_pk( &source_schema, &source_info, @@ -1356,6 +1434,7 @@ pub async fn handle_create_source( #[cfg(test)] pub mod tests { use std::collections::HashMap; + use std::sync::Arc; use risingwave_common::catalog::{ CDC_SOURCE_COLUMN_NUM, DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, OFFSET_COLUMN_NAME, @@ -1364,9 +1443,19 @@ pub mod tests { use risingwave_common::types::DataType; use crate::catalog::root_catalog::SchemaPath; + use crate::catalog::source_catalog::SourceCatalog; use crate::handler::create_source::debezium_cdc_source_schema; use crate::test_utils::{create_proto_file, LocalFrontend, PROTO_FILE_DATA}; + const GET_COLUMN_FROM_CATALOG: fn(&Arc) -> HashMap<&str, DataType> = + |catalog: &Arc| -> HashMap<&str, DataType> { + catalog + .columns + .iter() + .map(|col| (col.name(), col.data_type().clone())) + .collect::>() + }; + #[tokio::test] async fn test_create_source_handler() { let proto_file = create_proto_file(PROTO_FILE_DATA); @@ -1389,11 +1478,7 @@ pub mod tests { .unwrap(); assert_eq!(source.name, "t"); - let columns = source - .columns - .iter() - .map(|col| (col.name(), col.data_type().clone())) - .collect::>(); + let columns = GET_COLUMN_FROM_CATALOG(source); let city_type = DataType::new_struct( vec![DataType::Varchar, DataType::Varchar], @@ -1524,4 +1609,57 @@ 
pub mod tests { // make sure it doesn't broken by future PRs assert_eq!(CDC_SOURCE_COLUMN_NUM, columns.len() as u32); } + + #[tokio::test] + async fn test_source_addition_columns() { + // test derive include column for format plain + let sql = + "CREATE SOURCE s (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format plain encode json" + .to_string(); + let frontend = LocalFrontend::new(Default::default()).await; + frontend.run_sql(sql).await.unwrap(); + let session = frontend.session_ref(); + let catalog_reader = session.env().catalog_reader().read_guard(); + let (source, _) = catalog_reader + .get_source_by_name( + DEFAULT_DATABASE_NAME, + SchemaPath::Name(DEFAULT_SCHEMA_NAME), + "s", + ) + .unwrap(); + assert_eq!(source.name, "s"); + + let columns = GET_COLUMN_FROM_CATALOG(source); + let expect_columns = maplit::hashmap! { + ROWID_PREFIX => DataType::Serial, + "v1" => DataType::Int32, + "_rw_kafka_key" => DataType::Bytea, + // todo: kafka connector will automatically derive the column + // will change to a required field in the include clause + "_rw_kafka_timestamp" => DataType::Timestamptz, + }; + assert_eq!(columns, expect_columns); + + // test derive include column for format upsert + let sql = "CREATE SOURCE s1 (v1 int) with (connector = 'kafka') format upsert encode json" + .to_string(); + match frontend.run_sql(sql).await { + Err(e) => { + assert_eq!( + e.to_string(), + "Protocol error: INCLUDE KEY clause must be set for FORMAT UPSERT ENCODE Json" + ) + } + _ => unreachable!(), + } + + let sql = "CREATE SOURCE s2 (v1 int) include key as _rw_kafka_key with (connector = 'kafka') format upsert encode json" + .to_string(); + match frontend.run_sql(sql).await { + Err(e) => { + assert_eq!(e.to_string(), "Protocol error: Primary key must be specified to _rw_kafka_key when creating source with FORMAT UPSERT ENCODE Json") + } + _ => unreachable!(), + } + } } diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index c59292b008a21..f7843310e482d 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -35,11 +35,13 @@ use risingwave_pb::catalog::source::OptionalAssociatedTableId; use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc}; use risingwave_pb::ddl_service::TableJobType; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; -use risingwave_pb::plan_common::{DefaultColumnDesc, GeneratedColumnDesc}; +use risingwave_pb::plan_common::{ + AdditionalColumnType, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc, +}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_sqlparser::ast::{ - CdcTableInfo, ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType, Format, + CdcTableInfo, ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType, Format, Ident, ObjectName, SourceWatermark, TableConstraint, }; @@ -52,7 +54,7 @@ use crate::catalog::{check_valid_column_name, ColumnId}; use crate::expr::{Expr, ExprImpl, ExprRewriter, InlineNowProcTime}; use crate::handler::create_source::{ bind_all_columns, bind_columns_from_source, bind_source_pk, bind_source_watermark, - check_source_schema, validate_compatibility, UPSTREAM_SOURCE_KEY, + check_source_schema, handle_addition_columns, validate_compatibility, UPSTREAM_SOURCE_KEY, }; use crate::handler::HandlerArgs; use crate::optimizer::plan_node::{LogicalCdcScan, LogicalSource}; @@ -206,6 
+208,8 @@ pub fn bind_sql_columns(column_defs: &[ColumnDef]) -> Result> type_name: "".to_string(), generated_or_default_column: None, description: None, + additional_column_type: AdditionalColumnType::Normal, + version: ColumnDescVersion::Pr13707, }, is_hidden: false, }); @@ -451,6 +455,7 @@ pub(crate) async fn gen_create_table_plan_with_source( source_watermarks: Vec, mut col_id_gen: ColumnIdGenerator, append_only: bool, + include_column_options: Vec<(Ident, Option)>, ) -> Result<(PlanRef, Option, PbTable)> { if append_only && source_schema.format != Format::Plain @@ -486,6 +491,9 @@ pub(crate) async fn gen_create_table_plan_with_source( columns_from_sql, &column_defs, )?; + + // add additional columns before bind pk, because `format upsert` requires the key column + handle_addition_columns(&with_properties, include_column_options, &mut columns)?; let pk_names = bind_source_pk( &source_schema, &source_info, @@ -893,8 +901,13 @@ pub(super) async fn handle_create_table_plan( constraints: Vec, source_watermarks: Vec, append_only: bool, + include_column_options: Vec<(Ident, Option)>, ) -> Result<(PlanRef, Option, PbTable, TableJobType)> { - let source_schema = check_create_table_with_source(context.with_options(), source_schema)?; + let source_schema = check_create_table_with_source( + context.with_options(), + source_schema, + &include_column_options, + )?; let ((plan, source, table), job_type) = match (source_schema, cdc_table_info.as_ref()) { @@ -908,6 +921,7 @@ pub(super) async fn handle_create_table_plan( source_watermarks, col_id_gen, append_only, + include_column_options, ) .await?, TableJobType::General, @@ -959,6 +973,7 @@ pub async fn handle_create_table( source_watermarks: Vec, append_only: bool, cdc_table_info: Option, + include_column_options: Vec<(Ident, Option)>, ) -> Result { let session = handler_args.session.clone(); @@ -987,6 +1002,7 @@ pub async fn handle_create_table( constraints, source_watermarks, append_only, + include_column_options, ) .await?; @@ -1018,8 +1034,16 @@ pub async fn handle_create_table( pub fn check_create_table_with_source( with_options: &WithOptions, source_schema: Option, + include_column_options: &[(Ident, Option)], ) -> Result> { - if with_options.inner().contains_key(UPSTREAM_SOURCE_KEY) { + let defined_source = with_options.inner().contains_key(UPSTREAM_SOURCE_KEY); + if !include_column_options.is_empty() && !defined_source { + return Err(ErrorCode::InvalidInputSyntax( + "INCLUDE should be used with a connector".to_owned(), + ) + .into()); + } + if defined_source { source_schema.as_ref().ok_or_else(|| { ErrorCode::InvalidInputSyntax("Please specify a source schema using FORMAT".to_owned()) })?; @@ -1054,6 +1078,7 @@ pub async fn generate_stream_graph_for_table( source_watermarks, col_id_gen, append_only, + vec![], ) .await? } diff --git a/src/frontend/src/handler/explain.rs b/src/frontend/src/handler/explain.rs index 8e2dc051379f9..342b0602a68ae 100644 --- a/src/frontend/src/handler/explain.rs +++ b/src/frontend/src/handler/explain.rs @@ -63,6 +63,7 @@ async fn do_handle_explain( source_watermarks, append_only, cdc_table_info, + include_column_options, .. 
} => { let col_id_gen = ColumnIdGenerator::new_initial(); @@ -79,6 +80,7 @@ async fn do_handle_explain( constraints, source_watermarks, append_only, + include_column_options, ) .await?; let context = plan.ctx(); diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 08826dece7325..25aec90be772a 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -229,6 +229,7 @@ pub async fn handle( source_watermarks, append_only, cdc_table_info, + include_column_options, } => { if or_replace { bail_not_implemented!("CREATE OR REPLACE TABLE"); @@ -258,6 +259,7 @@ pub async fn handle( source_watermarks, append_only, cdc_table_info, + include_column_options, ) .await } diff --git a/src/source/benches/json_parser.rs b/src/source/benches/json_parser.rs index e54a51befa9f1..961dd592cade1 100644 --- a/src/source/benches/json_parser.rs +++ b/src/source/benches/json_parser.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use futures::executor::block_on; use paste::paste; @@ -19,8 +21,11 @@ use rand::distributions::Alphanumeric; use rand::prelude::*; use risingwave_common::catalog::ColumnId; use risingwave_common::types::{DataType, Date, Timestamp}; -use risingwave_connector::parser::{DebeziumParser, JsonParser, SourceStreamChunkBuilder}; -use risingwave_connector::source::SourceColumnDesc; +use risingwave_connector::parser::plain_parser::PlainParser; +use risingwave_connector::parser::{ + DebeziumParser, SourceStreamChunkBuilder, SpecificParserConfig, +}; +use risingwave_connector::source::{SourceColumnDesc, SourceContext}; macro_rules! create_debezium_bench_helpers { ($op:ident, $op_sym:expr, $bench_function:expr) => { @@ -122,21 +127,31 @@ fn get_descs() -> Vec { fn bench_json_parser(c: &mut Criterion) { let descs = get_descs(); - let parser = JsonParser::new_for_test(descs.clone()).unwrap(); let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); let records = generate_json_rows(); + let ctx = Arc::new(SourceContext::default()); c.bench_function("json_parser", |b| { b.to_async(&rt).iter_batched( || records.clone(), |records| async { + let mut parser = rt + .block_on(PlainParser::new( + SpecificParserConfig::DEFAULT_PLAIN_JSON, + descs.clone(), + ctx.clone(), + )) + .unwrap(); let mut builder = SourceStreamChunkBuilder::with_capacity(descs.clone(), NUM_RECORDS); for record in records { let writer = builder.row_writer(); - parser.parse_inner(record, writer).await.unwrap(); + parser + .parse_inner(None, Some(record), writer) + .await + .unwrap(); } }, BatchSize::SmallInput, diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 5ddad8ae373b1..d6063b8d4872d 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1104,6 +1104,8 @@ pub enum Statement { query: Option>, /// `FROM cdc_source TABLE database_name.table_name` cdc_table_info: Option, + /// `INCLUDE a AS b INCLUDE c` + include_column_options: Vec<(Ident, Option)>, }, /// CREATE INDEX CreateIndex { @@ -1589,6 +1591,7 @@ impl fmt::Display for Statement { append_only, query, cdc_table_info, + include_column_options, } => { // We want to allow the following options // Empty column list, allowed by PostgreSQL: @@ -1614,6 +1617,17 @@ impl fmt::Display for Statement { if *append_only { write!(f, " APPEND ONLY")?; } + if !include_column_options.is_empty() 
{ // (Ident, Option) + write!(f, " INCLUDE {}", display_comma_separated( + include_column_options.iter().map(|(a, b)| { + if let Some(b) = b { + format!("{} AS {}", a, b) + } else { + a.to_string() + } + }).collect_vec().as_slice() + ))?; + } if !with_options.is_empty() { write!(f, " WITH ({})", display_comma_separated(with_options))?; } diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index f50a6a1c45450..9c095e76f62db 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -85,6 +85,7 @@ pub struct CreateSourceStatement { pub with_properties: WithProperties, pub source_schema: CompatibleSourceSchema, pub source_watermarks: Vec, + pub include_column_options: Vec<(Ident, Option)>, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -325,6 +326,7 @@ impl ParseTo for CreateSourceStatement { // parse columns let (columns, constraints, source_watermarks) = p.parse_columns_with_watermark()?; + let include_options = p.parse_include_options()?; let with_options = p.parse_with_properties()?; let option = with_options @@ -346,6 +348,7 @@ impl ParseTo for CreateSourceStatement { with_properties: WithProperties(with_options), source_schema, source_watermarks, + include_column_options: include_options, }) } } diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index fdd01b7b29433..5d67bbb4a1a92 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2442,6 +2442,7 @@ impl Parser { } else { false }; + let include_options = self.parse_include_options()?; // PostgreSQL supports `WITH ( options )`, before `AS` let with_options = self.parse_with_properties()?; @@ -2494,9 +2495,24 @@ impl Parser { append_only, query, cdc_table_info, + include_column_options: include_options, }) } + pub fn parse_include_options(&mut self) -> Result)>, ParserError> { + let mut options = vec![]; + while self.parse_keyword(Keyword::INCLUDE) { + let add_column = self.parse_identifier()?; + if self.parse_keyword(Keyword::AS) { + let column_alias = self.parse_identifier()?; + options.push((add_column, Some(column_alias))); + } else { + options.push((add_column, None)); + } + } + Ok(options) + } + pub fn parse_columns_with_watermark(&mut self) -> Result { let mut columns = vec![]; let mut constraints = vec![]; diff --git a/src/sqlparser/tests/testdata/create.yaml b/src/sqlparser/tests/testdata/create.yaml index 4da81a4c43325..c45cf0e6fc2e7 100644 --- a/src/sqlparser/tests/testdata/create.yaml +++ b/src/sqlparser/tests/testdata/create.yaml @@ -35,13 +35,13 @@ Near "pad CHARACTER VARYING) FROM sbtest" - input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://') formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.location = 'file://') - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: 
V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: SingleQuotedString("file://") }] }), source_watermarks: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "location", quote_style: None }]), value: SingleQuotedString("file://") }] }), source_watermarks: [], include_column_options: [] } }' - input: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') formatted_sql: CREATE SOURCE IF NOT EXISTS src WITH (kafka.topic = 'abc', kafka.servers = 'localhost:1001') FORMAT PLAIN ENCODE PROTOBUF (message = 'Foo', schema.registry = 'http://') - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: SingleQuotedString("http://") }] }), source_watermarks: [] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: true, columns: [], constraints: [], source_name: ObjectName([Ident { value: "src", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "topic", quote_style: None }]), value: SingleQuotedString("abc") }, SqlOption { name: ObjectName([Ident { value: "kafka", quote_style: None }, Ident { value: "servers", quote_style: None }]), value: SingleQuotedString("localhost:1001") }]), source_schema: V2(ConnectorSchema { format: Plain, row_encode: Protobuf, row_options: [SqlOption { name: ObjectName([Ident { value: "message", quote_style: None }]), value: SingleQuotedString("Foo") }, SqlOption { name: ObjectName([Ident { value: "schema", quote_style: None }, Ident { value: "registry", quote_style: None }]), value: 
SingleQuotedString("http://") }] }), source_watermarks: [], include_column_options: [] } }' - input: CREATE SOURCE bid (auction INTEGER, bidder INTEGER, price INTEGER, WATERMARK FOR auction AS auction - 1, "date_time" TIMESTAMP) with (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') formatted_sql: CREATE SOURCE bid (auction INT, bidder INT, price INT, "date_time" TIMESTAMP, WATERMARK FOR auction AS auction - 1) WITH (connector = 'nexmark', nexmark.table.type = 'Bid', nexmark.split.num = '12', nexmark.min.event.gap.in.ns = '0') FORMAT NATIVE ENCODE NATIVE - formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(ConnectorSchema { format: Native, row_encode: Native, row_options: [] }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }] } }' + formatted_ast: 'CreateSource { stmt: CreateSourceStatement { if_not_exists: false, columns: [ColumnDef { name: Ident { value: "auction", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "bidder", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "price", quote_style: None }, data_type: Some(Int), collation: None, options: [] }, ColumnDef { name: Ident { value: "date_time", quote_style: Some(''"'') }, data_type: Some(Timestamp(false)), collation: None, options: [] }], constraints: [], source_name: ObjectName([Ident { value: "bid", quote_style: None }]), with_properties: WithProperties([SqlOption { name: ObjectName([Ident { value: "connector", quote_style: None }]), value: SingleQuotedString("nexmark") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "table", quote_style: None }, Ident { value: "type", quote_style: None }]), value: SingleQuotedString("Bid") }, SqlOption { 
name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "split", quote_style: None }, Ident { value: "num", quote_style: None }]), value: SingleQuotedString("12") }, SqlOption { name: ObjectName([Ident { value: "nexmark", quote_style: None }, Ident { value: "min", quote_style: None }, Ident { value: "event", quote_style: None }, Ident { value: "gap", quote_style: None }, Ident { value: "in", quote_style: None }, Ident { value: "ns", quote_style: None }]), value: SingleQuotedString("0") }]), source_schema: V2(ConnectorSchema { format: Native, row_encode: Native, row_options: [] }), source_watermarks: [SourceWatermark { column: Ident { value: "auction", quote_style: None }, expr: BinaryOp { left: Identifier(Ident { value: "auction", quote_style: None }), op: Minus, right: Value(Number("1")) } }], include_column_options: [] } }' - input: CREATE TABLE T (v1 INT, v2 STRUCT) formatted_sql: CREATE TABLE T (v1 INT, v2 STRUCT) - input: CREATE TABLE T (v1 INT, v2 STRUCT>) diff --git a/src/storage/src/row_serde/mod.rs b/src/storage/src/row_serde/mod.rs index 5fc99b8b6945a..b94c8e663d9c4 100644 --- a/src/storage/src/row_serde/mod.rs +++ b/src/storage/src/row_serde/mod.rs @@ -84,32 +84,36 @@ mod test { check( result, expect![[r#" - ( - [ - ColumnDesc { - data_type: Int64, - column_id: #2, - name: "", - field_descs: [], - type_name: "", - generated_or_default_column: None, - description: None, - }, - ColumnDesc { - data_type: Int16, - column_id: #3, - name: "", - field_descs: [], - type_name: "", - generated_or_default_column: None, - description: None, - }, - ], - [ - 1, - 2, - ], - )"#]], + ( + [ + ColumnDesc { + data_type: Int64, + column_id: #2, + name: "", + field_descs: [], + type_name: "", + generated_or_default_column: None, + description: None, + additional_column_type: Normal, + version: Pr13707, + }, + ColumnDesc { + data_type: Int16, + column_id: #3, + name: "", + field_descs: [], + type_name: "", + generated_or_default_column: None, + description: None, + additional_column_type: Normal, + version: Pr13707, + }, + ], + [ + 1, + 2, + ], + )"#]], ); let table_columns = vec![ @@ -132,6 +136,8 @@ mod test { type_name: "", generated_or_default_column: None, description: None, + additional_column_type: Normal, + version: Pr13707, }, ColumnDesc { data_type: Varchar, @@ -141,6 +147,8 @@ mod test { type_name: "", generated_or_default_column: None, description: None, + additional_column_type: Normal, + version: Pr13707, }, ], [ diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs index d2bd5a34c2ad7..686d90f8e5f79 100644 --- a/src/stream/src/from_proto/source/trad_source.rs +++ b/src/stream/src/from_proto/source/trad_source.rs @@ -12,8 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. 
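The updated `create.yaml` expectations above now carry an explicit `include_column_options: []` field, produced by the `parse_include_options` loop added earlier in `src/sqlparser/src/parser.rs`. As a self-contained illustration of the grammar that loop accepts — zero or more `INCLUDE <column> [AS <alias>]` groups — here is a simplified token-level model (not the actual sqlparser code):

fn parse_include_options_sketch(
    tokens: &[&str],
) -> Result<Vec<(String, Option<String>)>, String> {
    let mut options = Vec::new();
    let mut i = 0;
    while i < tokens.len() && tokens[i].eq_ignore_ascii_case("INCLUDE") {
        // The column to include, e.g. `key`.
        let column = tokens
            .get(i + 1)
            .ok_or("expected an identifier after INCLUDE")?
            .to_string();
        // Optional `AS <alias>` renaming the generated column.
        if tokens.get(i + 2).is_some_and(|t| t.eq_ignore_ascii_case("AS")) {
            let alias = tokens
                .get(i + 3)
                .ok_or("expected an identifier after AS")?
                .to_string();
            options.push((column, Some(alias)));
            i += 4;
        } else {
            options.push((column, None));
            i += 2;
        }
    }
    Ok(options)
}

// ["INCLUDE", "key", "AS", "rw_key", "INCLUDE", "timestamp"]
//   => [("key", Some("rw_key")), ("timestamp", None)]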
-use risingwave_common::catalog::{ColumnId, TableId}; +use risingwave_common::catalog::{default_key_column_name_version_mapping, ColumnId, TableId}; use risingwave_connector::source::{ConnectorProperties, SourceCtrlOpts}; +use risingwave_pb::data::data_type::TypeName as PbTypeName; +use risingwave_pb::plan_common::{ + AdditionalColumnType, ColumnDescVersion, FormatType, PbEncodeType, +}; use risingwave_pb::stream_plan::SourceNode; use risingwave_source::source_desc::SourceDescBuilder; use risingwave_storage::panic_store::PanicStateStore; @@ -50,8 +54,37 @@ impl ExecutorBuilder for SourceExecutorBuilder { let source_name = source.source_name.clone(); let source_info = source.get_info()?; + let mut source_columns = source.columns.clone(); + + { + // compatible code: introduced in https://github.com/risingwavelabs/risingwave/pull/13707 + // for upsert and (avro | protobuf) overwrite the `_rw_key` column's ColumnDesc.additional_column_type to Key + if source_info.format() == FormatType::Upsert + && (source_info.row_encode() == PbEncodeType::Avro + || source_info.row_encode() == PbEncodeType::Protobuf) + { + let _ = source_columns.iter_mut().map(|c| { + let _ = c.column_desc.as_mut().map(|desc| { + let is_bytea = desc + .get_column_type() + .map(|col_type| col_type.type_name == PbTypeName::Bytea as i32) + .unwrap(); + if desc.name == default_key_column_name_version_mapping( + &desc.version() + ) + && is_bytea + // the column is from a legacy version + && desc.version == ColumnDescVersion::Unspecified as i32 + { + desc.additional_column_type = AdditionalColumnType::Key as i32; + } + }); + }); + } + } + let source_desc_builder = SourceDescBuilder::new( - source.columns.clone(), + source_columns.clone(), params.env.source_metrics(), source.row_id_index.map(|x| x as _), source.with_properties.clone(), @@ -74,8 +107,7 @@ impl ExecutorBuilder for SourceExecutorBuilder { chunk_size: params.env.config().developer.chunk_size, }; - let source_column_ids: Vec<_> = source - .columns + let source_column_ids: Vec<_> = source_columns .iter() .map(|column| ColumnId::from(column.get_column_desc().unwrap().column_id)) .collect(); diff --git a/src/tests/sqlsmith/src/lib.rs b/src/tests/sqlsmith/src/lib.rs index 6b6f164d3cd73..c5a9c9b919aa4 100644 --- a/src/tests/sqlsmith/src/lib.rs +++ b/src/tests/sqlsmith/src/lib.rs @@ -281,6 +281,7 @@ CREATE TABLE t3(v1 int, v2 bool, v3 smallint); append_only: false, query: None, cdc_table_info: None, + include_column_options: [], }, CreateTable { or_replace: false, @@ -325,6 +326,7 @@ CREATE TABLE t3(v1 int, v2 bool, v3 smallint); append_only: false, query: None, cdc_table_info: None, + include_column_options: [], }, CreateTable { or_replace: false, @@ -380,6 +382,7 @@ CREATE TABLE t3(v1 int, v2 bool, v3 smallint); append_only: false, query: None, cdc_table_info: None, + include_column_options: [], }, ], )"#]], @@ -511,6 +514,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY append_only: false, query: None, cdc_table_info: None, + include_column_options: [], }, CreateTable { or_replace: false, @@ -562,6 +566,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY append_only: false, query: None, cdc_table_info: None, + include_column_options: [], }, CreateTable { or_replace: false, @@ -620,6 +625,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY append_only: false, query: None, cdc_table_info: None, + include_column_options: [], }, CreateTable { or_replace: false, @@ -696,6 
+702,7 @@ CREATE TABLE t4(v1 int PRIMARY KEY, v2 smallint PRIMARY KEY, v3 bool PRIMARY KEY append_only: false, query: None, cdc_table_info: None, + include_column_options: [], }, ], )"#]],
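Finally, a note on the compatibility shim added in `trad_source.rs` above: sources created before this PR stored their implicit message-key column as a plain `bytea` column (under the legacy default name resolved by `default_key_column_name_version_mapping`) with `ColumnDescVersion::Unspecified`. At executor build time that column is re-tagged as `AdditionalColumnType::Key` for upsert Avro/Protobuf sources so old catalogs keep working. A reduced sketch of that rule with stand-in enums (not the protobuf-generated types):

#[derive(PartialEq)]
enum ColumnDescVersionSketch {
    Unspecified, // written by a pre-PR binary
    Pr13707,     // written after this PR; already tagged correctly
}

#[derive(PartialEq)]
enum AdditionalColumnTypeSketch {
    Normal,
    Key,
}

struct LegacyColumnSketch {
    name: String,
    is_bytea: bool,
    version: ColumnDescVersionSketch,
    additional_column_type: AdditionalColumnTypeSketch,
}

// Applied only when the source is FORMAT UPSERT with an Avro or Protobuf value
// encoding, mirroring the guard in the executor builder above.
fn upgrade_legacy_key_column(col: &mut LegacyColumnSketch, legacy_default_key_name: &str) {
    if col.name == legacy_default_key_name
        && col.is_bytea
        && col.version == ColumnDescVersionSketch::Unspecified
    {
        col.additional_column_type = AdditionalColumnTypeSketch::Key;
    }
}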