From 107918cc0c90d9b09e6222dc2073dc7940f8a08e Mon Sep 17 00:00:00 2001 From: qima Date: Fri, 22 Nov 2024 00:18:44 +0800 Subject: [PATCH] feat: implement the initial scoring system --- Cargo.lock | 1 + sn_networking/src/event/mod.rs | 14 --- sn_node/Cargo.toml | 1 + sn_node/src/node.rs | 207 +++++++++++++++++++++++---------- 4 files changed, 146 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ff28dc1c5..e5b4b21ba8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8733,6 +8733,7 @@ dependencies = [ "hex 0.4.3", "itertools 0.12.1", "libp2p 0.54.1", + "num-traits", "prometheus-client", "prost 0.9.0", "pyo3", diff --git a/sn_networking/src/event/mod.rs b/sn_networking/src/event/mod.rs index e1d8074d29..67f7c41c0d 100644 --- a/sn_networking/src/event/mod.rs +++ b/sn_networking/src/event/mod.rs @@ -143,11 +143,6 @@ pub enum NetworkEvent { FailedToFetchHolders(BTreeSet), /// Quotes to be verified QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> }, - /// Carry out chunk proof check against the specified record and peer - ChunkProofVerification { - peer_id: PeerId, - key_to_verify: NetworkAddress, - }, } /// Terminate node for the following reason @@ -206,15 +201,6 @@ impl Debug for NetworkEvent { quotes.len() ) } - NetworkEvent::ChunkProofVerification { - peer_id, - key_to_verify: keys_to_verify, - } => { - write!( - f, - "NetworkEvent::ChunkProofVerification({peer_id:?} {keys_to_verify:?})" - ) - } } } } diff --git a/sn_node/Cargo.toml b/sn_node/Cargo.toml index 2d98a27ef8..980dc84d76 100644 --- a/sn_node/Cargo.toml +++ b/sn_node/Cargo.toml @@ -44,6 +44,7 @@ futures = "~0.3.13" hex = "~0.4.3" itertools = "~0.12.1" libp2p = { version = "0.54.1", features = ["tokio", "dns", "kad", "macros"] } +num-traits = "0.2" prometheus-client = { version = "0.22", optional = true } # watch out updating this, protoc compiler needs to be installed on all build systems # arm builds + musl are very problematic diff --git a/sn_node/src/node.rs 
b/sn_node/src/node.rs index 09103d923a..29bb5ed0f5 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -15,6 +15,7 @@ use crate::RunningNode; use bytes::Bytes; use itertools::Itertools; use libp2p::{identity::Keypair, Multiaddr, PeerId}; +use num_traits::cast::ToPrimitive; use rand::{ rngs::{OsRng, StdRng}, thread_rng, Rng, SeedableRng, @@ -30,6 +31,7 @@ use sn_protocol::{ NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, }; use std::{ + collections::HashMap, net::SocketAddr, path::PathBuf, sync::{ @@ -38,7 +40,7 @@ use std::{ }, time::Duration, }; -use tokio::{sync::mpsc::Receiver, task::spawn}; +use tokio::{sync::mpsc::Receiver, task::{spawn, JoinSet}}; use sn_evm::EvmNetwork; @@ -56,6 +58,16 @@ const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); /// Interval to clean up unrelevant records const UNRELEVANT_RECORDS_CLEANUP_INTERVAL: Duration = Duration::from_secs(3600); +/// Highest score to achieve from each metric sub-sector during StorageChallenge. +const HIGHEST_SCORE: usize = 100; + +/// Any nodes bearing a score below this shall be considered as bad. +/// Max is to be 100 * 100 +const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 2000; + +/// in ms, expecting average StorageChallenge complete time to be around 500ms. +const TIME_STEP: usize = 100; + /// Helper to build and run a Node pub struct NodeBuilder { identity_keypair: Keypair, @@ -467,26 +479,6 @@ impl Node { quotes_verification(&network, quotes).await; }); } - NetworkEvent::ChunkProofVerification { - peer_id, - key_to_verify, - } => { - event_header = "ChunkProofVerification"; - let network = self.network().clone(); - - debug!("Going to carry out storage existence check against peer {peer_id:?}"); - - let _handle = spawn(async move { - if chunk_proof_verify_peer(&network, peer_id, &key_to_verify).await { - return; - } - info!("Peer {peer_id:?} failed storage existence challenge."); - // TODO: shall challenge failure immediately triggers the node to be removed? 
- // or to lower connection score once feature introduced. - // If score falls too low, sever connection. - network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck); - }); - } } trace!( @@ -634,7 +626,7 @@ impl Node { nonce: Nonce, difficulty: usize, ) -> Vec<(NetworkAddress, Result)> { - info!("Received StorageChallenge targeting {key:?} with difficulty level of {difficulty}."); + let start = Instant::now(); let mut results = vec![]; if difficulty == 1 { // Client checking existence of published chunk. @@ -682,8 +674,8 @@ impl Node { } info!( - "Respond with {} answers to the StorageChallenge targeting {key:?}.", - results.len() + "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}", + results.len(), start.elapsed() ); results @@ -693,6 +685,7 @@ impl Node { /// and randomly pick one as the verification candidate. /// This will challenge all closest peers at once. async fn storage_challenge(network: Network) { + let start = Instant::now(); let closest_peers: Vec = if let Ok(closest_peers) = network.get_closest_k_value_local_peers().await { closest_peers @@ -712,7 +705,7 @@ impl Node { return; } - let verify_candidates: Vec = + let mut verify_candidates: Vec = if let Ok(all_keys) = network.get_all_local_record_addresses().await { all_keys .iter() @@ -734,72 +727,160 @@ impl Node { return; } - info!("Starting node StorageChallenge against neighbours!"); + let index: usize = OsRng.gen_range(0..num_of_targets); + let target = verify_candidates[index].clone(); + // TODO: workload shall be dynamically deduced from resource usage + let difficulty = CLOSE_GROUP_SIZE; + verify_candidates.sort_by_key(|addr| target.distance(addr)); + let expected_targets = verify_candidates.into_iter().take(difficulty); + let nonce: Nonce = thread_rng().gen::(); + let mut expected_proofs = HashMap::new(); + for addr in expected_targets { + if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await { + let 
expected_proof = ChunkProof::new(&record.value, nonce); + let _ = expected_proofs.insert(addr, expected_proof); + } else { + error!("Local record {addr:?} cann't be loaded from disk."); + } + } + let request = Request::Query(Query::GetChunkExistenceProof { + key: target.clone(), + nonce, + difficulty, + }); - // TODO: launch the challenges parrallely, so that a scoring scheme can be utilized. + let mut tasks = JoinSet::new(); for peer_id in closest_peers { if peer_id == network.peer_id() { continue; } + let network_clone = network.clone(); + let request_clone = request.clone(); + let expected_proofs_clone = expected_proofs.clone(); + let _ = tasks.spawn(async move { + let res = + scoring_peer(network_clone, peer_id, request_clone, expected_proofs_clone) + .await; + (peer_id, res) + }); + } - let index: usize = OsRng.gen_range(0..num_of_targets); - if !chunk_proof_verify_peer(&network, peer_id, &verify_candidates[index]).await { - info!("Peer {peer_id:?} failed storage challenge."); - // TODO: shall the challenge failure immediately triggers the node to be removed? - network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck); + while let Some(res) = tasks.join_next().await { + match res { + Ok((peer_id, score)) => { + if score < MIN_ACCEPTABLE_HEALTHY_SCORE { + info!("Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."); + // TODO: shall the challenge failure immediately triggers the node to be removed? 
+ network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck); + } + } + Err(e) => { + info!("StorageChallenge task completed with error {e:?}"); + } } } - info!("Completed node StorageChallenge against neighbours!"); + info!( + "Completed node StorageChallenge against neighbours in {:?}!", + start.elapsed() + ); } } -async fn chunk_proof_verify_peer(network: &Network, peer_id: PeerId, key: &NetworkAddress) -> bool { - let nonce: Nonce = thread_rng().gen::(); - - let request = Request::Query(Query::GetChunkExistenceProof { - key: key.clone(), - nonce, - difficulty: CLOSE_GROUP_SIZE, - }); - +async fn scoring_peer( + network: Network, + peer_id: PeerId, + request: Request, + expected_proofs: HashMap, +) -> usize { + let start = Instant::now(); let responses = network .send_and_get_responses(&[peer_id], &request, true) .await; - // TODO: cross check with local knowledge (i.e. the claimed closest shall match locals) - // this also prevent peer falsely give empty or non-existent answers. - if let Some(Ok(Response::Query(QueryResponse::GetChunkExistenceProof(answers)))) = responses.get(&peer_id) { if answers.is_empty() { info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge."); - return false; + return 0; } + let elapsed = start.elapsed(); + + let mut received_proofs = vec![]; for (addr, proof) in answers { if let Ok(proof) = proof { - if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await { - let expected_proof = ChunkProof::new(&record.value, nonce); - // Any wrong answer shall be considered as a failure - if *proof != expected_proof { - return false; - } - } else { - debug!( - "Could not get ChunkProof for {addr:?} as we don't have the record locally." 
- ); - } - } else { - debug!( - "Could not verify answer of {addr:?} from {peer_id:?} as responded with {proof:?}" - ); + received_proofs.push((addr.clone(), proof.clone())); } } + + let score = mark_peer(elapsed, received_proofs, &expected_proofs); + info!( + "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.", + answers.len() + ); + score } else { info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error."); - return false; + 0 } +} + +// Based on following metrics: +// * the duration +// * is there false answer +// * percentage of correct answers among the expected closest +// The higher the score, the better confidence on the peer +fn mark_peer( + duration: Duration, + answers: Vec<(NetworkAddress, ChunkProof)>, + expected_proofs: &HashMap, +) -> usize { + let duration_score = duration_score_scheme(duration); + let challenge_score = challenge_score_scheme(answers, expected_proofs); + + duration_score * challenge_score +} + +// Less duration shall get higher score +fn duration_score_scheme(duration: Duration) -> usize { + // So far just a simple stepped scheme, capped by HIGHEST_SCORE + let in_ms = if let Some(value) = duration.as_millis().to_usize() { + value + } else { + info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms."); + 1000 + }; - true + let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP); + HIGHEST_SCORE - step +} + +// Any false answer shall result in 0 score immediately +fn challenge_score_scheme( + answers: Vec<(NetworkAddress, ChunkProof)>, + expected_proofs: &HashMap, +) -> usize { + let mut correct_answers = 0; + for (addr, chunk_proof) in answers { + if let Some(expected_proof) = expected_proofs.get(&addr) { + if expected_proof.verify(&chunk_proof) { + correct_answers += 1; + } else { + info!("Spot a false answer to the challenge regarding {addr:?}"); + // Any false answer shall result in 0 score immediately + return 0; + } + } + } + // TODO: For 
those answers not among the expected_proofs, + // it could be due to having different knowledge of records to us. + // shall we: + // * set the target being close to us, so that neighbours sharing same knowledge in higher chance + // * fetch from local to testify + // * fetch from network to testify + std::cmp::min( + HIGHEST_SCORE, + (HIGHEST_SCORE * correct_answers).checked_div(expected_proofs.len()).unwrap_or(HIGHEST_SCORE), // empty expected_proofs (no local record could be loaded): nothing to fail, and must not panic on divide-by-zero + ) }