Skip to content

Commit

Permalink
Add initial overlay routing table management (#195)
Browse files Browse the repository at this point in the history
* Add initial overlay routing table management

* Address PR feedback (multiple changes)

* Modify overlay to use parking_lot::RwLock to avoid unnecessary async
* Refactor
* Fix typo

* Remove unnecessary async functions and add unit tests
  • Loading branch information
jacobkaufmann authored Dec 23, 2021
1 parent e6808d6 commit 73f38b2
Show file tree
Hide file tree
Showing 10 changed files with 1,089 additions and 134 deletions.
25 changes: 13 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ trin-state = { path = "trin-state" }
trin-cli = { path = "trin-cli" }

[dependencies.discv5]
branch = "fewer-mutable-borrows"
git = "https://github.com/carver/discv5"
branch = "master"
git = "https://github.com/sigp/discv5"

[dev-dependencies]
ethportal-peertest = { path = "ethportal-peertest" }
Expand Down
4 changes: 2 additions & 2 deletions ethportal-peertest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ trin-state = { path = "../trin-state" }
uds_windows = "1.0.1"

[dependencies.discv5]
branch = "fewer-mutable-borrows"
git = "https://github.com/carver/discv5"
branch = "master"
git = "https://github.com/sigp/discv5"
5 changes: 3 additions & 2 deletions trin-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ futures = "0.3.13"
hex = "0.4.3"
httparse = "1.5.1"
log = "0.4.14"
parking_lot = "0.11.2"
rand = "0.8.4"
rlp = "0.5.0"
rocksdb = "0.16.0"
Expand Down Expand Up @@ -45,8 +46,8 @@ uds_windows = "1.0.1"
interfaces = "0.0.7"

[dependencies.discv5]
branch = "fewer-mutable-borrows"
git = "https://github.com/carver/discv5"
branch = "master"
git = "https://github.com/sigp/discv5"

[dependencies.rusqlite]
version = "0.25.3"
Expand Down
33 changes: 17 additions & 16 deletions trin-core/src/portalnet/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use super::{
types::uint::U256,
Enr,
};
use crate::locks::RwLoggingExt;
use crate::portalnet::types::messages::{
ByteList, Content, FindContent, FindNodes, Message, Nodes, Ping, Pong, ProtocolId, Request,
Response,
Expand All @@ -20,29 +19,34 @@ use discv5::{
TalkRequest,
};
use futures::channel::oneshot;
use parking_lot::RwLock;
use rocksdb::DB;
use ssz::Encode;
use tokio::sync::{mpsc::UnboundedSender, RwLock};
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, warn};

pub use super::overlay_service::OverlayRequestError;

/// Configuration parameters for the overlay network.
#[derive(Clone)]
pub struct OverlayConfig {
pub bootnode_enrs: Vec<Enr>,
pub bucket_pending_timeout: Duration,
pub max_incoming_per_bucket: usize,
pub table_filter: Option<Box<dyn Filter<Node>>>,
pub bucket_filter: Option<Box<dyn Filter<Node>>>,
pub ping_queue_interval: Option<Duration>,
}

impl Default for OverlayConfig {
fn default() -> Self {
Self {
bootnode_enrs: vec![],
bucket_pending_timeout: Duration::from_secs(60),
max_incoming_per_bucket: 16,
table_filter: None,
bucket_filter: None,
ping_queue_interval: None,
}
}
}
Expand Down Expand Up @@ -88,6 +92,8 @@ impl OverlayProtocol {
Arc::clone(&discovery),
Arc::clone(&db),
Arc::clone(&kbuckets),
config.bootnode_enrs,
config.ping_queue_interval,
Arc::clone(&data_radius),
protocol.clone(),
)
Expand All @@ -110,12 +116,12 @@ impl OverlayProtocol {
}

/// Returns the ENR of the local node.
pub async fn local_enr(&self) -> Enr {
pub fn local_enr(&self) -> Enr {
self.discovery.discv5.local_enr()
}

/// Returns the data radius of the local node.
pub async fn data_radius(&self) -> U256 {
pub fn data_radius(&self) -> U256 {
*self.data_radius
}

Expand All @@ -135,6 +141,7 @@ impl OverlayProtocol {
Err(_) => return Err(OverlayRequestError::DecodeError),
};
let direction = RequestDirection::Incoming {
id: talk_request.id().clone(),
source: *talk_request.node_id(),
};

Expand All @@ -143,20 +150,18 @@ impl OverlayProtocol {
}

/// Returns a vector of all ENR node IDs of nodes currently contained in the routing table.
pub async fn table_entries_id(&self) -> Vec<NodeId> {
pub fn table_entries_id(&self) -> Vec<NodeId> {
self.kbuckets
.write_with_warn()
.await
.write()
.iter()
.map(|entry| *entry.node.key.preimage())
.collect()
}

/// Returns a vector of all the ENRs of nodes currently contained in the routing table.
pub async fn table_entries_enr(&self) -> Vec<Enr> {
pub fn table_entries_enr(&self) -> Vec<Enr> {
self.kbuckets
.write_with_warn()
.await
.write()
.iter()
.map(|entry| entry.node.value.enr().clone())
.collect()
Expand All @@ -166,7 +171,7 @@ impl OverlayProtocol {
pub async fn send_ping(&self, enr: Enr) -> Result<Pong, OverlayRequestError> {
// Construct the request.
let enr_seq = self.discovery.local_enr().seq();
let data_radius = self.data_radius().await;
let data_radius = self.data_radius();
let custom_payload = ByteList::from(data_radius.as_ssz_bytes());
let request = Ping {
enr_seq,
Expand Down Expand Up @@ -238,11 +243,7 @@ impl OverlayProtocol {
direction: RequestDirection,
) -> Result<Response, OverlayRequestError> {
let (tx, rx) = oneshot::channel();
let overlay_request = OverlayRequest {
request,
direction,
responder: Some(tx),
};
let overlay_request = OverlayRequest::new(request, direction, Some(tx));
if let Err(error) = self.request_tx.send(overlay_request) {
warn!(
"Failure sending request over {:?} service channel",
Expand Down
Loading

0 comments on commit 73f38b2

Please sign in to comment.