From fc767b63e45d1305f35a2a48e7e65a3f4ddfaf53 Mon Sep 17 00:00:00 2001 From: Trevor McMaster Date: Mon, 6 Nov 2023 11:45:22 -0700 Subject: [PATCH] Fixed the try hang for both provider_chain and provider spread - provider_loop still has issues with try scripts --- lib/channel/src/lib.rs | 40 +++++++++++++++++++++++++++++++++------- src/lib.rs | 1 + 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/lib/channel/src/lib.rs b/lib/channel/src/lib.rs index e9971f11..0353b460 100644 --- a/lib/channel/src/lib.rs +++ b/lib/channel/src/lib.rs @@ -181,6 +181,11 @@ impl Channel { self.on_demand_events.notify(1); } + /// notify all OnDemand with an event listener + fn notify_all_on_demand(&self) { + self.on_demand_events.notify(std::usize::MAX); + } + /// create a listener so an OnDemand can get notice when demand has been requested /// (a receiver tried to receive but the queue was empty) fn on_demand_listen(&self) -> EventListener { @@ -340,7 +345,13 @@ impl Sender { impl Clone for Sender { fn clone(&self) -> Self { let count = self.channel.increment_sender_count(); - info!("Sender::clone: {} new count: {}", self.name(), count); + let on_demand_count = self.channel.on_demand_count(); + info!( + "Sender::Clone channel {}, new count: {}, on_demand_count: {}", + self.name(), + count, + on_demand_count + ); Sender { channel: self.channel.clone(), listener: None, @@ -353,8 +364,15 @@ impl Clone for Sender { impl Drop for Sender { fn drop(&mut self) { let count = self.channel.decrement_sender_count(); - info!("Sender::drop: {} new count: {}", self.name(), count); + let on_demand_count = self.channel.on_demand_count(); + info!( + "Sender::Drop channel {}, new count: {}, on_demand_count: {}", + self.name(), + count, + on_demand_count + ); if count == 0 { + info!("Sender::Drop channel {}, notify_all_receivers", self.name()); self.channel.notify_all_receivers(); } } @@ -431,7 +449,7 @@ impl Sink for Sender { if self.no_receivers() { self.listener = None; debug!( - "Sink for Sender:poll_ready {} no_receivers, length: ${}", + "Sink for Sender:poll_ready {} no_receivers, length: {}", self.name(), self.len() ); @@ -531,9 +549,10 @@ pub struct Receiver { impl Clone for Receiver { fn clone(&self) -> Self { let count = self.channel.increment_receiver_count(); + let on_demand_count = self.channel.on_demand_count(); debug!( - "Receiver:Clone cloning channel {}, new count: {}", - self.channel.name, count + "Receiver:Clone channel {}, new count: {}, on_demand_count: {}", + self.channel.name, count, on_demand_count ); Receiver { channel: self.channel.clone(), @@ -547,9 +566,10 @@ impl Clone for Receiver { impl Drop for Receiver { fn drop(&mut self) { let new_count = self.channel.decrement_receiver_count(); + let on_demand_count = self.channel.on_demand_count(); debug!( - "Receiver:Drop channel {}, new count: {}", - self.channel.name, new_count + "Receiver:Drop channel {}, new count: {}, on_demand_count: {}", + self.channel.name, new_count, on_demand_count ); if new_count == 0 { // notify all senders so they will see there are no more receivers @@ -558,6 +578,8 @@ impl Drop for Receiver { "Receiver:Drop channel {}, notify_all_senders", self.channel.name ); + // When there are no more receivers we need to notify the on_demand in addition to the normal senders + self.channel.notify_all_on_demand(); self.channel.notify_all_senders(); } } @@ -674,6 +696,10 @@ impl Stream for OnDemandReceiver { self.channel.len() ); self.listener = None; + debug!( + "OnDemandReceiver::poll_next {} listener: None", + self.channel.name + ); return Poll::Ready(None); } diff --git a/src/lib.rs b/src/lib.rs index 06612eae..7c1235b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1013,6 +1013,7 @@ fn create_try_run_future( let mut test_ended_rx = BroadcastStream::new(test_ended_tx.subscribe()); let mut left = try_join_all(endpoint_calls).map(move |r| { + debug!("create_try_run_future try_join_all finish {:?}", r); let _ = test_ended_tx.send(r.map(|_| TestEndReason::Completed)); }); let f = future::poll_fn(move |cx| match left.poll_unpin(cx) {