From d3b974f888a9743bcce6e1c905ffbeea1bd5a73a Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Sun, 6 May 2018 22:39:01 +0100 Subject: [PATCH 01/14] Binary Agreement implementation and its wiring into Common Subset --- proto/message.proto | 5 +- src/agreement.rs | 217 ++++++++++++++++++++++++++++++++++++++++--- src/common_subset.rs | 55 ++++++++--- src/proto/mod.rs | 23 +++-- 4 files changed, 266 insertions(+), 34 deletions(-) diff --git a/proto/message.proto b/proto/message.proto index ff5bfd7b..92ae378f 100644 --- a/proto/message.proto +++ b/proto/message.proto @@ -45,8 +45,9 @@ message LemmaProto { } message AgreementProto { + uint32 epoch = 1; oneof payload { - bool bval = 1; - bool aux = 2; + bool bval = 2; + bool aux = 3; } } \ No newline at end of file diff --git a/src/agreement.rs b/src/agreement.rs index 0c733bed..95b7655d 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -1,26 +1,61 @@ //! Binary Byzantine agreement protocol from a common coin protocol. +use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::hash::Hash; + use proto::AgreementMessage; -use std::collections::{BTreeSet, VecDeque}; -#[derive(Default)] -pub struct Agreement { +pub struct Agreement { + /// The UID of the corresponding node. + uid: NodeUid, + num_nodes: usize, + num_faulty_nodes: usize, + epoch: u32, input: Option, - _bin_values: BTreeSet, + /// Bin values. Reset on every epoch update. + bin_values: BTreeSet, + /// Values received in BVAL messages. Reset on every epoch update. + received_bval: HashMap>, + /// Sent BVAL values. Reset on every epoch update. + sent_bval: BTreeSet, + /// Values received in AUX messages. Reset on every epoch update. + received_aux: HashMap>, + /// All the output values in all epochs. + outputs: BTreeMap, + /// Termination flag. + terminated: bool, } -impl Agreement { - pub fn new() -> Self { +impl Agreement { + pub fn new(uid: NodeUid, num_nodes: usize) -> Self { + let num_faulty_nodes = (num_nodes - 1) / 3; + Agreement { + uid, + num_nodes, + num_faulty_nodes, + epoch: 0, input: None, - _bin_values: BTreeSet::new(), + bin_values: BTreeSet::new(), + received_bval: HashMap::new(), + sent_bval: BTreeSet::new(), + received_aux: HashMap::new(), + outputs: BTreeMap::new(), + terminated: false, } } + /// Algorithm has terminated. + pub fn terminated(&self) -> bool { + self.terminated + } + pub fn set_input(&mut self, input: bool) -> AgreementMessage { self.input = Some(input); + // Receive the BVAL message locally. + update_map_of_sets(&mut self.received_bval, self.uid.clone(), input); // Multicast BVAL - AgreementMessage::BVal(input) + AgreementMessage::BVal((self.epoch, input)) } pub fn has_input(&self) -> bool { @@ -28,12 +63,170 @@ impl Agreement { } /// Receive input from a remote node. + /// + /// Outputs an optional agreement result and a queue of agreement messages + /// to remote nodes. There can be up to 2 messages. pub fn on_input( - &self, - _message: &AgreementMessage, - ) -> Result, Error> { - Err(Error::NotImplemented) + &mut self, + uid: NodeUid, + message: &AgreementMessage, + ) -> Result<(Option, VecDeque), Error> { + let mut outgoing = VecDeque::new(); + + match *message { + AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => { + update_map_of_sets(&mut self.received_bval, uid, b); + let count_bval = self.received_bval.iter().fold(0, |count, (_, values)| { + if values.contains(&b) { + count + 1 + } else { + count + } + }); + + // upon receiving BVAL_r(b) messages from 2f + 1 nodes, + // bin_values_r := bin_values_r ∪ {b} + if count_bval == 2 * self.num_faulty_nodes + 1 { + self.bin_values.insert(b); + + // wait until bin_values_r /= 0, then multicast AUX_r(w) + // where w ∈ bin_values_r + outgoing.push_back(AgreementMessage::Aux((self.epoch, b))); + // Receive the AUX message locally. + update_map_of_sets(&mut self.received_aux, self.uid.clone(), b); + + let coin_result = self.try_coin(); + if let Some(output_message) = coin_result.1 { + outgoing.push_back(output_message); + } + Ok((coin_result.0, outgoing)) + } + // upon receiving BVAL_r(b) messages from f + 1 nodes, if + // BVAL_r(b) has not been sent, multicast BVAL_r(b) + else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) { + outgoing.push_back(AgreementMessage::BVal((self.epoch, b))); + // Receive the BVAL message locally. + update_map_of_sets(&mut self.received_bval, self.uid.clone(), b); + Ok((None, outgoing)) + } else { + Ok((None, outgoing)) + } + } + + AgreementMessage::Aux((_epoch, b)) => { + update_map_of_sets(&mut self.received_aux, uid, b); + if !self.bin_values.is_empty() { + let coin_result = self.try_coin(); + if let Some(output_message) = coin_result.1 { + outgoing.push_back(output_message); + } + Ok((coin_result.0, outgoing)) + } else { + Ok((None, outgoing)) + } + } + + _ => { + // Epoch does not match. Ignore the message. + Ok((None, outgoing)) + } + } } + + /// AUX_r messages such that the set of values carried by those messages is + /// a subset of bin_values_r. Outputs this subset. + /// + /// FIXME: Clarify whether the values of AUX messages should be the same or + /// not. It is assumed in `count_aux` that they can differ. + fn count_aux(&self) -> (usize, BTreeSet) { + let vals = BTreeSet::new(); + ( + self.received_aux.iter().fold(0, |count, (_, values)| { + if values.is_subset(&self.bin_values) { + vals.union(values); + count + 1 + } else { + count + } + }), + vals, + ) + } + + /// Wait until at least (N − f) AUX_r messages have been received, such that + /// the set of values carried by these messages, vals, are a subset of + /// bin_values_r (note that bin_values_r may continue to change as BVAL_r + /// messages are received, thus this condition may be triggered upon arrival + /// of either an AUX_r or a BVAL_r message). + /// + /// `try_coin` output an optional combination of the agreement value and the + /// agreement broadcast message. + fn try_coin(&mut self) -> (Option, Option) { + let (count_aux, vals) = self.count_aux(); + if count_aux >= self.num_nodes - self.num_faulty_nodes { + // FIXME: Implement the Common Coin algorithm. At the moment the + // coin value is constant `true`. + let coin: u64 = 1; + + let coin2 = coin % 2 != 0; + + // Check the termination condition: "continue looping until both a + // value b is output in some round r, and the value Coin_r' = b for + // some round r' > r." + self.terminated = self.terminated || self.outputs.values().any(|b| *b == coin2); + + // Prepare to start the next epoch. + self.bin_values.clear(); + + if vals.len() == 1 { + let mut message = None; + // NOTE: `vals` has exactly one element due to `vals.len() == 1` + let output: Vec> = vals.into_iter() + .take(1) + .map(|b| { + message = Some(self.set_input(b)); + + if b == coin2 { + // Record the output to perform a termination check later. + self.outputs.insert(self.epoch, b); + // Output the agreement value. + Some(b) + } else { + // Don't output a value. + None + } + }) + .collect(); + // Start the next epoch. + self.epoch += 1; + (output[0], message) + } else { + // Start the next epoch. + self.epoch += 1; + (None, Some(self.set_input(coin2))) + } + } else { + // Continue waiting for the (N - f) AUX messages. + (None, None) + } + } +} + +// Insert an element into a hash map of sets of values of type `Elt`. +fn update_map_of_sets(map: &mut HashMap>, key: Key, elt: Elt) +where + Key: Eq + Hash, + Elt: Copy + Ord, +{ + map.entry(key) + .and_modify(|values| { + values.insert(elt); + }) + .or_insert({ + let mut values = BTreeSet::new(); + values.insert(elt); + values + }); } #[derive(Clone, Debug)] diff --git a/src/common_subset.rs b/src/common_subset.rs index d92c5d49..55f85e08 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -43,15 +43,15 @@ pub struct CommonSubset { uid: NodeUid, num_nodes: usize, num_faulty_nodes: usize, - agreement_true_outputs: HashSet, broadcast_instances: HashMap>, - agreement_instances: HashMap, + agreement_instances: HashMap>, broadcast_results: HashMap, agreement_results: HashMap, } impl CommonSubset { - pub fn new(uid: NodeUid, all_uids: &HashSet, num_nodes: usize) -> Result { + pub fn new(uid: NodeUid, all_uids: &HashSet) -> Result { + let num_nodes = all_uids.len(); let num_faulty_nodes = (num_nodes - 1) / 3; // Create all broadcast instances. @@ -68,18 +68,17 @@ impl CommonSubset { } // Create all agreement instances. - let mut agreement_instances: HashMap = HashMap::new(); + let mut agreement_instances: HashMap> = HashMap::new(); for uid0 in all_uids { - agreement_instances.insert(uid0.clone(), Agreement::new()); + agreement_instances.insert(uid0.clone(), Agreement::new(uid0.clone(), num_nodes)); } Ok(CommonSubset { uid, num_nodes, num_faulty_nodes, - agreement_true_outputs: HashSet::new(), broadcast_instances, - agreement_instances: HashMap::new(), + agreement_instances, broadcast_results: HashMap::new(), agreement_results: HashMap::new(), }) @@ -109,7 +108,7 @@ impl CommonSubset { &mut self, uid: &NodeUid, ) -> Result, Error> { - if let Some(agreement_instance) = self.agreement_instances.get_mut(uid) { + if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) { if !agreement_instance.has_input() { Ok(Some(agreement_instance.set_input(true))) } else { @@ -146,11 +145,35 @@ impl CommonSubset { } input_result } - Input::Agreement(_uid, _message) => { + + Input::Agreement(uid, amessage) => { + // The result defaults to error. + let mut result = Err(Error::NoSuchAgreementInstance); + // FIXME: send the message to the Agreement instance and - // conditionally call `on_agreement_output` + if let Some(mut agreement_instance) = self.agreement_instances.get_mut(&uid) { + // Optional output of agreement and outgoing agreement + // messages to remote nodes. + result = if agreement_instance.terminated() { + // This instance has terminated and does not accept input. + Ok((None, VecDeque::new())) + } else { + agreement_instance + .on_input(uid.clone(), &amessage) + .map_err(Error::from) + } + } - Err(Error::NotImplemented) + if let Ok((output, mut outgoing)) = result { + if let Some(b) = output { + outgoing.append(&mut self.on_agreement_result(uid, b)); + } + Ok(outgoing.into_iter().map(Output::Agreement).collect()) + } else { + // error + result + .map(|(_, messages)| messages.into_iter().map(Output::Agreement).collect()) + } } } } @@ -166,9 +189,14 @@ impl CommonSubset { // Upon delivery of value 1 from at least N − f instances of BA, provide // input 0 to each instance of BA that has not yet been provided input. if result { - self.agreement_true_outputs.insert(uid); + self.agreement_results.insert(uid, result); + let results1: Vec = self.agreement_results + .iter() + .map(|(_, v)| *v) + .filter(|b| *b) + .collect(); - if self.agreement_true_outputs.len() >= self.num_nodes - self.num_faulty_nodes { + if results1.len() >= self.num_nodes - self.num_faulty_nodes { let instances = &mut self.agreement_instances; for (_uid0, instance) in instances.iter_mut() { if !instance.has_input() { @@ -222,6 +250,7 @@ pub enum Error { UnexpectedMessage, NotImplemented, NoSuchBroadcastInstance, + NoSuchAgreementInstance, Broadcast(broadcast::Error), Agreement(agreement::Error), } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index ba5e2a05..7163d8d7 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -67,10 +67,12 @@ impl<'a, T: AsRef<[u8]>> fmt::Debug for HexProof<'a, T> { } /// Messages sent during the binary Byzantine agreement stage. -#[derive(Copy, Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum AgreementMessage { - BVal(bool), - Aux(bool), + /// BVAL message with an epoch. + BVal((u32, bool)), + /// AUX message with an epoch. + Aux((u32, bool)), } impl + From>> Message { @@ -175,8 +177,14 @@ impl AgreementMessage { pub fn into_proto(self) -> AgreementProto { let mut p = AgreementProto::new(); match self { - AgreementMessage::BVal(b) => p.set_bval(b), - AgreementMessage::Aux(b) => p.set_aux(b), + AgreementMessage::BVal((e, b)) => { + p.set_epoch(e); + p.set_bval(b); + } + AgreementMessage::Aux((e, b)) => { + p.set_epoch(e); + p.set_aux(b); + } } p } @@ -184,10 +192,11 @@ impl AgreementMessage { // TODO: Re-enable lint once implemented. #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] pub fn from_proto(mp: AgreementProto) -> Option { + let epoch = mp.get_epoch(); if mp.has_bval() { - Some(AgreementMessage::BVal(mp.get_bval())) + Some(AgreementMessage::BVal((epoch, mp.get_bval()))) } else if mp.has_aux() { - Some(AgreementMessage::Aux(mp.get_aux())) + Some(AgreementMessage::Aux((epoch, mp.get_aux()))) } else { None } From 5215156ec5264df3c01ae38e013a5ddf36278e3d Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Mon, 7 May 2018 10:59:14 +0100 Subject: [PATCH 02/14] defined the output from the Common Subset algorithm --- src/agreement.rs | 8 +++- src/common_subset.rs | 88 ++++++++++++++++++++++---------------------- 2 files changed, 52 insertions(+), 44 deletions(-) diff --git a/src/agreement.rs b/src/agreement.rs index 95b7655d..ae4806a3 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -74,6 +74,11 @@ impl Agreement { let mut outgoing = VecDeque::new(); match *message { + _ if self.terminated => { + // The algorithm instance has already terminated. + Err(Error::Terminated) + } + AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => { update_map_of_sets(&mut self.received_bval, uid, b); let count_bval = self.received_bval.iter().fold(0, |count, (_, values)| { @@ -113,7 +118,7 @@ impl Agreement { } } - AgreementMessage::Aux((_epoch, b)) => { + AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => { update_map_of_sets(&mut self.received_aux, uid, b); if !self.bin_values.is_empty() { let coin_result = self.try_coin(); @@ -231,5 +236,6 @@ where #[derive(Clone, Debug)] pub enum Error { + Terminated, NotImplemented, } diff --git a/src/common_subset.rs b/src/common_subset.rs index 55f85e08..23cc289c 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -17,6 +17,8 @@ use proto::AgreementMessage; // TODO: Make this a generic argument of `Broadcast`. type ProposedValue = Vec; +// Type of output from the Common Subset message handler. +type CommonSubsetOutput = (Option>, VecDeque>); /// Input from a remote node to Common Subset. pub enum Input { @@ -27,9 +29,6 @@ pub enum Input { } /// Output from Common Subset to remote nodes. -/// -/// FIXME: We can do an interface that doesn't need this type and instead works -/// directly with the `TargetBroadcastMessage` and `AgreementMessage`. pub enum Output { /// A broadcast message to be sent to the destination set in the /// `TargetedBroadcastMessage`. @@ -46,6 +45,8 @@ pub struct CommonSubset { broadcast_instances: HashMap>, agreement_instances: HashMap>, broadcast_results: HashMap, + /// FIXME: The result may be a set of bool rather than a single bool due to + /// the ability of Agreement to output multiple values. agreement_results: HashMap, } @@ -104,10 +105,7 @@ impl CommonSubset { /// Upon delivery of v_j from RBC_j, if input has not yet been provided to /// BA_j, then provide input 1 to BA_j. See Figure 11. - pub fn on_broadcast_result( - &mut self, - uid: &NodeUid, - ) -> Result, Error> { + fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result, Error> { if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) { if !agreement_instance.has_input() { Ok(Some(agreement_instance.set_input(true))) @@ -119,20 +117,22 @@ impl CommonSubset { } } - /// Receive input from a remote node. + /// Receive input from a remote node. The output contains an optional result + /// of the Common Subset algorithm - a set of proposed values - and a queue + /// of messages to be sent to remote nodes, or an error. pub fn on_input( &mut self, message: Input, - ) -> Result>, Error> { + ) -> Result, Error> { match message { Input::Broadcast(uid, bmessage) => { let mut instance_result = None; - let input_result = { + let input_result: Result>, Error> = { if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) { broadcast_instance .handle_broadcast_message(&uid, bmessage) - .map(|(value, queue)| { - instance_result = value; + .map(|(opt_value, queue)| { + instance_result = opt_value; queue.into_iter().map(Output::Broadcast).collect() }) .map_err(Error::from) @@ -140,17 +140,24 @@ impl CommonSubset { Err(Error::NoSuchBroadcastInstance) } }; - if instance_result.is_some() { - self.on_broadcast_result(&uid)?; + let mut opt_message: Option = None; + if let Some(value) = instance_result { + self.broadcast_results.insert(uid.clone(), value); + opt_message = self.on_broadcast_result(&uid)?; } - input_result + input_result.map(|mut queue| { + if let Some(agreement_message) = opt_message { + // Append the message to agreement nodes to the common output queue. + queue.push_back(Output::Agreement(agreement_message)) + } + (None, queue) + }) } Input::Agreement(uid, amessage) => { // The result defaults to error. let mut result = Err(Error::NoSuchAgreementInstance); - // FIXME: send the message to the Agreement instance and if let Some(mut agreement_instance) = self.agreement_instances.get_mut(&uid) { // Optional output of agreement and outgoing agreement // messages to remote nodes. @@ -158,6 +165,7 @@ impl CommonSubset { // This instance has terminated and does not accept input. Ok((None, VecDeque::new())) } else { + // Send the message to the agreement instance. agreement_instance .on_input(uid.clone(), &amessage) .map_err(Error::from) @@ -168,11 +176,15 @@ impl CommonSubset { if let Some(b) = output { outgoing.append(&mut self.on_agreement_result(uid, b)); } - Ok(outgoing.into_iter().map(Output::Agreement).collect()) + Ok(( + self.try_agreement_completion(), + outgoing.into_iter().map(Output::Agreement).collect(), + )) } else { // error - result - .map(|(_, messages)| messages.into_iter().map(Output::Agreement).collect()) + result.map(|(_, messages)| { + (None, messages.into_iter().map(Output::Agreement).collect()) + }) } } } @@ -180,25 +192,20 @@ impl CommonSubset { /// Callback to be invoked on receipt of a returned value of the Agreement /// instance `uid`. - /// - /// FIXME: It is likely that only one `AgreementMessage` is required because - /// Figure 11 does not count the number of messages but the number of nodes - /// that sent messages. fn on_agreement_result(&mut self, uid: NodeUid, result: bool) -> VecDeque { let mut outgoing = VecDeque::new(); // Upon delivery of value 1 from at least N − f instances of BA, provide // input 0 to each instance of BA that has not yet been provided input. if result { self.agreement_results.insert(uid, result); - let results1: Vec = self.agreement_results - .iter() - .map(|(_, v)| *v) - .filter(|b| *b) - .collect(); + // The number of instances of BA that output 1. + let results1: usize = + self.agreement_results + .iter() + .fold(0, |count, (_, v)| if *v { count + 1 } else { count }); - if results1.len() >= self.num_nodes - self.num_faulty_nodes { - let instances = &mut self.agreement_instances; - for (_uid0, instance) in instances.iter_mut() { + if results1 >= self.num_nodes - self.num_faulty_nodes { + for instance in self.agreement_instances.values_mut() { if !instance.has_input() { outgoing.push_back(instance.set_input(false)); } @@ -208,24 +215,19 @@ impl CommonSubset { outgoing } - pub fn on_agreement_completion(&self) -> Option> { + fn try_agreement_completion(&self) -> Option> { // Once all instances of BA have completed, let C ⊂ [1..N] be // the indexes of each BA that delivered 1. Wait for the output // v_j for each RBC_j such that j∈C. Finally output ∪ j∈C v_j. - let instance_uids: HashSet = self.agreement_instances - .iter() - .map(|(k, _)| k.clone()) - .collect(); - let completed_uids: HashSet = self.agreement_results + if self.agreement_instances .iter() - .map(|(k, _)| k.clone()) - .collect(); - if instance_uids == completed_uids { - // All instances of Agreement that delivered `true`. - let delivered_1: HashSet = self.agreement_results + .all(|(_, instance)| instance.terminated()) + { + // All instances of Agreement that delivered `true` (or "1" in the paper). + let delivered_1: HashSet<&NodeUid> = self.agreement_results .iter() .filter(|(_, v)| **v) - .map(|(k, _)| k.clone()) + .map(|(k, _)| k) .collect(); // Results of Broadcast instances in `delivered_1` let broadcast_results: HashSet = self.broadcast_results From 2205f9083e377893004f0ebc5f14b255c8a2fe4a Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Mon, 7 May 2018 18:28:22 +0100 Subject: [PATCH 03/14] added a TODO file and changed indentation in the .proto file --- TODO | 12 ++++++++++ proto/message.proto | 53 ++++++++++++++++++++++----------------------- 2 files changed, 38 insertions(+), 27 deletions(-) create mode 100644 TODO diff --git a/TODO b/TODO new file mode 100644 index 00000000..e79c0e14 --- /dev/null +++ b/TODO @@ -0,0 +1,12 @@ +TODO +==== + +* Fix the inappropriate use of Common Coin in the Byzantine Agreement protocol + + This bug is explained in https://github.com/amiller/HoneyBadgerBFT/issues/59 + where a solution is suggested introducing an additional type of message, + CONF. + + There may be alternative solutions, such as using a different Byzantine + Agreement protocol altogether, for example, + https://people.csail.mit.edu/silvio/Selected%20Scientific%20Papers/Distributed%20Computation/BYZANTYNE%20AGREEMENT%20MADE%20TRIVIAL.pdf \ No newline at end of file diff --git a/proto/message.proto b/proto/message.proto index 92ae378f..5b070825 100644 --- a/proto/message.proto +++ b/proto/message.proto @@ -1,53 +1,52 @@ syntax = "proto3"; message MessageProto { - oneof payload { - BroadcastProto broadcast = 1; - AgreementProto agreement = 2; - } + oneof payload { + BroadcastProto broadcast = 1; + AgreementProto agreement = 2; + } } message BroadcastProto { - oneof payload { - ValueProto value = 1; - EchoProto echo = 2; - ReadyProto ready = 3; - } + oneof payload { + ValueProto value = 1; + EchoProto echo = 2; + ReadyProto ready = 3; + } } message ValueProto { - ProofProto proof = 1; + ProofProto proof = 1; } message EchoProto { - ProofProto proof = 1; + ProofProto proof = 1; } message ReadyProto { - bytes root_hash = 1; + bytes root_hash = 1; } message ProofProto { - bytes root_hash = 1; - LemmaProto lemma = 2; - bytes value = 3; + bytes root_hash = 1; + LemmaProto lemma = 2; + bytes value = 3; } message LemmaProto { - bytes node_hash = 1; - LemmaProto sub_lemma = 2; - - oneof sibling_hash { - bytes left_sibling_hash = 3; - bytes right_sibling_hash = 4; - } + bytes node_hash = 1; + LemmaProto sub_lemma = 2; + oneof sibling_hash { + bytes left_sibling_hash = 3; + bytes right_sibling_hash = 4; + } } message AgreementProto { - uint32 epoch = 1; - oneof payload { - bool bval = 2; - bool aux = 3; - } + uint32 epoch = 1; + oneof payload { + bool bval = 2; + bool aux = 3; + } } \ No newline at end of file From 394462c88b080e6d778346e73c1fddf48a69ba6e Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Tue, 8 May 2018 17:25:57 +0100 Subject: [PATCH 04/14] changed code according to review comments --- Cargo.toml | 2 +- src/agreement.rs | 250 +++++++++++++++++++++++++------------------ src/common_subset.rs | 161 ++++++++++++++++------------ src/lib.rs | 1 + src/proto/mod.rs | 40 +------ 5 files changed, 241 insertions(+), 213 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f0aad989..f3eb1a18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,10 @@ log = "0.4.1" reed-solomon-erasure = "3.0" merkle = { git = "https://github.com/vkomenda/merkle.rs", branch = "public-proof" } ring = "^0.12" -rand = "*" protobuf = "1.4.4" crossbeam = "0.3.2" crossbeam-channel = "0.1" +rand = "0.3" [build-dependencies] protoc-rust = "1.4.4" diff --git a/src/agreement.rs b/src/agreement.rs index ae4806a3..2961926f 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -1,9 +1,51 @@ //! Binary Byzantine agreement protocol from a common coin protocol. +use rand::random; use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; use std::hash::Hash; -use proto::AgreementMessage; +use proto::message; + +type AgreementOutput = (Option, VecDeque); + +/// Messages sent during the binary Byzantine agreement stage. +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum AgreementMessage { + /// BVAL message with an epoch. + BVal((u32, bool)), + /// AUX message with an epoch. + Aux((u32, bool)), +} + +impl AgreementMessage { + pub fn into_proto(self) -> message::AgreementProto { + let mut p = message::AgreementProto::new(); + match self { + AgreementMessage::BVal((e, b)) => { + p.set_epoch(e); + p.set_bval(b); + } + AgreementMessage::Aux((e, b)) => { + p.set_epoch(e); + p.set_aux(b); + } + } + p + } + + // TODO: Re-enable lint once implemented. + #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] + pub fn from_proto(mp: message::AgreementProto) -> Option { + let epoch = mp.get_epoch(); + if mp.has_bval() { + Some(AgreementMessage::BVal((epoch, mp.get_bval()))) + } else if mp.has_aux() { + Some(AgreementMessage::Aux((epoch, mp.get_aux()))) + } else { + None + } + } +} pub struct Agreement { /// The UID of the corresponding node. @@ -21,7 +63,7 @@ pub struct Agreement { /// Values received in AUX messages. Reset on every epoch update. received_aux: HashMap>, /// All the output values in all epochs. - outputs: BTreeMap, + estimated: BTreeMap, /// Termination flag. terminated: bool, } @@ -40,7 +82,7 @@ impl Agreement { received_bval: HashMap::new(), sent_bval: BTreeSet::new(), received_aux: HashMap::new(), - outputs: BTreeMap::new(), + estimated: BTreeMap::new(), terminated: false, } } @@ -53,7 +95,10 @@ impl Agreement { pub fn set_input(&mut self, input: bool) -> AgreementMessage { self.input = Some(input); // Receive the BVAL message locally. - update_map_of_sets(&mut self.received_bval, self.uid.clone(), input); + self.received_bval + .entry(self.uid.clone()) + .or_insert_with(BTreeSet::new) + .insert(input); // Multicast BVAL AgreementMessage::BVal((self.epoch, input)) } @@ -70,71 +115,85 @@ impl Agreement { &mut self, uid: NodeUid, message: &AgreementMessage, - ) -> Result<(Option, VecDeque), Error> { + ) -> Result { + match *message { + // The algorithm instance has already terminated. + _ if self.terminated => Err(Error::Terminated), + + AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => self.on_bval(uid, b), + + AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => self.on_aux(uid, b), + + // Epoch does not match. Ignore the message. + _ => Ok((None, VecDeque::new())), + } + } + + fn on_bval(&mut self, uid: NodeUid, b: bool) -> Result { let mut outgoing = VecDeque::new(); - match *message { - _ if self.terminated => { - // The algorithm instance has already terminated. - Err(Error::Terminated) - } + self.received_bval + .entry(uid) + .or_insert_with(BTreeSet::new) + .insert(b); + let count_bval = self.received_bval + .values() + .filter(|values| values.contains(&b)) + .count(); + + // upon receiving BVAL_r(b) messages from 2f + 1 nodes, + // bin_values_r := bin_values_r ∪ {b} + if count_bval == 2 * self.num_faulty_nodes + 1 { + self.bin_values.insert(b); - AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => { - update_map_of_sets(&mut self.received_bval, uid, b); - let count_bval = self.received_bval.iter().fold(0, |count, (_, values)| { - if values.contains(&b) { - count + 1 - } else { - count - } - }); - - // upon receiving BVAL_r(b) messages from 2f + 1 nodes, - // bin_values_r := bin_values_r ∪ {b} - if count_bval == 2 * self.num_faulty_nodes + 1 { - self.bin_values.insert(b); - - // wait until bin_values_r /= 0, then multicast AUX_r(w) - // where w ∈ bin_values_r - outgoing.push_back(AgreementMessage::Aux((self.epoch, b))); - // Receive the AUX message locally. - update_map_of_sets(&mut self.received_aux, self.uid.clone(), b); - - let coin_result = self.try_coin(); - if let Some(output_message) = coin_result.1 { - outgoing.push_back(output_message); - } - Ok((coin_result.0, outgoing)) - } - // upon receiving BVAL_r(b) messages from f + 1 nodes, if - // BVAL_r(b) has not been sent, multicast BVAL_r(b) - else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) { - outgoing.push_back(AgreementMessage::BVal((self.epoch, b))); - // Receive the BVAL message locally. - update_map_of_sets(&mut self.received_bval, self.uid.clone(), b); - Ok((None, outgoing)) - } else { - Ok((None, outgoing)) - } + // wait until bin_values_r != 0, then multicast AUX_r(w) + // where w ∈ bin_values_r + if self.bin_values.len() == 1 { + // Send an AUX message at most once per epoch. + outgoing.push_back(AgreementMessage::Aux((self.epoch, b))); + // Receive the AUX message locally. + self.received_aux + .entry(self.uid.clone()) + .or_insert_with(BTreeSet::new) + .insert(b); } - AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => { - update_map_of_sets(&mut self.received_aux, uid, b); - if !self.bin_values.is_empty() { - let coin_result = self.try_coin(); - if let Some(output_message) = coin_result.1 { - outgoing.push_back(output_message); - } - Ok((coin_result.0, outgoing)) - } else { - Ok((None, outgoing)) - } + let coin_result = self.try_coin(); + if let Some(output_message) = coin_result.1 { + outgoing.push_back(output_message); } + Ok((coin_result.0, outgoing)) + } + // upon receiving BVAL_r(b) messages from f + 1 nodes, if + // BVAL_r(b) has not been sent, multicast BVAL_r(b) + else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) { + outgoing.push_back(AgreementMessage::BVal((self.epoch, b))); + // Receive the BVAL message locally. + self.received_bval + .entry(self.uid.clone()) + .or_insert_with(BTreeSet::new) + .insert(b); + Ok((None, outgoing)) + } else { + Ok((None, outgoing)) + } + } - _ => { - // Epoch does not match. Ignore the message. - Ok((None, outgoing)) + fn on_aux(&mut self, uid: NodeUid, b: bool) -> Result { + let mut outgoing = VecDeque::new(); + + self.received_aux + .entry(uid) + .or_insert_with(BTreeSet::new) + .insert(b); + if !self.bin_values.is_empty() { + let coin_result = self.try_coin(); + if let Some(output_message) = coin_result.1 { + outgoing.push_back(output_message); } + Ok((coin_result.0, outgoing)) + } else { + Ok((None, outgoing)) } } @@ -143,47 +202,54 @@ impl Agreement { /// /// FIXME: Clarify whether the values of AUX messages should be the same or /// not. It is assumed in `count_aux` that they can differ. + /// + /// In general, we can't expect every good node to send the same AUX value, + /// so waiting for N - f agreeing messages would not always terminate. We + /// can, however, expect every good node to send an AUX value that will + /// eventually end up in our bin_values. fn count_aux(&self) -> (usize, BTreeSet) { let vals = BTreeSet::new(); ( - self.received_aux.iter().fold(0, |count, (_, values)| { - if values.is_subset(&self.bin_values) { - vals.union(values); - count + 1 - } else { - count - } - }), + self.received_aux + .values() + .filter(|values| values.is_subset(&self.bin_values)) + .map(|values| vals.union(values)) + .count(), vals, ) } - /// Wait until at least (N − f) AUX_r messages have been received, such that + /// Waits until at least (N − f) AUX_r messages have been received, such that /// the set of values carried by these messages, vals, are a subset of /// bin_values_r (note that bin_values_r may continue to change as BVAL_r /// messages are received, thus this condition may be triggered upon arrival /// of either an AUX_r or a BVAL_r message). /// - /// `try_coin` output an optional combination of the agreement value and the - /// agreement broadcast message. + /// `try_coin` outputs an optional combination of the agreement value and + /// the agreement broadcast message. fn try_coin(&mut self) -> (Option, Option) { let (count_aux, vals) = self.count_aux(); - if count_aux >= self.num_nodes - self.num_faulty_nodes { + if count_aux < self.num_nodes - self.num_faulty_nodes { + // Continue waiting for the (N - f) AUX messages. + (None, None) + } else { // FIXME: Implement the Common Coin algorithm. At the moment the - // coin value is constant `true`. - let coin: u64 = 1; - - let coin2 = coin % 2 != 0; + // coin value is random and local to each instance of Agreement. + let coin2 = random::(); // Check the termination condition: "continue looping until both a // value b is output in some round r, and the value Coin_r' = b for // some round r' > r." - self.terminated = self.terminated || self.outputs.values().any(|b| *b == coin2); + self.terminated = self.terminated || self.estimated.values().any(|b| *b == coin2); // Prepare to start the next epoch. self.bin_values.clear(); - if vals.len() == 1 { + if vals.len() != 1 { + // Start the next epoch. + self.epoch += 1; + (None, Some(self.set_input(coin2))) + } else { let mut message = None; // NOTE: `vals` has exactly one element due to `vals.len() == 1` let output: Vec> = vals.into_iter() @@ -193,7 +259,7 @@ impl Agreement { if b == coin2 { // Record the output to perform a termination check later. - self.outputs.insert(self.epoch, b); + self.estimated.insert(self.epoch, b); // Output the agreement value. Some(b) } else { @@ -205,35 +271,11 @@ impl Agreement { // Start the next epoch. self.epoch += 1; (output[0], message) - } else { - // Start the next epoch. - self.epoch += 1; - (None, Some(self.set_input(coin2))) } - } else { - // Continue waiting for the (N - f) AUX messages. - (None, None) } } } -// Insert an element into a hash map of sets of values of type `Elt`. -fn update_map_of_sets(map: &mut HashMap>, key: Key, elt: Elt) -where - Key: Eq + Hash, - Elt: Copy + Ord, -{ - map.entry(key) - .and_modify(|values| { - values.insert(elt); - }) - .or_insert({ - let mut values = BTreeSet::new(); - values.insert(elt); - values - }); -} - #[derive(Clone, Debug)] pub enum Error { Terminated, diff --git a/src/common_subset.rs b/src/common_subset.rs index 23cc289c..4a488366 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -8,13 +8,11 @@ use std::fmt::{Debug, Display}; use std::hash::Hash; use agreement; -use agreement::Agreement; +use agreement::{Agreement, AgreementMessage}; use broadcast; use broadcast::{Broadcast, BroadcastMessage, TargetedBroadcastMessage}; -use proto::AgreementMessage; - // TODO: Make this a generic argument of `Broadcast`. type ProposedValue = Vec; // Type of output from the Common Subset message handler. @@ -24,7 +22,7 @@ type CommonSubsetOutput = (Option>, VecDeque { /// Message from a remote node `uid` to the broadcast instance `uid`. Broadcast(NodeUid, BroadcastMessage), - /// Message from a remote node `uid` to the agreement instance `uid`. + /// Message from a remote node `uid` to all agreement instances. Agreement(NodeUid, AgreementMessage), } @@ -45,8 +43,6 @@ pub struct CommonSubset { broadcast_instances: HashMap>, agreement_instances: HashMap>, broadcast_results: HashMap, - /// FIXME: The result may be a set of bool rather than a single bool due to - /// the ability of Agreement to output multiple values. agreement_results: HashMap, } @@ -125,68 +121,98 @@ impl CommonSubset { message: Input, ) -> Result, Error> { match message { - Input::Broadcast(uid, bmessage) => { - let mut instance_result = None; - let input_result: Result>, Error> = { - if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) { - broadcast_instance - .handle_broadcast_message(&uid, bmessage) - .map(|(opt_value, queue)| { - instance_result = opt_value; - queue.into_iter().map(Output::Broadcast).collect() - }) - .map_err(Error::from) - } else { - Err(Error::NoSuchBroadcastInstance) - } - }; - let mut opt_message: Option = None; - if let Some(value) = instance_result { - self.broadcast_results.insert(uid.clone(), value); - opt_message = self.on_broadcast_result(&uid)?; - } - input_result.map(|mut queue| { - if let Some(agreement_message) = opt_message { - // Append the message to agreement nodes to the common output queue. - queue.push_back(Output::Agreement(agreement_message)) - } - (None, queue) - }) - } + Input::Broadcast(uid, bmessage) => self.on_input_broadcast(&uid, bmessage), - Input::Agreement(uid, amessage) => { - // The result defaults to error. - let mut result = Err(Error::NoSuchAgreementInstance); + Input::Agreement(uid, amessage) => self.on_input_agreement(&uid, &amessage), + } + } - if let Some(mut agreement_instance) = self.agreement_instances.get_mut(&uid) { - // Optional output of agreement and outgoing agreement - // messages to remote nodes. - result = if agreement_instance.terminated() { - // This instance has terminated and does not accept input. - Ok((None, VecDeque::new())) - } else { - // Send the message to the agreement instance. - agreement_instance - .on_input(uid.clone(), &amessage) - .map_err(Error::from) - } - } + fn on_input_broadcast( + &mut self, + uid: &NodeUid, + bmessage: BroadcastMessage, + ) -> Result, Error> { + let mut instance_result = None; + let input_result: Result>, Error> = { + if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) { + broadcast_instance + .handle_broadcast_message(&uid, bmessage) + .map(|(opt_value, queue)| { + instance_result = opt_value; + queue.into_iter().map(Output::Broadcast).collect() + }) + .map_err(Error::from) + } else { + Err(Error::NoSuchBroadcastInstance) + } + }; + let mut opt_message: Option = None; + if let Some(value) = instance_result { + self.broadcast_results.insert(uid.clone(), value); + opt_message = self.on_broadcast_result(&uid)?; + } + input_result.map(|mut queue| { + if let Some(agreement_message) = opt_message { + // Append the message to agreement nodes to the common output queue. + queue.push_back(Output::Agreement(agreement_message)) + } + (None, queue) + }) + } - if let Ok((output, mut outgoing)) = result { - if let Some(b) = output { - outgoing.append(&mut self.on_agreement_result(uid, b)); + fn on_input_agreement( + &mut self, + uid: &NodeUid, + amessage: &AgreementMessage, + ) -> Result, Error> { + // Send the message to all local instances of Agreement + let on_input_result: Result< + (HashMap, VecDeque), + Error, + > = self.agreement_instances.iter_mut().fold( + Ok((HashMap::new(), VecDeque::new())), + |accum, (instance_uid, instance)| { + match accum { + Err(_) => accum, + Ok((mut outputs, mut outgoing)) => { + // Optional output of agreement and outgoing + // agreement messages to remote nodes. + if instance.terminated() { + // This instance has terminated and does not accept input. + Ok((outputs, outgoing)) + } else { + // Send the message to the agreement instance. + instance + .on_input(uid.clone(), &amessage) + .map_err(Error::from) + .map(|(output, mut messages)| { + if let Some(b) = output { + outputs.insert(instance_uid.clone(), b); + } + outgoing.append(&mut messages); + (outputs, outgoing) + }) + } } - Ok(( - self.try_agreement_completion(), - outgoing.into_iter().map(Output::Agreement).collect(), - )) - } else { - // error - result.map(|(_, messages)| { - (None, messages.into_iter().map(Output::Agreement).collect()) - }) } - } + }, + ); + + if let Ok((outputs, mut outgoing)) = on_input_result { + // Process Agreement outputs. + outputs.iter().map(|(output_uid, &output_value)| { + outgoing.append(&mut self.on_agreement_result(output_uid.clone(), output_value)); + }); + + // Check whether Agreement has completed. + Ok(( + self.try_agreement_completion(), + outgoing.into_iter().map(Output::Agreement).collect(), + )) + } else { + // error + on_input_result + .map(|(_, messages)| (None, messages.into_iter().map(Output::Agreement).collect())) } } @@ -199,10 +225,7 @@ impl CommonSubset { if result { self.agreement_results.insert(uid, result); // The number of instances of BA that output 1. - let results1: usize = - self.agreement_results - .iter() - .fold(0, |count, (_, v)| if *v { count + 1 } else { count }); + let results1 = self.agreement_results.values().filter(|v| **v).count(); if results1 >= self.num_nodes - self.num_faulty_nodes { for instance in self.agreement_instances.values_mut() { @@ -220,8 +243,8 @@ impl CommonSubset { // the indexes of each BA that delivered 1. Wait for the output // v_j for each RBC_j such that j∈C. Finally output ∪ j∈C v_j. if self.agreement_instances - .iter() - .all(|(_, instance)| instance.terminated()) + .values() + .all(|instance| instance.terminated()) { // All instances of Agreement that delivered `true` (or "1" in the paper). let delivered_1: HashSet<&NodeUid> = self.agreement_results diff --git a/src/lib.rs b/src/lib.rs index 58deb95c..2e3b5a17 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ extern crate crossbeam; extern crate crossbeam_channel; extern crate merkle; extern crate protobuf; +extern crate rand; extern crate reed_solomon_erasure; extern crate ring; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 7163d8d7..202b5af2 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,6 +1,7 @@ //! Construction of messages from protobuf buffers. pub mod message; +use agreement::AgreementMessage; use broadcast::BroadcastMessage; use merkle::proof::{Lemma, Positioned, Proof}; use proto::message::*; @@ -66,15 +67,6 @@ impl<'a, T: AsRef<[u8]>> fmt::Debug for HexProof<'a, T> { } } -/// Messages sent during the binary Byzantine agreement stage. -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub enum AgreementMessage { - /// BVAL message with an epoch. - BVal((u32, bool)), - /// AUX message with an epoch. - Aux((u32, bool)), -} - impl + From>> Message { /// Translation from protobuf to the regular type. /// @@ -173,36 +165,6 @@ impl + From>> BroadcastMessage { } } -impl AgreementMessage { - pub fn into_proto(self) -> AgreementProto { - let mut p = AgreementProto::new(); - match self { - AgreementMessage::BVal((e, b)) => { - p.set_epoch(e); - p.set_bval(b); - } - AgreementMessage::Aux((e, b)) => { - p.set_epoch(e); - p.set_aux(b); - } - } - p - } - - // TODO: Re-enable lint once implemented. - #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] - pub fn from_proto(mp: AgreementProto) -> Option { - let epoch = mp.get_epoch(); - if mp.has_bval() { - Some(AgreementMessage::BVal((epoch, mp.get_bval()))) - } else if mp.has_aux() { - Some(AgreementMessage::Aux((epoch, mp.get_aux()))) - } else { - None - } - } -} - /// Serialisation of `Proof` defined against its protobuf interface to work /// around the restriction of not being allowed to extend the implementation of /// `Proof` outside its crate. From 3e35cc665bd4e7747e47e8e53c3f716370b74f14 Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Wed, 9 May 2018 09:55:34 +0100 Subject: [PATCH 05/14] added element_proposer_id to the Agreement input to the Common Subset algorithm --- src/agreement.rs | 16 ++++---- src/common_subset.rs | 90 ++++++++++++++++++++++---------------------- 2 files changed, 53 insertions(+), 53 deletions(-) diff --git a/src/agreement.rs b/src/agreement.rs index 2961926f..5c466d35 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -48,7 +48,7 @@ impl AgreementMessage { } pub struct Agreement { - /// The UID of the corresponding node. + /// The UID of the corresponding proposer node. uid: NodeUid, num_nodes: usize, num_faulty_nodes: usize, @@ -113,27 +113,27 @@ impl Agreement { /// to remote nodes. There can be up to 2 messages. pub fn on_input( &mut self, - uid: NodeUid, + sender_id: NodeUid, message: &AgreementMessage, ) -> Result { match *message { // The algorithm instance has already terminated. _ if self.terminated => Err(Error::Terminated), - AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => self.on_bval(uid, b), + AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => self.on_bval(sender_id, b), - AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => self.on_aux(uid, b), + AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => self.on_aux(sender_id, b), // Epoch does not match. Ignore the message. _ => Ok((None, VecDeque::new())), } } - fn on_bval(&mut self, uid: NodeUid, b: bool) -> Result { + fn on_bval(&mut self, sender_id: NodeUid, b: bool) -> Result { let mut outgoing = VecDeque::new(); self.received_bval - .entry(uid) + .entry(sender_id) .or_insert_with(BTreeSet::new) .insert(b); let count_bval = self.received_bval @@ -179,11 +179,11 @@ impl Agreement { } } - fn on_aux(&mut self, uid: NodeUid, b: bool) -> Result { + fn on_aux(&mut self, sender_id: NodeUid, b: bool) -> Result { let mut outgoing = VecDeque::new(); self.received_aux - .entry(uid) + .entry(sender_id) .or_insert_with(BTreeSet::new) .insert(b); if !self.bin_values.is_empty() { diff --git a/src/common_subset.rs b/src/common_subset.rs index 4a488366..3f1b9559 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -22,8 +22,13 @@ type CommonSubsetOutput = (Option>, VecDeque { /// Message from a remote node `uid` to the broadcast instance `uid`. Broadcast(NodeUid, BroadcastMessage), - /// Message from a remote node `uid` to all agreement instances. - Agreement(NodeUid, AgreementMessage), + /// Message from a remote node `message_sender_id` concerning the common + /// subset element proposed by the node `element_proposer_id`. + Agreement { + message_sender_id: NodeUid, + element_proposer_id: NodeUid, + message: AgreementMessage, + }, } /// Output from Common Subset to remote nodes. @@ -118,12 +123,16 @@ impl CommonSubset { /// of messages to be sent to remote nodes, or an error. pub fn on_input( &mut self, - message: Input, + input_message: Input, ) -> Result, Error> { - match message { + match input_message { Input::Broadcast(uid, bmessage) => self.on_input_broadcast(&uid, bmessage), - Input::Agreement(uid, amessage) => self.on_input_agreement(&uid, &amessage), + Input::Agreement { + message_sender_id, + element_proposer_id, + message, + } => self.on_input_agreement(message_sender_id, element_proposer_id, &message), } } @@ -162,47 +171,34 @@ impl CommonSubset { fn on_input_agreement( &mut self, - uid: &NodeUid, + msg_sender_id: NodeUid, + element_proposer_id: NodeUid, amessage: &AgreementMessage, ) -> Result, Error> { - // Send the message to all local instances of Agreement - let on_input_result: Result< - (HashMap, VecDeque), - Error, - > = self.agreement_instances.iter_mut().fold( - Ok((HashMap::new(), VecDeque::new())), - |accum, (instance_uid, instance)| { - match accum { - Err(_) => accum, - Ok((mut outputs, mut outgoing)) => { - // Optional output of agreement and outgoing - // agreement messages to remote nodes. - if instance.terminated() { - // This instance has terminated and does not accept input. - Ok((outputs, outgoing)) - } else { - // Send the message to the agreement instance. - instance - .on_input(uid.clone(), &amessage) - .map_err(Error::from) - .map(|(output, mut messages)| { - if let Some(b) = output { - outputs.insert(instance_uid.clone(), b); - } - outgoing.append(&mut messages); - (outputs, outgoing) - }) - } - } - } - }, - ); + // The result defaults to error. + let mut result = Err(Error::NoSuchAgreementInstance); - if let Ok((outputs, mut outgoing)) = on_input_result { + // Send the message to the local instance of Agreement + if let Some(mut agreement_instance) = self.agreement_instances.get_mut(&element_proposer_id) + { + // Optional output of agreement and outgoing agreement + // messages to remote nodes. + result = if agreement_instance.terminated() { + // This instance has terminated and does not accept input. + Ok((None, VecDeque::new())) + } else { + // Send the message to the agreement instance. + agreement_instance + .on_input(msg_sender_id, &amessage) + .map_err(Error::from) + } + } + + if let Ok((output, mut outgoing)) = result { // Process Agreement outputs. - outputs.iter().map(|(output_uid, &output_value)| { - outgoing.append(&mut self.on_agreement_result(output_uid.clone(), output_value)); - }); + if let Some(b) = output { + outgoing.append(&mut self.on_agreement_result(element_proposer_id, b)); + } // Check whether Agreement has completed. Ok(( @@ -211,19 +207,23 @@ impl CommonSubset { )) } else { // error - on_input_result + result .map(|(_, messages)| (None, messages.into_iter().map(Output::Agreement).collect())) } } /// Callback to be invoked on receipt of a returned value of the Agreement /// instance `uid`. - fn on_agreement_result(&mut self, uid: NodeUid, result: bool) -> VecDeque { + fn on_agreement_result( + &mut self, + element_proposer_id: NodeUid, + result: bool, + ) -> VecDeque { let mut outgoing = VecDeque::new(); // Upon delivery of value 1 from at least N − f instances of BA, provide // input 0 to each instance of BA that has not yet been provided input. if result { - self.agreement_results.insert(uid, result); + self.agreement_results.insert(element_proposer_id, result); // The number of instances of BA that output 1. let results1 = self.agreement_results.values().filter(|v| **v).count(); From e0005a672b62c8008141331196296b5c4fbaf8c4 Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Wed, 9 May 2018 15:27:31 +0100 Subject: [PATCH 06/14] removed the separate field in Agreement and corrected computation of estimated values --- Cargo.toml | 1 - proto/message.proto | 52 +++++++------- src/agreement.rs | 159 ++++++++++++++++++++++--------------------- src/common_subset.rs | 12 ++-- src/lib.rs | 1 - 5 files changed, 112 insertions(+), 113 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f3eb1a18..4e681600 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,6 @@ ring = "^0.12" protobuf = "1.4.4" crossbeam = "0.3.2" crossbeam-channel = "0.1" -rand = "0.3" [build-dependencies] protoc-rust = "1.4.4" diff --git a/proto/message.proto b/proto/message.proto index 5b070825..dbfb0fb3 100644 --- a/proto/message.proto +++ b/proto/message.proto @@ -1,52 +1,52 @@ syntax = "proto3"; message MessageProto { - oneof payload { - BroadcastProto broadcast = 1; - AgreementProto agreement = 2; - } + oneof payload { + BroadcastProto broadcast = 1; + AgreementProto agreement = 2; + } } message BroadcastProto { - oneof payload { - ValueProto value = 1; - EchoProto echo = 2; - ReadyProto ready = 3; - } + oneof payload { + ValueProto value = 1; + EchoProto echo = 2; + ReadyProto ready = 3; + } } message ValueProto { - ProofProto proof = 1; + ProofProto proof = 1; } message EchoProto { - ProofProto proof = 1; + ProofProto proof = 1; } message ReadyProto { - bytes root_hash = 1; + bytes root_hash = 1; } message ProofProto { - bytes root_hash = 1; - LemmaProto lemma = 2; - bytes value = 3; + bytes root_hash = 1; + LemmaProto lemma = 2; + bytes value = 3; } message LemmaProto { - bytes node_hash = 1; - LemmaProto sub_lemma = 2; + bytes node_hash = 1; + LemmaProto sub_lemma = 2; - oneof sibling_hash { - bytes left_sibling_hash = 3; - bytes right_sibling_hash = 4; - } + oneof sibling_hash { + bytes left_sibling_hash = 3; + bytes right_sibling_hash = 4; + } } message AgreementProto { - uint32 epoch = 1; - oneof payload { - bool bval = 2; - bool aux = 3; - } + uint32 epoch = 1; + oneof payload { + bool bval = 2; + bool aux = 3; + } } \ No newline at end of file diff --git a/src/agreement.rs b/src/agreement.rs index 5c466d35..64090c25 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -1,11 +1,14 @@ //! Binary Byzantine agreement protocol from a common coin protocol. -use rand::random; use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; use std::hash::Hash; use proto::message; +/// Type of output from the Agreement message handler. The first component is +/// the value on which the Agreement has decided, also called "output" in the +/// HoneyadgerBFT paper. The second component is a queue of messages to be sent +/// to remote nodes as a result of handling the incomming message. type AgreementOutput = (Option, VecDeque); /// Messages sent during the binary Byzantine agreement stage. @@ -53,7 +56,6 @@ pub struct Agreement { num_nodes: usize, num_faulty_nodes: usize, epoch: u32, - input: Option, /// Bin values. Reset on every epoch update. bin_values: BTreeSet, /// Values received in BVAL messages. Reset on every epoch update. @@ -62,9 +64,15 @@ pub struct Agreement { sent_bval: BTreeSet, /// Values received in AUX messages. Reset on every epoch update. received_aux: HashMap>, - /// All the output values in all epochs. + /// Estimates of the decision value in all epochs. The first estimated value + /// is provided as input by Common Subset using the `set_input` function + /// which triggers the algorithm to start. estimated: BTreeMap, - /// Termination flag. + /// Termination flag. The Agreement instance doesn't terminate immediately + /// upon deciding on the agreed value. This is done in order to help other + /// nodes decide despite asynchrony of communication. Once the instance + /// determines that all the remote nodes have reached agreement, it sets the + /// `terminated` flag and accepts no more incoming messages. terminated: bool, } @@ -77,7 +85,6 @@ impl Agreement { num_nodes, num_faulty_nodes, epoch: 0, - input: None, bin_values: BTreeSet::new(), received_bval: HashMap::new(), sent_bval: BTreeSet::new(), @@ -92,26 +99,31 @@ impl Agreement { self.terminated } - pub fn set_input(&mut self, input: bool) -> AgreementMessage { - self.input = Some(input); + pub fn set_input(&mut self, input: bool) -> Result { + if self.epoch != 0 { + return Err(Error::InputNotAccepted); + } + + // Set the initial estimated value to the input value. + self.estimated.insert(self.epoch, input); // Receive the BVAL message locally. self.received_bval .entry(self.uid.clone()) .or_insert_with(BTreeSet::new) .insert(input); // Multicast BVAL - AgreementMessage::BVal((self.epoch, input)) + Ok(AgreementMessage::BVal((self.epoch, input))) } pub fn has_input(&self) -> bool { - self.input.is_some() + self.estimated.get(&0).is_some() } /// Receive input from a remote node. /// /// Outputs an optional agreement result and a queue of agreement messages /// to remote nodes. There can be up to 2 messages. - pub fn on_input( + pub fn handle_agreement_message( &mut self, sender_id: NodeUid, message: &AgreementMessage, @@ -120,16 +132,20 @@ impl Agreement { // The algorithm instance has already terminated. _ if self.terminated => Err(Error::Terminated), - AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => self.on_bval(sender_id, b), + AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => { + self.handle_bval(sender_id, b) + } - AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => self.on_aux(sender_id, b), + AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => { + self.handle_aux(sender_id, b) + } // Epoch does not match. Ignore the message. _ => Ok((None, VecDeque::new())), } } - fn on_bval(&mut self, sender_id: NodeUid, b: bool) -> Result { + fn handle_bval(&mut self, sender_id: NodeUid, b: bool) -> Result { let mut outgoing = VecDeque::new(); self.received_bval @@ -159,10 +175,7 @@ impl Agreement { } let coin_result = self.try_coin(); - if let Some(output_message) = coin_result.1 { - outgoing.push_back(output_message); - } - Ok((coin_result.0, outgoing)) + Ok((coin_result, outgoing)) } // upon receiving BVAL_r(b) messages from f + 1 nodes, if // BVAL_r(b) has not been sent, multicast BVAL_r(b) @@ -179,21 +192,16 @@ impl Agreement { } } - fn on_aux(&mut self, sender_id: NodeUid, b: bool) -> Result { - let mut outgoing = VecDeque::new(); - + fn handle_aux(&mut self, sender_id: NodeUid, b: bool) -> Result { self.received_aux .entry(sender_id) .or_insert_with(BTreeSet::new) .insert(b); if !self.bin_values.is_empty() { let coin_result = self.try_coin(); - if let Some(output_message) = coin_result.1 { - outgoing.push_back(output_message); - } - Ok((coin_result.0, outgoing)) + Ok((coin_result, VecDeque::new())) } else { - Ok((None, outgoing)) + Ok((None, VecDeque::new())) } } @@ -208,15 +216,14 @@ impl Agreement { /// can, however, expect every good node to send an AUX value that will /// eventually end up in our bin_values. fn count_aux(&self) -> (usize, BTreeSet) { - let vals = BTreeSet::new(); - ( - self.received_aux - .values() - .filter(|values| values.is_subset(&self.bin_values)) - .map(|values| vals.union(values)) - .count(), - vals, - ) + let vals: BTreeSet = self.received_aux + .values() + .filter(|values| values.is_subset(&self.bin_values)) + .fold(BTreeSet::new(), |vals, values| { + vals.union(values).cloned().collect() + }); + + (vals.len(), vals) } /// Waits until at least (N − f) AUX_r messages have been received, such that @@ -225,59 +232,53 @@ impl Agreement { /// messages are received, thus this condition may be triggered upon arrival /// of either an AUX_r or a BVAL_r message). /// - /// `try_coin` outputs an optional combination of the agreement value and - /// the agreement broadcast message. - fn try_coin(&mut self) -> (Option, Option) { + /// `try_coin` outputs an optional decision value of the agreement instance. + fn try_coin(&mut self) -> Option { let (count_aux, vals) = self.count_aux(); if count_aux < self.num_nodes - self.num_faulty_nodes { // Continue waiting for the (N - f) AUX messages. - (None, None) - } else { - // FIXME: Implement the Common Coin algorithm. At the moment the - // coin value is random and local to each instance of Agreement. - let coin2 = random::(); - - // Check the termination condition: "continue looping until both a - // value b is output in some round r, and the value Coin_r' = b for - // some round r' > r." - self.terminated = self.terminated || self.estimated.values().any(|b| *b == coin2); - - // Prepare to start the next epoch. - self.bin_values.clear(); - - if vals.len() != 1 { - // Start the next epoch. - self.epoch += 1; - (None, Some(self.set_input(coin2))) - } else { - let mut message = None; - // NOTE: `vals` has exactly one element due to `vals.len() == 1` - let output: Vec> = vals.into_iter() - .take(1) - .map(|b| { - message = Some(self.set_input(b)); - - if b == coin2 { - // Record the output to perform a termination check later. - self.estimated.insert(self.epoch, b); - // Output the agreement value. - Some(b) - } else { - // Don't output a value. - None - } - }) - .collect(); - // Start the next epoch. - self.epoch += 1; - (output[0], message) - } + return None; + } + + // FIXME: Implement the Common Coin algorithm. At the moment the + // coin value is common across different nodes but not random. + let coin2 = (self.epoch % 2) == 0; + + // Check the termination condition: "continue looping until both a + // value b is output in some round r, and the value Coin_r' = b for + // some round r' > r." + self.terminated = self.terminated || self.estimated.values().any(|b| *b == coin2); + + // Start the next epoch. + self.bin_values.clear(); + self.epoch += 1; + + if vals.len() != 1 { + self.estimated.insert(self.epoch, coin2); + return None; } + + // NOTE: `vals` has exactly one element due to `vals.len() == 1` + let output: Vec> = vals.into_iter() + .take(1) + .map(|b| { + self.estimated.insert(self.epoch, b); + if b == coin2 { + // Output the agreement value. + Some(b) + } else { + // Don't output a value. + None + } + }) + .collect(); + + output[0] } } #[derive(Clone, Debug)] pub enum Error { Terminated, - NotImplemented, + InputNotAccepted, } diff --git a/src/common_subset.rs b/src/common_subset.rs index 3f1b9559..5d2a4aec 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -109,7 +109,7 @@ impl CommonSubset { fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result, Error> { if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) { if !agreement_instance.has_input() { - Ok(Some(agreement_instance.set_input(true))) + Ok(Some(agreement_instance.set_input(true)?)) } else { Ok(None) } @@ -189,7 +189,7 @@ impl CommonSubset { } else { // Send the message to the agreement instance. agreement_instance - .on_input(msg_sender_id, &amessage) + .handle_agreement_message(msg_sender_id, &amessage) .map_err(Error::from) } } @@ -197,7 +197,7 @@ impl CommonSubset { if let Ok((output, mut outgoing)) = result { // Process Agreement outputs. if let Some(b) = output { - outgoing.append(&mut self.on_agreement_result(element_proposer_id, b)); + outgoing.append(&mut self.on_agreement_result(element_proposer_id, b)?); } // Check whether Agreement has completed. @@ -218,7 +218,7 @@ impl CommonSubset { &mut self, element_proposer_id: NodeUid, result: bool, - ) -> VecDeque { + ) -> Result, Error> { let mut outgoing = VecDeque::new(); // Upon delivery of value 1 from at least N − f instances of BA, provide // input 0 to each instance of BA that has not yet been provided input. @@ -230,12 +230,12 @@ impl CommonSubset { if results1 >= self.num_nodes - self.num_faulty_nodes { for instance in self.agreement_instances.values_mut() { if !instance.has_input() { - outgoing.push_back(instance.set_input(false)); + outgoing.push_back(instance.set_input(false)?); } } } } - outgoing + Ok(outgoing) } fn try_agreement_completion(&self) -> Option> { diff --git a/src/lib.rs b/src/lib.rs index 2e3b5a17..58deb95c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,6 @@ extern crate crossbeam; extern crate crossbeam_channel; extern crate merkle; extern crate protobuf; -extern crate rand; extern crate reed_solomon_erasure; extern crate ring; From b6a6bb35ea3dcd3d53440d6867e89494bb1b75ca Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Wed, 9 May 2018 15:36:02 +0100 Subject: [PATCH 07/14] clear the received AUX messages on every epoch update --- src/agreement.rs | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/src/agreement.rs b/src/agreement.rs index 64090c25..d81b55c9 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -63,7 +63,7 @@ pub struct Agreement { /// Sent BVAL values. Reset on every epoch update. sent_bval: BTreeSet, /// Values received in AUX messages. Reset on every epoch update. - received_aux: HashMap>, + received_aux: HashMap, /// Estimates of the decision value in all epochs. The first estimated value /// is provided as input by Common Subset using the `set_input` function /// which triggers the algorithm to start. @@ -168,10 +168,7 @@ impl Agreement { // Send an AUX message at most once per epoch. outgoing.push_back(AgreementMessage::Aux((self.epoch, b))); // Receive the AUX message locally. - self.received_aux - .entry(self.uid.clone()) - .or_insert_with(BTreeSet::new) - .insert(b); + self.received_aux.insert(self.uid.clone(), b); } let coin_result = self.try_coin(); @@ -193,10 +190,7 @@ impl Agreement { } fn handle_aux(&mut self, sender_id: NodeUid, b: bool) -> Result { - self.received_aux - .entry(sender_id) - .or_insert_with(BTreeSet::new) - .insert(b); + self.received_aux.insert(sender_id, b); if !self.bin_values.is_empty() { let coin_result = self.try_coin(); Ok((coin_result, VecDeque::new())) @@ -216,12 +210,13 @@ impl Agreement { /// can, however, expect every good node to send an AUX value that will /// eventually end up in our bin_values. fn count_aux(&self) -> (usize, BTreeSet) { - let vals: BTreeSet = self.received_aux - .values() - .filter(|values| values.is_subset(&self.bin_values)) - .fold(BTreeSet::new(), |vals, values| { - vals.union(values).cloned().collect() - }); + let mut vals: BTreeSet = BTreeSet::new(); + + for b in self.received_aux.values() { + if self.bin_values.contains(b) { + vals.insert(b.clone()); + } + } (vals.len(), vals) } @@ -251,6 +246,7 @@ impl Agreement { // Start the next epoch. self.bin_values.clear(); + self.received_aux.clear(); self.epoch += 1; if vals.len() != 1 { From 259d5369b02f45967c75031d02705e5751cc9124 Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Wed, 9 May 2018 16:39:45 +0100 Subject: [PATCH 08/14] corrected the count of AUX messages --- src/agreement.rs | 18 ++++++++++-------- src/common_subset.rs | 3 +-- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/agreement.rs b/src/agreement.rs index d81b55c9..9a02934b 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -210,15 +210,17 @@ impl Agreement { /// can, however, expect every good node to send an AUX value that will /// eventually end up in our bin_values. fn count_aux(&self) -> (usize, BTreeSet) { - let mut vals: BTreeSet = BTreeSet::new(); - - for b in self.received_aux.values() { - if self.bin_values.contains(b) { - vals.insert(b.clone()); - } - } + let mut count = 0; + let vals: BTreeSet = self.received_aux + .values() + .filter(|b| { + count += 1; + self.bin_values.contains(b) + }) + .cloned() + .collect(); - (vals.len(), vals) + (count, vals) } /// Waits until at least (N − f) AUX_r messages have been received, such that diff --git a/src/common_subset.rs b/src/common_subset.rs index 5d2a4aec..97087e17 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -179,8 +179,7 @@ impl CommonSubset { let mut result = Err(Error::NoSuchAgreementInstance); // Send the message to the local instance of Agreement - if let Some(mut agreement_instance) = self.agreement_instances.get_mut(&element_proposer_id) - { + if let Some(agreement_instance) = self.agreement_instances.get_mut(&element_proposer_id) { // Optional output of agreement and outgoing agreement // messages to remote nodes. result = if agreement_instance.terminated() { From 51ef11b55c711ff25934f06a8c5810eeeef05da7 Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Thu, 10 May 2018 09:57:58 +0100 Subject: [PATCH 09/14] fixed the count of matching AUX messages --- Cargo.toml | 1 + src/agreement.rs | 14 +++++--------- src/lib.rs | 1 + 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4e681600..257ef307 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ ring = "^0.12" protobuf = "1.4.4" crossbeam = "0.3.2" crossbeam-channel = "0.1" +itertools = "0.7" [build-dependencies] protoc-rust = "1.4.4" diff --git a/src/agreement.rs b/src/agreement.rs index 9a02934b..0cf9d943 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -1,5 +1,6 @@ //! Binary Byzantine agreement protocol from a common coin protocol. +use itertools::Itertools; use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; use std::hash::Hash; @@ -210,17 +211,12 @@ impl Agreement { /// can, however, expect every good node to send an AUX value that will /// eventually end up in our bin_values. fn count_aux(&self) -> (usize, BTreeSet) { - let mut count = 0; - let vals: BTreeSet = self.received_aux + let (vals_cnt, vals) = self.received_aux .values() - .filter(|b| { - count += 1; - self.bin_values.contains(b) - }) - .cloned() - .collect(); + .filter(|b| self.bin_values.contains(b)) + .tee(); - (count, vals) + (vals_cnt.count(), vals.cloned().collect()) } /// Waits until at least (N − f) AUX_r messages have been received, such that diff --git a/src/lib.rs b/src/lib.rs index 58deb95c..53e63f60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ extern crate log; extern crate crossbeam; extern crate crossbeam_channel; +extern crate itertools; extern crate merkle; extern crate protobuf; extern crate reed_solomon_erasure; From cf33bac533532d3c6b05a1b0dd6d38e29f32590e Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Thu, 10 May 2018 11:01:25 +0100 Subject: [PATCH 10/14] added the proposer ID to common_subset::handle_broadcast and mae it an interface fn --- src/common_subset.rs | 71 ++++++++++++++++---------------------------- 1 file changed, 26 insertions(+), 45 deletions(-) diff --git a/src/common_subset.rs b/src/common_subset.rs index 97087e17..5bdd0648 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -18,19 +18,6 @@ type ProposedValue = Vec; // Type of output from the Common Subset message handler. type CommonSubsetOutput = (Option>, VecDeque>); -/// Input from a remote node to Common Subset. -pub enum Input { - /// Message from a remote node `uid` to the broadcast instance `uid`. - Broadcast(NodeUid, BroadcastMessage), - /// Message from a remote node `message_sender_id` concerning the common - /// subset element proposed by the node `element_proposer_id`. - Agreement { - message_sender_id: NodeUid, - element_proposer_id: NodeUid, - message: AgreementMessage, - }, -} - /// Output from Common Subset to remote nodes. pub enum Output { /// A broadcast message to be sent to the destination set in the @@ -118,34 +105,22 @@ impl CommonSubset { } } - /// Receive input from a remote node. The output contains an optional result - /// of the Common Subset algorithm - a set of proposed values - and a queue - /// of messages to be sent to remote nodes, or an error. - pub fn on_input( - &mut self, - input_message: Input, - ) -> Result, Error> { - match input_message { - Input::Broadcast(uid, bmessage) => self.on_input_broadcast(&uid, bmessage), - - Input::Agreement { - message_sender_id, - element_proposer_id, - message, - } => self.on_input_agreement(message_sender_id, element_proposer_id, &message), - } - } - - fn on_input_broadcast( + /// Receives a broadcast message from a remote node `sender_id` concerning a + /// value proposed by the node `proposer_id`. The output contains an + /// optional result of the Common Subset algorithm - a set of proposed + /// values - and a queue of messages to be sent to remote nodes, or an + /// error. + pub fn handle_broadcast( &mut self, - uid: &NodeUid, + sender_id: &NodeUid, + proposer_id: &NodeUid, bmessage: BroadcastMessage, ) -> Result, Error> { let mut instance_result = None; let input_result: Result>, Error> = { - if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) { + if let Some(broadcast_instance) = self.broadcast_instances.get(proposer_id) { broadcast_instance - .handle_broadcast_message(&uid, bmessage) + .handle_broadcast_message(sender_id, bmessage) .map(|(opt_value, queue)| { instance_result = opt_value; queue.into_iter().map(Output::Broadcast).collect() @@ -157,8 +132,8 @@ impl CommonSubset { }; let mut opt_message: Option = None; if let Some(value) = instance_result { - self.broadcast_results.insert(uid.clone(), value); - opt_message = self.on_broadcast_result(&uid)?; + self.broadcast_results.insert(proposer_id.clone(), value); + opt_message = self.on_broadcast_result(proposer_id)?; } input_result.map(|mut queue| { if let Some(agreement_message) = opt_message { @@ -169,17 +144,22 @@ impl CommonSubset { }) } - fn on_input_agreement( + /// Receives an agreement message from a remote node `sender_id` concerning + /// a value proposed by the node `proposer_id`. The output contains an + /// optional result of the Common Subset algorithm - a set of proposed + /// values - and a queue of messages to be sent to remote nodes, or an + /// error. + pub fn handle_agreement( &mut self, - msg_sender_id: NodeUid, - element_proposer_id: NodeUid, + sender_id: &NodeUid, + proposer_id: &NodeUid, amessage: &AgreementMessage, ) -> Result, Error> { // The result defaults to error. let mut result = Err(Error::NoSuchAgreementInstance); // Send the message to the local instance of Agreement - if let Some(agreement_instance) = self.agreement_instances.get_mut(&element_proposer_id) { + if let Some(agreement_instance) = self.agreement_instances.get_mut(proposer_id) { // Optional output of agreement and outgoing agreement // messages to remote nodes. result = if agreement_instance.terminated() { @@ -188,7 +168,7 @@ impl CommonSubset { } else { // Send the message to the agreement instance. agreement_instance - .handle_agreement_message(msg_sender_id, &amessage) + .handle_agreement_message(sender_id.clone(), &amessage) .map_err(Error::from) } } @@ -196,7 +176,7 @@ impl CommonSubset { if let Ok((output, mut outgoing)) = result { // Process Agreement outputs. if let Some(b) = output { - outgoing.append(&mut self.on_agreement_result(element_proposer_id, b)?); + outgoing.append(&mut self.on_agreement_result(proposer_id, b)?); } // Check whether Agreement has completed. @@ -215,14 +195,15 @@ impl CommonSubset { /// instance `uid`. fn on_agreement_result( &mut self, - element_proposer_id: NodeUid, + element_proposer_id: &NodeUid, result: bool, ) -> Result, Error> { let mut outgoing = VecDeque::new(); // Upon delivery of value 1 from at least N − f instances of BA, provide // input 0 to each instance of BA that has not yet been provided input. if result { - self.agreement_results.insert(element_proposer_id, result); + self.agreement_results + .insert(element_proposer_id.clone(), result); // The number of instances of BA that output 1. let results1 = self.agreement_results.values().filter(|v| **v).count(); From fb50e38eadd2c6e0824e0066a492200ce90e4435 Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Thu, 10 May 2018 12:09:22 +0100 Subject: [PATCH 11/14] replaced the map of estimated values with only one optional value for the current epoch --- src/agreement.rs | 81 ++++++++++++++++++++++---------------------- src/common_subset.rs | 6 ++-- 2 files changed, 44 insertions(+), 43 deletions(-) diff --git a/src/agreement.rs b/src/agreement.rs index 0cf9d943..4ea3307d 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -1,7 +1,7 @@ //! Binary Byzantine agreement protocol from a common coin protocol. use itertools::Itertools; -use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeSet, HashMap, VecDeque}; use std::hash::Hash; use proto::message; @@ -51,6 +51,7 @@ impl AgreementMessage { } } +/// Binary Agreement instance. pub struct Agreement { /// The UID of the corresponding proposer node. uid: NodeUid, @@ -65,10 +66,12 @@ pub struct Agreement { sent_bval: BTreeSet, /// Values received in AUX messages. Reset on every epoch update. received_aux: HashMap, - /// Estimates of the decision value in all epochs. The first estimated value - /// is provided as input by Common Subset using the `set_input` function - /// which triggers the algorithm to start. - estimated: BTreeMap, + /// The estimate of the decision value in the current epoch. + estimated: Option, + /// The value output by the agreement instance. It is set once to `Some(b)` + /// and then never changed. That is, no instance of Binary Agreement can + /// decide on two different values of output. + output: Option, /// Termination flag. The Agreement instance doesn't terminate immediately /// upon deciding on the agreed value. This is done in order to help other /// nodes decide despite asynchrony of communication. Once the instance @@ -90,7 +93,8 @@ impl Agreement { received_bval: HashMap::new(), sent_bval: BTreeSet::new(), received_aux: HashMap::new(), - estimated: BTreeMap::new(), + estimated: None, + output: None, terminated: false, } } @@ -100,13 +104,14 @@ impl Agreement { self.terminated } + /// Sets the input value for agreement. pub fn set_input(&mut self, input: bool) -> Result { if self.epoch != 0 { return Err(Error::InputNotAccepted); } // Set the initial estimated value to the input value. - self.estimated.insert(self.epoch, input); + self.estimated = Some(input); // Receive the BVAL message locally. self.received_bval .entry(self.uid.clone()) @@ -116,8 +121,9 @@ impl Agreement { Ok(AgreementMessage::BVal((self.epoch, input))) } - pub fn has_input(&self) -> bool { - self.estimated.get(&0).is_some() + /// Acceptance check to be performed before setting the input value. + pub fn accepts_input(&self) -> bool { + self.epoch == 0 && self.estimated.is_none() } /// Receive input from a remote node. @@ -126,7 +132,7 @@ impl Agreement { /// to remote nodes. There can be up to 2 messages. pub fn handle_agreement_message( &mut self, - sender_id: NodeUid, + sender_id: &NodeUid, message: &AgreementMessage, ) -> Result { match *message { @@ -146,11 +152,11 @@ impl Agreement { } } - fn handle_bval(&mut self, sender_id: NodeUid, b: bool) -> Result { + fn handle_bval(&mut self, sender_id: &NodeUid, b: bool) -> Result { let mut outgoing = VecDeque::new(); self.received_bval - .entry(sender_id) + .entry(sender_id.clone()) .or_insert_with(BTreeSet::new) .insert(b); let count_bval = self.received_bval @@ -172,8 +178,8 @@ impl Agreement { self.received_aux.insert(self.uid.clone(), b); } - let coin_result = self.try_coin(); - Ok((coin_result, outgoing)) + self.try_coin(); + Ok((self.output, outgoing)) } // upon receiving BVAL_r(b) messages from f + 1 nodes, if // BVAL_r(b) has not been sent, multicast BVAL_r(b) @@ -190,11 +196,11 @@ impl Agreement { } } - fn handle_aux(&mut self, sender_id: NodeUid, b: bool) -> Result { - self.received_aux.insert(sender_id, b); + fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result { + self.received_aux.insert(sender_id.clone(), b); if !self.bin_values.is_empty() { - let coin_result = self.try_coin(); - Ok((coin_result, VecDeque::new())) + self.try_coin(); + Ok((self.output, VecDeque::new())) } else { Ok((None, VecDeque::new())) } @@ -225,22 +231,24 @@ impl Agreement { /// messages are received, thus this condition may be triggered upon arrival /// of either an AUX_r or a BVAL_r message). /// - /// `try_coin` outputs an optional decision value of the agreement instance. - fn try_coin(&mut self) -> Option { + /// Once the (N - f) messages are received, gets a common coin and uses it + /// to compute the next decision estimate and, optionally, sets the output + /// decision value. + fn try_coin(&mut self) { let (count_aux, vals) = self.count_aux(); if count_aux < self.num_nodes - self.num_faulty_nodes { // Continue waiting for the (N - f) AUX messages. - return None; + return; } // FIXME: Implement the Common Coin algorithm. At the moment the // coin value is common across different nodes but not random. - let coin2 = (self.epoch % 2) == 0; + let coin = (self.epoch % 2) == 0; // Check the termination condition: "continue looping until both a // value b is output in some round r, and the value Coin_r' = b for // some round r' > r." - self.terminated = self.terminated || self.estimated.values().any(|b| *b == coin2); + self.terminated = self.terminated || self.output == Some(coin); // Start the next epoch. self.bin_values.clear(); @@ -248,26 +256,19 @@ impl Agreement { self.epoch += 1; if vals.len() != 1 { - self.estimated.insert(self.epoch, coin2); - return None; + self.estimated = Some(coin); + return; } // NOTE: `vals` has exactly one element due to `vals.len() == 1` - let output: Vec> = vals.into_iter() - .take(1) - .map(|b| { - self.estimated.insert(self.epoch, b); - if b == coin2 { - // Output the agreement value. - Some(b) - } else { - // Don't output a value. - None - } - }) - .collect(); - - output[0] + let v: Vec = vals.into_iter().collect(); + let b = v[0]; + self.estimated = Some(b); + // Setting the output value is allowed only once. + if self.output.is_none() && b == coin { + // Output the agreement value. + self.output = Some(b); + } } } diff --git a/src/common_subset.rs b/src/common_subset.rs index 5bdd0648..cd6fc94f 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -95,7 +95,7 @@ impl CommonSubset { /// BA_j, then provide input 1 to BA_j. See Figure 11. fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result, Error> { if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) { - if !agreement_instance.has_input() { + if agreement_instance.accepts_input() { Ok(Some(agreement_instance.set_input(true)?)) } else { Ok(None) @@ -168,7 +168,7 @@ impl CommonSubset { } else { // Send the message to the agreement instance. agreement_instance - .handle_agreement_message(sender_id.clone(), &amessage) + .handle_agreement_message(sender_id, &amessage) .map_err(Error::from) } } @@ -209,7 +209,7 @@ impl CommonSubset { if results1 >= self.num_nodes - self.num_faulty_nodes { for instance in self.agreement_instances.values_mut() { - if !instance.has_input() { + if instance.accepts_input() { outgoing.push_back(instance.set_input(false)?); } } From 49c33d235746f774527b29d550ace71775ed3aab Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Thu, 10 May 2018 12:10:42 +0100 Subject: [PATCH 12/14] added a dev dependency on rand for CI --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 257ef307..98f477ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,3 +19,4 @@ protoc-rust = "1.4.4" [dev-dependencies] docopt = "0.8" +rand = "0.3" From 68e6a7ae2ddb2c97a56a1d7c907ed6b25137ac4b Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Thu, 10 May 2018 12:33:01 +0100 Subject: [PATCH 13/14] added the missing agreement broadcast message on epoch change --- src/agreement.rs | 41 ++++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/src/agreement.rs b/src/agreement.rs index 4ea3307d..4ccb71c7 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -178,7 +178,7 @@ impl Agreement { self.received_aux.insert(self.uid.clone(), b); } - self.try_coin(); + outgoing.extend(self.try_coin()); Ok((self.output, outgoing)) } // upon receiving BVAL_r(b) messages from f + 1 nodes, if @@ -198,11 +198,12 @@ impl Agreement { fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result { self.received_aux.insert(sender_id.clone(), b); + let mut outgoing = VecDeque::new(); if !self.bin_values.is_empty() { - self.try_coin(); - Ok((self.output, VecDeque::new())) + outgoing.extend(self.try_coin()); + Ok((self.output, outgoing)) } else { - Ok((None, VecDeque::new())) + Ok((None, outgoing)) } } @@ -233,12 +234,13 @@ impl Agreement { /// /// Once the (N - f) messages are received, gets a common coin and uses it /// to compute the next decision estimate and, optionally, sets the output - /// decision value. - fn try_coin(&mut self) { + /// decision value. The function may start the next epoch. In that case, it + /// returns a message for broadcast. + fn try_coin(&mut self) -> VecDeque { let (count_aux, vals) = self.count_aux(); if count_aux < self.num_nodes - self.num_faulty_nodes { // Continue waiting for the (N - f) AUX messages. - return; + return VecDeque::new(); } // FIXME: Implement the Common Coin algorithm. At the moment the @@ -257,18 +259,23 @@ impl Agreement { if vals.len() != 1 { self.estimated = Some(coin); - return; + } else { + // NOTE: `vals` has exactly one element due to `vals.len() == 1` + let v: Vec = vals.into_iter().collect(); + let b = v[0]; + self.estimated = Some(b); + // Setting the output value is allowed only once. + if self.output.is_none() && b == coin { + // Output the agreement value. + self.output = Some(b); + } } - // NOTE: `vals` has exactly one element due to `vals.len() == 1` - let v: Vec = vals.into_iter().collect(); - let b = v[0]; - self.estimated = Some(b); - // Setting the output value is allowed only once. - if self.output.is_none() && b == coin { - // Output the agreement value. - self.output = Some(b); - } + vec![AgreementMessage::BVal(( + self.epoch, + self.estimated.unwrap(), + ))].into_iter() + .collect() } } From 57ff64cce0a50d9ad649ee3fd996322637aeeb8d Mon Sep 17 00:00:00 2001 From: Vladimir Komendantskiy Date: Thu, 10 May 2018 12:44:33 +0100 Subject: [PATCH 14/14] correction: Agreement outputs a value only once --- src/agreement.rs | 43 ++++++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/src/agreement.rs b/src/agreement.rs index 4ccb71c7..195bd36a 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -178,8 +178,9 @@ impl Agreement { self.received_aux.insert(self.uid.clone(), b); } - outgoing.extend(self.try_coin()); - Ok((self.output, outgoing)) + let (decision, maybe_message) = self.try_coin(); + outgoing.extend(maybe_message); + Ok((decision, outgoing)) } // upon receiving BVAL_r(b) messages from f + 1 nodes, if // BVAL_r(b) has not been sent, multicast BVAL_r(b) @@ -200,8 +201,9 @@ impl Agreement { self.received_aux.insert(sender_id.clone(), b); let mut outgoing = VecDeque::new(); if !self.bin_values.is_empty() { - outgoing.extend(self.try_coin()); - Ok((self.output, outgoing)) + let (decision, maybe_message) = self.try_coin(); + outgoing.extend(maybe_message); + Ok((decision, outgoing)) } else { Ok((None, outgoing)) } @@ -233,14 +235,14 @@ impl Agreement { /// of either an AUX_r or a BVAL_r message). /// /// Once the (N - f) messages are received, gets a common coin and uses it - /// to compute the next decision estimate and, optionally, sets the output - /// decision value. The function may start the next epoch. In that case, it + /// to compute the next decision estimate and outputs the optional decision + /// value. The function may start the next epoch. In that case, it also /// returns a message for broadcast. - fn try_coin(&mut self) -> VecDeque { + fn try_coin(&mut self) -> (Option, VecDeque) { let (count_aux, vals) = self.count_aux(); if count_aux < self.num_nodes - self.num_faulty_nodes { // Continue waiting for the (N - f) AUX messages. - return VecDeque::new(); + return (None, VecDeque::new()); } // FIXME: Implement the Common Coin algorithm. At the moment the @@ -257,25 +259,32 @@ impl Agreement { self.received_aux.clear(); self.epoch += 1; - if vals.len() != 1 { + let decision = if vals.len() != 1 { self.estimated = Some(coin); + None } else { // NOTE: `vals` has exactly one element due to `vals.len() == 1` let v: Vec = vals.into_iter().collect(); let b = v[0]; self.estimated = Some(b); - // Setting the output value is allowed only once. + // Outputting a value is allowed only once. if self.output.is_none() && b == coin { // Output the agreement value. self.output = Some(b); + self.output + } else { + None } - } - - vec![AgreementMessage::BVal(( - self.epoch, - self.estimated.unwrap(), - ))].into_iter() - .collect() + }; + + ( + decision, + vec![AgreementMessage::BVal(( + self.epoch, + self.estimated.unwrap(), + ))].into_iter() + .collect(), + ) } }