diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 1e49f977de4d..72d0b839a755 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -12,19 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::Arc;
 
 use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
-use common_telemetry::debug;
 use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
 use rskafka::client::producer::aggregator::RecordAggregator;
 use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
 use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
 use rskafka::BackoffConfig;
 use snafu::ResultExt;
-use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
+use tokio::sync::RwLock;
 
 use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};
 
@@ -69,7 +67,7 @@ pub(crate) struct ClientManager {
     client_factory: RsKafkaClient,
     /// A pool maintaining a collection of clients.
     /// Key: a topic. Value: the associated client of the topic.
-    client_pool: TokioMutex<HashMap<Topic, Client>>,
+    client_pool: RwLock<HashMap<Topic, Client>>,
 }
 
 impl ClientManager {
@@ -90,28 +88,36 @@ impl ClientManager {
                 broker_endpoints: config.broker_endpoints.clone(),
             })?;
 
-        debug!("Created a ClientManager");
-
         Ok(Self {
             config: config.clone(),
             client_factory: client,
-            client_pool: TokioMutex::new(HashMap::new()),
+            client_pool: RwLock::new(HashMap::new()),
         })
     }
 
     /// Gets the client associated with the topic. If the client does not exist, a new one will
     /// be created and returned.
     pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
-        let mut client_pool = self.client_pool.lock().await;
-        if let Entry::Vacant(entry) = client_pool.entry(topic.to_string()) {
-            entry.insert(self.try_create_client(topic).await?);
+        let client_pool = self.client_pool.read().await;
+        if let Some(client) = client_pool.get(topic) {
+            return Ok(client.clone());
+        }
+        // Manually releases the read lock.
+        drop(client_pool);
+
+        // Acquires the write lock.
+        let mut client_pool = self.client_pool.write().await;
+        match client_pool.get(topic) {
+            Some(client) => Ok(client.clone()),
+            None => {
+                let client = self.try_create_client(topic).await?;
+                client_pool.insert(topic.clone(), client.clone());
+                Ok(client)
+            }
         }
-        Ok(client_pool[topic].clone())
     }
 
     async fn try_create_client(&self, topic: &Topic) -> Result<Client> {
-        debug!("Try to create client for topic {}", topic);
-
         // Sets to Retry to retry connecting if the Kafka cluster replies with an UnknownTopic error.
         // That's because the topic is believed to exist as the metasrv is expected to create required topics upon start.
         // The reconnecting won't stop until it succeeds or a different error returns.
@@ -125,8 +131,6 @@ impl ClientManager {
             })
             .map(Arc::new)?;
 
-        debug!("Created a client for topic {}", topic);
-
         Ok(Client::new(raw_client, &self.config))
     }
 }
@@ -136,37 +140,24 @@ mod tests {
     use common_test_util::wal::kafka::BROKER_ENDPOINTS_KEY;
 
     use super::*;
+    use crate::get_broker_endpoints_from_env;
+    use crate::test_util::kafka::create_topics;
     use crate::test_util::kafka::topic_builder::{Affix, TopicBuilder};
 
     /// Checks clients for the given topics are created.
     async fn ensure_topics_exist(topics: &[Topic], client_manager: &ClientManager) {
-        let client_pool = client_manager.client_pool.lock().await;
+        let client_pool = client_manager.client_pool.read().await;
         let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
         assert_eq!(all_exist, true);
     }
 
-    /// Sends `get_or_insert` requests sequentially to the client manager, and checks if it could handle them correctly.
-    #[tokio::test]
-    async fn test_sequential() {
-        let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
-            .unwrap()
-            .split(',')
-            .map(ToString::to_string)
-            .collect::<Vec<_>>();
-        if broker_endpoints.is_empty() {
-            return;
-        }
-
-        let client = ClientBuilder::new(broker_endpoints.clone())
-            .build()
-            .await
-            .unwrap();
-        let ctrl_client = client.controller_client().unwrap();
-
+    async fn test_which(test_name: &str) {
+        // Creates a collection of topics in Kafka.
+        let broker_endpoints = get_broker_endpoints_from_env!(BROKER_ENDPOINTS_KEY);
         let topic_builder = TopicBuilder::default()
-            .with_prefix(Affix::Fixed("test_sequential".to_string()))
+            .with_prefix(Affix::Fixed(test_name.to_string()))
             .with_suffix(Affix::TimeNow);
-
+        let topics = create_topics(256, topic_builder, &broker_endpoints).await;
 
         let config = KafkaConfig {
             broker_endpoints,
@@ -174,36 +165,36 @@ mod tests {
         };
         let manager = ClientManager::try_new(&config).await.unwrap();
 
-        // Constructs a collection of mock topics.
-        let num_topics = 256;
-        let topics = (0..num_topics)
-            .map(|i| format!("topic_{i}"))
-            .collect::<Vec<_>>();
-
-        // Gets all clients sequentially.
-        for topic in topics.iter() {
-            manager.get_or_insert(topic).await.unwrap();
+        match test_name {
+            "test_sequential" => {
+                // Gets all clients sequentially.
+                for topic in topics.iter() {
+                    manager.get_or_insert(topic).await.unwrap();
+                }
+            }
+            "test_parallel" => {
+                // Gets all clients in parallel.
+                let tasks = topics
+                    .iter()
+                    .map(|topic| manager.get_or_insert(topic))
+                    .collect::<Vec<_>>();
+                futures::future::try_join_all(tasks).await.unwrap();
+            }
+            _ => unreachable!(),
         }
+        ensure_topics_exist(&topics, &manager).await;
     }
 
+    /// Sends `get_or_insert` requests sequentially to the client manager, and checks if it could handle them correctly.
+    #[tokio::test]
+    async fn test_sequential() {
+        test_which("test_sequential").await;
+    }
+
     /// Sends `get_or_insert` requests in parallel to the client manager, and checks if it could handle them correctly.
     #[tokio::test]
     async fn test_parallel() {
-        let manager = ClientManager::try_new(&config).await.unwrap();
-
-        // Constructs a collection of mock topics.
-        let num_topics = 256;
-        let topics = (0..num_topics)
-            .map(|i| format!("topic_{i}"))
-            .collect::<Vec<_>>();
-
-        // Gets all clients in parallel.
-        let tasks = topics
-            .iter()
-            .map(|topic| manager.get_or_insert(topic))
-            .collect::<Vec<_>>();
-        futures::future::try_join_all(tasks).await.unwrap();
-        ensure_topics_exist(&topics, &manager).await;
+        test_which("test_parallel").await;
     }
 }
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index f0170245e5e2..d78f846f9277 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -16,7 +16,6 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use common_config::wal::{KafkaConfig, WalOptions};
-use common_telemetry::{debug, warn};
 use futures_util::StreamExt;
 use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder};
 use rskafka::client::partition::OffsetAt;
diff --git a/src/log-store/src/test_util/kafka.rs b/src/log-store/src/test_util/kafka.rs
index b66e692836d7..f2af612e1f58 100644
--- a/src/log-store/src/test_util/kafka.rs
+++ b/src/log-store/src/test_util/kafka.rs
@@ -16,12 +16,48 @@ pub mod entry_builder;
 pub mod topic_builder;
 
 use common_config::wal::KafkaWalTopic as Topic;
+use rskafka::client::ClientBuilder;
 
 pub use crate::test_util::kafka::entry_builder::EntryBuilder;
 pub use crate::test_util::kafka::topic_builder::TopicBuilder;
 
+/// Gets broker endpoints from the environment variable with the given key.
+#[macro_export]
+macro_rules! get_broker_endpoints_from_env {
+    ($key:expr) => {{
+        let broker_endpoints = std::env::var($key)
+            .unwrap()
+            .split(',')
+            .map(ToString::to_string)
+            .collect::<Vec<_>>();
+        assert!(!broker_endpoints.is_empty());
+        broker_endpoints
+    }};
+}
+
 /// Creates `num_topics` topics with the given TopicBuilder.
-/// Requests for creating these topics on the Kafka cluster if the `broker_endpoints` is not empty.
-pub fn create_topics(num_topics: usize, builder: TopicBuilder, broker_endpoints: Vec<String>) -> Vec<Topic> {
-
+/// Requests the Kafka cluster to create these topics.
+pub async fn create_topics(
+    num_topics: usize,
+    mut builder: TopicBuilder,
+    broker_endpoints: &[String],
+) -> Vec<Topic> {
+    assert!(!broker_endpoints.is_empty());
+
+    let client = ClientBuilder::new(broker_endpoints.to_vec())
+        .build()
+        .await
+        .unwrap();
+    let ctrl_client = client.controller_client().unwrap();
+
+    let (topics, tasks): (Vec<_>, Vec<_>) = (0..num_topics)
+        .map(|i| {
+            let topic = builder.build(&format!("topic_{i}"));
+            let task = ctrl_client.create_topic(topic.clone(), 1, 1, 500);
+            (topic, task)
+        })
+        .unzip();
+    futures::future::try_join_all(tasks).await.unwrap();
+
+    topics
 }
diff --git a/src/log-store/src/test_util/kafka/topic_builder.rs b/src/log-store/src/test_util/kafka/topic_builder.rs
index 485b846cc6d4..be51d8c10f49 100644
--- a/src/log-store/src/test_util/kafka/topic_builder.rs
+++ b/src/log-store/src/test_util/kafka/topic_builder.rs
@@ -32,7 +32,7 @@ impl ToString for Affix {
     fn to_string(&self) -> String {
         match self {
             Affix::Fixed(s) => s.to_string(),
-            Affix::TimeNow => chrono::Local::now().to_string(),
+            Affix::TimeNow => chrono::Local::now().timestamp_millis().to_string(),
             Affix::Nothing => String::default(),
         }
     }
 }
@@ -70,13 +70,13 @@ impl TopicBuilder {
     }
 
     /// Builds a topic by inserting a prefix and a suffix into the given topic.
-    pub fn build(&mut self, topic: &Topic) -> Topic {
+    pub fn build(&mut self, body: &Topic) -> Topic {
         const ITERS: usize = 24;
         for _ in 0..ITERS {
            let topic = format!(
                 "{}_{}_{}",
                 self.prefix.to_string(),
-                topic,
+                body,
                 self.suffix.to_string()
             );
             if !self.created.contains(&topic) {
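
Note: the core change above swaps the client pool's `Mutex` for an `RwLock` and uses a double-checked read/write pattern in `get_or_insert`. Below is a minimal, self-contained sketch of that pattern for reference; the `Pool` struct and the `Arc<String>` values are hypothetical stand-ins for `ClientManager` and `Client`, and only tokio's `RwLock` API is assumed, not the crate's actual types.

use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::RwLock;

#[derive(Default)]
struct Pool {
    // Stand-in for `client_pool: RwLock<HashMap<Topic, Client>>`.
    inner: RwLock<HashMap<String, Arc<String>>>,
}

impl Pool {
    async fn get_or_insert(&self, key: &str) -> Arc<String> {
        // Fast path: most lookups only need the shared read lock.
        {
            let pool = self.inner.read().await;
            if let Some(value) = pool.get(key) {
                return value.clone();
            }
        } // The read guard is dropped here, before the write lock is requested.

        // Slow path: take the exclusive write lock and re-check, because another
        // task may have inserted the value while this one waited for the lock.
        let mut pool = self.inner.write().await;
        match pool.get(key) {
            Some(value) => value.clone(),
            None => {
                let value = Arc::new(format!("client for {key}"));
                pool.insert(key.to_string(), value.clone());
                value
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let pool = Pool::default();
    let a = pool.get_or_insert("topic_0").await;
    let b = pool.get_or_insert("topic_0").await;
    // The second call reuses the cached value instead of creating a new one.
    assert!(Arc::ptr_eq(&a, &b));
}

Releasing the read guard before awaiting the write lock (the `drop(client_pool)` in the diff) matters: a tokio write lock waits for all readers, so holding the read guard across that await would deadlock the task. The re-check under the write lock keeps the operation race-free when several tasks miss the fast path for the same topic at once.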