Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(network): handle report receiver #2234

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct GenericNetworkManager<SwarmT: SwarmTrait> {

sqmr_outbound_payload_receivers: StreamHashMap<StreamProtocol, SqmrClientReceiver>,
sqmr_outbound_response_senders: HashMap<OutboundSessionId, ResponsesSenderForNetwork>,
sqmr_outbound_report_receivers: HashMap<OutboundSessionId, ReportReceiver>,
sqmr_outbound_report_receivers_awaiting_assignment: HashMap<OutboundSessionId, ReportReceiver>,
// Splitting the broadcast receivers from the broadcasted senders in order to poll all
// receivers simultaneously.
// Each receiver has a matching sender and vice versa (i.e the maps have the same keys).
Expand Down Expand Up @@ -82,7 +82,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
sqmr_inbound_query_senders: HashMap::new(),
sqmr_outbound_payload_receivers: StreamHashMap::new(HashMap::new()),
sqmr_outbound_response_senders: HashMap::new(),
sqmr_outbound_report_receivers: HashMap::new(),
sqmr_outbound_report_receivers_awaiting_assignment: HashMap::new(),
messages_to_broadcast_receivers: StreamHashMap::new(HashMap::new()),
broadcasted_messages_senders: HashMap::new(),
reported_peer_receivers,
Expand Down Expand Up @@ -349,13 +349,18 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
sqmr::behaviour::ExternalEvent::ReceivedResponse {
outbound_session_id,
response,
peer_id: _peer_id,
peer_id,
} => {
trace!(
"Received response from peer for session id: {outbound_session_id:?}. sending \
to sync subscriber."
);

if let Some(report_receiver) = self
.sqmr_outbound_report_receivers_awaiting_assignment
.remove(&outbound_session_id)
{
self.handle_new_report_receiver(peer_id, report_receiver)
}
if let Some(response_sender) =
self.sqmr_outbound_response_senders.get_mut(&outbound_session_id)
{
Expand All @@ -376,19 +381,31 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
// TODO: Handle reputation and retry.
if let SessionId::OutboundSessionId(outbound_session_id) = session_id {
self.sqmr_outbound_response_senders.remove(&outbound_session_id);
// TODO: check if the report receiver was already removed when session was
// assigned
self.sqmr_outbound_report_receivers.remove(&outbound_session_id);
if let Some(_report_receiver) = self
.sqmr_outbound_report_receivers_awaiting_assignment
.remove(&outbound_session_id)
{
debug!(
"Outbound session failed before peer assignment. Ignoring incoming \
reports for the session."
);
}
}
}
sqmr::behaviour::ExternalEvent::SessionFinishedSuccessfully { session_id } => {
debug!("Session completed successfully. session_id: {session_id:?}");
self.report_session_removed_to_metrics(session_id);
if let SessionId::OutboundSessionId(outbound_session_id) = session_id {
self.sqmr_outbound_response_senders.remove(&outbound_session_id);
// TODO: check if the report receiver was already removed when session was
// assigned
self.sqmr_outbound_report_receivers.remove(&outbound_session_id);
if let Some(_report_receiver) = self
.sqmr_outbound_report_receivers_awaiting_assignment
.remove(&outbound_session_id)
{
error!(
"Outbound session finished with no messages in it. Ignoring incoming \
reports for the session."
);
}
}
}
}
Expand Down Expand Up @@ -459,9 +476,8 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
self.num_active_outbound_sessions as f64
);
self.sqmr_outbound_response_senders.insert(outbound_session_id, responses_sender);
// TODO(eitan): once session is assigned call handle_new_report_receiver using map
// below
self.sqmr_outbound_report_receivers.insert(outbound_session_id, report_receiver);
self.sqmr_outbound_report_receivers_awaiting_assignment
.insert(outbound_session_id, report_receiver);
}
Err(e) => {
info!(
Expand Down Expand Up @@ -506,6 +522,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
}
}

// TODO(eitan): combine with server_send_now
fn network_send_now<Item>(
sender: &mut GenericSender<Item>,
item: Item,
Expand Down
Loading