diff --git a/e2e_test/source/basic/nosim_kafka.slt b/e2e_test/source/basic/nosim_kafka.slt index b773126c9a7c1..03d3df20d62cd 100644 --- a/e2e_test/source/basic/nosim_kafka.slt +++ b/e2e_test/source/basic/nosim_kafka.slt @@ -15,7 +15,9 @@ FORMAT UPSERT ENCODE AVRO (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}'); statement ok -CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) with ( +CREATE TABLE debezium_non_compact (PRIMARY KEY(order_id)) +INCLUDE TIMESTAMP +with ( connector = 'kafka', kafka.topic = 'debezium_non_compact_avro_json', kafka.brokers = '${RISEDEV_KAFKA_BOOTSTRAP_SERVERS}', diff --git a/src/compute/tests/integration_tests.rs b/src/compute/tests/integration_tests.rs index 13a76c6989b48..01c3bd055ca11 100644 --- a/src/compute/tests/integration_tests.rs +++ b/src/compute/tests/integration_tests.rs @@ -43,7 +43,7 @@ use risingwave_connector::source::reader::desc::test_utils::create_source_desc_b use risingwave_dml::dml_manager::DmlManager; use risingwave_hummock_sdk::to_committed_batch_query_epoch; use risingwave_pb::catalog::StreamSourceInfo; -use risingwave_pb::plan_common::PbRowFormatType; +use risingwave_pb::plan_common::{EncodeType, FormatType, PbRowFormatType}; use risingwave_storage::memory::MemoryStateStore; use risingwave_storage::panic_store::PanicStateStore; use risingwave_storage::table::batch_table::storage_table::StorageTable; @@ -112,7 +112,8 @@ async fn test_table_materialize() -> StreamResult<()> { ], }; let source_info = StreamSourceInfo { - row_format: PbRowFormatType::Json as i32, + format: FormatType::Plain as i32, + row_encode: EncodeType::Json as i32, ..Default::default() }; let properties = convert_args!(btreemap!( diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 645220b401c5a..131ef4f0c11ae 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -25,7 +25,7 @@ use risingwave_pb::plan_common::{ AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader, AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset, AdditionalColumnPartition, AdditionalColumnPayload, AdditionalColumnTimestamp, - AdditionalDatabaseName, AdditionalSchemaName, AdditionalTableName, + AdditionalDatabaseName, AdditionalSchemaName, AdditionalTableName, FormatType, }; use crate::error::ConnectorResult; @@ -39,44 +39,111 @@ use crate::source::{ pub static COMMON_COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock> = LazyLock::new(|| HashSet::from(["partition", "offset"])); -pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock>> = - LazyLock::new(|| { - HashMap::from([ - ( - KAFKA_CONNECTOR, - HashSet::from([ - "key", - "timestamp", - "partition", - "offset", - "header", - "payload", - ]), - ), - ( - PULSAR_CONNECTOR, - HashSet::from(["key", "partition", "offset", "payload"]), - ), - ( - KINESIS_CONNECTOR, - HashSet::from(["key", "partition", "offset", "timestamp", "payload"]), - ), - ( - OPENDAL_S3_CONNECTOR, - HashSet::from(["file", "offset", "payload"]), - ), - (GCS_CONNECTOR, HashSet::from(["file", "offset", "payload"])), - ( - AZBLOB_CONNECTOR, - HashSet::from(["file", "offset", "payload"]), - ), - ( - POSIX_FS_CONNECTOR, - HashSet::from(["file", "offset", "payload"]), - ), - // mongodb-cdc doesn't support cdc backfill table - ( - MONGODB_CDC_CONNECTOR, +static KAFKA_COMMON_ADDITIONAL_COLUMNS: LazyLock> = LazyLock::new(|| { + HashSet::from([ + "key", + "timestamp", + "partition", + "offset", + "header", + "payload", + ]) +}); +static PULSAR_COMMON_ADDITIONAL_COLUMNS: LazyLock> = + LazyLock::new(|| HashSet::from(["key", "partition", "offset", "payload"])); +static KINESIS_COMMON_ADDITIONAL_COLUMNS: LazyLock> = + LazyLock::new(|| HashSet::from(["key", "partition", "offset", "timestamp", "payload"])); +static FS_COMMON_ADDITIONAL_COLUMNS: LazyLock> = + LazyLock::new(|| HashSet::from(["file", "offset", "payload"])); + +pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock< + HashMap<&'static str, HashMap>>, +> = LazyLock::new(|| { + HashMap::from([ + ( + KAFKA_CONNECTOR, + HashMap::from([ + (FormatType::Plain, KAFKA_COMMON_ADDITIONAL_COLUMNS.clone()), + (FormatType::Upsert, KAFKA_COMMON_ADDITIONAL_COLUMNS.clone()), + (FormatType::Maxwell, KAFKA_COMMON_ADDITIONAL_COLUMNS.clone()), + ( + // does not include `key` column because it is specified in the Debezium protocol + FormatType::Debezium, + HashSet::from(["timestamp", "partition", "offset", "header", "payload"]), + ), + (FormatType::Canal, KAFKA_COMMON_ADDITIONAL_COLUMNS.clone()), + ( + FormatType::DebeziumMongo, + HashSet::from(["timestamp", "partition", "offset", "header", "payload"]), + ), + ]), + ), + ( + PULSAR_CONNECTOR, + HashMap::from([ + (FormatType::Plain, PULSAR_COMMON_ADDITIONAL_COLUMNS.clone()), + (FormatType::Upsert, PULSAR_COMMON_ADDITIONAL_COLUMNS.clone()), + ( + FormatType::Maxwell, + PULSAR_COMMON_ADDITIONAL_COLUMNS.clone(), + ), + ( + // does not include `key` column because it is specified in the Debezium protocol + FormatType::Debezium, + HashSet::from(["partition", "offset", "payload"]), + ), + (FormatType::Canal, PULSAR_COMMON_ADDITIONAL_COLUMNS.clone()), + ( + FormatType::DebeziumMongo, + HashSet::from(["partition", "offset", "payload"]), + ), + ]), + ), + ( + KINESIS_CONNECTOR, + HashMap::from([ + (FormatType::Plain, KINESIS_COMMON_ADDITIONAL_COLUMNS.clone()), + ( + FormatType::Upsert, + KINESIS_COMMON_ADDITIONAL_COLUMNS.clone(), + ), + ( + FormatType::Maxwell, + KINESIS_COMMON_ADDITIONAL_COLUMNS.clone(), + ), + ( + // does not include `key` column because it is specified in the Debezium protocol + FormatType::Debezium, + HashSet::from(["timestamp", "partition", "offset", "payload"]), + ), + (FormatType::Canal, KINESIS_COMMON_ADDITIONAL_COLUMNS.clone()), + ( + FormatType::DebeziumMongo, + HashSet::from(["timestamp", "partition", "offset", "payload"]), + ), + ]), + ), + ( + OPENDAL_S3_CONNECTOR, + HashMap::from([(FormatType::Plain, FS_COMMON_ADDITIONAL_COLUMNS.clone())]), + ), + ( + GCS_CONNECTOR, + HashMap::from([(FormatType::Plain, FS_COMMON_ADDITIONAL_COLUMNS.clone())]), + ), + ( + AZBLOB_CONNECTOR, + HashMap::from([(FormatType::Plain, FS_COMMON_ADDITIONAL_COLUMNS.clone())]), + ), + ( + POSIX_FS_CONNECTOR, + HashMap::from([(FormatType::Plain, FS_COMMON_ADDITIONAL_COLUMNS.clone())]), + ), + // mongodb-cdc doesn't support cdc backfill table + ( + MONGODB_CDC_CONNECTOR, + HashMap::from([( + FormatType::DebeziumMongo, HashSet::from([ "timestamp", "partition", @@ -84,9 +151,10 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock>> = @@ -99,14 +167,17 @@ pub static CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS: LazyLock( + connector_name: &'a str, + format: &FormatType, is_cdc_backfill: bool, -) -> Option<&HashSet<&'static str>> { +) -> Option<&'a HashSet<&'static str>> { if is_cdc_backfill { CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS.as_ref() } else { - COMPATIBLE_ADDITIONAL_COLUMNS.get(connector_name) + COMPATIBLE_ADDITIONAL_COLUMNS + .get(connector_name) + .and_then(|map| map.get(format)) } } @@ -140,9 +211,10 @@ pub fn build_additional_column_desc( data_type: Option<&str>, reject_unknown_connector: bool, is_cdc_backfill_table: bool, + format_type: &FormatType, ) -> ConnectorResult { let compatible_columns = match ( - get_supported_additional_columns(connector_name, is_cdc_backfill_table), + get_supported_additional_columns(connector_name, format_type, is_cdc_backfill_table), reject_unknown_connector, ) { (Some(compat_cols), _) => compat_cols, @@ -277,6 +349,7 @@ pub fn build_additional_column_desc( pub fn source_add_partition_offset_cols( columns: &[ColumnCatalog], connector_name: &str, + format_type: &FormatType, ) -> ([bool; 2], [ColumnDesc; 2]) { let mut columns_exist = [false; 2]; let mut last_column_id = max_column_id(columns); @@ -284,6 +357,7 @@ pub fn source_add_partition_offset_cols( let additional_columns: Vec<_> = { let compat_col_types = COMPATIBLE_ADDITIONAL_COLUMNS .get(connector_name) + .and_then(|format_col_mapping| format_col_mapping.get(format_type)) .unwrap_or(&COMMON_COMPATIBLE_ADDITIONAL_COLUMNS); ["partition", "file", "offset"] .iter() @@ -300,6 +374,7 @@ pub fn source_add_partition_offset_cols( None, false, false, + format_type, ) .unwrap(), ) diff --git a/src/connector/src/source/reader/desc.rs b/src/connector/src/source/reader/desc.rs index af607d2537ea6..b2a3d47fa8cda 100644 --- a/src/connector/src/source/reader/desc.rs +++ b/src/connector/src/source/reader/desc.rs @@ -18,7 +18,7 @@ use risingwave_common::bail; use risingwave_common::catalog::ColumnCatalog; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_pb::catalog::PbStreamSourceInfo; -use risingwave_pb::plan_common::PbColumnCatalog; +use risingwave_pb::plan_common::{FormatType, PbColumnCatalog}; #[expect(deprecated)] use super::fs_reader::FsSourceReader; @@ -90,8 +90,14 @@ impl SourceDescBuilder { .get(UPSTREAM_SOURCE_KEY) .map(|s| s.to_lowercase()) .unwrap(); - let (columns_exist, additional_columns) = - source_add_partition_offset_cols(&self.columns, &connector_name); + let (columns_exist, additional_columns) = source_add_partition_offset_cols( + &self.columns, + &connector_name, + self.source_info + .get_format() + .as_ref() + .unwrap_or(&FormatType::Unspecified), + ); let mut columns: Vec<_> = self .columns diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index aeaa026dffffe..e3dd9feb0c8d2 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -637,7 +637,14 @@ pub fn handle_addition_columns( ) -> Result<()> { let connector_name = with_properties.get_connector().unwrap(); // there must be a connector in source - if get_supported_additional_columns(connector_name.as_str(), is_cdc_backfill_table).is_none() + // CDC source will not pass the source_schema, and get the additional column set from + // `CDC_BACKFILL_TABLE_ADDITIONAL_COLUMNS`, does not check FORMAT compatibility. + // So set to FORMAT::Unspecified in this case. + let format_type = &source_schema + .and_then(|schema| format_to_prost(&schema.format).into()) + .unwrap_or(FormatType::Unspecified); + if get_supported_additional_columns(connector_name.as_str(), format_type, is_cdc_backfill_table) + .is_none() && !additional_columns.is_empty() { return Err(RwError::from(ProtocolError(format!( @@ -667,6 +674,7 @@ pub fn handle_addition_columns( data_type_name.as_deref(), true, is_cdc_backfill_table, + format_type, )?; columns.push(ColumnCatalog::visible(col)); } @@ -899,12 +907,6 @@ pub(crate) async fn bind_source_pk( } (Format::Debezium, Encode::Json) => { - if !additional_column_names.is_empty() { - return Err(RwError::from(ProtocolError(format!( - "FORMAT DEBEZIUM forbids additional columns, but got {:?}", - additional_column_names - )))); - } if !sql_defined_pk { return Err(RwError::from(ProtocolError( "Primary key must be specified when creating source with FORMAT DEBEZIUM." @@ -990,7 +992,11 @@ pub(crate) async fn bind_source_pk( } // Add a hidden column `_rw_kafka_timestamp` to each message from Kafka source. -fn check_and_add_timestamp_column(with_properties: &WithOptions, columns: &mut Vec) { +fn check_and_add_timestamp_column( + with_properties: &WithOptions, + columns: &mut Vec, + format_type: &FormatType, +) { if with_properties.is_kafka_connector() { if columns.iter().any(|col| { matches!( @@ -1012,6 +1018,7 @@ fn check_and_add_timestamp_column(with_properties: &WithOptions, columns: &mut V None, true, false, + format_type, ) .unwrap(); columns.push(ColumnCatalog::hidden(col)); @@ -1542,7 +1549,11 @@ pub async fn bind_create_source_or_table_with_connector( // compatible with the behavior that add a hidden column `_rw_kafka_timestamp` to each message from Kafka source if is_create_source { // must behind `handle_addition_columns` - check_and_add_timestamp_column(&with_properties, &mut columns); + check_and_add_timestamp_column( + &with_properties, + &mut columns, + &format_to_prost(&source_schema.format), + ); } // resolve privatelink connection for Kafka diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index d7808e4be51ce..9ea3cfa57f7ed 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -50,6 +50,7 @@ impl StreamSource { let (columns_exist, additional_columns) = source_add_partition_offset_cols( &core.column_catalog, &source_catalog.connector_name(), + &source_catalog.info.get_format().unwrap(), ); for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) { if !existed { diff --git a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs index 83c79259952b2..114d81ee186c9 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs @@ -22,6 +22,7 @@ use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::sort_util::OrderType; use risingwave_connector::parser::additional_columns::source_add_partition_offset_cols; +use risingwave_pb::plan_common::FormatType; use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody}; use risingwave_pb::stream_plan::PbStreamNode; @@ -61,6 +62,11 @@ impl StreamSourceScan { let (columns_exist, additional_columns) = source_add_partition_offset_cols( &core.column_catalog, &source_catalog.connector_name(), + source_catalog + .info + .get_format() + .as_ref() + .unwrap_or(&FormatType::Unspecified), ); for (existed, c) in columns_exist.into_iter().zip_eq_fast(additional_columns) { if !existed {