From e7ed953079bc6cef3aec171568ce057cb5c6a514 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 11 Sep 2023 14:58:30 +0800 Subject: [PATCH 1/5] feat: support provide privatelink endpoint in the WITH of CREATE SOURCE TODO: add support for kafka sink --- proto/catalog.proto | 3 + .../src/source/kafka/private_link.rs | 62 ++++++++++++------- .../src/catalog/connection_catalog.rs | 3 +- src/frontend/src/catalog/source_catalog.rs | 3 + src/frontend/src/handler/create_sink.rs | 4 +- src/frontend/src/handler/create_source.rs | 9 ++- src/frontend/src/handler/create_table.rs | 5 +- src/frontend/src/utils/with_options.rs | 21 ++++++- src/meta/src/rpc/service/cloud_service.rs | 4 +- 9 files changed, 80 insertions(+), 34 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index aa09274a0d818..8b032ac3cac54 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -80,6 +80,9 @@ message Source { optional uint64 initialized_at_epoch = 15; optional uint64 created_at_epoch = 16; + // Privatelink endpoint provisioned by the cloud platform or provided by the user. + optional string privatelink_endpoint = 17; + // Per-source catalog version, used by schema change. uint64 version = 100; } diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs index 5e090d75475ab..ead4dc189329a 100644 --- a/src/connector/src/source/kafka/private_link.rs +++ b/src/connector/src/source/kafka/private_link.rs @@ -31,6 +31,9 @@ use crate::source::kafka::stats::RdKafkaStats; use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS}; use crate::source::KAFKA_CONNECTOR; +pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint"; +pub const CONNECTION_NAME_KEY: &str = "connection.name"; + #[derive(Debug)] enum PrivateLinkContextRole { Consumer, @@ -204,37 +207,50 @@ fn is_kafka_connector(with_properties: &BTreeMap) -> bool { } pub fn insert_privatelink_broker_rewrite_map( - svc: &PrivateLinkService, properties: &mut BTreeMap, + svc: Option<&PrivateLinkService>, + privatelink_endpoint: Option, ) -> anyhow::Result<()> { - let mut broker_rewrite_map = HashMap::new(); - - let link_target_value = get_property_required(properties, PRIVATE_LINK_TARGETS_KEY)?; + let mut broker_rewrite_map: HashMap; let servers = get_property_required(properties, kafka_props_broker_key(properties))?; let broker_addrs = servers.split(',').collect_vec(); - let link_targets: Vec = - serde_json::from_str(link_target_value.as_str()).map_err(|e| anyhow!(e))?; - if broker_addrs.len() != link_targets.len() { - return Err(anyhow!( - "The number of broker addrs {} does not match the number of private link targets {}", - broker_addrs.len(), - link_targets.len() - )); - } - for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) { - if svc.dns_entries.is_empty() { + if let Some(endpoint) = privatelink_endpoint { + broker_rewrite_map = broker_addrs + .into_iter() + .map(|broker| (broker.to_string(), format!("{}", &endpoint))) + .collect(); + } else { + if svc.is_none() { + return Err(anyhow!("Privatelink endpoint not found.",)); + } + let svc = svc.unwrap(); + broker_rewrite_map = HashMap::new(); + let link_target_value = get_property_required(properties, PRIVATE_LINK_TARGETS_KEY)?; + let link_targets: Vec = + serde_json::from_str(link_target_value.as_str()).map_err(|e| anyhow!(e))?; + if broker_addrs.len() != link_targets.len() { return Err(anyhow!( - "No available private link endpoints for Kafka broker {}", - broker + "The number of broker addrs {} does not match the number of private link targets {}", + broker_addrs.len(), + link_targets.len() )); } - // rewrite the broker address to the dns name w/o az - // requires the NLB has enabled the cross-zone load balancing - broker_rewrite_map.insert( - broker.to_string(), - format!("{}:{}", &svc.endpoint_dns_name, link.port), - ); + + for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) { + if svc.dns_entries.is_empty() { + return Err(anyhow!( + "No available private link endpoints for Kafka broker {}", + broker + )); + } + // rewrite the broker address to the dns name w/o az + // requires the NLB has enabled the cross-zone load balancing + broker_rewrite_map.insert( + broker.to_string(), + format!("{}:{}", &svc.endpoint_dns_name, link.port), + ); + } } // save private link dns names into source properties, which diff --git a/src/frontend/src/catalog/connection_catalog.rs b/src/frontend/src/catalog/connection_catalog.rs index f049011748d17..7913d04379cd5 100644 --- a/src/frontend/src/catalog/connection_catalog.rs +++ b/src/frontend/src/catalog/connection_catalog.rs @@ -90,7 +90,8 @@ pub(crate) fn resolve_private_link_connection( if svc.get_provider()? == PrivateLinkProvider::Mock { return Ok(()); } - insert_privatelink_broker_rewrite_map(svc, properties).map_err(RwError::from)?; + insert_privatelink_broker_rewrite_map(properties, Some(svc), None) + .map_err(RwError::from)?; } Ok(()) } diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs index ec35cfb7bde28..f51d34d430ee5 100644 --- a/src/frontend/src/catalog/source_catalog.rs +++ b/src/frontend/src/catalog/source_catalog.rs @@ -43,6 +43,7 @@ pub struct SourceCatalog { pub connection_id: Option, pub created_at_epoch: Option, pub initialized_at_epoch: Option, + pub privatelink_endpoint: Option, pub version: SourceVersionId, } @@ -72,6 +73,7 @@ impl SourceCatalog { optional_associated_table_id: self .associated_table_id .map(|id| OptionalAssociatedTableId::AssociatedTableId(id.table_id)), + privatelink_endpoint: self.privatelink_endpoint.clone(), version: self.version, } } @@ -127,6 +129,7 @@ impl From<&PbSource> for SourceCatalog { connection_id, created_at_epoch: prost.created_at_epoch.map(Epoch::from), initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from), + privatelink_endpoint: prost.privatelink_endpoint.clone(), version, } } diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 3a8c701d77432..05ac9c59ffae4 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -35,7 +35,7 @@ use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationC use crate::scheduler::streaming_manager::CreatingStreamingJobInfo; use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; -use crate::utils::resolve_connection_in_with_option; +use crate::utils::resolve_privatelink_in_with_option; use crate::Planner; pub fn gen_sink_query_from_name(from_name: ObjectName) -> Result { @@ -106,7 +106,7 @@ pub fn gen_sink_plan( let mut with_options = context.with_options().clone(); let connection_id = { let conn_id = - resolve_connection_in_with_option(&mut with_options, &sink_schema_name, session)?; + resolve_privatelink_in_with_option(&mut with_options, &sink_schema_name, session)?; conn_id.map(ConnectionId) }; diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index 7b38e6289e164..e87ffe2b7d917 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -35,6 +35,7 @@ use risingwave_connector::source::cdc::{ }; use risingwave_connector::source::datagen::DATAGEN_CONNECTOR; use risingwave_connector::source::filesystem::S3_CONNECTOR; +use risingwave_connector::source::kafka::PRIVATELINK_ENDPOINT_KEY; use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType}; use risingwave_connector::source::{ SourceEncode, SourceFormat, SourceStruct, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, @@ -60,7 +61,7 @@ use crate::handler::create_table::{ use crate::handler::util::{get_connector, is_kafka_connector}; use crate::handler::HandlerArgs; use crate::session::SessionImpl; -use crate::utils::resolve_connection_in_with_option; +use crate::utils::resolve_privatelink_in_with_option; use crate::{bind_data_type, WithOptions}; pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector"; @@ -1116,10 +1117,11 @@ pub async fn handle_create_source( let columns = columns.into_iter().map(|c| c.to_protobuf()).collect_vec(); - // resolve privatelink connection for Kafka source let mut with_options = WithOptions::new(with_properties); + let privatelink_endpoint = with_options.get(PRIVATELINK_ENDPOINT_KEY).cloned(); + // resolve privatelink connection for Kafka source let connection_id = - resolve_connection_in_with_option(&mut with_options, &schema_name, &session)?; + resolve_privatelink_in_with_option(&mut with_options, &schema_name, &session)?; let definition = handler_args.normalized_sql; let source = PbSource { @@ -1139,6 +1141,7 @@ pub async fn handle_create_source( initialized_at_epoch: None, created_at_epoch: None, optional_associated_table_id: None, + privatelink_endpoint, version: INITIAL_SOURCE_VERSION_ID, }; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index d8419ac98da38..ae5f553115b83 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -51,7 +51,7 @@ use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot}; use crate::session::{CheckRelationError, SessionImpl}; use crate::stream_fragmenter::build_graph; -use crate::utils::resolve_connection_in_with_option; +use crate::utils::resolve_privatelink_in_with_option; use crate::{Binder, TableCatalog, WithOptions}; /// Column ID generator for a new table or a new version of an existing table to alter. @@ -633,7 +633,7 @@ fn gen_table_plan_inner( // resolve privatelink connection for Table backed by Kafka source let mut with_options = WithOptions::new(properties); let connection_id = - resolve_connection_in_with_option(&mut with_options, &schema_name, &session)?; + resolve_privatelink_in_with_option(&mut with_options, &schema_name, &session)?; let source = source_info.map(|source_info| PbSource { id: TableId::placeholder().table_id, @@ -657,6 +657,7 @@ fn gen_table_plan_inner( optional_associated_table_id: Some(OptionalAssociatedTableId::AssociatedTableId( TableId::placeholder().table_id, )), + privatelink_endpoint: None, version: INITIAL_SOURCE_VERSION_ID, }); diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index bf17ace34119b..baf03184c4a14 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -18,6 +18,9 @@ use std::num::NonZeroU32; use itertools::Itertools; use risingwave_common::error::{ErrorCode, Result as RwResult, RwError}; +use risingwave_connector::source::kafka::{ + insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY, +}; use risingwave_connector::source::KAFKA_CONNECTOR; use risingwave_sqlparser::ast::{ CompatibleSourceSchema, CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement, @@ -122,13 +125,27 @@ fn is_kafka_connector(with_options: &WithOptions) -> bool { connector == KAFKA_CONNECTOR } -pub(crate) fn resolve_connection_in_with_option( +pub(crate) fn resolve_privatelink_in_with_option( with_options: &mut WithOptions, schema_name: &Option, session: &SessionImpl, ) -> RwResult> { - let connection_name = get_connection_name(with_options); let is_kafka = is_kafka_connector(with_options); + let privatelink_endpoint = with_options.get(PRIVATELINK_ENDPOINT_KEY).cloned(); + + // if `privatelink.endpoint` is provided in WITH, use it to rewrite broker address directly + if let Some(endpoint) = privatelink_endpoint { + if !is_kafka { + return Err(RwError::from(ErrorCode::ProtocolError( + "Privatelink is only supported in kafka connector".to_string(), + ))); + } + insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint)) + .map_err(RwError::from)?; + return Ok(None); + } + + let connection_name = get_connection_name(with_options); let connection_id = match connection_name { Some(connection_name) => { let connection = session diff --git a/src/meta/src/rpc/service/cloud_service.rs b/src/meta/src/rpc/service/cloud_service.rs index 141abd9a0aab9..f702db9e5285f 100644 --- a/src/meta/src/rpc/service/cloud_service.rs +++ b/src/meta/src/rpc/service/cloud_service.rs @@ -116,7 +116,9 @@ impl CloudService for CloudServiceImpl { } _ => (), }; - if let Err(e) = insert_privatelink_broker_rewrite_map(&service, &mut source_cfg) { + if let Err(e) = + insert_privatelink_broker_rewrite_map(&mut source_cfg, Some(&service), None) + { return Ok(new_rwc_validate_fail_response( ErrorType::PrivatelinkResolveErr, e.to_string(), From c62594e6985f1272ef7b9a9bce08dbe4fff777ab Mon Sep 17 00:00:00 2001 From: StrikeW Date: Fri, 15 Sep 2023 10:40:18 +0800 Subject: [PATCH 2/5] rewrite to endpoint with target port --- src/connector/src/source/kafka/private_link.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs index ead4dc189329a..8129755aca5c8 100644 --- a/src/connector/src/source/kafka/private_link.rs +++ b/src/connector/src/source/kafka/private_link.rs @@ -216,10 +216,10 @@ pub fn insert_privatelink_broker_rewrite_map( let broker_addrs = servers.split(',').collect_vec(); if let Some(endpoint) = privatelink_endpoint { - broker_rewrite_map = broker_addrs - .into_iter() - .map(|broker| (broker.to_string(), format!("{}", &endpoint))) - .collect(); + for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) { + // rewrite the broker address to endpoint:port + broker_rewrite_map.insert(broker.to_string(), format!("{}:{}", &endpoint, link.port)); + } } else { if svc.is_none() { return Err(anyhow!("Privatelink endpoint not found.",)); From c9f04c44e49e820740e8e57f6ab25fc945d3e0ef Mon Sep 17 00:00:00 2001 From: StrikeW Date: Fri, 15 Sep 2023 10:46:57 +0800 Subject: [PATCH 3/5] fix code --- src/connector/src/source/kafka/private_link.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs index 8129755aca5c8..787df9616a2b6 100644 --- a/src/connector/src/source/kafka/private_link.rs +++ b/src/connector/src/source/kafka/private_link.rs @@ -211,9 +211,12 @@ pub fn insert_privatelink_broker_rewrite_map( svc: Option<&PrivateLinkService>, privatelink_endpoint: Option, ) -> anyhow::Result<()> { - let mut broker_rewrite_map: HashMap; + let mut broker_rewrite_map = HashMap::new(); let servers = get_property_required(properties, kafka_props_broker_key(properties))?; let broker_addrs = servers.split(',').collect_vec(); + let link_target_value = get_property_required(properties, PRIVATE_LINK_TARGETS_KEY)?; + let link_targets: Vec = + serde_json::from_str(link_target_value.as_str()).map_err(|e| anyhow!(e))?; if let Some(endpoint) = privatelink_endpoint { for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) { @@ -225,10 +228,6 @@ pub fn insert_privatelink_broker_rewrite_map( return Err(anyhow!("Privatelink endpoint not found.",)); } let svc = svc.unwrap(); - broker_rewrite_map = HashMap::new(); - let link_target_value = get_property_required(properties, PRIVATE_LINK_TARGETS_KEY)?; - let link_targets: Vec = - serde_json::from_str(link_target_value.as_str()).map_err(|e| anyhow!(e))?; if broker_addrs.len() != link_targets.len() { return Err(anyhow!( "The number of broker addrs {} does not match the number of private link targets {}", From 3b4cd8915f3436c3392e8e0f87e52b94730d9659 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 19 Sep 2023 16:59:56 +0800 Subject: [PATCH 4/5] check number of targets and brokers --- src/connector/src/source/kafka/private_link.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/connector/src/source/kafka/private_link.rs b/src/connector/src/source/kafka/private_link.rs index 787df9616a2b6..573e14c3e073f 100644 --- a/src/connector/src/source/kafka/private_link.rs +++ b/src/connector/src/source/kafka/private_link.rs @@ -218,6 +218,14 @@ pub fn insert_privatelink_broker_rewrite_map( let link_targets: Vec = serde_json::from_str(link_target_value.as_str()).map_err(|e| anyhow!(e))?; + if broker_addrs.len() != link_targets.len() { + return Err(anyhow!( + "The number of broker addrs {} does not match the number of private link targets {}", + broker_addrs.len(), + link_targets.len() + )); + } + if let Some(endpoint) = privatelink_endpoint { for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) { // rewrite the broker address to endpoint:port @@ -228,14 +236,6 @@ pub fn insert_privatelink_broker_rewrite_map( return Err(anyhow!("Privatelink endpoint not found.",)); } let svc = svc.unwrap(); - if broker_addrs.len() != link_targets.len() { - return Err(anyhow!( - "The number of broker addrs {} does not match the number of private link targets {}", - broker_addrs.len(), - link_targets.len() - )); - } - for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) { if svc.dns_entries.is_empty() { return Err(anyhow!( From 2cfd45015962bede2d1b7970239baa960f50043d Mon Sep 17 00:00:00 2001 From: StrikeW Date: Thu, 21 Sep 2023 11:01:20 +0800 Subject: [PATCH 5/5] the mapping already stored in the properties of source --- proto/catalog.proto | 3 --- src/frontend/src/catalog/source_catalog.rs | 3 --- src/frontend/src/handler/create_source.rs | 3 --- src/frontend/src/handler/create_table.rs | 1 - 4 files changed, 10 deletions(-) diff --git a/proto/catalog.proto b/proto/catalog.proto index 62395ab97ce3b..07aff3baee22f 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -87,9 +87,6 @@ message Source { optional uint64 initialized_at_epoch = 15; optional uint64 created_at_epoch = 16; - // Privatelink endpoint provisioned by the cloud platform or provided by the user. - optional string privatelink_endpoint = 17; - // Per-source catalog version, used by schema change. uint64 version = 100; } diff --git a/src/frontend/src/catalog/source_catalog.rs b/src/frontend/src/catalog/source_catalog.rs index f51d34d430ee5..ec35cfb7bde28 100644 --- a/src/frontend/src/catalog/source_catalog.rs +++ b/src/frontend/src/catalog/source_catalog.rs @@ -43,7 +43,6 @@ pub struct SourceCatalog { pub connection_id: Option, pub created_at_epoch: Option, pub initialized_at_epoch: Option, - pub privatelink_endpoint: Option, pub version: SourceVersionId, } @@ -73,7 +72,6 @@ impl SourceCatalog { optional_associated_table_id: self .associated_table_id .map(|id| OptionalAssociatedTableId::AssociatedTableId(id.table_id)), - privatelink_endpoint: self.privatelink_endpoint.clone(), version: self.version, } } @@ -129,7 +127,6 @@ impl From<&PbSource> for SourceCatalog { connection_id, created_at_epoch: prost.created_at_epoch.map(Epoch::from), initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from), - privatelink_endpoint: prost.privatelink_endpoint.clone(), version, } } diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index d3b13f54d35ba..448d2f49923f7 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -35,7 +35,6 @@ use risingwave_connector::source::cdc::{ }; use risingwave_connector::source::datagen::DATAGEN_CONNECTOR; use risingwave_connector::source::filesystem::S3_CONNECTOR; -use risingwave_connector::source::kafka::PRIVATELINK_ENDPOINT_KEY; use risingwave_connector::source::nexmark::source::{get_event_data_types_with_names, EventType}; use risingwave_connector::source::{ SourceEncode, SourceFormat, SourceStruct, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR, @@ -1127,7 +1126,6 @@ pub async fn handle_create_source( let columns = columns.into_iter().map(|c| c.to_protobuf()).collect_vec(); let mut with_options = WithOptions::new(with_properties); - let privatelink_endpoint = with_options.get(PRIVATELINK_ENDPOINT_KEY).cloned(); // resolve privatelink connection for Kafka source let connection_id = resolve_privatelink_in_with_option(&mut with_options, &schema_name, &session)?; @@ -1150,7 +1148,6 @@ pub async fn handle_create_source( initialized_at_epoch: None, created_at_epoch: None, optional_associated_table_id: None, - privatelink_endpoint, version: INITIAL_SOURCE_VERSION_ID, }; diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs index e5001206cfce8..73c5f65add34c 100644 --- a/src/frontend/src/handler/create_table.rs +++ b/src/frontend/src/handler/create_table.rs @@ -677,7 +677,6 @@ fn gen_table_plan_inner( optional_associated_table_id: Some(OptionalAssociatedTableId::AssociatedTableId( TableId::placeholder().table_id, )), - privatelink_endpoint: None, version: INITIAL_SOURCE_VERSION_ID, });