From fd8c35d263c6e4cf417fda4b9651b1de4ed0cbbd Mon Sep 17 00:00:00 2001
From: niebayes
Date: Fri, 29 Dec 2023 17:26:43 +0800
Subject: [PATCH] fix: some known issues

Replaces the DashMap-based client pool in the Kafka client manager with a
tokio Mutex-guarded HashMap, builds the region id / produce task pairs in a
single pass when appending a batch, and logs per-region start offsets only
in debug builds.

---
 src/log-store/src/kafka/client_manager.rs | 19 +++++++++----------
 src/log-store/src/kafka/log_store.rs      | 28 +++++++++++++++++-----------
 2 files changed, 26 insertions(+), 21 deletions(-)

diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index e272840201bb..6c61ea1b75fe 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -12,17 +12,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
-use dashmap::mapref::entry::Entry as DashMapEntry;
-use dashmap::DashMap;
 use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
 use rskafka::client::producer::aggregator::RecordAggregator;
 use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
 use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
 use rskafka::BackoffConfig;
 use snafu::ResultExt;
+use tokio::sync::Mutex as TokioMutex;
 
 use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};
 
@@ -67,7 +68,7 @@ pub(crate) struct ClientManager {
     client_factory: RsKafkaClient,
     /// A pool maintaining a collection of clients.
     /// Key: a topic. Value: the associated client of the topic.
-    client_pool: DashMap<Topic, Client>,
+    client_pool: TokioMutex<HashMap<Topic, Client>>,
 }
 
 impl ClientManager {
@@ -91,20 +92,18 @@ impl ClientManager {
         Ok(Self {
             config: config.clone(),
             client_factory: client,
-            client_pool: DashMap::new(),
+            client_pool: TokioMutex::new(HashMap::new()),
         })
     }
 
     /// Gets the client associated with the topic. If the client does not exist, a new one will
     /// be created and returned.
     pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
-        match self.client_pool.entry(topic.to_string()) {
-            DashMapEntry::Occupied(entry) => Ok(entry.get().clone()),
-            DashMapEntry::Vacant(entry) => {
-                let topic_client = self.try_create_client(topic).await?;
-                Ok(entry.insert(topic_client).clone())
-            }
+        let mut client_pool = self.client_pool.lock().await;
+        if let Entry::Vacant(entry) = client_pool.entry(topic.to_string()) {
+            entry.insert(self.try_create_client(topic).await?);
         }
+        Ok(client_pool[topic].clone())
     }
 
     async fn try_create_client(&self, topic: &Topic) -> Result<Client> {
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 36c86987041b..fadb24038c6e 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -101,12 +101,10 @@ impl LogStore for KafkaLogStore {
         }
 
         // Builds a record from entries belong to a region and produces them to kafka server.
-        let region_ids = producers.keys().cloned().collect::<Vec<_>>();
-
-        let tasks = producers
-            .into_values()
-            .map(|producer| producer.produce(&self.client_manager))
-            .collect::<Vec<_>>();
+        let (region_ids, tasks): (Vec<_>, Vec<_>) = producers
+            .into_iter()
+            .map(|(id, producer)| (id, producer.produce(&self.client_manager)))
+            .unzip();
         // Each produce operation returns a kafka offset of the produced record.
         // The offsets are then converted to entry ids.
         let entry_ids = futures::future::try_join_all(tasks)
@@ -114,11 +112,19 @@
             .await?
             .into_iter()
             .map(TryInto::try_into)
             .collect::<Result<Vec<_>>>()?;
-        debug!("The entries are appended at offsets {:?}", entry_ids);
+        let last_entry_ids = region_ids
+            .into_iter()
+            .zip(entry_ids)
+            .collect::<HashMap<_, _>>();
 
-        Ok(AppendBatchResponse {
-            last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(),
-        })
+        #[cfg(debug_assertions)]
+        {
+            for (region_id, offset) in last_entry_ids.iter() {
+                debug!("Entries for region {region_id} are appended at the start offset {offset}");
+            }
+        }
+
+        Ok(AppendBatchResponse { last_entry_ids })
     }
 
     /// Creates a new `EntryStream` to asynchronously generates `Entry` with entry ids
@@ -186,7 +192,7 @@ impl LogStore for KafkaLogStore {
                 record_offset, ns_clone, high_watermark
             );
 
-            // Ignores the noop record.
+            // Ignores noop records.
             if record.record.value.is_none() {
                 continue;
             }
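
A minimal standalone sketch of the client-pool pattern this patch adopts in
ClientManager::get_or_insert: an async-aware tokio Mutex guarding a HashMap, with the
vacant-entry check and client creation performed under the lock. The Client type, the
create_client helper, and the topic string below are placeholders for illustration only,
not types or APIs from the patch; the only external dependency assumed is the tokio crate.

    use std::collections::hash_map::Entry;
    use std::collections::HashMap;

    use tokio::sync::Mutex;

    // Placeholder for the per-topic Kafka client held by the pool.
    #[derive(Clone)]
    struct Client;

    // Placeholder for whatever (possibly slow) work builds a client for a topic.
    async fn create_client(_topic: &str) -> Client {
        Client
    }

    struct ClientPool {
        clients: Mutex<HashMap<String, Client>>,
    }

    impl ClientPool {
        // Returns the client for `topic`, creating it first if absent. The async
        // mutex guard is held across the `await`, so concurrent callers for the
        // same topic never build duplicate clients.
        async fn get_or_insert(&self, topic: &str) -> Client {
            let mut clients = self.clients.lock().await;
            if let Entry::Vacant(entry) = clients.entry(topic.to_string()) {
                entry.insert(create_client(topic).await);
            }
            clients[topic].clone()
        }
    }

    #[tokio::main]
    async fn main() {
        let pool = ClientPool {
            clients: Mutex::new(HashMap::new()),
        };
        let _client = pool.get_or_insert("some_topic").await;
    }

Compared with a sharded concurrent map such as DashMap, a single tokio Mutex serializes all
access to the pool, but its guard can safely be held across the await that creates a client,
so two callers racing on the same topic cannot both construct one.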