diff --git a/Cargo.lock b/Cargo.lock index 849e52446a..cff774fa61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7575,6 +7575,7 @@ dependencies = [ "serde", "starknet-types-core", "starknet_api", + "starknet_state_sync_types", "static_assertions", "thiserror", "tokio", diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs index e26b379759..6cf1b19356 100644 --- a/crates/papyrus_node/src/run.rs +++ b/crates/papyrus_node/src/run.rs @@ -7,6 +7,7 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; +use futures::StreamExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; use papyrus_common::metrics::COLLECT_PROFILING_METRICS; use papyrus_common::pending_classes::PendingClasses; @@ -304,6 +305,7 @@ async fn spawn_sync_client( storage_reader, storage_writer, p2p_sync_client_channels, + futures::stream::pending().boxed(), ); tokio::spawn(async move { Ok(p2p_sync.run().await?) }) } diff --git a/crates/papyrus_p2p_sync/Cargo.toml b/crates/papyrus_p2p_sync/Cargo.toml index 653fbd259b..0121d5cf7e 100644 --- a/crates/papyrus_p2p_sync/Cargo.toml +++ b/crates/papyrus_p2p_sync/Cargo.toml @@ -19,9 +19,12 @@ papyrus_network.workspace = true papyrus_proc_macros.workspace = true papyrus_protobuf.workspace = true papyrus_storage.workspace = true +papyrus_test_utils.workspace = true rand.workspace = true +rand_chacha.workspace = true serde.workspace = true starknet_api.workspace = true +starknet_state_sync_types.workspace = true starknet-types-core.workspace = true thiserror.workspace = true tokio.workspace = true @@ -34,8 +37,6 @@ lazy_static.workspace = true papyrus_network = { workspace = true, features = ["testing"] } papyrus_protobuf = { workspace = true, features = ["testing"] } papyrus_storage = { workspace = true, features = ["testing"] } -papyrus_test_utils.workspace = true -rand_chacha.workspace = true static_assertions.workspace = true tokio = { workspace = true, features = ["test-util"] } diff --git a/crates/papyrus_p2p_sync/src/client/mod.rs b/crates/papyrus_p2p_sync/src/client/mod.rs index 01e13465f7..25fbc67400 100644 --- a/crates/papyrus_p2p_sync/src/client/mod.rs +++ b/crates/papyrus_p2p_sync/src/client/mod.rs @@ -18,8 +18,9 @@ use std::collections::BTreeMap; use std::time::Duration; use class::ClassStreamBuilder; -use futures::channel::mpsc::SendError; -use futures::Stream; +use futures::channel::mpsc::{Receiver, SendError, Sender}; +use futures::stream::BoxStream; +use futures::{SinkExt as _, Stream}; use header::HeaderStreamBuilder; use papyrus_common::pending_classes::ApiContractClass; use papyrus_config::converters::deserialize_milliseconds_to_duration; @@ -36,15 +37,27 @@ use papyrus_protobuf::sync::{ TransactionQuery, }; use papyrus_storage::{StorageError, StorageReader, StorageWriter}; +use papyrus_test_utils::{get_rng, GetTestInstance}; use serde::{Deserialize, Serialize}; -use starknet_api::block::BlockNumber; +use starknet_api::block::{ + BlockBody, + BlockHash, + BlockHeader, + BlockHeaderWithoutHash, + BlockNumber, + BlockSignature, +}; use starknet_api::core::ClassHash; -use starknet_api::transaction::FullTransaction; +use starknet_api::hash::StarkHash; +use starknet_api::state::{DeclaredClasses, DeprecatedDeclaredClasses, ThinStateDiff}; +use starknet_api::transaction::{FullTransaction, Transaction, TransactionOutput}; +use starknet_state_sync_types::state_sync_types::SyncBlock; use state_diff::StateDiffStreamBuilder; use stream_builder::{DataStreamBuilder, DataStreamResult}; use tokio_stream::StreamExt; use tracing::instrument; use transaction::TransactionStreamFactory; + const STEP: u64 = 1; const ALLOWED_SIGNATURES_LENGTH: usize = 1; @@ -176,15 +189,16 @@ impl P2PSyncClientChannels { ) -> Self { Self { header_sender, state_diff_sender, transaction_sender, class_sender } } - pub(crate) fn create_stream( + fn create_stream( self, storage_reader: StorageReader, config: P2PSyncClientConfig, + internal_blocks_receivers: InternalBlocksReceivers, ) -> impl Stream + Send + 'static { let header_stream = HeaderStreamBuilder::create_stream( self.header_sender, storage_reader.clone(), - None, + Some(internal_blocks_receivers.header_receiver), config.wait_period_for_new_data, config.num_headers_per_query, config.stop_sync_at_block_number, @@ -193,7 +207,7 @@ impl P2PSyncClientChannels { let state_diff_stream = StateDiffStreamBuilder::create_stream( self.state_diff_sender, storage_reader.clone(), - None, + Some(internal_blocks_receivers.state_diff_receiver), config.wait_period_for_new_data, config.num_block_state_diffs_per_query, config.stop_sync_at_block_number, @@ -202,7 +216,7 @@ impl P2PSyncClientChannels { let transaction_stream = TransactionStreamFactory::create_stream( self.transaction_sender, storage_reader.clone(), - None, + Some(internal_blocks_receivers.transaction_receiver), config.wait_period_for_new_data, config.num_block_transactions_per_query, config.stop_sync_at_block_number, @@ -226,6 +240,7 @@ pub struct P2PSyncClient { storage_reader: StorageReader, storage_writer: StorageWriter, p2p_sync_channels: P2PSyncClientChannels, + internal_blocks_receiver: BoxStream<'static, (BlockNumber, SyncBlock)>, } impl P2PSyncClient { @@ -234,18 +249,128 @@ impl P2PSyncClient { storage_reader: StorageReader, storage_writer: StorageWriter, p2p_sync_channels: P2PSyncClientChannels, + internal_blocks_receiver: BoxStream<'static, (BlockNumber, SyncBlock)>, ) -> Self { - Self { config, storage_reader, storage_writer, p2p_sync_channels } + Self { config, storage_reader, storage_writer, p2p_sync_channels, internal_blocks_receiver } } #[instrument(skip(self), level = "debug", err)] - pub async fn run(mut self) -> Result<(), P2PSyncClientError> { - let mut data_stream = - self.p2p_sync_channels.create_stream(self.storage_reader.clone(), self.config); + pub async fn run(self) -> Result<(), P2PSyncClientError> { + let internal_blocks_channels = InternalBlocksChannels::new(); + let P2PSyncClient { + config, + storage_reader, + mut storage_writer, + p2p_sync_channels, + internal_blocks_receiver, + } = self; + Self::create_internal_blocks_sender_task( + internal_blocks_channels.senders, + internal_blocks_receiver, + ); + let mut data_stream = p2p_sync_channels.create_stream( + storage_reader, + config, + internal_blocks_channels.receivers, + ); loop { let data = data_stream.next().await.expect("Sync data stream should never end")?; - data.write_to_storage(&mut self.storage_writer)?; + data.write_to_storage(&mut storage_writer)?; + } + } + + fn create_internal_blocks_sender_task( + internal_blocks_senders: InternalBlocksSenders, + mut internal_blocks_receiver: BoxStream<'static, (BlockNumber, SyncBlock)>, + ) -> tokio::task::JoinHandle> { + tokio::spawn(async move { + loop { + let (block_number, sync_block) = StreamExt::next(&mut internal_blocks_receiver) + .await + .expect("Internal blocks stream should never end"); + let InternalBlocksSenders { + header_sender, + state_diff_sender, + transaction_sender, + class_sender: _, + } = &mut internal_blocks_senders.clone(); + let block_header = SignedBlockHeader { + block_header: BlockHeader { + block_hash: BlockHash(StarkHash::from(block_number.0)), + block_header_without_hash: BlockHeaderWithoutHash { + block_number, + ..Default::default() + }, + state_diff_length: Some(sync_block.state_diff.len()), + n_transactions: sync_block.transaction_hashes.len(), + ..Default::default() + }, + signatures: vec![BlockSignature::default()], + }; + + header_sender.send((block_number, block_header)).await?; + let state_diff = sync_block.state_diff; + state_diff_sender.send((block_number, (state_diff, block_number))).await?; + let num_transactions = sync_block.transaction_hashes.len(); + let mut rng = get_rng(); + let block_body = BlockBody { + transaction_hashes: sync_block.transaction_hashes, + transaction_outputs: vec![ + TransactionOutput::get_test_instance(&mut rng); + num_transactions + ], + transactions: vec![Transaction::get_test_instance(&mut rng); num_transactions], + }; + transaction_sender.send((block_number, (block_body, block_number))).await?; + } + }) + } +} + +struct InternalBlocksReceivers { + header_receiver: Receiver<(BlockNumber, SignedBlockHeader)>, + state_diff_receiver: Receiver<(BlockNumber, (ThinStateDiff, BlockNumber))>, + transaction_receiver: Receiver<(BlockNumber, (BlockBody, BlockNumber))>, + #[allow(dead_code)] + class_receiver: + Receiver<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>, +} + +#[derive(Clone)] +struct InternalBlocksSenders { + header_sender: Sender<(BlockNumber, SignedBlockHeader)>, + state_diff_sender: Sender<(BlockNumber, (ThinStateDiff, BlockNumber))>, + transaction_sender: Sender<(BlockNumber, (BlockBody, BlockNumber))>, + #[allow(dead_code)] + class_sender: Sender<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>, +} + +struct InternalBlocksChannels { + receivers: InternalBlocksReceivers, + senders: InternalBlocksSenders, +} + +impl InternalBlocksChannels { + pub fn new() -> Self { + let (header_sender, header_receiver) = futures::channel::mpsc::channel(100); + let (state_diff_sender, state_diff_receiver) = futures::channel::mpsc::channel(100); + let (transaction_sender, transaction_receiver) = futures::channel::mpsc::channel(100); + let (class_sender, class_receiver) = futures::channel::mpsc::channel(100); + + Self { + receivers: InternalBlocksReceivers { + header_receiver, + state_diff_receiver, + transaction_receiver, + class_receiver, + }, + senders: InternalBlocksSenders { + header_sender, + state_diff_sender, + transaction_sender, + class_sender, + }, } } } diff --git a/crates/papyrus_p2p_sync/src/client/stream_builder.rs b/crates/papyrus_p2p_sync/src/client/stream_builder.rs index 36718a939d..257bcb2cd0 100644 --- a/crates/papyrus_p2p_sync/src/client/stream_builder.rs +++ b/crates/papyrus_p2p_sync/src/client/stream_builder.rs @@ -1,11 +1,12 @@ use std::cmp::min; +use std::collections::HashMap; use std::time::Duration; use async_stream::stream; use futures::channel::mpsc::Receiver; use futures::future::BoxFuture; use futures::stream::BoxStream; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; use papyrus_network::network_manager::{ClientResponsesManager, SqmrClientSender}; use papyrus_protobuf::converters::ProtobufConversionError; use papyrus_protobuf::sync::{BlockHashOrNumber, DataOrFin, Direction, Query}; @@ -53,10 +54,35 @@ where fn get_start_block_number(storage_reader: &StorageReader) -> Result; + fn get_internal_blocks( + internal_blocks_received: &mut HashMap, + internal_block_receiver: &mut Option>, + current_block_number: BlockNumber, + ) -> Option { + if let Some(block) = internal_blocks_received.remove(¤t_block_number) { + return Some(block); + } + if let Some(internal_block_receiver) = internal_block_receiver { + while let Some((block_number, block_data)) = internal_block_receiver + .next() + .now_or_never() + .map(|now_or_never_res| now_or_never_res.expect("internal block receiver closed")) + { + if block_number >= current_block_number { + if block_number == current_block_number { + return Some(block_data); + } + internal_blocks_received.insert(block_number, block_data); + } + } + } + None + } + fn create_stream( mut sqmr_sender: SqmrClientSender>, storage_reader: StorageReader, - _internal_block_receiver: Option>, + mut internal_block_receiver: Option>, wait_period_for_new_data: Duration, num_blocks_per_query: u64, stop_sync_at_block_number: Option, @@ -67,6 +93,7 @@ where { stream! { let mut current_block_number = Self::get_start_block_number(&storage_reader)?; + let mut internal_blocks_received = HashMap::new(); 'send_query_and_parse_responses: loop { let limit = match Self::BLOCK_NUMBER_LIMIT { BlockNumberLimit::Unlimited => num_blocks_per_query, @@ -92,6 +119,16 @@ where current_block_number.0, end_block_number, ); + let end_block_number = min(end_block_number, stop_sync_at_block_number.map(|block_number| block_number.0).unwrap_or(end_block_number)); + while current_block_number.0 < end_block_number { + if let Some(block) = Self::get_internal_blocks(&mut internal_blocks_received, &mut internal_block_receiver, current_block_number) + { + yield Ok(Box::::from(Box::new(block))); + current_block_number = current_block_number.unchecked_next(); + } else { + break; + } + } // TODO(shahak): Use the report callback. let mut client_response_manager = sqmr_sender .send_new_query( @@ -101,9 +138,7 @@ where limit, step: STEP, }) - ) - .await?; - + ).await?; while current_block_number.0 < end_block_number { match Self::parse_data_for_block( &mut client_response_manager, current_block_number, &storage_reader @@ -134,12 +169,12 @@ where } info!("Added {:?} for block {}.", Self::TYPE_DESCRIPTION, current_block_number); current_block_number = current_block_number.unchecked_next(); - if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| { - current_block_number >= stop_sync_at_block_number - }) { - info!("{:?} hit the stop sync block number.", Self::TYPE_DESCRIPTION); - return; - } + } + if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| { + current_block_number >= stop_sync_at_block_number + }) { + info!("{:?} hit the stop sync block number.", Self::TYPE_DESCRIPTION); + return; } // Consume the None message signaling the end of the query. diff --git a/crates/papyrus_p2p_sync/src/client/test_utils.rs b/crates/papyrus_p2p_sync/src/client/test_utils.rs index 006ce510f9..038dd78191 100644 --- a/crates/papyrus_p2p_sync/src/client/test_utils.rs +++ b/crates/papyrus_p2p_sync/src/client/test_utils.rs @@ -112,6 +112,7 @@ pub fn setup() -> TestArgs { storage_reader.clone(), storage_writer, p2p_sync_channels, + futures::stream::pending().boxed(), ); TestArgs { p2p_sync, @@ -194,6 +195,7 @@ pub async fn run_test(max_query_lengths: HashMap, actions: Vec