diff --git a/src/agreement.rs b/src/agreement.rs index 95b7655d..ae4806a3 100644 --- a/src/agreement.rs +++ b/src/agreement.rs @@ -74,6 +74,11 @@ impl Agreement { let mut outgoing = VecDeque::new(); match *message { + _ if self.terminated => { + // The algorithm instance has already terminated. + Err(Error::Terminated) + } + AgreementMessage::BVal((epoch, b)) if epoch == self.epoch => { update_map_of_sets(&mut self.received_bval, uid, b); let count_bval = self.received_bval.iter().fold(0, |count, (_, values)| { @@ -113,7 +118,7 @@ impl Agreement { } } - AgreementMessage::Aux((_epoch, b)) => { + AgreementMessage::Aux((epoch, b)) if epoch == self.epoch => { update_map_of_sets(&mut self.received_aux, uid, b); if !self.bin_values.is_empty() { let coin_result = self.try_coin(); @@ -231,5 +236,6 @@ where #[derive(Clone, Debug)] pub enum Error { + Terminated, NotImplemented, } diff --git a/src/common_subset.rs b/src/common_subset.rs index 25aaa798..35968a4e 100644 --- a/src/common_subset.rs +++ b/src/common_subset.rs @@ -17,6 +17,8 @@ use proto::{AgreementMessage, BroadcastMessage}; // TODO: Make this a generic argument of `Broadcast`. type ProposedValue = Vec; +// Type of output from the Common Subset message handler. +type CommonSubsetOutput = (Option>, VecDeque>); /// Input from a remote node to Common Subset. pub enum Input { @@ -27,9 +29,6 @@ pub enum Input { } /// Output from Common Subset to remote nodes. -/// -/// FIXME: We can do an interface that doesn't need this type and instead works -/// directly with the `TargetBroadcastMessage` and `AgreementMessage`. pub enum Output { /// A broadcast message to be sent to the destination set in the /// `TargetedBroadcastMessage`. @@ -46,6 +45,8 @@ pub struct CommonSubset { broadcast_instances: HashMap>, agreement_instances: HashMap>, broadcast_results: HashMap, + /// FIXME: The result may be a set of bool rather than a single bool due to + /// the ability of Agreement to output multiple values. agreement_results: HashMap, } @@ -100,10 +101,7 @@ impl CommonSubset { /// Upon delivery of v_j from RBC_j, if input has not yet been provided to /// BA_j, then provide input 1 to BA_j. See Figure 11. - pub fn on_broadcast_result( - &mut self, - uid: &NodeUid, - ) -> Result, Error> { + fn on_broadcast_result(&mut self, uid: &NodeUid) -> Result, Error> { if let Some(agreement_instance) = self.agreement_instances.get_mut(&uid) { if !agreement_instance.has_input() { Ok(Some(agreement_instance.set_input(true))) @@ -115,20 +113,22 @@ impl CommonSubset { } } - /// Receive input from a remote node. + /// Receive input from a remote node. The output contains an optional result + /// of the Common Subset algorithm - a set of proposed values - and a queue + /// of messages to be sent to remote nodes, or an error. pub fn on_input( &mut self, message: Input, - ) -> Result>, Error> { + ) -> Result, Error> { match message { Input::Broadcast(uid, bmessage) => { let mut instance_result = None; - let input_result = { + let input_result: Result>, Error> = { if let Some(broadcast_instance) = self.broadcast_instances.get(&uid) { broadcast_instance .handle_broadcast_message(&uid, bmessage) - .map(|(value, queue)| { - instance_result = value; + .map(|(opt_value, queue)| { + instance_result = opt_value; queue.into_iter().map(Output::Broadcast).collect() }) .map_err(Error::from) @@ -136,17 +136,24 @@ impl CommonSubset { Err(Error::NoSuchBroadcastInstance) } }; - if instance_result.is_some() { - self.on_broadcast_result(&uid)?; + let mut opt_message: Option = None; + if let Some(value) = instance_result { + self.broadcast_results.insert(uid.clone(), value); + opt_message = self.on_broadcast_result(&uid)?; } - input_result + input_result.map(|mut queue| { + if let Some(agreement_message) = opt_message { + // Append the message to agreement nodes to the common output queue. + queue.push_back(Output::Agreement(agreement_message)) + } + (None, queue) + }) } Input::Agreement(uid, amessage) => { // The result defaults to error. let mut result = Err(Error::NoSuchAgreementInstance); - // FIXME: send the message to the Agreement instance and if let Some(mut agreement_instance) = self.agreement_instances.get_mut(&uid) { // Optional output of agreement and outgoing agreement // messages to remote nodes. @@ -154,6 +161,7 @@ impl CommonSubset { // This instance has terminated and does not accept input. Ok((None, VecDeque::new())) } else { + // Send the message to the agreement instance. agreement_instance .on_input(uid.clone(), &amessage) .map_err(Error::from) @@ -164,11 +172,15 @@ impl CommonSubset { if let Some(b) = output { outgoing.append(&mut self.on_agreement_result(uid, b)); } - Ok(outgoing.into_iter().map(Output::Agreement).collect()) + Ok(( + self.try_agreement_completion(), + outgoing.into_iter().map(Output::Agreement).collect(), + )) } else { // error - result - .map(|(_, messages)| messages.into_iter().map(Output::Agreement).collect()) + result.map(|(_, messages)| { + (None, messages.into_iter().map(Output::Agreement).collect()) + }) } } } @@ -176,25 +188,20 @@ impl CommonSubset { /// Callback to be invoked on receipt of a returned value of the Agreement /// instance `uid`. - /// - /// FIXME: It is likely that only one `AgreementMessage` is required because - /// Figure 11 does not count the number of messages but the number of nodes - /// that sent messages. fn on_agreement_result(&mut self, uid: NodeUid, result: bool) -> VecDeque { let mut outgoing = VecDeque::new(); // Upon delivery of value 1 from at least N − f instances of BA, provide // input 0 to each instance of BA that has not yet been provided input. if result { self.agreement_results.insert(uid, result); - let results1: Vec = self.agreement_results - .iter() - .map(|(_, v)| *v) - .filter(|b| *b) - .collect(); + // The number of instances of BA that output 1. + let results1: usize = + self.agreement_results + .iter() + .fold(0, |count, (_, v)| if *v { count + 1 } else { count }); - if results1.len() >= self.num_nodes - self.num_faulty_nodes { - let instances = &mut self.agreement_instances; - for (_uid0, instance) in instances.iter_mut() { + if results1 >= self.num_nodes - self.num_faulty_nodes { + for instance in self.agreement_instances.values_mut() { if !instance.has_input() { outgoing.push_back(instance.set_input(false)); } @@ -204,24 +211,19 @@ impl CommonSubset { outgoing } - pub fn on_agreement_completion(&self) -> Option> { + fn try_agreement_completion(&self) -> Option> { // Once all instances of BA have completed, let C ⊂ [1..N] be // the indexes of each BA that delivered 1. Wait for the output // v_j for each RBC_j such that j∈C. Finally output ∪ j∈C v_j. - let instance_uids: HashSet = self.agreement_instances - .iter() - .map(|(k, _)| k.clone()) - .collect(); - let completed_uids: HashSet = self.agreement_results + if self.agreement_instances .iter() - .map(|(k, _)| k.clone()) - .collect(); - if instance_uids == completed_uids { - // All instances of Agreement that delivered `true`. - let delivered_1: HashSet = self.agreement_results + .all(|(_, instance)| instance.terminated()) + { + // All instances of Agreement that delivered `true` (or "1" in the paper). + let delivered_1: HashSet<&NodeUid> = self.agreement_results .iter() .filter(|(_, v)| **v) - .map(|(k, _)| k.clone()) + .map(|(k, _)| k) .collect(); // Results of Broadcast instances in `delivered_1` let broadcast_results: HashSet = self.broadcast_results