Optimize code and fix some subtle bugs
al8n committed Apr 13, 2024
1 parent d70770e commit fe626a8
Showing 29 changed files with 490 additions and 318 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/coverage.yml
@@ -137,7 +137,9 @@ jobs:
- name: List contents of the reports directory
run: ls -a reports
- name: Upload to codecov.io
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
with:
directory: reports
fail_ci_if_error: false
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
5 changes: 3 additions & 2 deletions Cargo.toml
@@ -12,7 +12,7 @@ members = [
resolver = "2"

[workspace.package]
version = "0.2.0-alpha.2"
version = "0.2.0"
edition = "2021"
license = "MPL-2.0"
repository = "https://github.com/al8n/memberlist"
@@ -28,8 +28,9 @@ rustdoc-args = ["--cfg", "docsrs"]

[workspace.dependencies]
auto_impl = "1"
atomic_refcell = "0.1"
agnostic-lite = { version = "0.3", features = ["time"] }
agnostic = "0.3"
agnostic = "0.3.5"
async-lock = "3"
async-channel = "2"
bytes = "1.5"
1 change: 1 addition & 0 deletions core/Cargo.toml
@@ -46,6 +46,7 @@ hostname = "0.3"

[dependencies]
auto_impl.workspace = true
atomic_refcell.workspace = true
agnostic-lite.workspace = true
async-channel.workspace = true
async-lock.workspace = true
30 changes: 14 additions & 16 deletions core/src/api.rs
@@ -32,10 +32,22 @@ where

/// Returns the local node address
#[inline]
pub fn local_addr(&self) -> &<T::Resolver as AddressResolver>::Address {
pub fn local_address(&self) -> &<T::Resolver as AddressResolver>::Address {
self.inner.transport.local_address()
}

/// Returns a [`Node`] with the local id and the advertise address of local node.
#[inline]
pub fn advertise_node(&self) -> Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress> {
Node::new(self.inner.id.clone(), self.inner.advertise.clone())
}

/// Returns the advertise address of local node.
#[inline]
pub fn advertise_address(&self) -> &<T::Resolver as AddressResolver>::ResolvedAddress {
&self.inner.advertise
}

/// Returns the keyring (only used for encryption) of the node
#[cfg(feature = "encryption")]
#[cfg_attr(docsrs, doc(cfg(feature = "encryption")))]
@@ -52,18 +64,6 @@ where
self.inner.transport.encryption_enabled()
}

/// Returns a [`Node`] with the local id and the advertise address of local node.
#[inline]
pub fn advertise_node(&self) -> Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress> {
Node::new(self.inner.id.clone(), self.inner.advertise.clone())
}

/// Returns the advertise address of local node.
#[inline]
pub fn advertise_address(&self) -> &<T::Resolver as AddressResolver>::ResolvedAddress {
&self.inner.advertise
}

/// Returns the delegate, if any.
#[inline]
pub fn delegate(&self) -> Option<&D> {
@@ -74,14 +74,12 @@
#[inline]
pub async fn local_state(
&self,
) -> Arc<NodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>> {
) -> Option<Arc<NodeState<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>> {
let nodes = self.inner.nodes.read().await;
// TODO: return an error
nodes
.node_map
.get(&self.inner.id)
.map(|&idx| nodes.nodes[idx].state.server.clone())
.unwrap()
}

/// Returns the node state of the given id. (if any).
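For reference, a minimal caller-side sketch of the reworked accessors above. The import paths and the `inspect` helper are assumptions, while the method names and bounds follow the hunk: `local_addr` becomes `local_address`, `advertise_node`/`advertise_address` sit next to it, and `local_state` now returns `Option` instead of unwrapping internally.

```rust
// Sketch only: these import paths are assumed, not taken from the crate's docs.
use memberlist_core::{delegate::Delegate, transport::Transport, Memberlist};
use nodecraft::resolver::AddressResolver;

async fn inspect<T, D>(m: &Memberlist<T, D>)
where
    T: Transport,
    D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
{
    // Renamed in this commit: `local_addr` -> `local_address`.
    let _bind = m.local_address();
    // Advertise accessors now grouped with the other address getters.
    let _node = m.advertise_node();
    let _advertise = m.advertise_address();
    // `local_state` returns `Option<_>` instead of panicking on a missing entry.
    if m.local_state().await.is_none() {
        eprintln!("local node is not registered yet");
    }
}
```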
58 changes: 37 additions & 21 deletions core/src/base.rs
@@ -1,7 +1,7 @@
use std::{
collections::{HashMap, VecDeque},
sync::{
atomic::{AtomicBool, AtomicU32},
atomic::{AtomicBool, AtomicU32, AtomicUsize},
Arc,
},
};
@@ -10,6 +10,8 @@ use agnostic_lite::{AsyncSpawner, RuntimeLite};
use async_channel::{Receiver, Sender};
use async_lock::{Mutex, RwLock};

use atomic_refcell::AtomicRefCell;
use futures::{stream::FuturesUnordered, StreamExt};
use nodecraft::{resolver::AddressResolver, CheapClone, Node};

use super::{
@@ -82,6 +84,18 @@
pub(crate) suspicion: Option<Suspicion<T, D>>,
}

impl<T, D> core::fmt::Debug for Member<T, D>
where
D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
T: Transport,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Member")
.field("state", &self.state)
.finish()
}
}

impl<T, D> core::ops::Deref for Member<T, D>
where
D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
@@ -195,7 +209,7 @@
// platforms.
#[inline]
fn gen_index<R: rand::Rng + ?Sized>(rng: &mut R, ubound: usize) -> usize {
if ubound <= (core::u32::MAX as usize) {
if ubound <= (u32::MAX as usize) {
rng.gen_range(0..ubound as u32) as usize
} else {
rng.gen_range(0..ubound)
@@ -270,8 +284,10 @@
pub(crate) leave_broadcast_tx: Sender<()>,
pub(crate) leave_lock: Mutex<()>,
pub(crate) leave_broadcast_rx: Receiver<()>,
pub(crate) shutdown_lock:
Mutex<Vec<<<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>>>,
pub(crate) handles: AtomicRefCell<
FuturesUnordered<<<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>>,
>,
pub(crate) probe_index: AtomicUsize,
pub(crate) handoff_tx: Sender<()>,
pub(crate) handoff_rx: Receiver<()>,
pub(crate) queue: Mutex<MessageQueue<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>>,
@@ -294,11 +310,6 @@
return Ok(());
}

let mut mu = self.shutdown_lock.lock().await;
for h in core::mem::take(&mut *mu) {
let _ = h.await;
}

// Shut down the transport first, which should block until it's
// completely torn down. If we kill the memberlist-side handlers
// those I/O handlers might get stuck.
@@ -307,6 +318,9 @@
return Err(e);
}

let mut futs = core::mem::take(&mut *self.handles.borrow_mut());
while futs.next().await.is_some() {}

Ok(())
}
}
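A minimal, self-contained sketch of the shutdown pattern this hunk switches to; the `drain` helper and the `H` bound are simplifications, not the crate's own types. Task handles live in a `FuturesUnordered` behind an `AtomicRefCell`, and shutdown takes the whole set out of the cell and polls it to completion only after the transport has been torn down.

```rust
use atomic_refcell::AtomicRefCell;
use futures::{stream::FuturesUnordered, StreamExt};

// Drain every background-task handle after the transport has shut down.
// `H` stands in for the runtime's join handle; any `Future<Output = ()>` works.
async fn drain<H: core::future::Future<Output = ()>>(
    handles: &AtomicRefCell<FuturesUnordered<H>>,
) {
    // Take the set out so the `AtomicRefCell` borrow is released before awaiting.
    let mut futs = core::mem::take(&mut *handles.borrow_mut());
    // `FuturesUnordered` polls all handles concurrently; the loop ends once
    // every spawned task has exited.
    while futs.next().await.is_some() {}
}
```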
@@ -394,10 +408,6 @@
let num_nodes = hot.num_nodes.clone();
let broadcast = TransmitLimitedQueue::new(opts.retransmit_mult, num_nodes);

#[cfg(not(feature = "metrics"))]
let mut handles = Vec::with_capacity(7);
#[cfg(feature = "metrics")]
let mut handles = Vec::with_capacity(8);
let (shutdown_tx, shutdown_rx) = async_channel::bounded(1);
let this = Memberlist {
inner: Arc::new(MemberlistCore {
@@ -408,7 +418,8 @@
leave_broadcast_tx,
leave_lock: Mutex::new(()),
leave_broadcast_rx,
shutdown_lock: Mutex::new(Vec::new()),
probe_index: AtomicUsize::new(0),
handles: AtomicRefCell::new(FuturesUnordered::new()),
handoff_tx,
handoff_rx,
queue: Mutex::new(MessageQueue::new()),
@@ -422,13 +433,15 @@
delegate: delegate.map(Arc::new),
};

handles.push(this.stream_listener(shutdown_rx.clone()));
handles.push(this.packet_handler(shutdown_rx.clone()));
handles.push(this.packet_listener(shutdown_rx.clone()));
#[cfg(feature = "metrics")]
handles.push(this.check_broadcast_queue_depth(shutdown_rx.clone()));
{
let handles = this.inner.handles.borrow();
handles.push(this.stream_listener(shutdown_rx.clone()));
handles.push(this.packet_handler(shutdown_rx.clone()));
handles.push(this.packet_listener(shutdown_rx.clone()));
#[cfg(feature = "metrics")]
handles.push(this.check_broadcast_queue_depth(shutdown_rx.clone()));
}

*this.inner.shutdown_lock.lock().await = handles;
Ok((shutdown_rx, this.inner.advertise.cheap_clone(), this))
}
}
@@ -468,12 +481,15 @@
let this = self.clone();

<T::Runtime as RuntimeLite>::spawn(async move {
let tick = <T::Runtime as RuntimeLite>::interval(queue_check_interval);
futures::pin_mut!(tick);
loop {
futures::select! {
_ = shutdown_rx.recv().fuse() => {
tracing::info!("memberlist: broadcast queue checker exits");
return;
},
_ = <T::Runtime as RuntimeLite>::sleep(queue_check_interval).fuse() => {
_ = tick.next().fuse() => {
let numq = this.inner.broadcast.num_queued().await;
metrics::histogram!("memberlist.queue.broadcasts").record(numq as f64);
}
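A minimal sketch of the ticker change in the metrics loop above; the `tick_a_few_times` helper is hypothetical, while `RuntimeLite::interval` and the pin-then-`next()` pattern come from the hunk. One interval stream is created and pinned once, instead of building a fresh `sleep` future on every loop iteration.

```rust
use std::time::Duration;

use agnostic_lite::RuntimeLite;
use futures::StreamExt;

// Tick a fixed number of times using one long-lived interval stream.
async fn tick_a_few_times<R: RuntimeLite>(period: Duration) {
    let tick = R::interval(period);
    futures::pin_mut!(tick);
    for _ in 0..3 {
        // Each `next()` waits for the following tick of the same interval,
        // so the timer is not re-created (and re-armed) every iteration.
        tick.next().await;
    }
}
```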
57 changes: 55 additions & 2 deletions core/src/base/tests.rs
@@ -1,7 +1,7 @@
use std::{
future::Future,
marker::PhantomData,
sync::atomic::{AtomicUsize, Ordering},
sync::atomic::Ordering,
time::{Duration, Instant},
};

@@ -103,7 +103,7 @@
.await;
assert_eq!(num, 1);

m.local_addr();
m.local_address();
m.local_state().await;
let id = m.local_id();
m.by_id(id).await.unwrap();
@@ -139,6 +139,59 @@
m.shutdown().await.unwrap();
}

/// Unit tests for creating a `Memberlist` and shutdown cleanup.
pub async fn memberlist_shutdown_cleanup<T, F, R>(
t1: T::Options,
get_transport_opts: impl FnOnce(<T::Resolver as AddressResolver>::ResolvedAddress) -> F,
t1_opts: Options,
) where
T: Transport<Runtime = R>,
F: Future<Output = T::Options>,
R: RuntimeLite,
{
let m = Memberlist::<T, _>::new(t1, t1_opts.clone()).await.unwrap();
m.shutdown().await.unwrap();
R::sleep(Duration::from_millis(250)).await;

let addr = m.advertise_address().clone();
drop(m);
let topts = get_transport_opts(addr).await;
let _ = Memberlist::<T, _>::new(topts, t1_opts.clone())
.await
.unwrap();
}

/// Unit tests for creating a `Memberlist` and shutdown cleanup.
pub async fn memberlist_shutdown_cleanup2<T, F, R>(
t1: T::Options,
t1_opts: Options,
t2: T::Options,
t2_opts: Options,
get_transport_opts: impl FnOnce(<T::Resolver as AddressResolver>::ResolvedAddress) -> F,
) where
T: Transport<Runtime = R>,
F: Future<Output = T::Options>,
R: RuntimeLite,
{
let m = Memberlist::<T, _>::new(t1, t1_opts.clone()).await.unwrap();
let m2 = Memberlist::<T, _>::new(t2, t2_opts.clone()).await.unwrap();
R::sleep(Duration::from_millis(250)).await;
m.join(
m2.advertise_node()
.map_address(MaybeResolvedAddress::resolved),
)
.await
.unwrap();
m.shutdown().await.unwrap();

let addr = m.advertise_address().clone();
drop(m);
let topts = get_transport_opts(addr).await;
let _ = Memberlist::<T, _>::new(topts, t1_opts.clone())
.await
.unwrap();
}

/// Unit tests for join a `Memberlist`.
pub async fn memberlist_join<T, R>(
t1: T::Options,
2 changes: 1 addition & 1 deletion core/src/network.rs
@@ -93,7 +93,7 @@
.dial_with_deadline(node.address(), Instant::now() + self.inner.opts.timeout)
.await
.map_err(Error::transport)?;
tracing::debug!(target = "memberlist", local_addr = %self.inner.id, peer_addr = %node, "initiating push/pull sync");
tracing::debug!(local_addr = %self.inner.id, peer_addr = %node, "memberlist: initiating push/pull sync");

#[cfg(feature = "metrics")]
{
Expand Down
3 changes: 2 additions & 1 deletion core/src/network/packet/handler.rs
@@ -22,6 +22,7 @@
loop {
futures::select! {
_ = shutdown_rx.recv().fuse() => {
tracing::debug!("memberlist: packet handler exits");
return;
}
_ = handoff_rx.recv().fuse() => while let Some(msg) = this.get_next_message().await {
@@ -31,7 +32,7 @@
Message::Dead(m) => this.handle_dead(msg.from, m).await,
Message::UserData(m) => this.handle_user(msg.from, m).await,
m => {
tracing::error!(target = "memberlist.packet", "message type ({}) not supported {} (packet handler)", m.kind(), msg.from);
tracing::error!("memberlist: message type ({}) not supported {} (packet handler)", m.kind(), msg.from);
}
}
}
7 changes: 4 additions & 3 deletions core/src/network/packet/listener.rs
@@ -45,6 +45,7 @@
loop {
futures::select! {
_ = shutdown_rx.recv().fuse() => {
tracing::debug!("memberlist: packet listener exits");
return;
}
packet = packet_rx.recv().fuse() => {
@@ -55,7 +56,7 @@
},
Err(e) => {
if !this.inner.shutdown_tx.is_closed() {
tracing::error!(target = "memberlist.packet", "failed to receive packet: {}", e);
tracing::error!("memberlist.packet: failed to receive packet: {}", e);
}
// If we got an error, which means on the other side the transport has been closed,
// so we need to return and shutdown the packet listener
@@ -137,7 +138,7 @@
) {
// If node is provided, verify that it is for us
if p.target().id().ne(&self.inner.id) {
tracing::error!(target = "memberlist.packet", local=%self.inner.id, remote = %from, "got ping for unexpected node '{}'", p.target());
tracing::error!(local=%self.inner.id, remote = %from, "memberlist.packet: got ping for unexpected node '{}'", p.target());
return;
}

@@ -147,7 +148,7 @@
Ack::new(p.sequence_number())
};
if let Err(e) = self.send_msg(p.source().address(), msg.into()).await {
tracing::error!(target = "memberlist.packet", addr = %from, err = %e, "failed to send ack response");
tracing::error!(addr = %from, err = %e, "memberlist.packet: failed to send ack response");
}
}
