From 5f73b3e089ad9098fd81ca92fdeb57d26d2644e3 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 21 Oct 2024 17:59:13 +0800 Subject: [PATCH] mysql source multi database --- .../source/common/DbzConnectorConfig.java | 7 ++++ .../source/common/MySqlValidator.java | 25 +++++++++-- .../main/resources/validate_sql.properties | 1 + src/frontend/src/handler/create_table.rs | 42 +++++++++---------- 4 files changed, 48 insertions(+), 27 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index 98f0a39a2a3dd..09f69df349e37 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -174,6 +174,13 @@ public DbzConnectorConfig( dbzProps.putAll(mysqlProps); + if (isCdcSourceJob) { + // remove table filtering for the shared MySQL source, since we + // allow user to ingest tables in different database + LOG.info("Disable table filtering for the shared MySQL source"); + dbzProps.remove("table.include.list"); + } + } else if (source == SourceTypeE.POSTGRES) { var postgresProps = initiateDbConfig(POSTGRES_CONFIG_FILE, substitutor); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index cd7b6d29bb418..2ec63615467c5 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -15,7 +15,6 @@ package com.risingwave.connector.source.common; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.source.SourceTypeE; import com.risingwave.proto.Data; import java.sql.Connection; import java.sql.DriverManager; @@ -45,9 +44,7 @@ public MySqlValidator( var dbHost = userProps.get(DbzConnectorConfig.HOST); var dbPort = userProps.get(DbzConnectorConfig.PORT); - var dbName = userProps.get(DbzConnectorConfig.DB_NAME); - var jdbcUrl = ValidatorUtils.getJdbcUrl(SourceTypeE.MYSQL, dbHost, dbPort, dbName); - + var jdbcUrl = String.format("jdbc:mysql://%s:%s", dbHost, dbPort); var properties = new Properties(); properties.setProperty("user", userProps.get(DbzConnectorConfig.USER)); properties.setProperty("password", userProps.get(DbzConnectorConfig.PASSWORD)); @@ -72,6 +69,26 @@ public void validateDbConfig() { if ((major > 8) || (major == 8 && minor >= 4)) { throw ValidatorUtils.failedPrecondition("MySQL version should be less than 8.4"); } + + // "database.name" is a comma-separated list of database names + var dbNames = userProps.get(DbzConnectorConfig.DB_NAME); + for (var dbName : dbNames.split(",")) { + // check the existence of the database + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("mysql.check_db_exist"))) { + stmt.setString(1, dbName.trim()); + var res = stmt.executeQuery(); + while (res.next()) { + var ret = res.getInt(1); + if (ret == 0) { + throw ValidatorUtils.invalidArgument( + String.format("MySQL database '%s' doesn't exist", dbName)); + } + } + } + } + validateBinlogConfig(); } catch (SQLException e) { throw ValidatorUtils.internalError(e.getMessage()); diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index 04eaf227b65d7..57091adac4eee 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -4,6 +4,7 @@ mysql.bin_row_image=show variables like 'binlog_row_image' mysql.table=SELECT count(*) FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? mysql.table_schema=SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? ORDER BY ORDINAL_POSITION mysql.grants=SHOW GRANTS FOR CURRENT_USER() +mysql.check_db_exist=SELECT count(*) FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = ? postgres.wal=show wal_level postgres.table=SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = ? AND tablename = ?) postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = ?::regclass AND i.indisprimary diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index cac81d904d038..8519af92c7e07 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -707,7 +707,7 @@ fn gen_table_plan_inner( return Err(ErrorCode::InvalidInputSyntax( "When PRIMARY KEY constraint applied to an APPEND ONLY table, the ON CONFLICT behavior must be DO NOTHING.".to_owned(), ) - .into()); + .into()); } Some(on_conflict) } else { @@ -885,18 +885,17 @@ fn derive_with_options_for_cdc_table( ) -> Result { use source::cdc::{MYSQL_CDC_CONNECTOR, POSTGRES_CDC_CONNECTOR, SQL_SERVER_CDC_CONNECTOR}; // we should remove the prefix from `full_table_name` - let mut connect_properties = source_with_properties.clone(); + let mut with_options = source_with_properties.clone(); if let Some(connector) = source_with_properties.get(UPSTREAM_SOURCE_KEY) { - let table_name = match connector.as_str() { + match connector.as_str() { MYSQL_CDC_CONNECTOR => { - let db_name = connect_properties.get(DATABASE_NAME_KEY).ok_or_else(|| { - anyhow!("{} not found in source properties", DATABASE_NAME_KEY) + // MySQL doesn't allow '.' in database name and table name, so we can split the + // external table name by '.' to get the table name + let (db_name, table_name) = external_table_name.split_once('.').ok_or_else(|| { + anyhow!("The upstream table name must contain schema name prefix, e.g. 'database.table'") })?; - - let prefix = format!("{}.", db_name.as_str()); - external_table_name - .strip_prefix(prefix.as_str()) - .ok_or_else(|| anyhow!("The upstream table name must contain database name prefix, e.g. 'mydb.table'."))? + with_options.insert(DATABASE_NAME_KEY.into(), db_name.into()); + with_options.insert(TABLE_NAME_KEY.into(), table_name.into()); } POSTGRES_CDC_CONNECTOR => { let (schema_name, table_name) = external_table_name @@ -904,9 +903,8 @@ fn derive_with_options_for_cdc_table( .ok_or_else(|| anyhow!("The upstream table name must contain schema name prefix, e.g. 'public.table'"))?; // insert 'schema.name' into connect properties - connect_properties.insert(SCHEMA_NAME_KEY.into(), schema_name.into()); - - table_name + with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into()); + with_options.insert(TABLE_NAME_KEY.into(), table_name.into()); } SQL_SERVER_CDC_CONNECTOR => { // SQL Server external table name is in 'databaseName.schemaName.tableName' pattern, @@ -924,9 +922,8 @@ fn derive_with_options_for_cdc_table( })?; // insert 'schema.name' into connect properties - connect_properties.insert(SCHEMA_NAME_KEY.into(), schema_name.into()); - - table_name + with_options.insert(SCHEMA_NAME_KEY.into(), schema_name.into()); + with_options.insert(TABLE_NAME_KEY.into(), table_name.into()); } _ => { return Err(RwError::from(anyhow!( @@ -935,9 +932,8 @@ fn derive_with_options_for_cdc_table( ))); } }; - connect_properties.insert(TABLE_NAME_KEY.into(), table_name.into()); } - Ok(connect_properties) + Ok(with_options) } #[allow(clippy::too_many_arguments)] @@ -1034,7 +1030,7 @@ pub(super) async fn handle_create_table_plan( )?; source.clone() }; - let connect_properties = derive_with_options_for_cdc_table( + let cdc_with_options = derive_with_options_for_cdc_table( &source.with_properties, cdc_table.external_table_name.clone(), )?; @@ -1042,7 +1038,7 @@ pub(super) async fn handle_create_table_plan( let (columns, pk_names) = derive_schema_for_cdc_table( &column_defs, &constraints, - connect_properties.clone(), + cdc_with_options.clone(), wildcard_idx.is_some(), None, ) @@ -1057,7 +1053,7 @@ pub(super) async fn handle_create_table_plan( column_defs, columns, pk_names, - connect_properties, + cdc_with_options, col_id_gen, on_conflict, with_version_column, @@ -1162,7 +1158,7 @@ struct CdcSchemaChangeArgs { async fn derive_schema_for_cdc_table( column_defs: &Vec, constraints: &Vec, - connect_properties: WithOptionsSecResolved, + cdc_with_options: WithOptionsSecResolved, need_auto_schema_map: bool, schema_change_args: Option, ) -> Result<(Vec, Vec)> { @@ -1176,7 +1172,7 @@ async fn derive_schema_for_cdc_table( "Please define the schema manually".to_owned(), ) })?; - let (options, secret_refs) = connect_properties.into_parts(); + let (options, secret_refs) = cdc_with_options.into_parts(); let config = ExternalTableConfig::try_from_btreemap(options, secret_refs) .context("failed to extract external table config")?;