From e9250cdc3c5cb1e5501bd5a7e6ff22d554ebee77 Mon Sep 17 00:00:00 2001 From: Ash Manning Date: Mon, 1 Jul 2024 16:08:33 +0800 Subject: [PATCH] change default ZMQ port, bump version for release --- Cargo.lock | 8 ++++---- Cargo.toml | 2 +- app/cli.rs | 3 ++- app/rpc_server.rs | 7 ++++++- lib/node/mod.rs | 5 ++++- lib/node/net_task.rs | 30 ++++++++++++++---------------- 6 files changed, 31 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e1d3f7..1142480 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3839,7 +3839,7 @@ checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" [[package]] name = "plain_bitassets" -version = "0.9.0" +version = "0.9.1" dependencies = [ "addr", "anyhow", @@ -3888,7 +3888,7 @@ dependencies = [ [[package]] name = "plain_bitassets_app" -version = "0.9.0" +version = "0.9.1" dependencies = [ "anyhow", "async_zmq", @@ -3932,7 +3932,7 @@ dependencies = [ [[package]] name = "plain_bitassets_app_cli" -version = "0.9.0" +version = "0.9.1" dependencies = [ "anyhow", "bip300301", @@ -3947,7 +3947,7 @@ dependencies = [ [[package]] name = "plain_bitassets_app_rpc_api" -version = "0.9.0" +version = "0.9.1" dependencies = [ "bip300301", "fraction", diff --git a/Cargo.toml b/Cargo.toml index 2a89eb0..7a63cf4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ members = [ [workspace.package] authors = [ "Ash Manning " ] edition = "2021" -version = "0.9.0" +version = "0.9.1" [workspace.dependencies.bip300301] git = "https://github.com/Ash-L2L/bip300301.git" diff --git a/app/cli.rs b/app/cli.rs index 2446a46..9388364 100644 --- a/app/cli.rs +++ b/app/cli.rs @@ -36,7 +36,8 @@ const DEFAULT_RPC_ADDR: SocketAddr = ipv4_socket_addr([127, 0, 0, 1], 6000 + THIS_SIDECHAIN as u16); #[cfg(all(not(target_os = "windows"), feature = "zmq"))] -const DEFAULT_ZMQ_ADDR: SocketAddr = ipv4_socket_addr([127, 0, 0, 1], 28332); +const DEFAULT_ZMQ_ADDR: SocketAddr = + ipv4_socket_addr([127, 0, 0, 1], 28000 + THIS_SIDECHAIN as u16); /// Implement arg manually so that there is only a default if we can resolve /// the default data dir diff --git a/app/rpc_server.rs b/app/rpc_server.rs index 77157c0..b9d9a1a 100644 --- a/app/rpc_server.rs +++ b/app/rpc_server.rs @@ -31,15 +31,20 @@ fn custom_err(err_msg: impl Into) -> ErrorObject<'static> { } fn convert_app_err(err: app::Error) -> ErrorObject<'static> { - tracing::error!("{err}"); + let err = anyhow::anyhow!(err); + tracing::error!("{err:#}"); custom_err(err.to_string()) } fn convert_node_err(err: node::Error) -> ErrorObject<'static> { + let err = anyhow::anyhow!(err); + tracing::error!("{err:#}"); custom_err(err.to_string()) } fn convert_wallet_err(err: wallet::Error) -> ErrorObject<'static> { + let err = anyhow::anyhow!(err); + tracing::error!("{err:#}"); custom_err(err.to_string()) } diff --git a/lib/node/mod.rs b/lib/node/mod.rs index d9b2def..a4e8ce8 100644 --- a/lib/node/mod.rs +++ b/lib/node/mod.rs @@ -75,6 +75,9 @@ pub enum Error { Utreexo(String), #[error("Verify BMM error")] VerifyBmm(anyhow::Error), + #[cfg(all(not(target_os = "windows"), feature = "zmq"))] + #[error("ZMQ error")] + Zmq(#[from] async_zmq::Error), } /// Request any missing two way peg data up to the specified block hash. @@ -180,7 +183,7 @@ impl Node { }; let state = State::new(&env)?; #[cfg(all(not(target_os = "windows"), feature = "zmq"))] - let zmq_pub_handler = Arc::new(ZmqPubHandler::new(zmq_addr)); + let zmq_pub_handler = Arc::new(ZmqPubHandler::new(zmq_addr)?); let archive = Archive::new(&env)?; let mempool = MemPool::new(&env)?; let drivechain = bip300301::Drivechain::new( diff --git a/lib/node/net_task.rs b/lib/node/net_task.rs index 73b5315..1d6ce1d 100644 --- a/lib/node/net_task.rs +++ b/lib/node/net_task.rs @@ -13,7 +13,7 @@ use futures::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, oneshot, }, - stream, StreamExt, + stream, StreamExt, TryFutureExt, }; use heed::RwTxn; use thiserror::Error; @@ -73,24 +73,22 @@ pub(super) struct ZmqPubHandler { #[cfg(all(not(target_os = "windows"), feature = "zmq"))] impl ZmqPubHandler { // run the handler, obtaining a sender sink and the handler task - pub fn new(socket_addr: SocketAddr) -> Self { - use futures::SinkExt; - let (tx, mut rx) = mpsc::unbounded::>(); - let handle = tokio::task::spawn(async move { - let mut zmq_pub = - async_zmq::publish(&format!("tcp://{socket_addr}")) - .unwrap() - .bind() - .unwrap(); - - while let Some(msgs) = rx.next().await { - let () = zmq_pub.send(msgs.into()).await.unwrap(); - } + pub fn new(socket_addr: SocketAddr) -> Result { + let (tx, rx) = mpsc::unbounded::>(); + let zmq_pub = + async_zmq::publish(&format!("tcp://{socket_addr}"))?.bind()?; + let handle = tokio::task::spawn({ + rx.map(|msgs| Ok(msgs.into())) + .forward(zmq_pub) + .unwrap_or_else(|err| { + let err = anyhow::Error::from(err); + tracing::error!("{err:#}"); + }) }); - Self { + Ok(Self { tx, _handle: handle, - } + }) } }