diff --git a/crates/papyrus_network/src/network_manager/mod.rs b/crates/papyrus_network/src/network_manager/mod.rs index 4c05a60661..23306fb970 100644 --- a/crates/papyrus_network/src/network_manager/mod.rs +++ b/crates/papyrus_network/src/network_manager/mod.rs @@ -42,7 +42,7 @@ pub struct GenericNetworkManager { sqmr_outbound_payload_receivers: StreamHashMap, sqmr_outbound_response_senders: HashMap, - sqmr_outbound_report_receivers: HashMap, + sqmr_outbound_report_receivers_awaiting_assignment: HashMap, // Splitting the broadcast receivers from the broadcasted senders in order to poll all // receivers simultaneously. // Each receiver has a matching sender and vice versa (i.e the maps have the same keys). @@ -82,7 +82,7 @@ impl GenericNetworkManager { sqmr_inbound_query_senders: HashMap::new(), sqmr_outbound_payload_receivers: StreamHashMap::new(HashMap::new()), sqmr_outbound_response_senders: HashMap::new(), - sqmr_outbound_report_receivers: HashMap::new(), + sqmr_outbound_report_receivers_awaiting_assignment: HashMap::new(), messages_to_broadcast_receivers: StreamHashMap::new(HashMap::new()), broadcasted_messages_senders: HashMap::new(), reported_peer_receivers, @@ -349,13 +349,18 @@ impl GenericNetworkManager { sqmr::behaviour::ExternalEvent::ReceivedResponse { outbound_session_id, response, - peer_id: _peer_id, + peer_id, } => { trace!( "Received response from peer for session id: {outbound_session_id:?}. sending \ to sync subscriber." ); - + if let Some(report_receiver) = self + .sqmr_outbound_report_receivers_awaiting_assignment + .remove(&outbound_session_id) + { + self.handle_new_report_receiver(peer_id, report_receiver) + } if let Some(response_sender) = self.sqmr_outbound_response_senders.get_mut(&outbound_session_id) { @@ -376,9 +381,15 @@ impl GenericNetworkManager { // TODO: Handle reputation and retry. if let SessionId::OutboundSessionId(outbound_session_id) = session_id { self.sqmr_outbound_response_senders.remove(&outbound_session_id); - // TODO: check if the report receiver was already removed when session was - // assigned - self.sqmr_outbound_report_receivers.remove(&outbound_session_id); + if let Some(_report_receiver) = self + .sqmr_outbound_report_receivers_awaiting_assignment + .remove(&outbound_session_id) + { + debug!( + "Outbound session failed before peer assignment. Ignoring incoming \ + reports for the session." + ); + } } } sqmr::behaviour::ExternalEvent::SessionFinishedSuccessfully { session_id } => { @@ -386,9 +397,15 @@ impl GenericNetworkManager { self.report_session_removed_to_metrics(session_id); if let SessionId::OutboundSessionId(outbound_session_id) = session_id { self.sqmr_outbound_response_senders.remove(&outbound_session_id); - // TODO: check if the report receiver was already removed when session was - // assigned - self.sqmr_outbound_report_receivers.remove(&outbound_session_id); + if let Some(_report_receiver) = self + .sqmr_outbound_report_receivers_awaiting_assignment + .remove(&outbound_session_id) + { + error!( + "Outbound session finished with no messages in it. Ignoring incoming \ + reports for the session." + ); + } } } } @@ -459,9 +476,8 @@ impl GenericNetworkManager { self.num_active_outbound_sessions as f64 ); self.sqmr_outbound_response_senders.insert(outbound_session_id, responses_sender); - // TODO(eitan): once session is assigned call handle_new_report_receiver using map - // below - self.sqmr_outbound_report_receivers.insert(outbound_session_id, report_receiver); + self.sqmr_outbound_report_receivers_awaiting_assignment + .insert(outbound_session_id, report_receiver); } Err(e) => { info!( @@ -506,6 +522,7 @@ impl GenericNetworkManager { } } +// TODO(eitan): combine with server_send_now fn network_send_now( sender: &mut GenericSender, item: Item,