From 028f0aea915b966e0bbdd555f7670331fa76aa18 Mon Sep 17 00:00:00 2001 From: Bohan Zhang Date: Fri, 15 Dec 2023 12:56:07 +0800 Subject: [PATCH] feat: allow kafka source/sink config `ssl_endpoint_identification_algorithm` (#13990) --- src/connector/src/common.rs | 12 ++++++++++++ src/connector/with_options_sink.yaml | 3 +++ src/connector/with_options_source.yaml | 3 +++ 3 files changed, 18 insertions(+) diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index b96af4f1ed16f..25b00ebd66c22 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -167,6 +167,9 @@ pub struct KafkaCommon { #[serde(rename = "properties.security.protocol")] security_protocol: Option, + #[serde(rename = "properties.ssl.endpoint.identification.algorithm")] + ssl_endpoint_identification_algorithm: Option, + // For the properties below, please refer to [librdkafka](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for more information. /// Path to CA certificate file for verifying the broker's key. #[serde(rename = "properties.ssl.ca.location")] @@ -294,6 +297,15 @@ impl KafkaCommon { if let Some(ssl_key_password) = self.ssl_key_password.as_ref() { config.set("ssl.key.password", ssl_key_password); } + if let Some(ssl_endpoint_identification_algorithm) = + self.ssl_endpoint_identification_algorithm.as_ref() + { + // accept only `none` and `http` here, let the sdk do the check + config.set( + "ssl.endpoint.identification.algorithm", + ssl_endpoint_identification_algorithm, + ); + } // SASL mechanism if let Some(sasl_mechanism) = self.sasl_mechanism.as_ref() { diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 4770fd621ad3c..ca36288a3c85a 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -170,6 +170,9 @@ KafkaConfig: field_type: String comments: Security protocol used for RisingWave to communicate with Kafka brokers. Could be PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL. required: false + - name: properties.ssl.endpoint.identification.algorithm + field_type: String + required: false - name: properties.ssl.ca.location field_type: String comments: Path to CA certificate file for verifying the broker's key. diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 264443f6d7739..25e5ade6567d9 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -43,6 +43,9 @@ KafkaProperties: field_type: String comments: Security protocol used for RisingWave to communicate with Kafka brokers. Could be PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL. required: false + - name: properties.ssl.endpoint.identification.algorithm + field_type: String + required: false - name: properties.ssl.ca.location field_type: String comments: Path to CA certificate file for verifying the broker's key.