From 7789de077ca4d0f0d12f5dcdbf9551216be43b90 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Mon, 15 Jan 2024 07:49:27 -0800 Subject: [PATCH] feat: Add Windows support This commit adds Windows support to this package. Since polling already has Windows support through IOCP, the main obstacle was adding a ping event source using IOCP. The hardest part is emulating a pipe using some shared state and a posted completion packet. Fixes #160 Signed-off-by: John Nunley --- .github/workflows/ci.yml | 29 ++++ Cargo.toml | 6 +- src/io.rs | 67 ++++++-- src/loop_logic.rs | 34 +++- src/sources/generic.rs | 52 +++++-- src/sources/ping.rs | 9 +- src/sources/ping/iocp.rs | 328 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 498 insertions(+), 27 deletions(-) create mode 100644 src/sources/ping/iocp.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 372a3a67..fa74468a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -150,6 +150,35 @@ jobs: command: test args: --all-features --manifest-path ./doc/Cargo.toml + ci-windows: + name: CI (Windows) + + needs: + - lint + + strategy: + fail-fast: false + matrix: + rust: ['1.63.0', 'stable'] + + runs-on: 'windows-latest' + + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Rust toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.rust }} + override: true + + - name: Run tests + uses: actions-rs/cargo@v1 + with: + command: test + args: --features "block_on executor" + coverage: name: Coverage diff --git a/Cargo.toml b/Cargo.toml index 07b8a3c1..ccca3796 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,13 +23,15 @@ async-task = { version = "4.4.0", optional = true } bitflags = "2.4" futures-io = { version = "0.3.5", optional = true } log = "0.4" -nix = { version = "0.26", default-features = false, features = ["signal"], optional = true } pin-utils = { version = "0.1.0", optional = true } polling = "3.0.0" -rustix = { version = "0.38", default-features = false, features = ["event", "fs", "pipe", "std"] } slab = "0.4.8" +rustix = { version = "0.38", default-features = false, features = ["event", "fs", "pipe", "std"] } thiserror = "1.0.7" +[target.'cfg(unix)'.dependencies] +nix = { version = "0.26", default-features = false, features = ["signal"], optional = true } + [dev-dependencies] futures = "0.3.5" rustix = { version = "0.38", default-features = false, features = ["net"] } diff --git a/src/io.rs b/src/io.rs index 5b37e331..24f76410 100644 --- a/src/io.rs +++ b/src/io.rs @@ -7,12 +7,16 @@ //! [`LoopHandle::adapt_io`]: crate::LoopHandle#method.adapt_io use std::cell::RefCell; -use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll as TaskPoll, Waker}; -use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags}; +#[cfg(unix)] +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; +#[cfg(windows)] +use std::os::windows::io::{ + AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd, RawSocket as RawFd, +}; #[cfg(feature = "futures-io")] use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut}; @@ -33,11 +37,18 @@ use crate::{AdditionalLifecycleEventsSet, RegistrationToken}; /// `AsyncWrite` if the underlying type implements `Read` and/or `Write`. /// /// Note that this adapter and the futures procuded from it and *not* threadsafe. +/// +/// ## Platform-Specific +/// +/// - **Windows:** Usually, on drop, the file descriptor is set back to its previous status. +/// For example, if the file was previously nonblocking it will be set to nonblocking, and +/// if the file was blocking it will be set to blocking. However, on Windows, it is impossible +/// to tell what its status was before. Therefore it will always be set to blocking. pub struct Async<'l, F: AsFd> { fd: Option, dispatcher: Rc>, inner: Rc, - old_flags: OFlags, + was_nonblocking: bool, } impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> { @@ -50,11 +61,19 @@ impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> { impl<'l, F: AsFd> Async<'l, F> { pub(crate) fn new(inner: Rc>, fd: F) -> crate::Result> { // set non-blocking - let old_flags = fcntl_getfl(&fd).map_err(std::io::Error::from)?; - fcntl_setfl(&fd, old_flags | OFlags::NONBLOCK).map_err(std::io::Error::from)?; + let was_nonblocking = set_nonblocking( + #[cfg(unix)] + fd.as_fd(), + #[cfg(windows)] + fd.as_socket(), + true, + )?; // register in the loop let dispatcher = Rc::new(RefCell::new(IoDispatcher { + #[cfg(unix)] fd: fd.as_fd().as_raw_fd(), + #[cfg(windows)] + fd: fd.as_socket().as_raw_socket(), token: None, waker: None, is_registered: false, @@ -83,7 +102,7 @@ impl<'l, F: AsFd> Async<'l, F> { fd: Some(fd), dispatcher, inner, - old_flags, + was_nonblocking, }) } @@ -165,9 +184,9 @@ impl<'l, F: AsFd> Drop for Async<'l, F> { fn drop(&mut self) { self.inner.kill(&self.dispatcher); // restore flags - let _ = fcntl_setfl( + let _ = set_nonblocking( unsafe { BorrowedFd::borrow_raw(self.dispatcher.borrow().fd) }, - self.old_flags, + self.was_nonblocking, ); } } @@ -358,7 +377,37 @@ impl<'l, F: AsFd + std::io::Write> AsyncWrite for Async<'l, F> { } } -#[cfg(all(test, feature = "executor", feature = "futures-io"))] +// https://github.com/smol-rs/async-io/blob/6499077421495f2200d5b86918399f3a84bbe8e4/src/lib.rs#L2171-L2195 +/// Set the nonblocking status of an FD and return whether it was nonblocking before. +#[allow(clippy::needless_return)] +#[inline] +fn set_nonblocking(fd: BorrowedFd<'_>, is_nonblocking: bool) -> std::io::Result { + #[cfg(windows)] + { + rustix::io::ioctl_fionbio(fd, is_nonblocking)?; + + // Unfortunately it is impossible to tell if a socket was nonblocking on Windows. + // Just say it wasn't for now. + return Ok(false); + } + + #[cfg(not(windows))] + { + let previous = rustix::fs::fcntl_getfl(fd)?; + let new = if is_nonblocking { + previous | rustix::fs::OFlags::NONBLOCK + } else { + previous & !(rustix::fs::OFlags::NONBLOCK) + }; + if new != previous { + rustix::fs::fcntl_setfl(fd, new)?; + } + + return Ok(previous.contains(rustix::fs::OFlags::NONBLOCK)); + } +} + +#[cfg(all(test, unix, feature = "executor", feature = "futures-io"))] mod tests { use futures::io::{AsyncReadExt, AsyncWriteExt}; diff --git a/src/loop_logic.rs b/src/loop_logic.rs index 58bf4071..1b246814 100644 --- a/src/loop_logic.rs +++ b/src/loop_logic.rs @@ -1,6 +1,5 @@ use std::cell::{Cell, RefCell}; use std::fmt::Debug; -use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -10,6 +9,11 @@ use std::{io, slice}; #[cfg(feature = "block_on")] use std::future::Future; +#[cfg(unix)] +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; +#[cfg(windows)] +use std::os::windows::io::{AsHandle, AsRawHandle, AsSocket as AsFd, BorrowedHandle, RawHandle}; + use log::trace; use polling::Poller; @@ -287,6 +291,7 @@ impl<'l, Data> LoopHandle<'l, Data> { /// /// This loop can host several event sources, that can be dynamically added or removed. pub struct EventLoop<'l, Data> { + #[allow(dead_code)] poller: Arc, handle: LoopHandle<'l, Data>, signals: Arc, @@ -664,6 +669,7 @@ impl<'l, Data> EventLoop<'l, Data> { } } +#[cfg(unix)] impl<'l, Data> AsRawFd for EventLoop<'l, Data> { /// Get the underlying raw-fd of the poller. /// @@ -677,6 +683,7 @@ impl<'l, Data> AsRawFd for EventLoop<'l, Data> { } } +#[cfg(unix)] impl<'l, Data> AsFd for EventLoop<'l, Data> { /// Get the underlying fd of the poller. /// @@ -689,6 +696,20 @@ impl<'l, Data> AsFd for EventLoop<'l, Data> { } } +#[cfg(windows)] +impl AsRawHandle for EventLoop<'_, Data> { + fn as_raw_handle(&self) -> RawHandle { + self.poller.as_raw_handle() + } +} + +#[cfg(windows)] +impl AsHandle for EventLoop<'_, Data> { + fn as_handle(&self) -> BorrowedHandle<'_> { + self.poller.as_handle() + } +} + #[derive(Clone, Debug)] /// The EventIterator is an `Iterator` over the events relevant to a particular source /// This type is used in the [`EventSource::before_handle_events`] methods for @@ -761,12 +782,14 @@ mod tests { use crate::{ channel::{channel, Channel}, - generic::Generic, ping::*, - Dispatcher, EventIterator, EventSource, Interest, Mode, Poll, PostAction, Readiness, - RegistrationToken, Token, TokenFactory, + EventIterator, EventSource, Poll, PostAction, Readiness, RegistrationToken, Token, + TokenFactory, }; + #[cfg(unix)] + use crate::{generic::Generic, Dispatcher, Interest, Mode}; + use super::EventLoop; #[test] @@ -1127,6 +1150,7 @@ mod tests { } } + #[cfg(unix)] #[test] fn insert_bad_source() { use std::os::unix::io::FromRawFd; @@ -1153,6 +1177,7 @@ mod tests { assert!(ret.is_err()); } + #[cfg(unix)] #[test] fn insert_source_no_interest() { use rustix::pipe::pipe; @@ -1310,6 +1335,7 @@ mod tests { assert_eq!(dispatched, 3); } + #[cfg(unix)] #[test] fn change_interests() { use rustix::io::write; diff --git a/src/sources/generic.rs b/src/sources/generic.rs index 63d94b2e..4a76051f 100644 --- a/src/sources/generic.rs +++ b/src/sources/generic.rs @@ -7,7 +7,8 @@ //! notification itself, and the monitored object is provided to your callback as the second //! argument. //! -//! ``` +#![cfg_attr(unix, doc = "```")] +#![cfg_attr(not(unix), doc = "```no_run")] //! # extern crate calloop; //! use calloop::{generic::Generic, Interest, Mode, PostAction}; //! @@ -15,7 +16,10 @@ //! # let mut event_loop = calloop::EventLoop::<()>::try_new() //! # .expect("Failed to initialize the event loop!"); //! # let handle = event_loop.handle(); +//! # #[cfg(unix)] //! # let io_object = std::io::stdin(); +//! # #[cfg(windows)] +//! # let io_object: std::net::TcpStream = panic!(); //! handle.insert_source( //! // wrap your IO object in a Generic, here we register for read readiness //! // in level-triggering mode @@ -38,12 +42,13 @@ //! [`EventSource`](crate::EventSource) implementation to them. use polling::Poller; -use std::{ - borrow, - marker::PhantomData, - ops, - os::unix::io::{AsFd, AsRawFd, BorrowedFd}, - sync::Arc, +use std::{borrow, marker::PhantomData, ops, sync::Arc}; + +#[cfg(unix)] +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd}; +#[cfg(windows)] +use std::os::windows::io::{ + AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd, }; use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory}; @@ -79,9 +84,15 @@ impl ops::DerefMut for FdWrapper { } impl AsFd for FdWrapper { + #[cfg(unix)] fn as_fd(&self) -> BorrowedFd { unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) } } + + #[cfg(windows)] + fn as_socket(&self) -> BorrowedFd { + unsafe { BorrowedFd::borrow_raw(self.0.as_raw_socket()) } + } } /// A wrapper around a type that doesn't expose it mutably safely. @@ -134,10 +145,17 @@ impl ops::Deref for NoIoDrop { } impl AsFd for NoIoDrop { + #[cfg(unix)] fn as_fd(&self) -> BorrowedFd<'_> { // SAFETY: The innter type is not mutated. self.0.as_fd() } + + #[cfg(windows)] + fn as_socket(&self) -> BorrowedFd<'_> { + // SAFETY: The innter type is not mutated. + self.0.as_socket() + } } /// A generic event source wrapping a FD-backed type @@ -199,7 +217,14 @@ impl Generic { // Remove it from the poller. if let Some(poller) = self.poller.take() { - poller.delete(file.as_fd()).ok(); + poller + .delete( + #[cfg(unix)] + file.as_fd(), + #[cfg(windows)] + file.as_socket(), + ) + .ok(); } file @@ -226,7 +251,14 @@ impl Drop for Generic { fn drop(&mut self) { // Remove it from the poller. if let (Some(file), Some(poller)) = (self.file.take(), self.poller.take()) { - poller.delete(file.as_fd()).ok(); + poller + .delete( + #[cfg(unix)] + file.as_fd(), + #[cfg(windows)] + file.as_socket(), + ) + .ok(); } } } @@ -307,7 +339,7 @@ where } } -#[cfg(test)] +#[cfg(all(unix, test))] mod tests { use std::io::{Read, Write}; diff --git a/src/sources/ping.rs b/src/sources/ping.rs index b8683327..f986a83d 100644 --- a/src/sources/ping.rs +++ b/src/sources/ping.rs @@ -22,9 +22,14 @@ mod eventfd; #[cfg(target_os = "linux")] use eventfd as platform; -#[cfg(not(target_os = "linux"))] +#[cfg(windows)] +mod iocp; +#[cfg(windows)] +use iocp as platform; + +#[cfg(not(any(target_os = "linux", windows)))] mod pipe; -#[cfg(not(target_os = "linux"))] +#[cfg(not(any(target_os = "linux", windows)))] use pipe as platform; /// Create a new ping event source diff --git a/src/sources/ping/iocp.rs b/src/sources/ping/iocp.rs new file mode 100644 index 00000000..16fefa88 --- /dev/null +++ b/src/sources/ping/iocp.rs @@ -0,0 +1,328 @@ +//! IOCP-based implementation of the ping event source. +//! +//! The underlying `Poller` can be woken up at any time, using the `post` method +//! to send an arbitrary packet to the I/O completion port. The complication is +//! emulating a pipe. +//! +//! Since `Poller` is already wrapped in an `Arc`, we can clone it into some +//! synchronized inner state to send a pre-determined packet into it. Thankfully +//! calloop's use of the pipe is constrained enough that we can implement it using +//! a simple bool to keep track of whether or not it is notified. + +use crate::sources::EventSource; + +use polling::os::iocp::{CompletionPacket, PollerIocpExt}; +use polling::Poller; + +use std::fmt; +use std::io; +use std::sync::{Arc, Mutex, TryLockError}; + +#[inline] +pub fn make_ping() -> io::Result<(Ping, PingSource)> { + let state = Arc::new(State { + counter: Mutex::new(Counter { + notified: false, + poll_state: None, + }), + }); + + Ok(( + Ping { + state: state.clone(), + }, + PingSource { state }, + )) +} + +/// The event to trigger. +#[derive(Clone)] +pub struct Ping { + state: Arc, +} + +impl fmt::Debug for Ping { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_ping(&self.state, "Ping", f) + } +} + +/// The event source. +pub struct PingSource { + state: Arc, +} + +impl fmt::Debug for PingSource { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_ping(&self.state, "PingSource", f) + } +} + +impl Ping { + /// Send a ping to the `PingSource`. + pub fn ping(&self) { + let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); + + // Indicate that we are now notified. + counter.notified = true; + + let poll_state = match &mut counter.poll_state { + Some(ps) => ps, + None => { + log::warn!("[calloop] ping was not registered with the event loop"); + return; + } + }; + + // If we aren't currently inserted in the loop, send our packet. + if let Err(e) = poll_state.notify() { + log::warn!("[calloop] failed to post packet to IOCP: {}", e); + } + } +} + +impl Drop for Ping { + fn drop(&mut self) { + // If this is the last ping, wake up the source so it removes itself. + if Arc::strong_count(&self.state) <= 2 { + let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); + if let Some(poll_state) = &mut counter.poll_state { + if let Err(e) = poll_state.notify() { + log::warn!("[calloop] failed to post packet to IOCP during drop: {}", e); + } + } + } + } +} + +impl EventSource for PingSource { + type Error = super::PingError; + type Event = (); + type Metadata = (); + type Ret = (); + + fn process_events( + &mut self, + _readiness: crate::Readiness, + token: crate::Token, + mut callback: F, + ) -> Result + where + F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, + { + let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); + + // If we aren't registered, break out. + let poll_state = match &mut counter.poll_state { + Some(ps) => ps, + None => { + // We were deregistered; indicate to the higher level loop. + return Ok(crate::PostAction::Disable); + } + }; + + // We are no longer inserted into the poller. + poll_state.inserted = false; + + // Make sure this is our token. + let token: usize = token.inner.into(); + if poll_state.packet.event().key != token { + log::warn!( + "[calloop] token does not match; expected {:x}, got {:x}", + poll_state.packet.event().key, + token + ); + return Ok(crate::PostAction::Continue); + } + + // Tell if we are registered. + if counter.notified { + counter.notified = false; + + // Call the callback. + callback((), &mut ()); + } + + // Stop looping if all of the Ping's have been dropped. + let action = if Arc::strong_count(&self.state) <= 1 { + crate::PostAction::Remove + } else { + crate::PostAction::Continue + }; + + Ok(action) + } + + fn register( + &mut self, + poll: &mut crate::Poll, + token_factory: &mut crate::TokenFactory, + ) -> crate::Result<()> { + let token = token_factory.token(); + let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); + + // Make sure we haven't already been registered. + if counter.poll_state.is_some() { + return Err(io::Error::from(io::ErrorKind::AlreadyExists).into()); + } + + // Create the event to send. + let packet = { + let token = token.inner.into(); + let event = polling::Event::readable(token); + CompletionPacket::new(event) + }; + + // Create the poll state. + let poll_state = PollState::new(poll.poller(), packet, counter.notified)?; + + // Substitute it into our poll state. + counter.poll_state = Some(poll_state); + Ok(()) + } + + fn reregister( + &mut self, + poll: &mut crate::Poll, + token_factory: &mut crate::TokenFactory, + ) -> crate::Result<()> { + let token = token_factory.token(); + let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); + + // Make sure that the poller has been registered. + let poll_state = match &mut counter.poll_state { + Some(ps) => ps, + None => return Err(io::Error::from(io::ErrorKind::NotFound).into()), + }; + + // If it's a different poller, throw an error. + if !Arc::ptr_eq(&poll_state.poller, poll.poller()) { + return Err(io::Error::new( + io::ErrorKind::NotFound, + "attempted to reregister() a PingSource with a different poller", + ) + .into()); + } + + // Change the token if needed. + let token = token.inner.into(); + let event = polling::Event::readable(token); + + if event.key != poll_state.packet.event().key { + poll_state.packet = CompletionPacket::new(event); + + if poll_state.inserted { + poll_state.inserted = false; + poll_state.notify()?; + } + } + + Ok(()) + } + + fn unregister(&mut self, _poll: &mut crate::Poll) -> crate::Result<()> { + let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); + + // Remove our current registration. + if counter.poll_state.take().is_none() { + log::trace!("[calloop] unregistered a source that wasn't registered"); + } + Ok(()) + } +} + +/// Inner state of the pipe. +struct State { + /// The counter used to keep track of our state. + counter: Mutex, +} + +/// Inner counter of the pipe. +struct Counter { + /// Are we notified? + notified: bool, + + /// The `Poller`-related state. + /// + /// This is `None` if we aren't inserted into the `Poller` yet. + poll_state: Option, +} + +/// The `Poller` combined with some associated state. +struct PollState { + /// The `Poller` that we are registered in. + poller: Arc, + + /// Are we inserted into the poller? + inserted: bool, + + /// The completion packet to send. + packet: CompletionPacket, +} + +impl PollState { + /// Create a new `PollState` based on the `Poller` and the `packet`. + /// + /// If `notified` is `true`, a packet is inserted into the poller. + fn new(poller: &Arc, packet: CompletionPacket, notified: bool) -> io::Result { + let mut poll_state = Self { + poller: poller.clone(), + packet, + inserted: false, + }; + + if notified { + poll_state.notify()?; + } + + Ok(poll_state) + } + + /// Notify the poller. + fn notify(&mut self) -> io::Result<()> { + if !self.inserted { + self.poller.post(self.packet.clone())?; + self.inserted = true; + } + + Ok(()) + } +} + +#[inline] +fn debug_ping(state: &State, name: &str, f: &mut fmt::Formatter) -> fmt::Result { + let counter = match state.counter.try_lock() { + Ok(counter) => counter, + Err(TryLockError::WouldBlock) => { + return f + .debug_tuple("Ping") + .field(&format_args!("")) + .finish() + } + Err(TryLockError::Poisoned(_)) => { + return f + .debug_tuple("Ping") + .field(&format_args!("")) + .finish() + } + }; + + let mut s = f.debug_struct(name); + s.field("notified", &counter.notified); + + // Tell if we are registered. + match &counter.poll_state { + Some(poll_state) => { + s.field("packet", poll_state.packet.event()); + s.field("inserted", &poll_state.inserted); + } + + None => { + s.field("packet", &format_args!("")); + } + } + + s.finish() +}