diff --git a/e2e_test/source/basic/inlcude_key_as.slt b/e2e_test/source/basic/inlcude_key_as.slt
index 83cace5eae6f6..c971e1ac074d0 100644
--- a/e2e_test/source/basic/inlcude_key_as.slt
+++ b/e2e_test/source/basic/inlcude_key_as.slt
@@ -61,12 +61,32 @@ WITH (
   topic = 'kafka_additional_columns')
 FORMAT PLAIN ENCODE JSON
 
+# header with varchar type & non-existent header key
+statement ok
+create table additional_columns_1 (a int)
+include key as key_col
+include partition as partition_col
+include offset as offset_col
+include timestamp as timestamp_col
+include header 'header1' as header_col_1
+include header 'header2' as header_col_2
+include header 'header2' varchar as header_col_3
+include header 'header3' as header_col_4
+WITH (
+  connector = 'kafka',
+  properties.bootstrap.server = 'message_queue:29092',
+  topic = 'kafka_additional_columns')
+FORMAT PLAIN ENCODE JSON
+
 statement ok
 select * from upsert_students_default_key;
 
 statement ok
 select * from additional_columns;
 
+statement ok
+select * from additional_columns_1;
+
 # Wait enough time to ensure SourceExecutor consumes all Kafka data.
 sleep 3s
 
@@ -98,8 +118,16 @@ FROM additional_columns limit 1;
 ----
 header1 \x7631
 
+query TTTT
+select header_col_1, header_col_2, header_col_3, header_col_4 from additional_columns_1 limit 1
+----
+\x7631 \x7632 v2 NULL
+
 statement ok
 drop table upsert_students_default_key
 
 statement ok
 drop table additional_columns
+
+statement ok
+drop table additional_columns_1
diff --git a/proto/plan_common.proto b/proto/plan_common.proto
index 6fdcc86feb7d8..c85ea9ef8a603 100644
--- a/proto/plan_common.proto
+++ b/proto/plan_common.proto
@@ -15,17 +15,6 @@ message Field {
   string name = 2;
 }
 
-enum AdditionalColumnType {
-  UNSPECIFIED = 0;
-  KEY = 1;
-  TIMESTAMP = 2;
-  PARTITION = 3;
-  OFFSET = 4;
-  HEADER = 5;
-  FILENAME = 6;
-  NORMAL = 7;
-}
-
 enum ColumnDescVersion {
   COLUMN_DESC_VERSION_UNSPECIFIED = 0;
   // Introduced in https://github.com/risingwavelabs/risingwave/pull/13707#discussion_r1429947537,
@@ -64,9 +53,15 @@ message ColumnDesc {
   // This field is used to represent the connector-specific additional column type.
   // UNSPECIFIED or unset for normal column.
-  AdditionalColumnType additional_column_type = 9;
+
+  // Deprecated: use AdditionalColumn instead.
+  // AdditionalColumnType additional_column_type = 9;
+  reserved "additional_column_type";
+  reserved 9;
 
   ColumnDescVersion version = 10;
+
+  AdditionalColumn additional_columns = 11;
 }
 
 message ColumnCatalog {
@@ -190,3 +185,33 @@ message Cardinality {
 message ExprContext {
   string time_zone = 1;
 }
+
+message AdditionalColumnKey {}
+
+message AdditionalColumnTimestamp {}
+
+message AdditionalColumnPartition {}
+
+message AdditionalColumnOffset {}
+
+message AdditionalColumnFilename {}
+
+message AdditionalColumnHeader {
+  string inner_field = 1;
+  data.DataType data_type = 2;
+}
+
+// This type means the column reads all headers as a whole.
+message AdditionalColumnHeaders {}
+
+message AdditionalColumn {
+  oneof column_type {
+    AdditionalColumnKey key = 1;
+    AdditionalColumnTimestamp timestamp = 2;
+    AdditionalColumnPartition partition = 3;
+    AdditionalColumnOffset offset = 4;
+    AdditionalColumnHeader header_inner = 5;
+    AdditionalColumnFilename filename = 6;
+    AdditionalColumnHeaders headers = 7;
+  }
+}
diff --git a/src/common/src/catalog/column.rs b/src/common/src/catalog/column.rs
index 7be23eb3a0b19..09fd3db09561f 100644
--- a/src/common/src/catalog/column.rs
+++ b/src/common/src/catalog/column.rs
@@ -18,7 +18,7 @@ use itertools::Itertools;
 use risingwave_pb::expr::ExprNode;
 use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
 use risingwave_pb::plan_common::{
-    AdditionalColumnType, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
+    AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
 };
 
 use super::row_id_column_desc;
@@ -103,7 +103,7 @@ pub struct ColumnDesc {
     pub type_name: String,
     pub generated_or_default_column: Option<GeneratedOrDefaultColumn>,
     pub description: Option<String>,
-    pub additional_column_type: AdditionalColumnType,
+    pub additional_columns: AdditionalColumn,
     pub version: ColumnDescVersion,
 }
 
@@ -117,7 +117,7 @@ impl ColumnDesc {
             type_name: String::new(),
             generated_or_default_column: None,
             description: None,
-            additional_column_type: AdditionalColumnType::Normal,
+            additional_columns: AdditionalColumn { column_type: None },
             version: ColumnDescVersion::Pr13707,
         }
     }
@@ -131,7 +131,7 @@ impl ColumnDesc {
             type_name: String::new(),
             generated_or_default_column: None,
             description: None,
-            additional_column_type: AdditionalColumnType::Normal,
+            additional_columns: AdditionalColumn { column_type: None },
             version: ColumnDescVersion::Pr13707,
         }
     }
@@ -140,7 +140,7 @@ impl ColumnDesc {
         name: impl Into<String>,
         column_id: ColumnId,
         data_type: DataType,
-        additional_column_type: AdditionalColumnType,
+        additional_column_type: AdditionalColumn,
     ) -> ColumnDesc {
         ColumnDesc {
             data_type,
@@ -150,7 +150,7 @@ impl ColumnDesc {
             type_name: String::new(),
             generated_or_default_column: None,
             description: None,
-            additional_column_type,
+            additional_columns: additional_column_type,
             version: ColumnDescVersion::Pr13707,
         }
     }
@@ -170,7 +170,7 @@ impl ColumnDesc {
             type_name: self.type_name.clone(),
             generated_or_default_column: self.generated_or_default_column.clone(),
             description: self.description.clone(),
-            additional_column_type: self.additional_column_type as i32,
+            additional_columns: Some(self.additional_columns.clone()),
             version: self.version as i32,
         }
     }
@@ -198,7 +198,7 @@ impl ColumnDesc {
             type_name: "".to_string(),
             generated_or_default_column: None,
             description: None,
-            additional_column_type: AdditionalColumnType::Normal,
+            additional_columns: AdditionalColumn { column_type: None },
             version: ColumnDescVersion::Pr13707,
         }
     }
@@ -221,7 +221,7 @@ impl ColumnDesc {
             type_name: type_name.to_string(),
             generated_or_default_column: None,
             description: None,
-            additional_column_type: AdditionalColumnType::Normal,
+            additional_columns: AdditionalColumn { column_type: None },
             version: ColumnDescVersion::Pr13707,
         }
     }
@@ -239,7 +239,7 @@ impl ColumnDesc {
             type_name: field.type_name.clone(),
             description: None,
             generated_or_default_column: None,
-            additional_column_type: AdditionalColumnType::Normal,
+            additional_columns: AdditionalColumn { column_type: None },
             version: ColumnDescVersion::Pr13707,
         }
     }
@@ -265,7 +265,10 @@ impl ColumnDesc {
 
 impl From<PbColumnDesc> for ColumnDesc {
     fn from(prost: PbColumnDesc) -> Self {
-        let additional_column_type = prost.additional_column_type();
+        let additional_columns = prost
+            .get_additional_columns()
+            .unwrap_or(&AdditionalColumn { column_type: None })
+            .clone();
         let version = prost.version();
         let field_descs: Vec<ColumnDesc> = prost
             .field_descs
@@ -280,7 +283,7 @@ impl From<PbColumnDesc> for ColumnDesc {
             field_descs,
             generated_or_default_column: prost.generated_or_default_column,
             description: prost.description.clone(),
-            additional_column_type,
+            additional_columns,
             version,
         }
     }
@@ -302,7 +305,7 @@ impl From<&ColumnDesc> for PbColumnDesc {
             type_name: c.type_name.clone(),
             generated_or_default_column: c.generated_or_default_column.clone(),
             description: c.description.clone(),
-            additional_column_type: c.additional_column_type as i32,
+            additional_columns: c.additional_columns.clone().into(),
             version: c.version as i32,
         }
     }
diff --git a/src/common/src/catalog/test_utils.rs b/src/common/src/catalog/test_utils.rs
index db8efb278dd83..ca154b9bf0b0b 100644
--- a/src/common/src/catalog/test_utils.rs
+++ b/src/common/src/catalog/test_utils.rs
@@ -15,7 +15,7 @@
 use itertools::Itertools;
 use risingwave_pb::data::data_type::TypeName;
 use risingwave_pb::data::DataType;
-use risingwave_pb::plan_common::{AdditionalColumnType, ColumnDesc, ColumnDescVersion};
+use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion};
 
 pub trait ColumnDescTestExt {
     /// Create a [`ColumnDesc`] with the given name and type.
@@ -35,7 +35,7 @@ impl ColumnDescTestExt for ColumnDesc {
             column_type: Some(data_type),
             column_id,
             name: name.to_string(),
-            additional_column_type: AdditionalColumnType::Normal as i32,
+            additional_columns: Some(AdditionalColumn { column_type: None }),
             version: ColumnDescVersion::Pr13707 as i32,
             ..Default::default()
         }
@@ -60,7 +60,7 @@ impl ColumnDescTestExt for ColumnDesc {
             field_descs: fields,
             generated_or_default_column: None,
             description: None,
-            additional_column_type: AdditionalColumnType::Normal as i32,
+            additional_columns: Some(AdditionalColumn { column_type: None }),
             version: ColumnDescVersion::Pr13707 as i32,
         }
     }
diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs
index f604f1225227c..fa023bac01baa 100644
--- a/src/connector/src/parser/additional_columns.rs
+++ b/src/connector/src/parser/additional_columns.rs
@@ -12,104 +12,223 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::{HashMap, HashSet};
+use std::sync::LazyLock;
+
 use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId};
+use risingwave_common::error::Result;
 use risingwave_common::types::{DataType, StructType};
-use risingwave_pb::plan_common::AdditionalColumnType;
+use risingwave_pb::data::data_type::TypeName;
+use risingwave_pb::data::DataType as PbDataType;
+use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
+use risingwave_pb::plan_common::{
+    AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader, AdditionalColumnHeaders,
+    AdditionalColumnKey, AdditionalColumnOffset, AdditionalColumnPartition,
+    AdditionalColumnTimestamp,
+};
 
 use crate::source::{
     GCS_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, OPENDAL_S3_CONNECTOR, PULSAR_CONNECTOR,
     S3_CONNECTOR,
 };
 
-pub type CompatibleAdditionalColumnsFn =
-    Box<dyn Fn(ColumnId, &str) -> ColumnCatalog + Send + Sync + 'static>;
+pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock<HashMap<&'static str, HashSet<&'static str>>> =
+    LazyLock::new(|| {
+        HashMap::from([
+            (
+                KAFKA_CONNECTOR,
+                HashSet::from(["key", "timestamp", "partition", "offset", "header"]),
+            ),
+            (
+                PULSAR_CONNECTOR,
+                HashSet::from(["key", "partition", "offset"]),
+            ),
+            (
+                KINESIS_CONNECTOR,
+                HashSet::from(["key", "partition", "offset", "timestamp"]),
+            ),
+            (OPENDAL_S3_CONNECTOR, HashSet::from(["file", "offset"])),
+            (S3_CONNECTOR, HashSet::from(["file", "offset"])),
+            (GCS_CONNECTOR, HashSet::from(["file", "offset"])),
+        ])
+    });
+
+fn gen_default_name(
+    connector_name: &str,
+    additional_col_type: &str,
+    inner_field_name: Option<&str>,
+    data_type: Option<&str>,
+) -> String {
+    let col_name = [
+        Some(connector_name),
+        Some(additional_col_type),
+        inner_field_name,
+        data_type,
+    ];
+    col_name.iter().fold("_rw".to_string(), |name, ele| {
+        if let Some(ele) = ele {
+            format!("{}_{}", name, ele)
+        } else {
+            name
+        }
+    })
+}
 
-pub fn get_connector_compatible_additional_columns(
+pub fn build_additional_column_catalog(
+    column_id: ColumnId,
     connector_name: &str,
-) -> Option<Vec<(&'static str, CompatibleAdditionalColumnsFn)>> {
-    let compatible_columns = match connector_name {
-        KAFKA_CONNECTOR => kafka_compatible_column_vec(),
-        PULSAR_CONNECTOR => pulsar_compatible_column_vec(),
-        KINESIS_CONNECTOR => kinesis_compatible_column_vec(),
-        OPENDAL_S3_CONNECTOR | S3_CONNECTOR | GCS_CONNECTOR => s3_compatible_column_column_vec(),
-        _ => return None,
+    additional_col_type: &str,
+    column_alias: Option<String>,
+    inner_field_name: Option<&str>,
+    data_type: Option<&str>,
+) -> Result<ColumnCatalog> {
+    let compatible_columns = COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name).unwrap();
+    if !compatible_columns.contains(additional_col_type) {
+        return Err(format!(
+            "additional column type {} is not supported for connector {}, acceptable column types: {:?}",
+            additional_col_type, connector_name, compatible_columns
+        ).into());
+    }
+
+    let column_name = column_alias.unwrap_or_else(|| {
+        gen_default_name(
+            connector_name,
+            additional_col_type,
+            inner_field_name,
+            data_type,
+        )
+    });
+
+    let catalog = match additional_col_type {
+        "key" => ColumnCatalog {
+            column_desc: ColumnDesc::named_with_additional_column(
+                column_name,
+                column_id,
+                DataType::Bytea,
+                AdditionalColumn {
+                    column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
+                },
+            ),
+            is_hidden: false,
+        },
+        "timestamp" => ColumnCatalog {
+            column_desc: ColumnDesc::named_with_additional_column(
+                column_name,
+                column_id,
+                DataType::Timestamptz,
+                AdditionalColumn {
+                    column_type: Some(AdditionalColumnType::Timestamp(
+                        AdditionalColumnTimestamp {},
+                    )),
+                },
+            ),
+            is_hidden: false,
+        },
+        "partition" => ColumnCatalog {
+            column_desc: ColumnDesc::named_with_additional_column(
+                column_name,
+                column_id,
+                DataType::Varchar,
+                AdditionalColumn {
+                    column_type: Some(AdditionalColumnType::Partition(
+                        AdditionalColumnPartition {},
+                    )),
+                },
+            ),
+            is_hidden: false,
+        },
+        "offset" => ColumnCatalog {
+            column_desc: ColumnDesc::named_with_additional_column(
+                column_name,
+                column_id,
+                DataType::Varchar,
+                AdditionalColumn {
+                    column_type: Some(AdditionalColumnType::Offset(AdditionalColumnOffset {})),
+                },
+            ),
+            is_hidden: false,
+        },
+        "file" => ColumnCatalog {
+            column_desc: ColumnDesc::named_with_additional_column(
+                column_name,
+                column_id,
+                DataType::Varchar,
+                AdditionalColumn {
+                    column_type: Some(AdditionalColumnType::Filename(AdditionalColumnFilename {})),
+                },
+            ),
+            is_hidden: false,
+        },
+        "header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type),
+        _ => unreachable!(),
     };
-    Some(compatible_columns)
+
+    Ok(catalog)
 }
 
-fn kafka_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColumnsFn)> {
-    vec![
-        (
-            "key",
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
+fn build_header_catalog(
+    column_id: ColumnId,
+    col_name: &str,
+    inner_field_name: Option<&str>,
+    data_type: Option<&str>,
+) -> ColumnCatalog {
+    if let Some(inner) = inner_field_name {
+        let (data_type, pb_data_type) = {
+            if let Some(type_name) = data_type {
+                match type_name {
+                    "bytea" => (
                         DataType::Bytea,
-                        AdditionalColumnType::Key,
-                    ),
-                    is_hidden: false,
-                }
-            }),
-        ),
-        (
-            "timestamp",
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
-                        DataType::Timestamptz,
-                        AdditionalColumnType::Timestamp,
+                        PbDataType {
+                            type_name: TypeName::Bytea as i32,
+                            ..Default::default()
+                        },
                     ),
-                    is_hidden: false,
-                }
-            }),
-        ),
-        (
-            "partition",
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
+                    "varchar" => (
                         DataType::Varchar,
-                        AdditionalColumnType::Partition,
+                        PbDataType {
+                            type_name: TypeName::Varchar as i32,
+                            ..Default::default()
+                        },
                     ),
-                    is_hidden: false,
+                    _ => unreachable!(),
                 }
-            }),
-        ),
-        (
-            "offset",
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
-                        DataType::Varchar,
-                        AdditionalColumnType::Offset,
-                    ),
-                    is_hidden: false,
-                }
-            }),
-        ),
-        (
-            "header", // type: struct<varchar, bytea>[]
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
-                        DataType::List(get_kafka_header_item_datatype().into()),
-                        AdditionalColumnType::Header,
-                    ),
-                    is_hidden: false,
-                }
-            }),
-        ),
-    ]
+            } else {
+                (
+                    DataType::Bytea,
+                    PbDataType {
+                        type_name: TypeName::Bytea as i32,
+                        ..Default::default()
+                    },
+                )
+            }
+        };
+        ColumnCatalog {
+            column_desc: ColumnDesc::named_with_additional_column(
+                col_name,
+                column_id,
+                data_type,
+                AdditionalColumn {
+                    column_type: Some(AdditionalColumnType::HeaderInner(AdditionalColumnHeader {
+                        inner_field: inner.to_string(),
+                        data_type: Some(pb_data_type),
+                    })),
+                },
+            ),
+            is_hidden: false,
+        }
+    } else {
+        ColumnCatalog {
+            column_desc: ColumnDesc::named_with_additional_column(
+                col_name,
+                column_id,
+                DataType::List(get_kafka_header_item_datatype().into()),
+                AdditionalColumn {
+                    column_type: Some(AdditionalColumnType::Headers(AdditionalColumnHeaders {})),
+                },
+            ),
+            is_hidden: false,
+        }
+    }
 }
 
 pub fn get_kafka_header_item_datatype() -> DataType {
@@ -117,143 +236,23 @@
     DataType::Struct(StructType::new(struct_inner))
 }
 
-fn pulsar_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColumnsFn)> {
-    vec![
-        (
-            "key",
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
-                        DataType::Bytea,
-                        AdditionalColumnType::Key,
-                    ),
-                    is_hidden: false,
-                }
-            }),
-        ),
-        (
-            "partition",
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
-                        DataType::Varchar,
-                        AdditionalColumnType::Partition,
-                    ),
-                    is_hidden: false,
-                }
-            }),
-        ),
-        (
-            "offset",
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
-                        DataType::Varchar,
-                        AdditionalColumnType::Offset,
-                    ),
-                    is_hidden: false,
-                }
-            }),
-        ),
-    ]
-}
-
-fn kinesis_compatible_column_vec() -> Vec<(&'static str, CompatibleAdditionalColumnsFn)> {
-    vec![
-        (
-            "key",
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
-                        DataType::Bytea,
-                        AdditionalColumnType::Key,
-                    ),
-                    is_hidden: false,
-                }
-            }),
-        ),
-        (
-            "partition",
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
-                        DataType::Varchar,
-                        AdditionalColumnType::Partition,
-                    ),
-                    is_hidden: false,
-                }
-            }),
-        ),
-        (
-            "offset",
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
-                        DataType::Varchar,
-                        AdditionalColumnType::Offset,
-                    ),
-                    is_hidden: false,
-                }
-            }),
-        ),
-        (
-            "timestamp",
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
-                        DataType::Timestamptz,
-                        AdditionalColumnType::Timestamp,
-                    ),
-                    is_hidden: false,
-                }
-            }),
-        ),
-    ]
-}
+#[cfg(test)]
+mod test {
+    use super::*;
 
-fn s3_compatible_column_column_vec() -> Vec<(&'static str, CompatibleAdditionalColumnsFn)> {
-    vec![
-        (
-            "file",
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
-                        DataType::Varchar,
-                        AdditionalColumnType::Filename,
-                    ),
-                    is_hidden: false,
-                }
-            }),
-        ),
-        (
-            "offset",
-            Box::new(|id: ColumnId, name: &str| -> ColumnCatalog {
-                ColumnCatalog {
-                    column_desc: ColumnDesc::named_with_additional_column(
-                        name,
-                        id,
-                        DataType::Varchar,
-                        AdditionalColumnType::Offset,
-                    ),
-                    is_hidden: false,
-                }
-            }),
-        ),
-    ]
+    #[test]
+    fn test_gen_default_name() {
+        assert_eq!(
+            gen_default_name("kafka", "key", None, None),
+            "_rw_kafka_key"
+        );
+        assert_eq!(
+            gen_default_name("kafka", "header", Some("inner"), None),
+            "_rw_kafka_header_inner"
+        );
+        assert_eq!(
+            gen_default_name("kafka", "header", Some("inner"), Some("varchar")),
+            "_rw_kafka_header_inner_varchar"
+        );
+    }
 }
diff --git a/src/connector/src/parser/avro/util.rs b/src/connector/src/parser/avro/util.rs
index f81f73fd1b7e4..12ab86cdb8d3c 100644
--- a/src/connector/src/parser/avro/util.rs
+++ b/src/connector/src/parser/avro/util.rs
@@ -18,7 +18,7 @@ use apache_avro::schema::{DecimalSchema, RecordSchema, Schema};
 use itertools::Itertools;
 use risingwave_common::log::LogSuppresser;
 use risingwave_common::types::{DataType, Decimal};
-use risingwave_pb::plan_common::{AdditionalColumnType, ColumnDesc, ColumnDescVersion};
+use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion};
 
 pub fn avro_schema_to_column_descs(schema: &Schema) -> anyhow::Result<Vec<ColumnDesc>> {
     if let Schema::Record(RecordSchema { fields, .. }) = schema {
@@ -61,7 +61,7 @@ fn avro_field_to_column_desc(
             type_name: schema_name.to_string(),
             generated_or_default_column: None,
             description: None,
-            additional_column_type: AdditionalColumnType::Normal as i32,
+            additional_columns: Some(AdditionalColumn { column_type: None }),
             version: ColumnDescVersion::Pr13707 as i32,
         })
     }
@@ -71,7 +71,7 @@ fn avro_field_to_column_desc(
             column_type: Some(data_type.to_protobuf()),
             column_id: *index,
             name: name.to_owned(),
-            additional_column_type: AdditionalColumnType::Normal as i32,
+            additional_columns: Some(AdditionalColumn { column_type: None }),
             version: ColumnDescVersion::Pr13707 as i32,
             ..Default::default()
         })
diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs
index 5efdd237e9e32..ae7dd22b2d349 100644
--- a/src/connector/src/parser/debezium/simd_json_parser.rs
+++ b/src/connector/src/parser/debezium/simd_json_parser.rs
@@ -501,7 +501,7 @@ mod tests {
 
     // postgres-specific data-type mapping tests
     mod test3_postgres {
-        use risingwave_pb::plan_common::AdditionalColumnType;
+        use risingwave_pb::plan_common::AdditionalColumn;
 
         use super::*;
         use crate::source::SourceColumnType;
@@ -566,7 +566,7 @@ mod tests {
                 fields: vec![],
                 column_type: SourceColumnType::Normal,
                 is_pk: false,
-                additional_column_type: AdditionalColumnType::Normal,
+                additional_column_type: AdditionalColumn { column_type: None },
             },
             SourceColumnDesc::simple("o_enum", DataType::Varchar, ColumnId::from(8)),
             SourceColumnDesc::simple("o_char", DataType::Varchar, ColumnId::from(9)),
diff --git a/src/connector/src/parser/json_parser.rs b/src/connector/src/parser/json_parser.rs
index 5ed5e2811df98..b549dcb838c24 100644
--- a/src/connector/src/parser/json_parser.rs
+++ b/src/connector/src/parser/json_parser.rs
@@ -203,7 +203,8 @@ mod tests {
     use risingwave_common::row::Row;
     use risingwave_common::test_prelude::StreamChunkTestExt;
     use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum};
-    use risingwave_pb::plan_common::AdditionalColumnType;
+    use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
+    use risingwave_pb::plan_common::{AdditionalColumn, AdditionalColumnKey};
 
     use super::JsonParser;
     use crate::parser::upsert_parser::UpsertParser;
@@ -580,7 +581,9 @@ mod tests {
             fields: vec![],
             column_type: SourceColumnType::Normal,
             is_pk: true,
-            additional_column_type: AdditionalColumnType::Key,
+            additional_column_type: AdditionalColumn {
+                column_type: Some(AdditionalColumnType::Key(AdditionalColumnKey {})),
+            },
         };
         let descs = vec![
             SourceColumnDesc::simple("a", DataType::Int32, 0.into()),
diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index cb82c443ba0a7..c9ee987f4652c 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -36,7 +36,7 @@ use risingwave_common::util::tracing::InstrumentStream;
 use risingwave_pb::catalog::{
     SchemaRegistryNameStrategy as PbSchemaRegistryNameStrategy, StreamSourceInfo,
 };
-use risingwave_pb::plan_common::AdditionalColumnType;
+use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
 
 use self::avro::AvroAccessBuilder;
 use self::bytes_parser::BytesAccessBuilder;
@@ -49,7 +49,9 @@ use self::upsert_parser::UpsertParser;
 use self::util::get_kafka_topic;
 use crate::common::AwsAuthProps;
 use crate::parser::maxwell::MaxwellParser;
-use crate::parser::util::{extract_headers_from_meta, extreact_timestamp_from_meta};
+use crate::parser::util::{
+    extract_header_inner_from_meta, extract_headers_from_meta, extreact_timestamp_from_meta,
+};
 use crate::schema::schema_registry::SchemaRegistryAuth;
 use crate::source::monitor::GLOBAL_SOURCE_METRICS;
 use crate::source::{
@@ -317,7 +319,7 @@ impl SourceStreamChunkRowWriter<'_> {
         mut f: impl FnMut(&SourceColumnDesc) -> AccessResult<A::Output>,
     ) -> AccessResult<()> {
         let mut wrapped_f = |desc: &SourceColumnDesc| {
-            match (&desc.column_type, &desc.additional_column_type) {
+            match (&desc.column_type, &desc.additional_column_type.column_type) {
                 (&SourceColumnType::Offset | &SourceColumnType::RowId, _) => {
                     // SourceColumnType is for CDC source only.
                     Ok(A::output_for(
@@ -341,7 +343,7 @@ impl SourceStreamChunkRowWriter<'_> {
                             .unwrap(), // handled all match cases in internal match, unwrap is safe
                     ));
                 }
-                (_, &AdditionalColumnType::Timestamp) => {
+                (_, &Some(AdditionalColumnType::Timestamp(_))) => {
                     return Ok(A::output_for(
                         self.row_meta
                             .as_ref()
@@ -349,7 +351,7 @@ impl SourceStreamChunkRowWriter<'_> {
                             .unwrap_or(None),
                     ))
                 }
-                (_, &AdditionalColumnType::Partition) => {
+                (_, &Some(AdditionalColumnType::Partition(_))) => {
                     // the meta info does not involve a specific connector
                     return Ok(A::output_for(
                         self.row_meta
@@ -357,7 +359,7 @@ impl SourceStreamChunkRowWriter<'_> {
                            .map(|ele| ScalarImpl::Utf8(ele.split_id.to_string().into())),
                     ));
                 }
-                (_, &AdditionalColumnType::Offset) => {
+                (_, &Some(AdditionalColumnType::Offset(_))) => {
                     // the meta info does not involve a specific connector
                     return Ok(A::output_for(
                         self.row_meta
@@ -365,7 +367,21 @@ impl SourceStreamChunkRowWriter<'_> {
                             .map(|ele| ScalarImpl::Utf8(ele.offset.to_string().into())),
                     ));
                 }
-                (_, &AdditionalColumnType::Header) => {
+                (_, &Some(AdditionalColumnType::HeaderInner(ref header_inner))) => {
+                    return Ok(A::output_for(
+                        self.row_meta
+                            .as_ref()
+                            .and_then(|ele| {
+                                extract_header_inner_from_meta(
+                                    ele.meta,
+                                    header_inner.inner_field.as_ref(),
+                                    header_inner.data_type.as_ref(),
+                                )
+                            })
+                            .unwrap_or(None),
+                    ))
+                }
+                (_, &Some(AdditionalColumnType::Headers(_))) => {
                     return Ok(A::output_for(
                         self.row_meta
                             .as_ref()
@@ -373,7 +389,7 @@ impl SourceStreamChunkRowWriter<'_> {
                             .unwrap_or(None),
                     ))
                 }
-                (_, &AdditionalColumnType::Filename) => {
+                (_, &Some(AdditionalColumnType::Filename(_))) => {
                     // Filename is used as partition in FS connectors
                     return Ok(A::output_for(
                         self.row_meta
diff --git a/src/connector/src/parser/protobuf/parser.rs b/src/connector/src/parser/protobuf/parser.rs
index 47a1747f7c683..36b12dded6029 100644
--- a/src/connector/src/parser/protobuf/parser.rs
+++ b/src/connector/src/parser/protobuf/parser.rs
@@ -24,7 +24,7 @@ use risingwave_common::error::ErrorCode::{InternalError, ProtocolError};
 use risingwave_common::error::{Result, RwError};
 use risingwave_common::try_match_expand;
 use risingwave_common::types::{DataType, Datum, Decimal, JsonbVal, ScalarImpl, F32, F64};
-use risingwave_pb::plan_common::{AdditionalColumnType, ColumnDesc, ColumnDescVersion};
+use risingwave_pb::plan_common::{AdditionalColumn, ColumnDesc, ColumnDescVersion};
 
 use super::schema_resolver::*;
 use crate::parser::unified::protobuf::ProtobufAccess;
@@ -175,7 +175,7 @@ impl ProtobufParserConfig {
                 type_name: m.full_name().to_string(),
                 generated_or_default_column: None,
                 description: None,
-                additional_column_type: AdditionalColumnType::Normal as i32,
+                additional_columns: Some(AdditionalColumn { column_type: None }),
                 version: ColumnDescVersion::Pr13707 as i32,
             })
         } else {
@@ -184,7 +184,7 @@ impl ProtobufParserConfig {
                 column_id: *index,
                 name: field_descriptor.name().to_string(),
                 column_type: Some(field_type.to_protobuf()),
-                additional_column_type: AdditionalColumnType::Normal as i32,
+                additional_columns: Some(AdditionalColumn { column_type: None }),
                 version: ColumnDescVersion::Pr13707 as i32,
                 ..Default::default()
             })
diff --git a/src/connector/src/parser/unified/upsert.rs b/src/connector/src/parser/unified/upsert.rs
index 648a667e2b2d8..8fbed9dc2ac52 100644
--- a/src/connector/src/parser/unified/upsert.rs
+++ b/src/connector/src/parser/unified/upsert.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use risingwave_common::types::DataType;
-use risingwave_pb::plan_common::AdditionalColumnType;
+use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
 
 use super::{Access, ChangeEvent, ChangeEventOperation};
 use crate::parser::unified::AccessError;
@@ -105,8 +105,8 @@
     }
 
     fn access_field(&self, desc: &SourceColumnDesc) -> super::AccessResult {
-        match desc.additional_column_type {
-            AdditionalColumnType::Key => {
+        match desc.additional_column_type.column_type {
+            Some(AdditionalColumnType::Key(_)) => {
                 if let Some(key_as_column_name) = &self.key_column_name
                     && &desc.name == key_as_column_name
                 {
@@ -115,9 +115,7 @@
                     self.access(&["key", &desc.name], Some(&desc.data_type))
                 }
             }
-            AdditionalColumnType::Unspecified | AdditionalColumnType::Normal => {
-                self.access(&["value", &desc.name], Some(&desc.data_type))
-            }
+            None => self.access(&["value", &desc.name], Some(&desc.data_type)),
             _ => unreachable!(),
         }
     }
diff --git a/src/connector/src/parser/upsert_parser.rs b/src/connector/src/parser/upsert_parser.rs
index 99a2c19c33e78..fbd82f6f3c167 100644
--- a/src/connector/src/parser/upsert_parser.rs
+++ b/src/connector/src/parser/upsert_parser.rs
@@ -14,7 +14,7 @@
 
 use risingwave_common::error::ErrorCode::ProtocolError;
 use risingwave_common::error::{Result, RwError};
-use risingwave_pb::plan_common::AdditionalColumnType;
+use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
 
 use super::bytes_parser::BytesAccessBuilder;
 use super::unified::upsert::UpsertChangeEvent;
@@ -53,7 +53,10 @@ async fn build_accessor_builder(
 
 pub fn get_key_column_name(columns: &[SourceColumnDesc]) -> Option<String> {
     columns.iter().find_map(|column| {
-        if column.additional_column_type == AdditionalColumnType::Key {
+        if matches!(
+            column.additional_column_type.column_type,
+            Some(AdditionalColumnType::Key(_))
+        ) {
             Some(column.name.clone())
         } else {
             None
diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs
index 4e0bd6f2f3f0f..81819d7b0953d 100644
--- a/src/connector/src/parser/util.rs
+++ b/src/connector/src/parser/util.rs
@@ -18,6 +18,7 @@ use reqwest::Url;
 use risingwave_common::error::ErrorCode::{InvalidParameterValue, ProtocolError};
 use risingwave_common::error::{Result, RwError};
 use risingwave_common::types::Datum;
+use risingwave_pb::data::DataType as PbDataType;
 
 use crate::aws_utils::load_file_descriptor_from_s3;
 use crate::common::AwsAuthProps;
 
@@ -121,7 +122,18 @@ pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option<Datum> {
 
 pub fn extract_headers_from_meta(meta: &SourceMeta) -> Option<Datum> {
     match meta {
-        SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_headers(),
+        SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_headers(), /* expect output of type `array[struct<varchar, bytea>]` */
+        _ => None,
+    }
+}
+
+pub fn extract_header_inner_from_meta(
+    meta: &SourceMeta,
+    inner_field: &str,
+    data_type: Option<&PbDataType>,
+) -> Option<Datum> {
+    match meta {
+        SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_header_inner(inner_field, data_type), /* expect output of type `bytea` or `varchar` */
         _ => None,
     }
 }
diff --git a/src/connector/src/source/kafka/source/message.rs b/src/connector/src/source/kafka/source/message.rs
index 52f9722533136..0ef55dc79132d 100644
--- a/src/connector/src/source/kafka/source/message.rs
+++ b/src/connector/src/source/kafka/source/message.rs
@@ -16,6 +16,8 @@ use itertools::Itertools;
 use rdkafka::message::{BorrowedMessage, Headers, OwnedHeaders};
 use rdkafka::Message;
 use risingwave_common::types::{Datum, ListValue, Scalar, ScalarImpl, StructValue};
+use risingwave_pb::data::data_type::TypeName as PbTypeName;
+use risingwave_pb::data::DataType as PbDataType;
 
 use crate::parser::additional_columns::get_kafka_header_item_datatype;
 use crate::source::base::SourceMessage;
@@ -39,6 +41,31 @@ impl KafkaMeta {
             .into()
     }
 
+    pub fn extract_header_inner(
+        &self,
+        inner_field: &str,
+        data_type: Option<&PbDataType>,
+    ) -> Option<Datum> {
+        let target_value = self
+            .headers
+            .as_ref()
+            .iter()
+            .find_map(|headers| {
+                headers
+                    .iter()
+                    .find(|header| header.key == inner_field)
+                    .map(|header| header.value)
+            })
+            .unwrap_or(None); // if the specified header key is not found, return None
+        if let Some(data_type) = data_type
+            && data_type.type_name == PbTypeName::Varchar as i32
+        {
+            Some(target_value.map(|byte| ScalarImpl::Utf8(String::from_utf8_lossy(byte).into())))
+        } else {
+            Some(target_value.map(|byte| ScalarImpl::Bytea(byte.into())))
+        }
+    }
+
     pub fn extract_headers(&self) -> Option<Datum> {
         self.headers.as_ref().map(|headers| {
             let header_item: Vec<Datum> = headers
diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index 7f0cab97122ae..95e44a26cd919 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -25,7 +25,7 @@ use rdkafka::config::RDKafkaLogLevel;
 use rdkafka::consumer::{Consumer, StreamConsumer};
 use rdkafka::error::KafkaError;
 use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
-use risingwave_pb::plan_common::AdditionalColumnType;
+use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
 
 use crate::parser::ParserConfig;
 use crate::source::base::SourceMessage;
@@ -196,12 +196,12 @@ impl CommonSplitReader for KafkaSplitReader {
         let max_chunk_size = self.source_ctx.source_ctrl_opts.chunk_size;
         let mut res = Vec::with_capacity(max_chunk_size);
         // ingesting Kafka message headers can be expensive; do it only when required
-        let require_message_header = self
-            .parser_config
-            .common
-            .rw_columns
-            .iter()
-            .any(|col_desc| col_desc.additional_column_type == AdditionalColumnType::Header);
+        let require_message_header = self.parser_config.common.rw_columns.iter().any(|col_desc| {
+            matches!(
+                col_desc.additional_column_type.column_type,
+                Some(AdditionalColumnType::Headers(_) | AdditionalColumnType::HeaderInner(_))
+            )
+        });
 
         #[for_await]
         'for_outer_loop: for msgs in self.consumer.stream().ready_chunks(max_chunk_size) {
diff --git a/src/connector/src/source/manager.rs b/src/connector/src/source/manager.rs
index 07ce5a3d4898e..48b8a9fe5c096 100644
--- a/src/connector/src/source/manager.rs
+++ b/src/connector/src/source/manager.rs
@@ -19,7 +19,7 @@ use risingwave_common::catalog::{
     TABLE_NAME_COLUMN_NAME,
 };
 use risingwave_common::types::DataType;
-use risingwave_pb::plan_common::{AdditionalColumnType, ColumnDescVersion};
+use risingwave_pb::plan_common::{AdditionalColumn, ColumnDescVersion};
 
 /// `SourceColumnDesc` describes a column in the Source and serves as the column
 /// counterpart in `StreamScan`
@@ -37,7 +37,7 @@ pub struct SourceColumnDesc {
     // `additional_column_type` and `column_type` are orthogonal:
    // `additional_column_type` indicates which part of the message the column comes from;
    // `column_type` indicates the type of the column and is only used in the CDC scenario.
-    pub additional_column_type: AdditionalColumnType,
+    pub additional_column_type: AdditionalColumn,
 }
 
 /// `SourceColumnType` is used to indicate the type of a column emitted by the Source.
@@ -87,7 +87,7 @@ impl SourceColumnDesc {
             fields: vec![],
             column_type: SourceColumnType::Normal,
             is_pk: false,
-            additional_column_type: AdditionalColumnType::Normal,
+            additional_column_type: AdditionalColumn { column_type: None },
         }
     }
 
@@ -119,7 +119,7 @@ impl From<&ColumnDesc> for SourceColumnDesc {
             fields: c.field_descs.clone(),
             column_type,
             is_pk: false,
-            additional_column_type: c.additional_column_type,
+            additional_column_type: c.additional_columns.clone(),
         }
     }
 }
@@ -134,7 +134,7 @@ impl From<&SourceColumnDesc> for ColumnDesc {
             type_name: "".to_string(),
             generated_or_default_column: None,
             description: None,
-            additional_column_type: s.additional_column_type,
+            additional_columns: s.additional_column_type.clone(),
             version: ColumnDescVersion::Pr13707,
         }
     }
diff --git a/src/connector/src/source/mod.rs b/src/connector/src/source/mod.rs
index 484ffd156fb97..cba63b3005c1a 100644
--- a/src/connector/src/source/mod.rs
+++ b/src/connector/src/source/mod.rs
@@ -37,9 +37,6 @@ pub mod test_source;
 
 pub use manager::{SourceColumnDesc, SourceColumnType};
 
-pub use crate::parser::additional_columns::{
-    get_connector_compatible_additional_columns, CompatibleAdditionalColumnsFn,
-};
 pub use crate::source::filesystem::opendal_source::{
     GCS_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
 };
diff --git a/src/frontend/src/binder/expr/mod.rs b/src/frontend/src/binder/expr/mod.rs
index fedf2aacdd9cd..93356cd1efec6 100644
--- a/src/frontend/src/binder/expr/mod.rs
+++ b/src/frontend/src/binder/expr/mod.rs
@@ -18,7 +18,7 @@ use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_common::types::DataType;
 use risingwave_common::util::iter_util::zip_eq_fast;
 use risingwave_common::{bail_not_implemented, not_implemented};
-use risingwave_pb::plan_common::{AdditionalColumnType, ColumnDescVersion};
+use risingwave_pb::plan_common::{AdditionalColumn, ColumnDescVersion};
 use risingwave_sqlparser::ast::{
     Array, BinaryOperator, DataType as AstDataType, Expr, Function, JsonPredicateType, ObjectName,
     Query, StructField, TrimWhereField, UnaryOperator,
@@ -625,7 +625,7 @@ pub fn bind_struct_field(column_def: &StructField) -> Result<ColumnDesc> {
         type_name: "".to_string(),
         generated_or_default_column: None,
         description: None,
-        additional_column_type: AdditionalColumnType::Normal,
+        additional_columns: AdditionalColumn { column_type: None },
         version: ColumnDescVersion::Pr13707,
     })
 }
diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs
index c632cca5c745f..37e79664c666a 100644
--- a/src/frontend/src/catalog/table_catalog.rs
+++ b/src/frontend/src/catalog/table_catalog.rs
@@ -598,7 +598,7 @@ mod tests {
     use risingwave_common::util::sort_util::OrderType;
     use risingwave_pb::catalog::{PbStreamJobStatus, PbTable};
     use risingwave_pb::plan_common::{
-        AdditionalColumnType, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
+        AdditionalColumn, ColumnDescVersion, PbColumnCatalog, PbColumnDesc,
     };
 
     use super::*;
@@ -694,7 +694,7 @@ mod tests {
                     type_name: ".test.Country".to_string(),
                     description: None,
                     generated_or_default_column: None,
-                    additional_column_type: AdditionalColumnType::Normal,
+                    additional_columns: AdditionalColumn { column_type: None },
                     version: ColumnDescVersion::Pr13707,
                 },
                 is_hidden: false
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index d677d123812d8..a03e074301cd1 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -28,6 +28,9 @@ use risingwave_common::catalog::{
 use risingwave_common::error::ErrorCode::{self, InvalidInputSyntax, NotSupported, ProtocolError};
 use risingwave_common::error::{Result, RwError};
 use risingwave_common::types::DataType;
+use risingwave_connector::parser::additional_columns::{
+    build_additional_column_catalog, COMPATIBLE_ADDITIONAL_COLUMNS,
+};
 use risingwave_connector::parser::{
     schema_to_columns, AvroParserConfig, DebeziumAvroParserConfig, ProtobufParserConfig,
     SpecificParserConfig,
@@ -44,19 +47,20 @@ use risingwave_connector::source::datagen::DATAGEN_CONNECTOR;
 use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType};
 use risingwave_connector::source::test_source::TEST_CONNECTOR;
 use risingwave_connector::source::{
-    get_connector_compatible_additional_columns, GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR,
-    KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR,
-    POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
+    GCS_CONNECTOR, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, KINESIS_CONNECTOR, NATS_CONNECTOR,
+    NEXMARK_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR, S3_CONNECTOR,
 };
 use risingwave_pb::catalog::{
     PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc,
 };
-use risingwave_pb::plan_common::{AdditionalColumnType, EncodeType, FormatType};
+use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
+use risingwave_pb::plan_common::{EncodeType, FormatType};
 use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
 use risingwave_sqlparser::ast::{
     get_delimiter, AstString, ColumnDef, ConnectorSchema, CreateSourceStatement, Encode, Format,
-    Ident, ProtobufSchema, SourceWatermark,
+    ProtobufSchema, SourceWatermark,
 };
+use risingwave_sqlparser::parser::IncludeOption;
 
 use super::RwPgResponse;
 use crate::binder::Binder;
@@ -495,29 +499,21 @@ fn bind_columns_from_source_for_cdc(
 
 /// Add connector-specific columns to the end of the column catalog.
 pub fn handle_addition_columns(
     with_properties: &HashMap<String, String>,
-    mut additional_columns: Vec<(Ident, Option<Ident>)>,
+    mut additional_columns: IncludeOption,
     columns: &mut Vec<ColumnCatalog>,
 ) -> Result<()> {
     let connector_name = get_connector(with_properties).unwrap(); // there must be a connector in source
 
-    let addition_col_list =
-        match get_connector_compatible_additional_columns(connector_name.as_str()) {
-            Some(cols) => cols,
-            // early return if there are no accepted additional columns for the connector
-            None => {
-                return if additional_columns.is_empty() {
-                    Ok(())
-                } else {
-                    Err(RwError::from(ProtocolError(format!(
-                        "Connector {} accepts no additional column but got {:?}",
-                        connector_name, additional_columns
-                    ))))
-                }
-            }
-        };
-    let gen_default_column_name = |connector_name: &str, addi_column_name: &str| {
-        format!("_rw_{}_{}", connector_name, addi_column_name)
-    };
+    if COMPATIBLE_ADDITIONAL_COLUMNS
+        .get(connector_name.as_str())
+        .is_none()
+        && !additional_columns.is_empty()
+    {
+        return Err(RwError::from(ProtocolError(format!(
+            "Connector {} accepts no additional column but got {:?}",
+            connector_name, additional_columns
+        ))));
+    }
 
     let latest_col_id: ColumnId = columns
         .iter()
@@ -525,27 +521,30 @@ pub fn handle_addition_columns(
         .max()
        .unwrap(); // there must be at least one column in the column catalog
 
-    for (col_name, gen_column_catalog_fn) in addition_col_list {
-        // always insert in spec order
-        if let Some(idx) = additional_columns
-            .iter()
-            .position(|(col, _)| col.real_value().eq_ignore_ascii_case(col_name))
+    while let Some(item) = additional_columns.pop() {
         {
-            let (_, alias) = additional_columns.remove(idx);
-            columns.push(gen_column_catalog_fn(
-                latest_col_id.next(),
-                alias
-                    .map(|alias| alias.real_value())
-                    .unwrap_or_else(|| gen_default_column_name(connector_name.as_str(), col_name))
-                    .as_str(),
-            ))
+            // only the header column is allowed to have an inner field
+            if item.inner_field.is_some()
+                && !item.column_type.real_value().eq_ignore_ascii_case("header")
+            {
+                return Err(RwError::from(ProtocolError(format!(
+                    "Only header column can have inner field, but got {:?}",
+                    item.column_type.real_value(),
+                ))));
+            }
         }
-    }
-    if !additional_columns.is_empty() {
-        return Err(RwError::from(ProtocolError(format!(
-            "Unknown additional columns {:?}",
-            additional_columns
-        ))));
+
+        let data_type_name: Option<String> = item
+            .header_inner_expect_type
+            .map(|dt| format!("{:?}", dt).to_lowercase());
+        columns.push(build_additional_column_catalog(
+            latest_col_id.next(),
+            connector_name.as_str(),
+            item.column_type.real_value().as_str(),
+            item.column_alias.map(|alias| alias.real_value()),
+            item.inner_field.as_deref(),
+            data_type_name.as_deref(),
+        )?);
     }
 
     Ok(())
@@ -684,7 +683,10 @@ pub(crate) async fn bind_source_pk(
             // iter columns to check if any additional column comes from the key part;
             // return the key column name if one exists
             columns.iter().find_map(|catalog| {
-                if catalog.column_desc.additional_column_type == AdditionalColumnType::Key {
+                if matches!(
+                    catalog.column_desc.additional_columns.column_type,
+                    Some(AdditionalColumnType::Key(_))
+                ) {
                     Some(catalog.name().to_string())
                 } else {
                     None
@@ -694,9 +696,7 @@ pub(crate) async fn bind_source_pk(
     let additional_column_names = columns
         .iter()
        .filter_map(|col| {
-            if (col.column_desc.additional_column_type != AdditionalColumnType::Unspecified)
-                && (col.column_desc.additional_column_type != AdditionalColumnType::Normal)
-            {
+            if col.column_desc.additional_columns.column_type.is_some() {
                 Some(col.name().to_string())
             } else {
                 None
@@ -845,21 +845,26 @@ fn check_and_add_timestamp_column(
     columns: &mut Vec<ColumnCatalog>,
 ) {
     if is_kafka_connector(with_properties) {
-        if columns
-            .iter()
-            .any(|col| col.column_desc.additional_column_type == AdditionalColumnType::Timestamp)
-        {
+        if columns.iter().any(|col| {
+            matches!(
+                col.column_desc.additional_columns.column_type,
+                Some(AdditionalColumnType::Timestamp(_))
+            )
+        }) {
             // already has timestamp column, no need to add a new one
             return;
         }
 
         // add a hidden column `_rw_kafka_timestamp` to each message from Kafka source
-        let mut catalog = get_connector_compatible_additional_columns(KAFKA_CONNECTOR)
-            .unwrap()
-            .iter()
-            .find(|(col_name, _)| col_name.eq(&"timestamp"))
-            .unwrap()
-            .1(ColumnId::placeholder(), KAFKA_TIMESTAMP_COLUMN_NAME);
+        let mut catalog = build_additional_column_catalog(
+            ColumnId::placeholder(),
+            KAFKA_CONNECTOR,
+            "timestamp",
+            Some(KAFKA_TIMESTAMP_COLUMN_NAME.to_string()),
+            None,
+            None,
+        )
+        .unwrap();
         catalog.is_hidden = true;
 
         columns.push(catalog);
@@ -1545,5 +1550,18 @@ pub mod tests {
             }
             _ => unreachable!(),
         }
+
+        let sql =
+            "CREATE SOURCE s3 (v1 int) include timestamp 'header1' as header_col with (connector = 'kafka') format plain encode json"
+                .to_string();
+        match frontend.run_sql(sql).await {
+            Err(e) => {
+                assert_eq!(
+                    e.to_string(),
+                    "Protocol error: Only header column can have inner field, but got \"timestamp\""
+                )
+            }
+            _ => unreachable!(),
+        }
     }
 }
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index d0aaa4c5c21a8..2ee247880191d 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -38,14 +38,15 @@ use risingwave_pb::catalog::{PbSource, PbTable, StreamSourceInfo, Table, WatermarkDesc};
 use risingwave_pb::ddl_service::TableJobType;
 use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
 use risingwave_pb::plan_common::{
-    AdditionalColumnType, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
+    AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
 };
 use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
 use risingwave_pb::stream_plan::StreamFragmentGraph;
 use risingwave_sqlparser::ast::{
-    CdcTableInfo, ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType, Format, Ident,
+    CdcTableInfo, ColumnDef, ColumnOption, ConnectorSchema, DataType as AstDataType, Format,
     ObjectName, SourceWatermark, TableConstraint,
 };
+use risingwave_sqlparser::parser::IncludeOption;
 
 use super::RwPgResponse;
 use crate::binder::{bind_data_type, bind_struct_field, Clause};
@@ -211,7 +212,7 @@ pub fn bind_sql_columns(column_defs: &[ColumnDef]) -> Result<Vec<ColumnCatalog>>
                 type_name: "".to_string(),
                 generated_or_default_column: None,
                 description: None,
-                additional_column_type: AdditionalColumnType::Normal,
+                additional_columns: AdditionalColumn { column_type: None },
                 version: ColumnDescVersion::Pr13707,
             },
             is_hidden: false,
@@ -459,7 +460,7 @@ pub(crate) async fn gen_create_table_plan_with_source(
     source_watermarks: Vec<SourceWatermark>,
     mut col_id_gen: ColumnIdGenerator,
     append_only: bool,
-    include_column_options: Vec<(Ident, Option<Ident>)>,
+    include_column_options: IncludeOption,
 ) -> Result<(PlanRef, Option<PbSource>, PbTable)> {
     if append_only
         && source_schema.format != Format::Plain
@@ -896,7 +897,7 @@ pub(super) async fn handle_create_table_plan(
     constraints: Vec<TableConstraint>,
     source_watermarks: Vec<SourceWatermark>,
     append_only: bool,
-    include_column_options: Vec<(Ident, Option<Ident>)>,
+    include_column_options: IncludeOption,
 ) -> Result<(PlanRef, Option<PbSource>, PbTable, TableJobType)> {
     let source_schema = check_create_table_with_source(
         context.with_options(),
@@ -970,7 +971,7 @@ pub async fn handle_create_table(
     source_watermarks: Vec<SourceWatermark>,
     append_only: bool,
     cdc_table_info: Option<CdcTableInfo>,
-    include_column_options: Vec<(Ident, Option<Ident>)>,
+    include_column_options: IncludeOption,
 ) -> Result<RwPgResponse> {
     let session = handler_args.session.clone();
 
@@ -1032,7 +1033,7 @@ pub async fn handle_create_table(
 pub fn check_create_table_with_source(
     with_options: &WithOptions,
     source_schema: Option<ConnectorSchema>,
-    include_column_options: &[(Ident, Option<Ident>)],
+    include_column_options: &IncludeOption,
 ) -> Result<Option<ConnectorSchema>> {
     let defined_source = with_options.inner().contains_key(UPSTREAM_SOURCE_KEY);
     if !include_column_options.is_empty() && !defined_source {
diff --git a/src/prost/build.rs b/src/prost/build.rs
index 7b06a5e5fb7e5..3af4c9873863d 100644
--- a/src/prost/build.rs
+++ b/src/prost/build.rs
@@ -115,6 +115,28 @@ fn main() -> Result<(), Box<dyn Error>> {
         .type_attribute("plan_common.Cardinality", "#[derive(Eq, Hash, Copy)]")
         .type_attribute("plan_common.ExternalTableDesc", "#[derive(Eq, Hash)]")
         .type_attribute("plan_common.ColumnDesc", "#[derive(Eq, Hash)]")
+        .type_attribute("plan_common.AdditionalColumn", "#[derive(Eq, Hash)]")
+        .type_attribute(
+            "plan_common.AdditionalColumn.column_type",
+            "#[derive(Eq, Hash)]",
+        )
+        .type_attribute("plan_common.AdditionalColumnNormal", "#[derive(Eq, Hash)]")
+        .type_attribute("plan_common.AdditionalColumnKey", "#[derive(Eq, Hash)]")
+        .type_attribute(
+            "plan_common.AdditionalColumnPartition",
+            "#[derive(Eq, Hash)]",
+        )
+        .type_attribute(
+            "plan_common.AdditionalColumnTimestamp",
+            "#[derive(Eq, Hash)]",
+        )
+        .type_attribute(
+            "plan_common.AdditionalColumnFilename",
+            "#[derive(Eq, Hash)]",
+        )
+        .type_attribute("plan_common.AdditionalColumnHeader", "#[derive(Eq, Hash)]")
+        .type_attribute("plan_common.AdditionalColumnHeaders", "#[derive(Eq, Hash)]")
+        .type_attribute("plan_common.AdditionalColumnOffset", "#[derive(Eq, Hash)]")
         .type_attribute("common.ColumnOrder", "#[derive(Eq, Hash)]")
         .type_attribute("common.OrderType", "#[derive(Eq, Hash)]")
         .type_attribute("common.Buffer", "#[derive(Eq)]")
diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs
index b11d4dc784bb4..f49d2fb8af154 100644
--- a/src/sqlparser/src/ast/mod.rs
+++ b/src/sqlparser/src/ast/mod.rs
@@ -55,7 +55,7 @@ pub use crate::ast::ddl::{
     AlterIndexOperation, AlterSinkOperation, AlterSourceOperation, AlterViewOperation,
 };
 use crate::keywords::Keyword;
-use crate::parser::{Parser, ParserError};
+use crate::parser::{IncludeOption, IncludeOptionItem, Parser, ParserError};
 
 pub struct DisplaySeparated<'a, T>
 where
@@ -1132,7 +1132,7 @@ pub enum Statement {
         /// `FROM cdc_source TABLE database_name.table_name`
         cdc_table_info: Option<CdcTableInfo>,
         /// `INCLUDE a AS b INCLUDE c`
-        include_column_options: Vec<(Ident, Option<Ident>)>,
+        include_column_options: IncludeOption,
     },
     /// CREATE INDEX
     CreateIndex {
@@ -1647,12 +1647,20 @@ impl fmt::Display for Statement {
                 }
                 if !include_column_options.is_empty() { // (Ident, Option<Ident>)
                     write!(f, "{}", display_comma_separated(
-                        include_column_options.iter().map(|(a, b)| {
-                            if let Some(b) = b {
-                                format!("INCLUDE {} AS {}", a, b)
-                            } else {
-                                format!("INCLUDE {}", a)
-                            }
+                        include_column_options.iter().map(|option_item: &IncludeOptionItem| {
+                            format!("INCLUDE {}{}{}",
+                                option_item.column_type,
+                                if let Some(inner_field) = &option_item.inner_field {
+                                    format!(" {}", inner_field)
+                                } else {
+                                    "".into()
+                                }
+                                , if let Some(alias) = &option_item.column_alias {
+                                    format!(" AS {}", alias)
+                                } else {
+                                    "".into()
+                                }
+                            )
                         }).collect_vec().as_slice()
                     ))?;
                 }
diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs
index e394697d45a9e..3dd923b610542 100644
--- a/src/sqlparser/src/ast/statement.rs
+++ b/src/sqlparser/src/ast/statement.rs
@@ -26,7 +26,7 @@ use crate::ast::{
     display_comma_separated, display_separated, ColumnDef, ObjectName, SqlOption, TableConstraint,
 };
 use crate::keywords::Keyword;
-use crate::parser::{IsOptional, Parser, ParserError, UPSTREAM_SOURCE_KEY};
+use crate::parser::{IncludeOption, IsOptional, Parser, ParserError, UPSTREAM_SOURCE_KEY};
 use crate::tokenizer::Token;
 
 /// Consumes token from the parser into an AST node.
@@ -87,7 +87,7 @@ pub struct CreateSourceStatement {
     pub with_properties: WithProperties,
     pub source_schema: CompatibleSourceSchema,
     pub source_watermarks: Vec<SourceWatermark>,
-    pub include_column_options: Vec<(Ident, Option<Ident>)>,
+    pub include_column_options: IncludeOption,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs
index e88a1df3157da..dfb0a030125de 100644
--- a/src/sqlparser/src/parser.rs
+++ b/src/sqlparser/src/parser.rs
@@ -80,6 +80,17 @@ pub enum IsLateral {
 
 use IsLateral::*;
 
+pub type IncludeOption = Vec<IncludeOptionItem>;
+
+#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
+#[derive(Eq, Clone, Debug, PartialEq, Hash)]
+pub struct IncludeOptionItem {
+    pub column_type: Ident,
+    pub column_alias: Option<Ident>,
+    pub inner_field: Option<String>,
+    pub header_inner_expect_type: Option<DataType>,
+}
+
 #[derive(Debug)]
 pub enum WildcardOrExpr {
     Expr(Expr),
@@ -2527,16 +2538,46 @@ impl Parser {
         })
     }
 
-    pub fn parse_include_options(&mut self) -> Result<Vec<(Ident, Option<Ident>)>, ParserError> {
+    pub fn parse_include_options(&mut self) -> Result<IncludeOption, ParserError> {
         let mut options = vec![];
         while self.parse_keyword(Keyword::INCLUDE) {
-            let add_column = self.parse_identifier()?;
+            let column_type = self.parse_identifier()?;
+
+            let mut column_inner_field = None;
+            let mut header_inner_expect_type = None;
+            if let Token::SingleQuotedString(inner_field) = self.peek_token().token {
+                self.next_token();
+                column_inner_field = Some(inner_field);
+
+                if let Token::Word(w) = self.peek_token().token {
+                    match w.keyword {
+                        Keyword::BYTEA => {
+                            header_inner_expect_type = Some(DataType::Bytea);
+                            self.next_token();
+                        }
+                        Keyword::VARCHAR => {
+                            header_inner_expect_type = Some(DataType::Varchar);
+                            self.next_token();
+                        }
+                        _ => {
+                            // default to bytea
+                            header_inner_expect_type = Some(DataType::Bytea);
+                        }
+                    }
+                }
+            }
+
+            let mut column_alias = None;
             if self.parse_keyword(Keyword::AS) {
-                let column_alias = self.parse_identifier()?;
-                options.push((add_column, Some(column_alias)));
-            } else {
-                options.push((add_column, None));
+                column_alias = Some(self.parse_identifier()?);
             }
+
+            options.push(IncludeOptionItem {
+                column_type,
+                inner_field: column_inner_field,
+                column_alias,
+                header_inner_expect_type,
+            });
         }
         Ok(options)
     }
diff --git a/src/storage/src/row_serde/mod.rs b/src/storage/src/row_serde/mod.rs
index 96ef7be735244..2668d75bc1b93 100644
--- a/src/storage/src/row_serde/mod.rs
+++ b/src/storage/src/row_serde/mod.rs
@@ -95,7 +95,9 @@ mod test {
                     type_name: "",
                     generated_or_default_column: None,
                     description: None,
-                    additional_column_type: Normal,
+                    additional_columns: AdditionalColumn {
+                        column_type: None,
+                    },
                     version: Pr13707,
                 },
                 ColumnDesc {
@@ -106,7 +108,9 @@ mod test {
                     type_name: "",
                    generated_or_default_column: None,
                     description: None,
-                    additional_column_type: Normal,
+                    additional_columns: AdditionalColumn {
+                        column_type: None,
+                    },
                     version: Pr13707,
                 },
             ],
@@ -137,7 +141,9 @@ mod test {
                     type_name: "",
                     generated_or_default_column: None,
                     description: None,
-                    additional_column_type: Normal,
+                    additional_columns: AdditionalColumn {
+                        column_type: None,
+                    },
                     version: Pr13707,
                 },
                 ColumnDesc {
@@ -148,7 +154,9 @@ mod test {
                     type_name: "",
                     generated_or_default_column: None,
                     description: None,
-                    additional_column_type: Normal,
+                    additional_columns: AdditionalColumn {
+                        column_type: None,
+                    },
                     version: Pr13707,
                 },
             ],
diff --git a/src/stream/src/from_proto/source/trad_source.rs b/src/stream/src/from_proto/source/trad_source.rs
index 29c7ba5e0158a..1b1801c6dea63 100644
--- a/src/stream/src/from_proto/source/trad_source.rs
+++ b/src/stream/src/from_proto/source/trad_source.rs
@@ -18,8 +18,10 @@ use risingwave_common::catalog::{
 use risingwave_connector::source::reader::desc::SourceDescBuilder;
 use risingwave_connector::source::{ConnectorProperties, SourceCtrlOpts};
 use risingwave_pb::data::data_type::TypeName as PbTypeName;
+use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType;
 use risingwave_pb::plan_common::{
-    AdditionalColumnType, ColumnDescVersion, FormatType, PbEncodeType,
+    AdditionalColumn, AdditionalColumnKey, AdditionalColumnTimestamp, ColumnDescVersion,
+    FormatType, PbEncodeType,
 };
 use risingwave_pb::stream_plan::SourceNode;
 use risingwave_storage::panic_store::PanicStateStore;
@@ -76,7 +78,11 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                             // the column is from a legacy version
                             && desc.version == ColumnDescVersion::Unspecified as i32
                         {
-                            desc.additional_column_type = AdditionalColumnType::Key as i32;
+                            desc.additional_columns = Some(AdditionalColumn {
+                                column_type: Some(AdditionalColumnType::Key(
+                                    AdditionalColumnKey {},
+                                )),
+                            });
                         }
                     });
                 });
@@ -86,7 +92,7 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                 {
                     // compatibility code: handle the legacy column `_rw_kafka_timestamp`;
                     // it is automatically added to every Kafka source to support batch queries on the source
-                    // solution: rewrite the column `additional_column_type` to Timestamp
+                    // solution: rewrite the column `additional_columns` to Timestamp
 
                     let _ = source_columns.iter_mut().map(|c| {
                         let _ = c.column_desc.as_mut().map(|desc| {
@@ -101,8 +107,11 @@ impl ExecutorBuilder for SourceExecutorBuilder {
                                 // the column is from a legacy version
                                 && desc.version == ColumnDescVersion::Unspecified as i32
                             {
-                                desc.additional_column_type =
-                                    AdditionalColumnType::Timestamp as i32;
+                                desc.additional_columns = Some(AdditionalColumn {
+                                    column_type: Some(AdditionalColumnType::Timestamp(
+                                        AdditionalColumnTimestamp {},
+                                    )),
+                                });
                             }
                         });
                    });
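
---

As a reading aid for this diff (not part of the patch itself): the changes replace the flat `AdditionalColumnType` enum with the `AdditionalColumn` oneof and extend `INCLUDE` so a single Kafka header can be projected by key. A minimal sketch of the resulting SQL surface, using a hypothetical broker address, topic, and header key (`localhost:9092`, `t1`, and `trace_id` are illustrative only, not taken from the tests above):

```sql
-- `INCLUDE header` keeps the old behavior: one list-of-struct column carrying all
-- headers (AdditionalColumnHeaders). `INCLUDE header 'k'` projects a single header
-- value as bytea (AdditionalColumnHeader); a trailing `varchar` decodes it as UTF-8.
CREATE TABLE t_headers (v INT)
INCLUDE header                               -- default name: _rw_kafka_header
INCLUDE header 'trace_id'                    -- default name: _rw_kafka_header_trace_id
INCLUDE header 'trace_id' varchar            -- default name: _rw_kafka_header_trace_id_varchar
INCLUDE header 'trace_id' varchar AS trace   -- an explicit alias overrides the default
WITH (
    connector = 'kafka',
    properties.bootstrap.server = 'localhost:9092',
    topic = 't1')
FORMAT PLAIN ENCODE JSON;
```

Per `gen_default_name` and its unit test, default column names follow `_rw_<connector>_<type>[_<inner_field>][_<data_type>]`; per the slt test above, a header key that is missing from a message yields `NULL` rather than an error.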