diff --git a/benches/benches/bevy_ecs/iteration/mod.rs b/benches/benches/bevy_ecs/iteration/mod.rs index e3ed6a6afeabe..790884335021e 100644 --- a/benches/benches/bevy_ecs/iteration/mod.rs +++ b/benches/benches/bevy_ecs/iteration/mod.rs @@ -18,6 +18,7 @@ mod iter_simple_sparse_set; mod iter_simple_system; mod iter_simple_wide; mod iter_simple_wide_sparse_set; +mod par_iter_simple; use heavy_compute::*; @@ -27,6 +28,7 @@ criterion_group!( iter_frag_sparse, iter_simple, heavy_compute, + par_iter_simple, ); fn iter_simple(c: &mut Criterion) { @@ -117,3 +119,15 @@ fn iter_frag_sparse(c: &mut Criterion) { }); group.finish(); } + +fn par_iter_simple(c: &mut Criterion) { + let mut group = c.benchmark_group("par_iter_simple"); + group.warm_up_time(std::time::Duration::from_millis(500)); + group.measurement_time(std::time::Duration::from_secs(4)); + for f in [0, 10, 100, 1000] { + group.bench_function(format!("with_{}_fragment", f), |b| { + let mut bench = par_iter_simple::Benchmark::new(f); + b.iter(move || bench.run()); + }); + } +} diff --git a/benches/benches/bevy_ecs/iteration/par_iter_simple.rs b/benches/benches/bevy_ecs/iteration/par_iter_simple.rs new file mode 100644 index 0000000000000..76489e33a84a3 --- /dev/null +++ b/benches/benches/bevy_ecs/iteration/par_iter_simple.rs @@ -0,0 +1,73 @@ +use bevy_ecs::prelude::*; +use bevy_tasks::{ComputeTaskPool, TaskPool}; +use glam::*; + +#[derive(Component, Copy, Clone)] +struct Transform(Mat4); + +#[derive(Component, Copy, Clone)] +struct Position(Vec3); + +#[derive(Component, Copy, Clone)] +struct Rotation(Vec3); + +#[derive(Component, Copy, Clone)] +struct Velocity(Vec3); + +#[derive(Component, Copy, Clone, Default)] +struct Data(f32); +pub struct Benchmark<'w>(World, QueryState<(&'w Velocity, &'w mut Position)>); + +fn insert_if_bit_enabled(entity: &mut EntityWorldMut, i: u16) { + if i & 1 << B != 0 { + entity.insert(Data::(1.0)); + } +} + +impl<'w> Benchmark<'w> { + pub fn new(fragment: u16) -> Self { + 
ComputeTaskPool::get_or_init(TaskPool::default); + + let mut world = World::new(); + + let iter = world.spawn_batch( + std::iter::repeat(( + Transform(Mat4::from_scale(Vec3::ONE)), + Position(Vec3::X), + Rotation(Vec3::X), + Velocity(Vec3::X), + )) + .take(100_000), + ); + let entities = iter.into_iter().collect::>(); + for i in 0..fragment { + let mut e = world.entity_mut(entities[i as usize]); + insert_if_bit_enabled::<0>(&mut e, i); + insert_if_bit_enabled::<1>(&mut e, i); + insert_if_bit_enabled::<2>(&mut e, i); + insert_if_bit_enabled::<3>(&mut e, i); + insert_if_bit_enabled::<4>(&mut e, i); + insert_if_bit_enabled::<5>(&mut e, i); + insert_if_bit_enabled::<6>(&mut e, i); + insert_if_bit_enabled::<7>(&mut e, i); + insert_if_bit_enabled::<8>(&mut e, i); + insert_if_bit_enabled::<9>(&mut e, i); + insert_if_bit_enabled::<10>(&mut e, i); + insert_if_bit_enabled::<11>(&mut e, i); + insert_if_bit_enabled::<12>(&mut e, i); + insert_if_bit_enabled::<13>(&mut e, i); + insert_if_bit_enabled::<14>(&mut e, i); + insert_if_bit_enabled::<15>(&mut e, i); + } + + let query = world.query::<(&Velocity, &mut Position)>(); + Self(world, query) + } + + #[inline(never)] + pub fn run(&mut self) { + self.1 + .par_iter_mut(&mut self.0) + .for_each(|(v, mut p)| p.0 += v.0); + } +} diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index 6e54aaf7359a2..ab5314cfd2c15 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -11,7 +11,7 @@ categories = ["game-engines", "data-structures"] [features] trace = [] -multi-threaded = ["bevy_tasks/multi-threaded"] +multi-threaded = ["bevy_tasks/multi-threaded", "arrayvec"] bevy_debug_stepping = [] default = ["bevy_reflect", "bevy_debug_stepping"] @@ -30,6 +30,7 @@ rustc-hash = "1.1" serde = "1" thiserror = "1.0" nonmax = "0.5" +arrayvec = { version = "0.7.4", optional = true } [dev-dependencies] rand = "0.8" diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index 
5731fcb77b397..84fd805fa6ca9 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -1387,58 +1387,99 @@ impl QueryState { ) { // NOTE: If you are changing query iteration code, remember to update the following places, where relevant: // QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual + use arrayvec::ArrayVec; bevy_tasks::ComputeTaskPool::get().scope(|scope| { // SAFETY: We only access table data that has been registered in `self.archetype_component_access`. let tables = unsafe { &world.storages().tables }; let archetypes = world.archetypes(); - for storage_id in &self.matched_storage_ids { - if D::IS_DENSE && F::IS_DENSE { - let table_id = storage_id.table_id; - let table = &tables[table_id]; - if table.is_empty() { - continue; + let mut batch_queue = ArrayVec::new(); + let mut queue_entity_count = 0; + + // submit a list of storages which are smaller than batch_size as a single task + let submit_batch_queue = |queue: &mut ArrayVec| { + if queue.is_empty() { + return; + } + let queue = std::mem::take(queue); + let mut func = func.clone(); + scope.spawn(async move { + #[cfg(feature = "trace")] + let _span = self.par_iter_span.enter(); + let mut iter = self.iter_unchecked_manual(world, last_run, this_run); + for storage_id in queue { + if D::IS_DENSE && F::IS_DENSE { + let id = storage_id.table_id; + let table = &world.storages().tables.get(id).debug_checked_unwrap(); + iter.for_each_in_table_range(&mut func, table, 0..table.entity_count()); + } else { + let id = storage_id.archetype_id; + let archetype = world.archetypes().get(id).debug_checked_unwrap(); + iter.for_each_in_archetype_range( + &mut func, + archetype, + 0..archetype.len(), + ); + } } + }); + }; - let mut offset = 0; - while offset < table.entity_count() { - let mut func = func.clone(); - let len = batch_size.min(table.entity_count() - offset); - let batch = 
offset..offset + len; - scope.spawn(async move { - #[cfg(feature = "trace")] - let _span = self.par_iter_span.enter(); - let table = - &world.storages().tables.get(table_id).debug_checked_unwrap(); + // submit single storage larger than batch_size + let submit_single = |count, storage_id: StorageId| { + for offset in (0..count).step_by(batch_size) { + let mut func = func.clone(); + let len = batch_size.min(count - offset); + let batch = offset..offset + len; + scope.spawn(async move { + #[cfg(feature = "trace")] + let _span = self.par_iter_span.enter(); + if D::IS_DENSE && F::IS_DENSE { + let id = storage_id.table_id; + let table = world.storages().tables.get(id).debug_checked_unwrap(); self.iter_unchecked_manual(world, last_run, this_run) .for_each_in_table_range(&mut func, table, batch); - }); - offset += batch_size; - } - } else { - let archetype_id = storage_id.archetype_id; - let archetype = &archetypes[archetype_id]; - if archetype.is_empty() { - continue; - } - - let mut offset = 0; - while offset < archetype.len() { - let mut func = func.clone(); - let len = batch_size.min(archetype.len() - offset); - let batch = offset..offset + len; - scope.spawn(async move { - #[cfg(feature = "trace")] - let _span = self.par_iter_span.enter(); - let archetype = - world.archetypes().get(archetype_id).debug_checked_unwrap(); + } else { + let id = storage_id.archetype_id; + let archetype = world.archetypes().get(id).debug_checked_unwrap(); self.iter_unchecked_manual(world, last_run, this_run) .for_each_in_archetype_range(&mut func, archetype, batch); - }); - offset += batch_size; - } + } + }); + } + }; + + let storage_entity_count = |storage_id: StorageId| -> usize { + if D::IS_DENSE && F::IS_DENSE { + tables[storage_id.table_id].entity_count() + } else { + archetypes[storage_id.archetype_id].len() + } + }; + + for storage_id in &self.matched_storage_ids { + let count = storage_entity_count(*storage_id); + + // skip empty storage + if count == 0 { + continue; + } + // 
immediately submit large storage + if count >= batch_size { + submit_single(count, *storage_id); + continue; + } + // merge small storage + batch_queue.push(*storage_id); + queue_entity_count += count; + + // submit batch_queue + if queue_entity_count >= batch_size || batch_queue.is_full() { + submit_batch_queue(&mut batch_queue); + queue_entity_count = 0; } } + submit_batch_queue(&mut batch_queue); }); }