From f52b7ee4ab5d8287cc2f153abbfcc56797e75f22 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 20 Jul 2024 18:41:50 -0700 Subject: [PATCH] bugfix: Bound channel elements per iteration At the moment, the channel implentation processes as many channel elements as possible every time "process_events" is called. However, in multithreaded cases this can cause the loop to be stuck in the "process_events" section of a channel forever. If one thread keeps sending new elements into the channel while the current thread keeps reading them, it will starve other event sources of running time. This commit fixes this issue by bounding the number of channel elements that can be processed every time "process_events" is called. It chooses the smallest of the following numbers: - The capacity of the channel. - 1024 (chosen because this is also used by async-executor) If the channel is not empty after we have read this number of elements, the underlying source is not triggered. This should make it so the channel is immediately re-polled on the next dispatch. However it gives other sources more time to run. Signed-off-by: John Nunley --- src/sources/channel.rs | 104 +++++++++++++++++++++++++++++++++++------ 1 file changed, 91 insertions(+), 13 deletions(-) diff --git a/src/sources/channel.rs b/src/sources/channel.rs index c48f70e3..91275961 100644 --- a/src/sources/channel.rs +++ b/src/sources/channel.rs @@ -8,6 +8,7 @@ //! A synchronous version of the channel is provided by [`sync_channel`], in which //! the [`SyncSender`] will block when the channel is full. +use std::cmp; use std::fmt; use std::sync::mpsc; @@ -15,6 +16,8 @@ use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory}; use super::ping::{make_ping, Ping, PingError, PingSource}; +const MAX_EVENTS_CHECK: usize = 1024; + /// The events generated by the channel event source #[derive(Debug)] pub enum Event { @@ -123,6 +126,7 @@ impl SyncSender { pub struct Channel { receiver: mpsc::Receiver, source: PingSource, + capacity: usize, } // This impl is safe because the Channel is only able to move around threads @@ -156,14 +160,28 @@ impl Channel { pub fn channel() -> (Sender, Channel) { let (sender, receiver) = mpsc::channel(); let (ping, source) = make_ping().expect("Failed to create a Ping."); - (Sender { sender, ping }, Channel { receiver, source }) + ( + Sender { sender, ping }, + Channel { + receiver, + source, + capacity: usize::MAX, + }, + ) } /// Create a new synchronous, bounded channel pub fn sync_channel(bound: usize) -> (SyncSender, Channel) { let (sender, receiver) = mpsc::sync_channel(bound); let (ping, source) = make_ping().expect("Failed to create a Ping."); - (SyncSender { sender, ping }, Channel { receiver, source }) + ( + SyncSender { sender, ping }, + Channel { + receiver, + source, + capacity: bound, + }, + ) } impl EventSource for Channel { @@ -182,18 +200,33 @@ impl EventSource for Channel { C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, { let receiver = &self.receiver; - self.source - .process_events(readiness, token, |(), &mut ()| loop { - match receiver.try_recv() { - Ok(val) => callback(Event::Msg(val), &mut ()), - Err(mpsc::TryRecvError::Empty) => break, - Err(mpsc::TryRecvError::Disconnected) => { - callback(Event::Closed, &mut ()); - break; - } + + let mut clear_readiness = false; + + // Limit the number of elements we process at a time to the channel's capacity, or 1024. + let max = cmp::min(self.capacity, MAX_EVENTS_CHECK); + for _ in 0..max { + match receiver.try_recv() { + Ok(val) => callback(Event::Msg(val), &mut ()), + Err(mpsc::TryRecvError::Empty) => { + clear_readiness = true; + break; } - }) - .map_err(ChannelError) + Err(mpsc::TryRecvError::Disconnected) => { + callback(Event::Closed, &mut ()); + clear_readiness = true; + break; + } + } + } + + if clear_readiness { + self.source + .process_events(readiness, token, |(), &mut ()| {}) + .map_err(ChannelError) + } else { + Ok(PostAction::Continue) + } } fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { @@ -338,4 +371,49 @@ mod tests { assert_eq!(received.0, 3); assert!(received.1); } + + #[test] + fn test_more_than_1024() { + let mut event_loop = crate::EventLoop::try_new().unwrap(); + let handle = event_loop.handle(); + + let (tx, rx) = channel::<()>(); + let mut received = (0u32, false); + + handle + .insert_source( + rx, + move |evt, &mut (), received: &mut (u32, bool)| match evt { + Event::Msg(()) => received.0 += 1, + Event::Closed => received.1 = true, + }, + ) + .unwrap(); + + event_loop + .dispatch(Some(std::time::Duration::ZERO), &mut received) + .unwrap(); + + assert_eq!(received.0, 0); + assert!(!received.1); + + // Send 1025 elements into the channel. + for _ in 0..MAX_EVENTS_CHECK + 1 { + tx.send(()).unwrap(); + } + + event_loop + .dispatch(Some(std::time::Duration::ZERO), &mut received) + .unwrap(); + + assert_eq!(received.0, MAX_EVENTS_CHECK as u32); + assert!(!received.1); + + event_loop + .dispatch(Some(std::time::Duration::ZERO), &mut received) + .unwrap(); + + assert_eq!(received.0, (MAX_EVENTS_CHECK + 1) as u32); + assert!(!received.1); + } }