diff --git a/sn_networking/src/event/request_response.rs b/sn_networking/src/event/request_response.rs
index 08131897a7..7dacaa93e4 100644
--- a/sn_networking/src/event/request_response.rs
+++ b/sn_networking/src/event/request_response.rs
@@ -98,9 +98,6 @@ impl SwarmDriver {
                     self.record_metrics(Marker::FlaggedAsBadNode {
                         flagged_by: &detected_by,
                     });
-
-                    // TODO: shall we terminate self after received such notifications
-                    // from the majority close_group nodes around us?
                 } else {
                     error!("Received a bad_peer notification from {detected_by:?}, targeting {bad_peer:?}, which is not us.");
                 }
diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs
index 49e3f979bd..cd0875fa5e 100644
--- a/sn_networking/src/lib.rs
+++ b/sn_networking/src/lib.rs
@@ -434,8 +434,6 @@ impl Network {
             self.send_req_ignore_reply(request, *peer_id);
         }
 
-        filter_out_bad_nodes(&mut all_costs, record_address);
-
         get_fees_from_store_cost_responses(all_costs)
     }
 
@@ -1199,32 +1197,6 @@ fn get_fees_from_store_cost_responses(
     Ok((payee_id, payee.1, payee.2))
 }
 
-/// According to the bad_nodes list collected via quotes,
-/// candidate that received majority votes from others shall be ignored.
-fn filter_out_bad_nodes(
-    all_costs: &mut Vec<(NetworkAddress, RewardsAddress, PaymentQuote)>,
-    record_address: NetworkAddress,
-) {
-    let mut bad_node_votes: BTreeMap<NetworkAddress, usize> = BTreeMap::new();
-    for (peer_addr, _reward_addr, quote) in all_costs.iter() {
-        let bad_nodes: Vec<NetworkAddress> = match rmp_serde::from_slice(&quote.bad_nodes) {
-            Ok(bad_nodes) => bad_nodes,
-            Err(err) => {
-                error!("For record {record_address:?}, failed to recover bad_nodes from quote of {peer_addr:?} with error {err:?}");
-                continue;
-            }
-        };
-        for bad_node in bad_nodes {
-            let entry = bad_node_votes.entry(bad_node).or_default();
-            *entry += 1;
-        }
-    }
-    all_costs.retain(|(peer_addr, _, _)| {
-        let entry = bad_node_votes.entry(peer_addr.clone()).or_default();
-        *entry < close_group_majority()
-    });
-}
-
 /// Get the value of the provided Quorum
 pub fn get_quorum_value(quorum: &Quorum) -> usize {
     match quorum {
diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs
index 878b10f19c..4fb6294727 100644
--- a/sn_node/src/node.rs
+++ b/sn_node/src/node.rs
@@ -23,9 +23,7 @@ use rand::{
 use sn_evm::{AttoTokens, RewardsAddress};
 #[cfg(feature = "open-metrics")]
 use sn_networking::MetricsRegistries;
-use sn_networking::{
-    close_group_majority, Instant, Network, NetworkBuilder, NetworkEvent, NodeIssue, SwarmDriver,
-};
+use sn_networking::{Instant, Network, NetworkBuilder, NetworkEvent, NodeIssue, SwarmDriver};
 use sn_protocol::{
     error::Error as ProtocolError,
     messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
@@ -44,7 +42,7 @@ use std::{
 };
 use tokio::{
     sync::mpsc::Receiver,
-    task::{spawn, JoinHandle, JoinSet},
+    task::{spawn, JoinSet},
 };
 
 use sn_evm::EvmNetwork;
@@ -53,10 +51,6 @@ use sn_evm::EvmNetwork;
 /// This is the max time it should take. Minimum interval at any node will be half this
 pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;
 
-/// Interval to trigger bad node detection.
-/// This is the max time it should take. Minimum interval at any node will be half this
-const PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S: u64 = 600;
-
 /// Interval to trigger storage challenge.
 /// This is the max time it should take. Minimum interval at any node will be half this
 const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200;
@@ -269,19 +263,6 @@ impl Node {
             let mut replication_interval = tokio::time::interval(replication_interval_time);
             let _ = replication_interval.tick().await; // first tick completes immediately
 
-            // use a random timeout to ensure not sync when transmit messages.
-            let bad_nodes_check_interval: u64 = rng.gen_range(
-                PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S / 2
-                    ..PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S,
-            );
-            let bad_nodes_check_time = Duration::from_secs(bad_nodes_check_interval);
-            debug!("BadNodesCheck interval set to {bad_nodes_check_time:?}");
-
-            let mut bad_nodes_check_interval = tokio::time::interval(bad_nodes_check_time);
-            let _ = bad_nodes_check_interval.tick().await; // first tick completes immediately
-
-            let mut rolling_index = 0;
-
             let mut uptime_metrics_update_interval =
                 tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
             let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately
@@ -334,24 +315,6 @@ impl Node {
                             trace!("Periodic replication took {:?}", start.elapsed());
                         });
                     }
-                    // runs every bad_nodes_check_time time
-                    _ = bad_nodes_check_interval.tick() => {
-                        let start = Instant::now();
-                        debug!("Periodic bad_nodes check triggered");
-                        let network = self.network().clone();
-                        self.record_metrics(Marker::IntervalBadNodesCheckTriggered);
-
-                        let _handle = spawn(async move {
-                            Self::try_bad_nodes_check(network, rolling_index).await;
-                            trace!("Periodic bad_nodes check took {:?}", start.elapsed());
-                        });
-
-                        if rolling_index == 511 {
-                            rolling_index = 0;
-                        } else {
-                            rolling_index += 1;
-                        }
-                    }
                     _ = uptime_metrics_update_interval.tick() => {
                         #[cfg(feature = "open-metrics")]
                         if let Some(metrics_recorder) = self.metrics_recorder() {
@@ -527,58 +490,6 @@ impl Node {
         );
     }
 
-    // Query close_group peers to the target to verifify whether the target is bad_node
-    // Returns true when it is a bad_node, otherwise false
-    async fn close_nodes_shunning_peer(network: &Network, peer_id: PeerId) -> bool {
-        // using `client` to exclude self
-        let closest_peers = match network
-            .client_get_all_close_peers_in_range_or_close_group(&NetworkAddress::from_peer(peer_id))
-            .await
-        {
-            Ok(peers) => peers,
-            Err(err) => {
-                error!("Failed to finding closest_peers to {peer_id:?} client_get_closest_peers errored: {err:?}");
-                return false;
-            }
-        };
-
-        // Query the peer status from the close_group to the peer,
-        // raise alert as long as getting alerts from majority(3) of the close_group.
-        let req = Request::Query(Query::CheckNodeInProblem(NetworkAddress::from_peer(
-            peer_id,
-        )));
-        let mut handles = Vec::new();
-        for peer in closest_peers {
-            let req_copy = req.clone();
-            let network_copy = network.clone();
-            let handle: JoinHandle<bool> = spawn(async move {
-                debug!("getting node_status of {peer_id:?} from {peer:?}");
-                if let Ok(resp) = network_copy.send_request(req_copy, peer).await {
-                    match resp {
-                        Response::Query(QueryResponse::CheckNodeInProblem {
-                            is_in_trouble,
-                            ..
-                        }) => is_in_trouble,
-                        other => {
-                            error!("Cannot get node status of {peer_id:?} from node {peer:?}, with response {other:?}");
-                            false
-                        }
-                    }
-                } else {
-                    false
-                }
-            });
-            handles.push(handle);
-        }
-        let results: Vec<_> = futures::future::join_all(handles).await;
-
-        results
-            .iter()
-            .filter(|r| *r.as_ref().unwrap_or(&false))
-            .count()
-            >= close_group_majority()
-    }
-
     // Handle the response that was not awaited at the call site
     fn handle_response(&self, response: Response) -> Result<()> {
         match response {
@@ -888,62 +799,6 @@ impl Node {
             start.elapsed()
         );
     }
-
-    async fn try_bad_nodes_check(network: Network, rolling_index: usize) {
-        if let Ok(kbuckets) = network.get_kbuckets().await {
-            let total_peers: usize = kbuckets.values().map(|peers| peers.len()).sum();
-            if total_peers > 100 {
-                // The `rolling_index` is rotating among 0-511,
-                // meanwhile the returned `kbuckets` only holding non-empty buckets.
-                // Hence using the `remainder` calculate to achieve a rolling check.
-                // A further `remainder of 2` is used to allow `upper or lower part`
-                // index within a bucket, to further reduce the concurrent queries.
-                let mut bucket_index = (rolling_index / 2) % kbuckets.len();
-                let part_index = rolling_index % 2;
-
-                for (distance, peers) in kbuckets.iter() {
-                    if bucket_index == 0 {
-                        let peers_to_query = if peers.len() > 10 {
-                            let split_index = peers.len() / 2;
-                            let (left, right) = peers.split_at(split_index);
-                            if part_index == 0 {
-                                left
-                            } else {
-                                right
-                            }
-                        } else {
-                            peers
-                        };
-
-                        debug!(
-                            "Undertake bad_nodes check against bucket {distance} having {} peers, {} candidates to be queried",
-                            peers.len(), peers_to_query.len()
-                        );
-                        for peer_id in peers_to_query {
-                            let peer_id_clone = *peer_id;
-                            let network_clone = network.clone();
-                            let _handle = spawn(async move {
-                                let is_bad =
-                                    Self::close_nodes_shunning_peer(&network_clone, peer_id_clone)
-                                        .await;
-                                if is_bad {
-                                    network_clone.record_node_issues(
-                                        peer_id_clone,
-                                        NodeIssue::CloseNodesShunning,
-                                    );
-                                }
-                            });
-                        }
-                        break;
-                    } else {
-                        bucket_index = bucket_index.saturating_sub(1);
-                    }
-                }
-            } else {
-                debug!("Skip bad_nodes check as not having too many nodes in RT");
-            }
-        }
-    }
 }
 
 async fn scoring_peer(