Skip to content

Commit

Permalink
chore: improve ChunkProofVerification
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Nov 15, 2024
1 parent d0e6faf commit 2f72a3d
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 74 deletions.
19 changes: 14 additions & 5 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,21 +304,30 @@ impl Network {
let request = Request::Query(Query::GetChunkExistenceProof {
key: chunk_address.clone(),
nonce,
expected_answers: 1,
});
let responses = self
.send_and_get_responses(&close_nodes, &request, true)
.await;
let n_verified = responses
.into_iter()
.filter_map(|(peer, resp)| {
if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) =
if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(proofs))) =
resp
{
if expected_proof.verify(&proof) {
debug!("Got a valid ChunkProof from {peer:?}");
Some(())
if proofs.is_empty() {
warn!("Failed to verify the ChunkProof from {peer:?}. Returned proof is empty.");
None
} else if let Ok(ref proof) = proofs[0].1 {
if expected_proof.verify(proof) {
debug!("Got a valid ChunkProof from {peer:?}");
Some(())
} else {
warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?");
None
}
} else {
warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?");
warn!("Failed to verify the ChunkProof from {peer:?}, returned with error {:?}", proofs[0].1);
None
}
} else {
Expand Down
177 changes: 113 additions & 64 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ use sn_evm::{AttoTokens, RewardsAddress};
#[cfg(feature = "open-metrics")]
use sn_networking::MetricsRegistries;
use sn_networking::{
close_group_majority, Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue,
SwarmDriver,
close_group_majority, Instant, Network, NetworkBuilder, NetworkEvent, NodeIssue, SwarmDriver,
};
use sn_protocol::{
error::Error as ProtocolError,
messages::{ChunkProof, CmdResponse, Query, QueryResponse, Request, Response},
messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response},
storage::RecordType,
NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use std::{
Expand Down Expand Up @@ -675,21 +675,17 @@ impl Node {

QueryResponse::GetReplicatedRecord(result)
}
Query::GetChunkExistenceProof { key, nonce } => {
debug!("Got GetChunkExistenceProof for chunk {key:?}");

let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
let proof = ChunkProof::new(&record.value, nonce);
debug!("Chunk proof for {key:?} is {proof:?}");
result = Ok(proof)
} else {
debug!(
"Could not get ChunkProof for {key:?} as we don't have the record locally."
);
}
Query::GetChunkExistenceProof {
key,
nonce,
expected_answers,
} => {
debug!("Got GetChunkExistenceProof targeting chunk {key:?} with {expected_answers} answers.");

QueryResponse::GetChunkExistenceProof(result)
QueryResponse::GetChunkExistenceProof(
Self::respond_x_closest_chunk_proof(network, key, nonce, expected_answers)
.await,
)
}
Query::CheckNodeInProblem(target_address) => {
debug!("Got CheckNodeInProblem for peer {target_address:?}");
Expand All @@ -712,6 +708,66 @@ impl Node {
Response::Query(resp)
}

async fn respond_x_closest_chunk_proof(
network: &Network,
key: NetworkAddress,
nonce: Nonce,
expected_answers: usize,
) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
let mut results = vec![];
if expected_answers == 1 {
// Client checking existence of published chunk.
let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone()));
if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await {
let proof = ChunkProof::new(&record.value, nonce);
debug!("Chunk proof for {key:?} is {proof:?}");
result = Ok(proof)
} else {
debug!("Could not get ChunkProof for {key:?} as we don't have the record locally.");
}

results.push((key.clone(), result));
} else {
let all_local_records = network.get_all_local_record_addresses().await;

if let Ok(all_local_records) = all_local_records {
// Only `ChunkRecord`s can be consistantly verified
let mut all_chunk_addrs: Vec<_> = all_local_records
.iter()
.filter_map(|(addr, record_type)| {
if *record_type == RecordType::Chunk {
Some(addr.clone())
} else {
None
}
})
.collect();

// Sort by distance and only take first X closest entries
all_chunk_addrs.sort_by_key(|addr| key.distance(addr));

// TODO: this shall be deduced from resource usage dynamically

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
let workload_factor = std::cmp::min(expected_answers, CLOSE_GROUP_SIZE);

for addr in all_chunk_addrs.iter().take(workload_factor) {
if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await
{
let proof = ChunkProof::new(&record.value, nonce);
debug!("Chunk proof for {key:?} is {proof:?}");
results.push((addr.clone(), Ok(proof)));
}
}
}
}

debug!(
"Respond with {} answers to ChunkProofChallenge targeting {key:?}.",
results.len()
);

results
}

async fn try_bad_nodes_check(network: Network, rolling_index: usize) {
if let Ok(kbuckets) = network.get_kbuckets().await {
let total_peers: usize = kbuckets.values().map(|peers| peers.len()).sum();
Expand Down Expand Up @@ -770,58 +826,51 @@ impl Node {
}

async fn chunk_proof_verify_peer(network: &Network, peer_id: PeerId, key: &NetworkAddress) -> bool {
let check_passed = if let Ok(Some(record)) =
network.get_local_record(&key.to_record_key()).await
{
let nonce = thread_rng().gen::<u64>();
let expected_proof = ChunkProof::new(&record.value, nonce);
debug!("To verify peer {peer_id:?}, chunk_proof for {key:?} is {expected_proof:?}");
let nonce: Nonce = thread_rng().gen::<u64>();

let request = Request::Query(Query::GetChunkExistenceProof {
key: key.clone(),
nonce,
});
let responses = network
.send_and_get_responses(&[peer_id], &request, true)
.await;
let n_verified = responses
.into_iter()
.filter_map(|(peer, resp)| received_valid_chunk_proof(key, &expected_proof, peer, resp))
.count();

n_verified >= 1
} else {
error!(
"To verify peer {peer_id:?} Could not get ChunkProof for {key:?} as we don't have the record locally."
);
true
};
let request = Request::Query(Query::GetChunkExistenceProof {
key: key.clone(),
nonce,
expected_answers: CLOSE_GROUP_SIZE,
});

if !check_passed {
return false;
}
let responses = network
.send_and_get_responses(&[peer_id], &request, true)
.await;

true
}
// TODO: cross check with local knowledge (i.e. the claimed closest shall match locals)

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
// this also prevent peer falsely give empty or non-existent answers.

fn received_valid_chunk_proof(
key: &NetworkAddress,
expected_proof: &ChunkProof,
peer: PeerId,
resp: Result<Response, NetworkError>,
) -> Option<()> {
if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) = resp {
if expected_proof.verify(&proof) {
debug!(
"Got a valid ChunkProof of {key:?} from {peer:?}, during peer chunk proof check."
);
Some(())
} else {
warn!("When verify {peer:?} with ChunkProof of {key:?}, the chunk might have been tampered?");
None
if let Some(Ok(Response::Query(QueryResponse::GetChunkExistenceProof(answers)))) =
responses.get(&peer_id)
{
if answers.is_empty() {
info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
return false;
}
for (addr, proof) in answers {
if let Ok(proof) = proof {
if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
let expected_proof = ChunkProof::new(&record.value, nonce);
// Any wrong answer shall be considered as a failure
if *proof != expected_proof {
return false;
}
} else {
debug!(
"Could not get ChunkProof for {addr:?} as we don't have the record locally."
);
}
} else {
debug!(
"Could not verify answer of {addr:?} from {peer_id:?} as responded with {proof:?}"
);
}
}
} else {
debug!("Did not get a valid response for the ChunkProof from {peer:?}");
None
info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error.");
return false;
}

true
}
14 changes: 12 additions & 2 deletions sn_protocol/src/messages/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub enum Query {
key: NetworkAddress,
/// The random nonce that the node uses to produce the Proof (i.e., hash(record+nonce))
nonce: Nonce,
/// Expected number of answers. For client publish verification, use 1 for efficiency.
/// Node shall try their best to fulfill the number, based on their capacity.
expected_answers: usize,
},
/// Queries close_group peers whether the target peer is a bad_node
CheckNodeInProblem(NetworkAddress),
Expand Down Expand Up @@ -78,8 +81,15 @@ impl std::fmt::Display for Query {
Query::GetRegisterRecord { key, requester } => {
write!(f, "Query::GetRegisterRecord({requester:?} {key:?})")
}
Query::GetChunkExistenceProof { key, nonce } => {
write!(f, "Query::GetChunkExistenceProof({key:?} {nonce:?})")
Query::GetChunkExistenceProof {
key,
nonce,
expected_answers,
} => {
write!(
f,
"Query::GetChunkExistenceProof({key:?} {nonce:?} {expected_answers})"
)
}
Query::CheckNodeInProblem(address) => {
write!(f, "Query::CheckNodeInProblem({address:?})")
Expand Down
7 changes: 4 additions & 3 deletions sn_protocol/src/messages/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub enum QueryResponse {
/// Response to [`GetChunkExistenceProof`]
///
/// [`GetChunkExistenceProof`]: crate::messages::Query::GetChunkExistenceProof
GetChunkExistenceProof(Result<ChunkProof>),
GetChunkExistenceProof(Vec<(NetworkAddress, Result<ChunkProof>)>),
}

// Debug implementation for QueryResponse, to avoid printing Vec<u8>
Expand Down Expand Up @@ -109,8 +109,9 @@ impl Debug for QueryResponse {
write!(f, "GetRegisterRecord(Err({err:?}))")
}
},
QueryResponse::GetChunkExistenceProof(proof) => {
write!(f, "GetChunkExistenceProof(proof: {proof:?})")
QueryResponse::GetChunkExistenceProof(proofs) => {
let addresses: Vec<_> = proofs.iter().map(|(addr, _)| addr.clone()).collect();
write!(f, "GetChunkExistenceProof(checked chunks: {addresses:?})")
}
}
}
Expand Down

0 comments on commit 2f72a3d

Please sign in to comment.