Skip to content

Commit

Permalink
exp: introduce guard in Rx
Browse files Browse the repository at this point in the history
  • Loading branch information
mox692 committed Dec 14, 2024
1 parent d05aaf1 commit df4e42c
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 39 deletions.
56 changes: 28 additions & 28 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,34 @@ impl<T, S: Semaphore> Drop for Rx<T, S> {

self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
struct Guard<'a, T, S: Semaphore> {
list: &'a mut list::Rx<T>,
tx: &'a CachePadded<crate::sync::mpsc::list::Tx<T>>,
sem: &'a S,
}

while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
self.inner.semaphore.add_permit();
impl<'a, T, S: Semaphore> Guard<'a, T, S> {
fn drain(&mut self) {
// call T's destructor.
while let Some(Value(_)) = self.list.pop(self.tx) {
self.sem.add_permit();
}
}
}

impl<'a, T, S: Semaphore> Drop for Guard<'a, T, S> {
fn drop(&mut self) {
self.drain();
}
}

let mut guard = Guard {
list: &mut rx_fields.list,
tx: &self.inner.tx,
sem: &self.inner.semaphore,
};

guard.drain();
});
}
}
Expand Down Expand Up @@ -535,32 +559,8 @@ impl<T, S> Drop for Chan<T, S> {
self.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };

struct Guard<'a, T> {
list: &'a mut list::Rx<T>,
tx: &'a CachePadded<crate::sync::mpsc::list::Tx<T>>,
}

impl<'a, T> Guard<'a, T> {
fn drain(&mut self) {
// call T's destructor.
while let Some(Value(_)) = self.list.pop(self.tx) {}
}
}

impl<'a, T> Drop for Guard<'a, T> {
fn drop(&mut self) {
self.drain();
// free memory blocks
unsafe { self.list.free_blocks() };
}
}

let mut guard = Guard {
list: &mut rx_fields.list,
tx: &self.tx,
};

guard.drain();
while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {}
unsafe { rx_fields.list.free_blocks() };
});
}
}
Expand Down
18 changes: 7 additions & 11 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1476,28 +1476,24 @@ fn drop_called() {
// This instances would be dropped in the spawned thread.
tx.send(A(true)).unwrap();
// This instances would be dropped in the main thread.
tx.send(A(true)).unwrap();
tx.send(A(false)).unwrap();
// This instances would be dropped in the main thread.
tx.send(A(false)).unwrap();

let jh = std::thread::spawn(|| {
drop(rx);
// `mpsc::Rx`'s drop is called, but got panicked while
// dropping the first value.
});
// `mpsc::Rx`'s drop is called and got panicked while dropping the first value,
// but keep dropping following element.
drop(rx);

let _ = jh.join();
assert_eq!(COUNTER.load(std::sync::atomic::Ordering::Relaxed), 3);

drop(tx);
// `mpsc::Chan`'s drop is called, got panic while dropping
// the second value.

// `mpsc::Chan`'s drop is called, freeing the `Block` memory allocation.
}

let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
func();
}));

assert_eq!(COUNTER.load(std::sync::atomic::Ordering::Relaxed), 3);
}

fn is_debug<T: fmt::Debug>(_: &T) {}

0 comments on commit df4e42c

Please sign in to comment.