diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index a9808e3a9e1e2..664796bbe12de 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -16,6 +16,7 @@ pub mod mock_external_table; mod postgres; use std::collections::HashMap; +use std::fmt; use anyhow::Context; use futures::stream::BoxStream; @@ -246,6 +247,32 @@ pub struct ExternalTableConfig { pub schema: String, #[serde(rename = "table.name")] pub table: String, + #[serde(rename = "ssl.name", default = "Default::default")] + pub sslmode: SslMode, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum SslMode { + Disable, + Prefer, + Require, +} + +impl Default for SslMode { + fn default() -> Self { + Self::Disable + } +} + +impl fmt::Display for SslMode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + SslMode::Disable => "disable", + SslMode::Prefer => "prefer", + SslMode::Require => "require", + }) + } } impl ExternalTableReader for MySqlExternalTableReader { diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index 9f9a055fd7d8f..a2f740c51bece 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -128,8 +128,13 @@ impl PostgresExternalTableReader { .context("failed to extract postgres connector properties")?; let database_url = format!( - "postgresql://{}:{}@{}:{}/{}", - config.username, config.password, config.host, config.port, config.database + "postgresql://{}:{}@{}:{}/{}?sslmode={}", + config.username, + config.password, + config.host, + config.port, + config.database, + dbg!(&config.sslmode) ); let (client, connection) = tokio_postgres::connect(&database_url, NoTls).await?;