From f6384cf4c7dd510d49d05d6a88d71d493f2f1861 Mon Sep 17 00:00:00 2001 From: woocash2 <59764862+woocash2@users.noreply.github.com> Date: Wed, 30 Aug 2023 14:09:41 +0200 Subject: [PATCH] A0-3137: Split backup into submodules (#338) --- Cargo.lock | 2 +- README.md | 4 +- consensus/Cargo.toml | 2 +- .../{runway/backup.rs => backup/loader.rs} | 134 +++--------------- consensus/src/backup/mod.rs | 16 +++ consensus/src/backup/saver.rs | 102 +++++++++++++ consensus/src/lib.rs | 1 + consensus/src/runway/mod.rs | 4 +- consensus/src/testing/crash_recovery.rs | 2 +- 9 files changed, 142 insertions(+), 125 deletions(-) rename consensus/src/{runway/backup.rs => backup/loader.rs} (81%) create mode 100644 consensus/src/backup/mod.rs create mode 100644 consensus/src/backup/saver.rs diff --git a/Cargo.lock b/Cargo.lock index b0a7f94c..79c1409b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -43,7 +43,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.27.0" +version = "0.28.0" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", diff --git a/README.md b/README.md index 115ff4f0..d4bd1a50 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ ### Overview -AlephBFT is an asynchronous and Byzantine fault tolerant consensus protocol aimed +AlephBFT is an asynchronous and Byzantine fault-tolerant consensus protocol aimed at ordering arbitrary messages (transactions). It has been designed to operate continuously under conditions where there is no bound on message-delivery delay and under the assumption that there is a significant probability of malicious @@ -60,7 +60,7 @@ More details are available [in the book][reference-link-implementation-details]. - Import AlephBFT in your crate ```toml [dependencies] - aleph-bft = "^0.27" + aleph-bft = "^0.28" ``` - The main entry point is the `run_session` function, which returns a Future that runs the consensus algorithm. diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 6bc39b23..629b1fef 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.27.0" +version = "0.28.0" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] diff --git a/consensus/src/runway/backup.rs b/consensus/src/backup/loader.rs similarity index 81% rename from consensus/src/runway/backup.rs rename to consensus/src/backup/loader.rs index dbe2fc7f..15c99bef 100644 --- a/consensus/src/runway/backup.rs +++ b/consensus/src/backup/loader.rs @@ -1,29 +1,18 @@ -use crate::{ - units::{UncheckedSignedUnit, UnitCoord}, - Data, Hasher, Keychain, MultiKeychain, NodeIndex, Receiver, Round, Sender, SessionId, - Terminator, -}; +use std::{collections::HashSet, fmt, fmt::Debug, io::Read, marker::PhantomData}; -use crate::alerts::AlertData; -use codec::{Decode, Encode, Error as CodecError}; -use futures::{channel::oneshot, FutureExt, StreamExt}; +use codec::{Decode, Error as CodecError}; +use futures::channel::oneshot; use itertools::{Either, Itertools}; -use log::{debug, error, info, warn}; -use std::{ - collections::HashSet, - fmt, - fmt::Debug, - io::{Read, Write}, - marker::PhantomData, -}; +use log::{error, info, warn}; -const LOG_TARGET: &str = "AlephBFT-backup"; +use crate::{ + alerts::AlertData, + backup::BackupItem, + units::{UncheckedSignedUnit, UnitCoord}, + Data, Hasher, Keychain, MultiKeychain, NodeIndex, Round, SessionId, +}; -#[derive(Clone, Debug, Decode, Encode, PartialEq)] -pub enum BackupItem { - Unit(UncheckedSignedUnit), - AlertData(AlertData), -} +const LOG_TARGET: &str = "AlephBFT-backup-loader"; /// Backup read error. Could be either caused by io error from `BackupReader`, or by decoding. #[derive(Debug)] @@ -270,110 +259,21 @@ impl BackupLoader { } } -/// Component responsible for saving units and alert data into backup. -/// It waits for items to appear on its receivers, and writes them to backup. -/// It announces a successful write through an appropriate response sender. -pub struct BackupSaver { - units_from_runway: Receiver>, - data_from_alerter: Receiver>, - responses_for_runway: Sender>, - responses_for_alerter: Sender>, - backup: W, -} - -impl BackupSaver { - pub fn new( - units_from_runway: Receiver>, - data_from_alerter: Receiver>, - responses_for_runway: Sender>, - responses_for_alerter: Sender>, - backup: W, - ) -> BackupSaver { - BackupSaver { - units_from_runway, - data_from_alerter, - responses_for_runway, - responses_for_alerter, - backup, - } - } - - pub fn save_item(&mut self, item: BackupItem) -> Result<(), std::io::Error> { - self.backup.write_all(&item.encode())?; - self.backup.flush()?; - Ok(()) - } - - pub async fn run(&mut self, mut terminator: Terminator) { - let mut terminator_exit = false; - loop { - futures::select! { - unit = self.units_from_runway.next() => { - let unit = match unit { - Some(unit) => unit, - None => { - error!(target: LOG_TARGET, "receiver of units to save closed early"); - break; - }, - }; - let item = BackupItem::Unit(unit.clone()); - if let Err(e) = self.save_item(item) { - error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e); - break; - } - if self.responses_for_runway.unbounded_send(unit).is_err() { - error!(target: LOG_TARGET, "couldn't respond with saved unit to runway"); - break; - } - }, - data = self.data_from_alerter.next() => { - let data = match data { - Some(data) => data, - None => { - error!(target: LOG_TARGET, "receiver of alert data to save closed early"); - break; - }, - }; - let item = BackupItem::AlertData(data.clone()); - if let Err(e) = self.save_item(item) { - error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e); - break; - } - if self.responses_for_alerter.unbounded_send(data).is_err() { - error!(target: LOG_TARGET, "couldn't respond with saved alert data to runway"); - break; - } - } - _ = terminator.get_exit().fuse() => { - debug!(target: LOG_TARGET, "backup saver received exit signal."); - terminator_exit = true; - } - } - - if terminator_exit { - debug!(target: LOG_TARGET, "backup saver decided to exit."); - terminator.terminate_sync().await; - break; - } - } - } -} - #[cfg(test)] mod tests { + use codec::Encode; + use futures::channel::oneshot; + + use aleph_bft_mock::{Data, Hasher64, Keychain, Loader, Signature}; + use crate::{ - runway::backup::{BackupItem, BackupLoader}, + backup::{loader::LoadedData, BackupItem, BackupLoader}, units::{ create_units, creator_set, preunit_to_unchecked_signed_unit, preunit_to_unit, UncheckedSignedUnit as GenericUncheckedSignedUnit, }, NodeCount, NodeIndex, Round, SessionId, }; - use aleph_bft_mock::{Data, Hasher64, Keychain, Loader, Signature}; - use codec::Encode; - use futures::channel::oneshot; - - use crate::runway::backup::LoadedData; type UncheckedSignedUnit = GenericUncheckedSignedUnit; type TestBackupItem = BackupItem; diff --git a/consensus/src/backup/mod.rs b/consensus/src/backup/mod.rs new file mode 100644 index 00000000..3c9cb284 --- /dev/null +++ b/consensus/src/backup/mod.rs @@ -0,0 +1,16 @@ +use codec::{Decode, Encode}; +use std::fmt::Debug; + +pub use loader::{BackupLoader, LoadedData}; +pub use saver::BackupSaver; + +use crate::{alerts::AlertData, units::UncheckedSignedUnit, Data, Hasher, MultiKeychain}; + +mod loader; +mod saver; + +#[derive(Clone, Debug, Decode, Encode, PartialEq)] +pub enum BackupItem { + Unit(UncheckedSignedUnit), + AlertData(AlertData), +} diff --git a/consensus/src/backup/saver.rs b/consensus/src/backup/saver.rs new file mode 100644 index 00000000..c24af89e --- /dev/null +++ b/consensus/src/backup/saver.rs @@ -0,0 +1,102 @@ +use crate::{ + units::UncheckedSignedUnit, Data, Hasher, MultiKeychain, Receiver, Sender, Terminator, +}; + +use crate::alerts::AlertData; +use codec::Encode; +use futures::{FutureExt, StreamExt}; + +use crate::backup::BackupItem; +use log::{debug, error}; +use std::io::Write; + +const LOG_TARGET: &str = "AlephBFT-backup-saver"; + +/// Component responsible for saving units and alert data into backup. +/// It waits for items to appear on its receivers, and writes them to backup. +/// It announces a successful write through an appropriate response sender. +pub struct BackupSaver { + units_from_runway: Receiver>, + data_from_alerter: Receiver>, + responses_for_runway: Sender>, + responses_for_alerter: Sender>, + backup: W, +} + +impl BackupSaver { + pub fn new( + units_from_runway: Receiver>, + data_from_alerter: Receiver>, + responses_for_runway: Sender>, + responses_for_alerter: Sender>, + backup: W, + ) -> BackupSaver { + BackupSaver { + units_from_runway, + data_from_alerter, + responses_for_runway, + responses_for_alerter, + backup, + } + } + + pub fn save_item(&mut self, item: BackupItem) -> Result<(), std::io::Error> { + self.backup.write_all(&item.encode())?; + self.backup.flush()?; + Ok(()) + } + + pub async fn run(&mut self, mut terminator: Terminator) { + let mut terminator_exit = false; + loop { + futures::select! { + unit = self.units_from_runway.next() => { + let unit = match unit { + Some(unit) => unit, + None => { + error!(target: LOG_TARGET, "receiver of units to save closed early"); + break; + }, + }; + let item = BackupItem::Unit(unit.clone()); + if let Err(e) = self.save_item(item) { + error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e); + break; + } + if self.responses_for_runway.unbounded_send(unit).is_err() { + error!(target: LOG_TARGET, "couldn't respond with saved unit to runway"); + break; + } + }, + data = self.data_from_alerter.next() => { + let data = match data { + Some(data) => data, + None => { + error!(target: LOG_TARGET, "receiver of alert data to save closed early"); + break; + }, + }; + let item = BackupItem::AlertData(data.clone()); + if let Err(e) = self.save_item(item) { + error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e); + break; + } + if self.responses_for_alerter.unbounded_send(data).is_err() { + error!(target: LOG_TARGET, "couldn't respond with saved alert data to runway"); + break; + } + } + _ = terminator.get_exit().fuse() => { + debug!(target: LOG_TARGET, "backup saver received exit signal."); + terminator_exit = true; + } + } + + if terminator_exit { + debug!(target: LOG_TARGET, "backup saver decided to exit."); + terminator.terminate_sync().await; + break; + } + } + } +} diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 5a6903e5..8958d441 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -14,6 +14,7 @@ mod terminal; mod terminator; mod units; +mod backup; mod task_queue; #[cfg(test)] mod testing; diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index d83f1728..21233204 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -26,12 +26,10 @@ use std::{ time::Duration, }; -mod backup; mod collection; mod packer; -use crate::runway::backup::{BackupLoader, BackupSaver, LoadedData}; -pub use backup::BackupItem; +use crate::backup::{BackupLoader, BackupSaver, LoadedData}; #[cfg(feature = "initial_unit_collection")] use collection::{Collection, IO as CollectionIO}; pub use collection::{NewestUnitResponse, Salt}; diff --git a/consensus/src/testing/crash_recovery.rs b/consensus/src/testing/crash_recovery.rs index 936831ed..731d1caa 100644 --- a/consensus/src/testing/crash_recovery.rs +++ b/consensus/src/testing/crash_recovery.rs @@ -1,5 +1,5 @@ use crate::{ - runway::BackupItem, + backup::BackupItem, testing::{init_log, spawn_honest_member, HonestMember, Network, ReconnectSender}, units::UnitCoord, NodeCount, NodeIndex, SpawnHandle, TaskHandle,