From 39318f8f3135afa1a808cc0e1ac3110914764349 Mon Sep 17 00:00:00 2001 From: lmatz Date: Mon, 2 Sep 2024 21:37:02 +0800 Subject: [PATCH] feat: support more SSL related configurations in Kafka connector --- src/connector/src/connector_common/common.rs | 21 ++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index b522ae2eda560..9f4211aedd4d9 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -192,14 +192,26 @@ pub struct KafkaCommon { #[serde(rename = "properties.ssl.ca.location")] ssl_ca_location: Option, + /// CA certificate string (PEM format) for verifying the broker's key. + #[serde(rename = "properties.ssl.ca.pem")] + ssl_ca_pem: Option, + /// Path to client's certificate file (PEM). #[serde(rename = "properties.ssl.certificate.location")] ssl_certificate_location: Option, + /// Client's public key string (PEM format) used for authentication. + #[serde(rename = "properties.ssl.certificate.pem")] + ssl_certificate_pem: Option, + /// Path to client's private key file (PEM). #[serde(rename = "properties.ssl.key.location")] ssl_key_location: Option, + /// Client's private key string (PEM format) used for authentication. + #[serde(rename = "properties.ssl.key.pem")] + ssl_key_pem: Option, + /// Passphrase of client's private key. #[serde(rename = "properties.ssl.key.password")] ssl_key_password: Option, @@ -325,12 +337,21 @@ impl KafkaCommon { if let Some(ssl_ca_location) = self.ssl_ca_location.as_ref() { config.set("ssl.ca.location", ssl_ca_location); } + if let Some(ssl_ca_pem) = self.ssl_ca_pem.as_ref() { + config.set("ssl.ca.pem", ssl_ca_pem); + } if let Some(ssl_certificate_location) = self.ssl_certificate_location.as_ref() { config.set("ssl.certificate.location", ssl_certificate_location); } + if let Some(ssl_certificate_pem) = self.ssl_certificate_pem.as_ref() { + config.set("ssl.certificate.pem", ssl_certificate_pem); + } if let Some(ssl_key_location) = self.ssl_key_location.as_ref() { config.set("ssl.key.location", ssl_key_location); } + if let Some(ssl_key_pem) = self.ssl_key_pem.as_ref() { + config.set("ssl.key.pem", ssl_key_pem); + } if let Some(ssl_key_password) = self.ssl_key_password.as_ref() { config.set("ssl.key.password", ssl_key_password); }