From 4e12b87f4a36f3482eed7debe4f3f4e54b3618a4 Mon Sep 17 00:00:00 2001 From: aldenhu Date: Tue, 10 Dec 2024 20:53:57 +0000 Subject: [PATCH] par_collect --- .../src/transactions_with_output.rs | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/execution/executor-types/src/transactions_with_output.rs b/execution/executor-types/src/transactions_with_output.rs index 2fb96a9997f15..6c1c9fddbf4db 100644 --- a/execution/executor-types/src/transactions_with_output.rs +++ b/execution/executor-types/src/transactions_with_output.rs @@ -13,6 +13,7 @@ use aptos_types::{ write_set::WriteSet, }; use itertools::{izip, Itertools}; +use rayon::prelude::*; use std::{ fmt::{Debug, Formatter}, ops::Deref, @@ -249,18 +250,21 @@ impl<'kv> StateUpdateRefs<'kv> { fn collect_some_updates( first_version: Version, num_versions: usize, - shard_iters: &mut [impl Iterator)> + Clone], + shard_iters: &mut [impl Iterator)> + Clone + Send], ) -> BatchedStateUpdateRefs<'kv> { let mut ret = BatchedStateUpdateRefs::new_empty(first_version, num_versions); // exclusive let end_version = first_version + num_versions as Version; - izip!(shard_iters, &mut ret.shards).for_each(|(shard_iter, dedupped)| { - dedupped.extend( - shard_iter - // n.b. take_while_ref so that in the next step we can process the rest of the entries from the iters. - .take_while_ref(|(_k, u)| u.version < end_version), - ) - }); + shard_iters + .par_iter_mut() + .zip_eq(ret.shards.par_iter_mut()) + .for_each(|(shard_iter, dedupped)| { + dedupped.extend( + shard_iter + // n.b. take_while_ref so that in the next step we can process the rest of the entries from the iters. + .take_while_ref(|(_k, u)| u.version < end_version), + ) + }); ret } }