Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster small table/archetype into single Task in parallel iteration #12846

Merged
merged 10 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions benches/benches/bevy_ecs/iteration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod iter_simple_sparse_set;
mod iter_simple_system;
mod iter_simple_wide;
mod iter_simple_wide_sparse_set;
mod par_iter_simple;

use heavy_compute::*;

Expand All @@ -27,6 +28,7 @@ criterion_group!(
iter_frag_sparse,
iter_simple,
heavy_compute,
par_iter_simple,
);

fn iter_simple(c: &mut Criterion) {
Expand Down Expand Up @@ -117,3 +119,15 @@ fn iter_frag_sparse(c: &mut Criterion) {
});
group.finish();
}

fn par_iter_simple(c: &mut Criterion) {
let mut group = c.benchmark_group("par_iter_simple");
group.warm_up_time(std::time::Duration::from_millis(500));
group.measurement_time(std::time::Duration::from_secs(4));
for f in [0, 10, 100, 1000] {
group.bench_function(format!("with_{}_fragment", f), |b| {
let mut bench = par_iter_simple::Benchmark::new(f);
b.iter(move || bench.run());
});
}
}
73 changes: 73 additions & 0 deletions benches/benches/bevy_ecs/iteration/par_iter_simple.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use bevy_ecs::prelude::*;
use bevy_tasks::{ComputeTaskPool, TaskPool};
use glam::*;

#[derive(Component, Copy, Clone)]
struct Transform(Mat4);

#[derive(Component, Copy, Clone)]
struct Position(Vec3);

#[derive(Component, Copy, Clone)]
struct Rotation(Vec3);

#[derive(Component, Copy, Clone)]
struct Velocity(Vec3);

#[derive(Component, Copy, Clone, Default)]
struct Data<const X: u16>(f32);
pub struct Benchmark<'w>(World, QueryState<(&'w Velocity, &'w mut Position)>);

fn insert_if_bit_enabled<const B: u16>(entity: &mut EntityWorldMut, i: u16) {
if i & 1 << B != 0 {
entity.insert(Data::<B>(1.0));
}
}

impl<'w> Benchmark<'w> {
pub fn new(fragment: u16) -> Self {
ComputeTaskPool::get_or_init(TaskPool::default);

let mut world = World::new();

let iter = world.spawn_batch(
std::iter::repeat((
Transform(Mat4::from_scale(Vec3::ONE)),
Position(Vec3::X),
Rotation(Vec3::X),
Velocity(Vec3::X),
))
.take(100_000),
);
let entities = iter.into_iter().collect::<Vec<Entity>>();
for i in 0..fragment {
let mut e = world.entity_mut(entities[i as usize]);
insert_if_bit_enabled::<0>(&mut e, i);
insert_if_bit_enabled::<1>(&mut e, i);
insert_if_bit_enabled::<2>(&mut e, i);
insert_if_bit_enabled::<3>(&mut e, i);
insert_if_bit_enabled::<4>(&mut e, i);
insert_if_bit_enabled::<5>(&mut e, i);
insert_if_bit_enabled::<6>(&mut e, i);
insert_if_bit_enabled::<7>(&mut e, i);
insert_if_bit_enabled::<8>(&mut e, i);
insert_if_bit_enabled::<9>(&mut e, i);
insert_if_bit_enabled::<10>(&mut e, i);
insert_if_bit_enabled::<11>(&mut e, i);
insert_if_bit_enabled::<12>(&mut e, i);
insert_if_bit_enabled::<13>(&mut e, i);
insert_if_bit_enabled::<14>(&mut e, i);
insert_if_bit_enabled::<15>(&mut e, i);
}

let query = world.query::<(&Velocity, &mut Position)>();
Self(world, query)
}

#[inline(never)]
pub fn run(&mut self) {
self.1
.par_iter_mut(&mut self.0)
.for_each(|(v, mut p)| p.0 += v.0);
}
}
3 changes: 2 additions & 1 deletion crates/bevy_ecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ categories = ["game-engines", "data-structures"]

[features]
trace = []
multi-threaded = ["bevy_tasks/multi-threaded"]
multi-threaded = ["bevy_tasks/multi-threaded", "arrayvec"]
bevy_debug_stepping = []
default = ["bevy_reflect", "bevy_debug_stepping"]

Expand All @@ -30,6 +30,7 @@ rustc-hash = "1.1"
serde = "1"
thiserror = "1.0"
nonmax = "0.5"
arrayvec = { version = "0.7.4", optional = true }

[dev-dependencies]
rand = "0.8"
Expand Down
119 changes: 80 additions & 39 deletions crates/bevy_ecs/src/query/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1387,58 +1387,99 @@ impl<D: QueryData, F: QueryFilter> QueryState<D, F> {
) {
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
// QueryIter, QueryIterationCursor, QueryManyIter, QueryCombinationIter, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual
use arrayvec::ArrayVec;

bevy_tasks::ComputeTaskPool::get().scope(|scope| {
// SAFETY: We only access table data that has been registered in `self.archetype_component_access`.
let tables = unsafe { &world.storages().tables };
let archetypes = world.archetypes();
for storage_id in &self.matched_storage_ids {
if D::IS_DENSE && F::IS_DENSE {
let table_id = storage_id.table_id;
let table = &tables[table_id];
if table.is_empty() {
continue;
let mut batch_queue = ArrayVec::new();
let mut queue_entity_count = 0;

// submit a list of storages which smaller than batch_size as single task
let submit_batch_queue = |queue: &mut ArrayVec<StorageId, 128>| {
if queue.is_empty() {
return;
}
let queue = std::mem::take(queue);
let mut func = func.clone();
scope.spawn(async move {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
let mut iter = self.iter_unchecked_manual(world, last_run, this_run);
for storage_id in queue {
if D::IS_DENSE && F::IS_DENSE {
let id = storage_id.table_id;
let table = &world.storages().tables.get(id).debug_checked_unwrap();
iter.for_each_in_table_range(&mut func, table, 0..table.entity_count());
} else {
let id = storage_id.archetype_id;
let archetype = world.archetypes().get(id).debug_checked_unwrap();
iter.for_each_in_archetype_range(
&mut func,
archetype,
0..archetype.len(),
);
}
}
});
};

let mut offset = 0;
while offset < table.entity_count() {
let mut func = func.clone();
let len = batch_size.min(table.entity_count() - offset);
let batch = offset..offset + len;
scope.spawn(async move {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
let table =
&world.storages().tables.get(table_id).debug_checked_unwrap();
// submit single storage larger than batch_size
let submit_single = |count, storage_id: StorageId| {
for offset in (0..count).step_by(batch_size) {
let mut func = func.clone();
let len = batch_size.min(count - offset);
let batch = offset..offset + len;
scope.spawn(async move {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
if D::IS_DENSE && F::IS_DENSE {
let id = storage_id.table_id;
let table = world.storages().tables.get(id).debug_checked_unwrap();
self.iter_unchecked_manual(world, last_run, this_run)
.for_each_in_table_range(&mut func, table, batch);
});
offset += batch_size;
}
} else {
let archetype_id = storage_id.archetype_id;
let archetype = &archetypes[archetype_id];
if archetype.is_empty() {
continue;
}

let mut offset = 0;
while offset < archetype.len() {
let mut func = func.clone();
let len = batch_size.min(archetype.len() - offset);
let batch = offset..offset + len;
scope.spawn(async move {
#[cfg(feature = "trace")]
let _span = self.par_iter_span.enter();
let archetype =
world.archetypes().get(archetype_id).debug_checked_unwrap();
} else {
let id = storage_id.archetype_id;
let archetype = world.archetypes().get(id).debug_checked_unwrap();
self.iter_unchecked_manual(world, last_run, this_run)
.for_each_in_archetype_range(&mut func, archetype, batch);
});
offset += batch_size;
}
}
});
}
};

let storage_entity_count = |storage_id: StorageId| -> usize {
if D::IS_DENSE && F::IS_DENSE {
tables[storage_id.table_id].entity_count()
} else {
archetypes[storage_id.archetype_id].len()
}
};

for storage_id in &self.matched_storage_ids {
let count = storage_entity_count(*storage_id);

// skip empty storage
if count == 0 {
continue;
}
// immediately submit large storage
if count >= batch_size {
submit_single(count, *storage_id);
continue;
}
// merge small storage
batch_queue.push(*storage_id);
queue_entity_count += count;

// submit batch_queue
if queue_entity_count >= batch_size || batch_queue.is_full() {
submit_batch_queue(&mut batch_queue);
alice-i-cecile marked this conversation as resolved.
Show resolved Hide resolved
queue_entity_count = 0;
}
}
submit_batch_queue(&mut batch_queue);
});
}

Expand Down