diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index ec40031f4c..ca137c815f 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -754,7 +754,8 @@ impl Subscribers { async fn send(&mut self, event: Event) -> bool { let futs = self.0.iter().map(|sender| sender.send_async(event.clone())); let res = futures::future::join_all(futs).await; - for (i, res) in res.into_iter().enumerate() { + // reverse the order so removing does not shift remaining indices + for (i, res) in res.into_iter().enumerate().rev() { if res.is_err() { self.0.remove(i); } @@ -782,3 +783,21 @@ fn fmt_accept_namespace(res: &Result) -> String { .unwrap_or_else(|| "unknown".to_string()), } } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_sync_remove() { + let pk = PublicKey::from_bytes(&[1; 32]).unwrap(); + let (a_tx, a_rx) = flume::unbounded(); + let (b_tx, b_rx) = flume::unbounded(); + let mut subscribers = Subscribers::default(); + subscribers.subscribe(a_tx); + subscribers.subscribe(b_tx); + drop(a_rx); + drop(b_rx); + subscribers.send(Event::NeighborUp(pk)).await; + } +}