Skip to content

Commit

Permalink
fix: remove privatelink.targets from with_options in frontend (#14907)
Browse files Browse the repository at this point in the history
Co-authored-by: xxchan <[email protected]>
  • Loading branch information
StrikeW and xxchan authored Feb 4, 2024
1 parent 887a9dd commit 39ea30e
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 22 deletions.
15 changes: 10 additions & 5 deletions src/connector/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::source::nats::source::NatsOffset;
// The file describes the common abstractions for each connector and can be used in both source and
// sink.

pub const BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
pub const PRIVATE_LINK_BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
pub const PRIVATE_LINK_TARGETS_KEY: &str = "privatelink.targets";

#[derive(Debug, Clone, Deserialize)]
Expand Down Expand Up @@ -148,10 +148,6 @@ pub struct KafkaCommon {
#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
pub brokers: String,

#[serde(rename = "broker.rewrite.endpoints")]
#[serde_as(as = "Option<JsonString>")]
pub broker_rewrite_map: Option<HashMap<String, String>>,

#[serde(rename = "topic", alias = "kafka.topic")]
pub topic: String,

Expand Down Expand Up @@ -224,6 +220,15 @@ pub struct KafkaCommon {
sasl_oathbearer_config: Option<String>,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaPrivateLinkCommon {
/// This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users.
#[serde(rename = "broker.rewrite.endpoints")]
#[serde_as(as = "Option<JsonString>")]
pub broker_rewrite_map: Option<HashMap<String, String>>,
}

const fn default_kafka_sync_call_timeout() -> Duration {
Duration::from_secs(5)
}
Expand Down
16 changes: 14 additions & 2 deletions src/connector/src/sink/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use with_options::WithOptions;

use super::catalog::{SinkFormat, SinkFormatDesc};
use super::{Sink, SinkError, SinkParam};
use crate::common::{KafkaCommon, RdKafkaPropertiesCommon};
use crate::common::{KafkaCommon, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
Expand Down Expand Up @@ -232,6 +232,9 @@ pub struct KafkaConfig {

#[serde(flatten)]
pub rdkafka_properties_producer: RdKafkaPropertiesProducer,

#[serde(flatten)]
pub privatelink_common: KafkaPrivateLinkCommon,
}

impl KafkaConfig {
Expand Down Expand Up @@ -261,6 +264,7 @@ impl From<KafkaConfig> for KafkaProperties {
common: val.common,
rdkafka_properties_common: val.rdkafka_properties_common,
rdkafka_properties_consumer: Default::default(),
privatelink_common: val.privatelink_common,
unknown_fields: Default::default(),
}
}
Expand Down Expand Up @@ -403,7 +407,7 @@ impl KafkaSinkWriter {

// Create the producer context, will be used to create the producer
let producer_ctx = PrivateLinkProducerContext::new(
config.common.broker_rewrite_map.clone(),
config.privatelink_common.broker_rewrite_map.clone(),
// fixme: enable kafka native metrics for sink
None,
None,
Expand Down Expand Up @@ -656,13 +660,21 @@ mod test {
"properties.sasl.password".to_string() => "test".to_string(),
"properties.retry.max".to_string() => "20".to_string(),
"properties.retry.interval".to_string() => "500ms".to_string(),
// PrivateLink
"broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(),
};
let config = KafkaConfig::from_hashmap(properties).unwrap();
assert_eq!(config.common.brokers, "localhost:9092");
assert_eq!(config.common.topic, "test");
assert_eq!(config.max_retry_num, 20);
assert_eq!(config.retry_interval, Duration::from_millis(500));

// PrivateLink fields
let hashmap: HashMap<String, String> = hashmap! {
"broker1".to_string() => "10.0.0.1:8001".to_string()
};
assert_eq!(config.privatelink_common.broker_rewrite_map, Some(hashmap));

// Optional fields eliminated.
let properties: HashMap<String, String> = hashmap! {
// "connector".to_string() => "kafka".to_string(),
Expand Down
7 changes: 5 additions & 2 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,8 +718,11 @@ mod tests {

let props = ConnectorProperties::extract(props, true).unwrap();
if let ConnectorProperties::Kafka(k) = props {
assert!(k.common.broker_rewrite_map.is_some());
println!("{:?}", k.common.broker_rewrite_map);
let hashmap: HashMap<String, String> = hashmap! {
"b-1:9092".to_string() => "dns-1".to_string(),
"b-2:9092".to_string() => "dns-2".to_string(),
};
assert_eq!(k.privatelink_common.broker_rewrite_map, Some(hashmap));
} else {
panic!("extract kafka config failed");
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/kafka/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
let common_props = &properties.common;

let broker_address = common_props.brokers.clone();
let broker_rewrite_map = common_props.broker_rewrite_map.clone();
let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();
let topic = common_props.topic.clone();
config.set("bootstrap.servers", &broker_address);
config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
Expand Down
11 changes: 11 additions & 0 deletions src/connector/src/source/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use std::collections::HashMap;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

use crate::common::KafkaPrivateLinkCommon;

pub mod enumerator;
pub mod private_link;
pub mod source;
Expand Down Expand Up @@ -130,6 +132,9 @@ pub struct KafkaProperties {
#[serde(flatten)]
pub rdkafka_properties_consumer: RdKafkaPropertiesConsumer,

#[serde(flatten)]
pub privatelink_common: KafkaPrivateLinkCommon,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}
Expand Down Expand Up @@ -208,6 +213,8 @@ mod test {
"properties.fetch.max.bytes".to_string() => "114514".to_string(),
"properties.enable.auto.commit".to_string() => "true".to_string(),
"properties.fetch.queue.backoff.ms".to_string() => "114514".to_string(),
// PrivateLink
"broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(),
};

let props: KafkaProperties =
Expand Down Expand Up @@ -246,5 +253,9 @@ mod test {
props.rdkafka_properties_consumer.fetch_queue_backoff_ms,
Some(114514)
);
let hashmap: HashMap<String, String> = hashmap! {
"broker1".to_string() => "10.0.0.1:8001".to_string()
};
assert_eq!(props.privatelink_common.broker_rewrite_map, Some(hashmap));
}
}
14 changes: 9 additions & 5 deletions src/connector/src/source/kafka/private_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::catalog::connection::PrivateLinkService;

use crate::common::{AwsPrivateLinkItem, BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY};
use crate::common::{
AwsPrivateLinkItem, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
};
use crate::source::kafka::stats::RdKafkaStats;
use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};
use crate::source::KAFKA_CONNECTOR;
Expand Down Expand Up @@ -211,16 +213,18 @@ fn is_kafka_connector(with_properties: &BTreeMap<String, String>) -> bool {
}

pub fn insert_privatelink_broker_rewrite_map(
properties: &mut BTreeMap<String, String>,
with_options: &mut BTreeMap<String, String>,
svc: Option<&PrivateLinkService>,
privatelink_endpoint: Option<String>,
) -> anyhow::Result<()> {
let mut broker_rewrite_map = HashMap::new();
let servers = get_property_required(properties, kafka_props_broker_key(properties))?;
let servers = get_property_required(with_options, kafka_props_broker_key(with_options))?;
let broker_addrs = servers.split(',').collect_vec();
let link_target_value = get_property_required(properties, PRIVATE_LINK_TARGETS_KEY)?;
let link_target_value = get_property_required(with_options, PRIVATE_LINK_TARGETS_KEY)?;
let link_targets: Vec<AwsPrivateLinkItem> =
serde_json::from_str(link_target_value.as_str()).map_err(|e| anyhow!(e))?;
// remove the private link targets from WITH options, as they are useless after we constructed the rewrite mapping
with_options.remove(PRIVATE_LINK_TARGETS_KEY);

if broker_addrs.len() != link_targets.len() {
return Err(anyhow!(
Expand Down Expand Up @@ -259,6 +263,6 @@ pub fn insert_privatelink_broker_rewrite_map(
// save private link dns names into source properties, which
// will be extracted into KafkaProperties
let json = serde_json::to_string(&broker_rewrite_map).map_err(|e| anyhow!(e))?;
properties.insert(BROKER_REWRITE_MAP_KEY.to_string(), json);
with_options.insert(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY.to_string(), json);
Ok(())
}
2 changes: 1 addition & 1 deletion src/connector/src/source/kafka/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl SplitReader for KafkaSplitReader {
let mut config = ClientConfig::new();

let bootstrap_servers = &properties.common.brokers;
let broker_rewrite_map = properties.common.broker_rewrite_map.clone();
let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();

// disable partition eof
config.set("enable.partition.eof", "false");
Expand Down
7 changes: 4 additions & 3 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,6 @@ KafkaConfig:
field_type: String
required: true
alias: kafka.brokers
- name: broker.rewrite.endpoints
field_type: HashMap<String,String>
required: false
- name: topic
field_type: String
required: true
Expand Down Expand Up @@ -305,6 +302,10 @@ KafkaConfig:
comments: The maximum number of unacknowledged requests the client will send on a single connection before blocking.
required: false
default: '5'
- name: broker.rewrite.endpoints
field_type: HashMap<String,String>
comments: This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users.
required: false
KinesisSinkConfig:
fields:
- name: stream
Expand Down
7 changes: 4 additions & 3 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ KafkaProperties:
field_type: String
required: true
alias: kafka.brokers
- name: broker.rewrite.endpoints
field_type: HashMap<String,String>
required: false
- name: topic
field_type: String
required: true
Expand Down Expand Up @@ -169,6 +166,10 @@ KafkaProperties:
field_type: bool
comments: 'Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). default: true'
required: false
- name: broker.rewrite.endpoints
field_type: HashMap<String,String>
comments: This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users.
required: false
KinesisProperties:
fields:
- name: scan.startup.mode
Expand Down

0 comments on commit 39ea30e

Please sign in to comment.