From 22051cdf2d609b80d9f86043a3a079a886732a77 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 17 Oct 2024 10:00:15 +0800 Subject: [PATCH] fix(cdc): fix the filter expression of CdcFilter executor (#18868) (#18947) Co-authored-by: lmatz --- .../sql_server_cdc/sql_server_cdc.slt | 30 ++++++------ .../source/core/DbzChangeEventConsumer.java | 1 + src/frontend/src/handler/create_table.rs | 30 ++++++++---- .../plan_node/stream_cdc_table_scan.rs | 46 +++++++------------ 4 files changed, 52 insertions(+), 55 deletions(-) diff --git a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt index ee78060a814cc..f489ebfcdad8d 100644 --- a/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt +++ b/e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt @@ -159,7 +159,7 @@ CREATE SOURCE upper_mssql_source WITH ( statement error Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source create materialized view mv as select * from mssql_source; -statement error The upstream table name must contain schema name prefix* +statement error The upstream table name must be in 'database.schema.table' format CREATE TABLE shared_orders ( order_id INT, order_date BIGINT, @@ -180,7 +180,7 @@ CREATE TABLE shared_orders ( product_id INT, order_status SMALLINT, PRIMARY KEY (order_id) -) from mssql_source table 'dbo.wrong_orders'; +) from mssql_source table 'mydb.dbo.wrong_orders'; # invalid schema name statement error Sql Server table 'wrong_dbo'.'orders' doesn't exist @@ -192,7 +192,7 @@ CREATE TABLE shared_orders ( product_id INT, order_status SMALLINT, PRIMARY KEY (order_id) -) from mssql_source table 'wrong_dbo.orders'; +) from mssql_source table 'mydb.wrong_dbo.orders'; # invalid primary key statement error INVALID_ARGUMENT: Primary key mismatch @@ -204,7 +204,7 @@ CREATE TABLE shared_orders ( product_id INT, order_status SMALLINT, PRIMARY KEY (order_date) -) from mssql_source table 'dbo.orders'; +) from mssql_source table 'mydb.dbo.orders'; # column name mismatch statement error INVALID_ARGUMENT: Column 'wrong_order_date' not found in the upstream database @@ -216,7 +216,7 @@ CREATE TABLE shared_orders ( product_id INT, order_status SMALLINT, PRIMARY KEY (order_id) -) from mssql_source table 'dbo.orders'; +) from mssql_source table 'mydb.dbo.orders'; # column data type mismatch statement error INVALID_ARGUMENT: Incompatible data type of column order_date @@ -228,7 +228,7 @@ CREATE TABLE shared_orders ( product_id INT, order_status SMALLINT, PRIMARY KEY (order_id) -) from mssql_source table 'dbo.orders'; +) from mssql_source table 'mydb.dbo.orders'; # table without enabling cdc statement error INVALID_ARGUMENT: Table 'dbo.orders_without_cdc' has not enabled CDC. @@ -240,7 +240,7 @@ CREATE TABLE shared_orders_without_cdc ( product_id INT, order_status SMALLINT, PRIMARY KEY (order_id) -) from mssql_source table 'dbo.orders_without_cdc'; +) from mssql_source table 'mydb.dbo.orders_without_cdc'; # use batch_size = 1 here to ensure not all the data is backfilled in one batch. 
statement ok
@@ -254,15 +254,15 @@ CREATE TABLE shared_orders (
   PRIMARY KEY (order_id)
 ) WITH (
   snapshot.batch_size = '1',
-) from mssql_source table 'dbo.orders';
+) from mssql_source table 'mydb.dbo.orders';
 
 statement ok
-CREATE TABLE shared_single_type (*) from mssql_source table 'dbo.single_type';
+CREATE TABLE shared_single_type (*) from mssql_source table 'mydb.dbo.single_type';
 
 statement ok
 CREATE TABLE shared_sqlserver_all_data_types (
   *
-) from mssql_source table 'dbo.sqlserver_all_data_types';
+) from mssql_source table 'mydb.dbo.sqlserver_all_data_types';
 
 sleep 5s
 
@@ -293,28 +293,28 @@ CREATE TABLE shared_sqlserver_all_data_types (
   c_datetimeoffset TIMESTAMPTZ,
   c_xml varchar,
   PRIMARY KEY (id)
-) from mssql_source table 'dbo.sqlserver_all_data_types';
+) from mssql_source table 'mydb.dbo.sqlserver_all_data_types';
 
 statement error Sql Server table 'UpperSchema'.'UpperTable' doesn't exist in 'mydb'
 CREATE TABLE upper_table (
   "ID" INT,
   "Name" VARCHAR,
   PRIMARY KEY ("ID")
-) from mssql_source table 'UpperSchema.UpperTable';
+) from mssql_source table 'mydb.UpperSchema.UpperTable';
 
 statement error Column 'name' not found in the upstream database
 CREATE TABLE upper_table (
   "ID" INT,
   name VARCHAR,
   PRIMARY KEY ("ID")
-) from upper_mssql_source table 'UpperSchema.UpperTable';
+) from upper_mssql_source table 'UpperDB.UpperSchema.UpperTable';
 
 statement error Sql Server table 'upperSchema'.'upperTable' doesn't exist in 'UpperDB'
 CREATE TABLE upper_table (
   "ID" INT,
   "Name" VARCHAR,
   PRIMARY KEY ("ID")
-) from upper_mssql_source table 'upperSchema.upperTable';
+) from upper_mssql_source table 'UpperDB.upperSchema.upperTable';
 
 statement ok
 CREATE TABLE upper_table (
@@ -326,7 +326,7 @@ INCLUDE TIMESTAMP AS commit_ts
 INCLUDE DATABASE_NAME as database_name
 INCLUDE SCHEMA_NAME as schema_name
 INCLUDE TABLE_NAME as table_name
-from upper_mssql_source table 'UpperSchema.UpperTable';
+from upper_mssql_source table 'UpperDB.UpperSchema.UpperTable';
 
 statement ok
 create materialized view shared_orders_cnt as select count(*) as cnt from shared_orders;
diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java
index 375b4d4a3ad62..723f5008de402 100644
--- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java
+++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java
@@ -235,6 +235,7 @@ var record = event.value();
                 // - PG: topicPrefix.schemaName.tableName
                 // - MySQL: topicPrefix.databaseName.tableName
                 // - Mongo: topicPrefix.databaseName.collectionName
+                // - SQL Server: topicPrefix.databaseName.schemaName.tableName
                 // We can extract the full table name from the topic
                 var fullTableName = record.topic().substring(record.topic().indexOf('.') + 1);
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 534333efbd910..366e63aec2079 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -756,7 +756,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
     column_defs: Vec<ColumnDef>,
     mut columns: Vec<ColumnCatalog>,
     pk_names: Vec<String>,
-    connect_properties: WithOptionsSecResolved,
+    cdc_with_options: WithOptionsSecResolved,
     mut col_id_gen: ColumnIdGenerator,
     on_conflict: Option<OnConflict>,
    with_version_column: Option<String>,
@@ -771,7 +771,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
 
     // append additional columns to the end
     handle_addition_columns(
-        &connect_properties,
+        &cdc_with_options,
         include_column_options,
         &mut columns,
         true,
@@ -811,7 +811,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
         .map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
         .collect();
 
-    let (options, secret_refs) = connect_properties.into_parts();
+    let (options, secret_refs) = cdc_with_options.into_parts();
 
     let cdc_table_desc = CdcTableDesc {
         table_id,
@@ -870,7 +870,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
     Ok((materialize.into(), table))
 }
 
-fn derive_connect_properties(
+fn derive_with_options_for_cdc_table(
     source_with_properties: &WithOptionsSecResolved,
     external_table_name: String,
 ) -> Result<WithOptionsSecResolved> {
@@ -900,9 +900,19 @@ fn derive_connect_properties(
             table_name
         }
         SQL_SERVER_CDC_CONNECTOR => {
-            let (schema_name, table_name) = external_table_name
+            // The SQL Server external table name is in the 'databaseName.schemaName.tableName'
+            // format; strip the database name prefix, then split the schema name and table name.
+            let schema_table_name = external_table_name
                 .split_once('.')
-                .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'dbo.table'"))?;
+                .ok_or_else(|| {
+                    anyhow!("The upstream table name must be in 'database.schema.table' format")
+                })?
+                .1;
+
+            let (schema_name, table_name) =
+                schema_table_name.split_once('.').ok_or_else(|| {
+                    anyhow!("The table name must contain schema name prefix, e.g. 'dbo.table'")
+                })?;
 
             // insert 'schema.name' into connect properties
             connect_properties.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
@@ -1015,7 +1025,7 @@ pub(super) async fn handle_create_table_plan(
         )?;
         source.clone()
     };
-    let connect_properties = derive_connect_properties(
+    let connect_properties = derive_with_options_for_cdc_table(
         &source.with_properties,
         cdc_table.external_table_name.clone(),
     )?;
@@ -1372,7 +1382,7 @@ pub async fn generate_stream_graph_for_table(
     let (source, resolved_table_name, database_id, schema_id) =
         get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;
-    let connect_properties = derive_connect_properties(
+    let cdc_with_options = derive_with_options_for_cdc_table(
         &source.with_properties,
         cdc_table.external_table_name.clone(),
     )?;
@@ -1380,7 +1390,7 @@ pub async fn generate_stream_graph_for_table(
     let (columns, pk_names) = derive_schema_for_cdc_table(
         &column_defs,
         &constraints,
-        connect_properties.clone(),
+        cdc_with_options.clone(),
         false,
         Some(CdcSchemaChangeArgs {
             original_catalog: original_catalog.clone(),
@@ -1398,7 +1408,7 @@ pub async fn generate_stream_graph_for_table(
         column_defs,
         columns,
         pk_names,
-        connect_properties,
+        cdc_with_options,
         col_id_gen,
         on_conflict,
         with_version_column,
diff --git a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
index 406ebdc8bfae5..d6fc367d32e59 100644
--- a/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
@@ -168,16 +168,9 @@ impl StreamCdcTableScan {
         // We need to pass the id of upstream source job here
         let upstream_source_id = self.core.cdc_table_desc.source_id.table_id;
 
-        // split the table name from the qualified table name, e.g. `database_name.table_name`
-        let (_, cdc_table_name) = self
-            .core
-            .cdc_table_desc
-            .external_table_name
-            .split_once('.')
-            .unwrap();
-
-        // jsonb filter expr: payload->'source'->>'table' = <table_name>
-        let filter_expr = Self::build_cdc_filter_expr(cdc_table_name);
+        // filter upstream source chunks by the value of the `_rw_table_name` column
+        let filter_expr =
+            Self::build_cdc_filter_expr(self.core.cdc_table_desc.external_table_name.as_str());
 
         let filter_operator_id = self.core.ctx.next_plan_node_id();
         // The filter node receives chunks in `(payload, _rw_offset, _rw_table_name)` schema
@@ -273,28 +266,13 @@
         })
     }
 
+    // The filter node receives input chunks in `(payload, _rw_offset, _rw_table_name)` schema
     pub fn build_cdc_filter_expr(cdc_table_name: &str) -> ExprImpl {
-        // jsonb filter expr: payload->'source'->>'table' = <table_name>
+        // filter by the `_rw_table_name` column
         FunctionCall::new(
             ExprType::Equal,
             vec![
-                FunctionCall::new(
-                    ExprType::JsonbAccessStr,
-                    vec![
-                        FunctionCall::new(
-                            ExprType::JsonbAccess,
-                            vec![
-                                InputRef::new(0, DataType::Jsonb).into(),
-                                ExprImpl::literal_varchar("source".into()),
-                            ],
-                        )
-                        .unwrap()
-                        .into(),
-                        ExprImpl::literal_varchar("table".into()),
-                    ],
-                )
-                .unwrap()
-                .into(),
+                InputRef::new(2, DataType::Varchar).into(),
                 ExprImpl::literal_varchar(cdc_table_name.into()),
             ],
         )
@@ -334,22 +312,27 @@ mod tests {
     async fn test_cdc_filter_expr() {
         let t1_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 111, "v2": 222.2 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t1", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 774, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();
         let t2_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 333, "v2": 666.6 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t2", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 884, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();
+
+        // NOTE: transaction metadata events are expected to be filtered out before reaching the cdc filter
         let trx_json = JsonbVal::from_str(r#"{"data_collections": null, "event_count": null, "id": "35319:3962662584", "status": "BEGIN", "ts_ms": 1704263537068}"#).unwrap();
         let row1 = OwnedRow::new(vec![
             Some(t1_json.into()),
             Some(r#"{"file": "1.binlog", "pos": 100}"#.into()),
+            Some("public.t2".into()),
         ]);
         let row2 = OwnedRow::new(vec![
             Some(t2_json.into()),
             Some(r#"{"file": "2.binlog", "pos": 100}"#.into()),
+            Some("abs.t2".into()),
         ]);
 
         let row3 = OwnedRow::new(vec![
             Some(trx_json.into()),
             Some(r#"{"file": "3.binlog", "pos": 100}"#.into()),
+            Some("public.t2".into()),
         ]);
 
-        let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("t1");
+        let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("public.t2");
         assert_eq!(
             filter_expr.eval_row(&row1).await.unwrap(),
             Some(ScalarImpl::Bool(true))
         );
         assert_eq!(
             filter_expr.eval_row(&row2).await.unwrap(),
             Some(ScalarImpl::Bool(false))
         );
-        assert_eq!(filter_expr.eval_row(&row3).await.unwrap(), None)
+        assert_eq!(
+            filter_expr.eval_row(&row3).await.unwrap(),
+            Some(ScalarImpl::Bool(true))
+        )
     }
 }
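
The core of the fix: the CdcFilter executor used to pull `source.table` out of the Debezium payload (`payload->'source'->>'table'`), which holds only the bare table name; a SQL Server table is identified by 'database.schema.table', so that comparison could never match the qualified external table name. The new expression instead compares the `_rw_table_name` column (index 2 of the `(payload, _rw_offset, _rw_table_name)` chunk), which DbzChangeEventConsumer derives from the Debezium topic and which carries the full qualification. Below is a minimal standalone sketch of the new filter semantics; the `CdcRow` struct and `cdc_filter` function are hypothetical stand-ins for the real chunk row and the expression built by build_cdc_filter_expr, not code from the patch.

// Sketch only: mirrors InputRef(2, Varchar) = literal(external_table_name).
#[allow(dead_code)]
struct CdcRow<'a> {
    payload: &'a str,        // Debezium change event (JSONB in the real plan)
    rw_offset: &'a str,      // source offset, e.g. binlog file and position
    rw_table_name: &'a str,  // full table name extracted from the Debezium topic
}

fn cdc_filter(row: &CdcRow<'_>, external_table_name: &str) -> bool {
    // No JSONB access any more; just compare the third column.
    row.rw_table_name == external_table_name
}

fn main() {
    let row = CdcRow {
        payload: r#"{"op":"c","source":{"db":"mydb","schema":"dbo","table":"orders"}}"#,
        rw_offset: r#"{"lsn":"0000002b:000004f8:0002"}"#,
        rw_table_name: "mydb.dbo.orders", // SQL Server: databaseName.schemaName.tableName
    };
    assert!(cdc_filter(&row, "mydb.dbo.orders"));
    // The old jsonb expression compared only source.table ("orders"), which
    // can never equal a qualified name like "mydb.dbo.orders".
    assert!(!cdc_filter(&row, "orders"));
    println!("cdc filter semantics ok");
}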
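
Likewise, derive_with_options_for_cdc_table now insists on the three-part name before splitting out the schema for the connector properties. A standalone sketch of that parsing rule follows; the helper name is hypothetical and `anyhow::Result` is simplified to `Result<_, String>`.

// 'database.schema.table' -> drop the database prefix, then split schema
// from table. Two-part names are now rejected at the second split.
fn split_mssql_table_name(external_table_name: &str) -> Result<(&str, &str), String> {
    let (_database, schema_table) = external_table_name
        .split_once('.')
        .ok_or_else(|| "The upstream table name must be in 'database.schema.table' format".to_string())?;
    schema_table
        .split_once('.')
        .ok_or_else(|| "The table name must contain schema name prefix, e.g. 'dbo.table'".to_string())
}

fn main() {
    assert_eq!(split_mssql_table_name("mydb.dbo.orders"), Ok(("dbo", "orders")));
    // The previously accepted two-part form now fails, matching the new error message.
    assert!(split_mssql_table_name("dbo.orders").is_err());
    println!("name parsing ok");
}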