Skip to content

Commit

Permalink
feat(apis): adding client and node APIs, as well as safenode RPC serv…
Browse files Browse the repository at this point in the history
…ices to pub/sub to gossipsub topics
  • Loading branch information
bochaco committed Sep 21, 2023
1 parent 00f066d commit a0d4f29
Show file tree
Hide file tree
Showing 18 changed files with 279 additions and 59 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions sn_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
cli::Opt,
subcommands::{
files::files_cmds,
gossipsub::gossipsub_cmds,
register::register_cmds,
wallet::{wallet_cmds, wallet_cmds_without_client, WalletCmds},
SubCmd,
Expand Down Expand Up @@ -109,6 +110,7 @@ async fn main() -> Result<()> {
SubCmd::Register(cmds) => {
register_cmds(cmds, &client, &client_data_dir_path, should_verify_store).await?
}
SubCmd::Gossipsub(cmds) => gossipsub_cmds(cmds, &client).await?,
};

Ok(())
Expand Down
51 changes: 51 additions & 0 deletions sn_cli/src/subcommands/gossipsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2023 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use clap::Subcommand;
use color_eyre::Result;
use sn_client::{Client, ClientEvent};

#[derive(Subcommand, Debug)]
pub enum GossipsubCmds {
/// Subscribe to a topic and listen for messages published on it
Subscribe {
/// The name of the topic.
#[clap(name = "topic")]
topic: String,
},
/// Publish a message on a given topic
Publish {
/// The name of the topic.
#[clap(name = "topic")]
topic: String,
/// The message to publish.
#[clap(name = "msg")]
msg: String,
},
}

pub(crate) async fn gossipsub_cmds(cmds: GossipsubCmds, client: &Client) -> Result<()> {
match cmds {
GossipsubCmds::Subscribe { topic } => {
client.subscribe_to_topic(topic.clone())?;
println!("Subscribed to topic '{topic}'. Listening for messages published on it...");
let mut events_channel = client.events_channel();
while let Ok(event) = events_channel.recv().await {
if let ClientEvent::GossipsubMsg { msg, .. } = event {
let msg = String::from_utf8(msg)?;
println!("New message published: {msg}");
}
}
}
GossipsubCmds::Publish { topic, msg } => {
client.publish_on_topic(topic.clone(), msg.into())?;
println!("Message published on topic '{topic}'.");
}
}
Ok(())
}
4 changes: 4 additions & 0 deletions sn_cli/src/subcommands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.
pub(crate) mod files;
pub(crate) mod gossipsub;
pub(crate) mod register;
pub(crate) mod wallet;

Expand All @@ -22,4 +23,7 @@ pub(super) enum SubCmd {
#[clap(name = "register", subcommand)]
/// Commands for register management
Register(register::RegisterCmds),
#[clap(name = "gossipsub", subcommand)]
/// Commands for gossipsub management
Gossipsub(gossipsub::GossipsubCmds),
}
72 changes: 47 additions & 25 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl Client {

continue;
}
Ok(ClientEvent::GossipsubMsg { .. }) => {}
Err(err) => {
error!("Unexpected error during client startup {err:?}");
println!("Unexpected error during client startup {err:?}");
Expand Down Expand Up @@ -204,36 +205,43 @@ impl Client {
}

fn handle_network_event(&mut self, event: NetworkEvent) -> Result<()> {
if let NetworkEvent::PeerAdded(peer_id) = event {
self.peers_added += 1;
debug!("PeerAdded: {peer_id}");

// In case client running in non-local-discovery mode,
// it may take some time to fill up the RT.
// To avoid such delay may fail the query with RecordNotFound,
// wait till certain amount of peers populated into RT
if self.peers_added >= CLOSE_GROUP_SIZE {
if let Some(progress) = &self.progress {
progress.finish_with_message("Connected to the Network");
// Remove the progress bar
self.progress = None;
}
match event {
NetworkEvent::PeerAdded(peer_id) => {
self.peers_added += 1;
debug!("PeerAdded: {peer_id}");

// In case client running in non-local-discovery mode,
// it may take some time to fill up the RT.
// To avoid such delay may fail the query with RecordNotFound,
// wait till certain amount of peers populated into RT
if self.peers_added >= CLOSE_GROUP_SIZE {
if let Some(progress) = &self.progress {
progress.finish_with_message("Connected to the Network");
// Remove the progress bar
self.progress = None;
}

self.events_channel
.broadcast(ClientEvent::ConnectedToNetwork)?;
} else {
debug!(
"{}/{} initial peers found.",
self.peers_added, CLOSE_GROUP_SIZE
);

if let Some(progress) = &self.progress {
progress.set_message(format!(
self.events_channel
.broadcast(ClientEvent::ConnectedToNetwork)?;
} else {
debug!(
"{}/{} initial peers found.",
self.peers_added, CLOSE_GROUP_SIZE
));
);

if let Some(progress) = &self.progress {
progress.set_message(format!(
"{}/{} initial peers found.",
self.peers_added, CLOSE_GROUP_SIZE
));
}
}
}
NetworkEvent::GossipsubMsg { topic, msg } => {
self.events_channel
.broadcast(ClientEvent::GossipsubMsg { topic, msg })?;
}
_other => {}
}

Ok(())
Expand Down Expand Up @@ -491,4 +499,18 @@ impl Client {

Ok(adjusted_costs)
}

/// Subscribe to given gossipsub topic
pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> {
info!("Subscribing to topic id: {topic_id}");
self.network.subscribe_to_topic(topic_id)?;
Ok(())
}

/// Publish message on given topic
pub fn publish_on_topic(&self, topic_id: String, msg: Vec<u8>) -> Result<()> {
info!("Publishing msg on topic id: {topic_id}");
self.network.publish_on_topic(topic_id, msg)?;
Ok(())
}
}
7 changes: 7 additions & 0 deletions sn_client/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ pub enum ClientEvent {
/// No network activity has been received for a given duration
/// we should error out
InactiveClient(std::time::Duration),
/// Gossipsub message received on a topic the client has subscribed to
GossipsubMsg {
/// Topic the message was published on
topic: String,
/// The raw bytes of the received message
msg: Vec<u8>,
},
}

/// Receiver Channel where users of the public API can listen to events broadcasted by the client.
Expand Down
16 changes: 11 additions & 5 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,12 @@ pub enum SwarmCmd {
AddKeysToReplicationFetcher {
keys: Vec<NetworkAddress>,
},
/// Subscribe to a given Gossipsub topic
GossipsubSubscribe(String),
/// Publish a message through Gossipsub protocol
GossipMsg {
GossipsubPublish {
/// Topic to publish on
topic_id: libp2p::gossipsub::IdentTopic,
topic_id: String,
/// Raw bytes of the message to publish
msg: Vec<u8>,
},
Expand Down Expand Up @@ -359,12 +361,16 @@ impl SwarmDriver {
.send(current_state)
.map_err(|_| Error::InternalMsgChannelDropped)?;
}
SwarmCmd::GossipMsg { topic_id, msg } => {
SwarmCmd::GossipsubSubscribe(topic_id) => {
let topic_id = libp2p::gossipsub::IdentTopic::new(topic_id);
self.swarm.behaviour_mut().gossipsub.subscribe(&topic_id)?;
}
SwarmCmd::GossipsubPublish { topic_id, msg } => {
let topic_id = libp2p::gossipsub::IdentTopic::new(topic_id);
self.swarm
.behaviour_mut()
.gossipsub
.publish(topic_id, msg)
.unwrap();
.publish(topic_id, msg)?;
}
}

Expand Down
10 changes: 3 additions & 7 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,9 @@ impl NetworkBuilder {
libp2p::gossipsub::MessageAuthenticity::Signed(self.keypair.clone());

// build a gossipsub network behaviour
let mut gossipsub: libp2p::gossipsub::Behaviour =
libp2p::gossipsub::Behaviour::new(message_authenticity, gossipsub_config).unwrap();

// Create a Gossipsub topic
let topic = libp2p::gossipsub::IdentTopic::new("example-topic");
// subscribe to the topic
gossipsub.subscribe(&topic).unwrap();
let gossipsub: libp2p::gossipsub::Behaviour =
libp2p::gossipsub::Behaviour::new(message_authenticity, gossipsub_config)
.expect("Failed to instantiate Gossipsub behaviour.");

if !self.local {
debug!("Preventing non-global dials");
Expand Down
7 changes: 7 additions & 0 deletions sn_networking/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use libp2p::{
gossipsub::{PublishError, SubscriptionError},
kad::{self, Record},
request_response::{OutboundFailure, RequestId},
swarm::DialError,
Expand Down Expand Up @@ -103,6 +104,12 @@ pub enum Error {
#[cfg(feature = "open-metrics")]
#[error("Network Metric error")]
NetworkMetricError,

#[error("Gossipsub publish Error: {0}")]
GossipsubPublishError(#[from] PublishError),

#[error("Gossipsub subscribe Error: {0}")]
GossipsubSubscriptionError(#[from] SubscriptionError),
}

#[cfg(test)]
Expand Down
8 changes: 4 additions & 4 deletions sn_networking/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub enum NetworkEvent {
/// Report unverified record
UnverifiedRecord(Record),
/// Gossipsub message received
Gossipsub {
GossipsubMsg {
/// Topic the message was published on
topic: String,
/// The raw bytes of the received message
Expand Down Expand Up @@ -176,8 +176,8 @@ impl Debug for NetworkEvent {
let pretty_key = PrettyPrintRecordKey::from(record.key.clone());
write!(f, "NetworkEvent::UnverifiedRecord({pretty_key:?})")
}
NetworkEvent::Gossipsub { topic, .. } => {
write!(f, "NetworkEvent::Gossipsub({topic})")
NetworkEvent::GossipsubMsg { topic, .. } => {
write!(f, "NetworkEvent::GossipsubMsg({topic})")
}
}
}
Expand Down Expand Up @@ -335,7 +335,7 @@ impl SwarmDriver {
libp2p::gossipsub::Event::Message { message, .. } => {
let topic = message.topic.into_string();
let msg = message.data;
self.send_event(NetworkEvent::Gossipsub { topic, msg });
self.send_event(NetworkEvent::GossipsubMsg { topic, msg });
}
other => trace!("Gossipsub Event has been ignored: {other:?}"),
},
Expand Down
12 changes: 9 additions & 3 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,15 @@ impl Network {
get_fees_from_store_cost_responses(all_costs)
}

pub fn publish_on_topic(&self, msg: Vec<u8>) -> Result<()> {
let topic_id = libp2p::gossipsub::IdentTopic::new("example-topic");
self.send_swarm_cmd(SwarmCmd::GossipMsg { topic_id, msg })?;
/// Subscribe to given gossipsub topic
pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::GossipsubSubscribe(topic_id))?;
Ok(())
}

/// Publish a msg on a given topic
pub fn publish_on_topic(&self, topic_id: String, msg: Vec<u8>) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::GossipsubPublish { topic_id, msg })?;
Ok(())
}

Expand Down
Loading

0 comments on commit a0d4f29

Please sign in to comment.