Skip to content

Commit

Permalink
feat(mysql-cdc): support mysql source capture multiple databases (#19038
Browse files Browse the repository at this point in the history
)
  • Loading branch information
StrikeW authored and StrikeW committed Oct 25, 2024
1 parent 91156f7 commit a937dc9
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 27 deletions.
39 changes: 39 additions & 0 deletions e2e_test/source_legacy/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,17 @@ control substitution on
system ok
mysql --protocol=tcp -u root -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;"

system ok
mysql --protocol=tcp -u root -e "DROP DATABASE IF EXISTS kdb; CREATE DATABASE kdb;"

system ok
mysql --protocol=tcp -u root mytest < e2e_test/source_legacy/cdc/mysql_create.sql

system ok
mysql --protocol=tcp -u root kdb -e " CREATE TABLE kt1 (id int primary key, v1 varchar(32));
INSERT INTO kt1 VALUES (1,'aaa'),(2,'bbb');
"

# generate data to mysql
system ok
mysql --protocol=tcp -u root mytest < e2e_test/source_legacy/cdc/mysql_init_data.sql
Expand All @@ -28,6 +36,7 @@ create source mysql_mytest with (
server.id = '5601'
);


statement error Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source
create materialized view mv as select * from mysql_mytest;

Expand Down Expand Up @@ -70,6 +79,19 @@ SINGLE {STREAM_SCAN}
SINGLE {CDC_FILTER}
HASH {SOURCE,DML}


statement ok
create source mysql_source with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'rwcdc',
password = secret mysql_pwd,
database.name = 'mytest,kdb',
server.id = '5602'
);


statement ok
CREATE TABLE IF NOT EXISTS mysql_all_types(
c_boolean boolean,
Expand Down Expand Up @@ -112,6 +134,10 @@ create table orders_test (
PRIMARY KEY (order_id)
) from mysql_mytest table 'mytest.orders';


statement ok
create table kt1 (*) from mysql_source table 'kdb.kt1';

statement ok
create materialized view products_test_cnt as select count(*) as cnt from rw.products_test;

Expand All @@ -121,6 +147,9 @@ create materialized view orders_test_cnt as select count(*) as cnt from orders_t
system ok
mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Juice', '100ml Juice');"

system ok
mysql --protocol=tcp -u root kdb -e "INSERT INTO kt1 VALUES (3, 'ccc'),(4, 'ddd');"

system ok
mysql --protocol=tcp -u root mytest -e "FLUSH LOGS"

Expand All @@ -146,6 +175,16 @@ select count(*) from orders_no_backfill
----
0


query IT
select * from kt1 order by id;
----
1 aaa
2 bbb
3 ccc
4 ddd


# check ingestion results
query I
SELECT * from products_test_cnt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ public DbzConnectorConfig(

dbzProps.putAll(mysqlProps);

if (isCdcSourceJob) {
// remove table filtering for the shared MySQL source, since we
// allow user to ingest tables in different database
LOG.info("Disable table filtering for the shared MySQL source");
dbzProps.remove("table.include.list");
}

} else if (source == SourceTypeE.POSTGRES) {
var postgresProps = initiateDbConfig(POSTGRES_CONFIG_FILE, substitutor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package com.risingwave.connector.source.common;

import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.proto.Data;
import java.sql.Connection;
import java.sql.DriverManager;
Expand Down Expand Up @@ -45,9 +44,7 @@ public MySqlValidator(

var dbHost = userProps.get(DbzConnectorConfig.HOST);
var dbPort = userProps.get(DbzConnectorConfig.PORT);
var dbName = userProps.get(DbzConnectorConfig.DB_NAME);
var jdbcUrl = ValidatorUtils.getJdbcUrl(SourceTypeE.MYSQL, dbHost, dbPort, dbName);

var jdbcUrl = String.format("jdbc:mysql://%s:%s", dbHost, dbPort);
var properties = new Properties();
properties.setProperty("user", userProps.get(DbzConnectorConfig.USER));
properties.setProperty("password", userProps.get(DbzConnectorConfig.PASSWORD));
Expand All @@ -72,6 +69,27 @@ public void validateDbConfig() {
if ((major > 8) || (major == 8 && minor >= 4)) {
throw ValidatorUtils.failedPrecondition("MySQL version should be less than 8.4");
}

// "database.name" is a comma-separated list of database names
var dbNames = userProps.get(DbzConnectorConfig.DB_NAME);
for (var dbName : dbNames.split(",")) {
// check the existence of the database
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("mysql.check_db_exist"))) {
stmt.setString(1, dbName.trim());
var res = stmt.executeQuery();
while (res.next()) {
var ret = res.getInt(1);
if (ret == 0) {
throw ValidatorUtils.invalidArgument(
String.format(
"MySQL database '%s' doesn't exist", dbName.trim()));
}
}
}
}

validateBinlogConfig();
} catch (SQLException e) {
throw ValidatorUtils.internalError(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mysql.bin_row_image=show variables like 'binlog_row_image'
mysql.table=SELECT count(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
mysql.table_schema=SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION
mysql.grants=SHOW GRANTS FOR CURRENT_USER()
mysql.check_db_exist=SELECT count(*) FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = ?
postgres.wal=show wal_level
postgres.table=SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = ? AND tablename = ?)
postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = ?::regclass AND i.indisprimary
Expand Down
42 changes: 19 additions & 23 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ fn gen_table_plan_inner(
return Err(ErrorCode::InvalidInputSyntax(
"When PRIMARY KEY constraint applied to an APPEND ONLY table, the ON CONFLICT behavior must be DO NOTHING.".to_owned(),
)
.into());
.into());
}
Some(on_conflict)
} else {
Expand Down Expand Up @@ -885,28 +885,26 @@ fn derive_with_options_for_cdc_table(
) -> Result<WithOptionsSecResolved> {
use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
// we should remove the prefix from `full_table_name`
let mut connect_properties = source_with_properties.clone();
let mut with_options = source_with_properties.clone();
if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
let table_name = match connector.as_str() {
match connector.as_str() {
MYSQL_CDC_CONNECTOR => {
let db_name = connect_properties.get(DATABASE_NAME_KEY).ok_or_else(|| {
anyhow!("{} not found in source properties", DATABASE_NAME_KEY)
// MySQL doesn't allow '.' in database name and table name, so we can split the
// external table name by '.' to get the table name
let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
})?;

let prefix = format!("{}.", db_name.as_str());
external_table_name
.strip_prefix(prefix.as_str())
.ok_or_else(|| anyhow!("The upstream table name must contain database name prefix, e.g. 'mydb.table'."))?
with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
}
POSTGRES_CDC_CONNECTOR => {
let (schema_name, table_name) = external_table_name
.split_once('.')
.ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;

// insert 'schema.name' into connect properties
connect_properties.insert(SCHEMA_NAME_KEY.into(), schema_name.into());

table_name
with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
}
SQL_SERVER_CDC_CONNECTOR => {
// SQL Server external table name is in 'databaseName.schemaName.tableName' pattern,
Expand All @@ -924,9 +922,8 @@ fn derive_with_options_for_cdc_table(
})?;

// insert 'schema.name' into connect properties
connect_properties.insert(SCHEMA_NAME_KEY.into(), schema_name.into());

table_name
with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
}
_ => {
return Err(RwError::from(anyhow!(
Expand All @@ -935,9 +932,8 @@ fn derive_with_options_for_cdc_table(
)));
}
};
connect_properties.insert(TABLE_NAME_KEY.into(), table_name.into());
}
Ok(connect_properties)
Ok(with_options)
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -1034,15 +1030,15 @@ pub(super) async fn handle_create_table_plan(
)?;
source.clone()
};
let connect_properties = derive_with_options_for_cdc_table(
let cdc_with_options = derive_with_options_for_cdc_table(
&source.with_properties,
cdc_table.external_table_name.clone(),
)?;

let (columns, pk_names) = derive_schema_for_cdc_table(
&column_defs,
&constraints,
connect_properties.clone(),
cdc_with_options.clone(),
wildcard_idx.is_some(),
None,
)
Expand All @@ -1057,7 +1053,7 @@ pub(super) async fn handle_create_table_plan(
column_defs,
columns,
pk_names,
connect_properties,
cdc_with_options,
col_id_gen,
on_conflict,
with_version_column,
Expand Down Expand Up @@ -1162,7 +1158,7 @@ struct CdcSchemaChangeArgs {
async fn derive_schema_for_cdc_table(
column_defs: &Vec<ColumnDef>,
constraints: &Vec<TableConstraint>,
connect_properties: WithOptionsSecResolved,
cdc_with_options: WithOptionsSecResolved,
need_auto_schema_map: bool,
schema_change_args: Option<CdcSchemaChangeArgs>,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
Expand All @@ -1176,7 +1172,7 @@ async fn derive_schema_for_cdc_table(
"Please define the schema manually".to_owned(),
)
})?;
let (options, secret_refs) = connect_properties.into_parts();
let (options, secret_refs) = cdc_with_options.into_parts();
let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
.context("failed to extract external table config")?;

Expand Down

0 comments on commit a937dc9

Please sign in to comment.