Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Next Task Optimization for the Multithreaded Executor #12869

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 92 additions & 24 deletions crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,43 @@ use crate as bevy_ecs;

use super::__rust_begin_short_backtrace;

/// Identifies a system that is about to be run, together with the per-system
/// data `run_system` needs in order to run it.
struct QueuedSystem {
/// Position of the system in the executor's `systems` list; also passed to
/// `system_completed` once the system has finished.
system_index: usize,
/// Tracing span entered for the duration of the system's run.
#[cfg(feature = "trace")]
system_span: Span,
}

impl QueuedSystem {
/// Runs the queued system synchronously, catching any panic it raises, and then
/// reports the result to the executor via `Context::system_completed`.
///
/// # Safety
/// - Caller must not alias systems that are running.
/// - `world` must have permission to access the world data
/// used by the specified system.
/// - `update_archetype_component_access` must have been called with `world`
/// on the system associated with `system_index`.
unsafe fn run_system(self, context: &Context) {
// SAFETY: this system is not running, no other reference exists
let system = unsafe { &mut *context.environment.systems[self.system_index].get() };
// Copy the context out of the reference so the rest of this function works on
// an owned value. NOTE(review): no future is created here; the copy looks like
// a carry-over from the task-spawning code path -- confirm whether it is needed.
let context = *context;

// Catch panics so that completion is always reported below, with the panic
// payload (if any) carried in `res`.
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
#[cfg(feature = "trace")]
let _span = self.system_span.enter();
// SAFETY:
// - The caller ensures that we have permission to
// access the world data used by the system.
// - `update_archetype_component_access` has been called.
unsafe {
__rust_begin_short_backtrace::run_unsafe(
&mut **system,
context.environment.world_cell,
);
};
}));
// Hand the result (success or caught panic) back to the executor.
context.system_completed(self.system_index, res, system);
}
}

/// Borrowed data used by the [`MultiThreadedExecutor`].
struct Environment<'env, 'sys> {
executor: &'env MultiThreadedExecutor,
Expand Down Expand Up @@ -322,9 +359,15 @@ impl<'scope, 'env: 'scope, 'sys> Context<'scope, 'env, 'sys> {
let Ok(mut guard) = self.environment.executor.state.try_lock() else {
return;
};
guard.tick(self);
let cached_system = guard.tick(self);
// Make sure we drop the guard before checking system_completion.is_empty(), or we could lose events.
drop(guard);
if let Some(cached_system) = cached_system {
// SAFETY: The prior executor run guarantees that this system execution will not alias any
// data stored in the world, that the system has the permission to run, and that the system's
// archetypes are updated.
unsafe { cached_system.run_system(self) };
}
if self.environment.executor.system_completion.is_empty() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If other threads failed to lock the executor, running the task before looping will delay spawning dependent systems until the cached system is run.

For example, if you have add_systems(Update, ((a, b).chain(), (c, d).chain())), then b and d can run concurrently. But if c completes while a holds the executor lock, then the thread that completed a will run b before running the executor to spawn d.

You may want to do something like

let mut cached_system = None;
loop {
    if let Ok(mut guard) = self.environment.executor.state.try_lock() {
        guard.tick(self, &mut cached_system);
        drop(guard);
        if !self.environment.executor.system_completion.is_empty() {
            continue;
        }
    }
    match cached_system.take() {
        Some(cached_system) => unsafe { cached_system.run_system(self) },
        None => break,
    }
}

return;
}
Expand Down Expand Up @@ -370,7 +413,8 @@ impl ExecutorState {
}
}

fn tick(&mut self, context: &Context) {
#[must_use]
fn tick(&mut self, context: &Context) -> Option<QueuedSystem> {
#[cfg(feature = "trace")]
let _span = context.environment.executor.executor_span.enter();

Expand All @@ -383,19 +427,18 @@ impl ExecutorState {
// SAFETY:
// - `finish_system_and_handle_dependents` has updated the currently running systems.
// - `rebuild_active_access` locks access for all currently running systems.
unsafe {
self.spawn_system_tasks(context);
}
unsafe { self.spawn_system_tasks(context) }
}

/// # Safety
/// - Caller must ensure that `self.ready_systems` does not contain any systems that
/// have been mutably borrowed (such as the systems currently running).
/// - `world_cell` must have permission to access all world data (not counting
/// any world data that is claimed by systems currently running on this executor).
unsafe fn spawn_system_tasks(&mut self, context: &Context) {
#[must_use]
unsafe fn spawn_system_tasks(&mut self, context: &Context) -> Option<QueuedSystem> {
if self.exclusive_running {
return;
return None;
}

let mut conditions = context
Expand All @@ -407,6 +450,8 @@ impl ExecutorState {
// can't borrow since loop mutably borrows `self`
let mut ready_systems = std::mem::take(&mut self.ready_systems_copy);

let mut cached_system = None;

// Skipping systems may cause their dependents to become ready immediately.
// If that happens, we need to run again immediately or we may fail to spawn those dependents.
let mut check_for_new_ready_systems = true;
Expand Down Expand Up @@ -467,6 +512,13 @@ impl ExecutorState {
break;
}

if cached_system.is_none() {
cached_system = self.spawn_cached_task(system_index);
if cached_system.is_some() {
continue;
}
}

// SAFETY:
// - Caller ensured no other reference to this system exists.
// - `can_run` has been called, which calls `update_archetype_component_access` with this system.
Expand All @@ -479,6 +531,7 @@ impl ExecutorState {

// give back
self.ready_systems_copy = ready_systems;
cached_system
}

fn can_run(
Expand Down Expand Up @@ -596,38 +649,53 @@ impl ExecutorState {
should_run
}

/// Claims the system at `system_index` so that the caller can run it itself
/// rather than spawning it as a task.
///
/// Returns `None`, leaving the executor state untouched, when the system's
/// metadata marks it as not `Send`. Otherwise the system's archetype component
/// access is merged into `active_access` and a [`QueuedSystem`] describing the
/// system is returned.
///
/// # Safety
/// - Caller must not alias systems that are running.
/// - `world` must have permission to access the world data
///   used by the specified system.
/// - `update_archetype_component_access` must have been called with `world`
///   on the system associated with `system_index`.
#[must_use]
unsafe fn spawn_cached_task(&mut self, system_index: usize) -> Option<QueuedSystem> {
    let meta = &self.system_task_metadata[system_index];

    // Only `Send` systems are handed back to the caller.
    if !meta.is_send {
        return None;
    }

    // Record the system's access as active before handing it out.
    self.active_access.extend(&meta.archetype_component_access);

    Some(QueuedSystem {
        system_index,
        #[cfg(feature = "trace")]
        system_span: meta.system_task_span.clone(),
    })
}

/// # Safety
/// - Caller must not alias systems that are running.
/// - `world` must have permission to access the world data
/// used by the specified system.
/// - `update_archetype_component_access` must have been called with `world`
/// on the system associated with `system_index`.
unsafe fn spawn_system_task(&mut self, context: &Context, system_index: usize) {
// SAFETY: this system is not running, no other reference exists
let system = unsafe { &mut *context.environment.systems[system_index].get() };
// Move the full context object into the new future.
let context = *context;

let system_meta = &self.system_task_metadata[system_index];

#[cfg(feature = "trace")]
let system_span = system_meta.system_task_span.clone();
let queued_system = QueuedSystem {
system_index,
#[cfg(feature = "trace")]
system_span,
};
let task = async move {
let res = std::panic::catch_unwind(AssertUnwindSafe(|| {
#[cfg(feature = "trace")]
let _span = system_span.enter();
// SAFETY:
// - The caller ensures that we have permission to
// access the world data used by the system.
// - `update_archetype_component_access` has been called.
unsafe {
__rust_begin_short_backtrace::run_unsafe(
&mut **system,
context.environment.world_cell,
);
};
}));
context.system_completed(system_index, res, system);
// SAFETY: This function has the same invariants as run_system. The caller
// guarantees that they're satisfied.
unsafe { queued_system.run_system(&context) };
};

self.active_access
Expand Down