Skip to content

Commit

Permalink
feat: special event for transfer notifs over gossipsub
Browse files Browse the repository at this point in the history
  • Loading branch information
bochaco committed Oct 4, 2023
1 parent fba46f0 commit 3ce2289
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 7 deletions.
28 changes: 25 additions & 3 deletions sn_node/examples/safenode_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ enum Cmd {
/// Note this blocks the app and it will print events as they are broadcasted by the node
#[clap(name = "events")]
Events,
/// Start listening for node rewards events.
/// Note this blocks the app and it will print events as they are broadcasted by the node
#[clap(name = "rewards")]
RewardsEvents,
/// Subscribe to a given Gossipsub topic
#[clap(name = "subscribe")]
Subscribe {
Expand Down Expand Up @@ -111,7 +115,8 @@ async fn main() -> Result<()> {
match opt.cmd {
Cmd::Info => node_info(addr).await,
Cmd::Netinfo => network_info(addr).await,
Cmd::Events => node_events(addr).await,
Cmd::Events => node_events(addr, false).await,
Cmd::RewardsEvents => node_events(addr, true).await,
Cmd::Subscribe { topic } => gossipsub_subscribe(addr, topic).await,
Cmd::Unsubscribe { topic } => gossipsub_unsubscribe(addr, topic).await,
Cmd::Publish { topic, msg } => gossipsub_publish(addr, topic, msg).await,
Expand Down Expand Up @@ -169,17 +174,34 @@ pub async fn network_info(addr: SocketAddr) -> Result<()> {
Ok(())
}

pub async fn node_events(addr: SocketAddr) -> Result<()> {
pub async fn node_events(addr: SocketAddr, only_rewards: bool) -> Result<()> {
let endpoint = format!("https://{addr}");
let mut client = SafeNodeClient::connect(endpoint).await?;
let response = client
.node_events(Request::new(NodeEventsRequest {}))
.await?;

println!("Listening to node events... (press Ctrl+C to exit)");
if only_rewards {
println!("Listening to node rewards notifications... (press Ctrl+C to exit)");
} else {
println!("Listening to node events... (press Ctrl+C to exit)");
}

let mut stream = response.into_inner();
while let Some(Ok(e)) = stream.next().await {
let event = match NodeEvent::from_bytes(&e.event) {
Ok(NodeEvent::TransferNotif { key, transfer }) if only_rewards => {
println!(
"New node reward notification received: {key:?} {}",
transfer.to_hex()?
);
continue;
}
Ok(_) if only_rewards => continue,
Ok(NodeEvent::GossipsubMsg { topic, msg }) => {
println!("New event received: {topic} - {}", String::from_utf8(msg)?);
continue;
}
Ok(event) => event,
Err(_) => {
println!("Error while parsing received NodeEvent");
Expand Down
26 changes: 23 additions & 3 deletions sn_node/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ use std::{
};
use tokio::task::spawn;

/// Expected prefix string in a topic name where transfers notifications are sent on.
/// E.g. transfer notifications for key `pub-key-A` will be sent on a topic named `TRANSFER_NOTIF_pub-key-A`.
/// The notification msg is expected to be encrypted against the referencced public key.
const TRANSFER_NOTIF_TOPIC_PREFIX: &str = "TRANSFER_NOTIF_";

/// Once a node is started and running, the user obtains
/// a `NodeRunning` object which can be used to interact with it.
#[derive(Clone)]
Expand Down Expand Up @@ -325,9 +330,24 @@ impl Node {
error!("Failed to remove local record: {e:?}");
}
}
NetworkEvent::GossipsubMsg { topic, msg } => {
self.events_channel
.broadcast(NodeEvent::GossipsubMsg { topic, msg });
NetworkEvent::GossipsubMsg { mut topic, msg } => {
info!(
">>> New Gossipsub msg received on topic '{topic}': {}",
String::from_utf8(msg.clone()).unwrap_or_else(|m| format!("{m:?}"))
);
if topic.starts_with(TRANSFER_NOTIF_TOPIC_PREFIX) {
// this is a notification of a transfer we are subscribed to
let key_hex = topic.split_off(TRANSFER_NOTIF_TOPIC_PREFIX.len());
let key_str = String::from_utf8(hex::decode(key_hex).unwrap()).unwrap();
let key = bls::PublicKey::from_hex(&key_str).unwrap();
let transfer =
sn_transfers::Transfer::from_hex(&String::from_utf8(msg).unwrap()).unwrap();
self.events_channel
.broadcast(NodeEvent::TransferNotif { key, transfer });
} else {
self.events_channel
.broadcast(NodeEvent::GossipsubMsg { topic, msg });
}
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion sn_node/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::error::{Error, Result};
use bls::PublicKey;
use serde::{Deserialize, Serialize};
use sn_protocol::storage::{ChunkAddress, RegisterAddress};
use sn_transfers::UniquePubkey;
use sn_transfers::{Transfer, UniquePubkey};
use tokio::sync::broadcast;

const NODE_EVENT_CHANNEL_SIZE: usize = 10_000;
Expand Down Expand Up @@ -66,6 +67,13 @@ pub enum NodeEvent {
/// The raw bytes of the received message
msg: Vec<u8>,
},
/// Transfer notification message received for a public key
TransferNotif {
/// Public key the transfer notification is about
key: PublicKey,
/// The encrypted transfer
transfer: Transfer,
},
}

impl NodeEvent {
Expand Down

0 comments on commit 3ce2289

Please sign in to comment.