From b9b2f79c30f628b1aae7e4422fcd4f7d9d949cb0 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 22 Oct 2024 21:37:18 +0800 Subject: [PATCH 01/25] share kafka client on meta Signed-off-by: tabVersion --- .../src/source/kafka/enumerator/client.rs | 81 +++++++++++++------ src/meta/src/stream/source_manager.rs | 27 +++++++ 2 files changed, 83 insertions(+), 25 deletions(-) diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index a425de418ef4..67b1dd374886 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::{Arc, LazyLock}; use std::time::Duration; use anyhow::{anyhow, Context}; @@ -32,6 +33,16 @@ use crate::source::kafka::{ }; use crate::source::SourceEnumeratorContextRef; +pub static SHARED_KAFKA_CLIENT: LazyLock>> = + LazyLock::new(|| tokio::sync::Mutex::new(HashMap::new())); + +#[derive(Clone)] +pub struct SharedKafkaItem { + pub client: Arc>, + pub ref_count: i32, + pub props: KafkaProperties, +} + #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum KafkaEnumeratorOffset { Earliest, @@ -44,7 +55,7 @@ pub struct KafkaSplitEnumerator { context: SourceEnumeratorContextRef, broker_address: String, topic: String, - client: BaseConsumer, + client: Arc>, start_offset: KafkaEnumeratorOffset, // maybe used in the future for batch processing @@ -94,36 +105,56 @@ impl SplitEnumerator for KafkaSplitEnumerator { scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset) } - // don't need kafka metrics from enumerator - let ctx_common = KafkaContextCommon::new( - broker_rewrite_map, - None, - None, - properties.aws_auth_props, - common_props.is_aws_msk_iam(), - ) - .await?; - let client_ctx = RwConsumerContext::new(ctx_common); - let client: BaseConsumer = - config.create_with_context(client_ctx).await?; - - // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call - // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either - // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval - // of an initial token to occur. - // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf - if common_props.is_aws_msk_iam() { - #[cfg(not(madsim))] - client.poll(Duration::from_secs(10)); // note: this is a blocking call - #[cfg(madsim)] - client.poll(Duration::from_secs(10)).await; + let kafka_client: Arc>; + if let Some(item) = SHARED_KAFKA_CLIENT + .lock() + .await + .get_mut(broker_address.as_str()) + { + kafka_client = item.client.clone(); + item.ref_count += 1; + } else { + // don't need kafka metrics from enumerator + let ctx_common = KafkaContextCommon::new( + broker_rewrite_map, + None, + None, + properties.aws_auth_props.clone(), + common_props.is_aws_msk_iam(), + ) + .await?; + let client_ctx = RwConsumerContext::new(ctx_common); + let client: BaseConsumer = + config.create_with_context(client_ctx).await?; + + // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call + // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either + // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval + // of an initial token to occur. 
+ // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf + if common_props.is_aws_msk_iam() { + #[cfg(not(madsim))] + client.poll(Duration::from_secs(10)); // note: this is a blocking call + #[cfg(madsim)] + client.poll(Duration::from_secs(10)).await; + } + + kafka_client = Arc::new(client); + SHARED_KAFKA_CLIENT.lock().await.insert( + broker_address.clone(), + SharedKafkaItem { + client: kafka_client.clone(), + ref_count: 1, + props: properties.clone(), + }, + ); } Ok(Self { context, broker_address, topic, - client, + client: kafka_client, start_offset: scan_start_offset, stop_offset: KafkaEnumeratorOffset::None, sync_call_timeout: properties.common.sync_call_timeout, diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index c5bcc0c179ba..6f4b81099799 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -24,6 +24,7 @@ use anyhow::Context; use risingwave_common::catalog::TableId; use risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_connector::error::ConnectorResult; +use risingwave_connector::source::kafka::SHARED_KAFKA_CLIENT; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SourceProperties, SplitEnumerator, SplitId, SplitImpl, SplitMetaData, @@ -110,6 +111,7 @@ pub async fn create_source_worker_handle( let current_splits_ref = splits.clone(); let connector_properties = extract_prop_from_new_source(source)?; + let share_client_entry = get_kafka_bootstrap_addr(&connector_properties); let enable_scale_in = connector_properties.enable_split_scale_in(); let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); let handle = dispatch_source_prop!(connector_properties, prop, { @@ -144,6 +146,7 @@ pub async fn create_source_worker_handle( sync_call_tx, splits, enable_scale_in, + share_client_entry, }) } @@ -264,6 +267,7 @@ pub struct ConnectorSourceWorkerHandle { sync_call_tx: UnboundedSender>>, splits: SharedSplitMapRef, enable_scale_in: bool, + pub share_client_entry: Option, } impl ConnectorSourceWorkerHandle { @@ -1033,6 +1037,15 @@ impl SourceManager { for source_id in source_ids { if let Some(handle) = core.managed_sources.remove(&source_id) { handle.handle.abort(); + if let Some(entry) = handle.share_client_entry { + let mut share_client_guard = SHARED_KAFKA_CLIENT.lock().await; + if let Some(item) = share_client_guard.get_mut(&entry) { + item.ref_count -= 1; + if item.ref_count == 0 { + share_client_guard.remove(&entry); + } + } + } } } } @@ -1050,6 +1063,9 @@ impl SourceManager { let source_id = source.id; let connector_properties = extract_prop_from_existing_source(&source)?; + + let share_client_entry = get_kafka_bootstrap_addr(&connector_properties); + let enable_scale_in = connector_properties.enable_split_scale_in(); let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); let handle = tokio::spawn(async move { @@ -1089,6 +1105,7 @@ impl SourceManager { sync_call_tx, splits, enable_scale_in, + share_client_entry, }, ); Ok(()) @@ -1180,6 +1197,16 @@ pub fn build_actor_split_impls( .collect() } +fn get_kafka_bootstrap_addr(connector_properties: &ConnectorProperties) -> Option { + { + // for kafka source: extract the bootstrap servers from the source properties as shared source entry (on meta) + if let ConnectorProperties::Kafka(kafka_props) = connector_properties { + return Some(kafka_props.common.brokers.clone()); + } + None + } +} 
+ #[cfg(test)] mod tests { use std::collections::{BTreeMap, HashMap, HashSet}; From ab4667220baa7424c2b8e6211d3a5369ff29d9c5 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 22 Oct 2024 22:30:11 +0800 Subject: [PATCH 02/25] check kafka connection identical Signed-off-by: tabVersion --- src/connector/src/connector_common/common.rs | 45 ++++++++++++------- src/connector/src/connector_common/mod.rs | 7 +-- src/connector/src/sink/kafka.rs | 8 ++-- .../src/source/kafka/enumerator/client.rs | 10 +++-- .../src/source/kafka/source/reader.rs | 9 ++-- src/meta/src/stream/source_manager.rs | 2 +- 6 files changed, 51 insertions(+), 30 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 1be41b4e09ca..4d29ad9c527b 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -60,8 +60,10 @@ use aws_types::region::Region; use aws_types::SdkConfig; use risingwave_common::util::env_var::env_var_is_true; +use crate::source::kafka::KafkaProperties; + /// A flatten config map for aws auth. -#[derive(Deserialize, Debug, Clone, WithOptions)] +#[derive(Deserialize, Debug, Clone, WithOptions, PartialEq)] pub struct AwsAuthProps { #[serde(rename = "aws.region", alias = "region")] pub region: Option, @@ -160,22 +162,18 @@ impl AwsAuthProps { } } +pub fn check_kafka_connection_identical(lhs: &KafkaProperties, rhs: &KafkaProperties) -> bool { + lhs.aws_auth_props == rhs.aws_auth_props + && lhs.privatelink_common == rhs.privatelink_common + && lhs.common.connection == rhs.common.connection +} + #[serde_as] -#[derive(Debug, Clone, Deserialize, WithOptions)] -pub struct KafkaCommon { +#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)] +pub struct KafkaConnection { #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")] pub brokers: String, - #[serde(rename = "topic", alias = "kafka.topic")] - pub topic: String, - - #[serde( - rename = "properties.sync.call.timeout", - deserialize_with = "deserialize_duration_from_string", - default = "default_kafka_sync_call_timeout" - )] - pub sync_call_timeout: Duration, - /// Security protocol used for RisingWave to communicate with Kafka brokers. Could be /// PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL. #[serde(rename = "properties.security.protocol")] @@ -252,6 +250,23 @@ pub struct KafkaCommon { #[serde_as] #[derive(Debug, Clone, Deserialize, WithOptions)] +pub struct KafkaCommon { + #[serde(rename = "topic", alias = "kafka.topic")] + pub topic: String, + + #[serde( + rename = "properties.sync.call.timeout", + deserialize_with = "deserialize_duration_from_string", + default = "default_kafka_sync_call_timeout" + )] + pub sync_call_timeout: Duration, + + #[serde(flatten)] + pub connection: KafkaConnection, +} + +#[serde_as] +#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)] pub struct KafkaPrivateLinkCommon { /// This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users. #[serde(rename = "broker.rewrite.endpoints")] @@ -269,7 +284,7 @@ pub struct RdKafkaPropertiesCommon { /// Maximum Kafka protocol request message size. 
Due to differing framing overhead between /// protocol versions the producer is unable to reliably enforce a strict max message limit at /// produce time and may exceed the maximum size by one message in protocol ProduceRequests, - /// the broker will enforce the the topic's max.message.bytes limit + /// the broker will enforce the topic's max.message.bytes limit #[serde(rename = "properties.message.max.bytes")] #[serde_as(as = "Option")] pub message_max_bytes: Option, @@ -316,7 +331,7 @@ impl RdKafkaPropertiesCommon { } } -impl KafkaCommon { +impl KafkaConnection { pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) { // AWS_MSK_IAM if self.is_aws_msk_iam() { diff --git a/src/connector/src/connector_common/mod.rs b/src/connector/src/connector_common/mod.rs index 30d699198ccd..024ff5cb5558 100644 --- a/src/connector/src/connector_common/mod.rs +++ b/src/connector/src/connector_common/mod.rs @@ -19,9 +19,10 @@ pub use mqtt_common::{MqttCommon, QualityOfService as MqttQualityOfService}; mod common; pub use common::{ - AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaPrivateLinkCommon, KinesisCommon, - MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon, RdKafkaPropertiesCommon, - PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY, + check_kafka_connection_identical, AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, + KafkaPrivateLinkCommon, KinesisCommon, MongodbCommon, NatsCommon, PulsarCommon, + PulsarOauthCommon, RdKafkaPropertiesCommon, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, + PRIVATE_LINK_TARGETS_KEY, }; mod iceberg; diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 68f20fcf7d58..25745d31c018 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -368,7 +368,7 @@ impl Sink for KafkaSink { if !check.check_reachability().await { return Err(SinkError::Config(anyhow!( "cannot connect to kafka broker ({})", - self.config.common.brokers + self.config.common.connection.brokers ))); } Ok(()) @@ -413,11 +413,11 @@ impl KafkaSinkWriter { let mut c = ClientConfig::new(); // KafkaConfig configuration - config.common.set_security_properties(&mut c); + config.common.connection.set_security_properties(&mut c); config.set_client(&mut c); // ClientConfig configuration - c.set("bootstrap.servers", &config.common.brokers); + c.set("bootstrap.servers", &config.common.connection.brokers); // Create the producer context, will be used to create the producer let broker_rewrite_map = config.privatelink_common.broker_rewrite_map.clone(); @@ -426,7 +426,7 @@ impl KafkaSinkWriter { None, None, config.aws_auth_props.clone(), - config.common.is_aws_msk_iam(), + config.common.connection.is_aws_msk_iam(), ) .await?; let producer_ctx = RwProducerContext::new(ctx_common); diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 67b1dd374886..1b1ea962bc13 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -25,6 +25,7 @@ use rdkafka::{Offset, TopicPartitionList}; use risingwave_common::bail; use risingwave_common::metrics::LabelGuardedMetric; +use crate::connector_common::check_kafka_connection_identical; use crate::error::ConnectorResult; use crate::source::base::SplitEnumerator; use crate::source::kafka::split::KafkaSplit; @@ -79,12 +80,12 @@ impl SplitEnumerator for KafkaSplitEnumerator { let mut config = rdkafka::ClientConfig::new(); let common_props = &properties.common; - let 
broker_address = common_props.brokers.clone(); + let broker_address = common_props.connection.brokers.clone(); let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone(); let topic = common_props.topic.clone(); config.set("bootstrap.servers", &broker_address); config.set("isolation.level", KAFKA_ISOLATION_LEVEL); - common_props.set_security_properties(&mut config); + common_props.connection.set_security_properties(&mut config); properties.set_client(&mut config); let mut scan_start_offset = match properties .scan_startup_mode @@ -110,6 +111,7 @@ impl SplitEnumerator for KafkaSplitEnumerator { .lock() .await .get_mut(broker_address.as_str()) + && check_kafka_connection_identical(&item.props, &properties) { kafka_client = item.client.clone(); item.ref_count += 1; @@ -120,7 +122,7 @@ impl SplitEnumerator for KafkaSplitEnumerator { None, None, properties.aws_auth_props.clone(), - common_props.is_aws_msk_iam(), + common_props.connection.is_aws_msk_iam(), ) .await?; let client_ctx = RwConsumerContext::new(ctx_common); @@ -132,7 +134,7 @@ impl SplitEnumerator for KafkaSplitEnumerator { // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval // of an initial token to occur. // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf - if common_props.is_aws_msk_iam() { + if common_props.connection.is_aws_msk_iam() { #[cfg(not(madsim))] client.poll(Duration::from_secs(10)); // note: this is a blocking call #[cfg(madsim)] diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index d58f1b70dd9f..fd930344ca8b 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -64,7 +64,7 @@ impl SplitReader for KafkaSplitReader { ) -> Result { let mut config = ClientConfig::new(); - let bootstrap_servers = &properties.common.brokers; + let bootstrap_servers = &properties.common.connection.brokers; let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone(); // disable partition eof @@ -73,7 +73,10 @@ impl SplitReader for KafkaSplitReader { config.set("isolation.level", KAFKA_ISOLATION_LEVEL); config.set("bootstrap.servers", bootstrap_servers); - properties.common.set_security_properties(&mut config); + properties + .common + .connection + .set_security_properties(&mut config); properties.set_client(&mut config); let group_id_prefix = properties @@ -95,7 +98,7 @@ impl SplitReader for KafkaSplitReader { // explicitly Some(source_ctx.metrics.rdkafka_native_metric.clone()), properties.aws_auth_props, - properties.common.is_aws_msk_iam(), + properties.common.connection.is_aws_msk_iam(), ) .await?; diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 6f4b81099799..84dc1cda4c5e 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -1201,7 +1201,7 @@ fn get_kafka_bootstrap_addr(connector_properties: &ConnectorProperties) -> Optio { // for kafka source: extract the bootstrap servers from the source properties as shared source entry (on meta) if let ConnectorProperties::Kafka(kafka_props) = connector_properties { - return Some(kafka_props.common.brokers.clone()); + return Some(kafka_props.common.connection.brokers.clone()); } None } From 3373a5828c2ebae9c3f1c15a376ee734b69c4983 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 22 Oct 2024 22:30:44 +0800 Subject: [PATCH 
03/25] fix Signed-off-by: tabVersion --- src/connector/src/sink/kafka.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 25745d31c018..ee8fc4197e48 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -685,7 +685,7 @@ mod test { "broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(), }; let config = KafkaConfig::from_btreemap(properties).unwrap(); - assert_eq!(config.common.brokers, "localhost:9092"); + assert_eq!(config.common.connection.brokers, "localhost:9092"); assert_eq!(config.common.topic, "test"); assert_eq!(config.max_retry_num, 20); assert_eq!(config.retry_interval, Duration::from_millis(500)); From c5203d6c22d5cca2a60c18c300326d9c5b73b1e6 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 22 Oct 2024 23:27:44 +0800 Subject: [PATCH 04/25] fix with props Signed-off-by: tabVersion --- src/connector/with_options_sink.yaml | 12 ++++++------ src/connector/with_options_source.yaml | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index eb03288dfbcc..dd2dd7098f99 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -434,11 +434,6 @@ IcebergConfig: default: Default::default KafkaConfig: fields: - - name: properties.bootstrap.server - field_type: String - required: true - alias: - - kafka.brokers - name: topic field_type: String required: true @@ -448,6 +443,11 @@ KafkaConfig: field_type: Duration required: false default: 'Duration :: from_secs (5)' + - name: properties.bootstrap.server + field_type: String + required: true + alias: + - kafka.brokers - name: properties.security.protocol field_type: String comments: |- @@ -542,7 +542,7 @@ KafkaConfig: Maximum Kafka protocol request message size. Due to differing framing overhead between protocol versions the producer is unable to reliably enforce a strict max message limit at produce time and may exceed the maximum size by one message in protocol ProduceRequests, - the broker will enforce the the topic's max.message.bytes limit + the broker will enforce the topic's max.message.bytes limit required: false - name: properties.receive.message.max.bytes field_type: usize diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 20d3fc171423..b1b3731a1d38 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -178,11 +178,6 @@ KafkaProperties: combine both key and value fields of the Kafka message. TODO: Currently, `Option` can not be parsed here. required: false - - name: properties.bootstrap.server - field_type: String - required: true - alias: - - kafka.brokers - name: topic field_type: String required: true @@ -192,6 +187,11 @@ KafkaProperties: field_type: Duration required: false default: 'Duration :: from_secs (5)' + - name: properties.bootstrap.server + field_type: String + required: true + alias: + - kafka.brokers - name: properties.security.protocol field_type: String comments: |- @@ -271,7 +271,7 @@ KafkaProperties: Maximum Kafka protocol request message size. 
Due to differing framing overhead between protocol versions the producer is unable to reliably enforce a strict max message limit at produce time and may exceed the maximum size by one message in protocol ProduceRequests, - the broker will enforce the the topic's max.message.bytes limit + the broker will enforce the topic's max.message.bytes limit required: false - name: properties.receive.message.max.bytes field_type: usize From 3b6a6a24c6291038a85cf6cf31bf4901fa30583a Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 23 Oct 2024 00:17:52 +0800 Subject: [PATCH 05/25] fix Signed-off-by: tabVersion --- src/connector/src/connector_common/common.rs | 5 +- src/connector/src/connector_common/mod.rs | 4 +- src/connector/src/sink/kafka.rs | 14 +- .../src/source/kafka/enumerator/client.rs | 8 +- src/connector/src/source/kafka/mod.rs | 5 +- .../src/source/kafka/source/reader.rs | 9 +- src/connector/with_options_sink.yaml | 159 +++++++++--------- src/meta/src/stream/source_manager.rs | 2 +- 8 files changed, 106 insertions(+), 100 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 4d29ad9c527b..3c3f1d8493fb 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -165,7 +165,7 @@ impl AwsAuthProps { pub fn check_kafka_connection_identical(lhs: &KafkaProperties, rhs: &KafkaProperties) -> bool { lhs.aws_auth_props == rhs.aws_auth_props && lhs.privatelink_common == rhs.privatelink_common - && lhs.common.connection == rhs.common.connection + && lhs.connection == rhs.connection } #[serde_as] @@ -260,9 +260,6 @@ pub struct KafkaCommon { default = "default_kafka_sync_call_timeout" )] pub sync_call_timeout: Duration, - - #[serde(flatten)] - pub connection: KafkaConnection, } #[serde_as] diff --git a/src/connector/src/connector_common/mod.rs b/src/connector/src/connector_common/mod.rs index 024ff5cb5558..fcce2aa80a37 100644 --- a/src/connector/src/connector_common/mod.rs +++ b/src/connector/src/connector_common/mod.rs @@ -20,8 +20,8 @@ pub use mqtt_common::{MqttCommon, QualityOfService as MqttQualityOfService}; mod common; pub use common::{ check_kafka_connection_identical, AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, - KafkaPrivateLinkCommon, KinesisCommon, MongodbCommon, NatsCommon, PulsarCommon, - PulsarOauthCommon, RdKafkaPropertiesCommon, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, + KafkaConnection, KafkaPrivateLinkCommon, KinesisCommon, MongodbCommon, NatsCommon, + PulsarCommon, PulsarOauthCommon, RdKafkaPropertiesCommon, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY, }; diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index ee8fc4197e48..67d2d0854b0c 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -214,6 +214,9 @@ pub struct KafkaConfig { #[serde(flatten)] pub common: KafkaCommon, + #[serde(flatten)] + pub connection: crate::connector_common::KafkaConnection, + #[serde( rename = "properties.retry.max", default = "_default_max_retries", @@ -269,6 +272,7 @@ impl From for KafkaProperties { time_offset: None, upsert: None, common: val.common, + connection: val.connection, rdkafka_properties_common: val.rdkafka_properties_common, rdkafka_properties_consumer: Default::default(), privatelink_common: val.privatelink_common, @@ -368,7 +372,7 @@ impl Sink for KafkaSink { if !check.check_reachability().await { return Err(SinkError::Config(anyhow!( "cannot connect to kafka broker ({})", - 
self.config.common.connection.brokers + self.config.connection.brokers ))); } Ok(()) @@ -413,11 +417,11 @@ impl KafkaSinkWriter { let mut c = ClientConfig::new(); // KafkaConfig configuration - config.common.connection.set_security_properties(&mut c); + config.connection.set_security_properties(&mut c); config.set_client(&mut c); // ClientConfig configuration - c.set("bootstrap.servers", &config.common.connection.brokers); + c.set("bootstrap.servers", &config.connection.brokers); // Create the producer context, will be used to create the producer let broker_rewrite_map = config.privatelink_common.broker_rewrite_map.clone(); @@ -426,7 +430,7 @@ impl KafkaSinkWriter { None, None, config.aws_auth_props.clone(), - config.common.connection.is_aws_msk_iam(), + config.connection.is_aws_msk_iam(), ) .await?; let producer_ctx = RwProducerContext::new(ctx_common); @@ -685,7 +689,7 @@ mod test { "broker.rewrite.endpoints".to_string() => "{\"broker1\": \"10.0.0.1:8001\"}".to_string(), }; let config = KafkaConfig::from_btreemap(properties).unwrap(); - assert_eq!(config.common.connection.brokers, "localhost:9092"); + assert_eq!(config.connection.brokers, "localhost:9092"); assert_eq!(config.common.topic, "test"); assert_eq!(config.max_retry_num, 20); assert_eq!(config.retry_interval, Duration::from_millis(500)); diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 1b1ea962bc13..cdb5fbf9b88f 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -80,12 +80,12 @@ impl SplitEnumerator for KafkaSplitEnumerator { let mut config = rdkafka::ClientConfig::new(); let common_props = &properties.common; - let broker_address = common_props.connection.brokers.clone(); + let broker_address = properties.connection.brokers.clone(); let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone(); let topic = common_props.topic.clone(); config.set("bootstrap.servers", &broker_address); config.set("isolation.level", KAFKA_ISOLATION_LEVEL); - common_props.connection.set_security_properties(&mut config); + properties.connection.set_security_properties(&mut config); properties.set_client(&mut config); let mut scan_start_offset = match properties .scan_startup_mode @@ -122,7 +122,7 @@ impl SplitEnumerator for KafkaSplitEnumerator { None, None, properties.aws_auth_props.clone(), - common_props.connection.is_aws_msk_iam(), + properties.connection.is_aws_msk_iam(), ) .await?; let client_ctx = RwConsumerContext::new(ctx_common); @@ -134,7 +134,7 @@ impl SplitEnumerator for KafkaSplitEnumerator { // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval // of an initial token to occur. 
// https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf - if common_props.connection.is_aws_msk_iam() { + if properties.connection.is_aws_msk_iam() { #[cfg(not(madsim))] client.poll(Duration::from_secs(10)); // note: this is a blocking call #[cfg(madsim)] diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index 125bf73a3529..030c190eb494 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr}; -use crate::connector_common::{AwsAuthProps, KafkaPrivateLinkCommon}; +use crate::connector_common::{AwsAuthProps, KafkaConnection, KafkaPrivateLinkCommon}; mod client_context; pub mod enumerator; @@ -143,6 +143,9 @@ pub struct KafkaProperties { #[serde(flatten)] pub common: KafkaCommon, + #[serde(flatten)] + pub connection: KafkaConnection, + #[serde(flatten)] pub rdkafka_properties_common: RdKafkaPropertiesCommon, diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index fd930344ca8b..b9523eca98b5 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -64,7 +64,7 @@ impl SplitReader for KafkaSplitReader { ) -> Result { let mut config = ClientConfig::new(); - let bootstrap_servers = &properties.common.connection.brokers; + let bootstrap_servers = &properties.connection.brokers; let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone(); // disable partition eof @@ -73,10 +73,7 @@ impl SplitReader for KafkaSplitReader { config.set("isolation.level", KAFKA_ISOLATION_LEVEL); config.set("bootstrap.servers", bootstrap_servers); - properties - .common - .connection - .set_security_properties(&mut config); + properties.connection.set_security_properties(&mut config); properties.set_client(&mut config); let group_id_prefix = properties @@ -98,7 +95,7 @@ impl SplitReader for KafkaSplitReader { // explicitly Some(source_ctx.metrics.rdkafka_native_metric.clone()), properties.aws_auth_props, - properties.common.connection.is_aws_msk_iam(), + properties.connection.is_aws_msk_iam(), ) .await?; diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index dd2dd7098f99..9b63353bc41f 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -443,84 +443,9 @@ KafkaConfig: field_type: Duration required: false default: 'Duration :: from_secs (5)' - - name: properties.bootstrap.server - field_type: String + - name: connection + field_type: crate::connector_common::KafkaConnection required: true - alias: - - kafka.brokers - - name: properties.security.protocol - field_type: String - comments: |- - Security protocol used for RisingWave to communicate with Kafka brokers. Could be - PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL. - required: false - - name: properties.ssl.endpoint.identification.algorithm - field_type: String - required: false - - name: properties.ssl.ca.location - field_type: String - comments: Path to CA certificate file for verifying the broker's key. - required: false - - name: properties.ssl.ca.pem - field_type: String - comments: CA certificate string (PEM format) for verifying the broker's key. - required: false - - name: properties.ssl.certificate.location - field_type: String - comments: Path to client's certificate file (PEM). 
- required: false - - name: properties.ssl.certificate.pem - field_type: String - comments: Client's public key string (PEM format) used for authentication. - required: false - - name: properties.ssl.key.location - field_type: String - comments: Path to client's private key file (PEM). - required: false - - name: properties.ssl.key.pem - field_type: String - comments: Client's private key string (PEM format) used for authentication. - required: false - - name: properties.ssl.key.password - field_type: String - comments: Passphrase of client's private key. - required: false - - name: properties.sasl.mechanism - field_type: String - comments: SASL mechanism if SASL is enabled. Currently support PLAIN, SCRAM, GSSAPI, and AWS_MSK_IAM. - required: false - - name: properties.sasl.username - field_type: String - comments: SASL username for SASL/PLAIN and SASL/SCRAM. - required: false - - name: properties.sasl.password - field_type: String - comments: SASL password for SASL/PLAIN and SASL/SCRAM. - required: false - - name: properties.sasl.kerberos.service.name - field_type: String - comments: Kafka server's Kerberos principal name under SASL/GSSAPI, not including /hostname@REALM. - required: false - - name: properties.sasl.kerberos.keytab - field_type: String - comments: Path to client's Kerberos keytab file under SASL/GSSAPI. - required: false - - name: properties.sasl.kerberos.principal - field_type: String - comments: Client's Kerberos principal name under SASL/GSSAPI. - required: false - - name: properties.sasl.kerberos.kinit.cmd - field_type: String - comments: Shell command to refresh or acquire the client's Kerberos ticket under SASL/GSSAPI. - required: false - - name: properties.sasl.kerberos.min.time.before.relogin - field_type: String - comments: Minimum time in milliseconds between key refresh attempts under SASL/GSSAPI. - required: false - - name: properties.sasl.oauthbearer.config - field_type: String - comments: Configurations for SASL/OAUTHBEARER. - required: false - name: properties.retry.max field_type: u32 required: false @@ -682,6 +607,86 @@ KafkaConfig: required: false alias: - profile +KafkaConnection: + fields: + - name: properties.bootstrap.server + field_type: String + required: true + alias: + - kafka.brokers + - name: properties.security.protocol + field_type: String + comments: |- + Security protocol used for RisingWave to communicate with Kafka brokers. Could be + PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL. + required: false + - name: properties.ssl.endpoint.identification.algorithm + field_type: String + required: false + - name: properties.ssl.ca.location + field_type: String + comments: Path to CA certificate file for verifying the broker's key. + required: false + - name: properties.ssl.ca.pem + field_type: String + comments: CA certificate string (PEM format) for verifying the broker's key. + required: false + - name: properties.ssl.certificate.location + field_type: String + comments: Path to client's certificate file (PEM). + required: false + - name: properties.ssl.certificate.pem + field_type: String + comments: Client's public key string (PEM format) used for authentication. + required: false + - name: properties.ssl.key.location + field_type: String + comments: Path to client's private key file (PEM). + required: false + - name: properties.ssl.key.pem + field_type: String + comments: Client's private key string (PEM format) used for authentication. 
+ required: false + - name: properties.ssl.key.password + field_type: String + comments: Passphrase of client's private key. + required: false + - name: properties.sasl.mechanism + field_type: String + comments: SASL mechanism if SASL is enabled. Currently support PLAIN, SCRAM, GSSAPI, and AWS_MSK_IAM. + required: false + - name: properties.sasl.username + field_type: String + comments: SASL username for SASL/PLAIN and SASL/SCRAM. + required: false + - name: properties.sasl.password + field_type: String + comments: SASL password for SASL/PLAIN and SASL/SCRAM. + required: false + - name: properties.sasl.kerberos.service.name + field_type: String + comments: Kafka server's Kerberos principal name under SASL/GSSAPI, not including /hostname@REALM. + required: false + - name: properties.sasl.kerberos.keytab + field_type: String + comments: Path to client's Kerberos keytab file under SASL/GSSAPI. + required: false + - name: properties.sasl.kerberos.principal + field_type: String + comments: Client's Kerberos principal name under SASL/GSSAPI. + required: false + - name: properties.sasl.kerberos.kinit.cmd + field_type: String + comments: Shell command to refresh or acquire the client's Kerberos ticket under SASL/GSSAPI. + required: false + - name: properties.sasl.kerberos.min.time.before.relogin + field_type: String + comments: Minimum time in milliseconds between key refresh attempts under SASL/GSSAPI. + required: false + - name: properties.sasl.oauthbearer.config + field_type: String + comments: Configurations for SASL/OAUTHBEARER. + required: false KinesisSinkConfig: fields: - name: stream diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 84dc1cda4c5e..deaf34da2a54 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -1201,7 +1201,7 @@ fn get_kafka_bootstrap_addr(connector_properties: &ConnectorProperties) -> Optio { // for kafka source: extract the bootstrap servers from the source properties as shared source entry (on meta) if let ConnectorProperties::Kafka(kafka_props) = connector_properties { - return Some(kafka_props.common.connection.brokers.clone()); + return Some(kafka_props.connection.brokers.clone()); } None } From 3b3f7256fa46a18b1fe4007dc7cd5e58ea4f0205 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 23 Oct 2024 01:06:14 +0800 Subject: [PATCH 06/25] use connection hash as hashmap entry Signed-off-by: tabVersion --- src/connector/src/connector_common/common.rs | 9 ++++++++- .../src/source/kafka/enumerator/client.rs | 19 ++++++------------- src/meta/src/stream/source_manager.rs | 12 ++++++------ 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 3c3f1d8493fb..6015461344eb 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::collections::BTreeMap; +use std::hash::{Hash, Hasher}; use std::io::Write; use std::time::Duration; @@ -169,7 +170,7 @@ pub fn check_kafka_connection_identical(lhs: &KafkaProperties, rhs: &KafkaProper } #[serde_as] -#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq)] +#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash)] pub struct KafkaConnection { #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")] pub brokers: String, @@ -329,6 +330,12 @@ impl RdKafkaPropertiesCommon { } impl KafkaConnection { + pub fn get_hash(&self) -> u64 { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + self.hash(&mut hasher); + hasher.finish() + } + pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) { // AWS_MSK_IAM if self.is_aws_msk_iam() { diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index cdb5fbf9b88f..9ae66afefec6 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -25,7 +25,6 @@ use rdkafka::{Offset, TopicPartitionList}; use risingwave_common::bail; use risingwave_common::metrics::LabelGuardedMetric; -use crate::connector_common::check_kafka_connection_identical; use crate::error::ConnectorResult; use crate::source::base::SplitEnumerator; use crate::source::kafka::split::KafkaSplit; @@ -34,14 +33,12 @@ use crate::source::kafka::{ }; use crate::source::SourceEnumeratorContextRef; -pub static SHARED_KAFKA_CLIENT: LazyLock>> = +pub static SHARED_KAFKA_CLIENT: LazyLock>> = LazyLock::new(|| tokio::sync::Mutex::new(HashMap::new())); -#[derive(Clone)] pub struct SharedKafkaItem { pub client: Arc>, pub ref_count: i32, - pub props: KafkaProperties, } #[derive(Debug, Copy, Clone, Eq, PartialEq)] @@ -81,6 +78,8 @@ impl SplitEnumerator for KafkaSplitEnumerator { let common_props = &properties.common; let broker_address = properties.connection.brokers.clone(); + + let connection_hash = properties.connection.get_hash(); let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone(); let topic = common_props.topic.clone(); config.set("bootstrap.servers", &broker_address); @@ -107,12 +106,7 @@ impl SplitEnumerator for KafkaSplitEnumerator { } let kafka_client: Arc>; - if let Some(item) = SHARED_KAFKA_CLIENT - .lock() - .await - .get_mut(broker_address.as_str()) - && check_kafka_connection_identical(&item.props, &properties) - { + if let Some(item) = SHARED_KAFKA_CLIENT.lock().await.get_mut(&connection_hash) { kafka_client = item.client.clone(); item.ref_count += 1; } else { @@ -121,7 +115,7 @@ impl SplitEnumerator for KafkaSplitEnumerator { broker_rewrite_map, None, None, - properties.aws_auth_props.clone(), + properties.aws_auth_props, properties.connection.is_aws_msk_iam(), ) .await?; @@ -143,11 +137,10 @@ impl SplitEnumerator for KafkaSplitEnumerator { kafka_client = Arc::new(client); SHARED_KAFKA_CLIENT.lock().await.insert( - broker_address.clone(), + connection_hash, SharedKafkaItem { client: kafka_client.clone(), ref_count: 1, - props: properties.clone(), }, ); } diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index deaf34da2a54..57733c15f601 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -111,7 +111,7 @@ pub async fn create_source_worker_handle( let current_splits_ref = splits.clone(); let connector_properties = extract_prop_from_new_source(source)?; - let 
share_client_entry = get_kafka_bootstrap_addr(&connector_properties); + let share_client_entry = get_kafka_connection_hash(&connector_properties); let enable_scale_in = connector_properties.enable_split_scale_in(); let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); let handle = dispatch_source_prop!(connector_properties, prop, { @@ -267,7 +267,7 @@ pub struct ConnectorSourceWorkerHandle { sync_call_tx: UnboundedSender>>, splits: SharedSplitMapRef, enable_scale_in: bool, - pub share_client_entry: Option, + pub share_client_entry: Option, } impl ConnectorSourceWorkerHandle { @@ -1064,7 +1064,7 @@ impl SourceManager { let connector_properties = extract_prop_from_existing_source(&source)?; - let share_client_entry = get_kafka_bootstrap_addr(&connector_properties); + let share_client_entry = get_kafka_connection_hash(&connector_properties); let enable_scale_in = connector_properties.enable_split_scale_in(); let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -1197,11 +1197,11 @@ pub fn build_actor_split_impls( .collect() } -fn get_kafka_bootstrap_addr(connector_properties: &ConnectorProperties) -> Option { +fn get_kafka_connection_hash(connector_properties: &ConnectorProperties) -> Option { { - // for kafka source: extract the bootstrap servers from the source properties as shared source entry (on meta) + // for kafka source: get the hash of connection props as shared source entry (on meta) if let ConnectorProperties::Kafka(kafka_props) = connector_properties { - return Some(kafka_props.connection.brokers.clone()); + return Some(kafka_props.connection.get_hash()); } None } From 968ed087d76761f24c75ccf0e1073daf381b501b Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 23 Oct 2024 01:11:25 +0800 Subject: [PATCH 07/25] fix Signed-off-by: tabVersion --- src/connector/src/connector_common/common.rs | 6 ------ src/connector/src/connector_common/mod.rs | 7 +++---- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 6015461344eb..b21f63a4b16b 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -163,12 +163,6 @@ impl AwsAuthProps { } } -pub fn check_kafka_connection_identical(lhs: &KafkaProperties, rhs: &KafkaProperties) -> bool { - lhs.aws_auth_props == rhs.aws_auth_props - && lhs.privatelink_common == rhs.privatelink_common - && lhs.connection == rhs.connection -} - #[serde_as] #[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash)] pub struct KafkaConnection { diff --git a/src/connector/src/connector_common/mod.rs b/src/connector/src/connector_common/mod.rs index fcce2aa80a37..57b614fdf548 100644 --- a/src/connector/src/connector_common/mod.rs +++ b/src/connector/src/connector_common/mod.rs @@ -19,10 +19,9 @@ pub use mqtt_common::{MqttCommon, QualityOfService as MqttQualityOfService}; mod common; pub use common::{ - check_kafka_connection_identical, AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, - KafkaConnection, KafkaPrivateLinkCommon, KinesisCommon, MongodbCommon, NatsCommon, - PulsarCommon, PulsarOauthCommon, RdKafkaPropertiesCommon, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, - PRIVATE_LINK_TARGETS_KEY, + AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon, + KinesisCommon, MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon, + RdKafkaPropertiesCommon, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY, }; mod iceberg; 
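The commits up to this point converge on one pattern: a process-wide map from Kafka connection parameters to a reference-counted client behind an async mutex, with the meta node dropping an entry once the last source using it goes away. What follows is a minimal, self-contained sketch of that pattern, not code from the patches themselves: it uses a placeholder FakeClient in place of rdkafka's BaseConsumer<RwConsumerContext>, keys the map by the broker string only, and the names SharedItem, acquire and release are illustrative; a tokio runtime with the macros and rt features is assumed.

    use std::collections::HashMap;
    use std::sync::{Arc, LazyLock};

    use tokio::sync::Mutex;

    // Stand-in for the real consumer (BaseConsumer<RwConsumerContext> in the patches).
    struct FakeClient {
        brokers: String,
    }

    // One shared client plus the number of sources currently borrowing it.
    struct SharedItem {
        client: Arc<FakeClient>,
        ref_count: i32,
    }

    // Process-wide registry keyed by the connection parameters (broker list here).
    static SHARED_CLIENTS: LazyLock<Mutex<HashMap<String, SharedItem>>> =
        LazyLock::new(|| Mutex::new(HashMap::new()));

    // Reuse an existing client for the same connection, or build and register a new one.
    async fn acquire(brokers: &str) -> Arc<FakeClient> {
        let mut guard = SHARED_CLIENTS.lock().await;
        if let Some(item) = guard.get_mut(brokers) {
            item.ref_count += 1;
            return item.client.clone();
        }
        let client = Arc::new(FakeClient {
            brokers: brokers.to_owned(),
        });
        guard.insert(
            brokers.to_owned(),
            SharedItem {
                client: client.clone(),
                ref_count: 1,
            },
        );
        client
    }

    // Drop one reference; evict the entry once nobody uses it any more.
    async fn release(brokers: &str) {
        let mut guard = SHARED_CLIENTS.lock().await;
        if let Some(item) = guard.get_mut(brokers) {
            item.ref_count -= 1;
            if item.ref_count == 0 {
                guard.remove(brokers);
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let a = acquire("broker-1:9092").await;
        let b = acquire("broker-1:9092").await;
        assert!(Arc::ptr_eq(&a, &b)); // the second caller reuses the first client
        println!("both handles point at {}", a.brokers);
        release("broker-1:9092").await;
        release("broker-1:9092").await;
        assert!(SHARED_CLIENTS.lock().await.is_empty()); // entry evicted at ref_count == 0
    }

One trade-off the series itself runs into: building the real client can block for up to ten seconds (the MSK IAM token poll), so patch 10 drops the mutex guard before creating the client and re-acquires it to insert, which in turn lets two concurrent enumerators race and briefly build duplicate clients. The later "use moka" commit (patch 14) replaces this hand-rolled bookkeeping with a moka cache.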
From a41f3fceea4f8a415216da92f5b05922d60b1681 Mon Sep 17 00:00:00 2001 From: tabversion Date: Wed, 23 Oct 2024 13:45:35 +0800 Subject: [PATCH 08/25] fix Signed-off-by: tabversion --- src/connector/src/connector_common/common.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index b21f63a4b16b..7c3c273e99ac 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -61,8 +61,6 @@ use aws_types::region::Region; use aws_types::SdkConfig; use risingwave_common::util::env_var::env_var_is_true; -use crate::source::kafka::KafkaProperties; - /// A flatten config map for aws auth. #[derive(Deserialize, Debug, Clone, WithOptions, PartialEq)] pub struct AwsAuthProps { From 6534ebf57eec5530b86719b4a37d97f93e751251 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 23 Oct 2024 15:58:44 +0800 Subject: [PATCH 09/25] rerun Signed-off-by: tabVersion From ad8b98929f59a2f0af4440328cbd48b399de996a Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 23 Oct 2024 18:03:04 +0800 Subject: [PATCH 10/25] fix Signed-off-by: tabVersion --- .../src/source/kafka/enumerator/client.rs | 22 ++++++++++++++++--- src/meta/src/stream/source_manager.rs | 2 +- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 9ae66afefec6..af202b76ee9b 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -106,10 +106,20 @@ impl SplitEnumerator for KafkaSplitEnumerator { } let kafka_client: Arc>; - if let Some(item) = SHARED_KAFKA_CLIENT.lock().await.get_mut(&connection_hash) { + let mut shared_client_guard = SHARED_KAFKA_CLIENT.lock().await; + if let Some(item) = shared_client_guard.get_mut(&connection_hash) { + tracing::info!( + "reusing kafka client for connection hash {}, to broker {}", + connection_hash, + broker_address + ); kafka_client = item.client.clone(); item.ref_count += 1; + drop(shared_client_guard); } else { + // drop the guard and acquire a new one to avoid a 10s blocking call + drop(shared_client_guard); + // don't need kafka metrics from enumerator let ctx_common = KafkaContextCommon::new( broker_rewrite_map, @@ -136,7 +146,13 @@ impl SplitEnumerator for KafkaSplitEnumerator { } kafka_client = Arc::new(client); - SHARED_KAFKA_CLIENT.lock().await.insert( + tracing::debug!( + "created kafka client for connection hash {} to broker {}", + connection_hash, + broker_address + ); + let mut shared_client_guard = SHARED_KAFKA_CLIENT.lock().await; + shared_client_guard.insert( connection_hash, SharedKafkaItem { client: kafka_client.clone(), @@ -174,7 +190,7 @@ impl SplitEnumerator for KafkaSplitEnumerator { .fetch_stop_offset(topic_partitions.as_ref(), &watermarks) .await?; - let ret = topic_partitions + let ret: Vec<_> = topic_partitions .into_iter() .map(|partition| KafkaSplit { topic: self.topic.clone(), diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 57733c15f601..74098e7ebdd7 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -140,7 +140,6 @@ pub async fn create_source_worker_handle( tokio::spawn(async move { worker.run(sync_call_rx).await }) }); - Ok(ConnectorSourceWorkerHandle { handle, sync_call_tx, @@ -1042,6 +1041,7 @@ impl SourceManager { if let Some(item) = 
share_client_guard.get_mut(&entry) { item.ref_count -= 1; if item.ref_count == 0 { + tracing::info!("removing shared kafka client entry {}", entry); share_client_guard.remove(&entry); } } From ae1b70a3b591b2cd7196353676069358ac1ea95a Mon Sep 17 00:00:00 2001 From: tabVersion Date: Wed, 23 Oct 2024 20:06:49 +0800 Subject: [PATCH 11/25] fix Signed-off-by: tabVersion --- e2e_test/source_legacy/basic/ddl.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e_test/source_legacy/basic/ddl.slt b/e2e_test/source_legacy/basic/ddl.slt index 8e63971fb5b8..4ce630f04da6 100644 --- a/e2e_test/source_legacy/basic/ddl.slt +++ b/e2e_test/source_legacy/basic/ddl.slt @@ -30,7 +30,7 @@ Caused by these errors (recent errors listed first): 1: gRPC request to meta service failed: Internal error 2: failed to create source worker 3: failed to parse json - 4: missing field `properties.bootstrap.server` + 4: missing field `topic` statement error From 58b51280f94784154d1c31994791628748a0c42b Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 25 Oct 2024 10:25:38 +0800 Subject: [PATCH 12/25] better with options --- src/connector/src/sink/kafka.rs | 4 +- src/connector/with_options_sink.yaml | 159 +++++++++++++-------------- 2 files changed, 79 insertions(+), 84 deletions(-) diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 67d2d0854b0c..9fc8da7ef7a4 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -35,7 +35,7 @@ use with_options::WithOptions; use super::catalog::{SinkFormat, SinkFormatDesc}; use super::{Sink, SinkError, SinkParam}; use crate::connector_common::{ - AwsAuthProps, KafkaCommon, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon, + AwsAuthProps, KafkaCommon, KafkaConnection, KafkaPrivateLinkCommon, RdKafkaPropertiesCommon, }; use crate::sink::formatter::SinkFormatterImpl; use crate::sink::log_store::DeliveryFutureManagerAddFuture; @@ -215,7 +215,7 @@ pub struct KafkaConfig { pub common: KafkaCommon, #[serde(flatten)] - pub connection: crate::connector_common::KafkaConnection, + pub connection: KafkaConnection, #[serde( rename = "properties.retry.max", diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 9b63353bc41f..dd2dd7098f99 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -443,9 +443,84 @@ KafkaConfig: field_type: Duration required: false default: 'Duration :: from_secs (5)' - - name: connection - field_type: crate::connector_common::KafkaConnection + - name: properties.bootstrap.server + field_type: String required: true + alias: + - kafka.brokers + - name: properties.security.protocol + field_type: String + comments: |- + Security protocol used for RisingWave to communicate with Kafka brokers. Could be + PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL. + required: false + - name: properties.ssl.endpoint.identification.algorithm + field_type: String + required: false + - name: properties.ssl.ca.location + field_type: String + comments: Path to CA certificate file for verifying the broker's key. + required: false + - name: properties.ssl.ca.pem + field_type: String + comments: CA certificate string (PEM format) for verifying the broker's key. + required: false + - name: properties.ssl.certificate.location + field_type: String + comments: Path to client's certificate file (PEM). 
+ required: false + - name: properties.ssl.certificate.pem + field_type: String + comments: Client's public key string (PEM format) used for authentication. + required: false + - name: properties.ssl.key.location + field_type: String + comments: Path to client's private key file (PEM). + required: false + - name: properties.ssl.key.pem + field_type: String + comments: Client's private key string (PEM format) used for authentication. + required: false + - name: properties.ssl.key.password + field_type: String + comments: Passphrase of client's private key. + required: false + - name: properties.sasl.mechanism + field_type: String + comments: SASL mechanism if SASL is enabled. Currently support PLAIN, SCRAM, GSSAPI, and AWS_MSK_IAM. + required: false + - name: properties.sasl.username + field_type: String + comments: SASL username for SASL/PLAIN and SASL/SCRAM. + required: false + - name: properties.sasl.password + field_type: String + comments: SASL password for SASL/PLAIN and SASL/SCRAM. + required: false + - name: properties.sasl.kerberos.service.name + field_type: String + comments: Kafka server's Kerberos principal name under SASL/GSSAPI, not including /hostname@REALM. + required: false + - name: properties.sasl.kerberos.keytab + field_type: String + comments: Path to client's Kerberos keytab file under SASL/GSSAPI. + required: false + - name: properties.sasl.kerberos.principal + field_type: String + comments: Client's Kerberos principal name under SASL/GSSAPI. + required: false + - name: properties.sasl.kerberos.kinit.cmd + field_type: String + comments: Shell command to refresh or acquire the client's Kerberos ticket under SASL/GSSAPI. + required: false + - name: properties.sasl.kerberos.min.time.before.relogin + field_type: String + comments: Minimum time in milliseconds between key refresh attempts under SASL/GSSAPI. + required: false + - name: properties.sasl.oauthbearer.config + field_type: String + comments: Configurations for SASL/OAUTHBEARER. + required: false - name: properties.retry.max field_type: u32 required: false @@ -607,86 +682,6 @@ KafkaConfig: required: false alias: - profile -KafkaConnection: - fields: - - name: properties.bootstrap.server - field_type: String - required: true - alias: - - kafka.brokers - - name: properties.security.protocol - field_type: String - comments: |- - Security protocol used for RisingWave to communicate with Kafka brokers. Could be - PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL. - required: false - - name: properties.ssl.endpoint.identification.algorithm - field_type: String - required: false - - name: properties.ssl.ca.location - field_type: String - comments: Path to CA certificate file for verifying the broker's key. - required: false - - name: properties.ssl.ca.pem - field_type: String - comments: CA certificate string (PEM format) for verifying the broker's key. - required: false - - name: properties.ssl.certificate.location - field_type: String - comments: Path to client's certificate file (PEM). - required: false - - name: properties.ssl.certificate.pem - field_type: String - comments: Client's public key string (PEM format) used for authentication. - required: false - - name: properties.ssl.key.location - field_type: String - comments: Path to client's private key file (PEM). - required: false - - name: properties.ssl.key.pem - field_type: String - comments: Client's private key string (PEM format) used for authentication. 
- required: false - - name: properties.ssl.key.password - field_type: String - comments: Passphrase of client's private key. - required: false - - name: properties.sasl.mechanism - field_type: String - comments: SASL mechanism if SASL is enabled. Currently support PLAIN, SCRAM, GSSAPI, and AWS_MSK_IAM. - required: false - - name: properties.sasl.username - field_type: String - comments: SASL username for SASL/PLAIN and SASL/SCRAM. - required: false - - name: properties.sasl.password - field_type: String - comments: SASL password for SASL/PLAIN and SASL/SCRAM. - required: false - - name: properties.sasl.kerberos.service.name - field_type: String - comments: Kafka server's Kerberos principal name under SASL/GSSAPI, not including /hostname@REALM. - required: false - - name: properties.sasl.kerberos.keytab - field_type: String - comments: Path to client's Kerberos keytab file under SASL/GSSAPI. - required: false - - name: properties.sasl.kerberos.principal - field_type: String - comments: Client's Kerberos principal name under SASL/GSSAPI. - required: false - - name: properties.sasl.kerberos.kinit.cmd - field_type: String - comments: Shell command to refresh or acquire the client's Kerberos ticket under SASL/GSSAPI. - required: false - - name: properties.sasl.kerberos.min.time.before.relogin - field_type: String - comments: Minimum time in milliseconds between key refresh attempts under SASL/GSSAPI. - required: false - - name: properties.sasl.oauthbearer.config - field_type: String - comments: Configurations for SASL/OAUTHBEARER. - required: false KinesisSinkConfig: fields: - name: stream From f115a0c1ac01c3ec4bb903969e8cfda28f2294ed Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 25 Oct 2024 10:40:05 +0800 Subject: [PATCH 13/25] use kafka connection as hashkey --- src/connector/src/connector_common/common.rs | 2 +- .../src/source/kafka/enumerator/client.rs | 19 ++++++++----------- src/meta/src/stream/source_manager.rs | 16 ++++++++++------ 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 7c3c273e99ac..34d151d2f87e 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -162,7 +162,7 @@ impl AwsAuthProps { } #[serde_as] -#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash)] +#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)] pub struct KafkaConnection { #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")] pub brokers: String, diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index af202b76ee9b..05ea7df51aa9 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -29,12 +29,13 @@ use crate::error::ConnectorResult; use crate::source::base::SplitEnumerator; use crate::source::kafka::split::KafkaSplit; use crate::source::kafka::{ - KafkaContextCommon, KafkaProperties, RwConsumerContext, KAFKA_ISOLATION_LEVEL, + KafkaConnection, KafkaContextCommon, KafkaProperties, RwConsumerContext, KAFKA_ISOLATION_LEVEL, }; use crate::source::SourceEnumeratorContextRef; -pub static SHARED_KAFKA_CLIENT: LazyLock>> = - LazyLock::new(|| tokio::sync::Mutex::new(HashMap::new())); +pub static SHARED_KAFKA_CLIENT: LazyLock< + tokio::sync::Mutex>, +> = LazyLock::new(|| tokio::sync::Mutex::new(HashMap::new())); pub struct SharedKafkaItem { pub client: Arc>, @@ 
-78,8 +79,6 @@ impl SplitEnumerator for KafkaSplitEnumerator { let common_props = &properties.common; let broker_address = properties.connection.brokers.clone(); - - let connection_hash = properties.connection.get_hash(); let broker_rewrite_map = properties.privatelink_common.broker_rewrite_map.clone(); let topic = common_props.topic.clone(); config.set("bootstrap.servers", &broker_address); @@ -107,10 +106,9 @@ impl SplitEnumerator for KafkaSplitEnumerator { let kafka_client: Arc>; let mut shared_client_guard = SHARED_KAFKA_CLIENT.lock().await; - if let Some(item) = shared_client_guard.get_mut(&connection_hash) { + if let Some(item) = shared_client_guard.get_mut(&properties.connection) { tracing::info!( - "reusing kafka client for connection hash {}, to broker {}", - connection_hash, + "reusing kafka client for connection to broker {}", broker_address ); kafka_client = item.client.clone(); @@ -147,13 +145,12 @@ impl SplitEnumerator for KafkaSplitEnumerator { kafka_client = Arc::new(client); tracing::debug!( - "created kafka client for connection hash {} to broker {}", - connection_hash, + "created kafka client for connection to broker {}", broker_address ); let mut shared_client_guard = SHARED_KAFKA_CLIENT.lock().await; shared_client_guard.insert( - connection_hash, + properties.connection, SharedKafkaItem { client: kafka_client.clone(), ref_count: 1, diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 74098e7ebdd7..49c5904b078c 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -23,6 +23,7 @@ use std::time::Duration; use anyhow::Context; use risingwave_common::catalog::TableId; use risingwave_common::metrics::LabelGuardedIntGauge; +use risingwave_connector::connector_common::KafkaConnection; use risingwave_connector::error::ConnectorResult; use risingwave_connector::source::kafka::SHARED_KAFKA_CLIENT; use risingwave_connector::source::{ @@ -111,7 +112,7 @@ pub async fn create_source_worker_handle( let current_splits_ref = splits.clone(); let connector_properties = extract_prop_from_new_source(source)?; - let share_client_entry = get_kafka_connection_hash(&connector_properties); + let share_client_entry = get_kafka_connection(&connector_properties); let enable_scale_in = connector_properties.enable_split_scale_in(); let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); let handle = dispatch_source_prop!(connector_properties, prop, { @@ -266,7 +267,7 @@ pub struct ConnectorSourceWorkerHandle { sync_call_tx: UnboundedSender>>, splits: SharedSplitMapRef, enable_scale_in: bool, - pub share_client_entry: Option, + pub share_client_entry: Option, } impl ConnectorSourceWorkerHandle { @@ -1041,7 +1042,10 @@ impl SourceManager { if let Some(item) = share_client_guard.get_mut(&entry) { item.ref_count -= 1; if item.ref_count == 0 { - tracing::info!("removing shared kafka client entry {}", entry); + tracing::info!( + "removing shared kafka client entry to broker {:?}", + entry.brokers + ); share_client_guard.remove(&entry); } } @@ -1064,7 +1068,7 @@ impl SourceManager { let connector_properties = extract_prop_from_existing_source(&source)?; - let share_client_entry = get_kafka_connection_hash(&connector_properties); + let share_client_entry = get_kafka_connection(&connector_properties); let enable_scale_in = connector_properties.enable_split_scale_in(); let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); @@ -1197,11 +1201,11 @@ pub fn build_actor_split_impls( 
.collect() } -fn get_kafka_connection_hash(connector_properties: &ConnectorProperties) -> Option { +fn get_kafka_connection(connector_properties: &ConnectorProperties) -> Option { { // for kafka source: get the hash of connection props as shared source entry (on meta) if let ConnectorProperties::Kafka(kafka_props) = connector_properties { - return Some(kafka_props.connection.get_hash()); + return Some(kafka_props.connection.clone()); } None } From d128644b81174d46bcacb10e5684a0070ef8f720 Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 25 Oct 2024 17:38:12 +0800 Subject: [PATCH 14/25] use moka --- Cargo.lock | 116 +++------------- src/connector/Cargo.toml | 2 +- .../src/source/kafka/enumerator/client.rs | 129 ++++++++++-------- src/meta/Cargo.toml | 1 + src/meta/src/stream/source_manager.rs | 52 +++++-- 5 files changed, 136 insertions(+), 164 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4e315686fde0..dbde7ed89aa3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1040,7 +1040,7 @@ dependencies = [ "async-channel 2.2.1", "async-executor", "async-io", - "async-lock 3.4.0", + "async-lock", "blocking", "futures-lite", "once_cell", @@ -1053,7 +1053,7 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d6baa8f0178795da0e71bc42c9e5d13261aac7ee549853162e66a241ba17964" dependencies = [ - "async-lock 3.4.0", + "async-lock", "cfg-if", "concurrent-queue", "futures-io", @@ -1066,15 +1066,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "async-lock" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" -dependencies = [ - "event-listener 2.5.3", -] - [[package]] name = "async-lock" version = "3.4.0" @@ -1141,7 +1132,7 @@ dependencies = [ "async-channel 1.9.0", "async-global-executor", "async-io", - "async-lock 3.4.0", + "async-lock", "crossbeam-utils", "futures-channel", "futures-core", @@ -2322,12 +2313,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "bytecount" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce89b21cab1437276d2650d57e971f9d548a2d9037cc231abdc0562b97498ce" - [[package]] name = "bytemuck" version = "1.14.0" @@ -2389,15 +2374,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "camino" -version = "1.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3" -dependencies = [ - "serde", -] - [[package]] name = "cap-fs-ext" version = "3.0.0" @@ -2469,28 +2445,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1582e1c9e755dd6ad6b224dcffb135d199399a4568d454bd89fe515ca8425695" -[[package]] -name = "cargo-platform" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24b1f0365a6c6bb4020cd05806fd0d33c44d38046b8bd7f0e40814b9763cabfc" -dependencies = [ - "serde", -] - -[[package]] -name = "cargo_metadata" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" -dependencies = [ - "camino", - "cargo-platform", - "semver 1.0.18", - "serde", - "serde_json", -] - [[package]] name = "cast" version = "0.3.0" @@ -4731,15 +4685,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "error-chain" -version = "0.12.4" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" -dependencies = [ - "version_check", -] - [[package]] name = "escape8259" version = "0.5.2" @@ -7610,21 +7555,21 @@ dependencies = [ [[package]] name = "moka" -version = "0.12.0" +version = "0.12.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8dc65d4615c08c8a13d91fd404b5a2a4485ba35b4091e3315cf8798d280c2f29" +checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" dependencies = [ - "async-lock 2.8.0", + "async-lock", "async-trait", "crossbeam-channel", "crossbeam-epoch", "crossbeam-utils", + "event-listener 5.3.1", "futures-util", "once_cell", "parking_lot 0.12.1", "quanta", "rustc_version 0.4.0", - "skeptic", "smallvec", "tagptr", "thiserror", @@ -9688,17 +9633,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "pulldown-cmark" -version = "0.9.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b" -dependencies = [ - "bitflags 2.6.0", - "memchr", - "unicase", -] - [[package]] name = "pulsar" version = "6.3.0" @@ -9808,12 +9742,12 @@ checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" [[package]] name = "quanta" -version = "0.11.0" -source = "git+https://github.com/madsim-rs/quanta.git?rev=948bdc3#948bdc3d4cd3fcfe3d52d03dd83deee96d97d770" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" dependencies = [ "crossbeam-utils", "libc", - "mach2", "once_cell", "raw-cpuid", "wasi", @@ -9912,11 +9846,11 @@ dependencies = [ [[package]] name = "raw-cpuid" -version = "10.7.0" +version = "11.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +checksum = "1ab240315c661615f2ee9f0f2cd32d5a7343a84d5ebcccb99d46e6637565e7b0" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.6.0", ] [[package]] @@ -11564,6 +11498,7 @@ dependencies = [ "maplit", "memcomparable", "mime_guess", + "moka", "notify", "num-integer", "num-traits", @@ -13024,9 +12959,6 @@ name = "semver" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" -dependencies = [ - "serde", -] [[package]] name = "semver-parser" @@ -13488,21 +13420,6 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fed904c7fb2856d868b92464fc8fa597fce366edea1a9cbfaa8cb5fe080bd6d" -[[package]] -name = "skeptic" -version = "0.13.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" -dependencies = [ - "bytecount", - "cargo_metadata", - "error-chain", - "glob", - "pulldown-cmark", - "tempfile", - "walkdir", -] - [[package]] name = "slab" version = "0.4.9" @@ -16612,3 +16529,8 @@ dependencies = [ "libc", "pkg-config", ] + +[[patch.unused]] +name = "quanta" +version = "0.11.0" +source = "git+https://github.com/madsim-rs/quanta.git?rev=948bdc3#948bdc3d4cd3fcfe3d52d03dd83deee96d97d770" diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 79764508e22c..7313783651c1 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -71,7 +71,7 @@ jni = { version = "0.21.1", features = 
["invocation"] } jsonbb = { workspace = true } jsonwebtoken = "9.2.0" maplit = "1.0.2" -moka = { version = "0.12.0", features = ["future"] } +moka = { version = "0.12.8", features = ["future"] } mongodb = { version = "2.8.2", features = ["tokio-runtime"] } mysql_async = { version = "0.34", default-features = false, features = [ "default", diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 05ea7df51aa9..66714f2d0436 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -18,12 +18,14 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use async_trait::async_trait; +use moka::future::Cache as MokaCache; use prometheus::core::{AtomicI64, GenericGauge}; use rdkafka::consumer::{BaseConsumer, Consumer}; use rdkafka::error::KafkaResult; use rdkafka::{Offset, TopicPartitionList}; use risingwave_common::bail; use risingwave_common::metrics::LabelGuardedMetric; +use thiserror_ext::AsReport; use crate::error::ConnectorResult; use crate::source::base::SplitEnumerator; @@ -34,9 +36,10 @@ use crate::source::kafka::{ use crate::source::SourceEnumeratorContextRef; pub static SHARED_KAFKA_CLIENT: LazyLock< - tokio::sync::Mutex>, -> = LazyLock::new(|| tokio::sync::Mutex::new(HashMap::new())); + MokaCache>>, +> = LazyLock::new(|| moka::future::Cache::builder().build()); +#[derive(Clone)] pub struct SharedKafkaItem { pub client: Arc>, pub ref_count: i32, @@ -104,59 +107,77 @@ impl SplitEnumerator for KafkaSplitEnumerator { scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset) } - let kafka_client: Arc>; - let mut shared_client_guard = SHARED_KAFKA_CLIENT.lock().await; - if let Some(item) = shared_client_guard.get_mut(&properties.connection) { - tracing::info!( - "reusing kafka client for connection to broker {}", - broker_address - ); - kafka_client = item.client.clone(); - item.ref_count += 1; - drop(shared_client_guard); - } else { - // drop the guard and acquire a new one to avoid a 10s blocking call - drop(shared_client_guard); - - // don't need kafka metrics from enumerator - let ctx_common = KafkaContextCommon::new( - broker_rewrite_map, - None, - None, - properties.aws_auth_props, - properties.connection.is_aws_msk_iam(), - ) - .await?; - let client_ctx = RwConsumerContext::new(ctx_common); - let client: BaseConsumer = - config.create_with_context(client_ctx).await?; - - // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call - // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either - // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval - // of an initial token to occur. 
- // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf - if properties.connection.is_aws_msk_iam() { - #[cfg(not(madsim))] - client.poll(Duration::from_secs(10)); // note: this is a blocking call - #[cfg(madsim)] - client.poll(Duration::from_secs(10)).await; - } + let client_arc = SHARED_KAFKA_CLIENT + .entry_by_ref(&properties.connection) + .and_upsert_with(|maybe_item| async { + tracing::info!( + "entry {:?} maybe item: {:?}", + &properties.connection, + maybe_item.is_some() + ); + if let Some(item) = maybe_item { + let arc_item = item.into_value(); + match &arc_item.as_ref() { + Ok(item) => { + tracing::info!( + "reusing source {} shared kafka client to {} (ref count: {})", + &context.info.source_id, + &broker_address, + item.ref_count + 1 + ); + Arc::new(Ok(SharedKafkaItem { + client: item.client.clone(), + ref_count: item.ref_count + 1, + })) + } + Err(_) => arc_item.clone(), + } + } else { + let build_client: ConnectorResult = async { + let ctx_common = KafkaContextCommon::new( + broker_rewrite_map, + None, + None, + properties.aws_auth_props, + properties.connection.is_aws_msk_iam(), + ) + .await?; + let client_ctx = RwConsumerContext::new(ctx_common); + let client: BaseConsumer = + config.create_with_context(client_ctx).await?; + + // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call + // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either + // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval + // of an initial token to occur. + // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf + if properties.connection.is_aws_msk_iam() { + #[cfg(not(madsim))] + client.poll(Duration::from_secs(10)); // note: this is a blocking call + #[cfg(madsim)] + client.poll(Duration::from_secs(10)).await; + } + + Ok(SharedKafkaItem { + client: Arc::new(client), + ref_count: 1, + }) + } + .await; + + Arc::new(build_client) + } + }) + .await + .into_value(); - kafka_client = Arc::new(client); - tracing::debug!( - "created kafka client for connection to broker {}", - broker_address - ); - let mut shared_client_guard = SHARED_KAFKA_CLIENT.lock().await; - shared_client_guard.insert( - properties.connection, - SharedKafkaItem { - client: kafka_client.clone(), - ref_count: 1, - }, - ); - } + let kafka_client: Arc> = match client_arc.as_ref() { + Ok(client) => client.client.clone(), + Err(e) => { + SHARED_KAFKA_CLIENT.remove(&properties.connection).await; + bail!("{}", e.as_report()); + } + }; Ok(Self { context, diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index d3174b7ddfb1..afc5357c7ebd 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -42,6 +42,7 @@ jsonbb = { workspace = true } maplit = "1.0.2" memcomparable = { version = "0.2" } mime_guess = "2" +moka = { version = "0.12.8", features = ["future"] } notify = { version = "6", default-features = false, features = ["macos_fsevent"] } num-integer = "0.1" num-traits = "0.2" diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 49c5904b078c..48b260e837d9 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -21,11 +21,12 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Context; +use moka::ops::compute::Op; use risingwave_common::catalog::TableId; use 
risingwave_common::metrics::LabelGuardedIntGauge; use risingwave_connector::connector_common::KafkaConnection; use risingwave_connector::error::ConnectorResult; -use risingwave_connector::source::kafka::SHARED_KAFKA_CLIENT; +use risingwave_connector::source::kafka::{SharedKafkaItem, SHARED_KAFKA_CLIENT}; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SourceProperties, SplitEnumerator, SplitId, SplitImpl, SplitMetaData, @@ -1037,18 +1038,45 @@ impl SourceManager { for source_id in source_ids { if let Some(handle) = core.managed_sources.remove(&source_id) { handle.handle.abort(); + // remove the shared kafka client entry if it exists if let Some(entry) = handle.share_client_entry { - let mut share_client_guard = SHARED_KAFKA_CLIENT.lock().await; - if let Some(item) = share_client_guard.get_mut(&entry) { - item.ref_count -= 1; - if item.ref_count == 0 { - tracing::info!( - "removing shared kafka client entry to broker {:?}", - entry.brokers - ); - share_client_guard.remove(&entry); - } - } + SHARED_KAFKA_CLIENT + .entry_by_ref(&entry) + .and_compute_with(|maybe_item| async { + if let Some(item) = maybe_item { + match item.into_value().as_ref() { + Ok(item_val) => { + let share_item = SharedKafkaItem { + client: item_val.client.clone(), + ref_count: item_val.ref_count - 1, + }; + tracing::info!( + "drop source {} shared kafka client to {} (ref count: {})", + &source_id, + &entry.brokers, + share_item.ref_count + ); + if share_item.ref_count == 0 { + tracing::info!( + "source shared kafka client to {} is freed, ref count drop to 0.", + &entry.brokers + ); + Op::Remove + } else { + Op::Put(Arc::new(Ok(share_item))) + } + }, + Err(_) => Op::Nop, + } + } else { + tracing::warn!( + "source worker's shared kafka client to {} is not found.", + &entry.brokers + ); + Op::Nop + } + }) + .await; } } } From ae9df4187e5d8a233b3a77d90b5e43cb86c3cb5b Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 25 Oct 2024 18:03:37 +0800 Subject: [PATCH 15/25] fix lint --- src/connector/src/source/kafka/enumerator/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 66714f2d0436..9dc6c0d798ab 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -175,7 +175,7 @@ impl SplitEnumerator for KafkaSplitEnumerator { Ok(client) => client.client.clone(), Err(e) => { SHARED_KAFKA_CLIENT.remove(&properties.connection).await; - bail!("{}", e.as_report()); + bail!("{}", anyhow!(e.as_report())); } }; From 45295bcf3e76418e9c6423434fa00b2a617d01b9 Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 25 Oct 2024 18:22:36 +0800 Subject: [PATCH 16/25] fix --- src/connector/src/source/kafka/enumerator/client.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 9dc6c0d798ab..91f893599ba5 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -27,7 +27,7 @@ use risingwave_common::bail; use risingwave_common::metrics::LabelGuardedMetric; use thiserror_ext::AsReport; -use crate::error::ConnectorResult; +use crate::error::{ConnectorError, ConnectorResult}; use crate::source::base::SplitEnumerator; use crate::source::kafka::split::KafkaSplit; use crate::source::kafka::{ @@ -175,7 
+175,8 @@ impl SplitEnumerator for KafkaSplitEnumerator { Ok(client) => client.client.clone(), Err(e) => { SHARED_KAFKA_CLIENT.remove(&properties.connection).await; - bail!("{}", anyhow!(e.as_report())); + #[allow(rw::format_error)] + bail!("{}", e.as_report()); } }; From a9e34c7a701716a144e7f70beb23d4b9bf29c121 Mon Sep 17 00:00:00 2001 From: tabversion Date: Fri, 25 Oct 2024 18:33:28 +0800 Subject: [PATCH 17/25] fix --- src/connector/src/source/kafka/enumerator/client.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 91f893599ba5..ceef939965a4 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -27,7 +27,7 @@ use risingwave_common::bail; use risingwave_common::metrics::LabelGuardedMetric; use thiserror_ext::AsReport; -use crate::error::{ConnectorError, ConnectorResult}; +use crate::error::ConnectorResult; use crate::source::base::SplitEnumerator; use crate::source::kafka::split::KafkaSplit; use crate::source::kafka::{ @@ -176,7 +176,9 @@ impl SplitEnumerator for KafkaSplitEnumerator { Err(e) => { SHARED_KAFKA_CLIENT.remove(&properties.connection).await; #[allow(rw::format_error)] - bail!("{}", e.as_report()); + { + bail!("{}", e.as_report()); + } } }; From 725e23c851e2dc3836b53d6c6e30c3e86ef48f25 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 28 Oct 2024 15:02:28 +0800 Subject: [PATCH 18/25] remove get hash func --- src/connector/src/connector_common/common.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 34d151d2f87e..291770711776 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -322,12 +322,6 @@ impl RdKafkaPropertiesCommon { } impl KafkaConnection { - pub fn get_hash(&self) -> u64 { - let mut hasher = std::collections::hash_map::DefaultHasher::new(); - self.hash(&mut hasher); - hasher.finish() - } - pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) { // AWS_MSK_IAM if self.is_aws_msk_iam() { From 832f66fa2374a46f50f0bfd7f305f47e71a840af Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 28 Oct 2024 16:55:05 +0800 Subject: [PATCH 19/25] migrate to Weak --- src/connector/src/connector_common/common.rs | 2 +- src/connector/src/error.rs | 3 + .../src/source/kafka/enumerator/client.rs | 149 ++++++++---------- src/meta/src/stream/source_manager.rs | 59 ------- 4 files changed, 69 insertions(+), 144 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 291770711776..3eaffa93d02a 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::collections::BTreeMap; -use std::hash::{Hash, Hasher}; +use std::hash::Hash; use std::io::Write; use std::time::Duration; diff --git a/src/connector/src/error.rs b/src/connector/src/error.rs index cfee9674d1ce..c95d7fc97f4e 100644 --- a/src/connector/src/error.rs +++ b/src/connector/src/error.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::sync::Arc; + use risingwave_common::array::ArrayError; use risingwave_common::error::def_anyhow_newtype; use risingwave_pb::PbFieldNotFound; @@ -29,6 +31,7 @@ def_anyhow_newtype! { // Common errors std::io::Error => transparent, + Arc => transparent, // Fine-grained connector errors AccessError => transparent, diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index ceef939965a4..f9b77a40ec1c 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -12,22 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; -use std::sync::{Arc, LazyLock}; +use std::collections::{BTreeMap, HashMap}; +use std::sync::{Arc, LazyLock, Weak}; use std::time::Duration; use anyhow::{anyhow, Context}; use async_trait::async_trait; use moka::future::Cache as MokaCache; +use moka::ops::compute::{CompResult, Op}; use prometheus::core::{AtomicI64, GenericGauge}; use rdkafka::consumer::{BaseConsumer, Consumer}; use rdkafka::error::KafkaResult; -use rdkafka::{Offset, TopicPartitionList}; +use rdkafka::{ClientConfig, Offset, TopicPartitionList}; use risingwave_common::bail; use risingwave_common::metrics::LabelGuardedMetric; -use thiserror_ext::AsReport; -use crate::error::ConnectorResult; +use crate::error::{ConnectorError, ConnectorResult}; use crate::source::base::SplitEnumerator; use crate::source::kafka::split::KafkaSplit; use crate::source::kafka::{ @@ -35,15 +35,10 @@ use crate::source::kafka::{ }; use crate::source::SourceEnumeratorContextRef; -pub static SHARED_KAFKA_CLIENT: LazyLock< - MokaCache>>, -> = LazyLock::new(|| moka::future::Cache::builder().build()); +type KafkaClientType = BaseConsumer; -#[derive(Clone)] -pub struct SharedKafkaItem { - pub client: Arc>, - pub ref_count: i32, -} +pub static SHARED_KAFKA_CLIENT: LazyLock>> = + LazyLock::new(|| moka::future::Cache::builder().build()); #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum KafkaEnumeratorOffset { @@ -57,7 +52,7 @@ pub struct KafkaSplitEnumerator { context: SourceEnumeratorContextRef, broker_address: String, topic: String, - client: Arc>, + client: Arc, start_offset: KafkaEnumeratorOffset, // maybe used in the future for batch processing @@ -107,86 +102,72 @@ impl SplitEnumerator for KafkaSplitEnumerator { scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset) } - let client_arc = SHARED_KAFKA_CLIENT + async fn build_kafka_client( + config: &ClientConfig, + properties: &KafkaProperties, + rewrite_map: Option>, + ) -> ConnectorResult { + let ctx_common = KafkaContextCommon::new( + rewrite_map, + None, + None, + properties.aws_auth_props.clone(), + properties.connection.is_aws_msk_iam(), + ) + .await?; + let client_ctx = RwConsumerContext::new(ctx_common); + let client: BaseConsumer = + config.create_with_context(client_ctx).await?; + + // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call + // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either + // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval + // of an initial token to occur. 
+ // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf + if properties.connection.is_aws_msk_iam() { + #[cfg(not(madsim))] + client.poll(Duration::from_secs(10)); // note: this is a blocking call + #[cfg(madsim)] + client.poll(Duration::from_secs(10)).await; + } + Ok(client) + } + + let client_weak = match SHARED_KAFKA_CLIENT .entry_by_ref(&properties.connection) - .and_upsert_with(|maybe_item| async { - tracing::info!( - "entry {:?} maybe item: {:?}", - &properties.connection, - maybe_item.is_some() - ); - if let Some(item) = maybe_item { - let arc_item = item.into_value(); - match &arc_item.as_ref() { - Ok(item) => { - tracing::info!( - "reusing source {} shared kafka client to {} (ref count: {})", - &context.info.source_id, - &broker_address, - item.ref_count + 1 - ); - Arc::new(Ok(SharedKafkaItem { - client: item.client.clone(), - ref_count: item.ref_count + 1, - })) - } - Err(_) => arc_item.clone(), - } - } else { - let build_client: ConnectorResult = async { - let ctx_common = KafkaContextCommon::new( - broker_rewrite_map, - None, - None, - properties.aws_auth_props, - properties.connection.is_aws_msk_iam(), - ) - .await?; - let client_ctx = RwConsumerContext::new(ctx_common); - let client: BaseConsumer = - config.create_with_context(client_ctx).await?; - - // Note that before any SASL/OAUTHBEARER broker connection can succeed the application must call - // rd_kafka_oauthbearer_set_token() once – either directly or, more typically, by invoking either - // rd_kafka_poll(), rd_kafka_consumer_poll(), rd_kafka_queue_poll(), etc, in order to cause retrieval - // of an initial token to occur. - // https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a988395722598f63396d7a1bedb22adaf - if properties.connection.is_aws_msk_iam() { - #[cfg(not(madsim))] - client.poll(Duration::from_secs(10)); // note: this is a blocking call - #[cfg(madsim)] - client.poll(Duration::from_secs(10)).await; - } - - Ok(SharedKafkaItem { - client: Arc::new(client), - ref_count: 1, - }) + .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async { + if let Some(entry) = maybe_entry { + let entry_value = entry.into_value(); + if entry_value.upgrade().is_some() { + // return if the client is already built + return Ok(Op::Nop); } - .await; - - Arc::new(build_client) } + let client_arc = Arc::new( + build_kafka_client(&config, &properties, broker_rewrite_map.clone()).await?, + ); + Ok(Op::Put(Arc::downgrade(&client_arc))) }) - .await - .into_value(); - - let kafka_client: Arc> = match client_arc.as_ref() { - Ok(client) => client.client.clone(), - Err(e) => { - SHARED_KAFKA_CLIENT.remove(&properties.connection).await; - #[allow(rw::format_error)] - { - bail!("{}", e.as_report()); - } - } + .await? 
+ { + CompResult::Unchanged(entry) + | CompResult::Inserted(entry) + | CompResult::ReplacedWith(entry) => entry.into_value(), + CompResult::Removed(_) | CompResult::StillNone(_) => unreachable!(), + }; + + let client_arc: Arc; + if let Some(client_arc_upgrade) = client_weak.upgrade() { + client_arc = client_arc_upgrade; + } else { + unreachable!() }; Ok(Self { context, broker_address, topic, - client: kafka_client, + client: client_arc, start_offset: scan_start_offset, stop_offset: KafkaEnumeratorOffset::None, sync_call_timeout: properties.common.sync_call_timeout, diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 48b260e837d9..0a0bc0076593 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -21,12 +21,9 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Context; -use moka::ops::compute::Op; use risingwave_common::catalog::TableId; use risingwave_common::metrics::LabelGuardedIntGauge; -use risingwave_connector::connector_common::KafkaConnection; use risingwave_connector::error::ConnectorResult; -use risingwave_connector::source::kafka::{SharedKafkaItem, SHARED_KAFKA_CLIENT}; use risingwave_connector::source::{ ConnectorProperties, SourceEnumeratorContext, SourceEnumeratorInfo, SourceProperties, SplitEnumerator, SplitId, SplitImpl, SplitMetaData, @@ -113,7 +110,6 @@ pub async fn create_source_worker_handle( let current_splits_ref = splits.clone(); let connector_properties = extract_prop_from_new_source(source)?; - let share_client_entry = get_kafka_connection(&connector_properties); let enable_scale_in = connector_properties.enable_split_scale_in(); let (sync_call_tx, sync_call_rx) = tokio::sync::mpsc::unbounded_channel(); let handle = dispatch_source_prop!(connector_properties, prop, { @@ -147,7 +143,6 @@ pub async fn create_source_worker_handle( sync_call_tx, splits, enable_scale_in, - share_client_entry, }) } @@ -268,7 +263,6 @@ pub struct ConnectorSourceWorkerHandle { sync_call_tx: UnboundedSender>>, splits: SharedSplitMapRef, enable_scale_in: bool, - pub share_client_entry: Option, } impl ConnectorSourceWorkerHandle { @@ -1038,46 +1032,6 @@ impl SourceManager { for source_id in source_ids { if let Some(handle) = core.managed_sources.remove(&source_id) { handle.handle.abort(); - // remove the shared kafka client entry if it exists - if let Some(entry) = handle.share_client_entry { - SHARED_KAFKA_CLIENT - .entry_by_ref(&entry) - .and_compute_with(|maybe_item| async { - if let Some(item) = maybe_item { - match item.into_value().as_ref() { - Ok(item_val) => { - let share_item = SharedKafkaItem { - client: item_val.client.clone(), - ref_count: item_val.ref_count - 1, - }; - tracing::info!( - "drop source {} shared kafka client to {} (ref count: {})", - &source_id, - &entry.brokers, - share_item.ref_count - ); - if share_item.ref_count == 0 { - tracing::info!( - "source shared kafka client to {} is freed, ref count drop to 0.", - &entry.brokers - ); - Op::Remove - } else { - Op::Put(Arc::new(Ok(share_item))) - } - }, - Err(_) => Op::Nop, - } - } else { - tracing::warn!( - "source worker's shared kafka client to {} is not found.", - &entry.brokers - ); - Op::Nop - } - }) - .await; - } } } } @@ -1096,8 +1050,6 @@ impl SourceManager { let connector_properties = extract_prop_from_existing_source(&source)?; - let share_client_entry = get_kafka_connection(&connector_properties); - let enable_scale_in = connector_properties.enable_split_scale_in(); let (sync_call_tx, sync_call_rx) = 
tokio::sync::mpsc::unbounded_channel(); let handle = tokio::spawn(async move { @@ -1137,7 +1089,6 @@ impl SourceManager { sync_call_tx, splits, enable_scale_in, - share_client_entry, }, ); Ok(()) @@ -1229,16 +1180,6 @@ pub fn build_actor_split_impls( .collect() } -fn get_kafka_connection(connector_properties: &ConnectorProperties) -> Option { - { - // for kafka source: get the hash of connection props as shared source entry (on meta) - if let ConnectorProperties::Kafka(kafka_props) = connector_properties { - return Some(kafka_props.connection.clone()); - } - None - } -} - #[cfg(test)] mod tests { use std::collections::{BTreeMap, HashMap, HashSet}; From 73f0b7bc90a46af1c425c2c3796971c2dbf33cb0 Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 28 Oct 2024 16:58:36 +0800 Subject: [PATCH 20/25] minor --- src/connector/src/source/kafka/enumerator/client.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index f9b77a40ec1c..1dd9a4f581d8 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -133,36 +133,31 @@ impl SplitEnumerator for KafkaSplitEnumerator { Ok(client) } - let client_weak = match SHARED_KAFKA_CLIENT + let client_arc = match SHARED_KAFKA_CLIENT .entry_by_ref(&properties.connection) .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async { if let Some(entry) = maybe_entry { let entry_value = entry.into_value(); if entry_value.upgrade().is_some() { // return if the client is already built + tracing::info!("reuse existing kafka client for {}", broker_address); return Ok(Op::Nop); } } let client_arc = Arc::new( build_kafka_client(&config, &properties, broker_rewrite_map.clone()).await?, ); + tracing::info!("build new kafka client for {}", broker_address); Ok(Op::Put(Arc::downgrade(&client_arc))) }) .await? 
{ CompResult::Unchanged(entry) | CompResult::Inserted(entry) - | CompResult::ReplacedWith(entry) => entry.into_value(), + | CompResult::ReplacedWith(entry) => entry.into_value().upgrade().unwrap(), CompResult::Removed(_) | CompResult::StillNone(_) => unreachable!(), }; - let client_arc: Arc; - if let Some(client_arc_upgrade) = client_weak.upgrade() { - client_arc = client_arc_upgrade; - } else { - unreachable!() - }; - Ok(Self { context, broker_address, From ac1d63d28f7ac5441d2260324184d13acc0d049f Mon Sep 17 00:00:00 2001 From: tabversion Date: Mon, 28 Oct 2024 17:35:08 +0800 Subject: [PATCH 21/25] fix --- .../src/source/kafka/enumerator/client.rs | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 1dd9a4f581d8..1d7525bc7a61 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -19,7 +19,7 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use async_trait::async_trait; use moka::future::Cache as MokaCache; -use moka::ops::compute::{CompResult, Op}; +use moka::ops::compute::Op; use prometheus::core::{AtomicI64, GenericGauge}; use rdkafka::consumer::{BaseConsumer, Consumer}; use rdkafka::error::KafkaResult; @@ -133,36 +133,33 @@ impl SplitEnumerator for KafkaSplitEnumerator { Ok(client) } - let client_arc = match SHARED_KAFKA_CLIENT + let mut client_arc: Option> = None; + SHARED_KAFKA_CLIENT .entry_by_ref(&properties.connection) .and_try_compute_with::<_, _, ConnectorError>(|maybe_entry| async { if let Some(entry) = maybe_entry { let entry_value = entry.into_value(); - if entry_value.upgrade().is_some() { + if let Some(client) = entry_value.upgrade() { // return if the client is already built tracing::info!("reuse existing kafka client for {}", broker_address); + client_arc = Some(client); return Ok(Op::Nop); } } - let client_arc = Arc::new( + let new_client_arc = Arc::new( build_kafka_client(&config, &properties, broker_rewrite_map.clone()).await?, ); tracing::info!("build new kafka client for {}", broker_address); - Ok(Op::Put(Arc::downgrade(&client_arc))) + client_arc = Some(new_client_arc.clone()); + Ok(Op::Put(Arc::downgrade(&new_client_arc))) }) - .await? 
- { - CompResult::Unchanged(entry) - | CompResult::Inserted(entry) - | CompResult::ReplacedWith(entry) => entry.into_value().upgrade().unwrap(), - CompResult::Removed(_) | CompResult::StillNone(_) => unreachable!(), - }; + .await?; Ok(Self { context, broker_address, topic, - client: client_arc, + client: client_arc.unwrap(), start_offset: scan_start_offset, stop_offset: KafkaEnumeratorOffset::None, sync_call_timeout: properties.common.sync_call_timeout, From ec4909612d001ae14fcadc3c434760e8ee6c5032 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 29 Oct 2024 15:38:50 +0800 Subject: [PATCH 22/25] test bump quanta to 0.12.3 --- Cargo.lock | 8 +------- Cargo.toml | 8 ++------ 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 08a3b3015939..5d09f70ac019 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9770,8 +9770,7 @@ checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" [[package]] name = "quanta" version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +source = "git+https://github.com/tabVersion/quanta.git?rev=da62f4f04516fcfc72f686869026bad87b712f37#da62f4f04516fcfc72f686869026bad87b712f37" dependencies = [ "crossbeam-utils", "libc", @@ -16554,8 +16553,3 @@ dependencies = [ "libc", "pkg-config", ] - -[[patch.unused]] -name = "quanta" -version = "0.11.0" -source = "git+https://github.com/madsim-rs/quanta.git?rev=948bdc3#948bdc3d4cd3fcfe3d52d03dd83deee96d97d770" diff --git a/Cargo.toml b/Cargo.toml index da6b0bfa8738..1d22c656feb0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -153,11 +153,7 @@ arrow-udf-flight = "0.4" clap = { version = "4", features = ["cargo", "derive", "env"] } # Use a forked version which removes the dependencies on dynamo db to reduce # compile time and binary size. -deltalake = { version = "0.20.1", features = [ - "s3", - "gcs", - "datafusion", -] } +deltalake = { version = "0.20.1", features = ["s3", "gcs", "datafusion"] } itertools = "0.13.0" jsonbb = "0.1.4" lru = { git = "https://github.com/risingwavelabs/lru-rs.git", rev = "2682b85" } @@ -343,7 +339,7 @@ opt-level = 2 [patch.crates-io] # Patch third-party crates for deterministic simulation. -quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "948bdc3" } +quanta = { git = "https://github.com/tabVersion/quanta.git", rev = "da62f4f04516fcfc72f686869026bad87b712f37" } getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" } # Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies. # Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`. 
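A note on the `quanta` override: bumping moka to 0.12.x in PATCH 14 switched its `quanta` dependency from the pinned madsim 0.11 fork to the 0.12.3 release on crates.io, which is why Cargo.lock started reporting the old `[patch.crates-io]` entry under `[[patch.unused]]`. PATCH 22 above repoints the override at a quanta-0.12.3-based fork so that the deterministic-simulation patch applies to the new version; the following commits only move that revision around until an equivalent rev of the upstream madsim-rs fork is used in PATCH 25.
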
From 16d8c423c253a485f5f7205a57ed043d3c040881 Mon Sep 17 00:00:00 2001 From: tabVersion Date: Tue, 29 Oct 2024 15:58:42 +0800 Subject: [PATCH 23/25] update patch --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5d09f70ac019..4030ac7873c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9770,7 +9770,7 @@ checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" [[package]] name = "quanta" version = "0.12.3" -source = "git+https://github.com/tabVersion/quanta.git?rev=da62f4f04516fcfc72f686869026bad87b712f37#da62f4f04516fcfc72f686869026bad87b712f37" +source = "git+https://github.com/tabVersion/quanta.git?rev=bb6c780894d06c0ec3f487d58c72920665b5cb0a#bb6c780894d06c0ec3f487d58c72920665b5cb0a" dependencies = [ "crossbeam-utils", "libc", diff --git a/Cargo.toml b/Cargo.toml index 1d22c656feb0..14fcd7aa5e5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -339,7 +339,7 @@ opt-level = 2 [patch.crates-io] # Patch third-party crates for deterministic simulation. -quanta = { git = "https://github.com/tabVersion/quanta.git", rev = "da62f4f04516fcfc72f686869026bad87b712f37" } +quanta = { git = "https://github.com/tabVersion/quanta.git", rev = "bb6c780894d06c0ec3f487d58c72920665b5cb0a" } getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" } # Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies. # Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`. From b3efda6f9fa0fe0be7101706a9eedc68974af397 Mon Sep 17 00:00:00 2001 From: tabversion Date: Wed, 30 Oct 2024 15:05:35 +0800 Subject: [PATCH 24/25] moka 0.12.3 --- src/meta/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index b0c918407b36..b9c2835e76e1 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -42,7 +42,7 @@ jsonbb = { workspace = true } maplit = "1.0.2" memcomparable = { version = "0.2" } mime_guess = "2" -moka = { version = "0.12.8", features = ["future"] } +moka = { version = "0.12.3", features = ["future"] } notify = { version = "7", default-features = false, features = [ "macos_fsevent", ] } From 51eca61c25b1d5d0dfbdcdc5a37a75a518c6ac8a Mon Sep 17 00:00:00 2001 From: tabversion Date: Sat, 2 Nov 2024 21:35:16 +0800 Subject: [PATCH 25/25] switch to madsim repo --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4030ac7873c6..d86e74fac1f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9770,7 +9770,7 @@ checksum = "658fa1faf7a4cc5f057c9ee5ef560f717ad9d8dc66d975267f709624d6e1ab88" [[package]] name = "quanta" version = "0.12.3" -source = "git+https://github.com/tabVersion/quanta.git?rev=bb6c780894d06c0ec3f487d58c72920665b5cb0a#bb6c780894d06c0ec3f487d58c72920665b5cb0a" +source = "git+https://github.com/madsim-rs/quanta.git?rev=ea9ba802327b1d72c4b1c7202c759b0a5243271e#ea9ba802327b1d72c4b1c7202c759b0a5243271e" dependencies = [ "crossbeam-utils", "libc", diff --git a/Cargo.toml b/Cargo.toml index 14fcd7aa5e5f..7788ed36038f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -339,7 +339,7 @@ opt-level = 2 [patch.crates-io] # Patch third-party crates for deterministic simulation. 
-quanta = { git = "https://github.com/tabVersion/quanta.git", rev = "bb6c780894d06c0ec3f487d58c72920665b5cb0a" } +quanta = { git = "https://github.com/madsim-rs/quanta.git", rev = "ea9ba802327b1d72c4b1c7202c759b0a5243271e" } getrandom = { git = "https://github.com/madsim-rs/getrandom.git", rev = "e79a7ae" } # Don't patch `tokio-stream`, but only use the madsim version for **direct** dependencies. # Imagine an unpatched dependency depends on the original `tokio` and the patched `tokio-stream`.
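
Taken together, the later patches land on the following sharing scheme: each enumerator keeps a strong `Arc` to its consumer, while the process-wide cache holds only a `Weak` handle, so no explicit ref-counting is needed and the client is torn down as soon as the last enumerator drops it. Below is a minimal, self-contained sketch of that idea; it is illustrative only, assuming a trivial `KafkaClient` stand-in and a plain mutex-guarded map where the real code uses a `moka::future::Cache` keyed by the full `KafkaConnection` struct.

use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Mutex, Weak};

// Stand-in for the rdkafka consumer owned by each split enumerator.
struct KafkaClient {
    brokers: String,
}

// Simplified registry: only `Weak` handles are stored, so the registry never
// keeps a client alive on its own.
static SHARED_CLIENTS: LazyLock<Mutex<HashMap<String, Weak<KafkaClient>>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));

fn get_or_build_client(brokers: &str) -> Arc<KafkaClient> {
    let mut map = SHARED_CLIENTS.lock().unwrap();
    // Reuse the client while at least one enumerator still holds a strong Arc.
    if let Some(existing) = map.get(brokers).and_then(Weak::upgrade) {
        return existing;
    }
    // Otherwise build a new client and publish only a Weak handle; the client
    // is dropped automatically once the last enumerator releases its Arc.
    let client = Arc::new(KafkaClient {
        brokers: brokers.to_owned(),
    });
    map.insert(brokers.to_owned(), Arc::downgrade(&client));
    client
}

fn main() {
    let a = get_or_build_client("broker-1:9092");
    let b = get_or_build_client("broker-1:9092");
    assert!(Arc::ptr_eq(&a, &b)); // both enumerators share one client
    println!("shared client to {}", a.brokers);
    drop(a);
    drop(b);
    // The Weak entry is now dead; the next enumerator transparently rebuilds.
    let _rebuilt = get_or_build_client("broker-1:9092");
}

Because the registry itself never keeps a client alive, the explicit `ref_count` bookkeeping that the earlier patches maintained on the meta node becomes unnecessary — which is what the "migrate to Weak" patch (PATCH 19) deletes; a dead `Weak` entry left in the cache is harmless and is simply replaced on the next lookup.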