Skip to content

Commit

Permalink
bugfix: Use a different strategy for yielding
Browse files Browse the repository at this point in the history
Previously, we only processed the source in the channel/executor
implementations when we needed to clear the readiness; however this
causes issues in the Windows `Ping` implementation. As a workaround,
this commit always calls "process_events" in `Ping`, but simply
re-notifies the PingSource if we need to try again.

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull committed Jul 21, 2024
1 parent aa61d50 commit d09724a
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 67 deletions.
56 changes: 35 additions & 21 deletions src/sources/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ impl<T> SyncSender<T> {
pub struct Channel<T> {
receiver: mpsc::Receiver<T>,
source: PingSource,
ping: Ping,
capacity: usize,
}

Expand Down Expand Up @@ -161,9 +162,13 @@ pub fn channel<T>() -> (Sender<T>, Channel<T>) {
let (sender, receiver) = mpsc::channel();
let (ping, source) = make_ping().expect("Failed to create a Ping.");
(
Sender { sender, ping },
Sender {
sender,
ping: ping.clone(),
},
Channel {
receiver,
ping,
source,
capacity: usize::MAX,
},
Expand All @@ -175,10 +180,14 @@ pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) {
let (sender, receiver) = mpsc::sync_channel(bound);
let (ping, source) = make_ping().expect("Failed to create a Ping.");
(
SyncSender { sender, ping },
SyncSender {
sender,
ping: ping.clone(),
},
Channel {
receiver,
source,
ping,
capacity: bound,
},
)
Expand All @@ -200,31 +209,36 @@ impl<T> EventSource for Channel<T> {
C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
{
let receiver = &self.receiver;

let capacity = self.capacity;
let mut clear_readiness = false;

// Limit the number of elements we process at a time to the channel's capacity, or 1024.
let max = cmp::min(self.capacity, MAX_EVENTS_CHECK);
for _ in 0..max {
match receiver.try_recv() {
Ok(val) => callback(Event::Msg(val), &mut ()),
Err(mpsc::TryRecvError::Empty) => {
clear_readiness = true;
break;
}
Err(mpsc::TryRecvError::Disconnected) => {
callback(Event::Closed, &mut ());
clear_readiness = true;
break;
let action = self
.source
.process_events(readiness, token, |(), &mut ()| {
// Limit the number of elements we process at a time to the channel's capacity, or 1024.
let max = cmp::min(capacity.saturating_add(1), MAX_EVENTS_CHECK);
for _ in 0..max {
match receiver.try_recv() {
Ok(val) => callback(Event::Msg(val), &mut ()),
Err(mpsc::TryRecvError::Empty) => {
clear_readiness = true;
break;
}
Err(mpsc::TryRecvError::Disconnected) => {
callback(Event::Closed, &mut ());
clear_readiness = true;
break;
}
}
}
}
}
})
.map_err(ChannelError)?;

if clear_readiness {
self.source
.process_events(readiness, token, |(), &mut ()| {})
.map_err(ChannelError)
Ok(action)
} else {
// Re-notify the ping source so we can try again.
self.ping.ping();
Ok(PostAction::Continue)
}
}
Expand Down
100 changes: 54 additions & 46 deletions src/sources/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ pub struct Executor<T> {
state: Rc<State<T>>,

/// Notifies us when the executor is woken up.
ping: PingSource,
source: PingSource,

/// Used for when we need to wake ourselves up.
ping: Ping,
}

/// A scheduler to send futures to an executor
Expand Down Expand Up @@ -274,15 +277,16 @@ pub fn executor<T>() -> crate::Result<(Executor<T>, Scheduler<T>)> {
active_tasks: RefCell::new(Some(Slab::new())),
sender: Arc::new(Sender {
sender: Mutex::new(sender),
wake_up,
wake_up: wake_up.clone(),
notified: AtomicBool::new(false),
}),
});

Ok((
Executor {
state: state.clone(),
ping,
source: ping,
ping: wake_up,
},
Scheduler { state },
))
Expand All @@ -305,62 +309,66 @@ impl<T> EventSource for Executor<T> {
{
let state = &self.state;

let clear_readiness = {
// Set to the unnotified state.
state.sender.notified.store(false, Ordering::SeqCst);

let (clear_readiness, action) = {
let mut clear_readiness = false;

// Process runnables, but not too many at a time; better to move onto the next event quickly!
for _ in 0..1024 {
let runnable = match state.incoming.try_recv() {
Ok(runnable) => runnable,
Err(_) => {
// Make sure to clear the readiness if there are no more runnables.
clear_readiness = true;
break;
}
};
let action = self
.source
.process_events(readiness, token, |(), &mut ()| {
// Process runnables, but not too many at a time; better to move onto the next event quickly!
for _ in 0..1024 {
let runnable = match state.incoming.try_recv() {
Ok(runnable) => runnable,
Err(_) => {
// Make sure to clear the readiness if there are no more runnables.
clear_readiness = true;
break;
}
};

// Run the runnable.
let index = *runnable.metadata();
runnable.run();
// Run the runnable.
let index = *runnable.metadata();
runnable.run();

// If the runnable finished with a result, call the callback.
let mut active_guard = state.active_tasks.borrow_mut();
let active_tasks = active_guard.as_mut().unwrap();
// If the runnable finished with a result, call the callback.
let mut active_guard = state.active_tasks.borrow_mut();
let active_tasks = active_guard.as_mut().unwrap();

if let Some(state) = active_tasks.get(index) {
if state.is_finished() {
// Take out the state and provide it to the caller.
let result = match active_tasks.remove(index) {
Active::Finished(result) => result,
_ => unreachable!(),
};
if let Some(state) = active_tasks.get(index) {
if state.is_finished() {
// Take out the state and provide it to the caller.
let result = match active_tasks.remove(index) {
Active::Finished(result) => result,
_ => unreachable!(),

Check warning on line 345 in src/sources/futures.rs

View check run for this annotation

Codecov / codecov/patch

src/sources/futures.rs#L345

Added line #L345 was not covered by tests
};

// Drop the guard since the callback may register another future to the scheduler.
drop(active_guard);
// Drop the guard since the callback may register another future to the scheduler.
drop(active_guard);

callback(result, &mut ());
callback(result, &mut ());
}
}

Check warning on line 353 in src/sources/futures.rs

View check run for this annotation

Codecov / codecov/patch

src/sources/futures.rs#L353

Added line #L353 was not covered by tests
}
}
}
})
.map_err(ExecutorError::WakeError)?;

clear_readiness
(clear_readiness, action)
};

// Clear the readiness of the ping source if there are no more runnables.
if clear_readiness {
self.ping
.process_events(readiness, token, |(), &mut ()| {})
.map_err(ExecutorError::WakeError)?;
// Re-ready the ping source if we need to re-run this handler.
if !clear_readiness {
self.ping.ping();
Ok(PostAction::Continue)
} else {
Ok(action)
}

// Set to the unnotified state.
state.sender.notified.store(false, Ordering::SeqCst);

Ok(PostAction::Continue)
}

fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
self.ping.register(poll, token_factory)?;
self.source.register(poll, token_factory)?;
Ok(())
}

Expand All @@ -369,12 +377,12 @@ impl<T> EventSource for Executor<T> {
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
self.ping.reregister(poll, token_factory)?;
self.source.reregister(poll, token_factory)?;

Check warning on line 380 in src/sources/futures.rs

View check run for this annotation

Codecov / codecov/patch

src/sources/futures.rs#L380

Added line #L380 was not covered by tests
Ok(())
}

fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
self.ping.unregister(poll)?;
self.source.unregister(poll)?;

Check warning on line 385 in src/sources/futures.rs

View check run for this annotation

Codecov / codecov/patch

src/sources/futures.rs#L385

Added line #L385 was not covered by tests
Ok(())
}
}
Expand Down

0 comments on commit d09724a

Please sign in to comment.