Skip to content

Commit

Permalink
feat(l2): sync_parallelism in warp update now set based on number o…
Browse files Browse the repository at this point in the history
…f threads
  • Loading branch information
Trantorian1 committed Nov 27, 2024
1 parent 5e9ff44 commit 1e23f46
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 7 deletions.
2 changes: 1 addition & 1 deletion crates/client/eth/src/l1_messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ mod l1_messaging_tests {
use mc_db::DatabaseService;
use mc_mempool::{GasPriceProvider, L1DataProvider, Mempool};
use mp_chain_config::ChainConfig;
use mp_utils::service::{MadaraCapability, ServiceContext};
use mp_utils::service::ServiceContext;
use rstest::*;
use starknet_api::core::Nonce;
use starknet_types_core::felt::Felt;
Expand Down
10 changes: 6 additions & 4 deletions crates/client/sync/src/fetch/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::Arc;
use std::time::Duration;
use std::{num::NonZeroUsize, sync::Arc};

use futures::prelude::*;
use mc_block_import::UnverifiedFullBlock;
Expand All @@ -22,7 +22,7 @@ pub struct L2FetchConfig {
pub sync_polling_interval: Option<Duration>,
pub n_blocks_to_sync: Option<u64>,
pub stop_on_sync: bool,
pub sync_parallelism: u8,
pub sync_parallelism: usize,
pub warp_update: bool,
pub warp_update_port_rpc: u16,
pub warp_update_port_fgw: u16,
Expand Down Expand Up @@ -58,7 +58,9 @@ pub async fn l2_fetch_task(
));

let save = config.sync_parallelism;
config.sync_parallelism = 50;
let available_parallelism = std::thread::available_parallelism()
.unwrap_or(NonZeroUsize::new(1usize).expect("1 should always be in usize bound"));
config.sync_parallelism = Into::<usize>::into(available_parallelism) * 2;

let next_block = match sync_blocks(backend.as_ref(), &provider, &ctx, &config).await? {
SyncStatus::Full(next_block) => next_block,
Expand Down Expand Up @@ -150,7 +152,7 @@ async fn sync_blocks(

// Have `sync_parallelism` fetches in parallel at once, using futures Buffered
let mut next_block = *first_block;
let mut fetch_stream = stream::iter(fetch_stream).buffered(*sync_parallelism as usize);
let mut fetch_stream = stream::iter(fetch_stream).buffered(*sync_parallelism);

loop {
let Some((block_n, val)) = channel_wait_or_graceful_shutdown(fetch_stream.next(), ctx).await else {
Expand Down
2 changes: 1 addition & 1 deletion crates/client/sync/src/l2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ pub async fn sync(
sync_polling_interval: config.sync_polling_interval,
n_blocks_to_sync: config.n_blocks_to_sync,
stop_on_sync: config.stop_on_sync,
sync_parallelism: config.sync_parallelism,
sync_parallelism: config.sync_parallelism as usize,
warp_update: config.warp_update,
warp_update_port_rpc: config.warp_update_port_rpc,
warp_update_port_fgw: config.warp_update_port_fgw,
Expand Down
2 changes: 1 addition & 1 deletion crates/primitives/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub async fn graceful_shutdown(ctx: &ServiceContext) {
pub async fn wait_or_graceful_shutdown<T>(future: impl Future<Output = T>, ctx: &ServiceContext) -> Option<T> {
tokio::select! {
_ = graceful_shutdown_inner(ctx) => { None },
res = ctx.run_until_cancelled(future) => { res },
res = future => { Some(res) },
}
}

Expand Down

0 comments on commit 1e23f46

Please sign in to comment.