Skip to content

Commit

Permalink
refactor: Extract parallel queue abstraction (#7348)
Browse files Browse the repository at this point in the history
# Objective
There's a repeating pattern of `ThreadLocal<Cell<Vec<T>>>` which is very
useful for low overhead, low contention multithreaded queues that have
cropped up in a few places in the engine. This pattern is surprisingly
useful when building deferred mutation across multiple threads, as noted
by its use in `ParallelCommands`.

However, `ThreadLocal<Cell<Vec<T>>>` is not only a mouthful, it's also
hard to ensure the thread-local queue is replaced after it's been
temporarily removed from the `Cell`.

## Solution
Wrap the pattern into `bevy_utils::Parallel<T>` which codifies the
entire pattern and ensures the user follows the contract. Instead of
fetching individual cells, removing the value, mutating it, and replacing
it, `Parallel::get` returns a `ParRef<'a, T>` which contains the
temporarily removed value and a reference back to the cell, and will
write the mutated value back to the cell upon being dropped.

I would like to use this to simplify the remaining part of #4899 that
has not been adopted/merged.

---

## Changelog
TODO

---------

Co-authored-by: Joseph <[email protected]>
  • Loading branch information
james7132 and JoJoJet authored Feb 19, 2024
1 parent 5f1dd39 commit e34fb68
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 28 deletions.
1 change: 0 additions & 1 deletion crates/bevy_ecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ bevy_utils = { path = "../bevy_utils", version = "0.13.0" }
bevy_ecs_macros = { path = "macros", version = "0.13.0" }

async-channel = "2.1.0"
thread_local = "1.1.4"
fixedbitset = "0.4.2"
rustc-hash = "1.1"
downcast-rs = "1.2"
Expand Down
25 changes: 8 additions & 17 deletions crates/bevy_ecs/src/system/commands/parallel_scope.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::cell::Cell;

use thread_local::ThreadLocal;
use bevy_utils::Parallel;

use crate::{
self as bevy_ecs,
Expand All @@ -13,7 +11,7 @@ use super::{CommandQueue, Commands};

#[derive(Default)]
struct ParallelCommandQueue {
thread_local_storage: ThreadLocal<Cell<CommandQueue>>,
thread_queues: Parallel<CommandQueue>,
}

/// An alternative to [`Commands`] that can be used in parallel contexts, such as those in [`Query::par_iter`](crate::system::Query::par_iter)
Expand Down Expand Up @@ -53,8 +51,8 @@ impl SystemBuffer for ParallelCommandQueue {
fn apply(&mut self, _system_meta: &SystemMeta, world: &mut World) {
#[cfg(feature = "trace")]
let _system_span = _system_meta.commands_span.enter();
for cq in &mut self.thread_local_storage {
cq.get_mut().apply(world);
for cq in self.thread_queues.iter_mut() {
cq.apply(world);
}
}
}
Expand All @@ -64,16 +62,9 @@ impl<'w, 's> ParallelCommands<'w, 's> {
///
/// For an example, see the type-level documentation for [`ParallelCommands`].
pub fn command_scope<R>(&self, f: impl FnOnce(Commands) -> R) -> R {
let store = &self.state.thread_local_storage;
let command_queue_cell = store.get_or_default();
let mut command_queue = command_queue_cell.take();

let r = f(Commands::new_from_entities(
&mut command_queue,
self.entities,
));

command_queue_cell.set(command_queue);
r
self.state.thread_queues.scope(|queue| {
let commands = Commands::new_from_entities(queue, self.entities);
f(commands)
})
}
}
17 changes: 7 additions & 10 deletions crates/bevy_render/src/view/visibility/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use bevy_ecs::prelude::*;
use bevy_hierarchy::{Children, Parent};
use bevy_reflect::{std_traits::ReflectDefault, Reflect};
use bevy_transform::{components::GlobalTransform, TransformSystem};
use std::cell::Cell;
use thread_local::ThreadLocal;
use bevy_utils::Parallel;

use crate::deterministic::DeterministicRenderingConfig;
use crate::{
Expand Down Expand Up @@ -372,7 +371,7 @@ fn reset_view_visibility(mut query: Query<&mut ViewVisibility>) {
/// [`ViewVisibility`] of all entities, and for each view also compute the [`VisibleEntities`]
/// for that view.
pub fn check_visibility(
mut thread_queues: Local<ThreadLocal<Cell<Vec<Entity>>>>,
mut thread_queues: Local<Parallel<Vec<Entity>>>,
mut view_query: Query<(
&mut VisibleEntities,
&Frustum,
Expand Down Expand Up @@ -440,15 +439,13 @@ pub fn check_visibility(
}

view_visibility.set();
let cell = thread_queues.get_or_default();
let mut queue = cell.take();
queue.push(entity);
cell.set(queue);
thread_queues.scope(|queue| {
queue.push(entity);
});
});

for cell in &mut thread_queues {
visible_entities.entities.append(cell.get_mut());
}
visible_entities.entities.clear();
thread_queues.drain_into(&mut visible_entities.entities);
if deterministic_rendering_config.stable_sort_z_fighting {
// We can use the faster unstable sort here because
// the values (`Entity`) are guaranteed to be unique.
Expand Down
1 change: 1 addition & 0 deletions crates/bevy_utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ hashbrown = { version = "0.14", features = ["serde"] }
bevy_utils_proc_macros = { version = "0.13.0", path = "macros" }
petgraph = "0.6"
thiserror = "1.0"
thread_local = "1.0"
nonmax = "0.5"
smallvec = { version = "1.11", features = ["serde", "union", "const_generics"] }

Expand Down
2 changes: 2 additions & 0 deletions crates/bevy_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod default;
mod float_ord;
pub mod intern;
mod once;
mod parallel_queue;

pub use crate::uuid::Uuid;
pub use ahash::{AHasher, RandomState};
Expand All @@ -30,6 +31,7 @@ pub use cow_arc::*;
pub use default::default;
pub use float_ord::*;
pub use hashbrown;
pub use parallel_queue::*;
pub use petgraph;
pub use smallvec;
pub use thiserror;
Expand Down
72 changes: 72 additions & 0 deletions crates/bevy_utils/src/parallel_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use core::cell::Cell;
use thread_local::ThreadLocal;

/// A cohesive set of thread-local values of a given type.
///
/// Mutable references can be fetched if `T: Default` via [`Parallel::scope`].
#[derive(Default)]
pub struct Parallel<T: Send> {
locals: ThreadLocal<Cell<T>>,
}

impl<T: Send> Parallel<T> {
/// Gets a mutable iterator over all of the per-thread queues.
pub fn iter_mut(&mut self) -> impl Iterator<Item = &'_ mut T> {
self.locals.iter_mut().map(|cell| cell.get_mut())
}

/// Clears all of the stored thread local values.
pub fn clear(&mut self) {
self.locals.clear();
}
}

impl<T: Default + Send> Parallel<T> {
    /// Runs `f` with mutable access to the current thread's value and returns its result.
    ///
    /// If there is no thread-local value yet, it is initialized to its default first.
    ///
    /// While `f` runs, the value is moved out of its cell and a default value is left in
    /// its place; the (possibly mutated) value is stored back once `f` returns. A nested
    /// call to `scope` on the same thread therefore observes a default value, and whatever
    /// it stores is overwritten when the outer call finishes.
    pub fn scope<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
        let slot = self.locals.get_or_default();
        // Move the value out so `f` can borrow it mutably without borrowing the cell.
        let mut local = slot.replace(T::default());
        let result = f(&mut local);
        slot.set(local);
        result
    }
}

impl<T, I> Parallel<I>
where
    I: IntoIterator<Item = T> + Default + Send + 'static,
{
    /// Returns an iterator that drains the enqueued items of every thread.
    ///
    /// Unlike [`Vec::drain`], items are removed one per-thread collection at a time: each
    /// collection is taken out of its cell (leaving a default value behind) only when the
    /// iterator reaches it. If iteration stops part way, the remaining items of the
    /// collection currently being iterated are dropped, while collections that were not
    /// reached yet keep their items.
    ///
    /// The ordering is not guaranteed.
    ///
    /// The type parameter `B` is not used by the implementation, but it is part of the
    /// signature and so has to be named explicitly by the caller.
    pub fn drain<B>(&mut self) -> impl Iterator<Item = T> + '_
    where
        B: FromIterator<T>,
    {
        let cells = self.locals.iter_mut();
        // Swap each collection for a default one lazily, as the iterator advances.
        cells.flat_map(|local| core::mem::take(local.get_mut()))
    }
}

impl<T: Send> Parallel<Vec<T>> {
    /// Moves all enqueued items from every thread's queue onto the end of `out`,
    /// leaving each per-thread queue empty.
    ///
    /// The combined length of all queues is reserved on `out` up front, before any
    /// items are appended.
    ///
    /// The ordering is not guaranteed.
    pub fn drain_into(&mut self, out: &mut Vec<T>) {
        // First pass: count the pending items so `out` can reserve space once.
        let mut total: usize = 0;
        for local in self.locals.iter_mut() {
            total += local.get_mut().len();
        }
        out.reserve(total);

        // Second pass: move the items over.
        self.locals
            .iter_mut()
            .for_each(|local| out.append(local.get_mut()));
    }
}

0 comments on commit e34fb68

Please sign in to comment.