Commit stash

tabversion committed Nov 18, 2024
1 parent aaa6a34 commit 1d2cb3d
Showing 14 changed files with 229 additions and 70 deletions.
59 changes: 29 additions & 30 deletions src/connector/src/connector_common/common.rs
@@ -163,7 +163,7 @@ impl AwsAuthProps {

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
pub struct KafkaConnection {
pub struct KafkaConnectionInner {
#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
pub brokers: String,

@@ -241,36 +241,10 @@ pub struct KafkaConnection {
sasl_oathbearer_config: Option<String>,
}

impl KafkaConnection {
#[cfg(test)]
pub fn test_default() -> Self {
Self {
brokers: "localhost:9092".to_string(),
security_protocol: None,
ssl_ca_location: None,
ssl_certificate_location: None,
ssl_key_location: None,
ssl_ca_pem: None,
ssl_certificate_pem: None,
ssl_key_pem: None,
ssl_key_password: None,
ssl_endpoint_identification_algorithm: None,
sasl_mechanism: None,
sasl_username: None,
sasl_password: None,
sasl_kerberos_service_name: None,
sasl_kerberos_keytab: None,
sasl_kerberos_principal: None,
sasl_kerberos_kinit_cmd: None,
sasl_kerberos_min_time_before_relogin: None,
sasl_oathbearer_config: None,
}
}
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaCommon {
// Connection-related props have been moved to `KafkaConnectionInner`.
#[serde(rename = "topic", alias = "kafka.topic")]
pub topic: String,

@@ -283,7 +257,7 @@ pub struct KafkaCommon {
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
pub struct KafkaPrivateLinkCommon {
/// This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users.
#[serde(rename = "broker.rewrite.endpoints")]
@@ -348,7 +322,32 @@ impl RdKafkaPropertiesCommon {
}
}

impl KafkaConnection {
impl KafkaConnectionInner {
#[cfg(test)]
pub fn test_default() -> Self {
Self {
brokers: "localhost:9092".to_string(),
security_protocol: None,
ssl_ca_location: None,
ssl_certificate_location: None,
ssl_key_location: None,
ssl_ca_pem: None,
ssl_certificate_pem: None,
ssl_key_pem: None,
ssl_key_password: None,
ssl_endpoint_identification_algorithm: None,
sasl_mechanism: None,
sasl_username: None,
sasl_password: None,
sasl_kerberos_service_name: None,
sasl_kerberos_keytab: None,
sasl_kerberos_principal: None,
sasl_kerberos_kinit_cmd: None,
sasl_kerberos_min_time_before_relogin: None,
sasl_oathbearer_config: None,
}
}

pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) {
// AWS_MSK_IAM
if self.is_aws_msk_iam() {
125 changes: 125 additions & 0 deletions src/connector/src/connector_common/connection_util.rs
@@ -0,0 +1,125 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::ClientConfig;
use risingwave_common::secret::LocalSecretManager;
use risingwave_pb::catalog::PbConnection;
use serde_derive::Deserialize;
use serde_with::serde_as;
use tonic::async_trait;
use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, KafkaConnectionInner, KafkaPrivateLinkCommon};
use crate::error::ConnectorResult;
use crate::source::kafka::{KafkaContextCommon, RwConsumerContext};
use crate::{dispatch_connection_impl, ConnectionImpl};

#[async_trait]
pub trait ConnectionValidate {
async fn test_connection(&self) -> ConnectorResult<()>;
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)]
pub struct KafkaConnection {
#[serde(flatten)]
pub inner: KafkaConnectionInner,
#[serde(flatten)]
pub kafka_private_link_common: KafkaPrivateLinkCommon,
#[serde(flatten)]
pub aws_auth_props: AwsAuthProps,
}
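
Because every field is `#[serde(flatten)]`-ed, this struct deserializes directly from a flat map of resolved properties — the same route `ConnectionImpl::from_proto` takes below via `serde_json`. A minimal sketch, assuming the usual `properties.*` serde renames and that every omitted field is optional (values are illustrative):

// Sketch: build a KafkaConnection from resolved WITH options.
let props = serde_json::json!({
    "properties.bootstrap.server": "broker1:9092,broker2:9092",
    "properties.security.protocol": "SASL_PLAINTEXT",
});
let conn: KafkaConnection = serde_json::from_value(props)?;
assert_eq!(conn.inner.brokers, "broker1:9092,broker2:9092");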

pub async fn validate_connection(connection: &PbConnection) -> ConnectorResult<()> {
if let Some(ref info) = connection.info {
match info {
risingwave_pb::catalog::connection::Info::ConnectionParams(cp) => {
let options = cp.properties.clone().into_iter().collect();
let secret_refs = cp.secret_refs.clone().into_iter().collect();
let props_secret_resolved =
LocalSecretManager::global().fill_secrets(options, secret_refs)?;
let connection_impl =
ConnectionImpl::from_proto(cp.connection_type(), props_secret_resolved)?;
dispatch_connection_impl!(connection_impl, inner, inner.test_connection().await?)
}
risingwave_pb::catalog::connection::Info::PrivateLinkService(_) => unreachable!(),
}
}
Ok(())
}
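
The `dispatch_connection_impl!` invocation above fans out over every variant registered in `for_all_connections!`; hand-expanded, it is roughly the following match (a sketch, not the actual macro output):

match connection_impl {
    ConnectionImpl::Kafka(inner) => inner.test_connection().await?,
    ConnectionImpl::Iceberg(inner) => inner.test_connection().await?,
    ConnectionImpl::SchemaRegistry(inner) => inner.test_connection().await?,
}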

#[async_trait]
impl ConnectionValidate for KafkaConnection {
async fn test_connection(&self) -> ConnectorResult<()> {
let client = self.build_client().await?;
// Describe the cluster: fetch metadata to verify the brokers are reachable.
client.fetch_metadata(None, Duration::from_secs(10)).await?;
Ok(())
}
}

impl KafkaConnection {
async fn build_client(&self) -> ConnectorResult<BaseConsumer<RwConsumerContext>> {
let mut config = ClientConfig::new();
let bootstrap_servers = &self.inner.brokers;
let broker_rewrite_map = self.kafka_private_link_common.broker_rewrite_map.clone();
config.set("bootstrap.servers", bootstrap_servers);
self.inner.set_security_properties(&mut config);

// Duplicated with the Kafka enumerator's client construction.
let ctx_common = KafkaContextCommon::new(
broker_rewrite_map,
None,
None,
self.aws_auth_props.clone(),
self.inner.is_aws_msk_iam(),
)
.await?;
let client_ctx = RwConsumerContext::new(ctx_common);
let client: BaseConsumer<RwConsumerContext> =
config.create_with_context(client_ctx).await?;
if self.inner.is_aws_msk_iam() {
#[cfg(not(madsim))]
client.poll(Duration::from_secs(10)); // note: this is a blocking call
#[cfg(madsim)]
client.poll(Duration::from_secs(10)).await;
}
Ok(client)
}
}

#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConnection {}

#[async_trait]
impl ConnectionValidate for IcebergConnection {
async fn test_connection(&self) -> ConnectorResult<()> {
todo!()
}
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
pub struct SchemaRegistryConnection {}

#[async_trait]
impl ConnectionValidate for SchemaRegistryConnection {
async fn test_connection(&self) -> ConnectorResult<()> {
todo!()
}
}
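
Both stubs are left as `todo!()` in this commit. Purely for illustration, a future schema-registry check could probe the standard Confluent `GET /subjects` endpoint — the `url` parameter and the `reqwest` dependency here are assumptions, not part of this change:

// Hypothetical sketch; assumes the struct later gains a registry URL and
// that `ConnectorError` converts from `anyhow::Error`.
async fn probe_schema_registry(url: &str) -> ConnectorResult<()> {
    let resp = reqwest::get(format!("{}/subjects", url))
        .await
        .map_err(|e| anyhow::anyhow!(e))?;
    // A reachable but unhealthy registry surfaces as a non-2xx status.
    resp.error_for_status().map_err(|e| anyhow::anyhow!(e))?;
    Ok(())
}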
4 changes: 0 additions & 4 deletions src/connector/src/connector_common/iceberg/mod.rs
@@ -32,10 +32,6 @@ use with_options::WithOptions;
use crate::deserialize_optional_bool_from_string;
use crate::error::ConnectorResult;

#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergConnection {}

#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)]
pub struct IcebergCommon {
29 changes: 7 additions & 22 deletions src/connector/src/connector_common/mod.rs
@@ -19,30 +19,15 @@ pub use mqtt_common::{MqttCommon, QualityOfService as MqttQualityOfService};

mod common;
pub use common::{
AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon,
AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaConnectionInner, KafkaPrivateLinkCommon,
KinesisCommon, MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon,
RdKafkaPropertiesCommon, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
};
mod connection_util;
pub use connection_util::{
validate_connection, ConnectionValidate, IcebergConnection, KafkaConnection,
SchemaRegistryConnection,
};

mod iceberg;
pub use iceberg::{IcebergCommon, IcebergConnection};

#[cfg(test)]
mod tests {

use super::*;
use crate::error::ConnectorResult;
use crate::{dispatch_connection_impl, ConnectionImpl};

#[test]
fn test_dispatch_connection() -> ConnectorResult<()> {
let kafka_conn = KafkaConnection::test_default();
let conn_impl = ConnectionImpl::from(kafka_conn);

let x: Result<(), ()> = dispatch_connection_impl!(conn_impl, inner, {
println!("{:?}", inner);
Ok(())
});
Ok(())
}
}
pub use iceberg::IcebergCommon;
20 changes: 17 additions & 3 deletions src/connector/src/macros.rs
@@ -55,8 +55,9 @@ macro_rules! for_all_connections {
($macro:path $(, $extra_args:tt)*) => {
$macro! {
{
{ Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType::Kafka },
{ Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType::Iceberg }
{ Kafka, $crate::connector_common::KafkaConnection, risingwave_pb::catalog::connection_params::PbConnectionType },
{ Iceberg, $crate::connector_common::IcebergConnection, risingwave_pb::catalog::connection_params::PbConnectionType },
{ SchemaRegistry, $crate::connector_common::SchemaRegistryConnection, risingwave_pb::catalog::connection_params::PbConnectionType }
}
$(,$extra_args)*
}
@@ -213,7 +214,7 @@

#[macro_export]
macro_rules! impl_connection {
({$({ $variant_name:ident, $connection:ty, $pb_connection_type:ty }),*}) => {
({$({ $variant_name:ident, $connection:ty, $pb_connection_path:path }),*}) => {
#[derive(Debug, Clone, EnumAsInner, PartialEq)]
pub enum ConnectionImpl {
$(
Expand All @@ -240,6 +241,19 @@ macro_rules! impl_connection {
}

)*

impl ConnectionImpl {
pub fn from_proto(pb_connection_type: risingwave_pb::catalog::connection_params::PbConnectionType, value_secret_filled: std::collections::BTreeMap<String, String>) -> $crate::error::ConnectorResult<Self> {
match pb_connection_type {
$(
<$pb_connection_path>::$variant_name => {
Ok(serde_json::from_value(json!(value_secret_filled)).map(ConnectionImpl::$variant_name).map_err($crate::error::ConnectorError::from)?)
},
)*
risingwave_pb::catalog::connection_params::PbConnectionType::Unspecified => unreachable!(),
}
}
}
}
}
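
A hedged usage sketch of the generated constructor (keys must match the target struct's serde renames, since `from_proto` round-trips the map through `serde_json`; values are illustrative):

use std::collections::BTreeMap;
use risingwave_pb::catalog::connection_params::PbConnectionType;

fn example() -> crate::error::ConnectorResult<ConnectionImpl> {
    let mut props = BTreeMap::new();
    props.insert(
        "properties.bootstrap.server".to_string(),
        "broker1:9092".to_string(),
    );
    // Selects the Kafka variant and deserializes the map into KafkaConnection.
    ConnectionImpl::from_proto(PbConnectionType::Kafka, props)
}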

5 changes: 3 additions & 2 deletions src/connector/src/sink/kafka.rs
@@ -35,7 +35,8 @@ use with_options::WithOptions;
use super::catalog::{SinkFormat, SinkFormatDesc};
use super::{Sink, SinkError, SinkParam};
use crate::connector_common::{
AwsAuthProps, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon,
AwsAuthProps, KafkaCommon, KafkaConnectionInner, KafkaPrivateLinkCommon,
RdKafkaPropertiesCommon,
};
use crate::sink::formatter::SinkFormatterImpl;
use crate::sink::log_store::DeliveryFutureManagerAddFuture;
@@ -215,7 +216,7 @@ pub struct KafkaConfig {
pub common: KafkaCommon,

#[serde(flatten)]
pub connection: KafkaConnection,
pub connection: KafkaConnectionInner,

#[serde(
rename = "properties.retry.max",
2 changes: 2 additions & 0 deletions src/connector/src/sink/redis.rs
@@ -412,6 +412,7 @@ mod test {
options: BTreeMap::default(),
secret_refs: BTreeMap::default(),
key_encode: None,
connection_id: None,
};

let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc)
@@ -490,6 +491,7 @@
options: btree_map,
secret_refs: Default::default(),
key_encode: None,
connection_id: None,
};

let mut redis_sink_writer = RedisSinkWriter::mock(schema, vec![0], &format_desc)
1 change: 1 addition & 0 deletions src/connector/src/source/base.rs
@@ -32,6 +32,7 @@ use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;

use super::cdc::DebeziumCdcMeta;
5 changes: 3 additions & 2 deletions src/connector/src/source/kafka/enumerator/client.rs
@@ -31,13 +31,14 @@ use crate::error::{ConnectorError, ConnectorResult};
use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{
KafkaConnection, KafkaContextCommon, KafkaProperties, RwConsumerContext, KAFKA_ISOLATION_LEVEL,
KafkaConnectionInner, KafkaContextCommon, KafkaProperties, RwConsumerContext,
KAFKA_ISOLATION_LEVEL,
};
use crate::source::SourceEnumeratorContextRef;

type KafkaClientType = BaseConsumer<RwConsumerContext>;

pub static SHARED_KAFKA_CLIENT: LazyLock<MokaCache<KafkaConnection, Weak<KafkaClientType>>> =
pub static SHARED_KAFKA_CLIENT: LazyLock<MokaCache<KafkaConnectionInner, Weak<KafkaClientType>>> =
LazyLock::new(|| moka::future::Cache::builder().build());
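
This cache is the reason `KafkaConnectionInner` now derives `Hash` and `Eq` (see common.rs above): the connection config itself is the cache key, and storing `Weak` handles lets a client drop once no holder keeps a strong `Arc`. A get-or-create sketch, assuming moka 0.12's async `get`/`insert` (the helper name is hypothetical):

async fn get_or_build_client(
    conn: &KafkaConnectionInner,
    build: impl std::future::Future<Output = ConnectorResult<KafkaClientType>>,
) -> ConnectorResult<Arc<KafkaClientType>> {
    // Reuse a live client if some holder still owns a strong Arc.
    if let Some(weak) = SHARED_KAFKA_CLIENT.get(conn).await {
        if let Some(client) = weak.upgrade() {
            return Ok(client);
        }
    }
    // Otherwise build a fresh client and cache a weak handle to it.
    let client = Arc::new(build.await?);
    SHARED_KAFKA_CLIENT
        .insert(conn.clone(), Arc::downgrade(&client))
        .await;
    Ok(client)
}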

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
4 changes: 2 additions & 2 deletions src/connector/src/source/kafka/mod.rs
@@ -17,7 +17,7 @@ use std::collections::HashMap;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};

use crate::connector_common::{AwsAuthProps, KafkaConnection, KafkaPrivateLinkCommon};
use crate::connector_common::{AwsAuthProps, KafkaConnectionInner, KafkaPrivateLinkCommon};

mod client_context;
pub mod enumerator;
@@ -144,7 +144,7 @@ pub struct KafkaProperties {
pub common: KafkaCommon,

#[serde(flatten)]
pub connection: KafkaConnection,
pub connection: KafkaConnectionInner,

#[serde(flatten)]
pub rdkafka_properties_common: RdKafkaPropertiesCommon,
