From 6d28dd57fe4e0accc1220f0031bbd5364d3d9a67 Mon Sep 17 00:00:00 2001 From: eitanm-starkware Date: Thu, 18 Jul 2024 12:05:37 +0300 Subject: [PATCH] chore(network): handle report receiver --- .../src/network_manager/mod.rs | 29 ++++++++++++------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/crates/papyrus_network/src/network_manager/mod.rs b/crates/papyrus_network/src/network_manager/mod.rs index 4c05a60661..1616cd3619 100644 --- a/crates/papyrus_network/src/network_manager/mod.rs +++ b/crates/papyrus_network/src/network_manager/mod.rs @@ -349,13 +349,17 @@ impl GenericNetworkManager { sqmr::behaviour::ExternalEvent::ReceivedResponse { outbound_session_id, response, - peer_id: _peer_id, + peer_id, } => { trace!( "Received response from peer for session id: {outbound_session_id:?}. sending \ to sync subscriber." ); - + if let Some(report_receiver) = + self.sqmr_outbound_report_receivers.remove(&outbound_session_id) + { + self.handle_report_receiver(peer_id, report_receiver) + } if let Some(response_sender) = self.sqmr_outbound_response_senders.get_mut(&outbound_session_id) { @@ -376,9 +380,11 @@ impl GenericNetworkManager { // TODO: Handle reputation and retry. if let SessionId::OutboundSessionId(outbound_session_id) = session_id { self.sqmr_outbound_response_senders.remove(&outbound_session_id); - // TODO: check if the report receiver was already removed when session was - // assigned - self.sqmr_outbound_report_receivers.remove(&outbound_session_id); + if let Some(_report_receiver) = + self.sqmr_outbound_report_receivers.remove(&outbound_session_id) + { + error!("Report receiver was not consumed. Dropping it."); + } } } sqmr::behaviour::ExternalEvent::SessionFinishedSuccessfully { session_id } => { @@ -386,9 +392,11 @@ impl GenericNetworkManager { self.report_session_removed_to_metrics(session_id); if let SessionId::OutboundSessionId(outbound_session_id) = session_id { self.sqmr_outbound_response_senders.remove(&outbound_session_id); - // TODO: check if the report receiver was already removed when session was - // assigned - self.sqmr_outbound_report_receivers.remove(&outbound_session_id); + if let Some(_report_receiver) = + self.sqmr_outbound_report_receivers.remove(&outbound_session_id) + { + error!("Report receiver was not consumed. Dropping it."); + } } } } @@ -398,7 +406,7 @@ impl GenericNetworkManager { match event { gossipsub_impl::ExternalEvent::Received { originated_peer_id, message, topic_hash } => { let (report_sender, report_receiver) = oneshot::channel::<()>(); - self.handle_new_report_receiver(originated_peer_id, report_receiver); + self.handle_report_receiver(originated_peer_id, report_receiver); let Some(sender) = self.broadcasted_messages_senders.get_mut(&topic_hash) else { error!( "Received a message from a topic we're not subscribed to with hash \ @@ -494,7 +502,7 @@ impl GenericNetworkManager { } } } - fn handle_new_report_receiver(&self, peer_id: PeerId, report_receiver: oneshot::Receiver<()>) { + fn handle_report_receiver(&self, peer_id: PeerId, report_receiver: oneshot::Receiver<()>) { self.reported_peer_receivers.push( report_receiver .map(move |result| match result { @@ -506,6 +514,7 @@ impl GenericNetworkManager { } } +// TODO(eitan): combine with server_send_now fn network_send_now( sender: &mut GenericSender, item: Item,