Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove protoc dep, use new MerkleTree methods, fix Agreement. #32

Merged
merged 2 commits into from
May 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ serde = "1.0.55"
serde_derive = { version = "1.0.55", optional = true }

[features]
serialization-protobuf = [ "protobuf", "protoc-rust" ]
serialization-protobuf = [ "protobuf", "protobuf-codegen-pure" ]
serialization-serde = [ "merkle/serialization-serde", "serde_derive" ]

[build-dependencies]
protoc-rust = { version = "1.6.0", optional = true }
protobuf-codegen-pure = { version = "1.6.0", optional = true }

[dev-dependencies]
colored = "1.6"
Expand Down
40 changes: 1 addition & 39 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,8 @@ An example is included to run a simulation of a network:

$ cargo run --example simulation -- --nodes 10 --faulty 1 --txs 1000 --batch 100

# Requirements

`hbbft` has optional protobuf support. To use protobuf, enable the feature
`serialization-protobuf` in your `Cargo.toml`, and make sure you have
Google's Protocol Buffer Compiler, `protoc` binary, located somewhere in
your `$PATH`. You must be using Protocol Buffer Compiler version 3 or greater.
Running any of the following install methods will save a `protoc` binary at
`/usr/local/bin/protoc`.

*Note:* as of writing this, the latest stable release of `protoc` is
v3.5.1. You can find out what is the latest compiler version is
[here](https://github.com/google/protobuf/releases), if you are not
installing `protoc` on Debian 9 or Ubuntu 17, change your cURL URL and zip
file names accordingly.

## Installing `protoc` on Debian 9 (Strech) and Ubuntu 17 (Artful)

$ sudo apt-get update
$ sudo apt-get install -y protobuf-compiler

## Installing `protoc` on other versions of Debian and Ubuntu

$ sudo apt-get update
$ sudo apt-get install -y unzip
$ cd <some temporary working directory>
$ curl -OL https://github.com/google/protobuf/releases/download/v3.5.1/protoc-3.5.1-linux-x86_64.zip
$ sudo unzip protoc-3.5.1-linux-x86_64.zip -d /usr/local bin/protoc
$ sudo chown $(whoami) /usr/local/bin/protoc
$ rm protoc-3.5.1-linux-x86_64.zip

## Installing `protoc` on OSX

$ cd <some temporary working directory>
$ curl -OL https://github.com/google/protobuf/releases/download/v3.5.1/protoc-3.5.1-osx-x86_64.zip
$ sudo unzip protoc-3.5.1-osx-x86_64.zip -d /usr/local bin/protoc
$ rm protoc-3.5.1-osx-x86_64.zip

# Building

Once you have verified that the `protoc` binary is in your `$PATH`, you can
build `hbbft` using cargo:
You can build `hbbft` using cargo:

$ cargo build [--release]
24 changes: 2 additions & 22 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,10 @@
#[cfg(feature = "serialization-protobuf")]
mod feature_protobuf {
extern crate protoc_rust;

use std::env;

fn protoc_exists() -> bool {
let name = "PATH";
match env::var_os(name) {
Some(paths) => {
for path in env::split_paths(&paths) {
if path.join("protoc").exists() {
return true;
}
}
}
None => println!("PATH environment variable is not defined."),
}
false
}
extern crate protobuf_codegen_pure;

pub fn main() {
if !protoc_exists() {
panic!("protoc cannot be found. Install the protobuf compiler in the system path.");
}
println!("cargo:rerun-if-changed=proto/message.proto");
protoc_rust::run(protoc_rust::Args {
protobuf_codegen_pure::run(protobuf_codegen_pure::Args {
out_dir: "src/proto",
input: &["proto/message.proto"],
includes: &["proto"],
Expand Down
99 changes: 59 additions & 40 deletions src/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use itertools::Itertools;
use std::collections::{BTreeSet, HashMap, VecDeque};
use std::fmt::Debug;
use std::hash::Hash;
use std::mem;

use messaging::{DistAlgorithm, Target, TargetedMessage};

Expand All @@ -17,6 +18,15 @@ pub enum AgreementMessage {
Aux(u32, bool),
}

impl AgreementMessage {
fn epoch(&self) -> u32 {
match *self {
AgreementMessage::BVal(epoch, _) => epoch,
AgreementMessage::Aux(epoch, _) => epoch,
}
}
}

/// Binary Agreement instance
pub struct Agreement<NodeUid> {
/// The UID of the corresponding proposer node.
Expand Down Expand Up @@ -44,6 +54,9 @@ pub struct Agreement<NodeUid> {
/// ever there at all. While the output value will still be required in a later epoch to decide
/// the termination state.
decision: Option<bool>,
/// A cache for messages for future epochs that cannot be handled yet.
// TODO: Find a better solution for this; defend against spam.
incoming_queue: Vec<(NodeUid, AgreementMessage)>,
/// Termination flag. The Agreement instance doesn't terminate immediately
/// upon deciding on the agreed value. This is done in order to help other
/// nodes decide despite asynchrony of communication. Once the instance
Expand Down Expand Up @@ -71,18 +84,20 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for Agreement<NodeU
sender_id: &Self::NodeUid,
message: Self::Message,
) -> Result<(), Self::Error> {
if self.terminated {
return Err(Error::Terminated);
}
if message.epoch() < self.epoch {
return Ok(()); // Message is obsolete: We are already in a later epoch.
}
if message.epoch() > self.epoch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be else if.

// Message is for a later epoch. We can't handle that yet.
self.incoming_queue.push((sender_id.clone(), message));
return Ok(());
}
match message {
// The algorithm instance has already terminated.
_ if self.terminated => Err(Error::Terminated),

AgreementMessage::BVal(epoch, b) if epoch == self.epoch => {
self.handle_bval(sender_id, b)
}

AgreementMessage::Aux(epoch, b) if epoch == self.epoch => self.handle_aux(sender_id, b),

// Epoch does not match. Ignore the message.
_ => Ok(()),
AgreementMessage::BVal(_, b) => self.handle_bval(sender_id, b),
AgreementMessage::Aux(_, b) => self.handle_aux(sender_id, b),
}
}

Expand All @@ -108,7 +123,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for Agreement<NodeU
}
}

impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> {
pub fn new(uid: NodeUid, num_nodes: usize) -> Self {
let num_faulty_nodes = (num_nodes - 1) / 3;

Expand All @@ -124,6 +139,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
estimated: None,
output: None,
decision: None,
incoming_queue: Vec::new(),
terminated: false,
messages: VecDeque::new(),
}
Expand All @@ -134,20 +150,16 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
if self.epoch != 0 || self.estimated.is_some() {
return Err(Error::InputNotAccepted);
}
if self.num_nodes == 1 {
self.decision = Some(input);
self.output = Some(input);
self.terminated = true;
}

// Set the initial estimated value to the input value.
self.estimated = Some(input);
// Record the input value as sent.
self.sent_bval.insert(input);
// Receive the BVAL message locally.
self.received_bval
.entry(self.uid.clone())
.or_insert_with(BTreeSet::new)
.insert(input);
// Multicast BVAL
self.messages
.push_back(AgreementMessage::BVal(self.epoch, input));
Ok(())
self.send_bval(input)
}

/// Acceptance check to be performed before setting the input value.
Expand Down Expand Up @@ -178,32 +190,35 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
self.messages
.push_back(AgreementMessage::Aux(self.epoch, b));
// Receive the AUX message locally.
self.received_aux.insert(self.uid.clone(), b);
let our_uid = self.uid.clone();
self.handle_aux(&our_uid, b)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense too.

}

self.try_coin();
self.try_coin()?;
}
// upon receiving BVAL_r(b) messages from f + 1 nodes, if
// BVAL_r(b) has not been sent, multicast BVAL_r(b)
else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) {
// Record the value `b` as sent.
self.sent_bval.insert(b);
// Receive the BVAL message locally.
self.received_bval
.entry(self.uid.clone())
.or_insert_with(BTreeSet::new)
.insert(b);
// Multicast BVAL.
self.messages
.push_back(AgreementMessage::BVal(self.epoch, b));
self.send_bval(b)?;
}
Ok(())
}

fn send_bval(&mut self, b: bool) -> Result<(), Error> {
// Record the value `b` as sent.
self.sent_bval.insert(b);
// Multicast BVAL.
self.messages
.push_back(AgreementMessage::BVal(self.epoch, b));
// Receive the BVAL message locally.
let our_uid = self.uid.clone();
self.handle_bval(&our_uid, b)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well spotted!

}

fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<(), Error> {
self.received_aux.insert(sender_id.clone(), b);
if !self.bin_values.is_empty() {
self.try_coin();
self.try_coin()?;
}
Ok(())
}
Expand Down Expand Up @@ -237,11 +252,11 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
/// to compute the next decision estimate and outputs the optional decision
/// value. The function may start the next epoch. In that case, it also
/// returns a message for broadcast.
fn try_coin(&mut self) {
fn try_coin(&mut self) -> Result<(), Error> {
let (count_aux, vals) = self.count_aux();
if count_aux < self.num_nodes - self.num_faulty_nodes {
// Continue waiting for the (N - f) AUX messages.
return;
return Ok(());
}

debug!("{:?} try_coin in epoch {}", self.uid, self.epoch);
Expand All @@ -255,6 +270,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
self.terminated = self.terminated || self.decision == Some(coin);
if self.terminated {
debug!("Agreement instance {:?} terminated", self.uid);
return Ok(());
}

// Start the next epoch.
Expand Down Expand Up @@ -286,9 +302,12 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> {
};

let b = self.estimated.unwrap();
self.sent_bval.insert(b);
self.messages
.push_back(AgreementMessage::BVal(self.epoch, b));
self.send_bval(b)?;
let queued_msgs = mem::replace(&mut self.incoming_queue, Vec::new());
for (sender_id, msg) in queued_msgs {
self.handle_message(&sender_id, msg)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is one possible place for introducing spam control. I've no idea though how to avoid spam apart from using a set instead of Vec of queued messages and filtering out messages with unknown sender IDs. The latter is already done at the top level.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true. Basically we just need to store per epoch: who sent us which BVals and which Aux.

We still might want to limit the number of future epochs somehow. On the other hand, that wouldn't be strictly correct…

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created a new issue for this: #43

}
Ok(())
}
}

Expand Down
58 changes: 4 additions & 54 deletions src/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use fmt::{HexBytes, HexList, HexProof};
use merkle::proof::{Lemma, Positioned, Proof};
use merkle::MerkleTree;
use merkle::{MerkleTree, Proof};
use reed_solomon_erasure as rse;
use reed_solomon_erasure::ReedSolomon;
use ring::digest;
#[cfg(feature = "serialization-serde")]
use serde::{Deserialize, Deserializer};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::fmt::{self, Debug};
use std::iter;
Expand All @@ -14,27 +11,15 @@ use messaging::{DistAlgorithm, Target, TargetedMessage};

/// The three kinds of message sent during the reliable broadcast stage of the
/// consensus algorithm.
#[cfg_attr(feature = "serialization-serde", derive(Serialize))]
#[cfg_attr(feature = "serialization-serde", derive(Serialize, Deserialize))]
#[derive(Clone, PartialEq)]
pub enum BroadcastMessage {
#[cfg_attr(feature = "serialization-serde", serde(deserialize_with = "deserialize_proof"))]
Value(Proof<Vec<u8>>),
#[cfg_attr(feature = "serialization-serde", serde(deserialize_with = "deserialize_proof"))]
Echo(Proof<Vec<u8>>),
Ready(Vec<u8>),
}

#[cfg(feature = "serialization-serde")]
#[allow(unused)]
fn deserialize_proof<'de, D>(d: D) -> Result<Proof<Vec<u8>>, D::Error>
where
D: Deserializer<'de>,
{
let data: ::merkle::proof::ProofData<Vec<u8>> = Deserialize::deserialize(d)?;
Ok(data.into_proof(&digest::SHA256))
}

impl fmt::Debug for BroadcastMessage {
impl Debug for BroadcastMessage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
BroadcastMessage::Value(ref v) => write!(f, "Value({:?})", HexProof(&v)),
Expand Down Expand Up @@ -381,11 +366,6 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
self.all_uids.iter().position(|id| id == node_id)
}

/// Returns the index of this proof's leave in the Merkle tree.
fn index_of_proof(&self, proof: &Proof<Vec<u8>>) -> usize {
index_of_lemma(&proof.lemma, self.num_nodes)
}

/// Returns `true` if the proof is valid and has the same index as the node ID. Otherwise
/// logs an info message.
fn validate_proof(&self, p: &Proof<Vec<u8>>, id: &N) -> bool {
Expand All @@ -397,7 +377,7 @@ impl<N: Eq + Debug + Clone + Ord> Broadcast<N> {
);
false
} else if self.index_of_node(id) != Some(p.value[0] as usize)
|| self.index_of_proof(&p) != p.value[0] as usize
|| p.index(self.num_nodes) != p.value[0] as usize
{
info!(
"Node {:?} received proof for wrong position: {:?}.",
Expand Down Expand Up @@ -556,33 +536,3 @@ where

t[1..(payload_len + 1)].to_vec().into()
}

/// Computes the Merkle tree leaf index of a value in a given lemma.
pub fn index_of_lemma(lemma: &Lemma, n: usize) -> usize {
let m = n.next_power_of_two();
match (lemma.sub_lemma.as_ref(), lemma.sibling_hash.as_ref()) {
(None, Some(&Positioned::Right(_))) | (None, None) => 0,
(None, Some(&Positioned::Left(_))) => 1,
(Some(l), None) => index_of_lemma(l, n),
(Some(l), Some(&Positioned::Left(_))) => (m >> 1) + index_of_lemma(l, n - (m >> 1)),
(Some(l), Some(&Positioned::Right(_))) => index_of_lemma(l, m >> 1),
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_index_of_lemma() {
for &n in &[3, 4, 13, 16, 127, 128, 129, 255] {
let shards: Vec<[u8; 1]> = (0..n).map(|i| [i as u8]).collect();
let mtree = MerkleTree::from_vec(&digest::SHA256, shards);
for (i, val) in mtree.iter().enumerate() {
let p = mtree.gen_proof(val.clone()).expect("generate proof");
let idx = index_of_lemma(&p.lemma, n);
assert_eq!(i, idx, "Wrong index {} for leaf {}/{}.", idx, i, n);
}
}
}
}
Loading