-
Notifications
You must be signed in to change notification settings - Fork 96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove protoc dep, use new MerkleTree methods, fix Agreement. #32
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ use itertools::Itertools; | |
use std::collections::{BTreeSet, HashMap, VecDeque}; | ||
use std::fmt::Debug; | ||
use std::hash::Hash; | ||
use std::mem; | ||
|
||
use messaging::{DistAlgorithm, Target, TargetedMessage}; | ||
|
||
|
@@ -17,6 +18,15 @@ pub enum AgreementMessage { | |
Aux(u32, bool), | ||
} | ||
|
||
impl AgreementMessage { | ||
fn epoch(&self) -> u32 { | ||
match *self { | ||
AgreementMessage::BVal(epoch, _) => epoch, | ||
AgreementMessage::Aux(epoch, _) => epoch, | ||
} | ||
} | ||
} | ||
|
||
/// Binary Agreement instance | ||
pub struct Agreement<NodeUid> { | ||
/// The UID of the corresponding proposer node. | ||
|
@@ -44,6 +54,9 @@ pub struct Agreement<NodeUid> { | |
/// ever there at all. While the output value will still be required in a later epoch to decide | ||
/// the termination state. | ||
decision: Option<bool>, | ||
/// A cache for messages for future epochs that cannot be handled yet. | ||
// TODO: Find a better solution for this; defend against spam. | ||
incoming_queue: Vec<(NodeUid, AgreementMessage)>, | ||
/// Termination flag. The Agreement instance doesn't terminate immediately | ||
/// upon deciding on the agreed value. This is done in order to help other | ||
/// nodes decide despite asynchrony of communication. Once the instance | ||
|
@@ -71,18 +84,20 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for Agreement<NodeU | |
sender_id: &Self::NodeUid, | ||
message: Self::Message, | ||
) -> Result<(), Self::Error> { | ||
if self.terminated { | ||
return Err(Error::Terminated); | ||
} | ||
if message.epoch() < self.epoch { | ||
return Ok(()); // Message is obsolete: We are already in a later epoch. | ||
} | ||
if message.epoch() > self.epoch { | ||
// Message is for a later epoch. We can't handle that yet. | ||
self.incoming_queue.push((sender_id.clone(), message)); | ||
return Ok(()); | ||
} | ||
match message { | ||
// The algorithm instance has already terminated. | ||
_ if self.terminated => Err(Error::Terminated), | ||
|
||
AgreementMessage::BVal(epoch, b) if epoch == self.epoch => { | ||
self.handle_bval(sender_id, b) | ||
} | ||
|
||
AgreementMessage::Aux(epoch, b) if epoch == self.epoch => self.handle_aux(sender_id, b), | ||
|
||
// Epoch does not match. Ignore the message. | ||
_ => Ok(()), | ||
AgreementMessage::BVal(_, b) => self.handle_bval(sender_id, b), | ||
AgreementMessage::Aux(_, b) => self.handle_aux(sender_id, b), | ||
} | ||
} | ||
|
||
|
@@ -108,7 +123,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash + Ord> DistAlgorithm for Agreement<NodeU | |
} | ||
} | ||
|
||
impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> { | ||
impl<NodeUid: Clone + Debug + Eq + Hash + Ord> Agreement<NodeUid> { | ||
pub fn new(uid: NodeUid, num_nodes: usize) -> Self { | ||
let num_faulty_nodes = (num_nodes - 1) / 3; | ||
|
||
|
@@ -124,6 +139,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> { | |
estimated: None, | ||
output: None, | ||
decision: None, | ||
incoming_queue: Vec::new(), | ||
terminated: false, | ||
messages: VecDeque::new(), | ||
} | ||
|
@@ -134,20 +150,16 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> { | |
if self.epoch != 0 || self.estimated.is_some() { | ||
return Err(Error::InputNotAccepted); | ||
} | ||
if self.num_nodes == 1 { | ||
self.decision = Some(input); | ||
self.output = Some(input); | ||
self.terminated = true; | ||
} | ||
|
||
// Set the initial estimated value to the input value. | ||
self.estimated = Some(input); | ||
// Record the input value as sent. | ||
self.sent_bval.insert(input); | ||
// Receive the BVAL message locally. | ||
self.received_bval | ||
.entry(self.uid.clone()) | ||
.or_insert_with(BTreeSet::new) | ||
.insert(input); | ||
// Multicast BVAL | ||
self.messages | ||
.push_back(AgreementMessage::BVal(self.epoch, input)); | ||
Ok(()) | ||
self.send_bval(input) | ||
} | ||
|
||
/// Acceptance check to be performed before setting the input value. | ||
|
@@ -178,32 +190,35 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> { | |
self.messages | ||
.push_back(AgreementMessage::Aux(self.epoch, b)); | ||
// Receive the AUX message locally. | ||
self.received_aux.insert(self.uid.clone(), b); | ||
let our_uid = self.uid.clone(); | ||
self.handle_aux(&our_uid, b)?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense too. |
||
} | ||
|
||
self.try_coin(); | ||
self.try_coin()?; | ||
} | ||
// upon receiving BVAL_r(b) messages from f + 1 nodes, if | ||
// BVAL_r(b) has not been sent, multicast BVAL_r(b) | ||
else if count_bval == self.num_faulty_nodes + 1 && !self.sent_bval.contains(&b) { | ||
// Record the value `b` as sent. | ||
self.sent_bval.insert(b); | ||
// Receive the BVAL message locally. | ||
self.received_bval | ||
.entry(self.uid.clone()) | ||
.or_insert_with(BTreeSet::new) | ||
.insert(b); | ||
// Multicast BVAL. | ||
self.messages | ||
.push_back(AgreementMessage::BVal(self.epoch, b)); | ||
self.send_bval(b)?; | ||
} | ||
Ok(()) | ||
} | ||
|
||
fn send_bval(&mut self, b: bool) -> Result<(), Error> { | ||
// Record the value `b` as sent. | ||
self.sent_bval.insert(b); | ||
// Multicast BVAL. | ||
self.messages | ||
.push_back(AgreementMessage::BVal(self.epoch, b)); | ||
// Receive the BVAL message locally. | ||
let our_uid = self.uid.clone(); | ||
self.handle_bval(&our_uid, b) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well spotted! |
||
} | ||
|
||
fn handle_aux(&mut self, sender_id: &NodeUid, b: bool) -> Result<(), Error> { | ||
self.received_aux.insert(sender_id.clone(), b); | ||
if !self.bin_values.is_empty() { | ||
self.try_coin(); | ||
self.try_coin()?; | ||
} | ||
Ok(()) | ||
} | ||
|
@@ -237,11 +252,11 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> { | |
/// to compute the next decision estimate and outputs the optional decision | ||
/// value. The function may start the next epoch. In that case, it also | ||
/// returns a message for broadcast. | ||
fn try_coin(&mut self) { | ||
fn try_coin(&mut self) -> Result<(), Error> { | ||
let (count_aux, vals) = self.count_aux(); | ||
if count_aux < self.num_nodes - self.num_faulty_nodes { | ||
// Continue waiting for the (N - f) AUX messages. | ||
return; | ||
return Ok(()); | ||
} | ||
|
||
debug!("{:?} try_coin in epoch {}", self.uid, self.epoch); | ||
|
@@ -255,6 +270,7 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> { | |
self.terminated = self.terminated || self.decision == Some(coin); | ||
if self.terminated { | ||
debug!("Agreement instance {:?} terminated", self.uid); | ||
return Ok(()); | ||
} | ||
|
||
// Start the next epoch. | ||
|
@@ -286,9 +302,12 @@ impl<NodeUid: Clone + Debug + Eq + Hash> Agreement<NodeUid> { | |
}; | ||
|
||
let b = self.estimated.unwrap(); | ||
self.sent_bval.insert(b); | ||
self.messages | ||
.push_back(AgreementMessage::BVal(self.epoch, b)); | ||
self.send_bval(b)?; | ||
let queued_msgs = mem::replace(&mut self.incoming_queue, Vec::new()); | ||
for (sender_id, msg) in queued_msgs { | ||
self.handle_message(&sender_id, msg)?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is one possible place for introducing spam control. I've no idea though how to avoid spam apart from using a set instead of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's true. Basically we just need to store per epoch: who sent us which We still might want to limit the number of future epochs somehow. On the other hand, that wouldn't be strictly correct… There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I created a new issue for this: #43 |
||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be
else if
.