Skip to content

Commit

Permalink
chore: replace Mutex with RwLock
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 30, 2023
1 parent fd4e506 commit b2a0802
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
use rskafka::BackoffConfig;
use snafu::ResultExt;
use tokio::sync::Mutex;
use tokio::sync::RwLock;

use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};

Expand Down Expand Up @@ -67,7 +67,7 @@ pub(crate) struct ClientManager {
client_factory: RsKafkaClient,
/// A pool maintaining a collection of clients.
/// Key: a topic. Value: the associated client of the topic.
client_pool: Mutex<HashMap<Topic, Client>>,
client_pool: RwLock<HashMap<Topic, Client>>,
}

impl ClientManager {
Expand All @@ -91,19 +91,27 @@ impl ClientManager {
Ok(Self {
config: config.clone(),
client_factory: client,
client_pool: Mutex::new(HashMap::new()),
client_pool: RwLock::new(HashMap::new()),
})
}

/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
let mut client_pool = self.client_pool.lock().await;
let client_pool = self.client_pool.read().await;
if let Some(client) = client_pool.get(topic) {
return Ok(client.clone());
}
// Manullay releases the read lock.
drop(client_pool);

// Acquires the write lock.
let mut client_pool = self.client_pool.write().await;
match client_pool.get(topic) {
Some(client) => Ok(client.clone()),
None => {
let client = self.try_create_client(topic).await?;
client_pool.insert(topic.to_string(), client.clone());
client_pool.insert(topic.clone(), client.clone());
Ok(client)
}
}
Expand Down

0 comments on commit b2a0802

Please sign in to comment.