Skip to content

Commit

Permalink
Backup saving in alerter
Browse files Browse the repository at this point in the history
  • Loading branch information
woocash2 committed Aug 18, 2023
1 parent e8429d7 commit 149ab46
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 50 deletions.
69 changes: 44 additions & 25 deletions consensus/src/alerts/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::{
alerts::{
handler::Handler, Alert, AlertMessage, AlerterResponse, ForkingNotification, NetworkMessage,
},
Data, Hasher, MultiKeychain, Multisigned, NodeCount, Receiver, Recipient, Sender, Terminator,
runway::BackupItem,
Data, Hasher, MultiKeychain, NodeCount, Receiver, Recipient, Sender, Terminator,
};
use aleph_bft_rmc::{DoublingDelayScheduler, Message as RmcMessage, ReliableMulticast};
use futures::{channel::mpsc, FutureExt, StreamExt};
Expand All @@ -20,6 +21,8 @@ pub struct Service<H: Hasher, D: Data, MK: MultiKeychain> {
messages_for_rmc: Sender<RmcMessage<H::Hash, MK::Signature, MK::PartialMultisignature>>,
messages_from_rmc: Receiver<RmcMessage<H::Hash, MK::Signature, MK::PartialMultisignature>>,
keychain: MK,
items_for_backup: Sender<BackupItem<H, D, MK>>,
responses_from_backup: Receiver<BackupItem<H, D, MK>>,
exiting: bool,
}

Expand All @@ -31,6 +34,8 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
notifications_for_units: Sender<ForkingNotification<H, D, MK::Signature>>,
alerts_from_units: Receiver<Alert<H, D, MK::Signature>>,
n_members: NodeCount,
items_for_backup: Sender<BackupItem<H, D, MK>>,
responses_from_backup: Receiver<BackupItem<H, D, MK>>,
) -> Service<H, D, MK> {
let (messages_for_rmc, messages_from_us) = mpsc::unbounded();
let (messages_for_us, messages_from_rmc) = mpsc::unbounded();
Expand All @@ -52,6 +57,8 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
messages_for_rmc,
messages_from_rmc,
keychain,
items_for_backup,
responses_from_backup,
exiting: false,
}
}
Expand Down Expand Up @@ -110,7 +117,7 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
) {
match handler.on_message(message) {
Ok(Some(AlerterResponse::ForkAlert(alert, recipient))) => {
self.send_message_for_network(AlertMessage::ForkAlert(alert), recipient);
self.send_item_to_backup(BackupItem::NetworkAlert(alert, recipient));
}
Ok(Some(AlerterResponse::AlertRequest(hash, peer))) => {
let message = AlertMessage::AlertRequest(self.keychain.index(), hash);
Expand All @@ -137,31 +144,36 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
}
}

fn handle_alert_from_runway(
fn handle_response_from_backup(
&mut self,
handler: &mut Handler<H, D, MK>,
alert: Alert<H, D, MK::Signature>,
) {
let (message, recipient, hash) = handler.on_own_alert(alert);
self.send_message_for_network(message, recipient);
self.rmc.start_rmc(hash);
}

fn handle_message_from_rmc(
&mut self,
message: RmcMessage<H::Hash, MK::Signature, MK::PartialMultisignature>,
response: BackupItem<H, D, MK>,
) {
self.rmc_message_to_network(message)
match response {
BackupItem::OwnAlert(alert) => {
let (message, recipient, hash) = handler.on_own_alert(alert);
self.send_message_for_network(message, recipient);
self.rmc.start_rmc(hash);
}
BackupItem::NetworkAlert(alert, recipient) => {
self.send_message_for_network(AlertMessage::ForkAlert(alert), recipient);
}
BackupItem::MultiSignature(multisigned) => match handler.alert_confirmed(multisigned) {
Ok(notification) => self.send_notification_for_units(notification),
Err(error) => warn!(target: LOG_TARGET, "{}", error),
},
_ => {}
}
}

fn handle_multisigned(
&mut self,
handler: &mut Handler<H, D, MK>,
multisigned: Multisigned<H::Hash, MK>,
) {
match handler.alert_confirmed(multisigned) {
Ok(notification) => self.send_notification_for_units(notification),
Err(error) => warn!(target: LOG_TARGET, "{}", error),
fn send_item_to_backup(&mut self, item: BackupItem<H, D, MK>) {
if self.items_for_backup.unbounded_send(item).is_err() {
warn!(
target: LOG_TARGET,
"{:?} Channel for passing items to backup was closed",
self.keychain.index()
);
self.exiting = true;
}
}

Expand All @@ -176,20 +188,27 @@ impl<H: Hasher, D: Data, MK: MultiKeychain> Service<H, D, MK> {
}
},
alert = self.alerts_from_units.next() => match alert {
Some(alert) => self.handle_alert_from_runway(&mut handler, alert),
Some(alert) => self.send_item_to_backup(BackupItem::OwnAlert(alert)),
None => {
error!(target: LOG_TARGET, "{:?} Alert stream closed.", self.keychain.index());
break;
}
},
message = self.messages_from_rmc.next() => match message {
Some(message) => self.handle_message_from_rmc(message),
Some(message) => self.rmc_message_to_network(message),
None => {
error!(target: LOG_TARGET, "{:?} RMC message stream closed.", self.keychain.index());
break;
}
},
multisigned = self.rmc.next_multisigned_hash().fuse() => self.handle_multisigned(&mut handler, multisigned),
multisigned = self.rmc.next_multisigned_hash().fuse() => self.send_item_to_backup(BackupItem::MultiSignature(multisigned)),
response = self.responses_from_backup.next() => match response {
Some(item) => self.handle_response_from_backup(&mut handler, item),
None => {
error!(target: LOG_TARGET, "Receiver of responses from backup closed");
break;
}
},
_ = terminator.get_exit().fuse() => {
debug!(target: LOG_TARGET, "{:?} received exit signal", self.keychain.index());
self.exiting = true;
Expand Down
15 changes: 10 additions & 5 deletions consensus/src/runway/backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use crate::{
alerts::Alert,
units::{UncheckedSignedUnit, UnitCoord},
Data, Hasher, MultiKeychain, Multisigned, NodeIndex, Receiver, Round, Sender, SessionId,
Terminator,
Terminator, UncheckedSigned,
};
use aleph_bft_types::Recipient;
use codec::{Decode, Encode, Error as CodecError};
use futures::{channel::oneshot, FutureExt, StreamExt};
use log::{debug, error, info, warn};
Expand All @@ -20,7 +21,11 @@ const LOG_TARGET: &str = "AlephBFT-backup";
#[derive(Clone, Debug, Decode, Encode, PartialEq)]
pub enum BackupItem<H: Hasher, D: Data, MK: MultiKeychain> {
Unit(UncheckedSignedUnit<H, D, MK::Signature>),
Alert(Alert<H, D, MK::Signature>),
OwnAlert(Alert<H, D, MK::Signature>),
NetworkAlert(
UncheckedSigned<Alert<H, D, MK::Signature>, MK::Signature>,
Recipient,
),
MultiSignature(Multisigned<H::Hash, MK>),
}

Expand Down Expand Up @@ -267,9 +272,9 @@ pub async fn run_loading_mechanism<'a, H: Hasher, D: Data, MK: MultiKeychain, R:
}
}

/// A task responsible for saving units into backup.
/// It waits for units to appear in `backup_units_from_runway`, and writes them to backup.
/// It announces a successful write through `backup_units_for_runway`.
/// A task responsible for saving units, alerts and multi signatures into backup.
/// It waits for items to appear in `incoming_backup_items`, and writes them to backup.
/// It announces a successful write through `outgoing_units_for_runway` or `outgoing_items_for_alerter`.
pub async fn run_saving_mechanism<'a, H: Hasher, D: Data, MK: MultiKeychain, W: Write>(
mut backup_writer: BackupWriter<W, H, D, MK>,
mut incoming_backup_items: Receiver<BackupItem<H, D, MK>>,
Expand Down
42 changes: 22 additions & 20 deletions consensus/src/runway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -876,9 +876,22 @@ pub(crate) async fn run<H, D, US, UL, MK, DP, FH, SH>(
MK: MultiKeychain,
SH: SpawnHandle,
{
let (tx_consensus, consensus_stream) = mpsc::unbounded();
let (consensus_sink, rx_consensus) = mpsc::unbounded();
let (ordered_batch_tx, ordered_batch_rx) = mpsc::unbounded();
let (backup_items_for_saver, incoming_backup_items) = mpsc::unbounded();
let (backup_units_for_runway, backup_units_from_saver) = mpsc::unbounded();
let (backup_items_for_alerter, backup_items_from_saver) = mpsc::unbounded();

let backup_saver_terminator = terminator.add_offspring_connection("AlephBFT-backup-saver");
let backup_saver_handle = spawn_handle.spawn_essential("runway/backup_saver", async move {
backup::run_saving_mechanism(
runway_io.backup_writer,
incoming_backup_items,
backup_units_for_runway,
backup_items_for_alerter,
backup_saver_terminator,
)
.await;
});
let mut backup_saver_handle = backup_saver_handle.fuse();

let (alert_notifications_for_units, notifications_from_alerter) = mpsc::unbounded();
let (alerts_for_alerter, alerts_from_units) = mpsc::unbounded();
Expand All @@ -898,6 +911,8 @@ pub(crate) async fn run<H, D, US, UL, MK, DP, FH, SH>(
alert_notifications_for_units,
alerts_from_units,
alert_config.n_members,
backup_items_for_saver.clone(),
backup_items_from_saver,
);
let alerter_handler = crate::alerts::Handler::new(alerter_keychain, alert_config);

Expand All @@ -908,6 +923,10 @@ pub(crate) async fn run<H, D, US, UL, MK, DP, FH, SH>(
});
let mut alerter_handle = alerter_handle.fuse();

let (tx_consensus, consensus_stream) = mpsc::unbounded();
let (consensus_sink, rx_consensus) = mpsc::unbounded();
let (ordered_batch_tx, ordered_batch_rx) = mpsc::unbounded();

let consensus_terminator = terminator.add_offspring_connection("AlephBFT-consensus");
let consensus_config = config.clone();
let consensus_spawner = spawn_handle.clone();
Expand All @@ -927,23 +946,6 @@ pub(crate) async fn run<H, D, US, UL, MK, DP, FH, SH>(
});
let mut consensus_handle = consensus_handle.fuse();

let (backup_items_for_saver, incoming_backup_items) = mpsc::unbounded();
let (backup_units_for_runway, backup_units_from_saver) = mpsc::unbounded();
let (backup_items_for_alerter, _backup_items_from_alerter) = mpsc::unbounded(); // todo: make use of backup in alerter

let backup_saver_terminator = terminator.add_offspring_connection("AlephBFT-backup-saver");
let backup_saver_handle = spawn_handle.spawn_essential("runway/backup_saver", async move {
backup::run_saving_mechanism(
runway_io.backup_writer,
incoming_backup_items,
backup_units_for_runway,
backup_items_for_alerter,
backup_saver_terminator,
)
.await;
});
let mut backup_saver_handle = backup_saver_handle.fuse();

let index = keychain.index();
let threshold = (keychain.node_count() * 2) / 3 + NodeCount(1);
let validator = Validator::new(
Expand Down
3 changes: 3 additions & 0 deletions consensus/src/testing/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl TestCase {
let (alerts_for_alerter, alerts_from_units) = mpsc::unbounded();
let (exit_alerter, exit) = oneshot::channel();
let n_members = keychain.node_count();
let (dummy_backup_tx, dummy_backup_rx) = mpsc::unbounded();

let mut alerter_service = Service::new(
keychain,
Expand All @@ -223,6 +224,8 @@ impl TestCase {
notifications_for_units,
alerts_from_units,
n_members,
dummy_backup_tx,
dummy_backup_rx, // todo: include the actual backup service in the test
);
let alerter_handler = Handler::new(
keychain,
Expand Down

0 comments on commit 149ab46

Please sign in to comment.