Skip to content

Commit

Permalink
Delay freeing Slab ids until events are processed
Browse files Browse the repository at this point in the history
  • Loading branch information
Drakulix committed Oct 5, 2023
1 parent 7ae6f36 commit 5624941
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 28 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

#### Bugfixes

- Fix an issue, where id-reuse could execute a PostAction on a newly registered event source

## 0.12.2 -- 2023-09-25

#### Bugfixes
Expand Down
2 changes: 1 addition & 1 deletion src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<'l, F: AsFd> Async<'l, F> {
interest: Interest::EMPTY,
last_readiness: Readiness::EMPTY,
}));
let key = inner.sources.borrow_mut().insert(dispatcher.clone());
let key = inner.sources.borrow_mut().insert(Some(dispatcher.clone()));
dispatcher.borrow_mut().token = Some(Token { key });

// SAFETY: We are sure to deregister on drop.
Expand Down
112 changes: 85 additions & 27 deletions src/loop_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ impl RegistrationToken {

pub(crate) struct LoopInner<'l, Data> {
pub(crate) poll: RefCell<Poll>,
pub(crate) sources: RefCell<Slab<Rc<dyn EventDispatcher<Data> + 'l>>>,
// The `Option` is used to keep slots of the slab occipied, to prevent id reuse
// while in-flight events might still referr to a recently destroyed event source.
pub(crate) sources: RefCell<Slab<Option<Rc<dyn EventDispatcher<Data> + 'l>>>>,
pub(crate) sources_with_additional_lifecycle_events: RefCell<AdditionalLifecycleEventsSet>,
idles: RefCell<Vec<IdleCallback<'l, Data>>>,
pending_action: Cell<PostAction>,
Expand Down Expand Up @@ -149,9 +151,9 @@ impl<'l, Data> LoopHandle<'l, Data> {
)));
}

let key = sources.insert(dispatcher.clone_as_event_dispatcher());
let key = sources.insert(Some(dispatcher.clone_as_event_dispatcher()));
trace!("[calloop] Inserting new source #{}", key);
let ret = sources.get(key).unwrap().register(
let ret = sources.get(key).unwrap().as_ref().unwrap().register(
&mut poll,
&mut self
.inner
Expand Down Expand Up @@ -189,7 +191,7 @@ impl<'l, Data> LoopHandle<'l, Data> {
///
/// **Note:** this cannot be done from within the source callback.
pub fn enable(&self, token: &RegistrationToken) -> crate::Result<()> {
if let Some(source) = self.inner.sources.borrow().get(token.key) {
if let Some(Some(source)) = self.inner.sources.borrow().get(token.key) {
trace!("[calloop] Registering source #{}", token.key);
source.register(
&mut self.inner.poll.borrow_mut(),
Expand All @@ -208,7 +210,7 @@ impl<'l, Data> LoopHandle<'l, Data> {
/// If after accessing the source you changed its parameters in a way that requires
/// updating its registration.
pub fn update(&self, token: &RegistrationToken) -> crate::Result<()> {
if let Some(source) = self.inner.sources.borrow().get(token.key) {
if let Some(Some(source)) = self.inner.sources.borrow().get(token.key) {
trace!("[calloop] Updating registration of source #{}", token.key);
if !source.reregister(
&mut self.inner.poll.borrow_mut(),
Expand All @@ -230,7 +232,7 @@ impl<'l, Data> LoopHandle<'l, Data> {
///
/// The source remains in the event loop, but it'll no longer generate events
pub fn disable(&self, token: &RegistrationToken) -> crate::Result<()> {
if let Some(source) = self.inner.sources.borrow().get(token.key) {
if let Some(Some(source)) = self.inner.sources.borrow().get(token.key) {
trace!("[calloop] Unregistering source #{}", token.key);
if !source.unregister(
&mut self.inner.poll.borrow_mut(),
Expand All @@ -250,20 +252,25 @@ impl<'l, Data> LoopHandle<'l, Data> {

/// Removes this source from the event loop.
pub fn remove(&self, token: RegistrationToken) {
if let Some(source) = self.inner.sources.borrow_mut().try_remove(token.key) {
trace!("[calloop] Removing source #{}", token.key);
if let Err(e) = source.unregister(
&mut self.inner.poll.borrow_mut(),
&mut self
.inner
.sources_with_additional_lifecycle_events
.borrow_mut(),
token,
) {
log::warn!(
"[calloop] Failed to unregister source from the polling system: {:?}",
e
);
if let Some(source) = self.inner.sources.borrow_mut().get_mut(token.key) {
// We intentionally leave `None` in-place, as this might be called from
// within an event source callback. It will get cleaned up on the end
// of the currently running or next `dispatch_events` call.
if let Some(source) = source.take() {
trace!("[calloop] Removing source #{}", token.key);
if let Err(e) = source.unregister(
&mut self.inner.poll.borrow_mut(),
&mut self
.inner
.sources_with_additional_lifecycle_events
.borrow_mut(),
token,
) {
log::warn!(
"[calloop] Failed to unregister source from the polling system: {:?}",
e
);
}
}
}
}
Expand Down Expand Up @@ -353,7 +360,7 @@ impl<'l, Data> EventLoop<'l, Data> {
.borrow_mut();
let sources = &self.handle.inner.sources.borrow();
for source in &mut *extra_lifecycle_sources.values {
if let Some(disp) = sources.get(source.key) {
if let Some(Some(disp)) = sources.get(source.key) {
if let Some((readiness, token)) = disp.before_sleep()? {
// Wake up instantly after polling if we recieved an event
timeout = Some(Duration::ZERO);
Expand Down Expand Up @@ -394,7 +401,7 @@ impl<'l, Data> EventLoop<'l, Data> {
.borrow_mut();
if !extra_lifecycle_sources.values.is_empty() {
for source in &mut *extra_lifecycle_sources.values {
if let Some(disp) = self.handle.inner.sources.borrow().get(source.key) {
if let Some(Some(disp)) = self.handle.inner.sources.borrow().get(source.key) {
let iter = EventIterator {
inner: events.iter(),
registration_token: *source,
Expand All @@ -419,7 +426,7 @@ impl<'l, Data> EventLoop<'l, Data> {
.get(registroken_token)
.cloned();

if let Some(disp) = opt_disp {
if let Some(Some(disp)) = opt_disp {
trace!(
"[calloop] Dispatching events for source #{}",
registroken_token
Expand Down Expand Up @@ -472,22 +479,28 @@ impl<'l, Data> EventLoop<'l, Data> {
"[calloop] Postaction remove for source #{}",
registroken_token
);
// delete the source from the list, it'll be cleaned up with the if just below
// We intentionally leave `None` in-place, while there are still
// events being processed to prevent id reuse.
// Unregister will happen right after this match and cleanup at
// the end of the function.
self.handle
.inner
.sources
.borrow_mut()
.remove(registroken_token);
.get_mut(registroken_token)
.take();
}
PostAction::Continue => {}
}

if !self
if self
.handle
.inner
.sources
.borrow()
.contains(registroken_token)
.get(registroken_token)
.unwrap_or(&None)
.is_none()
{
// the source has been removed from within its callback, unregister it
let mut poll = self.handle.inner.poll.borrow_mut();
Expand All @@ -514,6 +527,13 @@ impl<'l, Data> EventLoop<'l, Data> {
}
}

// cleanup empty event source slots to free up ids again
self.handle
.inner
.sources
.borrow_mut()
.retain(|_, opt_disp| opt_disp.is_some());

Ok(())
}

Expand Down Expand Up @@ -1457,6 +1477,44 @@ mod tests {
assert_eq!(data, 22);
}

#[test]
fn reuse() {
use crate::sources::timer;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

let mut evl = EventLoop::<RegistrationToken>::try_new().unwrap();
let handle = evl.handle();

let data = Arc::new(Mutex::new(1));
let data_cloned = data.clone();

let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
let mut first_timer_token = evl
.handle()
.insert_source(timer_source, move |_, _, own_token| {
handle.remove(*own_token);
let data_cloned = data_cloned.clone();
let _ = handle.insert_source(timer::Timer::immediate(), move |_, _, _| {
*data_cloned.lock().unwrap() = 2;
timer::TimeoutAction::Drop
});
timer::TimeoutAction::Drop
})
.unwrap();

let now = Instant::now();
loop {
evl.dispatch(Some(Duration::from_secs(3)), &mut first_timer_token)
.unwrap();
if Instant::now().duration_since(now) > Duration::from_secs(3) {
break;
}
}

assert_eq!(*data.lock().unwrap(), 2);
}

#[test]
fn drop_of_subsource() {
struct WithSubSource {
Expand Down

0 comments on commit 5624941

Please sign in to comment.