Skip to content

Commit

Permalink
Fixed the try hang for both provider_chain and provider spread
Browse files Browse the repository at this point in the history
- provider_loop still has issues with try scripts
  • Loading branch information
tkmcmaster committed Nov 6, 2023
1 parent 9dee403 commit fc767b6
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
40 changes: 33 additions & 7 deletions lib/channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ impl<T: Serialize> Channel<T> {
self.on_demand_events.notify(1);
}

/// notify all OnDemand with an event listener
fn notify_all_on_demand(&self) {
self.on_demand_events.notify(std::usize::MAX);
}

/// create a listener so an OnDemand can get notice when demand has been requested
/// (a receiver tried to receive but the queue was empty)
fn on_demand_listen(&self) -> EventListener {
Expand Down Expand Up @@ -340,7 +345,13 @@ impl<T: Serialize> Sender<T> {
impl<T: Serialize> Clone for Sender<T> {
fn clone(&self) -> Self {
let count = self.channel.increment_sender_count();
info!("Sender::clone: {} new count: {}", self.name(), count);
let on_demand_count = self.channel.on_demand_count();
info!(
"Sender::Clone channel {}, new count: {}, on_demand_count: {}",
self.name(),
count,
on_demand_count
);
Sender {
channel: self.channel.clone(),
listener: None,
Expand All @@ -353,8 +364,15 @@ impl<T: Serialize> Clone for Sender<T> {
impl<T: Serialize> Drop for Sender<T> {
fn drop(&mut self) {
let count = self.channel.decrement_sender_count();
info!("Sender::drop: {} new count: {}", self.name(), count);
let on_demand_count = self.channel.on_demand_count();
info!(
"Sender::Drop channel {}, new count: {}, on_demand_count: {}",
self.name(),
count,
on_demand_count
);
if count == 0 {
info!("Sender::Drop channel {}, notify_all_receivers", self.name());
self.channel.notify_all_receivers();
}
}
Expand Down Expand Up @@ -431,7 +449,7 @@ impl<T: Serialize> Sink<T> for Sender<T> {
if self.no_receivers() {
self.listener = None;
debug!(
"Sink for Sender:poll_ready {} no_receivers, length: ${}",
"Sink for Sender:poll_ready {} no_receivers, length: {}",
self.name(),
self.len()
);
Expand Down Expand Up @@ -531,9 +549,10 @@ pub struct Receiver<T: Serialize> {
impl<T: Serialize> Clone for Receiver<T> {
fn clone(&self) -> Self {
let count = self.channel.increment_receiver_count();
let on_demand_count = self.channel.on_demand_count();
debug!(
"Receiver:Clone cloning channel {}, new count: {}",
self.channel.name, count
"Receiver:Clone channel {}, new count: {}, on_demand_count: {}",
self.channel.name, count, on_demand_count
);
Receiver {
channel: self.channel.clone(),
Expand All @@ -547,9 +566,10 @@ impl<T: Serialize> Clone for Receiver<T> {
impl<T: Serialize> Drop for Receiver<T> {
fn drop(&mut self) {
let new_count = self.channel.decrement_receiver_count();
let on_demand_count = self.channel.on_demand_count();
debug!(
"Receiver:Drop channel {}, new count: {}",
self.channel.name, new_count
"Receiver:Drop channel {}, new count: {}, on_demand_count: {}",
self.channel.name, new_count, on_demand_count
);
if new_count == 0 {
// notify all senders so they will see there are no more receivers
Expand All @@ -558,6 +578,8 @@ impl<T: Serialize> Drop for Receiver<T> {
"Receiver:Drop channel {}, notify_all_senders",
self.channel.name
);
// When there are no more receivers we need to notify the on_demand in addition to the normal senders
self.channel.notify_all_on_demand();
self.channel.notify_all_senders();
}
}
Expand Down Expand Up @@ -674,6 +696,10 @@ impl<T: Serialize + Send + 'static> Stream for OnDemandReceiver<T> {
self.channel.len()
);
self.listener = None;
debug!(
"OnDemandReceiver::poll_next {} listener: None",
self.channel.name
);
return Poll::Ready(None);
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ fn create_try_run_future(

let mut test_ended_rx = BroadcastStream::new(test_ended_tx.subscribe());
let mut left = try_join_all(endpoint_calls).map(move |r| {
debug!("create_try_run_future try_join_all finish {:?}", r);
let _ = test_ended_tx.send(r.map(|_| TestEndReason::Completed));
});
let f = future::poll_fn(move |cx| match left.poll_unpin(cx) {
Expand Down

0 comments on commit fc767b6

Please sign in to comment.