Skip to content

Commit

Permalink
deps: drop the dependency on async-trait
Browse files Browse the repository at this point in the history
  • Loading branch information
niklaslong committed Mar 12, 2024
1 parent 5f94e48 commit 6ca3fb4
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 127 deletions.
9 changes: 3 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,19 @@ documentation = "https://docs.rs/kadmium"
readme = "README.md"
categories = ["algorithms", "network-programming"]
keywords = ["p2p", "peer-to-peer", "networking"]
rust-version = "1.75"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = []
full = ["codec", "sync"]
codec = ["tokio-util/codec", "bincode", "bytes/serde"]
sync = ["async-trait", "parking_lot", "tokio"]
sync = ["parking_lot", "tokio"]

[dependencies]
rand = "0.8.5"
time = "0.3.11"

[dependencies.async-trait]
version = "0.1.56"
optional = true

[dependencies.parking_lot]
version = "0.12.1"
optional = true
Expand All @@ -50,7 +47,7 @@ optional = true
[dev-dependencies]
deadline = "0.2.0"
paste = "1.0"
pea2pea = "0.48.0"
pea2pea = "0.49.0"

[dev-dependencies.tracing]
version = "0.1.35"
Expand Down
243 changes: 127 additions & 116 deletions src/tcp/traits.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[cfg(feature = "sync")]
use std::net::SocketAddr;
use std::{future::Future, net::SocketAddr};

use bytes::Bytes;
#[cfg(feature = "sync")]
Expand All @@ -15,7 +15,6 @@ use crate::{
/// A trait used to enable core kadcast functionality on the implementor.
#[cfg(feature = "sync")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "sync")))]
#[async_trait::async_trait]
pub trait Kadcast
where
Self: Clone + Send + Sync + 'static,
Expand All @@ -35,155 +34,167 @@ where
fn router(&self) -> &SyncTcpRouter;

/// Returns `true` if the address is connected, `false` if it isn't.
async fn is_connected(&self, addr: SocketAddr) -> bool;
fn is_connected(&self, addr: SocketAddr) -> impl Future<Output = bool> + Send;

/// Connects to the address and returns if it was succesful or not.
///
/// Note: Kadmium assumes this method calls [`SyncTcpRouter::insert`] and
/// [`SyncTcpRouter::set_connected`] appropriately.
async fn connect(&self, addr: SocketAddr) -> bool;
fn connect(&self, addr: SocketAddr) -> impl Future<Output = bool> + Send;

/// Disconnects the address and returns `true` if it was connected, returns `false` if it wasn't.
///
/// Note: Kadmium assumes this method calls [`SyncTcpRouter::set_disconnected`] appropriately.
async fn disconnect(&self, addr: SocketAddr) -> bool;
fn disconnect(&self, addr: SocketAddr) -> impl Future<Output = bool> + Send;

/// Sends a message to the destination address.
async fn unicast(&self, dst: SocketAddr, message: Message);
fn unicast(&self, dst: SocketAddr, message: Message) -> impl Future<Output = ()> + Send;

/// Starts the periodic ping task.
async fn ping(&self) {
let self_clone = self.clone();

tokio::spawn(async move {
loop {
for addr in self_clone.router().connected_addrs() {
self_clone
.unicast(addr, Message::Ping(self_clone.router().generate_ping()))
fn ping(&self) -> impl Future<Output = ()> + Send {
async {
let self_clone = self.clone();

tokio::spawn(async move {
loop {
for addr in self_clone.router().connected_addrs() {
self_clone
.unicast(addr, Message::Ping(self_clone.router().generate_ping()))
.await
}

tokio::time::sleep(std::time::Duration::from_secs(Self::PING_INTERVAL_SECS))
.await
}
});

tokio::time::sleep(std::time::Duration::from_secs(Self::PING_INTERVAL_SECS)).await
}
});

// TODO: consider returning the task handle, or at least track it internally.
// TODO: consider returning the task handle, or at least track it internally.
}
}

/// Starts the periodic peer discovery task.
async fn peer(&self) {
// TODO: a few current issues to consider:
//
// 1. identifiers are more likely to be in higher index buckets, not necessarily an issue
// so long as bucket size is above the minimum number of peers.
// 2. the above also guaranties a search returning K nodes can indeed return K nodes, so
// long as K is below the minimum number of peers. If K is larger a node will return at
// worst min(min peers, K) and at best min(peers, K).
//
// Therefore: bucket size >= min peers >= K is likely ideal.

let self_clone = self.clone();

tokio::spawn(async move {
loop {
for (_id, addr, is_connected) in
self_clone.router().select_search_peers(Self::ALPHA.into())
{
let is_connected = match is_connected {
true => self_clone.is_connected(addr).await,
false => self_clone.connect(addr).await,
};

if is_connected {
self_clone
.unicast(
addr,
Message::FindKNodes(self_clone.router().generate_find_k_nodes()),
)
.await;
fn peer(&self) -> impl Future<Output = ()> + Send {
async {
// TODO: a few current issues to consider:
//
// 1. identifiers are more likely to be in higher index buckets, not necessarily an issue
// so long as bucket size is above the minimum number of peers.
// 2. the above also guaranties a search returning K nodes can indeed return K nodes, so
// long as K is below the minimum number of peers. If K is larger a node will return at
// worst min(min peers, K) and at best min(peers, K).
//
// Therefore: bucket size >= min peers >= K is likely ideal.

let self_clone = self.clone();

tokio::spawn(async move {
loop {
for (_id, addr, is_connected) in
self_clone.router().select_search_peers(Self::ALPHA.into())
{
let is_connected = match is_connected {
true => self_clone.is_connected(addr).await,
false => self_clone.connect(addr).await,
};

if is_connected {
self_clone
.unicast(
addr,
Message::FindKNodes(
self_clone.router().generate_find_k_nodes(),
),
)
.await;
}
}
}

let peer_deficit =
Self::PEER_TARGET as i128 - self_clone.router().connected_addrs().len() as i128;
let peer_deficit = Self::PEER_TARGET as i128
- self_clone.router().connected_addrs().len() as i128;

if peer_deficit < 0 {
let addrs: Vec<SocketAddr> = {
let mut rng = rand::thread_rng();
if peer_deficit < 0 {
let addrs: Vec<SocketAddr> = {
let mut rng = rand::thread_rng();

self_clone
.router()
.connected_addrs()
.choose_multiple(&mut rng, peer_deficit.unsigned_abs() as usize)
.copied()
.collect()
};
self_clone
.router()
.connected_addrs()
.choose_multiple(&mut rng, peer_deficit.unsigned_abs() as usize)
.copied()
.collect()
};

for addr in addrs {
self_clone.disconnect(addr).await;
for addr in addrs {
self_clone.disconnect(addr).await;
}
}
}

if peer_deficit > 0 {
let addrs: Vec<SocketAddr> = {
let mut rng = rand::thread_rng();
self_clone
.router()
.disconnected_addrs()
.choose_multiple(&mut rng, peer_deficit as usize)
.copied()
.collect()
if peer_deficit > 0 {
let addrs: Vec<SocketAddr> = {
let mut rng = rand::thread_rng();
self_clone
.router()
.disconnected_addrs()
.choose_multiple(&mut rng, peer_deficit as usize)
.copied()
.collect()
};

for addr in addrs {
self_clone.connect(addr).await;
}
}

// Check the peer counts again.
let sleep_duration = {
std::time::Duration::from_secs(
if self_clone.router().connected_addrs().len()
< Self::PEER_TARGET.into()
{
Self::BOOTSTRAP_INTERVAL_SECS
} else {
Self::DISCOVERY_INTERVAL_SECS
},
)
};

for addr in addrs {
self_clone.connect(addr).await;
}
tokio::time::sleep(sleep_duration).await;
}

// Check the peer counts again.
let sleep_duration = {
std::time::Duration::from_secs(
if self_clone.router().connected_addrs().len() < Self::PEER_TARGET.into() {
Self::BOOTSTRAP_INTERVAL_SECS
} else {
Self::DISCOVERY_INTERVAL_SECS
},
)
};

tokio::time::sleep(sleep_duration).await;
}
});
});
}
}

// TODO: work out how and if data should be chunked (1 block per-message or multiple smaller
// messages). Up to the caller for now.
/// Broadcast data to the network, following the kadcast protocol.
async fn kadcast(&self, data: Bytes) -> Nonce {
let peers = self
.router()
.select_broadcast_peers(Id::BITS as u32)
.unwrap();

// TODO: record nonce somewhere.
let nonce = {
let mut rng = thread_rng();
rng.gen()
};

for (height, addr) in peers {
let message = Message::Chunk(Chunk {
// Can be used to trace the broadcast. If set differently for each peer here, it will
// be the same within a propagation sub-tree.
nonce,
height,
// Cheap as the backing storage is shared amongst instances.
data: data.clone(),
});
fn kadcast(&self, data: Bytes) -> impl Future<Output = Nonce> + Send {
async move {
let peers = self
.router()
.select_broadcast_peers(Id::BITS as u32)
.unwrap();

// TODO: record nonce somewhere.
let nonce = {
let mut rng = thread_rng();
rng.gen()
};

for (height, addr) in peers {
let message = Message::Chunk(Chunk {
// Can be used to trace the broadcast. If set differently for each peer here, it will
// be the same within a propagation sub-tree.
nonce,
height,
// Cheap as the backing storage is shared amongst instances.
data: data.clone(),
});

self.unicast(addr, message).await;
}

self.unicast(addr, message).await;
nonce
}

nonce
}
}
6 changes: 1 addition & 5 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ impl ProcessData<KadNode> for Data {
}
}

#[async_trait::async_trait]
impl Kadcast for KadNode {
// Shorten the defaults for testing purposes.
const PEER_TARGET: u16 = 20;
Expand Down Expand Up @@ -134,7 +133,7 @@ impl KadNode {
pub async fn new(id: Id) -> Self {
Self {
node: Node::new(Config {
listener_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
listener_addr: Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0)),
max_connections: 1024,
..Default::default()
}),
Expand Down Expand Up @@ -163,7 +162,6 @@ impl Pea2Pea for KadNode {
}
}

#[async_trait::async_trait]
impl Reading for KadNode {
type Message = Message;
type Codec = MessageCodec;
Expand Down Expand Up @@ -214,7 +212,6 @@ impl Writing for KadNode {
}
}

#[async_trait::async_trait]
impl Handshake for KadNode {
async fn perform_handshake(&self, mut conn: Connection) -> io::Result<Connection> {
let local_id = self.router.local_id();
Expand Down Expand Up @@ -281,7 +278,6 @@ impl Handshake for KadNode {
}
}

#[async_trait::async_trait]
impl OnDisconnect for KadNode {
async fn on_disconnect(&self, addr: SocketAddr) {
self.router.set_disconnected(addr);
Expand Down

0 comments on commit 6ca3fb4

Please sign in to comment.