From d070937a6c14efb86bdbc8ae54f0a7b03bf542e6 Mon Sep 17 00:00:00 2001 From: jasta Date: Fri, 7 Jul 2023 16:32:58 -0700 Subject: [PATCH] Implement poll-based backend Introduces a new backend for UNIX using the level triggered poll() syscall instead of epoll or kqueue. This support is crucial for embedded systems like the esp32 family but also for alternative operating systems like Haiku. This diff does not introduce any new platform support targets itself but provides the core technical implementation necessary to support these other targets. Future PRs will introduce specific platform support however due to reasons outlined in #1602 (many thanks for this initial effort BTW!) it is not possible to automate tests for those platforms. We will instead rely on the fact that Linux can serve as a proxy to prove that the mio code is working nominally. Note that only Linux has a sufficiently complex implementation to pass all tests. This is due to SIGRDHUP missing on other platforms and is required for about a dozen or so tests that check is_read_closed(). --- .github/workflows/ci.yml | 13 + src/io_source.rs | 10 +- src/poll.rs | 11 +- src/sys/unix/mod.rs | 68 +++- src/sys/unix/selector/mod.rs | 74 ++-- src/sys/unix/selector/poll.rs | 718 ++++++++++++++++++++++++++++++++++ src/sys/unix/waker.rs | 163 ++++++-- tests/aio.rs | 5 +- tests/unix_stream.rs | 2 + 9 files changed, 984 insertions(+), 80 deletions(-) create mode 100644 src/sys/unix/selector/poll.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fe50a343d..4cbb920cd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,6 +31,18 @@ jobs: run: cargo test --all-features - name: Tests release build run: cargo test --release --all-features + TestPoll: + runs-on: ubuntu-latest + timeout-minutes: 10 + env: + RUSTFLAGS="--cfg mio_unsupported_force_poll_poll" + steps: + - uses: actions/checkout@v3 + - uses: dtolnay/rust-toolchain@stable + - name: Tests + run: cargo test --all-features + - name: Tests release build + run: cargo test --release --all-features MinimalVersions: runs-on: ${{ matrix.os }} timeout-minutes: 10 @@ -133,6 +145,7 @@ jobs: runs-on: ubuntu-latest needs: - Test + - TestPoll - MinimalVersions - MSRV - Nightly diff --git a/src/io_source.rs b/src/io_source.rs index 99623c116..cfbcbe99f 100644 --- a/src/io_source.rs +++ b/src/io_source.rs @@ -142,9 +142,7 @@ where ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.associate(registry)?; - registry - .selector() - .register(self.inner.as_raw_fd(), token, interests) + self.state.register(registry, token, interests, self.inner.as_raw_fd()) } fn reregister( @@ -155,15 +153,13 @@ where ) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.check_association(registry)?; - registry - .selector() - .reregister(self.inner.as_raw_fd(), token, interests) + self.state.reregister(registry, token, interests, self.inner.as_raw_fd()) } fn deregister(&mut self, registry: &Registry) -> io::Result<()> { #[cfg(debug_assertions)] self.selector_id.remove_association(registry)?; - registry.selector().deregister(self.inner.as_raw_fd()) + self.state.deregister(registry, self.inner.as_raw_fd()) } } diff --git a/src/poll.rs b/src/poll.rs index 25a273ad2..695bede5d 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -1,5 +1,5 @@ use crate::{event, sys, Events, Interest, Token}; -#[cfg(unix)] +#[cfg(all(unix, not(mio_unsupported_force_poll_poll)))] use std::os::unix::io::{AsRawFd, RawFd}; use std::time::Duration; use std::{fmt, io}; @@ -411,7 +411,7 @@ impl Poll { } } -#[cfg(unix)] +#[cfg(all(unix, not(mio_unsupported_force_poll_poll)))] impl AsRawFd for Poll { fn as_raw_fd(&self) -> RawFd { self.registry.as_raw_fd() @@ -696,7 +696,7 @@ impl fmt::Debug for Registry { } } -#[cfg(unix)] +#[cfg(all(unix, not(mio_unsupported_force_poll_poll)))] impl AsRawFd for Registry { fn as_raw_fd(&self) -> RawFd { self.selector.as_raw_fd() @@ -704,7 +704,10 @@ impl AsRawFd for Registry { } cfg_os_poll! { - #[cfg(unix)] + #[cfg(all( + unix, + not(mio_unsupported_force_poll_poll) + ))] #[test] pub fn as_raw_fd() { let poll = Poll::new().unwrap(); diff --git a/src/sys/unix/mod.rs b/src/sys/unix/mod.rs index 231480a5d..53b579df3 100644 --- a/src/sys/unix/mod.rs +++ b/src/sys/unix/mod.rs @@ -33,25 +33,65 @@ cfg_os_poll! { } cfg_io_source! { - use std::io; - // Both `kqueue` and `epoll` don't need to hold any user space state. - pub(crate) struct IoSourceState; + #[cfg(not(mio_unsupported_force_poll_poll))] + mod stateless_io_source { + use std::io; + use std::os::unix::io::RawFd; + use crate::Registry; + use crate::Token; + use crate::Interest; - impl IoSourceState { - pub fn new() -> IoSourceState { - IoSourceState - } + pub(crate) struct IoSourceState; + + impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState + } + + pub fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + // We don't hold state, so we can just call the function and + // return. + f(io) + } - pub fn do_io(&self, f: F, io: &T) -> io::Result - where - F: FnOnce(&T) -> io::Result, - { - // We don't hold state, so we can just call the function and - // return. - f(io) + pub fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().register(fd, token, interests) + } + + pub fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().reregister(fd, token, interests) + } + + pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> { + // Pass through, we don't have any state + registry.selector().deregister(fd) + } } } + + #[cfg(not(mio_unsupported_force_poll_poll))] + pub(crate) use self::stateless_io_source::IoSourceState; + + #[cfg(mio_unsupported_force_poll_poll)] + pub(crate) use self::selector::IoSourceState; } cfg_os_ext! { diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index 3ccbdeadf..16ff5c010 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -1,40 +1,58 @@ -#[cfg(any( - target_os = "android", - target_os = "illumos", - target_os = "linux", - target_os = "redox", +#[cfg(all( + not(mio_unsupported_force_poll_poll), + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + ) ))] mod epoll; -#[cfg(any( - target_os = "android", - target_os = "illumos", - target_os = "linux", - target_os = "redox", +#[cfg(all( + not(mio_unsupported_force_poll_poll), + any( + target_os = "android", + target_os = "illumos", + target_os = "linux", + target_os = "redox", + ) ))] pub(crate) use self::epoll::{event, Event, Events, Selector}; -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "tvos", - target_os = "watchos", +#[cfg(mio_unsupported_force_poll_poll)] +mod poll; + +#[cfg(mio_unsupported_force_poll_poll)] +pub(crate) use self::poll::{event, Event, Events, Selector, IoSourceState}; + +#[cfg(all( + not(mio_unsupported_force_poll_poll), + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "tvos", + target_os = "watchos", + ) ))] mod kqueue; -#[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "tvos", - target_os = "watchos", +#[cfg(all( + not(mio_unsupported_force_poll_poll), + any( + target_os = "dragonfly", + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "tvos", + target_os = "watchos", + ), ))] pub(crate) use self::kqueue::{event, Event, Events, Selector}; diff --git a/src/sys/unix/selector/poll.rs b/src/sys/unix/selector/poll.rs new file mode 100644 index 000000000..537031132 --- /dev/null +++ b/src/sys/unix/selector/poll.rs @@ -0,0 +1,718 @@ +// This implementation is based on the one in the `polling` crate. +// Thanks to https://github.com/Kestrer for the original implementation! +// Permission to use this code has been granted by original author: +// https://github.com/tokio-rs/mio/pull/1602#issuecomment-1218441031 + +use crate::{Interest, Token}; +use std::collections::HashMap; +use std::convert::TryInto; +use std::fmt::{Debug, Formatter}; +use std::os::unix::io::RawFd; +#[cfg(debug_assertions)] +use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Condvar, Mutex}; +use std::time::{Duration, Instant}; +use std::{fmt, io}; +use std::os::fd::AsRawFd; +use crate::sys::unix::selector::LOWEST_FD; +use crate::sys::unix::waker::WakerInternal; + +/// Unique id for use as `SelectorId`. +#[cfg(debug_assertions)] +static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +#[derive(Debug)] +pub struct Selector { + state: Arc, + /// Whether this selector currently has an associated waker. + #[cfg(debug_assertions)] + has_waker: AtomicBool, +} + +impl Selector { + pub fn new() -> io::Result { + let state = SelectorState::new()?; + + Ok(Selector { + state: Arc::new(state), + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + pub fn try_clone(&self) -> io::Result { + // Just to keep the compiler happy :) + let _ = LOWEST_FD; + + let state = self.state.clone(); + + Ok(Selector { + state, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + self.state.select(events, timeout) + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.state.register(fd, token, interests) + } + + #[allow(dead_code)] + pub(crate) fn register_internal(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result> { + self.state.register_internal(fd, token, interests) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.state.reregister(fd, token, interests) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + self.state.deregister(fd) + } + + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } + + pub fn wake(&self, token: Token) -> io::Result<()> { + self.state.wake(token) + } +} + +cfg_io_source! { + impl Selector { + #[cfg(debug_assertions)] + pub fn id(&self) -> usize { + self.state.id + } + } +} + +/// Interface to poll. +#[derive(Debug)] +struct SelectorState { + /// File descriptors to poll. + fds: Mutex, + + /// File descriptors which will be removed before the next poll call. + /// + /// When a file descriptor is deregistered while a poll is running, we need to filter + /// out all removed descriptors after that poll is finished running. + pending_removal: Mutex>, + + /// Tokens associated with Wakers that have recently asked to wake. This will + /// cause a synthetic behaviour where on any wakeup we add all pending tokens + /// to the list of emitted events. + pending_wake_tokens: Mutex>, + + /// Data is written to this to wake up the current instance of `wait`, which can occur when the + /// user notifies it (in which case `notified` would have been set) or when an operation needs + /// to occur (in which case `waiting_operations` would have been incremented). + notify_waker: WakerInternal, + + /// The number of operations (`add`, `modify` or `delete`) that are currently waiting on the + /// mutex to become free. When this is nonzero, `wait` must be suspended until it reaches zero + /// again. + waiting_operations: AtomicUsize, + /// The condition variable that gets notified when `waiting_operations` reaches zero or + /// `notified` becomes true. + /// + /// This is used with the `fds` mutex. + operations_complete: Condvar, + + /// This selectors id. + #[cfg(debug_assertions)] + #[allow(dead_code)] + id: usize, +} + +/// The file descriptors to poll in a `Poller`. +#[derive(Debug, Clone)] +struct Fds { + /// The list of `pollfds` taken by poll. + /// + /// The first file descriptor is always present and is used to notify the poller. + poll_fds: Vec, + /// The map of each file descriptor to data associated with it. This does not include the file + /// descriptors created by the internal notify waker. + fd_data: HashMap, +} + +/// Transparent wrapper around `libc::pollfd`, used to support `Debug` derives without adding the +/// `extra_traits` feature of `libc`. +#[repr(transparent)] +#[derive(Clone)] +struct PollFd(libc::pollfd); + +impl Debug for PollFd { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("pollfd") + .field("fd", &self.0.fd) + .field("events", &self.0.events) + .field("revents", &self.0.revents) + .finish() + } +} + +/// Data associated with a file descriptor in a poller. +#[derive(Debug, Clone)] +struct FdData { + /// The index into `poll_fds` this file descriptor is. + poll_fds_index: usize, + /// The key of the `Event` associated with this file descriptor. + token: Token, + /// Used to communicate with IoSourceState when we need to internally deregister + /// based on a closed fd. + shared_record: Arc, +} + +impl SelectorState { + pub fn new() -> io::Result { + let notify_waker = WakerInternal::new()?; + + Ok(Self { + fds: Mutex::new(Fds { + poll_fds: vec![PollFd(libc::pollfd { + fd: notify_waker.as_raw_fd(), + events: libc::POLLIN, + revents: 0, + })], + fd_data: HashMap::new(), + }), + pending_removal: Mutex::new(Vec::new()), + pending_wake_tokens: Mutex::new(Vec::new()), + notify_waker, + waiting_operations: AtomicUsize::new(0), + operations_complete: Condvar::new(), + #[cfg(debug_assertions)] + id: NEXT_ID.fetch_add(1, Ordering::Relaxed), + }) + } + + pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + let deadline = timeout.map(|t| Instant::now() + t); + + events.clear(); + + let mut fds = self.fds.lock().unwrap(); + let mut closed_raw_fds = Vec::new(); + + loop { + // Complete all current operations. + loop { + if self.waiting_operations.load(Ordering::SeqCst) == 0 { + break; + } + + fds = self.operations_complete.wait(fds).unwrap(); + } + + // Perform the poll. + log::trace!("Polling on {:?}", fds); + let num_events = poll(&mut fds.poll_fds, deadline)?; + if num_events == 0 && deadline.map(|v| v <= Instant::now()).unwrap_or(false) { + // timeout + return Ok(()); + } + + log::trace!("Poll finished: {:?}", fds); + let notified_events = fds.poll_fds[0].0.revents; + let notified = notified_events != 0; + let mut num_fd_events = if notified { num_events - 1 } else { num_events }; + + let mut pending_wake_tokens_guard = self.pending_wake_tokens.lock().unwrap(); + let pending_wake_tokens = std::mem::replace(pending_wake_tokens_guard.as_mut(), Vec::new()); + drop(pending_wake_tokens_guard); + + if notified { + self.notify_waker.ack_and_reset(); + num_fd_events += pending_wake_tokens.len(); + } + + // We now check whether this poll was performed with descriptors which were pending + // for removal and filter out any matching. + let mut pending_removal_guard = self.pending_removal.lock().unwrap(); + let mut pending_removal = std::mem::replace(pending_removal_guard.as_mut(), Vec::new()); + drop(pending_removal_guard); + + // Store the events if there were any. + if num_fd_events > 0 { + let fds = &mut *fds; + + events.reserve(num_fd_events); + + for pending_wake_token in pending_wake_tokens { + events.push(Event { + token: pending_wake_token, + events: notified_events, + }); + } + + for fd_data in fds.fd_data.values_mut() { + let PollFd(poll_fd) = &mut fds.poll_fds[fd_data.poll_fds_index]; + + if pending_removal.contains(&poll_fd.fd) { + // Fd was removed while poll was running + continue; + } + + if poll_fd.revents != 0 { + // Store event + events.push(Event { + token: fd_data.token, + events: poll_fd.revents, + }); + + if poll_fd.revents & (libc::POLLHUP | libc::POLLERR) != 0 { + pending_removal.push(poll_fd.fd); + closed_raw_fds.push(poll_fd.fd); + } + + // Remove the interest which just got triggered + // the IoSourceState/WakerRegistrar used with this selector will add back + // the interest using reregister. + poll_fd.events &= !poll_fd.revents; + + if events.len() == num_fd_events { + break; + } + } + } + + break; + } + } + + drop(fds); + let _ = self.deregister_all(&closed_raw_fds); + + Ok(()) + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.register_internal(fd, token, interests).map(|_| ()) + } + + pub fn register_internal( + &self, + fd: RawFd, + token: Token, + interests: Interest, + ) -> io::Result> { + #[cfg(debug_assertions)] + if fd == self.notify_waker.as_raw_fd() { + return Err(io::Error::from(io::ErrorKind::InvalidInput)); + } + + // We must handle the unlikely case that the following order of operations happens: + // + // register(1 as RawFd) + // deregister(1 as RawFd) + // register(1 as RawFd) + // + // + // Fd's pending removal only get cleared when poll has been run. It is possible that + // between registering and deregistering and then _again_ registering the file descriptor + // poll never gets called, thus the fd stays stuck in the pending removal list. + // + // To avoid this scenario we remove an fd from pending removals when registering it. + let mut pending_removal = self.pending_removal.lock().unwrap(); + if let Some(idx) = pending_removal.iter().position(|&pending| pending == fd) { + pending_removal.remove(idx); + } + drop(pending_removal); + + self.modify_fds(|fds| { + if fds.fd_data.contains_key(&fd) { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + "\ + same file descriptor registered twice for polling \ + (an old file descriptor might have been closed without deregistration)\ + ", + )); + } + + let poll_fds_index = fds.poll_fds.len(); + let record = Arc::new(RegistrationRecord::new()); + fds.fd_data.insert( + fd, + FdData { + poll_fds_index, + token, + shared_record: record.clone(), + }, + ); + + fds.poll_fds.push(PollFd(libc::pollfd { + fd, + events: interests_to_poll(interests), + revents: 0, + })); + + Ok(record) + }) + } + + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + self.modify_fds(|fds| { + let data = fds.fd_data.get_mut(&fd).ok_or(io::ErrorKind::NotFound)?; + data.token = token; + let poll_fds_index = data.poll_fds_index; + fds.poll_fds[poll_fds_index].0.events = interests_to_poll(interests); + + Ok(()) + }) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + self.deregister_all(&[fd]) + .map_err(|_| io::ErrorKind::NotFound)?; + Ok(()) + } + + /// Perform a modification on `fds`, interrupting the current caller of `wait` if it's running. + fn modify_fds(&self, f: impl FnOnce(&mut Fds) -> T) -> T { + self.waiting_operations.fetch_add(1, Ordering::SeqCst); + + // Wake up the current caller of `wait` if there is one. + let sent_notification = self.notify_waker.wake().is_ok(); + + let mut fds = self.fds.lock().unwrap(); + + // If there was no caller of `wait` our notification was not removed from the pipe. + if sent_notification { + self.notify_waker.ack_and_reset(); + } + + let res = f(&mut *fds); + + if self.waiting_operations.fetch_sub(1, Ordering::SeqCst) == 1 { + self.operations_complete.notify_one(); + } + + res + } + + /// Special optimized version of [Self::deregister] which handles multiple removals + /// at once. Ok result if all removals were performed, Err if any entries + /// were not found. + fn deregister_all(&self, targets: &[RawFd]) -> Result<(), ()> { + if targets.is_empty() { + return Ok(()); + } + + let mut pending_removal = self.pending_removal.lock().unwrap(); + pending_removal.extend(targets); + drop(pending_removal); + + self.modify_fds(|fds| { + for target in targets { + let data = fds.fd_data.remove(&target).ok_or(())?; + data.shared_record.mark_unregistered(); + fds.poll_fds.swap_remove(data.poll_fds_index); + if let Some(swapped_pollfd) = fds.poll_fds.get(data.poll_fds_index) { + fds.fd_data + .get_mut(&swapped_pollfd.0.fd) + .unwrap() + .poll_fds_index = data.poll_fds_index; + } + } + + Ok(()) + }) + } + + pub fn wake(&self, token: Token) -> io::Result<()> { + let mut pending_wake_tokens = self.pending_wake_tokens.lock().unwrap(); + pending_wake_tokens.push(token); + drop(pending_wake_tokens); + self.notify_waker.wake() + } +} + +/// Shared record between IoSourceState and SelectorState that allows us to internally +/// deregister partially or fully closed fds (i.e. when we get POLLHUP or PULLERR) without +/// confusing IoSourceState and trying to deregister twice. This isn't strictly +/// required as technically deregister is idempotent but it is confusing +/// when trying to debug behaviour as we get imbalanced calls to register/deregister and +/// superfluous NotFound errors. +#[derive(Debug)] +pub(crate) struct RegistrationRecord { + is_unregistered: AtomicBool, +} + +impl RegistrationRecord { + pub fn new() -> Self { + Self { is_unregistered: AtomicBool::new(false) } + } + + pub fn mark_unregistered(&self) { + self.is_unregistered.store(true, Ordering::Relaxed); + } + + #[allow(dead_code)] + pub fn is_registered(&self) -> bool { + !self.is_unregistered.load(Ordering::Relaxed) + } +} + +#[cfg(target_os = "linux")] +const POLLRDHUP: libc::c_short = libc::POLLRDHUP; +#[cfg(not(target_os = "linux"))] +const POLLRDHUP: libc::c_short = 0; + +const READ_EVENTS: libc::c_short = libc::POLLIN | POLLRDHUP; + +const WRITE_EVENTS: libc::c_short = libc::POLLOUT; + +const PRIORITY_EVENTS: libc::c_short = libc::POLLPRI; + +/// Get the input poll events for the given event. +fn interests_to_poll(interest: Interest) -> libc::c_short { + let mut kind = 0; + + if interest.is_readable() { + kind |= READ_EVENTS; + } + + if interest.is_writable() { + kind |= WRITE_EVENTS; + } + + if interest.is_priority() { + kind |= PRIORITY_EVENTS; + } + + kind +} + +/// Helper function to call poll. +fn poll(fds: &mut [PollFd], deadline: Option) -> io::Result { + loop { + // Convert the timeout to milliseconds. + let timeout_ms = deadline + .map(|deadline| { + let timeout = deadline.saturating_duration_since(Instant::now()); + + // Round up to a whole millisecond. + let mut ms = timeout.as_millis().try_into().unwrap_or(u64::MAX); + if Duration::from_millis(ms) < timeout { + ms = ms.saturating_add(1); + } + ms.try_into().unwrap_or(i32::MAX) + }) + .unwrap_or(-1); + + let res = syscall!(poll( + fds.as_mut_ptr() as *mut libc::pollfd, + fds.len() as libc::nfds_t, + timeout_ms, + )); + + match res { + Ok(num_events) => break Ok(num_events as usize), + // poll returns EAGAIN if we can retry it. + Err(e) if e.raw_os_error() == Some(libc::EAGAIN) => continue, + Err(e) => return Err(e), + } + } +} + +#[derive(Debug, Clone)] +pub struct Event { + token: Token, + events: libc::c_short, +} + +pub type Events = Vec; + +pub mod event { + use crate::sys::Event; + use crate::Token; + use std::fmt; + use crate::sys::unix::selector::poll::POLLRDHUP; + + pub fn token(event: &Event) -> Token { + event.token + } + + pub fn is_readable(event: &Event) -> bool { + (event.events & libc::POLLIN) != 0 || (event.events & libc::POLLPRI) != 0 + } + + pub fn is_writable(event: &Event) -> bool { + (event.events & libc::POLLOUT) != 0 + } + + pub fn is_error(event: &Event) -> bool { + (event.events & libc::POLLERR) != 0 + } + + pub fn is_read_closed(event: &Event) -> bool { + // Both halves of the socket have closed + (event.events & libc::POLLHUP) != 0 + // Socket has received FIN or called shutdown(SHUT_RD) + || (event.events & POLLRDHUP) != 0 + } + + pub fn is_write_closed(event: &Event) -> bool { + // Both halves of the socket have closed + (event.events & libc::POLLHUP) != 0 + // Unix pipe write end has closed + || ((event.events & libc::POLLOUT) != 0 && (event.events & libc::POLLERR) != 0) + // The other side (read end) of a Unix pipe has closed. + || (event.events == libc::POLLERR) + } + + pub fn is_priority(event: &Event) -> bool { + (event.events & libc::POLLPRI) != 0 + } + + pub fn is_aio(_: &Event) -> bool { + // Not supported in the kernel, only in libc. + false + } + + pub fn is_lio(_: &Event) -> bool { + // Not supported. + false + } + + pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { + #[allow(clippy::trivially_copy_pass_by_ref)] + fn check_events(got: &libc::c_short, want: &libc::c_short) -> bool { + (*got & want) != 0 + } + debug_detail!( + EventsDetails(libc::c_short), + check_events, + libc::POLLIN, + libc::POLLPRI, + libc::POLLOUT, + libc::POLLRDNORM, + libc::POLLRDBAND, + libc::POLLWRNORM, + libc::POLLWRBAND, + libc::POLLERR, + libc::POLLHUP, + ); + + // Can't reference fields in packed structures. + let e_u64 = event.token.0; + f.debug_struct("epoll_event") + .field("events", &EventsDetails(event.events)) + .field("u64", &e_u64) + .finish() + } +} + +cfg_io_source! { + use crate::Registry; + + struct InternalState { + selector: Selector, + token: Token, + interests: Interest, + fd: RawFd, + shared_record: Arc, + } + + impl Drop for InternalState { + fn drop(&mut self) { + if self.shared_record.is_registered() { + let _ = self.selector.deregister(self.fd); + } + } + } + + pub(crate) struct IoSourceState { + inner: Option>, + } + + impl IoSourceState { + pub fn new() -> IoSourceState { + IoSourceState { inner: None } + } + + pub fn do_io(&self, f: F, io: &T) -> io::Result + where + F: FnOnce(&T) -> io::Result, + { + let result = f(io); + + if let Err(err) = &result { + if err.kind() == io::ErrorKind::WouldBlock { + self.inner.as_ref().map_or(Ok(()), |state| { + state + .selector + .reregister(state.fd, state.token, state.interests) + })?; + } + } + + result + } + + pub fn register( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + if self.inner.is_some() { + Err(io::ErrorKind::AlreadyExists.into()) + } else { + let selector = registry.selector().try_clone()?; + + selector.register_internal(fd, token, interests).map(move |shared_record| { + let state = InternalState { + selector, + token, + interests, + fd, + shared_record, + }; + + self.inner = Some(Box::new(state)); + }) + } + } + + pub fn reregister( + &mut self, + registry: &Registry, + token: Token, + interests: Interest, + fd: RawFd, + ) -> io::Result<()> { + match self.inner.as_mut() { + Some(state) => registry + .selector() + .reregister(fd, token, interests) + .map(|()| { + state.token = token; + state.interests = interests; + }), + None => Err(io::ErrorKind::NotFound.into()), + } + } + + pub fn deregister(&mut self, registry: &Registry, fd: RawFd) -> io::Result<()> { + if let Some(state) = self.inner.take() { + // Marking unregistered will short circuit the drop behaviour of calling + // deregister so the call to deregister below is strictly required. + state.shared_record.mark_unregistered(); + } + + registry.selector().deregister(fd) + } + } +} \ No newline at end of file diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index 65002d690..e22b8ce8a 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -1,13 +1,77 @@ #[cfg(all( - not(mio_unsupported_force_waker_pipe), - any(target_os = "linux", target_os = "android") + not(mio_unsupported_force_poll_poll), + not(all( + not(mio_unsupported_force_waker_pipe), + any( + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "tvos", + target_os = "watchos", + ) + )) ))] -mod eventfd { +mod fdbased { + use std::io; + use std::os::fd::AsRawFd; use crate::sys::Selector; + #[cfg(all( + not(mio_unsupported_force_waker_pipe), + any(target_os = "linux", target_os = "android"), + ))] + use crate::sys::unix::waker::eventfd::WakerInternal; + #[cfg(any( + mio_unsupported_force_waker_pipe, + target_os = "dragonfly", + target_os = "illumos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "redox", + ))] + use crate::sys::unix::waker::pipe::WakerInternal; use crate::{Interest, Token}; + #[derive(Debug)] + pub struct Waker { + waker: WakerInternal, + } + + impl Waker { + pub fn new(selector: &Selector, token: Token) -> io::Result { + let waker = WakerInternal::new()?; + selector.register(waker.as_raw_fd(), token, Interest::READABLE)?; + Ok(Waker { waker }) + } + + pub fn wake(&self) -> io::Result<()> { + self.waker.wake() + } + } +} + +#[cfg(all( + not(mio_unsupported_force_poll_poll), + not(all( + not(mio_unsupported_force_waker_pipe), + any( + target_os = "freebsd", + target_os = "ios", + target_os = "macos", + target_os = "tvos", + target_os = "watchos", + ) + )) +))] +pub use self::fdbased::Waker; + +#[cfg(all( + not(mio_unsupported_force_waker_pipe), + any(target_os = "linux", target_os = "android") +))] +mod eventfd { use std::fs::File; use std::io::{self, Read, Write}; + use std::os::fd::{AsRawFd, RawFd}; use std::os::unix::io::FromRawFd; /// Waker backed by `eventfd`. @@ -17,17 +81,15 @@ mod eventfd { /// unsigned integer and added to the count. Reads must also be 8 bytes and /// reset the count to 0, returning the count. #[derive(Debug)] - pub struct Waker { + pub struct WakerInternal { fd: File, } - impl Waker { - pub fn new(selector: &Selector, token: Token) -> io::Result { + impl WakerInternal { + pub fn new() -> io::Result { let fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?; let file = unsafe { File::from_raw_fd(fd) }; - - selector.register(fd, token, Interest::READABLE)?; - Ok(Waker { fd: file }) + Ok(WakerInternal { fd: file }) } pub fn wake(&self) -> io::Result<()> { @@ -44,6 +106,11 @@ mod eventfd { } } + #[cfg(mio_unsupported_force_poll_poll)] + pub fn ack_and_reset(&self) { + let _ = self.reset(); + } + /// Reset the eventfd object, only need to call this if `wake` fails. fn reset(&self) -> io::Result<()> { let mut buf: [u8; 8] = 0u64.to_ne_bytes(); @@ -56,13 +123,20 @@ mod eventfd { } } } + + impl AsRawFd for WakerInternal { + fn as_raw_fd(&self) -> RawFd { + self.fd.as_raw_fd() + } + } } #[cfg(all( + mio_unsupported_force_poll_poll, not(mio_unsupported_force_waker_pipe), any(target_os = "linux", target_os = "android") ))] -pub use self::eventfd::Waker; +pub(crate) use self::eventfd::WakerInternal; #[cfg(all( not(mio_unsupported_force_waker_pipe), @@ -126,11 +200,9 @@ pub use self::kqueue::Waker; target_os = "redox", ))] mod pipe { - use crate::sys::unix::Selector; - use crate::{Interest, Token}; - use std::fs::File; use std::io::{self, Read, Write}; + use std::os::fd::{AsRawFd, RawFd}; use std::os::unix::io::FromRawFd; /// Waker backed by a unix pipe. @@ -138,20 +210,19 @@ mod pipe { /// Waker controls both the sending and receiving ends and empties the pipe /// if writing to it (waking) fails. #[derive(Debug)] - pub struct Waker { + pub struct WakerInternal { sender: File, receiver: File, } - impl Waker { - pub fn new(selector: &Selector, token: Token) -> io::Result { + impl WakerInternal { + pub fn new() -> io::Result { let mut fds = [-1; 2]; syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?; let sender = unsafe { File::from_raw_fd(fds[1]) }; let receiver = unsafe { File::from_raw_fd(fds[0]) }; - selector.register(fds[0], token, Interest::READABLE)?; - Ok(Waker { sender, receiver }) + Ok(WakerInternal { sender, receiver }) } pub fn wake(&self) -> io::Result<()> { @@ -174,6 +245,11 @@ mod pipe { } } + #[cfg(mio_unsupported_force_poll_poll)] + pub fn ack_and_reset(&self) { + self.empty(); + } + /// Empty the pipe's buffer, only need to call this if `wake` fails. /// This ignores any errors. fn empty(&self) { @@ -186,14 +262,49 @@ mod pipe { } } } + + impl AsRawFd for WakerInternal { + fn as_raw_fd(&self) -> RawFd { + self.receiver.as_raw_fd() + } + } } -#[cfg(any( - mio_unsupported_force_waker_pipe, - target_os = "dragonfly", - target_os = "illumos", - target_os = "netbsd", - target_os = "openbsd", - target_os = "redox", +#[cfg(all( + mio_unsupported_force_poll_poll, + any( + mio_unsupported_force_waker_pipe, + target_os = "dragonfly", + target_os = "illumos", + target_os = "netbsd", + target_os = "openbsd", + target_os = "redox", + ) ))] -pub use self::pipe::Waker; +pub(crate) use self::pipe::WakerInternal; + +#[cfg(mio_unsupported_force_poll_poll)] +mod poll { + use std::io; + use crate::sys::Selector; + use crate::Token; + + #[derive(Debug)] + pub struct Waker { + selector: Selector, + token: Token, + } + + impl Waker { + pub fn new(selector: &Selector, token: Token) -> io::Result { + Ok(Waker { selector: selector.try_clone()?, token }) + } + + pub fn wake(&self) -> io::Result<()> { + self.selector.wake(self.token) + } + } +} + +#[cfg(mio_unsupported_force_poll_poll)] +pub use self::poll::Waker; diff --git a/tests/aio.rs b/tests/aio.rs index b8c1b47b0..19ea02003 100644 --- a/tests/aio.rs +++ b/tests/aio.rs @@ -1,4 +1,7 @@ -#![cfg(any(target_os = "freebsd", target_os = "dragonfly"))] +#![cfg(all( + not(mio_unsupported_force_poll_poll), + any(target_os = "freebsd", target_os = "dragonfly"), +))] #![cfg(all(feature = "os-poll", feature = "net"))] use mio::{event::Source, Events, Interest, Poll, Registry, Token}; diff --git a/tests/unix_stream.rs b/tests/unix_stream.rs index fcab0a057..babf18df9 100644 --- a/tests/unix_stream.rs +++ b/tests/unix_stream.rs @@ -451,6 +451,8 @@ where assert!(stream.take_error().unwrap().is_none()); + assert_would_block(stream.read(&mut buf)); + let bufs = [IoSlice::new(DATA1), IoSlice::new(DATA2)]; let wrote = stream.write_vectored(&bufs).unwrap(); assert_eq!(wrote, DATA1_LEN + DATA2_LEN);