Skip to content

Commit

Permalink
use std::sync::mpsc::channel in all cases, WinitPlugin now spawns…
Browse files Browse the repository at this point in the history
… extra listener thread
  • Loading branch information
maniwani committed Nov 9, 2023
1 parent 6cf907d commit 5202c4e
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 145 deletions.
90 changes: 59 additions & 31 deletions crates/bevy_app/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
app_thread_channel, AppEvent, Main, MainSchedulePlugin, PlaceholderPlugin, Plugin, Plugins,
PluginsState, SubApp, SubApps,
app_thread_channel, AppEvent, AppEventReceiver, AppEventSender, Main, MainSchedulePlugin,
PlaceholderPlugin, Plugin, Plugins, PluginsState, SubApp, SubApps,
};
pub use bevy_derive::AppLabel;
use bevy_ecs::{
Expand Down Expand Up @@ -64,6 +64,8 @@ pub struct App {
pub sub_apps: SubApps,
#[doc(hidden)]
pub tls: ThreadLocalStorage,
send: AppEventSender,
recv: AppEventReceiver,
/// The function that will manage the app's lifecycle.
///
/// Bevy provides the [`WinitPlugin`] and [`ScheduleRunnerPlugin`] for windowed and headless
Expand Down Expand Up @@ -114,65 +116,96 @@ impl App {
///
/// Use this constructor if you want to customize scheduling, exit handling, cleanup, etc.
pub fn empty() -> App {
let (send, recv) = app_thread_channel();
Self {
sub_apps: SubApps::new(),
tls: ThreadLocalStorage::new(),
send,
recv,
runner: Some(Box::new(run_once)),
}
}

/// Disassembles the [`App`] and returns its individual parts.
pub fn into_parts(self) -> (SubApps, ThreadLocalStorage, Option<RunnerFn>) {
#[doc(hidden)]
pub fn into_parts(
self,
) -> (
SubApps,
ThreadLocalStorage,
AppEventSender,
AppEventReceiver,
Option<RunnerFn>,
) {
let Self {
sub_apps,
tls,
send,
recv,
runner,
} = self;

(sub_apps, tls, runner)
(sub_apps, tls, send, recv, runner)
}

/// Returns an [`App`] assembled from the given individual parts.
#[doc(hidden)]
pub fn from_parts(
sub_apps: SubApps,
tls: ThreadLocalStorage,
send: AppEventSender,
recv: AppEventReceiver,
runner: Option<RunnerFn>,
) -> Self {
App {
sub_apps,
tls,
send,
recv,
runner,
}
}

/// Inserts the channel to [`ThreadLocals`] into all sub-apps.
#[doc(hidden)]
pub fn insert_tls_channel(&mut self) {
self.sub_apps.iter_mut().for_each(|sub_app| {
self.tls
.insert_channel(sub_app.world_mut(), self.send.clone());
});
}

/// Removes the channel to [`ThreadLocals`] from all sub-apps.
#[doc(hidden)]
pub fn remove_tls_channel(&mut self) {
self.sub_apps
.iter_mut()
.for_each(|sub_app| self.tls.remove_channel(sub_app.world_mut()));
}

/// Runs the default schedules of all sub-apps (starting with the "main" app) once.
pub fn update(&mut self) {
if self.is_building_plugins() {
panic!("App::update() was called while a plugin was building.");
}

// disassemble
let (mut sub_apps, tls, runner) = std::mem::take(self).into_parts();

// create channel
let (send, recv) = app_thread_channel();
self.insert_tls_channel();

// insert channel
sub_apps
.iter_mut()
.for_each(|sub_app| tls.insert_channel(sub_app.world_mut(), send.clone()));
// disassemble
let (mut sub_apps, tls, send, recv, runner) = std::mem::take(self).into_parts();

#[cfg(not(target_arch = "wasm32"))]
{
// Move sub-apps to another thread and run an event loop in this thread.
let thread_send = send.clone();
let thread = std::thread::spawn(move || {
let result = catch_unwind(AssertUnwindSafe(|| {
sub_apps.update();
send.send(AppEvent::Exit(sub_apps)).unwrap();
thread_send.send(AppEvent::Exit(sub_apps)).unwrap();
}));

if let Some(payload) = result.err() {
send.send(AppEvent::Error(payload)).unwrap();
thread_send.send(AppEvent::Error(payload)).unwrap();
}
});

Expand Down Expand Up @@ -200,13 +233,10 @@ impl App {
sub_apps.update();
}

// remove channel
sub_apps
.iter_mut()
.for_each(|sub_app| tls.remove_channel(sub_app.world_mut()));

// reassemble
*self = App::from_parts(sub_apps, tls, runner);
*self = App::from_parts(sub_apps, tls, send, recv, runner);

self.remove_tls_channel();
}

/// Runs the [`App`] by calling its [runner](Self::set_runner).
Expand Down Expand Up @@ -239,6 +269,10 @@ impl App {
panic!("App::run() was called while a plugin was building.");
}

// Insert channel here because some sub-apps are moved to a different thread during
// plugin build.
self.insert_tls_channel();

if self.plugins_state() == PluginsState::Ready {
// If we're already ready, we finish up now and advance one frame.
// This prevents black frames during the launch transition on iOS.
Expand Down Expand Up @@ -893,14 +927,6 @@ impl App {
type RunnerFn = Box<dyn FnOnce(App)>;

fn run_once(mut app: App) {
// TODO: rework app setup
// create channel
let (send, recv) = app_thread_channel();
// insert channel
app.sub_apps
.iter_mut()
.for_each(|sub_app| app.tls.insert_channel(sub_app.world_mut(), send.clone()));

// wait for plugins to finish setting up
let plugins_state = app.plugins_state();
if plugins_state != PluginsState::Cleaned {
Expand All @@ -912,13 +938,13 @@ fn run_once(mut app: App) {
app.cleanup();
}

// if plugins where cleaned before the runner start, an update already ran
// If plugins where cleaned before the runner start, an update already ran
if plugins_state == PluginsState::Cleaned {
return;
}

// disassemble
let (mut sub_apps, _, _) = app.into_parts();
let (mut sub_apps, mut tls, send, recv, _) = app.into_parts();

#[cfg(not(target_arch = "wasm32"))]
{
Expand Down Expand Up @@ -955,6 +981,8 @@ fn run_once(mut app: App) {
{
sub_apps.update();
}

tls.clear();
}

/// An event that indicates the [`App`] should exit. If one or more of these are present at the
Expand Down
21 changes: 15 additions & 6 deletions crates/bevy_app/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,21 @@ impl ThreadLocalTaskSender for AppEventSender {
&mut self,
task: ThreadLocalTask,
) -> Result<(), ThreadLocalTaskSendError<ThreadLocalTask>> {
self.send(AppEvent::Task(task)).map_err(|error| {
let AppEvent::Task(task) = error.0 else {
unreachable!()
};
ThreadLocalTaskSendError(task)
})
#[cfg(not(target_arch = "wasm32"))]
{
self.send(AppEvent::Task(task)).map_err(|error| {
let AppEvent::Task(task) = error.0 else {
unreachable!()
};
ThreadLocalTaskSendError(task)
})
}

#[cfg(target_arch = "wasm32")]
{
// wasm builds should always access TLS directly
unreachable!("currently, only single-threaded wasm is supported")
}
}
}

Expand Down
14 changes: 4 additions & 10 deletions crates/bevy_app/src/schedule_runner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{app_thread_channel, App, AppEvent, AppExit, Plugin, PluginsState, SubApps};
use crate::{App, AppEvent, AppExit, Plugin, PluginsState, SubApps};
use bevy_ecs::event::{Events, ManualEventReader};
use bevy_utils::{Duration, Instant};

Expand Down Expand Up @@ -66,14 +66,6 @@ impl Plugin for ScheduleRunnerPlugin {
fn build(&self, app: &mut App) {
let run_mode = self.run_mode;
app.set_runner(move |mut app: App| {
// TODO: rework app setup
// create channel
let (send, recv) = app_thread_channel();
// insert channel
app.sub_apps.iter_mut().for_each(|sub_app| {
app.tls.insert_channel(sub_app.world_mut(), send.clone());
});

// wait for plugins to finish setting up
let plugins_state = app.plugins_state();
if plugins_state != PluginsState::Cleaned {
Expand Down Expand Up @@ -116,7 +108,7 @@ impl Plugin for ScheduleRunnerPlugin {
};

// disassemble
let (mut sub_apps, _, _) = app.into_parts();
let (mut sub_apps, mut tls, send, recv, _) = app.into_parts();

#[cfg(not(target_arch = "wasm32"))]
{
Expand Down Expand Up @@ -152,6 +144,8 @@ impl Plugin for ScheduleRunnerPlugin {
}
}
}

tls.clear();
}

#[cfg(target_arch = "wasm32")]
Expand Down
26 changes: 12 additions & 14 deletions crates/bevy_ecs/src/storage/resource_non_send.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::any::TypeId;
use std::cell::RefCell;
use std::marker::PhantomData;
use std::mem;
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
use std::sync::mpsc::sync_channel;

use bevy_ptr::{OwningPtr, Ptr};

Expand Down Expand Up @@ -337,7 +340,8 @@ pub type ThreadLocalTask = Box<dyn FnOnce() + Send + 'static>;
#[derive(Debug)]
pub struct ThreadLocalTaskSendError<T>(pub T);

/// Channel for sending [`ThreadLocalTask`] instances.
/// Sends [`ThreadLocalTask`].
// TODO: can remove this trait if `bevy_app` changes from its own crate to a `bevy_ecs` feature
pub trait ThreadLocalTaskSender: Send + 'static {
/// Attempts to send a task over this channel, returning it back if it could not be sent.
fn send_task(
Expand All @@ -353,11 +357,7 @@ struct ThreadLocalChannel {
sender: Box<dyn ThreadLocalTaskSender>,
}

// SAFETY: The pointer to the thread-local storage is only dereferenced in its owning thread.
unsafe impl Send for ThreadLocalChannel {}

// SAFETY: The pointer to the thread-local storage is only dereferenced in its owning thread.
// Likewise, all operations require an exclusive reference, so there can be no races.
// SAFETY: All operations require an exclusive reference, so there can be no races.
unsafe impl Sync for ThreadLocalChannel {}

/// A guard to access [`ThreadLocals`].
Expand Down Expand Up @@ -484,7 +484,7 @@ impl ThreadLocal<'_, '_> {

TLS.with_borrow_mut(|tls| {
tls.update_change_tick();
let saved = std::mem::replace(&mut tls.last_tick, *self.last_run);
let saved = mem::replace(&mut tls.last_tick, *self.last_run);
let result = f(tls);
tls.last_tick = saved;
*self.last_run = tls.curr_tick;
Expand All @@ -502,15 +502,13 @@ impl ThreadLocal<'_, '_> {
};

let system_tick = *self.last_run;
let (result_tx, result_rx) = std::sync::mpsc::sync_channel(1);
let (result_tx, result_rx) = sync_channel(1);
let task = move || {
TLS.with_borrow_mut(|tls| {
tls.update_change_tick();
let saved = std::mem::replace(&mut tls.last_tick, system_tick);
let saved = mem::replace(&mut tls.last_tick, system_tick);
// we want to propagate to caller instead of panicking in the main thread
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
(f(tls), tls.curr_tick)
}));
let result = catch_unwind(AssertUnwindSafe(|| (f(tls), tls.curr_tick)));
tls.last_tick = saved;
result_tx.send(result).unwrap();
});
Expand All @@ -519,7 +517,7 @@ impl ThreadLocal<'_, '_> {
let task: Box<dyn FnOnce() + Send> = Box::new(task);
// SAFETY: This function will block the calling thread until `f` completes,
// so any captured references in `f` will remain valid until then.
let task: Box<dyn FnOnce() + Send + 'static> = unsafe { std::mem::transmute(task) };
let task: Box<dyn FnOnce() + Send + 'static> = unsafe { mem::transmute(task) };

// Send task to the main thread.
sender
Expand All @@ -533,7 +531,7 @@ impl ThreadLocal<'_, '_> {
result
}
Err(payload) => {
std::panic::resume_unwind(payload);
resume_unwind(payload);
}
}
}
Expand Down
Loading

0 comments on commit 5202c4e

Please sign in to comment.