Skip to content
This repository has been archived by the owner on Dec 26, 2024. It is now read-only.

Commit

Permalink
revert(network): remove send_query from streamed bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama committed Jul 23, 2024
1 parent 6403530 commit a271b99
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 134 deletions.
43 changes: 0 additions & 43 deletions crates/papyrus_network/src/sqmr/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::time::Duration;

use defaultmap::DefaultHashMap;
use libp2p::core::Endpoint;
use libp2p::swarm::behaviour::ConnectionEstablished;
use libp2p::swarm::{
ConnectionClosed,
ConnectionDenied,
Expand Down Expand Up @@ -110,8 +108,6 @@ pub struct PeerNotConnected;
pub struct Behaviour {
config: Config,
pending_events: VecDeque<ToSwarm<Event, RequestFromBehaviourEvent>>,
// TODO(shahak) Remove this once we remove send_query.
connection_ids_map: DefaultHashMap<PeerId, HashSet<ConnectionId>>,
session_id_to_peer_id_and_connection_id: HashMap<SessionId, (PeerId, ConnectionId)>,
next_outbound_session_id: OutboundSessionId,
next_inbound_session_id: Arc<AtomicUsize>,
Expand All @@ -126,7 +122,6 @@ impl Behaviour {
Self {
config,
pending_events: Default::default(),
connection_ids_map: Default::default(),
session_id_to_peer_id_and_connection_id: Default::default(),
next_outbound_session_id: Default::default(),
next_inbound_session_id: Arc::new(Default::default()),
Expand All @@ -137,37 +132,6 @@ impl Behaviour {
}
}

/// Send query to the given peer and start a new outbound session with it. Return the id of the
/// new session.
// TODO(shahak) Remove this function once Network manager uses start_query.
pub fn send_query(
&mut self,
query: Bytes,
peer_id: PeerId,
protocol_name: StreamProtocol,
) -> Result<OutboundSessionId, PeerNotConnected> {
let connection_id =
*self.connection_ids_map.get(peer_id).iter().next().ok_or(PeerNotConnected)?;

let outbound_session_id = self.next_outbound_session_id;
self.next_outbound_session_id.value += 1;

self.session_id_to_peer_id_and_connection_id
.insert(outbound_session_id.into(), (peer_id, connection_id));

self.add_event_to_queue(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: RequestFromBehaviourEvent::CreateOutboundSession {
query,
outbound_session_id,
protocol_name,
},
});

Ok(outbound_session_id)
}

/// Assign some peer and start a query. Return the id of the new session.
pub fn start_query(
&mut self,
Expand Down Expand Up @@ -293,13 +257,6 @@ impl NetworkBehaviour for Behaviour {

fn on_swarm_event(&mut self, event: FromSwarm<'_>) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
..
}) => {
self.connection_ids_map.get_mut(peer_id).insert(connection_id);
}
FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => {
let mut session_ids = Vec::new();
self.session_id_to_peer_id_and_connection_id.retain(
Expand Down
112 changes: 49 additions & 63 deletions crates/papyrus_network/src/sqmr/behaviour_test.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
// TODO(shahak): Use start_query in all tests instead of send_query

use std::pin::Pin;
use std::task::{Context, Poll};

use assert_matches::assert_matches;
use futures::{FutureExt, Stream, StreamExt};
use lazy_static::lazy_static;
use libp2p::core::{ConnectedPoint, Endpoint};
use libp2p::swarm::behaviour::ConnectionEstablished;
use libp2p::swarm::{ConnectionClosed, ConnectionId, FromSwarm, NetworkBehaviour, ToSwarm};
use libp2p::{Multiaddr, PeerId, StreamProtocol};

use super::super::handler::{RequestFromBehaviourEvent, RequestToBehaviourEvent};
use super::super::{Bytes, Config, GenericEvent, InboundSessionId, OutboundSessionId, SessionId};
use super::{Behaviour, Event, ExternalEvent, SessionError};
use super::{Behaviour, Event, ExternalEvent, SessionError, ToOtherBehaviourEvent};
use crate::mixed_behaviour::BridgedBehaviour;
use crate::test_utils::dummy_data;
use crate::{mixed_behaviour, peer_manager};

impl Unpin for Behaviour {}

Expand All @@ -34,37 +33,19 @@ lazy_static! {
static ref PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/");
}

fn simulate_connection_established(behaviour: &mut Behaviour, peer_id: PeerId) {
let connection_id = ConnectionId::new_unchecked(0);
let address = Multiaddr::empty();
let role_override = Endpoint::Dialer;
let _handler = behaviour
.handle_established_outbound_connection(connection_id, peer_id, &address, role_override)
.unwrap();
behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
endpoint: &ConnectedPoint::Dialer { address, role_override },
failed_addresses: &[],
other_established: 0,
}));
}

fn simulate_listener_connection(behaviour: &mut Behaviour, peer_id: PeerId) {
let connection_id = ConnectionId::new_unchecked(0);
let address = Multiaddr::empty();
let local_addr = Multiaddr::empty();
let role_override = Endpoint::Listener;
let _handler = behaviour
.handle_established_outbound_connection(connection_id, peer_id, &address, role_override)
.unwrap();
behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
endpoint: &ConnectedPoint::Listener { send_back_addr: address, local_addr },
failed_addresses: &[],
other_established: 0,
}));
fn simulate_peer_assigned(
behaviour: &mut Behaviour,
peer_id: PeerId,
outbound_session_id: OutboundSessionId,
) {
behaviour.on_other_behaviour_event(&mixed_behaviour::ToOtherBehaviourEvent::PeerManager(
peer_manager::ToOtherBehaviourEvent::SessionAssigned {
outbound_session_id,
peer_id,
// TODO(shahak): Add test with multiple connections
connection_id: ConnectionId::new_unchecked(0),
},
));
}

fn simulate_new_inbound_session(
Expand All @@ -75,6 +56,7 @@ fn simulate_new_inbound_session(
) {
behaviour.on_connection_handler_event(
peer_id,
// This is the same connection_id from simulate_peer_assigned
ConnectionId::new_unchecked(0),
RequestToBehaviourEvent::GenerateEvent(GenericEvent::NewInboundSession {
query,
Expand All @@ -93,6 +75,7 @@ fn simulate_received_response(
) {
behaviour.on_connection_handler_event(
peer_id,
// This is the same connection_id from simulate_peer_assigned
ConnectionId::new_unchecked(0),
RequestToBehaviourEvent::GenerateEvent(GenericEvent::ReceivedResponse {
response,
Expand All @@ -109,6 +92,7 @@ fn simulate_session_finished_successfully(
) {
behaviour.on_connection_handler_event(
peer_id,
// This is the same connection_id from simulate_peer_assigned
ConnectionId::new_unchecked(0),
RequestToBehaviourEvent::GenerateEvent(GenericEvent::SessionFinishedSuccessfully {
session_id,
Expand All @@ -117,7 +101,7 @@ fn simulate_session_finished_successfully(
}

fn simulate_connection_closed(behaviour: &mut Behaviour, peer_id: PeerId) {
// This is the same connection_id from simulate_connection_established
// This is the same connection_id from simulate_peer_assigned
let connection_id = ConnectionId::new_unchecked(0);
behaviour.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
Expand All @@ -140,6 +124,20 @@ fn simulate_session_dropped(behaviour: &mut Behaviour, peer_id: PeerId, session_
);
}

async fn validate_request_peer_assignment_event(
behaviour: &mut Behaviour,
outbound_session_id: OutboundSessionId,
) {
let event = behaviour.next().await.unwrap();
assert_matches!(
event,
ToSwarm::GenerateEvent(Event::ToOtherBehaviourEvent(ToOtherBehaviourEvent::RequestPeerAssignment {
outbound_session_id: event_outbound_session_id
},
)) if outbound_session_id == event_outbound_session_id
);
}

async fn validate_create_outbound_session_event(
behaviour: &mut Behaviour,
peer_id: &PeerId,
Expand Down Expand Up @@ -280,8 +278,6 @@ async fn process_inbound_session() {
let peer_id = PeerId::random();
let inbound_session_id = InboundSessionId::default();

simulate_listener_connection(&mut behaviour, peer_id);

simulate_new_inbound_session(&mut behaviour, peer_id, inbound_session_id, QUERY.clone());
validate_new_inbound_session_event(&mut behaviour, &peer_id, inbound_session_id, &QUERY).await;
validate_no_events(&mut behaviour);
Expand Down Expand Up @@ -319,9 +315,11 @@ async fn create_and_process_outbound_session() {

let peer_id = PeerId::random();

simulate_connection_established(&mut behaviour, peer_id);
let outbound_session_id =
behaviour.send_query(QUERY.clone(), peer_id, PROTOCOL_NAME.clone()).unwrap();
let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone());

validate_request_peer_assignment_event(&mut behaviour, outbound_session_id).await;
validate_no_events(&mut behaviour);
simulate_peer_assigned(&mut behaviour, peer_id, outbound_session_id);

validate_create_outbound_session_event(&mut behaviour, &peer_id, &QUERY, &outbound_session_id)
.await;
Expand Down Expand Up @@ -351,17 +349,17 @@ async fn connection_closed() {

let peer_id = PeerId::random();

simulate_connection_established(&mut behaviour, peer_id);

let outbound_session_id =
behaviour.send_query(QUERY.clone(), peer_id, PROTOCOL_NAME.clone()).unwrap();

// Add an outbound session on the connection.
let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone());
// Consume the event to request peer assignment.
behaviour.next().await.unwrap();
simulate_peer_assigned(&mut behaviour, peer_id, outbound_session_id);
// Consume the event to create an outbound session.
behaviour.next().await.unwrap();

// Add an inbound session on the connection.
let inbound_session_id = InboundSessionId::default();
simulate_new_inbound_session(&mut behaviour, peer_id, inbound_session_id, QUERY.clone());

// Consume the event to notify the user about the new inbound session.
behaviour.next().await.unwrap();

Expand Down Expand Up @@ -399,11 +397,10 @@ async fn drop_outbound_session() {

let peer_id = PeerId::random();

simulate_connection_established(&mut behaviour, peer_id);

let outbound_session_id =
behaviour.send_query(QUERY.clone(), peer_id, PROTOCOL_NAME.clone()).unwrap();

let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone());
// Consume the event to request peer assignment.
behaviour.next().await.unwrap();
simulate_peer_assigned(&mut behaviour, peer_id, outbound_session_id);
// Consume the event to create an outbound session.
behaviour.next().await.unwrap();

Expand Down Expand Up @@ -433,8 +430,6 @@ async fn drop_inbound_session() {
let peer_id = PeerId::random();
let inbound_session_id = InboundSessionId::default();

simulate_listener_connection(&mut behaviour, peer_id);

simulate_new_inbound_session(&mut behaviour, peer_id, inbound_session_id, QUERY.clone());

// Consume the event that a new inbound session was created.
Expand Down Expand Up @@ -466,12 +461,3 @@ fn send_response_non_existing_session_fails() {
behaviour.send_response(response, InboundSessionId::default()).unwrap_err();
}
}

#[test]
fn send_query_peer_not_connected_fails() {
let mut behaviour = Behaviour::new(Config::get_test_config());

let peer_id = PeerId::random();

behaviour.send_query(QUERY.clone(), peer_id, PROTOCOL_NAME.clone()).unwrap_err();
}
Loading

0 comments on commit a271b99

Please sign in to comment.