diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index fb8aa62916f60..8ba569c7aea72 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -54,6 +54,7 @@ public class DbzConnectorConfig { public static final String PG_PUB_NAME = "publication.name"; public static final String PG_PUB_CREATE = "publication.create.enable"; public static final String PG_SCHEMA_NAME = "schema.name"; + public static final String PG_SSL_ROOT_CERT = "ssl.root.cert"; /* Sql Server configs */ public static final String SQL_SERVER_SCHEMA_NAME = "schema.name"; @@ -211,6 +212,10 @@ public DbzConnectorConfig( LOG.info("Disable table filtering for the shared Postgres source"); dbzProps.remove("table.include.list"); } + + if (userProps.containsKey(PG_SSL_ROOT_CERT)) { + dbzProps.setProperty("database.sslrootcert", userProps.get(PG_SSL_ROOT_CERT)); + } } else if (source == SourceTypeE.CITUS) { var postgresProps = initiateDbConfig(POSTGRES_CONFIG_FILE, substitutor); diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties b/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties index 06c4210fcf468..c36b62a7aa531 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/postgres.properties @@ -7,6 +7,7 @@ database.port=${port} database.user=${username} database.password=${password} database.dbname=${database.name} +database.sslmode=${ssl.mode:-prefer} table.include.list=${schema.name}.${table.name} # The name of the PostgreSQL replication slot slot.name=${slot.name} diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index be1c891b8d078..7a73f9b9bce98 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -237,7 +237,12 @@ pub struct ExternalTableConfig { /// Choices include `disabled`, `preferred`, and `required`. /// This field is optional. #[serde(rename = "ssl.mode", default = "Default::default")] - pub sslmode: SslMode, + #[serde(alias = "debezium.database.sslmode")] + pub ssl_mode: SslMode, + + #[serde(rename = "ssl.root.cert")] + #[serde(alias = "debezium.database.sslrootcert")] + pub ssl_root_cert: Option, } impl ExternalTableConfig { @@ -253,7 +258,7 @@ impl ExternalTableConfig { } } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, PartialEq, Deserialize)] #[serde(rename_all = "lowercase")] pub enum SslMode { #[serde(alias = "disable")] @@ -262,6 +267,14 @@ pub enum SslMode { Preferred, #[serde(alias = "require")] Required, + /// verify that the server is trustworthy by checking the certificate chain + /// up to the root certificate stored on the client. + #[serde(alias = "verify-ca")] + VerifyCa, + /// Besides verify the certificate, will also verify that the serverhost name + /// matches the name stored in the server certificate. + #[serde(alias = "verify-full")] + VerifyFull, } impl Default for SslMode { @@ -277,6 +290,8 @@ impl fmt::Display for SslMode { SslMode::Disabled => "disabled", SslMode::Preferred => "preferred", SslMode::Required => "required", + SslMode::VerifyCa => "verify-ca", + SslMode::VerifyFull => "verify-full", }) } } diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index 0e7ec02cfac27..59971f8761068 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -85,9 +85,12 @@ impl MySqlExternalTable { .host(&config.host) .port(config.port.parse::().unwrap()) .database(&config.database) - .ssl_mode(match config.sslmode { + .ssl_mode(match config.ssl_mode { SslMode::Disabled | SslMode::Preferred => sqlx::mysql::MySqlSslMode::Disabled, SslMode::Required => sqlx::mysql::MySqlSslMode::Required, + _ => { + return Err(anyhow!("unsupported SSL mode").into()); + } }); let connection = MySqlPool::connect_with(options).await?; @@ -308,9 +311,10 @@ impl MySqlExternalTableReader { .tcp_port(config.port.parse::().unwrap()) .db_name(Some(config.database)); - opts_builder = match config.sslmode { + opts_builder = match config.ssl_mode { SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None), - SslMode::Required => { + // verify-ca and verify-full are same as required for mysql now + SslMode::Required | SslMode::VerifyCa | SslMode::VerifyFull => { let ssl_without_verify = mysql_async::SslOpts::default() .with_danger_accept_invalid_certs(true) .with_danger_skip_domain_validation(true); @@ -529,7 +533,8 @@ mod tests { database: "mydb".to_string(), schema: "".to_string(), table: "part".to_string(), - sslmode: Default::default(), + ssl_mode: Default::default(), + ssl_root_cert: None, }; let table = MySqlExternalTable::connect(config).await.unwrap(); diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index ca0caf46d6125..9123c7451b74e 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -86,18 +86,26 @@ pub struct PostgresExternalTable { impl PostgresExternalTable { pub async fn connect(config: ExternalTableConfig) -> ConnectorResult { tracing::debug!("connect to postgres external table"); - let options = PgConnectOptions::new() + let mut options = PgConnectOptions::new() .username(&config.username) .password(&config.password) .host(&config.host) .port(config.port.parse::().unwrap()) .database(&config.database) - .ssl_mode(match config.sslmode { + .ssl_mode(match config.ssl_mode { SslMode::Disabled => PgSslMode::Disable, SslMode::Preferred => PgSslMode::Prefer, SslMode::Required => PgSslMode::Require, + SslMode::VerifyCa => PgSslMode::VerifyCa, + SslMode::VerifyFull => PgSslMode::VerifyFull, }); + if config.ssl_mode == SslMode::VerifyCa || config.ssl_mode == SslMode::VerifyFull { + if let Some(ref root_cert) = config.ssl_root_cert { + options = options.ssl_root_cert(root_cert.as_str()); + } + } + let connection = PgPool::connect_with(options).await?; let schema_discovery = SchemaDiscovery::new(connection, config.schema.as_str()); // fetch column schema and primary key @@ -288,8 +296,14 @@ impl PostgresExternalTableReader { .port(config.port.parse::().unwrap()) .dbname(&config.database); + let (_verify_ca, verify_hostname) = match config.ssl_mode { + SslMode::VerifyCa => (true, false), + SslMode::VerifyFull => (true, true), + _ => (false, false), + }; + #[cfg(not(madsim))] - let connector = match config.sslmode { + let connector = match config.ssl_mode { SslMode::Disabled => { pg_config.ssl_mode(tokio_postgres::config::SslMode::Disable); MaybeMakeTlsConnector::NoTls(NoTls) @@ -315,6 +329,24 @@ impl PostgresExternalTableReader { builder.set_verify(SslVerifyMode::NONE); MaybeMakeTlsConnector::Tls(MakeTlsConnector::new(builder.build())) } + + SslMode::VerifyCa | SslMode::VerifyFull => { + pg_config.ssl_mode(tokio_postgres::config::SslMode::Require); + let mut builder = SslConnector::builder(SslMethod::tls())?; + if let Some(ssl_root_cert) = config.ssl_root_cert { + builder.set_ca_file(ssl_root_cert).map_err(|e| { + anyhow!(format!("bad ssl root cert error: {}", e.to_report_string())) + })?; + } + let mut connector = MakeTlsConnector::new(builder.build()); + if !verify_hostname { + connector.set_callback(|config, _| { + config.set_verify_hostname(false); + Ok(()) + }); + } + MaybeMakeTlsConnector::Tls(connector) + } }; #[cfg(madsim)] let connector = NoTls; @@ -482,7 +514,8 @@ mod tests { database: "mydb".to_string(), schema: "public".to_string(), table: "mytest".to_string(), - sslmode: Default::default(), + ssl_mode: Default::default(), + ssl_root_cert: None, }; let table = PostgresExternalTable::connect(config).await.unwrap();