diff --git a/src/sources/channel.rs b/src/sources/channel.rs index c48f70e3..91275961 100644 --- a/src/sources/channel.rs +++ b/src/sources/channel.rs @@ -8,6 +8,7 @@ //! A synchronous version of the channel is provided by [`sync_channel`], in which //! the [`SyncSender`] will block when the channel is full. +use std::cmp; use std::fmt; use std::sync::mpsc; @@ -15,6 +16,8 @@ use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory}; use super::ping::{make_ping, Ping, PingError, PingSource}; +const MAX_EVENTS_CHECK: usize = 1024; + /// The events generated by the channel event source #[derive(Debug)] pub enum Event { @@ -123,6 +126,7 @@ impl SyncSender { pub struct Channel { receiver: mpsc::Receiver, source: PingSource, + capacity: usize, } // This impl is safe because the Channel is only able to move around threads @@ -156,14 +160,28 @@ impl Channel { pub fn channel() -> (Sender, Channel) { let (sender, receiver) = mpsc::channel(); let (ping, source) = make_ping().expect("Failed to create a Ping."); - (Sender { sender, ping }, Channel { receiver, source }) + ( + Sender { sender, ping }, + Channel { + receiver, + source, + capacity: usize::MAX, + }, + ) } /// Create a new synchronous, bounded channel pub fn sync_channel(bound: usize) -> (SyncSender, Channel) { let (sender, receiver) = mpsc::sync_channel(bound); let (ping, source) = make_ping().expect("Failed to create a Ping."); - (SyncSender { sender, ping }, Channel { receiver, source }) + ( + SyncSender { sender, ping }, + Channel { + receiver, + source, + capacity: bound, + }, + ) } impl EventSource for Channel { @@ -182,18 +200,33 @@ impl EventSource for Channel { C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, { let receiver = &self.receiver; - self.source - .process_events(readiness, token, |(), &mut ()| loop { - match receiver.try_recv() { - Ok(val) => callback(Event::Msg(val), &mut ()), - Err(mpsc::TryRecvError::Empty) => break, - Err(mpsc::TryRecvError::Disconnected) => { - callback(Event::Closed, &mut ()); - break; - } + + let mut clear_readiness = false; + + // Limit the number of elements we process at a time to the channel's capacity, or 1024. + let max = cmp::min(self.capacity, MAX_EVENTS_CHECK); + for _ in 0..max { + match receiver.try_recv() { + Ok(val) => callback(Event::Msg(val), &mut ()), + Err(mpsc::TryRecvError::Empty) => { + clear_readiness = true; + break; } - }) - .map_err(ChannelError) + Err(mpsc::TryRecvError::Disconnected) => { + callback(Event::Closed, &mut ()); + clear_readiness = true; + break; + } + } + } + + if clear_readiness { + self.source + .process_events(readiness, token, |(), &mut ()| {}) + .map_err(ChannelError) + } else { + Ok(PostAction::Continue) + } } fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { @@ -338,4 +371,49 @@ mod tests { assert_eq!(received.0, 3); assert!(received.1); } + + #[test] + fn test_more_than_1024() { + let mut event_loop = crate::EventLoop::try_new().unwrap(); + let handle = event_loop.handle(); + + let (tx, rx) = channel::<()>(); + let mut received = (0u32, false); + + handle + .insert_source( + rx, + move |evt, &mut (), received: &mut (u32, bool)| match evt { + Event::Msg(()) => received.0 += 1, + Event::Closed => received.1 = true, + }, + ) + .unwrap(); + + event_loop + .dispatch(Some(std::time::Duration::ZERO), &mut received) + .unwrap(); + + assert_eq!(received.0, 0); + assert!(!received.1); + + // Send 1025 elements into the channel. + for _ in 0..MAX_EVENTS_CHECK + 1 { + tx.send(()).unwrap(); + } + + event_loop + .dispatch(Some(std::time::Duration::ZERO), &mut received) + .unwrap(); + + assert_eq!(received.0, MAX_EVENTS_CHECK as u32); + assert!(!received.1); + + event_loop + .dispatch(Some(std::time::Duration::ZERO), &mut received) + .unwrap(); + + assert_eq!(received.0, (MAX_EVENTS_CHECK + 1) as u32); + assert!(!received.1); + } }