diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs
index 5e090d75475ab..573e14c3e073f 100644
--- a/src/connector/src/source/kafka/private_link.rs
+++ b/src/connector/src/source/kafka/private_link.rs
@@ -31,6 +31,9 @@ use crate::source::kafka::stats::RdKafkaStats;
 use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};
 use crate::source::KAFKA_CONNECTOR;
 
+pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint";
+pub const CONNECTION_NAME_KEY: &str = "connection.name";
+
 #[derive(Debug)]
 enum PrivateLinkContextRole {
     Consumer,
@@ -204,16 +207,17 @@ fn is_kafka_connector(with_properties: &BTreeMap<String, String>) -> bool {
 }
 
 pub fn insert_privatelink_broker_rewrite_map(
-    svc: &PrivateLinkService,
     properties: &mut BTreeMap<String, String>,
+    svc: Option<&PrivateLinkService>,
+    privatelink_endpoint: Option<String>,
 ) -> anyhow::Result<()> {
     let mut broker_rewrite_map = HashMap::new();
-
-    let link_target_value = get_property_required(properties, PRIVATE_LINK_TARGETS_KEY)?;
     let servers = get_property_required(properties, kafka_props_broker_key(properties))?;
     let broker_addrs = servers.split(',').collect_vec();
+    let link_target_value = get_property_required(properties, PRIVATE_LINK_TARGETS_KEY)?;
     let link_targets: Vec<AwsPrivateLinkItem> =
         serde_json::from_str(link_target_value.as_str()).map_err(|e| anyhow!(e))?;
+
     if broker_addrs.len() != link_targets.len() {
         return Err(anyhow!(
             "The number of broker addrs {} does not match the number of private link targets {}",
@@ -222,19 +226,30 @@ pub fn insert_privatelink_broker_rewrite_map(
         ));
     }
 
-    for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) {
-        if svc.dns_entries.is_empty() {
-            return Err(anyhow!(
-                "No available private link endpoints for Kafka broker {}",
-                broker
-            ));
+    if let Some(endpoint) = privatelink_endpoint {
+        for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) {
+            // rewrite the broker address to endpoint:port
+            broker_rewrite_map.insert(broker.to_string(), format!("{}:{}", &endpoint, link.port));
+        }
+    } else {
+        if svc.is_none() {
+            return Err(anyhow!("Privatelink endpoint not found.",));
+        }
+        let svc = svc.unwrap();
+        for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) {
+            if svc.dns_entries.is_empty() {
+                return Err(anyhow!(
+                    "No available private link endpoints for Kafka broker {}",
+                    broker
+                ));
+            }
+            // rewrite the broker address to the dns name w/o az
+            // requires the NLB has enabled the cross-zone load balancing
+            broker_rewrite_map.insert(
+                broker.to_string(),
+                format!("{}:{}", &svc.endpoint_dns_name, link.port),
+            );
         }
-        // rewrite the broker address to the dns name w/o az
-        // requires the NLB has enabled the cross-zone load balancing
-        broker_rewrite_map.insert(
-            broker.to_string(),
-            format!("{}:{}", &svc.endpoint_dns_name, link.port),
-        );
     }
 
     // save private link dns names into source properties, which
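The rewrite function above now has two modes: a user-supplied `privatelink.endpoint` wins, otherwise it falls back to the catalog-backed service's DNS name. A minimal, self-contained sketch of that dispatch, using hypothetical stand-in types (the real `PrivateLinkService` is a protobuf message, and the real code pairs all brokers with targets via `zip_eq_fast`):

```rust
use std::collections::HashMap;

// Hypothetical stand-ins; only the fields the rewrite touches are kept.
struct PrivateLinkService {
    endpoint_dns_name: String,
    dns_entries: HashMap<String, String>,
}

struct LinkTarget {
    port: u16,
}

// Mirrors the new dispatch: a user-supplied endpoint wins, otherwise fall
// back to the service DNS name, and fail if neither is available.
fn rewrite_target(
    svc: Option<&PrivateLinkService>,
    endpoint: Option<&str>,
    link: &LinkTarget,
    broker: &str,
) -> Result<String, String> {
    if let Some(ep) = endpoint {
        return Ok(format!("{}:{}", ep, link.port));
    }
    let svc = svc.ok_or_else(|| "Privatelink endpoint not found.".to_string())?;
    if svc.dns_entries.is_empty() {
        return Err(format!(
            "No available private link endpoints for Kafka broker {}",
            broker
        ));
    }
    Ok(format!("{}:{}", svc.endpoint_dns_name, link.port))
}

fn main() {
    let link = LinkTarget { port: 9001 };

    // Direct endpoint: no catalog service needed.
    assert_eq!(
        rewrite_target(None, Some("10.0.1.25"), &link, "b-1:9092"),
        Ok("10.0.1.25:9001".to_string())
    );

    // Catalog service: rewrite to the endpoint DNS name (cross-zone NLB).
    let svc = PrivateLinkService {
        endpoint_dns_name: "vpce-01234.vpce-svc.vpce.amazonaws.com".to_string(),
        dns_entries: HashMap::from([("az1".to_string(), "10.0.1.25".to_string())]),
    };
    assert_eq!(
        rewrite_target(Some(&svc), None, &link, "b-1:9092"),
        Ok("vpce-01234.vpce-svc.vpce.amazonaws.com:9001".to_string())
    );

    // Neither: the new error path.
    assert!(rewrite_target(None, None, &link, "b-1:9092").is_err());
}
```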
diff --git a/src/frontend/src/catalog/connection_catalog.rs b/src/frontend/src/catalog/connection_catalog.rs
index f049011748d17..7913d04379cd5 100644
--- a/src/frontend/src/catalog/connection_catalog.rs
+++ b/src/frontend/src/catalog/connection_catalog.rs
@@ -90,7 +90,8 @@ pub(crate) fn resolve_private_link_connection(
         if svc.get_provider()? == PrivateLinkProvider::Mock {
             return Ok(());
         }
-        insert_privatelink_broker_rewrite_map(svc, properties).map_err(RwError::from)?;
+        insert_privatelink_broker_rewrite_map(properties, Some(svc), None)
+            .map_err(RwError::from)?;
     }
     Ok(())
 }
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index 3a8c701d77432..05ac9c59ffae4 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -35,7 +35,7 @@ use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationC
 use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
 use crate::session::SessionImpl;
 use crate::stream_fragmenter::build_graph;
-use crate::utils::resolve_connection_in_with_option;
+use crate::utils::resolve_privatelink_in_with_option;
 use crate::Planner;
 
 pub fn gen_sink_query_from_name(from_name: ObjectName) -> Result<Query> {
@@ -106,7 +106,7 @@ pub fn gen_sink_plan(
     let mut with_options = context.with_options().clone();
     let connection_id = {
         let conn_id =
-            resolve_connection_in_with_option(&mut with_options, &sink_schema_name, session)?;
+            resolve_privatelink_in_with_option(&mut with_options, &sink_schema_name, session)?;
         conn_id.map(ConnectionId)
     };
 
diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs
index 993d23084d21f..448d2f49923f7 100644
--- a/src/frontend/src/handler/create_source.rs
+++ b/src/frontend/src/handler/create_source.rs
@@ -60,7 +60,7 @@ use crate::handler::create_table::{
 use crate::handler::util::{get_connector, is_kafka_connector};
 use crate::handler::HandlerArgs;
 use crate::session::SessionImpl;
-use crate::utils::resolve_connection_in_with_option;
+use crate::utils::resolve_privatelink_in_with_option;
 use crate::{bind_data_type, WithOptions};
 
 pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector";
@@ -1125,10 +1125,10 @@ pub async fn handle_create_source(
 
     let columns = columns.into_iter().map(|c| c.to_protobuf()).collect_vec();
 
-    // resolve privatelink connection for Kafka source
     let mut with_options = WithOptions::new(with_properties);
+    // resolve privatelink connection for Kafka source
     let connection_id =
-        resolve_connection_in_with_option(&mut with_options, &schema_name, &session)?;
+        resolve_privatelink_in_with_option(&mut with_options, &schema_name, &session)?;
 
     let definition = handler_args.normalized_sql;
     let source = PbSource {
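All three frontend handlers (sink, source, and table, continued below) follow the same pattern that the rename makes explicit: wrap the raw properties in `WithOptions`, then resolve private link settings before the catalog object is built, so the rewritten broker addresses are what gets persisted. A simplified sketch of the resolution order (the rewrite key and the catalog id below are hypothetical placeholders):

```rust
use std::collections::BTreeMap;

const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint";
const CONNECTION_NAME_KEY: &str = "connection.name";

// Simplified resolve_privatelink_in_with_option: a direct endpoint rewrites
// brokers in place and yields no connection id; otherwise a named connection
// would be looked up in the catalog (stubbed to a fixed id here).
fn resolve_privatelink(
    with_options: &mut BTreeMap<String, String>,
) -> Result<Option<u32>, String> {
    if let Some(endpoint) = with_options.get(PRIVATELINK_ENDPOINT_KEY).cloned() {
        let is_kafka = with_options
            .get("connector")
            .map(|c| c.as_str() == "kafka")
            .unwrap_or(false);
        if !is_kafka {
            return Err("Privatelink is only supported in kafka connector".into());
        }
        // stand-in for insert_privatelink_broker_rewrite_map(...); the real
        // key used to persist the rewrite map is internal to the connector crate
        with_options.insert("broker.rewrite.map".into(), format!("{endpoint}:9092"));
        return Ok(None);
    }
    // hypothetical catalog lookup by connection name
    Ok(with_options.get(CONNECTION_NAME_KEY).map(|_| 42))
}

fn main() {
    let mut opts = BTreeMap::from([
        ("connector".to_string(), "kafka".to_string()),
        (PRIVATELINK_ENDPOINT_KEY.to_string(), "10.0.1.25".to_string()),
    ]);
    // Direct endpoint: brokers rewritten, no connection id recorded.
    assert_eq!(resolve_privatelink(&mut opts), Ok(None));
    assert!(opts.contains_key("broker.rewrite.map"));
}
```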
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 476e15885c65d..73c5f65add34c 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -51,7 +51,7 @@ use crate::optimizer::property::{Order, RequiredDist};
 use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
 use crate::session::{CheckRelationError, SessionImpl};
 use crate::stream_fragmenter::build_graph;
-use crate::utils::resolve_connection_in_with_option;
+use crate::utils::resolve_privatelink_in_with_option;
 use crate::{Binder, TableCatalog, WithOptions};
 
 /// Column ID generator for a new table or a new version of an existing table to alter.
@@ -653,7 +653,7 @@ fn gen_table_plan_inner(
     // resolve privatelink connection for Table backed by Kafka source
     let mut with_options = WithOptions::new(properties);
     let connection_id =
-        resolve_connection_in_with_option(&mut with_options, &schema_name, &session)?;
+        resolve_privatelink_in_with_option(&mut with_options, &schema_name, &session)?;
 
     let source = source_info.map(|source_info| PbSource {
         id: TableId::placeholder().table_id,
diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs
index bf17ace34119b..baf03184c4a14 100644
--- a/src/frontend/src/utils/with_options.rs
+++ b/src/frontend/src/utils/with_options.rs
@@ -18,6 +18,9 @@ use std::num::NonZeroU32;
 
 use itertools::Itertools;
 use risingwave_common::error::{ErrorCode, Result as RwResult, RwError};
+use risingwave_connector::source::kafka::{
+    insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY,
+};
 use risingwave_connector::source::KAFKA_CONNECTOR;
 use risingwave_sqlparser::ast::{
     CompatibleSourceSchema, CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement,
@@ -122,13 +125,27 @@ fn is_kafka_connector(with_options: &WithOptions) -> bool {
     connector == KAFKA_CONNECTOR
 }
 
-pub(crate) fn resolve_connection_in_with_option(
+pub(crate) fn resolve_privatelink_in_with_option(
     with_options: &mut WithOptions,
     schema_name: &Option<String>,
     session: &SessionImpl,
 ) -> RwResult<Option<u32>> {
-    let connection_name = get_connection_name(with_options);
     let is_kafka = is_kafka_connector(with_options);
+    let privatelink_endpoint = with_options.get(PRIVATELINK_ENDPOINT_KEY).cloned();
+
+    // if `privatelink.endpoint` is provided in WITH, use it to rewrite broker address directly
+    if let Some(endpoint) = privatelink_endpoint {
+        if !is_kafka {
+            return Err(RwError::from(ErrorCode::ProtocolError(
+                "Privatelink is only supported in kafka connector".to_string(),
+            )));
+        }
+        insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
+            .map_err(RwError::from)?;
+        return Ok(None);
+    }
+
+    let connection_name = get_connection_name(with_options);
     let connection_id = match connection_name {
         Some(connection_name) => {
             let connection = session
diff --git a/src/meta/src/rpc/service/cloud_service.rs b/src/meta/src/rpc/service/cloud_service.rs
index b1544f3076a46..21cd77ff42fae 100644
--- a/src/meta/src/rpc/service/cloud_service.rs
+++ b/src/meta/src/rpc/service/cloud_service.rs
@@ -117,7 +117,9 @@ impl CloudService for CloudServiceImpl {
             }
             _ => (),
         };
-        if let Err(e) = insert_privatelink_broker_rewrite_map(&service, &mut source_cfg) {
+        if let Err(e) =
+            insert_privatelink_broker_rewrite_map(&mut source_cfg, Some(&service), None)
+        {
             return Ok(new_rwc_validate_fail_response(
                 ErrorType::PrivatelinkResolveErr,
                 e.to_string(),
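End to end, the new WITH-clause path lets a user bypass catalog connections entirely. A sketch of the resulting broker rewrite for a two-broker cluster (all addresses, ports, and the endpoint are made-up values; the real code pairs brokers with targets via `zip_eq_fast` and errors on a length mismatch, approximated here with an assert):

```rust
use std::collections::HashMap;

fn main() {
    // Assumed user input, roughly:
    //   CREATE SOURCE s (...) WITH (
    //       connector = 'kafka',
    //       properties.bootstrap.server = 'b-1.kafka:9092,b-2.kafka:9092',
    //       privatelink.endpoint = '10.0.1.25',
    //       ...
    //   );
    let servers = "b-1.kafka:9092,b-2.kafka:9092";
    let broker_addrs: Vec<&str> = servers.split(',').collect();

    // One private link target port per broker.
    let target_ports = [9001u16, 9002];
    assert_eq!(broker_addrs.len(), target_ports.len());

    let endpoint = "10.0.1.25";
    let broker_rewrite_map: HashMap<String, String> = broker_addrs
        .iter()
        .zip(target_ports)
        .map(|(broker, port)| (broker.to_string(), format!("{endpoint}:{port}")))
        .collect();

    // b-1.kafka:9092 -> 10.0.1.25:9001, b-2.kafka:9092 -> 10.0.1.25:9002
    println!("{broker_rewrite_map:?}");
}
```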