Skip to content

Commit

Permalink
experiment
Browse files Browse the repository at this point in the history
  • Loading branch information
mox692 committed Nov 17, 2024
1 parent 2f89914 commit 76ba737
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 1 deletion.
3 changes: 3 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,6 @@ path = "named-pipe-multi-client.rs"
[[example]]
name = "dump"
path = "dump.rs"
[[example]]
name = "flamegraph_mpsc"
path = "flamegraph_mpsc.rs"
43 changes: 43 additions & 0 deletions examples/flamegraph_mpsc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::hint::black_box;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{reclaim_called_count, reuse_failed_count};
use tokio::task::JoinSet;

fn multi_rt() -> Runtime {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
}

async fn run() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<usize>(100_000);

let mut set = JoinSet::new();
for _ in 0..10 {
let tx = tx.clone();
set.spawn(async move {
for i in 0..100_ {
tx.send(i).await.unwrap();
}
});
}

black_box(for _ in 0..100_0 {
rx.recv().await.unwrap();
});
while let Some(res) = set.join_next().await {
let _ = res;
}
}
fn main() {
let rt = multi_rt();
rt.block_on(async {
tokio::spawn(async {
run().await;
})
.await
.unwrap();
});
println!("{} / {}", reuse_failed_count(), reclaim_called_count())
}
8 changes: 8 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ cfg_time! {
use std::fmt;
use std::task::{Context, Poll};

use super::list::{RECLAIM_CALLED_COUNT, REUSE_FAILED_COUNT};

/// Sends values to the associated `Receiver`.
///
/// Instances are created by the [`channel`] function.
Expand Down Expand Up @@ -743,6 +745,12 @@ impl<T> fmt::Debug for Receiver<T> {

impl<T> Unpin for Receiver<T> {}

pub fn reclaim_called_count() -> usize {
RECLAIM_CALLED_COUNT.load(std::sync::atomic::Ordering::Acquire)
}
pub fn reuse_failed_count() -> usize {
REUSE_FAILED_COUNT.load(std::sync::atomic::Ordering::Acquire)
}
impl<T> Sender<T> {
pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
Sender { chan }
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/sync/mpsc/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ impl<T> Tx<T> {
}

pub(crate) unsafe fn reclaim_block(&self, mut block: NonNull<Block<T>>) {
RECLAIM_CALLED_COUNT.fetch_add(1, Relaxed);
// The block has been removed from the linked list and ownership
// is reclaimed.
//
Expand Down Expand Up @@ -215,6 +216,7 @@ impl<T> Tx<T> {
}

if !reused {
REUSE_FAILED_COUNT.fetch_add(1, Relaxed);
let _ = Box::from_raw(block.as_ptr());
}
}
Expand All @@ -238,6 +240,9 @@ impl<T> fmt::Debug for Tx<T> {
}
}

pub(crate) static RECLAIM_CALLED_COUNT: AtomicUsize = AtomicUsize::new(0);
pub(crate) static REUSE_FAILED_COUNT: AtomicUsize = AtomicUsize::new(0);

impl<T> Rx<T> {
pub(crate) fn is_empty(&self, tx: &Tx<T>) -> bool {
let block = unsafe { self.head.as_ref() };
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/sync/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ pub(super) mod block;

mod bounded;
pub use self::bounded::{
channel, OwnedPermit, Permit, PermitIterator, Receiver, Sender, WeakSender,
channel, reclaim_called_count, reuse_failed_count, OwnedPermit, Permit, PermitIterator,
Receiver, Sender, WeakSender,
};

mod chan;
Expand Down

0 comments on commit 76ba737

Please sign in to comment.