From 90078eccc5f736cadbabe6a167bbbac9b9e670a8 Mon Sep 17 00:00:00 2001 From: bexan Date: Tue, 3 Sep 2024 17:10:05 +0200 Subject: [PATCH] service: add max_block et chunk_size in config file --- .../katana/core/src/service/messaging/mod.rs | 4 +++ .../core/src/service/messaging/service.rs | 35 ++++++++++--------- .../core/src/service/messaging/starknet.rs | 8 ++--- 3 files changed, 27 insertions(+), 20 deletions(-) diff --git a/crates/katana/core/src/service/messaging/mod.rs b/crates/katana/core/src/service/messaging/mod.rs index cd064f44be..dac9901e6b 100644 --- a/crates/katana/core/src/service/messaging/mod.rs +++ b/crates/katana/core/src/service/messaging/mod.rs @@ -112,6 +112,10 @@ pub struct MessagingConfig { pub interval: u64, /// The block on settlement chain from where Katana will start fetching messages. pub from_block: u64, + /// The maximum number of blocks in gather messages + pub max_block: u64, + /// The size of events returned by get_events call + pub chunk_size: u64, } impl MessagingConfig { diff --git a/crates/katana/core/src/service/messaging/service.rs b/crates/katana/core/src/service/messaging/service.rs index 198fe9b2dc..5d664227ca 100644 --- a/crates/katana/core/src/service/messaging/service.rs +++ b/crates/katana/core/src/service/messaging/service.rs @@ -39,6 +39,10 @@ pub struct MessagingService { send_from_block: u64, /// The message sending future. msg_send_fut: Option, + /// The maximum block number to gather messages from. + max_block: u64, + /// The chunk size of messages to gather. + chunk_size: u64, } impl MessagingService { @@ -54,31 +58,28 @@ impl MessagingService { Ok(Some(block)) => block, Ok(None) => 0, Err(_) => { - panic!( - "Messaging could not be initialized.\nVerify that the messaging target node \ - (anvil or other katana) is running.\n", - ) + anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \ + (anvil or other katana) is running.\n") } }; let send_from_block = match provider.get_send_from_block() { Ok(Some(block)) => block, Ok(None) => 0, Err(_) => { - panic!( - "Messaging could not be initialized.\nVerify that the messaging target node \ - (anvil or other katana) is running.\n", - ) + anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \ + (anvil or other katana) is running.\n") } }; + let max_block = config.max_block; + let chunk_size = config.chunk_size; + let interval = interval_from_seconds(config.interval); let messenger = match MessengerMode::from_config(config).await { Ok(m) => Arc::new(m), Err(_) => { - panic!( - "Messaging could not be initialized.\nVerify that the messaging target node \ - (anvil or other katana) is running.\n", - ) + anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \ + (anvil or other katana) is running.\n") } }; @@ -91,6 +92,8 @@ impl MessagingService { send_from_block, msg_gather_fut: None, msg_send_fut: None, + max_block, + chunk_size, }) } @@ -99,10 +102,9 @@ impl MessagingService { pool: TxPool, backend: Arc>, from_block: u64, + max_block: u64, + chunk_size: u64, ) -> MessengerResult<(u64, usize)> { - // 200 avoids any possible rejection from RPC with possibly lot's of messages. - // TODO: May this be configurable? - let max_block = 200; match messenger.as_ref() { MessengerMode::Ethereum(inner) => { @@ -125,7 +127,7 @@ impl MessagingService { #[cfg(feature = "starknet-messaging")] MessengerMode::Starknet(inner) => { let (block_num, txs) = - inner.gather_messages(from_block, max_block, backend.chain_id).await?; + inner.gather_messages(from_block, max_block, chunk_size, backend.chain_id).await?; let txs_count = txs.len(); txs.into_iter().for_each(|tx| { @@ -210,6 +212,7 @@ impl Stream for MessagingService { pin.pool.clone(), pin.backend.clone(), pin.gather_from_block, + pin.max_block, ))); } diff --git a/crates/katana/core/src/service/messaging/starknet.rs b/crates/katana/core/src/service/messaging/starknet.rs index 7c3f2bb3db..91a3a8b334 100644 --- a/crates/katana/core/src/service/messaging/starknet.rs +++ b/crates/katana/core/src/service/messaging/starknet.rs @@ -64,7 +64,8 @@ impl StarknetMessaging { &self, from_block: BlockId, to_block: BlockId, - ) -> Result> { + chunk_size: u64, + ) -> Result>> { trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs."); let mut events = vec![]; @@ -77,8 +78,6 @@ impl StarknetMessaging { keys: None, }; - // TODO: this chunk_size may also come from configuration? - let chunk_size = 200; let mut continuation_token: Option = None; loop { @@ -165,6 +164,7 @@ impl Messenger for StarknetMessaging { &self, from_block: u64, max_blocks: u64, + chunk_size: u64, chain_id: ChainId, ) -> MessengerResult<(u64, Vec)> { let chain_latest_block: u64 = match self.provider.block_number().await { @@ -193,7 +193,7 @@ impl Messenger for StarknetMessaging { let mut l1_handler_txs: Vec = vec![]; - self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block)) + self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block), chunk_size) .await .map_err(|_| Error::SendError) .unwrap()