diff --git a/Cargo.lock b/Cargo.lock index 5598549..cd76415 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -464,7 +464,6 @@ dependencies = [ "libp2p-stream", "multiaddr", "owo-colors", - "rand", "serde", "serde_json", "thiserror", @@ -2863,15 +2862,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "signal-hook-registry" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" -dependencies = [ - "libc", -] - [[package]] name = "signature" version = "2.2.0" @@ -3106,7 +3096,6 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", - "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", diff --git a/examples/chat/Cargo.toml b/examples/chat/Cargo.toml index 47f4477..da42da9 100644 --- a/examples/chat/Cargo.toml +++ b/examples/chat/Cargo.toml @@ -32,17 +32,14 @@ libp2p = { version = "0.53.2", features = [ libp2p-stream = "0.1.0-alpha.1" multiaddr = "0.18.1" owo-colors = "4.0.0" -rand = { version = "0.8.5" } serde = { version = "1.0.196", features = ["derive"] } serde_json = "1.0.113" thiserror = "1.0.56" tokio = { version = "1.35.1", features = [ - "io-util", "io-std", "macros", "rt", "rt-multi-thread", - "signal", ] } tokio-util = { version = "0.7.11", features = ["codec", "compat"] } tracing = "0.1.37" diff --git a/examples/chat/src/main.rs b/examples/chat/src/main.rs index da142bf..35f1235 100644 --- a/examples/chat/src/main.rs +++ b/examples/chat/src/main.rs @@ -1,6 +1,5 @@ -use rand::prelude::SliceRandom; +use std::collections::HashSet; use std::str::FromStr; -use std::time::Duration; use clap::Parser; use clap::ValueEnum; @@ -56,6 +55,19 @@ enum Mode { Echo, } +impl Mode { + fn is_interactive(&self) -> bool { + matches!(self, Mode::Interactive) + } +} + +struct Node { + mode: Mode, + store: store::Store, + network_client: network::client::NetworkClient, + pending_catchups: HashSet, +} + #[tokio::main] async fn main() -> eyre::Result<()> { tracing_subscriber::registry() @@ -81,29 +93,10 @@ async fn main() -> eyre::Result<()> { ) .await?; - if let Some(peer_addrs) = opt.dial_peer_addrs { - for addr in peer_addrs { - info!("Dialing peer: {}", addr); - network_client.dial(addr).await?; - } - } - - if let Some(topic_names) = opt.gossip_topic_names { - for topic_name in topic_names { - info!("Subscribing to topic: {}", topic_name); - let topic = gossipsub::IdentTopic::new(topic_name); - network_client.subscribe(topic.clone()).await?; - - // Wait for a second to allow some peers to connect - tokio::time::sleep(Duration::from_secs(1)).await; + let mut node = Node::new(opt.mode.clone(), store.clone(), network_client.clone()); - if let Err(err) = - perform_catchup(store.clone(), network_client.clone(), topic.hash()).await - { - error!(%err, "Failed to perform catchup"); - } - } - } + node.boot(opt.dial_peer_addrs, opt.gossip_topic_names) + .await?; let peer_id = keypair.public().to_peer_id(); match opt.mode { @@ -116,11 +109,11 @@ async fn main() -> eyre::Result<()> { let Some(event) = event else { break; }; - handle_network_event(store.clone(), network_client.clone(), event, peer_id, false).await?; + node.handle_network_event(event, peer_id).await?; } line = stdin.next_line() => { if let Some(line) = line? { - handle_line(store.clone(), network_client.clone(), line).await?; + node.handle_line(line).await?; } } } @@ -128,8 +121,7 @@ async fn main() -> eyre::Result<()> { } Mode::Echo => { while let Some(event) = network_events.recv().await { - handle_network_event(store.clone(), network_client.clone(), event, peer_id, true) - .await?; + node.handle_network_event(event, peer_id).await?; } } } @@ -146,308 +138,334 @@ fn generate_ed25519(secret_key_seed: u8) -> identity::Keypair { const LINE_START: &str = ">>>>>>>>>> "; -async fn handle_network_event( - store: store::Store, - network_client: network::client::NetworkClient, - event: network::types::NetworkEvent, - local_peer_id: PeerId, - is_echo: bool, -) -> eyre::Result<()> { - match event { - network::types::NetworkEvent::Subscribed { peer_id, topic } => { - info!("Peer '{}' subscribed to topic: {}", peer_id, topic); +impl Node { + fn new( + mode: Mode, + store: store::Store, + network_client: network::client::NetworkClient, + ) -> Self { + Self { + mode, + store, + network_client, + pending_catchups: Default::default(), } - network::types::NetworkEvent::Message { message, .. } => { - if let Err(err) = - handle_message(store, network_client, local_peer_id, is_echo, message).await - { - error!(%err, "Failed to handle message"); + } + + async fn boot( + &mut self, + dial_peer_addrs: Option>, + gossip_topic_names: Option>, + ) -> eyre::Result<()> { + if let Some(peer_addrs) = dial_peer_addrs { + for addr in peer_addrs { + info!("Dialing peer: {}", addr); + self.network_client.dial(addr).await?; } } - network::types::NetworkEvent::ListeningOn { address, .. } => { - info!("Listening on: {}", address); - } - network::types::NetworkEvent::StreamOpened { peer_id, stream } => { - info!("Stream opened from peer: {}", peer_id); - if let Err(err) = handle_stream(store, stream).await { - error!(%err, "Failed to handle stream"); - } - info!("Stream closed from peer: {:?}", peer_id); - } - } - Ok(()) -} + if let Some(topic_names) = gossip_topic_names { + for topic_name in topic_names { + info!("Subscribing to topic: {}", topic_name); + let topic = gossipsub::IdentTopic::new(topic_name); -async fn handle_message( - mut store: store::Store, - network_client: network::client::NetworkClient, - peer_id: PeerId, - is_echo: bool, - message: gossipsub::Message, -) -> eyre::Result<()> { - let text = String::from_utf8_lossy(&message.data); - println!( - "{LINE_START} Received message: {:?}, from: {:?}", - text, message.source - ); - - store - .add_message( - types::ApplicationId::from(message.topic.clone().into_string()), - types::ChatMessage::new(message.source, message.data.clone()), - ) - .await; - - if !is_echo { - return Ok(()); - } + if self.mode.is_interactive() { + self.pending_catchups.insert(topic.hash().clone()); + } - if text.starts_with("echo") { - debug!("Ignoring echo message"); - return Ok(()); + self.pending_catchups.insert(topic.hash().clone()); + self.network_client.subscribe(topic.clone()).await?; + } + } + Ok(()) } - let text = format!("echo ({}): '{}'", peer_id, text); - - network_client - .publish(message.topic, text.into_bytes()) - .await?; - Ok(()) -} -async fn handle_stream( - store: store::Store, - mut stream: network::stream::Stream, -) -> eyre::Result<()> { - let application_id = match stream.next().await { - Some(message) => match serde_json::from_slice(&message.data)? { - types::CatchupStreamMessage::Request(req) => { - types::ApplicationId::from(req.application_id) + async fn handle_network_event( + &mut self, + event: network::types::NetworkEvent, + local_peer_id: PeerId, + ) -> eyre::Result<()> { + match event { + network::types::NetworkEvent::Subscribed { peer_id, topic } => { + info!("Peer '{}' subscribed to topic: {}", peer_id, topic); + + if self.pending_catchups.contains(&topic) { + if let Err(err) = self.perform_catchup(topic.clone(), peer_id).await { + error!(%err, "Failed to perform catchup"); + } else { + self.pending_catchups.remove(&topic); + } + } } - message => { - eyre::bail!("Unexpected message: {:?}", message) + network::types::NetworkEvent::Message { message, .. } => { + if let Err(err) = self.handle_message(local_peer_id, message).await { + error!(%err, "Failed to handle message"); + } } - }, - None => { - eyre::bail!("Stream closed unexpectedly") - } - }; - - let mut iter = store.batch_stream(application_id, 3); - while let Some(messages) = iter.next().await { - info!("Sending batch: {:?}", messages); - let response = serde_json::to_vec(&types::CatchupStreamMessage::Response( - types::CatchupResponse { messages }, - ))?; + network::types::NetworkEvent::ListeningOn { address, .. } => { + info!("Listening on: {}", address); + } + network::types::NetworkEvent::StreamOpened { peer_id, stream } => { + info!("Stream opened from peer: {}", peer_id); + if let Err(err) = self.handle_stream(stream).await { + error!(%err, "Failed to handle stream"); + } - stream - .send(network::stream::Message { data: response }) - .await?; + info!("Stream closed from peer: {:?}", peer_id); + } + } + Ok(()) } - Ok(()) -} - -async fn perform_catchup( - mut store: store::Store, - network_client: network::client::NetworkClient, - topic: gossipsub::TopicHash, -) -> eyre::Result> { - let mesh_peers = network_client.mesh_peers(topic.clone()).await; - let choosen_peer = match mesh_peers.choose(&mut rand::thread_rng()) { - Some(peer) => peer, - None => { - info!("No mesh peers found for topic: {}", topic); - return Ok(None); + async fn handle_message( + &mut self, + peer_id: PeerId, + message: gossipsub::Message, + ) -> eyre::Result<()> { + let text = String::from_utf8_lossy(&message.data); + println!( + "{LINE_START} Received message: {:?}, from: {:?}", + text, message.source + ); + + self.store + .add_message( + types::ApplicationId::from(message.topic.clone().into_string()), + types::ChatMessage::new(message.source, message.data.clone()), + ) + .await; + + if self.mode.is_interactive() { + return Ok(()); } - }; - let mut stream = network_client.open_stream(*choosen_peer).await?; - info!("Opened stream to peer: {:?}", choosen_peer); - - let request = serde_json::to_vec(&types::CatchupStreamMessage::Request( - types::CatchupRequest { - application_id: topic.clone().into_string(), - }, - ))?; + if text.starts_with("echo") { + debug!("Ignoring echo message"); + return Ok(()); + } + let text = format!("echo ({}): '{}'", peer_id, text); - stream - .send(network::stream::Message { data: request }) - .await?; + self.network_client + .publish(message.topic, text.into_bytes()) + .await?; + Ok(()) + } - info!("Sent catchup request to peer: {:?}", choosen_peer); - - while let Some(message) = stream.next().await { - match serde_json::from_slice(&message.data)? { - types::CatchupStreamMessage::Response(response) => { - for message in response.messages { - let text = String::from_utf8_lossy(&message.data); - println!( - "{LINE_START} Received cacthup message: {:?}, original from: {:?}", - text, message.source - ); - - store - .add_message( - types::ApplicationId::from(topic.clone().into_string()), - message, - ) - .await; + async fn handle_stream(&mut self, mut stream: network::stream::Stream) -> eyre::Result<()> { + let application_id = match stream.next().await { + Some(message) => match serde_json::from_slice(&message.data)? { + types::CatchupStreamMessage::Request(req) => { + types::ApplicationId::from(req.application_id) } - } - event => { - warn!(?event, "Unexpected event"); + message => { + eyre::bail!("Unexpected message: {:?}", message) + } + }, + None => { + eyre::bail!("Stream closed unexpectedly") } }; - } - return Ok(Some(())); -} + let mut iter = self.store.batch_stream(application_id, 3); + while let Some(messages) = iter.next().await { + info!("Sending batch: {:?}", messages); + let response = serde_json::to_vec(&types::CatchupStreamMessage::Response( + types::CatchupResponse { messages }, + ))?; -async fn handle_line( - store: store::Store, - network_client: network::client::NetworkClient, - line: String, -) -> eyre::Result<()> { - let (command, args) = match line.split_once(' ') { - Some((method, payload)) => (method, Some(payload)), - None => (line.as_str(), None), - }; - - match command { - "dial" => { - let args = match args { - Some(args) => args, - None => { - println!("{LINE_START} Usage: dial "); - return Ok(()); - } - }; + stream + .send(network::stream::Message { data: response }) + .await?; + } - let addr = match Multiaddr::from_str(args) { - Ok(addr) => addr, - Err(err) => { - println!("{LINE_START} Failed to parse MultiAddr: {:?}", err); - return Ok(()); - } - }; + Ok(()) + } - info!("{LINE_START} Dialing {:?}", addr); + async fn perform_catchup( + &mut self, + topic: gossipsub::TopicHash, + choosen_peer: PeerId, + ) -> eyre::Result> { + let mut stream = self.network_client.open_stream(choosen_peer).await?; + info!("Opened stream to peer: {:?}", choosen_peer); + + let request = serde_json::to_vec(&types::CatchupStreamMessage::Request( + types::CatchupRequest { + application_id: topic.clone().into_string(), + }, + ))?; - match network_client.dial(addr).await { - Ok(_) => { - println!("{LINE_START} Peer dialed"); - } - Err(err) => { - println!("{LINE_START} Failed to dial peer: {:?}", err); - } - }; - } - "subscribe" => { - let args = match args { - Some(args) => args, - None => { - println!("{LINE_START} Usage: subscribe "); - return Ok(()); - } - }; + stream + .send(network::stream::Message { data: request }) + .await?; - let topic = gossipsub::IdentTopic::new(args.to_string()); - match network_client.subscribe(topic).await { - Ok(_) => { - println!("{LINE_START} Subscribed to topic"); + info!("Sent catchup request to peer: {:?}", choosen_peer); + + while let Some(message) = stream.next().await { + match serde_json::from_slice(&message.data)? { + types::CatchupStreamMessage::Response(response) => { + for message in response.messages { + let text = String::from_utf8_lossy(&message.data); + println!( + "{LINE_START} Received cacthup message: {:?}, original from: {:?}", + text, message.source + ); + + self.store + .add_message( + types::ApplicationId::from(topic.clone().into_string()), + message, + ) + .await; + } } - Err(err) => { - println!("{LINE_START} Failed to subscribe to topic: {:?}", err); + event => { + warn!(?event, "Unexpected event"); } }; } - "unsubscribe" => { - let args = match args { - Some(args) => args, - None => { - println!("{LINE_START} Usage: unsubscribe "); - return Ok(()); - } - }; - let topic = gossipsub::IdentTopic::new(args.to_string()); - match network_client.unsubscribe(topic).await { - Ok(_) => { - println!("{LINE_START} Unsubscribed from topic"); - } - Err(err) => { - println!("{LINE_START} Failed to unsubscribe from topic: {:?}", err); - } - }; - } - "publish" => { - let args = match args { - Some(args) => args, - None => { - println!("{LINE_START} Usage: message "); - return Ok(()); - } - }; + info!("Closed stream to peer: {:?}", choosen_peer); + return Ok(Some(())); + } - let mut args_iter = args.split_whitespace(); - let topic_name = match args_iter.next() { - Some(topic) => topic, - None => { - println!("{LINE_START} Usage: message "); - return Ok(()); - } - }; + async fn handle_line(&mut self, line: String) -> eyre::Result<()> { + let (command, args) = match line.split_once(' ') { + Some((method, payload)) => (method, Some(payload)), + None => (line.as_str(), None), + }; - let message_data = match args_iter.next() { - Some(data) => data, - None => { - println!("{LINE_START} Usage: message "); - return Ok(()); - } - }; + match command { + "dial" => { + let args = match args { + Some(args) => args, + None => { + println!("{LINE_START} Usage: dial "); + return Ok(()); + } + }; - let topic = gossipsub::IdentTopic::new(topic_name.to_string()); - match network_client - .publish(topic.hash(), message_data.as_bytes().to_vec()) - .await - { - Ok(_) => { - println!("{LINE_START} Message published successfully"); - } - Err(err) => { - println!("{LINE_START} Failed to publish message: {:?}", err); - } - }; + let addr = match Multiaddr::from_str(args) { + Ok(addr) => addr, + Err(err) => { + println!("{LINE_START} Failed to parse MultiAddr: {:?}", err); + return Ok(()); + } + }; - // Wait for a second to allow some peers to connect - tokio::time::sleep(Duration::from_secs(1)).await; + info!("{LINE_START} Dialing {:?}", addr); - if let Err(err) = - perform_catchup(store.clone(), network_client.clone(), topic.hash()).await - { - error!(%err, "Failed to perform catchup"); + match self.network_client.dial(addr).await { + Ok(_) => { + println!("{LINE_START} Peer dialed"); + } + Err(err) => { + println!("{LINE_START} Failed to dial peer: {:?}", err); + } + }; } - } - "peers" => { - let peer_info = network_client.peer_info().await; - println!("{LINE_START} Peer info: {:?}", peer_info); - } - "mesh-peers" => { - let args = match args { - Some(args) => args, - None => { - println!("{LINE_START} Usage: mesh-peers "); - return Ok(()); + "subscribe" => { + let args = match args { + Some(args) => args, + None => { + println!("{LINE_START} Usage: subscribe "); + return Ok(()); + } + }; + + let topic = gossipsub::IdentTopic::new(args.to_string()); + if self.mode.is_interactive() { + self.pending_catchups.insert(topic.hash().clone()); } - }; - let topic = gossipsub::IdentTopic::new(args.to_string()); - let mesh_peer_info = network_client.mesh_peer_info(topic.hash()).await; - println!("{LINE_START} Mesh peer info: {:?}", mesh_peer_info); + match self.network_client.subscribe(topic).await { + Ok(_) => { + println!("{LINE_START} Subscribed to topic"); + } + Err(err) => { + println!("{LINE_START} Failed to subscribe to topic: {:?}", err); + } + }; + } + "unsubscribe" => { + let args = match args { + Some(args) => args, + None => { + println!("{LINE_START} Usage: unsubscribe "); + return Ok(()); + } + }; + + let topic = gossipsub::IdentTopic::new(args.to_string()); + match self.network_client.unsubscribe(topic).await { + Ok(_) => { + println!("{LINE_START} Unsubscribed from topic"); + } + Err(err) => { + println!("{LINE_START} Failed to unsubscribe from topic: {:?}", err); + } + }; + } + "publish" => { + let args = match args { + Some(args) => args, + None => { + println!("{LINE_START} Usage: message "); + return Ok(()); + } + }; + + let mut args_iter = args.split_whitespace(); + let topic_name = match args_iter.next() { + Some(topic) => topic, + None => { + println!("{LINE_START} Usage: message "); + return Ok(()); + } + }; + + let message_data = match args_iter.next() { + Some(data) => data, + None => { + println!("{LINE_START} Usage: message "); + return Ok(()); + } + }; + + let topic = gossipsub::IdentTopic::new(topic_name.to_string()); + match self + .network_client + .publish(topic.hash(), message_data.as_bytes().to_vec()) + .await + { + Ok(_) => { + println!("{LINE_START} Message published successfully"); + } + Err(err) => { + println!("{LINE_START} Failed to publish message: {:?}", err); + } + }; + } + "peers" => { + let peer_info = self.network_client.peer_info().await; + println!("{LINE_START} Peer info: {:?}", peer_info); + } + "mesh-peers" => { + let args = match args { + Some(args) => args, + None => { + println!("{LINE_START} Usage: mesh-peers "); + return Ok(()); + } + }; + + let topic = gossipsub::IdentTopic::new(args.to_string()); + let mesh_peer_info = self.network_client.mesh_peer_info(topic.hash()).await; + println!("{LINE_START} Mesh peer info: {:?}", mesh_peer_info); + } + _ => println!("{LINE_START} Unknown command"), } - _ => println!("{LINE_START} Unknown command"), - } - Ok(()) + Ok(()) + } } diff --git a/examples/chat/src/network.rs b/examples/chat/src/network.rs index 7cbed54..45dc359 100644 --- a/examples/chat/src/network.rs +++ b/examples/chat/src/network.rs @@ -280,17 +280,6 @@ impl EventLoop { let _ = sender.send(Ok(id)); } - Command::MeshPeers { topic, sender } => { - let peers = self - .swarm - .behaviour_mut() - .gossipsub - .mesh_peers(&topic) - .map(|peer| peer.clone()) - .collect(); - - let _ = sender.send(peers); - } Command::OpenStream { peer_id, sender } => { match self.open_stream(peer_id).await { Ok(stream) => { @@ -364,10 +353,6 @@ pub(crate) enum Command { data: Vec, sender: oneshot::Sender>, }, - MeshPeers { - topic: gossipsub::TopicHash, - sender: oneshot::Sender>, - }, OpenStream { peer_id: PeerId, sender: oneshot::Sender>, diff --git a/examples/chat/src/network/client.rs b/examples/chat/src/network/client.rs index 4a9d7ac..a6d59a5 100644 --- a/examples/chat/src/network/client.rs +++ b/examples/chat/src/network/client.rs @@ -78,17 +78,6 @@ impl NetworkClient { receiver.await.expect("Sender not to be dropped.") } - pub async fn mesh_peers(&self, topic: gossipsub::TopicHash) -> Vec { - let (sender, receiver) = oneshot::channel(); - - self.sender - .send(Command::MeshPeers { topic, sender }) - .await - .expect("Command receiver not to be dropped."); - - receiver.await.expect("Sender not to be dropped.") - } - pub async fn open_stream(&self, peer_id: PeerId) -> eyre::Result { let (sender, receiver) = oneshot::channel();