fix: remove privatelink.targets from with_options in frontend (#14907) #14967

Merged
15 changes: 10 additions & 5 deletions src/connector/src/common.rs
@@ -41,7 +41,7 @@ use crate::source::nats::source::NatsOffset;
// The file describes the common abstractions for each connector and can be used in both source and
// sink.

pub const BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
pub const PRIVATE_LINK_BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
pub const PRIVATE_LINK_TARGETS_KEY: &str = "privatelink.targets";

#[derive(Debug, Clone, Deserialize)]
@@ -148,10 +148,6 @@ pub struct KafkaCommon {
#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
pub brokers: String,

#[serde(rename = "broker.rewrite.endpoints")]
#[serde_as(as = "Option<JsonString>")]
pub broker_rewrite_map: Option<HashMap<String, String>>,

#[serde(rename = "topic", alias = "kafka.topic")]
pub topic: String,

@@ -224,6 +220,15 @@ pub struct KafkaCommon {
sasl_oathbearer_config: Option<String>,
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaPrivateLinkCommon {
/// This is generated from `private_link_targets` and `private_link_endpoint` in the frontend, rather than provided by users.
#[serde(rename = "broker.rewrite.endpoints")]
#[serde_as(as = "Option<JsonString>")]
pub broker_rewrite_map: Option<HashMap<String, String>>,
}

const fn default_kafka_sync_call_timeout() -> Duration {
Duration::from_secs(5)
}
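The new `KafkaPrivateLinkCommon` struct relies on `serde_with`'s `JsonString` adapter: the `broker.rewrite.endpoints` value arrives as a JSON string and is decoded into a typed map during deserialization. A minimal standalone sketch of that path, assuming only `serde`, `serde_json`, and `serde_with` (the `WithOptions` derive from this crate is omitted, and the struct below is a simplified stand-in):

```rust
use std::collections::HashMap;

use serde::Deserialize;
use serde_with::{json::JsonString, serde_as};

// Simplified stand-in for the struct added above; the real one also
// derives `WithOptions`.
#[serde_as]
#[derive(Debug, Deserialize)]
struct KafkaPrivateLinkCommon {
    // The WITH option arrives as a JSON string; `JsonString` parses it
    // into a typed map during deserialization.
    #[serde(rename = "broker.rewrite.endpoints")]
    #[serde_as(as = "Option<JsonString>")]
    broker_rewrite_map: Option<HashMap<String, String>>,
}

fn main() {
    let raw = r#"{"broker.rewrite.endpoints": "{\"broker1\": \"10.0.0.1:8001\"}"}"#;
    let parsed: KafkaPrivateLinkCommon = serde_json::from_str(raw).unwrap();
    assert_eq!(
        parsed.broker_rewrite_map.as_ref().unwrap()["broker1"],
        "10.0.0.1:8001"
    );
}
```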
16 changes: 14 additions & 2 deletions src/connector/src/sink/kafka.rs
@@ -33,7 +33,7 @@ use with_options::WithOptions;

use super::catalog::{SinkFormat, SinkFormatDesc};
use super::{Sink, SinkError, SinkParam};
use crate::common::{KafkaCommon, RdKafkaPropertiesCommon};
use crate::common::{KafkaCommon, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
@@ -232,6 +232,9 @@ pub struct KafkaConfig {

#[serde(flatten)]
pub rdkafka_properties_producer: RdKafkaPropertiesProducer,

#[serde(flatten)]
pub privatelink_common: KafkaPrivateLinkCommon,
}

impl KafkaConfig {
@@ -261,6 +264,7 @@ impl From<KafkaConfig> for KafkaProperties {
common: val.common,
rdkafka_properties_common: val.rdkafka_properties_common,
rdkafka_properties_consumer: Default::default(),
privatelink_common: val.privatelink_common,
unknown_fields: Default::default(),
}
}
@@ -403,7 +407,7 @@ impl KafkaSinkWriter {

// Create the producer context, which will be used to create the producer
let producer_ctx = PrivateLinkProducerContext::new(
config.common.broker_rewrite_map.clone(),
config.privatelink_common.broker_rewrite_map.clone(),
// fixme: enable kafka native metrics for sink
None,
None,
@@ -656,13 +660,21 @@ mod test {
"properties.sasl.password".to_string() => "test".to_string(),
"properties.retry.max".to_string() => "20".to_string(),
"properties.retry.interval".to_string() => "500ms".to_string(),
// PrivateLink
"broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(),
};
let config = KafkaConfig::from_hashmap(properties).unwrap();
assert_eq!(config.common.brokers, "localhost:9092");
assert_eq!(config.common.topic, "test");
assert_eq!(config.max_retry_num, 20);
assert_eq!(config.retry_interval, Duration::from_millis(500));

// PrivateLink fields
let hashmap: HashMap<String, String> = hashmap! {
"broker1".to_string() => "10.0.0.1:8001".to_string()
};
assert_eq!(config.privatelink_common.broker_rewrite_map, Some(hashmap));

// Optional fields eliminated.
let properties: HashMap<String, String> = hashmap! {
// "connector".to_string() => "kafka".to_string(),
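Because `privatelink_common` is `#[serde(flatten)]`ed into `KafkaConfig`, the `broker.rewrite.endpoints` key is picked out of the same flat property map as every other option, which is what the extended test above exercises. A rough sketch of the flatten behaviour with simplified stand-in types (the struct and field names here are illustrative, not the real ones):

```rust
use std::collections::HashMap;

use serde::Deserialize;

// Illustrative only: simplified stand-ins for `KafkaPrivateLinkCommon`
// and `KafkaConfig`, keeping the rewrite value as a plain string.
#[derive(Debug, Deserialize)]
struct PrivateLinkPart {
    #[serde(rename = "broker.rewrite.endpoints")]
    broker_rewrite_endpoints: Option<String>,
}

#[derive(Debug, Deserialize)]
struct ConfigSketch {
    topic: String,
    // `flatten` pulls this struct's keys out of the same flat map as
    // the parent's own fields.
    #[serde(flatten)]
    privatelink: PrivateLinkPart,
}

fn main() {
    let props: HashMap<String, String> = HashMap::from([
        ("topic".to_string(), "test".to_string()),
        (
            "broker.rewrite.endpoints".to_string(),
            r#"{"broker1": "10.0.0.1:8001"}"#.to_string(),
        ),
    ]);
    let cfg: ConfigSketch =
        serde_json::from_value(serde_json::to_value(props).unwrap()).unwrap();
    assert_eq!(cfg.topic, "test");
    assert!(cfg.privatelink.broker_rewrite_endpoints.is_some());
}
```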
7 changes: 5 additions & 2 deletions src/connector/src/source/base.rs
@@ -713,8 +713,11 @@ mod tests {

let props = ConnectorProperties::extract(props, true).unwrap();
if let ConnectorProperties::Kafka(k) = props {
assert!(k.common.broker_rewrite_map.is_some());
println!("{:?}", k.common.broker_rewrite_map);
let hashmap: HashMap<String, String> = hashmap! {
"b-1:9092".to_string() => "dns-1".to_string(),
"b-2:9092".to_string() => "dns-2".to_string(),
};
assert_eq!(k.privatelink_common.broker_rewrite_map, Some(hashmap));
} else {
panic!("extract kafka config failed");
}
2 changes: 1 addition & 1 deletion src/connector/src/source/kafka/enumerator/client.rs
@@ -62,7 +62,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
let common_props = &properties.common;

let broker_address = common_props.brokers.clone();
let broker_rewrite_map = common_props.broker_rewrite_map.clone();
let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();
let topic = common_props.topic.clone();
config.set("bootstrap.servers", &broker_address);
config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
11 changes: 11 additions & 0 deletions src/connector/src/source/kafka/mod.rs
@@ -17,6 +17,8 @@ use std::collections::HashMap;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

use crate::common::KafkaPrivateLinkCommon;

pub mod enumerator;
pub mod private_link;
pub mod source;
@@ -130,6 +132,9 @@ pub struct KafkaProperties {
#[serde(flatten)]
pub rdkafka_properties_consumer: RdKafkaPropertiesConsumer,

#[serde(flatten)]
pub privatelink_common: KafkaPrivateLinkCommon,

#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}
@@ -208,6 +213,8 @@ mod test {
"properties.fetch.max.bytes".to_string() => "114514".to_string(),
"properties.enable.auto.commit".to_string() => "true".to_string(),
"properties.fetch.queue.backoff.ms".to_string() => "114514".to_string(),
// PrivateLink
"broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(),
};

let props: KafkaProperties =
@@ -246,5 +253,9 @@
props.rdkafka_properties_consumer.fetch_queue_backoff_ms,
Some(114514)
);
let hashmap: HashMap<String, String> = hashmap! {
"broker1".to_string() => "10.0.0.1:8001".to_string()
};
assert_eq!(props.privatelink_common.broker_rewrite_map, Some(hashmap));
}
}
14 changes: 9 additions & 5 deletions src/connector/src/source/kafka/private_link.rs
@@ -26,7 +26,9 @@ use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::catalog::connection::PrivateLinkService;

use crate::common::{AwsPrivateLinkItem, BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY};
use crate::common::{
AwsPrivateLinkItem, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
};
use crate::source::kafka::stats::RdKafkaStats;
use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};
use crate::source::KAFKA_CONNECTOR;
@@ -211,16 +213,18 @@ fn is_kafka_connector(with_properties: &BTreeMap<String, String>) -> bool {
}

pub fn insert_privatelink_broker_rewrite_map(
properties: &mut BTreeMap<String, String>,
with_options: &mut BTreeMap<String, String>,
svc: Option<&PrivateLinkService>,
privatelink_endpoint: Option<String>,
) -> anyhow::Result<()> {
let mut broker_rewrite_map = HashMap::new();
let servers = get_property_required(properties, kafka_props_broker_key(properties))?;
let servers = get_property_required(with_options, kafka_props_broker_key(with_options))?;
let broker_addrs = servers.split(',').collect_vec();
let link_target_value = get_property_required(properties, PRIVATE_LINK_TARGETS_KEY)?;
let link_target_value = get_property_required(with_options, PRIVATE_LINK_TARGETS_KEY)?;
let link_targets: Vec<AwsPrivateLinkItem> =
serde_json::from_str(link_target_value.as_str()).map_err(|e| anyhow!(e))?;
// Remove the private link targets from the WITH options; they are no longer needed once the rewrite mapping has been constructed.
with_options.remove(PRIVATE_LINK_TARGETS_KEY);

if broker_addrs.len() != link_targets.len() {
return Err(anyhow!(
@@ -259,6 +263,6 @@ pub fn insert_privatelink_broker_rewrite_map(
// Save the private link DNS names into the WITH options, from which
// they will be extracted into KafkaProperties
let json = serde_json::to_string(&broker_rewrite_map).map_err(|e| anyhow!(e))?;
properties.insert(BROKER_REWRITE_MAP_KEY.to_string(), json);
with_options.insert(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY.to_string(), json);
Ok(())
}
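With these changes, `insert_privatelink_broker_rewrite_map` consumes `privatelink.targets` instead of leaving it behind in the WITH options. A condensed sketch of the flow under simplified assumptions (the helper name and the hard-coded broker key below are hypothetical; the real function resolves the DNS names from the `PrivateLinkService` or endpoint, handles the broker-key alias, and returns `anyhow::Result`):

```rust
use std::collections::{BTreeMap, HashMap};

// Hypothetical, simplified version of the rewrite flow: pair each broker
// with its private-link DNS target, drop `privatelink.targets`, and store
// the mapping as JSON under `broker.rewrite.endpoints`.
fn insert_rewrite_map_sketch(
    with_options: &mut BTreeMap<String, String>,
    dns_names: &[&str],
) -> serde_json::Result<()> {
    let servers = with_options
        .get("properties.bootstrap.server")
        .cloned()
        .unwrap_or_default();
    let mut rewrite_map = HashMap::new();
    for (broker, dns) in servers.split(',').zip(dns_names) {
        rewrite_map.insert(broker.to_string(), dns.to_string());
    }
    // The targets are only needed to build the mapping, so they are
    // removed here rather than forwarded to the connector (#14907).
    with_options.remove("privatelink.targets");
    with_options.insert(
        "broker.rewrite.endpoints".to_string(),
        serde_json::to_string(&rewrite_map)?,
    );
    Ok(())
}
```

The `remove` call is the behavioural change this hunk introduces; the rest mirrors the mapping construction already present above.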
2 changes: 1 addition & 1 deletion src/connector/src/source/kafka/source/reader.rs
@@ -61,7 +61,7 @@ impl SplitReader for KafkaSplitReader {
let mut config = ClientConfig::new();

let bootstrap_servers = &properties.common.brokers;
let broker_rewrite_map = properties.common.broker_rewrite_map.clone();
let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone();

// disable partition eof
config.set("enable.partition.eof", "false");
7 changes: 4 additions & 3 deletions src/connector/with_options_sink.yaml
@@ -158,9 +158,6 @@ KafkaConfig:
field_type: String
required: true
alias: kafka.brokers
- name: broker.rewrite.endpoints
field_type: HashMap<String,String>
required: false
- name: topic
field_type: String
required: true
@@ -305,6 +302,10 @@ KafkaConfig:
comments: The maximum number of unacknowledged requests the client will send on a single connection before blocking.
required: false
default: '5'
- name: broker.rewrite.endpoints
field_type: HashMap<String,String>
comments: This is generated from `private_link_targets` and `private_link_endpoint` in the frontend, rather than provided by users.
required: false
KinesisSinkConfig:
fields:
- name: stream
7 changes: 4 additions & 3 deletions src/connector/with_options_source.yaml
@@ -61,9 +61,6 @@ KafkaProperties:
field_type: String
required: true
alias: kafka.brokers
- name: broker.rewrite.endpoints
field_type: HashMap<String,String>
required: false
- name: topic
field_type: String
required: true
@@ -169,6 +166,10 @@ KafkaProperties:
field_type: bool
comments: 'Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). default: true'
required: false
- name: broker.rewrite.endpoints
field_type: HashMap<String,String>
comments: This is generated from `private_link_targets` and `private_link_endpoint` in the frontend, rather than provided by users.
required: false
KinesisProperties:
fields:
- name: scan.startup.mode