From 6bce9ce0d30f80e553ac60bf7b2cf40278d69f6b Mon Sep 17 00:00:00 2001 From: Shahak Shama Date: Mon, 9 Dec 2024 15:55:21 +0200 Subject: [PATCH] refactor(starknet_state_sync): move storage reader from StateSyncRunner to StateSync --- Cargo.lock | 2 +- crates/starknet_state_sync/src/lib.rs | 49 ++++++++++++------ crates/starknet_state_sync/src/runner/mod.rs | 52 ++------------------ 3 files changed, 37 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 490dd03e62..9c46ec04ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10561,9 +10561,9 @@ version = "0.0.0" dependencies = [ "axum", "hyper 0.14.30", + "infra_utils", "metrics 0.21.1", "metrics-exporter-prometheus", - "infra_utils", "papyrus_config", "pretty_assertions", "serde", diff --git a/crates/starknet_state_sync/src/lib.rs b/crates/starknet_state_sync/src/lib.rs index 598404ad97..83777158bf 100644 --- a/crates/starknet_state_sync/src/lib.rs +++ b/crates/starknet_state_sync/src/lib.rs @@ -2,25 +2,28 @@ pub mod config; pub mod runner; use async_trait::async_trait; -use futures::channel::{mpsc, oneshot}; -use futures::SinkExt; +use papyrus_storage::body::BodyStorageReader; +use papyrus_storage::state::StateStorageReader; +use papyrus_storage::StorageReader; +use starknet_api::block::BlockNumber; use starknet_sequencer_infra::component_definitions::ComponentRequestHandler; -use starknet_state_sync_types::communication::{StateSyncRequest, StateSyncResponse}; -use starknet_state_sync_types::errors::StateSyncError; +use starknet_state_sync_types::communication::{ + StateSyncRequest, + StateSyncResponse, + StateSyncResult, +}; +use starknet_state_sync_types::state_sync_types::SyncBlock; use crate::config::StateSyncConfig; use crate::runner::StateSyncRunner; -// TODO(shahak): consider adding to config -const BUFFER_SIZE: usize = 100000; - pub fn create_state_sync_and_runner(config: StateSyncConfig) -> (StateSync, StateSyncRunner) { - let (request_sender, request_receiver) = mpsc::channel(BUFFER_SIZE); - (StateSync { request_sender }, StateSyncRunner::new(config, request_receiver)) + let (state_sync_runner, storage_reader) = StateSyncRunner::new(config); + (StateSync { storage_reader }, state_sync_runner) } pub struct StateSync { - pub request_sender: mpsc::Sender<(StateSyncRequest, oneshot::Sender)>, + storage_reader: StorageReader, } // TODO(shahak): Have StateSyncRunner call StateSync instead of the opposite once we stop supporting @@ -28,12 +31,26 @@ pub struct StateSync { #[async_trait] impl ComponentRequestHandler for StateSync { async fn handle_request(&mut self, request: StateSyncRequest) -> StateSyncResponse { - let (response_sender, response_receiver) = oneshot::channel(); - if self.request_sender.send((request, response_sender)).await.is_err() { - return StateSyncResponse::GetBlock(Err(StateSyncError::RunnerCommunicationError)); + match request { + StateSyncRequest::GetBlock(block_number) => { + StateSyncResponse::GetBlock(self.get_block(block_number)) + } + } + } +} + +impl StateSync { + fn get_block(&self, block_number: BlockNumber) -> StateSyncResult> { + let txn = self.storage_reader.begin_ro_txn()?; + if let Some(block_transaction_hashes) = txn.get_block_transaction_hashes(block_number)? { + if let Some(thin_state_diff) = txn.get_state_diff(block_number)? { + return Ok(Some(SyncBlock { + state_diff: thin_state_diff, + transaction_hashes: block_transaction_hashes, + })); + } } - response_receiver.await.unwrap_or_else(|_| { - StateSyncResponse::GetBlock(Err(StateSyncError::RunnerCommunicationError)) - }) + + Ok(None) } } diff --git a/crates/starknet_state_sync/src/runner/mod.rs b/crates/starknet_state_sync/src/runner/mod.rs index 35a289b09c..f8f140042c 100644 --- a/crates/starknet_state_sync/src/runner/mod.rs +++ b/crates/starknet_state_sync/src/runner/mod.rs @@ -2,33 +2,19 @@ mod test; use async_trait::async_trait; -use futures::channel::{mpsc, oneshot}; use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; use papyrus_network::network_manager::{self, NetworkError}; use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels, P2PSyncClientError}; use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; -use papyrus_storage::body::BodyStorageReader; -use papyrus_storage::state::StateStorageReader; use papyrus_storage::{open_storage, StorageReader}; -use starknet_api::block::BlockNumber; use starknet_sequencer_infra::component_definitions::ComponentStarter; use starknet_sequencer_infra::errors::ComponentError; -use starknet_state_sync_types::communication::{ - StateSyncRequest, - StateSyncResponse, - StateSyncResult, -}; -use starknet_state_sync_types::state_sync_types::SyncBlock; use crate::config::StateSyncConfig; pub struct StateSyncRunner { - #[allow(dead_code)] - request_receiver: mpsc::Receiver<(StateSyncRequest, oneshot::Sender)>, - #[allow(dead_code)] - storage_reader: StorageReader, network_future: BoxFuture<'static, Result<(), NetworkError>>, // TODO: change client and server to requester and responder respectively p2p_sync_client_future: BoxFuture<'static, Result<(), P2PSyncClientError>>, @@ -47,25 +33,13 @@ impl ComponentStarter for StateSyncRunner { () = &mut self.p2p_sync_server_future => { return Err(ComponentError::InternalComponentError); } - Some((request, sender)) = self.request_receiver.next() => { - let response = match request { - StateSyncRequest::GetBlock(block_number) => { - StateSyncResponse::GetBlock(self.get_block(block_number)) - } - }; - - sender.send(response).map_err(|_| ComponentError::InternalComponentError)? - } } } } } impl StateSyncRunner { - pub fn new( - config: StateSyncConfig, - request_receiver: mpsc::Receiver<(StateSyncRequest, oneshot::Sender)>, - ) -> Self { + pub fn new(config: StateSyncConfig) -> (Self, StorageReader) { let (storage_reader, storage_writer) = open_storage(config.storage_config).expect("StateSyncRunner failed opening storage"); @@ -119,27 +93,7 @@ impl StateSyncRunner { let p2p_sync_server_future = p2p_sync_server.run().boxed(); // TODO(shahak): add rpc. - Self { - request_receiver, - storage_reader, - network_future, - p2p_sync_client_future, - p2p_sync_server_future, - } - } - - fn get_block(&self, block_number: BlockNumber) -> StateSyncResult> { - let txn = self.storage_reader.begin_ro_txn()?; - if let Some(block_transaction_hashes) = txn.get_block_transaction_hashes(block_number)? { - if let Some(thin_state_diff) = txn.get_state_diff(block_number)? { - return Ok(Some(SyncBlock { - state_diff: thin_state_diff, - transaction_hashes: block_transaction_hashes, - })); - } - } - - Ok(None) + (Self { network_future, p2p_sync_client_future, p2p_sync_server_future }, storage_reader) } }