diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index e368c1398963e..1e8f5fe1fe033 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -22,7 +22,7 @@ bevy_tasks = { path = "../bevy_tasks", version = "0.14.0-dev" } bevy_utils = { path = "../bevy_utils", version = "0.14.0-dev" } bevy_ecs_macros = { path = "macros", version = "0.14.0-dev" } -async-channel = "2.2.0" +concurrent-queue = "2.4.0" fixedbitset = "0.4.2" rustc-hash = "1.1" downcast-rs = "1.2" diff --git a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs index 1e00c3893053e..7546abdcb5e7d 100644 --- a/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs +++ b/crates/bevy_ecs/src/schedule/executor/multi_threaded.rs @@ -3,14 +3,14 @@ use std::{ sync::{Arc, Mutex}, }; -use bevy_tasks::{block_on, poll_once, ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; +use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; use bevy_utils::default; use bevy_utils::syncunsafecell::SyncUnsafeCell; #[cfg(feature = "trace")] -use bevy_utils::tracing::{info_span, Instrument, Span}; +use bevy_utils::tracing::{info_span, Span}; use std::panic::AssertUnwindSafe; -use async_channel::{Receiver, Sender}; +use concurrent_queue::ConcurrentQueue; use fixedbitset::FixedBitSet; use crate::{ @@ -24,10 +24,12 @@ use crate::{ use crate as bevy_ecs; -/// A funky borrow split of [`SystemSchedule`] required by the [`MultiThreadedExecutor`]. -struct SyncUnsafeSchedule<'a> { - systems: &'a [SyncUnsafeCell], - conditions: Conditions<'a>, +/// Borrowed data used by the [`MultiThreadedExecutor`]. +struct Environment<'env, 'sys> { + executor: &'env MultiThreadedExecutor, + systems: &'sys [SyncUnsafeCell], + conditions: Mutex>, + world_cell: UnsafeWorldCell<'env>, } struct Conditions<'a> { @@ -37,16 +39,22 @@ struct Conditions<'a> { systems_in_sets_with_conditions: &'a [FixedBitSet], } -impl SyncUnsafeSchedule<'_> { - fn new(schedule: &mut SystemSchedule) -> SyncUnsafeSchedule<'_> { - SyncUnsafeSchedule { +impl<'env, 'sys> Environment<'env, 'sys> { + fn new( + executor: &'env MultiThreadedExecutor, + schedule: &'sys mut SystemSchedule, + world: &'env mut World, + ) -> Self { + Environment { + executor, systems: SyncUnsafeCell::from_mut(schedule.systems.as_mut_slice()).as_slice_of_cells(), - conditions: Conditions { + conditions: Mutex::new(Conditions { system_conditions: &mut schedule.system_conditions, set_conditions: &mut schedule.set_conditions, sets_with_conditions_of_systems: &schedule.sets_with_conditions_of_systems, systems_in_sets_with_conditions: &schedule.systems_in_sets_with_conditions, - }, + }), + world_cell: world.as_unsafe_world_cell(), } } } @@ -75,10 +83,21 @@ struct SystemResult { /// Runs the schedule using a thread pool. Non-conflicting systems can run in parallel. pub struct MultiThreadedExecutor { - /// Sends system completion events. - sender: Sender, - /// Receives system completion events. - receiver: Receiver, + /// The running state, protected by a mutex so that a reference to the executor can be shared across tasks. + state: Mutex, + /// Queue of system completion events. + system_completion: ConcurrentQueue, + /// Setting when true applies deferred system buffers after all systems have run + apply_final_deferred: bool, + /// When set, tells the executor that a thread has panicked. + panic_payload: Mutex>>, + /// Cached tracing span + #[cfg(feature = "trace")] + executor_span: Span, +} + +/// The state of the executor while running. +pub struct ExecutorState { /// Metadata for scheduling and running system tasks. system_task_metadata: Vec, /// Union of the accesses of all currently running systems. @@ -109,14 +128,20 @@ pub struct MultiThreadedExecutor { completed_systems: FixedBitSet, /// Systems that have run but have not had their buffers applied. unapplied_systems: FixedBitSet, - /// Setting when true applies deferred system buffers after all systems have run - apply_final_deferred: bool, - /// When set, tells the executor that a thread has panicked. - panic_payload: Arc>>>, /// When set, stops the executor from running any more systems. stop_spawning: bool, } +/// References to data required by the executor. +/// This is copied to each system task so that can invoke the executor when they complete. +// These all need to outlive 'scope in order to be sent to new tasks, +// and keeping them all in a struct means we can use lifetime elision. +#[derive(Copy, Clone)] +struct Context<'scope, 'env, 'sys> { + environment: &'env Environment<'env, 'sys>, + scope: &'scope Scope<'scope, 'env, ()>, +} + impl Default for MultiThreadedExecutor { fn default() -> Self { Self::new() @@ -129,25 +154,23 @@ impl SystemExecutor for MultiThreadedExecutor { } fn init(&mut self, schedule: &SystemSchedule) { + let state = self.state.get_mut().unwrap(); // pre-allocate space let sys_count = schedule.system_ids.len(); let set_count = schedule.set_ids.len(); - let (tx, rx) = async_channel::bounded(sys_count.max(1)); + self.system_completion = ConcurrentQueue::bounded(sys_count.max(1)); + state.evaluated_sets = FixedBitSet::with_capacity(set_count); + state.ready_systems = FixedBitSet::with_capacity(sys_count); + state.ready_systems_copy = FixedBitSet::with_capacity(sys_count); + state.running_systems = FixedBitSet::with_capacity(sys_count); + state.completed_systems = FixedBitSet::with_capacity(sys_count); + state.skipped_systems = FixedBitSet::with_capacity(sys_count); + state.unapplied_systems = FixedBitSet::with_capacity(sys_count); - self.sender = tx; - self.receiver = rx; - self.evaluated_sets = FixedBitSet::with_capacity(set_count); - self.ready_systems = FixedBitSet::with_capacity(sys_count); - self.ready_systems_copy = FixedBitSet::with_capacity(sys_count); - self.running_systems = FixedBitSet::with_capacity(sys_count); - self.completed_systems = FixedBitSet::with_capacity(sys_count); - self.skipped_systems = FixedBitSet::with_capacity(sys_count); - self.unapplied_systems = FixedBitSet::with_capacity(sys_count); - - self.system_task_metadata = Vec::with_capacity(sys_count); + state.system_task_metadata = Vec::with_capacity(sys_count); for index in 0..sys_count { - self.system_task_metadata.push(SystemTaskMetadata { + state.system_task_metadata.push(SystemTaskMetadata { archetype_component_access: default(), dependents: schedule.system_dependents[index].clone(), is_send: schedule.systems[index].is_send(), @@ -160,7 +183,7 @@ impl SystemExecutor for MultiThreadedExecutor { }); } - self.num_dependencies_remaining = Vec::with_capacity(sys_count); + state.num_dependencies_remaining = Vec::with_capacity(sys_count); } fn run( @@ -169,20 +192,23 @@ impl SystemExecutor for MultiThreadedExecutor { world: &mut World, _skip_systems: Option<&FixedBitSet>, ) { + let state = self.state.get_mut().unwrap(); // reset counts - self.num_systems = schedule.systems.len(); - if self.num_systems == 0 { + state.num_systems = schedule.systems.len(); + if state.num_systems == 0 { return; } - self.num_running_systems = 0; - self.num_completed_systems = 0; - self.num_dependencies_remaining.clear(); - self.num_dependencies_remaining + state.num_running_systems = 0; + state.num_completed_systems = 0; + state.num_dependencies_remaining.clear(); + state + .num_dependencies_remaining .extend_from_slice(&schedule.system_dependencies); - for (system_index, dependencies) in self.num_dependencies_remaining.iter_mut().enumerate() { + for (system_index, dependencies) in state.num_dependencies_remaining.iter_mut().enumerate() + { if *dependencies == 0 { - self.ready_systems.insert(system_index); + state.ready_systems.insert(system_index); } } @@ -190,16 +216,16 @@ impl SystemExecutor for MultiThreadedExecutor { // not be run. #[cfg(feature = "bevy_debug_stepping")] if let Some(skipped_systems) = _skip_systems { - debug_assert_eq!(skipped_systems.len(), self.completed_systems.len()); + debug_assert_eq!(skipped_systems.len(), state.completed_systems.len()); // mark skipped systems as completed - self.completed_systems |= skipped_systems; - self.num_completed_systems = self.completed_systems.count_ones(..); + state.completed_systems |= skipped_systems; + state.num_completed_systems = state.completed_systems.count_ones(..); // signal the dependencies for each of the skipped systems, as // though they had run for system_index in skipped_systems.ones() { - self.signal_dependents(system_index); - self.ready_systems.set(system_index, false); + state.signal_dependents(system_index); + state.ready_systems.set(system_index, false); } } @@ -208,69 +234,34 @@ impl SystemExecutor for MultiThreadedExecutor { .map(|e| e.0.clone()); let thread_executor = thread_executor.as_deref(); - let SyncUnsafeSchedule { - systems, - mut conditions, - } = SyncUnsafeSchedule::new(schedule); + let environment = &Environment::new(self, schedule, world); ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor( false, thread_executor, |scope| { - // the executor itself is a `Send` future so that it can run - // alongside systems that claim the local thread - #[allow(unused_mut)] - let mut executor = Box::pin(async { - let world_cell = world.as_unsafe_world_cell(); - while self.num_completed_systems < self.num_systems { - // SAFETY: - // - self.ready_systems does not contain running systems. - // - `world_cell` has mutable access to the entire world. - unsafe { - self.spawn_system_tasks(scope, systems, &mut conditions, world_cell); - } - - if self.num_running_systems > 0 { - // wait for systems to complete - if let Ok(result) = self.receiver.recv().await { - self.finish_system_and_handle_dependents(result); - } else { - panic!("Channel closed unexpectedly!"); - } - - while let Ok(result) = self.receiver.try_recv() { - self.finish_system_and_handle_dependents(result); - } - - self.rebuild_active_access(); - } - } - }); - - #[cfg(feature = "trace")] - let executor_span = info_span!("multithreaded executor"); - #[cfg(feature = "trace")] - let mut executor = executor.instrument(executor_span); + let context = Context { environment, scope }; - // Immediately poll the task once to avoid the overhead of the executor - // and thread wake-up. Only spawn the task if the executor does not immediately - // terminate. - if block_on(poll_once(&mut executor)).is_none() { - scope.spawn(executor); - } + // The first tick won't need to process finished systems, but we still need to run the loop in + // tick_executor() in case a system completes while the first tick still holds the mutex. + context.tick_executor(); }, ); + // End the borrows of self and world in environment by copying out the reference to systems. + let systems = environment.systems; + + let state = self.state.get_mut().unwrap(); if self.apply_final_deferred { // Do one final apply buffers after all systems have completed // Commands should be applied while on the scope's thread, not the executor's thread - let res = apply_deferred(&self.unapplied_systems, systems, world); + let res = apply_deferred(&state.unapplied_systems, systems, world); if let Err(payload) = res { let mut panic_payload = self.panic_payload.lock().unwrap(); *panic_payload = Some(payload); } - self.unapplied_systems.clear(); - debug_assert!(self.unapplied_systems.is_clear()); + state.unapplied_systems.clear(); + debug_assert!(state.unapplied_systems.is_clear()); } // check to see if there was a panic @@ -279,12 +270,12 @@ impl SystemExecutor for MultiThreadedExecutor { std::panic::resume_unwind(payload); } - debug_assert!(self.ready_systems.is_clear()); - debug_assert!(self.running_systems.is_clear()); - self.active_access.clear(); - self.evaluated_sets.clear(); - self.skipped_systems.clear(); - self.completed_systems.clear(); + debug_assert!(state.ready_systems.is_clear()); + debug_assert!(state.running_systems.is_clear()); + state.active_access.clear(); + state.evaluated_sets.clear(); + state.skipped_systems.clear(); + state.completed_systems.clear(); } fn set_apply_final_deferred(&mut self, value: bool) { @@ -292,15 +283,72 @@ impl SystemExecutor for MultiThreadedExecutor { } } +impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> { + fn system_completed( + &self, + system_index: usize, + res: Result<(), Box>, + system: &BoxedSystem, + ) { + // tell the executor that the system finished + self.environment + .executor + .system_completion + .push(SystemResult { + system_index, + success: res.is_ok(), + }) + .unwrap_or_else(|error| unreachable!("{}", error)); + if let Err(payload) = res { + eprintln!("Encountered a panic in system `{}`!", &*system.name()); + // set the payload to propagate the error + { + let mut panic_payload = self.environment.executor.panic_payload.lock().unwrap(); + *panic_payload = Some(payload); + } + } + self.tick_executor(); + } + + fn tick_executor(&self) { + // Ensure that the executor handles any events pushed to the system_completion queue by this thread. + // If this thread acquires the lock, the exector runs after the push() and they are processed. + // If this thread does not acquire the lock, then the is_empty() check on the other thread runs + // after the lock is released, which is after try_lock() failed, which is after the push() + // on this thread, so the is_empty() check will see the new events and loop. + loop { + let Ok(mut guard) = self.environment.executor.state.try_lock() else { + return; + }; + guard.tick(self); + // Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events. + drop(guard); + if self.environment.executor.system_completion.is_empty() { + return; + } + } + } +} + impl MultiThreadedExecutor { /// Creates a new multi-threaded executor for use with a [`Schedule`]. /// /// [`Schedule`]: crate::schedule::Schedule pub fn new() -> Self { - let (sender, receiver) = async_channel::unbounded(); Self { - sender, - receiver, + state: Mutex::new(ExecutorState::new()), + system_completion: ConcurrentQueue::unbounded(), + apply_final_deferred: true, + panic_payload: Mutex::new(None), + #[cfg(feature = "trace")] + executor_span: info_span!("multithreaded executor"), + } + } +} + +impl ExecutorState { + fn new() -> Self { + Self { system_task_metadata: Vec::new(), num_systems: 0, num_running_systems: 0, @@ -316,78 +364,114 @@ impl MultiThreadedExecutor { skipped_systems: FixedBitSet::new(), completed_systems: FixedBitSet::new(), unapplied_systems: FixedBitSet::new(), - apply_final_deferred: true, - panic_payload: Arc::new(Mutex::new(None)), stop_spawning: false, } } + fn tick(&mut self, context: &Context) { + #[cfg(feature = "trace")] + let _span = context.environment.executor.executor_span.enter(); + + for result in context.environment.executor.system_completion.try_iter() { + self.finish_system_and_handle_dependents(result); + } + + self.rebuild_active_access(); + + // SAFETY: + // - `finish_system_and_handle_dependents` has updated the currently running systems. + // - `rebuild_active_access` locks access for all currently running systems. + unsafe { + self.spawn_system_tasks(context); + } + } + /// # Safety /// - Caller must ensure that `self.ready_systems` does not contain any systems that /// have been mutably borrowed (such as the systems currently running). /// - `world_cell` must have permission to access all world data (not counting /// any world data that is claimed by systems currently running on this executor). - unsafe fn spawn_system_tasks<'scope>( - &mut self, - scope: &Scope<'_, 'scope, ()>, - systems: &'scope [SyncUnsafeCell], - conditions: &mut Conditions, - world_cell: UnsafeWorldCell<'scope>, - ) { + unsafe fn spawn_system_tasks(&mut self, context: &Context) { if self.exclusive_running { return; } + let mut conditions = context + .environment + .conditions + .try_lock() + .expect("Conditions should only be locked while owning the executor state"); + // can't borrow since loop mutably borrows `self` let mut ready_systems = std::mem::take(&mut self.ready_systems_copy); - ready_systems.clear(); - ready_systems.union_with(&self.ready_systems); - - for system_index in ready_systems.ones() { - assert!(!self.running_systems.contains(system_index)); - // SAFETY: Caller assured that these systems are not running. - // Therefore, no other reference to this system exists and there is no aliasing. - let system = unsafe { &mut *systems[system_index].get() }; - - if !self.can_run(system_index, system, conditions, world_cell) { - // NOTE: exclusive systems with ambiguities are susceptible to - // being significantly displaced here (compared to single-threaded order) - // if systems after them in topological order can run - // if that becomes an issue, `break;` if exclusive system - continue; - } - self.ready_systems.set(system_index, false); + // Skipping systems may cause their dependents to become ready immediately. + // If that happens, we need to run again immediately or we may fail to spawn those dependents. + let mut check_for_new_ready_systems = true; + while check_for_new_ready_systems { + check_for_new_ready_systems = false; - // SAFETY: `can_run` returned true, which means that: - // - It must have called `update_archetype_component_access` for each run condition. - // - There can be no systems running whose accesses would conflict with any conditions. - if unsafe { !self.should_run(system_index, system, conditions, world_cell) } { - self.skip_system_and_signal_dependents(system_index); - continue; - } + ready_systems.clear(); + ready_systems.union_with(&self.ready_systems); - self.running_systems.insert(system_index); - self.num_running_systems += 1; + for system_index in ready_systems.ones() { + assert!(!self.running_systems.contains(system_index)); + // SAFETY: Caller assured that these systems are not running. + // Therefore, no other reference to this system exists and there is no aliasing. + let system = unsafe { &mut *context.environment.systems[system_index].get() }; - if self.system_task_metadata[system_index].is_exclusive { - // SAFETY: `can_run` returned true for this system, which means - // that no other systems currently have access to the world. - let world = unsafe { world_cell.world_mut() }; - // SAFETY: `can_run` returned true for this system, - // which means no systems are currently borrowed. - unsafe { - self.spawn_exclusive_system_task(scope, system_index, systems, world); + if !self.can_run( + system_index, + system, + &mut conditions, + context.environment.world_cell, + ) { + // NOTE: exclusive systems with ambiguities are susceptible to + // being significantly displaced here (compared to single-threaded order) + // if systems after them in topological order can run + // if that becomes an issue, `break;` if exclusive system + continue; } - break; - } - // SAFETY: - // - No other reference to this system exists. - // - `can_run` has been called, which calls `update_archetype_component_access` with this system. - // - `can_run` returned true, so no systems with conflicting world access are running. - unsafe { - self.spawn_system_task(scope, system_index, systems, world_cell); + self.ready_systems.set(system_index, false); + + // SAFETY: `can_run` returned true, which means that: + // - It must have called `update_archetype_component_access` for each run condition. + // - There can be no systems running whose accesses would conflict with any conditions. + if unsafe { + !self.should_run( + system_index, + system, + &mut conditions, + context.environment.world_cell, + ) + } { + self.skip_system_and_signal_dependents(system_index); + // signal_dependents may have set more systems to ready. + check_for_new_ready_systems = true; + continue; + } + + self.running_systems.insert(system_index); + self.num_running_systems += 1; + + if self.system_task_metadata[system_index].is_exclusive { + // SAFETY: `can_run` returned true for this system, + // which means no systems are currently borrowed. + unsafe { + self.spawn_exclusive_system_task(context, system_index); + } + check_for_new_ready_systems = false; + break; + } + + // SAFETY: + // - Caller ensured no other reference to this system exists. + // - `can_run` has been called, which calls `update_archetype_component_access` with this system. + // - `can_run` returned true, so no systems with conflicting world access are running. + unsafe { + self.spawn_system_task(context, system_index); + } } } @@ -516,132 +600,80 @@ impl MultiThreadedExecutor { /// used by the specified system. /// - `update_archetype_component_access` must have been called with `world` /// on the system associated with `system_index`. - unsafe fn spawn_system_task<'scope>( - &mut self, - scope: &Scope<'_, 'scope, ()>, - system_index: usize, - systems: &'scope [SyncUnsafeCell], - world: UnsafeWorldCell<'scope>, - ) { + unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) { // SAFETY: this system is not running, no other reference exists - let system = unsafe { &mut *systems[system_index].get() }; - let sender = self.sender.clone(); - let panic_payload = self.panic_payload.clone(); + let system = unsafe { &mut *context.environment.systems[system_index].get() }; + // Move the full context object into the new future. + let context = *context; + + let system_meta = &self.system_task_metadata[system_index]; + + #[cfg(feature = "trace")] + let system_span = system_meta.system_task_span.clone(); let task = async move { let res = std::panic::catch_unwind(AssertUnwindSafe(|| { + #[cfg(feature = "trace")] + let _span = system_span.enter(); // SAFETY: // - The caller ensures that we have permission to // access the world data used by the system. // - `update_archetype_component_access` has been called. - unsafe { system.run_unsafe((), world) }; + unsafe { system.run_unsafe((), context.environment.world_cell) }; })); - // tell the executor that the system finished - sender - .try_send(SystemResult { - system_index, - success: res.is_ok(), - }) - .unwrap_or_else(|error| unreachable!("{}", error)); - if let Err(payload) = res { - eprintln!("Encountered a panic in system `{}`!", &*system.name()); - // set the payload to propagate the error - { - let mut panic_payload = panic_payload.lock().unwrap(); - *panic_payload = Some(payload); - } - } + context.system_completed(system_index, res, system); }; - #[cfg(feature = "trace")] - let task = task.instrument( - self.system_task_metadata[system_index] - .system_task_span - .clone(), - ); - - let system_meta = &self.system_task_metadata[system_index]; self.active_access .extend(&system_meta.archetype_component_access); if system_meta.is_send { - scope.spawn(task); + context.scope.spawn(task); } else { self.local_thread_running = true; - scope.spawn_on_external(task); + context.scope.spawn_on_external(task); } } /// # Safety /// Caller must ensure no systems are currently borrowed. - unsafe fn spawn_exclusive_system_task<'scope>( - &mut self, - scope: &Scope<'_, 'scope, ()>, - system_index: usize, - systems: &'scope [SyncUnsafeCell], - world: &'scope mut World, - ) { + unsafe fn spawn_exclusive_system_task(&mut self, context: &Context, system_index: usize) { + // SAFETY: `can_run` returned true for this system, which means + // that no other systems currently have access to the world. + let world = unsafe { context.environment.world_cell.world_mut() }; // SAFETY: this system is not running, no other reference exists - let system = unsafe { &mut *systems[system_index].get() }; + let system = unsafe { &mut *context.environment.systems[system_index].get() }; + // Move the full context object into the new future. + let context = *context; + #[cfg(feature = "trace")] + let system_span = self.system_task_metadata[system_index] + .system_task_span + .clone(); - let sender = self.sender.clone(); - let panic_payload = self.panic_payload.clone(); if is_apply_deferred(system) { // TODO: avoid allocation let unapplied_systems = self.unapplied_systems.clone(); self.unapplied_systems.clear(); let task = async move { - let res = apply_deferred(&unapplied_systems, systems, world); - // tell the executor that the system finished - sender - .try_send(SystemResult { - system_index, - success: res.is_ok(), - }) - .unwrap_or_else(|error| unreachable!("{}", error)); - if let Err(payload) = res { - // set the payload to propagate the error - let mut panic_payload = panic_payload.lock().unwrap(); - *panic_payload = Some(payload); - } + let res = { + #[cfg(feature = "trace")] + let _span = system_span.enter(); + apply_deferred(&unapplied_systems, context.environment.systems, world) + }; + context.system_completed(system_index, res, system); }; - #[cfg(feature = "trace")] - let task = task.instrument( - self.system_task_metadata[system_index] - .system_task_span - .clone(), - ); - scope.spawn_on_scope(task); + context.scope.spawn_on_scope(task); } else { let task = async move { let res = std::panic::catch_unwind(AssertUnwindSafe(|| { + #[cfg(feature = "trace")] + let _span = system_span.enter(); system.run((), world); })); - // tell the executor that the system finished - sender - .try_send(SystemResult { - system_index, - success: res.is_ok(), - }) - .unwrap_or_else(|error| unreachable!("{}", error)); - if let Err(payload) = res { - eprintln!( - "Encountered a panic in exclusive system `{}`!", - &*system.name() - ); - // set the payload to propagate the error - let mut panic_payload = panic_payload.lock().unwrap(); - *panic_payload = Some(payload); - } + context.system_completed(system_index, res, system); }; - #[cfg(feature = "trace")] - let task = task.instrument( - self.system_task_metadata[system_index] - .system_task_span - .clone(), - ); - scope.spawn_on_scope(task); + context.scope.spawn_on_scope(task); } self.exclusive_running = true; @@ -769,3 +801,36 @@ impl MainThreadExecutor { MainThreadExecutor(TaskPool::get_thread_executor()) } } + +#[cfg(test)] +mod tests { + use crate::{ + self as bevy_ecs, + prelude::Resource, + schedule::{ExecutorKind, IntoSystemConfigs, Schedule}, + system::Commands, + world::World, + }; + + #[derive(Resource)] + struct R; + + #[test] + fn skipped_systems_notify_dependents() { + let mut world = World::new(); + let mut schedule = Schedule::default(); + schedule.set_executor_kind(ExecutorKind::MultiThreaded); + schedule.add_systems( + ( + (|| {}).run_if(|| false), + // This system depends on a sytem that is always skipped. + |mut commands: Commands| { + commands.insert_resource(R); + }, + ) + .chain(), + ); + schedule.run(&mut world); + assert!(world.get_resource::().is_some()); + } +} diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 1be0b0da174e3..f3837c4766fae 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -202,7 +202,7 @@ impl FakeTask { /// For more information, see [`TaskPool::scope`]. #[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'env async_executor::LocalExecutor<'env>, + executor: &'scope async_executor::LocalExecutor<'scope>, // Vector to gather results of all futures spawned during scope run results: &'env RefCell>>>>, @@ -219,7 +219,7 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn + 'env>(&self, f: Fut) { + pub fn spawn + 'scope>(&self, f: Fut) { self.spawn_on_scope(f); } @@ -230,7 +230,7 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_on_scope`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_on_external + 'env>(&self, f: Fut) { + pub fn spawn_on_external + 'scope>(&self, f: Fut) { self.spawn_on_scope(f); } @@ -239,7 +239,7 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// returned as a part of [`TaskPool::scope`]'s return value. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_on_scope + 'env>(&self, f: Fut) { + pub fn spawn_on_scope + 'scope>(&self, f: Fut) { let result = Rc::new(RefCell::new(None)); self.results.borrow_mut().push(result.clone()); let f = async move {