From 7920e3df88f453516885d9a7ce076f4afbba7800 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Wed, 16 Oct 2024 10:54:53 -0500 Subject: [PATCH] feat(mssql-cdc): add ssl support for sqlserver-cdc (#18912) --- .../connector/source/common/DbzConnectorConfig.java | 1 + .../connector/source/common/SqlServerValidator.java | 4 ++++ .../connector/source/common/ValidatorUtils.java | 3 +-- .../src/main/resources/sql_server.properties | 3 ++- src/connector/src/source/cdc/external/mod.rs | 5 +++++ src/connector/src/source/cdc/external/mysql.rs | 1 + src/connector/src/source/cdc/external/postgres.rs | 1 + src/connector/src/source/cdc/external/sql_server.rs | 8 ++++++-- 8 files changed, 21 insertions(+), 5 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index 8ba569c7aea72..98f0a39a2a3dd 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -58,6 +58,7 @@ public class DbzConnectorConfig { /* Sql Server configs */ public static final String SQL_SERVER_SCHEMA_NAME = "schema.name"; + public static final String SQL_SERVER_ENCRYPT = "database.encrypt"; /* RisingWave configs */ private static final String DBZ_CONFIG_FILE = "debezium.properties"; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java index 56b1f679eb687..4c6e9083071d8 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/SqlServerValidator.java @@ -50,8 +50,12 @@ public SqlServerValidator( var dbName = userProps.get(DbzConnectorConfig.DB_NAME); var user = userProps.get(DbzConnectorConfig.USER); var password = userProps.get(DbzConnectorConfig.PASSWORD); + var encrypt = + Boolean.parseBoolean( + userProps.getOrDefault(DbzConnectorConfig.SQL_SERVER_ENCRYPT, "false")); var jdbcUrl = ValidatorUtils.getJdbcUrl(SourceTypeE.SQL_SERVER, dbHost, dbPort, dbName); + jdbcUrl = jdbcUrl + ";encrypt=" + encrypt + ";trustServerCertificate=true"; this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password); this.dbName = dbName; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/ValidatorUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/ValidatorUtils.java index 4b79280e62daf..704694abd044d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/ValidatorUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/ValidatorUtils.java @@ -68,8 +68,7 @@ public static String getJdbcUrl( return String.format("jdbc:postgresql://%s:%s/%s", host, port, database); case SQL_SERVER: return String.format( - "jdbc:sqlserver://%s:%s;databaseName=%s;encrypt=false", - host, port, database); + "jdbc:sqlserver://%s:%s;databaseName=%s", host, port, database); default: throw ValidatorUtils.invalidArgument("Unknown source type: " + sourceType); } diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/sql_server.properties b/java/connector-node/risingwave-connector-service/src/main/resources/sql_server.properties index 0e0c55c939ef5..cd3b659eee16f 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/sql_server.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/sql_server.properties @@ -21,4 +21,5 @@ name=${hostname}:${port}:${database.name}.${schema.name}.${table.name:-RW_CDC_Sh # In sharing cdc mode, transaction metadata will be enabled in frontend. # For sql server, it's always false actually. provide.transaction.metadata=${transactional:-false} -database.encrypt=false +database.encrypt=${database.encrypt:-false} +database.trustServerCertificate=true diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index 7a73f9b9bce98..fbfa66ef0e7c6 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -243,6 +243,11 @@ pub struct ExternalTableConfig { #[serde(rename = "ssl.root.cert")] #[serde(alias = "debezium.database.sslrootcert")] pub ssl_root_cert: Option, + + /// `encrypt` specifies whether connect to SQL Server using SSL. + /// Only "true" means using SSL. All other values are treated as "false". + #[serde(rename = "database.encrypt", default = "Default::default")] + pub encrypt: String, } impl ExternalTableConfig { diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index c45f6143431df..be0ecf1199291 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -598,6 +598,7 @@ mod tests { table: "part".to_string(), ssl_mode: Default::default(), ssl_root_cert: None, + encrypt: "false".to_string(), }; let table = MySqlExternalTable::connect(config).await.unwrap(); diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index 2e62520f35b18..112fd16e6bff5 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -540,6 +540,7 @@ mod tests { table: "mytest".to_string(), ssl_mode: Default::default(), ssl_root_cert: None, + encrypt: "false".to_string(), }; let table = PostgresExternalTable::connect(config).await.unwrap(); diff --git a/src/connector/src/source/cdc/external/sql_server.rs b/src/connector/src/source/cdc/external/sql_server.rs index 3c4e9994cc7ef..4350f3125fcd3 100644 --- a/src/connector/src/source/cdc/external/sql_server.rs +++ b/src/connector/src/source/cdc/external/sql_server.rs @@ -90,8 +90,10 @@ impl SqlServerExternalTable { &config.username, &config.password, )); - // TODO(kexiang): add ssl support // TODO(kexiang): use trust_cert_ca, trust_cert is not secure + if config.encrypt == "true" { + client_config.encryption(tiberius::EncryptionLevel::Required); + } client_config.trust_cert(); let mut client = SqlServerClient::new_with_config(client_config).await?; @@ -282,8 +284,10 @@ impl SqlServerExternalTableReader { &config.username, &config.password, )); - // TODO(kexiang): add ssl support // TODO(kexiang): use trust_cert_ca, trust_cert is not secure + if config.encrypt == "true" { + client_config.encryption(tiberius::EncryptionLevel::Required); + } client_config.trust_cert(); let client = SqlServerClient::new_with_config(client_config).await?;