diff --git a/Cargo.lock b/Cargo.lock index 3364891a93..b96baa2eaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2319,7 +2319,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d157562dba6017193e5285acf6b1054759e83540bfd79f75b69d6ce774c88da" dependencies = [ "asynchronous-codec", - "base64 0.21.2", + "base64 0.21.4", "byteorder", "bytes", "either", @@ -2336,9 +2336,9 @@ dependencies = [ "prometheus-client", "quick-protobuf", "quick-protobuf-codec", - "rand 0.8.5", + "rand", "regex", - "sha2 0.10.7", + "sha2", "smallvec", "unsigned-varint", "void", diff --git a/sn_cli/src/main.rs b/sn_cli/src/main.rs index 3d0d092509..890382f94f 100644 --- a/sn_cli/src/main.rs +++ b/sn_cli/src/main.rs @@ -16,6 +16,7 @@ use crate::{ cli::Opt, subcommands::{ files::files_cmds, + gossipsub::gossipsub_cmds, register::register_cmds, wallet::{wallet_cmds, wallet_cmds_without_client, WalletCmds}, SubCmd, @@ -109,6 +110,7 @@ async fn main() -> Result<()> { SubCmd::Register(cmds) => { register_cmds(cmds, &client, &client_data_dir_path, should_verify_store).await? } + SubCmd::Gossipsub(cmds) => gossipsub_cmds(cmds, &client).await?, }; Ok(()) diff --git a/sn_cli/src/subcommands/gossipsub.rs b/sn_cli/src/subcommands/gossipsub.rs new file mode 100644 index 0000000000..9dcda5d67e --- /dev/null +++ b/sn_cli/src/subcommands/gossipsub.rs @@ -0,0 +1,51 @@ +// Copyright 2023 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use clap::Subcommand; +use color_eyre::Result; +use sn_client::{Client, ClientEvent}; + +#[derive(Subcommand, Debug)] +pub enum GossipsubCmds { + /// Subscribe to a topic and listen for messages published on it + Subscribe { + /// The name of the topic. + #[clap(name = "topic")] + topic: String, + }, + /// Publish a message on a given topic + Publish { + /// The name of the topic. + #[clap(name = "topic")] + topic: String, + /// The message to publish. + #[clap(name = "msg")] + msg: String, + }, +} + +pub(crate) async fn gossipsub_cmds(cmds: GossipsubCmds, client: &Client) -> Result<()> { + match cmds { + GossipsubCmds::Subscribe { topic } => { + client.subscribe_to_topic(topic.clone())?; + println!("Subscribed to topic '{topic}'. Listening for messages published on it..."); + let mut events_channel = client.events_channel(); + while let Ok(event) = events_channel.recv().await { + if let ClientEvent::GossipsubMsg { msg, .. } = event { + let msg = String::from_utf8(msg)?; + println!("New message published: {msg}"); + } + } + } + GossipsubCmds::Publish { topic, msg } => { + client.publish_on_topic(topic.clone(), msg.into())?; + println!("Message published on topic '{topic}'."); + } + } + Ok(()) +} diff --git a/sn_cli/src/subcommands/mod.rs b/sn_cli/src/subcommands/mod.rs index 4ea4941a7c..5173fc0757 100644 --- a/sn_cli/src/subcommands/mod.rs +++ b/sn_cli/src/subcommands/mod.rs @@ -6,6 +6,7 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. pub(crate) mod files; +pub(crate) mod gossipsub; pub(crate) mod register; pub(crate) mod wallet; @@ -22,4 +23,7 @@ pub(super) enum SubCmd { #[clap(name = "register", subcommand)] /// Commands for register management Register(register::RegisterCmds), + #[clap(name = "gossipsub", subcommand)] + /// Commands for gossipsub management + Gossipsub(gossipsub::GossipsubCmds), } diff --git a/sn_client/src/api.rs b/sn_client/src/api.rs index a226b3ba2a..d475777467 100644 --- a/sn_client/src/api.rs +++ b/sn_client/src/api.rs @@ -158,6 +158,7 @@ impl Client { continue; } + Ok(ClientEvent::GossipsubMsg { .. }) => {} Err(err) => { error!("Unexpected error during client startup {err:?}"); println!("Unexpected error during client startup {err:?}"); @@ -204,36 +205,43 @@ impl Client { } fn handle_network_event(&mut self, event: NetworkEvent) -> Result<()> { - if let NetworkEvent::PeerAdded(peer_id) = event { - self.peers_added += 1; - debug!("PeerAdded: {peer_id}"); - - // In case client running in non-local-discovery mode, - // it may take some time to fill up the RT. - // To avoid such delay may fail the query with RecordNotFound, - // wait till certain amount of peers populated into RT - if self.peers_added >= CLOSE_GROUP_SIZE { - if let Some(progress) = &self.progress { - progress.finish_with_message("Connected to the Network"); - // Remove the progress bar - self.progress = None; - } + match event { + NetworkEvent::PeerAdded(peer_id) => { + self.peers_added += 1; + debug!("PeerAdded: {peer_id}"); + + // In case client running in non-local-discovery mode, + // it may take some time to fill up the RT. + // To avoid such delay may fail the query with RecordNotFound, + // wait till certain amount of peers populated into RT + if self.peers_added >= CLOSE_GROUP_SIZE { + if let Some(progress) = &self.progress { + progress.finish_with_message("Connected to the Network"); + // Remove the progress bar + self.progress = None; + } - self.events_channel - .broadcast(ClientEvent::ConnectedToNetwork)?; - } else { - debug!( - "{}/{} initial peers found.", - self.peers_added, CLOSE_GROUP_SIZE - ); - - if let Some(progress) = &self.progress { - progress.set_message(format!( + self.events_channel + .broadcast(ClientEvent::ConnectedToNetwork)?; + } else { + debug!( "{}/{} initial peers found.", self.peers_added, CLOSE_GROUP_SIZE - )); + ); + + if let Some(progress) = &self.progress { + progress.set_message(format!( + "{}/{} initial peers found.", + self.peers_added, CLOSE_GROUP_SIZE + )); + } } } + NetworkEvent::GossipsubMsg { topic, msg } => { + self.events_channel + .broadcast(ClientEvent::GossipsubMsg { topic, msg })?; + } + _other => {} } Ok(()) @@ -491,4 +499,18 @@ impl Client { Ok(adjusted_costs) } + + /// Subscribe to given gossipsub topic + pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> { + info!("Subscribing to topic id: {topic_id}"); + self.network.subscribe_to_topic(topic_id)?; + Ok(()) + } + + /// Publish message on given topic + pub fn publish_on_topic(&self, topic_id: String, msg: Vec) -> Result<()> { + info!("Publishing msg on topic id: {topic_id}"); + self.network.publish_on_topic(topic_id, msg)?; + Ok(()) + } } diff --git a/sn_client/src/event.rs b/sn_client/src/event.rs index d7bcbc24c5..29927756fd 100644 --- a/sn_client/src/event.rs +++ b/sn_client/src/event.rs @@ -42,6 +42,13 @@ pub enum ClientEvent { /// No network activity has been received for a given duration /// we should error out InactiveClient(std::time::Duration), + /// Gossipsub message received on a topic the client has subscribed to + GossipsubMsg { + /// Topic the message was published on + topic: String, + /// The raw bytes of the received message + msg: Vec, + }, } /// Receiver Channel where users of the public API can listen to events broadcasted by the client. diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index 5cd471e51b..b2aaf10011 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -114,10 +114,12 @@ pub enum SwarmCmd { AddKeysToReplicationFetcher { keys: Vec, }, + /// Subscribe to a given Gossipsub topic + GossipsubSubscribe(String), /// Publish a message through Gossipsub protocol - GossipMsg { + GossipsubPublish { /// Topic to publish on - topic_id: libp2p::gossipsub::IdentTopic, + topic_id: String, /// Raw bytes of the message to publish msg: Vec, }, @@ -359,12 +361,16 @@ impl SwarmDriver { .send(current_state) .map_err(|_| Error::InternalMsgChannelDropped)?; } - SwarmCmd::GossipMsg { topic_id, msg } => { + SwarmCmd::GossipsubSubscribe(topic_id) => { + let topic_id = libp2p::gossipsub::IdentTopic::new(topic_id); + self.swarm.behaviour_mut().gossipsub.subscribe(&topic_id)?; + } + SwarmCmd::GossipsubPublish { topic_id, msg } => { + let topic_id = libp2p::gossipsub::IdentTopic::new(topic_id); self.swarm .behaviour_mut() .gossipsub - .publish(topic_id, msg) - .unwrap(); + .publish(topic_id, msg)?; } } diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 4fc36abc0a..daddd192f1 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -370,13 +370,9 @@ impl NetworkBuilder { libp2p::gossipsub::MessageAuthenticity::Signed(self.keypair.clone()); // build a gossipsub network behaviour - let mut gossipsub: libp2p::gossipsub::Behaviour = - libp2p::gossipsub::Behaviour::new(message_authenticity, gossipsub_config).unwrap(); - - // Create a Gossipsub topic - let topic = libp2p::gossipsub::IdentTopic::new("example-topic"); - // subscribe to the topic - gossipsub.subscribe(&topic).unwrap(); + let gossipsub: libp2p::gossipsub::Behaviour = + libp2p::gossipsub::Behaviour::new(message_authenticity, gossipsub_config) + .expect("Failed to instantiate Gossipsub behaviour."); if !self.local { debug!("Preventing non-global dials"); diff --git a/sn_networking/src/error.rs b/sn_networking/src/error.rs index aba5b0ff1e..ee69337e1c 100644 --- a/sn_networking/src/error.rs +++ b/sn_networking/src/error.rs @@ -7,6 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. use libp2p::{ + gossipsub::{PublishError, SubscriptionError}, kad::{self, Record}, request_response::{OutboundFailure, RequestId}, swarm::DialError, @@ -103,6 +104,12 @@ pub enum Error { #[cfg(feature = "open-metrics")] #[error("Network Metric error")] NetworkMetricError, + + #[error("Gossipsub publish Error: {0}")] + GossipsubPublishError(#[from] PublishError), + + #[error("Gossipsub subscribe Error: {0}")] + GossipsubSubscriptionError(#[from] SubscriptionError), } #[cfg(test)] diff --git a/sn_networking/src/event.rs b/sn_networking/src/event.rs index 7dcade7ff8..3364a9b46e 100644 --- a/sn_networking/src/event.rs +++ b/sn_networking/src/event.rs @@ -135,7 +135,7 @@ pub enum NetworkEvent { /// Report unverified record UnverifiedRecord(Record), /// Gossipsub message received - Gossipsub { + GossipsubMsg { /// Topic the message was published on topic: String, /// The raw bytes of the received message @@ -176,8 +176,8 @@ impl Debug for NetworkEvent { let pretty_key = PrettyPrintRecordKey::from(record.key.clone()); write!(f, "NetworkEvent::UnverifiedRecord({pretty_key:?})") } - NetworkEvent::Gossipsub { topic, .. } => { - write!(f, "NetworkEvent::Gossipsub({topic})") + NetworkEvent::GossipsubMsg { topic, .. } => { + write!(f, "NetworkEvent::GossipsubMsg({topic})") } } } @@ -335,7 +335,7 @@ impl SwarmDriver { libp2p::gossipsub::Event::Message { message, .. } => { let topic = message.topic.into_string(); let msg = message.data; - self.send_event(NetworkEvent::Gossipsub { topic, msg }); + self.send_event(NetworkEvent::GossipsubMsg { topic, msg }); } other => trace!("Gossipsub Event has been ignored: {other:?}"), }, diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index 71fa253988..a46bee9c05 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -227,9 +227,15 @@ impl Network { get_fees_from_store_cost_responses(all_costs) } - pub fn publish_on_topic(&self, msg: Vec) -> Result<()> { - let topic_id = libp2p::gossipsub::IdentTopic::new("example-topic"); - self.send_swarm_cmd(SwarmCmd::GossipMsg { topic_id, msg })?; + /// Subscribe to given gossipsub topic + pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> { + self.send_swarm_cmd(SwarmCmd::GossipsubSubscribe(topic_id))?; + Ok(()) + } + + /// Publish a msg on a given topic + pub fn publish_on_topic(&self, topic_id: String, msg: Vec) -> Result<()> { + self.send_swarm_cmd(SwarmCmd::GossipsubPublish { topic_id, msg })?; Ok(()) } diff --git a/sn_node/examples/safenode_rpc_client.rs b/sn_node/examples/safenode_rpc_client.rs index fccf5fd26f..0d34e51756 100644 --- a/sn_node/examples/safenode_rpc_client.rs +++ b/sn_node/examples/safenode_rpc_client.rs @@ -11,8 +11,8 @@ use eyre::Result; use libp2p::{Multiaddr, PeerId}; use safenode_proto::safe_node_client::SafeNodeClient; use safenode_proto::{ - NetworkInfoRequest, NodeEventsRequest, NodeInfoRequest, RecordAddressesRequest, RestartRequest, - StopRequest, UpdateRequest, + GossipsubPublishRequest, GossipsubSubscribeRequest, NetworkInfoRequest, NodeEventsRequest, + NodeInfoRequest, RecordAddressesRequest, RestartRequest, StopRequest, UpdateRequest, }; use sn_logging::{init_logging, LogFormat, LogOutputDest}; use sn_node::NodeEvent; @@ -49,6 +49,20 @@ enum Cmd { /// Note this blocks the app and it will print events as they are broadcasted by the node #[clap(name = "events")] Events, + /// Subscribe to a given Gossipsub topic + #[clap(name = "subscribe")] + Subscribe { + /// Name of the topic + topic: String, + }, + /// Publish a msg on a given Gossipsub topic + #[clap(name = "publish")] + Publish { + /// Name of the topic + topic: String, + /// Message to publish + msg: String, + }, /// Restart the node after the specified delay #[clap(name = "restart")] Restart { @@ -91,6 +105,8 @@ async fn main() -> Result<()> { Cmd::Info => node_info(addr).await, Cmd::Netinfo => network_info(addr).await, Cmd::Events => node_events(addr).await, + Cmd::Subscribe { topic } => gossipsub_subscribe(addr, topic).await, + Cmd::Publish { topic, msg } => gossipsub_publish(addr, topic, msg).await, Cmd::Restart { delay_millis } => node_restart(addr, delay_millis).await, Cmd::Stop { delay_millis } => node_stop(addr, delay_millis).await, Cmd::Update { delay_millis } => node_update(addr, delay_millis).await, @@ -184,6 +200,31 @@ pub async fn record_addresses(addr: SocketAddr) -> Result<()> { Ok(()) } +pub async fn gossipsub_subscribe(addr: SocketAddr, topic: String) -> Result<()> { + let endpoint = format!("https://{addr}"); + let mut client = SafeNodeClient::connect(endpoint).await?; + let _response = client + .subscribe_to_topic(Request::new(GossipsubSubscribeRequest { + topic: topic.clone(), + })) + .await?; + println!("Node successfully received the request to subscribe to topic '{topic}'"); + Ok(()) +} + +pub async fn gossipsub_publish(addr: SocketAddr, topic: String, msg: String) -> Result<()> { + let endpoint = format!("https://{addr}"); + let mut client = SafeNodeClient::connect(endpoint).await?; + let _response = client + .publish_on_topic(Request::new(GossipsubPublishRequest { + topic: topic.clone(), + msg: msg.into(), + })) + .await?; + println!("Node successfully received the request to publish on topic '{topic}'"); + Ok(()) +} + pub async fn node_restart(addr: SocketAddr, delay_millis: u64) -> Result<()> { let endpoint = format!("https://{addr}"); let mut client = SafeNodeClient::connect(endpoint).await?; diff --git a/sn_node/src/api.rs b/sn_node/src/api.rs index e9eae0c47d..4ed016eab7 100644 --- a/sn_node/src/api.rs +++ b/sn_node/src/api.rs @@ -76,6 +76,18 @@ impl RunningNode { let addresses = self.network.get_all_local_record_addresses().await?; Ok(addresses) } + + /// Subscribe to given gossipsub topic + pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> { + self.network.subscribe_to_topic(topic_id)?; + Ok(()) + } + + /// Publish a message on a given gossipsub topic + pub fn publish_on_topic(&self, topic_id: String, msg: Vec) -> Result<()> { + self.network.publish_on_topic(topic_id, msg)?; + Ok(()) + } } impl Node { @@ -225,7 +237,7 @@ impl Node { | NetworkEvent::PeerRemoved(_) | NetworkEvent::NewListenAddr(_) | NetworkEvent::NatStatusChanged(_) - | NetworkEvent::Gossipsub { .. } => break, + | NetworkEvent::GossipsubMsg { .. } => break, } } trace!("Handling NetworkEvent {event:?}"); @@ -300,9 +312,9 @@ impl Node { } } } - NetworkEvent::Gossipsub { topic, msg } => { + NetworkEvent::GossipsubMsg { topic, msg } => { self.events_channel - .broadcast(NodeEvent::Gossipsub { topic, msg }); + .broadcast(NodeEvent::GossipsubMsg { topic, msg }); } } } diff --git a/sn_node/src/bin/safenode/main.rs b/sn_node/src/bin/safenode/main.rs index 171fee282a..f1a64ab918 100644 --- a/sn_node/src/bin/safenode/main.rs +++ b/sn_node/src/bin/safenode/main.rs @@ -335,9 +335,6 @@ fn monitor_node_events(mut node_events_rx: NodeEventsReceiver, ctrl_tx: mpsc::Se break; } } - Ok(NodeEvent::Gossipsub {topic, msg}) => { - println!(">>>> GOSSIP MSG: {topic} ====> {msg:?}",); - } Ok(event) => { /* we ignore other events */ info!("Currently ignored node event {event:?}"); diff --git a/sn_node/src/bin/safenode/rpc.rs b/sn_node/src/bin/safenode/rpc.rs index 75172e936b..e636ba9b06 100644 --- a/sn_node/src/bin/safenode/rpc.rs +++ b/sn_node/src/bin/safenode/rpc.rs @@ -24,9 +24,11 @@ use tracing::{debug, info, trace}; use safenode_proto::safe_node_server::{SafeNode, SafeNodeServer}; use safenode_proto::{ - NetworkInfoRequest, NetworkInfoResponse, NodeEvent, NodeEventsRequest, NodeInfoRequest, - NodeInfoResponse, RecordAddressesRequest, RecordAddressesResponse, RestartRequest, - RestartResponse, StopRequest, StopResponse, UpdateRequest, UpdateResponse, + GossipsubPublishRequest, GossipsubPublishResponse, GossipsubSubscribeRequest, + GossipsubSubscribeResponse, NetworkInfoRequest, NetworkInfoResponse, NodeEvent, + NodeEventsRequest, NodeInfoRequest, NodeInfoResponse, RecordAddressesRequest, + RecordAddressesResponse, RestartRequest, RestartResponse, StopRequest, StopResponse, + UpdateRequest, UpdateResponse, }; // this includes code generated from .proto files @@ -153,6 +155,52 @@ impl SafeNode for SafeNodeRpcService { Ok(Response::new(RecordAddressesResponse { addresses })) } + async fn subscribe_to_topic( + &self, + request: Request, + ) -> Result, Status> { + trace!( + "RPC request received at {}: {:?}", + self.addr, + request.get_ref() + ); + + let topic = &request.get_ref().topic; + + match self.running_node.subscribe_to_topic(topic.clone()) { + Ok(()) => Ok(Response::new(GossipsubSubscribeResponse {})), + Err(err) => Err(Status::new( + Code::Internal, + format!("Failed to subscribe to topic '{topic}': {err}"), + )), + } + } + + async fn publish_on_topic( + &self, + request: Request, + ) -> Result, Status> { + trace!( + "RPC request received at {}: {:?}", + self.addr, + request.get_ref() + ); + + let topic = &request.get_ref().topic; + let msg = &request.get_ref().msg; + + match self + .running_node + .publish_on_topic(topic.clone(), msg.clone()) + { + Ok(()) => Ok(Response::new(GossipsubPublishResponse {})), + Err(err) => Err(Status::new( + Code::Internal, + format!("Failed to publish on topic '{topic}': {err}"), + )), + } + } + async fn stop(&self, request: Request) -> Result, Status> { trace!( "RPC request received at {}: {:?}", diff --git a/sn_node/src/event.rs b/sn_node/src/event.rs index b27d60d797..4c14b79b04 100644 --- a/sn_node/src/event.rs +++ b/sn_node/src/event.rs @@ -60,7 +60,7 @@ pub enum NodeEvent { /// AutoNAT discovered we are behind a NAT, thus private. BehindNat, /// Gossipsub message received - Gossipsub { + GossipsubMsg { /// Topic the message was published on topic: String, /// The raw bytes of the received message diff --git a/sn_node/src/protocol/safenode_proto/req_resp_types.proto b/sn_node/src/protocol/safenode_proto/req_resp_types.proto index d157a16b12..026ca26175 100644 --- a/sn_node/src/protocol/safenode_proto/req_resp_types.proto +++ b/sn_node/src/protocol/safenode_proto/req_resp_types.proto @@ -45,6 +45,21 @@ message RecordAddressesResponse { repeated bytes addresses = 1; } +// Subsribe to a gossipsub topic +message GossipsubSubscribeRequest { + string topic = 1; +} + +message GossipsubSubscribeResponse {} + +// Publish a msg on a gossipsub topic +message GossipsubPublishRequest { + string topic = 1; + bytes msg = 2; +} + +message GossipsubPublishResponse {} + // Stop the safenode app message StopRequest { uint64 delay_millis = 1; diff --git a/sn_node/src/protocol/safenode_proto/safenode.proto b/sn_node/src/protocol/safenode_proto/safenode.proto index 6ac49cac0a..83661751ef 100644 --- a/sn_node/src/protocol/safenode_proto/safenode.proto +++ b/sn_node/src/protocol/safenode_proto/safenode.proto @@ -34,6 +34,12 @@ service SafeNode { // Returns the Addresses of all the Records stored by this node rpc RecordAddresses (RecordAddressesRequest) returns (RecordAddressesResponse); + // Subscribe to a Gossipsub topic + rpc SubscribeToTopic (GossipsubSubscribeRequest) returns (GossipsubSubscribeResponse); + + // Publish a msg on a Gossipsub topic + rpc PublishOnTopic (GossipsubPublishRequest) returns (GossipsubPublishResponse); + // Stop the execution of this node rpc Stop (StopRequest) returns (StopResponse);