Skip to content

Commit

Permalink
feat(network): implement custom stream protocol (#391)
Browse files Browse the repository at this point in the history
* feat(network): implement custom stream protocol

* chore(network): compress open stream command handler
  • Loading branch information
fbozic authored Jun 19, 2024
1 parent 25a6cf5 commit da96058
Show file tree
Hide file tree
Showing 10 changed files with 386 additions and 74 deletions.
176 changes: 105 additions & 71 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ axum = "0.7.4"
base64 = "0.22.0"
borsh = "1.3.1"
bs58 = "0.5.0"
bytes = "1.6.0"
camino = "1.1.6"
chrono = "0.4.37"
clap = "4.4.18"
Expand All @@ -45,6 +46,7 @@ fragile = "2.0.0"
futures-util = "0.3.30"
hex = "0.4.3"
libp2p = "0.53.2"
libp2p-stream = "0.1.0-alpha.1"
multiaddr = "0.18.1"
multibase = "0.9.1"
near-jsonrpc-client = "0.8.0"
Expand All @@ -69,6 +71,8 @@ serde_json = "1.0.113"
syn = "2.0"
thiserror = "1.0.56"
tokio = "1.35.1"
tokio-test = "0.4.4"
tokio-util = "0.7.11"
toml = "0.8.9"
tower = "0.4.13"
tower-http = "0.5.2"
Expand Down
8 changes: 8 additions & 0 deletions crates/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@ repository.workspace = true
license.workspace = true

[dependencies]
bytes.workspace = true
eyre.workspace = true
futures-util.workspace = true
libp2p = { workspace = true, features = ["full"] }
libp2p-stream.workspace = true
multiaddr.workspace = true
owo-colors.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util = { workspace = true, features = ["codec", "compat"] }
tracing.workspace = true

calimero-node-primitives = { path = "../node-primitives" }
calimero-primitives = { path = "../primitives" }

[dev-dependencies]
tokio-test.workspace = true
13 changes: 12 additions & 1 deletion crates/network/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::HashSet;
use libp2p::{gossipsub, Multiaddr, PeerId};
use tokio::sync::{mpsc, oneshot};

use crate::Command;
use crate::{stream, Command};

#[derive(Clone)]
pub struct NetworkClient {
Expand Down Expand Up @@ -63,6 +63,17 @@ impl NetworkClient {
receiver.await.expect("Sender not to be dropped.")
}

pub async fn open_stream(&self, peer_id: PeerId) -> eyre::Result<stream::Stream> {
let (sender, receiver) = oneshot::channel();

self.sender
.send(Command::OpenStream { peer_id, sender })
.await
.expect("Command receiver not to be dropped.");

receiver.await.expect("Sender not to be dropped.")
}

pub async fn peer_count(&self) -> usize {
let (sender, receiver) = oneshot::channel();

Expand Down
1 change: 1 addition & 0 deletions crates/network/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ impl EventLoop {
BehaviourEvent::Rendezvous(event) => {
events::EventHandler::handle(self, event).await
}
BehaviourEvent::Stream(()) => {}
},
SwarmEvent::NewListenAddr {
listener_id,
Expand Down
40 changes: 38 additions & 2 deletions crates/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod client;
pub mod config;
mod discovery;
mod events;
pub mod stream;
pub mod types;

use client::NetworkClient;
Expand All @@ -34,6 +35,7 @@ struct Behaviour {
ping: ping::Behaviour,
rendezvous: rendezvous::client::Behaviour,
relay: relay::client::Behaviour,
stream: libp2p_stream::Behaviour,
}

pub async fn run(
Expand Down Expand Up @@ -126,12 +128,25 @@ async fn init(
ping: ping::Behaviour::default(),
relay: relay_behaviour,
rendezvous: rendezvous::client::Behaviour::new(key.clone()),
stream: libp2p_stream::Behaviour::new(),
})?
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(tokio::time::Duration::from_secs(30))
})
.build();

let incoming_streams = match swarm
.behaviour()
.stream
.new_control()
.accept(stream::CALIMERO_STREAM_PROTOCOL)
{
Ok(incoming_streams) => incoming_streams,
Err(err) => {
eyre::bail!("Failed to setup control for stream protocol: {:?}", err)
}
};

let (command_sender, command_receiver) = mpsc::channel(32);
let (event_sender, event_receiver) = mpsc::channel(32);

Expand All @@ -141,13 +156,20 @@ async fn init(

let discovery = discovery::Discovery::new(&config.discovery.rendezvous);

let event_loop = EventLoop::new(swarm, command_receiver, event_sender, discovery);
let event_loop = EventLoop::new(
swarm,
incoming_streams,
command_receiver,
event_sender,
discovery,
);

Ok((client, event_receiver, event_loop))
}

pub(crate) struct EventLoop {
swarm: Swarm<Behaviour>,
incoming_streams: libp2p_stream::IncomingStreams,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<types::NetworkEvent>,
discovery: discovery::Discovery,
Expand All @@ -160,12 +182,14 @@ pub(crate) struct EventLoop {
impl EventLoop {
fn new(
swarm: Swarm<Behaviour>,
incoming_streams: libp2p_stream::IncomingStreams,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<types::NetworkEvent>,
discovery: discovery::Discovery,
) -> Self {
Self {
swarm,
incoming_streams,
command_receiver,
event_sender,
discovery,
Expand All @@ -182,7 +206,12 @@ impl EventLoop {

loop {
tokio::select! {
event = self.swarm.next() => self.handle_swarm_event(event.expect("Swarm stream to be infinite.")).await,
event = self.swarm.next() => {
self.handle_swarm_event(event.expect("Swarm stream to be infinite.")).await;
},
incoming_stream = self.incoming_streams.next() => {
self.handle_incoming_stream(incoming_stream.expect("Incoming streams to be infinite.")).await;
},
command = self.command_receiver.recv() => {
let Some(c) = command else { break };
self.handle_command(c).await;
Expand Down Expand Up @@ -260,6 +289,9 @@ impl EventLoop {

let _ = sender.send(Ok(topic));
}
Command::OpenStream { peer_id, sender } => {
let _ = sender.send(self.open_stream(peer_id).await.map_err(Into::into));
}
Command::PeerCount { sender } => {
let _ = sender.send(self.swarm.connected_peers().count());
}
Expand Down Expand Up @@ -329,6 +361,10 @@ enum Command {
topic: gossipsub::IdentTopic,
sender: oneshot::Sender<eyre::Result<gossipsub::IdentTopic>>,
},
OpenStream {
peer_id: PeerId,
sender: oneshot::Sender<eyre::Result<stream::Stream>>,
},
PeerCount {
sender: oneshot::Sender<usize>,
},
Expand Down
92 changes: 92 additions & 0 deletions crates/network/src/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_util::{Sink as FuturesSink, SinkExt, Stream as FuturesStream};
use libp2p::PeerId;
use tokio::io::BufStream;
use tokio_util::codec::Framed;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt};

use super::{types, EventLoop};

mod codec;

pub use codec::{CodecError, Message};

pub(crate) const CALIMERO_STREAM_PROTOCOL: libp2p::StreamProtocol =
libp2p::StreamProtocol::new("/calimero/stream/0.0.1");

#[derive(Debug)]
pub struct Stream {
inner: Framed<BufStream<Compat<libp2p::Stream>>, codec::MessageJsonCodec>,
}

impl Stream {
pub fn new(stream: libp2p::Stream) -> Self {
let stream = BufStream::new(stream.compat());
let stream = Framed::new(stream, codec::MessageJsonCodec);
Stream { inner: stream }
}
}

impl FuturesStream for Stream {
type Item = Result<Message, CodecError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let inner = Pin::new(&mut self.get_mut().inner);
inner.poll_next(cx)
}
}

impl FuturesSink<Message> for Stream {
type Error = CodecError;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready_unpin(cx)
}

fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
self.inner.start_send_unpin(item)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_flush_unpin(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_close_unpin(cx)
}
}

impl EventLoop {
pub(crate) async fn handle_incoming_stream(
&mut self,
(peer, stream): (PeerId, libp2p::Stream),
) {
self.event_sender
.send(types::NetworkEvent::StreamOpened {
peer_id: peer,
stream: Stream::new(stream),
})
.await
.expect("Failed to send stream opened event");
}

pub(crate) async fn open_stream(&mut self, peer_id: PeerId) -> eyre::Result<Stream> {
let stream = match self
.swarm
.behaviour()
.stream
.new_control()
.open_stream(peer_id, CALIMERO_STREAM_PROTOCOL)
.await
{
Ok(stream) => stream,
Err(err) => {
eyre::bail!("Failed to open stream: {:?}", err);
}
};

Ok(Stream::new(stream))
}
}
104 changes: 104 additions & 0 deletions crates/network/src/stream/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use bytes::{Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Message {
pub data: Vec<u8>,
}

#[derive(Debug, Error)]
#[error("CodecError")]
pub enum CodecError {
StdIo(#[from] std::io::Error),
SerDe(serde_json::Error),
}

#[derive(Debug)]
pub(crate) struct MessageJsonCodec;

impl Decoder for MessageJsonCodec {
type Item = Message;
type Error = CodecError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let mut length_codec = LengthDelimitedCodec::new();
let Some(frame) = length_codec.decode(src)? else {
return Ok(None);
};

serde_json::from_slice(&frame)
.map(Some)
.map_err(CodecError::SerDe)
}
}

impl Encoder<Message> for MessageJsonCodec {
type Error = CodecError;

fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
let mut length_codec = LengthDelimitedCodec::new();
let json = serde_json::to_vec(&item).map_err(CodecError::SerDe)?;

length_codec
.encode(Bytes::from(json), dst)
.map_err(CodecError::StdIo)
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures_util::StreamExt;
use tokio_test::io::Builder;
use tokio_util::codec::FramedRead;

#[test]
fn test_my_frame_encoding_decoding() {
let request = Message {
data: "Hello".bytes().collect(),
};
let response = Message {
data: "World".bytes().collect(),
};

let mut buffer = BytesMut::new();
let mut codec = MessageJsonCodec;
codec.encode(request.clone(), &mut buffer).unwrap();
codec.encode(response.clone(), &mut buffer).unwrap();

let decoded_request = codec.decode(&mut buffer).unwrap();
assert_eq!(decoded_request, Some(request));

let decoded_response = codec.decode(&mut buffer).unwrap();
assert_eq!(decoded_response, Some(response));
}

#[tokio::test]
async fn test_multiple_objects_stream() {
let request = Message {
data: "Hello".bytes().collect(),
};
let response = Message {
data: "World".bytes().collect(),
};

let mut buffer = BytesMut::new();
let mut codec = MessageJsonCodec;
codec.encode(request.clone(), &mut buffer).unwrap();
codec.encode(response.clone(), &mut buffer).unwrap();

let mut stream = Builder::new().read(&buffer.freeze()).build();
let mut framed = FramedRead::new(&mut stream, MessageJsonCodec);

let decoded_request = framed.next().await.unwrap().unwrap();
assert_eq!(decoded_request, request);

let decoded_response = framed.next().await.unwrap().unwrap();
assert_eq!(decoded_response, response);

let decoded3 = framed.next().await;
assert_eq!(decoded3.is_none(), true);
}
}
Loading

0 comments on commit da96058

Please sign in to comment.