From 2e2ec9f435af0955761692c21faaf3b190681ef0 Mon Sep 17 00:00:00 2001 From: eitanm-starkware Date: Tue, 23 Jul 2024 10:24:44 +0300 Subject: [PATCH] refactor(sync): combine server channels --- .../src/network_manager/mod.rs | 82 +++---- .../src/network_manager/test.rs | 6 +- .../src/peer_manager/behaviour_impl.rs | 2 +- crates/papyrus_node/src/main.rs | 78 ++----- crates/papyrus_p2p_sync/src/client/header.rs | 15 +- crates/papyrus_p2p_sync/src/client/mod.rs | 18 +- .../papyrus_p2p_sync/src/client/state_diff.rs | 28 +-- .../src/client/state_diff_test.rs | 20 +- .../src/client/stream_builder.rs | 10 +- crates/papyrus_p2p_sync/src/server/mod.rs | 211 +++++++++--------- crates/papyrus_p2p_sync/src/server/test.rs | 135 +++++------ 11 files changed, 263 insertions(+), 342 deletions(-) diff --git a/crates/papyrus_network/src/network_manager/mod.rs b/crates/papyrus_network/src/network_manager/mod.rs index 2e723469c3..e924deeb81 100644 --- a/crates/papyrus_network/src/network_manager/mod.rs +++ b/crates/papyrus_network/src/network_manager/mod.rs @@ -9,7 +9,7 @@ use futures::channel::mpsc::{Receiver, SendError, Sender}; use futures::channel::oneshot; use futures::future::{ready, BoxFuture, Ready}; use futures::sink::With; -use futures::stream::{self, BoxStream, FuturesUnordered, Map, Stream}; +use futures::stream::{self, FuturesUnordered, Map, Stream}; use futures::{pin_mut, FutureExt, Sink, SinkExt, StreamExt}; use libp2p::gossipsub::{SubscriptionError, TopicHash}; use libp2p::swarm::SwarmEvent; @@ -36,9 +36,8 @@ pub enum NetworkError { pub struct GenericNetworkManager { swarm: SwarmT, inbound_protocol_to_buffer_size: HashMap, - sqmr_inbound_response_receivers: - StreamHashMap>>, - sqmr_inbound_query_senders: HashMap)>>, + sqmr_inbound_response_receivers: StreamHashMap, + sqmr_inbound_payload_senders: HashMap, sqmr_outbound_payload_receivers: StreamHashMap, sqmr_outbound_response_senders: HashMap, @@ -79,7 +78,7 @@ impl GenericNetworkManager { swarm, inbound_protocol_to_buffer_size: HashMap::new(), sqmr_inbound_response_receivers: StreamHashMap::new(HashMap::new()), - sqmr_inbound_query_senders: HashMap::new(), + sqmr_inbound_payload_senders: HashMap::new(), sqmr_outbound_payload_receivers: StreamHashMap::new(HashMap::new()), sqmr_outbound_response_senders: HashMap::new(), sqmr_outbound_report_receivers: HashMap::new(), @@ -96,10 +95,11 @@ impl GenericNetworkManager { &mut self, protocol: String, buffer_size: usize, - ) -> SqmrQueryReceiver + ) -> SqmrServerReceiver where Bytes: From, Query: TryFrom, + Response: 'static, { let protocol = StreamProtocol::try_from_owned(protocol) .expect("Could not parse protocol into StreamProtocol."); @@ -109,19 +109,18 @@ impl GenericNetworkManager { { panic!("Protocol '{}' has already been registered as a server.", protocol); } - let (inbound_query_sender, inbound_query_receiver) = + let (inbound_payload_sender, inbound_payload_receiver) = futures::channel::mpsc::channel(buffer_size); - let result = self.sqmr_inbound_query_senders.insert(protocol.clone(), inbound_query_sender); - if result.is_some() { + let insert_result = self + .sqmr_inbound_payload_senders + .insert(protocol.clone(), Box::new(inbound_payload_sender)); + if insert_result.is_some() { panic!("Protocol '{}' has already been registered as a server.", protocol); } - inbound_query_receiver.map(|(query_bytes, response_bytes_sender)| { - ( - Query::try_from(query_bytes), - response_bytes_sender.with(|response| ready(Ok(Bytes::from(response)))), - ) - }) + let inbound_payload_receiver = inbound_payload_receiver + .map(|payload: SqmrServerPayloadForNetwork| SqmrServerPayload::from(payload)); + Box::new(inbound_payload_receiver) } /// TODO: Support multiple protocols where they're all different versions of the same protocol @@ -155,8 +154,6 @@ impl GenericNetworkManager { }; let payload_sender = payload_sender.with(payload_fn); - // let response_fn: ReceivedMessagesConverterFn = |x| Response::try_from(x); - Box::new(payload_sender) } @@ -310,7 +307,7 @@ impl GenericNetworkManager { sqmr::behaviour::ExternalEvent::NewInboundSession { query, inbound_session_id, - peer_id: _, + peer_id, protocol_name, } => { info!( @@ -321,30 +318,33 @@ impl GenericNetworkManager { papyrus_metrics::PAPYRUS_NUM_ACTIVE_INBOUND_SESSIONS, self.num_active_inbound_sessions as f64 ); + let (report_sender, report_receiver) = oneshot::channel::<()>(); + self.handle_new_report_receiver(peer_id, report_receiver); // TODO: consider returning error instead of panic. - let Some(query_sender) = self.sqmr_inbound_query_senders.get_mut(&protocol_name) + let Some(query_sender) = self.sqmr_inbound_payload_senders.get_mut(&protocol_name) else { return; }; - let (response_sender, response_receiver) = futures::channel::mpsc::channel( + let (responses_sender, responses_receiver) = futures::channel::mpsc::channel( *self.inbound_protocol_to_buffer_size.get(&protocol_name).expect( "A protocol is registered in NetworkManager but it has no buffer size.", ), ); + let responses_sender = Box::new(responses_sender); + self.sqmr_inbound_response_receivers.insert( + inbound_session_id, + Box::new(responses_receiver.map(Some).chain(stream::once(ready(None)))), + ); // TODO(shahak): Close the inbound session if the buffer is full. - server_send_now( + send_now( query_sender, - (query, response_sender), + SqmrServerPayloadForNetwork { query, report_sender, responses_sender }, format!( "Received an inbound query while the buffer is full. Dropping query for \ session {inbound_session_id:?}" ), ); - self.sqmr_inbound_response_receivers.insert( - inbound_session_id, - response_receiver.map(Some).chain(stream::once(ready(None))).boxed(), - ); } sqmr::behaviour::ExternalEvent::ReceivedResponse { outbound_session_id, @@ -360,7 +360,7 @@ impl GenericNetworkManager { self.sqmr_outbound_response_senders.get_mut(&outbound_session_id) { // TODO(shahak): Close the channel if the buffer is full. - network_send_now( + send_now( response_sender, response, format!( @@ -506,11 +506,7 @@ impl GenericNetworkManager { } } -fn network_send_now( - sender: &mut GenericSender, - item: Item, - buffer_full_message: String, -) { +fn send_now(sender: &mut GenericSender, item: Item, buffer_full_message: String) { pin_mut!(sender); match sender.as_mut().send(item).now_or_never() { Some(Ok(())) => {} @@ -523,17 +519,6 @@ fn network_send_now( } } -fn server_send_now(sender: &mut Sender, item: Item, buffer_full_message: String) { - if let Err(error) = sender.try_send(item) { - if error.is_disconnected() { - panic!("Receiver was dropped. This should never happen.") - } else if error.is_full() { - // TODO(shahak): Consider doing something else rather than dropping the message. - error!(buffer_full_message); - } - } -} - pub type NetworkManager = GenericNetworkManager>; impl NetworkManager { @@ -625,6 +610,7 @@ type GenericSender = Box + Unpin + Send>; type GenericReceiver = Box + Unpin + Send>; type ResponsesSenderForNetwork = GenericSender; +type ResponsesReceiverForNetwork = GenericReceiver>; type ResponsesSender = GenericSender>::Error>>; @@ -645,7 +631,6 @@ pub struct SqmrServerPayload, Response> { pub responses_sender: GenericSender, } -// TODO(shahak): Return this type in register_sqmr_protocol_server pub type SqmrServerReceiver = GenericReceiver>; struct SqmrClientPayloadForNetwork { @@ -696,15 +681,6 @@ where } } -pub type SqmrQueryReceiver = - Map)>, ReceivedQueryConverterFn>; - -type ReceivedQueryConverterFn = - fn( - (Bytes, Sender), - ) - -> (Result>::Error>, BroadcastSubscriberSender); - // TODO(eitan): improve naming of final channel types pub type BroadcastSubscriberSender = With< Sender, diff --git a/crates/papyrus_network/src/network_manager/test.rs b/crates/papyrus_network/src/network_manager/test.rs index 4d076e20e9..c900c4d7c4 100644 --- a/crates/papyrus_network/src/network_manager/test.rs +++ b/crates/papyrus_network/src/network_manager/test.rs @@ -25,7 +25,7 @@ use super::swarm_trait::{Event, SwarmTrait}; use super::GenericNetworkManager; use crate::gossipsub_impl::{self, Topic}; use crate::mixed_behaviour; -use crate::network_manager::SqmrClientPayload; +use crate::network_manager::{SqmrClientPayload, SqmrServerPayload}; use crate::sqmr::behaviour::{PeerNotConnected, SessionIdNotFoundError}; use crate::sqmr::{Bytes, GenericEvent, InboundSessionId, OutboundSessionId}; @@ -283,7 +283,7 @@ async fn process_incoming_query() { let mut network_manager = GenericNetworkManager::generic_new(mock_swarm); - let mut inbound_query_receiver = network_manager + let mut inbound_payload_receiver = network_manager .register_sqmr_protocol_server::, Vec>(protocol.to_string(), BUFFER_SIZE); let actual_protocol = get_supported_inbound_protocol_fut.next().await.unwrap(); @@ -292,7 +292,7 @@ async fn process_incoming_query() { let responses_clone = responses.clone(); select! { _ = async move { - let (query_got, mut responses_sender) = inbound_query_receiver.next().await.unwrap(); + let SqmrServerPayload{query: query_got, report_sender: _report_sender, mut responses_sender} = inbound_payload_receiver.next().await.unwrap(); assert_eq!(query_got.unwrap(), query); for response in responses_clone { responses_sender.feed(response).await.unwrap(); diff --git a/crates/papyrus_network/src/peer_manager/behaviour_impl.rs b/crates/papyrus_network/src/peer_manager/behaviour_impl.rs index ec5ffc6bf1..cbcaaccec7 100644 --- a/crates/papyrus_network/src/peer_manager/behaviour_impl.rs +++ b/crates/papyrus_network/src/peer_manager/behaviour_impl.rs @@ -106,7 +106,7 @@ where } let res = self.report_peer(peer_id, super::ReputationModifier::Bad); if res.is_err() { - error!("Dial failure of an unknow peer. peer id: {}", peer_id) + error!("Dial failure of an unknown peer. peer id: {}", peer_id) } // Re-assign a peer to the session so that a SessionAssgined Event will be emitted. // TODO: test this case diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index 71d3e5f350..e2fd7a97c7 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -11,7 +11,7 @@ use futures::future::BoxFuture; use futures::FutureExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; use papyrus_common::metrics::COLLECT_PROFILING_METRICS; -use papyrus_common::pending_classes::{ApiContractClass, PendingClasses}; +use papyrus_common::pending_classes::PendingClasses; use papyrus_common::BlockHashAndNumber; use papyrus_config::presentation::get_config_presentation; use papyrus_config::validators::config_validate; @@ -21,33 +21,19 @@ use papyrus_consensus::papyrus_consensus_context::PapyrusConsensusContext; use papyrus_consensus::types::ConsensusError; use papyrus_monitoring_gateway::MonitoringServer; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::{ - BroadcastSubscriberChannels, - NetworkError, - SqmrQueryReceiver, -}; +use papyrus_network::network_manager::{BroadcastSubscriberChannels, NetworkError}; use papyrus_network::{network_manager, NetworkConfig}; use papyrus_node::config::NodeConfig; use papyrus_node::version::VERSION_FULL; use papyrus_p2p_sync::client::{ + P2PClientSyncError, P2PSyncClient, P2PSyncClientChannels, P2PSyncClientConfig, - P2PSyncError, }; -use papyrus_p2p_sync::server::P2PSyncServer; +use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; use papyrus_protobuf::consensus::ConsensusMessage; -use papyrus_protobuf::sync::{ - ClassQuery, - DataOrFin, - EventQuery, - HeaderQuery, - SignedBlockHeader, - StateDiffChunk, - StateDiffQuery, - TransactionQuery, -}; #[cfg(feature = "rpc")] use papyrus_rpc::run_server; use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter}; @@ -57,7 +43,6 @@ use papyrus_sync::sources::pending::PendingSource; use papyrus_sync::{StateSync, StateSyncError, SyncConfig}; use starknet_api::block::BlockHash; use starknet_api::felt; -use starknet_api::transaction::{Event, Transaction, TransactionHash, TransactionOutput}; use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated}; use starknet_client::reader::PendingData; use tokio::sync::RwLock; @@ -187,21 +172,9 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { // P2P Sync Server task. let p2p_sync_server_future = match maybe_sync_server_channels { - Some(( - header_server_channel, - state_diff_server_channel, - transaction_server_channel, - class_server_channel, - event_server_channel, - )) => { - let p2p_sync_server = P2PSyncServer::new( - storage_reader.clone(), - header_server_channel, - state_diff_server_channel, - transaction_server_channel, - class_server_channel, - event_server_channel, - ); + Some(p2p_sync_server_channels) => { + let p2p_sync_server = + P2PSyncServer::new(storage_reader.clone(), p2p_sync_server_channels); p2p_sync_server.run().boxed() } None => pending().boxed(), @@ -321,7 +294,7 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { storage_reader: StorageReader, storage_writer: StorageWriter, p2p_sync_client_channels: P2PSyncClientChannels, - ) -> Result<(), P2PSyncError> { + ) -> Result<(), P2PClientSyncError> { let p2p_sync = P2PSyncClient::new( p2p_sync_client_config, storage_reader, @@ -335,13 +308,7 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { type NetworkRunReturn = ( BoxFuture<'static, Result<(), NetworkError>>, Option, - Option<( - SqmrQueryReceiver>, - SqmrQueryReceiver>, - SqmrQueryReceiver>, - SqmrQueryReceiver>, - SqmrQueryReceiver>, - )>, + Option, Option>, String, ); @@ -380,22 +347,23 @@ fn run_network( ), None => None, }; - let p2p_sync_channels = P2PSyncClientChannels { - header_payload_sender: header_client_sender, - state_diff_payload_sender: state_diff_client_sender, - transaction_payload_sender: transaction_client_sender, - }; + let p2p_sync_client_channels = P2PSyncClientChannels::new( + header_client_sender, + state_diff_client_sender, + transaction_client_sender, + ); + let p2p_sync_server_channels = P2PSyncServerChannels::new( + header_server_channel, + state_diff_server_channel, + transaction_server_channel, + class_server_channel, + event_server_channel, + ); Ok(( network_manager.run().boxed(), - Some(p2p_sync_channels), - Some(( - header_server_channel, - state_diff_server_channel, - transaction_server_channel, - class_server_channel, - event_server_channel, - )), + Some(p2p_sync_client_channels), + Some(p2p_sync_server_channels), consensus_channels, local_peer_id, )) diff --git a/crates/papyrus_p2p_sync/src/client/header.rs b/crates/papyrus_p2p_sync/src/client/header.rs index 32c2acc53c..017c1dad18 100644 --- a/crates/papyrus_p2p_sync/src/client/header.rs +++ b/crates/papyrus_p2p_sync/src/client/header.rs @@ -6,7 +6,12 @@ use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use starknet_api::block::BlockNumber; use super::stream_builder::{BlockData, BlockNumberLimit, DataStreamBuilder}; -use super::{P2PSyncError, ResponseReceiver, ALLOWED_SIGNATURES_LENGTH, NETWORK_DATA_TIMEOUT}; +use super::{ + P2PClientSyncError, + ResponseReceiver, + ALLOWED_SIGNATURES_LENGTH, + NETWORK_DATA_TIMEOUT, +}; impl BlockData for SignedBlockHeader { fn write_to_storage( @@ -42,12 +47,12 @@ impl DataStreamBuilder for HeaderStreamBuilder { signed_headers_receiver: &'a mut ResponseReceiver, block_number: BlockNumber, _storage_reader: &'a StorageReader, - ) -> BoxFuture<'a, Result, P2PSyncError>> { + ) -> BoxFuture<'a, Result, P2PClientSyncError>> { async move { let maybe_signed_header = tokio::time::timeout(NETWORK_DATA_TIMEOUT, signed_headers_receiver.next()) .await? - .ok_or(P2PSyncError::ReceiverChannelTerminated { + .ok_or(P2PClientSyncError::ReceiverChannelTerminated { type_description: Self::TYPE_DESCRIPTION, })?; let Some(signed_block_header) = maybe_signed_header?.0 else { @@ -56,13 +61,13 @@ impl DataStreamBuilder for HeaderStreamBuilder { // TODO(shahak): Check that parent_hash is the same as the previous block's hash // and handle reverts. if block_number != signed_block_header.block_header.block_number { - return Err(P2PSyncError::HeadersUnordered { + return Err(P2PClientSyncError::HeadersUnordered { expected_block_number: block_number, actual_block_number: signed_block_header.block_header.block_number, }); } if signed_block_header.signatures.len() != ALLOWED_SIGNATURES_LENGTH { - return Err(P2PSyncError::WrongSignaturesLength { + return Err(P2PClientSyncError::WrongSignaturesLength { signatures: signed_block_header.signatures, }); } diff --git a/crates/papyrus_p2p_sync/src/client/mod.rs b/crates/papyrus_p2p_sync/src/client/mod.rs index cf6112db37..ca3e862d04 100644 --- a/crates/papyrus_p2p_sync/src/client/mod.rs +++ b/crates/papyrus_p2p_sync/src/client/mod.rs @@ -111,7 +111,7 @@ impl Default for P2PSyncClientConfig { } #[derive(thiserror::Error, Debug)] -pub enum P2PSyncError { +pub enum P2PClientSyncError { // TODO(shahak): Remove this and report to network on invalid data once that's possible. // TODO(shahak): Consider removing this error and handling unordered headers without failing. #[error( @@ -179,12 +179,20 @@ type TransactionPayloadSender = SqmrClientSender>; pub struct P2PSyncClientChannels { - pub header_payload_sender: HeaderPayloadSender, - pub state_diff_payload_sender: StateDiffPayloadSender, - pub transaction_payload_sender: TransactionPayloadSender, + header_payload_sender: HeaderPayloadSender, + state_diff_payload_sender: StateDiffPayloadSender, + #[allow(dead_code)] + transaction_payload_sender: TransactionPayloadSender, } impl P2PSyncClientChannels { + pub fn new( + header_payload_sender: HeaderPayloadSender, + state_diff_payload_sender: StateDiffPayloadSender, + transaction_payload_sender: TransactionPayloadSender, + ) -> Self { + Self { header_payload_sender, state_diff_payload_sender, transaction_payload_sender } + } pub(crate) fn create_stream( self, storage_reader: StorageReader, @@ -244,7 +252,7 @@ impl P2PSyncClient { } #[instrument(skip(self), level = "debug", err)] - pub async fn run(mut self) -> Result<(), P2PSyncError> { + pub async fn run(mut self) -> Result<(), P2PClientSyncError> { let mut data_stream = self.p2p_sync_channels.create_stream(self.storage_reader.clone(), self.config); diff --git a/crates/papyrus_p2p_sync/src/client/state_diff.rs b/crates/papyrus_p2p_sync/src/client/state_diff.rs index ab31de6e4d..d16b3bf46a 100644 --- a/crates/papyrus_p2p_sync/src/client/state_diff.rs +++ b/crates/papyrus_p2p_sync/src/client/state_diff.rs @@ -12,7 +12,7 @@ use starknet_api::state::ThinStateDiff; use super::ResponseReceiver; use crate::client::stream_builder::{BlockData, BlockNumberLimit, DataStreamBuilder}; -use crate::client::{P2PSyncError, NETWORK_DATA_TIMEOUT}; +use crate::client::{P2PClientSyncError, NETWORK_DATA_TIMEOUT}; impl BlockData for (ThinStateDiff, BlockNumber) { #[latency_histogram("p2p_sync_state_diff_write_to_storage_latency_seconds", true)] @@ -37,7 +37,7 @@ impl DataStreamBuilder for StateDiffStreamBuilder { state_diff_chunks_receiver: &'a mut ResponseReceiver, block_number: BlockNumber, storage_reader: &'a StorageReader, - ) -> BoxFuture<'a, Result, P2PSyncError>> { + ) -> BoxFuture<'a, Result, P2PClientSyncError>> { async move { let mut result = ThinStateDiff::default(); let mut prev_result_len = 0; @@ -47,7 +47,7 @@ impl DataStreamBuilder for StateDiffStreamBuilder { .get_block_header(block_number)? .expect("A header with number lower than the header marker is missing") .state_diff_length - .ok_or(P2PSyncError::OldHeaderInStorage { + .ok_or(P2PClientSyncError::OldHeaderInStorage { block_number, missing_field: "state_diff_length", })?; @@ -56,14 +56,14 @@ impl DataStreamBuilder for StateDiffStreamBuilder { let maybe_state_diff_chunk = tokio::time::timeout(NETWORK_DATA_TIMEOUT, state_diff_chunks_receiver.next()) .await? - .ok_or(P2PSyncError::ReceiverChannelTerminated { + .ok_or(P2PClientSyncError::ReceiverChannelTerminated { type_description: Self::TYPE_DESCRIPTION, })?; let Some(state_diff_chunk) = maybe_state_diff_chunk?.0 else { if current_state_diff_len == 0 { return Ok(None); } else { - return Err(P2PSyncError::WrongStateDiffLength { + return Err(P2PClientSyncError::WrongStateDiffLength { expected_length: target_state_diff_len, possible_lengths: vec![current_state_diff_len], }); @@ -71,7 +71,7 @@ impl DataStreamBuilder for StateDiffStreamBuilder { }; prev_result_len = current_state_diff_len; if state_diff_chunk.is_empty() { - return Err(P2PSyncError::EmptyStateDiffPart); + return Err(P2PClientSyncError::EmptyStateDiffPart); } // It's cheaper to calculate the length of `state_diff_part` than the length of // `result`. @@ -80,7 +80,7 @@ impl DataStreamBuilder for StateDiffStreamBuilder { } if current_state_diff_len != target_state_diff_len { - return Err(P2PSyncError::WrongStateDiffLength { + return Err(P2PClientSyncError::WrongStateDiffLength { expected_length: target_state_diff_len, possible_lengths: vec![prev_result_len, current_state_diff_len], }); @@ -103,7 +103,7 @@ impl DataStreamBuilder for StateDiffStreamBuilder { fn unite_state_diffs( state_diff: &mut ThinStateDiff, state_diff_chunk: StateDiffChunk, -) -> Result<(), P2PSyncError> { +) -> Result<(), P2PClientSyncError> { match state_diff_chunk { StateDiffChunk::ContractDiff(contract_diff) => { if let Some(class_hash) = contract_diff.class_hash { @@ -112,12 +112,12 @@ fn unite_state_diffs( .insert(contract_diff.contract_address, class_hash) .is_some() { - return Err(P2PSyncError::ConflictingStateDiffParts); + return Err(P2PClientSyncError::ConflictingStateDiffParts); } } if let Some(nonce) = contract_diff.nonce { if state_diff.nonces.insert(contract_diff.contract_address, nonce).is_some() { - return Err(P2PSyncError::ConflictingStateDiffParts); + return Err(P2PClientSyncError::ConflictingStateDiffParts); } } if !contract_diff.storage_diffs.is_empty() { @@ -125,7 +125,7 @@ fn unite_state_diffs( Some(storage_diffs) => { for (k, v) in contract_diff.storage_diffs { if storage_diffs.insert(k, v).is_some() { - return Err(P2PSyncError::ConflictingStateDiffParts); + return Err(P2PClientSyncError::ConflictingStateDiffParts); } } } @@ -143,7 +143,7 @@ fn unite_state_diffs( .insert(declared_class.class_hash, declared_class.compiled_class_hash) .is_some() { - return Err(P2PSyncError::ConflictingStateDiffParts); + return Err(P2PClientSyncError::ConflictingStateDiffParts); } } StateDiffChunk::DeprecatedDeclaredClass(deprecated_declared_class) => { @@ -159,13 +159,13 @@ fn unite_state_diffs( )] fn validate_deprecated_declared_classes_non_conflicting( state_diff: &ThinStateDiff, -) -> Result<(), P2PSyncError> { +) -> Result<(), P2PClientSyncError> { // TODO(shahak): Check if sorting is more efficient. if state_diff.deprecated_declared_classes.len() == state_diff.deprecated_declared_classes.iter().cloned().collect::>().len() { Ok(()) } else { - Err(P2PSyncError::ConflictingStateDiffParts) + Err(P2PClientSyncError::ConflictingStateDiffParts) } } diff --git a/crates/papyrus_p2p_sync/src/client/state_diff_test.rs b/crates/papyrus_p2p_sync/src/client/state_diff_test.rs index 1f720fbfd4..63e60b53aa 100644 --- a/crates/papyrus_p2p_sync/src/client/state_diff_test.rs +++ b/crates/papyrus_p2p_sync/src/client/state_diff_test.rs @@ -33,7 +33,7 @@ use super::test_utils::{ SLEEP_DURATION_TO_LET_SYNC_ADVANCE, STATE_DIFF_QUERY_LENGTH, }; -use super::{P2PSyncError, StateDiffQuery}; +use super::{P2PClientSyncError, StateDiffQuery}; const TIMEOUT_FOR_TEST: Duration = Duration::from_secs(5); @@ -185,7 +185,7 @@ async fn state_diff_basic_flow() { #[tokio::test] async fn state_diff_empty_state_diff() { validate_state_diff_fails(1, vec![Some(StateDiffChunk::default())], |error| { - assert_matches!(error, P2PSyncError::EmptyStateDiffPart) + assert_matches!(error, P2PClientSyncError::EmptyStateDiffPart) }) .await; } @@ -198,7 +198,7 @@ async fn state_diff_stopped_in_middle() { Some(StateDiffChunk::DeprecatedDeclaredClass(DeprecatedDeclaredClass::default())), None, ], - |error| assert_matches!(error, P2PSyncError::WrongStateDiffLength { expected_length, possible_lengths } if expected_length == 2 && possible_lengths == vec![1]), + |error| assert_matches!(error, P2PClientSyncError::WrongStateDiffLength { expected_length, possible_lengths } if expected_length == 2 && possible_lengths == vec![1]), ) .await; } @@ -216,7 +216,7 @@ async fn state_diff_not_split_correctly() { ..Default::default() }),) ], - |error| assert_matches!(error, P2PSyncError::WrongStateDiffLength { expected_length, possible_lengths } if expected_length == 2 && possible_lengths == vec![1, 3]), + |error| assert_matches!(error, P2PClientSyncError::WrongStateDiffLength { expected_length, possible_lengths } if expected_length == 2 && possible_lengths == vec![1, 3]), ) .await; } @@ -237,7 +237,7 @@ async fn state_diff_conflicting() { ..Default::default() })), ], - |error| assert_matches!(error, P2PSyncError::ConflictingStateDiffParts), + |error| assert_matches!(error, P2PClientSyncError::ConflictingStateDiffParts), ) .await; validate_state_diff_fails( @@ -254,7 +254,7 @@ async fn state_diff_conflicting() { ..Default::default() })), ], - |error| assert_matches!(error, P2PSyncError::ConflictingStateDiffParts), + |error| assert_matches!(error, P2PClientSyncError::ConflictingStateDiffParts), ) .await; validate_state_diff_fails( @@ -269,7 +269,7 @@ async fn state_diff_conflicting() { compiled_class_hash: CompiledClassHash::default(), })), ], - |error| assert_matches!(error, P2PSyncError::ConflictingStateDiffParts), + |error| assert_matches!(error, P2PClientSyncError::ConflictingStateDiffParts), ) .await; validate_state_diff_fails( @@ -282,7 +282,7 @@ async fn state_diff_conflicting() { class_hash: ClassHash::default(), })), ], - |error| assert_matches!(error, P2PSyncError::ConflictingStateDiffParts), + |error| assert_matches!(error, P2PClientSyncError::ConflictingStateDiffParts), ) .await; validate_state_diff_fails( @@ -299,7 +299,7 @@ async fn state_diff_conflicting() { ..Default::default() })), ], - |error| assert_matches!(error, P2PSyncError::ConflictingStateDiffParts), + |error| assert_matches!(error, P2PClientSyncError::ConflictingStateDiffParts), ) .await; } @@ -307,7 +307,7 @@ async fn state_diff_conflicting() { async fn validate_state_diff_fails( state_diff_length_in_header: usize, state_diff_chunks: Vec>, - error_validator: impl Fn(P2PSyncError), + error_validator: impl Fn(P2PClientSyncError), ) { let TestArgs { p2p_sync, diff --git a/crates/papyrus_p2p_sync/src/client/stream_builder.rs b/crates/papyrus_p2p_sync/src/client/stream_builder.rs index 13b88011a4..f9ef83af99 100644 --- a/crates/papyrus_p2p_sync/src/client/stream_builder.rs +++ b/crates/papyrus_p2p_sync/src/client/stream_builder.rs @@ -14,11 +14,11 @@ use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use starknet_api::block::BlockNumber; use tracing::{debug, info}; -use super::{P2PSyncError, ResponseReceiver, WithPayloadSender, STEP}; +use super::{P2PClientSyncError, ResponseReceiver, WithPayloadSender, STEP}; use crate::client::SyncResponse; use crate::BUFFER_SIZE; -pub type DataStreamResult = Result, P2PSyncError>; +pub type DataStreamResult = Result, P2PClientSyncError>; pub(crate) trait BlockData: Send { fn write_to_storage( @@ -49,7 +49,7 @@ where data_receiver: &'a mut ResponseReceiver, block_number: BlockNumber, storage_reader: &'a StorageReader, - ) -> BoxFuture<'a, Result, P2PSyncError>>; + ) -> BoxFuture<'a, Result, P2PClientSyncError>>; fn get_start_block_number(storage_reader: &StorageReader) -> Result; @@ -135,8 +135,8 @@ where Some(Ok(DataOrFin(None))) => { debug!("Query sent to network for {:?} finished", Self::TYPE_DESCRIPTION); }, - Some(_) => Err(P2PSyncError::TooManyResponses)?, - None => Err(P2PSyncError::ReceiverChannelTerminated { + Some(_) => Err(P2PClientSyncError::TooManyResponses)?, + None => Err(P2PClientSyncError::ReceiverChannelTerminated { type_description: Self::TYPE_DESCRIPTION })?, } diff --git a/crates/papyrus_p2p_sync/src/server/mod.rs b/crates/papyrus_p2p_sync/src/server/mod.rs index 5a4bca89be..5aec82c630 100644 --- a/crates/papyrus_p2p_sync/src/server/mod.rs +++ b/crates/papyrus_p2p_sync/src/server/mod.rs @@ -1,9 +1,8 @@ use std::vec; -use futures::channel::mpsc::SendError; -use futures::{Sink, SinkExt, Stream, StreamExt}; +use futures::{Sink, SinkExt, StreamExt}; use papyrus_common::pending_classes::ApiContractClass; -use papyrus_protobuf::converters::ProtobufConversionError; +use papyrus_network::network_manager::{SqmrServerPayload, SqmrServerReceiver}; use papyrus_protobuf::sync::{ BlockHashOrNumber, ClassQuery, @@ -68,155 +67,147 @@ impl P2PSyncServerError { } } +type HeaderPayloadReceiver = SqmrServerReceiver>; +type StateDiffPayloadReceiver = SqmrServerReceiver>; +type TransactionPayloadReceiver = + SqmrServerReceiver>; +type ClassPayloadReceiver = SqmrServerReceiver>; +type EventPayloadReceiver = SqmrServerReceiver>; + +pub struct P2PSyncServerChannels { + header_payload_receiver: HeaderPayloadReceiver, + state_diff_payload_receiver: StateDiffPayloadReceiver, + transaction_payload_receiver: TransactionPayloadReceiver, + class_payload_receiver: ClassPayloadReceiver, + event_payload_receiver: EventPayloadReceiver, +} + +impl P2PSyncServerChannels { + pub fn new( + header_payload_receiver: HeaderPayloadReceiver, + state_diff_payload_receiver: StateDiffPayloadReceiver, + transaction_payload_receiver: TransactionPayloadReceiver, + class_payload_receiver: ClassPayloadReceiver, + event_payload_receiver: EventPayloadReceiver, + ) -> Self { + Self { + header_payload_receiver, + state_diff_payload_receiver, + transaction_payload_receiver, + class_payload_receiver, + event_payload_receiver, + } + } +} + /// A P2PSyncServer receives inbound queries and returns their corresponding data. -pub struct P2PSyncServer< - HeaderQueryReceiver, - StateDiffQueryReceiver, - TransactionQueryReceiver, - ClassQueryReceiver, - EventQueryReceiver, -> { +pub struct P2PSyncServer { storage_reader: StorageReader, - header_queries_receiver: HeaderQueryReceiver, - state_diff_queries_receiver: StateDiffQueryReceiver, - transaction_queries_receiver: TransactionQueryReceiver, - class_queries_receiver: ClassQueryReceiver, - event_queries_receiver: EventQueryReceiver, + p2p_sync_channels: P2PSyncServerChannels, } -impl< - HeaderQueryReceiver, - StateDiffQueryReceiver, - TransactionQueryReceiver, - ClassQueryReceiver, - EventQueryReceiver, - HeaderResponsesSender, - StateDiffResponsesSender, - TransactionResponsesSender, - ClassResponsesSender, - EventResponsesSender, -> - P2PSyncServer< - HeaderQueryReceiver, - StateDiffQueryReceiver, - TransactionQueryReceiver, - ClassQueryReceiver, - EventQueryReceiver, - > -where - HeaderQueryReceiver: Stream, HeaderResponsesSender)> - + Unpin, - HeaderResponsesSender: - Sink, Error = SendError> + Unpin + Send + 'static, - StateDiffQueryReceiver: Stream, StateDiffResponsesSender)> - + Unpin, - StateDiffResponsesSender: - Sink, Error = SendError> + Unpin + Send + 'static, - TransactionQueryReceiver: Stream< - Item = (Result, TransactionResponsesSender), - > + Unpin, - TransactionResponsesSender: Sink, Error = SendError> - + Unpin - + Send - + 'static, - ClassQueryReceiver: - Stream, ClassResponsesSender)> + Unpin, - ClassResponsesSender: - Sink, Error = SendError> + Unpin + Send + 'static, - EventQueryReceiver: - Stream, EventResponsesSender)> + Unpin, - EventResponsesSender: - Sink, Error = SendError> + Unpin + Send + 'static, -{ - pub async fn run(mut self) { +impl P2PSyncServer { + pub async fn run(self) { + let P2PSyncServerChannels { + mut header_payload_receiver, + mut state_diff_payload_receiver, + mut transaction_payload_receiver, + mut class_payload_receiver, + mut event_payload_receiver, + } = self.p2p_sync_channels; loop { tokio::select! { - result = self.header_queries_receiver.next() => { - let (query_result, response_sender) = result.expect( + result = header_payload_receiver.next() => { + let SqmrServerPayload { + query: query_result, + report_sender: _report_sender, + responses_sender + } = result.expect( "Header queries sender was unexpectedly dropped." ); // TODO(shahak): Report if query_result is Err. if let Ok(query) = query_result { - self.register_query(query.0, response_sender); + register_query(self.storage_reader.clone(), query.0, responses_sender); } } - result = self.state_diff_queries_receiver.next() => { - let (query_result, response_sender) = result.expect( + result = state_diff_payload_receiver.next() => { + let SqmrServerPayload { + query: query_result, + report_sender: _report_sender, + responses_sender + } = result.expect( "State diff queries sender was unexpectedly dropped." + ); // TODO(shahak): Report if query_result is Err. if let Ok(query) = query_result { - self.register_query(query.0, response_sender); + register_query(self.storage_reader.clone(), query.0, responses_sender); } } - result = self.transaction_queries_receiver.next() => { - let (query_result, response_sender) = result.expect( + result = transaction_payload_receiver.next() => { + let SqmrServerPayload { + query: query_result, + report_sender: _report_sender, + responses_sender + } = result.expect( "Transaction queries sender was unexpectedly dropped." ); // TODO: Report if query_result is Err. if let Ok(query) = query_result { - self.register_query(query.0, response_sender); + register_query(self.storage_reader.clone(), query.0, responses_sender); } } - result = self.class_queries_receiver.next() => { - let (query_result, response_sender) = result.expect( + result = class_payload_receiver.next() => { + let SqmrServerPayload { + query: query_result, + report_sender: _report_sender, + responses_sender + } = result.expect( "Class queries sender was unexpectedly dropped." ); // TODO: Report if query_result is Err. if let Ok(query) = query_result { - self.register_query(query.0, response_sender); + register_query(self.storage_reader.clone(), query.0, responses_sender); } } - result = self.event_queries_receiver.next() => { - let (query_result, response_sender) = result.expect( + result = event_payload_receiver.next() => { + let SqmrServerPayload { + query: query_result, + report_sender: _report_sender, + responses_sender + } = result.expect( "Event queries sender was unexpectedly dropped." ); // TODO: Report if query_result is Err. if let Ok(query) = query_result { - self.register_query(query.0, response_sender); + register_query(self.storage_reader.clone(), query.0, responses_sender); } } }; } } - pub fn new( - storage_reader: StorageReader, - header_queries_receiver: HeaderQueryReceiver, - state_diff_queries_receiver: StateDiffQueryReceiver, - transaction_queries_receiver: TransactionQueryReceiver, - class_queries_receiver: ClassQueryReceiver, - event_queries_receiver: EventQueryReceiver, - ) -> Self { - Self { - storage_reader, - header_queries_receiver, - state_diff_queries_receiver, - transaction_queries_receiver, - class_queries_receiver, - event_queries_receiver, - } + pub fn new(storage_reader: StorageReader, p2p_sync_channels: P2PSyncServerChannels) -> Self { + Self { storage_reader, p2p_sync_channels } } - - fn register_query(&self, query: Query, sender: Sender) - where - Data: FetchBlockDataFromDb + Send + 'static, - Sender: Sink> + Unpin + Send + 'static, - P2PSyncServerError: From<>>::Error>, - { - let storage_reader_clone = self.storage_reader.clone(); - tokio::task::spawn(async move { - let result = send_data_for_query(storage_reader_clone, query.clone(), sender).await; - if let Err(error) = result { - if error.should_log_in_error_level() { - error!("Running inbound query {query:?} failed on {error:?}"); - } - Err(error) - } else { - Ok(()) +} +fn register_query(storage_reader: StorageReader, query: Query, sender: Sender) +where + Data: FetchBlockDataFromDb + Send + 'static, + Sender: Sink> + Unpin + Send + 'static, + P2PSyncServerError: From<>>::Error>, +{ + tokio::task::spawn(async move { + let result = send_data_for_query(storage_reader, query.clone(), sender).await; + if let Err(error) = result { + if error.should_log_in_error_level() { + error!("Running inbound query {query:?} failed on {error:?}"); } - }); - } + Err(error) + } else { + Ok(()) + } + }); } pub trait FetchBlockDataFromDb: Sized { diff --git a/crates/papyrus_p2p_sync/src/server/test.rs b/crates/papyrus_p2p_sync/src/server/test.rs index 9eee0bc03c..d1497e9618 100644 --- a/crates/papyrus_p2p_sync/src/server/test.rs +++ b/crates/papyrus_p2p_sync/src/server/test.rs @@ -1,9 +1,9 @@ -use futures::channel::mpsc::{Receiver, Sender}; +use futures::channel::mpsc::Sender; use futures::StreamExt; use lazy_static::lazy_static; use papyrus_common::pending_classes::ApiContractClass; use papyrus_common::state::create_random_state_diff; -use papyrus_protobuf::converters::ProtobufConversionError; +use papyrus_network::network_manager::SqmrServerPayload; use papyrus_protobuf::sync::{ BlockHashOrNumber, ClassQuery, @@ -30,7 +30,8 @@ use starknet_api::state::ContractClass; use starknet_api::transaction::{Event, Transaction, TransactionHash, TransactionOutput}; use test_utils::{get_rng, get_test_body, GetTestInstance}; -use super::{split_thin_state_diff, FetchBlockDataFromDb, P2PSyncServer}; +use super::{split_thin_state_diff, FetchBlockDataFromDb, P2PSyncServer, P2PSyncServerChannels}; +use crate::server::register_query; const BUFFER_SIZE: usize = 10; const NUM_OF_BLOCKS: u64 = 10; const NUM_TXS_PER_BLOCK: usize = 5; @@ -242,16 +243,16 @@ where T: FetchBlockDataFromDb + std::fmt::Debug + PartialEq + Send + Sync + 'static, F: FnOnce(Vec), { - let ( + let TestArgs { p2p_sync_server, storage_reader, mut storage_writer, - _header_queries_sender, - _state_diff_queries_sender, - _transaction_queries_sender, - _class_queries_sender, - _event_queries_sender, - ) = setup(); + header_payload_sender: _header_payload_sender, + state_diff_payload_sender: _state_diff_payload_sender, + transaction_payload_sender: _transaction_payload_sender, + class_payload_sender: _class_payload_sender, + event_payload_sender: _event_payload_sender, + } = setup(); // put some data in the storage. insert_to_storage_test_blocks_up_to(&mut storage_writer); @@ -272,7 +273,7 @@ where // register a query. let (sender, receiver) = futures::channel::mpsc::channel(BUFFER_SIZE); let query = Query { start_block, direction: Direction::Forward, limit: NUM_OF_BLOCKS, step: 1 }; - p2p_sync_server.register_query::(query, sender); + register_query::(p2p_sync_server.storage_reader.clone(), query, sender); // run p2p_sync_server and collect query results. tokio::select! { @@ -289,83 +290,55 @@ where } } +pub struct TestArgs { + #[allow(clippy::type_complexity)] + pub p2p_sync_server: P2PSyncServer, + pub storage_reader: StorageReader, + pub storage_writer: StorageWriter, + pub header_payload_sender: Sender>>, + pub state_diff_payload_sender: + Sender>>, + pub transaction_payload_sender: + Sender>>, + pub class_payload_sender: Sender>>, + pub event_payload_sender: + Sender>>, +} + #[allow(clippy::type_complexity)] -fn setup() -> ( - P2PSyncServer< - Receiver<( - Result, - Sender>, - )>, - Receiver<( - Result, - Sender>, - )>, - Receiver<( - Result, - Sender>, - )>, - Receiver<( - Result, - Sender>, - )>, - Receiver<( - Result, - Sender>, - )>, - >, - StorageReader, - StorageWriter, - Sender<(Result, Sender>)>, - Sender<(Result, Sender>)>, - Sender<( - Result, - Sender>, - )>, - Sender<(Result, Sender>)>, - Sender<( - Result, - Sender>, - )>, -) { +fn setup() -> TestArgs { let ((storage_reader, storage_writer), _temp_dir) = get_test_storage(); - let (header_queries_sender, header_queries_receiver) = futures::channel::mpsc::channel::<( - Result, - Sender>, - )>(BUFFER_SIZE); - let (state_diff_queries_sender, state_diff_queries_receiver) = futures::channel::mpsc::channel::< - (Result, Sender>), - >(BUFFER_SIZE); - let (transaction_sender, transaction_queries_receiver) = futures::channel::mpsc::channel::<( - Result, - Sender>, - )>(BUFFER_SIZE); - let (class_sender, class_queries_receiver) = futures::channel::mpsc::channel::<( - Result, - Sender>, - )>(BUFFER_SIZE); - let (event_sender, event_queries_receiver) = futures::channel::mpsc::channel::<( - Result, - Sender>, - )>(BUFFER_SIZE); + let (header_payload_sender, header_payload_receiver) = + futures::channel::mpsc::channel(BUFFER_SIZE); + let (state_diff_payload_sender, state_diff_payload_receiver) = + futures::channel::mpsc::channel(BUFFER_SIZE); + let (transaction_payload_sender, transaction_payload_receiver) = + futures::channel::mpsc::channel(BUFFER_SIZE); + let (class_payload_sender, class_payload_receiver) = + futures::channel::mpsc::channel(BUFFER_SIZE); + let (event_payload_sender, event_payload_receiver) = + futures::channel::mpsc::channel(BUFFER_SIZE); - let p2p_sync_server = super::P2PSyncServer::new( - storage_reader.clone(), - header_queries_receiver, - state_diff_queries_receiver, - transaction_queries_receiver, - class_queries_receiver, - event_queries_receiver, + let p2p_sync_server_channels = P2PSyncServerChannels::new( + Box::new(header_payload_receiver), + Box::new(state_diff_payload_receiver), + Box::new(transaction_payload_receiver), + Box::new(class_payload_receiver), + Box::new(event_payload_receiver), ); - ( + + let p2p_sync_server = + super::P2PSyncServer::new(storage_reader.clone(), p2p_sync_server_channels); + TestArgs { p2p_sync_server, storage_reader, storage_writer, - header_queries_sender, - state_diff_queries_sender, - transaction_sender, - class_sender, - event_sender, - ) + header_payload_sender, + state_diff_payload_sender, + transaction_payload_sender, + class_payload_sender, + event_payload_sender, + } } use starknet_api::core::ClassHash; fn insert_to_storage_test_blocks_up_to(storage_writer: &mut StorageWriter) {