Skip to content

Commit

Permalink
feat: implement the initial scoring system
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Nov 25, 2024
1 parent 07ce799 commit 107918c
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 77 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 0 additions & 14 deletions sn_networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,6 @@ pub enum NetworkEvent {
FailedToFetchHolders(BTreeSet<PeerId>),
/// Quotes to be verified
QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> },
/// Carry out chunk proof check against the specified record and peer
ChunkProofVerification {
peer_id: PeerId,
key_to_verify: NetworkAddress,
},
}

/// Terminate node for the following reason
Expand Down Expand Up @@ -206,15 +201,6 @@ impl Debug for NetworkEvent {
quotes.len()
)
}
NetworkEvent::ChunkProofVerification {
peer_id,
key_to_verify: keys_to_verify,
} => {
write!(
f,
"NetworkEvent::ChunkProofVerification({peer_id:?} {keys_to_verify:?})"
)
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions sn_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ futures = "~0.3.13"
hex = "~0.4.3"
itertools = "~0.12.1"
libp2p = { version = "0.54.1", features = ["tokio", "dns", "kad", "macros"] }
num-traits = "0.2"
prometheus-client = { version = "0.22", optional = true }
# watch out updating this, protoc compiler needs to be installed on all build systems
# arm builds + musl are very problematic
Expand Down
207 changes: 144 additions & 63 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::RunningNode;
use bytes::Bytes;
use itertools::Itertools;
use libp2p::{identity::Keypair, Multiaddr, PeerId};
use num_traits::cast::ToPrimitive;
use rand::{
rngs::{OsRng, StdRng},
thread_rng, Rng, SeedableRng,
Expand All @@ -30,6 +31,7 @@ use sn_protocol::{
NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use std::{
collections::HashMap,
net::SocketAddr,
path::PathBuf,
sync::{
Expand All @@ -38,7 +40,7 @@ use std::{
},
time::Duration,
};
use tokio::{sync::mpsc::Receiver, task::spawn};
use tokio::{sync::mpsc::Receiver, task::{spawn, JoinSet}};

use sn_evm::EvmNetwork;

Expand All @@ -56,6 +58,16 @@ const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);
/// Interval to clean up unrelevant records
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL: Duration = Duration::from_secs(3600);

/// Highest score to achieve from each metric sub-sector during StorageChallenge.
const HIGHEST_SCORE: usize = 100;

/// Any nodes bearing a score below this shall be considered as bad.
/// Max is to be 100 * 100
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 2000;

/// in ms, expecting average StorageChallenge complete time to be around 500ms.
const TIME_STEP: usize = 100;

/// Helper to build and run a Node
pub struct NodeBuilder {
identity_keypair: Keypair,
Expand Down Expand Up @@ -467,26 +479,6 @@ impl Node {
quotes_verification(&network, quotes).await;
});
}
NetworkEvent::ChunkProofVerification {
peer_id,
key_to_verify,
} => {
event_header = "ChunkProofVerification";
let network = self.network().clone();

debug!("Going to carry out storage existence check against peer {peer_id:?}");

let _handle = spawn(async move {
if chunk_proof_verify_peer(&network, peer_id, &key_to_verify).await {
return;
}
info!("Peer {peer_id:?} failed storage existence challenge.");
// TODO: shall challenge failure immediately triggers the node to be removed?
// or to lower connection score once feature introduced.
// If score falls too low, sever connection.
network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
});
}
}

trace!(
Expand Down Expand Up @@ -634,7 +626,7 @@ impl Node {
nonce: Nonce,
difficulty: usize,
) -> Vec<(NetworkAddress, Result<ChunkProof, ProtocolError>)> {
info!("Received StorageChallenge targeting {key:?} with difficulty level of {difficulty}.");
let start = Instant::now();
let mut results = vec![];
if difficulty == 1 {
// Client checking existence of published chunk.
Expand Down Expand Up @@ -682,8 +674,8 @@ impl Node {
}

info!(
"Respond with {} answers to the StorageChallenge targeting {key:?}.",
results.len()
"Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}",
results.len(), start.elapsed()
);

results
Expand All @@ -693,6 +685,7 @@ impl Node {
/// and randomly pick one as the verification candidate.
/// This will challenge all closest peers at once.
async fn storage_challenge(network: Network) {
let start = Instant::now();
let closest_peers: Vec<PeerId> =
if let Ok(closest_peers) = network.get_closest_k_value_local_peers().await {
closest_peers
Expand All @@ -712,7 +705,7 @@ impl Node {
return;
}

let verify_candidates: Vec<NetworkAddress> =
let mut verify_candidates: Vec<NetworkAddress> =
if let Ok(all_keys) = network.get_all_local_record_addresses().await {
all_keys
.iter()
Expand All @@ -734,72 +727,160 @@ impl Node {
return;
}

info!("Starting node StorageChallenge against neighbours!");
let index: usize = OsRng.gen_range(0..num_of_targets);
let target = verify_candidates[index].clone();
// TODO: workload shall be dynamically deduced from resource usage
let difficulty = CLOSE_GROUP_SIZE;
verify_candidates.sort_by_key(|addr| target.distance(addr));
let expected_targets = verify_candidates.into_iter().take(difficulty);
let nonce: Nonce = thread_rng().gen::<u64>();
let mut expected_proofs = HashMap::new();
for addr in expected_targets {
if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
let expected_proof = ChunkProof::new(&record.value, nonce);
let _ = expected_proofs.insert(addr, expected_proof);
} else {
error!("Local record {addr:?} cann't be loaded from disk.");
}
}
let request = Request::Query(Query::GetChunkExistenceProof {
key: target.clone(),
nonce,
difficulty,
});

// TODO: launch the challenges parrallely, so that a scoring scheme can be utilized.
let mut tasks = JoinSet::new();
for peer_id in closest_peers {
if peer_id == network.peer_id() {
continue;
}
let network_clone = network.clone();
let request_clone = request.clone();
let expected_proofs_clone = expected_proofs.clone();
let _ = tasks.spawn(async move {
let res =
scoring_peer(network_clone, peer_id, request_clone, expected_proofs_clone)
.await;
(peer_id, res)
});
}

let index: usize = OsRng.gen_range(0..num_of_targets);
if !chunk_proof_verify_peer(&network, peer_id, &verify_candidates[index]).await {
info!("Peer {peer_id:?} failed storage challenge.");
// TODO: shall the challenge failure immediately triggers the node to be removed?
network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
while let Some(res) = tasks.join_next().await {
match res {
Ok((peer_id, score)) => {
if score < MIN_ACCEPTABLE_HEALTHY_SCORE {
info!("Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}.");
// TODO: shall the challenge failure immediately triggers the node to be removed?
network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck);
}
}
Err(e) => {
info!("StorageChallenge task completed with error {e:?}");
}
}
}

info!("Completed node StorageChallenge against neighbours!");
info!(
"Completed node StorageChallenge against neighbours in {:?}!",
start.elapsed()
);
}
}

async fn chunk_proof_verify_peer(network: &Network, peer_id: PeerId, key: &NetworkAddress) -> bool {
let nonce: Nonce = thread_rng().gen::<u64>();

let request = Request::Query(Query::GetChunkExistenceProof {
key: key.clone(),
nonce,
difficulty: CLOSE_GROUP_SIZE,
});

async fn scoring_peer(
network: Network,
peer_id: PeerId,
request: Request,
expected_proofs: HashMap<NetworkAddress, ChunkProof>,
) -> usize {
let start = Instant::now();
let responses = network
.send_and_get_responses(&[peer_id], &request, true)
.await;

// TODO: cross check with local knowledge (i.e. the claimed closest shall match locals)
// this also prevent peer falsely give empty or non-existent answers.

if let Some(Ok(Response::Query(QueryResponse::GetChunkExistenceProof(answers)))) =
responses.get(&peer_id)
{
if answers.is_empty() {
info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge.");
return false;
return 0;
}
let elapsed = start.elapsed();

let mut received_proofs = vec![];
for (addr, proof) in answers {
if let Ok(proof) = proof {
if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await {
let expected_proof = ChunkProof::new(&record.value, nonce);
// Any wrong answer shall be considered as a failure
if *proof != expected_proof {
return false;
}
} else {
debug!(
"Could not get ChunkProof for {addr:?} as we don't have the record locally."
);
}
} else {
debug!(
"Could not verify answer of {addr:?} from {peer_id:?} as responded with {proof:?}"
);
received_proofs.push((addr.clone(), proof.clone()));
}
}

let score = mark_peer(elapsed, received_proofs, &expected_proofs);
info!(
"Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.",
answers.len()
);
score
} else {
info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error.");
return false;
0
}
}

// Based on following metrics:
// * the duration
// * is there false answer
// * percentage of correct answers among the expected closest
// The higher the score, the better confidence on the peer
fn mark_peer(
duration: Duration,
answers: Vec<(NetworkAddress, ChunkProof)>,
expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
) -> usize {
let duration_score = duration_score_scheme(duration);
let challenge_score = challenge_score_scheme(answers, expected_proofs);

duration_score * challenge_score
}

// Less duration shall get higher score
fn duration_score_scheme(duration: Duration) -> usize {
// So far just a simple stepped scheme, capped by HIGHEST_SCORE
let in_ms = if let Some(value) = duration.as_millis().to_usize() {
value
} else {
info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms.");
1000
};

true
let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP);
HIGHEST_SCORE - step
}

// Any false answer shall result in 0 score immediately
fn challenge_score_scheme(
answers: Vec<(NetworkAddress, ChunkProof)>,
expected_proofs: &HashMap<NetworkAddress, ChunkProof>,
) -> usize {
let mut correct_answers = 0;
for (addr, chunk_proof) in answers {
if let Some(expected_proof) = expected_proofs.get(&addr) {
if expected_proof.verify(&chunk_proof) {
correct_answers += 1;
} else {
info!("Spot a false answer to the challenge regarding {addr:?}");
// Any false answer shall result in 0 score immediately
return 0;
}
}
}
// TODO: For those answers not among the expected_proofs,
// it could be due to having different knowledge of records to us.
// shall we:
// * set the target being close to us, so that neighbours sharing same knowledge in higher chance
// * fetch from local to testify
// * fetch from network to testify
std::cmp::min(
HIGHEST_SCORE,
HIGHEST_SCORE * correct_answers / expected_proofs.len(),
)
}

0 comments on commit 107918c

Please sign in to comment.