Skip to content

Commit

Permalink
work on review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vkomenda committed Nov 1, 2018
1 parent 8247d33 commit b1346be
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 94 deletions.
16 changes: 10 additions & 6 deletions examples/simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ impl EpochInfo {
let txs = batch.iter().unique().count();
println!(
"{:>5} {:6} {:6} {:5} {:9} {:>9}B",
batch.seqnum().to_string().cyan(),
batch.epoch().to_string().cyan(),
min_t.as_secs() * 1000 + u64::from(max_t.subsec_nanos()) / 1_000_000,
max_t.as_secs() * 1000 + u64::from(max_t.subsec_nanos()) / 1_000_000,
txs,
Expand All @@ -397,7 +397,7 @@ fn simulate_honey_badger(mut network: TestNetwork<QHB>) {
let mut epochs = Vec::new();
while let Some(id) = network.step() {
for &(time, ref batch) in &network.nodes[&id].outputs {
let epoch = batch.seqnum() as usize;
let epoch = batch.epoch() as usize;
if epochs.len() <= epoch {
epochs.resize(epoch + 1, EpochInfo::default());
}
Expand Down Expand Up @@ -437,14 +437,18 @@ fn main() {
.map(|_| Transaction::new(args.flag_tx_size))
.collect();
let new_honey_badger = |netinfo: NetworkInfo<NodeId>| {
let dhb = DynamicHoneyBadger::builder().build(netinfo.clone());
let our_id = *netinfo.our_id();
let peer_ids: Vec<_> = netinfo
.all_ids()
.filter(|&&them| them != our_id)
.cloned()
.collect();
let dhb = DynamicHoneyBadger::builder().build(netinfo);
let (qhb, qhb_step) = QueueingHoneyBadger::builder(dhb)
.batch_size(args.flag_b)
.build_with_transactions(txs.clone(), rand::thread_rng().gen::<Isaac64Rng>())
.expect("instantiate QueueingHoneyBadger");
let our_id = *netinfo.our_id();
let peer_ids = netinfo.all_ids().filter(|&&them| them != our_id).cloned();
let (sq, mut step) = SenderQueue::builder(qhb, peer_ids).build(our_id);
let (sq, mut step) = SenderQueue::builder(qhb, peer_ids.into_iter()).build(our_id);
step.extend_with(qhb_step, Message::from);
(sq, step)
};
Expand Down
29 changes: 7 additions & 22 deletions src/dynamic_honey_badger/batch.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use super::{ChangeState, Epoch, JoinPlan};
use {Epoched, NetworkInfo, NodeIdT};
use super::{ChangeState, JoinPlan};
use {NetworkInfo, NodeIdT};

/// A batch of transactions the algorithm has output.
#[derive(Clone, Debug)]
pub struct Batch<C, N> {
/// The sequence number: there is exactly one batch in each epoch.
pub(super) seqnum: u64,
pub(super) epoch: u64,
/// The current `DynamicHoneyBadger` era.
pub(super) era: u64,
/// The user contributions committed in this epoch.
Expand All @@ -20,25 +20,10 @@ pub struct Batch<C, N> {
pub(super) netinfo: Arc<NetworkInfo<N>>,
}

impl<C, N: NodeIdT> Epoched for Batch<C, N> {
type Epoch = Epoch;

/// Returns the **next** `DynamicHoneyBadger` epoch after the sequential epoch of the batch.
fn epoch(&self) -> Epoch {
let seqnum = self.seqnum;
let era = self.era;
if self.change == ChangeState::None {
Epoch(era, Some(seqnum - era + 1))
} else {
Epoch(seqnum + 1, Some(0))
}
}
}

impl<C, N: NodeIdT> Batch<C, N> {
/// Returns the linear epoch of this `DynamicHoneyBadger` batch.
pub fn seqnum(&self) -> u64 {
self.seqnum
pub fn epoch(&self) -> u64 {
self.epoch
}

/// Returns the `DynamicHoneyBadger` era of the batch.
Expand Down Expand Up @@ -109,7 +94,7 @@ impl<C, N: NodeIdT> Batch<C, N> {
return None;
}
Some(JoinPlan {
era: self.seqnum + 1,
era: self.epoch + 1,
change: self.change.clone(),
pub_key_set: self.netinfo.public_key_set().clone(),
pub_keys: self.netinfo.public_key_map().clone(),
Expand All @@ -122,7 +107,7 @@ impl<C, N: NodeIdT> Batch<C, N> {
where
C: PartialEq,
{
self.seqnum == other.seqnum
self.epoch == other.epoch
&& self.era == other.era
&& self.contributions == other.contributions
&& self.change == other.change
Expand Down
8 changes: 4 additions & 4 deletions src/dynamic_honey_badger/dynamic_honey_badger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ where
let output = step.extend_with(hb_step, |hb_msg| Message::HoneyBadger(self.era, hb_msg));
for hb_batch in output {
let batch_era = self.era;
let batch_seqnum = hb_batch.epoch + batch_era;
let batch_epoch = hb_batch.epoch + batch_era;
let mut batch_contributions = BTreeMap::new();

// Add the user transactions to `batch` and handle votes and DKG messages.
Expand Down Expand Up @@ -282,17 +282,17 @@ where
// If DKG completed, apply the change, restart Honey Badger, and inform the user.
debug!("{:?} DKG for {:?} complete!", self.our_id(), kgs.change);
self.netinfo = kgs.key_gen.into_network_info()?;
self.restart_honey_badger(batch_seqnum + 1);
self.restart_honey_badger(batch_epoch + 1);
ChangeState::Complete(kgs.change)
} else if let Some(change) = self.vote_counter.compute_winner().cloned() {
// If there is a new change, restart DKG. Inform the user about the current change.
step.extend(self.update_key_gen(batch_seqnum + 1, &change)?);
step.extend(self.update_key_gen(batch_epoch + 1, &change)?);
ChangeState::InProgress(change)
} else {
ChangeState::None
};
step.output.push(Batch {
seqnum: batch_seqnum,
epoch: batch_epoch,
era: batch_era,
change,
netinfo: Arc::new(self.netinfo.clone()),
Expand Down
11 changes: 9 additions & 2 deletions src/dynamic_honey_badger/sender_queueable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@ where
}
}

fn convert_epoch(&self) -> Epoch {
self.epoch()
fn next_epoch(&self) -> Epoch {
let epoch = self.epoch;
let era = self.era;
if self.change == ChangeState::None {
Epoch(era, Some(epoch - era + 1))
} else {
Epoch(epoch + 1, Some(0))
}
}
}

Expand All @@ -41,6 +47,7 @@ where
(Some(us), Some(them)) => them <= us && us <= them + max_future_epochs,
(None, Some(_)) => true,
(_, None) => {
// TODO: return a Fault.
error!("Peer's Honey Badger epoch undefined");
false
}
Expand Down
11 changes: 1 addition & 10 deletions src/honey_badger/batch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::BTreeMap;

use {Epoched, NodeIdT};
use NodeIdT;

/// A batch of contributions the algorithm has output.
#[derive(Clone, Debug)]
Expand All @@ -9,15 +9,6 @@ pub struct Batch<C, N> {
pub contributions: BTreeMap<N, C>,
}

impl<C, N: NodeIdT> Epoched for Batch<C, N> {
type Epoch = u64;

/// Returns the **next** `HoneyBadger` epoch after the sequential epoch of the batch.
fn epoch(&self) -> u64 {
self.epoch + 1
}
}

impl<C, N: NodeIdT> Batch<C, N> {
/// Returns an iterator over references to all transactions included in the batch.
pub fn iter<'a>(&'a self) -> impl Iterator<Item = <&'a C as IntoIterator>::Item>
Expand Down
4 changes: 2 additions & 2 deletions src/honey_badger/sender_queueable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ where
None
}

fn convert_epoch(&self) -> u64 {
self.epoch()
fn next_epoch(&self) -> u64 {
self.epoch + 1
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/sender_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub trait SenderQueueableMessage: Epoched {
fn is_obsolete(&self, them: <Self as Epoched>::Epoch) -> bool;
}

pub trait SenderQueueableOutput<N, M>: Epoched
pub trait SenderQueueableOutput<N, M>
where
N: NodeIdT,
M: Epoched,
Expand All @@ -34,8 +34,8 @@ where
/// all nodes.
fn added_node(&self) -> Option<N>;

/// Performs type conversion of the batch epoch info a fixed epoch type.
fn convert_epoch(&self) -> <M as Epoched>::Epoch;
/// Computes the next epoch after the `DynamicHoneyBadger` epoch of the batch.
fn next_epoch(&self) -> <M as Epoched>::Epoch;
}

pub trait SenderQueueableEpoch
Expand Down Expand Up @@ -250,7 +250,7 @@ where
fn update_epoch(&mut self, step: &::Step<D>) -> Step<D> {
// Look up `DynamicHoneyBadger` epoch updates and collect any added peers.
let new_epoch = step.output.iter().fold(self.epoch, |epoch, batch| {
let max_epoch = epoch.max(batch.convert_epoch());
let max_epoch = epoch.max(batch.next_epoch());
if let Some(node) = batch.added_node() {
if &node != self.our_id() {
self.peer_epochs
Expand Down
72 changes: 29 additions & 43 deletions src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Common supertraits for distributed algorithms.
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeMap;
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::iter::once;
Expand Down Expand Up @@ -223,7 +223,6 @@ where
<D as DistAlgorithm>::NodeId: NodeIdT + Rand,
<D as DistAlgorithm>::Message:
'i + Clone + SenderQueueableMessage + Serialize + DeserializeOwned,
<D as DistAlgorithm>::Output: Epoched,
{
/// Removes and returns any messages that are not yet accepted by remote nodes according to the
/// mapping `remote_epochs`. This way the returned messages are postponed until later, and the
Expand All @@ -237,59 +236,46 @@ where
<D as DistAlgorithm>::NodeId: 'i,
{
let messages = &mut self.messages;
let (mut passed_msgs, failed_msgs): (Vec<_>, Vec<_>) =
messages
.drain(..)
.partition(|TargetedMessage { target, message }| match target {
let pass =
|TargetedMessage { target, message }: &TargetedMessage<D::Message, D::NodeId>| {
match target {
Target::All => peer_epochs
.values()
.all(|&them| message.is_accepted(them, max_future_epochs)),
Target::Node(id) => peer_epochs
.get(&id)
.map_or(false, |&them| message.is_accepted(them, max_future_epochs)),
});
}
};
// `Target::All` messages contained in the result of the partitioning are analyzed further
// and each split into two sets of point messages: those which can be sent without delay and
// those which should be postponed.
let remote_nodes: BTreeSet<&D::NodeId> = peer_epochs.keys().collect();
let mut deferred_msgs: Vec<(D::NodeId, D::Message)> = Vec::new();
for msg in failed_msgs {
let m = msg.message;
match msg.target {
Target::Node(id) => {
let defer = {
let lagging = |&them| {
!(m.is_accepted(them, max_future_epochs) || m.is_obsolete(them))
let mut passed_msgs: Vec<_> = Vec::new();
for msg in messages.drain(..) {
if pass(&msg) {
passed_msgs.push(msg);
} else {
let m = msg.message;
match msg.target {
Target::Node(ref id) => {
let defer = {
let lagging = |&them| {
!(m.is_accepted(them, max_future_epochs) || m.is_obsolete(them))
};
peer_epochs.get(&id).map_or(true, lagging)
};
peer_epochs.get(&id).map_or(true, lagging)
};
if defer {
deferred_msgs.push((id, m));
}
}
Target::All => {
let isnt_earlier_epoch =
|&them| m.is_accepted(them, max_future_epochs) || m.is_obsolete(them);
let lagging = |them| !isnt_earlier_epoch(them);
let accepts = |&them| m.is_accepted(them, max_future_epochs);
let accepting_nodes: BTreeSet<&D::NodeId> = peer_epochs
.iter()
.filter(|(_, them)| accepts(them))
.map(|(id, _)| id)
.collect();
let non_lagging_nodes: BTreeSet<&D::NodeId> = peer_epochs
.iter()
.filter(|(_, them)| isnt_earlier_epoch(them))
.map(|(id, _)| id)
.collect();
for &id in &accepting_nodes {
passed_msgs.push(Target::Node(id.clone()).message(m.clone()));
if defer {
deferred_msgs.push((id.clone(), m));
}
}
let lagging_nodes: BTreeSet<_> =
remote_nodes.difference(&non_lagging_nodes).collect();
for &id in lagging_nodes {
if peer_epochs.get(id).map_or(true, lagging) {
deferred_msgs.push((id.clone(), m.clone()));
Target::All => {
for (id, &them) in peer_epochs {
if m.is_accepted(them, max_future_epochs) {
passed_msgs.push(Target::Node(id.clone()).message(m.clone()));
} else if !m.is_obsolete(them) {
deferred_msgs.push((id.clone(), m.clone()));
}
}
}
}
Expand Down
1 change: 0 additions & 1 deletion tests/net_dynamic_hb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::{collections, time};

use hbbft::dynamic_honey_badger::{Change, ChangeState, DynamicHoneyBadger, Input};
use hbbft::sender_queue::SenderQueue;
use hbbft::Epoched;
use net::adversary::ReorderingAdversary;
use net::proptest::{gen_seed, NetworkDimension, TestRng, TestRngSeed};
use net::{NetBuilder, NewNodeInfo};
Expand Down

0 comments on commit b1346be

Please sign in to comment.