From ce23a4453895ef418517ba0c5e8afac094aff33b Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Fri, 27 Sep 2024 13:24:35 +1000 Subject: [PATCH 01/10] Minor fixes for `bevy_utils` in `no_std` `thread_local`, `getrandom`, and `web-time` were being included even in `no_std` configurations. Additionally, `hashbrown` had extra features enabled that should've been gated. --- crates/bevy_utils/Cargo.toml | 21 ++++++++++++++------- crates/bevy_utils/src/lib.rs | 5 ++++- crates/bevy_utils/src/time.rs | 11 +++++++++++ 3 files changed, 29 insertions(+), 8 deletions(-) create mode 100644 crates/bevy_utils/src/time.rs diff --git a/crates/bevy_utils/Cargo.toml b/crates/bevy_utils/Cargo.toml index 7f83ec17c9a7b..c1555d8ba94bb 100644 --- a/crates/bevy_utils/Cargo.toml +++ b/crates/bevy_utils/Cargo.toml @@ -9,26 +9,33 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -default = ["std"] -std = ["alloc", "tracing/std", "ahash/std"] -alloc = [] +default = ["std", "serde"] +std = [ + "alloc", + "tracing/std", + "ahash/std", + "dep:thread_local", + "ahash/runtime-rng", +] +alloc = ["hashbrown/default"] detailed_trace = [] +serde = ["hashbrown/serde"] [dependencies] ahash = { version = "0.8.7", default-features = false, features = [ - "runtime-rng", + "compile-time-rng", ] } tracing = { version = "0.1", default-features = false } -web-time = { version = "1.1" } -hashbrown = { version = "0.14.2", features = ["serde"] } +hashbrown = { version = "0.14.2", default-features = false } bevy_utils_proc_macros = { version = "0.15.0-dev", path = "macros" } -thread_local = "1.0" +thread_local = { version = "1.0", optional = true } [dev-dependencies] static_assertions = "1.1.0" [target.'cfg(target_arch = "wasm32")'.dependencies] getrandom = { version = "0.2.0", features = ["js"] } +web-time = { version = "1.1" } [lints] workspace = true diff --git a/crates/bevy_utils/src/lib.rs b/crates/bevy_utils/src/lib.rs index 9cd1782345e88..1f7e1f7e5c32b 100644 --- a/crates/bevy_utils/src/lib.rs 
+++ b/crates/bevy_utils/src/lib.rs @@ -31,15 +31,18 @@ mod default; mod object_safe; pub use object_safe::assert_object_safe; mod once; +#[cfg(feature = "std")] mod parallel_queue; +mod time; pub use ahash::{AHasher, RandomState}; pub use bevy_utils_proc_macros::*; pub use default::default; pub use hashbrown; +#[cfg(feature = "std")] pub use parallel_queue::*; +pub use time::*; pub use tracing; -pub use web_time::{Duration, Instant, SystemTime, SystemTimeError, TryFromFloatSecsError}; #[cfg(feature = "alloc")] use alloc::boxed::Box; diff --git a/crates/bevy_utils/src/time.rs b/crates/bevy_utils/src/time.rs new file mode 100644 index 0000000000000..fb5136eb03d2a --- /dev/null +++ b/crates/bevy_utils/src/time.rs @@ -0,0 +1,11 @@ +#[cfg(target_arch = "wasm32")] +pub use web_time::{Duration, Instant, SystemTime, SystemTimeError, TryFromFloatSecsError}; + +#[cfg(all(not(target_arch = "wasm32"), feature = "std"))] +pub use { + core::time::{Duration, TryFromFloatSecsError}, + std::time::{Instant, SystemTime, SystemTimeError}, +}; + +#[cfg(all(not(target_arch = "wasm32"), not(feature = "std")))] +pub use core::time::{Duration, TryFromFloatSecsError}; From 905c1276b4c99d87108ddc398772c68c1e4b3844 Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Fri, 27 Sep 2024 12:51:04 +1000 Subject: [PATCH 02/10] Initial `no_std` support in `bevy_tasks` --- crates/bevy_tasks/Cargo.toml | 30 +- crates/bevy_tasks/src/executor.rs | 844 ++++++++++++++++++ crates/bevy_tasks/src/iter/mod.rs | 22 +- crates/bevy_tasks/src/lib.rs | 74 +- .../src/single_threaded_task_pool.rs | 104 ++- crates/bevy_tasks/src/slice.rs | 10 +- crates/bevy_tasks/src/task.rs | 12 +- crates/bevy_tasks/src/task_pool.rs | 38 +- crates/bevy_tasks/src/thread_executor.rs | 4 +- crates/bevy_tasks/src/usages.rs | 19 +- 10 files changed, 1077 insertions(+), 80 deletions(-) create mode 100644 crates/bevy_tasks/src/executor.rs diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 
0e8831e59f9c9..53fd71995220d 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,14 +9,36 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -multi_threaded = ["dep:async-channel", "dep:concurrent-queue"] +default = ["std"] +multi_threaded = ["std", "dep:async-channel"] +async-io = ["std", "dep:async-io"] +std = [ + "futures-lite/std", + "async-task/std", + "slab/std", + "fastrand/std", + "spin/std", + "concurrent-queue/std", +] [dependencies] -futures-lite = "2.0.1" -async-executor = "1.11" +futures-lite = { version = "2.0.1", default-features = false, features = [ + "race", +] } +async-task = { version = "4.4.0", default-features = false } +slab = { version = "0.4.4", default-features = false } +fastrand = { version = "2.0.0", default-features = false } +spin = { version = "0.9.8", default-features = false, features = [ + "spin_mutex", + "rwlock", + "once", +] } +concurrent-queue = { version = "2.0.0", default-features = false } + +bevy_utils = { path = "../bevy_utils", version = "0.15.0-dev", default-features = false } + async-channel = { version = "2.2.0", optional = true } async-io = { version = "2.0.0", optional = true } -concurrent-queue = { version = "2.0.0", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/crates/bevy_tasks/src/executor.rs b/crates/bevy_tasks/src/executor.rs new file mode 100644 index 0000000000000..eae7f86bd79e7 --- /dev/null +++ b/crates/bevy_tasks/src/executor.rs @@ -0,0 +1,844 @@ +#![expect( + unsafe_code, + reason = "executor implementation requires use of AtomicPtr and lifetime transmutation" +)] +#![expect( + dead_code, + reason = "feature gating means certain methods aren't used on certain targets" +)] + +use alloc::{sync::Arc, vec::Vec}; + +use core::{ + fmt, + marker::PhantomData, + panic::{RefUnwindSafe, UnwindSafe}, + sync::atomic::{AtomicBool, AtomicPtr, Ordering}, + task::{Poll, Waker}, +}; + +use async_task::{Builder, 
Runnable, Task}; +use concurrent_queue::ConcurrentQueue; +use futures_lite::{future, prelude::*}; +use slab::Slab; +use spin::{Mutex, RwLock}; + +use bevy_utils::OnDrop; + +/// An async executor. +/// Based on the reference executor provided by [async_executor]. +/// +/// [async_executor]: https://crates.io/crates/async-executor +pub struct Executor<'a> { + /// The executor state. + state: AtomicPtr, + + /// Makes the `'a` lifetime invariant. + _marker: PhantomData>, +} + +// SAFETY: Executor stores no thread local state that can be accessed via other thread. +unsafe impl Send for Executor<'_> {} +// SAFETY: Executor internally synchronizes all of it's operations internally. +unsafe impl Sync for Executor<'_> {} + +impl UnwindSafe for Executor<'_> {} +impl RefUnwindSafe for Executor<'_> {} + +impl fmt::Debug for Executor<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_executor(self, "Executor", f) + } +} + +impl<'a> Executor<'a> { + /// Creates a new executor. + pub const fn new() -> Executor<'a> { + Executor { + state: AtomicPtr::new(core::ptr::null_mut()), + _marker: PhantomData, + } + } + + /// Returns `true` if there are no unfinished tasks. + pub fn is_empty(&self) -> bool { + self.state().active.lock().is_empty() + } + + /// Spawns a task onto the executor. + pub fn spawn(&self, future: impl Future + Send + 'a) -> Task { + let mut active = self.state().active.lock(); + + // SAFETY: `T` and the future are `Send`. + unsafe { self.spawn_inner(future, &mut active) } + } + + /// Spawn a future while holding the inner lock. + /// + /// # Safety + /// + /// If this is an `Executor`, `F` and `T` must be `Send`. + unsafe fn spawn_inner( + &self, + future: impl Future + 'a, + active: &mut Slab, + ) -> Task { + // Remove the task from the set of active tasks when the future finishes. 
+ let entry = active.vacant_entry(); + let index = entry.key(); + let state = self.state_as_arc(); + let future = async move { + let _guard = OnDrop::new(move || drop(state.active.lock().try_remove(index))); + future.await + }; + + // Create the task and register it in the set of active tasks. + // + // SAFETY: + // + // If `future` is not `Send`, this must be a `LocalExecutor` as per this + // function's unsafe precondition. Since `LocalExecutor` is `!Sync`, + // `try_tick`, `tick` and `run` can only be called from the origin + // thread of the `LocalExecutor`. Similarly, `spawn` can only be called + // from the origin thread, ensuring that `future` and the executor share + // the same origin thread. The `Runnable` can be scheduled from other + // threads, but because of the above `Runnable` can only be called or + // dropped on the origin thread. + // + // `future` is not `'static`, but we make sure that the `Runnable` does + // not outlive `'a`. When the executor is dropped, the `active` field is + // drained and all of the `Waker`s are woken. Then, the queue inside of + // the `Executor` is drained of all of its runnables. This ensures that + // runnables are dropped and this precondition is satisfied. + // + // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. + // Therefore we do not need to worry about what is done with the + // `Waker`. + let (runnable, task) = unsafe { + let builder = Builder::new(); + + #[cfg(feature = "std")] + let builder = builder.propagate_panic(true); + + builder.spawn_unchecked(|()| future, self.schedule()) + }; + entry.insert(runnable.waker()); + + runnable.schedule(); + task + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + pub fn try_tick(&self) -> bool { + self.state().try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. 
+ /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + pub async fn tick(&self) { + self.state().tick().await; + } + + /// Runs the executor until the given future completes. + pub async fn run(&self, future: impl Future) -> T { + self.state().run(future).await + } + + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state = self.state_as_arc(); + + // TODO: If possible, push into the current local queue and notify the ticker. + move |runnable| { + state.queue.push(runnable).unwrap(); + state.notify(); + } + } + + /// Returns a pointer to the inner state. + #[inline] + fn state_ptr(&self) -> *const State { + let mut ptr = self.state.load(Ordering::Acquire); + if ptr.is_null() { + ptr = State::alloc_atomic(&self.state); + } + ptr + } + + /// Returns a reference to the inner state. + #[inline] + fn state(&self) -> &State { + // SAFETY: So long as an Executor lives, it's state pointer will always be valid + // when accessed through state_ptr. + unsafe { &*self.state_ptr() } + } + + // Clones the inner state Arc + #[inline] + fn state_as_arc(&self) -> Arc { + // SAFETY: So long as an Executor lives, it's state pointer will always be a valid + // Arc when accessed through state_ptr. + let arc = unsafe { Arc::from_raw(self.state_ptr()) }; + let clone = arc.clone(); + core::mem::forget(arc); + clone + } +} + +impl Drop for Executor<'_> { + fn drop(&mut self) { + let ptr = *self.state.get_mut(); + if ptr.is_null() { + return; + } + + // SAFETY: As ptr is not null, it was allocated via Arc::new and converted + // via Arc::into_raw in state_ptr. 
+ let state = unsafe { Arc::from_raw(ptr) }; + + let mut active = state.active.lock(); + for w in active.drain() { + w.wake(); + } + drop(active); + + while state.queue.pop().is_ok() {} + } +} + +impl<'a> Default for Executor<'a> { + fn default() -> Executor<'a> { + Executor::new() + } +} + +/// A thread-local executor. +/// +/// The executor can only be run on the thread that created it. +pub struct LocalExecutor<'a> { + /// The inner executor. + inner: Executor<'a>, + + /// Makes the type `!Send` and `!Sync`. + _marker: PhantomData>, +} + +impl UnwindSafe for LocalExecutor<'_> {} +impl RefUnwindSafe for LocalExecutor<'_> {} + +impl fmt::Debug for LocalExecutor<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_executor(&self.inner, "LocalExecutor", f) + } +} + +impl<'a> LocalExecutor<'a> { + /// Creates a single-threaded executor. + pub const fn new() -> LocalExecutor<'a> { + LocalExecutor { + inner: Executor::new(), + _marker: PhantomData, + } + } + + /// Returns `true` if there are no unfinished tasks. + pub fn is_empty(&self) -> bool { + self.inner().is_empty() + } + + /// Spawns a task onto the executor. + pub fn spawn(&self, future: impl Future + 'a) -> Task { + let mut active = self.inner().state().active.lock(); + + // SAFETY: This executor is not thread safe, so the future and its result + // cannot be sent to another thread. + unsafe { self.inner().spawn_inner(future, &mut active) } + } + + /// Spawns many tasks onto the executor. + /// + /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and + /// spawns all of the tasks in one go. With large amounts of tasks this can improve + /// contention. + /// + /// It is assumed that the iterator provided does not block; blocking iterators can lock up + /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the + /// mutex is not released, as there are no other threads that can poll this executor. 
+ /// + /// [`spawn`]: LocalExecutor::spawn + /// [`Executor::spawn_many`]: Executor::spawn_many + pub fn spawn_many + 'a>( + &self, + futures: impl IntoIterator, + handles: &mut impl Extend>, + ) { + let mut active = self.inner().state().active.lock(); + + // Convert all of the futures to tasks. + let tasks = futures.into_iter().map(|future| { + // SAFETY: This executor is not thread safe, so the future and its result + // cannot be sent to another thread. + unsafe { self.inner().spawn_inner(future, &mut active) } + + // As only one thread can spawn or poll tasks at a time, there is no need + // to release lock contention here. + }); + + // Push them to the user's collection. + handles.extend(tasks); + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + pub fn try_tick(&self) -> bool { + self.inner().try_tick() + } + + /// Runs a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + pub async fn tick(&self) { + self.inner().tick().await; + } + + /// Runs the executor until the given future completes. + pub async fn run(&self, future: impl Future) -> T { + self.inner().run(future).await + } + + /// Returns a reference to the inner executor. + fn inner(&self) -> &Executor<'a> { + &self.inner + } +} + +impl<'a> Default for LocalExecutor<'a> { + fn default() -> LocalExecutor<'a> { + LocalExecutor::new() + } +} + +/// The state of a executor. +struct State { + /// The global queue. + queue: ConcurrentQueue, + + /// Local queues created by runners. + local_queues: RwLock>>>, + + /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. + notified: AtomicBool, + + /// A list of sleeping tickers. + sleepers: Mutex, + + /// Currently active tasks. + active: Mutex>, +} + +impl State { + /// Creates state for a new executor. 
+ const fn new() -> State { + State { + queue: ConcurrentQueue::unbounded(), + local_queues: RwLock::new(Vec::new()), + notified: AtomicBool::new(true), + sleepers: Mutex::new(Sleepers { + count: 0, + wakers: Vec::new(), + free_ids: Vec::new(), + }), + active: Mutex::new(Slab::new()), + } + } + + /// Notifies a sleeping ticker. + #[inline] + fn notify(&self) { + if self + .notified + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + let waker = self.sleepers.lock().notify(); + if let Some(w) = waker { + w.wake(); + } + } + } + + fn try_tick(&self) -> bool { + match self.queue.pop() { + Err(_) => false, + Ok(runnable) => { + // Notify another ticker now to pick up where this ticker left off, just in case + // running the task takes a long time. + self.notify(); + + // Run the task. + runnable.run(); + true + } + } + } + + async fn tick(&self) { + let runnable = Ticker::new(self).runnable().await; + runnable.run(); + } + + async fn run(&self, future: impl Future) -> T { + let mut runner = Runner::new(self); + let mut rng = fastrand::Rng::with_seed(0x4d595df4d0f33173); + + // A future that runs tasks forever. + let run_forever = async { + loop { + for _ in 0..200 { + let runnable = runner.runnable(&mut rng).await; + runnable.run(); + } + future::yield_now().await; + } + }; + + // Run `future` and `run_forever` concurrently until `future` completes. + future.or(run_forever).await + } + + #[cold] + fn alloc_atomic(atomic_ptr: &AtomicPtr) -> *mut Self { + let state = Arc::new(Self::new()); + let ptr = Arc::into_raw(state).cast_mut(); + if let Err(actual) = atomic_ptr.compare_exchange( + core::ptr::null_mut(), + ptr, + Ordering::AcqRel, + Ordering::Acquire, + ) { + // SAFETY: This was just created from Arc::into_raw. + drop(unsafe { Arc::from_raw(ptr) }); + actual + } else { + ptr + } + } +} + +/// A list of sleeping tickers. +struct Sleepers { + /// Number of sleeping tickers (both notified and unnotified). 
+ count: usize, + + /// IDs and wakers of sleeping unnotified tickers. + /// + /// A sleeping ticker is notified when its waker is missing from this list. + wakers: Vec<(usize, Waker)>, + + /// Reclaimed IDs. + free_ids: Vec, +} + +impl Sleepers { + /// Inserts a new sleeping ticker. + fn insert(&mut self, waker: &Waker) -> usize { + let id = match self.free_ids.pop() { + Some(id) => id, + None => self.count + 1, + }; + self.count += 1; + self.wakers.push((id, waker.clone())); + id + } + + /// Re-inserts a sleeping ticker's waker if it was notified. + /// + /// Returns `true` if the ticker was notified. + fn update(&mut self, id: usize, waker: &Waker) -> bool { + for item in &mut self.wakers { + if item.0 == id { + item.1.clone_from(waker); + return false; + } + } + + self.wakers.push((id, waker.clone())); + true + } + + /// Removes a previously inserted sleeping ticker. + /// + /// Returns `true` if the ticker was notified. + fn remove(&mut self, id: usize) -> bool { + self.count -= 1; + self.free_ids.push(id); + + for i in (0..self.wakers.len()).rev() { + if self.wakers[i].0 == id { + self.wakers.remove(i); + return false; + } + } + true + } + + /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. + fn is_notified(&self) -> bool { + self.count == 0 || self.count > self.wakers.len() + } + + /// Returns notification waker for a sleeping ticker. + /// + /// If a ticker was notified already or there are no tickers, `None` will be returned. + fn notify(&mut self) -> Option { + if self.wakers.len() == self.count { + self.wakers.pop().map(|item| item.1) + } else { + None + } + } +} + +/// Runs task one by one. +struct Ticker<'a> { + /// The executor state. + state: &'a State, + + /// Set to a non-zero sleeper ID when in sleeping state. + /// + /// States a ticker can be in: + /// 1) Woken. + /// 2a) Sleeping and unnotified. + /// 2b) Sleeping and notified. + sleeping: usize, +} + +impl Ticker<'_> { + /// Creates a ticker. 
+ fn new(state: &State) -> Ticker<'_> { + Ticker { state, sleeping: 0 } + } + + /// Moves the ticker into sleeping and unnotified state. + /// + /// Returns `false` if the ticker was already sleeping and unnotified. + fn sleep(&mut self, waker: &Waker) -> bool { + let mut sleepers = self.state.sleepers.lock(); + + match self.sleeping { + // Move to sleeping state. + 0 => { + self.sleeping = sleepers.insert(waker); + } + + // Already sleeping, check if notified. + id => { + if !sleepers.update(id, waker) { + return false; + } + } + } + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + + true + } + + /// Moves the ticker into woken state. + fn wake(&mut self) { + if self.sleeping != 0 { + let mut sleepers = self.state.sleepers.lock(); + sleepers.remove(self.sleeping); + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + } + self.sleeping = 0; + } + + /// Waits for the next runnable task to run. + async fn runnable(&mut self) -> Runnable { + self.runnable_with(|| self.state.queue.pop().ok()).await + } + + /// Waits for the next runnable task to run, given a function that searches for a task. + async fn runnable_with(&mut self, mut search: impl FnMut() -> Option) -> Runnable { + future::poll_fn(|cx| { + loop { + match search() { + None => { + // Move to sleeping and unnotified state. + if !self.sleep(cx.waker()) { + // If already sleeping and unnotified, return. + return Poll::Pending; + } + } + Some(r) => { + // Wake up. + self.wake(); + + // Notify another ticker now to pick up where this ticker left off, just in + // case running the task takes a long time. + self.state.notify(); + + return Poll::Ready(r); + } + } + } + }) + .await + } +} + +impl Drop for Ticker<'_> { + fn drop(&mut self) { + // If this ticker is in sleeping state, it must be removed from the sleepers list. 
+ if self.sleeping != 0 { + let mut sleepers = self.state.sleepers.lock(); + let notified = sleepers.remove(self.sleeping); + + self.state + .notified + .store(sleepers.is_notified(), Ordering::Release); + + // If this ticker was notified, then notify another ticker. + if notified { + drop(sleepers); + self.state.notify(); + } + } + } +} + +/// A worker in a work-stealing executor. +/// +/// This is just a ticker that also has an associated local queue for improved cache locality. +struct Runner<'a> { + /// The executor state. + state: &'a State, + + /// Inner ticker. + ticker: Ticker<'a>, + + /// The local queue. + local: Arc>, + + /// Bumped every time a runnable task is found. + ticks: usize, +} + +impl Runner<'_> { + /// Creates a runner and registers it in the executor state. + fn new(state: &State) -> Runner<'_> { + let runner = Runner { + state, + ticker: Ticker::new(state), + local: Arc::new(ConcurrentQueue::bounded(512)), + ticks: 0, + }; + state.local_queues.write().push(runner.local.clone()); + runner + } + + /// Waits for the next runnable task to run. + async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable { + let runnable = self + .ticker + .runnable_with(|| { + // Try the local queue. + if let Ok(r) = self.local.pop() { + return Some(r); + } + + // Try stealing from the global queue. + if let Ok(r) = self.state.queue.pop() { + steal(&self.state.queue, &self.local); + return Some(r); + } + + // Try stealing from other runners. + let local_queues = self.state.local_queues.read(); + + // Pick a random starting point in the iterator list and rotate the list. + let n = local_queues.len(); + let start = rng.usize(..n); + let iter = local_queues + .iter() + .chain(local_queues.iter()) + .skip(start) + .take(n); + + // Remove this runner's local queue. + let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); + + // Try stealing from each local queue in the list. 
+ for local in iter { + steal(local, &self.local); + if let Ok(r) = self.local.pop() { + return Some(r); + } + } + + None + }) + .await; + + // Bump the tick counter. + self.ticks = self.ticks.wrapping_add(1); + + if self.ticks % 64 == 0 { + // Steal tasks from the global queue to ensure fair task scheduling. + steal(&self.state.queue, &self.local); + } + + runnable + } +} + +impl Drop for Runner<'_> { + fn drop(&mut self) { + // Remove the local queue. + self.state + .local_queues + .write() + .retain(|local| !Arc::ptr_eq(local, &self.local)); + + // Re-schedule remaining tasks in the local queue. + while let Ok(r) = self.local.pop() { + r.schedule(); + } + } +} + +/// Steals some items from one queue into another. +fn steal(src: &ConcurrentQueue, dest: &ConcurrentQueue) { + // Half of `src`'s length rounded up. + let mut count = (src.len() + 1) / 2; + + if count > 0 { + // Don't steal more than fits into the queue. + if let Some(cap) = dest.capacity() { + count = count.min(cap - dest.len()); + } + + // Steal tasks. + for _ in 0..count { + if let Ok(t) = src.pop() { + assert!(dest.push(t).is_ok()); + } else { + break; + } + } + } +} + +/// Debug implementation for `Executor` and `LocalExecutor`. +fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // Get a reference to the state. + let ptr = executor.state.load(Ordering::Acquire); + if ptr.is_null() { + // The executor has not been initialized. + struct Uninitialized; + + impl fmt::Debug for Uninitialized { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } + } + + return f.debug_tuple(name).field(&Uninitialized).finish(); + } + + // SAFETY: If the state pointer is not null, it must have been + // allocated properly by Arc::new and converted via Arc::into_raw + // in state_ptr. + let state = unsafe { &*ptr }; + + debug_state(state, name, f) +} + +/// Debug implementation for `Executor` and `LocalExecutor`. 
+fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { + /// Debug wrapper for the number of active tasks. + struct ActiveTasks<'a>(&'a Mutex>); + + impl fmt::Debug for ActiveTasks<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_lock() { + Some(lock) => fmt::Debug::fmt(&lock.len(), f), + None => f.write_str(""), + } + } + } + + /// Debug wrapper for the local runners. + struct LocalRunners<'a>(&'a RwLock>>>); + + impl fmt::Debug for LocalRunners<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_read() { + Some(lock) => f + .debug_list() + .entries(lock.iter().map(|queue| queue.len())) + .finish(), + None => f.write_str(""), + } + } + } + + /// Debug wrapper for the sleepers. + struct SleepCount<'a>(&'a Mutex); + + impl fmt::Debug for SleepCount<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.try_lock() { + Some(lock) => fmt::Debug::fmt(&lock.count, f), + None => f.write_str(""), + } + } + } + + f.debug_struct(name) + .field("active", &ActiveTasks(&state.active)) + .field("global_tasks", &state.queue.len()) + .field("local_runners", &LocalRunners(&state.local_queues)) + .field("sleepers", &SleepCount(&state.sleepers)) + .finish() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ensure_send_and_sync() { + use futures_lite::future::pending; + + fn is_send(_: T) {} + fn is_sync(_: T) {} + fn is_static(_: T) {} + + is_send::>(Executor::new()); + is_sync::>(Executor::new()); + + let ex = Executor::new(); + is_send(ex.run(pending::<()>())); + is_sync(ex.run(pending::<()>())); + is_send(ex.tick()); + is_sync(ex.tick()); + is_send(ex.schedule()); + is_sync(ex.schedule()); + is_static(ex.schedule()); + } +} diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index 3910166904856..e4b4acef7f919 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -1,3 +1,5 
@@ +use alloc::vec::Vec; + use crate::TaskPool; mod adapters; @@ -193,7 +195,7 @@ where fn collect(mut self, pool: &TaskPool) -> C where C: FromIterator, - BatchIter::Item: Send + 'static, + BatchIter::Item: crate::MaybeSync + Send + 'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -213,7 +215,7 @@ where where C: Default + Extend + Send, F: FnMut(&BatchIter::Item) -> bool + Send + Sync + Clone, - BatchIter::Item: Send + 'static, + BatchIter::Item: crate::MaybeSync + Send + 'static, { let (mut a, mut b) = <(C, C)>::default(); pool.scope(|s| { @@ -330,7 +332,7 @@ where /// See [`Iterator::max()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.max) fn max(mut self, pool: &TaskPool) -> Option where - BatchIter::Item: Ord + Send + 'static, + BatchIter::Item: Ord + crate::MaybeSync + Send + 'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -347,7 +349,7 @@ where /// See [`Iterator::min()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.min) fn min(mut self, pool: &TaskPool) -> Option where - BatchIter::Item: Ord + Send + 'static, + BatchIter::Item: Ord + crate::MaybeSync + Send + 'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -366,7 +368,7 @@ where where R: Ord, F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone, - BatchIter::Item: Send + 'static, + BatchIter::Item: crate::MaybeSync + Send + 'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -386,7 +388,7 @@ where fn max_by(mut self, pool: &TaskPool, f: F) -> Option where F: FnMut(&BatchIter::Item, &BatchIter::Item) -> core::cmp::Ordering + Send + Sync + Clone, - BatchIter::Item: Send + 'static, + BatchIter::Item: crate::MaybeSync + Send + 'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -406,7 +408,7 @@ where where R: Ord, F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone, - BatchIter::Item: Send + 'static, + BatchIter::Item: crate::MaybeSync + Send + 
'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -426,7 +428,7 @@ where fn min_by(mut self, pool: &TaskPool, f: F) -> Option where F: FnMut(&BatchIter::Item, &BatchIter::Item) -> core::cmp::Ordering + Send + Sync + Clone, - BatchIter::Item: Send + 'static, + BatchIter::Item: crate::MaybeSync + Send + 'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -479,7 +481,7 @@ where /// See [`Iterator::sum()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.sum) fn sum(mut self, pool: &TaskPool) -> R where - S: core::iter::Sum + Send + 'static, + S: core::iter::Sum + crate::MaybeSync + Send + 'static, R: core::iter::Sum, { pool.scope(|s| { @@ -496,7 +498,7 @@ where /// See [`Iterator::product()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.product) fn product(mut self, pool: &TaskPool) -> R where - S: core::iter::Product + Send + 'static, + S: core::iter::Product + crate::MaybeSync + Send + 'static, R: core::iter::Product, { pool.scope(|s| { diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 1d6d35664ed0e..3250f25f9f6c0 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -4,10 +4,14 @@ html_logo_url = "https://bevyengine.org/assets/icon.png", html_favicon_url = "https://bevyengine.org/assets/icon.png" )] +#![cfg_attr(not(feature = "std"), no_std)] extern crate alloc; +mod executor; + mod slice; + pub use slice::{ParallelSlice, ParallelSliceMut}; #[cfg_attr(target_arch = "wasm32", path = "wasm_task.rs")] @@ -15,10 +19,18 @@ mod task; pub use task::Task; -#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] +#[cfg(all( + feature = "std", + not(target_arch = "wasm32"), + feature = "multi_threaded" +))] mod task_pool; -#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] +#[cfg(all( + feature = "std", + not(target_arch = "wasm32"), + feature = "multi_threaded" +))] pub use task_pool::{Scope, TaskPool, 
TaskPoolBuilder}; #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] @@ -28,22 +40,36 @@ mod single_threaded_task_pool; pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor}; mod usages; + #[cfg(not(target_arch = "wasm32"))] pub use usages::tick_global_task_pools_on_main_thread; + pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; -#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] +#[cfg(all( + feature = "std", + not(target_arch = "wasm32"), + feature = "multi_threaded" +))] mod thread_executor; -#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] + +#[cfg(all( + feature = "std", + not(target_arch = "wasm32"), + feature = "multi_threaded" +))] pub use thread_executor::{ThreadExecutor, ThreadExecutorTicker}; #[cfg(feature = "async-io")] pub use async_io::block_on; -#[cfg(not(feature = "async-io"))] + +#[cfg(all(feature = "std", not(feature = "async-io")))] pub use futures_lite::future::block_on; + pub use futures_lite::future::poll_once; mod iter; + pub use iter::ParallelIterator; pub use futures_lite; @@ -52,25 +78,55 @@ pub use futures_lite; /// /// This includes the most common types in this crate, re-exported for your convenience. pub mod prelude { + #[cfg(feature = "std")] + #[doc(hidden)] + pub use crate::block_on; + #[doc(hidden)] pub use crate::{ - block_on, iter::ParallelIterator, slice::{ParallelSlice, ParallelSliceMut}, usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}, }; } -use core::num::NonZero; - /// Gets the logical CPU core count available to the current process. /// /// This is identical to [`std::thread::available_parallelism`], except /// it will return a default value of 1 if it internally errors out. /// /// This will always return at least 1. 
+#[cfg(feature = "std")] pub fn available_parallelism() -> usize { std::thread::available_parallelism() - .map(NonZero::::get) + .map(core::num::NonZero::::get) .unwrap_or(1) } + +#[cfg(feature = "std")] +mod std_bounds { + /// Adds a [`Send`] requirement on `no_std` platforms. + pub trait MaybeSend {} + impl MaybeSend for T {} + + /// Adds a [`Sync`] requirement on `no_std` platforms. + pub trait MaybeSync {} + impl MaybeSync for T {} +} + +#[cfg(feature = "std")] +pub use std_bounds::*; + +#[cfg(not(feature = "std"))] +mod no_std_bounds { + /// Adds a [`Send`] requirement on `no_std` platforms. + pub trait MaybeSend: Send {} + impl MaybeSend for T {} + + /// Adds a [`Sync`] requirement on `no_std` platforms. + pub trait MaybeSync: Sync {} + impl MaybeSync for T {} +} + +#[cfg(not(feature = "std"))] +pub use no_std_bounds::*; diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index d7f994026a678..5a540bb9c0520 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -1,12 +1,27 @@ -use alloc::{rc::Rc, sync::Arc}; +use crate::Task; +use alloc::{string::String, sync::Arc, vec::Vec}; use core::{cell::RefCell, future::Future, marker::PhantomData, mem}; -use crate::Task; +#[cfg(feature = "std")] +type LocalExecutor<'a> = crate::executor::LocalExecutor<'a>; + +#[cfg(not(feature = "std"))] +type LocalExecutor<'a> = crate::executor::Executor<'a>; +#[cfg(feature = "std")] +type RcCell = alloc::rc::Rc>; + +#[cfg(not(feature = "std"))] +type RcCell = Arc>; + +#[cfg(feature = "std")] thread_local! { - static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() }; + static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() }; } +#[cfg(not(feature = "std"))] +static GLOBAL_EXECUTOR: LocalExecutor<'static> = LocalExecutor::new(); + /// Used to create a [`TaskPool`]. 
#[derive(Debug, Default, Clone)] pub struct TaskPoolBuilder {} @@ -114,15 +129,13 @@ impl TaskPool { // Any usages of the references passed into `Scope` must be accessed through // the transmuted reference for the rest of this function. - let executor = &async_executor::LocalExecutor::new(); + let executor = &LocalExecutor::new(); // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let executor: &'env async_executor::LocalExecutor<'env> = - unsafe { mem::transmute(executor) }; + let executor: &'env LocalExecutor<'env> = unsafe { mem::transmute(executor) }; - let results: RefCell>>>> = RefCell::new(Vec::new()); + let results: RefCell>>> = RefCell::new(Vec::new()); // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let results: &'env RefCell>>>> = - unsafe { mem::transmute(&results) }; + let results: &'env RefCell>>> = unsafe { mem::transmute(&results) }; let mut scope = Scope { executor, @@ -142,7 +155,14 @@ impl TaskPool { let results = scope.results.borrow(); results .iter() - .map(|result| result.borrow_mut().take().unwrap()) + .map(|result| { + #[cfg(feature = "std")] + let mut result = result.borrow_mut(); + #[cfg(not(feature = "std"))] + let mut result = result.write(); + + result.take().unwrap() + }) .collect() } @@ -152,14 +172,16 @@ impl TaskPool { /// end-user. /// /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. 
- pub fn spawn(&self, future: impl Future + 'static) -> Task + pub fn spawn(&self, future: impl Future + 'static + crate::MaybeSend) -> Task where - T: 'static, + T: 'static + crate::MaybeSend, { #[cfg(target_arch = "wasm32")] - return Task::wrap_future(future); + { + Task::wrap_future(future) + } - #[cfg(not(target_arch = "wasm32"))] + #[cfg(all(not(target_arch = "wasm32"), feature = "std"))] { LOCAL_EXECUTOR.with(|executor| { let task = executor.spawn(future); @@ -169,12 +191,24 @@ impl TaskPool { Task::new(task) }) } + + #[cfg(all(not(target_arch = "wasm32"), not(feature = "std")))] + { + let task = GLOBAL_EXECUTOR.spawn(future); + // Loop until all tasks are done + while GLOBAL_EXECUTOR.try_tick() {} + + Task::new(task) + } } /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`]. - pub fn spawn_local(&self, future: impl Future + 'static) -> Task + pub fn spawn_local( + &self, + future: impl Future + 'static + crate::MaybeSend, + ) -> Task where - T: 'static, + T: 'static + crate::MaybeSend, { self.spawn(future) } @@ -192,9 +226,13 @@ impl TaskPool { /// ``` pub fn with_local_executor(&self, f: F) -> R where - F: FnOnce(&async_executor::LocalExecutor) -> R, + F: FnOnce(&LocalExecutor) -> R, { - LOCAL_EXECUTOR.with(f) + #[cfg(feature = "std")] + return LOCAL_EXECUTOR.with(f); + + #[cfg(not(feature = "std"))] + return (f)(&GLOBAL_EXECUTOR); } } @@ -203,16 +241,16 @@ impl TaskPool { /// For more information, see [`TaskPool::scope`]. 
#[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'scope async_executor::LocalExecutor<'scope>, + executor: &'scope LocalExecutor<'scope>, // Vector to gather results of all futures spawned during scope run - results: &'env RefCell>>>>, + results: &'env RefCell>>>, // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, env: PhantomData<&'env mut &'env ()>, } -impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { +impl<'scope, 'env, T: Send + 'env + crate::MaybeSync> Scope<'scope, 'env, T> { /// Spawns a scoped future onto the executor. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. @@ -220,7 +258,10 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn + 'scope>(&self, f: Fut) { + pub fn spawn(&self, f: Fut) + where + Fut: Future + 'scope + crate::MaybeSend, + { self.spawn_on_scope(f); } @@ -231,7 +272,10 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_on_external + 'scope>(&self, f: Fut) { + pub fn spawn_on_external(&self, f: Fut) + where + Fut: Future + 'scope + crate::MaybeSend, + { self.spawn_on_scope(f); } @@ -240,12 +284,22 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// returned as a part of [`TaskPool::scope`]'s return value. /// /// For more information, see [`TaskPool::scope`]. 
- pub fn spawn_on_scope + 'scope>(&self, f: Fut) { - let result = Rc::new(RefCell::new(None)); + pub fn spawn_on_scope(&self, f: Fut) + where + Fut: Future + 'scope + crate::MaybeSend, + { + #[cfg(feature = "std")] + let result = alloc::rc::Rc::new(RefCell::new(None)); + #[cfg(not(feature = "std"))] + let result = Arc::new(spin::RwLock::new(None)); + self.results.borrow_mut().push(result.clone()); let f = async move { let temp_result = f.await; + #[cfg(feature = "std")] result.borrow_mut().replace(temp_result); + #[cfg(not(feature = "std"))] + result.write().replace(temp_result); }; self.executor.spawn(f).detach(); } diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs index a8a87c9ce80a0..b9e4d1afdce57 100644 --- a/crates/bevy_tasks/src/slice.rs +++ b/crates/bevy_tasks/src/slice.rs @@ -1,3 +1,5 @@ +use alloc::vec::Vec; + use super::TaskPool; /// Provides functions for mapping read-only slices across a provided [`TaskPool`]. @@ -36,7 +38,7 @@ pub trait ParallelSlice: AsRef<[T]> { fn par_chunk_map(&self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec where F: Fn(usize, &[T]) -> R + Send + Sync, - R: Send + 'static, + R: crate::MaybeSync + Send + 'static, { let slice = self.as_ref(); let f = &f; @@ -83,7 +85,7 @@ pub trait ParallelSlice: AsRef<[T]> { fn par_splat_map(&self, task_pool: &TaskPool, max_tasks: Option, f: F) -> Vec where F: Fn(usize, &[T]) -> R + Send + Sync, - R: Send + 'static, + R: crate::MaybeSync + Send + 'static, { let slice = self.as_ref(); let chunk_size = core::cmp::max( @@ -139,7 +141,7 @@ pub trait ParallelSliceMut: AsMut<[T]> { fn par_chunk_map_mut(&mut self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec where F: Fn(usize, &mut [T]) -> R + Send + Sync, - R: Send + 'static, + R: crate::MaybeSync + Send + 'static, { let slice = self.as_mut(); let f = &f; @@ -194,7 +196,7 @@ pub trait ParallelSliceMut: AsMut<[T]> { ) -> Vec where F: Fn(usize, &mut [T]) -> R + Send + Sync, - R: Send + 'static, + R: 
crate::MaybeSync + Send + 'static, { let mut slice = self.as_mut(); let chunk_size = core::cmp::max( diff --git a/crates/bevy_tasks/src/task.rs b/crates/bevy_tasks/src/task.rs index 53292c7574f44..e1c252bea1388 100644 --- a/crates/bevy_tasks/src/task.rs +++ b/crates/bevy_tasks/src/task.rs @@ -4,7 +4,7 @@ use core::{ task::{Context, Poll}, }; -/// Wraps `async_executor::Task`, a spawned future. +/// Wraps `async_task::Task`, a spawned future. /// /// Tasks are also futures themselves and yield the output of the spawned future. /// @@ -14,16 +14,16 @@ use core::{ /// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic. #[derive(Debug)] #[must_use = "Tasks are canceled when dropped, use `.detach()` to run them in the background."] -pub struct Task(async_executor::Task); +pub struct Task(async_task::Task); impl Task { - /// Creates a new task from a given `async_executor::Task` - pub fn new(task: async_executor::Task) -> Self { + /// Creates a new task from a given `async_task::Task` + pub fn new(task: async_task::Task) -> Self { Self(task) } /// Detaches the task to let it keep running in the background. See - /// `async_executor::Task::detach` + /// `async_task::Task::detach` pub fn detach(self) { self.0.detach(); } @@ -36,7 +36,7 @@ impl Task { /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of /// canceling because it also waits for the task to stop running. 
/// - /// See `async_executor::Task::cancel` + /// See `async_task::Task::cancel` pub async fn cancel(self) -> Option { self.0.cancel().await } diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 9fab3fbbe8317..b882ecc03038f 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -2,25 +2,19 @@ use alloc::sync::Arc; use core::{future::Future, marker::PhantomData, mem, panic::AssertUnwindSafe}; use std::thread::{self, JoinHandle}; -use async_executor::FallibleTask; +use async_task::FallibleTask; use concurrent_queue::ConcurrentQueue; use futures_lite::FutureExt; +use crate::executor::{Executor, LocalExecutor}; + use crate::{ block_on, thread_executor::{ThreadExecutor, ThreadExecutorTicker}, Task, }; -struct CallOnDrop(Option>); - -impl Drop for CallOnDrop { - fn drop(&mut self) { - if let Some(call) = self.0.as_ref() { - call(); - } - } -} +use bevy_utils::OnDrop; /// Used to create a [`TaskPool`] #[derive(Default)] @@ -102,7 +96,7 @@ impl TaskPoolBuilder { #[derive(Debug)] pub struct TaskPool { /// The executor for the pool. - executor: Arc>, + executor: Arc>, // The inner state of the pool. threads: Vec>, @@ -111,7 +105,7 @@ pub struct TaskPool { impl TaskPool { thread_local! 
{ - static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() }; + static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() }; static THREAD_EXECUTOR: Arc> = Arc::new(ThreadExecutor::new()); } @@ -128,7 +122,7 @@ impl TaskPool { fn new_internal(builder: TaskPoolBuilder) -> Self { let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>(); - let executor = Arc::new(async_executor::Executor::new()); + let executor = Arc::new(Executor::new()); let num_threads = builder .num_threads @@ -160,7 +154,11 @@ impl TaskPool { on_thread_spawn(); drop(on_thread_spawn); } - let _destructor = CallOnDrop(on_thread_destroy); + let _destructor = OnDrop::new(move || { + if let Some(f) = on_thread_destroy { + (f)(); + } + }); loop { let res = std::panic::catch_unwind(|| { let tick_forever = async move { @@ -344,9 +342,9 @@ impl TaskPool { // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. // Any usages of the references passed into `Scope` must be accessed through // the transmuted reference for the rest of this function. 
- let executor: &async_executor::Executor = &self.executor; + let executor: &Executor = &self.executor; // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) }; + let executor: &'env Executor = unsafe { mem::transmute(executor) }; // SAFETY: As above, all futures must complete in this function so we can change the lifetime let external_executor: &'env ThreadExecutor<'env> = unsafe { mem::transmute(external_executor) }; @@ -432,7 +430,7 @@ impl TaskPool { #[inline] async fn execute_global_external_scope<'scope, 'ticker, T>( - executor: &'scope async_executor::Executor<'scope>, + executor: &'scope Executor<'scope>, external_ticker: ThreadExecutorTicker<'scope, 'ticker>, scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, get_results: impl Future>, @@ -478,7 +476,7 @@ impl TaskPool { #[inline] async fn execute_global_scope<'scope, 'ticker, T>( - executor: &'scope async_executor::Executor<'scope>, + executor: &'scope Executor<'scope>, scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, get_results: impl Future>, ) -> Vec { @@ -562,7 +560,7 @@ impl TaskPool { /// ``` pub fn with_local_executor(&self, f: F) -> R where - F: FnOnce(&async_executor::LocalExecutor) -> R, + F: FnOnce(&LocalExecutor) -> R, { Self::LOCAL_EXECUTOR.with(f) } @@ -593,7 +591,7 @@ impl Drop for TaskPool { /// For more information, see [`TaskPool::scope`]. 
#[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'scope async_executor::Executor<'scope>, + executor: &'scope Executor<'scope>, external_executor: &'scope ThreadExecutor<'scope>, scope_executor: &'scope ThreadExecutor<'scope>, spawned: &'scope ConcurrentQueue>>>, diff --git a/crates/bevy_tasks/src/thread_executor.rs b/crates/bevy_tasks/src/thread_executor.rs index b25811b559341..c29940cbd5fa8 100644 --- a/crates/bevy_tasks/src/thread_executor.rs +++ b/crates/bevy_tasks/src/thread_executor.rs @@ -1,9 +1,11 @@ use core::marker::PhantomData; use std::thread::{self, ThreadId}; -use async_executor::{Executor, Task}; +use async_task::Task; use futures_lite::Future; +use crate::executor::Executor; + /// An executor that can only be ticked on the thread it was instantiated on. But /// can spawn `Send` tasks from other threads. /// diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index b260274a0fb11..5be44f3b9e690 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -1,11 +1,20 @@ use super::TaskPool; use core::ops::Deref; + +#[cfg(feature = "std")] use std::sync::OnceLock; +#[cfg(not(feature = "std"))] +use spin::Once; + macro_rules! taskpool { ($(#[$attr:meta])* ($static:ident, $type:ident)) => { + #[cfg(feature = "std")] static $static: OnceLock<$type> = OnceLock::new(); + #[cfg(not(feature = "std"))] + static $static: Once<$type> = Once::new(); + $(#[$attr])* #[derive(Debug)] pub struct $type(TaskPool); @@ -13,7 +22,15 @@ macro_rules! 
taskpool { impl $type { #[doc = concat!(" Gets the global [`", stringify!($type), "`] instance, or initializes it with `f`.")] pub fn get_or_init(f: impl FnOnce() -> TaskPool) -> &'static Self { - $static.get_or_init(|| Self(f())) + #[cfg(feature = "std")] + { + $static.get_or_init(|| Self(f())) + } + + #[cfg(not(feature = "std"))] + { + $static.call_once(|| Self(f())) + } } #[doc = concat!(" Attempts to get the global [`", stringify!($type), "`] instance, \ From f44c9a65a526aa808b12af7a0b043283f02dc6d5 Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Fri, 27 Sep 2024 15:59:19 +1000 Subject: [PATCH 03/10] Expected `dead_code` doesn't apply to MacOS --- crates/bevy_tasks/src/executor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/executor.rs b/crates/bevy_tasks/src/executor.rs index eae7f86bd79e7..8848599179199 100644 --- a/crates/bevy_tasks/src/executor.rs +++ b/crates/bevy_tasks/src/executor.rs @@ -2,7 +2,7 @@ unsafe_code, reason = "executor implementation requires use of AtomicPtr and lifetime transmutation" )] -#![expect( +#![allow( dead_code, reason = "feature gating means certain methods aren't used on certain targets" )] From b98cfa5d07df2591d17ac2a9b94ed9692e3adffe Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Wed, 4 Dec 2024 15:27:54 +1100 Subject: [PATCH 04/10] Reset Branch --- crates/bevy_tasks/Cargo.toml | 30 +- crates/bevy_tasks/src/executor.rs | 844 ------------------ crates/bevy_tasks/src/iter/mod.rs | 22 +- crates/bevy_tasks/src/lib.rs | 74 +- .../src/single_threaded_task_pool.rs | 104 +-- crates/bevy_tasks/src/slice.rs | 10 +- crates/bevy_tasks/src/task.rs | 12 +- crates/bevy_tasks/src/task_pool.rs | 38 +- crates/bevy_tasks/src/thread_executor.rs | 4 +- crates/bevy_tasks/src/usages.rs | 19 +- 10 files changed, 80 insertions(+), 1077 deletions(-) delete mode 100644 crates/bevy_tasks/src/executor.rs diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 039d736766296..e915cd941f57f 
100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,36 +9,14 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -default = ["std"] -multi_threaded = ["std", "dep:async-channel"] -async-io = ["std", "dep:async-io"] -std = [ - "futures-lite/std", - "async-task/std", - "slab/std", - "fastrand/std", - "spin/std", - "concurrent-queue/std", -] +multi_threaded = ["dep:async-channel", "dep:concurrent-queue"] [dependencies] -futures-lite = { version = "2.0.1", default-features = false, features = [ - "race", -] } -async-task = { version = "4.4.0", default-features = false } -slab = { version = "0.4.4", default-features = false } -fastrand = { version = "2.0.0", default-features = false } -spin = { version = "0.9.8", default-features = false, features = [ - "spin_mutex", - "rwlock", - "once", -] } -concurrent-queue = { version = "2.0.0", default-features = false } - -bevy_utils = { path = "../bevy_utils", version = "0.15.0-dev", default-features = false } - +futures-lite = "2.0.1" +async-executor = "1.11" async-channel = { version = "2.3.0", optional = true } async-io = { version = "2.0.0", optional = true } +concurrent-queue = { version = "2.0.0", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/crates/bevy_tasks/src/executor.rs b/crates/bevy_tasks/src/executor.rs deleted file mode 100644 index 8848599179199..0000000000000 --- a/crates/bevy_tasks/src/executor.rs +++ /dev/null @@ -1,844 +0,0 @@ -#![expect( - unsafe_code, - reason = "executor implementation requires use of AtomicPtr and lifetime transmutation" -)] -#![allow( - dead_code, - reason = "feature gating means certain methods aren't used on certain targets" -)] - -use alloc::{sync::Arc, vec::Vec}; - -use core::{ - fmt, - marker::PhantomData, - panic::{RefUnwindSafe, UnwindSafe}, - sync::atomic::{AtomicBool, AtomicPtr, Ordering}, - task::{Poll, Waker}, -}; - -use async_task::{Builder, Runnable, Task}; -use 
concurrent_queue::ConcurrentQueue; -use futures_lite::{future, prelude::*}; -use slab::Slab; -use spin::{Mutex, RwLock}; - -use bevy_utils::OnDrop; - -/// An async executor. -/// Based on the reference executor provided by [async_executor]. -/// -/// [async_executor]: https://crates.io/crates/async-executor -pub struct Executor<'a> { - /// The executor state. - state: AtomicPtr, - - /// Makes the `'a` lifetime invariant. - _marker: PhantomData>, -} - -// SAFETY: Executor stores no thread local state that can be accessed via other thread. -unsafe impl Send for Executor<'_> {} -// SAFETY: Executor internally synchronizes all of it's operations internally. -unsafe impl Sync for Executor<'_> {} - -impl UnwindSafe for Executor<'_> {} -impl RefUnwindSafe for Executor<'_> {} - -impl fmt::Debug for Executor<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - debug_executor(self, "Executor", f) - } -} - -impl<'a> Executor<'a> { - /// Creates a new executor. - pub const fn new() -> Executor<'a> { - Executor { - state: AtomicPtr::new(core::ptr::null_mut()), - _marker: PhantomData, - } - } - - /// Returns `true` if there are no unfinished tasks. - pub fn is_empty(&self) -> bool { - self.state().active.lock().is_empty() - } - - /// Spawns a task onto the executor. - pub fn spawn(&self, future: impl Future + Send + 'a) -> Task { - let mut active = self.state().active.lock(); - - // SAFETY: `T` and the future are `Send`. - unsafe { self.spawn_inner(future, &mut active) } - } - - /// Spawn a future while holding the inner lock. - /// - /// # Safety - /// - /// If this is an `Executor`, `F` and `T` must be `Send`. - unsafe fn spawn_inner( - &self, - future: impl Future + 'a, - active: &mut Slab, - ) -> Task { - // Remove the task from the set of active tasks when the future finishes. 
- let entry = active.vacant_entry(); - let index = entry.key(); - let state = self.state_as_arc(); - let future = async move { - let _guard = OnDrop::new(move || drop(state.active.lock().try_remove(index))); - future.await - }; - - // Create the task and register it in the set of active tasks. - // - // SAFETY: - // - // If `future` is not `Send`, this must be a `LocalExecutor` as per this - // function's unsafe precondition. Since `LocalExecutor` is `!Sync`, - // `try_tick`, `tick` and `run` can only be called from the origin - // thread of the `LocalExecutor`. Similarly, `spawn` can only be called - // from the origin thread, ensuring that `future` and the executor share - // the same origin thread. The `Runnable` can be scheduled from other - // threads, but because of the above `Runnable` can only be called or - // dropped on the origin thread. - // - // `future` is not `'static`, but we make sure that the `Runnable` does - // not outlive `'a`. When the executor is dropped, the `active` field is - // drained and all of the `Waker`s are woken. Then, the queue inside of - // the `Executor` is drained of all of its runnables. This ensures that - // runnables are dropped and this precondition is satisfied. - // - // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below. - // Therefore we do not need to worry about what is done with the - // `Waker`. - let (runnable, task) = unsafe { - let builder = Builder::new(); - - #[cfg(feature = "std")] - let builder = builder.propagate_panic(true); - - builder.spawn_unchecked(|()| future, self.schedule()) - }; - entry.insert(runnable.waker()); - - runnable.schedule(); - task - } - - /// Attempts to run a task if at least one is scheduled. - /// - /// Running a scheduled task means simply polling its future once. - pub fn try_tick(&self) -> bool { - self.state().try_tick() - } - - /// Runs a single task. - /// - /// Running a task means simply polling its future once. 
- /// - /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. - pub async fn tick(&self) { - self.state().tick().await; - } - - /// Runs the executor until the given future completes. - pub async fn run(&self, future: impl Future) -> T { - self.state().run(future).await - } - - /// Returns a function that schedules a runnable task when it gets woken up. - fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { - let state = self.state_as_arc(); - - // TODO: If possible, push into the current local queue and notify the ticker. - move |runnable| { - state.queue.push(runnable).unwrap(); - state.notify(); - } - } - - /// Returns a pointer to the inner state. - #[inline] - fn state_ptr(&self) -> *const State { - let mut ptr = self.state.load(Ordering::Acquire); - if ptr.is_null() { - ptr = State::alloc_atomic(&self.state); - } - ptr - } - - /// Returns a reference to the inner state. - #[inline] - fn state(&self) -> &State { - // SAFETY: So long as an Executor lives, it's state pointer will always be valid - // when accessed through state_ptr. - unsafe { &*self.state_ptr() } - } - - // Clones the inner state Arc - #[inline] - fn state_as_arc(&self) -> Arc { - // SAFETY: So long as an Executor lives, it's state pointer will always be a valid - // Arc when accessed through state_ptr. - let arc = unsafe { Arc::from_raw(self.state_ptr()) }; - let clone = arc.clone(); - core::mem::forget(arc); - clone - } -} - -impl Drop for Executor<'_> { - fn drop(&mut self) { - let ptr = *self.state.get_mut(); - if ptr.is_null() { - return; - } - - // SAFETY: As ptr is not null, it was allocated via Arc::new and converted - // via Arc::into_raw in state_ptr. 
- let state = unsafe { Arc::from_raw(ptr) }; - - let mut active = state.active.lock(); - for w in active.drain() { - w.wake(); - } - drop(active); - - while state.queue.pop().is_ok() {} - } -} - -impl<'a> Default for Executor<'a> { - fn default() -> Executor<'a> { - Executor::new() - } -} - -/// A thread-local executor. -/// -/// The executor can only be run on the thread that created it. -pub struct LocalExecutor<'a> { - /// The inner executor. - inner: Executor<'a>, - - /// Makes the type `!Send` and `!Sync`. - _marker: PhantomData>, -} - -impl UnwindSafe for LocalExecutor<'_> {} -impl RefUnwindSafe for LocalExecutor<'_> {} - -impl fmt::Debug for LocalExecutor<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - debug_executor(&self.inner, "LocalExecutor", f) - } -} - -impl<'a> LocalExecutor<'a> { - /// Creates a single-threaded executor. - pub const fn new() -> LocalExecutor<'a> { - LocalExecutor { - inner: Executor::new(), - _marker: PhantomData, - } - } - - /// Returns `true` if there are no unfinished tasks. - pub fn is_empty(&self) -> bool { - self.inner().is_empty() - } - - /// Spawns a task onto the executor. - pub fn spawn(&self, future: impl Future + 'a) -> Task { - let mut active = self.inner().state().active.lock(); - - // SAFETY: This executor is not thread safe, so the future and its result - // cannot be sent to another thread. - unsafe { self.inner().spawn_inner(future, &mut active) } - } - - /// Spawns many tasks onto the executor. - /// - /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and - /// spawns all of the tasks in one go. With large amounts of tasks this can improve - /// contention. - /// - /// It is assumed that the iterator provided does not block; blocking iterators can lock up - /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the - /// mutex is not released, as there are no other threads that can poll this executor. 
- /// - /// [`spawn`]: LocalExecutor::spawn - /// [`Executor::spawn_many`]: Executor::spawn_many - pub fn spawn_many + 'a>( - &self, - futures: impl IntoIterator, - handles: &mut impl Extend>, - ) { - let mut active = self.inner().state().active.lock(); - - // Convert all of the futures to tasks. - let tasks = futures.into_iter().map(|future| { - // SAFETY: This executor is not thread safe, so the future and its result - // cannot be sent to another thread. - unsafe { self.inner().spawn_inner(future, &mut active) } - - // As only one thread can spawn or poll tasks at a time, there is no need - // to release lock contention here. - }); - - // Push them to the user's collection. - handles.extend(tasks); - } - - /// Attempts to run a task if at least one is scheduled. - /// - /// Running a scheduled task means simply polling its future once. - pub fn try_tick(&self) -> bool { - self.inner().try_tick() - } - - /// Runs a single task. - /// - /// Running a task means simply polling its future once. - /// - /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. - pub async fn tick(&self) { - self.inner().tick().await; - } - - /// Runs the executor until the given future completes. - pub async fn run(&self, future: impl Future) -> T { - self.inner().run(future).await - } - - /// Returns a reference to the inner executor. - fn inner(&self) -> &Executor<'a> { - &self.inner - } -} - -impl<'a> Default for LocalExecutor<'a> { - fn default() -> LocalExecutor<'a> { - LocalExecutor::new() - } -} - -/// The state of a executor. -struct State { - /// The global queue. - queue: ConcurrentQueue, - - /// Local queues created by runners. - local_queues: RwLock>>>, - - /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. - notified: AtomicBool, - - /// A list of sleeping tickers. - sleepers: Mutex, - - /// Currently active tasks. - active: Mutex>, -} - -impl State { - /// Creates state for a new executor. 
- const fn new() -> State { - State { - queue: ConcurrentQueue::unbounded(), - local_queues: RwLock::new(Vec::new()), - notified: AtomicBool::new(true), - sleepers: Mutex::new(Sleepers { - count: 0, - wakers: Vec::new(), - free_ids: Vec::new(), - }), - active: Mutex::new(Slab::new()), - } - } - - /// Notifies a sleeping ticker. - #[inline] - fn notify(&self) { - if self - .notified - .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) - .is_ok() - { - let waker = self.sleepers.lock().notify(); - if let Some(w) = waker { - w.wake(); - } - } - } - - fn try_tick(&self) -> bool { - match self.queue.pop() { - Err(_) => false, - Ok(runnable) => { - // Notify another ticker now to pick up where this ticker left off, just in case - // running the task takes a long time. - self.notify(); - - // Run the task. - runnable.run(); - true - } - } - } - - async fn tick(&self) { - let runnable = Ticker::new(self).runnable().await; - runnable.run(); - } - - async fn run(&self, future: impl Future) -> T { - let mut runner = Runner::new(self); - let mut rng = fastrand::Rng::with_seed(0x4d595df4d0f33173); - - // A future that runs tasks forever. - let run_forever = async { - loop { - for _ in 0..200 { - let runnable = runner.runnable(&mut rng).await; - runnable.run(); - } - future::yield_now().await; - } - }; - - // Run `future` and `run_forever` concurrently until `future` completes. - future.or(run_forever).await - } - - #[cold] - fn alloc_atomic(atomic_ptr: &AtomicPtr) -> *mut Self { - let state = Arc::new(Self::new()); - let ptr = Arc::into_raw(state).cast_mut(); - if let Err(actual) = atomic_ptr.compare_exchange( - core::ptr::null_mut(), - ptr, - Ordering::AcqRel, - Ordering::Acquire, - ) { - // SAFETY: This was just created from Arc::into_raw. - drop(unsafe { Arc::from_raw(ptr) }); - actual - } else { - ptr - } - } -} - -/// A list of sleeping tickers. -struct Sleepers { - /// Number of sleeping tickers (both notified and unnotified). 
- count: usize, - - /// IDs and wakers of sleeping unnotified tickers. - /// - /// A sleeping ticker is notified when its waker is missing from this list. - wakers: Vec<(usize, Waker)>, - - /// Reclaimed IDs. - free_ids: Vec, -} - -impl Sleepers { - /// Inserts a new sleeping ticker. - fn insert(&mut self, waker: &Waker) -> usize { - let id = match self.free_ids.pop() { - Some(id) => id, - None => self.count + 1, - }; - self.count += 1; - self.wakers.push((id, waker.clone())); - id - } - - /// Re-inserts a sleeping ticker's waker if it was notified. - /// - /// Returns `true` if the ticker was notified. - fn update(&mut self, id: usize, waker: &Waker) -> bool { - for item in &mut self.wakers { - if item.0 == id { - item.1.clone_from(waker); - return false; - } - } - - self.wakers.push((id, waker.clone())); - true - } - - /// Removes a previously inserted sleeping ticker. - /// - /// Returns `true` if the ticker was notified. - fn remove(&mut self, id: usize) -> bool { - self.count -= 1; - self.free_ids.push(id); - - for i in (0..self.wakers.len()).rev() { - if self.wakers[i].0 == id { - self.wakers.remove(i); - return false; - } - } - true - } - - /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. - fn is_notified(&self) -> bool { - self.count == 0 || self.count > self.wakers.len() - } - - /// Returns notification waker for a sleeping ticker. - /// - /// If a ticker was notified already or there are no tickers, `None` will be returned. - fn notify(&mut self) -> Option { - if self.wakers.len() == self.count { - self.wakers.pop().map(|item| item.1) - } else { - None - } - } -} - -/// Runs task one by one. -struct Ticker<'a> { - /// The executor state. - state: &'a State, - - /// Set to a non-zero sleeper ID when in sleeping state. - /// - /// States a ticker can be in: - /// 1) Woken. - /// 2a) Sleeping and unnotified. - /// 2b) Sleeping and notified. - sleeping: usize, -} - -impl Ticker<'_> { - /// Creates a ticker. 
- fn new(state: &State) -> Ticker<'_> { - Ticker { state, sleeping: 0 } - } - - /// Moves the ticker into sleeping and unnotified state. - /// - /// Returns `false` if the ticker was already sleeping and unnotified. - fn sleep(&mut self, waker: &Waker) -> bool { - let mut sleepers = self.state.sleepers.lock(); - - match self.sleeping { - // Move to sleeping state. - 0 => { - self.sleeping = sleepers.insert(waker); - } - - // Already sleeping, check if notified. - id => { - if !sleepers.update(id, waker) { - return false; - } - } - } - - self.state - .notified - .store(sleepers.is_notified(), Ordering::Release); - - true - } - - /// Moves the ticker into woken state. - fn wake(&mut self) { - if self.sleeping != 0 { - let mut sleepers = self.state.sleepers.lock(); - sleepers.remove(self.sleeping); - - self.state - .notified - .store(sleepers.is_notified(), Ordering::Release); - } - self.sleeping = 0; - } - - /// Waits for the next runnable task to run. - async fn runnable(&mut self) -> Runnable { - self.runnable_with(|| self.state.queue.pop().ok()).await - } - - /// Waits for the next runnable task to run, given a function that searches for a task. - async fn runnable_with(&mut self, mut search: impl FnMut() -> Option) -> Runnable { - future::poll_fn(|cx| { - loop { - match search() { - None => { - // Move to sleeping and unnotified state. - if !self.sleep(cx.waker()) { - // If already sleeping and unnotified, return. - return Poll::Pending; - } - } - Some(r) => { - // Wake up. - self.wake(); - - // Notify another ticker now to pick up where this ticker left off, just in - // case running the task takes a long time. - self.state.notify(); - - return Poll::Ready(r); - } - } - } - }) - .await - } -} - -impl Drop for Ticker<'_> { - fn drop(&mut self) { - // If this ticker is in sleeping state, it must be removed from the sleepers list. 
- if self.sleeping != 0 { - let mut sleepers = self.state.sleepers.lock(); - let notified = sleepers.remove(self.sleeping); - - self.state - .notified - .store(sleepers.is_notified(), Ordering::Release); - - // If this ticker was notified, then notify another ticker. - if notified { - drop(sleepers); - self.state.notify(); - } - } - } -} - -/// A worker in a work-stealing executor. -/// -/// This is just a ticker that also has an associated local queue for improved cache locality. -struct Runner<'a> { - /// The executor state. - state: &'a State, - - /// Inner ticker. - ticker: Ticker<'a>, - - /// The local queue. - local: Arc>, - - /// Bumped every time a runnable task is found. - ticks: usize, -} - -impl Runner<'_> { - /// Creates a runner and registers it in the executor state. - fn new(state: &State) -> Runner<'_> { - let runner = Runner { - state, - ticker: Ticker::new(state), - local: Arc::new(ConcurrentQueue::bounded(512)), - ticks: 0, - }; - state.local_queues.write().push(runner.local.clone()); - runner - } - - /// Waits for the next runnable task to run. - async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable { - let runnable = self - .ticker - .runnable_with(|| { - // Try the local queue. - if let Ok(r) = self.local.pop() { - return Some(r); - } - - // Try stealing from the global queue. - if let Ok(r) = self.state.queue.pop() { - steal(&self.state.queue, &self.local); - return Some(r); - } - - // Try stealing from other runners. - let local_queues = self.state.local_queues.read(); - - // Pick a random starting point in the iterator list and rotate the list. - let n = local_queues.len(); - let start = rng.usize(..n); - let iter = local_queues - .iter() - .chain(local_queues.iter()) - .skip(start) - .take(n); - - // Remove this runner's local queue. - let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); - - // Try stealing from each local queue in the list. 
- for local in iter { - steal(local, &self.local); - if let Ok(r) = self.local.pop() { - return Some(r); - } - } - - None - }) - .await; - - // Bump the tick counter. - self.ticks = self.ticks.wrapping_add(1); - - if self.ticks % 64 == 0 { - // Steal tasks from the global queue to ensure fair task scheduling. - steal(&self.state.queue, &self.local); - } - - runnable - } -} - -impl Drop for Runner<'_> { - fn drop(&mut self) { - // Remove the local queue. - self.state - .local_queues - .write() - .retain(|local| !Arc::ptr_eq(local, &self.local)); - - // Re-schedule remaining tasks in the local queue. - while let Ok(r) = self.local.pop() { - r.schedule(); - } - } -} - -/// Steals some items from one queue into another. -fn steal(src: &ConcurrentQueue, dest: &ConcurrentQueue) { - // Half of `src`'s length rounded up. - let mut count = (src.len() + 1) / 2; - - if count > 0 { - // Don't steal more than fits into the queue. - if let Some(cap) = dest.capacity() { - count = count.min(cap - dest.len()); - } - - // Steal tasks. - for _ in 0..count { - if let Ok(t) = src.pop() { - assert!(dest.push(t).is_ok()); - } else { - break; - } - } - } -} - -/// Debug implementation for `Executor` and `LocalExecutor`. -fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // Get a reference to the state. - let ptr = executor.state.load(Ordering::Acquire); - if ptr.is_null() { - // The executor has not been initialized. - struct Uninitialized; - - impl fmt::Debug for Uninitialized { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("") - } - } - - return f.debug_tuple(name).field(&Uninitialized).finish(); - } - - // SAFETY: If the state pointer is not null, it must have been - // allocated properly by Arc::new and converted via Arc::into_raw - // in state_ptr. - let state = unsafe { &*ptr }; - - debug_state(state, name, f) -} - -/// Debug implementation for `Executor` and `LocalExecutor`. 
-fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { - /// Debug wrapper for the number of active tasks. - struct ActiveTasks<'a>(&'a Mutex>); - - impl fmt::Debug for ActiveTasks<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.0.try_lock() { - Some(lock) => fmt::Debug::fmt(&lock.len(), f), - None => f.write_str(""), - } - } - } - - /// Debug wrapper for the local runners. - struct LocalRunners<'a>(&'a RwLock>>>); - - impl fmt::Debug for LocalRunners<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.0.try_read() { - Some(lock) => f - .debug_list() - .entries(lock.iter().map(|queue| queue.len())) - .finish(), - None => f.write_str(""), - } - } - } - - /// Debug wrapper for the sleepers. - struct SleepCount<'a>(&'a Mutex); - - impl fmt::Debug for SleepCount<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.0.try_lock() { - Some(lock) => fmt::Debug::fmt(&lock.count, f), - None => f.write_str(""), - } - } - } - - f.debug_struct(name) - .field("active", &ActiveTasks(&state.active)) - .field("global_tasks", &state.queue.len()) - .field("local_runners", &LocalRunners(&state.local_queues)) - .field("sleepers", &SleepCount(&state.sleepers)) - .finish() -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn ensure_send_and_sync() { - use futures_lite::future::pending; - - fn is_send(_: T) {} - fn is_sync(_: T) {} - fn is_static(_: T) {} - - is_send::>(Executor::new()); - is_sync::>(Executor::new()); - - let ex = Executor::new(); - is_send(ex.run(pending::<()>())); - is_sync(ex.run(pending::<()>())); - is_send(ex.tick()); - is_sync(ex.tick()); - is_send(ex.schedule()); - is_sync(ex.schedule()); - is_static(ex.schedule()); - } -} diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index e4b4acef7f919..3910166904856 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -1,5 +1,3 
@@ -use alloc::vec::Vec; - use crate::TaskPool; mod adapters; @@ -195,7 +193,7 @@ where fn collect(mut self, pool: &TaskPool) -> C where C: FromIterator, - BatchIter::Item: crate::MaybeSync + Send + 'static, + BatchIter::Item: Send + 'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -215,7 +213,7 @@ where where C: Default + Extend + Send, F: FnMut(&BatchIter::Item) -> bool + Send + Sync + Clone, - BatchIter::Item: crate::MaybeSync + Send + 'static, + BatchIter::Item: Send + 'static, { let (mut a, mut b) = <(C, C)>::default(); pool.scope(|s| { @@ -332,7 +330,7 @@ where /// See [`Iterator::max()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.max) fn max(mut self, pool: &TaskPool) -> Option where - BatchIter::Item: Ord + crate::MaybeSync + Send + 'static, + BatchIter::Item: Ord + Send + 'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -349,7 +347,7 @@ where /// See [`Iterator::min()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.min) fn min(mut self, pool: &TaskPool) -> Option where - BatchIter::Item: Ord + crate::MaybeSync + Send + 'static, + BatchIter::Item: Ord + Send + 'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -368,7 +366,7 @@ where where R: Ord, F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone, - BatchIter::Item: crate::MaybeSync + Send + 'static, + BatchIter::Item: Send + 'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -388,7 +386,7 @@ where fn max_by(mut self, pool: &TaskPool, f: F) -> Option where F: FnMut(&BatchIter::Item, &BatchIter::Item) -> core::cmp::Ordering + Send + Sync + Clone, - BatchIter::Item: crate::MaybeSync + Send + 'static, + BatchIter::Item: Send + 'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -408,7 +406,7 @@ where where R: Ord, F: FnMut(&BatchIter::Item) -> R + Send + Sync + Clone, - BatchIter::Item: crate::MaybeSync + Send + 'static, + BatchIter::Item: Send + 
'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -428,7 +426,7 @@ where fn min_by(mut self, pool: &TaskPool, f: F) -> Option where F: FnMut(&BatchIter::Item, &BatchIter::Item) -> core::cmp::Ordering + Send + Sync + Clone, - BatchIter::Item: crate::MaybeSync + Send + 'static, + BatchIter::Item: Send + 'static, { pool.scope(|s| { while let Some(batch) = self.next_batch() { @@ -481,7 +479,7 @@ where /// See [`Iterator::sum()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.sum) fn sum(mut self, pool: &TaskPool) -> R where - S: core::iter::Sum + crate::MaybeSync + Send + 'static, + S: core::iter::Sum + Send + 'static, R: core::iter::Sum, { pool.scope(|s| { @@ -498,7 +496,7 @@ where /// See [`Iterator::product()`](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.product) fn product(mut self, pool: &TaskPool) -> R where - S: core::iter::Product + crate::MaybeSync + Send + 'static, + S: core::iter::Product + Send + 'static, R: core::iter::Product, { pool.scope(|s| { diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 3250f25f9f6c0..1d6d35664ed0e 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -4,14 +4,10 @@ html_logo_url = "https://bevyengine.org/assets/icon.png", html_favicon_url = "https://bevyengine.org/assets/icon.png" )] -#![cfg_attr(not(feature = "std"), no_std)] extern crate alloc; -mod executor; - mod slice; - pub use slice::{ParallelSlice, ParallelSliceMut}; #[cfg_attr(target_arch = "wasm32", path = "wasm_task.rs")] @@ -19,18 +15,10 @@ mod task; pub use task::Task; -#[cfg(all( - feature = "std", - not(target_arch = "wasm32"), - feature = "multi_threaded" -))] +#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] mod task_pool; -#[cfg(all( - feature = "std", - not(target_arch = "wasm32"), - feature = "multi_threaded" -))] +#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] pub use task_pool::{Scope, TaskPool, 
TaskPoolBuilder}; #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] @@ -40,36 +28,22 @@ mod single_threaded_task_pool; pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor}; mod usages; - #[cfg(not(target_arch = "wasm32"))] pub use usages::tick_global_task_pools_on_main_thread; - pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; -#[cfg(all( - feature = "std", - not(target_arch = "wasm32"), - feature = "multi_threaded" -))] +#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] mod thread_executor; - -#[cfg(all( - feature = "std", - not(target_arch = "wasm32"), - feature = "multi_threaded" -))] +#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] pub use thread_executor::{ThreadExecutor, ThreadExecutorTicker}; #[cfg(feature = "async-io")] pub use async_io::block_on; - -#[cfg(all(feature = "std", not(feature = "async-io")))] +#[cfg(not(feature = "async-io"))] pub use futures_lite::future::block_on; - pub use futures_lite::future::poll_once; mod iter; - pub use iter::ParallelIterator; pub use futures_lite; @@ -78,55 +52,25 @@ pub use futures_lite; /// /// This includes the most common types in this crate, re-exported for your convenience. pub mod prelude { - #[cfg(feature = "std")] - #[doc(hidden)] - pub use crate::block_on; - #[doc(hidden)] pub use crate::{ + block_on, iter::ParallelIterator, slice::{ParallelSlice, ParallelSliceMut}, usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}, }; } +use core::num::NonZero; + /// Gets the logical CPU core count available to the current process. /// /// This is identical to [`std::thread::available_parallelism`], except /// it will return a default value of 1 if it internally errors out. /// /// This will always return at least 1. 
-#[cfg(feature = "std")] pub fn available_parallelism() -> usize { std::thread::available_parallelism() - .map(core::num::NonZero::::get) + .map(NonZero::::get) .unwrap_or(1) } - -#[cfg(feature = "std")] -mod std_bounds { - /// Adds a [`Send`] requirement on `no_std` platforms. - pub trait MaybeSend {} - impl MaybeSend for T {} - - /// Adds a [`Sync`] requirement on `no_std` platforms. - pub trait MaybeSync {} - impl MaybeSync for T {} -} - -#[cfg(feature = "std")] -pub use std_bounds::*; - -#[cfg(not(feature = "std"))] -mod no_std_bounds { - /// Adds a [`Send`] requirement on `no_std` platforms. - pub trait MaybeSend: Send {} - impl MaybeSend for T {} - - /// Adds a [`Sync`] requirement on `no_std` platforms. - pub trait MaybeSync: Sync {} - impl MaybeSync for T {} -} - -#[cfg(not(feature = "std"))] -pub use no_std_bounds::*; diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 8db1fdde750fb..054d22260eac4 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -1,27 +1,12 @@ -use crate::Task; -use alloc::{string::String, sync::Arc, vec::Vec}; +use alloc::{rc::Rc, sync::Arc}; use core::{cell::RefCell, future::Future, marker::PhantomData, mem}; -#[cfg(feature = "std")] -type LocalExecutor<'a> = crate::executor::LocalExecutor<'a>; - -#[cfg(not(feature = "std"))] -type LocalExecutor<'a> = crate::executor::Executor<'a>; - -#[cfg(feature = "std")] -type RcCell = alloc::rc::Rc>; - -#[cfg(not(feature = "std"))] -type RcCell = Arc>; +use crate::Task; -#[cfg(feature = "std")] thread_local! { - static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() }; + static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() }; } -#[cfg(not(feature = "std"))] -static GLOBAL_EXECUTOR: LocalExecutor<'static> = LocalExecutor::new(); - /// Used to create a [`TaskPool`]. 
#[derive(Debug, Default, Clone)] pub struct TaskPoolBuilder {} @@ -139,13 +124,15 @@ impl TaskPool { // Any usages of the references passed into `Scope` must be accessed through // the transmuted reference for the rest of this function. - let executor = &LocalExecutor::new(); + let executor = &async_executor::LocalExecutor::new(); // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let executor: &'env LocalExecutor<'env> = unsafe { mem::transmute(executor) }; + let executor: &'env async_executor::LocalExecutor<'env> = + unsafe { mem::transmute(executor) }; - let results: RefCell>>> = RefCell::new(Vec::new()); + let results: RefCell>>>> = RefCell::new(Vec::new()); // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let results: &'env RefCell>>> = unsafe { mem::transmute(&results) }; + let results: &'env RefCell>>>> = + unsafe { mem::transmute(&results) }; let mut scope = Scope { executor, @@ -165,14 +152,7 @@ impl TaskPool { let results = scope.results.borrow(); results .iter() - .map(|result| { - #[cfg(feature = "std")] - let mut result = result.borrow_mut(); - #[cfg(not(feature = "std"))] - let mut result = result.write(); - - result.take().unwrap() - }) + .map(|result| result.borrow_mut().take().unwrap()) .collect() } @@ -182,16 +162,14 @@ impl TaskPool { /// end-user. /// /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. 
- pub fn spawn(&self, future: impl Future + 'static + crate::MaybeSend) -> Task + pub fn spawn(&self, future: impl Future + 'static) -> Task where - T: 'static + crate::MaybeSend, + T: 'static, { #[cfg(target_arch = "wasm32")] - { - Task::wrap_future(future) - } + return Task::wrap_future(future); - #[cfg(all(not(target_arch = "wasm32"), feature = "std"))] + #[cfg(not(target_arch = "wasm32"))] { LOCAL_EXECUTOR.with(|executor| { let task = executor.spawn(future); @@ -201,24 +179,12 @@ impl TaskPool { Task::new(task) }) } - - #[cfg(all(not(target_arch = "wasm32"), not(feature = "std")))] - { - let task = GLOBAL_EXECUTOR.spawn(future); - // Loop until all tasks are done - while GLOBAL_EXECUTOR.try_tick() {} - - Task::new(task) - } } /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`]. - pub fn spawn_local( - &self, - future: impl Future + 'static + crate::MaybeSend, - ) -> Task + pub fn spawn_local(&self, future: impl Future + 'static) -> Task where - T: 'static + crate::MaybeSend, + T: 'static, { self.spawn(future) } @@ -236,13 +202,9 @@ impl TaskPool { /// ``` pub fn with_local_executor(&self, f: F) -> R where - F: FnOnce(&LocalExecutor) -> R, + F: FnOnce(&async_executor::LocalExecutor) -> R, { - #[cfg(feature = "std")] - return LOCAL_EXECUTOR.with(f); - - #[cfg(not(feature = "std"))] - return (f)(&GLOBAL_EXECUTOR); + LOCAL_EXECUTOR.with(f) } } @@ -251,16 +213,16 @@ impl TaskPool { /// For more information, see [`TaskPool::scope`]. 
#[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'scope LocalExecutor<'scope>, + executor: &'scope async_executor::LocalExecutor<'scope>, // Vector to gather results of all futures spawned during scope run - results: &'env RefCell>>>, + results: &'env RefCell>>>>, // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, env: PhantomData<&'env mut &'env ()>, } -impl<'scope, 'env, T: Send + 'env + crate::MaybeSync> Scope<'scope, 'env, T> { +impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// Spawns a scoped future onto the executor. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. @@ -268,10 +230,7 @@ impl<'scope, 'env, T: Send + 'env + crate::MaybeSync> Scope<'scope, 'env, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn(&self, f: Fut) - where - Fut: Future + 'scope + crate::MaybeSend, - { + pub fn spawn + 'scope>(&self, f: Fut) { self.spawn_on_scope(f); } @@ -282,10 +241,7 @@ impl<'scope, 'env, T: Send + 'env + crate::MaybeSync> Scope<'scope, 'env, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_on_external(&self, f: Fut) - where - Fut: Future + 'scope + crate::MaybeSend, - { + pub fn spawn_on_external + 'scope>(&self, f: Fut) { self.spawn_on_scope(f); } @@ -294,22 +250,12 @@ impl<'scope, 'env, T: Send + 'env + crate::MaybeSync> Scope<'scope, 'env, T> { /// returned as a part of [`TaskPool::scope`]'s return value. /// /// For more information, see [`TaskPool::scope`]. 
- pub fn spawn_on_scope(&self, f: Fut) - where - Fut: Future + 'scope + crate::MaybeSend, - { - #[cfg(feature = "std")] - let result = alloc::rc::Rc::new(RefCell::new(None)); - #[cfg(not(feature = "std"))] - let result = Arc::new(spin::RwLock::new(None)); - + pub fn spawn_on_scope + 'scope>(&self, f: Fut) { + let result = Rc::new(RefCell::new(None)); self.results.borrow_mut().push(result.clone()); let f = async move { let temp_result = f.await; - #[cfg(feature = "std")] result.borrow_mut().replace(temp_result); - #[cfg(not(feature = "std"))] - result.write().replace(temp_result); }; self.executor.spawn(f).detach(); } diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs index b9e4d1afdce57..a8a87c9ce80a0 100644 --- a/crates/bevy_tasks/src/slice.rs +++ b/crates/bevy_tasks/src/slice.rs @@ -1,5 +1,3 @@ -use alloc::vec::Vec; - use super::TaskPool; /// Provides functions for mapping read-only slices across a provided [`TaskPool`]. @@ -38,7 +36,7 @@ pub trait ParallelSlice: AsRef<[T]> { fn par_chunk_map(&self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec where F: Fn(usize, &[T]) -> R + Send + Sync, - R: crate::MaybeSync + Send + 'static, + R: Send + 'static, { let slice = self.as_ref(); let f = &f; @@ -85,7 +83,7 @@ pub trait ParallelSlice: AsRef<[T]> { fn par_splat_map(&self, task_pool: &TaskPool, max_tasks: Option, f: F) -> Vec where F: Fn(usize, &[T]) -> R + Send + Sync, - R: crate::MaybeSync + Send + 'static, + R: Send + 'static, { let slice = self.as_ref(); let chunk_size = core::cmp::max( @@ -141,7 +139,7 @@ pub trait ParallelSliceMut: AsMut<[T]> { fn par_chunk_map_mut(&mut self, task_pool: &TaskPool, chunk_size: usize, f: F) -> Vec where F: Fn(usize, &mut [T]) -> R + Send + Sync, - R: crate::MaybeSync + Send + 'static, + R: Send + 'static, { let slice = self.as_mut(); let f = &f; @@ -196,7 +194,7 @@ pub trait ParallelSliceMut: AsMut<[T]> { ) -> Vec where F: Fn(usize, &mut [T]) -> R + Send + Sync, - R: crate::MaybeSync + Send + 
'static, + R: Send + 'static, { let mut slice = self.as_mut(); let chunk_size = core::cmp::max( diff --git a/crates/bevy_tasks/src/task.rs b/crates/bevy_tasks/src/task.rs index e1c252bea1388..53292c7574f44 100644 --- a/crates/bevy_tasks/src/task.rs +++ b/crates/bevy_tasks/src/task.rs @@ -4,7 +4,7 @@ use core::{ task::{Context, Poll}, }; -/// Wraps `async_task::Task`, a spawned future. +/// Wraps `async_executor::Task`, a spawned future. /// /// Tasks are also futures themselves and yield the output of the spawned future. /// @@ -14,16 +14,16 @@ use core::{ /// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic. #[derive(Debug)] #[must_use = "Tasks are canceled when dropped, use `.detach()` to run them in the background."] -pub struct Task(async_task::Task); +pub struct Task(async_executor::Task); impl Task { - /// Creates a new task from a given `async_task::Task` - pub fn new(task: async_task::Task) -> Self { + /// Creates a new task from a given `async_executor::Task` + pub fn new(task: async_executor::Task) -> Self { Self(task) } /// Detaches the task to let it keep running in the background. See - /// `async_task::Task::detach` + /// `async_executor::Task::detach` pub fn detach(self) { self.0.detach(); } @@ -36,7 +36,7 @@ impl Task { /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of /// canceling because it also waits for the task to stop running. 
/// - /// See `async_task::Task::cancel` + /// See `async_executor::Task::cancel` pub async fn cancel(self) -> Option { self.0.cancel().await } diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index b882ecc03038f..9fab3fbbe8317 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -2,19 +2,25 @@ use alloc::sync::Arc; use core::{future::Future, marker::PhantomData, mem, panic::AssertUnwindSafe}; use std::thread::{self, JoinHandle}; -use async_task::FallibleTask; +use async_executor::FallibleTask; use concurrent_queue::ConcurrentQueue; use futures_lite::FutureExt; -use crate::executor::{Executor, LocalExecutor}; - use crate::{ block_on, thread_executor::{ThreadExecutor, ThreadExecutorTicker}, Task, }; -use bevy_utils::OnDrop; +struct CallOnDrop(Option>); + +impl Drop for CallOnDrop { + fn drop(&mut self) { + if let Some(call) = self.0.as_ref() { + call(); + } + } +} /// Used to create a [`TaskPool`] #[derive(Default)] @@ -96,7 +102,7 @@ impl TaskPoolBuilder { #[derive(Debug)] pub struct TaskPool { /// The executor for the pool. - executor: Arc>, + executor: Arc>, // The inner state of the pool. threads: Vec>, @@ -105,7 +111,7 @@ pub struct TaskPool { impl TaskPool { thread_local! 
{ - static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() }; + static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() }; static THREAD_EXECUTOR: Arc> = Arc::new(ThreadExecutor::new()); } @@ -122,7 +128,7 @@ impl TaskPool { fn new_internal(builder: TaskPoolBuilder) -> Self { let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>(); - let executor = Arc::new(Executor::new()); + let executor = Arc::new(async_executor::Executor::new()); let num_threads = builder .num_threads @@ -154,11 +160,7 @@ impl TaskPool { on_thread_spawn(); drop(on_thread_spawn); } - let _destructor = OnDrop::new(move || { - if let Some(f) = on_thread_destroy { - (f)(); - } - }); + let _destructor = CallOnDrop(on_thread_destroy); loop { let res = std::panic::catch_unwind(|| { let tick_forever = async move { @@ -342,9 +344,9 @@ impl TaskPool { // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. // Any usages of the references passed into `Scope` must be accessed through // the transmuted reference for the rest of this function. 
- let executor: &Executor = &self.executor; + let executor: &async_executor::Executor = &self.executor; // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let executor: &'env Executor = unsafe { mem::transmute(executor) }; + let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) }; // SAFETY: As above, all futures must complete in this function so we can change the lifetime let external_executor: &'env ThreadExecutor<'env> = unsafe { mem::transmute(external_executor) }; @@ -430,7 +432,7 @@ impl TaskPool { #[inline] async fn execute_global_external_scope<'scope, 'ticker, T>( - executor: &'scope Executor<'scope>, + executor: &'scope async_executor::Executor<'scope>, external_ticker: ThreadExecutorTicker<'scope, 'ticker>, scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, get_results: impl Future>, @@ -476,7 +478,7 @@ impl TaskPool { #[inline] async fn execute_global_scope<'scope, 'ticker, T>( - executor: &'scope Executor<'scope>, + executor: &'scope async_executor::Executor<'scope>, scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, get_results: impl Future>, ) -> Vec { @@ -560,7 +562,7 @@ impl TaskPool { /// ``` pub fn with_local_executor(&self, f: F) -> R where - F: FnOnce(&LocalExecutor) -> R, + F: FnOnce(&async_executor::LocalExecutor) -> R, { Self::LOCAL_EXECUTOR.with(f) } @@ -591,7 +593,7 @@ impl Drop for TaskPool { /// For more information, see [`TaskPool::scope`]. 
#[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'scope Executor<'scope>, + executor: &'scope async_executor::Executor<'scope>, external_executor: &'scope ThreadExecutor<'scope>, scope_executor: &'scope ThreadExecutor<'scope>, spawned: &'scope ConcurrentQueue>>>, diff --git a/crates/bevy_tasks/src/thread_executor.rs b/crates/bevy_tasks/src/thread_executor.rs index c29940cbd5fa8..b25811b559341 100644 --- a/crates/bevy_tasks/src/thread_executor.rs +++ b/crates/bevy_tasks/src/thread_executor.rs @@ -1,11 +1,9 @@ use core::marker::PhantomData; use std::thread::{self, ThreadId}; -use async_task::Task; +use async_executor::{Executor, Task}; use futures_lite::Future; -use crate::executor::Executor; - /// An executor that can only be ticked on the thread it was instantiated on. But /// can spawn `Send` tasks from other threads. /// diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index 5be44f3b9e690..b260274a0fb11 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -1,20 +1,11 @@ use super::TaskPool; use core::ops::Deref; - -#[cfg(feature = "std")] use std::sync::OnceLock; -#[cfg(not(feature = "std"))] -use spin::Once; - macro_rules! taskpool { ($(#[$attr:meta])* ($static:ident, $type:ident)) => { - #[cfg(feature = "std")] static $static: OnceLock<$type> = OnceLock::new(); - #[cfg(not(feature = "std"))] - static $static: Once<$type> = Once::new(); - $(#[$attr])* #[derive(Debug)] pub struct $type(TaskPool); @@ -22,15 +13,7 @@ macro_rules! 
taskpool { impl $type { #[doc = concat!(" Gets the global [`", stringify!($type), "`] instance, or initializes it with `f`.")] pub fn get_or_init(f: impl FnOnce() -> TaskPool) -> &'static Self { - #[cfg(feature = "std")] - { - $static.get_or_init(|| Self(f())) - } - - #[cfg(not(feature = "std"))] - { - $static.call_once(|| Self(f())) - } + $static.get_or_init(|| Self(f())) } #[doc = concat!(" Attempts to get the global [`", stringify!($type), "`] instance, \ From f2160bafc37453cedd578df49ee21b13106e082f Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Wed, 4 Dec 2024 16:52:38 +1100 Subject: [PATCH 05/10] Use `edge-executor` to offer `no_std` support in `bevy_tasks` --- crates/bevy_tasks/Cargo.toml | 24 +++- crates/bevy_tasks/src/executor.rs | 59 ++++++++ crates/bevy_tasks/src/iter/mod.rs | 1 + crates/bevy_tasks/src/lib.rs | 22 ++- .../src/single_threaded_task_pool.rs | 130 +++++++++++++----- crates/bevy_tasks/src/slice.rs | 1 + crates/bevy_tasks/src/task.rs | 4 +- crates/bevy_tasks/src/task_pool.rs | 20 +-- crates/bevy_tasks/src/thread_executor.rs | 2 +- crates/bevy_tasks/src/usages.rs | 19 ++- tools/ci/src/commands/compile_check_no_std.rs | 8 ++ 11 files changed, 237 insertions(+), 53 deletions(-) create mode 100644 crates/bevy_tasks/src/executor.rs diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index e915cd941f57f..41fe75643abc9 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -9,11 +9,29 @@ license = "MIT OR Apache-2.0" keywords = ["bevy"] [features] -multi_threaded = ["dep:async-channel", "dep:concurrent-queue"] +default = ["std", "async_executor"] +std = ["futures-lite/std", "async-task/std", "spin/std", "edge-executor?/std"] +multi_threaded = ["std", "dep:async-channel", "dep:concurrent-queue"] +async_executor = ["std", "dep:async-executor"] +edge_executor = ["dep:edge-executor"] +critical-section = ["edge-executor/critical-section"] +portable-atomic = ["edge-executor/portable-atomic"] [dependencies] 
-futures-lite = "2.0.1" -async-executor = "1.11" +futures-lite = { version = "2.0.1", default-features = false, features = ["alloc"] } +async-task = { version = "4.4.0", default-features = false } +spin = { version = "0.9.8", default-features = false, features = [ + "spin_mutex", + "rwlock", + "once", +] } +derive_more = { version = "1", default-features = false, features = [ + "deref", + "deref_mut", +] } + +async-executor = { version = "1.11", optional = true } +edge-executor = { version = "0.4.1", default-features = false, optional = true } async-channel = { version = "2.3.0", optional = true } async-io = { version = "2.0.0", optional = true } concurrent-queue = { version = "2.0.0", optional = true } diff --git a/crates/bevy_tasks/src/executor.rs b/crates/bevy_tasks/src/executor.rs new file mode 100644 index 0000000000000..00f54fb942808 --- /dev/null +++ b/crates/bevy_tasks/src/executor.rs @@ -0,0 +1,59 @@ +pub use async_task::Task; +use core::{ + fmt, + panic::{RefUnwindSafe, UnwindSafe}, +}; +use derive_more::{Deref, DerefMut}; + +#[cfg(feature = "multi_threaded")] +pub use async_task::FallibleTask; + +#[cfg(feature = "async_executor")] +type ExecutorInner<'a> = async_executor::Executor<'a>; + +#[cfg(feature = "async_executor")] +type LocalExecutorInner<'a> = async_executor::LocalExecutor<'a>; + +#[cfg(all(not(feature = "async_executor"), feature = "edge_executor"))] +type ExecutorInner<'a> = edge_executor::Executor<'a, 64>; + +#[cfg(all(not(feature = "async_executor"), feature = "edge_executor"))] +type LocalExecutorInner<'a> = edge_executor::LocalExecutor<'a, 64>; + +#[derive(Deref, DerefMut, Default)] +pub struct Executor<'a>(ExecutorInner<'a>); + +#[derive(Deref, DerefMut, Default)] +pub struct LocalExecutor<'a>(LocalExecutorInner<'a>); + +impl Executor<'_> { + #[allow(dead_code, reason = "not all feature flags require this function")] + pub const fn new() -> Self { + Self(ExecutorInner::new()) + } +} + +impl LocalExecutor<'_> { + #[allow(dead_code, reason 
= "not all feature flags require this function")] + pub const fn new() -> Self { + Self(LocalExecutorInner::new()) + } +} + +impl UnwindSafe for Executor<'_> {} +impl RefUnwindSafe for Executor<'_> {} + +impl UnwindSafe for LocalExecutor<'_> {} +impl RefUnwindSafe for LocalExecutor<'_> {} + +impl fmt::Debug for Executor<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Executor").finish() + } +} + +impl fmt::Debug for LocalExecutor<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("LocalExecutor").finish() + } +} diff --git a/crates/bevy_tasks/src/iter/mod.rs b/crates/bevy_tasks/src/iter/mod.rs index 3910166904856..4462fa95abd22 100644 --- a/crates/bevy_tasks/src/iter/mod.rs +++ b/crates/bevy_tasks/src/iter/mod.rs @@ -1,4 +1,5 @@ use crate::TaskPool; +use alloc::vec::Vec; mod adapters; pub use adapters::*; diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 1d6d35664ed0e..4daeab9bbb583 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -4,9 +4,12 @@ html_logo_url = "https://bevyengine.org/assets/icon.png", html_favicon_url = "https://bevyengine.org/assets/icon.png" )] +#![cfg_attr(not(feature = "std"), no_std)] extern crate alloc; +mod executor; + mod slice; pub use slice::{ParallelSlice, ParallelSliceMut}; @@ -37,9 +40,9 @@ mod thread_executor; #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] pub use thread_executor::{ThreadExecutor, ThreadExecutorTicker}; -#[cfg(feature = "async-io")] +#[cfg(all(feature = "async-io", feature = "std"))] pub use async_io::block_on; -#[cfg(not(feature = "async-io"))] +#[cfg(all(not(feature = "async-io"), feature = "std"))] pub use futures_lite::future::block_on; pub use futures_lite::future::poll_once; @@ -54,13 +57,17 @@ pub use futures_lite; pub mod prelude { #[doc(hidden)] pub use crate::{ - block_on, iter::ParallelIterator, slice::{ParallelSlice, ParallelSliceMut}, 
usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}, }; + + #[cfg(feature = "std")] + #[doc(hidden)] + pub use crate::block_on; } +#[cfg(feature = "std")] use core::num::NonZero; /// Gets the logical CPU core count available to the current process. @@ -70,7 +77,12 @@ use core::num::NonZero; /// /// This will always return at least 1. pub fn available_parallelism() -> usize { - std::thread::available_parallelism() + #[cfg(feature = "std")] + return std::thread::available_parallelism() .map(NonZero::::get) - .unwrap_or(1) + .unwrap_or(1); + + // Without access to std, assume a single thread is available + #[cfg(not(feature = "std"))] + return 1; } diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 054d22260eac4..71d03725ebe32 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -1,12 +1,28 @@ -use alloc::{rc::Rc, sync::Arc}; +use alloc::{string::String, sync::Arc, vec::Vec}; use core::{cell::RefCell, future::Future, marker::PhantomData, mem}; use crate::Task; +#[cfg(feature = "std")] +use crate::executor::LocalExecutor; + +#[cfg(not(feature = "std"))] +use crate::executor::Executor as LocalExecutor; + +#[cfg(feature = "std")] thread_local! { - static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() }; + static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() }; } +#[cfg(not(feature = "std"))] +static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() }; + +#[cfg(feature = "std")] +type ScopeResult = alloc::rc::Rc>>; + +#[cfg(not(feature = "std"))] +type ScopeResult = Arc>>; + /// Used to create a [`TaskPool`]. 
#[derive(Debug, Default, Clone)] pub struct TaskPoolBuilder {} @@ -124,15 +140,13 @@ impl TaskPool { // Any usages of the references passed into `Scope` must be accessed through // the transmuted reference for the rest of this function. - let executor = &async_executor::LocalExecutor::new(); + let executor = &LocalExecutor::new(); // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let executor: &'env async_executor::LocalExecutor<'env> = - unsafe { mem::transmute(executor) }; + let executor: &'env LocalExecutor<'env> = unsafe { mem::transmute(executor) }; - let results: RefCell>>>> = RefCell::new(Vec::new()); + let results: RefCell>> = RefCell::new(Vec::new()); // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let results: &'env RefCell>>>> = - unsafe { mem::transmute(&results) }; + let results: &'env RefCell>> = unsafe { mem::transmute(&results) }; let mut scope = Scope { executor, @@ -152,7 +166,16 @@ impl TaskPool { let results = scope.results.borrow(); results .iter() - .map(|result| result.borrow_mut().take().unwrap()) + .map(|result| { + #[cfg(feature = "std")] + return result.borrow_mut().take().unwrap(); + + #[cfg(not(feature = "std"))] + { + let mut lock = result.lock(); + return lock.take().unwrap(); + } + }) .collect() } @@ -162,29 +185,42 @@ impl TaskPool { /// end-user. /// /// If the provided future is non-`Send`, [`TaskPool::spawn_local`] should be used instead. 
- pub fn spawn(&self, future: impl Future + 'static) -> Task + pub fn spawn( + &self, + future: impl Future + 'static + MaybeSend + MaybeSync, + ) -> Task where - T: 'static, + T: 'static + MaybeSend + MaybeSync, { - #[cfg(target_arch = "wasm32")] + #[cfg(all(target_arch = "wasm32", feature = "std"))] return Task::wrap_future(future); - #[cfg(not(target_arch = "wasm32"))] - { - LOCAL_EXECUTOR.with(|executor| { - let task = executor.spawn(future); - // Loop until all tasks are done - while executor.try_tick() {} + #[cfg(all(not(target_arch = "wasm32"), feature = "std"))] + return LOCAL_EXECUTOR.with(|executor| { + let task = executor.spawn(future); + // Loop until all tasks are done + while executor.try_tick() {} - Task::new(task) - }) - } + Task::new(task) + }); + + #[cfg(not(feature = "std"))] + return { + let task = LOCAL_EXECUTOR.spawn(future); + // Loop until all tasks are done + while LOCAL_EXECUTOR.try_tick() {} + + Task::new(task) + }; } /// Spawns a static future on the JS event loop. This is exactly the same as [`TaskPool::spawn`]. - pub fn spawn_local(&self, future: impl Future + 'static) -> Task + pub fn spawn_local( + &self, + future: impl Future + 'static + MaybeSend + MaybeSync, + ) -> Task where - T: 'static, + T: 'static + MaybeSend + MaybeSync, { self.spawn(future) } @@ -202,9 +238,13 @@ impl TaskPool { /// ``` pub fn with_local_executor(&self, f: F) -> R where - F: FnOnce(&async_executor::LocalExecutor) -> R, + F: FnOnce(&LocalExecutor) -> R, { - LOCAL_EXECUTOR.with(f) + #[cfg(feature = "std")] + return LOCAL_EXECUTOR.with(f); + + #[cfg(not(feature = "std"))] + return f(&LOCAL_EXECUTOR); } } @@ -213,9 +253,9 @@ impl TaskPool { /// For more information, see [`TaskPool::scope`]. 
#[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'scope async_executor::LocalExecutor<'scope>, + executor: &'scope LocalExecutor<'scope>, // Vector to gather results of all futures spawned during scope run - results: &'env RefCell>>>>, + results: &'env RefCell>>, // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, @@ -230,7 +270,7 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn + 'scope>(&self, f: Fut) { + pub fn spawn + 'scope + MaybeSend>(&self, f: Fut) { self.spawn_on_scope(f); } @@ -241,7 +281,7 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_on_external + 'scope>(&self, f: Fut) { + pub fn spawn_on_external + 'scope + MaybeSend>(&self, f: Fut) { self.spawn_on_scope(f); } @@ -250,13 +290,41 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// returned as a part of [`TaskPool::scope`]'s return value. /// /// For more information, see [`TaskPool::scope`]. 
- pub fn spawn_on_scope + 'scope>(&self, f: Fut) { - let result = Rc::new(RefCell::new(None)); + pub fn spawn_on_scope + 'scope + MaybeSend>(&self, f: Fut) { + let result = ScopeResult::::default(); self.results.borrow_mut().push(result.clone()); let f = async move { let temp_result = f.await; + + #[cfg(feature = "std")] result.borrow_mut().replace(temp_result); + + #[cfg(not(feature = "std"))] + { + let mut lock = result.lock(); + *lock = Some(temp_result); + } }; self.executor.spawn(f).detach(); } } + +#[cfg(feature = "std")] +mod send_sync_bounds { + pub trait MaybeSend {} + impl MaybeSend for T {} + + pub trait MaybeSync {} + impl MaybeSync for T {} +} + +#[cfg(not(feature = "std"))] +mod send_sync_bounds { + pub trait MaybeSend: Send {} + impl MaybeSend for T {} + + pub trait MaybeSync: Sync {} + impl MaybeSync for T {} +} + +use send_sync_bounds::{MaybeSend, MaybeSync}; diff --git a/crates/bevy_tasks/src/slice.rs b/crates/bevy_tasks/src/slice.rs index a8a87c9ce80a0..5f964a4561778 100644 --- a/crates/bevy_tasks/src/slice.rs +++ b/crates/bevy_tasks/src/slice.rs @@ -1,4 +1,5 @@ use super::TaskPool; +use alloc::vec::Vec; /// Provides functions for mapping read-only slices across a provided [`TaskPool`]. pub trait ParallelSlice: AsRef<[T]> { diff --git a/crates/bevy_tasks/src/task.rs b/crates/bevy_tasks/src/task.rs index 53292c7574f44..cf5095408b0f3 100644 --- a/crates/bevy_tasks/src/task.rs +++ b/crates/bevy_tasks/src/task.rs @@ -14,11 +14,11 @@ use core::{ /// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic. 
#[derive(Debug)] #[must_use = "Tasks are canceled when dropped, use `.detach()` to run them in the background."] -pub struct Task(async_executor::Task); +pub struct Task(crate::executor::Task); impl Task { /// Creates a new task from a given `async_executor::Task` - pub fn new(task: async_executor::Task) -> Self { + pub fn new(task: crate::executor::Task) -> Self { Self(task) } diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 9fab3fbbe8317..6a4e000864232 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -2,7 +2,7 @@ use alloc::sync::Arc; use core::{future::Future, marker::PhantomData, mem, panic::AssertUnwindSafe}; use std::thread::{self, JoinHandle}; -use async_executor::FallibleTask; +use crate::executor::FallibleTask; use concurrent_queue::ConcurrentQueue; use futures_lite::FutureExt; @@ -102,7 +102,7 @@ impl TaskPoolBuilder { #[derive(Debug)] pub struct TaskPool { /// The executor for the pool. - executor: Arc>, + executor: Arc>, // The inner state of the pool. threads: Vec>, @@ -111,7 +111,7 @@ pub struct TaskPool { impl TaskPool { thread_local! { - static LOCAL_EXECUTOR: async_executor::LocalExecutor<'static> = const { async_executor::LocalExecutor::new() }; + static LOCAL_EXECUTOR: crate::executor::LocalExecutor<'static> = const { crate::executor::LocalExecutor::new() }; static THREAD_EXECUTOR: Arc> = Arc::new(ThreadExecutor::new()); } @@ -128,7 +128,7 @@ impl TaskPool { fn new_internal(builder: TaskPoolBuilder) -> Self { let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>(); - let executor = Arc::new(async_executor::Executor::new()); + let executor = Arc::new(crate::executor::Executor::new()); let num_threads = builder .num_threads @@ -344,9 +344,9 @@ impl TaskPool { // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. 
// Any usages of the references passed into `Scope` must be accessed through // the transmuted reference for the rest of this function. - let executor: &async_executor::Executor = &self.executor; + let executor: &crate::executor::Executor = &self.executor; // SAFETY: As above, all futures must complete in this function so we can change the lifetime - let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) }; + let executor: &'env crate::executor::Executor = unsafe { mem::transmute(executor) }; // SAFETY: As above, all futures must complete in this function so we can change the lifetime let external_executor: &'env ThreadExecutor<'env> = unsafe { mem::transmute(external_executor) }; @@ -432,7 +432,7 @@ impl TaskPool { #[inline] async fn execute_global_external_scope<'scope, 'ticker, T>( - executor: &'scope async_executor::Executor<'scope>, + executor: &'scope crate::executor::Executor<'scope>, external_ticker: ThreadExecutorTicker<'scope, 'ticker>, scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, get_results: impl Future>, @@ -478,7 +478,7 @@ impl TaskPool { #[inline] async fn execute_global_scope<'scope, 'ticker, T>( - executor: &'scope async_executor::Executor<'scope>, + executor: &'scope crate::executor::Executor<'scope>, scope_ticker: ThreadExecutorTicker<'scope, 'ticker>, get_results: impl Future>, ) -> Vec { @@ -562,7 +562,7 @@ impl TaskPool { /// ``` pub fn with_local_executor(&self, f: F) -> R where - F: FnOnce(&async_executor::LocalExecutor) -> R, + F: FnOnce(&crate::executor::LocalExecutor) -> R, { Self::LOCAL_EXECUTOR.with(f) } @@ -593,7 +593,7 @@ impl Drop for TaskPool { /// For more information, see [`TaskPool::scope`]. 
#[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'scope async_executor::Executor<'scope>, + executor: &'scope crate::executor::Executor<'scope>, external_executor: &'scope ThreadExecutor<'scope>, scope_executor: &'scope ThreadExecutor<'scope>, spawned: &'scope ConcurrentQueue>>>, diff --git a/crates/bevy_tasks/src/thread_executor.rs b/crates/bevy_tasks/src/thread_executor.rs index b25811b559341..0f8a9c3be9038 100644 --- a/crates/bevy_tasks/src/thread_executor.rs +++ b/crates/bevy_tasks/src/thread_executor.rs @@ -1,7 +1,7 @@ use core::marker::PhantomData; use std::thread::{self, ThreadId}; -use async_executor::{Executor, Task}; +use crate::executor::{Executor, Task}; use futures_lite::Future; /// An executor that can only be ticked on the thread it was instantiated on. But diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index b260274a0fb11..5be44f3b9e690 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -1,11 +1,20 @@ use super::TaskPool; use core::ops::Deref; + +#[cfg(feature = "std")] use std::sync::OnceLock; +#[cfg(not(feature = "std"))] +use spin::Once; + macro_rules! taskpool { ($(#[$attr:meta])* ($static:ident, $type:ident)) => { + #[cfg(feature = "std")] static $static: OnceLock<$type> = OnceLock::new(); + #[cfg(not(feature = "std"))] + static $static: Once<$type> = Once::new(); + $(#[$attr])* #[derive(Debug)] pub struct $type(TaskPool); @@ -13,7 +22,15 @@ macro_rules! 
taskpool { impl $type { #[doc = concat!(" Gets the global [`", stringify!($type), "`] instance, or initializes it with `f`.")] pub fn get_or_init(f: impl FnOnce() -> TaskPool) -> &'static Self { - $static.get_or_init(|| Self(f())) + #[cfg(feature = "std")] + { + $static.get_or_init(|| Self(f())) + } + + #[cfg(not(feature = "std"))] + { + $static.call_once(|| Self(f())) + } } #[doc = concat!(" Attempts to get the global [`", stringify!($type), "`] instance, \ diff --git a/tools/ci/src/commands/compile_check_no_std.rs b/tools/ci/src/commands/compile_check_no_std.rs index 82ba3d4688eb4..bc51bd7159361 100644 --- a/tools/ci/src/commands/compile_check_no_std.rs +++ b/tools/ci/src/commands/compile_check_no_std.rs @@ -70,6 +70,14 @@ impl Prepare for CompileCheckNoStdCommand { "Please fix compiler errors in output above for bevy_math no_std compatibility.", )); + commands.push(PreparedCommand::new::( + cmd!( + sh, + "cargo check -p bevy_tasks --no-default-features --features edge_executor --features critical-section --target {target}" + ), + "Please fix compiler errors in output above for bevy_tasks no_std compatibility.", + )); + commands } } From 1d74b7560f24c50e9636af12736a34cec3be796c Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Wed, 4 Dec 2024 16:55:13 +1100 Subject: [PATCH 06/10] Taplo! 
--- crates/bevy_tasks/Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 41fe75643abc9..55e9bb8aa0cc0 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -18,7 +18,9 @@ critical-section = ["edge-executor/critical-section"] portable-atomic = ["edge-executor/portable-atomic"] [dependencies] -futures-lite = { version = "2.0.1", default-features = false, features = ["alloc"] } +futures-lite = { version = "2.0.1", default-features = false, features = [ + "alloc", +] } async-task = { version = "4.4.0", default-features = false } spin = { version = "0.9.8", default-features = false, features = [ "spin_mutex", From f54d306cf6b4548c3e16c134c5e9f9fa823c8aeb Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Thu, 5 Dec 2024 07:13:28 +1100 Subject: [PATCH 07/10] Feedback from Reviewers Properly implement `portable-atomic` and `critical-section` features to enable compilation on `thumbv6m-none-eabi` (as an example platform) Co-Authored-By: Mike <2180432+hymm@users.noreply.github.com> --- crates/bevy_tasks/Cargo.toml | 29 +++++++++++++++++-- crates/bevy_tasks/README.md | 4 +++ .../src/single_threaded_task_pool.rs | 8 ++++- tools/ci/src/commands/compile_check_no_std.rs | 2 +- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 55e9bb8aa0cc0..00ec38e4005c2 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -10,12 +10,28 @@ keywords = ["bevy"] [features] default = ["std", "async_executor"] -std = ["futures-lite/std", "async-task/std", "spin/std", "edge-executor?/std"] +std = [ + "futures-lite/std", + "async-task/std", + "spin/std", + "edge-executor?/std", + "portable-atomic-util?/std", +] multi_threaded = ["std", "dep:async-channel", "dep:concurrent-queue"] async_executor = ["std", "dep:async-executor"] edge_executor = ["dep:edge-executor"] -critical-section 
= ["edge-executor/critical-section"] -portable-atomic = ["edge-executor/portable-atomic"] +critical-section = [ + "dep:critical-section", + "edge-executor?/critical-section", + "portable-atomic?/critical-section", +] +portable-atomic = [ + "dep:portable-atomic", + "dep:portable-atomic-util", + "edge-executor?/portable-atomic", + "async-task/portable-atomic", + "spin/portable_atomic", +] [dependencies] futures-lite = { version = "2.0.1", default-features = false, features = [ @@ -37,6 +53,13 @@ edge-executor = { version = "0.4.1", default-features = false, optional = true } async-channel = { version = "2.3.0", optional = true } async-io = { version = "2.0.0", optional = true } concurrent-queue = { version = "2.0.0", optional = true } +critical-section = { version = "1.2.0", optional = true } +portable-atomic = { version = "1", default-features = false, features = [ + "fallback", +], optional = true } +portable-atomic-util = { version = "0.2.4", features = [ + "alloc", +], optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/crates/bevy_tasks/README.md b/crates/bevy_tasks/README.md index 91ac95dce8a27..2af6a606f65fa 100644 --- a/crates/bevy_tasks/README.md +++ b/crates/bevy_tasks/README.md @@ -34,6 +34,10 @@ The determining factor for what kind of work should go in each pool is latency r await receiving data from somewhere (i.e. disk) and signal other systems when the data is ready for consumption. (likely via channels) +## `no_std` Support + +To enable `no_std` support in this crate, you will need to disable default features, and enable the `edge_executor` and `critical-section` features. For platforms without full support for Rust atomics, you may also need to enable the `portable-atomic` feature. 
+ [bevy]: https://bevyengine.org [rayon]: https://github.com/rayon-rs/rayon [async-executor]: https://github.com/stjepang/async-executor diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 71d03725ebe32..84b21db90690d 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -1,8 +1,14 @@ -use alloc::{string::String, sync::Arc, vec::Vec}; +use alloc::{string::String, vec::Vec}; use core::{cell::RefCell, future::Future, marker::PhantomData, mem}; use crate::Task; +#[cfg(feature = "portable-atomic")] +use portable_atomic_util::Arc; + +#[cfg(not(feature = "portable-atomic"))] +use alloc::sync::Arc; + #[cfg(feature = "std")] use crate::executor::LocalExecutor; diff --git a/tools/ci/src/commands/compile_check_no_std.rs b/tools/ci/src/commands/compile_check_no_std.rs index bc51bd7159361..3c95c7836e59e 100644 --- a/tools/ci/src/commands/compile_check_no_std.rs +++ b/tools/ci/src/commands/compile_check_no_std.rs @@ -73,7 +73,7 @@ impl Prepare for CompileCheckNoStdCommand { commands.push(PreparedCommand::new::( cmd!( sh, - "cargo check -p bevy_tasks --no-default-features --features edge_executor --features critical-section --target {target}" + "cargo check -p bevy_tasks --no-default-features --features edge_executor,critical-section --target {target}" ), "Please fix compiler errors in output above for bevy_tasks no_std compatibility.", )); From b125707f9101f090f8399cdabee6aac6996e83f5 Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Fri, 6 Dec 2024 11:05:08 +1100 Subject: [PATCH 08/10] Remove duplicate entry in `check-compile-no-std` --- tools/ci/src/commands/compile_check_no_std.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/tools/ci/src/commands/compile_check_no_std.rs b/tools/ci/src/commands/compile_check_no_std.rs index c5d47f1b03062..6e057ba4d2050 100644 --- a/tools/ci/src/commands/compile_check_no_std.rs +++ 
b/tools/ci/src/commands/compile_check_no_std.rs @@ -62,14 +62,6 @@ impl Prepare for CompileCheckNoStdCommand { "Please fix compiler errors in output above for bevy_mikktspace no_std compatibility.", )); - commands.push(PreparedCommand::new::( - cmd!( - sh, - "cargo check -p bevy_mikktspace --no-default-features --features libm --target {target}" - ), - "Please fix compiler errors in output above for bevy_mikktspace no_std compatibility.", - )); - commands.push(PreparedCommand::new::( cmd!( sh, From b1c3b750c1c545ef0137c9dd1d3f816f63445860 Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Fri, 6 Dec 2024 12:58:45 +1100 Subject: [PATCH 09/10] Added documentation to the remaining public items in `bevy_tasks` Co-Authored-By: Alice Cecile --- crates/bevy_tasks/src/executor.rs | 25 +++++++++++++++++++ crates/bevy_tasks/src/iter/adapters.rs | 13 ++++++++++ crates/bevy_tasks/src/lib.rs | 11 +++++--- .../src/single_threaded_task_pool.rs | 2 +- 4 files changed, 47 insertions(+), 4 deletions(-) diff --git a/crates/bevy_tasks/src/executor.rs b/crates/bevy_tasks/src/executor.rs index 00f54fb942808..04667c1b16d59 100644 --- a/crates/bevy_tasks/src/executor.rs +++ b/crates/bevy_tasks/src/executor.rs @@ -1,3 +1,13 @@ +//! Provides a fundamental executor primitive appropriate for the target platform +//! and feature set selected. +//! By default, the `async_executor` feature will be enabled, which will rely on +//! [`async-executor`] for the underlying implementation. This requires `std`, +//! so is not suitable for `no_std` contexts. Instead, you must use `edge_executor`, +//! which relies on the alternate [`edge-executor`] backend. +//! +//! [`async-executor`]: https://crates.io/crates/async-executor +//! 
[`edge-executor`]: https://crates.io/crates/edge-executor + pub use async_task::Task; use core::{ fmt, @@ -20,13 +30,27 @@ type ExecutorInner<'a> = edge_executor::Executor<'a, 64>; #[cfg(all(not(feature = "async_executor"), feature = "edge_executor"))] type LocalExecutorInner<'a> = edge_executor::LocalExecutor<'a, 64>; +/// Wrapper around a multi-threading-aware async executor. +/// Spawning will generally require tasks to be `Send` and `Sync` to allow multiple +/// threads to send/receive/advance tasks. +/// +/// If you require an executor _without_ the `Send` and `Sync` requirements, consider +/// using [`LocalExecutor`] instead. #[derive(Deref, DerefMut, Default)] pub struct Executor<'a>(ExecutorInner<'a>); +/// Wrapper around a single-threaded async executor. +/// Spawning won't generally require tasks to be `Send` and `Sync`, at the cost of +/// this executor itself not being `Send` or `Sync`. This makes it unsuitable for +/// global statics. +/// +/// If you need to store an executor in a global static, or send across threads, +/// consider using [`Executor`] instead. #[derive(Deref, DerefMut, Default)] pub struct LocalExecutor<'a>(LocalExecutorInner<'a>); impl Executor<'_> { + /// Construct a new [`Executor`] #[allow(dead_code, reason = "not all feature flags require this function")] pub const fn new() -> Self { Self(ExecutorInner::new()) @@ -34,6 +58,7 @@ impl Executor<'_> { } impl LocalExecutor<'_> { + /// Construct a new [`LocalExecutor`] #[allow(dead_code, reason = "not all feature flags require this function")] pub const fn new() -> Self { Self(LocalExecutorInner::new()) diff --git a/crates/bevy_tasks/src/iter/adapters.rs b/crates/bevy_tasks/src/iter/adapters.rs index 617f5bdf868ca..a3166120790c9 100644 --- a/crates/bevy_tasks/src/iter/adapters.rs +++ b/crates/bevy_tasks/src/iter/adapters.rs @@ -1,5 +1,7 @@ use crate::iter::ParallelIterator; +/// Chains two [`ParallelIterator`]s `T` and `U`, first returning +/// batches from `T`, and then from `U`.
#[derive(Debug)] pub struct Chain<T, U> { pub(crate) left: T, @@ -24,6 +26,7 @@ where } } +/// Maps a [`ParallelIterator`] `P` using the provided function `F`. #[derive(Debug)] pub struct Map<P, F> { pub(crate) iter: P, @@ -41,6 +44,7 @@ where } } +/// Filters a [`ParallelIterator`] `P` using the provided predicate `F`. #[derive(Debug)] pub struct Filter<P, F> { pub(crate) iter: P, @@ -60,6 +64,7 @@ where } } +/// Filter-maps a [`ParallelIterator`] `P` using the provided function `F`. #[derive(Debug)] pub struct FilterMap<P, F> { pub(crate) iter: P, @@ -77,6 +82,7 @@ where } } +/// Flat-maps a [`ParallelIterator`] `P` using the provided function `F`. #[derive(Debug)] pub struct FlatMap<P, F> { pub(crate) iter: P, @@ -98,6 +104,7 @@ where } } +/// Flattens a [`ParallelIterator`] `P`. #[derive(Debug)] pub struct Flatten<P> { pub(crate) iter: P, @@ -117,6 +124,8 @@ where } } +/// Fuses a [`ParallelIterator`] `P`, ensuring that once it returns [`None`], it always +/// returns [`None`]. #[derive(Debug)] pub struct Fuse<P> { pub(crate) iter: Option<P>, @@ -138,6 +147,7 @@ where } } +/// Inspects a [`ParallelIterator`] `P` using the provided function `F`. #[derive(Debug)] pub struct Inspect<P, F> { pub(crate) iter: P, @@ -155,6 +165,7 @@ where } } +/// Copies a [`ParallelIterator`] `P`'s returned values. #[derive(Debug)] pub struct Copied<P> { pub(crate) iter: P, @@ -171,6 +182,7 @@ where } } +/// Clones a [`ParallelIterator`] `P`'s returned values. #[derive(Debug)] pub struct Cloned<P> { pub(crate) iter: P, @@ -187,6 +199,7 @@ where } } +/// Cycles a [`ParallelIterator`] `P` indefinitely. #[derive(Debug)] pub struct Cycle<P> { pub(crate) iter: P, diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 4daeab9bbb583..123e557c83beb 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -76,13 +76,18 @@ use core::num::NonZero; /// it will return a default value of 1 if it internally errors out. /// /// This will always return at least 1. +#[cfg(feature = "std")] pub fn available_parallelism() -> usize { - #[cfg(feature = "std")] return std::thread::available_parallelism() .map(NonZero::<usize>::get) .unwrap_or(1); +} +/// Gets the logical CPU core count available to the current process. +/// +/// This will always return at least 1. +#[cfg(not(feature = "std"))] +pub fn available_parallelism() -> usize { // Without access to std, assume a single thread is available - #[cfg(not(feature = "std"))] - return 1; + 1 } diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 84b21db90690d..51adc739c1f8c 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -179,7 +179,7 @@ impl TaskPool { #[cfg(not(feature = "std"))] { let mut lock = result.lock(); - return lock.take().unwrap(); + lock.take().unwrap() } }) .collect() From 7ed55db55805d421ec2b6651f0efd620ccd7b6c1 Mon Sep 17 00:00:00 2001 From: Zac Harrold Date: Fri, 6 Dec 2024 13:01:41 +1100 Subject: [PATCH 10/10] Clippy! --- crates/bevy_tasks/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 123e557c83beb..3f3db301bbb00 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -78,9 +78,9 @@ use core::num::NonZero; /// This will always return at least 1.
#[cfg(feature = "std")] pub fn available_parallelism() -> usize { - return std::thread::available_parallelism() + std::thread::available_parallelism() .map(NonZero::<usize>::get) - .unwrap_or(1); + .unwrap_or(1) } /// Gets the logical CPU core count available to the current process.