Skip to content

Commit

Permalink
fix: CR
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 29, 2023
1 parent 3bb7860 commit fd4e506
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 27 deletions.
11 changes: 7 additions & 4 deletions src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

Expand Down Expand Up @@ -100,10 +99,14 @@ impl ClientManager {
/// be created and returned.
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
let mut client_pool = self.client_pool.lock().await;
if let Entry::Vacant(entry) = client_pool.entry(topic.to_string()) {
entry.insert(self.try_create_client(topic).await?);
match client_pool.get(topic) {
Some(client) => Ok(client.clone()),
None => {
let client = self.try_create_client(topic).await?;
client_pool.insert(topic.to_string(), client.clone());
Ok(client)
}
}
Ok(client_pool[topic].clone())
}

async fn try_create_client(&self, topic: &Topic) -> Result<Client> {
Expand Down
39 changes: 16 additions & 23 deletions src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,29 +100,22 @@ impl LogStore for KafkaLogStore {
.push(entry);
}

// Builds a record from entries belong to a region and produces them to kafka server.
let (region_ids, tasks): (Vec<_>, Vec<_>) = producers
.into_iter()
.map(|(id, producer)| (id, producer.produce(&self.client_manager)))
.unzip();
// Each produce operation returns a kafka offset of the produced record.
// The offsets are then converted to entry ids.
let entry_ids = futures::future::try_join_all(tasks)
.await?
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>>>()?;
let last_entry_ids = region_ids
.into_iter()
.zip(entry_ids)
.collect::<HashMap<_, _>>();

#[cfg(debug)]
{
for (region_id, offset) in last_entry_ids.iter() {
debug!("Entries for region {region_id} are appended at the start offset {offset}");
}
}
// Produces entries for each region and gets the offset those entries written to.
// The returned offset is then converted into an entry id.
let last_entry_ids = futures::future::try_join_all(producers.into_iter().map(
|(region_id, producer)| async move {
let entry_id = producer
.produce(&self.client_manager)
.await
.map(TryInto::try_into)??;
Ok((region_id, entry_id))
},
))
.await?
.into_iter()
.collect::<HashMap<_, _>>();

debug!("Append batch result: {:?}", last_entry_ids);

Ok(AppendBatchResponse { last_entry_ids })
}
Expand Down

0 comments on commit fd4e506

Please sign in to comment.