diff --git a/src/mito2/src/wal/entry_distributor.rs b/src/mito2/src/wal/entry_distributor.rs index 5c261bbcc766..7e45f9123633 100644 --- a/src/mito2/src/wal/entry_distributor.rs +++ b/src/mito2/src/wal/entry_distributor.rs @@ -24,7 +24,7 @@ use snafu::ensure; use store_api::logstore::entry::Entry; use store_api::logstore::provider::Provider; use store_api::storage::RegionId; -use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; use tokio_stream::StreamExt; @@ -38,7 +38,7 @@ pub(crate) struct WalEntryDistributor { raw_wal_reader: Arc, provider: Provider, /// Sends [Entry] to receivers based on [RegionId] - senders: HashMap>, + senders: HashMap>, /// Waits for the arg from the [WalEntryReader]. arg_receivers: Vec<(RegionId, oneshot::Receiver)>, } @@ -75,7 +75,7 @@ impl WalEntryDistributor { for (region_id, start_id) in args { subscribers.insert( region_id, - Receiver { + EntryReceiver { start_id, sender: self.senders[®ion_id].clone(), }, @@ -89,9 +89,9 @@ impl WalEntryDistributor { let entry_id = entry.entry_id(); let region_id = entry.region_id(); - if let Some(Receiver { sender, start_id }) = subscribers.get(®ion_id) { + if let Some(EntryReceiver { sender, start_id }) = subscribers.get(®ion_id) { if entry_id >= *start_id { - if let Err(err) = sender.send(entry) { + if let Err(err) = sender.send(entry).await { error!(err; "Failed to distribute raw entry, entry_id:{}, region_id: {}", entry_id, region_id); } } @@ -109,7 +109,7 @@ impl WalEntryDistributor { pub(crate) struct WalEntryReceiver { region_id: RegionId, /// Receives the [Entry] from the [WalEntryDistributor]. - entry_receiver: UnboundedReceiver, + entry_receiver: Receiver, /// Sends the `start_id` to the [WalEntryDistributor]. arg_sender: oneshot::Sender, } @@ -117,7 +117,7 @@ pub(crate) struct WalEntryReceiver { impl WalEntryReceiver { pub fn new( region_id: RegionId, - entry_receiver: UnboundedReceiver, + entry_receiver: Receiver, arg_sender: oneshot::Sender, ) -> Self { Self { @@ -170,9 +170,9 @@ impl WalEntryReader for WalEntryReceiver { } } -struct Receiver { +struct EntryReceiver { start_id: EntryId, - sender: UnboundedSender, + sender: Sender, } /// Returns [WalEntryDistributor] and batch [WalEntryReceiver]s. @@ -195,13 +195,14 @@ pub fn build_wal_entry_distributor_and_receivers( provider: Provider, raw_wal_reader: Arc, region_ids: Vec, + buffer_size: usize, ) -> (WalEntryDistributor, Vec) { let mut senders = HashMap::with_capacity(region_ids.len()); let mut readers = Vec::with_capacity(region_ids.len()); let mut arg_receivers = Vec::with_capacity(region_ids.len()); for region_id in region_ids { - let (entry_sender, entry_receiver) = mpsc::unbounded_channel(); + let (entry_sender, entry_receiver) = mpsc::channel(buffer_size); let (arg_sender, arg_receiver) = oneshot::channel(); senders.insert(region_id, entry_sender); @@ -266,6 +267,7 @@ mod tests { provider, reader, vec![RegionId::new(1024, 1), RegionId::new(1025, 1)], + 128, ); // Drops all receivers @@ -329,6 +331,7 @@ mod tests { RegionId::new(1024, 2), RegionId::new(1024, 3), ], + 128, ); assert_eq!(receivers.len(), 3); @@ -434,6 +437,7 @@ mod tests { provider.clone(), Arc::new(corrupted_stream), vec![region1, region2, region3], + 128, ); assert_eq!(receivers.len(), 3); let mut streams = receivers @@ -516,6 +520,7 @@ mod tests { provider.clone(), Arc::new(corrupted_stream), vec![region1, region2], + 128, ); assert_eq!(receivers.len(), 2); let mut streams = receivers @@ -607,6 +612,7 @@ mod tests { provider.clone(), reader, vec![RegionId::new(1024, 1), RegionId::new(1024, 2)], + 128, ); assert_eq!(receivers.len(), 2); let mut streams = receivers