Skip to content

Commit

Permalink
fix(cdc): fix the filter expression of CdcFilter executor (#18868) (#…
Browse files Browse the repository at this point in the history
…18947)

Co-authored-by: lmatz <[email protected]>
  • Loading branch information
StrikeW and lmatz authored Oct 17, 2024
1 parent a464a11 commit 22051cd
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 55 deletions.
30 changes: 15 additions & 15 deletions e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ CREATE SOURCE upper_mssql_source WITH (
statement error Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source
create materialized view mv as select * from mssql_source;

statement error The upstream table name must contain schema name prefix*
statement error The upstream table name must be in 'database.schema.table' format
CREATE TABLE shared_orders (
order_id INT,
order_date BIGINT,
Expand All @@ -180,7 +180,7 @@ CREATE TABLE shared_orders (
product_id INT,
order_status SMALLINT,
PRIMARY KEY (order_id)
) from mssql_source table 'dbo.wrong_orders';
) from mssql_source table 'mydb.dbo.wrong_orders';

# invalid schema name
statement error Sql Server table 'wrong_dbo'.'orders' doesn't exist
Expand All @@ -192,7 +192,7 @@ CREATE TABLE shared_orders (
product_id INT,
order_status SMALLINT,
PRIMARY KEY (order_id)
) from mssql_source table 'wrong_dbo.orders';
) from mssql_source table 'mydb.wrong_dbo.orders';

# invalid primary key
statement error INVALID_ARGUMENT: Primary key mismatch
Expand All @@ -204,7 +204,7 @@ CREATE TABLE shared_orders (
product_id INT,
order_status SMALLINT,
PRIMARY KEY (order_date)
) from mssql_source table 'dbo.orders';
) from mssql_source table 'mydb.dbo.orders';

# column name mismatch
statement error INVALID_ARGUMENT: Column 'wrong_order_date' not found in the upstream database
Expand All @@ -216,7 +216,7 @@ CREATE TABLE shared_orders (
product_id INT,
order_status SMALLINT,
PRIMARY KEY (order_id)
) from mssql_source table 'dbo.orders';
) from mssql_source table 'mydb.dbo.orders';

# column data type mismatch
statement error INVALID_ARGUMENT: Incompatible data type of column order_date
Expand All @@ -228,7 +228,7 @@ CREATE TABLE shared_orders (
product_id INT,
order_status SMALLINT,
PRIMARY KEY (order_id)
) from mssql_source table 'dbo.orders';
) from mssql_source table 'mydb.dbo.orders';

# table without enabling cdc
statement error INVALID_ARGUMENT: Table 'dbo.orders_without_cdc' has not enabled CDC.
Expand All @@ -240,7 +240,7 @@ CREATE TABLE shared_orders_without_cdc (
product_id INT,
order_status SMALLINT,
PRIMARY KEY (order_id)
) from mssql_source table 'dbo.orders_without_cdc';
) from mssql_source table 'mydb.dbo.orders_without_cdc';

# use batch_size = 1 here to ensure not all the data is backfilled in one batch.
statement ok
Expand All @@ -254,15 +254,15 @@ CREATE TABLE shared_orders (
PRIMARY KEY (order_id)
) WITH (
snapshot.batch_size = '1',
) from mssql_source table 'dbo.orders';
) from mssql_source table 'mydb.dbo.orders';

statement ok
CREATE TABLE shared_single_type (*) from mssql_source table 'dbo.single_type';
CREATE TABLE shared_single_type (*) from mssql_source table 'mydb.dbo.single_type';

statement ok
CREATE TABLE shared_sqlserver_all_data_types (
*
) from mssql_source table 'dbo.sqlserver_all_data_types';
) from mssql_source table 'mydb.dbo.sqlserver_all_data_types';

sleep 5s

Expand Down Expand Up @@ -293,28 +293,28 @@ CREATE TABLE shared_sqlserver_all_data_types (
c_datetimeoffset TIMESTAMPTZ,
c_xml varchar,
PRIMARY KEY (id)
) from mssql_source table 'dbo.sqlserver_all_data_types';
) from mssql_source table 'mydb.dbo.sqlserver_all_data_types';

statement error Sql Server table 'UpperSchema'.'UpperTable' doesn't exist in 'mydb'
CREATE TABLE upper_table (
"ID" INT,
"Name" VARCHAR,
PRIMARY KEY ("ID")
) from mssql_source table 'UpperSchema.UpperTable';
) from mssql_source table 'mydb.UpperSchema.UpperTable';

statement error Column 'name' not found in the upstream database
CREATE TABLE upper_table (
"ID" INT,
name VARCHAR,
PRIMARY KEY ("ID")
) from upper_mssql_source table 'UpperSchema.UpperTable';
) from upper_mssql_source table 'UpperDB.UpperSchema.UpperTable';

statement error Sql Server table 'upperSchema'.'upperTable' doesn't exist in 'UpperDB'
CREATE TABLE upper_table (
"ID" INT,
"Name" VARCHAR,
PRIMARY KEY ("ID")
) from upper_mssql_source table 'upperSchema.upperTable';
) from upper_mssql_source table 'UpperDB.upperSchema.upperTable';

statement ok
CREATE TABLE upper_table (
Expand All @@ -326,7 +326,7 @@ INCLUDE TIMESTAMP AS commit_ts
INCLUDE DATABASE_NAME as database_name
INCLUDE SCHEMA_NAME as schema_name
INCLUDE TABLE_NAME as table_name
from upper_mssql_source table 'UpperSchema.UpperTable';
from upper_mssql_source table 'UpperDB.UpperSchema.UpperTable';

statement ok
create materialized view shared_orders_cnt as select count(*) as cnt from shared_orders;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ var record = event.value();
// - PG: topicPrefix.schemaName.tableName
// - MySQL: topicPrefix.databaseName.tableName
// - Mongo: topicPrefix.databaseName.collectionName
// - SQL Server: topicPrefix.databaseName.schemaName.tableName
// We can extract the full table name from the topic
var fullTableName =
record.topic().substring(record.topic().indexOf('.') + 1);
Expand Down
30 changes: 20 additions & 10 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
column_defs: Vec<ColumnDef>,
mut columns: Vec<ColumnCatalog>,
pk_names: Vec<String>,
connect_properties: WithOptionsSecResolved,
cdc_with_options: WithOptionsSecResolved,
mut col_id_gen: ColumnIdGenerator,
on_conflict: Option<OnConflict>,
with_version_column: Option<String>,
Expand All @@ -771,7 +771,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(

// append additional columns to the end
handle_addition_columns(
&connect_properties,
&cdc_with_options,
include_column_options,
&mut columns,
true,
Expand Down Expand Up @@ -811,7 +811,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
.map(|idx| ColumnOrder::new(*idx, OrderType::ascending()))
.collect();

let (options, secret_refs) = connect_properties.into_parts();
let (options, secret_refs) = cdc_with_options.into_parts();

let cdc_table_desc = CdcTableDesc {
table_id,
Expand Down Expand Up @@ -870,7 +870,7 @@ pub(crate) fn gen_create_table_plan_for_cdc_table(
Ok((materialize.into(), table))
}

fn derive_connect_properties(
fn derive_with_options_for_cdc_table(
source_with_properties: &WithOptionsSecResolved,
external_table_name: String,
) -> Result<WithOptionsSecResolved> {
Expand Down Expand Up @@ -900,9 +900,19 @@ fn derive_connect_properties(
table_name
}
SQL_SERVER_CDC_CONNECTOR => {
let (schema_name, table_name) = external_table_name
// SQL Server external table name is in 'databaseName.schemaName.tableName' pattern,
// we remove the database name prefix and split the schema name and table name
let schema_table_name = external_table_name
.split_once('.')
.ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'dbo.table'"))?;
.ok_or_else(|| {
anyhow!("The upstream table name must be in 'database.schema.table' format")
})?
.1;

let (schema_name, table_name) =
schema_table_name.split_once('.').ok_or_else(|| {
anyhow!("The table name must contain schema name prefix, e.g. 'dbo.table'")
})?;

// insert 'schema.name' into connect properties
connect_properties.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
Expand Down Expand Up @@ -1015,7 +1025,7 @@ pub(super) async fn handle_create_table_plan(
)?;
source.clone()
};
let connect_properties = derive_connect_properties(
let connect_properties = derive_with_options_for_cdc_table(
&source.with_properties,
cdc_table.external_table_name.clone(),
)?;
Expand Down Expand Up @@ -1372,15 +1382,15 @@ pub async fn generate_stream_graph_for_table(
let (source, resolved_table_name, database_id, schema_id) =
get_source_and_resolved_table_name(session, cdc_table.clone(), table_name.clone())?;

let connect_properties = derive_connect_properties(
let cdc_with_options = derive_with_options_for_cdc_table(
&source.with_properties,
cdc_table.external_table_name.clone(),
)?;

let (columns, pk_names) = derive_schema_for_cdc_table(
&column_defs,
&constraints,
connect_properties.clone(),
cdc_with_options.clone(),
false,
Some(CdcSchemaChangeArgs {
original_catalog: original_catalog.clone(),
Expand All @@ -1398,7 +1408,7 @@ pub async fn generate_stream_graph_for_table(
column_defs,
columns,
pk_names,
connect_properties,
cdc_with_options,
col_id_gen,
on_conflict,
with_version_column,
Expand Down
46 changes: 16 additions & 30 deletions src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,16 +168,9 @@ impl StreamCdcTableScan {
// We need to pass the id of upstream source job here
let upstream_source_id = self.core.cdc_table_desc.source_id.table_id;

// split the table name from the qualified table name, e.g. `database_name.table_name`
let (_, cdc_table_name) = self
.core
.cdc_table_desc
.external_table_name
.split_once('.')
.unwrap();

// jsonb filter expr: payload->'source'->>'table' = <cdc_table_name>
let filter_expr = Self::build_cdc_filter_expr(cdc_table_name);
// filter upstream source chunk by the value of `_rw_table_name` column
let filter_expr =
Self::build_cdc_filter_expr(self.core.cdc_table_desc.external_table_name.as_str());

let filter_operator_id = self.core.ctx.next_plan_node_id();
// The filter node receive chunks in `(payload, _rw_offset, _rw_table_name)` schema
Expand Down Expand Up @@ -273,28 +266,13 @@ impl StreamCdcTableScan {
})
}

// The filter node receive input chunks in `(payload, _rw_offset, _rw_table_name)` schema
pub fn build_cdc_filter_expr(cdc_table_name: &str) -> ExprImpl {
// jsonb filter expr: payload->'source'->>'table' = <cdc_table_name>
// filter by the `_rw_table_name` column
FunctionCall::new(
ExprType::Equal,
vec![
FunctionCall::new(
ExprType::JsonbAccessStr,
vec![
FunctionCall::new(
ExprType::JsonbAccess,
vec![
InputRef::new(0, DataType::Jsonb).into(),
ExprImpl::literal_varchar("source".into()),
],
)
.unwrap()
.into(),
ExprImpl::literal_varchar("table".into()),
],
)
.unwrap()
.into(),
InputRef::new(2, DataType::Varchar).into(),
ExprImpl::literal_varchar(cdc_table_name.into()),
],
)
Expand Down Expand Up @@ -334,22 +312,27 @@ mod tests {
async fn test_cdc_filter_expr() {
let t1_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 111, "v2": 222.2 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t1", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 774, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();
let t2_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 333, "v2": 666.6 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t2", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 884, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();

// NOTES: transaction metadata column expects to be filtered out before going to cdc filter
let trx_json = JsonbVal::from_str(r#"{"data_collections": null, "event_count": null, "id": "35319:3962662584", "status": "BEGIN", "ts_ms": 1704263537068}"#).unwrap();
let row1 = OwnedRow::new(vec![
Some(t1_json.into()),
Some(r#"{"file": "1.binlog", "pos": 100}"#.into()),
Some("public.t2".into()),
]);
let row2 = OwnedRow::new(vec![
Some(t2_json.into()),
Some(r#"{"file": "2.binlog", "pos": 100}"#.into()),
Some("abs.t2".into()),
]);

let row3 = OwnedRow::new(vec![
Some(trx_json.into()),
Some(r#"{"file": "3.binlog", "pos": 100}"#.into()),
Some("public.t2".into()),
]);

let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("t1");
let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("public.t2");
assert_eq!(
filter_expr.eval_row(&row1).await.unwrap(),
Some(ScalarImpl::Bool(true))
Expand All @@ -358,6 +341,9 @@ mod tests {
filter_expr.eval_row(&row2).await.unwrap(),
Some(ScalarImpl::Bool(false))
);
assert_eq!(filter_expr.eval_row(&row3).await.unwrap(), None)
assert_eq!(
filter_expr.eval_row(&row3).await.unwrap(),
Some(ScalarImpl::Bool(true))
)
}
}

0 comments on commit 22051cd

Please sign in to comment.