Skip to content

Commit

Permalink
bugfix: Bound channel elements per iteration
Browse files Browse the repository at this point in the history
At the moment, the channel implentation processes as many channel
elements as possible every time "process_events" is called. However,
in multithreaded cases this can cause the loop to be stuck in the
"process_events" section of a channel forever. If one thread keeps
sending new elements into the channel while the current thread keeps
reading them, it will starve other event sources of running time.

This commit fixes this issue by bounding the number of channel elements
that can be processed every time "process_events" is called. It chooses
the smallest of the following numbers:

- The capacity of the channel.
- 1024 (chosen because this is also used by async-executor)

If the channel is not empty after we have read this number of elements,
the underlying source is not triggered. This should make it so the
channel is immediately re-polled on the next dispatch. However it gives
other sources more time to run.

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull committed Jul 21, 2024
1 parent 3ef1bf2 commit f52b7ee
Showing 1 changed file with 91 additions and 13 deletions.
104 changes: 91 additions & 13 deletions src/sources/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
//! A synchronous version of the channel is provided by [`sync_channel`], in which
//! the [`SyncSender`] will block when the channel is full.
use std::cmp;
use std::fmt;
use std::sync::mpsc;

use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory};

use super::ping::{make_ping, Ping, PingError, PingSource};

const MAX_EVENTS_CHECK: usize = 1024;

/// The events generated by the channel event source
#[derive(Debug)]
pub enum Event<T> {
Expand Down Expand Up @@ -123,6 +126,7 @@ impl<T> SyncSender<T> {
pub struct Channel<T> {
receiver: mpsc::Receiver<T>,
source: PingSource,
capacity: usize,
}

// This impl is safe because the Channel is only able to move around threads
Expand Down Expand Up @@ -156,14 +160,28 @@ impl<T> Channel<T> {
pub fn channel<T>() -> (Sender<T>, Channel<T>) {
let (sender, receiver) = mpsc::channel();
let (ping, source) = make_ping().expect("Failed to create a Ping.");
(Sender { sender, ping }, Channel { receiver, source })
(
Sender { sender, ping },
Channel {
receiver,
source,
capacity: usize::MAX,
},
)
}

/// Create a new synchronous, bounded channel
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) {
let (sender, receiver) = mpsc::sync_channel(bound);
let (ping, source) = make_ping().expect("Failed to create a Ping.");
(SyncSender { sender, ping }, Channel { receiver, source })
(
SyncSender { sender, ping },
Channel {
receiver,
source,
capacity: bound,
},
)
}

impl<T> EventSource for Channel<T> {
Expand All @@ -182,18 +200,33 @@ impl<T> EventSource for Channel<T> {
C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let receiver = &self.receiver;
self.source
.process_events(readiness, token, |(), &mut ()| loop {
match receiver.try_recv() {
Ok(val) => callback(Event::Msg(val), &mut ()),
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => {
callback(Event::Closed, &mut ());
break;
}

let mut clear_readiness = false;

// Limit the number of elements we process at a time to the channel's capacity, or 1024.
let max = cmp::min(self.capacity, MAX_EVENTS_CHECK);
for _ in 0..max {
match receiver.try_recv() {
Ok(val) => callback(Event::Msg(val), &mut ()),
Err(mpsc::TryRecvError::Empty) => {
clear_readiness = true;
break;
}
})
.map_err(ChannelError)
Err(mpsc::TryRecvError::Disconnected) => {
callback(Event::Closed, &mut ());
clear_readiness = true;
break;
}
}
}

if clear_readiness {
self.source
.process_events(readiness, token, |(), &mut ()| {})
.map_err(ChannelError)
} else {
Ok(PostAction::Continue)
}
}

fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
Expand Down Expand Up @@ -338,4 +371,49 @@ mod tests {
assert_eq!(received.0, 3);
assert!(received.1);
}

#[test]
fn test_more_than_1024() {
let mut event_loop = crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();

let (tx, rx) = channel::<()>();
let mut received = (0u32, false);

handle
.insert_source(
rx,
move |evt, &mut (), received: &mut (u32, bool)| match evt {
Event::Msg(()) => received.0 += 1,
Event::Closed => received.1 = true,
},
)
.unwrap();

event_loop
.dispatch(Some(std::time::Duration::ZERO), &mut received)
.unwrap();

assert_eq!(received.0, 0);
assert!(!received.1);

// Send 1025 elements into the channel.
for _ in 0..MAX_EVENTS_CHECK + 1 {
tx.send(()).unwrap();
}

event_loop
.dispatch(Some(std::time::Duration::ZERO), &mut received)
.unwrap();

assert_eq!(received.0, MAX_EVENTS_CHECK as u32);
assert!(!received.1);

event_loop
.dispatch(Some(std::time::Duration::ZERO), &mut received)
.unwrap();

assert_eq!(received.0, (MAX_EVENTS_CHECK + 1) as u32);
assert!(!received.1);
}
}

0 comments on commit f52b7ee

Please sign in to comment.