From 027faa3d22f9844f3e62a778835ab4855e92ac91 Mon Sep 17 00:00:00 2001 From: Pascal Berrang Date: Fri, 29 Nov 2024 16:32:57 -0600 Subject: [PATCH] Add documentation Add further CI tools Bump version --- .github/workflows/test.yml | 35 ++++++++++++++ Cargo.lock | 4 +- Cargo.toml | 2 +- README.md | 10 ++-- demo/index.js | 2 +- proc-macro/Cargo.toml | 2 +- src/convert.rs | 6 +++ src/error.rs | 9 ++++ src/func.rs | 27 +++++++++++ src/global.rs | 28 ++++++++++- src/iter_ext/mod.rs | 16 +++++++ src/pool/mod.rs | 98 ++++++++++++++++++++++++++++---------- src/pool/scheduler.rs | 22 +++++++++ src/webworker/com.rs | 18 +++++++ src/webworker/js.rs | 6 +++ src/webworker/worker.rs | 97 +++++++++++++++++++++++++++++++++++-- test/src/convert.rs | 7 --- test/src/lib.rs | 1 + test/src/raw.rs | 17 +++---- 19 files changed, 354 insertions(+), 53 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 508ca0f..8b828e6 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -3,6 +3,41 @@ name: Test on: [push, pull_request] jobs: + rustfmt: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@nightly + with: + components: rustfmt + - run: cargo fmt --all -- --check + + clippy: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v1 + - uses: actions-rs/toolchain@v1 + with: + toolchain: nightly + components: clippy + override: true + - uses: actions-rs/clippy-check@v1 + with: + token: ${{ secrets.GITHUB_TOKEN }} + args: --all-features + + doc-test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + - name: Run doctest + run: cargo test --doc + test: runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index a323ee7..47b4364 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -582,7 +582,7 @@ checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" [[package]] name = 
"wasmworker" -version = "0.1.0" +version = "0.1.1" dependencies = [ "futures", "js-sys", @@ -618,7 +618,7 @@ dependencies = [ [[package]] name = "wasmworker-proc-macro" -version = "0.1.0" +version = "0.1.1" dependencies = [ "quote", "syn", diff --git a/Cargo.toml b/Cargo.toml index 9338900..c79bf3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ wasmworker-proc-macro = { version = "0.1", path = "proc-macro" } [package] name = "wasmworker" -version = "0.1.0" +version = "0.1.1" edition = "2021" description.workspace = true diff --git a/README.md b/README.md index a269ae3..9ec12ee 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ In contrast to many other libraries like [wasm-bindgen-rayon](https://github.com - [Setting up](#setting-up) - [Outsourcing tasks](#outsourcing-tasks) - [WebWorker](#webworker) - - [WorkerPool](#workerpool) + - [WebWorkerPool](#webworkerpool) - [Iterator extension](#iterator-extension) - [Feature detection](#feature-detection) - [FAQ](#faq) @@ -32,7 +32,7 @@ Without the `serde` feature, only functions with the type `fn(Box<[u8]>) -> Box< This is useful for users that do not want a direct serde dependency. Internally, the library always uses serde, though. You can then start using the library without further setup. -If you plan on using the global `WorkerPool` (using the iterator extensions or `worker_pool()`), you can *optionally* configure this pool: +If you plan on using the global `WebWorkerPool` (using the iterator extensions or `worker_pool()`), you can *optionally* configure this pool: ```rust // Importing it publicly will also expose the function on the JavaScript side. // You can instantiate the pool both via Rust and JS. @@ -49,7 +49,7 @@ async fn startup() { ### Outsourcing tasks The library offers three ways of outsourcing function calls onto concurrent workers: 1. `WebWorker`: a single worker, to which tasks can be queued to. -2. `WorkerPool`: a pool of multiple workers, to which tasks are distributed. +2. 
`WebWorkerPool`: a pool of multiple workers, to which tasks are distributed. 3. `par_map`: an extension to regular iterators, which allows to execute a function on every element of the iterator in parallel using the default worker pool. All approaches require the functions that should be executed to be annotated with the `#[webworker_fn]` macro. @@ -90,7 +90,7 @@ let res = worker.run(webworker!(sort_vec), &VecType(vec![5, 2, 8])).await; assert_eq!(res.0, vec![2, 5, 8]); ``` -#### WorkerPool +#### WebWorkerPool Most of the time, we probably want to schedule tasks to a pool of workers, though. The default worker pool is instantiated on first use and can be configured using `init_worker_pool()` as described above. It uses a round-robin scheduler (with the second option being a load based scheduler), a number of `navigator.hardwareConcurrency` separate workers, and the default inferred path. @@ -128,7 +128,7 @@ let res: Vec = some_vec.iter().par_map(webworker!(sort_vec)).await; So far, this library has only been tested with `--target web`. Other targets seem to generally be problematic in that the wasm glue is inaccessible or paths are not correct. - Both the `Worker` and `WorkerPool` have an option to set a custom path, which should make it possible to support other targets dynamically, though. + Both the `Worker` and `WebWorkerPool` have an option to set a custom path, which should make it possible to support other targets dynamically, though. 3. _Can I use bundlers?_ diff --git a/demo/index.js b/demo/index.js index d755795..4253300 100644 --- a/demo/index.js +++ b/demo/index.js @@ -1,5 +1,5 @@ // Import required functions. -import init, { runWorker, runPool, runParMap } from "./pkg/webworker_demo.js"; +import init, { runWorker, runPool, runParMap } from "./pkg/wasmworker_demo.js"; async function run_wasm() { // Load wasm bindgen. 
diff --git a/proc-macro/Cargo.toml b/proc-macro/Cargo.toml index 31eeee0..de6a22f 100644 --- a/proc-macro/Cargo.toml +++ b/proc-macro/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wasmworker-proc-macro" -version = "0.1.0" +version = "0.1.1" edition = "2021" description.workspace = true diff --git a/src/convert.rs b/src/convert.rs index eee145d..43d7cb3 100644 --- a/src/convert.rs +++ b/src/convert.rs @@ -1,11 +1,17 @@ use serde::{Deserialize, Serialize}; +/// This wrapper function encapsulates our internal serialization format. +/// It is used internally to prepare values before sending them to a worker +/// or back to the main thread via `postMessage`. pub fn to_bytes(value: &T) -> Box<[u8]> { postcard::to_allocvec(value) .expect("WebWorker serialization failed") .into() } +/// This wrapper function encapsulates our internal serialization format. +/// It is used internally to prepare values after receiving them from a worker +/// or the main thread via `postMessage`. pub fn from_bytes<'de, T: Deserialize<'de>>(bytes: &'de [u8]) -> T { postcard::from_bytes(bytes).expect("WebWorker deserialization failed") } diff --git a/src/error.rs b/src/error.rs index 4bc08ed..ac845a9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,14 +1,23 @@ use js_sys::wasm_bindgen::JsValue; use thiserror::Error; +/// This error is returned when a web worker has been configured with a +/// maximum number of tasks to be queued and one of the `try_run` methods +/// is called while that limit has already been reached. #[derive(Debug, Error)] #[error("WebWorker capacity reached")] pub struct Full; +/// This error is returned during the creation of a new web worker. +/// It covers generic errors in the actual creation and import errors +/// during the initialization. #[derive(Debug, Error)] pub enum InitError { + /// This error covers errors during the `new Worker()` command.
#[error("WebWorker creation error: {0:?}")] WebWorkerCreation(JsValue), + /// This error signals that the [`crate::WebWorker`] has been initialized with + /// an invalid path. The path should point to the glue file generated by wasm-bindgen. #[error("WebWorker module loading error: {0:?}")] WebWorkerModuleLoading(String), } diff --git a/src/func.rs b/src/func.rs index e64d631..2cb2810 100644 --- a/src/func.rs +++ b/src/func.rs @@ -1,6 +1,10 @@ use std::marker::PhantomData; +/// This struct describes the function to be called by the worker. +/// It also ensures type safety, when constructed using the [`crate::webworker!`] macro. pub struct WebWorkerFn { + /// The name of the original function. + /// The worker will automatically add the `__webworker_` prefix. pub(crate) name: &'static str, _arg: PhantomData, _res: PhantomData, @@ -15,10 +19,20 @@ impl Clone for WebWorkerFn { impl Copy for WebWorkerFn {} impl WebWorkerFn { + /// Manually creates a [`WebWorkerFn`] object. + /// This function should be avoided in most cases as it does not guarantee that the function + /// has the right type or is exposed to the worker. + /// + /// Instead use the [`crate::webworker!`] macro to create an instance of this type. pub fn new_unchecked(func_name: &'static str, _f: fn(T) -> R) -> Self { Self::from_name_unchecked(func_name) } + /// Manually creates a [`WebWorkerFn`] object from only the name of a function. + /// This function should be avoided in most cases as it does not guarantee that the function + /// has the right type or is exposed to the worker. + /// + /// Instead use the [`crate::webworker!`] macro to create an instance of this type. pub fn from_name_unchecked(func_name: &'static str) -> Self { Self { name: func_name, @@ -28,6 +42,19 @@ impl WebWorkerFn { } } +/// This macro safely instantiates a [`WebWorkerFn`] instance to be passed to a [`crate::WebWorker`]. +/// It ensures that the function is exposed via the `#[webworker_fn]` procedural macro.
+/// +/// Example: +/// ```ignore +/// #[webworker_fn] +/// pub fn sort_vec(mut v: VecType) -> VecType { +/// v.0.sort(); +/// v +/// } +/// +/// let func: WebWorkerFn = webworker!(sort_vec); +/// ``` #[macro_export] macro_rules! webworker { ($name:ident) => {{ diff --git a/src/global.rs b/src/global.rs index ea1ee85..aa548e7 100644 --- a/src/global.rs +++ b/src/global.rs @@ -6,7 +6,28 @@ use crate::pool::{WebWorkerPool, WorkerPoolOptions}; static WORKER_POOL: OnceCell> = OnceCell::const_new(); -#[wasm_bindgen] +/// This function can be called before the first use of the global worker pool to configure it. +/// It takes a [`WorkerPoolOptions`] configuration object. Note that this function is async. +/// +/// ```ignore +/// # use wasmworker::{init_worker_pool, WorkerPoolOptions}; +/// init_worker_pool(WorkerPoolOptions { +/// num_workers: Some(2), +/// ..Default::default() +/// }).await +/// ``` +/// +/// This function can also be called from JavaScript: +/// ```js +/// // Make sure to use the correct path. +/// import init, { initWorkerPool, WorkerPoolOptions } from "./pkg/wasmworker_demo.js"; +/// +/// await init(); +/// let options = new WorkerPoolOptions(); +/// options.num_workers = 3; +/// await initWorkerPool(options); +/// ``` +#[wasm_bindgen(js_name = initWorkerPool)] pub async fn init_worker_pool(options: WorkerPoolOptions) { WORKER_POOL .get_or_init(|| async move { @@ -19,6 +40,11 @@ pub async fn init_worker_pool(options: WorkerPoolOptions) { .await; } +/// This function accesses the default worker pool. +/// If [`init_worker_pool`] has not been manually called, +/// this function will initialize the worker pool prior to returning it. +/// +/// It will use the options provided by [`WorkerPoolOptions::default()`]. 
pub async fn worker_pool() -> &'static WebWorkerPool { WORKER_POOL .get_or_init(|| async { diff --git a/src/iter_ext/mod.rs b/src/iter_ext/mod.rs index 986364b..b4452aa 100644 --- a/src/iter_ext/mod.rs +++ b/src/iter_ext/mod.rs @@ -5,11 +5,27 @@ use serde::{Deserialize, Serialize}; use crate::{func::WebWorkerFn, worker_pool}; +/// This extension trait defines the method [`IteratorExt::par_map`], +/// which will use the default [`crate::pool::WebWorkerPool`] as returned by [`worker_pool()`]. pub trait IteratorExt: Sized + Iterator where Self::Item: Borrow, T: Serialize + for<'de> Deserialize<'de>, { + /// The `par_map` function allows to parallelize a map operation on the default + /// [`crate::pool::WebWorkerPool`] as returned by [`worker_pool()`]. + /// + /// For each element of the iterator, a new task is scheduled on the worker pool. + /// Only functions that are annotated with the `#[webworker_fn]` macro can be used. + /// + /// Example: + /// ```ignore + /// #[webworker_fn] + /// fn my_func(arg: T) -> R { /*...*/ } + /// + /// let vec = vec![ /*...*/ ]; + /// vec.iter().par_map(webworker!(my_func)).await + /// ``` #[allow(async_fn_in_trait)] async fn par_map(self, func: WebWorkerFn) -> Vec where diff --git a/src/pool/mod.rs b/src/pool/mod.rs index bfed5a9..a447043 100644 --- a/src/pool/mod.rs +++ b/src/pool/mod.rs @@ -7,24 +7,33 @@ pub use scheduler::Strategy; use serde::{Deserialize, Serialize}; use web_sys::window; -use crate::{ - error::{Full, InitError}, - func::WebWorkerFn, - WebWorker, -}; +use crate::{error::InitError, func::WebWorkerFn, WebWorker}; mod scheduler; +/// This struct can be used to configure all options of the [`WebWorkerPool`]. +/// +/// If re-exported, the struct can also be accessed via JavaScript: +/// ```js +/// let options = new WorkerPoolOptions(); +/// options.num_workers = 3; +/// ``` #[wasm_bindgen(getter_with_clone)] #[derive(Default, Clone)] pub struct WorkerPoolOptions { + /// The path to the wasm-bindgen glue. 
By default, this path is inferred. + /// [`crate::WebWorker::with_path`] lists more details on when this path + /// should be manually configured. pub path: Option, + /// The strategy to be used by the worker pool. pub strategy: Option, + /// The number of workers that will be spawned. This defaults to `navigator.hardwareConcurrency`. pub num_workers: Option, } #[wasm_bindgen] impl WorkerPoolOptions { + /// Creates the default options. #[wasm_bindgen(constructor)] pub fn new() -> Self { Default::default() @@ -32,14 +41,17 @@ impl WorkerPoolOptions { } impl WorkerPoolOptions { + /// Returns the path to be used. fn path(&self) -> Option<&str> { self.path.as_deref() } + /// Returns the configured strategy or the default strategy. fn strategy(&self) -> Strategy { self.strategy.unwrap_or_default() } + /// Returns the number of workers, which defaults to `navigator.hardwareConcurrency`. fn num_workers(&self) -> usize { self.num_workers.unwrap_or_else(|| { window() @@ -50,16 +62,39 @@ impl WorkerPoolOptions { } } +/// This struct represents a worker pool, i.e., a collection of [`WebWorker`] objects +/// and a scheduler that distributes tasks among those. +/// +/// While multiple pools can be spawned, most often it is sufficient to have a single pool. +/// This library already supports one global web worker pool, which can be accessed with +/// [`crate::worker_pool()`]. +/// +/// Example usage: +/// ```ignore +/// use wasmworker::{webworker, worker_pool}; +/// +/// let worker_pool = worker_pool().await; +/// let res = worker_pool.run(webworker!(sort_vec), &VecType(vec![5, 2, 8])).await; +/// assert_eq!(res.0, vec![2, 5, 8]); +/// ``` pub struct WebWorkerPool { + /// The workers that have been spawned. workers: Vec, + /// The internal scheduler that is used to distribute the tasks. scheduler: Scheduler, } impl WebWorkerPool { + /// Initializes a worker pool with default [`WorkerPoolOptions`].
+ /// This async function might return an [`InitError`] if one of the workers + /// cannot be initialized, as described in [`WebWorker::new`]. pub async fn new() -> Result { Self::with_options(WorkerPoolOptions::default()).await } + /// Initializes a worker pool with a given strategy and otherwise default [`WorkerPoolOptions`]. + /// This async function might return an [`InitError`] if one of the workers + /// cannot be initialized, as described in [`WebWorker::new`]. pub async fn with_strategy(strategy: Strategy) -> Result { Self::with_options(WorkerPoolOptions { strategy: Some(strategy), @@ -68,6 +103,9 @@ impl WebWorkerPool { .await } + /// Initializes a worker pool with a given number of workers and otherwise default [`WorkerPoolOptions`]. + /// This async function might return an [`InitError`] if one of the workers + /// cannot be initialized, as described in [`WebWorker::new`]. pub async fn with_num_workers(num_workers: usize) -> Result { Self::with_options(WorkerPoolOptions { num_workers: Some(num_workers), @@ -76,6 +114,9 @@ impl WebWorkerPool { .await } + /// Initializes a worker pool with a given path and otherwise default [`WorkerPoolOptions`]. + /// This async function might return an [`InitError`] if one of the workers + /// cannot be initialized, as described in [`WebWorker::new`]. pub async fn with_path(path: String) -> Result { Self::with_options(WorkerPoolOptions { path: Some(path), @@ -84,6 +125,9 @@ impl WebWorkerPool { .await } + /// Initializes a worker pool with the given [`WorkerPoolOptions`]. + /// This async function might return an [`InitError`] if one of the workers + /// cannot be initialized, as described in [`WebWorker::new`]. pub async fn with_options(options: WorkerPoolOptions) -> Result { let worker_inits = (0..options.num_workers()).map(|_| { // Do not impose a task limit. @@ -98,6 +142,17 @@ impl WebWorkerPool { }) } + /// This is the most general function to outsource a task on a [`WebWorkerPool`]. 
+ /// It will automatically handle serialization of the argument, scheduling of the task on the pool, + /// and deserialization of the return value. + /// + /// The `func`: [`WebWorkerFn`] argument should normally be instantiated using the [`crate::webworker!`] macro. + /// This ensures type safety and that the function is correctly exposed to the worker. + /// + /// Example: + /// ```ignore + /// worker_pool().await.run(webworker!(sort_vec), &my_vec).await + /// ``` #[cfg(feature = "serde")] pub async fn run(&self, func: WebWorkerFn, arg: &T) -> R where @@ -107,16 +162,17 @@ impl WebWorkerPool { self.run_internal(func, arg).await } - #[cfg(feature = "serde")] - pub async fn try_run(&self, func: WebWorkerFn, arg: &T) -> Result - where - T: Serialize + for<'de> Deserialize<'de>, - R: Serialize + for<'de> Deserialize<'de>, - { - let worker_id = self.scheduler.schedule(self); - self.workers[worker_id].try_run(func, arg).await - } - + /// This function can outsource a task on a [`WebWorkerPool`] which has `Box<[u8]>` both as input and output. + /// (De)serialization of values needs to be handled by the caller. + /// For more convenient access, make sure the `serde` feature is enabled and use [`WebWorkerPool::run`]. + /// + /// The `func`: [`WebWorkerFn`] argument should normally be instantiated using the [`crate::webworker!`] macro. + /// This ensures type safety and that the function is correctly exposed to the worker. 
+ /// + /// Example: + /// ```ignore + /// worker_pool().await.run_bytes(webworker!(sort), &my_box).await + /// ``` pub async fn run_bytes( &self, func: WebWorkerFn, Box<[u8]>>, @@ -125,15 +181,8 @@ impl WebWorkerPool { self.run_internal(func, arg).await } - pub async fn try_run_bytes( - &self, - func: WebWorkerFn, Box<[u8]>>, - arg: &Box<[u8]>, - ) -> Result, Full> { - let worker_id = self.scheduler.schedule(self); - self.workers[worker_id].try_run_bytes(func, arg).await - } - + /// Determines the worker to run the task on using the scheduler + /// and runs the task. pub(crate) async fn run_internal(&self, func: WebWorkerFn, arg: A) -> R where A: Borrow, @@ -151,6 +200,7 @@ impl WebWorkerPool { self.workers.iter().map(WebWorker::current_load).sum() } + /// Return the number of workers in the pool. pub fn num_workers(&self) -> usize { self.workers.len() } diff --git a/src/pool/scheduler.rs b/src/pool/scheduler.rs index 681d2f4..2ebd214 100644 --- a/src/pool/scheduler.rs +++ b/src/pool/scheduler.rs @@ -4,20 +4,40 @@ use wasm_bindgen::{prelude::wasm_bindgen, UnwrapThrowExt}; use super::WebWorkerPool; +/// This enumeration contains the supported strategies for distributing +/// tasks within the worker pool. +/// +/// If re-exported, the strategy can also be accessed from JavaScript. +/// Rust: +/// ```rust +/// pub use wasmworker::pool::Strategy; +/// ``` #[wasm_bindgen] #[derive(Default, Clone, Copy, PartialEq, Eq)] pub enum Strategy { + /// The round-robin strategy will allocate tasks in a round-robin fashion + /// to the workers in the pool. #[default] RoundRobin, + /// The load-based strategy will allocate a task always to the worker with + /// the lowest number of tasks already scheduled. + /// If more than one worker has the same number of tasks scheduled, the first + /// one is chosen. LoadBased, } +/// The internal scheduler object, which contains necessary additional state +/// for the scheduling. pub(super) struct Scheduler { + /// The chosen strategy. 
strategy: Strategy, + /// The currently chosen worker. + /// This state is only relevant for the round-robin strategy. current_worker: Cell, } impl Scheduler { + /// Initialize a new scheduler. pub(super) fn new(strategy: Strategy) -> Self { Self { strategy, @@ -25,6 +45,8 @@ impl Scheduler { } } + /// Given the pool, apply the strategy and determine which worker + /// should receive the next task. pub(super) fn schedule(&self, pool: &WebWorkerPool) -> usize { match self.strategy { Strategy::RoundRobin => { diff --git a/src/webworker/com.rs b/src/webworker/com.rs index 8b5b7fd..dea007f 100644 --- a/src/webworker/com.rs +++ b/src/webworker/com.rs @@ -1,23 +1,41 @@ use serde::{Deserialize, Serialize}; +/// Message sent by the worker after initialization. +/// This is used to alert the main thread that initialization is complete. +/// It also indicates if errors occurred during the import. #[derive(Deserialize)] pub(super) struct PostInit { + /// `true` if initialization is complete, and `false` if import errors occurred. pub(crate) success: bool, + /// The `message` is only set if `success` is false. + /// It contains a description of the error that occurred. #[serde(default)] pub(crate) message: Option, } +/// This message is sent to the worker when a new task should be executed. #[derive(Serialize, Deserialize)] pub(super) struct Request { + /// This is the internal task id, which is used to match a [`Response`] + /// to the corresponding task. pub(crate) id: usize, + /// The name of the function to be executed by the worker. pub(crate) func_name: &'static str, + /// The serialized argument to be passed to the function. + /// Serialization is done using [`crate::convert::to_bytes`]. #[serde(with = "serde_bytes")] pub(crate) arg: Box<[u8]>, } +/// This message is sent back from the worker once a task is completed, +/// i.e., the function has been executed successfully and we have a result. 
#[derive(Serialize, Deserialize)] pub(super) struct Response { + /// The corresponding task id, matching the original id from the [`Request`] object. pub(crate) id: usize, + /// The response, which should only be `None` if the function could not be found. + /// This should never be the case if the [`crate::func::WebWorkerFn`] was constructed + /// using the [`crate::webworker!`] macro. #[serde(with = "serde_bytes")] pub(crate) response: Option>, } diff --git a/src/webworker/js.rs b/src/webworker/js.rs index 591758a..b3a6cc6 100644 --- a/src/webworker/js.rs +++ b/src/webworker/js.rs @@ -1,6 +1,10 @@ use js_sys::JsString; use wasm_bindgen::prelude::wasm_bindgen; +/// The initialization code for the worker, +/// which will be loaded as a blob. +/// +/// `{{wasm}}` will be replaced later by an actual path. pub(crate) const WORKER_JS: &str = r#" console.debug('Initializing worker'); @@ -39,6 +43,8 @@ console.debug('Initializing worker'); })(); "#; +/// This function normally returns the path of our wasm-bindgen glue file. +/// It only works in module environments, though. pub(crate) fn main_js() -> JsString { #[wasm_bindgen] extern "C" { diff --git a/src/webworker/worker.rs b/src/webworker/worker.rs index 6825808..289406c 100644 --- a/src/webworker/worker.rs +++ b/src/webworker/worker.rs @@ -18,17 +18,39 @@ use crate::{ func::WebWorkerFn, }; -pub type Callback = dyn FnMut(MessageEvent); - +/// An internal type for the callback. +type Callback = dyn FnMut(MessageEvent); + +/// This struct represents a single web worker instance. +/// It can be created using [`WebWorker::new`] or [`WebWorker::with_path`]. +/// When an instance of this type is dropped, it also terminates the corresponding web worker. 
+/// +/// Example usage: +/// ```ignore +/// use wasmworker::{webworker, WebWorker}; +/// +/// let worker = WebWorker::new(None).await.expect("Couldn't create worker"); +/// let res = worker.run(webworker!(sort_vec), &VecType(vec![5, 2, 8])).await; +/// assert_eq!(res.0, vec![2, 5, 8]); +/// ``` pub struct WebWorker { + /// The underlying web worker. worker: Worker, + /// An optional limit on the number of tasks queued at the same time. task_limit: Option, + /// The current task id, which is used to reidentify responses. current_task: Cell, + /// A map between task ids and the channel they need to be sent out with. + /// [Response]s will arrive on our callback and we redistribute them to their origin. open_tasks: Rc>>>, + /// The callback handle for the worker. _callback: Closure, } impl WebWorker { + /// This function takes the [`WORKER_JS`] and creates the corresponding + /// worker blob after inserting the given path. + /// If no `wasm_path` is provided, the [`main_js()`] path is used. fn worker_blob(wasm_path: Option<&str>) -> String { let blob_options = BlobPropertyBag::new(); blob_options.set_type("application/javascript"); @@ -52,11 +74,20 @@ impl WebWorker { .expect_throw("Couldn't create object URL") } + /// Create a new [`WebWorker`] with an optional limit on the number of tasks queued. + /// This can fail with an [`InitError`], for example, if the automatically inferred + /// path for the wasm-bindgen glue is wrong. pub async fn new(task_limit: Option) -> Result { Self::with_path(None, task_limit).await } - /// Create a new WrappedWorker + /// Create a new [`WebWorker`] with an optional limit on the number of tasks queued. + /// This function also receives an optional `main_js` path, which should point to the + /// glue file generated by wasm-bindgen. + /// + /// In the standard setup, the path is automatically inferred. + /// In more complex setups, it might be necessary to manually set this path. + /// If a wrong path is given, an [`InitError`] will be returned.
pub async fn with_path( main_js: Option<&str>, task_limit: Option, @@ -120,6 +151,20 @@ impl WebWorker { }) } + /// This is the most general function to outsource a task on a [`WebWorker`]. + /// It will automatically handle serialization of the argument, scheduling of the task on the worker, + /// and deserialization of the return value. + /// + /// The `func`: [`WebWorkerFn`] argument should normally be instantiated using the [`crate::webworker!`] macro. + /// This ensures type safety and that the function is correctly exposed to the worker. + /// + /// If a task limit has been set, this function will yield until previous tasks have been finished. + /// This is achieved by a semaphore before task submission to the worker. + /// + /// Example: + /// ```ignore + /// worker.run(webworker!(sort_vec), &my_vec).await + /// ``` #[cfg(feature = "serde")] pub async fn run(&self, func: WebWorkerFn, arg: &T) -> R where @@ -129,6 +174,18 @@ impl WebWorker { self.run_internal(func, arg).await } + /// This function differs from [`WebWorker::run`] by returning early if the given task limit is reached. + /// In this case a [`Full`] error is returned. + /// + /// The `func`: [`WebWorkerFn`] argument should normally be instantiated using the [`crate::webworker!`] macro. + /// This ensures type safety and that the function is correctly exposed to the worker. + /// + /// If no task limit has been set, this function can never return an error. + /// + /// Example: + /// ```ignore + /// worker.try_run(webworker!(sort_vec), &my_vec).await + /// ``` #[cfg(feature = "serde")] pub async fn try_run(&self, func: WebWorkerFn, arg: &T) -> Result where @@ -138,6 +195,20 @@ impl WebWorker { self.try_run_internal(func, arg).await } + /// This function can outsource a task on a [`WebWorker`] which has `Box<[u8]>` both as input and output. + /// (De)serialization of values needs to be handled by the caller. 
+ /// For more convenient access, make sure the `serde` feature is enabled and use [`WebWorker::run`]. + /// + /// The `func`: [`WebWorkerFn`] argument should normally be instantiated using the [`crate::webworker!`] macro. + /// This ensures type safety and that the function is correctly exposed to the worker. + /// + /// If a task limit has been set, this function will yield until previous tasks have been finished. + /// This is achieved by a semaphore before task submission to the worker. + /// + /// Example: + /// ```ignore + /// worker.run_bytes(webworker!(sort), &my_box).await + /// ``` pub async fn run_bytes( &self, func: WebWorkerFn, Box<[u8]>>, @@ -146,6 +217,20 @@ impl WebWorker { self.run_internal(func, arg).await } + /// This function differs from [`WebWorker::run_bytes`] by returning early if the given task limit is reached. + /// In this case a [`Full`] error is returned. + /// (De)serialization of values needs to be handled by the caller. + /// For more convenient access, make sure the `serde` feature is enabled and use [`WebWorker::try_run`]. + /// + /// The `func`: [`WebWorkerFn`] argument should normally be instantiated using the [`crate::webworker!`] macro. + /// This ensures type safety and that the function is correctly exposed to the worker. + /// + /// If no task limit has been set, this function can never return an error. + /// + /// Example: + /// ```ignore + /// worker.try_run_bytes(webworker!(sort), &my_box).await + /// ``` pub async fn try_run_bytes( &self, func: WebWorkerFn, Box<[u8]>>, @@ -154,6 +239,8 @@ impl WebWorker { self.try_run_internal(func, arg).await } + /// Internal function to schedule a task to the worker. + /// This variant returns early if a semaphore permit cannot be obtained immediately. pub(crate) async fn try_run_internal( &self, func: WebWorkerFn, @@ -177,6 +264,7 @@ impl WebWorker { Ok(self.force_run(func.name, arg).await) } + /// Internal function to schedule a task to the worker. 
pub(crate) async fn run_internal(&self, func: WebWorkerFn, arg: &T) -> R where T: Serialize + for<'de> Deserialize<'de>, @@ -193,6 +281,9 @@ impl WebWorker { self.force_run(func.name, arg).await } + /// This function handles the communication with the worker + /// after the task limit has been checked. + /// It also handles (de)serialization. async fn force_run(&self, func_name: &'static str, arg: &T) -> R where T: Serialize + for<'de> Deserialize<'de>, diff --git a/test/src/convert.rs b/test/src/convert.rs index de2d044..608bf06 100644 --- a/test/src/convert.rs +++ b/test/src/convert.rs @@ -58,13 +58,6 @@ pub(crate) async fn can_schedule_task() { sorted_vec.sort(); let sorted_vec = VecType(sorted_vec); - // Test try run. - let res1 = pool - .try_run(webworker!(sort_vec), &vec) - .await - .expect("Should not be full"); - js_assert_eq!(res1, sorted_vec); - // Test run. let res2 = pool.run(webworker!(sort_vec), &vec).await; js_assert_eq!(res2, sorted_vec); diff --git a/test/src/lib.rs b/test/src/lib.rs index dc7f9e3..53756fc 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -22,6 +22,7 @@ macro_rules! 
js_assert_eq { #[wasm_bindgen(js_name = runTests)] pub async fn run_tests() { + can_handle_invalid_paths().await; can_run_task_bytes().await; can_limit_tasks_bytes().await; can_schedule_task_bytes().await; diff --git a/test/src/raw.rs b/test/src/raw.rs index ace30ed..0b420f3 100644 --- a/test/src/raw.rs +++ b/test/src/raw.rs @@ -1,4 +1,5 @@ -use wasmworker::{webworker, worker_pool, WebWorker}; +use wasm_bindgen::throw_str; +use wasmworker::{error::InitError, webworker, worker_pool, WebWorker}; use wasmworker_proc_macro::webworker_fn; use crate::js_assert_eq; @@ -9,6 +10,13 @@ pub fn sort(mut v: Box<[u8]>) -> Box<[u8]> { v } +pub(crate) async fn can_handle_invalid_paths() { + let worker = WebWorker::with_path(Some("something"), None).await; + if !matches!(worker, Err(InitError::WebWorkerModuleLoading(_))) { + throw_str("Should have failed initialization with wrong path"); + } +} + pub(crate) async fn can_run_task_bytes() { let worker = WebWorker::new(None).await.expect("Couldn't create worker"); @@ -57,13 +65,6 @@ pub(crate) async fn can_schedule_task_bytes() { let vec = vec.into(); let sorted_vec = sorted_vec.into(); - // Test try run. - let res1 = pool - .try_run_bytes(webworker!(sort), &vec) - .await - .expect("Should not be full"); - js_assert_eq!(res1, sorted_vec); - // Test run. let res2 = pool.run_bytes(webworker!(sort), &vec).await; js_assert_eq!(res2, sorted_vec);