Skip to content

Commit

Permalink
feat(node): ice outbound peer that sent invalid block batch
Browse files Browse the repository at this point in the history
  • Loading branch information
lrubiorod committed Dec 21, 2020
1 parent 0bad7a2 commit 1790269
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 30 deletions.
1 change: 1 addition & 0 deletions node/src/actors/chain_manager/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ impl ChainManager {
let genesis_block = ig.build_genesis_block(consensus_constants.bootstrap_hash);
ctx.notify(AddBlocks {
blocks: vec![genesis_block],
sender: None,
});
}
}
Expand Down
31 changes: 7 additions & 24 deletions node/src/actors/chain_manager/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ impl Handler<AddBlocks> for ChainManager {
);

let consensus_constants = self.consensus_constants();
let sender = msg.sender;

match self.sm_state {
StateMachine::WaitingConsensus | StateMachine::AlmostSynced => {
Expand Down Expand Up @@ -403,10 +404,7 @@ impl Handler<AddBlocks> for ChainManager {
let (batch_succeeded, num_processed_blocks) =
self.process_first_batch(ctx, &sync_target, &blocks);
if !batch_succeeded {
// We receive an invalid batch of blocks. So we need to throw away our
// outbound peers in order to find new ones that can give us the blocks
// consolidated by the network
self.drop_all_outbounds();
self.drop_all_outbounds_and_ice_sender(sender);

return;
}
Expand All @@ -432,10 +430,7 @@ impl Handler<AddBlocks> for ChainManager {
let (batch_succeeded, num_processed_blocks) =
self.process_first_batch(ctx, &sync_target, &consolidate_blocks);
if !batch_succeeded {
// We receive an invalid batch of blocks. So we need to throw away our
// outbound peers in order to find new ones that can give us the blocks
// consolidated by the network
self.drop_all_outbounds();
self.drop_all_outbounds_and_ice_sender(sender);

return;
}
Expand Down Expand Up @@ -468,10 +463,7 @@ impl Handler<AddBlocks> for ChainManager {
// Process remaining blocks
let (batch_succeeded, num_processed_blocks) = act.process_blocks_batch(ctx, &sync_target, &remainig_blocks);
if !batch_succeeded {
// We receive an invalid batch of blocks. So we need to throw away our
// outbound peers in order to find new ones that can give us the blocks
// consolidated by the network
act.drop_all_outbounds();
act.drop_all_outbounds_and_ice_sender(sender);

return actix::fut::err(());
}
Expand All @@ -494,10 +486,7 @@ impl Handler<AddBlocks> for ChainManager {
let (batch_succeeded, num_processed_blocks) =
self.process_first_batch(ctx, &sync_target, &consolidate_blocks);
if !batch_succeeded {
// We receive an invalid batch of blocks. So we need to throw away our
// outbound peers in order to find new ones that can give us the blocks
// consolidated by the network
self.drop_all_outbounds();
self.drop_all_outbounds_and_ice_sender(sender);

return;
}
Expand Down Expand Up @@ -530,10 +519,7 @@ impl Handler<AddBlocks> for ChainManager {
// Process remaining blocks
let (batch_succeeded, num_processed_blocks) = act.process_blocks_batch(ctx, &sync_target, &candidate_blocks);
if !batch_succeeded {
// We receive an invalid batch of blocks. So we need to throw away our
// outbound peers in order to find new ones that can give us the blocks
// consolidated by the network
act.drop_all_outbounds();
act.drop_all_outbounds_and_ice_sender(sender);

act.update_state_machine(StateMachine::WaitingConsensus);

Expand Down Expand Up @@ -579,10 +565,7 @@ impl Handler<AddBlocks> for ChainManager {
// Process remaining blocks
let (batch_succeeded, num_processed_blocks) = act.process_blocks_batch(ctx, &sync_target, &remaining_blocks);
if !batch_succeeded {
// We receive an invalid batch of blocks. So we need to throw away our
// outbound peers in order to find new ones that can give us the blocks
// consolidated by the network
act.drop_all_outbounds();
act.drop_all_outbounds_and_ice_sender(sender);

log::error!("Received invalid blocks batch...");
act.update_state_machine(StateMachine::WaitingConsensus);
Expand Down
31 changes: 27 additions & 4 deletions node/src/actors/chain_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ use crate::{
json_rpc::JsonRpcServer,
messages::{
AddItem, AddItems, AddTransaction, Anycast, BlockNotify, Broadcast, DropOutboundPeers,
GetBlocksEpochRange, GetItemBlock, NodeStatusNotify, SendInventoryItem,
SendInventoryRequest, SendLastBeacon, SendSuperBlockVote, StoreInventoryItem,
SuperBlockNotify,
GetBlocksEpochRange, GetItemBlock, NodeStatusNotify, RemoveAddressesFromTried,
SendInventoryItem, SendInventoryRequest, SendLastBeacon, SendSuperBlockVote,
StoreInventoryItem, SuperBlockNotify,
},
peers_manager::PeersManager,
sessions_manager::SessionsManager,
storage_keys,
},
Expand Down Expand Up @@ -1842,7 +1843,7 @@ impl ChainManager {
Box::new(fut)
}

/// This function send a message to SessionsManager to drop all outbounds peers
/// Send a message to `SessionsManager` to drop all outbound peers.
pub fn drop_all_outbounds(&self) {
let peers_to_unregister = self
.last_received_beacons
Expand All @@ -1854,6 +1855,28 @@ impl ChainManager {
peers_to_drop: peers_to_unregister,
});
}

/// Send a message to `PeersManager` to ice a specific peer.
pub fn ice_peer(&self, addr: Option<SocketAddr>) {
if let Some(addr) = addr {
let peers_manager_addr = PeersManager::from_registry();
peers_manager_addr.do_send(RemoveAddressesFromTried {
addresses: vec![addr],
ice: true,
});
}
}

/// Execute `drop_all_outbounds` and `ice_peer` at once.
///
/// This is called when we receive an invalid batch of blocks. It will throw away our outbound
/// peers in order to find new ones that can give us the blocks consolidated by the network,
/// and ice the node that sent the invalid batch.
pub fn drop_all_outbounds_and_ice_sender(&self, sender: Option<SocketAddr>) {
self.drop_all_outbounds();
// Ice the invalid blocks' batch sender
self.ice_peer(sender);
}
}

/// Helper struct used to persist an old copy of the `ChainState` to the storage
Expand Down
2 changes: 2 additions & 0 deletions node/src/actors/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ impl Message for GetSuperBlockVotes {
pub struct AddBlocks {
/// Blocks
pub blocks: Vec<Block>,
/// Sender peer
pub sender: Option<SocketAddr>,
}

impl Message for AddBlocks {
Expand Down
5 changes: 4 additions & 1 deletion node/src/actors/session/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ impl Actor for Session {
// Get ChainManager address
let chain_manager_addr = ChainManager::from_registry();

chain_manager_addr.do_send(AddBlocks { blocks: vec![] });
chain_manager_addr.do_send(AddBlocks {
blocks: vec![],
sender: None,
});
log::warn!("Session disconnected during block exchange");
}

Expand Down
6 changes: 5 additions & 1 deletion node/src/actors/session/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ impl Handler<EpochNotification<EveryEpochPayload>> for Session {
// Get ChainManager address
let chain_manager_addr = ChainManager::from_registry();

chain_manager_addr.do_send(AddBlocks { blocks: vec![] });
chain_manager_addr.do_send(AddBlocks {
blocks: vec![],
sender: None,
});
log::warn!("Timeout for waiting blocks achieved");
ctx.stop();
}
Expand Down Expand Up @@ -667,6 +670,7 @@ fn inventory_process_block(session: &mut Session, _ctx: &mut Context<Session>, b
// Send a message to the ChainManager to try to add a new block
chain_manager_addr.do_send(AddBlocks {
blocks: blocks_vector,
sender: session.public_addr,
});

// Clear requested block structures
Expand Down

0 comments on commit 1790269

Please sign in to comment.