diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index b522ae2eda560..9f4211aedd4d9 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -192,14 +192,26 @@ pub struct KafkaCommon { #[serde(rename = "properties.ssl.ca.location")] ssl_ca_location: Option, + /// CA certificate string (PEM format) for verifying the broker's key. + #[serde(rename = "properties.ssl.ca.pem")] + ssl_ca_pem: Option, + /// Path to client's certificate file (PEM). #[serde(rename = "properties.ssl.certificate.location")] ssl_certificate_location: Option, + /// Client's public key string (PEM format) used for authentication. + #[serde(rename = "properties.ssl.certificate.pem")] + ssl_certificate_pem: Option, + /// Path to client's private key file (PEM). #[serde(rename = "properties.ssl.key.location")] ssl_key_location: Option, + /// Client's private key string (PEM format) used for authentication. + #[serde(rename = "properties.ssl.key.pem")] + ssl_key_pem: Option, + /// Passphrase of client's private key. #[serde(rename = "properties.ssl.key.password")] ssl_key_password: Option, @@ -325,12 +337,21 @@ impl KafkaCommon { if let Some(ssl_ca_location) = self.ssl_ca_location.as_ref() { config.set("ssl.ca.location", ssl_ca_location); } + if let Some(ssl_ca_pem) = self.ssl_ca_pem.as_ref() { + config.set("ssl.ca.pem", ssl_ca_pem); + } if let Some(ssl_certificate_location) = self.ssl_certificate_location.as_ref() { config.set("ssl.certificate.location", ssl_certificate_location); } + if let Some(ssl_certificate_pem) = self.ssl_certificate_pem.as_ref() { + config.set("ssl.certificate.pem", ssl_certificate_pem); + } if let Some(ssl_key_location) = self.ssl_key_location.as_ref() { config.set("ssl.key.location", ssl_key_location); } + if let Some(ssl_key_pem) = self.ssl_key_pem.as_ref() { + config.set("ssl.key.pem", ssl_key_pem); + } if let Some(ssl_key_password) = self.ssl_key_password.as_ref() { config.set("ssl.key.password", ssl_key_password); } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index f321de880c72c..7fd1fa8b44ab9 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -377,14 +377,26 @@ KafkaConfig: field_type: String comments: Path to CA certificate file for verifying the broker's key. required: false + - name: properties.ssl.ca.pem + field_type: String + comments: CA certificate string (PEM format) for verifying the broker's key. + required: false - name: properties.ssl.certificate.location field_type: String comments: Path to client's certificate file (PEM). required: false + - name: properties.ssl.certificate.pem + field_type: String + comments: Client's public key string (PEM format) used for authentication. + required: false - name: properties.ssl.key.location field_type: String comments: Path to client's private key file (PEM). required: false + - name: properties.ssl.key.pem + field_type: String + comments: Client's private key string (PEM format) used for authentication. + required: false - name: properties.ssl.key.password field_type: String comments: Passphrase of client's private key. diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 4eaf1e0d3db4b..a6a19e80c89a3 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -199,14 +199,26 @@ KafkaProperties: field_type: String comments: Path to CA certificate file for verifying the broker's key. required: false + - name: properties.ssl.ca.pem + field_type: String + comments: CA certificate string (PEM format) for verifying the broker's key. + required: false - name: properties.ssl.certificate.location field_type: String comments: Path to client's certificate file (PEM). required: false + - name: properties.ssl.certificate.pem + field_type: String + comments: Client's public key string (PEM format) used for authentication. + required: false - name: properties.ssl.key.location field_type: String comments: Path to client's private key file (PEM). required: false + - name: properties.ssl.key.pem + field_type: String + comments: Client's private key string (PEM format) used for authentication. + required: false - name: properties.ssl.key.password field_type: String comments: Passphrase of client's private key.