Skip to content

Commit

Permalink
chore(network): handle report receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
eitanm-starkware committed Jul 18, 2024
1 parent e23be56 commit bef8ce6
Showing 1 changed file with 19 additions and 12 deletions.
31 changes: 19 additions & 12 deletions crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,17 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
sqmr::behaviour::ExternalEvent::ReceivedResponse {
outbound_session_id,
response,
peer_id: _peer_id,
peer_id,
} => {
trace!(
"Received response from peer for session id: {outbound_session_id:?}. sending \
to sync subscriber."
);

if let Some(report_receiver) =
self.sqmr_outbound_report_receivers.remove(&outbound_session_id)
{
self.handle_report_receiver(peer_id, report_receiver)
}
if let Some(response_sender) =
self.sqmr_outbound_response_senders.get_mut(&outbound_session_id)
{
Expand All @@ -376,19 +380,23 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
// TODO: Handle reputation and retry.
if let SessionId::OutboundSessionId(outbound_session_id) = session_id {
self.sqmr_outbound_response_senders.remove(&outbound_session_id);
// TODO: check if the report receiver was already removed when session was
// assigned
self.sqmr_outbound_report_receivers.remove(&outbound_session_id);
if let Some(_report_receiver) =
self.sqmr_outbound_report_receivers.remove(&outbound_session_id)
{
error!("Report receiver was not consumed. Dropping it.");
}
}
}
sqmr::behaviour::ExternalEvent::SessionFinishedSuccessfully { session_id } => {
debug!("Session completed successfully. session_id: {session_id:?}");
self.report_session_removed_to_metrics(session_id);
if let SessionId::OutboundSessionId(outbound_session_id) = session_id {
self.sqmr_outbound_response_senders.remove(&outbound_session_id);
// TODO: check if the report receiver was already removed when session was
// assigned
self.sqmr_outbound_report_receivers.remove(&outbound_session_id);
if let Some(_report_receiver) =
self.sqmr_outbound_report_receivers.remove(&outbound_session_id)
{
error!("Report receiver was not consumed. Dropping it.");
}
}
}
}
Expand All @@ -398,7 +406,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
match event {
gossipsub_impl::ExternalEvent::Received { originated_peer_id, message, topic_hash } => {
let (report_sender, report_receiver) = oneshot::channel::<()>();
self.handle_new_report_receiver(originated_peer_id, report_receiver);
self.handle_report_receiver(originated_peer_id, report_receiver);
let Some(sender) = self.broadcasted_messages_senders.get_mut(&topic_hash) else {
error!(
"Received a message from a topic we're not subscribed to with hash \
Expand Down Expand Up @@ -459,8 +467,6 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
self.num_active_outbound_sessions as f64
);
self.sqmr_outbound_response_senders.insert(outbound_session_id, responses_sender);
// TODO(eitan): once session is assigned call handle_new_report_receiver using map
// below
self.sqmr_outbound_report_receivers.insert(outbound_session_id, report_receiver);
}
Err(e) => {
Expand Down Expand Up @@ -494,7 +500,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
}
}
}
fn handle_new_report_receiver(&self, peer_id: PeerId, report_receiver: oneshot::Receiver<()>) {
fn handle_report_receiver(&self, peer_id: PeerId, report_receiver: oneshot::Receiver<()>) {
self.reported_peer_receivers.push(
report_receiver
.map(move |result| match result {
Expand All @@ -506,6 +512,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
}
}

// TODO(eitan): combine with server_send_now
fn network_send_now<Item>(
sender: &mut GenericSender<Item>,
item: Item,
Expand Down

0 comments on commit bef8ce6

Please sign in to comment.