diff --git a/crates/papyrus_network/src/sqmr/behaviour.rs b/crates/papyrus_network/src/sqmr/behaviour.rs index 0c646dfaad..f45879b1ba 100644 --- a/crates/papyrus_network/src/sqmr/behaviour.rs +++ b/crates/papyrus_network/src/sqmr/behaviour.rs @@ -9,9 +9,7 @@ use std::sync::Arc; use std::task::{Context, Poll, Waker}; use std::time::Duration; -use defaultmap::DefaultHashMap; use libp2p::core::Endpoint; -use libp2p::swarm::behaviour::ConnectionEstablished; use libp2p::swarm::{ ConnectionClosed, ConnectionDenied, @@ -110,8 +108,6 @@ pub struct PeerNotConnected; pub struct Behaviour { config: Config, pending_events: VecDeque>, - // TODO(shahak) Remove this once we remove send_query. - connection_ids_map: DefaultHashMap>, session_id_to_peer_id_and_connection_id: HashMap, next_outbound_session_id: OutboundSessionId, next_inbound_session_id: Arc, @@ -126,7 +122,6 @@ impl Behaviour { Self { config, pending_events: Default::default(), - connection_ids_map: Default::default(), session_id_to_peer_id_and_connection_id: Default::default(), next_outbound_session_id: Default::default(), next_inbound_session_id: Arc::new(Default::default()), @@ -137,37 +132,6 @@ impl Behaviour { } } - /// Send query to the given peer and start a new outbound session with it. Return the id of the - /// new session. - // TODO(shahak) Remove this function once Network manager uses start_query. - pub fn send_query( - &mut self, - query: Bytes, - peer_id: PeerId, - protocol_name: StreamProtocol, - ) -> Result { - let connection_id = - *self.connection_ids_map.get(peer_id).iter().next().ok_or(PeerNotConnected)?; - - let outbound_session_id = self.next_outbound_session_id; - self.next_outbound_session_id.value += 1; - - self.session_id_to_peer_id_and_connection_id - .insert(outbound_session_id.into(), (peer_id, connection_id)); - - self.add_event_to_queue(ToSwarm::NotifyHandler { - peer_id, - handler: NotifyHandler::One(connection_id), - event: RequestFromBehaviourEvent::CreateOutboundSession { - query, - outbound_session_id, - protocol_name, - }, - }); - - Ok(outbound_session_id) - } - /// Assign some peer and start a query. Return the id of the new session. pub fn start_query( &mut self, @@ -293,13 +257,6 @@ impl NetworkBehaviour for Behaviour { fn on_swarm_event(&mut self, event: FromSwarm<'_>) { match event { - FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, - connection_id, - .. - }) => { - self.connection_ids_map.get_mut(peer_id).insert(connection_id); - } FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => { let mut session_ids = Vec::new(); self.session_id_to_peer_id_and_connection_id.retain( diff --git a/crates/papyrus_network/src/sqmr/behaviour_test.rs b/crates/papyrus_network/src/sqmr/behaviour_test.rs index 9bc73cfa49..b9f5f5e9d0 100644 --- a/crates/papyrus_network/src/sqmr/behaviour_test.rs +++ b/crates/papyrus_network/src/sqmr/behaviour_test.rs @@ -1,5 +1,3 @@ -// TODO(shahak): Use start_query in all tests instead of send_query - use std::pin::Pin; use std::task::{Context, Poll}; @@ -7,14 +5,15 @@ use assert_matches::assert_matches; use futures::{FutureExt, Stream, StreamExt}; use lazy_static::lazy_static; use libp2p::core::{ConnectedPoint, Endpoint}; -use libp2p::swarm::behaviour::ConnectionEstablished; use libp2p::swarm::{ConnectionClosed, ConnectionId, FromSwarm, NetworkBehaviour, ToSwarm}; use libp2p::{Multiaddr, PeerId, StreamProtocol}; use super::super::handler::{RequestFromBehaviourEvent, RequestToBehaviourEvent}; use super::super::{Bytes, Config, GenericEvent, InboundSessionId, OutboundSessionId, SessionId}; -use super::{Behaviour, Event, ExternalEvent, SessionError}; +use super::{Behaviour, Event, ExternalEvent, SessionError, ToOtherBehaviourEvent}; +use crate::mixed_behaviour::BridgedBehaviour; use crate::test_utils::dummy_data; +use crate::{mixed_behaviour, peer_manager}; impl Unpin for Behaviour {} @@ -34,37 +33,19 @@ lazy_static! { static ref PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/"); } -fn simulate_connection_established(behaviour: &mut Behaviour, peer_id: PeerId) { - let connection_id = ConnectionId::new_unchecked(0); - let address = Multiaddr::empty(); - let role_override = Endpoint::Dialer; - let _handler = behaviour - .handle_established_outbound_connection(connection_id, peer_id, &address, role_override) - .unwrap(); - behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, - connection_id, - endpoint: &ConnectedPoint::Dialer { address, role_override }, - failed_addresses: &[], - other_established: 0, - })); -} - -fn simulate_listener_connection(behaviour: &mut Behaviour, peer_id: PeerId) { - let connection_id = ConnectionId::new_unchecked(0); - let address = Multiaddr::empty(); - let local_addr = Multiaddr::empty(); - let role_override = Endpoint::Listener; - let _handler = behaviour - .handle_established_outbound_connection(connection_id, peer_id, &address, role_override) - .unwrap(); - behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, - connection_id, - endpoint: &ConnectedPoint::Listener { send_back_addr: address, local_addr }, - failed_addresses: &[], - other_established: 0, - })); +fn simulate_peer_assigned( + behaviour: &mut Behaviour, + peer_id: PeerId, + outbound_session_id: OutboundSessionId, +) { + behaviour.on_other_behaviour_event(&mixed_behaviour::ToOtherBehaviourEvent::PeerManager( + peer_manager::ToOtherBehaviourEvent::SessionAssigned { + outbound_session_id, + peer_id, + // TODO(shahak): Add test with multiple connections + connection_id: ConnectionId::new_unchecked(0), + }, + )); } fn simulate_new_inbound_session( @@ -75,6 +56,7 @@ fn simulate_new_inbound_session( ) { behaviour.on_connection_handler_event( peer_id, + // This is the same connection_id from simulate_peer_assigned ConnectionId::new_unchecked(0), RequestToBehaviourEvent::GenerateEvent(GenericEvent::NewInboundSession { query, @@ -93,6 +75,7 @@ fn simulate_received_response( ) { behaviour.on_connection_handler_event( peer_id, + // This is the same connection_id from simulate_peer_assigned ConnectionId::new_unchecked(0), RequestToBehaviourEvent::GenerateEvent(GenericEvent::ReceivedResponse { response, @@ -109,6 +92,7 @@ fn simulate_session_finished_successfully( ) { behaviour.on_connection_handler_event( peer_id, + // This is the same connection_id from simulate_peer_assigned ConnectionId::new_unchecked(0), RequestToBehaviourEvent::GenerateEvent(GenericEvent::SessionFinishedSuccessfully { session_id, @@ -117,7 +101,7 @@ fn simulate_session_finished_successfully( } fn simulate_connection_closed(behaviour: &mut Behaviour, peer_id: PeerId) { - // This is the same connection_id from simulate_connection_established + // This is the same connection_id from simulate_peer_assigned let connection_id = ConnectionId::new_unchecked(0); behaviour.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, @@ -140,6 +124,20 @@ fn simulate_session_dropped(behaviour: &mut Behaviour, peer_id: PeerId, session_ ); } +async fn validate_request_peer_assignment_event( + behaviour: &mut Behaviour, + outbound_session_id: OutboundSessionId, +) { + let event = behaviour.next().await.unwrap(); + assert_matches!( + event, + ToSwarm::GenerateEvent(Event::ToOtherBehaviourEvent(ToOtherBehaviourEvent::RequestPeerAssignment { + outbound_session_id: event_outbound_session_id + }, + )) if outbound_session_id == event_outbound_session_id + ); +} + async fn validate_create_outbound_session_event( behaviour: &mut Behaviour, peer_id: &PeerId, @@ -280,8 +278,6 @@ async fn process_inbound_session() { let peer_id = PeerId::random(); let inbound_session_id = InboundSessionId::default(); - simulate_listener_connection(&mut behaviour, peer_id); - simulate_new_inbound_session(&mut behaviour, peer_id, inbound_session_id, QUERY.clone()); validate_new_inbound_session_event(&mut behaviour, &peer_id, inbound_session_id, &QUERY).await; validate_no_events(&mut behaviour); @@ -319,9 +315,11 @@ async fn create_and_process_outbound_session() { let peer_id = PeerId::random(); - simulate_connection_established(&mut behaviour, peer_id); - let outbound_session_id = - behaviour.send_query(QUERY.clone(), peer_id, PROTOCOL_NAME.clone()).unwrap(); + let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone()); + + validate_request_peer_assignment_event(&mut behaviour, outbound_session_id).await; + validate_no_events(&mut behaviour); + simulate_peer_assigned(&mut behaviour, peer_id, outbound_session_id); validate_create_outbound_session_event(&mut behaviour, &peer_id, &QUERY, &outbound_session_id) .await; @@ -351,17 +349,17 @@ async fn connection_closed() { let peer_id = PeerId::random(); - simulate_connection_established(&mut behaviour, peer_id); - - let outbound_session_id = - behaviour.send_query(QUERY.clone(), peer_id, PROTOCOL_NAME.clone()).unwrap(); - + // Add an outbound session on the connection. + let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone()); + // Consume the event to request peer assignment. + behaviour.next().await.unwrap(); + simulate_peer_assigned(&mut behaviour, peer_id, outbound_session_id); // Consume the event to create an outbound session. behaviour.next().await.unwrap(); + // Add an inbound session on the connection. let inbound_session_id = InboundSessionId::default(); simulate_new_inbound_session(&mut behaviour, peer_id, inbound_session_id, QUERY.clone()); - // Consume the event to notify the user about the new inbound session. behaviour.next().await.unwrap(); @@ -399,11 +397,10 @@ async fn drop_outbound_session() { let peer_id = PeerId::random(); - simulate_connection_established(&mut behaviour, peer_id); - - let outbound_session_id = - behaviour.send_query(QUERY.clone(), peer_id, PROTOCOL_NAME.clone()).unwrap(); - + let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone()); + // Consume the event to request peer assignment. + behaviour.next().await.unwrap(); + simulate_peer_assigned(&mut behaviour, peer_id, outbound_session_id); // Consume the event to create an outbound session. behaviour.next().await.unwrap(); @@ -433,8 +430,6 @@ async fn drop_inbound_session() { let peer_id = PeerId::random(); let inbound_session_id = InboundSessionId::default(); - simulate_listener_connection(&mut behaviour, peer_id); - simulate_new_inbound_session(&mut behaviour, peer_id, inbound_session_id, QUERY.clone()); // Consume the event that a new inbound session was created. @@ -466,12 +461,3 @@ fn send_response_non_existing_session_fails() { behaviour.send_response(response, InboundSessionId::default()).unwrap_err(); } } - -#[test] -fn send_query_peer_not_connected_fails() { - let mut behaviour = Behaviour::new(Config::get_test_config()); - - let peer_id = PeerId::random(); - - behaviour.send_query(QUERY.clone(), peer_id, PROTOCOL_NAME.clone()).unwrap_err(); -} diff --git a/crates/papyrus_network/src/sqmr/flow_test.rs b/crates/papyrus_network/src/sqmr/flow_test.rs index 820ab8cfe6..619bdafeb5 100644 --- a/crates/papyrus_network/src/sqmr/flow_test.rs +++ b/crates/papyrus_network/src/sqmr/flow_test.rs @@ -5,13 +5,15 @@ use std::time::Duration; use defaultmap::DefaultHashMap; use futures::StreamExt; -use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; +use libp2p::swarm::{ConnectionId, NetworkBehaviour, SwarmEvent}; use libp2p::{PeerId, StreamProtocol, Swarm}; -use super::behaviour::{Behaviour, Event, ExternalEvent}; +use super::behaviour::{Behaviour, Event, ExternalEvent, ToOtherBehaviourEvent}; use super::{Bytes, Config, InboundSessionId, OutboundSessionId, SessionId}; +use crate::mixed_behaviour::BridgedBehaviour; use crate::test_utils::create_fully_connected_swarms_stream; use crate::utils::StreamHashMap; +use crate::{mixed_behaviour, peer_manager}; const NUM_PEERS: usize = 3; const NUM_MESSAGES_PER_SESSION: usize = 5; @@ -59,23 +61,36 @@ fn perform_action_on_swarms( } } -fn send_query_and_update_map( +fn start_query_and_update_map( outbound_swarm: &mut Swarm, inbound_peer_id: PeerId, outbound_session_id_to_peer_id: &mut HashMap<(PeerId, OutboundSessionId), PeerId>, ) { let outbound_peer_id = *outbound_swarm.local_peer_id(); - let outbound_session_id = outbound_swarm - .behaviour_mut() - .send_query( - get_bytes_from_query_indices(outbound_peer_id, inbound_peer_id), - inbound_peer_id, - PROTOCOL_NAME, - ) - .unwrap(); + let outbound_session_id = outbound_swarm.behaviour_mut().start_query( + get_bytes_from_query_indices(outbound_peer_id, inbound_peer_id), + PROTOCOL_NAME, + ); outbound_session_id_to_peer_id.insert((outbound_peer_id, outbound_session_id), inbound_peer_id); } +fn assign_peer_to_outbound_session( + outbound_swarm: &mut Swarm, + inbound_peer_id: PeerId, + outbound_session_id: OutboundSessionId, + connection_id: ConnectionId, +) { + outbound_swarm.behaviour_mut().on_other_behaviour_event( + &mixed_behaviour::ToOtherBehaviourEvent::PeerManager( + peer_manager::ToOtherBehaviourEvent::SessionAssigned { + outbound_session_id, + peer_id: inbound_peer_id, + connection_id, + }, + ), + ); +} + fn send_response( inbound_swarm: &mut Swarm, outbound_peer_id: PeerId, @@ -105,6 +120,25 @@ fn close_inbound_session( .unwrap(); } +fn check_request_peer_assignment_event_and_return_session_id( + outbound_peer_id: PeerId, + swarm_event: SwarmEventAlias, + outbound_session_id_to_peer_id: &HashMap<(PeerId, OutboundSessionId), PeerId>, +) -> Option<(PeerId, OutboundSessionId)> { + let SwarmEvent::Behaviour(event) = swarm_event else { + return None; + }; + let Event::ToOtherBehaviourEvent(ToOtherBehaviourEvent::RequestPeerAssignment { + outbound_session_id, + }) = event + else { + panic!("Got unexpected event {:?} when expecting RequestPeerAssignment", event); + }; + let assigned_peer_id = + *outbound_session_id_to_peer_id.get(&(outbound_peer_id, outbound_session_id)).unwrap(); + Some((assigned_peer_id, outbound_session_id)) +} + fn check_new_inbound_session_event_and_return_id( inbound_peer_id: PeerId, swarm_event: SwarmEventAlias, @@ -188,15 +222,16 @@ fn get_response_from_indices(peer_id1: PeerId, peer_id2: PeerId, message_index: #[tokio::test] async fn everyone_sends_to_everyone() { - let mut swarms_stream = create_fully_connected_swarms_stream(NUM_PEERS, || { - let mut behaviour = Behaviour::new(Config { session_timeout: Duration::from_secs(5) }); - let supported_inbound_protocols = vec![PROTOCOL_NAME, OTHER_PROTOCOL_NAME]; - for protocol in supported_inbound_protocols { - behaviour.add_new_supported_inbound_protocol(protocol); - } - behaviour - }) - .await; + let (mut swarms_stream, connection_ids) = + create_fully_connected_swarms_stream(NUM_PEERS, || { + let mut behaviour = Behaviour::new(Config { session_timeout: Duration::from_secs(5) }); + let supported_inbound_protocols = vec![PROTOCOL_NAME, OTHER_PROTOCOL_NAME]; + for protocol in supported_inbound_protocols { + behaviour.add_new_supported_inbound_protocol(protocol); + } + behaviour + }) + .await; let peer_ids = swarms_stream.keys().copied().collect::>(); @@ -205,7 +240,7 @@ async fn everyone_sends_to_everyone() { &mut swarms_stream, &peer_ids, &mut |outbound_swarm, inbound_peer_id| { - send_query_and_update_map( + start_query_and_update_map( outbound_swarm, inbound_peer_id, &mut outbound_session_id_to_peer_id, @@ -213,6 +248,35 @@ async fn everyone_sends_to_everyone() { }, ); + let peers_to_outbound_session_id = collect_events_from_swarms( + &mut swarms_stream, + |peer_id, event| { + check_request_peer_assignment_event_and_return_session_id( + peer_id, + event, + &outbound_session_id_to_peer_id, + ) + }, + true, + ) + .await; + perform_action_on_swarms( + &mut swarms_stream, + &peer_ids, + &mut |outbound_swarm, inbound_peer_id| { + let outbound_peer_id = *outbound_swarm.local_peer_id(); + let outbound_session_id = + *peers_to_outbound_session_id.get(&(outbound_peer_id, inbound_peer_id)).unwrap(); + let connection_id = *connection_ids.get(&(outbound_peer_id, inbound_peer_id)).unwrap(); + assign_peer_to_outbound_session( + outbound_swarm, + inbound_peer_id, + outbound_session_id, + connection_id, + ) + }, + ); + let inbound_session_ids = collect_events_from_swarms( &mut swarms_stream, check_new_inbound_session_event_and_return_id, diff --git a/crates/papyrus_network/src/test_utils/mod.rs b/crates/papyrus_network/src/test_utils/mod.rs index fc54f66823..9afbb35347 100644 --- a/crates/papyrus_network/src/test_utils/mod.rs +++ b/crates/papyrus_network/src/test_utils/mod.rs @@ -1,15 +1,16 @@ mod get_stream; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::pin::Pin; use std::task::{ready, Context, Poll}; use std::time::Duration; -use futures::future::Future; +use futures::future::{Either, Future}; use futures::pin_mut; use futures::stream::Stream as StreamTrait; -use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; +use libp2p::swarm::{ConnectionId, NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::{PeerId, Stream, StreamProtocol}; use libp2p_swarm_test::SwarmExt; use tokio::sync::Mutex; @@ -60,11 +61,11 @@ impl crate::sqmr::handler::Handler { } /// Create num_swarms swarms and connect each pair of swarms. Return them as a combined stream of -/// events. +/// events. Also return all the connection ids of the created connections pub(crate) async fn create_fully_connected_swarms_stream( num_swarms: usize, behaviour_gen: impl Fn() -> TBehaviour, -) -> StreamHashMap> +) -> (StreamHashMap>, HashMap<(PeerId, PeerId), ConnectionId>) where ::ToSwarm: Debug, { @@ -75,15 +76,77 @@ where swarm.listen().with_memory_addr_external().await; } + let mut connection_ids = HashMap::new(); + for i in 0..(swarms.len() - 1) { let (swarms1, swarms2) = swarms.split_at_mut(i + 1); let swarm1 = &mut swarms1[i]; + let peer_id1 = *swarm1.local_peer_id(); for swarm2 in swarms2 { - swarm1.connect(swarm2).await; + let (connection_id1, connection_id2) = connect_swarms(swarm1, swarm2).await; + let peer_id2 = *swarm2.local_peer_id(); + connection_ids.insert((peer_id1, peer_id2), connection_id1); + connection_ids.insert((peer_id2, peer_id1), connection_id2); } } - StreamHashMap::new(swarms.into_iter().map(|swarm| (*swarm.local_peer_id(), swarm)).collect()) + ( + StreamHashMap::new( + swarms.into_iter().map(|swarm| (*swarm.local_peer_id(), swarm)).collect(), + ), + connection_ids, + ) +} + +// Copied from SwarmExt::connect, but this function returns the connection id. +/// Connect two swarms and return the connection id that each swarm gave to this connection. +async fn connect_swarms( + swarm1: &mut Swarm, + swarm2: &mut Swarm, +) -> (ConnectionId, ConnectionId) +where + ::ToSwarm: Debug, +{ + let external_addresses = swarm2.external_addresses().cloned().collect(); + + let dial_opts = DialOpts::peer_id(*swarm2.local_peer_id()) + .addresses(external_addresses) + .condition(PeerCondition::Always) + .build(); + + swarm1.dial(dial_opts).unwrap(); + + let mut dialer_connection_id = None; + let mut listener_connection_id = None; + + loop { + match futures::future::select(swarm1.next_swarm_event(), swarm2.next_swarm_event()).await { + Either::Left((SwarmEvent::ConnectionEstablished { connection_id, .. }, _)) => { + dialer_connection_id = Some(connection_id); + } + Either::Right((SwarmEvent::ConnectionEstablished { connection_id, .. }, _)) => { + listener_connection_id = Some(connection_id); + } + Either::Left((swarm2, _)) => { + tracing::debug!( + dialer=?swarm2, + "Ignoring event from dialer" + ); + } + Either::Right((swarm2, _)) => { + tracing::debug!( + listener=?swarm2, + "Ignoring event from listener" + ); + } + } + + if let Some((dialer_connection_id, listener_connection_id)) = + dialer_connection_id.zip(listener_connection_id) + { + return (dialer_connection_id, listener_connection_id); + } + } } // I tried making this generic on the async function we run, but it caused a lot of lifetime