Skip to content

Commit

Permalink
fix(chain manager): unregister peers coincident with your beacon in c…
Browse files Browse the repository at this point in the history
…ase of different superblock consensus
  • Loading branch information
girazoki committed Nov 23, 2020
1 parent c5aa51d commit 3c2ee24
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 13 deletions.
3 changes: 2 additions & 1 deletion data_structures/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
proto::{schema::witnet, ProtobufConvert},
transaction::Transaction,
};
use serde::{Deserialize, Serialize};

/// Witnet's protocol messages
#[derive(Debug, Eq, PartialEq, Clone, ProtobufConvert)]
Expand Down Expand Up @@ -138,7 +139,7 @@ pub struct InventoryRequest {
pub inventory: Vec<InventoryEntry>,
}

#[derive(Debug, Eq, PartialEq, Clone, ProtobufConvert, Hash)]
#[derive(Debug, Deserialize, Eq, PartialEq, Clone, ProtobufConvert, Serialize, Hash)]
#[protobuf_convert(pb = "witnet::LastBeacon")]
pub struct LastBeacon {
pub highest_block_checkpoint: CheckpointBeacon,
Expand Down
Binary file removed node/.Cargo.toml.swp
Binary file not shown.
1 change: 1 addition & 0 deletions node/src/actors/chain_manager/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,7 @@ impl Handler<PeersBeacons> for ChainManager {
let beacon_consensus = peers_beacons.superblock_consensus(consensus_threshold);
let outbound_limit = peers_beacons.outbound_limit;
let pb_len = peers_beacons.pb.len();
self.last_received_beacons = peers_beacons.pb.clone();
let peers_needed_for_consensus = outbound_limit
.map(|x| {
// ceil(x * consensus_threshold / 100)
Expand Down
17 changes: 16 additions & 1 deletion node/src/actors/chain_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::{
cmp::{max, min, Ordering},
collections::{HashMap, HashSet},
convert::TryFrom,
net::SocketAddr,
time::Duration,
};

Expand Down Expand Up @@ -204,6 +205,8 @@ pub struct ChainManager {
temp_superblock_votes: Vec<SuperBlockVote>,
/// Commits and reveals to process later
temp_commits_and_reveals: Vec<Transaction>,
/// Last received Beacons
last_received_beacons: Vec<(SocketAddr, Option<LastBeacon>)>,
}

/// Wrapper around a block candidate that contains additional metadata regarding
Expand Down Expand Up @@ -1368,9 +1371,21 @@ impl ChainManager {
"Superblock consensus {} different from current superblock",
target_superblock_hash
);

// We are on a different chain than the one dictated by the network. Thus we need
// to throw away those outbound peers that were in the wrong consensus with us, and
// find new ones that can give us the blocks consolidated by the network
let mut peers_to_unregister: Vec<SocketAddr> = Vec::new();
for (addr, peer_beacon) in act.last_received_beacons.iter() {
if let Some(peer_beacon) = peer_beacon {
if peer_beacon.highest_block_checkpoint == act.get_chain_beacon() {
peers_to_unregister.push(*addr)
}
}
};
let sessions_manager_addr = SessionsManager::from_registry();

sessions_manager_addr.do_send(DropOutboundPeers {});
sessions_manager_addr.do_send(DropOutboundPeers {peers_to_drop: peers_to_unregister});
act.initialize_from_storage(ctx);
act.update_state_machine(StateMachine::WaitingConsensus);

Expand Down
5 changes: 4 additions & 1 deletion node/src/actors/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,10 @@ impl Message for LogMessage {

/// Drop all outbound peers
#[derive(Clone, Debug)]
pub struct DropOutboundPeers {}
pub struct DropOutboundPeers {
/// peers to be dropped
pub peers_to_drop: Vec<SocketAddr>,
}
impl Message for DropOutboundPeers {
type Result = ();
}
Expand Down
12 changes: 2 additions & 10 deletions node/src/actors/sessions_manager/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{
fmt::{Debug, Display},
marker::Send,
net::SocketAddr,
};

use actix::{
Expand Down Expand Up @@ -474,14 +473,7 @@ impl Handler<SetLastBeacon> for SessionsManager {
impl Handler<DropOutboundPeers> for SessionsManager {
type Result = <DropOutboundPeers as Message>::Result;

fn handle(&mut self, _msg: DropOutboundPeers, _ctx: &mut Context<Self>) -> Self::Result {
let outbound_peers: Vec<SocketAddr> = self
.sessions
.outbound_consolidated
.collection
.keys()
.cloned()
.collect();
self.drop_outbound_peers(outbound_peers.as_ref());
fn handle(&mut self, msg: DropOutboundPeers, _ctx: &mut Context<Self>) -> Self::Result {
self.drop_outbound_peers(msg.peers_to_drop.as_ref());
}
}

0 comments on commit 3c2ee24

Please sign in to comment.