Skip to content

Commit

Permalink
feat: Expose waitable handles in Windows
Browse files Browse the repository at this point in the history
This commit allows waitable handles to be polled in Windows. This allows
I/O constructs like processes, mutexes and waitable events be registered
into the poller and be polled just like anything else.

cc #25

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull authored Jan 13, 2024
1 parent 6499077 commit fefe804
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 13 deletions.
34 changes: 34 additions & 0 deletions examples/windows-command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! Runs a command using waitable handles on Windows.
//!
//! Run with:
//!
//! ```
//! cargo run --example windows-command
//! ```
#[cfg(windows)]
fn main() -> std::io::Result<()> {
use async_io::os::windows::Waitable;
use std::process::Command;

futures_lite::future::block_on(async {
// Spawn a process.
let process = Command::new("cmd")
.args(["/C", "echo hello"])
.spawn()
.expect("failed to spawn process");

// Wrap the process in an `Async` object that waits for it to exit.
let process = Waitable::new(process)?;

// Wait for the process to exit.
process.ready().await?;

Ok(())
})
}

#[cfg(not(windows))]
fn main() {
println!("This example is only supported on Windows.");
}
3 changes: 3 additions & 0 deletions src/os.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ pub mod unix;
target_os = "dragonfly",
))]
pub mod kqueue;

#[cfg(windows)]
pub mod windows;
192 changes: 192 additions & 0 deletions src/os/windows.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
//! Functionality that is only available on Windows.
use crate::reactor::{Reactor, Readable, Registration};
use crate::Async;

use std::convert::TryFrom;
use std::future::Future;
use std::io::{self, Result};
use std::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, OwnedHandle, RawHandle};
use std::pin::Pin;
use std::task::{Context, Poll};

/// A waitable handle registered in the reactor.
///
/// Some handles in Windows are “waitable”, which means that they emit a “readiness” signal after some event occurs. This function can be used to wait for such events to occur on a handle. This function can be used in addition to regular socket polling.
///
/// Waitable objects include the following:
///
/// - Console inputs
/// - Waitable events
/// - Mutexes
/// - Processes
/// - Semaphores
/// - Threads
/// - Timer
///
/// This structure can be used to wait for any of these objects to become ready.
///
/// ## Implementation
///
/// The current implementation waits on the handle by registering it in the application-global
/// Win32 threadpool. However, in the futur it may be possible to migrate to an implementation
/// on Windows 10 that uses a mechanism similar to [`MsgWaitForMultipleObjectsEx`].
///
/// [`MsgWaitForMultipleObjectsEx`]: https://docs.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-msgwaitformultipleobjectsex
///
/// ## Caveats
///
/// Read the documentation for the [`Async`](crate::Async) type for more information regarding the
/// abilities and caveats with using this type.
#[derive(Debug)]
pub struct Waitable<T>(Async<T>);

impl<T> AsRef<T> for Waitable<T> {
fn as_ref(&self) -> &T {
self.0.as_ref()
}
}

impl<T: AsHandle> Waitable<T> {
/// Create a new [`Waitable`] around a waitable handle.
///
/// # Examples
///
/// ```no_run
/// use std::process::Command;
/// use async_io::os::windows::Waitable;
///
/// // Create a new process to wait for.
/// let mut child = Command::new("sleep").arg("5").spawn().unwrap();
///
/// // Wrap the process in an `Async` object that waits for it to exit.
/// let process = Waitable::new(child).unwrap();
///
/// // Wait for the process to exit.
/// # async_io::block_on(async {
/// process.ready().await.unwrap();
/// # });
/// ```
pub fn new(handle: T) -> Result<Self> {
Ok(Self(Async {
source: Reactor::get()
.insert_io(unsafe { Registration::new_waitable(handle.as_handle()) })?,
io: Some(handle),
}))
}
}

impl<T: AsRawHandle> AsRawHandle for Waitable<T> {
fn as_raw_handle(&self) -> RawHandle {
self.get_ref().as_raw_handle()
}
}

impl<T: AsHandle> AsHandle for Waitable<T> {
fn as_handle(&self) -> BorrowedHandle<'_> {
self.get_ref().as_handle()
}
}

impl<T: AsHandle + From<OwnedHandle>> TryFrom<OwnedHandle> for Waitable<T> {
type Error = io::Error;

fn try_from(handle: OwnedHandle) -> Result<Self> {
Self::new(handle.into())
}
}

impl<T: Into<OwnedHandle>> TryFrom<Waitable<T>> for OwnedHandle {
type Error = io::Error;

fn try_from(value: Waitable<T>) -> std::result::Result<Self, Self::Error> {
value.into_inner().map(|handle| handle.into())
}
}

impl<T> Waitable<T> {
/// Get a reference to the inner handle.
pub fn get_ref(&self) -> &T {
self.0.get_ref()
}

/// Get a mutable reference to the inner handle.
///
/// # Safety
///
/// The underlying I/O source must not be dropped or moved out using this function.
pub unsafe fn get_mut(&mut self) -> &mut T {
self.0.get_mut()
}

/// Consumes the [`Waitable`], returning the inner handle.
pub fn into_inner(self) -> Result<T> {
self.0.into_inner()
}

/// Waits until the [`Waitable`] object is ready.
///
/// This method completes when the underlying [`Waitable`] object has completed. See the documentation
/// for the [`Waitable`] object for more information.
///
/// # Examples
///
/// ```no_run
/// use std::process::Command;
/// use async_io::os::windows::Waitable;
///
/// # futures_lite::future::block_on(async {
/// let child = Command::new("sleep").arg("5").spawn()?;
/// let process = Waitable::new(child)?;
///
/// // Wait for the process to exit.
/// process.ready().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn ready(&self) -> Ready<'_, T> {
Ready(self.0.readable())
}

/// Polls the I/O handle for readiness.
///
/// When this method returns [`Poll::Ready`], that means that the OS has delivered a notification
/// that the underlying [`Waitable`] object is ready. See the documentation for the [`Waitable`]
/// object for more information.
///
/// # Caveats
///
/// Two different tasks should not call this method concurrently. Otherwise, conflicting tasks
/// will just keep waking each other in turn, thus wasting CPU time.
///
/// # Examples
///
/// ```no_run
/// use std::process::Command;
/// use async_io::os::windows::Waitable;
/// use futures_lite::future;
///
/// # futures_lite::future::block_on(async {
/// let child = Command::new("sleep").arg("5").spawn()?;
/// let process = Waitable::new(child)?;
///
/// // Wait for the process to exit.
/// future::poll_fn(|cx| process.poll_ready(cx)).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.0.poll_readable(cx)
}
}

/// Future for [`Filter::ready`].
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct Ready<'a, T>(Readable<'a, T>);

impl<T> Future for Ready<'_, T> {
type Output = Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}
69 changes: 56 additions & 13 deletions src/reactor/windows.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,42 @@
// SPDX-License-Identifier: MIT OR Apache-2.0

use polling::{Event, Poller};
use polling::os::iocp::PollerIocpExt;
use polling::{Event, PollMode, Poller};
use std::fmt;
use std::io::Result;
use std::os::windows::io::{AsRawSocket, BorrowedSocket, RawSocket};
use std::os::windows::io::{
AsRawHandle, AsRawSocket, BorrowedHandle, BorrowedSocket, RawHandle, RawSocket,
};

/// The raw registration into the reactor.
#[doc(hidden)]
pub struct Registration {
pub enum Registration {
/// Raw socket handle on Windows.
///
/// # Invariant
///
/// This describes a valid socket that has not been `close`d. It will not be
/// closed while this object is alive.
raw: RawSocket,
Socket(RawSocket),

/// Waitable handle for Windows.
///
/// # Invariant
///
/// This describes a valid waitable handle that has not been `close`d. It will not be
/// closed while this object is alive.
Handle(RawHandle),
}

unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}

impl fmt::Debug for Registration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.raw, f)
match self {
Self::Socket(raw) => fmt::Debug::fmt(raw, f),
Self::Handle(handle) => fmt::Debug::fmt(handle, f),
}
}
}

Expand All @@ -30,31 +47,57 @@ impl Registration {
///
/// The provided file descriptor must be valid and not be closed while this object is alive.
pub(crate) unsafe fn new(f: BorrowedSocket<'_>) -> Self {
Self {
raw: f.as_raw_socket(),
}
Self::Socket(f.as_raw_socket())
}

/// Create a new [`Registration`] around a waitable handle.
///
/// # Safety
///
/// The provided handle must be valid and not be closed while this object is alive.
pub(crate) unsafe fn new_waitable(f: BorrowedHandle<'_>) -> Self {
Self::Handle(f.as_raw_handle())
}

/// Registers the object into the reactor.
#[inline]
pub(crate) fn add(&self, poller: &Poller, token: usize) -> Result<()> {
// SAFETY: This object's existence validates the invariants of Poller::add
unsafe { poller.add(self.raw, Event::none(token)) }
unsafe {
match self {
Self::Socket(raw) => poller.add(*raw, Event::none(token)),
Self::Handle(handle) => {
poller.add_waitable(*handle, Event::none(token), PollMode::Oneshot)
}
}
}
}

/// Re-registers the object into the reactor.
#[inline]
pub(crate) fn modify(&self, poller: &Poller, interest: Event) -> Result<()> {
// SAFETY: self.raw is a valid file descriptor
let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) };
poller.modify(fd, interest)
match self {
Self::Socket(raw) => {
poller.modify(unsafe { BorrowedSocket::borrow_raw(*raw) }, interest)
}
Self::Handle(handle) => poller.modify_waitable(
unsafe { BorrowedHandle::borrow_raw(*handle) },
interest,
PollMode::Oneshot,
),
}
}

/// Deregisters the object from the reactor.
#[inline]
pub(crate) fn delete(&self, poller: &Poller) -> Result<()> {
// SAFETY: self.raw is a valid file descriptor
let fd = unsafe { BorrowedSocket::borrow_raw(self.raw) };
poller.delete(fd)
match self {
Self::Socket(raw) => poller.delete(unsafe { BorrowedSocket::borrow_raw(*raw) }),
Self::Handle(handle) => {
poller.remove_waitable(unsafe { BorrowedHandle::borrow_raw(*handle) })
}
}
}
}

0 comments on commit fefe804

Please sign in to comment.