Skip to content

Commit

Permalink
feat(cdc): support for verify-ca and verify-full of Postgres SSL (#18015
Browse files Browse the repository at this point in the history
)
  • Loading branch information
StrikeW authored Sep 12, 2024
1 parent b9ac1ac commit 9533cd9
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class DbzConnectorConfig {
public static final String PG_PUB_NAME = "publication.name";
public static final String PG_PUB_CREATE = "publication.create.enable";
public static final String PG_SCHEMA_NAME = "schema.name";
public static final String PG_SSL_ROOT_CERT = "ssl.root.cert";

/* Sql Server configs */
public static final String SQL_SERVER_SCHEMA_NAME = "schema.name";
Expand Down Expand Up @@ -211,6 +212,10 @@ public DbzConnectorConfig(
LOG.info("Disable table filtering for the shared Postgres source");
dbzProps.remove("table.include.list");
}

if (userProps.containsKey(PG_SSL_ROOT_CERT)) {
dbzProps.setProperty("database.sslrootcert", userProps.get(PG_SSL_ROOT_CERT));
}
} else if (source == SourceTypeE.CITUS) {
var postgresProps = initiateDbConfig(POSTGRES_CONFIG_FILE, substitutor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ database.port=${port}
database.user=${username}
database.password=${password}
database.dbname=${database.name}
database.sslmode=${ssl.mode:-prefer}
table.include.list=${schema.name}.${table.name}
# The name of the PostgreSQL replication slot
slot.name=${slot.name}
Expand Down
19 changes: 17 additions & 2 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,12 @@ pub struct ExternalTableConfig {
/// Choices include `disabled`, `preferred`, and `required`.
/// This field is optional.
#[serde(rename = "ssl.mode", default = "Default::default")]
pub sslmode: SslMode,
#[serde(alias = "debezium.database.sslmode")]
pub ssl_mode: SslMode,

#[serde(rename = "ssl.root.cert")]
#[serde(alias = "debezium.database.sslrootcert")]
pub ssl_root_cert: Option<String>,
}

impl ExternalTableConfig {
Expand All @@ -253,7 +258,7 @@ impl ExternalTableConfig {
}
}

#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SslMode {
#[serde(alias = "disable")]
Expand All @@ -262,6 +267,14 @@ pub enum SslMode {
Preferred,
#[serde(alias = "require")]
Required,
/// verify that the server is trustworthy by checking the certificate chain
/// up to the root certificate stored on the client.
#[serde(alias = "verify-ca")]
VerifyCa,
/// Besides verify the certificate, will also verify that the serverhost name
/// matches the name stored in the server certificate.
#[serde(alias = "verify-full")]
VerifyFull,
}

impl Default for SslMode {
Expand All @@ -277,6 +290,8 @@ impl fmt::Display for SslMode {
SslMode::Disabled => "disabled",
SslMode::Preferred => "preferred",
SslMode::Required => "required",
SslMode::VerifyCa => "verify-ca",
SslMode::VerifyFull => "verify-full",
})
}
}
Expand Down
13 changes: 9 additions & 4 deletions src/connector/src/source/cdc/external/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,12 @@ impl MySqlExternalTable {
.host(&config.host)
.port(config.port.parse::<u16>().unwrap())
.database(&config.database)
.ssl_mode(match config.sslmode {
.ssl_mode(match config.ssl_mode {
SslMode::Disabled | SslMode::Preferred => sqlx::mysql::MySqlSslMode::Disabled,
SslMode::Required => sqlx::mysql::MySqlSslMode::Required,
_ => {
return Err(anyhow!("unsupported SSL mode").into());
}
});

let connection = MySqlPool::connect_with(options).await?;
Expand Down Expand Up @@ -308,9 +311,10 @@ impl MySqlExternalTableReader {
.tcp_port(config.port.parse::<u16>().unwrap())
.db_name(Some(config.database));

opts_builder = match config.sslmode {
opts_builder = match config.ssl_mode {
SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None),
SslMode::Required => {
// verify-ca and verify-full are same as required for mysql now
SslMode::Required | SslMode::VerifyCa | SslMode::VerifyFull => {
let ssl_without_verify = mysql_async::SslOpts::default()
.with_danger_accept_invalid_certs(true)
.with_danger_skip_domain_validation(true);
Expand Down Expand Up @@ -529,7 +533,8 @@ mod tests {
database: "mydb".to_string(),
schema: "".to_string(),
table: "part".to_string(),
sslmode: Default::default(),
ssl_mode: Default::default(),
ssl_root_cert: None,
};

let table = MySqlExternalTable::connect(config).await.unwrap();
Expand Down
41 changes: 37 additions & 4 deletions src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,26 @@ pub struct PostgresExternalTable {
impl PostgresExternalTable {
pub async fn connect(config: ExternalTableConfig) -> ConnectorResult<Self> {
tracing::debug!("connect to postgres external table");
let options = PgConnectOptions::new()
let mut options = PgConnectOptions::new()
.username(&config.username)
.password(&config.password)
.host(&config.host)
.port(config.port.parse::<u16>().unwrap())
.database(&config.database)
.ssl_mode(match config.sslmode {
.ssl_mode(match config.ssl_mode {
SslMode::Disabled => PgSslMode::Disable,
SslMode::Preferred => PgSslMode::Prefer,
SslMode::Required => PgSslMode::Require,
SslMode::VerifyCa => PgSslMode::VerifyCa,
SslMode::VerifyFull => PgSslMode::VerifyFull,
});

if config.ssl_mode == SslMode::VerifyCa || config.ssl_mode == SslMode::VerifyFull {
if let Some(ref root_cert) = config.ssl_root_cert {
options = options.ssl_root_cert(root_cert.as_str());
}
}

let connection = PgPool::connect_with(options).await?;
let schema_discovery = SchemaDiscovery::new(connection, config.schema.as_str());
// fetch column schema and primary key
Expand Down Expand Up @@ -288,8 +296,14 @@ impl PostgresExternalTableReader {
.port(config.port.parse::<u16>().unwrap())
.dbname(&config.database);

let (_verify_ca, verify_hostname) = match config.ssl_mode {
SslMode::VerifyCa => (true, false),
SslMode::VerifyFull => (true, true),
_ => (false, false),
};

#[cfg(not(madsim))]
let connector = match config.sslmode {
let connector = match config.ssl_mode {
SslMode::Disabled => {
pg_config.ssl_mode(tokio_postgres::config::SslMode::Disable);
MaybeMakeTlsConnector::NoTls(NoTls)
Expand All @@ -315,6 +329,24 @@ impl PostgresExternalTableReader {
builder.set_verify(SslVerifyMode::NONE);
MaybeMakeTlsConnector::Tls(MakeTlsConnector::new(builder.build()))
}

SslMode::VerifyCa | SslMode::VerifyFull => {
pg_config.ssl_mode(tokio_postgres::config::SslMode::Require);
let mut builder = SslConnector::builder(SslMethod::tls())?;
if let Some(ssl_root_cert) = config.ssl_root_cert {
builder.set_ca_file(ssl_root_cert).map_err(|e| {
anyhow!(format!("bad ssl root cert error: {}", e.to_report_string()))
})?;
}
let mut connector = MakeTlsConnector::new(builder.build());
if !verify_hostname {
connector.set_callback(|config, _| {
config.set_verify_hostname(false);
Ok(())
});
}
MaybeMakeTlsConnector::Tls(connector)
}
};
#[cfg(madsim)]
let connector = NoTls;
Expand Down Expand Up @@ -482,7 +514,8 @@ mod tests {
database: "mydb".to_string(),
schema: "public".to_string(),
table: "mytest".to_string(),
sslmode: Default::default(),
ssl_mode: Default::default(),
ssl_root_cert: None,
};

let table = PostgresExternalTable::connect(config).await.unwrap();
Expand Down

0 comments on commit 9533cd9

Please sign in to comment.