Skip to content

Commit

Permalink
release resources on shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n committed Dec 23, 2024
1 parent fa5f93f commit b0ee463
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 25 deletions.
2 changes: 1 addition & 1 deletion transports/net/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "memberlist-net"
version = "0.3.0"
version = "0.3.1"
edition.workspace = true
license.workspace = true
repository.workspace = true
Expand Down
10 changes: 10 additions & 0 deletions transports/net/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ where
/// Connection kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
#[non_exhaustive]
pub enum ConnectionKind {
/// Promised connection, e.g. TCP, QUIC.
Promised,
Expand Down Expand Up @@ -167,6 +168,7 @@ impl ConnectionKind {
/// Connection error kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
#[non_exhaustive]
pub enum ConnectionErrorKind {
/// Failed to accept a connection.
Accept,
Expand Down Expand Up @@ -278,4 +280,12 @@ impl ConnectionError {
error: err,
}
}

pub(super) fn packet_write_on_transport_shutdown(err: std::io::Error) -> Self {
Self {
kind: ConnectionKind::Packet,
error_kind: ConnectionErrorKind::Close,
error: err,
}
}
}
27 changes: 19 additions & 8 deletions transports/net/src/io/send_by_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,13 +536,24 @@ where
addr: &A::ResolvedAddress,
buf: &[u8],
) -> Result<usize, NetTransportError<A, W>> {
self
.next_socket(addr)
.send_to(buf, addr)
.await
.inspect(|num| {
tracing::trace!(remote=%addr, total_bytes = %num, sent=?buf, "memberlist_net.packet");
})
.map_err(|e| ConnectionError::packet_write(e).into())
match self.next_socket(addr) {
Some(skt) => skt
.send_to(buf, addr)
.await
.inspect(|num| {
tracing::trace!(remote=%addr, total_bytes = %num, sent=?buf, "memberlist_net.packet");
})
.map_err(|e| ConnectionError::packet_write(e).into()),
None => {
tracing::error!("memberlist_net.packet: transport is being shutdown");

Check warning on line 548 in transports/net/src/io/send_by_packet.rs

View check run for this annotation

Codecov / codecov/patch

transports/net/src/io/send_by_packet.rs#L547-L548

Added lines #L547 - L548 were not covered by tests
Err(
ConnectionError::packet_write_on_transport_shutdown(std::io::Error::new(
std::io::ErrorKind::NotConnected,
"transport is being shutdown",

Check warning on line 552 in transports/net/src/io/send_by_packet.rs

View check run for this annotation

Codecov / codecov/patch

transports/net/src/io/send_by_packet.rs#L550-L552

Added lines #L550 - L552 were not covered by tests
))
.into(),

Check warning on line 554 in transports/net/src/io/send_by_packet.rs

View check run for this annotation

Codecov / codecov/patch

transports/net/src/io/send_by_packet.rs#L554

Added line #L554 was not covered by tests
)
}
}
}
}
75 changes: 59 additions & 16 deletions transports/net/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,12 @@ where
local_addr: A::Address,
packet_rx: PacketSubscriber<I, A::ResolvedAddress>,
stream_rx: StreamSubscriber<A::ResolvedAddress, S::Stream>,
num_v4_sockets: usize,
v4_round_robin: AtomicUsize,
v4_sockets: SmallVec<Arc<<<A::Runtime as Runtime>::Net as Net>::UdpSocket>>,
v4_sockets: AtomicRefCell<SmallVec<Arc<<<A::Runtime as Runtime>::Net as Net>::UdpSocket>>>,
num_v6_sockets: usize,
v6_round_robin: AtomicUsize,
v6_sockets: SmallVec<Arc<<<A::Runtime as Runtime>::Net as Net>::UdpSocket>>,
v6_sockets: AtomicRefCell<SmallVec<Arc<<<A::Runtime as Runtime>::Net as Net>::UdpSocket>>>,
stream_layer: Arc<S>,
#[cfg(feature = "encryption")]
encryptor: Option<SecretKeyring>,
Expand Down Expand Up @@ -367,9 +369,11 @@ where
packet_rx,
stream_rx,
handles: AtomicRefCell::new(handles),
v4_sockets: v4_sockets.into_iter().map(|(ln, _)| ln).collect(),
num_v4_sockets: v4_sockets.len(),
v4_sockets: AtomicRefCell::new(v4_sockets.into_iter().map(|(ln, _)| ln).collect()),
v4_round_robin: AtomicUsize::new(0),
v6_sockets: v6_sockets.into_iter().map(|(ln, _)| ln).collect(),
num_v6_sockets: v6_sockets.len(),
v6_sockets: AtomicRefCell::new(v6_sockets.into_iter().map(|(ln, _)| ln).collect()),
v6_round_robin: AtomicUsize::new(0),
stream_layer,
#[cfg(feature = "encryption")]
Expand All @@ -383,23 +387,47 @@ where
fn next_socket(
&self,
addr: &A::ResolvedAddress,
) -> &<<A::Runtime as Runtime>::Net as Net>::UdpSocket {
if addr.is_ipv4() {
) -> Option<Arc<<<A::Runtime as Runtime>::Net as Net>::UdpSocket>> {
enum Kind {
V4(usize),
V6(usize),
}

let kind = if addr.is_ipv4() {
// if there's no v4 sockets, we assume remote addr can accept both v4 and v6
// give a try on v6
if self.v4_sockets.is_empty() {
let idx = self.v6_round_robin.fetch_add(1, Ordering::AcqRel) % self.v6_sockets.len();
&self.v6_sockets[idx]
if self.num_v4_sockets == 0 {
let idx = self.v6_round_robin.fetch_add(1, Ordering::AcqRel) % self.num_v6_sockets;
Kind::V6(idx)

Check warning on line 401 in transports/net/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

transports/net/src/lib.rs#L400-L401

Added lines #L400 - L401 were not covered by tests
} else {
let idx = self.v4_round_robin.fetch_add(1, Ordering::AcqRel) % self.v4_sockets.len();
&self.v4_sockets[idx]
let idx = self.v4_round_robin.fetch_add(1, Ordering::AcqRel) % self.num_v4_sockets;
Kind::V4(idx)
}
} else if self.v6_sockets.is_empty() {
let idx = self.v4_round_robin.fetch_add(1, Ordering::AcqRel) % self.v4_sockets.len();
&self.v4_sockets[idx]
} else if self.num_v6_sockets == 0 {
let idx = self.v4_round_robin.fetch_add(1, Ordering::AcqRel) % self.num_v4_sockets;
Kind::V4(idx)

Check warning on line 408 in transports/net/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

transports/net/src/lib.rs#L407-L408

Added lines #L407 - L408 were not covered by tests
} else {
let idx = self.v6_round_robin.fetch_add(1, Ordering::AcqRel) % self.v6_sockets.len();
&self.v6_sockets[idx]
let idx = self.v6_round_robin.fetch_add(1, Ordering::AcqRel) % self.num_v6_sockets;
Kind::V6(idx)
};

// if we failed to borrow, it means that this transport is being shut down.

match kind {
Kind::V4(idx) => {
if let Ok(sockets) = self.v4_sockets.try_borrow() {
Some(sockets[idx].clone())
} else {
None

Check warning on line 421 in transports/net/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

transports/net/src/lib.rs#L421

Added line #L421 was not covered by tests
}
}
Kind::V6(idx) => {
if let Ok(sockets) = self.v6_sockets.try_borrow() {
Some(sockets[idx].clone())
} else {
None

Check warning on line 428 in transports/net/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

transports/net/src/lib.rs#L428

Added line #L428 was not covered by tests
}
}
}
}
}
Expand Down Expand Up @@ -666,6 +694,21 @@ where
return Ok(());
}

// clear all udp sockets
loop {

Check warning on line 698 in transports/net/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

transports/net/src/lib.rs#L698

Added line #L698 was not covered by tests
if let Ok(mut s) = self.v4_sockets.try_borrow_mut() {
s.clear();
break;

Check warning on line 701 in transports/net/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

transports/net/src/lib.rs#L701

Added line #L701 was not covered by tests
}
}

loop {

Check warning on line 705 in transports/net/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

transports/net/src/lib.rs#L705

Added line #L705 was not covered by tests
if let Ok(mut s) = self.v6_sockets.try_borrow_mut() {
s.clear();
break;

Check warning on line 708 in transports/net/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

transports/net/src/lib.rs#L708

Added line #L708 was not covered by tests
}
}

let mut handles = core::mem::take(&mut *self.handles.borrow_mut());
while handles.next().await.is_some() {}
Ok(())
Expand Down

0 comments on commit b0ee463

Please sign in to comment.