From 1a20c437b1a9e3d55543fcae0486979fe992b212 Mon Sep 17 00:00:00 2001 From: Chris O'Neil Date: Fri, 10 Nov 2023 22:11:54 +0000 Subject: [PATCH] chore: introduce sn_rpc_client Provides a client which is a wrapper around the Protobuf-generated code and types. The intention is to avoid redefining that code in different places where the RPC protocol is used. Right now it doesn't implement the full protocol; for example, the node events stream is missing. However, it should serve as a starting point, and it is being used by the node manager. The `sn_node_rpc_client` crate was extended so that it now doubles as a library and a binary, with the client being added to the library. The binary was updated to use the client, where appropriate. --- Cargo.lock | 3 + sn_node_rpc_client/Cargo.toml | 3 + sn_node_rpc_client/README.md | 47 ++++++++- sn_node_rpc_client/src/error.rs | 16 +++ sn_node_rpc_client/src/lib.rs | 172 ++++++++++++++++++++++++++++++++ sn_node_rpc_client/src/main.rs | 96 ++++++------------ 6 files changed, 271 insertions(+), 66 deletions(-) create mode 100644 sn_node_rpc_client/src/error.rs create mode 100644 sn_node_rpc_client/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 7339a50adc..41a3cc5955 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4723,17 +4723,20 @@ name = "sn_node_rpc_client" version = "0.1.42" dependencies = [ "assert_fs", + "async-trait", "blsttc", "clap 4.4.7", "color-eyre", "hex", "libp2p 0.52.4", + "libp2p-identity", "sn_client", "sn_logging", "sn_node", "sn_peers_acquisition", "sn_protocol", "sn_transfers", + "thiserror", "tokio", "tokio-stream", "tonic 0.6.2", diff --git a/sn_node_rpc_client/Cargo.toml b/sn_node_rpc_client/Cargo.toml index 4833de47bf..0929f14960 100644 --- a/sn_node_rpc_client/Cargo.toml +++ b/sn_node_rpc_client/Cargo.toml @@ -16,17 +16,20 @@ name="safenode_rpc_client" [dependencies] assert_fs = "1.0.0" +async-trait = "0.1" bls = { package = "blsttc", version = "8.0.1" } clap = { version = "4.2.1", features = ["derive"] } color-eyre = "0.6.2" hex = "~0.4.3" libp2p = { version="0.52", features = ["kad"]} +libp2p-identity = { version="0.2.7", features = ["rand"] } sn_client = { path = "../sn_client", version = "0.98.16" } sn_logging = { path = "../sn_logging", version = "0.2.15" } sn_node = { path = "../sn_node", version = "0.98.32" } sn_peers_acquisition = { path = "../sn_peers_acquisition", version = "0.1.10" } sn_protocol = { path = "../sn_protocol", version = "0.8.29" } sn_transfers = { path = "../sn_transfers", version = "0.14.21" } +thiserror = "1.0.23" # # watch out updating this, protoc compiler needs to be installed on all build systems # # arm builds + musl are very problematic tonic = { version = "0.6.2" } diff --git a/sn_node_rpc_client/README.md b/sn_node_rpc_client/README.md index eebf71d180..0496592ecc 100644 --- a/sn_node_rpc_client/README.md +++ b/sn_node_rpc_client/README.md @@ -1,7 +1,50 @@ # Safenode RPC Client -This binary provides a command line interface to interact with a running Safenode instance. -## Usage +This crate provides a client for the RPC protocol for interacting with `safenode`. It wraps the Protobuf-generated code and types such that users of the RPC protocol don't need to redefine that code. + +It also provides a binary which is a CLI for interacting with a running `safenode` instance via the protocol. + +## RPC Actions + +The `RpcActions` trait defines the protocol that is currently available for interacting with `safenode`: +``` +node_info: Returns information about the node, such as its peer ID and version. +network_info: Retrieves network-related information, such as the peers currently connected to the node. +record_addresses: Provides a list of the node's record addresses. +gossipsub_subscribe: Subscribes to a specific topic on the gossipsub network. +gossipsub_unsubscribe: Unsubscribes from a given topic on the gossipsub network. +gossipsub_publish: Publishes a message to a specified topic on the gossipsub network. +restart_node: Requests the node to restart. +stop_node: Requests the node to stop its operations. +update_node: Updates the node with provided parameters. +``` + +Users of the crate can program against the trait rather than the `RpcClient` implementation. + +This can facilitate behaviour-based unit testing, like so: +``` +use mockall::mock; +use mockall::predicate::*; + +mock! { + pub RpcClient {} + #[async_trait] + impl RpcClientInterface for RpcClient { + async fn node_info(&self) -> RpcResult; + async fn network_info(&self) -> RpcResult; + async fn record_addresses(&self) -> Result>; + async fn gossipsub_subscribe(&self, topic: &str) -> Result<()>; + async fn gossipsub_unsubscribe(&self, topic: &str) -> Result<()>; + async fn gossipsub_publish(&self, topic: &str, message: &str) -> Result<()>; + async fn node_restart(&self, delay_millis: u64) -> Result<()>; + async fn node_stop(&self, delay_millis: u64) -> Result<()>; + async fn node_update(&self, delay_millis: u64) -> Result<()>; + } +} +``` + +## Binary Usage + Run `cargo run -- ` to connect to a node. Provide the address of the node's RPC service, e.g. 127.0.0.1:12001. Followed by the command to execute. Some of the commands available are: - `info`: Retrieve information about the node itself diff --git a/sn_node_rpc_client/src/error.rs b/sn_node_rpc_client/src/error.rs new file mode 100644 index 0000000000..7f7471c6bd --- /dev/null +++ b/sn_node_rpc_client/src/error.rs @@ -0,0 +1,16 @@ +use thiserror::Error; + +pub type Result = std::result::Result; + +#[derive(Debug, Error)] +#[allow(missing_docs)] +pub enum Error { + #[error(transparent)] + MultiAddrError(#[from] libp2p::multiaddr::Error), + #[error(transparent)] + ParseError(#[from] libp2p_identity::ParseError), + #[error(transparent)] + TonicStatusError(#[from] tonic::Status), + #[error(transparent)] + TonicTransportError(#[from] tonic::transport::Error), +} diff --git a/sn_node_rpc_client/src/lib.rs b/sn_node_rpc_client/src/lib.rs new file mode 100644 index 0000000000..271018ec8b --- /dev/null +++ b/sn_node_rpc_client/src/lib.rs @@ -0,0 +1,172 @@ +mod error; + +pub use crate::error::{Error, Result}; + +use async_trait::async_trait; +use libp2p::kad::RecordKey; +use libp2p::{Multiaddr, PeerId}; +use sn_protocol::safenode_proto::{ + safe_node_client::SafeNodeClient, GossipsubPublishRequest, GossipsubSubscribeRequest, + GossipsubUnsubscribeRequest, NetworkInfoRequest, NodeInfoRequest, RecordAddressesRequest, + RestartRequest, StopRequest, UpdateRequest, +}; +use std::path::PathBuf; +use std::str::FromStr; +use std::time::Duration; +use tonic::Request; + +#[derive(Debug, Clone)] +pub struct NodeInfo { + pub pid: u32, + pub peer_id: PeerId, + pub log_path: PathBuf, + pub version: String, + pub uptime: Duration, +} + +#[derive(Debug, Clone)] +pub struct NetworkInfo { + pub connected_peers: Vec, + pub listeners: Vec, +} + +#[derive(Debug, Clone)] +pub struct RecordAddress { + pub key: RecordKey, +} + +#[async_trait] +pub trait RpcActions { + async fn node_info(&self) -> Result; + async fn network_info(&self) -> Result; + async fn record_addresses(&self) -> Result>; + async fn gossipsub_subscribe(&self, topic: &str) -> Result<()>; + async fn gossipsub_unsubscribe(&self, topic: &str) -> Result<()>; + async fn gossipsub_publish(&self, topic: &str, message: &str) -> Result<()>; + async fn node_restart(&self, delay_millis: u64) -> Result<()>; + async fn node_stop(&self, delay_millis: u64) -> Result<()>; + async fn node_update(&self, delay_millis: u64) -> Result<()>; +} + +pub struct RpcClient { + endpoint: String, +} + +impl RpcClient { + pub fn new(endpoint: &str) -> RpcClient { + RpcClient { + endpoint: endpoint.to_string(), + } + } +} + +#[async_trait] +impl RpcActions for RpcClient { + async fn node_info(&self) -> Result { + let mut client = SafeNodeClient::connect(self.endpoint.clone()).await?; + let response = client.node_info(Request::new(NodeInfoRequest {})).await?; + let node_info_resp = response.get_ref(); + let peer_id = PeerId::from_bytes(&node_info_resp.peer_id)?; + let node_info = NodeInfo { + pid: node_info_resp.pid, + peer_id, + log_path: PathBuf::from(node_info_resp.log_dir.clone()), + version: node_info_resp.bin_version.clone(), + uptime: Duration::from_secs(node_info_resp.uptime_secs), + }; + Ok(node_info) + } + + async fn network_info(&self) -> Result { + let mut client = SafeNodeClient::connect(self.endpoint.clone()).await?; + let response = client + .network_info(Request::new(NetworkInfoRequest {})) + .await?; + let network_info = response.get_ref(); + + let mut connected_peers = Vec::new(); + for bytes in network_info.connected_peers.iter() { + let peer_id = PeerId::from_bytes(bytes)?; + connected_peers.push(peer_id); + } + + let mut listeners = Vec::new(); + for multiaddr_str in network_info.listeners.iter() { + let multiaddr = Multiaddr::from_str(multiaddr_str)?; + listeners.push(multiaddr); + } + + Ok(NetworkInfo { + connected_peers, + listeners, + }) + } + + async fn record_addresses(&self) -> Result> { + let mut client = SafeNodeClient::connect(self.endpoint.clone()).await?; + let response = client + .record_addresses(Request::new(RecordAddressesRequest {})) + .await?; + let mut record_addresses = vec![]; + for bytes in response.get_ref().addresses.iter() { + let key = libp2p::kad::RecordKey::from(bytes.clone()); + record_addresses.push(RecordAddress { key }); + } + Ok(record_addresses) + } + + async fn gossipsub_subscribe(&self, topic: &str) -> Result<()> { + let mut client = SafeNodeClient::connect(self.endpoint.clone()).await?; + let _response = client + .subscribe_to_topic(Request::new(GossipsubSubscribeRequest { + topic: topic.to_string(), + })) + .await?; + Ok(()) + } + + async fn gossipsub_unsubscribe(&self, topic: &str) -> Result<()> { + let mut client = SafeNodeClient::connect(self.endpoint.clone()).await?; + let _response = client + .unsubscribe_from_topic(Request::new(GossipsubUnsubscribeRequest { + topic: topic.to_string(), + })) + .await?; + Ok(()) + } + + async fn gossipsub_publish(&self, topic: &str, msg: &str) -> Result<()> { + let mut client = SafeNodeClient::connect(self.endpoint.clone()).await?; + let _response = client + .publish_on_topic(Request::new(GossipsubPublishRequest { + topic: topic.to_string(), + msg: msg.into(), + })) + .await?; + Ok(()) + } + + async fn node_restart(&self, delay_millis: u64) -> Result<()> { + let mut client = SafeNodeClient::connect(self.endpoint.clone()).await?; + let _response = client + .restart(Request::new(RestartRequest { delay_millis })) + .await?; + Ok(()) + } + + async fn node_stop(&self, delay_millis: u64) -> Result<()> { + let mut client = SafeNodeClient::connect(self.endpoint.clone()).await?; + let _response = client + .stop(Request::new(StopRequest { delay_millis })) + .await?; + Ok(()) + } + + async fn node_update(&self, delay_millis: u64) -> Result<()> { + let mut client = SafeNodeClient::connect(self.endpoint.clone()).await?; + let _response = client + .update(Request::new(UpdateRequest { delay_millis })) + .await?; + Ok(()) + } +} diff --git a/sn_node_rpc_client/src/main.rs b/sn_node_rpc_client/src/main.rs index 3737fd6a21..71e81e5514 100644 --- a/sn_node_rpc_client/src/main.rs +++ b/sn_node_rpc_client/src/main.rs @@ -5,25 +5,25 @@ // under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. +// + +use sn_node_rpc_client::{RpcActions, RpcClient}; use assert_fs::TempDir; use bls::SecretKey; use clap::Parser; use color_eyre::eyre::{eyre, Result}; -use libp2p::{Multiaddr, PeerId}; +use libp2p::Multiaddr; use sn_client::Client; use sn_logging::LogBuilder; use sn_node::NodeEvent; use sn_peers_acquisition::{parse_peers_args, PeersArgs}; use sn_protocol::safenode_proto::{ - safe_node_client::SafeNodeClient, GossipsubPublishRequest, GossipsubSubscribeRequest, - GossipsubUnsubscribeRequest, NetworkInfoRequest, NodeEventsRequest, NodeInfoRequest, - RecordAddressesRequest, RestartRequest, StopRequest, TransferNotifsFilterRequest, - UpdateRequest, + safe_node_client::SafeNodeClient, NodeEventsRequest, TransferNotifsFilterRequest, }; use sn_protocol::storage::SpendAddress; use sn_transfers::{LocalWallet, MainPubkey, MainSecretKey}; -use std::{fs, net::SocketAddr, path::PathBuf, str::FromStr, time::Duration}; +use std::{fs, net::SocketAddr, path::PathBuf, time::Duration}; use tokio_stream::StreamExt; use tonic::Request; use tracing_core::Level; @@ -154,47 +154,37 @@ async fn main() -> Result<()> { pub async fn node_info(addr: SocketAddr) -> Result<()> { let endpoint = format!("https://{addr}"); - let mut client = SafeNodeClient::connect(endpoint.clone()).await?; - let response = client.node_info(Request::new(NodeInfoRequest {})).await?; - let node_info = response.get_ref(); - let peer_id = PeerId::from_bytes(&node_info.peer_id)?; + let client = RpcClient::new(&endpoint); + let node_info = client.node_info().await?; println!("Node info:"); println!("=========="); println!("RPC endpoint: {endpoint}"); - println!("Peer Id: {peer_id}"); - println!("Logs dir: {}", node_info.log_dir); + println!("Peer Id: {}", node_info.peer_id); + println!("Logs dir: {}", node_info.log_path.to_string_lossy()); println!("PID: {}", node_info.pid); - println!("Binary version: {}", node_info.bin_version); - println!( - "Time since last restart: {:?}", - Duration::from_secs(node_info.uptime_secs) - ); + println!("Binary version: {}", node_info.version); + println!("Time since last restart: {:?}", node_info.uptime); Ok(()) } pub async fn network_info(addr: SocketAddr) -> Result<()> { let endpoint = format!("https://{addr}"); - let mut client = SafeNodeClient::connect(endpoint).await?; - let response = client - .network_info(Request::new(NetworkInfoRequest {})) - .await?; - let network_info = response.get_ref(); + let client = RpcClient::new(&endpoint); + let network_info = client.network_info().await?; println!("Node's connections to the Network:"); println!(); println!("Connected peers:"); - for bytes in network_info.connected_peers.iter() { - let peer_id = PeerId::from_bytes(bytes)?; + for peer_id in network_info.connected_peers.iter() { println!("Peer: {peer_id}"); } println!(); println!("Node's listeners:"); for multiaddr_str in network_info.listeners.iter() { - let multiaddr = Multiaddr::from_str(multiaddr_str)?; - println!("Listener: {multiaddr}"); + println!("Listener: {multiaddr_str}"); } Ok(()) @@ -326,15 +316,12 @@ pub async fn transfers_events( pub async fn record_addresses(addr: SocketAddr) -> Result<()> { let endpoint = format!("https://{addr}"); - let mut client = SafeNodeClient::connect(endpoint).await?; - let response = client - .record_addresses(Request::new(RecordAddressesRequest {})) - .await?; + let client = RpcClient::new(&endpoint); + let record_addresses = client.record_addresses().await?; println!("Records held by the node:"); - for bytes in response.get_ref().addresses.iter() { - let key = libp2p::kad::RecordKey::from(bytes.clone()); - println!("Key: {key:?}"); + for address in record_addresses.iter() { + println!("Key: {:?}", address.key); } Ok(()) @@ -342,47 +329,32 @@ pub async fn record_addresses(addr: SocketAddr) -> Result<()> { pub async fn gossipsub_subscribe(addr: SocketAddr, topic: String) -> Result<()> { let endpoint = format!("https://{addr}"); - let mut client = SafeNodeClient::connect(endpoint).await?; - let _response = client - .subscribe_to_topic(Request::new(GossipsubSubscribeRequest { - topic: topic.clone(), - })) - .await?; + let client = RpcClient::new(&endpoint); + client.gossipsub_subscribe(&topic).await?; println!("Node successfully received the request to subscribe to topic '{topic}'"); Ok(()) } pub async fn gossipsub_unsubscribe(addr: SocketAddr, topic: String) -> Result<()> { let endpoint = format!("https://{addr}"); - let mut client = SafeNodeClient::connect(endpoint).await?; - let _response = client - .unsubscribe_from_topic(Request::new(GossipsubUnsubscribeRequest { - topic: topic.clone(), - })) - .await?; + let client = RpcClient::new(&endpoint); + client.gossipsub_unsubscribe(&topic).await?; println!("Node successfully received the request to unsubscribe from topic '{topic}'"); Ok(()) } pub async fn gossipsub_publish(addr: SocketAddr, topic: String, msg: String) -> Result<()> { let endpoint = format!("https://{addr}"); - let mut client = SafeNodeClient::connect(endpoint).await?; - let _response = client - .publish_on_topic(Request::new(GossipsubPublishRequest { - topic: topic.clone(), - msg: msg.into(), - })) - .await?; + let client = RpcClient::new(&endpoint); + client.gossipsub_publish(&topic, &msg).await?; println!("Node successfully received the request to publish on topic '{topic}'"); Ok(()) } pub async fn node_restart(addr: SocketAddr, delay_millis: u64) -> Result<()> { let endpoint = format!("https://{addr}"); - let mut client = SafeNodeClient::connect(endpoint).await?; - let _response = client - .restart(Request::new(RestartRequest { delay_millis })) - .await?; + let client = RpcClient::new(&endpoint); + client.node_restart(delay_millis).await?; println!( "Node successfully received the request to restart in {:?}", Duration::from_millis(delay_millis) @@ -392,10 +364,8 @@ pub async fn node_restart(addr: SocketAddr, delay_millis: u64) -> Result<()> { pub async fn node_stop(addr: SocketAddr, delay_millis: u64) -> Result<()> { let endpoint = format!("https://{addr}"); - let mut client = SafeNodeClient::connect(endpoint).await?; - let _response = client - .stop(Request::new(StopRequest { delay_millis })) - .await?; + let client = RpcClient::new(&endpoint); + client.node_stop(delay_millis).await?; println!( "Node successfully received the request to stop in {:?}", Duration::from_millis(delay_millis) @@ -405,10 +375,8 @@ pub async fn node_stop(addr: SocketAddr, delay_millis: u64) -> Result<()> { pub async fn node_update(addr: SocketAddr, delay_millis: u64) -> Result<()> { let endpoint = format!("https://{addr}"); - let mut client = SafeNodeClient::connect(endpoint).await?; - let _response = client - .update(Request::new(UpdateRequest { delay_millis })) - .await?; + let client = RpcClient::new(&endpoint); + client.node_update(delay_millis).await?; println!( "Node successfully received the request to try to update in {:?}", Duration::from_millis(delay_millis)