From 8a89ddb07818c4cf71487106f58f596dd8073909 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Fri, 31 May 2024 16:53:58 +0800 Subject: [PATCH] database_name and table_name meta column --- proto/plan_common.proto | 12 +++++ .../src/parser/additional_columns.rs | 53 +++++++++++++++++-- src/connector/src/parser/mod.rs | 16 ++++-- src/connector/src/parser/unified/debezium.rs | 20 +++++++ src/connector/src/parser/util.rs | 29 ++++++++-- .../src/source/cdc/source/message.rs | 38 ++++++++++--- src/prost/build.rs | 6 +++ 7 files changed, 156 insertions(+), 18 deletions(-) diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 8c73374a4d7d2..80232742edc0f 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -203,6 +203,14 @@ message AdditionalColumnHeader { data.DataType data_type = 2; } + +message AdditionalDatabaseName {} + +message AdditionalTableName {} + +message AdditionalCollectionName {} + + // this type means we read all headers as a whole message AdditionalColumnHeaders {} @@ -215,6 +223,10 @@ message AdditionalColumn { AdditionalColumnHeader header_inner = 5; AdditionalColumnFilename filename = 6; AdditionalColumnHeaders headers = 7; + // metadata column for cdc table + AdditionalDatabaseName database_name = 8; + AdditionalTableName table_name = 9; + AdditionalCollectionName collection_name = 10; } } diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 012b7214c7e70..c054ecaf6454a 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -22,9 +22,10 @@ use risingwave_pb::data::data_type::TypeName; use risingwave_pb::data::DataType as PbDataType; use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType; use risingwave_pb::plan_common::{ - AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader, AdditionalColumnHeaders, - AdditionalColumnKey, AdditionalColumnOffset, AdditionalColumnPartition, - AdditionalColumnTimestamp, + AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader, + AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset, + AdditionalColumnPartition, AdditionalColumnTimestamp, AdditionalDatabaseName, + AdditionalTableName, }; use crate::error::ConnectorResult; @@ -66,7 +67,14 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock>> = - LazyLock::new(|| Some(HashSet::from(["timestamp"]))); + LazyLock::new(|| { + Some(HashSet::from([ + "timestamp", + "database_name", + "table_name", + "collection_name", + ])) + }); pub fn get_supported_additional_columns( connector_name: &str, @@ -201,6 +209,43 @@ pub fn build_additional_column_catalog( is_hidden: false, }, "header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type), + "database_name" => ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::DatabaseName( + AdditionalDatabaseName {}, + )), + }, + ), + is_hidden: false, + }, + "table_name" => ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})), + }, + ), + is_hidden: false, + }, + "collection_name" => ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::CollectionName( + AdditionalCollectionName {}, + )), + }, + ), + is_hidden: false, + }, _ => unreachable!(), }; diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 2c0643af67109..5f06db0d2596e 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -54,7 +54,7 @@ use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::maxwell::MaxwellParser; use crate::parser::simd_json_parser::DebeziumMongoJsonAccessBuilder; use crate::parser::util::{ - extract_header_inner_from_meta, extract_headers_from_meta, extreact_timestamp_from_meta, + extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta, }; use crate::schema::schema_registry::SchemaRegistryAuth; use crate::schema::InvalidOptionError; @@ -400,12 +400,22 @@ impl SourceStreamChunkRowWriter<'_> { .unwrap(), // handled all match cases in internal match, unwrap is safe )); } - (_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta { + ( + _, + &Some(ref col @ AdditionalColumnType::DatabaseName(_)) + | &Some(ref col @ AdditionalColumnType::TableName(_)) + | &Some(ref col @ AdditionalColumnType::Timestamp(_)), + ) => match self.row_meta { Some(row_meta) => Ok(A::output_for( - extreact_timestamp_from_meta(row_meta.meta).unwrap_or(None), + extract_cdc_meta_column(row_meta.meta, col, desc.name.as_str())? + .unwrap_or(None), )), None => parse_field(desc), // parse from payload }, + (_, &Some(AdditionalColumnType::CollectionName(_))) => { + // collection name for `mongodb-cdc` should be parsed from the message payload + parse_field(desc) + } (_, &Some(AdditionalColumnType::Partition(_))) => { // the meta info does not involve spec connector return Ok(A::output_for( diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index 966c5f167474c..bce64e6f19d63 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -60,8 +60,13 @@ pub struct DebeziumChangeEvent { const BEFORE: &str = "before"; const AFTER: &str = "after"; + const SOURCE: &str = "source"; const SOURCE_TS_MS: &str = "ts_ms"; +const SOURCE_DB: &str = "db"; +const SOURCE_TABLE: &str = "table"; +const SOURCE_COLLECTION: &str = "collection"; + const OP: &str = "op"; pub const TRANSACTION_STATUS: &str = "status"; pub const TRANSACTION_ID: &str = "id"; @@ -210,6 +215,21 @@ where .to_scalar_value() })) } + &ColumnType::DatabaseName(_) => self + .value_accessor + .as_ref() + .expect("value_accessor must be provided for upsert operation") + .access(&[SOURCE, SOURCE_DB], Some(&desc.data_type)), + &ColumnType::TableName(_) => self + .value_accessor + .as_ref() + .expect("value_accessor must be provided for upsert operation") + .access(&[SOURCE, SOURCE_TABLE], Some(&desc.data_type)), + &ColumnType::CollectionName(_) => self + .value_accessor + .as_ref() + .expect("value_accessor must be provided for upsert operation") + .access(&[SOURCE, SOURCE_COLLECTION], Some(&desc.data_type)), _ => Err(AccessError::UnsupportedAdditionalColumn { name: desc.name.clone(), }), diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index 3b416ef1309e9..5b2a2e01ac08c 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -11,6 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +use std::assert_matches::assert_matches; use std::collections::HashMap; use anyhow::Context; @@ -38,6 +39,10 @@ macro_rules! log_error { }; } pub(crate) use log_error; +use risingwave_pb::plan_common::additional_column; +use risingwave_pb::plan_common::additional_column::ColumnType; + +use crate::parser::{AccessError, AccessResult}; /// get kafka topic name pub(super) fn get_kafka_topic(props: &HashMap) -> ConnectorResult<&String> { @@ -127,11 +132,25 @@ pub(super) async fn bytes_from_url( } } -pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option { - match meta { - SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(), - SourceMeta::DebeziumCdc(debezium_meta) => debezium_meta.extract_timestamp(), - _ => None, +pub fn extract_cdc_meta_column( + meta: &SourceMeta, + column_type: &additional_column::ColumnType, + column_name: &str, +) -> AccessResult> { + assert_matches!(meta, &SourceMeta::DebeziumCdc(_)); + + let cdc_meta = match meta { + SourceMeta::DebeziumCdc(cdc_meta) => cdc_meta, + _ => unreachable!(), + }; + + match column_type { + ColumnType::Timestamp(_) => Ok(cdc_meta.extract_timestamp()), + ColumnType::DatabaseName(_) => Ok(cdc_meta.extract_database_name()), + ColumnType::TableName(_) => Ok(cdc_meta.extract_table_name()), + _ => Err(AccessError::UnsupportedAdditionalColumn { + name: column_name.to_string(), + }), } } diff --git a/src/connector/src/source/cdc/source/message.rs b/src/connector/src/source/cdc/source/message.rs index f45a4e37e8ca8..e441f5d0aafe9 100644 --- a/src/connector/src/source/cdc/source/message.rs +++ b/src/connector/src/source/cdc/source/message.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::types::{Datum, Scalar, Timestamptz}; +use risingwave_common::types::{Datum, Scalar, ScalarImpl, Timestamptz}; use risingwave_pb::connector_service::CdcMessage; use crate::source::base::SourceMessage; @@ -20,6 +20,8 @@ use crate::source::SourceMeta; #[derive(Debug, Clone)] pub struct DebeziumCdcMeta { + db_name_prefix_len: usize, + pub full_table_name: String, // extracted from `payload.source.ts_ms`, the time that the change event was made in the database pub source_ts_ms: i64, @@ -36,6 +38,30 @@ impl DebeziumCdcMeta { ) .into() } + + pub fn extract_database_name(&self) -> Option { + Some(ScalarImpl::from( + self.full_table_name.as_str()[0..self.db_name_prefix_len].to_string(), + )) + .into() + } + + pub fn extract_table_name(&self) -> Option { + Some(ScalarImpl::from( + self.full_table_name.as_str()[self.db_name_prefix_len..].to_string(), + )) + .into() + } + + pub fn new(full_table_name: String, source_ts_ms: i64, is_transaction_meta: bool) -> Self { + let db_name_prefix_len = full_table_name.as_str().find('.').unwrap_or(0); + Self { + db_name_prefix_len, + full_table_name, + source_ts_ms, + is_transaction_meta, + } + } } impl From for SourceMessage { @@ -53,11 +79,11 @@ impl From for SourceMessage { }, offset: message.offset, split_id: message.partition.into(), - meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { - full_table_name: message.full_table_name, - source_ts_ms: message.source_ts_ms, - is_transaction_meta: message.is_transaction_meta, - }), + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( + message.full_table_name, + message.source_ts_ms, + message.is_transaction_meta, + )), } } } diff --git a/src/prost/build.rs b/src/prost/build.rs index 67284d844cc3e..b5948e1fbbf40 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -139,6 +139,12 @@ fn main() -> Result<(), Box> { .type_attribute("plan_common.AdditionalColumnHeader", "#[derive(Eq, Hash)]") .type_attribute("plan_common.AdditionalColumnHeaders", "#[derive(Eq, Hash)]") .type_attribute("plan_common.AdditionalColumnOffset", "#[derive(Eq, Hash)]") + .type_attribute("plan_common.AdditionalDatabaseName", "#[derive(Eq, Hash)]") + .type_attribute("plan_common.AdditionalTableName", "#[derive(Eq, Hash)]") + .type_attribute( + "plan_common.AdditionalCollectionName", + "#[derive(Eq, Hash)]", + ) .type_attribute("common.ColumnOrder", "#[derive(Eq, Hash)]") .type_attribute("common.OrderType", "#[derive(Eq, Hash)]") .type_attribute("common.Buffer", "#[derive(Eq)]")