Skip to content
This repository has been archived by the owner on Dec 26, 2024. It is now read-only.

fix(network): nake behaviour generic #1152

Merged
merged 1 commit into from
Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 36 additions & 29 deletions crates/papyrus_network/src/get_blocks/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,66 +21,73 @@ use libp2p::swarm::{
ToSwarm,
};
use libp2p::{Multiaddr, PeerId};
use prost::Message;

use super::handler::{Handler, NewRequestEvent};
use super::RequestId;
use crate::messages::block::GetBlocks;
use super::handler::{Handler, NewQueryEvent};
use super::{InboundSessionId, OutboundSessionId};

#[derive(Debug)]
pub enum Event {
// TODO(shahak): Implement.
pub enum Event<Query: Message, Data: Message> {
NewInboundQuery { query: Query, inbound_session_id: InboundSessionId },
RecievedData { data: Data, outbound_session_id: OutboundSessionId },
}

pub struct Behaviour {
pub struct Behaviour<Query: Message + Clone, Data: Message> {
substream_timeout: Duration,
pending_events: VecDeque<ToSwarm<Event, NewRequestEvent>>,
pending_requests: DefaultHashMap<PeerId, Vec<(GetBlocks, RequestId)>>,
pending_events: VecDeque<ToSwarm<Event<Query, Data>, NewQueryEvent<Query>>>,
pending_queries: DefaultHashMap<PeerId, Vec<(Query, OutboundSessionId)>>,
connected_peers: HashSet<PeerId>,
next_request_id: RequestId,
next_outbound_session_id: OutboundSessionId,
}

impl Behaviour {
impl<Query: Message + Clone, Data: Message> Behaviour<Query, Data> {
pub fn new(substream_timeout: Duration) -> Self {
Self {
substream_timeout,
pending_events: Default::default(),
pending_requests: Default::default(),
pending_queries: Default::default(),
connected_peers: Default::default(),
next_request_id: Default::default(),
next_outbound_session_id: Default::default(),
}
}

pub fn send_request(&mut self, request: GetBlocks, peer_id: PeerId) -> RequestId {
let request_id = self.next_request_id;
self.next_request_id.0 += 1;
pub fn send_query(&mut self, query: Query, peer_id: PeerId) -> OutboundSessionId {
let outbound_session_id = self.next_outbound_session_id;
self.next_outbound_session_id.value += 1;
if self.connected_peers.contains(&peer_id) {
self.send_request_to_handler(peer_id, request, request_id);
return request_id;
self.send_query_to_handler(peer_id, query, outbound_session_id);
return outbound_session_id;
}
self.pending_events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(peer_id).condition(PeerCondition::Disconnected).build(),
});
self.pending_requests.get_mut(peer_id).push((request, request_id));
request_id
self.pending_queries.get_mut(peer_id).push((query, outbound_session_id));
outbound_session_id
}

fn send_request_to_handler(
pub fn send_data(&mut self, _data: Data, _inbound_session_id: InboundSessionId) {
unimplemented!();
}

fn send_query_to_handler(
&mut self,
peer_id: PeerId,
request: GetBlocks,
request_id: RequestId,
query: Query,
outbound_session_id: OutboundSessionId,
) {
self.pending_events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: NewRequestEvent { request, request_id },
event: NewQueryEvent { query, outbound_session_id },
});
}
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = Handler;
type ToSwarm = Event;
impl<Query: Message + 'static + Clone, Data: Message + 'static + Default> NetworkBehaviour
for Behaviour<Query, Data>
{
type ConnectionHandler = Handler<Query, Data>;
type ToSwarm = Event<Query, Data>;

fn handle_established_inbound_connection(
&mut self,
Expand All @@ -106,9 +113,9 @@ impl NetworkBehaviour for Behaviour {
match event {
FromSwarm::ConnectionEstablished(connection_established) => {
let ConnectionEstablished { peer_id, .. } = connection_established;
if let Some(requests) = self.pending_requests.remove(&peer_id) {
for (request, request_id) in requests.into_iter() {
self.send_request_to_handler(peer_id, request, request_id);
if let Some(queries) = self.pending_queries.remove(&peer_id) {
for (query, outbound_session_id) in queries.into_iter() {
self.send_query_to_handler(peer_id, query, outbound_session_id);
}
}
}
Expand Down
59 changes: 39 additions & 20 deletions crates/papyrus_network/src/get_blocks/behaviour_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ use libp2p::core::{ConnectedPoint, Endpoint};
use libp2p::swarm::behaviour::ConnectionEstablished;
use libp2p::swarm::{ConnectionId, FromSwarm, NetworkBehaviour, PollParameters, ToSwarm};
use libp2p::{Multiaddr, PeerId};
use prost::Message;

use super::super::handler::NewRequestEvent;
use super::super::handler::NewQueryEvent;
use super::super::protocol::PROTOCOL_NAME;
use super::super::RequestId;
use super::super::OutboundSessionId;
use super::{Behaviour, Event};
use crate::messages::block::GetBlocks;
use crate::messages::block::{GetBlocks, GetBlocksResponse};

pub struct GetBlocksPollParameters {}

Expand All @@ -25,10 +26,12 @@ impl PollParameters for GetBlocksPollParameters {
}
}

impl Unpin for Behaviour {}
impl<Query: Message + Clone, Data: Message> Unpin for Behaviour<Query, Data> {}

impl Stream for Behaviour {
type Item = ToSwarm<Event, NewRequestEvent>;
impl<Query: Message + Clone + 'static, Data: Message + Default + 'static> Stream
for Behaviour<Query, Data>
{
type Item = ToSwarm<Event<Query, Data>, NewQueryEvent<Query>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::into_inner(self).poll(cx, &mut GetBlocksPollParameters {}) {
Expand All @@ -40,45 +43,56 @@ impl Stream for Behaviour {

const SUBSTREAM_TIMEOUT: Duration = Duration::MAX;

fn validate_no_events(behaviour: &mut Behaviour) {
fn validate_no_events<Query: Message + Clone + 'static, Data: Message + Default + 'static>(
behaviour: &mut Behaviour<Query, Data>,
) {
assert!(behaviour.next().now_or_never().is_none());
}

async fn validate_next_event_dial(behaviour: &mut Behaviour, peer_id: &PeerId) {
async fn validate_next_event_dial<
Query: Message + Clone + 'static,
Data: Message + Default + 'static,
>(
behaviour: &mut Behaviour<Query, Data>,
peer_id: &PeerId,
) {
let event = behaviour.next().await.unwrap();
let ToSwarm::Dial { opts } = event else {
panic!("Got unexpected event");
};
assert_eq!(*peer_id, opts.get_peer_id().unwrap());
}

async fn validate_next_event_send_request_to_handler(
behaviour: &mut Behaviour,
async fn validate_next_event_send_query_to_handler<
Query: Message + Clone + PartialEq + 'static,
Data: Message + Default + 'static,
>(
behaviour: &mut Behaviour<Query, Data>,
peer_id: &PeerId,
request: &GetBlocks,
request_id: &RequestId,
query: &Query,
outbound_session_id: &OutboundSessionId,
) {
let event = behaviour.next().await.unwrap();
assert_matches!(
event,
ToSwarm::NotifyHandler {
peer_id: other_peer_id,
event: NewRequestEvent { request: other_request, request_id: other_request_id },
event: NewQueryEvent::<Query> { query: other_query, outbound_session_id: other_outbound_session_id },
..
} if *peer_id == other_peer_id
&& *request_id == other_request_id
&& *request == other_request
&& *outbound_session_id == other_outbound_session_id
&& *query == other_query
);
}

#[tokio::test]
async fn send_and_process_request() {
let mut behaviour = Behaviour::new(SUBSTREAM_TIMEOUT);
let mut behaviour = Behaviour::<GetBlocks, GetBlocksResponse>::new(SUBSTREAM_TIMEOUT);

let request = GetBlocks::default();
let query = GetBlocks::default();
let peer_id = PeerId::random();

let request_id = behaviour.send_request(request.clone(), peer_id);
let outbound_session_id = behaviour.send_query(query.clone(), peer_id);
validate_next_event_dial(&mut behaviour, &peer_id).await;
validate_no_events(&mut behaviour);

Expand All @@ -95,8 +109,13 @@ async fn send_and_process_request() {
failed_addresses: &[],
other_established: 0,
}));
validate_next_event_send_request_to_handler(&mut behaviour, &peer_id, &request, &request_id)
.await;
validate_next_event_send_query_to_handler(
&mut behaviour,
&peer_id,
&query,
&outbound_session_id,
)
.await;
validate_no_events(&mut behaviour);

// TODO(shahak): Send responses from the handler.
Expand Down
Loading