Skip to content

Commit

Permalink
fix(disc): allow concurrent lookups (#1539)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Feb 24, 2023
1 parent 3589879 commit dc57fba
Showing 1 changed file with 28 additions and 19 deletions.
47 changes: 28 additions & 19 deletions crates/net/discv4/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,6 @@ impl Discv4Service {
}
}

/// Returns true if there's a lookup in progress
fn is_lookup_in_progress(&self) -> bool {
!self.pending_find_nodes.is_empty()
}

/// Returns the current enr sequence
fn enr_seq(&self) -> Option<u64> {
if self.config.enable_eip868 {
Expand Down Expand Up @@ -636,7 +631,7 @@ impl Discv4Service {
);

// From those 16, pick the 3 closest to start the concurrent lookup.
let closest = ctx.closest(ALPHA);
let closest = ctx.closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));

trace!(target : "net::discv4", ?target, num = closest.len(), "Start lookup closest nodes");

Expand Down Expand Up @@ -1029,7 +1024,13 @@ impl Discv4Service {
}
PingReason::Lookup(node, ctx) => {
self.update_on_pong(node, pong.enr_sq);
self.find_node(&node, ctx);
if self.pending_find_nodes.contains_key(&node.id) {
// there's already another pending request, unmark it so the next round can try
// to send it
ctx.unmark_queried(node.id);
} else {
self.find_node(&node, ctx);
}
}
}
}
Expand Down Expand Up @@ -1171,7 +1172,7 @@ impl Discv4Service {
}

// get the next closest nodes, not yet queried nodes and start over.
let closest = ctx.closest(ALPHA);
let closest = ctx.closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));

for closest in closest {
let key = kad_key(closest.id);
Expand Down Expand Up @@ -1380,10 +1381,7 @@ impl Discv4Service {
}

// trigger self lookup
if self.config.enable_lookup &&
!self.is_lookup_in_progress() &&
self.lookup_interval.poll_tick(cx).is_ready()
{
if self.config.enable_lookup && self.lookup_interval.poll_tick(cx).is_ready() {
let target = self.lookup_rotator.next(&self.local_node_record.id);
self.lookup_with(target, None);
}
Expand Down Expand Up @@ -1706,14 +1704,18 @@ impl LookupContext {
self.inner.target.preimage().0
}

/// Returns an iterator over the closest nodes that are not queried yet.
fn closest(&self, num: usize) -> Vec<NodeRecord> {
/// Returns the closest nodes that have not been queried yet.
fn closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
where
P: FnMut(&NodeRecord) -> bool,
{
self.inner
.closest_nodes
.borrow()
.iter()
.filter(|(_, node)| !node.queried)
.map(|(_, n)| n.record)
.filter(filter)
.take(num)
.collect()
}
Expand All @@ -1727,15 +1729,24 @@ impl LookupContext {
}
}

/// Marks the node as queried
fn mark_queried(&self, id: PeerId) {
fn set_queried(&self, id: PeerId, val: bool) {
if let Some((_, node)) =
self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
{
node.queried = true;
node.queried = val;
}
}

/// Marks the node as queried
fn mark_queried(&self, id: PeerId) {
self.set_queried(id, true)
}

/// Marks the node as not queried
fn unmark_queried(&self, id: PeerId) {
self.set_queried(id, false)
}

/// Marks the node as responded
fn mark_responded(&self, id: PeerId) {
if let Some((_, node)) =
Expand Down Expand Up @@ -2066,12 +2077,10 @@ mod tests {
);

service.lookup_self();
assert!(service.is_lookup_in_progress());
assert_eq!(service.pending_find_nodes.len(), 1);

poll_fn(|cx| {
let _ = service.poll(cx);
assert!(service.is_lookup_in_progress());
assert_eq!(service.pending_find_nodes.len(), 1);

Poll::Ready(())
Expand Down

0 comments on commit dc57fba

Please sign in to comment.