Skip to content

Commit

Permalink
par_collect
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Dec 10, 2024
1 parent d59dead commit 4e12b87
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions execution/executor-types/src/transactions_with_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use aptos_types::{
write_set::WriteSet,
};
use itertools::{izip, Itertools};
use rayon::prelude::*;
use std::{
fmt::{Debug, Formatter},
ops::Deref,
Expand Down Expand Up @@ -249,18 +250,21 @@ impl<'kv> StateUpdateRefs<'kv> {
fn collect_some_updates(
first_version: Version,
num_versions: usize,
shard_iters: &mut [impl Iterator<Item = (&'kv StateKey, StateUpdateRef<'kv>)> + Clone],
shard_iters: &mut [impl Iterator<Item = (&'kv StateKey, StateUpdateRef<'kv>)> + Clone + Send],
) -> BatchedStateUpdateRefs<'kv> {
let mut ret = BatchedStateUpdateRefs::new_empty(first_version, num_versions);
// exclusive
let end_version = first_version + num_versions as Version;
izip!(shard_iters, &mut ret.shards).for_each(|(shard_iter, dedupped)| {
dedupped.extend(
shard_iter
// n.b. take_while_ref so that in the next step we can process the rest of the entries from the iters.
.take_while_ref(|(_k, u)| u.version < end_version),
)
});
shard_iters
.par_iter_mut()
.zip_eq(ret.shards.par_iter_mut())
.for_each(|(shard_iter, dedupped)| {
dedupped.extend(
shard_iter
// n.b. take_while_ref so that in the next step we can process the rest of the entries from the iters.
.take_while_ref(|(_k, u)| u.version < end_version),
)
});
ret
}
}

0 comments on commit 4e12b87

Please sign in to comment.