diff --git a/sn_node/examples/safenode_rpc_client.rs b/sn_node/examples/safenode_rpc_client.rs index 8ba2683cbb..5536d0103c 100644 --- a/sn_node/examples/safenode_rpc_client.rs +++ b/sn_node/examples/safenode_rpc_client.rs @@ -50,6 +50,10 @@ enum Cmd { /// Note this blocks the app and it will print events as they are broadcasted by the node #[clap(name = "events")] Events, + /// Start listening for node rewards events. + /// Note this blocks the app and it will print events as they are broadcasted by the node + #[clap(name = "rewards")] + RewardsEvents, /// Subscribe to a given Gossipsub topic #[clap(name = "subscribe")] Subscribe { @@ -111,7 +115,8 @@ async fn main() -> Result<()> { match opt.cmd { Cmd::Info => node_info(addr).await, Cmd::Netinfo => network_info(addr).await, - Cmd::Events => node_events(addr).await, + Cmd::Events => node_events(addr, false).await, + Cmd::RewardsEvents => node_events(addr, true).await, Cmd::Subscribe { topic } => gossipsub_subscribe(addr, topic).await, Cmd::Unsubscribe { topic } => gossipsub_unsubscribe(addr, topic).await, Cmd::Publish { topic, msg } => gossipsub_publish(addr, topic, msg).await, @@ -169,17 +174,34 @@ pub async fn network_info(addr: SocketAddr) -> Result<()> { Ok(()) } -pub async fn node_events(addr: SocketAddr) -> Result<()> { +pub async fn node_events(addr: SocketAddr, only_rewards: bool) -> Result<()> { let endpoint = format!("https://{addr}"); let mut client = SafeNodeClient::connect(endpoint).await?; let response = client .node_events(Request::new(NodeEventsRequest {})) .await?; - println!("Listening to node events... (press Ctrl+C to exit)"); + if only_rewards { + println!("Listening to node rewards notifications... (press Ctrl+C to exit)"); + } else { + println!("Listening to node events... (press Ctrl+C to exit)"); + } + let mut stream = response.into_inner(); while let Some(Ok(e)) = stream.next().await { let event = match NodeEvent::from_bytes(&e.event) { + Ok(NodeEvent::TransferNotif { key, transfer }) if only_rewards => { + println!( + "New node reward notification received: {key:?} {}", + transfer.to_hex()? + ); + continue; + } + Ok(_) if only_rewards => continue, + Ok(NodeEvent::GossipsubMsg { topic, msg }) => { + println!("New event received: {topic} - {}", String::from_utf8(msg)?); + continue; + } Ok(event) => event, Err(_) => { println!("Error while parsing received NodeEvent"); diff --git a/sn_node/src/api.rs b/sn_node/src/api.rs index 83e2f23f8a..2a4edfccc0 100644 --- a/sn_node/src/api.rs +++ b/sn_node/src/api.rs @@ -35,6 +35,11 @@ use std::{ }; use tokio::task::spawn; +/// Expected prefix string in a topic name where transfers notifications are sent on. +/// E.g. transfer notifications for key `pub-key-A` will be sent on a topic named `TRANSFER_NOTIF_pub-key-A`. +/// The notification msg is expected to be encrypted against the referencced public key. +const TRANSFER_NOTIF_TOPIC_PREFIX: &str = "TRANSFER_NOTIF_"; + /// Once a node is started and running, the user obtains /// a `NodeRunning` object which can be used to interact with it. #[derive(Clone)] @@ -325,9 +330,24 @@ impl Node { error!("Failed to remove local record: {e:?}"); } } - NetworkEvent::GossipsubMsg { topic, msg } => { - self.events_channel - .broadcast(NodeEvent::GossipsubMsg { topic, msg }); + NetworkEvent::GossipsubMsg { mut topic, msg } => { + info!( + ">>> New Gossipsub msg received on topic '{topic}': {}", + String::from_utf8(msg.clone()).unwrap_or_else(|m| format!("{m:?}")) + ); + if topic.starts_with(TRANSFER_NOTIF_TOPIC_PREFIX) { + // this is a notification of a transfer we are subscribed to + let key_hex = topic.split_off(TRANSFER_NOTIF_TOPIC_PREFIX.len()); + let key_str = String::from_utf8(hex::decode(key_hex).unwrap()).unwrap(); + let key = bls::PublicKey::from_hex(&key_str).unwrap(); + let transfer = + sn_transfers::Transfer::from_hex(&String::from_utf8(msg).unwrap()).unwrap(); + self.events_channel + .broadcast(NodeEvent::TransferNotif { key, transfer }); + } else { + self.events_channel + .broadcast(NodeEvent::GossipsubMsg { topic, msg }); + } } } } diff --git a/sn_node/src/event.rs b/sn_node/src/event.rs index 4c14b79b04..9015786dd4 100644 --- a/sn_node/src/event.rs +++ b/sn_node/src/event.rs @@ -7,9 +7,10 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::error::{Error, Result}; +use bls::PublicKey; use serde::{Deserialize, Serialize}; use sn_protocol::storage::{ChunkAddress, RegisterAddress}; -use sn_transfers::UniquePubkey; +use sn_transfers::{Transfer, UniquePubkey}; use tokio::sync::broadcast; const NODE_EVENT_CHANNEL_SIZE: usize = 10_000; @@ -66,6 +67,13 @@ pub enum NodeEvent { /// The raw bytes of the received message msg: Vec, }, + /// Transfer notification message received for a public key + TransferNotif { + /// Public key the transfer notification is about + key: PublicKey, + /// The encrypted transfer + transfer: Transfer, + }, } impl NodeEvent {