From 81efcb2c8b83d9c0fb0626d74afaee8d64ebb111 Mon Sep 17 00:00:00 2001 From: re0312 Date: Tue, 2 Apr 2024 20:13:03 +0800 Subject: [PATCH 01/10] submit --- crates/bevy_ecs/src/query/state.rs | 129 ++++++++++++++++++++--------- 1 file changed, 90 insertions(+), 39 deletions(-) diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index 5731fcb77b397..66b89a0ae8432 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -1392,53 +1392,104 @@ impl QueryState { // SAFETY: We only access table data that has been registered in `self.archetype_component_access`. let tables = unsafe { &world.storages().tables }; let archetypes = world.archetypes(); - for storage_id in &self.matched_storage_ids { + let mut batch_queue = vec![]; + let mut queue_entity_count = 0; + + // submit a list of storages which size smaller than batch_size as single task + let submit_batch_queue = |queue: &mut Vec| { + if queue.is_empty() { + return; + } + let queue = std::mem::take(queue); + let mut func = func.clone(); if D::IS_DENSE && F::IS_DENSE { - let table_id = storage_id.table_id; - let table = &tables[table_id]; - if table.is_empty() { - continue; - } - - let mut offset = 0; - while offset < table.entity_count() { - let mut func = func.clone(); - let len = batch_size.min(table.entity_count() - offset); - let batch = offset..offset + len; - scope.spawn(async move { - #[cfg(feature = "trace")] - let _span = self.par_iter_span.enter(); + scope.spawn(async move { + #[cfg(feature = "trace")] + let _span = self.par_iter_span.enter(); + let mut iter = self.iter_unchecked_manual(world, last_run, this_run); + for storage_id in queue { + let table_id = storage_id.table_id; + let table = + &world.storages().tables.get(table_id).debug_checked_unwrap(); + iter.for_each_in_table_range(&mut func, table, 0..table.entity_count()) + } + }); + } else { + scope.spawn(async move { + #[cfg(feature = "trace")] + let _span = self.par_iter_span.enter(); + let mut iter = self.iter_unchecked_manual(world, last_run, this_run); + for storage_id in queue { + let table_id = storage_id.table_id; let table = &world.storages().tables.get(table_id).debug_checked_unwrap(); + iter.for_each_in_table_range(&mut func, table, 0..table.entity_count()) + } + }); + } + }; + + // submit single storage which size larger than batch_size + let submit_single = |count, storage_id: StorageId| { + for offset in (0..count).step_by(batch_size) { + let mut func = func.clone(); + let len = batch_size.min(count - offset); + let batch = offset..offset + len; + scope.spawn(async move { + #[cfg(feature = "trace")] + let _span = self.par_iter_span.enter(); + if D::IS_DENSE && F::IS_DENSE { + let table = world + .storages() + .tables + .get(storage_id.table_id) + .debug_checked_unwrap(); self.iter_unchecked_manual(world, last_run, this_run) .for_each_in_table_range(&mut func, table, batch); - }); - offset += batch_size; - } - } else { - let archetype_id = storage_id.archetype_id; - let archetype = &archetypes[archetype_id]; - if archetype.is_empty() { - continue; - } - - let mut offset = 0; - while offset < archetype.len() { - let mut func = func.clone(); - let len = batch_size.min(archetype.len() - offset); - let batch = offset..offset + len; - scope.spawn(async move { - #[cfg(feature = "trace")] - let _span = self.par_iter_span.enter(); - let archetype = - world.archetypes().get(archetype_id).debug_checked_unwrap(); + } else { + let archetype = world + .archetypes() + .get(storage_id.archetype_id) + .debug_checked_unwrap(); self.iter_unchecked_manual(world, last_run, this_run) - .for_each_in_archetype_range(&mut func, archetype, batch); - }); - offset += batch_size; - } + .for_each_in_archetype_range(&mut func, archetype, batch) + } + }); + } + }; + + let storage_entity_count = |storage_id: StorageId| -> usize { + if D::IS_DENSE && F::IS_DENSE { + tables[storage_id.table_id].entity_count() + } else { + archetypes[storage_id.archetype_id].len() + } + }; + + for storage_id in &self.matched_storage_ids { + let count = storage_entity_count(*storage_id); + + // skip empty table + if count == 0 { + continue; + } + // immediately submit for large storage + if count >= batch_size { + submit_single(count, *storage_id); + } + // merge small storage + else { + batch_queue.push(*storage_id); + queue_entity_count += count; + } + + // submit batch_queue + if queue_entity_count >= batch_size { + submit_batch_queue(&mut batch_queue); + continue; } } + submit_batch_queue(&mut batch_queue); }); } From b5116be25b84da944ddeb34948659bc8ca824672 Mon Sep 17 00:00:00 2001 From: re0312 Date: Tue, 2 Apr 2024 20:23:19 +0800 Subject: [PATCH 02/10] cleanup --- crates/bevy_ecs/src/query/state.rs | 54 ++++++++++++------------------ 1 file changed, 22 insertions(+), 32 deletions(-) diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index 66b89a0ae8432..b9cdec991809c 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -1402,31 +1402,26 @@ impl QueryState { } let queue = std::mem::take(queue); let mut func = func.clone(); - if D::IS_DENSE && F::IS_DENSE { - scope.spawn(async move { - #[cfg(feature = "trace")] - let _span = self.par_iter_span.enter(); - let mut iter = self.iter_unchecked_manual(world, last_run, this_run); - for storage_id in queue { - let table_id = storage_id.table_id; - let table = - &world.storages().tables.get(table_id).debug_checked_unwrap(); - iter.for_each_in_table_range(&mut func, table, 0..table.entity_count()) - } - }); - } else { - scope.spawn(async move { - #[cfg(feature = "trace")] - let _span = self.par_iter_span.enter(); - let mut iter = self.iter_unchecked_manual(world, last_run, this_run); - for storage_id in queue { - let table_id = storage_id.table_id; - let table = - &world.storages().tables.get(table_id).debug_checked_unwrap(); + scope.spawn(async move { + #[cfg(feature = "trace")] + let _span = self.par_iter_span.enter(); + let mut iter = self.iter_unchecked_manual(world, last_run, this_run); + for storage_id in queue { + if D::IS_DENSE && F::IS_DENSE { + let id = storage_id.table_id; + let table = &world.storages().tables.get(id).debug_checked_unwrap(); iter.for_each_in_table_range(&mut func, table, 0..table.entity_count()) + } else { + let id = storage_id.archetype_id; + let archetype = world.archetypes().get(id).debug_checked_unwrap(); + iter.for_each_in_archetype_range( + &mut func, + archetype, + 0..archetype.len(), + ) } - }); - } + } + }); }; // submit single storage which size larger than batch_size @@ -1439,18 +1434,13 @@ impl QueryState { #[cfg(feature = "trace")] let _span = self.par_iter_span.enter(); if D::IS_DENSE && F::IS_DENSE { - let table = world - .storages() - .tables - .get(storage_id.table_id) - .debug_checked_unwrap(); + let id = storage_id.table_id; + let table = world.storages().tables.get(id).debug_checked_unwrap(); self.iter_unchecked_manual(world, last_run, this_run) .for_each_in_table_range(&mut func, table, batch); } else { - let archetype = world - .archetypes() - .get(storage_id.archetype_id) - .debug_checked_unwrap(); + let id = storage_id.archetype_id; + let archetype = world.archetypes().get(id).debug_checked_unwrap(); self.iter_unchecked_manual(world, last_run, this_run) .for_each_in_archetype_range(&mut func, archetype, batch) } From 2e9bb48a831a3ddac64c4de5a06d1f59abeeb68d Mon Sep 17 00:00:00 2001 From: re0312 Date: Tue, 2 Apr 2024 20:38:55 +0800 Subject: [PATCH 03/10] typo --- crates/bevy_ecs/src/query/state.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index b9cdec991809c..99aa86b4f87a8 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -1395,7 +1395,7 @@ impl QueryState { let mut batch_queue = vec![]; let mut queue_entity_count = 0; - // submit a list of storages which size smaller than batch_size as single task + // submit a list of storages which smaller than batch_size as single task let submit_batch_queue = |queue: &mut Vec| { if queue.is_empty() { return; @@ -1424,7 +1424,7 @@ impl QueryState { }); }; - // submit single storage which size larger than batch_size + // submit single storage larger than batch_size let submit_single = |count, storage_id: StorageId| { for offset in (0..count).step_by(batch_size) { let mut func = func.clone(); @@ -1459,11 +1459,11 @@ impl QueryState { for storage_id in &self.matched_storage_ids { let count = storage_entity_count(*storage_id); - // skip empty table + // skip empty storage if count == 0 { continue; } - // immediately submit for large storage + // immediately submit large storage if count >= batch_size { submit_single(count, *storage_id); } From fbe00219b09f4368a1ca4652c51f437c51e71791 Mon Sep 17 00:00:00 2001 From: re0312 Date: Tue, 2 Apr 2024 20:52:40 +0800 Subject: [PATCH 04/10] fix cli --- crates/bevy_ecs/src/query/state.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index 99aa86b4f87a8..77790e139784d 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -1410,7 +1410,7 @@ impl QueryState { if D::IS_DENSE && F::IS_DENSE { let id = storage_id.table_id; let table = &world.storages().tables.get(id).debug_checked_unwrap(); - iter.for_each_in_table_range(&mut func, table, 0..table.entity_count()) + iter.for_each_in_table_range(&mut func, table, 0..table.entity_count()); } else { let id = storage_id.archetype_id; let archetype = world.archetypes().get(id).debug_checked_unwrap(); @@ -1418,7 +1418,7 @@ impl QueryState { &mut func, archetype, 0..archetype.len(), - ) + ); } } }); @@ -1442,7 +1442,7 @@ impl QueryState { let id = storage_id.archetype_id; let archetype = world.archetypes().get(id).debug_checked_unwrap(); self.iter_unchecked_manual(world, last_run, this_run) - .for_each_in_archetype_range(&mut func, archetype, batch) + .for_each_in_archetype_range(&mut func, archetype, batch); } }); } From 6e79e47c4c53b63d7c67b16736ec5908ca51613f Mon Sep 17 00:00:00 2001 From: re0312 Date: Tue, 2 Apr 2024 22:43:35 +0800 Subject: [PATCH 05/10] bench --- benches/benches/bevy_ecs/iteration/mod.rs | 14 ++++ .../bevy_ecs/iteration/par_iter_simple.rs | 73 +++++++++++++++++++ 2 files changed, 87 insertions(+) create mode 100644 benches/benches/bevy_ecs/iteration/par_iter_simple.rs diff --git a/benches/benches/bevy_ecs/iteration/mod.rs b/benches/benches/bevy_ecs/iteration/mod.rs index e3ed6a6afeabe..98fcf69d1c823 100644 --- a/benches/benches/bevy_ecs/iteration/mod.rs +++ b/benches/benches/bevy_ecs/iteration/mod.rs @@ -18,6 +18,7 @@ mod iter_simple_sparse_set; mod iter_simple_system; mod iter_simple_wide; mod iter_simple_wide_sparse_set; +mod par_iter_simple; use heavy_compute::*; @@ -27,6 +28,7 @@ criterion_group!( iter_frag_sparse, iter_simple, heavy_compute, + par_iter_simple, ); fn iter_simple(c: &mut Criterion) { @@ -117,3 +119,15 @@ fn iter_frag_sparse(c: &mut Criterion) { }); group.finish(); } + +fn par_iter_simple(c: &mut Criterion) { + let mut group = c.benchmark_group("par_iter_simple"); + group.warm_up_time(std::time::Duration::from_millis(500)); + group.measurement_time(std::time::Duration::from_secs(4)); + for f in [0, 10, 500, 10000] { + group.bench_function(format!("with_{}_fragment", f), |b| { + let mut bench = par_iter_simple::Benchmark::new(f); + b.iter(move || bench.run()); + }); + } +} diff --git a/benches/benches/bevy_ecs/iteration/par_iter_simple.rs b/benches/benches/bevy_ecs/iteration/par_iter_simple.rs new file mode 100644 index 0000000000000..76489e33a84a3 --- /dev/null +++ b/benches/benches/bevy_ecs/iteration/par_iter_simple.rs @@ -0,0 +1,73 @@ +use bevy_ecs::prelude::*; +use bevy_tasks::{ComputeTaskPool, TaskPool}; +use glam::*; + +#[derive(Component, Copy, Clone)] +struct Transform(Mat4); + +#[derive(Component, Copy, Clone)] +struct Position(Vec3); + +#[derive(Component, Copy, Clone)] +struct Rotation(Vec3); + +#[derive(Component, Copy, Clone)] +struct Velocity(Vec3); + +#[derive(Component, Copy, Clone, Default)] +struct Data(f32); +pub struct Benchmark<'w>(World, QueryState<(&'w Velocity, &'w mut Position)>); + +fn insert_if_bit_enabled(entity: &mut EntityWorldMut, i: u16) { + if i & 1 << B != 0 { + entity.insert(Data::(1.0)); + } +} + +impl<'w> Benchmark<'w> { + pub fn new(fragment: u16) -> Self { + ComputeTaskPool::get_or_init(TaskPool::default); + + let mut world = World::new(); + + let iter = world.spawn_batch( + std::iter::repeat(( + Transform(Mat4::from_scale(Vec3::ONE)), + Position(Vec3::X), + Rotation(Vec3::X), + Velocity(Vec3::X), + )) + .take(100_000), + ); + let entities = iter.into_iter().collect::>(); + for i in 0..fragment { + let mut e = world.entity_mut(entities[i as usize]); + insert_if_bit_enabled::<0>(&mut e, i); + insert_if_bit_enabled::<1>(&mut e, i); + insert_if_bit_enabled::<2>(&mut e, i); + insert_if_bit_enabled::<3>(&mut e, i); + insert_if_bit_enabled::<4>(&mut e, i); + insert_if_bit_enabled::<5>(&mut e, i); + insert_if_bit_enabled::<6>(&mut e, i); + insert_if_bit_enabled::<7>(&mut e, i); + insert_if_bit_enabled::<8>(&mut e, i); + insert_if_bit_enabled::<9>(&mut e, i); + insert_if_bit_enabled::<10>(&mut e, i); + insert_if_bit_enabled::<11>(&mut e, i); + insert_if_bit_enabled::<12>(&mut e, i); + insert_if_bit_enabled::<13>(&mut e, i); + insert_if_bit_enabled::<14>(&mut e, i); + insert_if_bit_enabled::<15>(&mut e, i); + } + + let query = world.query::<(&Velocity, &mut Position)>(); + Self(world, query) + } + + #[inline(never)] + pub fn run(&mut self) { + self.1 + .par_iter_mut(&mut self.0) + .for_each(|(v, mut p)| p.0 += v.0); + } +} From 81a9b3df843250b530c35f4a4514f82ef872983f Mon Sep 17 00:00:00 2001 From: re0312 Date: Tue, 2 Apr 2024 22:50:21 +0800 Subject: [PATCH 06/10] new --- benches/benches/bevy_ecs/iteration/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benches/benches/bevy_ecs/iteration/mod.rs b/benches/benches/bevy_ecs/iteration/mod.rs index 98fcf69d1c823..790884335021e 100644 --- a/benches/benches/bevy_ecs/iteration/mod.rs +++ b/benches/benches/bevy_ecs/iteration/mod.rs @@ -124,7 +124,7 @@ fn par_iter_simple(c: &mut Criterion) { let mut group = c.benchmark_group("par_iter_simple"); group.warm_up_time(std::time::Duration::from_millis(500)); group.measurement_time(std::time::Duration::from_secs(4)); - for f in [0, 10, 500, 10000] { + for f in [0, 10, 100, 1000] { group.bench_function(format!("with_{}_fragment", f), |b| { let mut bench = par_iter_simple::Benchmark::new(f); b.iter(move || bench.run()); From a0785faaf31301339ac2ec7713fe26025f5bf731 Mon Sep 17 00:00:00 2001 From: re0312 Date: Wed, 3 Apr 2024 07:25:24 +0800 Subject: [PATCH 07/10] clean up --- crates/bevy_ecs/src/query/state.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index 77790e139784d..1ba53a256f6e6 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -1466,17 +1466,15 @@ impl QueryState { // immediately submit large storage if count >= batch_size { submit_single(count, *storage_id); + continue; } // merge small storage - else { - batch_queue.push(*storage_id); - queue_entity_count += count; - } + batch_queue.push(*storage_id); + queue_entity_count += count; // submit batch_queue if queue_entity_count >= batch_size { submit_batch_queue(&mut batch_queue); - continue; } } submit_batch_queue(&mut batch_queue); From 260fa533998be5e7436c0124f9f281345649ea99 Mon Sep 17 00:00:00 2001 From: re0312 Date: Wed, 3 Apr 2024 10:02:20 +0800 Subject: [PATCH 08/10] arrayvec --- crates/bevy_ecs/Cargo.toml | 1 + crates/bevy_ecs/src/query/state.rs | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index 6e54aaf7359a2..f6b98efa45c0c 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -30,6 +30,7 @@ rustc-hash = "1.1" serde = "1" thiserror = "1.0" nonmax = "0.5" +arrayvec = "0.7.4" [dev-dependencies] rand = "0.8" diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index 1ba53a256f6e6..c595bf207748d 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -10,6 +10,7 @@ use crate::{ storage::{SparseSetIndex, TableId}, world::{unsafe_world_cell::UnsafeWorldCell, World, WorldId}, }; +use arrayvec::ArrayVec; use bevy_utils::tracing::warn; #[cfg(feature = "trace")] use bevy_utils::tracing::Span; @@ -1392,11 +1393,11 @@ impl QueryState { // SAFETY: We only access table data that has been registered in `self.archetype_component_access`. let tables = unsafe { &world.storages().tables }; let archetypes = world.archetypes(); - let mut batch_queue = vec![]; + let mut batch_queue = ArrayVec::new(); let mut queue_entity_count = 0; // submit a list of storages which smaller than batch_size as single task - let submit_batch_queue = |queue: &mut Vec| { + let submit_batch_queue = |queue: &mut ArrayVec| { if queue.is_empty() { return; } @@ -1473,7 +1474,7 @@ impl QueryState { queue_entity_count += count; // submit batch_queue - if queue_entity_count >= batch_size { + if queue_entity_count >= batch_size || batch_queue.is_full() { submit_batch_queue(&mut batch_queue); } } From 38e9b0cf944bdd1550a9b68d8e67a4e380b0c29e Mon Sep 17 00:00:00 2001 From: re0312 Date: Wed, 3 Apr 2024 10:30:31 +0800 Subject: [PATCH 09/10] dep --- crates/bevy_ecs/Cargo.toml | 4 ++-- crates/bevy_ecs/src/query/state.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml index f6b98efa45c0c..ab5314cfd2c15 100644 --- a/crates/bevy_ecs/Cargo.toml +++ b/crates/bevy_ecs/Cargo.toml @@ -11,7 +11,7 @@ categories = ["game-engines", "data-structures"] [features] trace = [] -multi-threaded = ["bevy_tasks/multi-threaded"] +multi-threaded = ["bevy_tasks/multi-threaded", "arrayvec"] bevy_debug_stepping = [] default = ["bevy_reflect", "bevy_debug_stepping"] @@ -30,7 +30,7 @@ rustc-hash = "1.1" serde = "1" thiserror = "1.0" nonmax = "0.5" -arrayvec = "0.7.4" +arrayvec = { version = "0.7.4", optional = true } [dev-dependencies] rand = "0.8" diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index c595bf207748d..673fbf48176fd 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -10,7 +10,6 @@ use crate::{ storage::{SparseSetIndex, TableId}, world::{unsafe_world_cell::UnsafeWorldCell, World, WorldId}, }; -use arrayvec::ArrayVec; use bevy_utils::tracing::warn; #[cfg(feature = "trace")] use bevy_utils::tracing::Span; @@ -1388,6 +1387,7 @@ impl QueryState { ) { // NOTE: If you are changing query iteration code, remember to update the following places, where relevant: // QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual + use arrayvec::ArrayVec; bevy_tasks::ComputeTaskPool::get().scope(|scope| { // SAFETY: We only access table data that has been registered in `self.archetype_component_access`. From 5f9a80a4ac9f3368a6646651ced20e2de7d7668e Mon Sep 17 00:00:00 2001 From: re0312 Date: Thu, 4 Apr 2024 02:43:28 +0800 Subject: [PATCH 10/10] reset --- crates/bevy_ecs/src/query/state.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/bevy_ecs/src/query/state.rs b/crates/bevy_ecs/src/query/state.rs index 673fbf48176fd..84fd805fa6ca9 100644 --- a/crates/bevy_ecs/src/query/state.rs +++ b/crates/bevy_ecs/src/query/state.rs @@ -1476,6 +1476,7 @@ impl QueryState { // submit batch_queue if queue_entity_count >= batch_size || batch_queue.is_full() { submit_batch_queue(&mut batch_queue); + queue_entity_count = 0; } } submit_batch_queue(&mut batch_queue);