From 2f72a3d0166c2aff874d21fd80da4b0429a69f29 Mon Sep 17 00:00:00 2001 From: qima Date: Sat, 16 Nov 2024 01:27:04 +0800 Subject: [PATCH] chore: improve ChunkProofVerification --- sn_networking/src/lib.rs | 19 ++- sn_node/src/node.rs | 177 +++++++++++++++++---------- sn_protocol/src/messages/query.rs | 14 ++- sn_protocol/src/messages/response.rs | 7 +- 4 files changed, 143 insertions(+), 74 deletions(-) diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index b7118d18a3..9e1ceb3ebd 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -304,6 +304,7 @@ impl Network { let request = Request::Query(Query::GetChunkExistenceProof { key: chunk_address.clone(), nonce, + expected_answers: 1, }); let responses = self .send_and_get_responses(&close_nodes, &request, true) @@ -311,14 +312,22 @@ impl Network { let n_verified = responses .into_iter() .filter_map(|(peer, resp)| { - if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) = + if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(proofs))) = resp { - if expected_proof.verify(&proof) { - debug!("Got a valid ChunkProof from {peer:?}"); - Some(()) + if proofs.is_empty() { + warn!("Failed to verify the ChunkProof from {peer:?}. Returned proof is empty."); + None + } else if let Ok(ref proof) = proofs[0].1 { + if expected_proof.verify(proof) { + debug!("Got a valid ChunkProof from {peer:?}"); + Some(()) + } else { + warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?"); + None + } } else { - warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?"); + warn!("Failed to verify the ChunkProof from {peer:?}, returned with error {:?}", proofs[0].1); None } } else { diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index bff4266b6b..518d037b33 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -19,12 +19,12 @@ use sn_evm::{AttoTokens, RewardsAddress}; #[cfg(feature = "open-metrics")] use sn_networking::MetricsRegistries; use sn_networking::{ - close_group_majority, Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue, - SwarmDriver, + close_group_majority, Instant, Network, NetworkBuilder, NetworkEvent, NodeIssue, SwarmDriver, }; use sn_protocol::{ error::Error as ProtocolError, - messages::{ChunkProof, CmdResponse, Query, QueryResponse, Request, Response}, + messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response}, + storage::RecordType, NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, }; use std::{ @@ -675,21 +675,17 @@ impl Node { QueryResponse::GetReplicatedRecord(result) } - Query::GetChunkExistenceProof { key, nonce } => { - debug!("Got GetChunkExistenceProof for chunk {key:?}"); - - let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone())); - if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await { - let proof = ChunkProof::new(&record.value, nonce); - debug!("Chunk proof for {key:?} is {proof:?}"); - result = Ok(proof) - } else { - debug!( - "Could not get ChunkProof for {key:?} as we don't have the record locally." - ); - } + Query::GetChunkExistenceProof { + key, + nonce, + expected_answers, + } => { + debug!("Got GetChunkExistenceProof targeting chunk {key:?} with {expected_answers} answers."); - QueryResponse::GetChunkExistenceProof(result) + QueryResponse::GetChunkExistenceProof( + Self::respond_x_closest_chunk_proof(network, key, nonce, expected_answers) + .await, + ) } Query::CheckNodeInProblem(target_address) => { debug!("Got CheckNodeInProblem for peer {target_address:?}"); @@ -712,6 +708,66 @@ impl Node { Response::Query(resp) } + async fn respond_x_closest_chunk_proof( + network: &Network, + key: NetworkAddress, + nonce: Nonce, + expected_answers: usize, + ) -> Vec<(NetworkAddress, Result)> { + let mut results = vec![]; + if expected_answers == 1 { + // Client checking existence of published chunk. + let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone())); + if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await { + let proof = ChunkProof::new(&record.value, nonce); + debug!("Chunk proof for {key:?} is {proof:?}"); + result = Ok(proof) + } else { + debug!("Could not get ChunkProof for {key:?} as we don't have the record locally."); + } + + results.push((key.clone(), result)); + } else { + let all_local_records = network.get_all_local_record_addresses().await; + + if let Ok(all_local_records) = all_local_records { + // Only `ChunkRecord`s can be consistantly verified + let mut all_chunk_addrs: Vec<_> = all_local_records + .iter() + .filter_map(|(addr, record_type)| { + if *record_type == RecordType::Chunk { + Some(addr.clone()) + } else { + None + } + }) + .collect(); + + // Sort by distance and only take first X closest entries + all_chunk_addrs.sort_by_key(|addr| key.distance(addr)); + + // TODO: this shall be deduced from resource usage dynamically + let workload_factor = std::cmp::min(expected_answers, CLOSE_GROUP_SIZE); + + for addr in all_chunk_addrs.iter().take(workload_factor) { + if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await + { + let proof = ChunkProof::new(&record.value, nonce); + debug!("Chunk proof for {key:?} is {proof:?}"); + results.push((addr.clone(), Ok(proof))); + } + } + } + } + + debug!( + "Respond with {} answers to ChunkProofChallenge targeting {key:?}.", + results.len() + ); + + results + } + async fn try_bad_nodes_check(network: Network, rolling_index: usize) { if let Ok(kbuckets) = network.get_kbuckets().await { let total_peers: usize = kbuckets.values().map(|peers| peers.len()).sum(); @@ -770,58 +826,51 @@ impl Node { } async fn chunk_proof_verify_peer(network: &Network, peer_id: PeerId, key: &NetworkAddress) -> bool { - let check_passed = if let Ok(Some(record)) = - network.get_local_record(&key.to_record_key()).await - { - let nonce = thread_rng().gen::(); - let expected_proof = ChunkProof::new(&record.value, nonce); - debug!("To verify peer {peer_id:?}, chunk_proof for {key:?} is {expected_proof:?}"); + let nonce: Nonce = thread_rng().gen::(); - let request = Request::Query(Query::GetChunkExistenceProof { - key: key.clone(), - nonce, - }); - let responses = network - .send_and_get_responses(&[peer_id], &request, true) - .await; - let n_verified = responses - .into_iter() - .filter_map(|(peer, resp)| received_valid_chunk_proof(key, &expected_proof, peer, resp)) - .count(); - - n_verified >= 1 - } else { - error!( - "To verify peer {peer_id:?} Could not get ChunkProof for {key:?} as we don't have the record locally." - ); - true - }; + let request = Request::Query(Query::GetChunkExistenceProof { + key: key.clone(), + nonce, + expected_answers: CLOSE_GROUP_SIZE, + }); - if !check_passed { - return false; - } + let responses = network + .send_and_get_responses(&[peer_id], &request, true) + .await; - true -} + // TODO: cross check with local knowledge (i.e. the claimed closest shall match locals) + // this also prevent peer falsely give empty or non-existent answers. -fn received_valid_chunk_proof( - key: &NetworkAddress, - expected_proof: &ChunkProof, - peer: PeerId, - resp: Result, -) -> Option<()> { - if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) = resp { - if expected_proof.verify(&proof) { - debug!( - "Got a valid ChunkProof of {key:?} from {peer:?}, during peer chunk proof check." - ); - Some(()) - } else { - warn!("When verify {peer:?} with ChunkProof of {key:?}, the chunk might have been tampered?"); - None + if let Some(Ok(Response::Query(QueryResponse::GetChunkExistenceProof(answers)))) = + responses.get(&peer_id) + { + if answers.is_empty() { + info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge."); + return false; + } + for (addr, proof) in answers { + if let Ok(proof) = proof { + if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await { + let expected_proof = ChunkProof::new(&record.value, nonce); + // Any wrong answer shall be considered as a failure + if *proof != expected_proof { + return false; + } + } else { + debug!( + "Could not get ChunkProof for {addr:?} as we don't have the record locally." + ); + } + } else { + debug!( + "Could not verify answer of {addr:?} from {peer_id:?} as responded with {proof:?}" + ); + } } } else { - debug!("Did not get a valid response for the ChunkProof from {peer:?}"); - None + info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error."); + return false; } + + true } diff --git a/sn_protocol/src/messages/query.rs b/sn_protocol/src/messages/query.rs index b28f6830fa..43f8b4e97c 100644 --- a/sn_protocol/src/messages/query.rs +++ b/sn_protocol/src/messages/query.rs @@ -47,6 +47,9 @@ pub enum Query { key: NetworkAddress, /// The random nonce that the node uses to produce the Proof (i.e., hash(record+nonce)) nonce: Nonce, + /// Expected number of answers. For client publish verification, use 1 for efficiency. + /// Node shall try their best to fulfill the number, based on their capacity. + expected_answers: usize, }, /// Queries close_group peers whether the target peer is a bad_node CheckNodeInProblem(NetworkAddress), @@ -78,8 +81,15 @@ impl std::fmt::Display for Query { Query::GetRegisterRecord { key, requester } => { write!(f, "Query::GetRegisterRecord({requester:?} {key:?})") } - Query::GetChunkExistenceProof { key, nonce } => { - write!(f, "Query::GetChunkExistenceProof({key:?} {nonce:?})") + Query::GetChunkExistenceProof { + key, + nonce, + expected_answers, + } => { + write!( + f, + "Query::GetChunkExistenceProof({key:?} {nonce:?} {expected_answers})" + ) } Query::CheckNodeInProblem(address) => { write!(f, "Query::CheckNodeInProblem({address:?})") diff --git a/sn_protocol/src/messages/response.rs b/sn_protocol/src/messages/response.rs index 17c986f581..44e9932c23 100644 --- a/sn_protocol/src/messages/response.rs +++ b/sn_protocol/src/messages/response.rs @@ -56,7 +56,7 @@ pub enum QueryResponse { /// Response to [`GetChunkExistenceProof`] /// /// [`GetChunkExistenceProof`]: crate::messages::Query::GetChunkExistenceProof - GetChunkExistenceProof(Result), + GetChunkExistenceProof(Vec<(NetworkAddress, Result)>), } // Debug implementation for QueryResponse, to avoid printing Vec @@ -109,8 +109,9 @@ impl Debug for QueryResponse { write!(f, "GetRegisterRecord(Err({err:?}))") } }, - QueryResponse::GetChunkExistenceProof(proof) => { - write!(f, "GetChunkExistenceProof(proof: {proof:?})") + QueryResponse::GetChunkExistenceProof(proofs) => { + let addresses: Vec<_> = proofs.iter().map(|(addr, _)| addr.clone()).collect(); + write!(f, "GetChunkExistenceProof(checked chunks: {addresses:?})") } } }