diff --git a/src/sources/channel.rs b/src/sources/channel.rs index 91275961..c01407c9 100644 --- a/src/sources/channel.rs +++ b/src/sources/channel.rs @@ -126,6 +126,7 @@ impl SyncSender { pub struct Channel { receiver: mpsc::Receiver, source: PingSource, + ping: Ping, capacity: usize, } @@ -161,9 +162,13 @@ pub fn channel() -> (Sender, Channel) { let (sender, receiver) = mpsc::channel(); let (ping, source) = make_ping().expect("Failed to create a Ping."); ( - Sender { sender, ping }, + Sender { + sender, + ping: ping.clone(), + }, Channel { receiver, + ping, source, capacity: usize::MAX, }, @@ -175,10 +180,14 @@ pub fn sync_channel(bound: usize) -> (SyncSender, Channel) { let (sender, receiver) = mpsc::sync_channel(bound); let (ping, source) = make_ping().expect("Failed to create a Ping."); ( - SyncSender { sender, ping }, + SyncSender { + sender, + ping: ping.clone(), + }, Channel { receiver, source, + ping, capacity: bound, }, ) @@ -200,31 +209,36 @@ impl EventSource for Channel { C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, { let receiver = &self.receiver; - + let capacity = self.capacity; let mut clear_readiness = false; - // Limit the number of elements we process at a time to the channel's capacity, or 1024. - let max = cmp::min(self.capacity, MAX_EVENTS_CHECK); - for _ in 0..max { - match receiver.try_recv() { - Ok(val) => callback(Event::Msg(val), &mut ()), - Err(mpsc::TryRecvError::Empty) => { - clear_readiness = true; - break; - } - Err(mpsc::TryRecvError::Disconnected) => { - callback(Event::Closed, &mut ()); - clear_readiness = true; - break; + let action = self + .source + .process_events(readiness, token, |(), &mut ()| { + // Limit the number of elements we process at a time to the channel's capacity, or 1024. + let max = cmp::min(capacity.saturating_add(1), MAX_EVENTS_CHECK); + for _ in 0..max { + match receiver.try_recv() { + Ok(val) => callback(Event::Msg(val), &mut ()), + Err(mpsc::TryRecvError::Empty) => { + clear_readiness = true; + break; + } + Err(mpsc::TryRecvError::Disconnected) => { + callback(Event::Closed, &mut ()); + clear_readiness = true; + break; + } + } } - } - } + }) + .map_err(ChannelError)?; if clear_readiness { - self.source - .process_events(readiness, token, |(), &mut ()| {}) - .map_err(ChannelError) + Ok(action) } else { + // Re-notify the ping source so we can try again. + self.ping.ping(); Ok(PostAction::Continue) } } diff --git a/src/sources/futures.rs b/src/sources/futures.rs index 9a611980..553f131f 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -48,7 +48,10 @@ pub struct Executor { state: Rc>, /// Notifies us when the executor is woken up. - ping: PingSource, + source: PingSource, + + /// Used for when we need to wake ourselves up. + ping: Ping, } /// A scheduler to send futures to an executor @@ -274,7 +277,7 @@ pub fn executor() -> crate::Result<(Executor, Scheduler)> { active_tasks: RefCell::new(Some(Slab::new())), sender: Arc::new(Sender { sender: Mutex::new(sender), - wake_up, + wake_up: wake_up.clone(), notified: AtomicBool::new(false), }), }); @@ -282,7 +285,8 @@ pub fn executor() -> crate::Result<(Executor, Scheduler)> { Ok(( Executor { state: state.clone(), - ping, + source: ping, + ping: wake_up, }, Scheduler { state }, )) @@ -305,62 +309,66 @@ impl EventSource for Executor { { let state = &self.state; - let clear_readiness = { + // Set to the unnotified state. + state.sender.notified.store(false, Ordering::SeqCst); + + let (clear_readiness, action) = { let mut clear_readiness = false; - // Process runnables, but not too many at a time; better to move onto the next event quickly! - for _ in 0..1024 { - let runnable = match state.incoming.try_recv() { - Ok(runnable) => runnable, - Err(_) => { - // Make sure to clear the readiness if there are no more runnables. - clear_readiness = true; - break; - } - }; + let action = self + .source + .process_events(readiness, token, |(), &mut ()| { + // Process runnables, but not too many at a time; better to move onto the next event quickly! + for _ in 0..1024 { + let runnable = match state.incoming.try_recv() { + Ok(runnable) => runnable, + Err(_) => { + // Make sure to clear the readiness if there are no more runnables. + clear_readiness = true; + break; + } + }; - // Run the runnable. - let index = *runnable.metadata(); - runnable.run(); + // Run the runnable. + let index = *runnable.metadata(); + runnable.run(); - // If the runnable finished with a result, call the callback. - let mut active_guard = state.active_tasks.borrow_mut(); - let active_tasks = active_guard.as_mut().unwrap(); + // If the runnable finished with a result, call the callback. + let mut active_guard = state.active_tasks.borrow_mut(); + let active_tasks = active_guard.as_mut().unwrap(); - if let Some(state) = active_tasks.get(index) { - if state.is_finished() { - // Take out the state and provide it to the caller. - let result = match active_tasks.remove(index) { - Active::Finished(result) => result, - _ => unreachable!(), - }; + if let Some(state) = active_tasks.get(index) { + if state.is_finished() { + // Take out the state and provide it to the caller. + let result = match active_tasks.remove(index) { + Active::Finished(result) => result, + _ => unreachable!(), + }; - // Drop the guard since the callback may register another future to the scheduler. - drop(active_guard); + // Drop the guard since the callback may register another future to the scheduler. + drop(active_guard); - callback(result, &mut ()); + callback(result, &mut ()); + } + } } - } - } + }) + .map_err(ExecutorError::WakeError)?; - clear_readiness + (clear_readiness, action) }; - // Clear the readiness of the ping source if there are no more runnables. - if clear_readiness { - self.ping - .process_events(readiness, token, |(), &mut ()| {}) - .map_err(ExecutorError::WakeError)?; + // Re-ready the ping source if we need to re-run this handler. + if !clear_readiness { + self.ping.ping(); + Ok(PostAction::Continue) + } else { + Ok(action) } - - // Set to the unnotified state. - state.sender.notified.store(false, Ordering::SeqCst); - - Ok(PostAction::Continue) } fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { - self.ping.register(poll, token_factory)?; + self.source.register(poll, token_factory)?; Ok(()) } @@ -369,12 +377,12 @@ impl EventSource for Executor { poll: &mut Poll, token_factory: &mut TokenFactory, ) -> crate::Result<()> { - self.ping.reregister(poll, token_factory)?; + self.source.reregister(poll, token_factory)?; Ok(()) } fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { - self.ping.unregister(poll)?; + self.source.unregister(poll)?; Ok(()) } }