From df4e42c661db0c50b11355c47153fa9ef64a427c Mon Sep 17 00:00:00 2001 From: mox692 Date: Sat, 14 Dec 2024 21:02:13 +0900 Subject: [PATCH] exp: introduce guard in Rx --- tokio/src/sync/mpsc/chan.rs | 56 ++++++++++++++++++------------------- tokio/tests/sync_mpsc.rs | 18 +++++------- 2 files changed, 35 insertions(+), 39 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 14ee695a338..4c0f1964eae 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -490,10 +490,34 @@ impl Drop for Rx { self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; + struct Guard<'a, T, S: Semaphore> { + list: &'a mut list::Rx, + tx: &'a CachePadded>, + sem: &'a S, + } - while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) { - self.inner.semaphore.add_permit(); + impl<'a, T, S: Semaphore> Guard<'a, T, S> { + fn drain(&mut self) { + // call T's destructor. + while let Some(Value(_)) = self.list.pop(self.tx) { + self.sem.add_permit(); + } + } } + + impl<'a, T, S: Semaphore> Drop for Guard<'a, T, S> { + fn drop(&mut self) { + self.drain(); + } + } + + let mut guard = Guard { + list: &mut rx_fields.list, + tx: &self.inner.tx, + sem: &self.inner.semaphore, + }; + + guard.drain(); }); } } @@ -535,32 +559,8 @@ impl Drop for Chan { self.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; - struct Guard<'a, T> { - list: &'a mut list::Rx, - tx: &'a CachePadded>, - } - - impl<'a, T> Guard<'a, T> { - fn drain(&mut self) { - // call T's destructor. - while let Some(Value(_)) = self.list.pop(self.tx) {} - } - } - - impl<'a, T> Drop for Guard<'a, T> { - fn drop(&mut self) { - self.drain(); - // free memory blocks - unsafe { self.list.free_blocks() }; - } - } - - let mut guard = Guard { - list: &mut rx_fields.list, - tx: &self.tx, - }; - - guard.drain(); + while let Some(Value(_)) = rx_fields.list.pop(&self.tx) {} + unsafe { rx_fields.list.free_blocks() }; }); } } diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index c25c780d955..17f92d191b1 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1476,28 +1476,24 @@ fn drop_called() { // This instances would be dropped in the spawned thread. tx.send(A(true)).unwrap(); // This instances would be dropped in the main thread. - tx.send(A(true)).unwrap(); + tx.send(A(false)).unwrap(); // This instances would be dropped in the main thread. tx.send(A(false)).unwrap(); - let jh = std::thread::spawn(|| { - drop(rx); - // `mpsc::Rx`'s drop is called, but got panicked while - // dropping the first value. - }); + // `mpsc::Rx`'s drop is called and got panicked while dropping the first value, + // but keep dropping following element. + drop(rx); - let _ = jh.join(); + assert_eq!(COUNTER.load(std::sync::atomic::Ordering::Relaxed), 3); drop(tx); - // `mpsc::Chan`'s drop is called, got panic while dropping - // the second value. + + // `mpsc::Chan`'s drop is called, freeing the `Block` memory allocation. } let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { func(); })); - - assert_eq!(COUNTER.load(std::sync::atomic::Ordering::Relaxed), 3); } fn is_debug(_: &T) {}