Skip to content

Commit

Permalink
service: add max_block et chunk_size in config file
Browse files Browse the repository at this point in the history
  • Loading branch information
ybensacq committed Sep 19, 2024
1 parent 23d8c3e commit 90078ec
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 20 deletions.
4 changes: 4 additions & 0 deletions crates/katana/core/src/service/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ pub struct MessagingConfig {
pub interval: u64,
/// The block on settlement chain from where Katana will start fetching messages.
pub from_block: u64,
/// The maximum number of blocks in gather messages
pub max_block: u64,
/// The size of events returned by get_events call
pub chunk_size: u64,
}

impl MessagingConfig {
Expand Down
35 changes: 19 additions & 16 deletions crates/katana/core/src/service/messaging/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ pub struct MessagingService<EF: ExecutorFactory> {
send_from_block: u64,
/// The message sending future.
msg_send_fut: Option<MessageSettlingFuture>,
/// The maximum block number to gather messages from.
max_block: u64,
/// The chunk size of messages to gather.
chunk_size: u64,
}

impl<EF: ExecutorFactory> MessagingService<EF> {
Expand All @@ -54,31 +58,28 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
Ok(Some(block)) => block,
Ok(None) => 0,
Err(_) => {
panic!(
"Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n",
)
anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n")
}
};
let send_from_block = match provider.get_send_from_block() {
Ok(Some(block)) => block,
Ok(None) => 0,
Err(_) => {
panic!(
"Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n",
)
anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n")
}
};

let max_block = config.max_block;
let chunk_size = config.chunk_size;

let interval = interval_from_seconds(config.interval);
let messenger = match MessengerMode::from_config(config).await {
Ok(m) => Arc::new(m),
Err(_) => {
panic!(
"Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n",
)
anyhow::bail!("Messaging could not be initialized.\nVerify that the messaging target node \
(anvil or other katana) is running.\n")
}
};

Expand All @@ -91,6 +92,8 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
send_from_block,
msg_gather_fut: None,
msg_send_fut: None,
max_block,
chunk_size,
})
}

Expand All @@ -99,10 +102,9 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
pool: TxPool,
backend: Arc<Backend<EF>>,
from_block: u64,
max_block: u64,
chunk_size: u64,
) -> MessengerResult<(u64, usize)> {
// 200 avoids any possible rejection from RPC with possibly lot's of messages.
// TODO: May this be configurable?
let max_block = 200;

match messenger.as_ref() {
MessengerMode::Ethereum(inner) => {
Expand All @@ -125,7 +127,7 @@ impl<EF: ExecutorFactory> MessagingService<EF> {
#[cfg(feature = "starknet-messaging")]
MessengerMode::Starknet(inner) => {
let (block_num, txs) =
inner.gather_messages(from_block, max_block, backend.chain_id).await?;
inner.gather_messages(from_block, max_block, chunk_size, backend.chain_id).await?;
let txs_count = txs.len();

txs.into_iter().for_each(|tx| {
Expand Down Expand Up @@ -210,6 +212,7 @@ impl<EF: ExecutorFactory> Stream for MessagingService<EF> {
pin.pool.clone(),
pin.backend.clone(),
pin.gather_from_block,
pin.max_block,
)));
}

Expand Down
8 changes: 4 additions & 4 deletions crates/katana/core/src/service/messaging/starknet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ impl StarknetMessaging {
&self,
from_block: BlockId,
to_block: BlockId,
) -> Result<Vec<EmittedEvent>> {
chunk_size: u64,
) -> Result<HashMap<u64, Vec<EmittedEvent>>> {
trace!(target: LOG_TARGET, from_block = ?from_block, to_block = ?to_block, "Fetching logs.");

let mut events = vec![];
Expand All @@ -77,8 +78,6 @@ impl StarknetMessaging {
keys: None,
};

// TODO: this chunk_size may also come from configuration?
let chunk_size = 200;
let mut continuation_token: Option<String> = None;

loop {
Expand Down Expand Up @@ -165,6 +164,7 @@ impl Messenger for StarknetMessaging {
&self,
from_block: u64,
max_blocks: u64,
chunk_size: u64,
chain_id: ChainId,
) -> MessengerResult<(u64, Vec<Self::MessageTransaction>)> {
let chain_latest_block: u64 = match self.provider.block_number().await {
Expand Down Expand Up @@ -193,7 +193,7 @@ impl Messenger for StarknetMessaging {

let mut l1_handler_txs: Vec<L1HandlerTx> = vec![];

self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block))
self.fetch_events(BlockId::Number(from_block), BlockId::Number(to_block), chunk_size)
.await
.map_err(|_| Error::SendError)
.unwrap()
Expand Down

0 comments on commit 90078ec

Please sign in to comment.