diff --git a/consensus/src/alerts/service.rs b/consensus/src/alerts/service.rs
index 3373e78f..c29bed3b 100644
--- a/consensus/src/alerts/service.rs
+++ b/consensus/src/alerts/service.rs
@@ -2,7 +2,8 @@ use crate::{
     alerts::{
         handler::Handler, Alert, AlertMessage, AlerterResponse, ForkingNotification, NetworkMessage,
     },
-    Data, Hasher, MultiKeychain, Multisigned, NodeCount, Receiver, Recipient, Sender, Terminator,
+    runway::BackupItem,
+    Data, Hasher, MultiKeychain, NodeCount, Receiver, Recipient, Sender, Terminator,
 };
 use aleph_bft_rmc::{DoublingDelayScheduler, Message as RmcMessage, ReliableMulticast};
 use futures::{channel::mpsc, FutureExt, StreamExt};
@@ -20,6 +21,8 @@ pub struct Service {
     messages_for_rmc: Sender>,
     messages_from_rmc: Receiver>,
     keychain: MK,
+    items_for_backup: Sender>,
+    responses_from_backup: Receiver>,
     exiting: bool,
 }
 
@@ -31,6 +34,8 @@ impl Service {
         notifications_for_units: Sender>,
         alerts_from_units: Receiver>,
         n_members: NodeCount,
+        items_for_backup: Sender>,
+        responses_from_backup: Receiver>,
     ) -> Service {
         let (messages_for_rmc, messages_from_us) = mpsc::unbounded();
         let (messages_for_us, messages_from_rmc) = mpsc::unbounded();
@@ -52,6 +57,8 @@ impl Service {
             messages_for_rmc,
             messages_from_rmc,
             keychain,
+            items_for_backup,
+            responses_from_backup,
             exiting: false,
         }
     }
@@ -110,7 +117,7 @@ impl Service {
     ) {
         match handler.on_message(message) {
             Ok(Some(AlerterResponse::ForkAlert(alert, recipient))) => {
-                self.send_message_for_network(AlertMessage::ForkAlert(alert), recipient);
+                self.send_item_to_backup(BackupItem::NetworkAlert(alert, recipient));
             }
             Ok(Some(AlerterResponse::AlertRequest(hash, peer))) => {
                 let message = AlertMessage::AlertRequest(self.keychain.index(), hash);
@@ -137,31 +144,42 @@ impl Service {
         }
     }
 
-    fn handle_alert_from_runway(
+    /// Handles an item received from the backup (via `responses_from_backup`):
+    /// own alerts are sent out and their RMC is started, network alerts are forwarded,
+    /// and multisignatures are confirmed in the handler, notifying units on success.
+    fn handle_response_from_backup(
         &mut self,
         handler: &mut Handler,
-        alert: Alert,
-    ) {
-        let (message, recipient, hash) = handler.on_own_alert(alert);
-        self.send_message_for_network(message, recipient);
-        self.rmc.start_rmc(hash);
-    }
-
-    fn handle_message_from_rmc(
-        &mut self,
-        message: RmcMessage,
+        response: BackupItem,
     ) {
-        self.rmc_message_to_network(message)
+        match response {
+            BackupItem::OwnAlert(alert) => {
+                let (message, recipient, hash) = handler.on_own_alert(alert);
+                self.send_message_for_network(message, recipient);
+                self.rmc.start_rmc(hash);
+            }
+            BackupItem::NetworkAlert(alert, recipient) => {
+                self.send_message_for_network(AlertMessage::ForkAlert(alert), recipient);
+            }
+            BackupItem::MultiSignature(multisigned) => match handler.alert_confirmed(multisigned) {
+                Ok(notification) => self.send_notification_for_units(notification),
+                Err(error) => warn!(target: LOG_TARGET, "{}", error),
+            },
+            // Units are not handled here; listed explicitly so the match stays exhaustive.
+            BackupItem::Unit(_) => {}
+        }
     }
 
-    fn handle_multisigned(
-        &mut self,
-        handler: &mut Handler,
-        multisigned: Multisigned,
-    ) {
-        match handler.alert_confirmed(multisigned) {
-            Ok(notification) => self.send_notification_for_units(notification),
-            Err(error) => warn!(target: LOG_TARGET, "{}", error),
+    /// Passes `item` to the backup through `items_for_backup`.
+    /// If that channel is closed, logs a warning and marks the service as exiting.
+    fn send_item_to_backup(&mut self, item: BackupItem) {
+        if self.items_for_backup.unbounded_send(item).is_err() {
+            warn!(
+                target: LOG_TARGET,
+                "{:?} Channel for passing items to backup was closed",
+                self.keychain.index()
+            );
+            self.exiting = true;
         }
     }
 
@@ -176,20 +194,27 @@ impl Service {
                     }
                 },
                 alert = self.alerts_from_units.next() => match alert {
-                    Some(alert) => self.handle_alert_from_runway(&mut handler, alert),
+                    Some(alert) => self.send_item_to_backup(BackupItem::OwnAlert(alert)),
                     None => {
                         error!(target: LOG_TARGET, "{:?} Alert stream closed.", self.keychain.index());
                         break;
                     }
                 },
                 message = self.messages_from_rmc.next() => match message {
-                    Some(message) => self.handle_message_from_rmc(message),
+                    Some(message) => self.rmc_message_to_network(message),
                     None => {
                         error!(target: LOG_TARGET, "{:?} RMC message stream closed.", self.keychain.index());
                         break;
                     }
                 },
-                multisigned = self.rmc.next_multisigned_hash().fuse() => self.handle_multisigned(&mut handler, multisigned),
+                multisigned = self.rmc.next_multisigned_hash().fuse() => self.send_item_to_backup(BackupItem::MultiSignature(multisigned)),
+                response = self.responses_from_backup.next() => match response {
+                    Some(item) => self.handle_response_from_backup(&mut handler, item),
+                    None => {
+                        error!(target: LOG_TARGET, "{:?} Receiver of responses from backup closed", self.keychain.index());
+                        break;
+                    }
+                },
                 _ = terminator.get_exit().fuse() => {
                     debug!(target: LOG_TARGET, "{:?} received exit signal", self.keychain.index());
                     self.exiting = true;
diff --git a/consensus/src/runway/backup.rs b/consensus/src/runway/backup.rs
index 7f2f3f55..91590e5f 100644
--- a/consensus/src/runway/backup.rs
+++ b/consensus/src/runway/backup.rs
@@ -2,8 +2,9 @@ use crate::{
     alerts::Alert,
     units::{UncheckedSignedUnit, UnitCoord},
     Data, Hasher, MultiKeychain, Multisigned, NodeIndex, Receiver, Round, Sender, SessionId,
-    Terminator,
+    Terminator, UncheckedSigned,
 };
+use aleph_bft_types::Recipient;
 use codec::{Decode, Encode, Error as CodecError};
 use futures::{channel::oneshot, FutureExt, StreamExt};
 use log::{debug, error, info, warn};
@@ -20,7 +21,11 @@ const LOG_TARGET: &str = "AlephBFT-backup";
 #[derive(Clone, Debug, Decode, Encode, PartialEq)]
 pub enum BackupItem {
     Unit(UncheckedSignedUnit),
-    Alert(Alert),
+    OwnAlert(Alert),
+    NetworkAlert(
+        UncheckedSigned, MK::Signature>,
+        Recipient,
+    ),
     MultiSignature(Multisigned),
 }
 
@@ -267,9 +272,9 @@ pub async fn run_loading_mechanism<'a, H: Hasher, D: Data, MK: MultiKeychain, R:
     }
 }
 
-/// A task responsible for saving units into backup.
-/// It waits for units to appear in `backup_units_from_runway`, and writes them to backup.
-/// It announces a successful write through `backup_units_for_runway`.
+/// A task responsible for saving units, alerts and multi signatures into backup.
+/// It waits for items to appear in `incoming_backup_items`, and writes them to backup.
+/// It announces a successful write through `outgoing_units_for_runway` or `outgoing_items_for_alerter`.
 pub async fn run_saving_mechanism<'a, H: Hasher, D: Data, MK: MultiKeychain, W: Write>(
     mut backup_writer: BackupWriter,
     mut incoming_backup_items: Receiver>,
diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs
index 24e622eb..3eff51ea 100644
--- a/consensus/src/runway/mod.rs
+++ b/consensus/src/runway/mod.rs
@@ -876,9 +876,22 @@ pub(crate) async fn run(
     MK: MultiKeychain,
     SH: SpawnHandle,
 {
-    let (tx_consensus, consensus_stream) = mpsc::unbounded();
-    let (consensus_sink, rx_consensus) = mpsc::unbounded();
-    let (ordered_batch_tx, ordered_batch_rx) = mpsc::unbounded();
+    let (backup_items_for_saver, incoming_backup_items) = mpsc::unbounded();
+    let (backup_units_for_runway, backup_units_from_saver) = mpsc::unbounded();
+    let (backup_items_for_alerter, backup_items_from_saver) = mpsc::unbounded();
+
+    let backup_saver_terminator = terminator.add_offspring_connection("AlephBFT-backup-saver");
+    let backup_saver_handle = spawn_handle.spawn_essential("runway/backup_saver", async move {
+        backup::run_saving_mechanism(
+            runway_io.backup_writer,
+            incoming_backup_items,
+            backup_units_for_runway,
+            backup_items_for_alerter,
+            backup_saver_terminator,
+        )
+        .await;
+    });
+    let mut backup_saver_handle = backup_saver_handle.fuse();
 
     let (alert_notifications_for_units, notifications_from_alerter) = mpsc::unbounded();
     let (alerts_for_alerter, alerts_from_units) = mpsc::unbounded();
@@ -898,6 +911,8 @@ pub(crate) async fn run(
         alert_notifications_for_units,
         alerts_from_units,
         alert_config.n_members,
+        backup_items_for_saver.clone(),
+        backup_items_from_saver,
     );
     let alerter_handler = crate::alerts::Handler::new(alerter_keychain, alert_config);
 
@@ -908,6 +923,10 @@ pub(crate) async fn run(
     });
     let mut alerter_handle = alerter_handle.fuse();
 
+    let (tx_consensus, consensus_stream) = mpsc::unbounded();
+    let (consensus_sink, rx_consensus) = mpsc::unbounded();
+    let (ordered_batch_tx, ordered_batch_rx) = mpsc::unbounded();
+
     let consensus_terminator = terminator.add_offspring_connection("AlephBFT-consensus");
     let consensus_config = config.clone();
     let consensus_spawner = spawn_handle.clone();
@@ -927,23 +946,6 @@ pub(crate) async fn run(
     });
     let mut consensus_handle = consensus_handle.fuse();
 
-    let (backup_items_for_saver, incoming_backup_items) = mpsc::unbounded();
-    let (backup_units_for_runway, backup_units_from_saver) = mpsc::unbounded();
-    let (backup_items_for_alerter, _backup_items_from_alerter) = mpsc::unbounded(); // todo: make use of backup in alerter
-
-    let backup_saver_terminator = terminator.add_offspring_connection("AlephBFT-backup-saver");
-    let backup_saver_handle = spawn_handle.spawn_essential("runway/backup_saver", async move {
-        backup::run_saving_mechanism(
-            runway_io.backup_writer,
-            incoming_backup_items,
-            backup_units_for_runway,
-            backup_items_for_alerter,
-            backup_saver_terminator,
-        )
-        .await;
-    });
-    let mut backup_saver_handle = backup_saver_handle.fuse();
-
     let index = keychain.index();
     let threshold = (keychain.node_count() * 2) / 3 + NodeCount(1);
     let validator = Validator::new(
diff --git a/consensus/src/testing/alerts.rs b/consensus/src/testing/alerts.rs
index 04b0a5ad..cfd3ef9b 100644
--- a/consensus/src/testing/alerts.rs
+++ b/consensus/src/testing/alerts.rs
@@ -215,6 +215,7 @@ impl TestCase {
         let (alerts_for_alerter, alerts_from_units) = mpsc::unbounded();
         let (exit_alerter, exit) = oneshot::channel();
         let n_members = keychain.node_count();
+        let (dummy_backup_tx, dummy_backup_rx) = mpsc::unbounded();
 
         let mut alerter_service = Service::new(
             keychain,
@@ -223,6 +224,8 @@ impl TestCase {
             notifications_for_units,
             alerts_from_units,
             n_members,
+            dummy_backup_tx,
+            dummy_backup_rx, // todo: include the actual backup service in the test
         );
         let alerter_handler = Handler::new(
             keychain,