diff --git a/Cargo.toml b/Cargo.toml index f0aad989..98f477ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,13 +9,14 @@ log = "0.4.1" reed-solomon-erasure = "3.0" merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" } ring = "^0.12" -rand = "*" protobuf = "1.4.4" crossbeam = "0.3.2" crossbeam-channel = "0.1" +itertools = "0.7" [build-dependencies] protoc-rust = "1.4.4" [dev-dependencies] docopt = "0.8" +rand = "0.3" diff --git a/TODO b/TODO new file mode 100644 index 00000000..e79c0e14 --- /dev/null +++ b/TODO @@ -0,0 +1,12 @@ +TODO +==== + +* Fix the inappropriate use of Common Coin in the Byzantine Agreement protocol + + This bug is explained in https://github.com/amiller/HoneyBadgerBFT/issues/59 + where a solution is suggested introducing an additional type of message, + CONF. + + There may be alternative solutions, such as using a different Byzantine + Agreement protocol altogether, for example, + https://people.csail.mit.edu/silvio/Selected%20Scientific%20Papers/Distributed%20Computation/BYZANTYNE%20AGREEMENT%20MADE%20TRIVIAL.pdf \ No newline at end of file diff --git a/proto/message.proto b/proto/message.proto index ff5bfd7b..dbfb0fb3 100644 --- a/proto/message.proto +++ b/proto/message.proto @@ -41,12 +41,12 @@ message LemmaProto { bytes left_sibling_hash = 3; bytes right_sibling_hash = 4; } - } message AgreementProto { + uint32 epoch = 1; oneof payload { - bool bval = 1; - bool aux = 2; + bool bval = 2; + bool aux = 3; } } \ No newline at end of file diff --git a/src/agreement.rs b/src/agreement.rs index 0c733bed..195bd36a 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -1,42 +1,295 @@ //! Binary Byzantine agreement protocol from a common coin protocol. 
-use proto::AgreementMessage; -use std::collections::{BTreeSet, VecDeque}; +use itertools::Itertools; +use std::collections::{BTreeSet, HashMap, VecDeque}; +use std::hash::Hash; -#[derive(Default)] -pub struct Agreement { - input: Option, - _bin_values: BTreeSet, +use proto::message; + +/// Type of output from the Agreement message handler. The first component is +/// the value on which the Agreement has decided, also called "output" in the +/// HoneyBadgerBFT paper. The second component is a queue of messages to be sent +/// to remote nodes as a result of handling the incoming message. +type AgreementOutput = (Option, VecDeque); + +/// Messages sent during the binary Byzantine agreement stage. +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum AgreementMessage { + /// BVAL message with an epoch. + BVal((u32, bool)), + /// AUX message with an epoch. + Aux((u32, bool)), +} + +impl AgreementMessage { + pub fn into_proto(self) -> message::AgreementProto { + let mut p = message::AgreementProto::new(); + match self { + AgreementMessage::BVal((e, b)) => { + p.set_epoch(e); + p.set_bval(b); + } + AgreementMessage::Aux((e, b)) => { + p.set_epoch(e); + p.set_aux(b); + } + } + p + } + + // TODO: Re-enable lint once implemented. + #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] + pub fn from_proto(mp: message::AgreementProto) -> Option { + let epoch = mp.get_epoch(); + if mp.has_bval() { + Some(AgreementMessage::BVal((epoch, mp.get_bval()))) + } else if mp.has_aux() { + Some(AgreementMessage::Aux((epoch, mp.get_aux()))) + } else { + None + } + } +} + +/// Binary Agreement instance. +pub struct Agreement { + /// The UID of the corresponding proposer node. + uid: NodeUid, + num_nodes: usize, + num_faulty_nodes: usize, + epoch: u32, + /// Bin values. Reset on every epoch update. + bin_values: BTreeSet, + /// Values received in BVAL messages. Reset on every epoch update. + received_bval: HashMap>, + /// Sent BVAL values. 
Reset on every epoch update. + sent_bval: BTreeSet, + /// Values received in AUX messages. Reset on every epoch update. + received_aux: HashMap, + /// The estimate of the decision value in the current epoch. + estimated: Option, + /// The value output by the agreement instance. It is set once to `Some(b)` + /// and then never changed. That is, no instance of Binary Agreement can + /// decide on two different values of output. + output: Option, + /// Termination flag. The Agreement instance doesn't terminate immediately + /// upon deciding on the agreed value. This is done in order to help other + /// nodes decide despite asynchrony of communication. Once the instance + /// determines that all the remote nodes have reached agreement, it sets the + /// `terminated` flag and accepts no more incoming messages. + terminated: bool, } -impl Agreement { - pub fn new() -> Self { +impl Agreement { + pub fn new(uid: NodeUid, num_nodes: usize) -> Self { + let num_faulty_nodes = (num_nodes - 1) / 3; + Agreement { - input: None, - _bin_values: BTreeSet::new(), + uid, + num_nodes, + num_faulty_nodes, + epoch: 0, + bin_values: BTreeSet::new(), + received_bval: HashMap::new(), + sent_bval: BTreeSet::new(), + received_aux: HashMap::new(), + estimated: None, + output: None, + terminated: false, } } - pub fn set_input(&mut self, input: bool) -> AgreementMessage { - self.input = Some(input); + /// Algorithm has terminated. + pub fn terminated(&self) -> bool { + self.terminated + } + + /// Sets the input value for agreement. + pub fn set_input(&mut self, input: bool) -> Result { + if self.epoch != 0 { + return Err(Error::InputNotAccepted); + } + + // Set the initial estimated value to the input value. + self.estimated = Some(input); + // Receive the BVAL message locally. 
+ self.received_bval + .entry(self.uid.clone()) + .or_insert_with(BTreeSet::new) + .insert(input); // Multicast BVAL - AgreementMessage::BVal(input) + Ok(AgreementMessage::BVal((self.epoch, input))) } - pub fn has_input(&self) -> bool { - self.input.is_some() + /// Acceptance check to be performed before setting the input value. + pub fn accepts_input(&self) -> bool { + self.epoch == 0 && self.estimated.is_none() } /// Receive input from a remote node. - pub fn on_input( - &self, - _message: &AgreementMessage, - ) -> Result, Error> { - Err(Error::NotImplemented) + /// + /// Outputs an optional agreement result and a queue of agreement messages + /// to remote nodes. There can be up to 2 messages. + pub fn handle_agreement_message( + &mut self, + sender_id: &NodeUid, + message: &AgreementMessage, + ) -> Result { + match *message { + // The algorithm instance has already terminated. + _ if self.terminated => Err(Error::Terminated), + + AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => { + self.handle_bval(sender_id, b) + } + + AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => { + self.handle_aux(sender_id, b) + } + + // Epoch does not match. Ignore the message. + _ => Ok((None, VecDeque::new())), + } + } + + fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result { + let mut outgoing = VecDeque::new(); + + self.received_bval + .entry(sender_id.clone()) + .or_insert_with(BTreeSet::new) + .insert(b); + let count_bval = self.received_bval + .values() + .filter(|values| values.contains(&b)) + .count(); + + // upon receiving BVAL_r(b) messages from 2f + 1 nodes, + // bin_values_r := bin_values_r ∪ {b} + if count_bval == 2 * self.num_faulty_nodes + 1 { + self.bin_values.insert(b); + + // wait until bin_values_r != 0, then multicast AUX_r(w) + // where w ∈ bin_values_r + if self.bin_values.len() == 1 { + // Send an AUX message at most once per epoch. 
+ outgoing.push_back(AgreementMessage::Aux((self.epoch, b))); + // Receive the AUX message locally. + self.received_aux.insert(self.uid.clone(), b); + } + + let (decision, maybe_message) = self.try_coin(); + outgoing.extend(maybe_message); + Ok((decision, outgoing)) + } + // upon receiving BVAL_r(b) messages from f + 1 nodes, if + // BVAL_r(b) has not been sent, multicast BVAL_r(b) + else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) { + outgoing.push_back(AgreementMessage::BVal((self.epoch, b))); + // Receive the BVAL message locally. + self.received_bval + .entry(self.uid.clone()) + .or_insert_with(BTreeSet::new) + .insert(b); + Ok((None, outgoing)) + } else { + Ok((None, outgoing)) + } + } + + fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result { + self.received_aux.insert(sender_id.clone(), b); + let mut outgoing = VecDeque::new(); + if !self.bin_values.is_empty() { + let (decision, maybe_message) = self.try_coin(); + outgoing.extend(maybe_message); + Ok((decision, outgoing)) + } else { + Ok((None, outgoing)) + } + } + + /// AUX_r messages such that the set of values carried by those messages is + /// a subset of bin_values_r. Outputs this subset. + /// + /// FIXME: Clarify whether the values of AUX messages should be the same or + /// not. It is assumed in `count_aux` that they can differ. + /// + /// In general, we can't expect every good node to send the same AUX value, + /// so waiting for N - f agreeing messages would not always terminate. We + /// can, however, expect every good node to send an AUX value that will + /// eventually end up in our bin_values. 
+ fn count_aux(&self) -> (usize, BTreeSet) { + let (vals_cnt, vals) = self.received_aux + .values() + .filter(|b| self.bin_values.contains(b)) + .tee(); + + (vals_cnt.count(), vals.cloned().collect()) + } + + /// Waits until at least (N − f) AUX_r messages have been received, such that + /// the set of values carried by these messages, vals, are a subset of + /// bin_values_r (note that bin_values_r may continue to change as BVAL_r + /// messages are received, thus this condition may be triggered upon arrival + /// of either an AUX_r or a BVAL_r message). + /// + /// Once the (N - f) messages are received, gets a common coin and uses it + /// to compute the next decision estimate and outputs the optional decision + /// value. The function may start the next epoch. In that case, it also + /// returns a message for broadcast. + fn try_coin(&mut self) -> (Option, VecDeque) { + let (count_aux, vals) = self.count_aux(); + if count_aux < self.num_nodes - self.num_faulty_nodes { + // Continue waiting for the (N - f) AUX messages. + return (None, VecDeque::new()); + } + + // FIXME: Implement the Common Coin algorithm. At the moment the + // coin value is common across different nodes but not random. + let coin = (self.epoch % 2) == 0; + + // Check the termination condition: "continue looping until both a + // value b is output in some round r, and the value Coin_r' = b for + // some round r' > r." + self.terminated = self.terminated || self.output == Some(coin); + + // Start the next epoch. + self.bin_values.clear(); + self.received_aux.clear(); + self.epoch += 1; + + let decision = if vals.len() != 1 { + self.estimated = Some(coin); + None + } else { + // NOTE: `vals` has exactly one element due to `vals.len() == 1` + let v: Vec = vals.into_iter().collect(); + let b = v[0]; + self.estimated = Some(b); + // Outputting a value is allowed only once. + if self.output.is_none() && b == coin { + // Output the agreement value. 
+ self.output = Some(b); + self.output + } else { + None + } + }; + + ( + decision, + vec![AgreementMessage::BVal(( + self.epoch, + self.estimated.unwrap(), + ))].into_iter() + .collect(), + ) } } #[derive(Clone, Debug)] pub enum Error { - NotImplemented, + Terminated, + InputNotAccepted, } diff --git a/src/common_subset.rs b/src/common_subset.rs index d92c5d49..cd6fc94f 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -8,28 +8,17 @@ use std::fmt::{Debug, Display}; use std::hash::Hash; use agreement; -use agreement::Agreement; +use agreement::{Agreement, AgreementMessage}; use broadcast; use broadcast::{Broadcast, BroadcastMessage, TargetedBroadcastMessage}; -use proto::AgreementMessage; - // TODO: Make this a generic argument of `Broadcast`. type ProposedValue = Vec; - -/// Input from a remote node to Common Subset. -pub enum Input { - /// Message from a remote node `uid` to the broadcast instance `uid`. - Broadcast(NodeUid, BroadcastMessage), - /// Message from a remote node `uid` to the agreement instance `uid`. - Agreement(NodeUid, AgreementMessage), -} +// Type of output from the Common Subset message handler. +type CommonSubsetOutput = (Option>, VecDeque>); /// Output from Common Subset to remote nodes. -/// -/// FIXME: We can do an interface that doesn't need this type and instead works -/// directly with the `TargetBroadcastMessage` and `AgreementMessage`. pub enum Output { /// A broadcast message to be sent to the destination set in the /// `TargetedBroadcastMessage`. 
@@ -43,15 +32,15 @@ pub struct CommonSubset { uid: NodeUid, num_nodes: usize, num_faulty_nodes: usize, - agreement_true_outputs: HashSet, broadcast_instances: HashMap>, - agreement_instances: HashMap, + agreement_instances: HashMap>, broadcast_results: HashMap, agreement_results: HashMap, } impl CommonSubset { - pub fn new(uid: NodeUid, all_uids: &HashSet, num_nodes: usize) -> Result { + pub fn new(uid: NodeUid, all_uids: &HashSet) -> Result { + let num_nodes = all_uids.len(); let num_faulty_nodes = (num_nodes - 1) / 3; // Create all broadcast instances. @@ -68,18 +57,17 @@ impl CommonSubset { } // Create all agreement instances. - let mut agreement_instances: HashMap = HashMap::new(); + let mut agreement_instances: HashMap> = HashMap::new(); for uid0 in all_uids { - agreement_instances.insert(uid0.clone(), Agreement::new()); + agreement_instances.insert(uid0.clone(), Agreement::new(uid0.clone(), num_nodes)); } Ok(CommonSubset { uid, num_nodes, num_faulty_nodes, - agreement_true_outputs: HashSet::new(), broadcast_instances, - agreement_instances: HashMap::new(), + agreement_instances, broadcast_results: HashMap::new(), agreement_results: HashMap::new(), }) @@ -105,13 +93,10 @@ impl CommonSubset { /// Upon delivery of v_j from RBC_j, if input has not yet been provided to /// BA_j, then provide input 1 to BA_j. See Figure 11. - pub fn on_broadcast_result( - &mut self, - uid: &NodeUid, - ) -> Result, Error> { - if let Some(agreement_instance) = self.agreement_instances.get_mut(uid) { - if !agreement_instance.has_input() { - Ok(Some(agreement_instance.set_input(true))) + fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result, Error> { + if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) { + if agreement_instance.accepts_input() { + Ok(Some(agreement_instance.set_input(true)?)) } else { Ok(None) } @@ -120,84 +105,132 @@ impl CommonSubset { } } - /// Receive input from a remote node. 
- pub fn on_input( + /// Receives a broadcast message from a remote node `sender_id` concerning a + /// value proposed by the node `proposer_id`. The output contains an + /// optional result of the Common Subset algorithm - a set of proposed + /// values - and a queue of messages to be sent to remote nodes, or an + /// error. + pub fn handle_broadcast( &mut self, - message: Input, - ) -> Result>, Error> { - match message { - Input::Broadcast(uid, bmessage) => { - let mut instance_result = None; - let input_result = { - if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) { - broadcast_instance - .handle_broadcast_message(&uid, bmessage) - .map(|(value, queue)| { - instance_result = value; - queue.into_iter().map(Output::Broadcast).collect() - }) - .map_err(Error::from) - } else { - Err(Error::NoSuchBroadcastInstance) - } - }; - if instance_result.is_some() { - self.on_broadcast_result(&uid)?; - } - input_result + sender_id: &NodeUid, + proposer_id: &NodeUid, + bmessage: BroadcastMessage, + ) -> Result, Error> { + let mut instance_result = None; + let input_result: Result>, Error> = { + if let Some(broadcast_instance) = self.broadcast_instances.get(proposer_id) { + broadcast_instance + .handle_broadcast_message(sender_id, bmessage) + .map(|(opt_value, queue)| { + instance_result = opt_value; + queue.into_iter().map(Output::Broadcast).collect() + }) + .map_err(Error::from) + } else { + Err(Error::NoSuchBroadcastInstance) } - Input::Agreement(_uid, _message) => { - // FIXME: send the message to the Agreement instance and - // conditionally call `on_agreement_output` + }; + let mut opt_message: Option = None; + if let Some(value) = instance_result { + self.broadcast_results.insert(proposer_id.clone(), value); + opt_message = self.on_broadcast_result(proposer_id)?; + } + input_result.map(|mut queue| { + if let Some(agreement_message) = opt_message { + // Append the message to agreement nodes to the common output queue. 
+ queue.push_back(Output::Agreement(agreement_message)) + } + (None, queue) + }) + } + + /// Receives an agreement message from a remote node `sender_id` concerning + /// a value proposed by the node `proposer_id`. The output contains an + /// optional result of the Common Subset algorithm - a set of proposed + /// values - and a queue of messages to be sent to remote nodes, or an + /// error. + pub fn handle_agreement( + &mut self, + sender_id: &NodeUid, + proposer_id: &NodeUid, + amessage: &AgreementMessage, + ) -> Result, Error> { + // The result defaults to error. + let mut result = Err(Error::NoSuchAgreementInstance); + + // Send the message to the local instance of Agreement + if let Some(agreement_instance) = self.agreement_instances.get_mut(proposer_id) { + // Optional output of agreement and outgoing agreement + // messages to remote nodes. + result = if agreement_instance.terminated() { + // This instance has terminated and does not accept input. + Ok((None, VecDeque::new())) + } else { + // Send the message to the agreement instance. + agreement_instance + .handle_agreement_message(sender_id, &amessage) + .map_err(Error::from) + } + } - Err(Error::NotImplemented) + if let Ok((output, mut outgoing)) = result { + // Process Agreement outputs. + if let Some(b) = output { + outgoing.append(&mut self.on_agreement_result(proposer_id, b)?); } + + // Check whether Agreement has completed. + Ok(( + self.try_agreement_completion(), + outgoing.into_iter().map(Output::Agreement).collect(), + )) + } else { + // error + result + .map(|(_, messages)| (None, messages.into_iter().map(Output::Agreement).collect())) } } /// Callback to be invoked on receipt of a returned value of the Agreement /// instance `uid`. - /// - /// FIXME: It is likely that only one `AgreementMessage` is required because - /// Figure 11 does not count the number of messages but the number of nodes - /// that sent messages. 
- fn on_agreement_result(&mut self, uid: NodeUid, result: bool) -> VecDeque { + fn on_agreement_result( + &mut self, + element_proposer_id: &NodeUid, + result: bool, + ) -> Result, Error> { let mut outgoing = VecDeque::new(); // Upon delivery of value 1 from at least N − f instances of BA, provide // input 0 to each instance of BA that has not yet been provided input. if result { - self.agreement_true_outputs.insert(uid); + self.agreement_results + .insert(element_proposer_id.clone(), result); + // The number of instances of BA that output 1. + let results1 = self.agreement_results.values().filter(|v| **v).count(); - if self.agreement_true_outputs.len() >= self.num_nodes - self.num_faulty_nodes { - let instances = &mut self.agreement_instances; - for (_uid0, instance) in instances.iter_mut() { - if !instance.has_input() { - outgoing.push_back(instance.set_input(false)); + if results1 >= self.num_nodes - self.num_faulty_nodes { + for instance in self.agreement_instances.values_mut() { + if instance.accepts_input() { + outgoing.push_back(instance.set_input(false)?); } } } } - outgoing + Ok(outgoing) } - pub fn on_agreement_completion(&self) -> Option> { + fn try_agreement_completion(&self) -> Option> { // Once all instances of BA have completed, let C ⊂ [1..N] be // the indexes of each BA that delivered 1. Wait for the output // v_j for each RBC_j such that j∈C. Finally output ∪ j∈C v_j. - let instance_uids: HashSet = self.agreement_instances - .iter() - .map(|(k, _)| k.clone()) - .collect(); - let completed_uids: HashSet = self.agreement_results - .iter() - .map(|(k, _)| k.clone()) - .collect(); - if instance_uids == completed_uids { - // All instances of Agreement that delivered `true`. - let delivered_1: HashSet = self.agreement_results + if self.agreement_instances + .values() + .all(|instance| instance.terminated()) + { + // All instances of Agreement that delivered `true` (or "1" in the paper). 
+ let delivered_1: HashSet<&NodeUid> = self.agreement_results .iter() .filter(|(_, v)| **v) - .map(|(k, _)| k.clone()) + .map(|(k, _)| k) .collect(); // Results of Broadcast instances in `delivered_1` let broadcast_results: HashSet = self.broadcast_results @@ -222,6 +255,7 @@ pub enum Error { UnexpectedMessage, NotImplemented, NoSuchBroadcastInstance, + NoSuchAgreementInstance, Broadcast(broadcast::Error), Agreement(agreement::Error), } diff --git a/src/lib.rs b/src/lib.rs index 58deb95c..53e63f60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ extern crate log; extern crate crossbeam; extern crate crossbeam_channel; +extern crate itertools; extern crate merkle; extern crate protobuf; extern crate reed_solomon_erasure; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index ba5e2a05..202b5af2 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,6 +1,7 @@ //! Construction of messages from protobuf buffers. pub mod message; +use agreement::AgreementMessage; use broadcast::BroadcastMessage; use merkle::proof::{Lemma, Positioned, Proof}; use proto::message::*; @@ -66,13 +67,6 @@ impl<'a, T: AsRef<[u8]>> fmt::Debug for HexProof<'a, T> { } } -/// Messages sent during the binary Byzantine agreement stage. -#[derive(Copy, Clone, Debug, PartialEq)] -pub enum AgreementMessage { - BVal(bool), - Aux(bool), -} - impl + From>> Message { /// Translation from protobuf to the regular type. /// @@ -171,29 +165,6 @@ impl + From>> BroadcastMessage { } } -impl AgreementMessage { - pub fn into_proto(self) -> AgreementProto { - let mut p = AgreementProto::new(); - match self { - AgreementMessage::BVal(b) => p.set_bval(b), - AgreementMessage::Aux(b) => p.set_aux(b), - } - p - } - - // TODO: Re-enable lint once implemented. 
- #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] - pub fn from_proto(mp: AgreementProto) -> Option { - if mp.has_bval() { - Some(AgreementMessage::BVal(mp.get_bval())) - } else if mp.has_aux() { - Some(AgreementMessage::Aux(mp.get_aux())) - } else { - None - } - } -} - /// Serialisation of `Proof` defined against its protobuf interface to work /// around the restriction of not being allowed to extend the implementation of /// `Proof` outside its crate.