chore!: improve ChunkProofVerification #2446

Merged · 4 commits · Nov 26, 2024
1 change: 1 addition & 0 deletions Cargo.lock


2 changes: 1 addition & 1 deletion sn_networking/src/driver.rs
@@ -768,7 +768,7 @@ fn check_and_wipe_storage_dir_if_necessary(
     // * the storage_dir shall be wiped out
     // * the version file shall be updated
     if cur_version_str != prev_version_str {
-        warn!("Trying to wipe out storege dir {storage_dir_path:?}, as cur_version {cur_version_str:?} doesn't match prev_version {prev_version_str:?}");
+        warn!("Trying to wipe out storage dir {storage_dir_path:?}, as cur_version {cur_version_str:?} doesn't match prev_version {prev_version_str:?}");
         let _ = fs::remove_dir_all(storage_dir_path);
 
         let mut file = fs::OpenOptions::new()
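The change above is just a log-message typo fix, but the surrounding hunk shows the version-gated wipe pattern. A minimal self-contained sketch of that pattern; the function name, paths, and version handling here are illustrative assumptions, not the crate's actual helper:

```rust
use std::{fs, io::Write, path::Path};

/// Sketch: wipe a storage dir whose recorded version differs from the running one.
/// All names are illustrative; the real logic lives in sn_networking/src/driver.rs.
fn check_and_wipe(storage_dir: &Path, version_file: &Path, cur_version: &str) -> std::io::Result<()> {
    let prev_version = fs::read_to_string(version_file).unwrap_or_default();
    if cur_version != prev_version {
        // Records written under another version are not trusted; drop the directory wholesale.
        let _ = fs::remove_dir_all(storage_dir);
        fs::create_dir_all(storage_dir)?;
        // Persist the current version so the next start-up skips the wipe.
        let mut file = fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(version_file)?;
        file.write_all(cur_version.as_bytes())?;
    }
    Ok(())
}
```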
14 changes: 0 additions & 14 deletions sn_networking/src/event/mod.rs
@@ -143,11 +143,6 @@ pub enum NetworkEvent {
     FailedToFetchHolders(BTreeSet<PeerId>),
     /// Quotes to be verified
     QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> },
-    /// Carry out chunk proof check against the specified record and peer
-    ChunkProofVerification {
-        peer_id: PeerId,
-        key_to_verify: NetworkAddress,
-    },
 }
 
 /// Terminate node for the following reason
@@ -206,15 +201,6 @@ impl Debug for NetworkEvent {
                     quotes.len()
                 )
             }
-            NetworkEvent::ChunkProofVerification {
-                peer_id,
-                key_to_verify: keys_to_verify,
-            } => {
-                write!(
-                    f,
-                    "NetworkEvent::ChunkProofVerification({peer_id:?} {keys_to_verify:?})"
-                )
-            }
         }
     }
 }
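Dropping this variant means chunk-proof checks no longer travel through NetworkEvent; the sn_networking/src/lib.rs hunks below instead attach a nonce and difficulty directly to the protocol queries. A rough sketch of the query shapes those call sites imply; everything beyond the field names visible in the diff (notably the Nonce type and the stand-in structs) is assumed:

```rust
/// Stand-in types; the real definitions live in sn_protocol.
pub struct NetworkAddress(Vec<u8>);
pub type Nonce = u64;

/// Assumed shape of the queries, inferred from the call sites in this PR.
pub enum Query {
    GetStoreCost {
        key: NetworkAddress,
        /// `None` means the client opted out of storage verification.
        nonce: Option<Nonce>,
        difficulty: usize,
    },
    GetChunkExistenceProof {
        key: NetworkAddress,
        nonce: Nonce,
        /// Number of proof entries the responder is asked to produce.
        difficulty: usize,
    },
}
```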
112 changes: 4 additions & 108 deletions sn_networking/src/event/request_response.rs
@@ -7,12 +7,10 @@
 // permissions and limitations relating to use of the SAFE Network Software.
 
 use crate::{
-    cmd::NetworkSwarmCmd, log_markers::Marker, sort_peers_by_address, MsgResponder, NetworkError,
-    NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE,
+    cmd::NetworkSwarmCmd, log_markers::Marker, MsgResponder, NetworkError, NetworkEvent,
+    SwarmDriver,
 };
-use itertools::Itertools;
 use libp2p::request_response::{self, Message};
-use rand::{rngs::OsRng, thread_rng, Rng};
 use sn_protocol::{
     messages::{CmdResponse, Request, Response},
     storage::RecordType,
@@ -207,14 +205,10 @@ impl SwarmDriver {
             return;
         }
 
-        let more_than_one_key = incoming_keys.len() > 1;
-
-        // On receive a replication_list from a close_group peer, we undertake two tasks:
+        // On receive a replication_list from a close_group peer, we undertake:
         // 1, For those keys that we don't have:
         //       fetch them if close enough to us
-        // 2, For those keys that we have and supposed to be held by the sender as well:
-        //       start chunk_proof check against a randomly selected chunk type record to the sender
-        // 3, For those spends that we have that differ in the hash, we fetch the other version
+        // 2, For those spends that we have that differ in the hash, we fetch the other version
         //       and update our local copy.
         let all_keys = self
             .swarm
@@ -230,103 +224,5 @@
         } else {
             self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch));
         }
-
-        // Only trigger chunk_proof check based every X% of the time
-        let mut rng = thread_rng();
-        // 5% probability
-        if more_than_one_key && rng.gen_bool(0.05) {
-            self.verify_peer_storage(sender.clone());
-
-            // In additon to verify the sender, we also verify a random close node.
-            // This is to avoid malicious node escaping the check by never send a replication_list.
-            // With further reduced probability of 1% (5% * 20%)
-            if rng.gen_bool(0.2) {
-                let close_group_peers = self
-                    .swarm
-                    .behaviour_mut()
-                    .kademlia
-                    .get_closest_local_peers(&self.self_peer_id.into())
-                    .map(|peer| peer.into_preimage())
-                    .take(CLOSE_GROUP_SIZE)
-                    .collect_vec();
-                if close_group_peers.len() == CLOSE_GROUP_SIZE {
-                    loop {
-                        let index: usize = OsRng.gen_range(0..close_group_peers.len());
-                        let candidate = NetworkAddress::from_peer(close_group_peers[index]);
-                        if sender != candidate {
-                            self.verify_peer_storage(candidate);
-                            break;
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /// Check among all chunk type records that we have, select those close to the peer,
-    /// and randomly pick one as the verification candidate.
-    fn verify_peer_storage(&mut self, peer: NetworkAddress) {
-        let mut closest_peers = self
-            .swarm
-            .behaviour_mut()
-            .kademlia
-            .get_closest_local_peers(&self.self_peer_id.into())
-            .map(|peer| peer.into_preimage())
-            .take(20)
-            .collect_vec();
-        closest_peers.push(self.self_peer_id);
-
-        let target_peer = if let Some(peer_id) = peer.as_peer_id() {
-            peer_id
-        } else {
-            error!("Target {peer:?} is not a valid PeerId");
-            return;
-        };
-
-        let all_keys = self
-            .swarm
-            .behaviour_mut()
-            .kademlia
-            .store_mut()
-            .record_addresses_ref();
-
-        // Targeted chunk type record shall be expected within the close range from our perspective.
-        let mut verify_candidates: Vec<NetworkAddress> = all_keys
-            .values()
-            .filter_map(|(addr, record_type)| {
-                if RecordType::Chunk == *record_type {
-                    match sort_peers_by_address(&closest_peers, addr, CLOSE_GROUP_SIZE) {
-                        Ok(close_group) => {
-                            if close_group.contains(&&target_peer) {
-                                Some(addr.clone())
-                            } else {
-                                None
-                            }
-                        }
-                        Err(err) => {
-                            warn!("Could not get sorted peers for {addr:?} with error {err:?}");
-                            None
-                        }
-                    }
-                } else {
-                    None
-                }
-            })
-            .collect();
-
-        verify_candidates.sort_by_key(|a| peer.distance(a));
-
-        // To ensure the candidate must have to be held by the peer,
-        // we only carry out check when there are already certain amount of chunks uploaded
-        // AND choose candidate from certain reduced range.
-        if verify_candidates.len() > 50 {
-            let index: usize = OsRng.gen_range(0..(verify_candidates.len() / 2));
-            self.send_event(NetworkEvent::ChunkProofVerification {
-                peer_id: target_peer,
-                key_to_verify: verify_candidates[index].clone(),
-            });
-        } else {
-            debug!("No valid candidate to be checked against peer {peer:?}");
-        }
     }
 }
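For reference, the sampling the deleted block performed reduces to two nested coin flips: verify the sender on roughly 5% of replication lists, and on 20% of those occasions (about 1% overall) also verify one random close-group peer distinct from the sender. A standalone sketch of that sampling, assuming the rand 0.8 API used by the removed code:

```rust
use rand::{rngs::OsRng, thread_rng, Rng};

/// Sketch of the removed gating: (verify_sender, also_verify_random_close_peer).
fn sample_verification(more_than_one_key: bool) -> (bool, bool) {
    let mut rng = thread_rng();
    let verify_sender = more_than_one_key && rng.gen_bool(0.05);
    // 5% * 20% = 1% overall for the extra random check.
    let verify_random_peer = verify_sender && rng.gen_bool(0.2);
    (verify_sender, verify_random_peer)
}

/// Mirror of the removed candidate loop: re-sample until the pick differs from the sender.
fn pick_distinct(len: usize, sender_index: usize) -> usize {
    assert!(len > 1, "needs at least one non-sender candidate");
    loop {
        let index = OsRng.gen_range(0..len);
        if index != sender_index {
            return index;
        }
    }
}
```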
35 changes: 29 additions & 6 deletions sn_networking/src/lib.rs
@@ -269,6 +269,7 @@ impl Network {
     }
 
     /// Get the Chunk existence proof from the close nodes to the provided chunk address.
+    /// This is to be used by client only to verify the success of the upload.
     pub async fn verify_chunk_existence(
         &self,
         chunk_address: NetworkAddress,
@@ -304,21 +305,30 @@ impl Network {
         let request = Request::Query(Query::GetChunkExistenceProof {
             key: chunk_address.clone(),
             nonce,
+            difficulty: 1,
         });
         let responses = self
             .send_and_get_responses(&close_nodes, &request, true)
             .await;
         let n_verified = responses
             .into_iter()
             .filter_map(|(peer, resp)| {
-                if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) =
+                if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(proofs))) =
                     resp
                 {
-                    if expected_proof.verify(&proof) {
-                        debug!("Got a valid ChunkProof from {peer:?}");
-                        Some(())
+                    if proofs.is_empty() {
+                        warn!("Failed to verify the ChunkProof from {peer:?}. Returned proof is empty.");
+                        None
+                    } else if let Ok(ref proof) = proofs[0].1 {
+                        if expected_proof.verify(proof) {
+                            debug!("Got a valid ChunkProof from {peer:?}");
+                            Some(())
+                        } else {
+                            warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?");
+                            None
+                        }
                     } else {
-                        warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?");
+                        warn!("Failed to verify the ChunkProof from {peer:?}, returned with error {:?}", proofs[0].1);
                         None
                     }
                 } else {
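The response payload changes here from a single Ok(proof) to a vector of per-address results, and only the first entry is consulted. A trimmed sketch of that decision with stand-in types; the real ChunkProof and error types live in sn_protocol:

```rust
/// Stand-ins for the sn_protocol types; only the shape matters for this sketch.
pub struct ChunkProof(Vec<u8>);

impl ChunkProof {
    pub fn verify(&self, other: &ChunkProof) -> bool {
        self.0 == other.0
    }
}

/// The new payload: one result per proven address (error type assumed).
pub type Proofs = Vec<(String, Result<ChunkProof, String>)>;

/// Mirrors the filter_map arm above: count a peer as verified only when the
/// first returned proof exists, is Ok, and matches the expected proof.
pub fn peer_verified(expected: &ChunkProof, proofs: &Proofs) -> bool {
    match proofs.first() {
        None => false,                       // empty reply: nothing to verify
        Some((_addr, Ok(proof))) => expected.verify(proof),
        Some((_addr, Err(_e))) => false,     // responder reported an error
    }
}
```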
@@ -370,7 +380,12 @@ impl Network {
             return Err(NetworkError::NoStoreCostResponses);
         }
 
-        let request = Request::Query(Query::GetStoreCost(record_address.clone()));
+        // Client shall decide whether to carry out storage verification or not.
+        let request = Request::Query(Query::GetStoreCost {
+            key: record_address.clone(),
+            nonce: None,
+            difficulty: 0,
+        });
         let responses = self
             .send_and_get_responses(&close_nodes, &request, true)
             .await;
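Because the nonce is now optional, the same query covers both a bare quote request (what the client sends here) and a hypothetical quote-plus-storage-proof request. A fragment sketching the two call shapes, reusing the stand-in Query definition sketched after the event/mod.rs diff and the surrounding record_address variable:

```rust
// Bare quote, matching the client call above: no nonce, zero difficulty.
let quote_only = Request::Query(Query::GetStoreCost {
    key: record_address.clone(),
    nonce: None,
    difficulty: 0,
});

// Hypothetical verifying variant a caller could opt into instead.
let with_verification = Request::Query(Query::GetStoreCost {
    key: record_address.clone(),
    nonce: Some(rand::random::<u64>()),
    difficulty: 1,
});
```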
@@ -388,7 +403,11 @@
                 quote: Ok(quote),
                 payment_address,
                 peer_address,
+                storage_proofs,
             }) => {
+                if !storage_proofs.is_empty() {
+                    debug!("Storage proofing during GetStoreCost to be implemented.");
+                }
                 // Check the quote itself is valid.
                 if quote.cost
                     != AttoTokens::from_u64(calculate_cost_for_records(
@@ -406,7 +425,11 @@
                 quote: Err(ProtocolError::RecordExists(_)),
                 payment_address,
                 peer_address,
+                storage_proofs,
             }) => {
+                if !storage_proofs.is_empty() {
+                    debug!("Storage proofing during GetStoreCost to be implemented.");
+                }
                 all_costs.push((peer_address, payment_address, PaymentQuote::zero()));
             }
             _ => {
1 change: 1 addition & 0 deletions sn_node/Cargo.toml
@@ -44,6 +44,7 @@ futures = "~0.3.13"
 hex = "~0.4.3"
 itertools = "~0.12.1"
 libp2p = { version = "0.54.1", features = ["tokio", "dns", "kad", "macros"] }
+num-traits = "0.2"
 prometheus-client = { version = "0.22", optional = true }
 # watch out updating this, protoc compiler needs to be installed on all build systems
 # arm builds + musl are very problematic