From 421c299c6e01786ddc368296eb9ef9df6c47a22d Mon Sep 17 00:00:00 2001 From: Chris Russell <8494645+chescock@users.noreply.github.com> Date: Thu, 15 Feb 2024 14:27:45 -0500 Subject: [PATCH 01/11] Run the multi-threaded executor at the end of each system task. This allows it to run immediately instead of needing to wait for the main thread to wake up. Move the mutable executor state into a separate struct and wrap it in a mutex so it can be shared among the worker threads. --- crates/bevy_ecs/Cargo.toml | 2 +- .../src/schedule/executor/multi_threaded.rs | 427 +++++++++--------- .../src/single_threaded_task_pool.rs | 8 +- 3 files changed, 224 insertions(+), 213 deletions(-) diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index e368c1398963e..1e8f5fe1fe033 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -22,7 +22,7 @@ bevy_tasks = { path = "../bevy_tasks", version = "0.14.0-dev" } bevy_utils = { path = "../bevy_utils", version = "0.14.0-dev" } bevy_ecs_macros = { path = "macros", version = "0.14.0-dev" } -async-channel = "2.2.0" +concurrent-queue = "2.4.0" fixedbitset = "0.4.2" rustc-hash = "1.1" downcast-rs = "1.2" diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index 1e00c3893053e..1bc4e541160bb 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -3,14 +3,14 @@ use std::{ sync::{Arc, Mutex}, }; -use bevy_tasks::{block_on, poll_once, ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; +use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; use bevy_utils::default; use bevy_utils::syncunsafecell::SyncUnsafeCell; #[cfg(feature = "trace")] -use bevy_utils::tracing::{info_span, Instrument, Span}; +use bevy_utils::tracing::{info_span, Span}; use std::panic::AssertUnwindSafe; -use async_channel::{Receiver, Sender}; +use concurrent_queue::ConcurrentQueue; use fixedbitset::FixedBitSet; use crate::{ @@ -31,8 +31,8 @@ struct SyncUnsafeSchedule<'a> { } struct Conditions<'a> { - system_conditions: &'a mut [Vec], - set_conditions: &'a mut [Vec], + system_conditions: &'a SyncUnsafeCell<[Vec]>, + set_conditions: &'a SyncUnsafeCell<[Vec]>, sets_with_conditions_of_systems: &'a [FixedBitSet], systems_in_sets_with_conditions: &'a [FixedBitSet], } @@ -42,8 +42,10 @@ impl SyncUnsafeSchedule<'_> { SyncUnsafeSchedule { systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(), conditions: Conditions { - system_conditions: &mut schedule.system_conditions, - set_conditions: &mut schedule.set_conditions, + system_conditions: SyncUnsafeCell::from_mut( + schedule.system_conditions.as_mut_slice(), + ), + set_conditions: SyncUnsafeCell::from_mut(schedule.set_conditions.as_mut_slice()), sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems, systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions, }, @@ -75,10 +77,16 @@ struct SystemResult { /// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel. pub struct MultiThreadedExecutor { - /// Sends system completion events. - sender: Sender, - /// Receives system completion events. - receiver: Receiver, + /// The running state, protected by a mutex so that the . + state: Mutex, + /// Queue of system completion events. + system_completion: ConcurrentQueue, + /// When set, tells the executor that a thread has panicked. + panic_payload: Mutex>>, +} + +/// The state of the executor while running. +pub struct ExecutorState { /// Metadata for scheduling and running system tasks. system_task_metadata: Vec, /// Union of the accesses of all currently running systems. @@ -111,12 +119,23 @@ pub struct MultiThreadedExecutor { unapplied_systems: FixedBitSet, /// Setting when true applies deferred system buffers after all systems have run apply_final_deferred: bool, - /// When set, tells the executor that a thread has panicked. - panic_payload: Arc>>>, /// When set, stops the executor from running any more systems. stop_spawning: bool, } +/// References to data required by the executor. +/// This is copied to each system task so that can invoke the executor when they complete. +// These all need to outlive 'scope in order to be sent to new tasks, +// and keeping them all in a struct means we can use lifetime elision. +#[derive(Copy, Clone)] +pub struct Context<'scope, 'env: 'scope> { + executor: &'env MultiThreadedExecutor, + scope: &'scope Scope<'scope, 'env, ()>, + systems: &'env [SyncUnsafeCell], + conditions: &'env Conditions<'env>, + world_cell: UnsafeWorldCell<'env>, +} + impl Default for MultiThreadedExecutor { fn default() -> Self { Self::new() @@ -129,25 +148,23 @@ impl SystemExecutor for MultiThreadedExecutor { } fn init(&mut self, schedule: &SystemSchedule) { + let state = self.state.get_mut().unwrap(); // pre-allocate space let sys_count = schedule.system_ids.len(); let set_count = schedule.set_ids.len(); - let (tx, rx) = async_channel::bounded(sys_count.max(1)); + self.system_completion = ConcurrentQueue::bounded(sys_count.max(1)); + state.evaluated_sets = FixedBitSet::with_capacity(set_count); + state.ready_systems = FixedBitSet::with_capacity(sys_count); + state.ready_systems_copy = FixedBitSet::with_capacity(sys_count); + state.running_systems = FixedBitSet::with_capacity(sys_count); + state.completed_systems = FixedBitSet::with_capacity(sys_count); + state.skipped_systems = FixedBitSet::with_capacity(sys_count); + state.unapplied_systems = FixedBitSet::with_capacity(sys_count); - self.sender = tx; - self.receiver = rx; - self.evaluated_sets = FixedBitSet::with_capacity(set_count); - self.ready_systems = FixedBitSet::with_capacity(sys_count); - self.ready_systems_copy = FixedBitSet::with_capacity(sys_count); - self.running_systems = FixedBitSet::with_capacity(sys_count); - self.completed_systems = FixedBitSet::with_capacity(sys_count); - self.skipped_systems = FixedBitSet::with_capacity(sys_count); - self.unapplied_systems = FixedBitSet::with_capacity(sys_count); - - self.system_task_metadata = Vec::with_capacity(sys_count); + state.system_task_metadata = Vec::with_capacity(sys_count); for index in 0..sys_count { - self.system_task_metadata.push(SystemTaskMetadata { + state.system_task_metadata.push(SystemTaskMetadata { archetype_component_access: default(), dependents: schedule.system_dependents[index].clone(), is_send: schedule.systems[index].is_send(), @@ -160,7 +177,7 @@ impl SystemExecutor for MultiThreadedExecutor { }); } - self.num_dependencies_remaining = Vec::with_capacity(sys_count); + state.num_dependencies_remaining = Vec::with_capacity(sys_count); } fn run( @@ -169,20 +186,23 @@ impl SystemExecutor for MultiThreadedExecutor { world: &mut World, _skip_systems: Option<&FixedBitSet>, ) { + let state = self.state.get_mut().unwrap(); // reset counts - self.num_systems = schedule.systems.len(); - if self.num_systems == 0 { + state.num_systems = schedule.systems.len(); + if state.num_systems == 0 { return; } - self.num_running_systems = 0; - self.num_completed_systems = 0; - self.num_dependencies_remaining.clear(); - self.num_dependencies_remaining + state.num_running_systems = 0; + state.num_completed_systems = 0; + state.num_dependencies_remaining.clear(); + state + .num_dependencies_remaining .extend_from_slice(&schedule.system_dependencies); - for (system_index, dependencies) in self.num_dependencies_remaining.iter_mut().enumerate() { + for (system_index, dependencies) in state.num_dependencies_remaining.iter_mut().enumerate() + { if *dependencies == 0 { - self.ready_systems.insert(system_index); + state.ready_systems.insert(system_index); } } @@ -190,16 +210,16 @@ impl SystemExecutor for MultiThreadedExecutor { // not be run. #[cfg(feature = "bevy_debug_stepping")] if let Some(skipped_systems) = _skip_systems { - debug_assert_eq!(skipped_systems.len(), self.completed_systems.len()); + debug_assert_eq!(skipped_systems.len(), state.completed_systems.len()); // mark skipped systems as completed - self.completed_systems |= skipped_systems; - self.num_completed_systems = self.completed_systems.count_ones(..); + state.completed_systems |= skipped_systems; + state.num_completed_systems = state.completed_systems.count_ones(..); // signal the dependencies for each of the skipped systems, as // though they had run for system_index in skipped_systems.ones() { - self.signal_dependents(system_index); - self.ready_systems.set(system_index, false); + state.signal_dependents(system_index); + state.ready_systems.set(system_index, false); } } @@ -210,67 +230,38 @@ impl SystemExecutor for MultiThreadedExecutor { let SyncUnsafeSchedule { systems, - mut conditions, + conditions, } = SyncUnsafeSchedule::new(schedule); ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor( false, thread_executor, |scope| { - // the executor itself is a `Send` future so that it can run - // alongside systems that claim the local thread - #[allow(unused_mut)] - let mut executor = Box::pin(async { - let world_cell = world.as_unsafe_world_cell(); - while self.num_completed_systems < self.num_systems { - // SAFETY: - // - self.ready_systems does not contain running systems. - // - `world_cell` has mutable access to the entire world. - unsafe { - self.spawn_system_tasks(scope, systems, &mut conditions, world_cell); - } - - if self.num_running_systems > 0 { - // wait for systems to complete - if let Ok(result) = self.receiver.recv().await { - self.finish_system_and_handle_dependents(result); - } else { - panic!("Channel closed unexpectedly!"); - } - - while let Ok(result) = self.receiver.try_recv() { - self.finish_system_and_handle_dependents(result); - } - - self.rebuild_active_access(); - } - } - }); - - #[cfg(feature = "trace")] - let executor_span = info_span!("multithreaded executor"); - #[cfg(feature = "trace")] - let mut executor = executor.instrument(executor_span); - - // Immediately poll the task once to avoid the overhead of the executor - // and thread wake-up. Only spawn the task if the executor does not immediately - // terminate. - if block_on(poll_once(&mut executor)).is_none() { - scope.spawn(executor); - } + let context = Context { + executor: self, + scope, + systems, + conditions: &conditions, + world_cell: world.as_unsafe_world_cell(), + }; + + // The first tick won't need to process finished systems, but we still need to run the loop in + // tick_executor() in case a system completes while the first tick still holds the mutex. + context.tick_executor(); }, ); - if self.apply_final_deferred { + let state = self.state.get_mut().unwrap(); + if state.apply_final_deferred { // Do one final apply buffers after all systems have completed // Commands should be applied while on the scope's thread, not the executor's thread - let res = apply_deferred(&self.unapplied_systems, systems, world); + let res = apply_deferred(&state.unapplied_systems, systems, world); if let Err(payload) = res { let mut panic_payload = self.panic_payload.lock().unwrap(); *panic_payload = Some(payload); } - self.unapplied_systems.clear(); - debug_assert!(self.unapplied_systems.is_clear()); + state.unapplied_systems.clear(); + debug_assert!(state.unapplied_systems.is_clear()); } // check to see if there was a panic @@ -279,16 +270,64 @@ impl SystemExecutor for MultiThreadedExecutor { std::panic::resume_unwind(payload); } - debug_assert!(self.ready_systems.is_clear()); - debug_assert!(self.running_systems.is_clear()); - self.active_access.clear(); - self.evaluated_sets.clear(); - self.skipped_systems.clear(); - self.completed_systems.clear(); + debug_assert!(state.ready_systems.is_clear()); + debug_assert!(state.running_systems.is_clear()); + state.active_access.clear(); + state.evaluated_sets.clear(); + state.skipped_systems.clear(); + state.completed_systems.clear(); } fn set_apply_final_deferred(&mut self, value: bool) { - self.apply_final_deferred = value; + self.state.get_mut().unwrap().apply_final_deferred = value; + } +} + +impl<'scope, 'env: 'scope> Context<'scope, 'env> { + fn system_completed( + &self, + system_index: usize, + res: Result<(), Box>, + system: &BoxedSystem, + ) { + // tell the executor that the system finished + self.executor + .system_completion + .push(SystemResult { + system_index, + success: res.is_ok(), + }) + .unwrap_or_else(|error| unreachable!("{}", error)); + if let Err(payload) = res { + eprintln!("Encountered a panic in system `{}`!", &*system.name()); + // set the payload to propagate the error + { + let mut panic_payload = self.executor.panic_payload.lock().unwrap(); + *panic_payload = Some(payload); + } + } + self.tick_executor(); + } + + fn tick_executor(&self) { + #[cfg(feature = "trace")] + let _span = info_span!("multithreaded executor").entered(); + // Ensure that the executor handles any events pushed to the system_completion queue by this thread. + // If this thread acquires the lock, the exector runs after the push() and they are processed. + // If this thread does not acquire the lock, then the is_empty() check on the other thread runs + // after the lock is released, which is after try_lock() failed, which is after the push() + // on this thread, so the is_empty() check will see the new events and loop. + loop { + let Ok(mut guard) = self.executor.state.try_lock() else { + return; + }; + guard.tick(self); + // Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events. + drop(guard); + if self.executor.system_completion.is_empty() { + return; + } + } } } @@ -297,10 +336,17 @@ impl MultiThreadedExecutor { /// /// [`Schedule`]: crate::schedule::Schedule pub fn new() -> Self { - let (sender, receiver) = async_channel::unbounded(); Self { - sender, - receiver, + state: Mutex::new(ExecutorState::new()), + system_completion: ConcurrentQueue::unbounded(), + panic_payload: Mutex::new(None), + } + } +} + +impl ExecutorState { + fn new() -> Self { + Self { system_task_metadata: Vec::new(), num_systems: 0, num_running_systems: 0, @@ -317,23 +363,31 @@ impl MultiThreadedExecutor { completed_systems: FixedBitSet::new(), unapplied_systems: FixedBitSet::new(), apply_final_deferred: true, - panic_payload: Arc::new(Mutex::new(None)), stop_spawning: false, } } + fn tick(&mut self, context: &Context<'_, '_>) { + for result in context.executor.system_completion.try_iter() { + self.finish_system_and_handle_dependents(result); + } + + self.rebuild_active_access(); + + // SAFETY: + // - self.ready_systems does not contain running systems. + // - `world_cell` has mutable access to the entire world. + unsafe { + self.spawn_system_tasks(context); + } + } + /// # Safety /// - Caller must ensure that `self.ready_systems` does not contain any systems that /// have been mutably borrowed (such as the systems currently running). /// - `world_cell` must have permission to access all world data (not counting /// any world data that is claimed by systems currently running on this executor). - unsafe fn spawn_system_tasks<'scope>( - &mut self, - scope: &Scope<'_, 'scope, ()>, - systems: &'scope [SyncUnsafeCell], - conditions: &mut Conditions, - world_cell: UnsafeWorldCell<'scope>, - ) { + unsafe fn spawn_system_tasks(&mut self, context: &Context<'_, '_>) { if self.exclusive_running { return; } @@ -347,9 +401,9 @@ impl MultiThreadedExecutor { assert!(!self.running_systems.contains(system_index)); // SAFETY: Caller assured that these systems are not running. // Therefore, no other reference to this system exists and there is no aliasing. - let system = unsafe { &mut *systems[system_index].get() }; + let system = unsafe { &mut *context.systems[system_index].get() }; - if !self.can_run(system_index, system, conditions, world_cell) { + if !self.can_run(system_index, system, context.conditions, context.world_cell) { // NOTE: exclusive systems with ambiguities are susceptible to // being significantly displaced here (compared to single-threaded order) // if systems after them in topological order can run @@ -362,7 +416,9 @@ impl MultiThreadedExecutor { // SAFETY: `can_run` returned true, which means that: // - It must have called `update_archetype_component_access` for each run condition. // - There can be no systems running whose accesses would conflict with any conditions. - if unsafe { !self.should_run(system_index, system, conditions, world_cell) } { + if unsafe { + !self.should_run(system_index, system, context.conditions, context.world_cell) + } { self.skip_system_and_signal_dependents(system_index); continue; } @@ -371,13 +427,10 @@ impl MultiThreadedExecutor { self.num_running_systems += 1; if self.system_task_metadata[system_index].is_exclusive { - // SAFETY: `can_run` returned true for this system, which means - // that no other systems currently have access to the world. - let world = unsafe { world_cell.world_mut() }; // SAFETY: `can_run` returned true for this system, // which means no systems are currently borrowed. unsafe { - self.spawn_exclusive_system_task(scope, system_index, systems, world); + self.spawn_exclusive_system_task(context, system_index); } break; } @@ -387,7 +440,7 @@ impl MultiThreadedExecutor { // - `can_run` has been called, which calls `update_archetype_component_access` with this system. // - `can_run` returned true, so no systems with conflicting world access are running. unsafe { - self.spawn_system_task(scope, system_index, systems, world_cell); + self.spawn_system_task(context, system_index); } } @@ -399,7 +452,7 @@ impl MultiThreadedExecutor { &mut self, system_index: usize, system: &mut BoxedSystem, - conditions: &mut Conditions, + conditions: &Conditions, world: UnsafeWorldCell, ) -> bool { let system_meta = &self.system_task_metadata[system_index]; @@ -415,7 +468,9 @@ impl MultiThreadedExecutor { for set_idx in conditions.sets_with_conditions_of_systems[system_index] .difference(&self.evaluated_sets) { - for condition in &mut conditions.set_conditions[set_idx] { + // SAFETY: We have exclusive access to ExecutorState, so no other thread is touching these conditions + let set_conditions = unsafe { &mut *conditions.set_conditions.get() }; + for condition in &mut set_conditions[set_idx] { condition.update_archetype_component_access(world); if !condition .archetype_component_access() @@ -426,7 +481,9 @@ impl MultiThreadedExecutor { } } - for condition in &mut conditions.system_conditions[system_index] { + // SAFETY: We have exclusive access to ExecutorState, so no other thread is touching these conditions + let system_conditions = unsafe { &mut *conditions.system_conditions.get() }; + for condition in &mut system_conditions[system_index] { condition.update_archetype_component_access(world); if !condition .archetype_component_access() @@ -465,7 +522,7 @@ impl MultiThreadedExecutor { &mut self, system_index: usize, _system: &BoxedSystem, - conditions: &mut Conditions, + conditions: &Conditions, world: UnsafeWorldCell, ) -> bool { let mut should_run = !self.skipped_systems.contains(system_index); @@ -474,14 +531,15 @@ impl MultiThreadedExecutor { continue; } + // SAFETY: We have exclusive access to ExecutorState, so no other thread is touching these conditions + let set_conditions = unsafe { &mut *conditions.set_conditions.get() }; // Evaluate the system set's conditions. // SAFETY: // - The caller ensures that `world` has permission to read any data // required by the conditions. // - `update_archetype_component_access` has been called for each run condition. - let set_conditions_met = unsafe { - evaluate_and_fold_conditions(&mut conditions.set_conditions[set_idx], world) - }; + let set_conditions_met = + unsafe { evaluate_and_fold_conditions(&mut set_conditions[set_idx], world) }; if !set_conditions_met { self.skipped_systems @@ -492,14 +550,15 @@ impl MultiThreadedExecutor { self.evaluated_sets.insert(set_idx); } + // SAFETY: We have exclusive access to ExecutorState, so no other thread is touching these conditions + let system_conditions = unsafe { &mut *conditions.system_conditions.get() }; // Evaluate the system's conditions. // SAFETY: // - The caller ensures that `world` has permission to read any data // required by the conditions. // - `update_archetype_component_access` has been called for each run condition. - let system_conditions_met = unsafe { - evaluate_and_fold_conditions(&mut conditions.system_conditions[system_index], world) - }; + let system_conditions_met = + unsafe { evaluate_and_fold_conditions(&mut system_conditions[system_index], world) }; if !system_conditions_met { self.skipped_systems.insert(system_index); @@ -516,132 +575,84 @@ impl MultiThreadedExecutor { /// used by the specified system. /// - `update_archetype_component_access` must have been called with `world` /// on the system associated with `system_index`. - unsafe fn spawn_system_task<'scope>( - &mut self, - scope: &Scope<'_, 'scope, ()>, - system_index: usize, - systems: &'scope [SyncUnsafeCell], - world: UnsafeWorldCell<'scope>, - ) { + unsafe fn spawn_system_task(&mut self, context: &Context<'_, '_>, system_index: usize) { // SAFETY: this system is not running, no other reference exists - let system = unsafe { &mut *systems[system_index].get() }; - let sender = self.sender.clone(); - let panic_payload = self.panic_payload.clone(); + let system = unsafe { &mut *context.systems[system_index].get() }; + // Move the full context object into the new future. + let context = *context; + + let system_meta = &self.system_task_metadata[system_index]; + + #[cfg(feature = "trace")] + let system_span = system_meta.system_task_span.clone(); let task = async move { let res = std::panic::catch_unwind(AssertUnwindSafe(|| { + #[cfg(feature = "trace")] + let _span = system_span.enter(); // SAFETY: // - The caller ensures that we have permission to // access the world data used by the system. // - `update_archetype_component_access` has been called. - unsafe { system.run_unsafe((), world) }; + unsafe { system.run_unsafe((), context.world_cell) }; })); - // tell the executor that the system finished - sender - .try_send(SystemResult { - system_index, - success: res.is_ok(), - }) - .unwrap_or_else(|error| unreachable!("{}", error)); - if let Err(payload) = res { - eprintln!("Encountered a panic in system `{}`!", &*system.name()); - // set the payload to propagate the error - { - let mut panic_payload = panic_payload.lock().unwrap(); - *panic_payload = Some(payload); - } - } + context.system_completed(system_index, res, system); }; - #[cfg(feature = "trace")] - let task = task.instrument( - self.system_task_metadata[system_index] - .system_task_span - .clone(), - ); - - let system_meta = &self.system_task_metadata[system_index]; self.active_access .extend(&system_meta.archetype_component_access); if system_meta.is_send { - scope.spawn(task); + context.scope.spawn(task); } else { self.local_thread_running = true; - scope.spawn_on_external(task); + context.scope.spawn_on_external(task); } } /// # Safety /// Caller must ensure no systems are currently borrowed. - unsafe fn spawn_exclusive_system_task<'scope>( + unsafe fn spawn_exclusive_system_task( &mut self, - scope: &Scope<'_, 'scope, ()>, + context: &Context<'_, '_>, system_index: usize, - systems: &'scope [SyncUnsafeCell], - world: &'scope mut World, ) { + // SAFETY: `can_run` returned true for this system, which means + // that no other systems currently have access to the world. + let world = unsafe { context.world_cell.world_mut() }; // SAFETY: this system is not running, no other reference exists - let system = unsafe { &mut *systems[system_index].get() }; + let system = unsafe { &mut *context.systems[system_index].get() }; + // Move the full context object into the new future. + let context = *context; + #[cfg(feature = "trace")] + let system_span = self.system_task_metadata[system_index] + .system_task_span + .clone(); - let sender = self.sender.clone(); - let panic_payload = self.panic_payload.clone(); if is_apply_deferred(system) { // TODO: avoid allocation let unapplied_systems = self.unapplied_systems.clone(); self.unapplied_systems.clear(); let task = async move { - let res = apply_deferred(&unapplied_systems, systems, world); - // tell the executor that the system finished - sender - .try_send(SystemResult { - system_index, - success: res.is_ok(), - }) - .unwrap_or_else(|error| unreachable!("{}", error)); - if let Err(payload) = res { - // set the payload to propagate the error - let mut panic_payload = panic_payload.lock().unwrap(); - *panic_payload = Some(payload); - } + let res = { + #[cfg(feature = "trace")] + let _span = system_span.enter(); + apply_deferred(&unapplied_systems, context.systems, world) + }; + context.system_completed(system_index, res, system); }; - #[cfg(feature = "trace")] - let task = task.instrument( - self.system_task_metadata[system_index] - .system_task_span - .clone(), - ); - scope.spawn_on_scope(task); + context.scope.spawn_on_scope(task); } else { let task = async move { let res = std::panic::catch_unwind(AssertUnwindSafe(|| { + #[cfg(feature = "trace")] + let _span = system_span.enter(); system.run((), world); })); - // tell the executor that the system finished - sender - .try_send(SystemResult { - system_index, - success: res.is_ok(), - }) - .unwrap_or_else(|error| unreachable!("{}", error)); - if let Err(payload) = res { - eprintln!( - "Encountered a panic in exclusive system `{}`!", - &*system.name() - ); - // set the payload to propagate the error - let mut panic_payload = panic_payload.lock().unwrap(); - *panic_payload = Some(payload); - } + context.system_completed(system_index, res, system); }; - #[cfg(feature = "trace")] - let task = task.instrument( - self.system_task_metadata[system_index] - .system_task_span - .clone(), - ); - scope.spawn_on_scope(task); + context.scope.spawn_on_scope(task); } self.exclusive_running = true; diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 1be0b0da174e3..f3837c4766fae 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -202,7 +202,7 @@ impl FakeTask { /// For more information, see [`TaskPool::scope`]. #[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'env async_executor::LocalExecutor<'env>, + executor: &'scope async_executor::LocalExecutor<'scope>, // Vector to gather results of all futures spawned during scope run results: &'env RefCell>>>>, @@ -219,7 +219,7 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn + 'env>(&self, f: Fut) { + pub fn spawn + 'scope>(&self, f: Fut) { self.spawn_on_scope(f); } @@ -230,7 +230,7 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_on_external + 'env>(&self, f: Fut) { + pub fn spawn_on_external + 'scope>(&self, f: Fut) { self.spawn_on_scope(f); } @@ -239,7 +239,7 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// returned as a part of [`TaskPool::scope`]'s return value. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_on_scope + 'env>(&self, f: Fut) { + pub fn spawn_on_scope + 'scope>(&self, f: Fut) { let result = Rc::new(RefCell::new(None)); self.results.borrow_mut().push(result.clone()); let f = async move { From 7fb531d47c05a3ff8ecfb7e79b8a8b91b851e64d Mon Sep 17 00:00:00 2001 From: Chris Russell <8494645+chescock@users.noreply.github.com> Date: Fri, 16 Feb 2024 14:35:09 -0500 Subject: [PATCH 02/11] If systems are skipped, immediately check whether their dependencies are runnable. --- .../src/schedule/executor/multi_threaded.rs | 126 ++++++++++++------ 1 file changed, 85 insertions(+), 41 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index 1bc4e541160bb..a67be9e3a953d 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -394,53 +394,64 @@ impl ExecutorState { // can't borrow since loop mutably borrows `self` let mut ready_systems = std::mem::take(&mut self.ready_systems_copy); - ready_systems.clear(); - ready_systems.union_with(&self.ready_systems); - - for system_index in ready_systems.ones() { - assert!(!self.running_systems.contains(system_index)); - // SAFETY: Caller assured that these systems are not running. - // Therefore, no other reference to this system exists and there is no aliasing. - let system = unsafe { &mut *context.systems[system_index].get() }; - - if !self.can_run(system_index, system, context.conditions, context.world_cell) { - // NOTE: exclusive systems with ambiguities are susceptible to - // being significantly displaced here (compared to single-threaded order) - // if systems after them in topological order can run - // if that becomes an issue, `break;` if exclusive system - continue; - } - self.ready_systems.set(system_index, false); + // Skipping systems may cause their dependents to become ready immediately. + // If that happens, we need to run again immediately or we may fail to spawn those dependents. + let mut check_for_new_ready_systems = true; + while check_for_new_ready_systems { + check_for_new_ready_systems = false; + + ready_systems.clear(); + ready_systems.union_with(&self.ready_systems); + + for system_index in ready_systems.ones() { + assert!(!self.running_systems.contains(system_index)); + // SAFETY: Caller assured that these systems are not running. + // Therefore, no other reference to this system exists and there is no aliasing. + let system = unsafe { &mut *context.systems[system_index].get() }; + + if !self.can_run(system_index, system, context.conditions, context.world_cell) { + // NOTE: exclusive systems with ambiguities are susceptible to + // being significantly displaced here (compared to single-threaded order) + // if systems after them in topological order can run + // if that becomes an issue, `break;` if exclusive system + continue; + } - // SAFETY: `can_run` returned true, which means that: - // - It must have called `update_archetype_component_access` for each run condition. - // - There can be no systems running whose accesses would conflict with any conditions. - if unsafe { - !self.should_run(system_index, system, context.conditions, context.world_cell) - } { - self.skip_system_and_signal_dependents(system_index); - continue; - } + self.ready_systems.set(system_index, false); + + // SAFETY: `can_run` returned true, which means that: + // - It must have called `update_archetype_component_access` for each run condition. + // - There can be no systems running whose accesses would conflict with any conditions. + if unsafe { + !self.should_run(system_index, system, context.conditions, context.world_cell) + } { + self.skip_system_and_signal_dependents(system_index); + // signal_dependents may have set more systems to ready. + check_for_new_ready_systems = true; + continue; + } - self.running_systems.insert(system_index); - self.num_running_systems += 1; + self.running_systems.insert(system_index); + self.num_running_systems += 1; + + if self.system_task_metadata[system_index].is_exclusive { + // SAFETY: `can_run` returned true for this system, + // which means no systems are currently borrowed. + unsafe { + self.spawn_exclusive_system_task(context, system_index); + } + check_for_new_ready_systems = false; + break; + } - if self.system_task_metadata[system_index].is_exclusive { - // SAFETY: `can_run` returned true for this system, - // which means no systems are currently borrowed. + // SAFETY: + // - No other reference to this system exists. + // - `can_run` has been called, which calls `update_archetype_component_access` with this system. + // - `can_run` returned true, so no systems with conflicting world access are running. unsafe { - self.spawn_exclusive_system_task(context, system_index); + self.spawn_system_task(context, system_index); } - break; - } - - // SAFETY: - // - No other reference to this system exists. - // - `can_run` has been called, which calls `update_archetype_component_access` with this system. - // - `can_run` returned true, so no systems with conflicting world access are running. - unsafe { - self.spawn_system_task(context, system_index); } } @@ -780,3 +791,36 @@ impl MainThreadExecutor { MainThreadExecutor(TaskPool::get_thread_executor()) } } + +#[cfg(test)] +mod tests { + use crate::{ + self as bevy_ecs, + prelude::Resource, + schedule::{ExecutorKind, IntoSystemConfigs, Schedule}, + system::Commands, + world::World, + }; + + #[derive(Resource)] + struct R; + + #[test] + fn skipped_systems_notify_dependents() { + let mut world = World::new(); + let mut schedule = Schedule::default(); + schedule.set_executor_kind(ExecutorKind::MultiThreaded); + schedule.add_systems( + ( + (|| {}).run_if(|| false), + // This system depends on a sytem that is always skipped. + |mut commands: Commands| { + commands.insert_resource(R); + }, + ) + .chain(), + ); + schedule.run(&mut world); + assert!(world.get_resource::().is_some()); + } +} From 02bb40a18c06ff3815d8b291554ec361a789ed8f Mon Sep 17 00:00:00 2001 From: Chris Russell <8494645+chescock@users.noreply.github.com> Date: Sun, 18 Feb 2024 20:05:10 -0500 Subject: [PATCH 03/11] Move executor tracing span inside lock guard, and cache it. --- .../bevy_ecs/src/schedule/executor/multi_threaded.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index a67be9e3a953d..08111d3b82b25 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -83,6 +83,9 @@ pub struct MultiThreadedExecutor { system_completion: ConcurrentQueue, /// When set, tells the executor that a thread has panicked. panic_payload: Mutex>>, + /// Cached tracing span + #[cfg(feature = "trace")] + executor_span: Span, } /// The state of the executor while running. @@ -310,8 +313,6 @@ impl<'scope, 'env: 'scope> Context<'scope, 'env> { } fn tick_executor(&self) { - #[cfg(feature = "trace")] - let _span = info_span!("multithreaded executor").entered(); // Ensure that the executor handles any events pushed to the system_completion queue by this thread. // If this thread acquires the lock, the exector runs after the push() and they are processed. // If this thread does not acquire the lock, then the is_empty() check on the other thread runs @@ -340,6 +341,8 @@ impl MultiThreadedExecutor { state: Mutex::new(ExecutorState::new()), system_completion: ConcurrentQueue::unbounded(), panic_payload: Mutex::new(None), + #[cfg(feature = "trace")] + executor_span: info_span!("multithreaded executor"), } } } @@ -368,6 +371,9 @@ impl ExecutorState { } fn tick(&mut self, context: &Context<'_, '_>) { + #[cfg(feature = "trace")] + let _span = context.executor.executor_span.enter(); + for result in context.executor.system_completion.try_iter() { self.finish_system_and_handle_dependents(result); } From 241d659285d319333c35e0a333761d412659fd45 Mon Sep 17 00:00:00 2001 From: Chris Russell <8494645+chescock@users.noreply.github.com> Date: Mon, 19 Feb 2024 13:05:30 -0500 Subject: [PATCH 04/11] Hold Conditions in a Mutex to avoid the need for unsafe synchronization. --- .../src/schedule/executor/multi_threaded.rs | 48 +++++++++---------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index 08111d3b82b25..a90bd85901902 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -31,8 +31,8 @@ struct SyncUnsafeSchedule<'a> { } struct Conditions<'a> { - system_conditions: &'a SyncUnsafeCell<[Vec]>, - set_conditions: &'a SyncUnsafeCell<[Vec]>, + system_conditions: &'a mut [Vec], + set_conditions: &'a mut [Vec], sets_with_conditions_of_systems: &'a [FixedBitSet], systems_in_sets_with_conditions: &'a [FixedBitSet], } @@ -42,10 +42,8 @@ impl SyncUnsafeSchedule<'_> { SyncUnsafeSchedule { systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(), conditions: Conditions { - system_conditions: SyncUnsafeCell::from_mut( - schedule.system_conditions.as_mut_slice(), - ), - set_conditions: SyncUnsafeCell::from_mut(schedule.set_conditions.as_mut_slice()), + system_conditions: &mut schedule.system_conditions, + set_conditions: &mut schedule.set_conditions, sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems, systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions, }, @@ -135,7 +133,7 @@ pub struct Context<'scope, 'env: 'scope> { executor: &'env MultiThreadedExecutor, scope: &'scope Scope<'scope, 'env, ()>, systems: &'env [SyncUnsafeCell], - conditions: &'env Conditions<'env>, + conditions: &'env Mutex>, world_cell: UnsafeWorldCell<'env>, } @@ -235,6 +233,7 @@ impl SystemExecutor for MultiThreadedExecutor { systems, conditions, } = SyncUnsafeSchedule::new(schedule); + let conditions = Mutex::new(conditions); ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor( false, @@ -398,6 +397,11 @@ impl ExecutorState { return; } + let mut conditions = context + .conditions + .try_lock() + .expect("Conditions should only be locked while owning the executor state"); + // can't borrow since loop mutably borrows `self` let mut ready_systems = std::mem::take(&mut self.ready_systems_copy); @@ -416,7 +420,7 @@ impl ExecutorState { // Therefore, no other reference to this system exists and there is no aliasing. let system = unsafe { &mut *context.systems[system_index].get() }; - if !self.can_run(system_index, system, context.conditions, context.world_cell) { + if !self.can_run(system_index, system, &mut conditions, context.world_cell) { // NOTE: exclusive systems with ambiguities are susceptible to // being significantly displaced here (compared to single-threaded order) // if systems after them in topological order can run @@ -430,7 +434,7 @@ impl ExecutorState { // - It must have called `update_archetype_component_access` for each run condition. // - There can be no systems running whose accesses would conflict with any conditions. if unsafe { - !self.should_run(system_index, system, context.conditions, context.world_cell) + !self.should_run(system_index, system, &mut conditions, context.world_cell) } { self.skip_system_and_signal_dependents(system_index); // signal_dependents may have set more systems to ready. @@ -469,7 +473,7 @@ impl ExecutorState { &mut self, system_index: usize, system: &mut BoxedSystem, - conditions: &Conditions, + conditions: &mut Conditions, world: UnsafeWorldCell, ) -> bool { let system_meta = &self.system_task_metadata[system_index]; @@ -485,9 +489,7 @@ impl ExecutorState { for set_idx in conditions.sets_with_conditions_of_systems[system_index] .difference(&self.evaluated_sets) { - // SAFETY: We have exclusive access to ExecutorState, so no other thread is touching these conditions - let set_conditions = unsafe { &mut *conditions.set_conditions.get() }; - for condition in &mut set_conditions[set_idx] { + for condition in &mut conditions.set_conditions[set_idx] { condition.update_archetype_component_access(world); if !condition .archetype_component_access() @@ -498,9 +500,7 @@ impl ExecutorState { } } - // SAFETY: We have exclusive access to ExecutorState, so no other thread is touching these conditions - let system_conditions = unsafe { &mut *conditions.system_conditions.get() }; - for condition in &mut system_conditions[system_index] { + for condition in &mut conditions.system_conditions[system_index] { condition.update_archetype_component_access(world); if !condition .archetype_component_access() @@ -539,7 +539,7 @@ impl ExecutorState { &mut self, system_index: usize, _system: &BoxedSystem, - conditions: &Conditions, + conditions: &mut Conditions, world: UnsafeWorldCell, ) -> bool { let mut should_run = !self.skipped_systems.contains(system_index); @@ -548,15 +548,14 @@ impl ExecutorState { continue; } - // SAFETY: We have exclusive access to ExecutorState, so no other thread is touching these conditions - let set_conditions = unsafe { &mut *conditions.set_conditions.get() }; // Evaluate the system set's conditions. // SAFETY: // - The caller ensures that `world` has permission to read any data // required by the conditions. // - `update_archetype_component_access` has been called for each run condition. - let set_conditions_met = - unsafe { evaluate_and_fold_conditions(&mut set_conditions[set_idx], world) }; + let set_conditions_met = unsafe { + evaluate_and_fold_conditions(&mut conditions.set_conditions[set_idx], world) + }; if !set_conditions_met { self.skipped_systems @@ -567,15 +566,14 @@ impl ExecutorState { self.evaluated_sets.insert(set_idx); } - // SAFETY: We have exclusive access to ExecutorState, so no other thread is touching these conditions - let system_conditions = unsafe { &mut *conditions.system_conditions.get() }; // Evaluate the system's conditions. // SAFETY: // - The caller ensures that `world` has permission to read any data // required by the conditions. // - `update_archetype_component_access` has been called for each run condition. - let system_conditions_met = - unsafe { evaluate_and_fold_conditions(&mut system_conditions[system_index], world) }; + let system_conditions_met = unsafe { + evaluate_and_fold_conditions(&mut conditions.system_conditions[system_index], world) + }; if !system_conditions_met { self.skipped_systems.insert(system_index); From a7aebd699efe7e2210d6c14da60dea6d9da5aa0c Mon Sep 17 00:00:00 2001 From: Chris Russell <8494645+chescock@users.noreply.github.com> Date: Mon, 19 Feb 2024 13:10:33 -0500 Subject: [PATCH 05/11] Combine SyncUnsafeSchedule with the rest of the environment, and only store a single pointer to it in Context. --- .../src/schedule/executor/multi_threaded.rs | 102 ++++++++++-------- 1 file changed, 56 insertions(+), 46 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index a90bd85901902..a276e5558c171 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -24,10 +24,12 @@ use crate::{ use crate as bevy_ecs; -/// A funky borrow split of [`SystemSchedule`] required by the [`MultiThreadedExecutor`]. -struct SyncUnsafeSchedule<'a> { - systems: &'a [SyncUnsafeCell], - conditions: Conditions<'a>, +/// Borrowed data used by the [`MultiThreadedExecutor`]. +struct Environment<'env, 'sys> { + executor: &'env MultiThreadedExecutor, + systems: &'sys [SyncUnsafeCell], + conditions: Mutex>, + world_cell: UnsafeWorldCell<'env>, } struct Conditions<'a> { @@ -37,16 +39,22 @@ struct Conditions<'a> { systems_in_sets_with_conditions: &'a [FixedBitSet], } -impl SyncUnsafeSchedule<'_> { - fn new(schedule: &mut SystemSchedule) -> SyncUnsafeSchedule<'_> { - SyncUnsafeSchedule { +impl<'env, 'sys> Environment<'env, 'sys> { + fn new( + executor: &'env MultiThreadedExecutor, + schedule: &'sys mut SystemSchedule, + world: &'env mut World, + ) -> Self { + Environment { + executor, systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(), - conditions: Conditions { + conditions: Mutex::new(Conditions { system_conditions: &mut schedule.system_conditions, set_conditions: &mut schedule.set_conditions, sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems, systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions, - }, + }), + world_cell: world.as_unsafe_world_cell(), } } } @@ -129,12 +137,9 @@ pub struct ExecutorState { // These all need to outlive 'scope in order to be sent to new tasks, // and keeping them all in a struct means we can use lifetime elision. #[derive(Copy, Clone)] -pub struct Context<'scope, 'env: 'scope> { - executor: &'env MultiThreadedExecutor, +pub struct Context<'scope, 'env: 'scope, 'sys> { + environment: &'env Environment<'env, 'sys>, scope: &'scope Scope<'scope, 'env, ()>, - systems: &'env [SyncUnsafeCell], - conditions: &'env Mutex>, - world_cell: UnsafeWorldCell<'env>, } impl Default for MultiThreadedExecutor { @@ -229,23 +234,13 @@ impl SystemExecutor for MultiThreadedExecutor { .map(|e| e.0.clone()); let thread_executor = thread_executor.as_deref(); - let SyncUnsafeSchedule { - systems, - conditions, - } = SyncUnsafeSchedule::new(schedule); - let conditions = Mutex::new(conditions); + let environment = &Environment::new(self, schedule, world); ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor( false, thread_executor, |scope| { - let context = Context { - executor: self, - scope, - systems, - conditions: &conditions, - world_cell: world.as_unsafe_world_cell(), - }; + let context = Context { environment, scope }; // The first tick won't need to process finished systems, but we still need to run the loop in // tick_executor() in case a system completes while the first tick still holds the mutex. @@ -253,6 +248,9 @@ impl SystemExecutor for MultiThreadedExecutor { }, ); + // End the borrows of self and world in environment by copying out the reference to systems. + let systems = environment.systems; + let state = self.state.get_mut().unwrap(); if state.apply_final_deferred { // Do one final apply buffers after all systems have completed @@ -285,7 +283,7 @@ impl SystemExecutor for MultiThreadedExecutor { } } -impl<'scope, 'env: 'scope> Context<'scope, 'env> { +impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> { fn system_completed( &self, system_index: usize, @@ -293,7 +291,8 @@ impl<'scope, 'env: 'scope> Context<'scope, 'env> { system: &BoxedSystem, ) { // tell the executor that the system finished - self.executor + self.environment + .executor .system_completion .push(SystemResult { system_index, @@ -304,7 +303,7 @@ impl<'scope, 'env: 'scope> Context<'scope, 'env> { eprintln!("Encountered a panic in system `{}`!", &*system.name()); // set the payload to propagate the error { - let mut panic_payload = self.executor.panic_payload.lock().unwrap(); + let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap(); *panic_payload = Some(payload); } } @@ -318,13 +317,13 @@ impl<'scope, 'env: 'scope> Context<'scope, 'env> { // after the lock is released, which is after try_lock() failed, which is after the push() // on this thread, so the is_empty() check will see the new events and loop. loop { - let Ok(mut guard) = self.executor.state.try_lock() else { + let Ok(mut guard) = self.environment.executor.state.try_lock() else { return; }; guard.tick(self); // Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events. drop(guard); - if self.executor.system_completion.is_empty() { + if self.environment.executor.system_completion.is_empty() { return; } } @@ -369,11 +368,11 @@ impl ExecutorState { } } - fn tick(&mut self, context: &Context<'_, '_>) { + fn tick(&mut self, context: &Context<'_, '_, '_>) { #[cfg(feature = "trace")] - let _span = context.executor.executor_span.enter(); + let _span = context.environment.executor.executor_span.enter(); - for result in context.executor.system_completion.try_iter() { + for result in context.environment.executor.system_completion.try_iter() { self.finish_system_and_handle_dependents(result); } @@ -392,12 +391,13 @@ impl ExecutorState { /// have been mutably borrowed (such as the systems currently running). /// - `world_cell` must have permission to access all world data (not counting /// any world data that is claimed by systems currently running on this executor). - unsafe fn spawn_system_tasks(&mut self, context: &Context<'_, '_>) { + unsafe fn spawn_system_tasks(&mut self, context: &Context<'_, '_, '_>) { if self.exclusive_running { return; } let mut conditions = context + .environment .conditions .try_lock() .expect("Conditions should only be locked while owning the executor state"); @@ -418,9 +418,14 @@ impl ExecutorState { assert!(!self.running_systems.contains(system_index)); // SAFETY: Caller assured that these systems are not running. // Therefore, no other reference to this system exists and there is no aliasing. - let system = unsafe { &mut *context.systems[system_index].get() }; - - if !self.can_run(system_index, system, &mut conditions, context.world_cell) { + let system = unsafe { &mut *context.environment.systems[system_index].get() }; + + if !self.can_run( + system_index, + system, + &mut conditions, + context.environment.world_cell, + ) { // NOTE: exclusive systems with ambiguities are susceptible to // being significantly displaced here (compared to single-threaded order) // if systems after them in topological order can run @@ -434,7 +439,12 @@ impl ExecutorState { // - It must have called `update_archetype_component_access` for each run condition. // - There can be no systems running whose accesses would conflict with any conditions. if unsafe { - !self.should_run(system_index, system, &mut conditions, context.world_cell) + !self.should_run( + system_index, + system, + &mut conditions, + context.environment.world_cell, + ) } { self.skip_system_and_signal_dependents(system_index); // signal_dependents may have set more systems to ready. @@ -590,9 +600,9 @@ impl ExecutorState { /// used by the specified system. /// - `update_archetype_component_access` must have been called with `world` /// on the system associated with `system_index`. - unsafe fn spawn_system_task(&mut self, context: &Context<'_, '_>, system_index: usize) { + unsafe fn spawn_system_task(&mut self, context: &Context<'_, '_, '_>, system_index: usize) { // SAFETY: this system is not running, no other reference exists - let system = unsafe { &mut *context.systems[system_index].get() }; + let system = unsafe { &mut *context.environment.systems[system_index].get() }; // Move the full context object into the new future. let context = *context; @@ -608,7 +618,7 @@ impl ExecutorState { // - The caller ensures that we have permission to // access the world data used by the system. // - `update_archetype_component_access` has been called. - unsafe { system.run_unsafe((), context.world_cell) }; + unsafe { system.run_unsafe((), context.environment.world_cell) }; })); context.system_completed(system_index, res, system); }; @@ -628,14 +638,14 @@ impl ExecutorState { /// Caller must ensure no systems are currently borrowed. unsafe fn spawn_exclusive_system_task( &mut self, - context: &Context<'_, '_>, + context: &Context<'_, '_, '_>, system_index: usize, ) { // SAFETY: `can_run` returned true for this system, which means // that no other systems currently have access to the world. - let world = unsafe { context.world_cell.world_mut() }; + let world = unsafe { context.environment.world_cell.world_mut() }; // SAFETY: this system is not running, no other reference exists - let system = unsafe { &mut *context.systems[system_index].get() }; + let system = unsafe { &mut *context.environment.systems[system_index].get() }; // Move the full context object into the new future. let context = *context; #[cfg(feature = "trace")] @@ -651,7 +661,7 @@ impl ExecutorState { let res = { #[cfg(feature = "trace")] let _span = system_span.enter(); - apply_deferred(&unapplied_systems, context.systems, world) + apply_deferred(&unapplied_systems, context.environment.systems, world) }; context.system_completed(system_index, res, system); }; From 8b5a37c0144c1ca19652da0df932ca22dfe43178 Mon Sep 17 00:00:00 2001 From: Chris Russell <8494645+chescock@users.noreply.github.com> Date: Tue, 20 Feb 2024 13:40:51 -0500 Subject: [PATCH 06/11] Context doesn't need to be pub Co-authored-by: James Liu --- crates/bevy_ecs/src/schedule/executor/multi_threaded.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index a276e5558c171..41ddbea4642e8 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -137,7 +137,7 @@ pub struct ExecutorState { // These all need to outlive 'scope in order to be sent to new tasks, // and keeping them all in a struct means we can use lifetime elision. #[derive(Copy, Clone)] -pub struct Context<'scope, 'env: 'scope, 'sys> { +struct Context<'scope, 'env: 'scope, 'sys> { environment: &'env Environment<'env, 'sys>, scope: &'scope Scope<'scope, 'env, ()>, } From f11d4845b0145719bc8a2f8072db0acddb6c1e37 Mon Sep 17 00:00:00 2001 From: Chris Russell <8494645+chescock@users.noreply.github.com> Date: Fri, 23 Feb 2024 14:29:26 -0500 Subject: [PATCH 07/11] Apply suggestions from code review Co-authored-by: Mike --- crates/bevy_ecs/src/schedule/executor/multi_threaded.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index 41ddbea4642e8..6bab26fe0d7d8 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -379,8 +379,8 @@ impl ExecutorState { self.rebuild_active_access(); // SAFETY: - // - self.ready_systems does not contain running systems. - // - `world_cell` has mutable access to the entire world. + // - `finish_system_and_handle_dependents` has updated the currently running systems. + // - `rebuild_active_access` locks access for all currently running systems. unsafe { self.spawn_system_tasks(context); } From f403b7b23eb1bfe81bd695fd788c38ee99fd3d4b Mon Sep 17 00:00:00 2001 From: Chris Russell <8494645+chescock@users.noreply.github.com> Date: Fri, 23 Feb 2024 14:30:15 -0500 Subject: [PATCH 08/11] Update crates/bevy_ecs/src/schedule/executor/multi_threaded.rs Co-authored-by: Mike --- crates/bevy_ecs/src/schedule/executor/multi_threaded.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index 6bab26fe0d7d8..4ce27f00ce884 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -466,7 +466,7 @@ impl ExecutorState { } // SAFETY: - // - No other reference to this system exists. + // - Caller ensured no other reference to this system exists. // - `can_run` has been called, which calls `update_archetype_component_access` with this system. // - `can_run` returned true, so no systems with conflicting world access are running. unsafe { From 609967550fcfeac5524bbeb32f740228c5302209 Mon Sep 17 00:00:00 2001 From: Chris Russell <8494645+chescock@users.noreply.github.com> Date: Fri, 23 Feb 2024 13:44:32 -0500 Subject: [PATCH 09/11] Move apply_final_deferred out of ExecutorState since it's never modified while running the schedule. --- .../bevy_ecs/src/schedule/executor/multi_threaded.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index 4ce27f00ce884..7763712fa209c 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -87,6 +87,8 @@ pub struct MultiThreadedExecutor { state: Mutex, /// Queue of system completion events. system_completion: ConcurrentQueue, + /// Setting when true applies deferred system buffers after all systems have run + apply_final_deferred: bool, /// When set, tells the executor that a thread has panicked. panic_payload: Mutex>>, /// Cached tracing span @@ -126,8 +128,6 @@ pub struct ExecutorState { completed_systems: FixedBitSet, /// Systems that have run but have not had their buffers applied. unapplied_systems: FixedBitSet, - /// Setting when true applies deferred system buffers after all systems have run - apply_final_deferred: bool, /// When set, stops the executor from running any more systems. stop_spawning: bool, } @@ -252,7 +252,7 @@ impl SystemExecutor for MultiThreadedExecutor { let systems = environment.systems; let state = self.state.get_mut().unwrap(); - if state.apply_final_deferred { + if self.apply_final_deferred { // Do one final apply buffers after all systems have completed // Commands should be applied while on the scope's thread, not the executor's thread let res = apply_deferred(&state.unapplied_systems, systems, world); @@ -279,7 +279,7 @@ impl SystemExecutor for MultiThreadedExecutor { } fn set_apply_final_deferred(&mut self, value: bool) { - self.state.get_mut().unwrap().apply_final_deferred = value; + self.apply_final_deferred = value; } } @@ -338,6 +338,7 @@ impl MultiThreadedExecutor { Self { state: Mutex::new(ExecutorState::new()), system_completion: ConcurrentQueue::unbounded(), + apply_final_deferred: true, panic_payload: Mutex::new(None), #[cfg(feature = "trace")] executor_span: info_span!("multithreaded executor"), @@ -363,7 +364,6 @@ impl ExecutorState { skipped_systems: FixedBitSet::new(), completed_systems: FixedBitSet::new(), unapplied_systems: FixedBitSet::new(), - apply_final_deferred: true, stop_spawning: false, } } From f5a00b181c97743c05913745a8e2bb4f2b56aeb1 Mon Sep 17 00:00:00 2001 From: Chris Russell <8494645+chescock@users.noreply.github.com> Date: Fri, 23 Feb 2024 13:45:22 -0500 Subject: [PATCH 10/11] Finish incomplete comment. --- crates/bevy_ecs/src/schedule/executor/multi_threaded.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index 7763712fa209c..f027862ce58ee 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -83,7 +83,7 @@ struct SystemResult { /// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel. pub struct MultiThreadedExecutor { - /// The running state, protected by a mutex so that the . + /// The running state, protected by a mutex so that a reference to the executor can be shared across tasks. state: Mutex, /// Queue of system completion events. system_completion: ConcurrentQueue, From 708acac02dafd0c58a8dabbfd09cc433d11d7ad8 Mon Sep 17 00:00:00 2001 From: Chris Russell <8494645+chescock@users.noreply.github.com> Date: Fri, 23 Feb 2024 14:41:40 -0500 Subject: [PATCH 11/11] Remove unnecessary lifetime annotations. --- .../src/schedule/executor/multi_threaded.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index f027862ce58ee..7546abdcb5e7d 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -137,7 +137,7 @@ pub struct ExecutorState { // These all need to outlive 'scope in order to be sent to new tasks, // and keeping them all in a struct means we can use lifetime elision. #[derive(Copy, Clone)] -struct Context<'scope, 'env: 'scope, 'sys> { +struct Context<'scope, 'env, 'sys> { environment: &'env Environment<'env, 'sys>, scope: &'scope Scope<'scope, 'env, ()>, } @@ -368,7 +368,7 @@ impl ExecutorState { } } - fn tick(&mut self, context: &Context<'_, '_, '_>) { + fn tick(&mut self, context: &Context) { #[cfg(feature = "trace")] let _span = context.environment.executor.executor_span.enter(); @@ -391,7 +391,7 @@ impl ExecutorState { /// have been mutably borrowed (such as the systems currently running). /// - `world_cell` must have permission to access all world data (not counting /// any world data that is claimed by systems currently running on this executor). - unsafe fn spawn_system_tasks(&mut self, context: &Context<'_, '_, '_>) { + unsafe fn spawn_system_tasks(&mut self, context: &Context) { if self.exclusive_running { return; } @@ -600,7 +600,7 @@ impl ExecutorState { /// used by the specified system. /// - `update_archetype_component_access` must have been called with `world` /// on the system associated with `system_index`. - unsafe fn spawn_system_task(&mut self, context: &Context<'_, '_, '_>, system_index: usize) { + unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) { // SAFETY: this system is not running, no other reference exists let system = unsafe { &mut *context.environment.systems[system_index].get() }; // Move the full context object into the new future. @@ -636,11 +636,7 @@ impl ExecutorState { /// # Safety /// Caller must ensure no systems are currently borrowed. - unsafe fn spawn_exclusive_system_task( - &mut self, - context: &Context<'_, '_, '_>, - system_index: usize, - ) { + unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) { // SAFETY: `can_run` returned true for this system, which means // that no other systems currently have access to the world. let world = unsafe { context.environment.world_cell.world_mut() };