Skip to content
This repository has been archived by the owner on Dec 26, 2024. It is now read-only.

Commit

Permalink
feat(network): restore commented behaviour test (#1220)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama authored and nagmo-starkware committed Oct 2, 2023
1 parent 511c0f4 commit 3916951
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 49 deletions.
18 changes: 10 additions & 8 deletions crates/papyrus_network/src/streamed_data_protocol/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
// #[cfg(test)]
// #[path = "behaviour_test.rs"]
// mod behaviour_test;
#[cfg(test)]
#[path = "behaviour_test.rs"]
mod behaviour_test;

use std::collections::{HashSet, VecDeque};
use std::io;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

Expand Down Expand Up @@ -50,6 +52,7 @@ pub(crate) struct Behaviour<Query: QueryBound, Data: DataBound> {
pending_queries: DefaultHashMap<PeerId, Vec<(Query, OutboundSessionId)>>,
connected_peers: HashSet<PeerId>,
next_outbound_session_id: OutboundSessionId,
next_inbound_session_id: Arc<AtomicUsize>,
}

// TODO(shahak) remove allow dead code.
Expand All @@ -62,6 +65,7 @@ impl<Query: QueryBound, Data: DataBound> Behaviour<Query, Data> {
pending_queries: Default::default(),
connected_peers: Default::default(),
next_outbound_session_id: Default::default(),
next_inbound_session_id: Arc::new(Default::default()),
}
}

Expand Down Expand Up @@ -123,8 +127,7 @@ impl<Query: QueryBound, Data: DataBound> NetworkBehaviour for Behaviour<Query, D
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<Self::ConnectionHandler, ConnectionDenied> {
// Ok(Handler::new(self.substream_timeout))
unimplemented!();
Ok(Handler::new(self.substream_timeout, self.next_inbound_session_id.clone()))
}

fn handle_established_outbound_connection(
Expand All @@ -134,8 +137,7 @@ impl<Query: QueryBound, Data: DataBound> NetworkBehaviour for Behaviour<Query, D
_addr: &Multiaddr,
_role_override: Endpoint,
) -> Result<Self::ConnectionHandler, ConnectionDenied> {
// Ok(Handler::new(self.substream_timeout))
unimplemented!();
Ok(Handler::new(self.substream_timeout, self.next_inbound_session_id.clone()))
}

fn on_swarm_event(&mut self, event: FromSwarm<'_, Self::ConnectionHandler>) {
Expand All @@ -160,7 +162,7 @@ impl<Query: QueryBound, Data: DataBound> NetworkBehaviour for Behaviour<Query, D
_connection_id: ConnectionId,
_event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
) {
// TODO(shahak): Implement.
unimplemented!();
}

fn poll(
Expand Down
75 changes: 34 additions & 41 deletions crates/papyrus_network/src/streamed_data_protocol/behaviour_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ use libp2p::core::{ConnectedPoint, Endpoint};
use libp2p::swarm::behaviour::ConnectionEstablished;
use libp2p::swarm::{ConnectionId, FromSwarm, NetworkBehaviour, PollParameters, ToSwarm};
use libp2p::{Multiaddr, PeerId};
use prost::Message;

use super::super::handler::NewQueryEvent;
use super::super::handler::RequestFromBehaviourEvent;
use super::super::protocol::PROTOCOL_NAME;
use super::super::OutboundSessionId;
use super::super::{DataBound, OutboundSessionId, QueryBound};
use super::{Behaviour, Event};
use crate::messages::block::{GetBlocks, GetBlocksResponse};

Expand All @@ -26,12 +25,10 @@ impl PollParameters for GetBlocksPollParameters {
}
}

impl<Query: Message + Clone, Data: Message> Unpin for Behaviour<Query, Data> {}
impl<Query: QueryBound, Data: DataBound> Unpin for Behaviour<Query, Data> {}

impl<Query: Message + Clone + 'static, Data: Message + Default + 'static> Stream
for Behaviour<Query, Data>
{
type Item = ToSwarm<Event<Query, Data>, NewQueryEvent<Query>>;
impl<Query: QueryBound, Data: DataBound> Stream for Behaviour<Query, Data> {
type Item = ToSwarm<Event<Query, Data>, RequestFromBehaviourEvent<Query, Data>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::into_inner(self).poll(cx, &mut GetBlocksPollParameters {}) {
Expand All @@ -43,16 +40,26 @@ impl<Query: Message + Clone + 'static, Data: Message + Default + 'static> Stream

const SUBSTREAM_TIMEOUT: Duration = Duration::MAX;

fn validate_no_events<Query: Message + Clone + 'static, Data: Message + Default + 'static>(
fn simulate_dial_finished_from_swarm<Query: QueryBound, Data: DataBound>(
behaviour: &mut Behaviour<Query, Data>,
peer_id: &PeerId,
) {
assert!(behaviour.next().now_or_never().is_none());
let connection_id = ConnectionId::new_unchecked(0);
let address = Multiaddr::empty();
let role_override = Endpoint::Dialer;
let _handler = behaviour
.handle_established_outbound_connection(connection_id, *peer_id, &address, role_override)
.unwrap();
behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id: *peer_id,
connection_id,
endpoint: &ConnectedPoint::Dialer { address, role_override },
failed_addresses: &[],
other_established: 0,
}));
}

async fn validate_next_event_dial<
Query: Message + Clone + 'static,
Data: Message + Default + 'static,
>(
async fn validate_dial_event<Query: QueryBound, Data: DataBound>(
behaviour: &mut Behaviour<Query, Data>,
peer_id: &PeerId,
) {
Expand All @@ -63,10 +70,7 @@ async fn validate_next_event_dial<
assert_eq!(*peer_id, opts.get_peer_id().unwrap());
}

async fn validate_next_event_send_query_to_handler<
Query: Message + Clone + PartialEq + 'static,
Data: Message + Default + 'static,
>(
async fn validate_create_outbound_session_event<Query: QueryBound + PartialEq, Data: DataBound>(
behaviour: &mut Behaviour<Query, Data>,
peer_id: &PeerId,
query: &Query,
Expand All @@ -77,16 +81,21 @@ async fn validate_next_event_send_query_to_handler<
event,
ToSwarm::NotifyHandler {
peer_id: other_peer_id,
event: NewQueryEvent::<Query> { query: other_query, outbound_session_id: other_outbound_session_id },
event: RequestFromBehaviourEvent::CreateOutboundSession { query: other_query, outbound_session_id: other_outbound_session_id },
..
} if *peer_id == other_peer_id
&& *outbound_session_id == other_outbound_session_id
&& *query == other_query
);
}

// TODO(shahak): Fix code duplication with handler test.
fn validate_no_events<Query: QueryBound, Data: DataBound>(behaviour: &mut Behaviour<Query, Data>) {
assert!(behaviour.next().now_or_never().is_none());
}

#[tokio::test]
async fn send_and_process_request() {
async fn create_and_process_outbound_session() {
let mut behaviour = Behaviour::<GetBlocks, GetBlocksResponse>::new(SUBSTREAM_TIMEOUT);

// TODO(shahak): Change to GetBlocks::default() when the bug that forbids sending default
Expand All @@ -95,29 +104,13 @@ async fn send_and_process_request() {
let peer_id = PeerId::random();

let outbound_session_id = behaviour.send_query(query.clone(), peer_id);
validate_next_event_dial(&mut behaviour, &peer_id).await;
validate_dial_event(&mut behaviour, &peer_id).await;
validate_no_events(&mut behaviour);

let connection_id = ConnectionId::new_unchecked(0);
let address = Multiaddr::empty();
let role_override = Endpoint::Dialer;
let _handler = behaviour
.handle_established_outbound_connection(connection_id, peer_id, &address, role_override)
.unwrap();
behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
endpoint: &ConnectedPoint::Dialer { address, role_override },
failed_addresses: &[],
other_established: 0,
}));
validate_next_event_send_query_to_handler(
&mut behaviour,
&peer_id,
&query,
&outbound_session_id,
)
.await;
simulate_dial_finished_from_swarm(&mut behaviour, &peer_id);

validate_create_outbound_session_event(&mut behaviour, &peer_id, &query, &outbound_session_id)
.await;
validate_no_events(&mut behaviour);

// TODO(shahak): Send responses from the handler.
Expand Down

0 comments on commit 3916951

Please sign in to comment.