diff --git a/Cargo.lock b/Cargo.lock index 50d78660d..37943e934 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -30,7 +30,7 @@ dependencies = [ "pin-project-lite", "smallvec", "tokio", - "tokio-util 0.7.10", + "tokio-util 0.7.11", ] [[package]] @@ -40,7 +40,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -61,7 +61,7 @@ checksum = "7c7db3d5a9718568e4cf4a537cfd7070e6e6ff7481510d0237fb529ac850f6d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -120,7 +120,7 @@ version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" dependencies = [ - "getrandom 0.2.12", + "getrandom 0.2.15", "once_cell", "version_check", ] @@ -540,7 +540,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -557,7 +557,7 @@ checksum = "a507401cad91ec6a857ed5513a2073c82a9b9048762b885bb98655b306964681" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -770,7 +770,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -894,7 +894,7 @@ dependencies = [ "proc-macro-crate 3.1.0", "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", "syn_derive", ] @@ -1072,15 +1072,21 @@ dependencies = [ name = "calimero-network" version = "0.1.0" dependencies = [ + "bytes", "calimero-node-primitives", "calimero-primitives", "eyre", + "futures-util", "libp2p", + "libp2p-stream", "multiaddr", "owo-colors", "serde", "serde_json", + "thiserror", "tokio", + "tokio-test", + "tokio-util 0.7.11", "tracing", ] @@ -1178,7 +1184,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", "thiserror", ] @@ -1474,7 +1480,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -1840,7 +1846,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -1864,7 +1870,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.10.0", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -1875,7 +1881,7 @@ checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" dependencies = [ "darling_core", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -1980,7 +1986,7 @@ checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -2067,7 +2073,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -2135,9 +2141,9 @@ dependencies = [ [[package]] name = "either" -version = "1.10.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" +checksum = "3dca9240753cf90908d7e4aac30f630662b02aebaa1b58a3cadabdb23385b58b" [[package]] name = "elementtree" @@ -2187,7 +2193,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -2227,7 +2233,7 @@ checksum = "f282cfdfe92516eb26c2af8589c274c7c17681f5ecc03c18255fe741c6aa64eb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -2248,7 +2254,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -2626,7 +2632,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -2723,9 +2729,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.12" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if 1.0.0", "js-sys", @@ -2816,7 +2822,7 @@ dependencies = [ "indexmap 2.2.6", "slab", "tokio", - "tokio-util 0.7.10", + "tokio-util 0.7.11", "tracing", ] @@ -2835,7 +2841,7 @@ dependencies = [ "indexmap 2.2.6", "slab", "tokio", - "tokio-util 0.7.10", + "tokio-util 0.7.11", "tracing", ] @@ -3601,9 +3607,9 @@ checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" [[package]] name = "libc" -version = "0.2.153" +version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" [[package]] name = "libloading" @@ -3635,7 +3641,7 @@ dependencies = [ "either", "futures", "futures-timer", - "getrandom 0.2.12", + "getrandom 0.2.15", "instant", "libp2p-allow-block-list", "libp2p-autonat", @@ -3824,7 +3830,7 @@ dependencies = [ "fnv", "futures", "futures-ticker", - "getrandom 0.2.12", + "getrandom 0.2.15", "hex_fmt", "instant", "libp2p-core", @@ -4150,22 +4156,38 @@ dependencies = [ "void", ] +[[package]] +name = "libp2p-stream" +version = "0.1.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e80e4cc955913d1a3e292688aada12fc86edefec9ada087cf91825cd6368887" +dependencies = [ + "futures", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "rand 0.8.5", + "tracing", + "void", +] + [[package]] name = "libp2p-swarm" -version = "0.44.1" +version = "0.44.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e92532fc3c4fb292ae30c371815c9b10103718777726ea5497abc268a4761866" +checksum = "80cae6cb75f89dbca53862f9ebe0b9f463aa7b302762fcfaafb9e51dcc9b0f7e" dependencies = [ "async-std", "either", "fnv", "futures", "futures-timer", - "getrandom 0.2.12", + "getrandom 0.2.15", "instant", "libp2p-core", "libp2p-identity", "libp2p-swarm-derive", + "lru 0.12.3", "multistream-select", "once_cell", "rand 0.8.5", @@ -4178,14 +4200,14 @@ dependencies = [ [[package]] name = "libp2p-swarm-derive" -version = "0.34.1" +version = "0.34.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b644268b4acfdaa6a6100b31226ee7a36d96ab4c43287d113bfd2308607d8b6f" +checksum = "5daceb9dd908417b6dfcfe8e94098bc4aac54500c282e78120b885dadc09b999" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -4968,7 +4990,7 @@ checksum = "80fca203c51edd9595ec14db1d13359fb9ede32314990bf296b6c5c4502f6ab7" dependencies = [ "quote", "serde", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -4980,7 +5002,7 @@ dependencies = [ "fs2", "near-rpc-error-core", "serde", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -5036,7 +5058,7 @@ dependencies = [ "serde_json", "strum 0.26.2", "strum_macros 0.26.2", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -5387,7 +5409,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -5490,7 +5512,7 @@ dependencies = [ "proc-macro2", "proc-macro2-diagnostics", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -5686,7 +5708,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -5825,7 +5847,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d3928fb5db768cb86f891ff014f0144589297e3c6a1aba6ed7cecfdace270c7" dependencies = [ "proc-macro2", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -5913,9 +5935,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.79" +version = "1.0.85" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" +checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" dependencies = [ "unicode-ident", ] @@ -5928,7 +5950,7 @@ checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", "version_check", "yansi", ] @@ -5968,7 +5990,7 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -6215,7 +6237,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom 0.2.12", + "getrandom 0.2.15", ] [[package]] @@ -6274,7 +6296,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd283d9651eeda4b2a83a43c1c91b266c40fd76ecd39a50a8c630ae69dc72891" dependencies = [ - "getrandom 0.2.12", + "getrandom 0.2.15", "libredox", "thiserror", ] @@ -6490,7 +6512,7 @@ checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" dependencies = [ "cc", "cfg-if 1.0.0", - "getrandom 0.2.12", + "getrandom 0.2.15", "libc", "spin 0.9.8", "untrusted 0.9.0", @@ -6802,7 +6824,7 @@ checksum = "1db149f81d46d2deba7cd3c50772474707729550221e69588478ebf9ada425ae" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -6935,7 +6957,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -6978,7 +7000,7 @@ checksum = "0b2e6b945e9d3df726b65d6ee24060aff8e3533d431f677a9695db04eff9dfdb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -7029,7 +7051,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -7358,7 +7380,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -7422,9 +7444,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.57" +version = "2.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11a6ae1e52eb25aab8f3fb9fca13be982a373b8f1157ca14b897a825ba4a2d35" +checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5" dependencies = [ "proc-macro2", "quote", @@ -7440,7 +7462,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -7570,7 +7592,7 @@ checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -7675,7 +7697,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -7710,6 +7732,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-tungstenite" version = "0.21.0" @@ -7738,9 +7773,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.10" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", @@ -7748,7 +7783,6 @@ dependencies = [ "futures-sink", "pin-project-lite", "tokio", - "tracing", ] [[package]] @@ -7873,7 +7907,7 @@ dependencies = [ "rand 0.8.5", "slab", "tokio", - "tokio-util 0.7.10", + "tokio-util 0.7.11", "tower-layer", "tower-service", "tracing", @@ -7915,7 +7949,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "tokio", - "tokio-util 0.7.10", + "tokio-util 0.7.11", "tower-layer", "tower-service", "tracing", @@ -8016,7 +8050,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -8393,7 +8427,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", "wasm-bindgen-shared", ] @@ -8427,7 +8461,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -8657,7 +8691,7 @@ dependencies = [ "tiny-keccak", "tokio", "tokio-stream", - "tokio-util 0.7.10", + "tokio-util 0.7.11", "url", "web3-async-native-tls", ] @@ -9108,7 +9142,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] @@ -9128,7 +9162,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.57", + "syn 2.0.66", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0172531f7..8363d5483 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ axum = "0.7.4" base64 = "0.22.0" borsh = "1.3.1" bs58 = "0.5.0" +bytes = "1.6.0" camino = "1.1.6" chrono = "0.4.37" clap = "4.4.18" @@ -45,6 +46,7 @@ fragile = "2.0.0" futures-util = "0.3.30" hex = "0.4.3" libp2p = "0.53.2" +libp2p-stream = "0.1.0-alpha.1" multiaddr = "0.18.1" multibase = "0.9.1" near-jsonrpc-client = "0.8.0" @@ -69,6 +71,8 @@ serde_json = "1.0.113" syn = "2.0" thiserror = "1.0.56" tokio = "1.35.1" +tokio-test = "0.4.4" +tokio-util = "0.7.11" toml = "0.8.9" tower = "0.4.13" tower-http = "0.5.2" diff --git a/crates/network/Cargo.toml b/crates/network/Cargo.toml index b2f254728..7c577aae5 100644 --- a/crates/network/Cargo.toml +++ b/crates/network/Cargo.toml @@ -7,14 +7,22 @@ repository.workspace = true license.workspace = true [dependencies] +bytes.workspace = true eyre.workspace = true +futures-util.workspace = true libp2p = { workspace = true, features = ["full"] } +libp2p-stream.workspace = true multiaddr.workspace = true owo-colors.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true +thiserror.workspace = true tokio.workspace = true +tokio-util = { workspace = true, features = ["codec", "compat"] } tracing.workspace = true calimero-node-primitives = { path = "../node-primitives" } calimero-primitives = { path = "../primitives" } + +[dev-dependencies] +tokio-test.workspace = true diff --git a/crates/network/src/client.rs b/crates/network/src/client.rs index cfb6bedae..13e833075 100644 --- a/crates/network/src/client.rs +++ b/crates/network/src/client.rs @@ -3,7 +3,7 @@ use std::collections::HashSet; use libp2p::{gossipsub, Multiaddr, PeerId}; use tokio::sync::{mpsc, oneshot}; -use crate::Command; +use crate::{stream, Command}; #[derive(Clone)] pub struct NetworkClient { @@ -63,6 +63,17 @@ impl NetworkClient { receiver.await.expect("Sender not to be dropped.") } + pub async fn open_stream(&self, peer_id: PeerId) -> eyre::Result { + let (sender, receiver) = oneshot::channel(); + + self.sender + .send(Command::OpenStream { peer_id, sender }) + .await + .expect("Command receiver not to be dropped."); + + receiver.await.expect("Sender not to be dropped.") + } + pub async fn peer_count(&self) -> usize { let (sender, receiver) = oneshot::channel(); diff --git a/crates/network/src/events.rs b/crates/network/src/events.rs index d92288356..aac2e686c 100644 --- a/crates/network/src/events.rs +++ b/crates/network/src/events.rs @@ -29,6 +29,7 @@ impl EventLoop { BehaviourEvent::Rendezvous(event) => { events::EventHandler::handle(self, event).await } + BehaviourEvent::Stream(()) => {} }, SwarmEvent::NewListenAddr { listener_id, diff --git a/crates/network/src/lib.rs b/crates/network/src/lib.rs index ad896f8ad..a05107aa5 100644 --- a/crates/network/src/lib.rs +++ b/crates/network/src/lib.rs @@ -15,6 +15,7 @@ pub mod client; pub mod config; mod discovery; mod events; +pub mod stream; pub mod types; use client::NetworkClient; @@ -34,6 +35,7 @@ struct Behaviour { ping: ping::Behaviour, rendezvous: rendezvous::client::Behaviour, relay: relay::client::Behaviour, + stream: libp2p_stream::Behaviour, } pub async fn run( @@ -126,12 +128,25 @@ async fn init( ping: ping::Behaviour::default(), relay: relay_behaviour, rendezvous: rendezvous::client::Behaviour::new(key.clone()), + stream: libp2p_stream::Behaviour::new(), })? .with_swarm_config(|cfg| { cfg.with_idle_connection_timeout(tokio::time::Duration::from_secs(30)) }) .build(); + let incoming_streams = match swarm + .behaviour() + .stream + .new_control() + .accept(stream::CALIMERO_STREAM_PROTOCOL) + { + Ok(incoming_streams) => incoming_streams, + Err(err) => { + eyre::bail!("Failed to setup control for stream protocol: {:?}", err) + } + }; + let (command_sender, command_receiver) = mpsc::channel(32); let (event_sender, event_receiver) = mpsc::channel(32); @@ -141,13 +156,20 @@ async fn init( let discovery = discovery::Discovery::new(&config.discovery.rendezvous); - let event_loop = EventLoop::new(swarm, command_receiver, event_sender, discovery); + let event_loop = EventLoop::new( + swarm, + incoming_streams, + command_receiver, + event_sender, + discovery, + ); Ok((client, event_receiver, event_loop)) } pub(crate) struct EventLoop { swarm: Swarm, + incoming_streams: libp2p_stream::IncomingStreams, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, discovery: discovery::Discovery, @@ -160,12 +182,14 @@ pub(crate) struct EventLoop { impl EventLoop { fn new( swarm: Swarm, + incoming_streams: libp2p_stream::IncomingStreams, command_receiver: mpsc::Receiver, event_sender: mpsc::Sender, discovery: discovery::Discovery, ) -> Self { Self { swarm, + incoming_streams, command_receiver, event_sender, discovery, @@ -182,7 +206,12 @@ impl EventLoop { loop { tokio::select! { - event = self.swarm.next() => self.handle_swarm_event(event.expect("Swarm stream to be infinite.")).await, + event = self.swarm.next() => { + self.handle_swarm_event(event.expect("Swarm stream to be infinite.")).await; + }, + incoming_stream = self.incoming_streams.next() => { + self.handle_incoming_stream(incoming_stream.expect("Incoming streams to be infinite.")).await; + }, command = self.command_receiver.recv() => { let Some(c) = command else { break }; self.handle_command(c).await; @@ -260,6 +289,9 @@ impl EventLoop { let _ = sender.send(Ok(topic)); } + Command::OpenStream { peer_id, sender } => { + let _ = sender.send(self.open_stream(peer_id).await.map_err(Into::into)); + } Command::PeerCount { sender } => { let _ = sender.send(self.swarm.connected_peers().count()); } @@ -329,6 +361,10 @@ enum Command { topic: gossipsub::IdentTopic, sender: oneshot::Sender>, }, + OpenStream { + peer_id: PeerId, + sender: oneshot::Sender>, + }, PeerCount { sender: oneshot::Sender, }, diff --git a/crates/network/src/stream.rs b/crates/network/src/stream.rs new file mode 100644 index 000000000..13238d4a6 --- /dev/null +++ b/crates/network/src/stream.rs @@ -0,0 +1,92 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures_util::{Sink as FuturesSink, SinkExt, Stream as FuturesStream}; +use libp2p::PeerId; +use tokio::io::BufStream; +use tokio_util::codec::Framed; +use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt}; + +use super::{types, EventLoop}; + +mod codec; + +pub use codec::{CodecError, Message}; + +pub(crate) const CALIMERO_STREAM_PROTOCOL: libp2p::StreamProtocol = + libp2p::StreamProtocol::new("/calimero/stream/0.0.1"); + +#[derive(Debug)] +pub struct Stream { + inner: Framed>, codec::MessageJsonCodec>, +} + +impl Stream { + pub fn new(stream: libp2p::Stream) -> Self { + let stream = BufStream::new(stream.compat()); + let stream = Framed::new(stream, codec::MessageJsonCodec); + Stream { inner: stream } + } +} + +impl FuturesStream for Stream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let inner = Pin::new(&mut self.get_mut().inner); + inner.poll_next(cx) + } +} + +impl FuturesSink for Stream { + type Error = CodecError; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready_unpin(cx) + } + + fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { + self.inner.start_send_unpin(item) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_flush_unpin(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close_unpin(cx) + } +} + +impl EventLoop { + pub(crate) async fn handle_incoming_stream( + &mut self, + (peer, stream): (PeerId, libp2p::Stream), + ) { + self.event_sender + .send(types::NetworkEvent::StreamOpened { + peer_id: peer, + stream: Stream::new(stream), + }) + .await + .expect("Failed to send stream opened event"); + } + + pub(crate) async fn open_stream(&mut self, peer_id: PeerId) -> eyre::Result { + let stream = match self + .swarm + .behaviour() + .stream + .new_control() + .open_stream(peer_id, CALIMERO_STREAM_PROTOCOL) + .await + { + Ok(stream) => stream, + Err(err) => { + eyre::bail!("Failed to open stream: {:?}", err); + } + }; + + Ok(Stream::new(stream)) + } +} diff --git a/crates/network/src/stream/codec.rs b/crates/network/src/stream/codec.rs new file mode 100644 index 000000000..dd2b3e739 --- /dev/null +++ b/crates/network/src/stream/codec.rs @@ -0,0 +1,104 @@ +use bytes::{Bytes, BytesMut}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct Message { + pub data: Vec, +} + +#[derive(Debug, Error)] +#[error("CodecError")] +pub enum CodecError { + StdIo(#[from] std::io::Error), + SerDe(serde_json::Error), +} + +#[derive(Debug)] +pub(crate) struct MessageJsonCodec; + +impl Decoder for MessageJsonCodec { + type Item = Message; + type Error = CodecError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let mut length_codec = LengthDelimitedCodec::new(); + let Some(frame) = length_codec.decode(src)? else { + return Ok(None); + }; + + serde_json::from_slice(&frame) + .map(Some) + .map_err(CodecError::SerDe) + } +} + +impl Encoder for MessageJsonCodec { + type Error = CodecError; + + fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { + let mut length_codec = LengthDelimitedCodec::new(); + let json = serde_json::to_vec(&item).map_err(CodecError::SerDe)?; + + length_codec + .encode(Bytes::from(json), dst) + .map_err(CodecError::StdIo) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_util::StreamExt; + use tokio_test::io::Builder; + use tokio_util::codec::FramedRead; + + #[test] + fn test_my_frame_encoding_decoding() { + let request = Message { + data: "Hello".bytes().collect(), + }; + let response = Message { + data: "World".bytes().collect(), + }; + + let mut buffer = BytesMut::new(); + let mut codec = MessageJsonCodec; + codec.encode(request.clone(), &mut buffer).unwrap(); + codec.encode(response.clone(), &mut buffer).unwrap(); + + let decoded_request = codec.decode(&mut buffer).unwrap(); + assert_eq!(decoded_request, Some(request)); + + let decoded_response = codec.decode(&mut buffer).unwrap(); + assert_eq!(decoded_response, Some(response)); + } + + #[tokio::test] + async fn test_multiple_objects_stream() { + let request = Message { + data: "Hello".bytes().collect(), + }; + let response = Message { + data: "World".bytes().collect(), + }; + + let mut buffer = BytesMut::new(); + let mut codec = MessageJsonCodec; + codec.encode(request.clone(), &mut buffer).unwrap(); + codec.encode(response.clone(), &mut buffer).unwrap(); + + let mut stream = Builder::new().read(&buffer.freeze()).build(); + let mut framed = FramedRead::new(&mut stream, MessageJsonCodec); + + let decoded_request = framed.next().await.unwrap().unwrap(); + assert_eq!(decoded_request, request); + + let decoded_response = framed.next().await.unwrap().unwrap(); + assert_eq!(decoded_response, response); + + let decoded3 = framed.next().await; + assert_eq!(decoded3.is_none(), true); + } +} diff --git a/crates/network/src/types.rs b/crates/network/src/types.rs index dbfd4025b..6514d411f 100644 --- a/crates/network/src/types.rs +++ b/crates/network/src/types.rs @@ -2,6 +2,8 @@ use libp2p::core::transport; pub use libp2p::gossipsub::{IdentTopic, Message, MessageId, TopicHash}; pub use libp2p::identity::PeerId; +use super::stream; + #[derive(Debug)] pub enum NetworkEvent { ListeningOn { @@ -16,4 +18,8 @@ pub enum NetworkEvent { id: MessageId, message: Message, }, + StreamOpened { + peer_id: PeerId, + stream: stream::Stream, + }, } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index eb380f682..7743b6440 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -367,6 +367,14 @@ impl Node { calimero_network::types::NetworkEvent::ListeningOn { address, .. } => { info!("Listening on: {}", address); } + calimero_network::types::NetworkEvent::StreamOpened { peer_id, stream } => { + info!("Stream opened from peer: {}", peer_id); + if let Err(err) = self.handle_stream(stream).await { + error!(%err, "Failed to handle stream"); + } + + info!("Stream closed from peer: {:?}", peer_id); + } } Ok(()) @@ -636,6 +644,14 @@ impl Node { Ok(outcome) } + + async fn handle_stream( + &mut self, + _stream: calimero_network::stream::Stream, + ) -> eyre::Result<()> { + // TODO: implement catchup mechanism once storage is ready + Ok(()) + } } // TODO: move this into the config