diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 3a1273ccc679..8878a1b2bebe 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; @@ -100,10 +99,14 @@ impl ClientManager { /// be created and returned. pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result { let mut client_pool = self.client_pool.lock().await; - if let Entry::Vacant(entry) = client_pool.entry(topic.to_string()) { - entry.insert(self.try_create_client(topic).await?); + match client_pool.get(topic) { + Some(client) => Ok(client.clone()), + None => { + let client = self.try_create_client(topic).await?; + client_pool.insert(topic.to_string(), client.clone()); + Ok(client) + } } - Ok(client_pool[topic].clone()) } async fn try_create_client(&self, topic: &Topic) -> Result { diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index fadb24038c6e..20bcd4e7cf50 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -100,29 +100,22 @@ impl LogStore for KafkaLogStore { .push(entry); } - // Builds a record from entries belong to a region and produces them to kafka server. - let (region_ids, tasks): (Vec<_>, Vec<_>) = producers - .into_iter() - .map(|(id, producer)| (id, producer.produce(&self.client_manager))) - .unzip(); - // Each produce operation returns a kafka offset of the produced record. - // The offsets are then converted to entry ids. - let entry_ids = futures::future::try_join_all(tasks) - .await? - .into_iter() - .map(TryInto::try_into) - .collect::>>()?; - let last_entry_ids = region_ids - .into_iter() - .zip(entry_ids) - .collect::>(); - - #[cfg(debug)] - { - for (region_id, offset) in last_entry_ids.iter() { - debug!("Entries for region {region_id} are appended at the start offset {offset}"); - } - } + // Produces entries for each region and gets the offset those entries written to. + // The returned offset is then converted into an entry id. + let last_entry_ids = futures::future::try_join_all(producers.into_iter().map( + |(region_id, producer)| async move { + let entry_id = producer + .produce(&self.client_manager) + .await + .map(TryInto::try_into)??; + Ok((region_id, entry_id)) + }, + )) + .await? + .into_iter() + .collect::>(); + + debug!("Append batch result: {:?}", last_entry_ids); Ok(AppendBatchResponse { last_entry_ids }) }