diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1e59d5a..1afcf4f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,6 +38,7 @@ jobs: - name: Install Rust # --no-self-update is necessary because the windows environment cannot self-update rustup.exe. run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }} + - run: rustup target add wasm32-unknown-unknown - run: cargo build --all --all-features --all-targets - name: Run cargo check (without dev-dependencies to catch missing feature flags) if: startsWith(matrix.rust, 'nightly') @@ -50,6 +51,9 @@ jobs: # if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest' # run: cargo check -Z build-std --target=riscv32imc-esp-espidf - run: cargo test + - uses: taiki-e/install-action@wasm-pack + - run: cargo check --target wasm32-unknown-unknown --all-features --tests + - run: wasm-pack test --node # Copied from: https://github.com/rust-lang/stacker/pull/19/files windows_gnu: diff --git a/Cargo.toml b/Cargo.toml index 701b1d0..cf69099 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,11 +23,13 @@ name = "timer" harness = false [dependencies] -async-lock = "2.6" cfg-if = "1" +futures-lite = { version = "1.11.0", default-features = false } + +[target.'cfg(not(target_family = "wasm"))'.dependencies] +async-lock = "2.6" concurrent-queue = "2.2.0" futures-io = { version = "0.3.28", default-features = false, features = ["std"] } -futures-lite = { version = "1.11.0", default-features = false } parking = "2.0.0" polling = "3.0.0" rustix = { version = "0.38.2", default-features = false, features = ["std", "fs"] } @@ -36,8 +38,15 @@ socket2 = { version = "0.5.3", features = ["all"] } tracing = { version = "0.1.37", default-features = false } waker-fn = "1.1.0" +[target.'cfg(target_family = "wasm")'.dependencies] +atomic-waker = "1.1.1" +wasm-bindgen = "0.2.87" +web-sys = { version = "0.3.0", features = ["Window"] } + [dev-dependencies] async-channel = "1" + +[target.'cfg(not(target_family = "wasm"))'.dev-dependencies] async-net = "1" blocking = "1" criterion = { version = "0.4", default-features = false, features = ["cargo_bench_support"] } @@ -45,6 +54,12 @@ getrandom = "0.2.7" signal-hook = "0.3" tempfile = "3" +[target.'cfg(target_family = "wasm")'.dev-dependencies] +console_error_panic_hook = "0.1.7" +wasm-bindgen-futures = "0.4.37" +wasm-bindgen-test = "0.3.37" +web-time = "0.2.0" + [target.'cfg(target_os = "linux")'.dev-dependencies] inotify = { version = "0.10.1", default-features = false } timerfd = "1" diff --git a/src/lib.rs b/src/lib.rs index 7d3f042..a7759a5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -64,23 +64,31 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; +use std::time::Duration; -use futures_lite::stream::Stream; +#[cfg(not(target_family = "wasm"))] +use std::time::Instant; -use crate::reactor::Reactor; +use futures_lite::stream::Stream; +#[cfg(not(target_family = "wasm"))] mod driver; +#[cfg(not(target_family = "wasm"))] mod io; +#[cfg(not(target_family = "wasm"))] mod reactor; -#[path = "timer/native.rs"] +#[cfg_attr(not(target_family = "wasm"), path = "timer/native.rs")] +#[cfg_attr(target_family = "wasm", path = "timer/web.rs")] mod timer; pub mod os; +#[cfg(not(target_family = "wasm"))] pub use driver::block_on; +#[cfg(not(target_family = "wasm"))] pub use io::{Async, IoSafe}; +#[cfg(not(target_family = "wasm"))] pub use reactor::{Readable, ReadableOwned, Writable, WritableOwned}; /// A future or stream that emits timed events. @@ -197,6 +205,7 @@ impl Timer { /// Timer::at(when).await; /// # }); /// ``` + #[cfg(not(target_family = "wasm"))] #[inline] pub fn at(instant: Instant) -> Timer { Timer(timer::Timer::at(instant)) @@ -236,6 +245,7 @@ impl Timer { /// Timer::interval_at(start, period).next().await; /// # }); /// ``` + #[cfg(not(target_family = "wasm"))] #[inline] pub fn interval_at(start: Instant, period: Duration) -> Timer { Timer(timer::Timer::interval_at(start, period)) @@ -325,6 +335,7 @@ impl Timer { /// t.set_at(when); /// # }); /// ``` + #[cfg(not(target_family = "wasm"))] #[inline] pub fn set_at(&mut self, instant: Instant) { self.0.set_at(instant) @@ -376,6 +387,7 @@ impl Timer { /// t.set_interval_at(start, period); /// # }); /// ``` + #[cfg(not(target_family = "wasm"))] #[inline] pub fn set_interval_at(&mut self, start: Instant, period: Duration) { self.0.set_interval_at(start, period) @@ -383,8 +395,12 @@ impl Timer { } impl Future for Timer { + #[cfg(not(target_family = "wasm"))] type Output = Instant; + #[cfg(target_family = "wasm")] + type Output = (); + #[inline] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.poll_next(cx) { @@ -396,8 +412,12 @@ impl Future for Timer { } impl Stream for Timer { + #[cfg(not(target_family = "wasm"))] type Item = Instant; + #[cfg(target_family = "wasm")] + type Item = (); + #[inline] fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.0.poll_next(cx) diff --git a/src/os/unix.rs b/src/os/unix.rs index ffb0832..d43462e 100644 --- a/src/os/unix.rs +++ b/src/os/unix.rs @@ -62,7 +62,7 @@ pub fn reactor_fd() -> Option> { not(polling_test_poll_backend), ))] { use std::os::unix::io::AsFd; - Some(crate::Reactor::get().poller.as_fd()) + Some(crate::reactor::Reactor::get().poller.as_fd()) } else { None } diff --git a/src/timer/web.rs b/src/timer/web.rs new file mode 100644 index 0000000..4fdfc19 --- /dev/null +++ b/src/timer/web.rs @@ -0,0 +1,212 @@ +//! Timers for web targets. +//! +//! These use the `setTimeout` function on the web to handle timing. + +use std::convert::TryInto; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; + +use atomic_waker::AtomicWaker; +use wasm_bindgen::closure::Closure; +use wasm_bindgen::JsCast; + +/// A timer for non-Web platforms. +/// +/// self registers a timeout in the global reactor, which in turn sets a timeout in the poll call. +#[derive(Debug)] +pub(super) struct Timer { + /// The waker to wake when the timer fires. + waker: Arc, + + /// The ongoing timeout or interval. + ongoing_timeout: TimerId, + + /// Keep the closure alive so we don't drop it. + closure: Option>, +} + +#[derive(Debug)] +struct State { + /// The number of times this timer has been woken. + woken: AtomicUsize, + + /// The waker to wake when the timer fires. + waker: AtomicWaker, +} + +#[derive(Debug)] +enum TimerId { + NoTimer, + Timeout(i32), + Interval(i32), +} + +impl Timer { + /// Create a timer that will never fire. + #[inline] + pub(super) fn never() -> Self { + Self { + waker: Arc::new(State { + woken: AtomicUsize::new(0), + waker: AtomicWaker::new(), + }), + ongoing_timeout: TimerId::NoTimer, + closure: None, + } + } + + /// Create a timer that will fire at the given instant. + #[inline] + pub(super) fn after(duration: Duration) -> Timer { + let mut this = Self::never(); + this.set_after(duration); + this + } + + /// Create a timer that will fire at the given instant. + #[inline] + pub(super) fn interval(period: Duration) -> Timer { + let mut this = Self::never(); + this.set_interval(period); + this + } + + /// Returns `true` if self timer will fire at some point. + #[inline] + pub(super) fn will_fire(&self) -> bool { + matches!( + self.ongoing_timeout, + TimerId::Timeout(_) | TimerId::Interval(_) + ) + } + + /// Set the timer to fire after the given duration. + #[inline] + pub(super) fn set_after(&mut self, duration: Duration) { + // Set the timeout. + let id = { + let waker = self.waker.clone(); + let closure: Closure = Closure::wrap(Box::new(move || { + waker.wake(); + })); + + let result = web_sys::window() + .unwrap() + .set_timeout_with_callback_and_timeout_and_arguments_0( + closure.as_ref().unchecked_ref(), + duration.as_millis().try_into().expect("timeout too long"), + ); + + // Make sure we don't drop the closure before it's called. + self.closure = Some(closure); + + match result { + Ok(id) => id, + Err(_) => { + panic!("failed to set timeout") + } + } + }; + + // Set our ID. + self.ongoing_timeout = TimerId::Timeout(id); + } + + /// Set the timer to emit events periodically. + #[inline] + pub(super) fn set_interval(&mut self, period: Duration) { + // Set the timeout. + let id = { + let waker = self.waker.clone(); + let closure: Closure = Closure::wrap(Box::new(move || { + waker.wake(); + })); + + let result = web_sys::window() + .unwrap() + .set_interval_with_callback_and_timeout_and_arguments_0( + closure.as_ref().unchecked_ref(), + period.as_millis().try_into().expect("timeout too long"), + ); + + // Make sure we don't drop the closure before it's called. + self.closure = Some(closure); + + match result { + Ok(id) => id, + Err(_) => { + panic!("failed to set interval") + } + } + }; + + // Set our ID. + self.ongoing_timeout = TimerId::Interval(id); + } + + /// Poll for the next timer event. + #[inline] + pub(super) fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { + let mut registered = false; + let mut woken = self.waker.woken.load(Ordering::Acquire); + + loop { + if woken > 0 { + // Try to decrement the number of woken events. + if let Err(new_woken) = self.waker.woken.compare_exchange( + woken, + woken - 1, + Ordering::SeqCst, + Ordering::Acquire, + ) { + woken = new_woken; + continue; + } + + // If we are using a one-shot timer, clear it. + if let TimerId::Timeout(_) = self.ongoing_timeout { + self.clear(); + } + + return Poll::Ready(Some(())); + } + + if !registered { + // Register the waker. + self.waker.waker.register(cx.waker()); + registered = true; + } else { + // We've already registered, so we can just return pending. + return Poll::Pending; + } + } + } + + /// Clear the current timeout. + fn clear(&mut self) { + match self.ongoing_timeout { + TimerId::NoTimer => {} + TimerId::Timeout(id) => { + web_sys::window().unwrap().clear_timeout_with_handle(id); + } + TimerId::Interval(id) => { + web_sys::window().unwrap().clear_interval_with_handle(id); + } + } + } +} + +impl State { + fn wake(&self) { + self.woken.fetch_add(1, Ordering::SeqCst); + self.waker.wake(); + } +} + +impl Drop for Timer { + fn drop(&mut self) { + self.clear(); + } +} diff --git a/tests/async.rs b/tests/async.rs index c856760..5d9bd12 100644 --- a/tests/async.rs +++ b/tests/async.rs @@ -1,3 +1,5 @@ +#![cfg(not(target_family = "wasm"))] + use std::future::Future; use std::io; use std::net::{Shutdown, TcpListener, TcpStream, UdpSocket}; diff --git a/tests/block_on.rs b/tests/block_on.rs index 70241f0..3a5c1ba 100644 --- a/tests/block_on.rs +++ b/tests/block_on.rs @@ -1,3 +1,5 @@ +#![cfg(not(target_family = "wasm"))] + use async_io::block_on; use std::{ future::Future, diff --git a/tests/timer.rs b/tests/timer.rs index cdd90db..5a16089 100644 --- a/tests/timer.rs +++ b/tests/timer.rs @@ -1,12 +1,26 @@ use std::future::Future; +#[cfg(not(target_family = "wasm"))] use std::pin::Pin; +#[cfg(not(target_family = "wasm"))] use std::sync::{Arc, Mutex}; +#[cfg(not(target_family = "wasm"))] use std::thread; + +#[cfg(not(target_family = "wasm"))] use std::time::{Duration, Instant}; +#[cfg(target_family = "wasm")] +use web_time::{Duration, Instant}; + +#[cfg(target_family = "wasm")] +wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); use async_io::Timer; -use futures_lite::{future, FutureExt, StreamExt}; +use futures_lite::{FutureExt, StreamExt}; + +#[cfg(not(target_family = "wasm"))] +use futures_lite::future; +#[cfg(not(target_family = "wasm"))] fn spawn( f: impl Future + Send + 'static, ) -> impl Future + Send + 'static { @@ -21,18 +35,60 @@ fn spawn( Box::pin(async move { r.recv().await.unwrap() }) } -#[test] -fn smoke() { - future::block_on(async { +#[cfg(target_family = "wasm")] +fn spawn(f: impl Future + 'static) -> impl Future + 'static { + let (s, r) = async_channel::bounded(1); + + #[cfg(target_family = "wasm")] + wasm_bindgen_futures::spawn_local(async move { + s.send(f.await).await.ok(); + }); + + Box::pin(async move { r.recv().await.unwrap() }) +} + +#[cfg(not(target_family = "wasm"))] +macro_rules! test { + ( + $(#[$meta:meta])* + async fn $name:ident () $bl:block + ) => { + #[test] + $(#[$meta])* + fn $name() { + futures_lite::future::block_on(async { + $bl + }) + } + }; +} + +#[cfg(target_family = "wasm")] +macro_rules! test { + ( + $(#[$meta:meta])* + async fn $name:ident () $bl:block + ) => { + // wasm-bindgen-test handles waiting on the future for us + #[wasm_bindgen_test::wasm_bindgen_test] + $(#[$meta])* + async fn $name() { + console_error_panic_hook::set_once(); + $bl + } + }; +} + +test! { + async fn smoke() { let start = Instant::now(); Timer::after(Duration::from_secs(1)).await; assert!(start.elapsed() >= Duration::from_secs(1)); - }); + } } -#[test] -fn interval() { - future::block_on(async { +test! { + async fn interval() { let period = Duration::from_secs(1); let jitter = Duration::from_millis(500); let start = Instant::now(); @@ -43,12 +99,11 @@ fn interval() { timer.next().await; let elapsed = start.elapsed(); assert!(elapsed >= period * 2 && elapsed - period * 2 < jitter); - }); + } } -#[test] -fn poll_across_tasks() { - future::block_on(async { +test! { + async fn poll_across_tasks() { let start = Instant::now(); let (sender, receiver) = async_channel::bounded(1); @@ -74,9 +129,10 @@ fn poll_across_tasks() { task2.await; assert!(start.elapsed() >= Duration::from_secs(1)); - }); + } } +#[cfg(not(target_family = "wasm"))] #[test] fn set() { future::block_on(async {