From cbe20a29a21722d8bcca5c6758aac0f78838e88c Mon Sep 17 00:00:00 2001 From: John Nunley Date: Fri, 20 Oct 2023 17:09:59 -0700 Subject: [PATCH 1/4] Handle interrupts while polling Previous, `Poller::wait` would bubble signal interruption error to the user. However, this may be unexpected for simple use cases. Thus, this commit makes it so, if `ErrorKind::Interrupted` is received by the underlying `wait()` call, it clears the events and tries to wait again. This also adds a test for this interruption written by @psychon. Co-Authored-By: Uli Schlachter Signed-off-by: John Nunley --- Cargo.toml | 4 +++ src/lib.rs | 25 +++++++++++------ tests/concurrent_modification.rs | 47 ++++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3c3e8db..1dc9022 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,3 +44,7 @@ features = [ [dev-dependencies] easy-parallel = "3.1.0" fastrand = "2.0.0" + +[target.'cfg(unix)'.dev-dependencies] +libc = "0.2" +signal-hook = "0.3.17" diff --git a/src/lib.rs b/src/lib.rs index 9882c8e..a049f79 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -651,14 +651,23 @@ impl Poller { let _enter = span.enter(); if let Ok(_lock) = self.lock.try_lock() { - // Wait for I/O events. - self.poller.wait(&mut events.events, timeout)?; - - // Clear the notification, if any. - self.notified.swap(false, Ordering::SeqCst); - - // Indicate number of events. - Ok(events.len()) + loop { + // Wait for I/O events. + if let Err(e) = self.poller.wait(&mut events.events, timeout) { + // If the wait was interrupted by a signal, clear events and try again. + if e.kind() == io::ErrorKind::Interrupted { + continue; + } else { + return Err(e); + } + } + + // Clear the notification, if any. + self.notified.swap(false, Ordering::SeqCst); + + // Indicate number of events. + return Ok(events.len()); + } } else { tracing::trace!("wait: skipping because another thread is already waiting on I/O"); Ok(0) diff --git a/tests/concurrent_modification.rs b/tests/concurrent_modification.rs index 0797b0f..ab3e5fb 100644 --- a/tests/concurrent_modification.rs +++ b/tests/concurrent_modification.rs @@ -76,6 +76,53 @@ fn concurrent_modify() -> io::Result<()> { Ok(()) } +#[cfg(unix)] +#[test] +fn concurrent_interruption() -> io::Result<()> { + struct MakeItSend(T); + unsafe impl Send for MakeItSend {} + + let (reader, _writer) = tcp_pair()?; + let poller = Poller::new()?; + unsafe { + poller.add(&reader, Event::none(0))?; + } + + let mut events = Events::new(); + let events_borrow = &mut events; + let (sender, receiver) = std::sync::mpsc::channel(); + + Parallel::new() + .add(move || { + // Register a signal handler so that the syscall is actually interrupted. A signal that + // is ignored by default does not cause an interrupted syscall. + signal_hook::flag::register(signal_hook::consts::signal::SIGURG, Default::default())?; + + // Signal to the other thread how to send a signal to us + sender + .send(MakeItSend(unsafe { libc::pthread_self() })) + .unwrap(); + + poller.wait(events_borrow, Some(Duration::from_secs(1)))?; + Ok(()) + }) + .add(move || { + let MakeItSend(target_thread) = receiver.recv().unwrap(); + thread::sleep(Duration::from_millis(100)); + assert_eq!(0, unsafe { + libc::pthread_kill(target_thread, libc::SIGURG) + }); + Ok(()) + }) + .run() + .into_iter() + .collect::>()?; + + assert_eq!(events.len(), 0); + + Ok(()) +} + fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> { let listener = TcpListener::bind("127.0.0.1:0")?; let a = TcpStream::connect(listener.local_addr()?)?; From bee0eea550f82720ee7d587feee0bfea60d8e8d4 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Fri, 20 Oct 2023 17:17:14 -0700 Subject: [PATCH 2/4] Forgot to actually clear events Signed-off-by: John Nunley --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index a049f79..e714676 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -656,6 +656,7 @@ impl Poller { if let Err(e) = self.poller.wait(&mut events.events, timeout) { // If the wait was interrupted by a signal, clear events and try again. if e.kind() == io::ErrorKind::Interrupted { + events.clear(); continue; } else { return Err(e); From b6262d9874103db334cab6c64c993b974ff71c10 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 21 Oct 2023 22:16:33 -0700 Subject: [PATCH 3/4] Fix timeouts Signed-off-by: John Nunley --- src/lib.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e714676..d03e9d8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,7 +70,7 @@ use std::marker::PhantomData; use std::num::NonZeroUsize; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Mutex; -use std::time::Duration; +use std::time::{Duration, Instant}; use cfg_if::cfg_if; @@ -646,12 +646,18 @@ impl Poller { /// poller.delete(&socket)?; /// # std::io::Result::Ok(()) /// ``` - pub fn wait(&self, events: &mut Events, timeout: Option) -> io::Result { + pub fn wait(&self, events: &mut Events, mut timeout: Option) -> io::Result { let span = tracing::trace_span!("Poller::wait", ?timeout); let _enter = span.enter(); if let Ok(_lock) = self.lock.try_lock() { + let deadline = timeout.and_then(|timeout| Instant::now().checked_add(timeout)); + loop { + // Figure out how long to wait for. + let timeout = + deadline.map(|deadline| deadline.saturating_duration_since(Instant::now())); + // Wait for I/O events. if let Err(e) = self.poller.wait(&mut events.events, timeout) { // If the wait was interrupted by a signal, clear events and try again. From 069131b72649bac740c29409abbad24f80b39e56 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 22 Oct 2023 11:49:45 -0700 Subject: [PATCH 4/4] Remove unused mut Signed-off-by: John Nunley --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index d03e9d8..4caf19a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -646,7 +646,7 @@ impl Poller { /// poller.delete(&socket)?; /// # std::io::Result::Ok(()) /// ``` - pub fn wait(&self, events: &mut Events, mut timeout: Option) -> io::Result { + pub fn wait(&self, events: &mut Events, timeout: Option) -> io::Result { let span = tracing::trace_span!("Poller::wait", ?timeout); let _enter = span.enter();