Skip to content

Commit

Permalink
feat: try sync peers on connect (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler authored Nov 13, 2024
1 parent a0b9124 commit f239a91
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/server/http/stats_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,12 @@ impl StatsCollector {
let formatter = Formatter::new();

info!(target: LOG_TARGET,
"========= Uptime: {}. Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner(A/R): {}/{}. Pool(A/R) {}/{}. Peers(a/g/b) {}/{}/{} libp2p (i/o) {}/{}==== ",
"========= Uptime: {}. V{} Chains: Rx {}..{}, Sha3 {}..{}. Difficulty (Target/Network): Rx: {}/{} Sha3x: {}/{} Miner(A/R): {}/{}. Pool(A/R) {}/{}. Peers(a/g/b) {}/{}/{} libp2p (i/o) {}/{}==== ",
humantime::format_duration(Duration::from_secs(
EpochTime::now().as_u64().checked_sub(
self.first_stat_received.unwrap_or(EpochTime::now()).as_u64())
.unwrap_or_default())),
env!("CARGO_PKG_VERSION"),
self.randomx_chain_height.saturating_sub(self.randomx_chain_length),
self.randomx_chain_height,
self.sha3x_chain_height.saturating_sub(self.sha3x_chain_length),
Expand Down
2 changes: 1 addition & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ pub mod grpc;
pub mod http;
pub mod p2p;

pub const PROTOCOL_VERSION: u64 = 16;
pub const PROTOCOL_VERSION: u64 = 17;
4 changes: 3 additions & 1 deletion src/server/p2p/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,15 @@ impl CatchUpSyncResponse {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DirectPeerInfoRequest {
pub peer_id: String,
pub info: PeerInfo,
pub my_info: PeerInfo,
pub best_peers: Vec<PeerInfo>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DirectPeerInfoResponse {
pub peer_id: String,
pub info: PeerInfo,
pub best_peers: Vec<PeerInfo>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down
43 changes: 37 additions & 6 deletions src/server/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ const CATCH_UP_SYNC_BLOCKS_IN_I_HAVE: usize = 100;
const MAX_CATCH_UP_ATTEMPTS: usize = 150;
// Time to start up and catch up before we start processing new tip messages
const STARTUP_CATCH_UP_TIME: Duration = Duration::from_secs(1);
const NUM_PEERS_TO_SYNC_PER_ALGO: usize = 8;

#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct Squad {
Expand Down Expand Up @@ -754,16 +755,22 @@ where S: ShareChain
return;
}

// TODO: Should we send them our details? The problem is that if we send too many of these, libp2p
// starts dropping requests with "libp2p_relay::priv_client::handler Dropping in-flight connect
// request because we are at capacity"
let mut my_best_peers = self
.network_peer_store
.best_peers_to_sync(NUM_PEERS_TO_SYNC_PER_ALGO, PowAlgorithm::RandomX);
my_best_peers.extend(
self.network_peer_store
.best_peers_to_sync(NUM_PEERS_TO_SYNC_PER_ALGO, PowAlgorithm::Sha3x),
);

let my_best_peers: Vec<_> = my_best_peers.into_iter().map(|p| p.peer_info).collect();
self.swarm
.behaviour_mut()
.direct_peer_exchange
.send_request(peer, DirectPeerInfoRequest {
info: my_info,
my_info,
peer_id: local_peer_id.to_base58(),
best_peers: my_best_peers,
});
}
}
Expand All @@ -773,11 +780,15 @@ where S: ShareChain
channel: ResponseChannel<DirectPeerInfoResponse>,
request: DirectPeerInfoRequest,
) {
if request.info.version != PROTOCOL_VERSION {
if request.my_info.version != PROTOCOL_VERSION {
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", request.peer_id);
return;
}

let source_peer = request.my_info.peer_id;
let num_peers = request.best_peers.len();

info!(target: PEER_INFO_LOGGING_LOG_TARGET, "[DIRECT_PEER_EXCHANGE_TOPIC] New peer info: {source_peer:?} with {num_peers} new peers");
let local_peer_id = *self.swarm.local_peer_id();
if let Ok(info) = self
.create_peer_info(self.swarm.external_addresses().cloned().collect())
Expand All @@ -786,13 +797,22 @@ where S: ShareChain
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to create peer info: {error:?}");
})
{
let mut my_best_peers = self
.network_peer_store
.best_peers_to_sync(NUM_PEERS_TO_SYNC_PER_ALGO, PowAlgorithm::RandomX);
my_best_peers.extend(
self.network_peer_store
.best_peers_to_sync(NUM_PEERS_TO_SYNC_PER_ALGO, PowAlgorithm::Sha3x),
);
let my_best_peers: Vec<_> = my_best_peers.into_iter().map(|p| p.peer_info).collect();
if self
.swarm
.behaviour_mut()
.direct_peer_exchange
.send_response(channel, DirectPeerInfoResponse {
peer_id: local_peer_id.to_base58(),
info,
best_peers: my_best_peers,
})
.is_err()
{
Expand All @@ -802,9 +822,14 @@ where S: ShareChain

match request.peer_id.parse::<PeerId>() {
Ok(peer_id) => {
if self.add_peer(request.info, peer_id).await {
if self.add_peer(request.my_info, peer_id).await {
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
for peer in request.best_peers {
if let Some(peer_id) = peer.peer_id {
self.add_peer(peer, peer_id).await;
}
}
},
Err(error) => {
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to parse peer id: {error:?}");
Expand All @@ -817,11 +842,17 @@ where S: ShareChain
debug!(target: LOG_TARGET, squad = &self.config.squad; "Peer {} has an outdated version, skipping", response.peer_id);
return;
}
info!(target: PEER_INFO_LOGGING_LOG_TARGET, "[DIRECT_PEER_EXCHANGE_TOPIC] New peer info: {} with {} peers", response.peer_id, response.best_peers.len());
match response.peer_id.parse::<PeerId>() {
Ok(peer_id) => {
if self.add_peer(response.info, peer_id).await {
self.swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
for peer in response.best_peers {
if let Some(peer_id) = peer.peer_id {
self.add_peer(peer, peer_id).await;
}
}
},
Err(error) => {
error!(target: LOG_TARGET, squad = &self.config.squad; "Failed to parse peer id: {error:?}");
Expand Down

0 comments on commit f239a91

Please sign in to comment.