Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mysql-cdc): support mysql source capture multiple databases (#19038) #19125

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions e2e_test/source_legacy/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,17 @@ control substitution on
system ok
mysql --protocol=tcp -u root -e "DROP DATABASE IF EXISTS mytest; CREATE DATABASE mytest;"

system ok
mysql --protocol=tcp -u root -e "DROP DATABASE IF EXISTS kdb; CREATE DATABASE kdb;"

system ok
mysql --protocol=tcp -u root mytest < e2e_test/source_legacy/cdc/mysql_create.sql

system ok
mysql --protocol=tcp -u root kdb -e " CREATE TABLE kt1 (id int primary key, v1 varchar(32));
INSERT INTO kt1 VALUES (1,'aaa'),(2,'bbb');
"

# generate data to mysql
system ok
mysql --protocol=tcp -u root mytest < e2e_test/source_legacy/cdc/mysql_init_data.sql
Expand All @@ -28,6 +36,7 @@ create source mysql_mytest with (
server.id = '5601'
);


statement error Should not create MATERIALIZED VIEW or SELECT directly on shared CDC source
create materialized view mv as select * from mysql_mytest;

Expand Down Expand Up @@ -70,6 +79,19 @@ SINGLE {STREAM_SCAN}
SINGLE {CDC_FILTER}
HASH {SOURCE,DML}


statement ok
create source mysql_source with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'rwcdc',
password = secret mysql_pwd,
database.name = 'mytest,kdb',
server.id = '5602'
);


statement ok
CREATE TABLE IF NOT EXISTS mysql_all_types(
c_boolean boolean,
Expand Down Expand Up @@ -112,6 +134,10 @@ create table orders_test (
PRIMARY KEY (order_id)
) from mysql_mytest table 'mytest.orders';


statement ok
create table kt1 (*) from mysql_source table 'kdb.kt1';

statement ok
create materialized view products_test_cnt as select count(*) as cnt from rw.products_test;

Expand All @@ -121,6 +147,9 @@ create materialized view orders_test_cnt as select count(*) as cnt from orders_t
system ok
mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Juice', '100ml Juice');"

system ok
mysql --protocol=tcp -u root kdb -e "INSERT INTO kt1 VALUES (3, 'ccc'),(4, 'ddd');"

system ok
mysql --protocol=tcp -u root mytest -e "FLUSH LOGS"

Expand All @@ -146,6 +175,16 @@ select count(*) from orders_no_backfill
----
0


query IT
select * from kt1 order by id;
----
1 aaa
2 bbb
3 ccc
4 ddd


# check ingestion results
query I
SELECT * from products_test_cnt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ public DbzConnectorConfig(

dbzProps.putAll(mysqlProps);

if (isCdcSourceJob) {
// remove table filtering for the shared MySQL source, since we
// allow user to ingest tables in different database
LOG.info("Disable table filtering for the shared MySQL source");
dbzProps.remove("table.include.list");
}

} else if (source == SourceTypeE.POSTGRES) {
var postgresProps = initiateDbConfig(POSTGRES_CONFIG_FILE, substitutor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package com.risingwave.connector.source.common;

import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.proto.Data;
import java.sql.Connection;
import java.sql.DriverManager;
Expand Down Expand Up @@ -45,9 +44,7 @@ public MySqlValidator(

var dbHost = userProps.get(DbzConnectorConfig.HOST);
var dbPort = userProps.get(DbzConnectorConfig.PORT);
var dbName = userProps.get(DbzConnectorConfig.DB_NAME);
var jdbcUrl = ValidatorUtils.getJdbcUrl(SourceTypeE.MYSQL, dbHost, dbPort, dbName);

var jdbcUrl = String.format("jdbc:mysql://%s:%s", dbHost, dbPort);
var properties = new Properties();
properties.setProperty("user", userProps.get(DbzConnectorConfig.USER));
properties.setProperty("password", userProps.get(DbzConnectorConfig.PASSWORD));
Expand All @@ -72,6 +69,27 @@ public void validateDbConfig() {
if ((major > 8) || (major == 8 && minor >= 4)) {
throw ValidatorUtils.failedPrecondition("MySQL version should be less than 8.4");
}

// "database.name" is a comma-separated list of database names
var dbNames = userProps.get(DbzConnectorConfig.DB_NAME);
for (var dbName : dbNames.split(",")) {
// check the existence of the database
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("mysql.check_db_exist"))) {
stmt.setString(1, dbName.trim());
var res = stmt.executeQuery();
while (res.next()) {
var ret = res.getInt(1);
if (ret == 0) {
throw ValidatorUtils.invalidArgument(
String.format(
"MySQL database '%s' doesn't exist", dbName.trim()));
}
}
}
}

validateBinlogConfig();
} catch (SQLException e) {
throw ValidatorUtils.internalError(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mysql.bin_row_image=show variables like 'binlog_row_image'
mysql.table=SELECT count(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
mysql.table_schema=SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION
mysql.grants=SHOW GRANTS FOR CURRENT_USER()
mysql.check_db_exist=SELECT count(*) FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = ?
postgres.wal=show wal_level
postgres.table=SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = ? AND tablename = ?)
postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = ?::regclass AND i.indisprimary
Expand Down
42 changes: 19 additions & 23 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ fn gen_table_plan_inner(
return Err(ErrorCode::InvalidInputSyntax(
"When PRIMARY KEY constraint applied to an APPEND ONLY table, the ON CONFLICT behavior must be DO NOTHING.".to_owned(),
)
.into());
.into());
}
Some(on_conflict)
} else {
Expand Down Expand Up @@ -885,28 +885,26 @@ fn derive_with_options_for_cdc_table(
) -> Result<WithOptionsSecResolved> {
use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR};
// we should remove the prefix from `full_table_name`
let mut connect_properties = source_with_properties.clone();
let mut with_options = source_with_properties.clone();
if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) {
let table_name = match connector.as_str() {
match connector.as_str() {
MYSQL_CDC_CONNECTOR => {
let db_name = connect_properties.get(DATABASE_NAME_KEY).ok_or_else(|| {
anyhow!("{} not found in source properties", DATABASE_NAME_KEY)
// MySQL doesn't allow '.' in database name and table name, so we can split the
// external table name by '.' to get the table name
let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| {
anyhow!("The upstream table name must contain database name prefix, e.g. 'database.table'")
})?;

let prefix = format!("{}.", db_name.as_str());
external_table_name
.strip_prefix(prefix.as_str())
.ok_or_else(|| anyhow!("The upstream table name must contain database name prefix, e.g. 'mydb.table'."))?
with_options.insert(DATABASE_NAME_KEY.into(), db_name.into());
with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
}
POSTGRES_CDC_CONNECTOR => {
let (schema_name, table_name) = external_table_name
.split_once('.')
.ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?;

// insert 'schema.name' into connect properties
connect_properties.insert(SCHEMA_NAME_KEY.into(), schema_name.into());

table_name
with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
}
SQL_SERVER_CDC_CONNECTOR => {
// SQL Server external table name is in 'databaseName.schemaName.tableName' pattern,
Expand All @@ -924,9 +922,8 @@ fn derive_with_options_for_cdc_table(
})?;

// insert 'schema.name' into connect properties
connect_properties.insert(SCHEMA_NAME_KEY.into(), schema_name.into());

table_name
with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into());
with_options.insert(TABLE_NAME_KEY.into(), table_name.into());
}
_ => {
return Err(RwError::from(anyhow!(
Expand All @@ -935,9 +932,8 @@ fn derive_with_options_for_cdc_table(
)));
}
};
connect_properties.insert(TABLE_NAME_KEY.into(), table_name.into());
}
Ok(connect_properties)
Ok(with_options)
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -1034,15 +1030,15 @@ pub(super) async fn handle_create_table_plan(
)?;
source.clone()
};
let connect_properties = derive_with_options_for_cdc_table(
let cdc_with_options = derive_with_options_for_cdc_table(
&source.with_properties,
cdc_table.external_table_name.clone(),
)?;

let (columns, pk_names) = derive_schema_for_cdc_table(
&column_defs,
&constraints,
connect_properties.clone(),
cdc_with_options.clone(),
wildcard_idx.is_some(),
None,
)
Expand All @@ -1057,7 +1053,7 @@ pub(super) async fn handle_create_table_plan(
column_defs,
columns,
pk_names,
connect_properties,
cdc_with_options,
col_id_gen,
on_conflict,
with_version_column,
Expand Down Expand Up @@ -1162,7 +1158,7 @@ struct CdcSchemaChangeArgs {
async fn derive_schema_for_cdc_table(
column_defs: &Vec<ColumnDef>,
constraints: &Vec<TableConstraint>,
connect_properties: WithOptionsSecResolved,
cdc_with_options: WithOptionsSecResolved,
need_auto_schema_map: bool,
schema_change_args: Option<CdcSchemaChangeArgs>,
) -> Result<(Vec<ColumnCatalog>, Vec<String>)> {
Expand All @@ -1176,7 +1172,7 @@ async fn derive_schema_for_cdc_table(
"Please define the schema manually".to_owned(),
)
})?;
let (options, secret_refs) = connect_properties.into_parts();
let (options, secret_refs) = cdc_with_options.into_parts();
let config = ExternalTableConfig::try_from_btreemap(options, secret_refs)
.context("failed to extract external table config")?;

Expand Down
Loading