diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index e272840201bb..8878a1b2bebe 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -12,17 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic}; -use dashmap::mapref::entry::Entry as DashMapEntry; -use dashmap::DashMap; use rskafka::client::partition::{PartitionClient, UnknownTopicHandling}; use rskafka::client::producer::aggregator::RecordAggregator; use rskafka::client::producer::{BatchProducer, BatchProducerBuilder}; use rskafka::client::{Client as RsKafkaClient, ClientBuilder}; use rskafka::BackoffConfig; use snafu::ResultExt; +use tokio::sync::Mutex; use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result}; @@ -67,7 +67,7 @@ pub(crate) struct ClientManager { client_factory: RsKafkaClient, /// A pool maintaining a collection of clients. /// Key: a topic. Value: the associated client of the topic. - client_pool: DashMap, + client_pool: Mutex>, } impl ClientManager { @@ -91,18 +91,20 @@ impl ClientManager { Ok(Self { config: config.clone(), client_factory: client, - client_pool: DashMap::new(), + client_pool: Mutex::new(HashMap::new()), }) } /// Gets the client associated with the topic. If the client does not exist, a new one will /// be created and returned. pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result { - match self.client_pool.entry(topic.to_string()) { - DashMapEntry::Occupied(entry) => Ok(entry.get().clone()), - DashMapEntry::Vacant(entry) => { - let topic_client = self.try_create_client(topic).await?; - Ok(entry.insert(topic_client).clone()) + let mut client_pool = self.client_pool.lock().await; + match client_pool.get(topic) { + Some(client) => Ok(client.clone()), + None => { + let client = self.try_create_client(topic).await?; + client_pool.insert(topic.to_string(), client.clone()); + Ok(client) } } } diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 36c86987041b..20bcd4e7cf50 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -100,25 +100,24 @@ impl LogStore for KafkaLogStore { .push(entry); } - // Builds a record from entries belong to a region and produces them to kafka server. - let region_ids = producers.keys().cloned().collect::>(); - - let tasks = producers - .into_values() - .map(|producer| producer.produce(&self.client_manager)) - .collect::>(); - // Each produce operation returns a kafka offset of the produced record. - // The offsets are then converted to entry ids. - let entry_ids = futures::future::try_join_all(tasks) - .await? - .into_iter() - .map(TryInto::try_into) - .collect::>>()?; - debug!("The entries are appended at offsets {:?}", entry_ids); - - Ok(AppendBatchResponse { - last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(), - }) + // Produces entries for each region and gets the offset those entries written to. + // The returned offset is then converted into an entry id. + let last_entry_ids = futures::future::try_join_all(producers.into_iter().map( + |(region_id, producer)| async move { + let entry_id = producer + .produce(&self.client_manager) + .await + .map(TryInto::try_into)??; + Ok((region_id, entry_id)) + }, + )) + .await? + .into_iter() + .collect::>(); + + debug!("Append batch result: {:?}", last_entry_ids); + + Ok(AppendBatchResponse { last_entry_ids }) } /// Creates a new `EntryStream` to asynchronously generates `Entry` with entry ids @@ -186,7 +185,7 @@ impl LogStore for KafkaLogStore { record_offset, ns_clone, high_watermark ); - // Ignores the noop record. + // Ignores noop records. if record.record.value.is_none() { continue; }