From 521c53f24ef0ae677f1a02f9543dc62f4624b6df Mon Sep 17 00:00:00 2001
From: niebayes
Date: Fri, 29 Dec 2023 17:26:43 +0800
Subject: [PATCH 1/4] fix: some known issues

---
 src/log-store/src/kafka/client_manager.rs | 19 ++++++++-------
 src/log-store/src/kafka/log_store.rs      | 28 ++++++++++++++---------
 2 files changed, 26 insertions(+), 21 deletions(-)

diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index e272840201bb..6c61ea1b75fe 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -12,17 +12,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
-use dashmap::mapref::entry::Entry as DashMapEntry;
-use dashmap::DashMap;
 use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
 use rskafka::client::producer::aggregator::RecordAggregator;
 use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
 use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
 use rskafka::BackoffConfig;
 use snafu::ResultExt;
+use tokio::sync::Mutex as TokioMutex;
 
 use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};
 
@@ -67,7 +68,7 @@ pub(crate) struct ClientManager {
     client_factory: RsKafkaClient,
     /// A pool maintaining a collection of clients.
     /// Key: a topic. Value: the associated client of the topic.
-    client_pool: DashMap<Topic, Client>,
+    client_pool: TokioMutex<HashMap<Topic, Client>>,
 }
 
 impl ClientManager {
@@ -91,20 +92,18 @@ impl ClientManager {
         Ok(Self {
             config: config.clone(),
             client_factory: client,
-            client_pool: DashMap::new(),
+            client_pool: TokioMutex::new(HashMap::new()),
         })
     }
 
     /// Gets the client associated with the topic. If the client does not exist, a new one will
     /// be created and returned.
     pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
-        match self.client_pool.entry(topic.to_string()) {
-            DashMapEntry::Occupied(entry) => Ok(entry.get().clone()),
-            DashMapEntry::Vacant(entry) => {
-                let topic_client = self.try_create_client(topic).await?;
-                Ok(entry.insert(topic_client).clone())
-            }
+        let mut client_pool = self.client_pool.lock().await;
+        if let Entry::Vacant(entry) = client_pool.entry(topic.to_string()) {
+            entry.insert(self.try_create_client(topic).await?);
         }
+        Ok(client_pool[topic].clone())
     }
 
     async fn try_create_client(&self, topic: &Topic) -> Result<Client> {
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 36c86987041b..fadb24038c6e 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -101,12 +101,10 @@ impl LogStore for KafkaLogStore {
         }
 
         // Builds a record from entries belong to a region and produces them to kafka server.
-        let region_ids = producers.keys().cloned().collect::<Vec<_>>();
-
-        let tasks = producers
-            .into_values()
-            .map(|producer| producer.produce(&self.client_manager))
-            .collect::<Vec<_>>();
+        let (region_ids, tasks): (Vec<_>, Vec<_>) = producers
+            .into_iter()
+            .map(|(id, producer)| (id, producer.produce(&self.client_manager)))
+            .unzip();
         // Each produce operation returns a kafka offset of the produced record.
         // The offsets are then converted to entry ids.
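Note on the locking change in PATCH 1/4: `get_or_insert` has to hold the pool lock while it awaits `try_create_client`. DashMap guards are synchronous shard locks, and holding one across an `.await` can stall or deadlock other tasks sharing the executor threads, which is presumably why the patch moves to an async-aware `tokio::sync::Mutex`. Below is a minimal, self-contained sketch of the resulting pattern; the `Pool` type, `String` keys, `u32` "client", and `create_client` stub are illustrative stand-ins, not the crate's real types.

use std::collections::hash_map::Entry;
use std::collections::HashMap;

use tokio::sync::Mutex;

struct Pool {
    // Hypothetical pool: topic name -> client handle (a plain u32 stands in
    // for the real client type).
    clients: Mutex<HashMap<String, u32>>,
}

impl Pool {
    async fn get_or_insert(&self, topic: &str) -> u32 {
        let mut clients = self.clients.lock().await;
        if let Entry::Vacant(entry) = clients.entry(topic.to_string()) {
            // A tokio MutexGuard may be held across an await point, which is
            // exactly what a synchronous guard cannot do safely.
            entry.insert(create_client(topic).await);
        }
        clients[topic]
    }
}

// Stub for the async client construction step.
async fn create_client(_topic: &str) -> u32 {
    42
}

#[tokio::main]
async fn main() {
    let pool = Pool {
        clients: Mutex::new(HashMap::new()),
    };
    assert_eq!(pool.get_or_insert("wal_topic_0").await, 42);
    // The second call hits the cached entry instead of creating a new client.
    assert_eq!(pool.get_or_insert("wal_topic_0").await, 42);
}

The cost, revisited in PATCH 4/4, is that a single Mutex also serializes lookups that only need read access.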
@@ -114,11 +112,19 @@ impl LogStore for KafkaLogStore {
         let entry_ids = futures::future::try_join_all(tasks)
             .await?
             .into_iter()
             .map(TryInto::try_into)
             .collect::<Result<Vec<_>>>()?;
-        debug!("The entries are appended at offsets {:?}", entry_ids);
+        let last_entry_ids = region_ids
+            .into_iter()
+            .zip(entry_ids)
+            .collect::<HashMap<_, _>>();
 
-        Ok(AppendBatchResponse {
-            last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(),
-        })
+        #[cfg(debug)]
+        {
+            for (region_id, offset) in last_entry_ids.iter() {
+                debug!("Entries for region {region_id} are appended at the start offset {offset}");
+            }
+        }
+
+        Ok(AppendBatchResponse { last_entry_ids })
     }
 
     /// Creates a new `EntryStream` to asynchronously generates `Entry` with entry ids
@@ -186,7 +192,7 @@ impl LogStore for KafkaLogStore {
                         record_offset, ns_clone, high_watermark
                     );
 
-                    // Ignores the noop record.
+                    // Ignores noop records.
                     if record.record.value.is_none() {
                         continue;
                     }

From 3bb786078a02c27fb6895df6a21a2864da57ffda Mon Sep 17 00:00:00 2001
From: niebayes
Date: Fri, 29 Dec 2023 18:13:33 +0800
Subject: [PATCH 2/4] fix: CR

---
 src/log-store/src/kafka/client_manager.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 6c61ea1b75fe..3a1273ccc679 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -23,7 +23,7 @@ use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
 use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
 use rskafka::BackoffConfig;
 use snafu::ResultExt;
-use tokio::sync::Mutex as TokioMutex;
+use tokio::sync::Mutex;
 
 use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};
 
@@ -68,7 +68,7 @@ pub(crate) struct ClientManager {
     client_factory: RsKafkaClient,
     /// A pool maintaining a collection of clients.
     /// Key: a topic. Value: the associated client of the topic.
-    client_pool: TokioMutex<HashMap<Topic, Client>>,
+    client_pool: Mutex<HashMap<Topic, Client>>,
 }
 
 impl ClientManager {
@@ -92,7 +92,7 @@ impl ClientManager {
         Ok(Self {
             config: config.clone(),
             client_factory: client,
-            client_pool: TokioMutex::new(HashMap::new()),
+            client_pool: Mutex::new(HashMap::new()),
         })
     }

From fd4e506cc6a7cc4094ba3505e24075dcf186d621 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Fri, 29 Dec 2023 21:21:26 +0800
Subject: [PATCH 3/4] fix: CR

---
 src/log-store/src/kafka/client_manager.rs | 11 ++++---
 src/log-store/src/kafka/log_store.rs      | 39 ++++++++++-------------
 2 files changed, 23 insertions(+), 27 deletions(-)

diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 3a1273ccc679..8878a1b2bebe 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -100,10 +99,14 @@ impl ClientManager {
     /// be created and returned.
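Note on the `append_batch` rework in PATCH 3/4: instead of unzipping region ids and produce futures into two parallel vectors and zipping them back together after `try_join_all`, each future now carries its region id and resolves directly to a `(region_id, entry_id)` pair, so the pairing can never drift out of sync and a failure in any produce call short-circuits the whole join. It also drops the `#[cfg(debug)]` block from PATCH 1/4, which was dead code anyway: the predicate the compiler sets for debug builds is `debug_assertions`, not `debug`. A runnable sketch of the new shape, assuming the `tokio` and `futures` crates, with stand-in types (`u64` ids, `String` errors, a stubbed `produce`) in place of the crate's real ones:

use std::collections::HashMap;

// Stub for producing one region's record and returning its Kafka offset.
async fn produce(region_id: u64) -> Result<i64, String> {
    Ok(region_id as i64 * 10)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let regions = vec![1u64, 2, 3];
    let last_entry_ids = futures::future::try_join_all(regions.into_iter().map(
        |region_id| async move {
            let offset = produce(region_id).await?;
            // A returned offset is non-negative, so converting it into an
            // entry id should only fail on a malformed response.
            let entry_id = u64::try_from(offset).map_err(|e| e.to_string())?;
            Ok::<_, String>((region_id, entry_id))
        },
    ))
    .await?
    .into_iter()
    .collect::<HashMap<_, _>>();

    // Prints {1: 10, 2: 20, 3: 30}: one start offset per region.
    println!("{last_entry_ids:?}");
    Ok(())
}

The `.map(TryInto::try_into)??` in the patch performs the same two-step unwrap inline: `produce` yields a `Result` holding the Kafka offset, and the mapped conversion wraps the offset-to-entry-id step in a second `Result`.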
     pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
         let mut client_pool = self.client_pool.lock().await;
-        if let Entry::Vacant(entry) = client_pool.entry(topic.to_string()) {
-            entry.insert(self.try_create_client(topic).await?);
+        match client_pool.get(topic) {
+            Some(client) => Ok(client.clone()),
+            None => {
+                let client = self.try_create_client(topic).await?;
+                client_pool.insert(topic.to_string(), client.clone());
+                Ok(client)
+            }
         }
-        Ok(client_pool[topic].clone())
     }
 
     async fn try_create_client(&self, topic: &Topic) -> Result<Client> {
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index fadb24038c6e..20bcd4e7cf50 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -100,29 +100,22 @@ impl LogStore for KafkaLogStore {
                 .push(entry);
         }
 
-        // Builds a record from entries belong to a region and produces them to kafka server.
-        let (region_ids, tasks): (Vec<_>, Vec<_>) = producers
-            .into_iter()
-            .map(|(id, producer)| (id, producer.produce(&self.client_manager)))
-            .unzip();
-        // Each produce operation returns a kafka offset of the produced record.
-        // The offsets are then converted to entry ids.
-        let entry_ids = futures::future::try_join_all(tasks)
-            .await?
-            .into_iter()
-            .map(TryInto::try_into)
-            .collect::<Result<Vec<_>>>()?;
-        let last_entry_ids = region_ids
-            .into_iter()
-            .zip(entry_ids)
-            .collect::<HashMap<_, _>>();
-
-        #[cfg(debug)]
-        {
-            for (region_id, offset) in last_entry_ids.iter() {
-                debug!("Entries for region {region_id} are appended at the start offset {offset}");
-            }
-        }
+        // Produces entries for each region and gets the offset those entries were written to.
+        // The returned offset is then converted into an entry id.
+        let last_entry_ids = futures::future::try_join_all(producers.into_iter().map(
+            |(region_id, producer)| async move {
+                let entry_id = producer
+                    .produce(&self.client_manager)
+                    .await
+                    .map(TryInto::try_into)??;
+                Ok((region_id, entry_id))
+            },
+        ))
+        .await?
+        .into_iter()
+        .collect::<HashMap<_, _>>();
+
+        debug!("Append batch result: {:?}", last_entry_ids);
 
         Ok(AppendBatchResponse { last_entry_ids })
     }

From b2a08029ae81ec4f0d023304a185a75d61d49484 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Sat, 30 Dec 2023 18:46:45 +0800
Subject: [PATCH 4/4] chore: replace Mutex with RwLock

---
 src/log-store/src/kafka/client_manager.rs | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 8878a1b2bebe..cd2f705c4db9 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -22,7 +22,7 @@ use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
 use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
 use rskafka::BackoffConfig;
 use snafu::ResultExt;
-use tokio::sync::Mutex;
+use tokio::sync::RwLock;
 
 use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};
 
@@ -67,7 +67,7 @@ pub(crate) struct ClientManager {
     client_factory: RsKafkaClient,
     /// A pool maintaining a collection of clients.
     /// Key: a topic. Value: the associated client of the topic.
-    client_pool: Mutex<HashMap<Topic, Client>>,
+    client_pool: RwLock<HashMap<Topic, Client>>,
 }
 
 impl ClientManager {
@@ -91,19 +91,27 @@ impl ClientManager {
         Ok(Self {
             config: config.clone(),
             client_factory: client,
-            client_pool: Mutex::new(HashMap::new()),
+            client_pool: RwLock::new(HashMap::new()),
         })
     }
 
     /// Gets the client associated with the topic. If the client does not exist, a new one will
     /// be created and returned.
     pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
-        let mut client_pool = self.client_pool.lock().await;
+        let client_pool = self.client_pool.read().await;
+        if let Some(client) = client_pool.get(topic) {
+            return Ok(client.clone());
+        }
+        // Manually releases the read lock.
+        drop(client_pool);
+
+        // Acquires the write lock.
+        let mut client_pool = self.client_pool.write().await;
         match client_pool.get(topic) {
             Some(client) => Ok(client.clone()),
             None => {
                 let client = self.try_create_client(topic).await?;
-                client_pool.insert(topic.to_string(), client.clone());
+                client_pool.insert(topic.clone(), client.clone());
                 Ok(client)
             }
         }
     }
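Note on PATCH 4/4: the `Mutex` to `RwLock` swap plus the read-then-write sequence is the classic double-checked locking pattern. The hot path (the client already exists) now only contends on a read lock; the write lock is taken on a miss, and the map must be checked again under it because another task may have inserted the client between the two lock acquisitions. A minimal sketch of the pattern, with stand-in types (a `u32` "client", a stubbed `create_client`) rather than the crate's real API:

use std::collections::HashMap;

use tokio::sync::RwLock;

struct Pool {
    clients: RwLock<HashMap<String, u32>>,
}

impl Pool {
    async fn get_or_insert(&self, topic: &str) -> u32 {
        // Fast path: many tasks can hold the read lock at once.
        if let Some(client) = self.clients.read().await.get(topic) {
            return *client;
        }
        // Slow path: take the write lock, then re-check, because another
        // task may have inserted the client while no lock was held.
        let mut clients = self.clients.write().await;
        match clients.get(topic) {
            Some(client) => *client,
            None => {
                let client = create_client(topic).await;
                clients.insert(topic.to_string(), client);
                client
            }
        }
    }
}

// Stub for the async client construction step.
async fn create_client(_topic: &str) -> u32 {
    42
}

#[tokio::main]
async fn main() {
    let pool = Pool {
        clients: RwLock::new(HashMap::new()),
    };
    assert_eq!(pool.get_or_insert("wal_topic_0").await, 42);
}

One remaining cost: the client is still created while the write lock is held, so first-time lookups of different topics serialize. That matches the patch and is what guarantees each topic's client is created exactly once.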