Skip to content
This repository has been archived by the owner on Dec 26, 2024. It is now read-only.

Commit

Permalink
fix(network): nake behaviour generic (#1152)
Browse files Browse the repository at this point in the history
  • Loading branch information
nagmo-starkware authored Sep 10, 2023
1 parent 14402de commit 9336578
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 170 deletions.
65 changes: 36 additions & 29 deletions crates/papyrus_network/src/get_blocks/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,66 +21,73 @@ use libp2p::swarm::{
ToSwarm,
};
use libp2p::{Multiaddr, PeerId};
use prost::Message;

use super::handler::{Handler, NewRequestEvent};
use super::RequestId;
use crate::messages::block::GetBlocks;
use super::handler::{Handler, NewQueryEvent};
use super::{InboundSessionId, OutboundSessionId};

#[derive(Debug)]
pub enum Event {
// TODO(shahak): Implement.
pub enum Event<Query: Message, Data: Message> {
NewInboundQuery { query: Query, inbound_session_id: InboundSessionId },
RecievedData { data: Data, outbound_session_id: OutboundSessionId },
}

pub struct Behaviour {
pub struct Behaviour<Query: Message + Clone, Data: Message> {
substream_timeout: Duration,
pending_events: VecDeque<ToSwarm<Event, NewRequestEvent>>,
pending_requests: DefaultHashMap<PeerId, Vec<(GetBlocks, RequestId)>>,
pending_events: VecDeque<ToSwarm<Event<Query, Data>, NewQueryEvent<Query>>>,
pending_queries: DefaultHashMap<PeerId, Vec<(Query, OutboundSessionId)>>,
connected_peers: HashSet<PeerId>,
next_request_id: RequestId,
next_outbound_session_id: OutboundSessionId,
}

impl Behaviour {
impl<Query: Message + Clone, Data: Message> Behaviour<Query, Data> {
pub fn new(substream_timeout: Duration) -> Self {
Self {
substream_timeout,
pending_events: Default::default(),
pending_requests: Default::default(),
pending_queries: Default::default(),
connected_peers: Default::default(),
next_request_id: Default::default(),
next_outbound_session_id: Default::default(),
}
}

pub fn send_request(&mut self, request: GetBlocks, peer_id: PeerId) -> RequestId {
let request_id = self.next_request_id;
self.next_request_id.0 += 1;
pub fn send_query(&mut self, query: Query, peer_id: PeerId) -> OutboundSessionId {
let outbound_session_id = self.next_outbound_session_id;
self.next_outbound_session_id.value += 1;
if self.connected_peers.contains(&peer_id) {
self.send_request_to_handler(peer_id, request, request_id);
return request_id;
self.send_query_to_handler(peer_id, query, outbound_session_id);
return outbound_session_id;
}
self.pending_events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(peer_id).condition(PeerCondition::Disconnected).build(),
});
self.pending_requests.get_mut(peer_id).push((request, request_id));
request_id
self.pending_queries.get_mut(peer_id).push((query, outbound_session_id));
outbound_session_id
}

fn send_request_to_handler(
pub fn send_data(&mut self, _data: Data, _inbound_session_id: InboundSessionId) {
unimplemented!();
}

fn send_query_to_handler(
&mut self,
peer_id: PeerId,
request: GetBlocks,
request_id: RequestId,
query: Query,
outbound_session_id: OutboundSessionId,
) {
self.pending_events.push_back(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event: NewRequestEvent { request, request_id },
event: NewQueryEvent { query, outbound_session_id },
});
}
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = Handler;
type ToSwarm = Event;
impl<Query: Message + 'static + Clone, Data: Message + 'static + Default> NetworkBehaviour
for Behaviour<Query, Data>
{
type ConnectionHandler = Handler<Query, Data>;
type ToSwarm = Event<Query, Data>;

fn handle_established_inbound_connection(
&mut self,
Expand All @@ -106,9 +113,9 @@ impl NetworkBehaviour for Behaviour {
match event {
FromSwarm::ConnectionEstablished(connection_established) => {
let ConnectionEstablished { peer_id, .. } = connection_established;
if let Some(requests) = self.pending_requests.remove(&peer_id) {
for (request, request_id) in requests.into_iter() {
self.send_request_to_handler(peer_id, request, request_id);
if let Some(queries) = self.pending_queries.remove(&peer_id) {
for (query, outbound_session_id) in queries.into_iter() {
self.send_query_to_handler(peer_id, query, outbound_session_id);
}
}
}
Expand Down
59 changes: 39 additions & 20 deletions crates/papyrus_network/src/get_blocks/behaviour_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ use libp2p::core::{ConnectedPoint, Endpoint};
use libp2p::swarm::behaviour::ConnectionEstablished;
use libp2p::swarm::{ConnectionId, FromSwarm, NetworkBehaviour, PollParameters, ToSwarm};
use libp2p::{Multiaddr, PeerId};
use prost::Message;

use super::super::handler::NewRequestEvent;
use super::super::handler::NewQueryEvent;
use super::super::protocol::PROTOCOL_NAME;
use super::super::RequestId;
use super::super::OutboundSessionId;
use super::{Behaviour, Event};
use crate::messages::block::GetBlocks;
use crate::messages::block::{GetBlocks, GetBlocksResponse};

pub struct GetBlocksPollParameters {}

Expand All @@ -25,10 +26,12 @@ impl PollParameters for GetBlocksPollParameters {
}
}

impl Unpin for Behaviour {}
impl<Query: Message + Clone, Data: Message> Unpin for Behaviour<Query, Data> {}

impl Stream for Behaviour {
type Item = ToSwarm<Event, NewRequestEvent>;
impl<Query: Message + Clone + 'static, Data: Message + Default + 'static> Stream
for Behaviour<Query, Data>
{
type Item = ToSwarm<Event<Query, Data>, NewQueryEvent<Query>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::into_inner(self).poll(cx, &mut GetBlocksPollParameters {}) {
Expand All @@ -40,45 +43,56 @@ impl Stream for Behaviour {

const SUBSTREAM_TIMEOUT: Duration = Duration::MAX;

fn validate_no_events(behaviour: &mut Behaviour) {
fn validate_no_events<Query: Message + Clone + 'static, Data: Message + Default + 'static>(
behaviour: &mut Behaviour<Query, Data>,
) {
assert!(behaviour.next().now_or_never().is_none());
}

async fn validate_next_event_dial(behaviour: &mut Behaviour, peer_id: &PeerId) {
async fn validate_next_event_dial<
Query: Message + Clone + 'static,
Data: Message + Default + 'static,
>(
behaviour: &mut Behaviour<Query, Data>,
peer_id: &PeerId,
) {
let event = behaviour.next().await.unwrap();
let ToSwarm::Dial { opts } = event else {
panic!("Got unexpected event");
};
assert_eq!(*peer_id, opts.get_peer_id().unwrap());
}

async fn validate_next_event_send_request_to_handler(
behaviour: &mut Behaviour,
async fn validate_next_event_send_query_to_handler<
Query: Message + Clone + PartialEq + 'static,
Data: Message + Default + 'static,
>(
behaviour: &mut Behaviour<Query, Data>,
peer_id: &PeerId,
request: &GetBlocks,
request_id: &RequestId,
query: &Query,
outbound_session_id: &OutboundSessionId,
) {
let event = behaviour.next().await.unwrap();
assert_matches!(
event,
ToSwarm::NotifyHandler {
peer_id: other_peer_id,
event: NewRequestEvent { request: other_request, request_id: other_request_id },
event: NewQueryEvent::<Query> { query: other_query, outbound_session_id: other_outbound_session_id },
..
} if *peer_id == other_peer_id
&& *request_id == other_request_id
&& *request == other_request
&& *outbound_session_id == other_outbound_session_id
&& *query == other_query
);
}

#[tokio::test]
async fn send_and_process_request() {
let mut behaviour = Behaviour::new(SUBSTREAM_TIMEOUT);
let mut behaviour = Behaviour::<GetBlocks, GetBlocksResponse>::new(SUBSTREAM_TIMEOUT);

let request = GetBlocks::default();
let query = GetBlocks::default();
let peer_id = PeerId::random();

let request_id = behaviour.send_request(request.clone(), peer_id);
let outbound_session_id = behaviour.send_query(query.clone(), peer_id);
validate_next_event_dial(&mut behaviour, &peer_id).await;
validate_no_events(&mut behaviour);

Expand All @@ -95,8 +109,13 @@ async fn send_and_process_request() {
failed_addresses: &[],
other_established: 0,
}));
validate_next_event_send_request_to_handler(&mut behaviour, &peer_id, &request, &request_id)
.await;
validate_next_event_send_query_to_handler(
&mut behaviour,
&peer_id,
&query,
&outbound_session_id,
)
.await;
validate_no_events(&mut behaviour);

// TODO(shahak): Send responses from the handler.
Expand Down
Loading

0 comments on commit 9336578

Please sign in to comment.