From 1d2cb3db022dddb6dc8e4ea11b91e03910421b43 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 18 Nov 2024 17:28:53 +0800 Subject: [PATCH] stash --- src/connector/src/connector_common/common.rs | 59 ++++----- .../src/connector_common/connection_util.rs | 125 ++++++++++++++++++ .../src/connector_common/iceberg/mod.rs | 4 - src/connector/src/connector_common/mod.rs | 29 +--- src/connector/src/macros.rs | 20 ++- src/connector/src/sink/kafka.rs | 5 +- src/connector/src/sink/redis.rs | 2 + src/connector/src/source/base.rs | 1 + .../src/source/kafka/enumerator/client.rs | 5 +- src/connector/src/source/kafka/mod.rs | 4 +- src/frontend/src/handler/create_connection.rs | 5 +- src/frontend/src/utils/with_options.rs | 29 +++- src/meta/src/error.rs | 8 ++ src/meta/src/rpc/ddl_controller.rs | 3 +- 14 files changed, 229 insertions(+), 70 deletions(-) create mode 100644 src/connector/src/connector_common/connection_util.rs diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 5b2c0200c4724..0f49ff1c60cd0 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -163,7 +163,7 @@ impl AwsAuthProps { #[serde_as] #[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)] -pub struct KafkaConnection { +pub struct KafkaConnectionInner { #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")] pub brokers: String, @@ -241,36 +241,10 @@ pub struct KafkaConnection { sasl_oathbearer_config: Option, } -impl KafkaConnection { - #[cfg(test)] - pub fn test_default() -> Self { - Self { - brokers: "localhost:9092".to_string(), - security_protocol: None, - ssl_ca_location: None, - ssl_certificate_location: None, - ssl_key_location: None, - ssl_ca_pem: None, - ssl_certificate_pem: None, - ssl_key_pem: None, - ssl_key_password: None, - ssl_endpoint_identification_algorithm: None, - sasl_mechanism: None, - sasl_username: None, - sasl_password: None, - sasl_kerberos_service_name: None, - sasl_kerberos_keytab: None, - sasl_kerberos_principal: None, - sasl_kerberos_kinit_cmd: None, - sasl_kerberos_min_time_before_relogin: None, - sasl_oathbearer_config: None, - } - } -} - #[serde_as] #[derive(Debug, Clone, Deserialize, WithOptions)] pub struct KafkaCommon { + // connection related props are moved to `KafkaConnection` #[serde(rename = "topic", alias = "kafka.topic")] pub topic: String, @@ -283,7 +257,7 @@ pub struct KafkaCommon { } #[serde_as] -#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)] +#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)] pub struct KafkaPrivateLinkCommon { /// This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users. 
#[serde(rename = "broker.rewrite.endpoints")] @@ -348,7 +322,32 @@ impl RdKafkaPropertiesCommon { } } -impl KafkaConnection { +impl KafkaConnectionInner { + #[cfg(test)] + pub fn test_default() -> Self { + Self { + brokers: "localhost:9092".to_string(), + security_protocol: None, + ssl_ca_location: None, + ssl_certificate_location: None, + ssl_key_location: None, + ssl_ca_pem: None, + ssl_certificate_pem: None, + ssl_key_pem: None, + ssl_key_password: None, + ssl_endpoint_identification_algorithm: None, + sasl_mechanism: None, + sasl_username: None, + sasl_password: None, + sasl_kerberos_service_name: None, + sasl_kerberos_keytab: None, + sasl_kerberos_principal: None, + sasl_kerberos_kinit_cmd: None, + sasl_kerberos_min_time_before_relogin: None, + sasl_oathbearer_config: None, + } + } + pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) { // AWS_MSK_IAM if self.is_aws_msk_iam() { diff --git a/src/connector/src/connector_common/connection_util.rs b/src/connector/src/connector_common/connection_util.rs new file mode 100644 index 0000000000000..75aa990d69642 --- /dev/null +++ b/src/connector/src/connector_common/connection_util.rs @@ -0,0 +1,125 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use rdkafka::consumer::{BaseConsumer, Consumer}; +use rdkafka::ClientConfig; +use risingwave_common::secret::LocalSecretManager; +use risingwave_pb::catalog::PbConnection; +use serde_derive::Deserialize; +use serde_with::serde_as; +use tonic::async_trait; +use with_options::WithOptions; + +use crate::connector_common::{AwsAuthProps, KafkaConnectionInner, KafkaPrivateLinkCommon}; +use crate::error::ConnectorResult; +use crate::source::kafka::{KafkaContextCommon, RwConsumerContext}; +use crate::{dispatch_connection_impl, ConnectionImpl}; + +#[async_trait] +pub trait ConnectionValidate { + async fn test_connection(&self) -> ConnectorResult<()>; +} + +#[serde_as] +#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)] +pub struct KafkaConnection { + #[serde(flatten)] + pub inner: KafkaConnectionInner, + #[serde(flatten)] + pub kafka_private_link_common: KafkaPrivateLinkCommon, + #[serde(flatten)] + pub aws_auth_props: AwsAuthProps, +} + +pub async fn validate_connection(connection: &PbConnection) -> ConnectorResult<()> { + if let Some(ref info) = connection.info { + match info { + risingwave_pb::catalog::connection::Info::ConnectionParams(cp) => { + let options = cp.properties.clone().into_iter().collect(); + let secret_refs = cp.secret_refs.clone().into_iter().collect(); + let props_secret_resolved = + LocalSecretManager::global().fill_secrets(options, secret_refs)?; + let connection_impl = + ConnectionImpl::from_proto(cp.connection_type(), props_secret_resolved)?; + dispatch_connection_impl!(connection_impl, inner, inner.test_connection().await?) 
+ } + risingwave_pb::catalog::connection::Info::PrivateLinkService(_) => unreachable!(), + } + } + Ok(()) +} + +#[async_trait] +impl ConnectionValidate for KafkaConnection { + async fn test_connection(&self) -> ConnectorResult<()> { + let client = self.build_client().await?; + // describe cluster here + client.fetch_metadata(None, Duration::from_secs(10)).await?; + Ok(()) + } +} + +impl KafkaConnection { + async fn build_client(&self) -> ConnectorResult> { + let mut config = ClientConfig::new(); + let bootstrap_servers = &self.inner.brokers; + let broker_rewrite_map = self.kafka_private_link_common.broker_rewrite_map.clone(); + config.set("bootstrap.servers", bootstrap_servers); + self.inner.set_security_properties(&mut config); + + // dup with Kafka Enumerator + let ctx_common = KafkaContextCommon::new( + broker_rewrite_map, + None, + None, + self.aws_auth_props.clone(), + self.inner.is_aws_msk_iam(), + ) + .await?; + let client_ctx = RwConsumerContext::new(ctx_common); + let client: BaseConsumer = + config.create_with_context(client_ctx).await?; + if self.inner.is_aws_msk_iam() { + #[cfg(not(madsim))] + client.poll(Duration::from_secs(10)); // note: this is a blocking call + #[cfg(madsim)] + client.poll(Duration::from_secs(10)).await; + } + Ok(client) + } +} + +#[serde_as] +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)] +pub struct IcebergConnection {} + +#[async_trait] +impl ConnectionValidate for IcebergConnection { + async fn test_connection(&self) -> ConnectorResult<()> { + todo!() + } +} + +#[serde_as] +#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)] +pub struct SchemaRegistryConnection {} + +#[async_trait] +impl ConnectionValidate for SchemaRegistryConnection { + async fn test_connection(&self) -> ConnectorResult<()> { + todo!() + } +} diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs index ded75ad4080a6..d10a9eefb68aa 100644 --- a/src/connector/src/connector_common/iceberg/mod.rs +++ b/src/connector/src/connector_common/iceberg/mod.rs @@ -32,10 +32,6 @@ use with_options::WithOptions; use crate::deserialize_optional_bool_from_string; use crate::error::ConnectorResult; -#[serde_as] -#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)] -pub struct IcebergConnection {} - #[serde_as] #[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)] pub struct IcebergCommon { diff --git a/src/connector/src/connector_common/mod.rs b/src/connector/src/connector_common/mod.rs index 5822658c1f104..c75361132a61d 100644 --- a/src/connector/src/connector_common/mod.rs +++ b/src/connector/src/connector_common/mod.rs @@ -19,30 +19,15 @@ pub use mqtt_common::{MqttCommon, QualityOfService as MqttQualityOfService}; mod common; pub use common::{ - AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon, + AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaConnectionInner, KafkaPrivateLinkCommon, KinesisCommon, MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon, RdKafkaPropertiesCommon, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY, }; +mod connection_util; +pub use connection_util::{ + validate_connection, ConnectionValidate, IcebergConnection, KafkaConnection, + SchemaRegistryConnection, +}; mod iceberg; -pub use iceberg::{IcebergCommon, IcebergConnection}; - -#[cfg(test)] -mod tests { - - use super::*; - use crate::error::ConnectorResult; - use crate::{dispatch_connection_impl, ConnectionImpl}; - - #[test] - fn 
test_dispatch_connection() -> ConnectorResult<()> { - let kafka_conn = KafkaConnection::test_default(); - let conn_impl = ConnectionImpl::from(kafka_conn); - - let x: Result<(), ()> = dispatch_connection_impl!(conn_impl, inner, { - println!("{:?}", inner); - Ok(()) - }); - Ok(()) - } -} +pub use iceberg::IcebergCommon; diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index a54c06819edb3..e5e4140f3a2f2 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -55,8 +55,9 @@ macro_rules! for_all_connections { ($macro:path $(, $extra_args:tt)*) => { $macro! { { - { Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType::Kafka }, - { Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType::Iceberg } + { Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType }, + { Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType }, + { SchemaRegistry, $crate::connector_common::SchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType } } $(,$extra_args)* } @@ -213,7 +214,7 @@ macro_rules! dispatch_connection_impl_inner { #[macro_export] macro_rules! impl_connection { - ({$({ $variant_name:ident, $connection:ty, $pb_connection_type:ty }),*}) => { + ({$({ $variant_name:ident, $connection:ty, $pb_connection_path:path }),*}) => { #[derive(Debug, Clone, EnumAsInner, PartialEq)] pub enum ConnectionImpl { $( @@ -240,6 +241,19 @@ macro_rules! impl_connection { } )* + + impl ConnectionImpl { + pub fn from_proto(pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType, value_secret_filled: std::collections::BTreeMap) -> $crate::error::ConnectorResult { + match pb_connection_type { + $( + <$pb_connection_path>::$variant_name => { + Ok(serde_json::from_value(json!(value_secret_filled)).map(ConnectionImpl::$variant_name).map_err($crate::error::ConnectorError::from)?) 
+ }, + )* + risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(), + } + } + } } } diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 9fc8da7ef7a48..279d28914a2fa 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -35,7 +35,8 @@ use with_options::WithOptions; use super::catalog::{SinkFormat, SinkFormatDesc}; use super::{Sink, SinkError, SinkParam}; use crate::connector_common::{ - AwsAuthProps, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon, + AwsAuthProps, KafkaCommon, KafkaConnectionInner, KafkaPrivateLinkCommon, + RdKafkaPropertiesCommon, }; use crate::sink::formatter::SinkFormatterImpl; use crate::sink::log_store::DeliveryFutureManagerAddFuture; @@ -215,7 +216,7 @@ pub struct KafkaConfig { pub common: KafkaCommon, #[serde(flatten)] - pub connection: KafkaConnection, + pub connection: KafkaConnectionInner, #[serde( rename = "properties.retry.max", diff --git a/src/connector/src/sink/redis.rs b/src/connector/src/sink/redis.rs index 763d7e9bba49a..473b3ef7f70dc 100644 --- a/src/connector/src/sink/redis.rs +++ b/src/connector/src/sink/redis.rs @@ -412,6 +412,7 @@ mod test { options: BTreeMap::default(), secret_refs: BTreeMap::default(), key_encode: None, + connection_id: None, }; let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc) @@ -490,6 +491,7 @@ mod test { options: btree_map, secret_refs: Default::default(), key_encode: None, + connection_id: None, }; let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc) diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 1efa03d41ff7e..bcf8ab4b3f027 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -32,6 +32,7 @@ use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo}; use risingwave_pb::plan_common::ExternalTableDesc; use risingwave_pb::source::ConnectorSplit; use serde::de::DeserializeOwned; +use serde_json::json; use tokio::sync::mpsc; use super::cdc::DebeziumCdcMeta; diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 1d7525bc7a613..df7cd787e061f 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -31,13 +31,14 @@ use crate::error::{ConnectorError, ConnectorResult}; use crate::source::base::SplitEnumerator; use crate::source::kafka::split::KafkaSplit; use crate::source::kafka::{ - KafkaConnection, KafkaContextCommon, KafkaProperties, RwConsumerContext, KAFKA_ISOLATION_LEVEL, + KafkaConnectionInner, KafkaContextCommon, KafkaProperties, RwConsumerContext, + KAFKA_ISOLATION_LEVEL, }; use crate::source::SourceEnumeratorContextRef; type KafkaClientType = BaseConsumer; -pub static SHARED_KAFKA_CLIENT: LazyLock>> = +pub static SHARED_KAFKA_CLIENT: LazyLock>> = LazyLock::new(|| moka::future::Cache::builder().build()); #[derive(Debug, Copy, Clone, Eq, PartialEq)] diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index 030c190eb4942..34a165a5c29e3 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; -use crate::connector_common::{AwsAuthProps, KafkaConnection, KafkaPrivateLinkCommon}; +use crate::connector_common::{AwsAuthProps, KafkaConnectionInner, 
KafkaPrivateLinkCommon}; mod client_context; pub mod enumerator; @@ -144,7 +144,7 @@ pub struct KafkaProperties { pub common: KafkaCommon, #[serde(flatten)] - pub connection: KafkaConnection, + pub connection: KafkaConnectionInner, #[serde(flatten)] pub rdkafka_properties_common: RdKafkaPropertiesCommon, diff --git a/src/frontend/src/handler/create_connection.rs b/src/frontend/src/handler/create_connection.rs index cbfe6979a9ef4..61ab8798427e1 100644 --- a/src/frontend/src/handler/create_connection.rs +++ b/src/frontend/src/handler/create_connection.rs @@ -28,7 +28,7 @@ use crate::error::ErrorCode::ProtocolError; use crate::error::{ErrorCode, Result, RwError}; use crate::handler::HandlerArgs; use crate::session::SessionImpl; -use crate::utils::resolve_secret_ref_in_with_options; +use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options}; use crate::WithOptions; pub(crate) const CONNECTION_TYPE_PROP: &str = "type"; @@ -103,7 +103,8 @@ pub async fn handle_create_connection( }; } let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?; - let with_properties = handler_args.with_options.clone().into_connector_props(); + let mut with_properties = handler_args.with_options.clone().into_connector_props(); + resolve_privatelink_in_with_option(&mut with_properties)?; let create_connection_payload = resolve_create_connection_payload(with_properties, &session)?; let catalog_writer = session.catalog_writer()?; diff --git a/src/frontend/src/utils/with_options.rs b/src/frontend/src/utils/with_options.rs index 27bd68c5c8e6b..01dcc4eab6557 100644 --- a/src/frontend/src/utils/with_options.rs +++ b/src/frontend/src/utils/with_options.rs @@ -16,6 +16,9 @@ use std::collections::BTreeMap; use std::num::NonZeroU32; use risingwave_common::catalog::ConnectionId; +use risingwave_connector::connector_common::{ + PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY, +}; use risingwave_connector::source::kafka::private_link::{ insert_privatelink_broker_rewrite_map, PRIVATELINK_ENDPOINT_KEY, }; @@ -234,8 +237,30 @@ pub(crate) fn resolve_connection_ref_and_secret_ref( }; let mut connection_type = PbConnectionType::Unspecified; - let connection_params_none_flag = connection_params.is_none(); + let connection_params_is_none_flag = connection_params.is_none(); + if let Some(connection_params) = connection_params { + // Do key checks on `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY`, `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY` + // `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is generated from `private_link_targets` and `private_link_endpoint`, instead of given by users. + // + // We resolve private link via `resolve_privatelink_in_with_option` when creating Connection, + // so here we need to check `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY` are not given + // if `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is in Connection catalog. 
+ + if let Some(broker_rewrite_map) = connection_params + .get_properties() + .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY) + { + if options.contains_key(PRIVATE_LINK_TARGETS_KEY) + || options.contains_key(PRIVATELINK_ENDPOINT_KEY) + { + return Err(RwError::from(ErrorCode::InvalidParameterValue(format!( + "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause", + broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY + )))); + } + } + connection_type = connection_params.connection_type(); for (k, v) in connection_params.properties { if options.insert(k.clone(), v).is_some() { @@ -256,7 +281,7 @@ pub(crate) fn resolve_connection_ref_and_secret_ref( } } debug_assert!( - matches!(connection_type, PbConnectionType::Unspecified) && connection_params_none_flag + matches!(connection_type, PbConnectionType::Unspecified) && connection_params_is_none_flag ); Ok(( diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index f1c3bb0ffdd8a..b73d96d827b3b 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use risingwave_common::error::BoxedError; +use risingwave_common::secret::SecretError; use risingwave_common::session_config::SessionConfigError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; @@ -132,6 +133,13 @@ pub enum MetaErrorInner { #[error("{0} has been deprecated, please use {1} instead.")] Deprecated(String, String), + + #[error("Secret error: {0}")] + SecretError( + #[from] + #[backtrace] + SecretError, + ), } impl MetaError { diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index f364ab95ecb68..15a2f978b8bbe 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -30,6 +30,7 @@ use risingwave_common::util::stream_graph_visitor::{ visit_stream_node, visit_stream_node_cont_mut, }; use risingwave_common::{bail, hash, must_match}; +use risingwave_connector::connector_common::validate_connection; use risingwave_connector::error::ConnectorError; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceProperties, SplitEnumerator, @@ -496,7 +497,7 @@ impl DdlController { } async fn create_connection(&self, connection: Connection) -> MetaResult { - // todo: do validation here + validate_connection(&connection).await?; self.metadata_manager .catalog_controller .create_connection(connection)
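
Editor's note on the validation flow this patch introduces: meta's `create_connection` now calls `validate_connection`, which resolves secret references, rebuilds a concrete `ConnectionImpl` variant from the secret-filled properties via `ConnectionImpl::from_proto`, and dispatches a single `ConnectionValidate::test_connection` call through `dispatch_connection_impl!`. The sketch below mirrors that shape using only std types so it stands alone; `KafkaConnectionSketch`, `ConnectionImplSketch`, and `SketchResult` are illustrative stand-ins and not names from the patch, and the real code is async and returns `ConnectorResult<()>`.

use std::collections::BTreeMap;

// Stand-in for `ConnectorResult<()>`; the real trait method is async.
type SketchResult = Result<(), String>;

trait ConnectionValidate {
    fn test_connection(&self) -> SketchResult;
}

#[derive(Debug)]
struct KafkaConnectionSketch {
    brokers: String,
}

impl ConnectionValidate for KafkaConnectionSketch {
    fn test_connection(&self) -> SketchResult {
        // The real Kafka validator builds a consumer and fetches cluster
        // metadata; here we only check that the required property is present.
        if self.brokers.is_empty() {
            return Err("missing properties.bootstrap.server".into());
        }
        Ok(())
    }
}

enum ConnectionImplSketch {
    Kafka(KafkaConnectionSketch),
}

impl ConnectionImplSketch {
    // Mirrors `ConnectionImpl::from_proto`: pick the variant from the
    // connection type and build it from the secret-resolved properties.
    fn from_props(
        connection_type: &str,
        props: &BTreeMap<String, String>,
    ) -> Result<Self, String> {
        match connection_type {
            "kafka" => Ok(Self::Kafka(KafkaConnectionSketch {
                brokers: props
                    .get("properties.bootstrap.server")
                    .cloned()
                    .unwrap_or_default(),
            })),
            other => Err(format!("unsupported connection type: {other}")),
        }
    }

    // Mirrors `dispatch_connection_impl!`: every variant validates itself.
    fn test_connection(&self) -> SketchResult {
        match self {
            Self::Kafka(inner) => inner.test_connection(),
        }
    }
}

fn main() {
    let props = BTreeMap::from([(
        "properties.bootstrap.server".to_string(),
        "localhost:9092".to_string(),
    )]);
    let conn = ConnectionImplSketch::from_props("kafka", &props).expect("known type");
    assert!(conn.test_connection().is_ok());
}

With this shape, wiring up validation for another connection kind (e.g. the `IcebergConnection` and `SchemaRegistryConnection` stubs left as `todo!()` in the patch) only requires implementing the trait and registering the variant; in the real crate the enum and dispatch arms are generated by `for_all_connections!` / `impl_connection!` rather than written by hand as in this sketch.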