Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support privatelink.endpoint option in the WITH clause for Kafka connector #12266

Merged
merged 8 commits into from
Sep 22, 2023
45 changes: 30 additions & 15 deletions src/connector/src/source/kafka/private_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ use crate::source::kafka::stats::RdKafkaStats;
use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};
use crate::source::KAFKA_CONNECTOR;

/// WITH-clause key for a user-supplied private link endpoint; when present, Kafka
/// broker addresses are rewritten directly to `<endpoint>:<port>` instead of
/// resolving a catalog connection.
pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint";
/// WITH-clause key naming a pre-created connection object.
// NOTE(review): read via `get_connection_name` in the frontend — confirm at call site.
pub const CONNECTION_NAME_KEY: &str = "connection.name";

#[derive(Debug)]
enum PrivateLinkContextRole {
Consumer,
Expand Down Expand Up @@ -204,16 +207,17 @@ fn is_kafka_connector(with_properties: &BTreeMap<String, String>) -> bool {
}

pub fn insert_privatelink_broker_rewrite_map(
svc: &PrivateLinkService,
properties: &mut BTreeMap<String, String>,
svc: Option<&PrivateLinkService>,
privatelink_endpoint: Option<String>,
) -> anyhow::Result<()> {
let mut broker_rewrite_map = HashMap::new();

let link_target_value = get_property_required(properties, PRIVATE_LINK_TARGETS_KEY)?;
let servers = get_property_required(properties, kafka_props_broker_key(properties))?;
let broker_addrs = servers.split(',').collect_vec();
let link_target_value = get_property_required(properties, PRIVATE_LINK_TARGETS_KEY)?;
let link_targets: Vec<AwsPrivateLinkItem> =
serde_json::from_str(link_target_value.as_str()).map_err(|e| anyhow!(e))?;

if broker_addrs.len() != link_targets.len() {
return Err(anyhow!(
"The number of broker addrs {} does not match the number of private link targets {}",
Expand All @@ -222,19 +226,30 @@ pub fn insert_privatelink_broker_rewrite_map(
));
}

for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) {
if svc.dns_entries.is_empty() {
return Err(anyhow!(
"No available private link endpoints for Kafka broker {}",
broker
));
if let Some(endpoint) = privatelink_endpoint {
for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) {
// rewrite the broker address to endpoint:port
broker_rewrite_map.insert(broker.to_string(), format!("{}:{}", &endpoint, link.port));
}
} else {
if svc.is_none() {
return Err(anyhow!("Privatelink endpoint not found.",));
}
let svc = svc.unwrap();
for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) {
if svc.dns_entries.is_empty() {
return Err(anyhow!(
"No available private link endpoints for Kafka broker {}",
broker
));
}
// rewrite the broker address to the dns name w/o az
// requires the NLB has enabled the cross-zone load balancing
broker_rewrite_map.insert(
broker.to_string(),
format!("{}:{}", &svc.endpoint_dns_name, link.port),
);
}
// rewrite the broker address to the dns name w/o az
// requires the NLB has enabled the cross-zone load balancing
broker_rewrite_map.insert(
broker.to_string(),
format!("{}:{}", &svc.endpoint_dns_name, link.port),
);
}

// save private link dns names into source properties, which
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/catalog/connection_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ pub(crate) fn resolve_private_link_connection(
if svc.get_provider()? == PrivateLinkProvider::Mock {
return Ok(());
}
insert_privatelink_broker_rewrite_map(svc, properties).map_err(RwError::from)?;
insert_privatelink_broker_rewrite_map(properties, Some(svc), None)
.map_err(RwError::from)?;
}
Ok(())
}
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationC
use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
use crate::session::SessionImpl;
use crate::stream_fragmenter::build_graph;
use crate::utils::resolve_connection_in_with_option;
use crate::utils::resolve_privatelink_in_with_option;
use crate::Planner;

pub fn gen_sink_query_from_name(from_name: ObjectName) -> Result<Query> {
Expand Down Expand Up @@ -106,7 +106,7 @@ pub fn gen_sink_plan(
let mut with_options = context.with_options().clone();
let connection_id = {
let conn_id =
resolve_connection_in_with_option(&mut with_options, &sink_schema_name, session)?;
resolve_privatelink_in_with_option(&mut with_options, &sink_schema_name, session)?;
conn_id.map(ConnectionId)
};

Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use crate::handler::create_table::{
use crate::handler::util::{get_connector, is_kafka_connector};
use crate::handler::HandlerArgs;
use crate::session::SessionImpl;
use crate::utils::resolve_connection_in_with_option;
use crate::utils::resolve_privatelink_in_with_option;
use crate::{bind_data_type, WithOptions};

pub(crate) const UPSTREAM_SOURCE_KEY: &str = "connector";
Expand Down Expand Up @@ -1125,10 +1125,10 @@ pub async fn handle_create_source(

let columns = columns.into_iter().map(|c| c.to_protobuf()).collect_vec();

// resolve privatelink connection for Kafka source
let mut with_options = WithOptions::new(with_properties);
// resolve privatelink connection for Kafka source
let connection_id =
resolve_connection_in_with_option(&mut with_options, &schema_name, &session)?;
resolve_privatelink_in_with_option(&mut with_options, &schema_name, &session)?;
let definition = handler_args.normalized_sql;

let source = PbSource {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::optimizer::property::{Order, RequiredDist};
use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, PlanRoot};
use crate::session::{CheckRelationError, SessionImpl};
use crate::stream_fragmenter::build_graph;
use crate::utils::resolve_connection_in_with_option;
use crate::utils::resolve_privatelink_in_with_option;
use crate::{Binder, TableCatalog, WithOptions};

/// Column ID generator for a new table or a new version of an existing table to alter.
Expand Down Expand Up @@ -653,7 +653,7 @@ fn gen_table_plan_inner(
// resolve privatelink connection for Table backed by Kafka source
let mut with_options = WithOptions::new(properties);
let connection_id =
resolve_connection_in_with_option(&mut with_options, &schema_name, &session)?;
resolve_privatelink_in_with_option(&mut with_options, &schema_name, &session)?;

let source = source_info.map(|source_info| PbSource {
id: TableId::placeholder().table_id,
Expand Down
21 changes: 19 additions & 2 deletions src/frontend/src/utils/with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ use std::num::NonZeroU32;

use itertools::Itertools;
use risingwave_common::error::{ErrorCode, Result as RwResult, RwError};
use risingwave_connector::source::kafka::{
insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY,
};
use risingwave_connector::source::KAFKA_CONNECTOR;
use risingwave_sqlparser::ast::{
CompatibleSourceSchema, CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement,
Expand Down Expand Up @@ -122,13 +125,27 @@ fn is_kafka_connector(with_options: &WithOptions) -> bool {
connector == KAFKA_CONNECTOR
}

pub(crate) fn resolve_connection_in_with_option(
pub(crate) fn resolve_privatelink_in_with_option(
with_options: &mut WithOptions,
schema_name: &Option<String>,
session: &SessionImpl,
) -> RwResult<Option<ConnectionId>> {
let connection_name = get_connection_name(with_options);
let is_kafka = is_kafka_connector(with_options);
let privatelink_endpoint = with_options.get(PRIVATELINK_ENDPOINT_KEY).cloned();

// if `privatelink.endpoint` is provided in WITH, use it to rewrite broker address directly
if let Some(endpoint) = privatelink_endpoint {
if !is_kafka {
return Err(RwError::from(ErrorCode::ProtocolError(
"Privatelink is only supported in kafka connector".to_string(),
)));
}
insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
.map_err(RwError::from)?;
return Ok(None);
}

let connection_name = get_connection_name(with_options);
let connection_id = match connection_name {
Some(connection_name) => {
let connection = session
Expand Down
4 changes: 3 additions & 1 deletion src/meta/src/rpc/service/cloud_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ impl CloudService for CloudServiceImpl {
}
_ => (),
};
if let Err(e) = insert_privatelink_broker_rewrite_map(&service, &mut source_cfg) {
if let Err(e) =
insert_privatelink_broker_rewrite_map(&mut source_cfg, Some(&service), None)
{
return Ok(new_rwc_validate_fail_response(
ErrorType::PrivatelinkResolveErr,
e.to_string(),
Expand Down
Loading