Skip to content

Commit

Permalink
feat(discovery): use the results of the get_closest_query
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Nov 24, 2023
1 parent 7c50c35 commit abf4976
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 18 deletions.
8 changes: 6 additions & 2 deletions sn_networking/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::SwarmDriver;
use crate::{driver::PendingGetClosestType, SwarmDriver};
use std::time::{Duration, Instant};
use tokio::time::Interval;

Expand Down Expand Up @@ -53,11 +53,15 @@ impl SwarmDriver {
// The query is just to trigger the network discovery,
// hence no need to wait for a result.
for addr in self.network_discovery_candidates.candidates() {
let _ = self
let query_id = self
.swarm
.behaviour_mut()
.kademlia
.get_closest_peers(addr.as_bytes());
let _ = self.pending_get_closest_peers.insert(
query_id,
(PendingGetClosestType::NetworkDiscovery, Default::default()),
);
}
self.network_discovery_candidates
.try_generate_new_candidates();
Expand Down
12 changes: 8 additions & 4 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{
driver::SwarmDriver,
driver::{PendingGetClosestType, SwarmDriver},
error::{Error, Result},
sort_peers_by_address, GetQuorum, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE,
REPLICATE_RANGE,
Expand Down Expand Up @@ -512,9 +512,13 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.get_closest_peers(key.as_bytes());
let _ = self
.pending_get_closest_peers
.insert(query_id, (sender, Default::default()));
let _ = self.pending_get_closest_peers.insert(
query_id,
(
PendingGetClosestType::FunctionCall(sender),
Default::default(),
),
);
}
SwarmCmd::GetAllLocalPeers { sender } => {
let _ = sender.send(self.get_all_local_peers());
Expand Down
10 changes: 9 additions & 1 deletion sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,15 @@ use tracing::warn;
/// List of expected record holders to be verified.
pub(super) type ExpectedHoldersList = HashSet<PeerId>;

type PendingGetClosest = HashMap<QueryId, (oneshot::Sender<HashSet<PeerId>>, HashSet<PeerId>)>;
/// The ways in which the Get Closest queries are used.
pub(crate) enum PendingGetClosestType {
/// The network discovery method is present at the networking layer
/// Thus we can just process the queries made by NetworkDiscovery without using any channels
NetworkDiscovery,
/// These are queries made by a function at the upper layers and contains a channel to send the result back.
FunctionCall(oneshot::Sender<HashSet<PeerId>>),
}
type PendingGetClosest = HashMap<QueryId, (PendingGetClosestType, HashSet<PeerId>)>;
type PendingGetRecord = HashMap<
QueryId,
(
Expand Down
34 changes: 24 additions & 10 deletions sn_networking/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use crate::{
close_group_majority,
driver::{truncate_patch_version, SwarmDriver},
driver::{truncate_patch_version, PendingGetClosestType, SwarmDriver},
error::{Error, Result},
multiaddr_is_global, multiaddr_strip_p2p, sort_peers_by_address, GetQuorum, CLOSE_GROUP_SIZE,
};
Expand Down Expand Up @@ -606,7 +606,7 @@ impl SwarmDriver {
"Query task {id:?} returned with peers {closest_peers:?}, {stats:?} - {step:?}"
);

let (sender, mut current_closest) =
let (get_closest_type, mut current_closest) =
self.pending_get_closest_peers.remove(&id).ok_or_else(|| {
trace!(
"Can't locate query task {id:?}, it has likely been completed already."
Expand All @@ -621,13 +621,20 @@ impl SwarmDriver {
let new_peers: HashSet<PeerId> = closest_peers.peers.clone().into_iter().collect();
current_closest.extend(new_peers);
if current_closest.len() >= usize::from(K_VALUE) || step.last {
sender
.send(current_closest)
.map_err(|_| Error::InternalMsgChannelDropped)?;
match get_closest_type {
PendingGetClosestType::NetworkDiscovery => self
.network_discovery_candidates
.handle_get_closest_query(current_closest),
PendingGetClosestType::FunctionCall(sender) => {
sender
.send(current_closest)
.map_err(|_| Error::InternalMsgChannelDropped)?;
}
}
} else {
let _ = self
.pending_get_closest_peers
.insert(id, (sender, current_closest));
.insert(id, (get_closest_type, current_closest));
}
}
// Handle GetClosestPeers timeouts
Expand All @@ -640,7 +647,7 @@ impl SwarmDriver {
event_string = "kad_event::get_closest_peers_err";
error!("GetClosest Query task {id:?} errored with {err:?}, {stats:?} - {step:?}");

let (sender, mut current_closest) =
let (get_closest_type, mut current_closest) =
self.pending_get_closest_peers.remove(&id).ok_or_else(|| {
trace!(
"Can't locate query task {id:?}, it has likely been completed already."
Expand All @@ -657,9 +664,16 @@ impl SwarmDriver {
}
}

sender
.send(current_closest)
.map_err(|_| Error::InternalMsgChannelDropped)?;
match get_closest_type {
PendingGetClosestType::NetworkDiscovery => self
.network_discovery_candidates
.handle_get_closest_query(current_closest),
PendingGetClosestType::FunctionCall(sender) => {
sender
.send(current_closest)
.map_err(|_| Error::InternalMsgChannelDropped)?;
}
}
}

// For `get_record` returning behaviour:
Expand Down
32 changes: 31 additions & 1 deletion sn_networking/src/network_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use libp2p::{kad::KBucketKey, PeerId};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use sn_protocol::NetworkAddress;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
time::Instant,
};

Expand Down Expand Up @@ -91,6 +91,36 @@ impl NetworkDiscoveryCandidates {
.filter_map(|candidates| candidates.front())
}

pub(crate) fn handle_get_closest_query(&mut self, closest_peers: HashSet<PeerId>) {
let now = Instant::now();
for peer in closest_peers {
let peer = NetworkAddress::from_peer(peer);
let peer_key = peer.as_kbucket_key();
if let Some(ilog2_distance) = peer_key.distance(&self.self_key).ilog2() {
match self.candidates.entry(ilog2_distance) {
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
// extra check to make sure we don't insert the same peer again
if entry.len() >= MAX_PEERS_PER_BUCKET && !entry.contains(&peer) {
// pop the front (as it might have been already used for querying and insert the new one at the back
let _ = entry.pop_front();
entry.push_back(peer);
} else {
entry.push_back(peer);
}
}
Entry::Vacant(entry) => {
let _ = entry.insert(VecDeque::from([peer]));
}
}
}
}
trace!(
"It took {:?} to NetworkDiscovery::handle get closest query",
now.elapsed()
);
}

fn generate_candidates(
self_key: &KBucketKey<PeerId>,
num_to_generate: usize,
Expand Down

0 comments on commit abf4976

Please sign in to comment.