Skip to content

Commit

Permalink
Add support for timers on web platforms
Browse files Browse the repository at this point in the history
This commit adds support for async-io on wasm32-unknown-unknown. Not all
features of async-io can be ported to WASM; for instance:

- Async<T> can't be ported over as WASM doesn't really have a reactor. WASI
  could eventually be supported here, but that is dependent on
  smol-rs/polling#102
- block_on() can't be ported over, as blocking isn't allowed on the web.

The only thing left is Timer, which can be implemented using setTimeout and
setInterval. So that's what's been done: when the WASM target family is enabled,
Async<T> and block_on() will be disabled and Timer will switch to an
implementation that uses web timeouts.

This is not a breaking change, as this crate previously failed to compile on web
platforms anyways.

This functionality currently does not support Node.js.

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull committed Sep 24, 2023
1 parent a9aabe5 commit 8b7c787
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 20 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
- name: Install Rust
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
- run: rustup target add wasm32-unknown-unknown
- run: cargo build --all --all-features --all-targets
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
if: startsWith(matrix.rust, 'nightly')
Expand All @@ -50,6 +51,9 @@ jobs:
# if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
# run: cargo check -Z build-std --target=riscv32imc-esp-espidf
- run: cargo test
- uses: taiki-e/install-action@wasm-pack
- run: cargo check --target wasm32-unknown-unknown --all-features --tests
- run: wasm-pack test --node

# Copied from: https://github.com/rust-lang/stacker/pull/19/files
windows_gnu:
Expand Down
19 changes: 17 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ name = "timer"
harness = false

[dependencies]
async-lock = "2.6"
cfg-if = "1"
futures-lite = { version = "1.11.0", default-features = false }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
async-lock = "2.6"
concurrent-queue = "2.2.0"
futures-io = { version = "0.3.28", default-features = false, features = ["std"] }
futures-lite = { version = "1.11.0", default-features = false }
parking = "2.0.0"
polling = "3.0.0"
rustix = { version = "0.38.2", default-features = false, features = ["std", "fs"] }
Expand All @@ -36,15 +38,28 @@ socket2 = { version = "0.5.3", features = ["all"] }
tracing = { version = "0.1.37", default-features = false }
waker-fn = "1.1.0"

[target.'cfg(target_family = "wasm")'.dependencies]
atomic-waker = "1.1.1"
wasm-bindgen = "0.2.87"
web-sys = { version = "0.3.0", features = ["Window"] }

[dev-dependencies]
async-channel = "1"

[target.'cfg(not(target_family = "wasm"))'.dev-dependencies]
async-net = "1"
blocking = "1"
criterion = { version = "0.4", default-features = false, features = ["cargo_bench_support"] }
getrandom = "0.2.7"
signal-hook = "0.3"
tempfile = "3"

[target.'cfg(target_family = "wasm")'.dev-dependencies]
console_error_panic_hook = "0.1.7"
wasm-bindgen-futures = "0.4.37"
wasm-bindgen-test = "0.3.37"
web-time = "0.2.0"

[target.'cfg(target_os = "linux")'.dev-dependencies]
inotify = { version = "0.10.1", default-features = false }
timerfd = "1"
Expand Down
28 changes: 24 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,31 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::time::Duration;

use futures_lite::stream::Stream;
#[cfg(not(target_family = "wasm"))]
use std::time::Instant;

use crate::reactor::Reactor;
use futures_lite::stream::Stream;

#[cfg(not(target_family = "wasm"))]
mod driver;
#[cfg(not(target_family = "wasm"))]
mod io;
#[cfg(not(target_family = "wasm"))]
mod reactor;

#[path = "timer/native.rs"]
#[cfg_attr(not(target_family = "wasm"), path = "timer/native.rs")]
#[cfg_attr(target_family = "wasm", path = "timer/web.rs")]
mod timer;

pub mod os;

#[cfg(not(target_family = "wasm"))]
pub use driver::block_on;
#[cfg(not(target_family = "wasm"))]
pub use io::{Async, IoSafe};
#[cfg(not(target_family = "wasm"))]
pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned};

/// A future or stream that emits timed events.
Expand Down Expand Up @@ -197,6 +205,7 @@ impl Timer {
/// Timer::at(when).await;
/// # });
/// ```
#[cfg(not(target_family = "wasm"))]
#[inline]
pub fn at(instant: Instant) -> Timer {
Timer(timer::Timer::at(instant))
Expand Down Expand Up @@ -236,6 +245,7 @@ impl Timer {
/// Timer::interval_at(start, period).next().await;
/// # });
/// ```
#[cfg(not(target_family = "wasm"))]
#[inline]
pub fn interval_at(start: Instant, period: Duration) -> Timer {
Timer(timer::Timer::interval_at(start, period))
Expand Down Expand Up @@ -325,6 +335,7 @@ impl Timer {
/// t.set_at(when);
/// # });
/// ```
#[cfg(not(target_family = "wasm"))]
#[inline]
pub fn set_at(&mut self, instant: Instant) {
self.0.set_at(instant)
Expand Down Expand Up @@ -376,15 +387,20 @@ impl Timer {
/// t.set_interval_at(start, period);
/// # });
/// ```
#[cfg(not(target_family = "wasm"))]
#[inline]
pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
self.0.set_interval_at(start, period)
}
}

impl Future for Timer {
#[cfg(not(target_family = "wasm"))]
type Output = Instant;

#[cfg(target_family = "wasm")]
type Output = ();

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.poll_next(cx) {
Expand All @@ -396,8 +412,12 @@ impl Future for Timer {
}

impl Stream for Timer {
#[cfg(not(target_family = "wasm"))]
type Item = Instant;

#[cfg(target_family = "wasm")]
type Item = ();

#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.poll_next(cx)
Expand Down
2 changes: 1 addition & 1 deletion src/os/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub fn reactor_fd() -> Option<BorrowedFd<'static>> {
not(polling_test_poll_backend),
))] {
use std::os::unix::io::AsFd;
Some(crate::Reactor::get().poller.as_fd())
Some(crate::reactor::Reactor::get().poller.as_fd())
} else {
None
}
Expand Down
212 changes: 212 additions & 0 deletions src/timer/web.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
//! Timers for web targets.
//!
//! These use the `setTimeout` function on the web to handle timing.
use std::convert::TryInto;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use atomic_waker::AtomicWaker;
use wasm_bindgen::closure::Closure;
use wasm_bindgen::JsCast;

/// A timer for non-Web platforms.
///
/// self registers a timeout in the global reactor, which in turn sets a timeout in the poll call.
#[derive(Debug)]
pub(super) struct Timer {
/// The waker to wake when the timer fires.
waker: Arc<State>,

/// The ongoing timeout or interval.
ongoing_timeout: TimerId,

/// Keep the closure alive so we don't drop it.
closure: Option<Closure<dyn FnMut()>>,
}

#[derive(Debug)]
struct State {
/// The number of times this timer has been woken.
woken: AtomicUsize,

/// The waker to wake when the timer fires.
waker: AtomicWaker,
}

#[derive(Debug)]
enum TimerId {
NoTimer,
Timeout(i32),
Interval(i32),
}

impl Timer {
/// Create a timer that will never fire.
#[inline]
pub(super) fn never() -> Self {
Self {
waker: Arc::new(State {
woken: AtomicUsize::new(0),
waker: AtomicWaker::new(),
}),
ongoing_timeout: TimerId::NoTimer,
closure: None,
}
}

/// Create a timer that will fire at the given instant.
#[inline]
pub(super) fn after(duration: Duration) -> Timer {
let mut this = Self::never();
this.set_after(duration);
this
}

/// Create a timer that will fire at the given instant.
#[inline]
pub(super) fn interval(period: Duration) -> Timer {
let mut this = Self::never();
this.set_interval(period);
this
}

/// Returns `true` if self timer will fire at some point.
#[inline]
pub(super) fn will_fire(&self) -> bool {
matches!(
self.ongoing_timeout,
TimerId::Timeout(_) | TimerId::Interval(_)
)
}

/// Set the timer to fire after the given duration.
#[inline]
pub(super) fn set_after(&mut self, duration: Duration) {
// Set the timeout.
let id = {
let waker = self.waker.clone();
let closure: Closure<dyn FnMut()> = Closure::wrap(Box::new(move || {
waker.wake();
}));

let result = web_sys::window()
.unwrap()
.set_timeout_with_callback_and_timeout_and_arguments_0(
closure.as_ref().unchecked_ref(),
duration.as_millis().try_into().expect("timeout too long"),
);

// Make sure we don't drop the closure before it's called.
self.closure = Some(closure);

match result {
Ok(id) => id,
Err(_) => {
panic!("failed to set timeout")
}
}
};

// Set our ID.
self.ongoing_timeout = TimerId::Timeout(id);
}

/// Set the timer to emit events periodically.
#[inline]
pub(super) fn set_interval(&mut self, period: Duration) {
// Set the timeout.
let id = {
let waker = self.waker.clone();
let closure: Closure<dyn FnMut()> = Closure::wrap(Box::new(move || {
waker.wake();
}));

let result = web_sys::window()
.unwrap()
.set_interval_with_callback_and_timeout_and_arguments_0(
closure.as_ref().unchecked_ref(),
period.as_millis().try_into().expect("timeout too long"),
);

// Make sure we don't drop the closure before it's called.
self.closure = Some(closure);

match result {
Ok(id) => id,
Err(_) => {
panic!("failed to set interval")
}
}
};

// Set our ID.
self.ongoing_timeout = TimerId::Interval(id);
}

/// Poll for the next timer event.
#[inline]
pub(super) fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
let mut registered = false;
let mut woken = self.waker.woken.load(Ordering::Acquire);

loop {
if woken > 0 {
// Try to decrement the number of woken events.
if let Err(new_woken) = self.waker.woken.compare_exchange(
woken,
woken - 1,
Ordering::SeqCst,
Ordering::Acquire,
) {
woken = new_woken;
continue;
}

// If we are using a one-shot timer, clear it.
if let TimerId::Timeout(_) = self.ongoing_timeout {
self.clear();
}

return Poll::Ready(Some(()));
}

if !registered {
// Register the waker.
self.waker.waker.register(cx.waker());
registered = true;
} else {
// We've already registered, so we can just return pending.
return Poll::Pending;
}
}
}

/// Clear the current timeout.
fn clear(&mut self) {
match self.ongoing_timeout {
TimerId::NoTimer => {}
TimerId::Timeout(id) => {
web_sys::window().unwrap().clear_timeout_with_handle(id);
}
TimerId::Interval(id) => {
web_sys::window().unwrap().clear_interval_with_handle(id);
}
}
}
}

impl State {
fn wake(&self) {
self.woken.fetch_add(1, Ordering::SeqCst);
self.waker.wake();
}
}

impl Drop for Timer {
fn drop(&mut self) {
self.clear();
}
}
2 changes: 2 additions & 0 deletions tests/async.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg(not(target_family = "wasm"))]

use std::future::Future;
use std::io;
use std::net::{Shutdown, TcpListener, TcpStream, UdpSocket};
Expand Down
2 changes: 2 additions & 0 deletions tests/block_on.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg(not(target_family = "wasm"))]

use async_io::block_on;
use std::{
future::Future,
Expand Down
Loading

0 comments on commit 8b7c787

Please sign in to comment.