fix(network): return references when sorting peers
RolandSherwin authored and joshuef committed Oct 21, 2023
1 parent b33e609 commit 40cfd44
Showing 5 changed files with 24 additions and 18 deletions.
2 changes: 1 addition & 1 deletion sn_networking/src/cmd.rs
@@ -450,7 +450,7 @@ impl SwarmDriver {
 
         // Margin of 2 to allow our RT being bit lagging.
         match sort_peers_by_address(all_peers, target, CLOSE_GROUP_SIZE + 2) {
-            Ok(close_group) => close_group.contains(&self.self_peer_id),
+            Ok(close_group) => close_group.contains(&&self.self_peer_id),
             Err(err) => {
                 warn!("Could not get sorted peers for {target:?} with error {err:?}");
                 true
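
The extra `&` follows from the new return type: `sort_peers_by_address` now yields `Vec<&PeerId>` instead of `Vec<PeerId>`, and `Vec<&T>::contains` takes `&&T`. A minimal standalone sketch of the pattern (illustrative names, not the crate's API):

    fn main() {
        let ids = vec![1u32, 2, 3];
        // Collecting references instead of clones, as the commit does for peers.
        let refs: Vec<&u32> = ids.iter().collect();
        let target = 2u32;
        // One `&` for contains' parameter, one because the elements are references.
        assert!(refs.contains(&&target));
    }
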
7 changes: 4 additions & 3 deletions sn_networking/src/event.rs
@@ -819,8 +819,9 @@ impl SwarmDriver {
 
     // Check for changes in our close group
     fn check_for_change_in_our_close_group(&mut self) -> Option<Vec<PeerId>> {
+        let all_peers = self.get_all_local_peers();
+
         let new_closest_peers = {
-            let all_peers = self.get_all_local_peers();
             sort_peers_by_address(
                 &all_peers,
                 &NetworkAddress::from_peer(self.self_peer_id),
@@ -838,9 +839,9 @@
         if !new_members.is_empty() {
             debug!("The close group has been updated. The new members are {new_members:?}");
             debug!("New close group: {new_closest_peers:?}");
-            self.close_group = new_closest_peers;
+            self.close_group = new_closest_peers.into_iter().cloned().collect();
             let _ = self.update_record_distance_range();
-            Some(new_members)
+            Some(new_members.into_iter().cloned().collect())
         } else {
             None
         }
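
Hoisting `all_peers` out of the block is forced by borrowing: the `Vec<&PeerId>` the sort returns borrows from `all_peers`, which must therefore outlive `new_closest_peers`. A sketch of the constraint, with a hypothetical `sorted_refs` standing in for `sort_peers_by_address`:

    // Returns references that borrow from the slice it was given.
    fn sorted_refs(ids: &[u32]) -> Vec<&u32> {
        let mut refs: Vec<&u32> = ids.iter().collect();
        refs.sort();
        refs
    }

    fn main() {
        // `ids` must be bound in the enclosing scope: the references in
        // `closest` borrow from it, so it has to outlive them.
        let ids = vec![3u32, 1, 2];
        let closest = sorted_refs(&ids);
        // Cloning converts back to owned values for long-term storage,
        // mirroring `new_closest_peers.into_iter().cloned().collect()`.
        let owned: Vec<u32> = closest.into_iter().cloned().collect();
        assert_eq!(owned, vec![1, 2, 3]);
    }
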
22 changes: 13 additions & 9 deletions sn_networking/src/lib.rs
@@ -78,22 +78,22 @@ const PUT_RECORD_RETRIES: usize = 3;
 /// Sort the provided peers by their distance to the given `NetworkAddress`.
 /// Return with the closest expected number of entries if has.
 #[allow(clippy::result_large_err)]
-pub fn sort_peers_by_address(
-    peers: &[PeerId],
+pub fn sort_peers_by_address<'a>(
+    peers: &'a [PeerId],
     address: &NetworkAddress,
     expected_entries: usize,
-) -> Result<Vec<PeerId>> {
+) -> Result<Vec<&'a PeerId>> {
     sort_peers_by_key(peers, &address.as_kbucket_key(), expected_entries)
 }
 
 /// Sort the provided peers by their distance to the given `KBucketKey`.
 /// Return with the closest expected number of entries if has.
 #[allow(clippy::result_large_err)]
-pub fn sort_peers_by_key<T>(
-    peers: &[PeerId],
+pub fn sort_peers_by_key<'a, T>(
+    peers: &'a [PeerId],
     key: &KBucketKey<T>,
     expected_entries: usize,
-) -> Result<Vec<PeerId>> {
+) -> Result<Vec<&'a PeerId>> {
     // Get the indices and sort them by indexing into the `peers` array.
     let mut indices: Vec<usize> = (0..peers.len()).collect();
     indices.sort_by(|&i, &j| {
@@ -112,10 +112,10 @@ pub fn sort_peers_by_key<T>(
         }
     });
 
-    let sorted_peers: Vec<PeerId> = indices
+    let sorted_peers: Vec<&PeerId> = indices
         .iter()
         .take(expected_entries)
-        .filter_map(|&i| peers.get(i).cloned())
+        .filter_map(|&i| peers.get(i))
         .collect();
 
     if CLOSE_GROUP_SIZE > sorted_peers.len() {
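
This is the core of the change: the sort orders indices rather than elements and then borrows into the original slice, so no `PeerId` is cloned during sorting. A standalone sketch of the technique, using XOR as a stand-in for the Kademlia distance metric (names are illustrative):

    fn k_closest(items: &[u32], target: u32, k: usize) -> Vec<&u32> {
        // Sort indices by distance to the target instead of sorting
        // (and thus moving or cloning) the items themselves.
        let mut indices: Vec<usize> = (0..items.len()).collect();
        indices.sort_by_key(|&i| items[i] ^ target);
        // Borrow the k closest items out of the original slice.
        indices.iter().take(k).filter_map(|&i| items.get(i)).collect()
    }

    fn main() {
        let items = vec![10u32, 4, 7, 13];
        assert_eq!(k_closest(&items, 5, 2), vec![&4, &7]);
    }
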
@@ -641,7 +641,11 @@ impl Network {
 
         trace!("Network knowledge of close peers to {key:?} are: {close_peers_pretty_print:?}");
 
-        sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)
+        let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?
+            .into_iter()
+            .cloned()
+            .collect();
+        Ok(closest_peers)
     }
 
     /// Send a `Request` to the provided set of peers and wait for their responses concurrently.
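
At the public `Network` API the borrowed results are converted back to owned `PeerId`s before being returned, so external callers are unaffected. Roughly the shape of that boundary, with hypothetical `Res`/`find` stand-ins:

    type Res<T> = Result<T, String>;

    fn find(ids: &[u32]) -> Res<Vec<&u32>> {
        Ok(ids.iter().collect())
    }

    fn find_owned(ids: &[u32]) -> Res<Vec<u32>> {
        // `?` unwraps the borrowed Vec; cloning detaches it from `ids`.
        let owned = find(ids)?.into_iter().cloned().collect();
        Ok(owned)
    }

    fn main() {
        assert_eq!(find_owned(&[1, 2, 3]).unwrap(), vec![1, 2, 3]);
    }
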
8 changes: 4 additions & 4 deletions sn_node/src/replication.rs
@@ -69,20 +69,20 @@ impl Node {
                 sort_peers_by_address(&all_peers, &key, CLOSE_GROUP_SIZE + 1)?;
             let sorted_peers_pretty_print: Vec<_> = sorted_based_on_key
                 .iter()
-                .map(|peer_id| {
+                .map(|&peer_id| {
                     format!(
                         "{peer_id:?}({:?})",
                         PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key())
                     )
                 })
                 .collect();
 
-            if sorted_based_on_key.contains(&peer_id) {
+            if sorted_based_on_key.contains(&&peer_id) {
                 trace!("replication: close for {key:?} are: {sorted_peers_pretty_print:?}");
                 let target_peer = if is_removal {
                     // For dead peer, only replicate to farthest close_group peer,
                     // when the dead peer was one of the close_group peers to the record.
-                    if let Some(farthest_peer) = sorted_based_on_key.last() {
+                    if let Some(&farthest_peer) = sorted_based_on_key.last() {
                         if *farthest_peer != peer_id {
                             *farthest_peer
                         } else {
@@ -93,7 +93,7 @@
                         }
                     }
                 } else {
                     // For new peer, always replicate to it when it is close_group of the record.
-                    if Some(&peer_id) != sorted_based_on_key.last() {
+                    if Some(&&peer_id) != sorted_based_on_key.last() {
                         peer_id
                     } else {
                         continue;
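
With `sorted_based_on_key` now holding `&PeerId`, `last()` yields `Option<&&PeerId>`: comparisons need a double `&` on the probe, and `Some(&farthest_peer)` destructures one layer back off. The same mechanics in miniature:

    fn main() {
        let vals = vec![1u32, 2, 3];
        let refs: Vec<&u32> = vals.iter().collect();
        let id = 3u32;
        // `refs.last()` is Option<&&u32>, so the probe needs `&&`.
        assert!(Some(&&id) == refs.last());
        // `Some(&x)` peels one reference off while matching.
        if let Some(&farthest) = refs.last() {
            assert_eq!(*farthest, 3); // `farthest` is a plain &u32 again
        }
    }
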
3 changes: 2 additions & 1 deletion sn_node/tests/verify_data_location.rs
@@ -148,7 +148,7 @@ fn print_node_close_groups(all_peers: &[PeerId]) {
             sort_peers_by_key(&all_peers, &key, CLOSE_GROUP_SIZE).expect("failed to sort peer");
         let closest_peers_idx = closest_peers
             .iter()
-            .map(|peer| all_peers.iter().position(|p| p == peer).unwrap() + 1)
+            .map(|&&peer| all_peers.iter().position(|&p| p == peer).unwrap() + 1)
             .collect::<Vec<_>>();
         println!("Close for {node_index}: {peer:?} are {closest_peers_idx:?}");
     }
@@ -188,6 +188,7 @@ async fn verify_location(record_holders: &RecordHolders, all_peers: &[PeerId]) -
         let record_key = KBucketKey::from(key.to_vec());
         let expected_holders = sort_peers_by_key(all_peers, &record_key, CLOSE_GROUP_SIZE)?
             .into_iter()
+            .cloned()
             .collect::<BTreeSet<_>>();
 
         let actual_holders = actual_holders_idx
