diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs
index df61dc157b40b..f55953a7e550e 100644
--- a/src/connector/src/common.rs
+++ b/src/connector/src/common.rs
@@ -41,7 +41,7 @@ use crate::source::nats::source::NatsOffset;
 // The file describes the common abstractions for each connector and can be used in both source and
 // sink.
 
-pub const BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
+pub const PRIVATE_LINK_BROKER_REWRITE_MAP_KEY: &str = "privatelink.broker.rewrite.endpoints";
 pub const PRIVATE_LINK_TARGETS_KEY: &str = "privatelink.targets";
 
 #[derive(Debug, Clone, Deserialize)]
@@ -148,10 +148,6 @@ pub struct KafkaCommon {
     #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
     pub brokers: String,
 
-    #[serde(rename = "broker.rewrite.endpoints")]
-    #[serde_as(as = "Option<JsonString>")]
-    pub broker_rewrite_map: Option<HashMap<String, String>>,
-
     #[serde(rename = "topic", alias = "kafka.topic")]
     pub topic: String,
 
@@ -224,6 +220,20 @@ pub struct KafkaCommon {
     sasl_oathbearer_config: Option<String>,
 }
 
+#[serde_as]
+#[derive(Debug, Clone, Deserialize, WithOptions)]
+pub struct KafkaPrivateLinkCommon {
+    #[serde(rename = "privatelink.targets")]
+    pub private_link_targets: Option<String>,
+
+    #[serde(rename = "privatelink.endpoint")]
+    pub private_link_endpoint: Option<String>,
+
+    #[serde(rename = "privatelink.broker.rewrite.endpoints")]
+    #[serde_as(as = "Option<JsonString>")]
+    pub broker_rewrite_map: Option<HashMap<String, String>>,
+}
+
 const fn default_kafka_sync_call_timeout() -> Duration {
     Duration::from_secs(5)
 }
diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs
index 0c59780916cb4..7c0484b14edc7 100644
--- a/src/connector/src/sink/kafka.rs
+++ b/src/connector/src/sink/kafka.rs
@@ -33,7 +33,7 @@ use with_options::WithOptions;
 
 use super::catalog::{SinkFormat, SinkFormatDesc};
 use super::{Sink, SinkError, SinkParam};
-use crate::common::{KafkaCommon, RdKafkaPropertiesCommon};
+use crate::common::{KafkaCommon, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon};
 use crate::sink::catalog::desc::SinkDesc;
 use crate::sink::formatter::SinkFormatterImpl;
 use crate::sink::log_store::DeliveryFutureManagerAddFuture;
@@ -232,6 +232,9 @@ pub struct KafkaConfig {
 
     #[serde(flatten)]
     pub rdkafka_properties_producer: RdKafkaPropertiesProducer,
+
+    #[serde(flatten)]
+    pub privatelink_common: KafkaPrivateLinkCommon,
 }
 
 impl KafkaConfig {
@@ -261,6 +264,7 @@ impl From<KafkaConfig> for KafkaProperties {
             common: val.common,
             rdkafka_properties_common: val.rdkafka_properties_common,
             rdkafka_properties_consumer: Default::default(),
+            privatelink_common: val.privatelink_common,
             unknown_fields: Default::default(),
         }
     }
@@ -403,7 +407,7 @@ impl KafkaSinkWriter {
 
         // Create the producer context, will be used to create the producer
         let producer_ctx = PrivateLinkProducerContext::new(
-            config.common.broker_rewrite_map.clone(),
+            config.privatelink_common.broker_rewrite_map.clone(),
             // fixme: enable kafka native metrics for sink
             None,
             None,
@@ -656,6 +660,10 @@ mod test {
             "properties.sasl.password".to_string() => "test".to_string(),
             "properties.retry.max".to_string() => "20".to_string(),
             "properties.retry.interval".to_string() => "500ms".to_string(),
+            // PrivateLink
+            "privatelink.targets".to_string() => "[{\"port\": 9292}]".to_string(),
+            "privatelink.endpoint".to_string() => "10.0.0.1".to_string(),
+            "privatelink.broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(),
         };
         let config = KafkaConfig::from_hashmap(properties).unwrap();
         assert_eq!(config.common.brokers, "localhost:9092");
@@ -663,6 +671,20 @@
         assert_eq!(config.max_retry_num, 20);
         assert_eq!(config.retry_interval, Duration::from_millis(500));
+        // PrivateLink fields
+        assert_eq!(
+            config.privatelink_common.private_link_endpoint,
+            Some("10.0.0.1".into())
+        );
+        assert_eq!(
+            config.privatelink_common.private_link_targets,
+            Some("[{\"port\": 9292}]".into())
+        );
+        let hashmap: HashMap<String, String> = hashmap! {
+            "broker1".to_string() => "10.0.0.1:8001".to_string()
+        };
+        assert_eq!(config.privatelink_common.broker_rewrite_map, Some(hashmap));
+
         // Optional fields eliminated.
         let properties: HashMap<String, String> = hashmap! {
             // "connector".to_string() => "kafka".to_string(),
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 3f027b2dddfe9..9a34db2a8ad14 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -713,8 +713,8 @@ mod tests {
         let props = ConnectorProperties::extract(props, true).unwrap();
 
         if let ConnectorProperties::Kafka(k) = props {
-            assert!(k.common.broker_rewrite_map.is_some());
-            println!("{:?}", k.common.broker_rewrite_map);
+            assert!(k.privatelink_common.broker_rewrite_map.is_some());
+            println!("{:?}", k.privatelink_common.broker_rewrite_map);
         } else {
             panic!("extract kafka config failed");
         }
diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs
index b0ae57f610492..9850a4c244920 100644
--- a/src/connector/src/source/kafka/enumerator/client.rs
+++ b/src/connector/src/source/kafka/enumerator/client.rs
@@ -62,7 +62,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
         let common_props = &properties.common;
 
         let broker_address = common_props.brokers.clone();
-        let broker_rewrite_map = common_props.broker_rewrite_map.clone();
+        let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();
         let topic = common_props.topic.clone();
         config.set("bootstrap.servers", &broker_address);
         config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs
index d2879302f7ece..c527dfb9786eb 100644
--- a/src/connector/src/source/kafka/mod.rs
+++ b/src/connector/src/source/kafka/mod.rs
@@ -17,6 +17,8 @@ use std::collections::HashMap;
 use serde::Deserialize;
 use serde_with::{serde_as, DisplayFromStr};
 
+use crate::common::KafkaPrivateLinkCommon;
+
 pub mod enumerator;
 pub mod private_link;
 pub mod source;
@@ -130,6 +132,9 @@ pub struct KafkaProperties {
     #[serde(flatten)]
     pub rdkafka_properties_consumer: RdKafkaPropertiesConsumer,
 
+    #[serde(flatten)]
+    pub privatelink_common: KafkaPrivateLinkCommon,
+
     #[serde(flatten)]
     pub unknown_fields: HashMap<String, String>,
 }
@@ -208,6 +213,10 @@
             "properties.fetch.max.bytes".to_string() => "114514".to_string(),
             "properties.enable.auto.commit".to_string() => "true".to_string(),
             "properties.fetch.queue.backoff.ms".to_string() => "114514".to_string(),
+            // PrivateLink
+            "privatelink.targets".to_string() => "[{\"port\": 9292}]".to_string(),
+            "privatelink.endpoint".to_string() => "10.0.0.1".to_string(),
+            "privatelink.broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(),
         };
 
         let props: KafkaProperties =
@@ -246,5 +255,18 @@
             props.rdkafka_properties_consumer.fetch_queue_backoff_ms,
             Some(114514)
         );
+        assert_eq!(
+            props.privatelink_common.private_link_endpoint,
+            Some("10.0.0.1".into())
+        );
+        assert_eq!(
+            props.privatelink_common.private_link_targets,
+            Some("[{\"port\": 9292}]".into())
+        );
+
+        let hashmap: HashMap<String, String> = hashmap! {
+            "broker1".to_string() => "10.0.0.1:8001".to_string()
+        };
+        assert_eq!(props.privatelink_common.broker_rewrite_map, Some(hashmap));
     }
 }
diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs
index 5ba99a87d414e..ce84eb5a76cd5 100644
--- a/src/connector/src/source/kafka/private_link.rs
+++ b/src/connector/src/source/kafka/private_link.rs
@@ -26,7 +26,9 @@ use risingwave_common::util::addr::HostAddr;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_pb::catalog::connection::PrivateLinkService;
 
-use crate::common::{AwsPrivateLinkItem, BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY};
+use crate::common::{
+    AwsPrivateLinkItem, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
+};
 use crate::source::kafka::stats::RdKafkaStats;
 use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};
 use crate::source::KAFKA_CONNECTOR;
@@ -259,6 +261,6 @@ pub fn insert_privatelink_broker_rewrite_map(
     // save private link dns names into source properties, which
     // will be extracted into KafkaProperties
     let json = serde_json::to_string(&broker_rewrite_map).map_err(|e| anyhow!(e))?;
-    properties.insert(BROKER_REWRITE_MAP_KEY.to_string(), json);
+    properties.insert(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY.to_string(), json);
     Ok(())
 }
diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index ed9a7929c3231..2a0ef8babe992 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -61,7 +61,7 @@ impl SplitReader for KafkaSplitReader {
         let mut config = ClientConfig::new();
 
         let bootstrap_servers = &properties.common.brokers;
-        let broker_rewrite_map = properties.common.broker_rewrite_map.clone();
+        let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();
 
         // disable partition eof
         config.set("enable.partition.eof", "false");