Skip to content

Commit

Permalink
A0-3185: split rmc (#347)
Browse files Browse the repository at this point in the history
Split rmc into synchronous handler and service with single async method.
  • Loading branch information
woocash2 authored Oct 13, 2023
1 parent 47be40d commit ccb0a3a
Show file tree
Hide file tree
Showing 16 changed files with 1,034 additions and 718 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ More details are available [in the book][reference-link-implementation-details].
- Import AlephBFT in your crate
```toml
[dependencies]
aleph-bft = "^0.30"
aleph-bft = "^0.31"
```
- The main entry point is the `run_session` function, which returns a Future that runs the
consensus algorithm.
Expand Down
4 changes: 2 additions & 2 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.30.0"
version = "0.31.0"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand All @@ -13,7 +13,7 @@ readme = "../README.md"
description = "AlephBFT is an asynchronous and Byzantine fault tolerant consensus protocol aimed at ordering arbitrary messages (transactions). It has been designed to continuously operate even in the harshest conditions: with no bounds on message-delivery delays and in the presence of malicious actors. This makes it an excellent fit for blockchain-related applications."

[dependencies]
aleph-bft-rmc = { path = "../rmc", version = "0.10" }
aleph-bft-rmc = { path = "../rmc", version = "0.11" }
aleph-bft-types = { path = "../types", version = "0.10" }
anyhow = "1.0"
async-trait = "0.1"
Expand Down
12 changes: 6 additions & 6 deletions consensus/src/alerts/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,13 @@ mod tests {
use crate::{
alerts::{
handler::{Error, Handler, RmcResponse},
Alert, AlertMessage, ForkProof, ForkingNotification, RmcMessage,
Alert, AlertMessage, ForkProof, ForkingNotification,
},
units::{ControlHash, FullUnit, PreUnit},
PartiallyMultisigned, Recipient, Round,
};
use aleph_bft_mock::{Data, Hasher64, Keychain, Signature};
use aleph_bft_rmc::Message;
use aleph_bft_types::{NodeCount, NodeIndex, NodeMap, Signable, Signed};

type TestForkProof = ForkProof<Hasher64, Data, Signature>;
Expand Down Expand Up @@ -375,8 +376,7 @@ mod tests {
let alert_hash = Signable::hash(&alert);
let signed_alert_hash =
Signed::sign_with_index(alert_hash, &alerter_keychain).into_unchecked();
let response =
this.on_rmc_message(alerter_index, RmcMessage::SignedHash(signed_alert_hash));
let response = this.on_rmc_message(alerter_index, Message::SignedHash(signed_alert_hash));
assert_eq!(
response,
RmcResponse::AlertRequest(alert_hash, Recipient::Node(alerter_index),),
Expand Down Expand Up @@ -460,7 +460,7 @@ mod tests {
empty_alert_hash,
)),
);
let message = RmcMessage::MultisignedHash(multisigned_empty_alert_hash.into_unchecked());
let message = Message::MultisignedHash(multisigned_empty_alert_hash.into_unchecked());
assert_eq!(
this.on_rmc_message(other_honest_node, message.clone()),
RmcResponse::RmcMessage(message),
Expand Down Expand Up @@ -489,7 +489,7 @@ mod tests {
&keychains[double_committer.0],
);
}
let message = RmcMessage::MultisignedHash(multisigned_nonempty_alert_hash.into_unchecked());
let message = Message::MultisignedHash(multisigned_nonempty_alert_hash.into_unchecked());
assert_eq!(
this.on_network_alert(signed_nonempty_alert),
Err(Error::RepeatedAlert(double_committer, forker_index)),
Expand Down Expand Up @@ -547,7 +547,7 @@ mod tests {
&keychains[double_committer.0],
);
}
let message = RmcMessage::MultisignedHash(multisigned_nonempty_alert_hash.into_unchecked());
let message = Message::MultisignedHash(multisigned_nonempty_alert_hash.into_unchecked());
assert_eq!(
this.on_network_alert(signed_nonempty_alert),
Err(Error::RepeatedAlert(double_committer, forker_index)),
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod handler;
mod service;

pub use handler::Handler;
pub use service::Service;
pub use service::{Service, IO};

pub type ForkProof<H, D, S> = (UncheckedSignedUnit<H, D, S>, UncheckedSignedUnit<H, D, S>);

Expand Down
133 changes: 59 additions & 74 deletions consensus/src/alerts/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,69 +5,73 @@ use crate::{
},
Data, Hasher, MultiKeychain, Multisigned, NodeIndex, Receiver, Recipient, Sender,
};
use aleph_bft_rmc::{DoublingDelayScheduler, Message as RmcMessage, ReliableMulticast};
use aleph_bft_rmc::{DoublingDelayScheduler, Message as RmcMessage};
use aleph_bft_types::Terminator;
use futures::{channel::mpsc, FutureExt, StreamExt};
use futures::{FutureExt, StreamExt};
use log::{debug, error, warn};
use std::{collections::HashMap, time};
use std::{collections::HashMap, time::Duration};

const LOG_TARGET: &str = "AlephBFT-alerter";
type RmcService<H, MK, S, M> =
aleph_bft_rmc::Service<H, MK, DoublingDelayScheduler<RmcMessage<H, S, M>>>;

pub struct Service<H: Hasher, D: Data, MK: MultiKeychain> {
messages_for_network: Sender<(NetworkMessage<H, D, MK>, Recipient)>,
messages_from_network: Receiver<NetworkMessage<H, D, MK>>,
notifications_for_units: Sender<ForkingNotification<H, D, MK::Signature>>,
alerts_from_units: Receiver<Alert<H, D, MK::Signature>>,
rmc: ReliableMulticast<H::Hash, MK>,
messages_for_rmc: Sender<RmcMessage<H::Hash, MK::Signature, MK::PartialMultisignature>>,
messages_from_rmc: Receiver<RmcMessage<H::Hash, MK::Signature, MK::PartialMultisignature>>,

data_for_backup: Sender<AlertData<H, D, MK>>,
responses_from_backup: Receiver<AlertData<H, D, MK>>,

own_alert_responses: HashMap<H::Hash, OnOwnAlertResponse<H, D, MK>>,
network_alert_responses: HashMap<H::Hash, OnNetworkAlertResponse<H, D, MK>>,
multisigned_notifications: HashMap<H::Hash, ForkingNotification<H, D, MK::Signature>>,

node_index: NodeIndex,
exiting: bool,
handler: Handler<H, D, MK>,
rmc_service: RmcService<H::Hash, MK, MK::Signature, MK::PartialMultisignature>,
}

pub struct IO<H: Hasher, D: Data, MK: MultiKeychain> {
pub messages_for_network: Sender<(NetworkMessage<H, D, MK>, Recipient)>,
pub messages_from_network: Receiver<NetworkMessage<H, D, MK>>,
pub notifications_for_units: Sender<ForkingNotification<H, D, MK::Signature>>,
pub alerts_from_units: Receiver<Alert<H, D, MK::Signature>>,
pub data_for_backup: Sender<AlertData<H, D, MK>>,
pub responses_from_backup: Receiver<AlertData<H, D, MK>>,
}

impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
pub fn new(
keychain: MK,
messages_for_network: Sender<(NetworkMessage<H, D, MK>, Recipient)>,
messages_from_network: Receiver<NetworkMessage<H, D, MK>>,
notifications_for_units: Sender<ForkingNotification<H, D, MK::Signature>>,
alerts_from_units: Receiver<Alert<H, D, MK::Signature>>,
data_for_backup: Sender<AlertData<H, D, MK>>,
responses_from_backup: Receiver<AlertData<H, D, MK>>,
) -> Service<H, D, MK> {
let (messages_for_rmc, messages_from_us) = mpsc::unbounded();
let (messages_for_us, messages_from_rmc) = mpsc::unbounded();
pub fn new(keychain: MK, io: IO<H, D, MK>, handler: Handler<H, D, MK>) -> Service<H, D, MK> {
let IO {
messages_for_network,
messages_from_network,
notifications_for_units,
alerts_from_units,
data_for_backup,
responses_from_backup,
} = io;

let rmc = ReliableMulticast::new(
messages_from_us,
messages_for_us,
keychain.clone(),
DoublingDelayScheduler::new(time::Duration::from_millis(500)),
let node_index = keychain.index();
let rmc_handler = aleph_bft_rmc::Handler::new(keychain);
let rmc_service = aleph_bft_rmc::Service::new(
DoublingDelayScheduler::new(Duration::from_millis(500)),
rmc_handler,
);

Service {
messages_for_network,
messages_from_network,
notifications_for_units,
alerts_from_units,
rmc,
messages_for_rmc,
messages_from_rmc,
data_for_backup,
responses_from_backup,
own_alert_responses: HashMap::new(),
network_alert_responses: HashMap::new(),
multisigned_notifications: HashMap::new(),
node_index: keychain.index(),
node_index,
exiting: false,
handler,
rmc_service,
}
}

Expand Down Expand Up @@ -118,11 +122,10 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {

fn handle_message_from_network(
&mut self,
handler: &mut Handler<H, D, MK>,
message: AlertMessage<H, D, MK::Signature, MK::PartialMultisignature>,
) {
match message {
AlertMessage::ForkAlert(alert) => match handler.on_network_alert(alert.clone()) {
AlertMessage::ForkAlert(alert) => match self.handler.on_network_alert(alert.clone()) {
Ok(response) => {
let alert = alert.as_signable().clone();
self.network_alert_responses.insert(alert.hash(), response);
Expand All @@ -140,14 +143,10 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
Err(error) => debug!(target: LOG_TARGET, "{}", error),
},
AlertMessage::RmcMessage(sender, message) => {
match handler.on_rmc_message(sender, message) {
match self.handler.on_rmc_message(sender, message) {
RmcResponse::RmcMessage(message) => {
if self.messages_for_rmc.unbounded_send(message).is_err() {
warn!(
target: LOG_TARGET,
"Channel with messages for rmc should be open.",
);
self.exiting = true;
if let Some(multisigned) = self.rmc_service.process_message(message) {
self.handle_multisigned(multisigned);
}
}
RmcResponse::AlertRequest(hash, recipient) => {
Expand All @@ -157,21 +156,19 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
RmcResponse::Noop => {}
}
}
AlertMessage::AlertRequest(node, hash) => match handler.on_alert_request(node, hash) {
Ok((alert, recipient)) => {
self.send_message_for_network(AlertMessage::ForkAlert(alert), recipient);
AlertMessage::AlertRequest(node, hash) => {
match self.handler.on_alert_request(node, hash) {
Ok((alert, recipient)) => {
self.send_message_for_network(AlertMessage::ForkAlert(alert), recipient);
}
Err(error) => debug!(target: LOG_TARGET, "{}", error),
}
Err(error) => debug!(target: LOG_TARGET, "{}", error),
},
}
}
}

fn handle_alert_from_runway(
&mut self,
handler: &mut Handler<H, D, MK>,
alert: Alert<H, D, MK::Signature>,
) {
let response = handler.on_own_alert(alert.clone());
fn handle_alert_from_runway(&mut self, alert: Alert<H, D, MK::Signature>) {
let response = self.handler.on_own_alert(alert.clone());
self.own_alert_responses.insert(alert.hash(), response);
if self
.data_for_backup
Expand All @@ -182,19 +179,8 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
}
}

fn handle_message_from_rmc(
&mut self,
message: RmcMessage<H::Hash, MK::Signature, MK::PartialMultisignature>,
) {
self.rmc_message_to_network(message)
}

fn handle_multisigned(
&mut self,
handler: &mut Handler<H, D, MK>,
multisigned: Multisigned<H::Hash, MK>,
) {
match handler.alert_confirmed(multisigned.clone()) {
fn handle_multisigned(&mut self, multisigned: Multisigned<H::Hash, MK>) {
match self.handler.alert_confirmed(multisigned.clone()) {
Ok(notification) => {
self.multisigned_notifications
.insert(*multisigned.as_signable(), notification);
Expand All @@ -218,14 +204,18 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
AlertData::OwnAlert(alert) => match self.own_alert_responses.remove(&alert.hash()) {
Some((message, recipient, hash)) => {
self.send_message_for_network(message, recipient);
self.rmc.start_rmc(hash);
if let Some(multisigned) = self.rmc_service.start_rmc(hash) {
self.handle_multisigned(multisigned);
}
}
None => warn!(target: LOG_TARGET, "Alert response missing from storage."),
},
AlertData::NetworkAlert(alert) => {
match self.network_alert_responses.remove(&alert.hash()) {
Some((maybe_notification, hash)) => {
self.rmc.start_rmc(hash);
if let Some(multisigned) = self.rmc_service.start_rmc(hash) {
self.handle_multisigned(multisigned);
}
if let Some(notification) = maybe_notification {
self.send_notification_for_units(notification);
}
Expand All @@ -251,31 +241,26 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
}
}

pub async fn run(&mut self, mut handler: Handler<H, D, MK>, mut terminator: Terminator) {
pub async fn run(&mut self, mut terminator: Terminator) {
loop {
futures::select! {
message = self.messages_from_network.next() => match message {
Some(message) => self.handle_message_from_network(&mut handler, message),
Some(message) => self.handle_message_from_network(message),
None => {
error!(target: LOG_TARGET, "Message stream closed.");
break;
}
},
alert = self.alerts_from_units.next() => match alert {
Some(alert) => self.handle_alert_from_runway(&mut handler, alert),
Some(alert) => self.handle_alert_from_runway(alert),
None => {
error!(target: LOG_TARGET, "Alert stream closed.");
break;
}
},
message = self.messages_from_rmc.next() => match message {
Some(message) => self.handle_message_from_rmc(message),
None => {
error!(target: LOG_TARGET, "RMC message stream closed.");
break;
}
message = self.rmc_service.next_message().fuse() => {
self.rmc_message_to_network(message);
},
multisigned = self.rmc.next_multisigned_hash().fuse() => self.handle_multisigned(&mut handler, multisigned),
item = self.responses_from_backup.next() => match item {
Some(item) => self.handle_data_from_backup(item),
None => {
Expand Down
Loading

0 comments on commit ccb0a3a

Please sign in to comment.