diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs
index df61dc157b40b..25179bb209866 100644
--- a/src/connector/src/common.rs
+++ b/src/connector/src/common.rs
@@ -41,7 +41,7 @@ use crate::source::nats::source::NatsOffset;
 // The file describes the common abstractions for each connector and can be used in both source and
 // sink.
 
-pub const BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
+pub const PRIVATE_LINK_BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
 pub const PRIVATE_LINK_TARGETS_KEY: &str = "privatelink.targets";
 
 #[derive(Debug, Clone, Deserialize)]
@@ -148,10 +148,6 @@ pub struct KafkaCommon {
     #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
     pub brokers: String,
 
-    #[serde(rename = "broker.rewrite.endpoints")]
-    #[serde_as(as = "Option<JsonString>")]
-    pub broker_rewrite_map: Option<HashMap<String, String>>,
-
     #[serde(rename = "topic", alias = "kafka.topic")]
     pub topic: String,
 
@@ -224,6 +220,15 @@ pub struct KafkaCommon {
     sasl_oathbearer_config: Option<String>,
 }
 
+#[serde_as]
+#[derive(Debug, Clone, Deserialize, WithOptions)]
+pub struct KafkaPrivateLinkCommon {
+    /// This is generated from `private_link_targets` and `private_link_endpoint` in the frontend, instead of being set by users.
+    #[serde(rename = "broker.rewrite.endpoints")]
+    #[serde_as(as = "Option<JsonString>")]
+    pub broker_rewrite_map: Option<HashMap<String, String>>,
+}
+
 const fn default_kafka_sync_call_timeout() -> Duration {
     Duration::from_secs(5)
 }
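Reviewer note (not part of the patch): the `serde_as(as = "Option<JsonString>")` annotation is what lets the WITH option arrive as a JSON-encoded *string* and still deserialize into a typed map. A minimal, self-contained sketch of that behavior, assuming `serde`, `serde_json`, and `serde_with` with its `json` feature enabled; `PrivateLinkCommon` is a stand-in, not the real struct:

```rust
use std::collections::HashMap;

use serde::Deserialize;
use serde_with::{json::JsonString, serde_as};

#[serde_as]
#[derive(Debug, Deserialize)]
struct PrivateLinkCommon {
    // The value is a string containing JSON; `JsonString` re-parses it.
    #[serde(rename = "broker.rewrite.endpoints")]
    #[serde_as(as = "Option<JsonString>")]
    broker_rewrite_map: Option<HashMap<String, String>>,
}

fn main() {
    // Same shape the tests below feed in: a JSON string as the option value.
    let raw = r#"{"broker.rewrite.endpoints": "{\"broker1\": \"10.0.0.1:8001\"}"}"#;
    let parsed: PrivateLinkCommon = serde_json::from_str(raw).unwrap();
    assert_eq!(
        parsed.broker_rewrite_map,
        Some(HashMap::from([(
            "broker1".to_string(),
            "10.0.0.1:8001".to_string()
        )]))
    );
}
```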
diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs
index 0c59780916cb4..25681125f9069 100644
--- a/src/connector/src/sink/kafka.rs
+++ b/src/connector/src/sink/kafka.rs
@@ -33,7 +33,7 @@ use with_options::WithOptions;
 
 use super::catalog::{SinkFormat, SinkFormatDesc};
 use super::{Sink, SinkError, SinkParam};
-use crate::common::{KafkaCommon, RdKafkaPropertiesCommon};
+use crate::common::{KafkaCommon, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon};
 use crate::sink::catalog::desc::SinkDesc;
 use crate::sink::formatter::SinkFormatterImpl;
 use crate::sink::log_store::DeliveryFutureManagerAddFuture;
@@ -232,6 +232,9 @@ pub struct KafkaConfig {
 
     #[serde(flatten)]
     pub rdkafka_properties_producer: RdKafkaPropertiesProducer,
+
+    #[serde(flatten)]
+    pub privatelink_common: KafkaPrivateLinkCommon,
 }
 
 impl KafkaConfig {
@@ -261,6 +264,7 @@ impl From<KafkaConfig> for KafkaProperties {
             common: val.common,
             rdkafka_properties_common: val.rdkafka_properties_common,
             rdkafka_properties_consumer: Default::default(),
+            privatelink_common: val.privatelink_common,
             unknown_fields: Default::default(),
         }
     }
@@ -403,7 +407,7 @@ impl KafkaSinkWriter {
 
        // Create the producer context, will be used to create the producer
        let producer_ctx = PrivateLinkProducerContext::new(
-            config.common.broker_rewrite_map.clone(),
+            config.privatelink_common.broker_rewrite_map.clone(),
             // fixme: enable kafka native metrics for sink
             None,
             None,
@@ -656,6 +660,8 @@ mod test {
             "properties.sasl.password".to_string() => "test".to_string(),
             "properties.retry.max".to_string() => "20".to_string(),
             "properties.retry.interval".to_string() => "500ms".to_string(),
+            // PrivateLink
+            "broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(),
         };
         let config = KafkaConfig::from_hashmap(properties).unwrap();
         assert_eq!(config.common.brokers, "localhost:9092");
@@ -663,6 +669,12 @@ mod test {
         assert_eq!(config.max_retry_num, 20);
         assert_eq!(config.retry_interval, Duration::from_millis(500));
 
+        // PrivateLink fields
+        let hashmap: HashMap<String, String> = hashmap! {
+            "broker1".to_string() => "10.0.0.1:8001".to_string()
+        };
+        assert_eq!(config.privatelink_common.broker_rewrite_map, Some(hashmap));
+
         // Optional fields eliminated.
         let properties: HashMap<String, String> = hashmap! {
             // "connector".to_string() => "kafka".to_string(),
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index faa857aae0dd6..941723bea2848 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -718,8 +718,11 @@ mod tests {
         let props = ConnectorProperties::extract(props, true).unwrap();
 
         if let ConnectorProperties::Kafka(k) = props {
-            assert!(k.common.broker_rewrite_map.is_some());
-            println!("{:?}", k.common.broker_rewrite_map);
+            let hashmap: HashMap<String, String> = hashmap! {
+                "b-1:9092".to_string() => "dns-1".to_string(),
+                "b-2:9092".to_string() => "dns-2".to_string(),
+            };
+            assert_eq!(k.privatelink_common.broker_rewrite_map, Some(hashmap));
         } else {
             panic!("extract kafka config failed");
         }
diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs
index b0ae57f610492..9850a4c244920 100644
--- a/src/connector/src/source/kafka/enumerator/client.rs
+++ b/src/connector/src/source/kafka/enumerator/client.rs
@@ -62,7 +62,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
         let common_props = &properties.common;
 
         let broker_address = common_props.brokers.clone();
-        let broker_rewrite_map = common_props.broker_rewrite_map.clone();
+        let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();
         let topic = common_props.topic.clone();
         config.set("bootstrap.servers", &broker_address);
         config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
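Reviewer note (not part of the patch): the enumerator, reader, and sink writer only thread `broker_rewrite_map` through to `PrivateLinkConsumerContext`/`PrivateLinkProducerContext`; the rewriting mechanism itself is outside this diff. Roughly, those contexts build on rdkafka's `ClientContext::rewrite_broker_addr` hook. A hedged sketch of that idea, assuming the `rdkafka` crate and a `"host:port"` string format on both sides of the map; `RewriteContext` is illustrative, not the real context type:

```rust
use std::collections::HashMap;

use rdkafka::client::{BrokerAddr, ClientContext};

struct RewriteContext {
    // "host:port" of the advertised broker -> "dns:port" of the private-link endpoint
    rewrite_map: HashMap<String, String>,
}

impl ClientContext for RewriteContext {
    // Called by rdkafka before it dials any broker, letting us swap in the
    // private-link DNS name that is actually reachable from this network.
    fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
        let key = format!("{}:{}", addr.host, addr.port);
        match self.rewrite_map.get(&key).and_then(|t| t.split_once(':')) {
            Some((host, port)) => BrokerAddr {
                host: host.to_string(),
                port: port.to_string(),
            },
            // No mapping (or a malformed target): connect to the broker as-is.
            None => addr,
        }
    }
}
```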
diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs
index d2879302f7ece..52d410a8ee717 100644
--- a/src/connector/src/source/kafka/mod.rs
+++ b/src/connector/src/source/kafka/mod.rs
@@ -17,6 +17,8 @@ use std::collections::HashMap;
 use serde::Deserialize;
 use serde_with::{serde_as, DisplayFromStr};
 
+use crate::common::KafkaPrivateLinkCommon;
+
 pub mod enumerator;
 pub mod private_link;
 pub mod source;
@@ -130,6 +132,9 @@ pub struct KafkaProperties {
     #[serde(flatten)]
     pub rdkafka_properties_consumer: RdKafkaPropertiesConsumer,
 
+    #[serde(flatten)]
+    pub privatelink_common: KafkaPrivateLinkCommon,
+
     #[serde(flatten)]
     pub unknown_fields: HashMap<String, String>,
 }
@@ -208,6 +213,8 @@ mod test {
            "properties.fetch.max.bytes".to_string() => "114514".to_string(),
            "properties.enable.auto.commit".to_string() => "true".to_string(),
            "properties.fetch.queue.backoff.ms".to_string() => "114514".to_string(),
+           // PrivateLink
+           "broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(),
         };
 
         let props: KafkaProperties =
@@ -246,5 +253,9 @@ mod test {
             props.rdkafka_properties_consumer.fetch_queue_backoff_ms,
             Some(114514)
         );
+        let hashmap: HashMap<String, String> = hashmap! {
+            "broker1".to_string() => "10.0.0.1:8001".to_string()
+        };
+        assert_eq!(props.privatelink_common.broker_rewrite_map, Some(hashmap));
     }
 }
diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs
index 5ba99a87d414e..645588f457498 100644
--- a/src/connector/src/source/kafka/private_link.rs
+++ b/src/connector/src/source/kafka/private_link.rs
@@ -26,7 +26,9 @@ use risingwave_common::util::addr::HostAddr;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_pb::catalog::connection::PrivateLinkService;
 
-use crate::common::{AwsPrivateLinkItem, BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY};
+use crate::common::{
+    AwsPrivateLinkItem, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
+};
 use crate::source::kafka::stats::RdKafkaStats;
 use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};
 use crate::source::KAFKA_CONNECTOR;
@@ -211,16 +213,18 @@ fn is_kafka_connector(with_properties: &BTreeMap<String, String>) -> bool {
 }
 
 pub fn insert_privatelink_broker_rewrite_map(
-    properties: &mut BTreeMap<String, String>,
+    with_options: &mut BTreeMap<String, String>,
     svc: Option<&PrivateLinkService>,
     privatelink_endpoint: Option<String>,
 ) -> anyhow::Result<()> {
     let mut broker_rewrite_map = HashMap::new();
-    let servers = get_property_required(properties, kafka_props_broker_key(properties))?;
+    let servers = get_property_required(with_options, kafka_props_broker_key(with_options))?;
     let broker_addrs = servers.split(',').collect_vec();
-    let link_target_value = get_property_required(properties, PRIVATE_LINK_TARGETS_KEY)?;
+    let link_target_value = get_property_required(with_options, PRIVATE_LINK_TARGETS_KEY)?;
     let link_targets: Vec<AwsPrivateLinkItem> =
         serde_json::from_str(link_target_value.as_str()).map_err(|e| anyhow!(e))?;
+    // remove the private link targets from the WITH options, as they are no longer needed once the rewrite mapping has been constructed
+    with_options.remove(PRIVATE_LINK_TARGETS_KEY);
 
     if broker_addrs.len() != link_targets.len() {
         return Err(anyhow!(
@@ -259,6 +263,6 @@ pub fn insert_privatelink_broker_rewrite_map(
     // save private link dns names into source properties, which
     // will be extracted into KafkaProperties
     let json = serde_json::to_string(&broker_rewrite_map).map_err(|e| anyhow!(e))?;
-    properties.insert(BROKER_REWRITE_MAP_KEY.to_string(), json);
+    with_options.insert(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY.to_string(), json);
     Ok(())
 }
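Reviewer note (not part of the patch): a rough sketch of the pairing `insert_privatelink_broker_rewrite_map` performs, and why the length check above exists: broker addresses and private-link targets are matched positionally. This only shows the explicit-endpoint path; the `PrivateLinkService`-based path is not sketched. `LinkTarget` and `build_rewrite_map` are hypothetical stand-ins for `AwsPrivateLinkItem` and the real logic:

```rust
use std::collections::HashMap;

// Stand-in for the port carried by each `AwsPrivateLinkItem`.
struct LinkTarget {
    port: u16,
}

fn build_rewrite_map(
    broker_addrs: &[&str],
    endpoint_dns: &str,
    link_targets: &[LinkTarget],
) -> Result<HashMap<String, String>, String> {
    // Positional pairing is only sound if both lists have the same length.
    if broker_addrs.len() != link_targets.len() {
        return Err(format!(
            "the number of broker addresses {} does not match the number of private link targets {}",
            broker_addrs.len(),
            link_targets.len()
        ));
    }
    Ok(broker_addrs
        .iter()
        .zip(link_targets.iter())
        .map(|(broker, target)| {
            // Original advertised address -> endpoint DNS plus the target's port.
            (broker.to_string(), format!("{}:{}", endpoint_dns, target.port))
        })
        .collect())
}
```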
diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs
index ed9a7929c3231..2a0ef8babe992 100644
--- a/src/connector/src/source/kafka/source/reader.rs
+++ b/src/connector/src/source/kafka/source/reader.rs
@@ -61,7 +61,7 @@ impl SplitReader for KafkaSplitReader {
         let mut config = ClientConfig::new();
 
         let bootstrap_servers = &properties.common.brokers;
-        let broker_rewrite_map = properties.common.broker_rewrite_map.clone();
+        let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();
 
         // disable partition eof
         config.set("enable.partition.eof", "false");
diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml
index 15f9d40f67807..b2090d1dbf4bb 100644
--- a/src/connector/with_options_sink.yaml
+++ b/src/connector/with_options_sink.yaml
@@ -158,9 +158,6 @@ KafkaConfig:
     field_type: String
     required: true
     alias: kafka.brokers
-  - name: broker.rewrite.endpoints
-    field_type: HashMap<String,String>
-    required: false
   - name: topic
     field_type: String
     required: true
@@ -305,6 +302,10 @@ KafkaConfig:
     comments: The maximum number of unacknowledged requests the client will send on a single connection before blocking.
     required: false
    default: '5'
+  - name: broker.rewrite.endpoints
+    field_type: HashMap<String,String>
+    comments: This is generated from `private_link_targets` and `private_link_endpoint` in the frontend, instead of being set by users.
+    required: false
 KinesisSinkConfig:
   fields:
   - name: stream
diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml
index 84a125813733e..3ef323aa88574 100644
--- a/src/connector/with_options_source.yaml
+++ b/src/connector/with_options_source.yaml
@@ -61,9 +61,6 @@ KafkaProperties:
     field_type: String
     required: true
     alias: kafka.brokers
-  - name: broker.rewrite.endpoints
-    field_type: HashMap<String,String>
-    required: false
   - name: topic
     field_type: String
     required: true
@@ -169,6 +166,10 @@ KafkaProperties:
     field_type: bool
     comments: 'Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). default: true'
     required: false
+  - name: broker.rewrite.endpoints
+    field_type: HashMap<String,String>
+    comments: This is generated from `private_link_targets` and `private_link_endpoint` in the frontend, instead of being set by users.
+    required: false
 KinesisProperties:
   fields:
   - name: scan.startup.mode