diff --git a/Cargo.toml b/Cargo.toml
index 79dc63b..0a1ab0e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,10 @@ documentation = "https://docs.rs/async-io"
 keywords = ["mio", "epoll", "kqueue", "iocp", "wepoll"]
 categories = ["asynchronous", "network-programming", "os"]
 
+[features]
+default = ["driver"]
+driver = []
+
 [dependencies]
 concurrent-queue = "1.2.2"
 fastrand = "1.3.5"
diff --git a/src/block_on.rs b/src/block_on.rs
new file mode 100644
index 0000000..dd95536
--- /dev/null
+++ b/src/block_on.rs
@@ -0,0 +1,157 @@
+use std::cell::Cell;
+use std::future::Future;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::time::{Duration, Instant};
+
+use futures_lite::pin;
+use waker_fn::waker_fn;
+
+use crate::reactor::Reactor;
+
+/// Blocks the current thread on a future, processing I/O events when idle.
+///
+/// # Examples
+///
+/// ```
+/// use async_io::Timer;
+/// use std::time::Duration;
+///
+/// async_io::block_on(async {
+///     // This timer will likely be processed by the current
+///     // thread rather than the fallback "async-io" thread.
+///     Timer::after(Duration::from_millis(1)).await;
+/// });
+/// ```
+pub fn block_on<T>(future: impl Future<Output = T>) -> T {
+    log::trace!("block_on()");
+
+    // Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
+    #[cfg(feature = "driver")]
+    crate::driver::BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);
+
+    // Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
+    #[cfg(feature = "driver")]
+    let _guard = CallOnDrop(|| {
+        crate::driver::BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
+        crate::driver::UNPARKER.unpark();
+    });
+
+    // Parker and unparker for notifying the current thread.
+    let (p, u) = parking::pair();
+    // This boolean is set to `true` when the current thread is blocked on I/O.
+    let io_blocked = Arc::new(AtomicBool::new(false));
+
+    thread_local! {
+        // Indicates that the current thread is polling I/O, but not necessarily blocked on it.
+        static IO_POLLING: Cell<bool> = Cell::new(false);
+    }
+
+    // Prepare the waker.
+    let waker = waker_fn({
+        let io_blocked = io_blocked.clone();
+        move || {
+            if u.unpark() {
+                // Check if waking from another thread and if currently blocked on I/O.
+                if !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
+                    Reactor::get().notify();
+                }
+            }
+        }
+    });
+    let cx = &mut Context::from_waker(&waker);
+    pin!(future);
+
+    loop {
+        // Poll the future.
+        if let Poll::Ready(t) = future.as_mut().poll(cx) {
+            log::trace!("block_on: completed");
+            return t;
+        }
+
+        // Check if a notification was received.
+        if p.park_timeout(Duration::from_secs(0)) {
+            log::trace!("block_on: notified");
+
+            // Try grabbing a lock on the reactor to process I/O events.
+            if let Some(mut reactor_lock) = Reactor::get().try_lock() {
+                // First let wakers know this parker is processing I/O events.
+                IO_POLLING.with(|io| io.set(true));
+                let _guard = CallOnDrop(|| {
+                    IO_POLLING.with(|io| io.set(false));
+                });
+
+                // Process available I/O events.
+                reactor_lock.react(Some(Duration::from_secs(0))).ok();
+            }
+            continue;
+        }
+
+        // Try grabbing a lock on the reactor to wait on I/O.
+        if let Some(mut reactor_lock) = Reactor::get().try_lock() {
+            // Record the instant at which the lock was grabbed.
+            let start = Instant::now();
+
+            loop {
+                // First let wakers know this parker is blocked on I/O.
+                IO_POLLING.with(|io| io.set(true));
+                io_blocked.store(true, Ordering::SeqCst);
+                let _guard = CallOnDrop(|| {
+                    IO_POLLING.with(|io| io.set(false));
+                    io_blocked.store(false, Ordering::SeqCst);
+                });
+
+                // Check if a notification has been received before `io_blocked` was updated
+                // because in that case the reactor won't receive a wakeup.
+                if p.park_timeout(Duration::from_secs(0)) {
+                    log::trace!("block_on: notified");
+                    break;
+                }
+
+                // Wait for I/O events.
+                log::trace!("block_on: waiting on I/O");
+                reactor_lock.react(None).ok();
+
+                // Check if a notification has been received.
+                if p.park_timeout(Duration::from_secs(0)) {
+                    log::trace!("block_on: notified");
+                    break;
+                }
+
+                // Check if this thread has been handling I/O events for a long time.
+                if start.elapsed() > Duration::from_micros(500) {
+                    log::trace!("block_on: stops hogging the reactor");
+
+                    // This thread is clearly processing I/O events for some other threads
+                    // because it didn't get a notification yet. It's best to stop hogging the
+                    // reactor and give other threads a chance to process I/O events for
+                    // themselves.
+                    drop(reactor_lock);
+
+                    // Unpark the "async-io" thread in case no other thread is ready to start
+                    // processing I/O events. This way we prevent a potential latency spike.
+                    #[cfg(feature = "driver")]
+                    crate::driver::UNPARKER.unpark();
+
+                    // Wait for a notification.
+                    p.park();
+                    break;
+                }
+            }
+        } else {
+            // Wait for an actual notification.
+            log::trace!("block_on: sleep until notification");
+            p.park();
+        }
+    }
+}
+
+/// Runs a closure when dropped.
+struct CallOnDrop<F: Fn()>(F);
+
+impl<F: Fn()> Drop for CallOnDrop<F> {
+    fn drop(&mut self) {
+        (self.0)();
+    }
+}
diff --git a/src/driver.rs b/src/driver.rs
index dd52621..e6bd176 100644
--- a/src/driver.rs
+++ b/src/driver.rs
@@ -1,22 +1,16 @@
-use std::cell::Cell;
-use std::future::Future;
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::task::{Context, Poll};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::thread;
-use std::time::{Duration, Instant};
+use std::time::Duration;
 
-use futures_lite::pin;
 use once_cell::sync::Lazy;
-use waker_fn::waker_fn;
 
 use crate::reactor::Reactor;
 
 /// Number of currently active `block_on()` invocations.
-static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);
+pub(crate) static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);
 
 /// Unparker for the "async-io" thread.
-static UNPARKER: Lazy<parking::Unparker> = Lazy::new(|| {
+pub(crate) static UNPARKER: Lazy<parking::Unparker> = Lazy::new(|| {
     let (parker, unparker) = parking::pair();
 
     // Spawn a helper thread driving the reactor.
@@ -85,146 +79,3 @@ fn main_loop(parker: parking::Parker) {
         }
     }
 }
-
-/// Blocks the current thread on a future, processing I/O events when idle.
-///
-/// # Examples
-///
-/// ```
-/// use async_io::Timer;
-/// use std::time::Duration;
-///
-/// async_io::block_on(async {
-///     // This timer will likely be processed by the current
-///     // thread rather than the fallback "async-io" thread.
-///     Timer::after(Duration::from_millis(1)).await;
-/// });
-/// ```
-pub fn block_on<T>(future: impl Future<Output = T>) -> T {
-    log::trace!("block_on()");
-
-    // Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
-    BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);
-
-    // Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
-    let _guard = CallOnDrop(|| {
-        BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
-        UNPARKER.unpark();
-    });
-
-    // Parker and unparker for notifying the current thread.
-    let (p, u) = parking::pair();
-    // This boolean is set to `true` when the current thread is blocked on I/O.
-    let io_blocked = Arc::new(AtomicBool::new(false));
-
-    thread_local! {
-        // Indicates that the current thread is polling I/O, but not necessarily blocked on it.
-        static IO_POLLING: Cell<bool> = Cell::new(false);
-    }
-
-    // Prepare the waker.
-    let waker = waker_fn({
-        let io_blocked = io_blocked.clone();
-        move || {
-            if u.unpark() {
-                // Check if waking from another thread and if currently blocked on I/O.
-                if !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
-                    Reactor::get().notify();
-                }
-            }
-        }
-    });
-    let cx = &mut Context::from_waker(&waker);
-    pin!(future);
-
-    loop {
-        // Poll the future.
-        if let Poll::Ready(t) = future.as_mut().poll(cx) {
-            log::trace!("block_on: completed");
-            return t;
-        }
-
-        // Check if a notification was received.
-        if p.park_timeout(Duration::from_secs(0)) {
-            log::trace!("block_on: notified");
-
-            // Try grabbing a lock on the reactor to process I/O events.
-            if let Some(mut reactor_lock) = Reactor::get().try_lock() {
-                // First let wakers know this parker is processing I/O events.
-                IO_POLLING.with(|io| io.set(true));
-                let _guard = CallOnDrop(|| {
-                    IO_POLLING.with(|io| io.set(false));
-                });
-
-                // Process available I/O events.
-                reactor_lock.react(Some(Duration::from_secs(0))).ok();
-            }
-            continue;
-        }
-
-        // Try grabbing a lock on the reactor to wait on I/O.
-        if let Some(mut reactor_lock) = Reactor::get().try_lock() {
-            // Record the instant at which the lock was grabbed.
-            let start = Instant::now();
-
-            loop {
-                // First let wakers know this parker is blocked on I/O.
-                IO_POLLING.with(|io| io.set(true));
-                io_blocked.store(true, Ordering::SeqCst);
-                let _guard = CallOnDrop(|| {
-                    IO_POLLING.with(|io| io.set(false));
-                    io_blocked.store(false, Ordering::SeqCst);
-                });
-
-                // Check if a notification has been received before `io_blocked` was updated
-                // because in that case the reactor won't receive a wakeup.
-                if p.park_timeout(Duration::from_secs(0)) {
-                    log::trace!("block_on: notified");
-                    break;
-                }
-
-                // Wait for I/O events.
-                log::trace!("block_on: waiting on I/O");
-                reactor_lock.react(None).ok();
-
-                // Check if a notification has been received.
-                if p.park_timeout(Duration::from_secs(0)) {
-                    log::trace!("block_on: notified");
-                    break;
-                }
-
-                // Check if this thread has been handling I/O events for a long time.
-                if start.elapsed() > Duration::from_micros(500) {
-                    log::trace!("block_on: stops hogging the reactor");
-
-                    // This thread is clearly processing I/O events for some other threads
-                    // because it didn't get a notification yet. It's best to stop hogging the
-                    // reactor and give other threads a chance to process I/O events for
-                    // themselves.
-                    drop(reactor_lock);
-
-                    // Unpark the "async-io" thread in case no other thread is ready to start
-                    // processing I/O events. This way we prevent a potential latency spike.
-                    UNPARKER.unpark();
-
-                    // Wait for a notification.
-                    p.park();
-                    break;
-                }
-            }
-        } else {
-            // Wait for an actual notification.
-            log::trace!("block_on: sleep until notification");
-            p.park();
-        }
-    }
-}
-
-/// Runs a closure when dropped.
-struct CallOnDrop<F: Fn()>(F);
-
-impl<F: Fn()> Drop for CallOnDrop<F> {
-    fn drop(&mut self) {
-        (self.0)();
-    }
-}
diff --git a/src/lib.rs b/src/lib.rs
index b3c710c..83816a8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -81,10 +81,12 @@ use socket2::{Domain, Protocol, SockAddr, Socket, Type};
 
 use crate::reactor::{Reactor, Source};
 
+mod block_on;
+#[cfg(feature = "driver")]
 mod driver;
 mod reactor;
 
-pub use driver::block_on;
+pub use block_on::block_on;
 
 /// Use `Duration::MAX` once `duration_constants` are stabilized.
 fn duration_max() -> Duration {
diff --git a/src/reactor.rs b/src/reactor.rs
index 1f3836d..1947a3c 100644
--- a/src/reactor.rs
+++ b/src/reactor.rs
@@ -63,6 +63,7 @@ impl Reactor {
     /// Returns a reference to the reactor.
     pub(crate) fn get() -> &'static Reactor {
         static REACTOR: Lazy<Reactor> = Lazy::new(|| {
+            #[cfg(feature = "driver")]
             crate::driver::init();
             Reactor {
                 poller: Poller::new().expect("cannot initialize I/O event notification"),
@@ -158,6 +159,7 @@ impl Reactor {
     }
 
     /// Locks the reactor, potentially blocking if the lock is held by another thread.
+    #[allow(dead_code)]
     pub(crate) fn lock(&self) -> ReactorLock<'_> {
         let reactor = self;
         let events = self.events.lock().unwrap();
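With the `driver` feature split out, a crate that depends on async-io can opt out of the background "async-io" helper thread entirely and let `block_on` alone drive the reactor on the calling thread. A minimal sketch of such a setup follows; the downstream crate layout and the exact version requirement are illustrative assumptions, not part of this diff:

```toml
# Cargo.toml of a hypothetical downstream crate: disabling default features
# turns off "driver", so no helper thread is spawned.
[dependencies]
async-io = { version = "1", default-features = false }
```

```rust
use std::time::Duration;

use async_io::Timer;

fn main() {
    // With "driver" disabled there is no fallback thread, so the reactor is
    // polled by this thread while it is blocked inside `block_on`.
    async_io::block_on(async {
        Timer::after(Duration::from_millis(100)).await;
    });
}
```

Code that never blocks inside `block_on` (for example, code driven entirely by another executor) would still want the default `driver` feature, since it relies on the helper thread to process I/O and timers.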