From dc57fbad0751eb025840f1d0882facf0e7b11ef8 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 24 Feb 2023 13:21:05 +0100 Subject: [PATCH] fix(disc): allow concurrent lookups (#1539) --- crates/net/discv4/src/lib.rs | 47 +++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs index bcfe49f73b17..eb1424ba47e2 100644 --- a/crates/net/discv4/src/lib.rs +++ b/crates/net/discv4/src/lib.rs @@ -484,11 +484,6 @@ impl Discv4Service { } } - /// Returns true if there's a lookup in progress - fn is_lookup_in_progress(&self) -> bool { - !self.pending_find_nodes.is_empty() - } - /// Returns the current enr sequence fn enr_seq(&self) -> Option { if self.config.enable_eip868 { @@ -636,7 +631,7 @@ impl Discv4Service { ); // From those 16, pick the 3 closest to start the concurrent lookup. - let closest = ctx.closest(ALPHA); + let closest = ctx.closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id)); trace!(target : "net::discv4", ?target, num = closest.len(), "Start lookup closest nodes"); @@ -1029,7 +1024,13 @@ impl Discv4Service { } PingReason::Lookup(node, ctx) => { self.update_on_pong(node, pong.enr_sq); - self.find_node(&node, ctx); + if self.pending_find_nodes.contains_key(&node.id) { + // there's already another pending request, unmark it so the next round can try + // to send it + ctx.unmark_queried(node.id); + } else { + self.find_node(&node, ctx); + } } } } @@ -1171,7 +1172,7 @@ impl Discv4Service { } // get the next closest nodes, not yet queried nodes and start over. - let closest = ctx.closest(ALPHA); + let closest = ctx.closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id)); for closest in closest { let key = kad_key(closest.id); @@ -1380,10 +1381,7 @@ impl Discv4Service { } // trigger self lookup - if self.config.enable_lookup && - !self.is_lookup_in_progress() && - self.lookup_interval.poll_tick(cx).is_ready() - { + if self.config.enable_lookup && self.lookup_interval.poll_tick(cx).is_ready() { let target = self.lookup_rotator.next(&self.local_node_record.id); self.lookup_with(target, None); } @@ -1706,14 +1704,18 @@ impl LookupContext { self.inner.target.preimage().0 } - /// Returns an iterator over the closest nodes that are not queried yet. - fn closest(&self, num: usize) -> Vec { + /// Returns the closest nodes that have not been queried yet. + fn closest

(&self, num: usize, filter: P) -> Vec + where + P: FnMut(&NodeRecord) -> bool, + { self.inner .closest_nodes .borrow() .iter() .filter(|(_, node)| !node.queried) .map(|(_, n)| n.record) + .filter(filter) .take(num) .collect() } @@ -1727,15 +1729,24 @@ impl LookupContext { } } - /// Marks the node as queried - fn mark_queried(&self, id: PeerId) { + fn set_queried(&self, id: PeerId, val: bool) { if let Some((_, node)) = self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id) { - node.queried = true; + node.queried = val; } } + /// Marks the node as queried + fn mark_queried(&self, id: PeerId) { + self.set_queried(id, true) + } + + /// Marks the node as not queried + fn unmark_queried(&self, id: PeerId) { + self.set_queried(id, false) + } + /// Marks the node as responded fn mark_responded(&self, id: PeerId) { if let Some((_, node)) = @@ -2066,12 +2077,10 @@ mod tests { ); service.lookup_self(); - assert!(service.is_lookup_in_progress()); assert_eq!(service.pending_find_nodes.len(), 1); poll_fn(|cx| { let _ = service.poll(cx); - assert!(service.is_lookup_in_progress()); assert_eq!(service.pending_find_nodes.len(), 1); Poll::Ready(())