diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 00000000..cfd0fa0f --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,13 @@ +version: 2 +updates: + - package-ecosystem: "cargo" + directory: "/" + schedule: + interval: "weekly" + open-pull-requests-limit: 50 + + - package-ecosystem: "github-actions" + directory: "/" + schedule: + # Check for updates to GitHub Actions every weekday + interval: "daily" \ No newline at end of file diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index ac68ebee..cfd0c67e 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -104,7 +104,7 @@ jobs: features: "test,tokio,s2n,compression,metrics" name: "quic-s2n" steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install latest nightly uses: actions-rs/toolchain@v1 with: @@ -129,7 +129,7 @@ jobs: needs: coverage-report runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Download all coverage reports uses: actions/download-artifact@v3 with: @@ -137,7 +137,7 @@ jobs: - name: List contents of the reports directory run: ls -a reports - name: Upload to codecov.io - uses: codecov/codecov-action@v4 + uses: codecov/codecov-action@v5 with: directory: reports fail_ci_if_error: false diff --git a/.github/workflows/net.yml b/.github/workflows/net.yml index 1c087056..282e8306 100644 --- a/.github/workflows/net.yml +++ b/.github/workflows/net.yml @@ -34,8 +34,12 @@ jobs: # - windows-latest runtime: [tokio, async-std, smol] stream_layer: [tls, native-tls, tcp] + # TODO: remove this exlucde when figuring out why native-tls with smol leads to a user_data* unit tests failure + exclude: + - runtime: smol + stream_layer: native-tls steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: | diff --git a/.github/workflows/quinn.yml b/.github/workflows/quinn.yml index 16eba63f..5b4960e6 100644 --- a/.github/workflows/quinn.yml +++ b/.github/workflows/quinn.yml @@ -34,7 +34,7 @@ jobs: # - windows-latest runtime: [tokio, async-std, smol] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: | diff --git a/.github/workflows/s2n.yml b/.github/workflows/s2n.yml index ff11e191..cca1b771 100644 --- a/.github/workflows/s2n.yml +++ b/.github/workflows/s2n.yml @@ -33,7 +33,7 @@ jobs: # - macos-latest, # - windows-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Install Rust run: | diff --git a/Cargo.toml b/Cargo.toml index fb4106a6..21fbc7c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ license = "MPL-2.0" repository = "https://github.com/al8n/memberlist" homepage = "https://github.com/al8n/memberlist" readme = "README.md" -rust-version = "1.75.0" +rust-version = "1.81.0" keywords = ["swim", "gossip", "service-discovery"] categories = ["network-programming", "asynchronous"] @@ -29,36 +29,44 @@ rustdoc-args = ["--cfg", "docsrs"] auto_impl = "1" atomic_refcell = "0.1" agnostic-lite = { version = "0.3", features = ["time"] } -agnostic = "0.3.5" +agnostic = "0.4" +# agnostic-lite = { version = "0.3", features = ["time"], path = "../agnostic/lite" } +# agnostic = { version = "0.4", path = "../agnostic/agnostic" } async-lock = "3" async-channel = "2" -bytes = "1.5" +bytes = "1" byteorder = "1" -derive_more = "0.99" +derive_more = { version = "1", features = ["full"] } futures = "0.3" indexmap = "2" local-ip-address = "0.6" metrics = "0.22" -nodecraft = { version = "0.3", features = [ +nodecraft = { version = "0.4", features = [ "transformable", "async", "resolver", "agnostic", ] } +# nodecraft = { version = "0.4", path = "../nodecraft", features = [ +# "transformable", +# "async", +# "resolver", +# "agnostic", +# ] } paste = "1" pin-project = "1" scopeguard = "1" serde = { version = "1", features = ["derive", "rc"] } humantime-serde = "1" smallvec = "1" -smallvec-wrapper = { version = "0.1", features = ["const_new", "either"] } -smol_str = "0.2" -transformable = { version = "0.1.6", features = ["smol_str", "bytes"] } -thiserror = "1" +smallvec-wrapper = { version = "0.2", features = ["const_new", "either"] } +smol_str = "0.3" +transformable = { version = "0.2", features = ["smol_str03", "bytes1"] } +thiserror = "2" tracing = "0.1" viewit = "0.1.5" -memberlist-core = { version = "0.2", path = "core", default-features = false } -memberlist-net = { version = "0.2", path = "transports/net", default-features = false } -memberlist-types = { version = "0.2", path = "types", default-features = false } -memberlist-quic = { version = "0.2", path = "transports/quic", default-features = false } +memberlist-core = { version = "0.3", path = "core", default-features = false } +memberlist-net = { version = "0.3", path = "transports/net", default-features = false } +memberlist-types = { version = "0.3", path = "types", default-features = false } +memberlist-quic = { version = "0.3", path = "transports/quic", default-features = false } diff --git a/README-zh_CN.md b/README-zh_CN.md index 12dfc75f..e1ee0bdc 100644 --- a/README-zh_CN.md +++ b/README-zh_CN.md @@ -127,7 +127,7 @@ For details on all of these extensions, please read Hashicorp's paper ["Lifeguar ```toml [dependencies] -memberlist = "0.1" +memberlist = "0.3" ``` ## Q & A diff --git a/README.md b/README.md index 199b1189..b6189e51 100644 --- a/README.md +++ b/README.md @@ -130,7 +130,7 @@ For details on all of these extensions, please read Hashicorp's paper ["Lifeguar ```toml [dependencies] -memberlist = "0.1" +memberlist = "0.3" ``` ## Q & A diff --git a/core/Cargo.toml b/core/Cargo.toml index c006577c..927240b4 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "memberlist-core" -version = "0.2.1" +version = "0.3.0" edition.workspace = true license.workspace = true repository.workspace = true @@ -56,7 +56,7 @@ derive_more.workspace = true either = "1" futures.workspace = true nodecraft.workspace = true -transformable = { workspace = true, features = ["bytes", "smol_str"] } +transformable = { workspace = true, features = ["bytes1", "smol_str03"] } paste.workspace = true parking_lot = "0.12" pin-project.workspace = true diff --git a/core/src/base/tests.rs b/core/src/base/tests.rs index f64329b5..a0d6763e 100644 --- a/core/src/base/tests.rs +++ b/core/src/base/tests.rs @@ -336,7 +336,7 @@ impl CustomMergeDelegate { impl MergeDelegate for CustomMergeDelegate where - I: Id, + I: Id + Send + Sync + 'static, A: CheapClone + Send + Sync + 'static, { type Id = I; @@ -424,7 +424,7 @@ impl CustomAliveDelegate { impl AliveDelegate for CustomAliveDelegate where - I: Id, + I: Id + Send + Sync + 'static, A: CheapClone + Send + Sync + 'static, { type Id = I; @@ -957,7 +957,7 @@ impl CustomConflictDelegate { impl ConflictDelegate for CustomConflictDelegate where - I: Id, + I: Id + Send + Sync + 'static, A: CheapClone + Send + Sync + 'static, { type Id = I; @@ -1038,7 +1038,7 @@ impl CustomPingDelegate { impl PingDelegate for CustomPingDelegate where - I: Id, + I: Id + Send + Sync + 'static, A: CheapClone + Send + Sync + 'static, { type Id = I; diff --git a/core/src/delegate.rs b/core/src/delegate.rs index 68052946..2e35a632 100644 --- a/core/src/delegate.rs +++ b/core/src/delegate.rs @@ -121,7 +121,11 @@ impl VoidDelegate { } } -impl AliveDelegate for VoidDelegate { +impl AliveDelegate for VoidDelegate +where + I: Id + Send + Sync + 'static, + A: CheapClone + Send + Sync + 'static, +{ type Error = VoidDelegateError; type Id = I; type Address = A; @@ -134,7 +138,11 @@ impl AliveDelegate for VoidDelegat } } -impl MergeDelegate for VoidDelegate { +impl MergeDelegate for VoidDelegate +where + I: Id + Send + Sync + 'static, + A: CheapClone + Send + Sync + 'static, +{ type Error = VoidDelegateError; type Id = I; type Address = A; @@ -147,7 +155,11 @@ impl MergeDelegate for VoidDelegat } } -impl ConflictDelegate for VoidDelegate { +impl ConflictDelegate for VoidDelegate +where + I: Id + Send + Sync + 'static, + A: CheapClone + Send + Sync + 'static, +{ type Id = I; type Address = A; @@ -159,7 +171,11 @@ impl ConflictDelegate for VoidDele } } -impl PingDelegate for VoidDelegate { +impl PingDelegate for VoidDelegate +where + I: Id + Send + Sync + 'static, + A: CheapClone + Send + Sync + 'static, +{ type Id = I; type Address = A; @@ -180,7 +196,11 @@ impl PingDelegate for VoidDelegate } } -impl EventDelegate for VoidDelegate { +impl EventDelegate for VoidDelegate +where + I: Id + Send + Sync + 'static, + A: CheapClone + Send + Sync + 'static, +{ type Id = I; type Address = A; @@ -191,7 +211,11 @@ impl EventDelegate for VoidDelegat async fn notify_update(&self, _node: Arc>) {} } -impl NodeDelegate for VoidDelegate { +impl NodeDelegate for VoidDelegate +where + I: Id + Send + Sync + 'static, + A: CheapClone + Send + Sync + 'static, +{ async fn node_meta(&self, _limit: usize) -> Meta { Meta::empty() } @@ -217,7 +241,11 @@ impl NodeDelegate for VoidDelegate async fn merge_remote_state(&self, _buf: Bytes, _join: bool) {} } -impl Delegate for VoidDelegate { +impl Delegate for VoidDelegate +where + I: Id + Send + Sync + 'static, + A: CheapClone + Send + Sync + 'static, +{ type Id = I; type Address = A; } diff --git a/core/src/delegate/composite.rs b/core/src/delegate/composite.rs index 636ef6c8..b42b9d94 100644 --- a/core/src/delegate/composite.rs +++ b/core/src/delegate/composite.rs @@ -181,7 +181,7 @@ impl CompositeDelegate AliveDelegate for CompositeDelegate where - I: Id, + I: Id + Send + Sync + 'static, Address: CheapClone + Send + Sync + 'static, A: AliveDelegate, C: ConflictDelegate, @@ -204,7 +204,7 @@ where impl MergeDelegate for CompositeDelegate where - I: Id, + I: Id + Send + Sync + 'static, Address: CheapClone + Send + Sync + 'static, A: AliveDelegate, C: ConflictDelegate, @@ -228,7 +228,7 @@ where impl ConflictDelegate for CompositeDelegate where - I: Id, + I: Id + Send + Sync + 'static, Address: CheapClone + Send + Sync + 'static, A: AliveDelegate, C: ConflictDelegate, @@ -254,7 +254,7 @@ where impl PingDelegate for CompositeDelegate where - I: Id, + I: Id + Send + Sync + 'static, Address: CheapClone + Send + Sync + 'static, A: AliveDelegate, C: ConflictDelegate, @@ -289,7 +289,7 @@ where impl EventDelegate for CompositeDelegate where - I: Id, + I: Id + Send + Sync + 'static, Address: CheapClone + Send + Sync + 'static, A: AliveDelegate, C: ConflictDelegate, @@ -317,7 +317,7 @@ where impl NodeDelegate for CompositeDelegate where - I: Id, + I: Id + Send + Sync + 'static, Address: CheapClone + Send + Sync + 'static, A: AliveDelegate, C: ConflictDelegate, @@ -360,7 +360,7 @@ where impl Delegate for CompositeDelegate where - I: Id, + I: Id + Send + Sync + 'static, Address: CheapClone + Send + Sync + 'static, A: AliveDelegate, C: ConflictDelegate, diff --git a/core/src/delegate/event.rs b/core/src/delegate/event.rs index 9bc27461..45e1dcbf 100644 --- a/core/src/delegate/event.rs +++ b/core/src/delegate/event.rs @@ -145,7 +145,7 @@ impl SubscribleEventDelegate { impl EventDelegate for SubscribleEventDelegate where - I: Id, + I: Id + Send + Sync + 'static, A: CheapClone + Send + Sync + 'static, { type Id = I; diff --git a/core/src/delegate/mock.rs b/core/src/delegate/mock.rs index 9e52b535..b405748d 100644 --- a/core/src/delegate/mock.rs +++ b/core/src/delegate/mock.rs @@ -88,8 +88,9 @@ impl MockDelegate { } } -impl NodeDelegate for MockDelegate +impl NodeDelegate for MockDelegate where + I: Send + Sync + 'static, A: CheapClone + Send + Sync + 'static, { async fn node_meta(&self, _limit: usize) -> Meta { diff --git a/core/src/queue.rs b/core/src/queue.rs index c38e1c35..97435ee8 100644 --- a/core/src/queue.rs +++ b/core/src/queue.rs @@ -437,8 +437,8 @@ mod tests { impl crate::transport::Wire for DummyWire where - I: Transformable, - A: Transformable, + I: Transformable + Send + Sync + 'static, + A: Transformable + Send + Sync + 'static, { type Error = std::io::Error; type Address = A; @@ -642,9 +642,9 @@ mod tests { let dump = q.ordered_view(true).await; assert_eq!(dump.len(), 3); - assert_eq!(dump[0].broadcast.node.as_ref(), "test"); - assert_eq!(dump[1].broadcast.node.as_ref(), "foo"); - assert_eq!(dump[2].broadcast.node.as_ref(), "bar"); + assert_eq!(dump[0].broadcast.node.as_str(), "test"); + assert_eq!(dump[1].broadcast.node.as_str(), "foo"); + assert_eq!(dump[2].broadcast.node.as_str(), "bar"); // Should invalidate previous message q.queue_broadcast( @@ -661,9 +661,9 @@ mod tests { let dump = q.ordered_view(true).await; assert_eq!(dump.len(), 3); - assert_eq!(dump[0].broadcast.node.as_ref(), "foo"); - assert_eq!(dump[1].broadcast.node.as_ref(), "bar"); - assert_eq!(dump[2].broadcast.node.as_ref(), "test"); + assert_eq!(dump[0].broadcast.node.as_str(), "foo"); + assert_eq!(dump[1].broadcast.node.as_str(), "bar"); + assert_eq!(dump[2].broadcast.node.as_str(), "test"); } #[tokio::test] @@ -913,8 +913,8 @@ mod tests { } let dump = q.ordered_view(true).await; - assert_eq!(dump[0].broadcast.id().unwrap().as_ref(), "bar"); - assert_eq!(dump[1].broadcast.id().unwrap().as_ref(), "baz"); + assert_eq!(dump[0].broadcast.id().unwrap().as_str(), "bar"); + assert_eq!(dump[1].broadcast.id().unwrap().as_str(), "baz"); } #[tokio::test] diff --git a/core/src/state.rs b/core/src/state.rs index d73d3340..7e862332 100644 --- a/core/src/state.rs +++ b/core/src/state.rs @@ -136,6 +136,7 @@ impl Memberlist where D: Delegate::ResolvedAddress>, T: Transport, + T::Id: Send + Sync + 'static, { /// Does a complete state exchange with a specific node. pub(crate) async fn push_pull_node( @@ -574,7 +575,11 @@ enum StateMessage { Suspect(Suspect), } -impl StateMessage { +impl StateMessage +where + T: Transport, + T::Id: Send + Sync + 'static, +{ async fn run(self, s: &Memberlist) where D: Delegate::ResolvedAddress>, diff --git a/core/src/state/tests.rs b/core/src/state/tests.rs index aea26951..592d2304 100644 --- a/core/src/state/tests.rs +++ b/core/src/state/tests.rs @@ -1077,7 +1077,7 @@ impl ToggledEventDelegate { impl EventDelegate for ToggledEventDelegate where - I: Id, + I: Id + Send + Sync + 'static, A: CheapClone + Send + Sync + 'static, { type Id = I; diff --git a/core/src/suspicion.rs b/core/src/suspicion.rs index 023506c1..82b24622 100644 --- a/core/src/suspicion.rs +++ b/core/src/suspicion.rs @@ -46,6 +46,7 @@ where impl Suspicioner where T: Transport, + T::Id: Send + Sync + 'static, D: Delegate::ResolvedAddress>, { pub(crate) fn new( @@ -137,6 +138,7 @@ where impl Suspicion where T: Transport, + T::Id: Send + Sync + 'static, D: Delegate::ResolvedAddress>, { /// Returns a after_func started with the max time, and that will drive @@ -178,6 +180,7 @@ where impl Suspicion where T: Transport, + T::Id: Send + Sync + 'static, D: Delegate::ResolvedAddress>, { /// Confirm registers that a possibly new peer has also determined the given diff --git a/core/src/transport.rs b/core/src/transport.rs index 6b9ef275..801bdcf1 100644 --- a/core/src/transport.rs +++ b/core/src/transport.rs @@ -216,7 +216,6 @@ pub trait TransportError: std::error::Error + Send + Sync + 'static { /// 1. Fail to send to a remote node /// 2. Fail to receive from a remote node. /// 3. Fail to dial a remote node. - /// ... /// /// The above errors can be treated as remote failures. fn is_remote_failure(&self) -> bool; @@ -274,7 +273,7 @@ pub trait Transport: Sized + Send + Sync + 'static { /// The error type for the transport type Error: TransportError; /// The id type used to identify nodes - type Id: Id; + type Id: Id + Send + Sync + 'static; /// The address resolver used to resolve addresses type Resolver: AddressResolver; /// The promised stream used to send and receive messages @@ -367,7 +366,7 @@ pub trait Transport: Sized + Send + Sync + 'static { /// /// - number of bytes sent /// - a time stamp that's as close as possible to when the packet - /// was transmitted to help make accurate RTT measurements during probes. + /// was transmitted to help make accurate RTT measurements during probes. fn send_packet( &self, addr: &::ResolvedAddress, @@ -381,7 +380,7 @@ pub trait Transport: Sized + Send + Sync + 'static { /// /// - number of bytes sent /// - a time stamp that's as close as possible to when the packet - /// was transmitted to help make accurate RTT measurements during probes. + /// was transmitted to help make accurate RTT measurements during probes. fn send_packets( &self, addr: &::ResolvedAddress, diff --git a/core/src/transport/lpe.rs b/core/src/transport/lpe.rs index 31a9827f..7f328e69 100644 --- a/core/src/transport/lpe.rs +++ b/core/src/transport/lpe.rs @@ -47,8 +47,10 @@ impl core::fmt::Display for Lpe { impl Wire for Lpe where - I: Transformable + core::fmt::Debug, - A: Transformable + core::fmt::Debug, + I: Transformable + core::fmt::Debug + Send + Sync + 'static, + I::Error: Send + Sync + 'static, + A: Transformable + core::fmt::Debug + Send + Sync + 'static, + A::Error: Send + Sync + 'static, { type Error = MessageTransformError; type Id = I; diff --git a/core/src/transport/tests/unimplemented.rs b/core/src/transport/tests/unimplemented.rs index e7b46246..36f99f7c 100644 --- a/core/src/transport/tests/unimplemented.rs +++ b/core/src/transport/tests/unimplemented.rs @@ -51,8 +51,9 @@ pub struct UnimplementedTransport(PhantomData<(I, A, W, R)>); impl Transport for UnimplementedTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, + A::Address: Send + Sync + 'static, W: Wire, R: RuntimeLite, { diff --git a/memberlist/Cargo.toml b/memberlist/Cargo.toml index 9d6247c1..d098129a 100644 --- a/memberlist/Cargo.toml +++ b/memberlist/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "memberlist" -version = "0.2.1" +version = "0.3.0" edition.workspace = true license.workspace = true repository.workspace = true diff --git a/transports/net/Cargo.toml b/transports/net/Cargo.toml index 58709f4f..37c2d3c2 100644 --- a/transports/net/Cargo.toml +++ b/transports/net/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "memberlist-net" -version = "0.2.2" +version = "0.3.0" edition.workspace = true license.workspace = true repository.workspace = true diff --git a/transports/net/src/compressor.rs b/transports/net/src/compressor.rs index fd4a0f4d..f994667c 100644 --- a/transports/net/src/compressor.rs +++ b/transports/net/src/compressor.rs @@ -69,7 +69,7 @@ const LZW_LIT_WIDTH: u8 = 8; #[derive(Debug, thiserror::Error)] pub enum CompressError { /// LZW compress errors - #[error("{0}")] + #[error(transparent)] Lzw(#[from] weezl::LzwError), } @@ -77,7 +77,7 @@ pub enum CompressError { #[derive(Debug, thiserror::Error)] pub enum DecompressError { /// LZW decompress errors - #[error("{0}")] + #[error(transparent)] Lzw(#[from] weezl::LzwError), } diff --git a/transports/net/src/error.rs b/transports/net/src/error.rs index 3ed9713e..688c97d7 100644 --- a/transports/net/src/error.rs +++ b/transports/net/src/error.rs @@ -68,10 +68,10 @@ pub enum NetTransportError { /// Returns when fail to compress/decompress message. #[cfg(feature = "compression")] #[cfg_attr(docsrs, doc(cfg(feature = "compression")))] - #[error("{0}")] + #[error(transparent)] Compressor(#[from] super::compressor::CompressorError), /// Returns when there is a security error. e.g. encryption/decryption error. - #[error("{0}")] + #[error(transparent)] #[cfg(feature = "encryption")] #[cfg_attr(docsrs, doc(cfg(feature = "encryption")))] Security(#[from] super::security::SecurityError), @@ -118,7 +118,12 @@ impl core::fmt::Debug for NetTransportError { } } -impl TransportError for NetTransportError { +impl TransportError for NetTransportError +where + A: AddressResolver, + A::Address: Send + Sync + 'static, + W: Wire, +{ fn is_remote_failure(&self) -> bool { if let Self::Connection(e) = self { e.is_remote_failure() diff --git a/transports/net/src/io/read_from_promised.rs b/transports/net/src/io/read_from_promised.rs index 83c6c718..4ecaa4dd 100644 --- a/transports/net/src/io/read_from_promised.rs +++ b/transports/net/src/io/read_from_promised.rs @@ -5,8 +5,9 @@ const MAX_INLINED_BYTES: usize = 64; impl NetTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, + A::Address: Send + Sync + 'static, S: StreamLayer, W: Wire, R: Runtime, diff --git a/transports/net/src/io/send_by_packet.rs b/transports/net/src/io/send_by_packet.rs index ada8df5c..6c8d1481 100644 --- a/transports/net/src/io/send_by_packet.rs +++ b/transports/net/src/io/send_by_packet.rs @@ -5,8 +5,9 @@ use super::*; impl NetTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, + A::Address: Send + Sync + 'static, S: StreamLayer, W: Wire, R: Runtime, @@ -539,9 +540,8 @@ where .next_socket(addr) .send_to(buf, addr) .await - .map(|num| { + .inspect(|num| { tracing::trace!(remote=%addr, total_bytes = %num, sent=?buf, "memberlist_net.packet"); - num }) .map_err(|e| ConnectionError::packet_write(e).into()) } diff --git a/transports/net/src/io/send_by_promised.rs b/transports/net/src/io/send_by_promised.rs index 84044b08..5bf51097 100644 --- a/transports/net/src/io/send_by_promised.rs +++ b/transports/net/src/io/send_by_promised.rs @@ -5,8 +5,9 @@ use memberlist_core::types::LabelBufMutExt; impl NetTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, + A::Address: Send + Sync + 'static, S: StreamLayer, W: Wire, R: Runtime, diff --git a/transports/net/src/lib.rs b/transports/net/src/lib.rs index 10f5726d..dce667ed 100644 --- a/transports/net/src/lib.rs +++ b/transports/net/src/lib.rs @@ -156,8 +156,9 @@ where impl NetTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, + A::Address: Send + Sync + 'static, S: StreamLayer, W: Wire, R: Runtime, @@ -405,8 +406,9 @@ where impl Transport for NetTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, + A::Address: Send + Sync + 'static, S: StreamLayer, W: Wire, R: Runtime, diff --git a/transports/net/src/packet_processor.rs b/transports/net/src/packet_processor.rs index 4ae7d681..b505be2b 100644 --- a/transports/net/src/packet_processor.rs +++ b/transports/net/src/packet_processor.rs @@ -54,6 +54,7 @@ where impl PacketProcessor where A: AddressResolver, + A::Address: Send + Sync + 'static, T: Transport, T::Runtime: Runtime, { @@ -133,7 +134,7 @@ where } } } - let _ = socket.shutdown().await; + drop(socket); tracing::info!( "memberlist.transport.net: packet processor on {} exit", local_addr diff --git a/transports/net/src/security.rs b/transports/net/src/security.rs index 03dd81c3..0e26c9cc 100644 --- a/transports/net/src/security.rs +++ b/transports/net/src/security.rs @@ -35,7 +35,7 @@ impl From for NetTransportError Listener for NativeTlsListener { } async fn shutdown(&self) -> io::Result<()> { - TcpListener::shutdown(&self.ln).await + Ok(()) } fn local_addr(&self) -> std::net::SocketAddr { diff --git a/transports/net/src/stream_layer/tcp.rs b/transports/net/src/stream_layer/tcp.rs index 7d65d2af..6dc8dc13 100644 --- a/transports/net/src/stream_layer/tcp.rs +++ b/transports/net/src/stream_layer/tcp.rs @@ -111,7 +111,7 @@ impl Listener for TcpListener { } async fn shutdown(&self) -> io::Result<()> { - agnostic::net::TcpListener::shutdown(&self.ln).await + Ok(()) } fn local_addr(&self) -> SocketAddr { diff --git a/transports/net/src/stream_layer/tls.rs b/transports/net/src/stream_layer/tls.rs index d7b4409a..fb515c7b 100644 --- a/transports/net/src/stream_layer/tls.rs +++ b/transports/net/src/stream_layer/tls.rs @@ -243,7 +243,7 @@ impl Listener for TlsListener { } async fn shutdown(&self) -> io::Result<()> { - TcpListener::shutdown(&self.ln).await + Ok(()) } } diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index c2846f10..75ff0628 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "memberlist-quic" -version = "0.2.0" +version = "0.3.0" edition.workspace = true license.workspace = true repository.workspace = true @@ -20,7 +20,7 @@ default = ["compression", "quinn"] compression = ["rayon", "weezl"] # encryption feature enables nothing, because of quic is secure by default, this feature only for adapt to other transport layer encryption = ["memberlist-core/encryption"] -quinn = ["agnostic/quinn", "dep:quinn", "rustls", "agnostic/net"] +quinn = ["agnostic/quinn", "dep:quinn", "agnostic/net", "rustls"] s2n = ["s2n-quic", "s2n-quic-transport", "futures/bilock", "futures/unstable"] serde = ["memberlist-core/serde", "dep:serde", "humantime-serde", "indexmap/serde"] metrics = ["memberlist-core/metrics", "dep:metrics"] @@ -76,15 +76,15 @@ humantime-serde = { workspace = true, optional = true } metrics = { workspace = true, optional = true } # quinn -quinn = { version = "0.10.2", default-features = false, optional = true, features = ["tls-rustls", "futures-io"] } -rustls = { version = "0.21.9", default-features = false, optional = true, features = ["dangerous_configuration"] } +quinn = { version = "0.11", default-features = false, optional = true, features = ["futures-io", "rustls"] } +rustls = { version = "0.23", default-features = false, optional = true } # test -rcgen = { version = "0.12", optional = true } +rcgen = { version = "0.13", optional = true } # s2n -s2n-quic = { version = "1.36", optional = true } -s2n-quic-transport = { version = "0.36", optional = true } +s2n-quic = { version = "1.51", optional = true } +s2n-quic-transport = { version = "0.51", optional = true } # compression rayon = { version = "1.8", optional = true } diff --git a/transports/quic/src/error.rs b/transports/quic/src/error.rs index cb73e756..d42d33c3 100644 --- a/transports/quic/src/error.rs +++ b/transports/quic/src/error.rs @@ -70,7 +70,13 @@ impl core::fmt::Debug for QuicTrans } } -impl TransportError for QuicTransportError { +impl TransportError for QuicTransportError +where + A: AddressResolver, + A::Address: Send + Sync + 'static, + S: StreamLayer, + W: Wire, +{ fn is_remote_failure(&self) -> bool { if let Self::Stream(e) = self { e.is_remote_failure() diff --git a/transports/quic/src/io/read_message.rs b/transports/quic/src/io/read_message.rs index 2823a0cd..7fc3e415 100644 --- a/transports/quic/src/io/read_message.rs +++ b/transports/quic/src/io/read_message.rs @@ -2,8 +2,9 @@ use super::*; impl QuicTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, + A::Address: Send + Sync + 'static, S: StreamLayer, W: Wire, R: RuntimeLite, diff --git a/transports/quic/src/io/send_message.rs b/transports/quic/src/io/send_message.rs index f9150ce4..44b5eb4b 100644 --- a/transports/quic/src/io/send_message.rs +++ b/transports/quic/src/io/send_message.rs @@ -5,8 +5,9 @@ use super::*; impl QuicTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, + A::Address: Send + Sync + 'static, S: StreamLayer, W: Wire, R: RuntimeLite, diff --git a/transports/quic/src/io/send_packet.rs b/transports/quic/src/io/send_packet.rs index e4501746..45d50ae1 100644 --- a/transports/quic/src/io/send_packet.rs +++ b/transports/quic/src/io/send_packet.rs @@ -5,8 +5,9 @@ use super::*; impl QuicTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, + A::Address: Send + Sync + 'static, S: StreamLayer, W: Wire, R: RuntimeLite, diff --git a/transports/quic/src/lib.rs b/transports/quic/src/lib.rs index 58f64b2f..c01836ca 100644 --- a/transports/quic/src/lib.rs +++ b/transports/quic/src/lib.rs @@ -16,8 +16,7 @@ use std::{ time::{Duration, Instant}, }; -use agnostic::AsyncSpawner; -use agnostic_lite::RuntimeLite; +use agnostic_lite::{AsyncSpawner, RuntimeLite}; use atomic_refcell::AtomicRefCell; use byteorder::{ByteOrder, NetworkEndian}; use bytes::Bytes; @@ -94,7 +93,7 @@ pub type SmolQuicTransport = /// A [`Transport`] implementation based on QUIC pub struct QuicTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, S: StreamLayer, W: Wire, @@ -122,8 +121,9 @@ where impl QuicTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, + A::Address: Send + Sync + 'static, S: StreamLayer, W: Wire, R: RuntimeLite, @@ -312,7 +312,7 @@ where impl QuicTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, S: StreamLayer, W: Wire, @@ -398,8 +398,9 @@ where impl Transport for QuicTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, + A::Address: Send + Sync + 'static, S: StreamLayer, W: Wire, R: RuntimeLite, @@ -691,7 +692,7 @@ where impl Drop for QuicTransport where - I: Id, + I: Id + Send + Sync + 'static, A: AddressResolver, S: StreamLayer, W: Wire, diff --git a/transports/quic/src/processor.rs b/transports/quic/src/processor.rs index 07563159..1a2cbdd7 100644 --- a/transports/quic/src/processor.rs +++ b/transports/quic/src/processor.rs @@ -29,6 +29,7 @@ pub(super) struct Processor< impl Processor where A: AddressResolver, + A::Address: Send + Sync + 'static, T: Transport, S: StreamLayer, { diff --git a/transports/quic/src/stream_layer/quinn.rs b/transports/quic/src/stream_layer/quinn.rs index c20f3c9c..3aba5ef5 100644 --- a/transports/quic/src/stream_layer/quinn.rs +++ b/transports/quic/src/stream_layer/quinn.rs @@ -268,7 +268,7 @@ impl QuicStream for QuinnStream { } async fn finish(&mut self) -> Result<(), Self::Error> { - let fut = async { self.send.finish().await.map(|_| ()).map_err(Into::into) }; + let fut = async { self.send.finish().map(|_| ()).map_err(Into::into) }; match self.write_deadline { Some(timeout) => R::timeout_at(timeout, fut) @@ -330,14 +330,14 @@ impl QuicStream for QuinnStream { } async fn close(&mut self) -> Result<(), Self::Error> { - self.send.finish().await.map_err(QuinnBiStreamError::from)?; + self.send.finish().map_err(QuinnBiStreamError::from)?; self .recv .get_mut() .1 .stop(VarInt::from_u32(0)) .map_err(|_| { - QuinnBiStreamError::Read(QuinnReadStreamError::Read(quinn::ReadError::UnknownStream)).into() + QuinnBiStreamError::Read(QuinnReadStreamError::Read(quinn::ReadError::ClosedStream)).into() }) } } diff --git a/transports/quic/src/stream_layer/quinn/error.rs b/transports/quic/src/stream_layer/quinn/error.rs index 155511b7..01e24894 100644 --- a/transports/quic/src/stream_layer/quinn/error.rs +++ b/transports/quic/src/stream_layer/quinn/error.rs @@ -14,6 +14,9 @@ pub enum QuinnError { /// Write error. #[error(transparent)] Write(#[from] QuinnWriteStreamError), + /// Stream is closed. + #[error(transparent)] + Closed(#[from] quinn::ClosedStream), /// Stopped error. #[error(transparent)] @@ -69,6 +72,7 @@ impl From for QuinnError { match err { QuinnBiStreamError::Write(err) => Self::Write(err), QuinnBiStreamError::Read(err) => Self::Read(err), + QuinnBiStreamError::Closed(err) => Self::Closed(err), } } } @@ -80,6 +84,7 @@ impl QuicError for QuinnError { Self::Read(err) => err.is_remote_failure(), Self::Write(err) => err.is_remote_failure(), Self::Stopped(_) => true, + Self::Closed(_) => false, } } } @@ -136,7 +141,7 @@ impl QuicError for QuinnReadStreamError { match self { Self::Read(err) => is_read_error_remote_failure(err), Self::ReadExact(err) => match err { - quinn::ReadExactError::FinishedEarly => true, + quinn::ReadExactError::FinishedEarly(_) => true, quinn::ReadExactError::ReadError(err) => is_read_error_remote_failure(err), }, Self::ReadToEnd(err) => match err { @@ -184,6 +189,9 @@ pub enum QuinnBiStreamError { /// Error reading from the stream. #[error(transparent)] Read(#[from] QuinnReadStreamError), + /// Error on closed stream. + #[error(transparent)] + Closed(#[from] quinn::ClosedStream), } impl From for QuinnBiStreamError { @@ -209,6 +217,7 @@ impl QuicError for QuinnBiStreamError { match self { Self::Write(err) => err.is_remote_failure(), Self::Read(err) => err.is_remote_failure(), + Self::Closed(_) => false, } } } @@ -218,7 +227,7 @@ const fn is_read_error_remote_failure(err: &quinn::ReadError) -> bool { match err { quinn::ReadError::Reset(_) => true, quinn::ReadError::ConnectionLost(_) => false, - quinn::ReadError::UnknownStream => false, + quinn::ReadError::ClosedStream => false, quinn::ReadError::IllegalOrderedRead => true, quinn::ReadError::ZeroRttRejected => true, } @@ -234,6 +243,7 @@ const fn is_connection_error_remote_failure(err: &ConnectionError) -> bool { ConnectionError::Reset => true, ConnectionError::TimedOut => true, ConnectionError::LocallyClosed => false, + ConnectionError::CidsExhausted => false, } } @@ -241,11 +251,11 @@ const fn is_connection_error_remote_failure(err: &ConnectionError) -> bool { const fn is_connect_error_remote_failure(err: &ConnectError) -> bool { match err { ConnectError::EndpointStopping => false, - ConnectError::TooManyConnections => false, - ConnectError::InvalidDnsName(_) => false, ConnectError::InvalidRemoteAddress(_) => false, ConnectError::NoDefaultClientConfig => false, ConnectError::UnsupportedVersion => false, + ConnectError::CidsExhausted => false, + ConnectError::InvalidServerName(_) => false, } } @@ -254,7 +264,7 @@ const fn is_write_error_remote_failure(err: &WriteError) -> bool { match err { WriteError::Stopped(_) => true, WriteError::ConnectionLost(_) => false, - WriteError::UnknownStream => false, + WriteError::ClosedStream => false, WriteError::ZeroRttRejected => true, } } diff --git a/transports/quic/src/stream_layer/quinn/options.rs b/transports/quic/src/stream_layer/quinn/options.rs index 2841e23f..e5fbac9d 100644 --- a/transports/quic/src/stream_layer/quinn/options.rs +++ b/transports/quic/src/stream_layer/quinn/options.rs @@ -132,7 +132,7 @@ pub struct Options { ), setter(attrs(doc = "Sets the TLS client config for the inner [`quinn::ClientConfig`].")) )] - client_tls_config: Arc, + client_config: quinn::ClientConfig, /// TLS server config for the inner [`quinn::ServerConfig`]. #[viewit( getter( @@ -141,7 +141,7 @@ pub struct Options { ), setter(attrs(doc = "Sets the TLS server config for the inner [`quinn::ServerConfig`].")) )] - server_tls_config: Arc, + server_config: quinn::ServerConfig, /// Quinn [`EndpointConfig`](quinn::EndpointConfig). #[viewit( @@ -162,14 +162,14 @@ impl Options { /// Creates a new configuration object with default values. pub fn new( server_name: impl Into, - server_tls_config: rustls::ServerConfig, - client_tls_config: rustls::ClientConfig, + server_config: quinn::ServerConfig, + client_config: quinn::ClientConfig, endpoint_config: EndpointConfig, ) -> Self { Self { server_name: server_name.into(), - client_tls_config: Arc::new(client_tls_config), - server_tls_config: Arc::new(server_tls_config), + client_config, + server_config, endpoint_config, max_idle_timeout: Duration::from_secs(10).as_millis() as u32, max_concurrent_stream_limit: 256, @@ -202,8 +202,8 @@ impl From for QuinnOptions { fn from(config: Options) -> QuinnOptions { let Options { server_name, - client_tls_config, - server_tls_config, + mut client_config, + mut server_config, max_idle_timeout, max_concurrent_stream_limit, keep_alive_interval, @@ -226,14 +226,11 @@ impl From for QuinnOptions { transport.mtu_discovery_config(mtu_discovery_config); let transport = Arc::new(transport); - let mut server_config = quinn::ServerConfig::with_crypto(server_tls_config); server_config.transport = Arc::clone(&transport); // Disables connection migration. // Long-term this should be enabled, however we then need to handle address change // on connections in the `Connection`. server_config.migration(false); - - let mut client_config = quinn::ClientConfig::new(client_tls_config); client_config.transport_config(transport); QuinnOptions { diff --git a/transports/quic/src/stream_layer/s2n/error.rs b/transports/quic/src/stream_layer/s2n/error.rs index 2889ad4b..80d22269 100644 --- a/transports/quic/src/stream_layer/s2n/error.rs +++ b/transports/quic/src/stream_layer/s2n/error.rs @@ -8,7 +8,7 @@ pub use s2n_quic_transport::connection::limits::ValidationError; #[derive(Debug, thiserror::Error)] pub enum S2nError { /// Validation error. - #[error(transparent)] + #[error("{0}")] Validation(#[from] ValidationError), /// Connection error. diff --git a/transports/quic/src/tests.rs b/transports/quic/src/tests.rs index 6b8cdf03..8e7a8ff7 100644 --- a/transports/quic/src/tests.rs +++ b/transports/quic/src/tests.rs @@ -490,8 +490,10 @@ pub use quinn_stream_layer::*; mod quinn_stream_layer { use super::*; use crate::stream_layer::quinn::*; - use ::quinn::{ClientConfig, ServerConfig}; + use ::quinn::{ClientConfig, Endpoint, ServerConfig}; use futures::Future; + use quinn::{crypto::rustls::QuicClientConfig, EndpointConfig}; + use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer, ServerName, UnixTime}; use smol_str::SmolStr; use std::{ error::Error, @@ -503,57 +505,123 @@ mod quinn_stream_layer { time::Duration, }; - struct SkipServerVerification; + /// Dummy certificate verifier that treats any certificate as valid. + /// NOTE, such verification is vulnerable to MITM attacks, but convenient for testing. + #[derive(Debug)] + struct SkipServerVerification(Arc); impl SkipServerVerification { fn new() -> Arc { - Arc::new(Self) + Arc::new(Self(Arc::new(rustls::crypto::ring::default_provider()))) } } - impl rustls::client::ServerCertVerifier for SkipServerVerification { + impl rustls::client::danger::ServerCertVerifier for SkipServerVerification { fn verify_server_cert( &self, - _end_entity: &rustls::Certificate, - _intermediates: &[rustls::Certificate], - _server_name: &rustls::ServerName, - _scts: &mut dyn Iterator, - _ocsp_response: &[u8], - _now: std::time::SystemTime, - ) -> Result { - Ok(rustls::client::ServerCertVerified::assertion()) + _end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + _server_name: &ServerName<'_>, + _ocsp: &[u8], + _now: UnixTime, + ) -> Result { + Ok(rustls::client::danger::ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + message: &[u8], + cert: &CertificateDer<'_>, + dss: &rustls::DigitallySignedStruct, + ) -> Result { + rustls::crypto::verify_tls12_signature( + message, + cert, + dss, + &self.0.signature_verification_algorithms, + ) + } + + fn verify_tls13_signature( + &self, + message: &[u8], + cert: &CertificateDer<'_>, + dss: &rustls::DigitallySignedStruct, + ) -> Result { + rustls::crypto::verify_tls13_signature( + message, + cert, + dss, + &self.0.signature_verification_algorithms, + ) + } + + fn supported_verify_schemes(&self) -> Vec { + self.0.signature_verification_algorithms.supported_schemes() } } - fn configures() -> Result<(rustls::ServerConfig, rustls::ClientConfig), Box> { - let server_config = configure_server()?; + fn configures() -> Result<(ServerConfig, ClientConfig), Box> { + let (server_config, _) = configure_server()?; let client_config = configure_client(); Ok((server_config, client_config)) } - fn configure_client() -> rustls::ClientConfig { - rustls::ClientConfig::builder() - .with_safe_defaults() - .with_custom_certificate_verifier(SkipServerVerification::new()) - .with_no_client_auth() + fn configure_client() -> ClientConfig { + ClientConfig::new(Arc::new( + QuicClientConfig::try_from( + rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(SkipServerVerification::new()) + .with_no_client_auth(), + ) + .unwrap(), + )) } - fn configure_server() -> Result> { + // fn configure_server() -> Result> { + // let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap(); + // let cert_der = cert.serialize_der().unwrap(); + // let priv_key = cert.serialize_private_key_der(); + // let priv_key = rustls::pki_types::pem::SectionKind::PrivateKey(priv_key); + // let cert_chain = vec![rustls::pki_types::pem::SectionKind::Certificat( + // cert_der.clone(), + // )]; + + // let mut cfg = rustls::ServerConfig::builder() + // .with_safe_default_cipher_suites() + // .with_safe_default_kx_groups() + // .with_protocol_versions(&[&rustls::version::TLS13]) + // .unwrap() + // .with_no_client_auth() + // .with_single_cert(cert_chain, priv_key)?; + // cfg.max_early_data_size = u32::MAX; + // Ok(cfg) + // } + + // fn configure_client( + // server_certs: &[&[u8]], + // ) -> Result> { + // let mut certs = rustls::RootCertStore::empty(); + // for cert in server_certs { + // certs.add(CertificateDer::from(*cert))?; + // } + + // Ok(ClientConfig::with_root_certificates(Arc::new(certs))?) + // } + + fn configure_server( + ) -> Result<(ServerConfig, CertificateDer<'static>), Box> { let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap(); - let cert_der = cert.serialize_der().unwrap(); - let priv_key = cert.serialize_private_key_der(); - let priv_key = rustls::PrivateKey(priv_key); - let cert_chain = vec![rustls::Certificate(cert_der.clone())]; - - let mut cfg = rustls::ServerConfig::builder() - .with_safe_default_cipher_suites() - .with_safe_default_kx_groups() - .with_protocol_versions(&[&rustls::version::TLS13]) - .unwrap() - .with_no_client_auth() - .with_single_cert(cert_chain, priv_key)?; - cfg.max_early_data_size = u32::MAX; - Ok(cfg) + let cert_der = CertificateDer::from(cert.cert); + let priv_key = PrivatePkcs8KeyDer::from(cert.key_pair.serialize_der()); + + let mut server_config = + ServerConfig::with_single_cert(vec![cert_der.clone()], priv_key.into())?; + let transport_config = Arc::get_mut(&mut server_config.transport).unwrap(); + transport_config.max_concurrent_uni_streams(0_u8.into()); + + Ok((server_config, cert_der)) } #[allow(unused)] @@ -567,7 +635,7 @@ mod quinn_stream_layer { server_name, server_config, client_config, - Default::default(), + EndpointConfig::default(), ) } @@ -581,7 +649,7 @@ mod quinn_stream_layer { server_name, server_config, client_config, - Default::default(), + EndpointConfig::default(), ) .with_connect_timeout(timeout) } diff --git a/types/Cargo.toml b/types/Cargo.toml index a57f2613..aea5d8b9 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "memberlist-types" -version = "0.2.0" +version = "0.3.0" edition.workspace = true license.workspace = true repository.workspace = true @@ -30,17 +30,17 @@ serde = [ "nodecraft/serde", "indexmap?/serde" ] -rkyv = ["dep:rkyv", "smallvec-wrapper/rkyv32", "nodecraft/rkyv"] +rkyv = ["dep:rkyv", "smallvec-wrapper/rkyv", "nodecraft/rkyv"] [dependencies] bytes.workspace = true byteorder.workspace = true derive_more.workspace = true futures.workspace = true -ipnet = "2.9" +ipnet = "2" nodecraft.workspace = true paste.workspace = true -transformable = { workspace = true, features = ["bytes", "smol_str"] } +transformable = { workspace = true, features = ["bytes1", "smol_str03"] } thiserror.workspace = true smol_str.workspace = true smallvec-wrapper.workspace = true @@ -54,7 +54,7 @@ metrics = { workspace = true, optional = true } serde = { workspace = true, optional = true } base64 = { version = "0.22", optional = true } -rkyv = { version = "0.7", optional = true, features = ["smol_str", "bytes"] } +rkyv = { version = "0.8", optional = true, features = ["smol_str-0_3", "bytes-1"] } [dev-dependencies] rand = "0.8" diff --git a/types/src/ack.rs b/types/src/ack.rs index b159af06..f3d42772 100644 --- a/types/src/ack.rs +++ b/types/src/ack.rs @@ -12,8 +12,10 @@ const MAX_INLINED_BYTES: usize = 64; feature = "rkyv", derive(::rkyv::Serialize, ::rkyv::Deserialize, ::rkyv::Archive) )] -#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))] -#[cfg_attr(feature = "rkyv", archive_attr(derive(Debug, PartialEq, Eq, Hash)))] +#[cfg_attr( + feature = "rkyv", + rkyv(compare(PartialEq), derive(Debug, PartialEq, Eq, Hash),) +)] pub struct Ack { /// The sequence number of the ack #[viewit( @@ -68,16 +70,13 @@ impl Ack { pub enum AckTransformError { /// The buffer did not contain enough bytes to encode an ack response. #[error("encode buffer too small")] - BufferTooSmall, + InsufficientBuffer(#[from] InsufficientBuffer), /// The buffer did not contain enough bytes to decode an ack response. #[error("the buffer did not contain enough bytes to decode Ack")] NotEnoughBytes, /// Varint decoding error #[error("fail to decode sequence number: {0}")] DecodeVarint(#[from] DecodeVarintError), - /// Varint encoding error - #[error("fail to encode sequence number: {0}")] - EncodeVarint(#[from] EncodeVarintError), } impl Transformable for Ack { @@ -87,7 +86,9 @@ impl Transformable for Ack { let encoded_len = self.encoded_len(); if encoded_len > dst.len() { - return Err(Self::Error::BufferTooSmall); + return Err(Self::Error::InsufficientBuffer( + InsufficientBuffer::with_information(encoded_len as u64, dst.len() as u64), + )); } let mut offset = 0; @@ -260,10 +261,9 @@ impl Transformable for Ack { feature = "rkyv", derive(::rkyv::Serialize, ::rkyv::Deserialize, ::rkyv::Archive) )] -#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))] #[cfg_attr( feature = "rkyv", - archive_attr(derive(Debug, Clone, PartialEq, Eq, Hash), repr(transparent)) + rkyv(derive(Debug, Clone, PartialEq, Eq, Hash), compare(PartialEq)) )] #[repr(transparent)] pub struct Nack { diff --git a/types/src/alive.rs b/types/src/alive.rs index c7b279ff..4e9f3aaf 100644 --- a/types/src/alive.rs +++ b/types/src/alive.rs @@ -17,7 +17,7 @@ use transformable::Transformable; feature = "rkyv", derive(::rkyv::Serialize, ::rkyv::Deserialize, ::rkyv::Archive) )] -#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))] +#[cfg_attr(feature = "rkyv", rkyv(compare(PartialEq)))] pub struct Alive { /// The incarnation of the alive message #[viewit( @@ -145,7 +145,7 @@ pub enum AliveTransformError { #[error("meta transform error: {0}")] Meta(#[from] MetaError), /// Message too large. - #[error("encoded message too large, max {} got {0}", u32::MAX)] + #[error("encoded message too large, max 4294967295 got {0}")] TooLarge(u64), /// Encode buffer too small. #[error("encode buffer too small")] @@ -167,7 +167,11 @@ impl core::fmt::Debug for AliveTransformErro } } -impl Transformable for Alive { +impl Transformable for Alive +where + I: Transformable + 'static, + A: Transformable + 'static, +{ type Error = AliveTransformError; fn encode(&self, dst: &mut [u8]) -> Result { diff --git a/types/src/bad_state.rs b/types/src/bad_state.rs index 6469c675..bbcc4199 100644 --- a/types/src/bad_state.rs +++ b/types/src/bad_state.rs @@ -19,7 +19,7 @@ macro_rules! bad_bail { feature = "rkyv", derive(::rkyv::Serialize, ::rkyv::Deserialize, ::rkyv::Archive) )] - #[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))] + #[cfg_attr(feature = "rkyv", rkyv(compare(PartialEq)))] pub struct $name { /// The incarnation of the message. #[viewit( @@ -156,7 +156,7 @@ macro_rules! bad_bail { #[error("the buffer did not contain enough bytes to decode {}", stringify!($name))] NotEnoughBytes, /// The encoded size is too large - #[error("encoded size too large, max {} got {0}", u32::MAX)] + #[error("encoded size too large, max 4294967295 got {0}")] TooLarge(u64), } diff --git a/types/src/cidr_policy.rs b/types/src/cidr_policy.rs index 0bad7f19..d2d8e184 100644 --- a/types/src/cidr_policy.rs +++ b/types/src/cidr_policy.rs @@ -102,7 +102,7 @@ impl CIDRsPolicy { /// Returns `true` connection from any IP is blocked. pub fn is_block_all(&self) -> bool { - self.allowed_cidrs.as_ref().map_or(false, |x| x.is_empty()) + self.allowed_cidrs.as_ref().is_some_and(|x| x.is_empty()) } /// Returns `true` connection from any IP is allowed. diff --git a/types/src/err.rs b/types/src/err.rs index 78341852..090fca97 100644 --- a/types/src/err.rs +++ b/types/src/err.rs @@ -14,10 +14,9 @@ use transformable::Transformable; feature = "rkyv", derive(::rkyv::Serialize, ::rkyv::Deserialize, ::rkyv::Archive) )] -#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))] #[cfg_attr( feature = "rkyv", - archive_attr(derive(Debug, PartialEq, Eq, Hash), repr(transparent)) + rkyv(derive(Debug, PartialEq, Eq, Hash), compare(PartialEq)) )] #[repr(transparent)] pub struct ErrorResponse { diff --git a/types/src/label.rs b/types/src/label.rs index 6b4060cf..c1bb57c5 100644 --- a/types/src/label.rs +++ b/types/src/label.rs @@ -8,7 +8,7 @@ pub enum InvalidLabel { #[error("the size of label must between [0-255] bytes, got {0}")] TooLarge(usize), /// The label is not valid utf8. - #[error("{0}")] + #[error(transparent)] Utf8(#[from] core::str::Utf8Error), } @@ -275,7 +275,7 @@ impl core::fmt::Display for Label { #[derive(Debug, thiserror::Error)] pub enum LabelError { /// Invalid label. - #[error("{0}")] + #[error(transparent)] InvalidLabel(#[from] InvalidLabel), /// Not enough bytes to decode label. #[error("not enough bytes to decode label")] diff --git a/types/src/lib.rs b/types/src/lib.rs index a91fab12..4ae4cf7a 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -1,8 +1,8 @@ //! Types used by the [`memberlist`](https://crates.io/crates/memberlist) crate. #![doc(html_logo_url = "https://raw.githubusercontent.com/al8n/memberlist/main/art/logo_72x72.png")] #![forbid(unsafe_code)] -#![deny(warnings, missing_docs)] -#![allow(clippy::type_complexity)] +#![deny(warnings)] +#![allow(clippy::type_complexity, unexpected_cfgs)] #![cfg_attr(docsrs, feature(doc_cfg))] #![cfg_attr(docsrs, allow(unused_attributes))] diff --git a/types/src/message.rs b/types/src/message.rs index 0c81bbd6..73c3bbfb 100644 --- a/types/src/message.rs +++ b/types/src/message.rs @@ -150,42 +150,46 @@ pub enum MessageTransformError { #[error("not enough bytes to decode message")] NotEnoughBytes, /// Returned when the fail to transform ping message. - #[error("{0}")] + #[error(transparent)] Ping(#[from] PingTransformError), /// Returned when the fail to transform indirect ping message. - #[error("{0}")] + #[error(transparent)] IndirectPing(#[from] IndirectPingTransformError), /// Returned when the fail to transform ack message. - #[error("{0}")] + #[error(transparent)] Ack(#[from] AckTransformError), /// Returned when the fail to transform suspect message. - #[error("{0}")] + #[error(transparent)] Suspect(#[from] SuspectTransformError), /// Returned when the fail to transform alive message. - #[error("{0}")] + #[error(transparent)] Alive(#[from] AliveTransformError), /// Returned when the fail to transform dead message. - #[error("{0}")] + #[error(transparent)] Dead(#[from] DeadTransformError), /// Returned when the fail to transform push pull message. - #[error("{0}")] + #[error(transparent)] PushPull(#[from] PushPullTransformError), /// Returned when the fail to transform user data message. - #[error("{0}")] + #[error(transparent)] UserData(#[from] BytesTransformError), /// Returned when the fail to transform nack message. - #[error("{0}")] + #[error(transparent)] Nack(#[from] NumberTransformError), /// Returned when the fail to transform error response message. - #[error("{0}")] + #[error(transparent)] ErrorResponse(#[from] StringTransformError), } const USER_DATA_LEN_SIZE: usize = core::mem::size_of::(); const INLINED_BYTES_SIZE: usize = 64; -impl Transformable - for Message +impl Transformable for Message +where + I: Transformable + core::fmt::Debug + 'static, + I::Error: Send + Sync + 'static, + A: Transformable + core::fmt::Debug + 'static, + A::Error: Send + Sync + 'static, { type Error = MessageTransformError; @@ -285,7 +289,7 @@ impl T } Self::ERRORRESPONSE_TAG => { let (len, msg) = ::decode(src)?; - (len + 1, Self::ErrorResponse(ErrorResponse { message: msg })) + (len + 1, Self::ErrorResponse(ErrorResponse::new(msg))) } _ => return Err(Self::Error::NotEnoughBytes), }) @@ -596,9 +600,7 @@ mod test { #[tokio::test] async fn test_error_response_transformable_round_trip() { - let msg = Message::::ErrorResponse(ErrorResponse { - message: "hello world".into(), - }); + let msg = Message::::ErrorResponse(ErrorResponse::new("hello world")); let mut buf = vec![0u8; msg.encoded_len()]; msg.encode(&mut buf).unwrap(); let (len, decoded) = Message::decode(&buf).unwrap(); diff --git a/types/src/meta.rs b/types/src/meta.rs index 721e47d4..ed96187c 100644 --- a/types/src/meta.rs +++ b/types/src/meta.rs @@ -15,8 +15,10 @@ pub struct LargeMeta(usize); feature = "rkyv", derive(::rkyv::Serialize, ::rkyv::Deserialize, ::rkyv::Archive) )] -#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))] -#[cfg_attr(feature = "rkyv", archive_attr(derive(Debug, PartialEq, Eq, Hash)))] +#[cfg_attr( + feature = "rkyv", + rkyv(derive(Debug, PartialEq, Eq, Hash), compare(PartialEq)) +)] pub struct Meta(Bytes); impl Default for Meta { diff --git a/types/src/ping.rs b/types/src/ping.rs index f5c025a1..b01b1f8d 100644 --- a/types/src/ping.rs +++ b/types/src/ping.rs @@ -17,7 +17,7 @@ macro_rules! bail_ping { #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(::serde::Serialize, ::serde::Deserialize))] #[cfg_attr(feature = "rkyv", derive(::rkyv::Serialize, ::rkyv::Deserialize, ::rkyv::Archive))] - #[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))] + #[cfg_attr(feature = "rkyv", rkyv(compare(PartialEq)))] pub struct $name { /// The sequence number of the ack #[viewit( diff --git a/types/src/push_pull_state.rs b/types/src/push_pull_state.rs index 3ed4b3e0..0a800209 100644 --- a/types/src/push_pull_state.rs +++ b/types/src/push_pull_state.rs @@ -14,7 +14,7 @@ use transformable::Transformable; feature = "rkyv", derive(::rkyv::Serialize, ::rkyv::Deserialize, ::rkyv::Archive) )] -#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))] +#[cfg_attr(feature = "rkyv", rkyv(compare(PartialEq)))] pub struct PushNodeState { /// The id of the push node state. #[viewit( @@ -403,7 +403,7 @@ const _: () = { feature = "rkyv", derive(::rkyv::Serialize, ::rkyv::Deserialize, ::rkyv::Archive) )] -#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))] +#[cfg_attr(feature = "rkyv", rkyv(compare(PartialEq)))] pub struct PushPull { /// Whether the push pull message is a join message. #[viewit( diff --git a/types/src/server.rs b/types/src/server.rs index d8ab16c0..1199f6dd 100644 --- a/types/src/server.rs +++ b/types/src/server.rs @@ -9,10 +9,9 @@ use super::{DelegateVersion, Meta, ProtocolVersion}; feature = "rkyv", derive(::rkyv::Serialize, ::rkyv::Deserialize, ::rkyv::Archive) )] -#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))] #[cfg_attr( feature = "rkyv", - archive_attr(derive(Debug, Clone, PartialEq, Eq, Hash)) + rkyv(compare(PartialEq), derive(Debug, Clone, PartialEq, Eq, Hash)) )] #[repr(u8)] #[non_exhaustive] @@ -81,7 +80,7 @@ pub struct UnknownState(u8); feature = "rkyv", derive(::rkyv::Serialize, ::rkyv::Deserialize, ::rkyv::Archive) )] -#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))] +#[cfg_attr(feature = "rkyv", rkyv(compare(PartialEq)))] pub struct NodeState { /// The id of the node. #[viewit( diff --git a/types/src/version.rs b/types/src/version.rs index 7624dc08..5c9b9b9c 100644 --- a/types/src/version.rs +++ b/types/src/version.rs @@ -10,14 +10,9 @@ pub struct UnknownDelegateVersion(u8); feature = "rkyv", derive(::rkyv::Serialize, ::rkyv::Deserialize, ::rkyv::Archive) )] -#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))] #[cfg_attr( feature = "rkyv", - archive_attr( - derive(Debug, Copy, Clone, Eq, PartialEq, Hash), - repr(u8), - non_exhaustive - ) + rkyv(compare(PartialEq), derive(Debug, Copy, Clone, Eq, PartialEq, Hash),) )] #[non_exhaustive] #[repr(u8)] @@ -76,14 +71,9 @@ pub struct UnknownProtocolVersion(u8); feature = "rkyv", derive(::rkyv::Serialize, ::rkyv::Deserialize, ::rkyv::Archive) )] -#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))] #[cfg_attr( feature = "rkyv", - archive_attr( - derive(Debug, Copy, Clone, Eq, PartialEq, Hash), - repr(u8), - non_exhaustive - ) + rkyv(compare(PartialEq), derive(Debug, Copy, Clone, Eq, PartialEq, Hash),) )] #[non_exhaustive] #[repr(u8)] @@ -139,7 +129,7 @@ mod tests { assert_eq!(DelegateVersion::V1 as u8, 1); assert_eq!(DelegateVersion::V1.to_string(), "V1"); assert_eq!(DelegateVersion::try_from(1), Ok(DelegateVersion::V1)); - assert_eq!(DelegateVersion::try_from(1), Err(UnknownDelegateVersion(1))); + assert_eq!(DelegateVersion::try_from(2), Err(UnknownDelegateVersion(2))); } #[test] @@ -147,6 +137,6 @@ mod tests { assert_eq!(ProtocolVersion::V1 as u8, 1); assert_eq!(ProtocolVersion::V1.to_string(), "V1"); assert_eq!(ProtocolVersion::try_from(1), Ok(ProtocolVersion::V1)); - assert_eq!(ProtocolVersion::try_from(1), Err(UnknownProtocolVersion(1))); + assert_eq!(ProtocolVersion::try_from(2), Err(UnknownProtocolVersion(2))); } }