Skip to content

Commit

Permalink
Merge pull request #28 from rust-amplify/fix/hangup
Browse files Browse the repository at this point in the history
Fix missed service notifications on all resource unregistrations
  • Loading branch information
dr-orlovsky authored Jul 6, 2024
2 parents ba5f1b0 + ceb7c48 commit 745b373
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 36 deletions.
4 changes: 2 additions & 2 deletions src/poller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ impl Display for IoType {
#[derive(Copy, Clone, Debug, Display, Error)]
#[display(doc_comments)]
pub enum IoFail {
/// connection is absent (POSIX events {0:#b})
/// hung up (POSIX events {0:#b})
Connectivity(i16),
/// OS-level error (POSIX events {0:#b})
/// errored (POSIX events {0:#b})
Os(i16),
}

Expand Down
46 changes: 12 additions & 34 deletions src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::{io, thread};

use crossbeam_channel as chan;

use crate::poller::{IoFail, IoType, Poll, Waker, WakerRecv, WakerSend};
use crate::poller::{IoType, Poll, Waker, WakerRecv, WakerSend};
use crate::resource::WriteError;
use crate::{Resource, ResourceId, ResourceType, Timer, Timestamp, WriteAtomic};

Expand Down Expand Up @@ -505,11 +505,10 @@ impl<H: Handler, P: Poll> Runtime<H, P> {

/// # Returns
///
/// Whether it was awaken by a waker
/// Whether it was awakened by a waker
fn handle_events(&mut self, time: Timestamp) -> bool {
let mut awoken = false;

let mut unregister_queue = vec![];
while let Some((id, res)) = self.poller.next() {
if id == ResourceId::WAKER {
if let Err(err) = res {
Expand All @@ -536,19 +535,13 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
}
}
}
Err(IoFail::Connectivity(flags)) => {
Err(err) => {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Listener {id} hung up (OS flags {flags:#b})");

let listener = self.listeners.remove(&id).expect("resource disappeared");
unregister_queue.push(id);
log::trace!(target: "reactor", "Listener {id} {err}");
let listener =
self.unregister_listener(id).expect("listener has disappeared");
self.service.handle_error(Error::ListenerDisconnect(id, listener));
}
Err(IoFail::Os(flags)) => {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Listener {id} errored (OS flags {flags:#b})");
self.unregister_listener(id);
}
}
} else if self.transports.contains_key(&id) {
match res {
Expand All @@ -563,19 +556,13 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
}
}
}
Err(IoFail::Connectivity(posix_events)) => {
Err(err) => {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Transport {id} hanged up (POSIX events are {posix_events:#b})");

let transport = self.transports.remove(&id).expect("resource disappeared");
unregister_queue.push(id);
log::trace!(target: "reactor", "Transport {id} {err}");
let transport =
self.unregister_transport(id).expect("transport has disappeared");
self.service.handle_error(Error::TransportDisconnect(id, transport));
}
Err(IoFail::Os(posix_events)) => {
#[cfg(feature = "log")]
log::trace!(target: "reactor", "Transport {id} errored (POSIX events are {posix_events:#b})");
self.unregister_transport(id);
}
}
} else {
panic!(
Expand All @@ -584,11 +571,6 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
}
}

// We need this b/c of borrow checker
for id in unregister_queue {
self.poller.unregister(id);
}

awoken
}

Expand Down Expand Up @@ -708,10 +690,8 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
return None;
};

let fd = listener.as_raw_fd();

#[cfg(feature = "log")]
log::debug!(target: "reactor", "Handling over listener {id} (fd={fd})");
log::debug!(target: "reactor", "Handling over listener {id} (fd={})", listener.as_raw_fd());

self.poller.unregister(id);

Expand All @@ -725,10 +705,8 @@ impl<H: Handler, P: Poll> Runtime<H, P> {
return None;
};

let fd = transport.as_raw_fd();

#[cfg(feature = "log")]
log::debug!(target: "reactor", "Unregistering over transport {id} (fd={fd})");
log::debug!(target: "reactor", "Unregistering over transport {id} (fd={})", transport.as_raw_fd());

self.poller.unregister(id);

Expand Down

0 comments on commit 745b373

Please sign in to comment.