From e4f1c485aba41cbcbc5161ec7a4a286bd1e31c8a Mon Sep 17 00:00:00 2001 From: StrikeW Date: Wed, 5 Jun 2024 17:05:48 +0800 Subject: [PATCH] feat(cdc): support more metadata columns for MySQL, PG and MongoDB (#17051) --- e2e_test/source/cdc/cdc.share_stream.slt | 27 ++++++- e2e_test/source/cdc/mongodb/mongodb_basic.slt | 13 +++- proto/plan_common.proto | 13 ++++ src/compute/tests/cdc_tests.rs | 6 +- .../src/parser/additional_columns.rs | 73 +++++++++++++++++-- src/connector/src/parser/mod.rs | 29 +++++++- src/connector/src/parser/plain_parser.rs | 36 ++++----- src/connector/src/parser/unified/debezium.rs | 30 +++++++- src/connector/src/parser/util.rs | 22 +++++- src/connector/src/source/cdc/external/mod.rs | 36 ++++----- .../src/source/cdc/external/postgres.rs | 25 ++++--- .../src/source/cdc/source/message.rs | 39 ++++++++-- src/prost/build.rs | 7 ++ .../src/executor/backfill/cdc/cdc_backfill.rs | 4 + .../backfill/cdc/upstream_table/external.rs | 8 ++ .../backfill/cdc/upstream_table/snapshot.rs | 54 ++++++++++++-- src/stream/src/from_proto/stream_cdc_scan.rs | 15 +++- 17 files changed, 353 insertions(+), 84 deletions(-) diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index d30d9c53dc6f..3dc26d98c628 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -40,7 +40,10 @@ create table rw.products_test ( id INT, name STRING, description STRING, PRIMARY KEY (id) -) include timestamp as commit_ts from mysql_mytest table 'mytest.products'; +) include timestamp as commit_ts +include database_name as database_name +include table_name as table_name +from mysql_mytest table 'mytest.products'; # sleep to ensure (default,'Milk','Milk is a white liquid food') is consumed from Debezium message instead of backfill. 
sleep 10s @@ -153,6 +156,14 @@ SELECT id,name,description FROM rw.products_test order by id limit 3 102 car battery 12V car battery 103 12-pack drill bits 12-pack of drill bits with sizes ranging from #40 to #3 +query TT +select database_name, table_name from rw.products_test limit 3; +---- +mytest products +mytest products +mytest products + + # commit_ts of historical records should be '1970-01-01 00:00:00+00:00' query I SELECT count(*) as cnt from rw.products_test where commit_ts = '1970-01-01 00:00:00+00:00' @@ -247,7 +258,11 @@ CREATE TABLE person_new ( credit_card varchar, city varchar, PRIMARY KEY (id) -) INCLUDE TIMESTAMP AS commit_ts FROM pg_source TABLE 'public.person'; +) INCLUDE TIMESTAMP AS commit_ts +INCLUDE DATABASE_NAME as database_name +INCLUDE SCHEMA_NAME as schema_name +INCLUDE TABLE_NAME as table_name +FROM pg_source TABLE 'public.person'; statement ok CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new; @@ -276,6 +291,14 @@ SELECT * from person_new_cnt ---- 6 +query TTT +SELECT database_name,schema_name,table_name from person_new limit 3; +---- +cdc_test public person +cdc_test public person +cdc_test public person + + query ITTTT SELECT id,name,email_address,credit_card,city from person_new order by id; ---- diff --git a/e2e_test/source/cdc/mongodb/mongodb_basic.slt b/e2e_test/source/cdc/mongodb/mongodb_basic.slt index 9eaad3cca41a..ccbd65400298 100644 --- a/e2e_test/source/cdc/mongodb/mongodb_basic.slt +++ b/e2e_test/source/cdc/mongodb/mongodb_basic.slt @@ -2,7 +2,11 @@ control substitution on statement ok -CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) INCLUDE TIMESTAMP as commit_ts WITH ( +CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) +INCLUDE TIMESTAMP as commit_ts +INCLUDE DATABASE_NAME as database_name +INCLUDE COLLECTION_NAME as collection_name +WITH ( connector = 'mongodb-cdc', mongodb.url = 'mongodb://mongodb:27017/?replicaSet=rs0', collection.name = 'random_data.*' @@ -30,5 +34,12 @@ select count(*) from users where commit_ts = '1970-01-01 00:00:00+00:00'; ---- 55 +query TT +select database_name, collection_name FROM users LIMIT 2; +---- +random_data users +random_data users + + statement ok DROP TABLE users cascade diff --git a/proto/plan_common.proto b/proto/plan_common.proto index 8c73374a4d7d..31718ed9ac5c 100644 --- a/proto/plan_common.proto +++ b/proto/plan_common.proto @@ -203,6 +203,15 @@ message AdditionalColumnHeader { data.DataType data_type = 2; } +// metadata column for cdc table +message AdditionalDatabaseName {} + +message AdditionalSchemaName {} + +message AdditionalTableName {} + +message AdditionalCollectionName {} + // this type means we read all headers as a whole message AdditionalColumnHeaders {} @@ -215,6 +224,10 @@ message AdditionalColumn { AdditionalColumnHeader header_inner = 5; AdditionalColumnFilename filename = 6; AdditionalColumnHeaders headers = 7; + AdditionalDatabaseName database_name = 8; + AdditionalSchemaName schema_name = 9; + AdditionalTableName table_name = 10; + AdditionalCollectionName collection_name = 11; } } diff --git a/src/compute/tests/cdc_tests.rs b/src/compute/tests/cdc_tests.rs index 628c2ca22cf4..03045fb4e0f6 100644 --- a/src/compute/tests/cdc_tests.rs +++ b/src/compute/tests/cdc_tests.rs @@ -169,7 +169,10 @@ async fn test_cdc_backfill() -> StreamResult<()> { MySqlOffset::new(binlog_file.clone(), 10), ]; - let table_name = SchemaTableName::new("mock_table".to_string(), "public".to_string()); + let table_name = SchemaTableName { + schema_name: 
"public".to_string(), + table_name: "mock_table".to_string(), + }; let table_schema = Schema::new(vec![ Field::with_name(DataType::Int64, "id"), // primary key Field::with_name(DataType::Float64, "price"), @@ -179,6 +182,7 @@ async fn test_cdc_backfill() -> StreamResult<()> { let external_table = ExternalStorageTable::new( TableId::new(1234), table_name, + "mydb".to_string(), ExternalTableReaderImpl::Mock(MockExternalTableReader::new(binlog_watermarks)), table_schema.clone(), table_pk_order_types, diff --git a/src/connector/src/parser/additional_columns.rs b/src/connector/src/parser/additional_columns.rs index 012b7214c7e7..253718a00a7d 100644 --- a/src/connector/src/parser/additional_columns.rs +++ b/src/connector/src/parser/additional_columns.rs @@ -22,9 +22,10 @@ use risingwave_pb::data::data_type::TypeName; use risingwave_pb::data::DataType as PbDataType; use risingwave_pb::plan_common::additional_column::ColumnType as AdditionalColumnType; use risingwave_pb::plan_common::{ - AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader, AdditionalColumnHeaders, - AdditionalColumnKey, AdditionalColumnOffset, AdditionalColumnPartition, - AdditionalColumnTimestamp, + AdditionalCollectionName, AdditionalColumn, AdditionalColumnFilename, AdditionalColumnHeader, + AdditionalColumnHeaders, AdditionalColumnKey, AdditionalColumnOffset, + AdditionalColumnPartition, AdditionalColumnTimestamp, AdditionalDatabaseName, + AdditionalSchemaName, AdditionalTableName, }; use crate::error::ConnectorResult; @@ -59,14 +60,27 @@ pub static COMPATIBLE_ADDITIONAL_COLUMNS: LazyLock>> = - LazyLock::new(|| Some(HashSet::from(["timestamp"]))); + LazyLock::new(|| { + Some(HashSet::from([ + "timestamp", + "database_name", + "schema_name", + "table_name", + ])) + }); pub fn get_supported_additional_columns( connector_name: &str, @@ -201,6 +215,55 @@ pub fn build_additional_column_catalog( is_hidden: false, }, "header" => build_header_catalog(column_id, &column_name, inner_field_name, data_type), + "database_name" => ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::DatabaseName( + AdditionalDatabaseName {}, + )), + }, + ), + is_hidden: false, + }, + "schema_name" => ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::SchemaName(AdditionalSchemaName {})), + }, + ), + is_hidden: false, + }, + + "table_name" => ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::TableName(AdditionalTableName {})), + }, + ), + is_hidden: false, + }, + "collection_name" => ColumnCatalog { + column_desc: ColumnDesc::named_with_additional_column( + column_name, + column_id, + DataType::Varchar, + AdditionalColumn { + column_type: Some(AdditionalColumnType::CollectionName( + AdditionalCollectionName {}, + )), + }, + ), + is_hidden: false, + }, _ => unreachable!(), }; diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index e173ef224e79..a62fec8753d9 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -56,7 +56,8 @@ use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::maxwell::MaxwellParser; use crate::parser::simd_json_parser::DebeziumMongoJsonAccessBuilder; use 
crate::parser::util::{ - extract_header_inner_from_meta, extract_headers_from_meta, extreact_timestamp_from_meta, + extract_cdc_meta_column, extract_header_inner_from_meta, extract_headers_from_meta, + extreact_timestamp_from_meta, }; use crate::schema::schema_registry::SchemaRegistryAuth; use crate::source::monitor::GLOBAL_SOURCE_METRICS; @@ -401,12 +402,38 @@ impl SourceStreamChunkRowWriter<'_> { .unwrap(), // handled all match cases in internal match, unwrap is safe )); } + + ( + _, // for cdc tables + &Some(ref col @ AdditionalColumnType::DatabaseName(_)) + | &Some(ref col @ AdditionalColumnType::TableName(_)), + ) => { + match self.row_meta { + Some(row_meta) => { + if let SourceMeta::DebeziumCdc(cdc_meta) = row_meta.meta { + Ok(A::output_for( + extract_cdc_meta_column(cdc_meta, col, desc.name.as_str())? + .unwrap_or(None), + )) + } else { + Err(AccessError::Uncategorized { + message: "CDC metadata not found in the message".to_string(), + }) + } + } + None => parse_field(desc), // parse from payload + } + } (_, &Some(AdditionalColumnType::Timestamp(_))) => match self.row_meta { Some(row_meta) => Ok(A::output_for( extreact_timestamp_from_meta(row_meta.meta).unwrap_or(None), )), None => parse_field(desc), // parse from payload }, + (_, &Some(AdditionalColumnType::CollectionName(_))) => { + // collection name for `mongodb-cdc` should be parsed from the message payload + parse_field(desc) + } (_, &Some(AdditionalColumnType::Partition(_))) => { // the meta info does not involve spec connector return Ok(A::output_for( diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index a0c5bee56868..6fc2b1b935ec 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -278,11 +278,11 @@ mod tests { if i == 0 { // put begin message at first source_msg_batch.push(SourceMessage { - meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { - full_table_name: "orders".to_string(), - source_ts_ms: 0, - is_transaction_meta: transactional, - }), + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( + "orders".to_string(), + 0, + transactional, + )), split_id: SplitId::from("1001"), offset: "0".into(), key: None, @@ -292,11 +292,11 @@ mod tests { // put data messages for data_msg in batch { source_msg_batch.push(SourceMessage { - meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { - full_table_name: "orders".to_string(), - source_ts_ms: 0, - is_transaction_meta: false, - }), + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( + "orders".to_string(), + 0, + false, + )), split_id: SplitId::from("1001"), offset: "0".into(), key: None, @@ -306,11 +306,11 @@ mod tests { if i == data_batches.len() - 1 { // put commit message at last source_msg_batch.push(SourceMessage { - meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { - full_table_name: "orders".to_string(), - source_ts_ms: 0, - is_transaction_meta: transactional, - }), + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( + "orders".to_string(), + 0, + transactional, + )), split_id: SplitId::from("1001"), offset: "0".into(), key: None, @@ -355,11 +355,7 @@ mod tests { let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; let commit_msg = 
r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#; - let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta { - full_table_name: "orders".to_string(), - source_ts_ms: 0, - is_transaction_meta: true, - }); + let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new("orders".to_string(), 0, true)); let msg_meta = MessageMeta { meta: &cdc_meta, split_id: "1001", diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index 0a7e069a86b3..a353a39369c3 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -60,8 +60,14 @@ pub struct DebeziumChangeEvent { const BEFORE: &str = "before"; const AFTER: &str = "after"; + const SOURCE: &str = "source"; const SOURCE_TS_MS: &str = "ts_ms"; +const SOURCE_DB: &str = "db"; +const SOURCE_SCHEMA: &str = "schema"; +const SOURCE_TABLE: &str = "table"; +const SOURCE_COLLECTION: &str = "collection"; + const OP: &str = "op"; pub const TRANSACTION_STATUS: &str = "status"; pub const TRANSACTION_ID: &str = "id"; @@ -188,8 +194,8 @@ where .access(&[AFTER, &desc.name], &desc.data_type) }, |additional_column_type| { - match additional_column_type { - &ColumnType::Timestamp(_) => { + match *additional_column_type { + ColumnType::Timestamp(_) => { // access payload.source.ts_ms let ts_ms = self .value_accessor @@ -202,6 +208,26 @@ where .to_scalar_value() })) } + ColumnType::DatabaseName(_) => self + .value_accessor + .as_ref() + .expect("value_accessor must be provided for upsert operation") + .access(&[SOURCE, SOURCE_DB], &desc.data_type), + ColumnType::SchemaName(_) => self + .value_accessor + .as_ref() + .expect("value_accessor must be provided for upsert operation") + .access(&[SOURCE, SOURCE_SCHEMA], &desc.data_type), + ColumnType::TableName(_) => self + .value_accessor + .as_ref() + .expect("value_accessor must be provided for upsert operation") + .access(&[SOURCE, SOURCE_TABLE], &desc.data_type), + ColumnType::CollectionName(_) => self + .value_accessor + .as_ref() + .expect("value_accessor must be provided for upsert operation") + .access(&[SOURCE, SOURCE_COLLECTION], &desc.data_type), _ => Err(AccessError::UnsupportedAdditionalColumn { name: desc.name.clone(), }), diff --git a/src/connector/src/parser/util.rs b/src/connector/src/parser/util.rs index 3b416ef1309e..d79c483dbe92 100644 --- a/src/connector/src/parser/util.rs +++ b/src/connector/src/parser/util.rs @@ -38,6 +38,11 @@ macro_rules! 
log_error { }; } pub(crate) use log_error; +use risingwave_pb::plan_common::additional_column; +use risingwave_pb::plan_common::additional_column::ColumnType; + +use crate::parser::{AccessError, AccessResult}; +use crate::source::cdc::DebeziumCdcMeta; /// get kafka topic name pub(super) fn get_kafka_topic(props: &HashMap) -> ConnectorResult<&String> { @@ -130,11 +135,26 @@ pub(super) async fn bytes_from_url( pub fn extreact_timestamp_from_meta(meta: &SourceMeta) -> Option { match meta { SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_timestamp(), - SourceMeta::DebeziumCdc(debezium_meta) => debezium_meta.extract_timestamp(), + SourceMeta::DebeziumCdc(cdc_meta) => cdc_meta.extract_timestamp(), _ => None, } } +pub fn extract_cdc_meta_column( + cdc_meta: &DebeziumCdcMeta, + column_type: &additional_column::ColumnType, + column_name: &str, +) -> AccessResult> { + match column_type { + ColumnType::Timestamp(_) => Ok(cdc_meta.extract_timestamp()), + ColumnType::DatabaseName(_) => Ok(cdc_meta.extract_database_name()), + ColumnType::TableName(_) => Ok(cdc_meta.extract_table_name()), + _ => Err(AccessError::UnsupportedAdditionalColumn { + name: column_name.to_string(), + }), + } +} + pub fn extract_headers_from_meta(meta: &SourceMeta) -> Option { match meta { SourceMeta::Kafka(kafka_meta) => kafka_meta.extract_headers(), /* expect output of type `array[struct]` */ diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 32a1b1ccfa80..aab3dfbee7df 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -67,18 +67,17 @@ impl CdcTableType { pub async fn create_table_reader( &self, - with_properties: HashMap, + config: ExternalTableConfig, schema: Schema, pk_indices: Vec, scan_limit: u32, ) -> ConnectorResult { match self { Self::MySql => Ok(ExternalTableReaderImpl::MySql( - MySqlExternalTableReader::new(with_properties, schema).await?, + MySqlExternalTableReader::new(config, schema).await?, )), Self::Postgres => Ok(ExternalTableReaderImpl::Postgres( - PostgresExternalTableReader::new(with_properties, schema, pk_indices, scan_limit) - .await?, + PostgresExternalTableReader::new(config, schema, pk_indices, scan_limit).await?, )), _ => bail!("invalid external table type: {:?}", *self), } @@ -97,13 +96,6 @@ pub const SCHEMA_NAME_KEY: &str = "schema.name"; pub const DATABASE_NAME_KEY: &str = "database.name"; impl SchemaTableName { - pub fn new(schema_name: String, table_name: String) -> Self { - Self { - schema_name, - table_name, - } - } - pub fn from_properties(properties: &HashMap) -> Self { let table_type = CdcTableType::from_properties(properties); let table_name = properties.get(TABLE_NAME_KEY).cloned().unwrap_or_default(); @@ -309,15 +301,7 @@ impl ExternalTableReader for MySqlExternalTableReader { } impl MySqlExternalTableReader { - pub async fn new( - with_properties: HashMap, - rw_schema: Schema, - ) -> ConnectorResult { - let config = serde_json::from_value::( - serde_json::to_value(with_properties).unwrap(), - ) - .context("failed to extract mysql connector properties")?; - + pub async fn new(config: ExternalTableConfig, rw_schema: Schema) -> ConnectorResult { let mut opts_builder = mysql_async::OptsBuilder::default() .user(Some(config.username)) .pass(Some(config.password)) @@ -394,6 +378,7 @@ impl MySqlExternalTableReader { ) }; + tracing::debug!("snapshot sql: {}", sql); let mut conn = self.conn.lock().await; // Set session timezone to UTC @@ -571,6 +556,7 @@ impl 
ExternalTableReaderImpl { #[cfg(test)] mod tests { + use std::collections::HashMap; use futures::pin_mut; use futures_async_stream::for_await; @@ -579,7 +565,8 @@ mod tests { use risingwave_common::types::DataType; use crate::source::cdc::external::{ - CdcOffset, ExternalTableReader, MySqlExternalTableReader, MySqlOffset, SchemaTableName, + CdcOffset, ExternalTableConfig, ExternalTableReader, MySqlExternalTableReader, MySqlOffset, + SchemaTableName, }; #[test] @@ -629,7 +616,7 @@ mod tests { let rw_schema = Schema { fields: columns.iter().map(Field::from).collect(), }; - let props = convert_args!(hashmap!( + let props: HashMap = convert_args!(hashmap!( "hostname" => "localhost", "port" => "8306", "username" => "root", @@ -637,7 +624,10 @@ mod tests { "database.name" => "mytest", "table.name" => "t1")); - let reader = MySqlExternalTableReader::new(props, rw_schema) + let config = + serde_json::from_value::(serde_json::to_value(props).unwrap()) + .unwrap(); + let reader = MySqlExternalTableReader::new(config, rw_schema) .await .unwrap(); let offset = reader.current_cdc_offset().await.unwrap(); diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index 25bf884096b6..6b937b713ccd 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::cmp::Ordering; -use std::collections::HashMap; use anyhow::Context; use futures::stream::BoxStream; @@ -116,7 +115,7 @@ impl ExternalTableReader for PostgresExternalTableReader { impl PostgresExternalTableReader { pub async fn new( - properties: HashMap, + config: ExternalTableConfig, rw_schema: Schema, pk_indices: Vec, scan_limit: u32, @@ -127,11 +126,6 @@ impl PostgresExternalTableReader { "create postgres external table reader" ); - let config = serde_json::from_value::( - serde_json::to_value(properties.clone()).unwrap(), - ) - .context("failed to extract postgres connector properties")?; - let mut pg_config = tokio_postgres::Config::new(); pg_config .user(&config.username) @@ -192,7 +186,10 @@ impl PostgresExternalTableReader { .map(|i| rw_schema.fields[*i].name.clone()) .collect_vec(); - let table_name = SchemaTableName::from_properties(&properties); + let table_name = SchemaTableName { + schema_name: config.schema.clone(), + table_name: config.table.clone(), + }; let order_key = primary_keys.iter().join(","); let scan_sql = format!( "SELECT {} FROM {} WHERE {} ORDER BY {} LIMIT {scan_limit}", @@ -301,6 +298,8 @@ impl PostgresExternalTableReader { #[cfg(test)] mod tests { + use std::collections::HashMap; + use futures::pin_mut; use futures_async_stream::for_await; use maplit::{convert_args, hashmap}; @@ -309,7 +308,7 @@ mod tests { use risingwave_common::types::{DataType, ScalarImpl}; use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset}; - use crate::source::cdc::external::{ExternalTableReader, SchemaTableName}; + use crate::source::cdc::external::{ExternalTableConfig, ExternalTableReader, SchemaTableName}; #[test] fn test_postgres_offset() { @@ -351,7 +350,7 @@ mod tests { fields: columns.iter().map(Field::from).collect(), }; - let props = convert_args!(hashmap!( + let props: HashMap = convert_args!(hashmap!( "hostname" => "localhost", "port" => "8432", "username" => "myuser", @@ -359,7 +358,11 @@ mod tests { "database.name" => "mydb", "schema.name" => "public", "table.name" => "t1")); - let reader = 
PostgresExternalTableReader::new(props, rw_schema, vec![0, 1], 1000) + + let config = + serde_json::from_value::(serde_json::to_value(props).unwrap()) + .unwrap(); + let reader = PostgresExternalTableReader::new(config, rw_schema, vec![0, 1], 1000) .await .unwrap(); diff --git a/src/connector/src/source/cdc/source/message.rs b/src/connector/src/source/cdc/source/message.rs index f45a4e37e8ca..e74ed55ce1f9 100644 --- a/src/connector/src/source/cdc/source/message.rs +++ b/src/connector/src/source/cdc/source/message.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::types::{Datum, Scalar, Timestamptz}; +use risingwave_common::types::{Datum, Scalar, ScalarImpl, Timestamptz}; use risingwave_pb::connector_service::CdcMessage; use crate::source::base::SourceMessage; @@ -20,6 +20,8 @@ use crate::source::SourceMeta; #[derive(Debug, Clone)] pub struct DebeziumCdcMeta { + db_name_prefix_len: usize, + pub full_table_name: String, // extracted from `payload.source.ts_ms`, the time that the change event was made in the database pub source_ts_ms: i64, @@ -36,6 +38,31 @@ impl DebeziumCdcMeta { ) .into() } + + pub fn extract_database_name(&self) -> Option { + Some(ScalarImpl::from( + self.full_table_name.as_str()[0..self.db_name_prefix_len].to_string(), + )) + .into() + } + + pub fn extract_table_name(&self) -> Option { + Some(ScalarImpl::from( + self.full_table_name.as_str()[self.db_name_prefix_len..].to_string(), + )) + .into() + } + + pub fn new(full_table_name: String, source_ts_ms: i64, is_transaction_meta: bool) -> Self { + // full_table_name is in the format of `database_name.table_name` + let db_name_prefix_len = full_table_name.as_str().find('.').unwrap_or(0); + Self { + db_name_prefix_len, + full_table_name, + source_ts_ms, + is_transaction_meta, + } + } } impl From for SourceMessage { @@ -53,11 +80,11 @@ impl From for SourceMessage { }, offset: message.offset, split_id: message.partition.into(), - meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta { - full_table_name: message.full_table_name, - source_ts_ms: message.source_ts_ms, - is_transaction_meta: message.is_transaction_meta, - }), + meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( + message.full_table_name, + message.source_ts_ms, + message.is_transaction_meta, + )), } } } diff --git a/src/prost/build.rs b/src/prost/build.rs index 5f36e26f4bcb..7d17da39cd01 100644 --- a/src/prost/build.rs +++ b/src/prost/build.rs @@ -139,6 +139,13 @@ fn main() -> Result<(), Box> { .type_attribute("plan_common.AdditionalColumnHeader", "#[derive(Eq, Hash)]") .type_attribute("plan_common.AdditionalColumnHeaders", "#[derive(Eq, Hash)]") .type_attribute("plan_common.AdditionalColumnOffset", "#[derive(Eq, Hash)]") + .type_attribute("plan_common.AdditionalDatabaseName", "#[derive(Eq, Hash)]") + .type_attribute("plan_common.AdditionalSchemaName", "#[derive(Eq, Hash)]") + .type_attribute("plan_common.AdditionalTableName", "#[derive(Eq, Hash)]") + .type_attribute( + "plan_common.AdditionalCollectionName", + "#[derive(Eq, Hash)]", + ) .type_attribute("common.ColumnOrder", "#[derive(Eq, Hash)]") .type_attribute("common.OrderType", "#[derive(Eq, Hash)]") .type_attribute("common.Buffer", "#[derive(Eq)]") diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index 8caffd59b309..168e619cc956 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ 
b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -137,6 +137,8 @@ impl CdcBackfillExecutor { let upstream_table_id = self.external_table.table_id().table_id; let upstream_table_name = self.external_table.qualified_table_name(); + let schema_table_name = self.external_table.schema_table_name().clone(); + let external_database_name = self.external_table.database_name().to_owned(); let upstream_table_reader = UpstreamTableReader::new(self.external_table); let additional_columns = self @@ -264,6 +266,8 @@ impl CdcBackfillExecutor { self.rate_limit_rps, pk_indices.clone(), additional_columns.clone(), + schema_table_name.clone(), + external_database_name.clone(), ); let right_snapshot = pin!(upstream_table_reader diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/external.rs b/src/stream/src/executor/backfill/cdc/upstream_table/external.rs index e486f8f9bfc4..bd99eb59821e 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/external.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/external.rs @@ -26,6 +26,8 @@ pub struct ExternalStorageTable { schema_name: String, + database_name: String, + table_reader: ExternalTableReaderImpl, /// The schema of the output columns, i.e., this table VIEWED BY some executor like @@ -47,6 +49,7 @@ impl ExternalStorageTable { table_name, schema_name, }: SchemaTableName, + database_name: String, table_reader: ExternalTableReaderImpl, schema: Schema, pk_order_types: Vec, @@ -56,6 +59,7 @@ impl ExternalStorageTable { table_id, table_name, schema_name, + database_name, table_reader, schema, pk_order_types, @@ -93,4 +97,8 @@ impl ExternalStorageTable { pub fn qualified_table_name(&self) -> String { format!("{}.{}", self.schema_name, self.table_name) } + + pub fn database_name(&self) -> &str { + self.database_name.as_str() + } } diff --git a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs index 548b6a5feec7..bc54772f498f 100644 --- a/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs +++ b/src/stream/src/executor/backfill/cdc/upstream_table/snapshot.rs @@ -23,9 +23,11 @@ use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::ColumnDesc; use risingwave_common::row::OwnedRow; -use risingwave_common::types::{Scalar, Timestamptz}; +use risingwave_common::types::{Scalar, ScalarImpl, Timestamptz}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; -use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReader}; +use risingwave_connector::source::cdc::external::{ + CdcOffset, ExternalTableReader, SchemaTableName, +}; use risingwave_pb::plan_common::additional_column::ColumnType; use super::external::ExternalStorageTable; @@ -45,12 +47,14 @@ pub trait UpstreamTableRead { ) -> impl Future>> + Send + '_; } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct SnapshotReadArgs { pub current_pos: Option, pub rate_limit_rps: Option, pub pk_indices: Vec, pub additional_columns: Vec, + pub schema_table_name: SchemaTableName, + pub database_name: String, } impl SnapshotReadArgs { @@ -59,12 +63,16 @@ impl SnapshotReadArgs { rate_limit_rps: Option, pk_indices: Vec, additional_columns: Vec, + schema_table_name: SchemaTableName, + database_name: String, ) -> Self { Self { current_pos, rate_limit_rps, pk_indices, additional_columns, + schema_table_name, + database_name, } } } @@ -90,16 +98,36 @@ impl UpstreamTableReader { fn with_additional_columns( 
snapshot_chunk: StreamChunk, additional_columns: &[ColumnDesc], + schema_table_name: SchemaTableName, + database_name: String, ) -> StreamChunk { let (ops, mut columns, visibility) = snapshot_chunk.into_inner(); for desc in additional_columns { let mut builder = desc.data_type.create_array_builder(visibility.len()); - match desc.additional_column.column_type.as_ref().unwrap() { + match *desc.additional_column.column_type.as_ref().unwrap() { // set default value for timestamp - &ColumnType::Timestamp(_) => builder.append_n( + ColumnType::Timestamp(_) => builder.append_n( visibility.len(), Some(Timestamptz::default().to_scalar_value()), ), + ColumnType::DatabaseName(_) => { + builder.append_n( + visibility.len(), + Some(ScalarImpl::from(database_name.clone())), + ); + } + ColumnType::SchemaName(_) => { + builder.append_n( + visibility.len(), + Some(ScalarImpl::from(schema_table_name.schema_name.clone())), + ); + } + ColumnType::TableName(_) => { + builder.append_n( + visibility.len(), + Some(ScalarImpl::from(schema_table_name.table_name.clone())), + ); + } // set null for other additional columns _ => { builder.append_n_null(visibility.len()); @@ -140,6 +168,8 @@ impl UpstreamTableRead for UpstreamTableReader { }); let mut read_args = args; + let schema_table_name = read_args.schema_table_name.clone(); + let database_name = read_args.database_name.clone(); // loop to read all data from the table loop { tracing::debug!( @@ -176,6 +206,8 @@ impl UpstreamTableRead for UpstreamTableReader { yield Some(with_additional_columns( chunk, &read_args.additional_columns, + schema_table_name.clone(), + database_name.clone(), )); continue; } else { @@ -196,6 +228,8 @@ impl UpstreamTableRead for UpstreamTableReader { yield Some(with_additional_columns( chunk, &read_args.additional_columns, + schema_table_name.clone(), + database_name.clone(), )); } } @@ -221,6 +255,7 @@ impl UpstreamTableRead for UpstreamTableReader { #[cfg(test)] mod tests { + use std::collections::HashMap; use futures::pin_mut; use futures_async_stream::for_await; @@ -230,7 +265,7 @@ mod tests { use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_connector::source::cdc::external::{ - ExternalTableReader, MySqlExternalTableReader, SchemaTableName, + ExternalTableConfig, ExternalTableReader, MySqlExternalTableReader, SchemaTableName, }; use crate::executor::backfill::utils::{get_new_pos, iter_chunks}; @@ -246,7 +281,7 @@ mod tests { let rw_schema = Schema { fields: columns.iter().map(Field::from).collect(), }; - let props = convert_args!(hashmap!( + let props: HashMap = convert_args!(hashmap!( "hostname" => "localhost", "port" => "8306", "username" => "root", @@ -254,7 +289,10 @@ mod tests { "database.name" => "mydb", "table.name" => "orders_rw")); - let reader = MySqlExternalTableReader::new(props, rw_schema.clone()) + let config = + serde_json::from_value::(serde_json::to_value(props).unwrap()) + .unwrap(); + let reader = MySqlExternalTableReader::new(config, rw_schema.clone()) .await .unwrap(); diff --git a/src/stream/src/from_proto/stream_cdc_scan.rs b/src/stream/src/from_proto/stream_cdc_scan.rs index 55698c44e5e7..150812c57a1c 100644 --- a/src/stream/src/from_proto/stream_cdc_scan.rs +++ b/src/stream/src/from_proto/stream_cdc_scan.rs @@ -15,9 +15,12 @@ use std::collections::HashMap; use std::sync::Arc; +use anyhow::anyhow; use risingwave_common::catalog::{Schema, TableId}; use risingwave_common::util::sort_util::OrderType; -use 
risingwave_connector::source::cdc::external::{CdcTableType, SchemaTableName}; +use risingwave_connector::source::cdc::external::{ + CdcTableType, ExternalTableConfig, SchemaTableName, +}; use risingwave_pb::plan_common::ExternalTableDesc; use risingwave_pb::stream_plan::StreamCdcScanNode; @@ -88,19 +91,25 @@ impl ExecutorBuilder for StreamCdcScanExecutorBuilder { .map(Into::into) .collect(); + let schema_table_name = SchemaTableName::from_properties(&properties); + let table_config = serde_json::from_value::( + serde_json::to_value(properties).unwrap(), + ) + .map_err(|e| anyhow!("failed to parse external table config").context(e))?; + let database_name = table_config.database.clone(); let table_reader = table_type .create_table_reader( - properties.clone(), + table_config, table_schema.clone(), table_pk_indices.clone(), scan_options.snapshot_batch_size, ) .await?; - let schema_table_name = SchemaTableName::from_properties(&properties); let external_table = ExternalStorageTable::new( TableId::new(table_desc.table_id), schema_table_name, + database_name, table_reader, table_schema, table_pk_order_types,
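
For quick reference, the user-facing surface this patch adds is the set of INCLUDE clauses exercised by the e2e tests above: MySQL CDC tables can expose database_name and table_name, Postgres CDC tables additionally schema_name, and MongoDB CDC tables database_name and collection_name, all alongside the existing commit_ts timestamp column. A condensed sketch of the syntax follows; the source names, connection properties, and upstream table names are placeholders, not part of the patch.

-- MySQL CDC: metadata comes from the Debezium source info for streamed rows,
-- or is filled with constants during backfill
CREATE TABLE products_rw (id INT, name STRING, PRIMARY KEY (id))
INCLUDE TIMESTAMP AS commit_ts
INCLUDE DATABASE_NAME AS database_name
INCLUDE TABLE_NAME AS table_name
FROM mysql_source TABLE 'mydb.products';

-- Postgres CDC additionally supports SCHEMA_NAME
CREATE TABLE person_rw (id INT, name VARCHAR, PRIMARY KEY (id))
INCLUDE TIMESTAMP AS commit_ts
INCLUDE DATABASE_NAME AS database_name
INCLUDE SCHEMA_NAME AS schema_name
INCLUDE TABLE_NAME AS table_name
FROM pg_source TABLE 'public.person';

-- MongoDB CDC exposes the collection name instead of a table name
CREATE TABLE users_rw (_id JSONB PRIMARY KEY, payload JSONB)
INCLUDE TIMESTAMP AS commit_ts
INCLUDE DATABASE_NAME AS database_name
INCLUDE COLLECTION_NAME AS collection_name
WITH (
    connector = 'mongodb-cdc',
    mongodb.url = 'mongodb://localhost:27017/?replicaSet=rs0',
    collection.name = 'mydb.users'
);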