Skip to content

Commit

Permalink
fix: some known issues
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 29, 2023
1 parent 7551432 commit 521c53f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
19 changes: 9 additions & 10 deletions src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
use rskafka::client::producer::aggregator::RecordAggregator;
use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
use rskafka::BackoffConfig;
use snafu::ResultExt;
use tokio::sync::Mutex as TokioMutex;

use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};

Expand Down Expand Up @@ -67,7 +68,7 @@ pub(crate) struct ClientManager {
client_factory: RsKafkaClient,
/// A pool maintaining a collection of clients.
/// Key: a topic. Value: the associated client of the topic.
client_pool: DashMap<Topic, Client>,
client_pool: TokioMutex<HashMap<Topic, Client>>,
}

impl ClientManager {
Expand All @@ -91,20 +92,18 @@ impl ClientManager {
Ok(Self {
config: config.clone(),
client_factory: client,
client_pool: DashMap::new(),
client_pool: TokioMutex::new(HashMap::new()),
})
}

/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
match self.client_pool.entry(topic.to_string()) {
DashMapEntry::Occupied(entry) => Ok(entry.get().clone()),
DashMapEntry::Vacant(entry) => {
let topic_client = self.try_create_client(topic).await?;
Ok(entry.insert(topic_client).clone())
}
let mut client_pool = self.client_pool.lock().await;
if let Entry::Vacant(entry) = client_pool.entry(topic.to_string()) {
entry.insert(self.try_create_client(topic).await?);
}
Ok(client_pool[topic].clone())
}

async fn try_create_client(&self, topic: &Topic) -> Result<Client> {
Expand Down
28 changes: 17 additions & 11 deletions src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,24 +101,30 @@ impl LogStore for KafkaLogStore {
}

// Builds a record from entries belong to a region and produces them to kafka server.
let region_ids = producers.keys().cloned().collect::<Vec<_>>();

let tasks = producers
.into_values()
.map(|producer| producer.produce(&self.client_manager))
.collect::<Vec<_>>();
let (region_ids, tasks): (Vec<_>, Vec<_>) = producers
.into_iter()
.map(|(id, producer)| (id, producer.produce(&self.client_manager)))
.unzip();
// Each produce operation returns a kafka offset of the produced record.
// The offsets are then converted to entry ids.
let entry_ids = futures::future::try_join_all(tasks)
.await?
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>>>()?;
debug!("The entries are appended at offsets {:?}", entry_ids);
let last_entry_ids = region_ids
.into_iter()
.zip(entry_ids)
.collect::<HashMap<_, _>>();

Ok(AppendBatchResponse {
last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(),
})
#[cfg(debug)]
{
for (region_id, offset) in last_entry_ids.iter() {
debug!("Entries for region {region_id} are appended at the start offset {offset}");
}
}

Ok(AppendBatchResponse { last_entry_ids })
}

/// Creates a new `EntryStream` to asynchronously generates `Entry` with entry ids
Expand Down Expand Up @@ -186,7 +192,7 @@ impl LogStore for KafkaLogStore {
record_offset, ns_clone, high_watermark
);

// Ignores the noop record.
// Ignores noop records.
if record.record.value.is_none() {
continue;
}
Expand Down

0 comments on commit 521c53f

Please sign in to comment.