diff --git a/Cargo.lock b/Cargo.lock index 0ff28dc1c5..e5b4b21ba8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8733,6 +8733,7 @@ dependencies = [ "hex 0.4.3", "itertools 0.12.1", "libp2p 0.54.1", + "num-traits", "prometheus-client", "prost 0.9.0", "pyo3", diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 43a5525ccf..2afa0b0701 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -768,7 +768,7 @@ fn check_and_wipe_storage_dir_if_necessary( // * the storage_dir shall be wiped out // * the version file shall be updated if cur_version_str != prev_version_str { - warn!("Trying to wipe out storege dir {storage_dir_path:?}, as cur_version {cur_version_str:?} doesn't match prev_version {prev_version_str:?}"); + warn!("Trying to wipe out storage dir {storage_dir_path:?}, as cur_version {cur_version_str:?} doesn't match prev_version {prev_version_str:?}"); let _ = fs::remove_dir_all(storage_dir_path); let mut file = fs::OpenOptions::new() diff --git a/sn_networking/src/event/mod.rs b/sn_networking/src/event/mod.rs index e1d8074d29..67f7c41c0d 100644 --- a/sn_networking/src/event/mod.rs +++ b/sn_networking/src/event/mod.rs @@ -143,11 +143,6 @@ pub enum NetworkEvent { FailedToFetchHolders(BTreeSet), /// Quotes to be verified QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> }, - /// Carry out chunk proof check against the specified record and peer - ChunkProofVerification { - peer_id: PeerId, - key_to_verify: NetworkAddress, - }, } /// Terminate node for the following reason @@ -206,15 +201,6 @@ impl Debug for NetworkEvent { quotes.len() ) } - NetworkEvent::ChunkProofVerification { - peer_id, - key_to_verify: keys_to_verify, - } => { - write!( - f, - "NetworkEvent::ChunkProofVerification({peer_id:?} {keys_to_verify:?})" - ) - } } } } diff --git a/sn_networking/src/event/request_response.rs b/sn_networking/src/event/request_response.rs index a028d34129..7dacaa93e4 100644 --- a/sn_networking/src/event/request_response.rs +++ b/sn_networking/src/event/request_response.rs @@ -7,12 +7,10 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{ - cmd::NetworkSwarmCmd, log_markers::Marker, sort_peers_by_address, MsgResponder, NetworkError, - NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE, + cmd::NetworkSwarmCmd, log_markers::Marker, MsgResponder, NetworkError, NetworkEvent, + SwarmDriver, }; -use itertools::Itertools; use libp2p::request_response::{self, Message}; -use rand::{rngs::OsRng, thread_rng, Rng}; use sn_protocol::{ messages::{CmdResponse, Request, Response}, storage::RecordType, @@ -207,14 +205,10 @@ impl SwarmDriver { return; } - let more_than_one_key = incoming_keys.len() > 1; - - // On receive a replication_list from a close_group peer, we undertake two tasks: + // On receive a replication_list from a close_group peer, we undertake: // 1, For those keys that we don't have: // fetch them if close enough to us - // 2, For those keys that we have and supposed to be held by the sender as well: - // start chunk_proof check against a randomly selected chunk type record to the sender - // 3, For those spends that we have that differ in the hash, we fetch the other version + // 2, For those spends that we have that differ in the hash, we fetch the other version // and update our local copy. 
let all_keys = self .swarm @@ -230,103 +224,5 @@ impl SwarmDriver { } else { self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch)); } - - // Only trigger chunk_proof check based every X% of the time - let mut rng = thread_rng(); - // 5% probability - if more_than_one_key && rng.gen_bool(0.05) { - self.verify_peer_storage(sender.clone()); - - // In additon to verify the sender, we also verify a random close node. - // This is to avoid malicious node escaping the check by never send a replication_list. - // With further reduced probability of 1% (5% * 20%) - if rng.gen_bool(0.2) { - let close_group_peers = self - .swarm - .behaviour_mut() - .kademlia - .get_closest_local_peers(&self.self_peer_id.into()) - .map(|peer| peer.into_preimage()) - .take(CLOSE_GROUP_SIZE) - .collect_vec(); - if close_group_peers.len() == CLOSE_GROUP_SIZE { - loop { - let index: usize = OsRng.gen_range(0..close_group_peers.len()); - let candidate = NetworkAddress::from_peer(close_group_peers[index]); - if sender != candidate { - self.verify_peer_storage(candidate); - break; - } - } - } - } - } - } - - /// Check among all chunk type records that we have, select those close to the peer, - /// and randomly pick one as the verification candidate. - fn verify_peer_storage(&mut self, peer: NetworkAddress) { - let mut closest_peers = self - .swarm - .behaviour_mut() - .kademlia - .get_closest_local_peers(&self.self_peer_id.into()) - .map(|peer| peer.into_preimage()) - .take(20) - .collect_vec(); - closest_peers.push(self.self_peer_id); - - let target_peer = if let Some(peer_id) = peer.as_peer_id() { - peer_id - } else { - error!("Target {peer:?} is not a valid PeerId"); - return; - }; - - let all_keys = self - .swarm - .behaviour_mut() - .kademlia - .store_mut() - .record_addresses_ref(); - - // Targeted chunk type record shall be expected within the close range from our perspective. - let mut verify_candidates: Vec = all_keys - .values() - .filter_map(|(addr, record_type)| { - if RecordType::Chunk == *record_type { - match sort_peers_by_address(&closest_peers, addr, CLOSE_GROUP_SIZE) { - Ok(close_group) => { - if close_group.contains(&&target_peer) { - Some(addr.clone()) - } else { - None - } - } - Err(err) => { - warn!("Could not get sorted peers for {addr:?} with error {err:?}"); - None - } - } - } else { - None - } - }) - .collect(); - - verify_candidates.sort_by_key(|a| peer.distance(a)); - - // To ensure the candidate must have to be held by the peer, - // we only carry out check when there are already certain amount of chunks uploaded - // AND choose candidate from certain reduced range. - if verify_candidates.len() > 50 { - let index: usize = OsRng.gen_range(0..(verify_candidates.len() / 2)); - self.send_event(NetworkEvent::ChunkProofVerification { - peer_id: target_peer, - key_to_verify: verify_candidates[index].clone(), - }); - } else { - debug!("No valid candidate to be checked against peer {peer:?}"); - } } } diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index 74ea3cbd46..cd5c513fad 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -269,6 +269,7 @@ impl Network { } /// Get the Chunk existence proof from the close nodes to the provided chunk address. + /// This is to be used by client only to verify the success of the upload. 
pub async fn verify_chunk_existence( &self, chunk_address: NetworkAddress, @@ -304,6 +305,7 @@ impl Network { let request = Request::Query(Query::GetChunkExistenceProof { key: chunk_address.clone(), nonce, + difficulty: 1, }); let responses = self .send_and_get_responses(&close_nodes, &request, true) @@ -311,14 +313,22 @@ impl Network { let n_verified = responses .into_iter() .filter_map(|(peer, resp)| { - if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) = + if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(proofs))) = resp { - if expected_proof.verify(&proof) { - debug!("Got a valid ChunkProof from {peer:?}"); - Some(()) + if proofs.is_empty() { + warn!("Failed to verify the ChunkProof from {peer:?}. Returned proof is empty."); + None + } else if let Ok(ref proof) = proofs[0].1 { + if expected_proof.verify(proof) { + debug!("Got a valid ChunkProof from {peer:?}"); + Some(()) + } else { + warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?"); + None + } } else { - warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?"); + warn!("Failed to verify the ChunkProof from {peer:?}, returned with error {:?}", proofs[0].1); None } } else { @@ -370,7 +380,12 @@ impl Network { return Err(NetworkError::NoStoreCostResponses); } - let request = Request::Query(Query::GetStoreCost(record_address.clone())); + // Client shall decide whether to carry out storage verification or not. + let request = Request::Query(Query::GetStoreCost { + key: record_address.clone(), + nonce: None, + difficulty: 0, + }); let responses = self .send_and_get_responses(&close_nodes, &request, true) .await; @@ -388,7 +403,11 @@ impl Network { quote: Ok(quote), payment_address, peer_address, + storage_proofs, }) => { + if !storage_proofs.is_empty() { + debug!("Storage proofing during GetStoreCost to be implemented."); + } // Check the quote itself is valid. 
if quote.cost != AttoTokens::from_u64(calculate_cost_for_records( @@ -406,7 +425,11 @@ impl Network { quote: Err(ProtocolError::RecordExists(_)), payment_address, peer_address, + storage_proofs, }) => { + if !storage_proofs.is_empty() { + debug!("Storage proofing during GetStoreCost to be implemented."); + } all_costs.push((peer_address, payment_address, PaymentQuote::zero())); } _ => { diff --git a/sn_node/Cargo.toml b/sn_node/Cargo.toml index 2d98a27ef8..980dc84d76 100644 --- a/sn_node/Cargo.toml +++ b/sn_node/Cargo.toml @@ -44,6 +44,7 @@ futures = "~0.3.13" hex = "~0.4.3" itertools = "~0.12.1" libp2p = { version = "0.54.1", features = ["tokio", "dns", "kad", "macros"] } +num-traits = "0.2" prometheus-client = { version = "0.22", optional = true } # watch out updating this, protoc compiler needs to be installed on all build systems # arm builds + musl are very problematic diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index 22ec7e9336..bd4e31c36b 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -13,20 +13,25 @@ use super::{ use crate::metrics::NodeMetricsRecorder; use crate::RunningNode; use bytes::Bytes; +use itertools::Itertools; use libp2p::{identity::Keypair, Multiaddr, PeerId}; -use rand::{rngs::StdRng, thread_rng, Rng, SeedableRng}; +use num_traits::cast::ToPrimitive; +use rand::{ + rngs::{OsRng, StdRng}, + thread_rng, Rng, SeedableRng, +}; use sn_evm::{AttoTokens, RewardsAddress}; #[cfg(feature = "open-metrics")] use sn_networking::MetricsRegistries; -use sn_networking::{ - Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue, SwarmDriver, -}; +use sn_networking::{Instant, Network, NetworkBuilder, NetworkEvent, NodeIssue, SwarmDriver}; use sn_protocol::{ error::Error as ProtocolError, - messages::{ChunkProof, CmdResponse, Query, QueryResponse, Request, Response}, + messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response}, + storage::RecordType, NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, }; use std::{ + collections::HashMap, net::SocketAddr, path::PathBuf, sync::{ @@ -35,7 +40,10 @@ use std::{ }, time::Duration, }; -use tokio::{sync::mpsc::Receiver, task::spawn}; +use tokio::{ + sync::mpsc::Receiver, + task::{spawn, JoinSet}, +}; use sn_evm::EvmNetwork; @@ -43,12 +51,9 @@ use sn_evm::EvmNetwork; /// This is the max time it should take. Minimum interval at any node will be half this pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180; -/// Max number of attempts that chunk proof verification will be carried out against certain target, -/// before classifying peer as a bad peer. -const MAX_CHUNK_PROOF_VERIFY_ATTEMPTS: usize = 3; - -/// Interval between chunk proof verification to be retired against the same target. -const CHUNK_PROOF_VERIFY_RETRY_INTERVAL: Duration = Duration::from_secs(15); +/// Interval to trigger storage challenge. +/// This is the max time it should take. Minimum interval at any node will be half this +const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200; /// Interval to update the nodes uptime metric const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); @@ -56,6 +61,16 @@ const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); /// Interval to clean up unrelevant records const UNRELEVANT_RECORDS_CLEANUP_INTERVAL: Duration = Duration::from_secs(3600); +/// Highest score to achieve from each metric sub-sector during StorageChallenge. +const HIGHEST_SCORE: usize = 100; + +/// Any nodes bearing a score below this shall be considered as bad. 
+/// The maximum possible score is 100 * 100. +const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 5000; + +/// In ms; the average StorageChallenge completion time is expected to be around 250ms. +const TIME_STEP: usize = 20; + /// Helper to build and run a Node pub struct NodeBuilder { identity_keypair: Keypair, @@ -256,6 +271,17 @@ impl Node { tokio::time::interval(UNRELEVANT_RECORDS_CLEANUP_INTERVAL); let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately + // use a randomised storage challenge interval to ensure + // neighbours do not carry out challenges at the same time + let storage_challenge_interval: u64 = + rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S); + let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval); + debug!("Storage challenge interval set to {storage_challenge_interval_time:?}"); + + let mut storage_challenge_interval = + tokio::time::interval(storage_challenge_interval_time); + let _ = storage_challenge_interval.tick().await; // first tick completes immediately + loop { let peers_connected = &peers_connected; @@ -302,6 +328,17 @@ impl Node { Self::trigger_irrelevant_record_cleanup(network); }); } + // runs every storage_challenge_interval time + _ = storage_challenge_interval.tick() => { + let start = Instant::now(); + debug!("Periodic storage challenge triggered"); + let network = self.network().clone(); + + let _handle = spawn(async move { + Self::storage_challenge(network).await; + trace!("Periodic storage challenge took {:?}", start.elapsed()); + }); + } } } }); @@ -445,38 +482,6 @@ impl Node { quotes_verification(&network, quotes).await; }); } - NetworkEvent::ChunkProofVerification { - peer_id, - key_to_verify, - } => { - event_header = "ChunkProofVerification"; - let network = self.network().clone(); - - debug!("Going to verify chunk {key_to_verify} against peer {peer_id:?}"); - - let _handle = spawn(async move { - // To avoid the peer is in the process of getting the copy via replication, - // repeat the verification for couple of times (in case of error). - // Only report the node as bad when ALL the verification attempts failed. - let mut attempts = 0; - while attempts < MAX_CHUNK_PROOF_VERIFY_ATTEMPTS { - if chunk_proof_verify_peer(&network, peer_id, &key_to_verify).await { - return; - } - // Replication interval is 22s - 45s. - // Hence some re-try erquired to allow copies to spread out. - tokio::time::sleep(CHUNK_PROOF_VERIFY_RETRY_INTERVAL).await; - attempts += 1; - } - // Now ALL attempts failed, hence report the issue. - // Note this won't immediately trigger the node to be considered as BAD. - // Only the same peer accumulated three same issue - // within 5 mins will be considered as BAD. - // As the chunk_proof_check will be triggered every periodical replication, - // a low performed or cheaty peer will raise multiple issue alerts during it.
- network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck); - }); - } } trace!( @@ -509,13 +514,30 @@ impl Node { payment_address: RewardsAddress, ) -> Response { let resp: QueryResponse = match query { - Query::GetStoreCost(address) => { - debug!("Got GetStoreCost request for {address:?}"); - let record_key = address.to_record_key(); + Query::GetStoreCost { + key, + nonce, + difficulty, + } => { + debug!("Got GetStoreCost request for {key:?} with difficulty {difficulty}"); + let record_key = key.to_record_key(); let self_id = network.peer_id(); let store_cost = network.get_local_storecost(record_key.clone()).await; + let storage_proofs = if let Some(nonce) = nonce { + Self::respond_x_closest_record_proof( + network, + key.clone(), + nonce, + difficulty, + false, + ) + .await + } else { + vec![] + }; + match store_cost { Ok((cost, quoting_metrics, bad_nodes)) => { if cost == AttoTokens::zero() { @@ -525,19 +547,21 @@ impl Node { )), payment_address, peer_address: NetworkAddress::from_peer(self_id), + storage_proofs, } } else { QueryResponse::GetStoreCost { quote: Self::create_quote_for_storecost( network, cost, - &address, + &key, &quoting_metrics, bad_nodes, &payment_address, ), payment_address, peer_address: NetworkAddress::from_peer(self_id), + storage_proofs, } } } @@ -545,6 +569,7 @@ impl Node { quote: Err(ProtocolError::GetStoreCostFailed), payment_address, peer_address: NetworkAddress::from_peer(self_id), + storage_proofs, }, } } @@ -584,21 +609,19 @@ impl Node { QueryResponse::GetReplicatedRecord(result) } - Query::GetChunkExistenceProof { key, nonce } => { - debug!("Got GetChunkExistenceProof for chunk {key:?}"); - - let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone())); - if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await { - let proof = ChunkProof::new(&record.value, nonce); - debug!("Chunk proof for {key:?} is {proof:?}"); - result = Ok(proof) - } else { - debug!( - "Could not get ChunkProof for {key:?} as we don't have the record locally." - ); - } - - QueryResponse::GetChunkExistenceProof(result) + Query::GetChunkExistenceProof { + key, + nonce, + difficulty, + } => { + debug!( + "Got GetChunkExistenceProof targeting chunk {key:?} with {difficulty} answers." + ); + + QueryResponse::GetChunkExistenceProof( + Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true) + .await, + ) } Query::CheckNodeInProblem(target_address) => { debug!("Got CheckNodeInProblem for peer {target_address:?}"); @@ -620,61 +643,278 @@ impl Node { }; Response::Query(resp) } -} -async fn chunk_proof_verify_peer(network: &Network, peer_id: PeerId, key: &NetworkAddress) -> bool { - let check_passed = if let Ok(Some(record)) = - network.get_local_record(&key.to_record_key()).await - { - let nonce = thread_rng().gen::<u64>(); - let expected_proof = ChunkProof::new(&record.value, nonce); - debug!("To verify peer {peer_id:?}, chunk_proof for {key:?} is {expected_proof:?}"); + // Nodes only check ChunkProofs against each other, to avoid the `multi-version` issue. + // Clients check proofs against all records, as they have to fetch them from the network anyway. + async fn respond_x_closest_record_proof( + network: &Network, + key: NetworkAddress, + nonce: Nonce, + difficulty: usize, + chunk_only: bool, + ) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> { + let start = Instant::now(); + let mut results = vec![]; + if difficulty == 1 { + // Client checking the existence of a published chunk.
+ let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone())); + if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await { + let proof = ChunkProof::new(&record.value, nonce); + debug!("Chunk proof for {key:?} is {proof:?}"); + result = Ok(proof) + } else { + debug!("Could not get ChunkProof for {key:?} as we don't have the record locally."); + } + results.push((key.clone(), result)); + } else { + let all_local_records = network.get_all_local_record_addresses().await; + + if let Ok(all_local_records) = all_local_records { + let mut all_chunk_addrs: Vec<_> = if chunk_only { + all_local_records + .iter() + .filter_map(|(addr, record_type)| { + if *record_type == RecordType::Chunk { + Some(addr.clone()) + } else { + None + } + }) + .collect() + } else { + all_local_records.keys().cloned().collect() + }; + + // Sort by distance and only take first X closest entries + all_chunk_addrs.sort_by_key(|addr| key.distance(addr)); + + // TODO: this shall be deduced from resource usage dynamically + let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE); + + for addr in all_chunk_addrs.iter().take(workload_factor) { + if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await + { + let proof = ChunkProof::new(&record.value, nonce); + debug!("Chunk proof for {key:?} is {proof:?}"); + results.push((addr.clone(), Ok(proof))); + } + } + } + } + + info!( + "Responded with {} answers to the StorageChallenge targeting {key:?} with difficulty {difficulty}, in {:?}", + results.len(), start.elapsed() + ); + + results + } + + /// Check among all chunk type records that we have, + /// and randomly pick one as the verification candidate. + /// This will challenge all closest peers at once. + async fn storage_challenge(network: Network) { + let start = Instant::now(); + let closest_peers: Vec<PeerId> = + if let Ok(closest_peers) = network.get_closest_k_value_local_peers().await { + closest_peers + .into_iter() + .take(CLOSE_GROUP_SIZE) + .collect_vec() + } else { + error!("Cannot get local neighbours"); + return; + }; + if closest_peers.len() < CLOSE_GROUP_SIZE { + debug!( + "Not enough neighbours ({}/{}) to carry out storage challenge.", + closest_peers.len(), + CLOSE_GROUP_SIZE + ); + return; + } + + let mut verify_candidates: Vec<NetworkAddress> = + if let Ok(all_keys) = network.get_all_local_record_addresses().await { + all_keys + .iter() + .filter_map(|(addr, record_type)| { + if RecordType::Chunk == *record_type { + Some(addr.clone()) + } else { + None + } + }) + .collect() + } else { + error!("Failed to get local record addresses."); + return; + }; + let num_of_targets = verify_candidates.len(); + if num_of_targets < 50 { + debug!("Not enough candidates ({num_of_targets}/50) to be checked against neighbours."); + return; + } + + // To ensure the neighbours share the same knowledge as us, + // the target is chosen to be not far from us.
+ let self_addr = NetworkAddress::from_peer(network.peer_id()); + verify_candidates.sort_by_key(|addr| self_addr.distance(addr)); + let index: usize = OsRng.gen_range(0..num_of_targets / 2); + let target = verify_candidates[index].clone(); + // TODO: workload shall be dynamically deduced from resource usage + let difficulty = CLOSE_GROUP_SIZE; + verify_candidates.sort_by_key(|addr| target.distance(addr)); + let expected_targets = verify_candidates.into_iter().take(difficulty); + let nonce: Nonce = thread_rng().gen::<u64>(); + let mut expected_proofs = HashMap::new(); + for addr in expected_targets { + if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await { + let expected_proof = ChunkProof::new(&record.value, nonce); + let _ = expected_proofs.insert(addr, expected_proof); + } else { + error!("Local record {addr:?} can't be loaded from disk."); + } + } let request = Request::Query(Query::GetChunkExistenceProof { - key: key.clone(), + key: target.clone(), nonce, + difficulty, }); - let responses = network - .send_and_get_responses(&[peer_id], &request, true) - .await; - let n_verified = responses - .into_iter() - .filter_map(|(peer, resp)| received_valid_chunk_proof(key, &expected_proof, peer, resp)) - .count(); - - n_verified >= 1 - } else { - error!( - "To verify peer {peer_id:?} Could not get ChunkProof for {key:?} as we don't have the record locally." - ); - true - }; - if !check_passed { - return false; - } + let mut tasks = JoinSet::new(); + for peer_id in closest_peers { + if peer_id == network.peer_id() { + continue; + } + let network_clone = network.clone(); + let request_clone = request.clone(); + let expected_proofs_clone = expected_proofs.clone(); + let _ = tasks.spawn(async move { + let res = + scoring_peer(network_clone, peer_id, request_clone, expected_proofs_clone) + .await; + (peer_id, res) + }); + } + + while let Some(res) = tasks.join_next().await { + match res { + Ok((peer_id, score)) => { + if score < MIN_ACCEPTABLE_HEALTHY_SCORE { + info!("Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."); + // TODO: shall the challenge failure immediately trigger the node to be removed? + network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck); + } + } + Err(e) => { + info!("StorageChallenge task completed with error {e:?}"); + } + } + } - true + info!( + "Completed node StorageChallenge against neighbours in {:?}!", + start.elapsed() + ); + } } -fn received_valid_chunk_proof( - key: &NetworkAddress, - expected_proof: &ChunkProof, - peer: PeerId, - resp: Result<Response, NetworkError>, -) -> Option<()> { - if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) = resp { - if expected_proof.verify(&proof) { - debug!( - "Got a valid ChunkProof of {key:?} from {peer:?}, during peer chunk proof check."
- ); - Some(()) - } else { - warn!("When verify {peer:?} with ChunkProof of {key:?}, the chunk might have been tampered?"); - None +async fn scoring_peer( + network: Network, + peer_id: PeerId, + request: Request, + expected_proofs: HashMap<NetworkAddress, ChunkProof>, +) -> usize { + let start = Instant::now(); + let responses = network + .send_and_get_responses(&[peer_id], &request, true) + .await; + + if let Some(Ok(Response::Query(QueryResponse::GetChunkExistenceProof(answers)))) = + responses.get(&peer_id) + { + if answers.is_empty() { + info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge."); + return 0; + } + let elapsed = start.elapsed(); + + let mut received_proofs = vec![]; + for (addr, proof) in answers { + if let Ok(proof) = proof { + received_proofs.push((addr.clone(), proof.clone())); + } } + + let score = mark_peer(elapsed, received_proofs, &expected_proofs); + info!( + "Received {} answers from peer {peer_id:?} after {elapsed:?}, scoring it as {score}.", + answers.len() + ); + score } else { - debug!("Did not get a valid response for the ChunkProof from {peer:?}"); - None + info!("Peer {peer_id:?} didn't reply to the ChunkProofChallenge, or replied with an error."); + 0 + } +} + +// Based on the following metrics: +// * the duration +// * whether there is any false answer +// * the percentage of correct answers among the expected closest +// The higher the score, the higher the confidence in the peer +fn mark_peer( + duration: Duration, + answers: Vec<(NetworkAddress, ChunkProof)>, + expected_proofs: &HashMap<NetworkAddress, ChunkProof>, +) -> usize { + let duration_score = duration_score_scheme(duration); + let challenge_score = challenge_score_scheme(answers, expected_proofs); + + duration_score * challenge_score +} + +// A shorter duration gets a higher score +fn duration_score_scheme(duration: Duration) -> usize { + // So far just a simple stepped scheme, capped by HIGHEST_SCORE + let in_ms = if let Some(value) = duration.as_millis().to_usize() { + value + } else { + info!("Cannot get milliseconds from {duration:?}, using a default value of 1000ms."); + 1000 + }; + + let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP); + HIGHEST_SCORE - step +} + +// Any false answer shall result in 0 score immediately +fn challenge_score_scheme( + answers: Vec<(NetworkAddress, ChunkProof)>, + expected_proofs: &HashMap<NetworkAddress, ChunkProof>, +) -> usize { + let mut correct_answers = 0; + for (addr, chunk_proof) in answers { + if let Some(expected_proof) = expected_proofs.get(&addr) { + if expected_proof.verify(&chunk_proof) { + correct_answers += 1; + } else { + info!("Spotted a false answer to the challenge regarding {addr:?}"); + // Any false answer shall result in 0 score immediately + return 0; + } + } } + // TODO: For those answers not among the expected_proofs, + // it could be due to the peer having different knowledge of records than us. + // Shall we: + // * set the target to be close to us, so that neighbours are more likely to share the same knowledge + // * fetch from local storage to verify + // * fetch from the network to verify + std::cmp::min( + HIGHEST_SCORE, + HIGHEST_SCORE * correct_answers / expected_proofs.len(), + ) } diff --git a/sn_protocol/src/messages/query.rs b/sn_protocol/src/messages/query.rs index b28f6830fa..c7e4a56639 100644 --- a/sn_protocol/src/messages/query.rs +++ b/sn_protocol/src/messages/query.rs @@ -18,7 +18,18 @@ use serde::{Deserialize, Serialize}; #[derive(Eq, PartialEq, PartialOrd, Clone, Serialize, Deserialize, Debug)] pub enum Query { /// Retrieve the cost of storing a record at the given address.
- GetStoreCost(NetworkAddress), + /// Storage verification can optionally be undertaken as well. + GetStoreCost { + /// The address of the record to be stored. + key: NetworkAddress, + /// The random nonce that nodes use to produce the Proof (i.e., hash(record+nonce)) + /// Set to None if there is no need to carry out a storage check. + nonce: Option<Nonce>, + /// Defines the expected number of answers to the challenge. + /// Nodes shall try their best to fulfill the number, based on their capacity. + /// Set to 0 to indicate that no verification shall be carried out. + difficulty: usize, + }, /// Retrieve a specific record from a specific peer. /// /// This should eventually lead to a [`GetReplicatedRecord`] response. @@ -47,6 +58,10 @@ pub enum Query { key: NetworkAddress, /// The random nonce that the node uses to produce the Proof (i.e., hash(record+nonce)) nonce: Nonce, + /// Defines the expected number of answers to the challenge. + /// For client publish verification, use 1 for efficiency. + /// Nodes shall try their best to fulfill the number, based on their capacity. + difficulty: usize, }, /// Queries close_group peers whether the target peer is a bad_node CheckNodeInProblem(NetworkAddress), @@ -56,10 +71,11 @@ impl Query { /// Used to send a query to the close group of the address. pub fn dst(&self) -> NetworkAddress { match self { - Query::GetStoreCost(address) | Query::CheckNodeInProblem(address) => address.clone(), + Query::CheckNodeInProblem(address) => address.clone(), // Shall not be called for this, as this is a `one-to-one` message, // and the destination shall be decided by the requester already. - Query::GetReplicatedRecord { key, .. } + Query::GetStoreCost { key, .. } + | Query::GetReplicatedRecord { key, .. } | Query::GetRegisterRecord { key, .. } | Query::GetChunkExistenceProof { key, ..
} => key.clone(), } } @@ -69,8 +85,12 @@ impl Query { impl std::fmt::Display for Query { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Query::GetStoreCost(address) => { - write!(f, "Query::GetStoreCost({address:?})") + Query::GetStoreCost { + key, + nonce, + difficulty, + } => { + write!(f, "Query::GetStoreCost({key:?} {nonce:?} {difficulty})") } Query::GetReplicatedRecord { key, requester } => { write!(f, "Query::GetReplicatedRecord({requester:?} {key:?})") @@ -78,8 +98,15 @@ impl std::fmt::Display for Query { Query::GetRegisterRecord { key, requester } => { write!(f, "Query::GetRegisterRecord({requester:?} {key:?})") } - Query::GetChunkExistenceProof { key, nonce } => { - write!(f, "Query::GetChunkExistenceProof({key:?} {nonce:?})") + Query::GetChunkExistenceProof { + key, + nonce, + difficulty, + } => { + write!( + f, + "Query::GetChunkExistenceProof({key:?} {nonce:?} {difficulty})" + ) } Query::CheckNodeInProblem(address) => { write!(f, "Query::CheckNodeInProblem({address:?})") diff --git a/sn_protocol/src/messages/response.rs b/sn_protocol/src/messages/response.rs index 17c986f581..f29aecc76f 100644 --- a/sn_protocol/src/messages/response.rs +++ b/sn_protocol/src/messages/response.rs @@ -30,6 +30,8 @@ pub enum QueryResponse { payment_address: RewardsAddress, /// Node's Peer Address peer_address: NetworkAddress, + /// Storage proofs based on requested target address and difficulty + storage_proofs: Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)>, }, CheckNodeInProblem { /// Address of the peer that queried @@ -56,7 +58,7 @@ pub enum QueryResponse { /// Response to [`GetChunkExistenceProof`] /// /// [`GetChunkExistenceProof`]: crate::messages::Query::GetChunkExistenceProof - GetChunkExistenceProof(Result<ChunkProof, ProtocolError>), + GetChunkExistenceProof(Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)>), } // Debug implementation for QueryResponse, to avoid printing Vec<u8> @@ -67,10 +69,12 @@ impl Debug for QueryResponse { QueryResponse::GetStoreCost { quote, payment_address, peer_address, + storage_proofs, } => { write!( f, - "GetStoreCost(quote: {quote:?}, from {peer_address:?} w/ payment_address: {payment_address:?})" + "GetStoreCost(quote: {quote:?}, from {peer_address:?} w/ payment_address: {payment_address:?}, and {} storage proofs)", + storage_proofs.len() ) } QueryResponse::CheckNodeInProblem { @@ -109,8 +113,9 @@ impl Debug for QueryResponse { write!(f, "GetRegisterRecord(Err({err:?}))") } }, - QueryResponse::GetChunkExistenceProof(proof) => { - write!(f, "GetChunkExistenceProof(proof: {proof:?})") + QueryResponse::GetChunkExistenceProof(proofs) => { + let addresses: Vec<_> = proofs.iter().map(|(addr, _)| addr.clone()).collect(); + write!(f, "GetChunkExistenceProof(checked chunks: {addresses:?})") } } }