Skip to content

Commit

Permalink
change default ZMQ port, bump version for release
Browse files Browse the repository at this point in the history
  • Loading branch information
Ash-L2L committed Jul 1, 2024
1 parent 4014b0d commit e9250cd
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 24 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ members = [
[workspace.package]
authors = [ "Ash Manning <[email protected]>" ]
edition = "2021"
version = "0.9.0"
version = "0.9.1"

[workspace.dependencies.bip300301]
git = "https://github.com/Ash-L2L/bip300301.git"
Expand Down
3 changes: 2 additions & 1 deletion app/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ const DEFAULT_RPC_ADDR: SocketAddr =
ipv4_socket_addr([127, 0, 0, 1], 6000 + THIS_SIDECHAIN as u16);

#[cfg(all(not(target_os = "windows"), feature = "zmq"))]
const DEFAULT_ZMQ_ADDR: SocketAddr = ipv4_socket_addr([127, 0, 0, 1], 28332);
const DEFAULT_ZMQ_ADDR: SocketAddr =
ipv4_socket_addr([127, 0, 0, 1], 28000 + THIS_SIDECHAIN as u16);

/// Implement arg manually so that there is only a default if we can resolve
/// the default data dir
Expand Down
7 changes: 6 additions & 1 deletion app/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,20 @@ fn custom_err(err_msg: impl Into<String>) -> ErrorObject<'static> {
}

fn convert_app_err(err: app::Error) -> ErrorObject<'static> {
tracing::error!("{err}");
let err = anyhow::anyhow!(err);
tracing::error!("{err:#}");
custom_err(err.to_string())
}

fn convert_node_err(err: node::Error) -> ErrorObject<'static> {
let err = anyhow::anyhow!(err);
tracing::error!("{err:#}");
custom_err(err.to_string())
}

fn convert_wallet_err(err: wallet::Error) -> ErrorObject<'static> {
let err = anyhow::anyhow!(err);
tracing::error!("{err:#}");
custom_err(err.to_string())
}

Expand Down
5 changes: 4 additions & 1 deletion lib/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub enum Error {
Utreexo(String),
#[error("Verify BMM error")]
VerifyBmm(anyhow::Error),
#[cfg(all(not(target_os = "windows"), feature = "zmq"))]
#[error("ZMQ error")]
Zmq(#[from] async_zmq::Error),
}

/// Request any missing two way peg data up to the specified block hash.
Expand Down Expand Up @@ -180,7 +183,7 @@ impl Node {
};
let state = State::new(&env)?;
#[cfg(all(not(target_os = "windows"), feature = "zmq"))]
let zmq_pub_handler = Arc::new(ZmqPubHandler::new(zmq_addr));
let zmq_pub_handler = Arc::new(ZmqPubHandler::new(zmq_addr)?);
let archive = Archive::new(&env)?;
let mempool = MemPool::new(&env)?;
let drivechain = bip300301::Drivechain::new(
Expand Down
30 changes: 14 additions & 16 deletions lib/node/net_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use futures::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
oneshot,
},
stream, StreamExt,
stream, StreamExt, TryFutureExt,
};
use heed::RwTxn;
use thiserror::Error;
Expand Down Expand Up @@ -73,24 +73,22 @@ pub(super) struct ZmqPubHandler {
#[cfg(all(not(target_os = "windows"), feature = "zmq"))]
impl ZmqPubHandler {
// run the handler, obtaining a sender sink and the handler task
pub fn new(socket_addr: SocketAddr) -> Self {
use futures::SinkExt;
let (tx, mut rx) = mpsc::unbounded::<Vec<async_zmq::Message>>();
let handle = tokio::task::spawn(async move {
let mut zmq_pub =
async_zmq::publish(&format!("tcp://{socket_addr}"))
.unwrap()
.bind()
.unwrap();

while let Some(msgs) = rx.next().await {
let () = zmq_pub.send(msgs.into()).await.unwrap();
}
pub fn new(socket_addr: SocketAddr) -> Result<Self, async_zmq::Error> {
let (tx, rx) = mpsc::unbounded::<Vec<async_zmq::Message>>();
let zmq_pub =
async_zmq::publish(&format!("tcp://{socket_addr}"))?.bind()?;
let handle = tokio::task::spawn({
rx.map(|msgs| Ok(msgs.into()))
.forward(zmq_pub)
.unwrap_or_else(|err| {
let err = anyhow::Error::from(err);
tracing::error!("{err:#}");
})
});
Self {
Ok(Self {
tx,
_handle: handle,
}
})
}
}

Expand Down

0 comments on commit e9250cd

Please sign in to comment.