From 0fe544d0f3853043c599f14d7cf4b53a9050de33 Mon Sep 17 00:00:00 2001 From: Alex Metelli Date: Tue, 14 Jan 2025 04:37:26 +0800 Subject: [PATCH] fix: formatting and mutability issues - Remove extra space in katana block-time command - Add mut keyword to handle_events method to allow MMR updates --- crates/client/src/client.rs | 154 +++++++++++++++++++++++------------- docker-compose.yml | 20 ++--- 2 files changed, 111 insertions(+), 63 deletions(-) diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index dc155be..a9e40a9 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -38,7 +38,8 @@ pub struct LightClient { l2_store_addr: String, verifier_addr: String, chain_id: u64, - latest_processed_block: u64, + latest_processed_events_block: u64, + latest_processed_mmr_block: u64, starknet_private_key: String, starknet_account_address: String, polling_interval: Duration, @@ -58,6 +59,7 @@ impl LightClient { error!("Polling interval must be greater than zero"); return Err(LightClientError::PollingIntervalError); } + // Load environment variables let starknet_rpc_url = get_env_var("STARKNET_RPC_URL")?; let l2_store_addr = get_env_var("FOSSIL_STORE")?; @@ -65,6 +67,7 @@ impl LightClient { let starknet_private_key = get_env_var("STARKNET_PRIVATE_KEY")?; let starknet_account_address = get_env_var("STARKNET_ACCOUNT_ADDRESS")?; let chain_id = get_env_var("CHAIN_ID")?.parse::()?; + // Initialize providers let starknet_provider = StarknetProvider::new(&starknet_rpc_url)?; @@ -82,7 +85,8 @@ impl LightClient { l2_store_addr, verifier_addr, chain_id, - latest_processed_block: start_block, + latest_processed_events_block: start_block.saturating_sub(1), + latest_processed_mmr_block: start_block.saturating_sub(1), starknet_private_key, starknet_account_address, polling_interval: Duration::from_secs(polling_interval), @@ -100,6 +104,8 @@ impl LightClient { info!( polling_interval_secs = self.polling_interval.as_secs(), + start_block = self.latest_processed_events_block + 1, + blocks_per_run = self.blocks_per_run, "Light client started" ); @@ -125,19 +131,55 @@ impl LightClient { number => number.saturating_sub(10), // Stay 10 blocks behind to handle reorgs }; - // Poll for new events, starting from the block after the last processed block - let to_block = if self.blocks_per_run > 0 { - BlockId::Number(std::cmp::min( - self.latest_processed_block + self.blocks_per_run, + info!( + latest_block, + last_processed_events = self.latest_processed_events_block, + last_processed_mmr = self.latest_processed_mmr_block, + "Checking for new events" + ); + + // Don't process if we're already caught up with events + if self.latest_processed_events_block >= latest_block { + info!( latest_block, - )) + last_processed_events = self.latest_processed_events_block, + "Already up to date with latest events" + ); + return Ok(()); + } + + // Calculate the to_block based on blocks_per_run + let to_block = if self.blocks_per_run > 0 { + std::cmp::min( + self.latest_processed_events_block + self.blocks_per_run, + latest_block + ) } else { - BlockId::Number(latest_block) + latest_block }; + let from_block = self.latest_processed_events_block + 1; + + // Add validation to prevent block number regression + if from_block > to_block { + error!( + from_block, + to_block, + "Invalid block range: from_block is greater than to_block" + ); + return Ok(()); + } + + info!( + from_block, + to_block, + blocks_to_process = to_block - from_block + 1, + "Processing block range for events" + ); + let event_filter = EventFilter { - from_block: Some(BlockId::Number(self.latest_processed_block + 1)), - to_block: Some(to_block), + from_block: Some(BlockId::Number(from_block)), + to_block: Some(BlockId::Number(to_block)), address: Some(Felt::from_hex(&self.l2_store_addr)?), keys: Some(vec![vec![selector!("LatestBlockhashFromL1Stored")]]), }; @@ -148,46 +190,31 @@ impl LightClient { .get_events(event_filter, None, 1) .await?; + info!( + from_block, + to_block, + event_count = events.events.len(), + "Retrieved events from Starknet" + ); + + // Update the latest processed events block + let old_processed_block = self.latest_processed_events_block; + self.latest_processed_events_block = to_block; + + info!( + old_processed = old_processed_block, + new_processed = self.latest_processed_events_block, + blocks_advanced = self.latest_processed_events_block - old_processed_block, + "Updated processed events block" + ); + if !events.events.is_empty() { info!( event_count = events.events.len(), - latest_block = self.latest_processed_block, - "New events processed" + "Processing new events" ); - // Update the latest processed block to the latest block from the new events - let new_latest_block = events - .events - .last() - .and_then(|event| event.block_number) - .unwrap_or(self.latest_processed_block); - - // Check if we've reached the block limit for this run - if self.blocks_per_run > 0 - && new_latest_block > self.latest_processed_block + self.blocks_per_run - { - info!( - "Reached block limit for this run. Stopping at block {}", - self.latest_processed_block + self.blocks_per_run - ); - return Ok(()); - } - - // Invariant check: new_latest_block should be greater or equal to the current - if new_latest_block < self.latest_processed_block { - error!( - "New latest_processed_block ({}) is less than the current ({})", - new_latest_block, self.latest_processed_block - ); - return Err(LightClientError::StateError( - self.latest_processed_block, - new_latest_block, - )); - } - - self.latest_processed_block = new_latest_block; - - // Process the events + // Process the events and update MMR self.handle_events().await?; } @@ -196,7 +223,7 @@ impl LightClient { /// Handles the events by updating the MMR and verifying proofs. #[instrument(skip(self))] - pub async fn handle_events(&self) -> Result<(), LightClientError> { + pub async fn handle_events(&mut self) -> Result<(), LightClientError> { // Fetch the latest stored blockhash from L1 let latest_relayed_block = self .starknet_provider @@ -219,23 +246,35 @@ impl LightClient { /// Updates the MMR and verifies the proof on-chain. #[instrument(skip(self))] pub async fn update_mmr( - &self, + &mut self, latest_mmr_block: u64, latest_relayed_block: u64, ) -> Result<(), LightClientError> { + info!( + latest_mmr_block, + latest_relayed_block, + current_processed_mmr = self.latest_processed_mmr_block, + "Starting MMR update" + ); + + // If MMR is already up to date with the relayed block, nothing to do if latest_mmr_block >= latest_relayed_block { - warn!( + info!( latest_mmr_block, latest_relayed_block, - "Latest MMR block is greater than the latest relayed block, skipping proof verification" + "MMR already up to date with latest relayed block" ); - return Err(LightClientError::StateError( - latest_mmr_block, - latest_relayed_block, - )); + return Ok(()); } - info!("Starting proof verification..."); + info!( + from_block = latest_mmr_block + 1, + to_block = latest_relayed_block, + batch_size = self.batch_size, + "Starting proof verification" + ); + + // Update MMR publisher::prove_mmr_update( &self.starknet_provider.rpc_url().to_string(), self.chain_id, @@ -249,6 +288,13 @@ impl LightClient { false, ) .await?; + + // Update our tracking of the latest processed MMR block + self.latest_processed_mmr_block = latest_relayed_block; + + info!( + "Proof verification completed successfully" + ); Ok(()) } } diff --git a/docker-compose.yml b/docker-compose.yml index 963e245..62225bf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,15 +37,17 @@ services: - "5050:5050" volumes: - ./config:/app/config - command: - - katana - - --messaging - - /app/config/anvil.messaging.docker.json - - --dev - - --dev.no-fee - - --dev.no-account-validation - - --http.addr - - 0.0.0.0 + command: + - "katana" + - "--messaging" + - "/app/config/anvil.messaging.docker.json" + - "--dev" + - "--dev.no-fee" + - "--dev.no-account-validation" + - "--http.addr" + - "0.0.0.0" + - "--block-time" + - "10000" depends_on: anvil: condition: service_healthy