Skip to content

Commit

Permalink
fix(remote_wal): some known issues (#3052)
Browse files Browse the repository at this point in the history
* fix: some known issues

* fix: CR

* fix: CR
  • Loading branch information
niebayes authored Dec 29, 2023
1 parent 41e51d4 commit 475a9b6
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 29 deletions.
20 changes: 11 additions & 9 deletions src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
use rskafka::client::producer::aggregator::RecordAggregator;
use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
use rskafka::BackoffConfig;
use snafu::ResultExt;
use tokio::sync::Mutex;

use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};

Expand Down Expand Up @@ -67,7 +67,7 @@ pub(crate) struct ClientManager {
client_factory: RsKafkaClient,
/// A pool maintaining a collection of clients.
/// Key: a topic. Value: the associated client of the topic.
client_pool: DashMap<Topic, Client>,
client_pool: Mutex<HashMap<Topic, Client>>,
}

impl ClientManager {
Expand All @@ -91,18 +91,20 @@ impl ClientManager {
Ok(Self {
config: config.clone(),
client_factory: client,
client_pool: DashMap::new(),
client_pool: Mutex::new(HashMap::new()),
})
}

/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
match self.client_pool.entry(topic.to_string()) {
DashMapEntry::Occupied(entry) => Ok(entry.get().clone()),
DashMapEntry::Vacant(entry) => {
let topic_client = self.try_create_client(topic).await?;
Ok(entry.insert(topic_client).clone())
let mut client_pool = self.client_pool.lock().await;
match client_pool.get(topic) {
Some(client) => Ok(client.clone()),
None => {
let client = self.try_create_client(topic).await?;
client_pool.insert(topic.to_string(), client.clone());
Ok(client)
}
}
}
Expand Down
39 changes: 19 additions & 20 deletions src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,25 +100,24 @@ impl LogStore for KafkaLogStore {
.push(entry);
}

// Builds a record from entries belong to a region and produces them to kafka server.
let region_ids = producers.keys().cloned().collect::<Vec<_>>();

let tasks = producers
.into_values()
.map(|producer| producer.produce(&self.client_manager))
.collect::<Vec<_>>();
// Each produce operation returns a kafka offset of the produced record.
// The offsets are then converted to entry ids.
let entry_ids = futures::future::try_join_all(tasks)
.await?
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>>>()?;
debug!("The entries are appended at offsets {:?}", entry_ids);

Ok(AppendBatchResponse {
last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(),
})
// Produces entries for each region and gets the offset those entries written to.
// The returned offset is then converted into an entry id.
let last_entry_ids = futures::future::try_join_all(producers.into_iter().map(
|(region_id, producer)| async move {
let entry_id = producer
.produce(&self.client_manager)
.await
.map(TryInto::try_into)??;
Ok((region_id, entry_id))
},
))
.await?
.into_iter()
.collect::<HashMap<_, _>>();

debug!("Append batch result: {:?}", last_entry_ids);

Ok(AppendBatchResponse { last_entry_ids })
}

/// Creates a new `EntryStream` to asynchronously generates `Entry` with entry ids
Expand Down Expand Up @@ -186,7 +185,7 @@ impl LogStore for KafkaLogStore {
record_offset, ns_clone, high_watermark
);

// Ignores the noop record.
// Ignores noop records.
if record.record.value.is_none() {
continue;
}
Expand Down

0 comments on commit 475a9b6

Please sign in to comment.