From 1173cf678fd111e832b286f34d45260cea7fffed Mon Sep 17 00:00:00 2001 From: timorleph <145755355+timorleph@users.noreply.github.com> Date: Fri, 29 Nov 2024 14:32:21 +0100 Subject: [PATCH] Move response logic out of runway (#496) --- Cargo.lock | 2 +- consensus/Cargo.toml | 2 +- consensus/src/dissemination/mod.rs | 25 ++ consensus/src/dissemination/responder.rs | 338 +++++++++++++++++++++++ consensus/src/extension/election.rs | 34 ++- consensus/src/extension/extender.rs | 8 +- consensus/src/extension/units.rs | 26 +- consensus/src/lib.rs | 1 + consensus/src/member.rs | 8 +- consensus/src/runway/collection.rs | 12 +- consensus/src/runway/mod.rs | 121 ++------ consensus/src/units/testing.rs | 65 +++-- 12 files changed, 487 insertions(+), 155 deletions(-) create mode 100644 consensus/src/dissemination/mod.rs create mode 100644 consensus/src/dissemination/responder.rs diff --git a/Cargo.lock b/Cargo.lock index 645ebdb1..ac297e40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.38.1" +version = "0.38.2" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 75bd3dbf..819c8983 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.38.1" +version = "0.38.2" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] diff --git a/consensus/src/dissemination/mod.rs b/consensus/src/dissemination/mod.rs new file mode 100644 index 00000000..81a788cf --- /dev/null +++ b/consensus/src/dissemination/mod.rs @@ -0,0 +1,25 @@ +use crate::{ + runway::{NewestUnitResponse, Salt}, + units::{UncheckedSignedUnit, UnitCoord}, + Data, Hasher, NodeIndex, Signature, UncheckedSigned, +}; + +mod responder; + +pub use responder::Responder; + +/// Possible requests for information from other nodes. +#[derive(Debug)] +pub enum Request { + Coord(UnitCoord), + Parents(H::Hash), + NewestUnit(NodeIndex, Salt), +} + +/// Responses to requests. +#[derive(Debug)] +pub enum Response { + Coord(UncheckedSignedUnit), + Parents(H::Hash, Vec>), + NewestUnit(UncheckedSigned, S>), +} diff --git a/consensus/src/dissemination/responder.rs b/consensus/src/dissemination/responder.rs new file mode 100644 index 00000000..ae01602e --- /dev/null +++ b/consensus/src/dissemination/responder.rs @@ -0,0 +1,338 @@ +use crate::{ + dag::DagUnit, + dissemination::{Request, Response}, + runway::{NewestUnitResponse, Salt}, + units::{UnitCoord, UnitStore, UnitWithParents, WrappedUnit}, + Data, Hasher, MultiKeychain, NodeIndex, Signed, +}; +use std::marker::PhantomData; +use thiserror::Error; + +/// A responder that is able to answer requests for data about units. +pub struct Responder { + keychain: MK, + _phantom: PhantomData<(H, D)>, +} + +/// Ways in which it can be impossible for us to respond to a request. +#[derive(Eq, Error, Debug, PartialEq)] +pub enum Error { + #[error("no canonical unit at {0}")] + NoCanonicalAt(UnitCoord), + #[error("unit with hash {0:?} not known")] + UnknownUnit(H::Hash), +} + +impl Responder { + /// Create a new responder. + pub fn new(keychain: MK) -> Self { + Responder { + keychain, + _phantom: PhantomData, + } + } + + fn index(&self) -> NodeIndex { + self.keychain.index() + } + + fn on_request_coord( + &self, + coord: UnitCoord, + units: &UnitStore>, + ) -> Result, Error> { + units + .canonical_unit(coord) + .map(|unit| Response::Coord(unit.clone().unpack().into())) + .ok_or(Error::NoCanonicalAt(coord)) + } + + fn on_request_parents( + &self, + hash: H::Hash, + units: &UnitStore>, + ) -> Result, Error> { + units + .unit(&hash) + .map(|unit| { + let parents = unit + .parents() + .values() + .map(|parent_hash| { + units + .unit(parent_hash) + .expect("Units are added to the store in order.") + .clone() + .unpack() + .into_unchecked() + }) + .collect(); + Response::Parents(hash, parents) + }) + .ok_or(Error::UnknownUnit(hash)) + } + + fn on_request_newest( + &self, + requester: NodeIndex, + salt: Salt, + units: &UnitStore>, + ) -> Response { + let unit = units + .canonical_units(requester) + .last() + .map(|unit| unit.clone().unpack().into_unchecked()); + let response = NewestUnitResponse::new(requester, self.index(), unit, salt); + + let signed_response = Signed::sign(response, &self.keychain).into_unchecked(); + Response::NewestUnit(signed_response) + } + + /// Handle an incoming request returning either the appropriate response or an error if we + /// aren't able to help. + pub fn handle_request( + &self, + request: Request, + units: &UnitStore>, + ) -> Result, Error> { + use Request::*; + match request { + Coord(coord) => self.on_request_coord(coord, units), + Parents(hash) => self.on_request_parents(hash, units), + NewestUnit(node_id, salt) => Ok(self.on_request_newest(node_id, salt, units)), + } + } +} + +#[cfg(test)] +mod test { + use crate::{ + dissemination::{ + responder::{Error, Responder}, + Request, Response, + }, + units::{ + random_full_parent_reconstrusted_units_up_to, TestingDagUnit, Unit, UnitCoord, + UnitStore, UnitWithParents, WrappedUnit, + }, + NodeCount, NodeIndex, + }; + use aleph_bft_mock::{Data, Hasher64, Keychain}; + use std::iter::zip; + + const NODE_ID: NodeIndex = NodeIndex(0); + const NODE_COUNT: NodeCount = NodeCount(7); + + fn setup() -> ( + Responder, + UnitStore, + Vec, + ) { + let keychains = Keychain::new_vec(NODE_COUNT); + ( + Responder::new(keychains[NODE_ID.0]), + UnitStore::new(NODE_COUNT), + keychains, + ) + } + + #[test] + fn empty_fails_to_respond_to_coords() { + let (responder, store, _) = setup(); + let coord = UnitCoord::new(0, NodeIndex(1)); + let request = Request::Coord(coord); + match responder.handle_request(request, &store) { + Ok(response) => panic!("Unexpected response: {:?}.", response), + Err(err) => assert_eq!(err, Error::NoCanonicalAt(coord)), + } + } + + #[test] + fn empty_fails_to_respond_to_parents() { + let (responder, store, keychains) = setup(); + let session_id = 2137; + let hash = + random_full_parent_reconstrusted_units_up_to(1, NODE_COUNT, session_id, &keychains) + .last() + .expect("just created this round") + .last() + .expect("the round has at least one unit") + .hash(); + let request = Request::Parents(hash); + match responder.handle_request(request, &store) { + Ok(response) => panic!("Unexpected response: {:?}.", response), + Err(err) => assert_eq!(err, Error::UnknownUnit(hash)), + } + } + + #[test] + fn empty_newest_responds_with_no_units() { + let (responder, store, keychains) = setup(); + let requester = NodeIndex(1); + let request = Request::NewestUnit(requester, rand::random()); + let response = responder + .handle_request(request, &store) + .expect("newest unit requests always get a response"); + match response { + Response::NewestUnit(newest_unit_response) => { + let checked_newest_unit_response = newest_unit_response + .check(&keychains[NODE_ID.0]) + .expect("should sign correctly"); + assert_eq!( + checked_newest_unit_response.as_signable().requester(), + requester + ); + assert!(checked_newest_unit_response + .as_signable() + .included_data() + .is_empty()); + } + other => panic!("Unexpected response: {:?}.", other), + } + } + + #[test] + fn responds_to_coords_when_possible() { + let (responder, mut store, keychains) = setup(); + let session_id = 2137; + let coord = UnitCoord::new(3, NodeIndex(1)); + let units = random_full_parent_reconstrusted_units_up_to( + coord.round() + 1, + NODE_COUNT, + session_id, + &keychains, + ); + for round_units in &units { + for unit in round_units { + store.insert(unit.clone()); + } + } + let request = Request::Coord(coord); + let response = responder + .handle_request(request, &store) + .expect("should successfully respond"); + match response { + Response::Coord(unit) => assert_eq!( + unit, + units[coord.round() as usize][coord.creator().0] + .clone() + .unpack() + .into_unchecked() + ), + other => panic!("Unexpected response: {:?}.", other), + } + } + + #[test] + fn fails_to_responds_to_too_new_coords() { + let (responder, mut store, keychains) = setup(); + let session_id = 2137; + let coord = UnitCoord::new(3, NodeIndex(1)); + let units = random_full_parent_reconstrusted_units_up_to( + coord.round() - 1, + NODE_COUNT, + session_id, + &keychains, + ); + for round_units in &units { + for unit in round_units { + store.insert(unit.clone()); + } + } + let request = Request::Coord(coord); + match responder.handle_request(request, &store) { + Ok(response) => panic!("Unexpected response: {:?}.", response), + Err(err) => assert_eq!(err, Error::NoCanonicalAt(coord)), + } + } + + #[test] + fn responds_to_parents_when_possible() { + let (responder, mut store, keychains) = setup(); + let session_id = 2137; + let units = + random_full_parent_reconstrusted_units_up_to(5, NODE_COUNT, session_id, &keychains); + for round_units in &units { + for unit in round_units { + store.insert(unit.clone()); + } + } + let requested_unit = units + .last() + .expect("just created this round") + .last() + .expect("the round has at least one unit") + .clone(); + let request = Request::Parents(requested_unit.hash()); + let response = responder + .handle_request(request, &store) + .expect("should successfully respond"); + match response { + Response::Parents(response_hash, parents) => { + assert_eq!(response_hash, requested_unit.hash()); + assert_eq!(parents.len(), requested_unit.parents().size().0); + for (parent, parent_hash) in zip(parents, requested_unit.parents().values()) { + assert_eq!(&parent.as_signable().hash(), parent_hash); + } + } + other => panic!("Unexpected response: {:?}.", other), + } + } + + #[test] + fn fails_to_respond_to_unknown_parents() { + let (responder, mut store, keychains) = setup(); + let session_id = 2137; + let units = + random_full_parent_reconstrusted_units_up_to(5, NODE_COUNT, session_id, &keychains); + for round_units in &units { + for unit in round_units { + store.insert(unit.clone()); + } + } + let hash = + random_full_parent_reconstrusted_units_up_to(1, NODE_COUNT, session_id, &keychains) + .last() + .expect("just created this round") + .last() + .expect("the round has at least one unit") + .hash(); + let request = Request::Parents(hash); + match responder.handle_request(request, &store) { + Ok(response) => panic!("Unexpected response: {:?}.", response), + Err(err) => assert_eq!(err, Error::UnknownUnit(hash)), + } + } + + #[test] + fn responds_to_existing_newest() { + let (responder, mut store, keychains) = setup(); + let session_id = 2137; + let units = + random_full_parent_reconstrusted_units_up_to(5, NODE_COUNT, session_id, &keychains); + for round_units in &units { + for unit in round_units { + store.insert(unit.clone()); + } + } + let requester = NodeIndex(1); + let request = Request::NewestUnit(requester, rand::random()); + let response = responder + .handle_request(request, &store) + .expect("newest unit requests always get a response"); + match response { + Response::NewestUnit(newest_unit_response) => { + let checked_newest_unit_response = newest_unit_response + .check(&keychains[NODE_ID.0]) + .expect("should sign correctly"); + assert_eq!( + checked_newest_unit_response.as_signable().requester(), + requester + ); + // unfortunately there is no easy way to check whether the response contains a unit + // with its API :/ + } + other => panic!("Unexpected response: {:?}.", other), + } + } +} diff --git a/consensus/src/extension/election.rs b/consensus/src/extension/election.rs index f409676d..348fd31b 100644 --- a/consensus/src/extension/election.rs +++ b/consensus/src/extension/election.rs @@ -222,6 +222,7 @@ mod test { }, NodeCount, }; + use aleph_bft_mock::Keychain; #[test] fn refuses_to_elect_without_units() { @@ -235,9 +236,10 @@ mod test { let n_members = NodeCount(4); let max_round = 2; let session_id = 2137; - for round_units in - random_full_parent_reconstrusted_units_up_to(max_round, n_members, session_id) - { + let keychains = Keychain::new_vec(n_members); + for round_units in random_full_parent_reconstrusted_units_up_to( + max_round, n_members, session_id, &keychains, + ) { for unit in round_units { units.add_unit(unit); } @@ -252,7 +254,10 @@ mod test { let n_members = NodeCount(4); let max_round = 4; let session_id = 2137; - let dag = random_full_parent_reconstrusted_units_up_to(max_round, n_members, session_id); + let keychains = Keychain::new_vec(n_members); + let dag = random_full_parent_reconstrusted_units_up_to( + max_round, n_members, session_id, &keychains, + ); for round_units in dag.iter().take(4) { for unit in round_units { units.add_unit(unit.clone()); @@ -280,9 +285,10 @@ mod test { let n_members = NodeCount(4); let max_round = 4; let session_id = 2137; - for round_units in - random_full_parent_reconstrusted_units_up_to(max_round, n_members, session_id) - { + let keychains = Keychain::new_vec(n_members); + for round_units in random_full_parent_reconstrusted_units_up_to( + max_round, n_members, session_id, &keychains, + ) { for unit in round_units { units.add_unit(unit.clone()); } @@ -303,9 +309,11 @@ mod test { let n_members = NodeCount(4); let max_round = 4; let session_id = 2137; - for unit in random_full_parent_reconstrusted_units_up_to(0, n_members, session_id) - .last() - .expect("just created") + let keychains = Keychain::new_vec(n_members); + for unit in + random_full_parent_reconstrusted_units_up_to(0, n_members, session_id, &keychains) + .last() + .expect("just created") { units.add_unit(unit.clone()); } @@ -332,7 +340,11 @@ mod test { .into_iterator() .filter(|node_id| node_id != &inactive_node) { - units.add_unit(random_reconstructed_unit_with_parents(creator, &parents)); + units.add_unit(random_reconstructed_unit_with_parents( + creator, + &parents, + &keychains[creator.0], + )); } } let election = RoundElection::for_round(0, &units).expect("we have enough rounds"); diff --git a/consensus/src/extension/extender.rs b/consensus/src/extension/extender.rs index 222f15f6..6ea920db 100644 --- a/consensus/src/extension/extender.rs +++ b/consensus/src/extension/extender.rs @@ -74,6 +74,7 @@ mod test { extension::extender::Extender, units::random_full_parent_reconstrusted_units_up_to, NodeCount, Round, }; + use aleph_bft_mock::Keychain; #[test] fn easy_elections() { @@ -81,10 +82,11 @@ mod test { let n_members = NodeCount(4); let max_round: Round = 43; let session_id = 2137; + let keychains = Keychain::new_vec(n_members); let mut batches = Vec::new(); - for round_units in - random_full_parent_reconstrusted_units_up_to(max_round, n_members, session_id) - { + for round_units in random_full_parent_reconstrusted_units_up_to( + max_round, n_members, session_id, &keychains, + ) { for unit in round_units { batches.append(&mut extender.add_unit(unit)); } diff --git a/consensus/src/extension/units.rs b/consensus/src/extension/units.rs index 4ede12ab..ca2b42f6 100644 --- a/consensus/src/extension/units.rs +++ b/consensus/src/extension/units.rs @@ -86,6 +86,7 @@ mod test { units::{random_full_parent_reconstrusted_units_up_to, TestingDagUnit, Unit}, NodeCount, }; + use aleph_bft_mock::Keychain; #[test] fn initially_empty() { @@ -99,7 +100,10 @@ mod test { let mut units = Units::new(); let n_members = NodeCount(4); let session_id = 2137; - let unit = &random_full_parent_reconstrusted_units_up_to(0, n_members, session_id)[0][0]; + let keychains = Keychain::new_vec(n_members); + let unit = + &random_full_parent_reconstrusted_units_up_to(0, n_members, session_id, &keychains)[0] + [0]; units.add_unit(unit.clone()); assert_eq!(units.highest_round(), 0); assert_eq!(units.in_round(0), Some(vec![unit])); @@ -112,11 +116,13 @@ mod test { let n_members = NodeCount(4); let max_round = 43; let session_id = 2137; + let keychains = Keychain::new_vec(n_members); let mut heads = Vec::new(); - for (round, round_units) in - random_full_parent_reconstrusted_units_up_to(max_round, n_members, session_id) - .into_iter() - .enumerate() + for (round, round_units) in random_full_parent_reconstrusted_units_up_to( + max_round, n_members, session_id, &keychains, + ) + .into_iter() + .enumerate() { heads.push(round_units[round % n_members.0].clone()); for unit in round_units { @@ -138,11 +144,13 @@ mod test { let n_members = NodeCount(4); let max_round = 43; let session_id = 2137; + let keychains = Keychain::new_vec(n_members); let mut heads = Vec::new(); - for (round, round_units) in - random_full_parent_reconstrusted_units_up_to(max_round, n_members, session_id) - .into_iter() - .enumerate() + for (round, round_units) in random_full_parent_reconstrusted_units_up_to( + max_round, n_members, session_id, &keychains, + ) + .into_iter() + .enumerate() { heads.push(round_units[round % n_members.0].clone()); for unit in &round_units { diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index a8cb66b1..0260fdd6 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -6,6 +6,7 @@ mod alerts; mod config; mod creation; mod dag; +mod dissemination; mod extension; mod member; mod network; diff --git a/consensus/src/member.rs b/consensus/src/member.rs index a7f6ebdf..1681ce56 100644 --- a/consensus/src/member.rs +++ b/consensus/src/member.rs @@ -1,10 +1,10 @@ use crate::{ + dissemination::{Request, Response}, handle_task_termination, member::Task::{CoordRequest, ParentsRequest, RequestNewest, UnitBroadcast}, network::{Hub as NetworkHub, NetworkData}, runway::{ - self, NetworkIO, NewestUnitResponse, Request, Response, RunwayIO, RunwayNotificationIn, - RunwayNotificationOut, + self, NetworkIO, NewestUnitResponse, RunwayIO, RunwayNotificationIn, RunwayNotificationOut, }, task_queue::TaskQueue, units::{UncheckedSignedUnit, Unit, UnitCoord}, @@ -497,7 +497,7 @@ where RunwayNotificationOut::Request(request) => match request { Request::Coord(coord) => self.on_request_coord(coord), Request::Parents(u_hash) => self.on_request_parents(u_hash), - Request::NewestUnit(salt) => self.on_request_newest(salt), + Request::NewestUnit(_, salt) => self.on_request_newest(salt), }, RunwayNotificationOut::Response(response, recipient) => match response { Response::Coord(u) => { @@ -552,7 +552,7 @@ where Request::Parents(u_hash) => { self.not_resolved_parents.remove(&u_hash); }, - Request::NewestUnit(_) => { + Request::NewestUnit(..) => { self.newest_unit_resolved = true; } }, diff --git a/consensus/src/runway/collection.rs b/consensus/src/runway/collection.rs index 8de52016..fb3c952a 100644 --- a/consensus/src/runway/collection.rs +++ b/consensus/src/runway/collection.rs @@ -154,6 +154,10 @@ impl<'a, MK: Keychain> Collection<'a, MK> { ) } + fn index(&self) -> NodeIndex { + self.keychain.index() + } + /// Process a response to a newest unit request. pub fn on_newest_response( &mut self, @@ -243,10 +247,10 @@ impl<'a, H: Hasher, D: Data, MK: Keychain> IO<'a, H, D, MK> { if self.round_for_creator.send(round).is_err() { error!(target: "AlephBFT-runway", "unable to send starting round to creator"); } - if let Err(e) = self - .resolved_requests - .unbounded_send(Request::NewestUnit(self.collection.salt())) - { + if let Err(e) = self.resolved_requests.unbounded_send(Request::NewestUnit( + self.collection.index(), + self.collection.salt(), + )) { warn!(target: "AlephBFT-runway", "unable to send resolved request: {}", e); } info!(target: "AlephBFT-runway", "Finished initial unit collection with status: {:?}", self.collection.status()); diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index 45daf01e..cbdc6234 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -2,15 +2,16 @@ use crate::{ alerts::{Alert, ForkingNotification, NetworkMessage}, creation, dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest}, + dissemination::{Request, Responder, Response}, extension::Ordering, handle_task_termination, member::UnitMessage, units::{ - SignedUnit, UncheckedSignedUnit, Unit, UnitCoord, UnitStore, UnitStoreStatus, - UnitWithParents, Validator, WrappedUnit, + SignedUnit, UncheckedSignedUnit, Unit, UnitCoord, UnitStore, UnitStoreStatus, Validator, + WrappedUnit, }, Config, Data, DataProvider, Hasher, Index, Keychain, MultiKeychain, NodeIndex, Receiver, Round, - Sender, Signature, Signed, SpawnHandle, Terminator, UncheckedSigned, + Sender, Signature, SpawnHandle, Terminator, UncheckedSigned, }; use aleph_bft_types::{Recipient, UnitFinalizationHandler}; use futures::{ @@ -37,19 +38,6 @@ use crate::backup::{BackupLoader, BackupSaver}; use collection::{Collection, IO as CollectionIO}; pub use collection::{NewestUnitResponse, Salt}; -/// Possible requests for information from other nodes. -pub enum Request { - Coord(UnitCoord), - Parents(H::Hash), - NewestUnit(Salt), -} - -pub(crate) enum Response { - Coord(UncheckedSignedUnit), - Parents(H::Hash, Vec>), - NewestUnit(UncheckedSigned, S>), -} - pub(crate) enum RunwayNotificationOut { /// A new unit was generated by this runway NewSelfUnit(UncheckedSignedUnit), @@ -84,7 +72,7 @@ impl TryFrom> RunwayNotificationIn::Response(Response::Parents(u_hash, parents)) } UnitMessage::RequestNewest(node_id, salt) => { - RunwayNotificationIn::Request(Request::NewestUnit(salt), node_id) + RunwayNotificationIn::Request(Request::NewestUnit(node_id, salt), node_id) } UnitMessage::ResponseNewest(response) => { RunwayNotificationIn::Response(Response::NewestUnit(response)) @@ -104,12 +92,13 @@ where FH: UnitFinalizationHandler, MK: MultiKeychain, { + own_id: NodeIndex, missing_coords: HashSet, missing_parents: HashSet<::Hash>, store: UnitStore>, - keychain: MK, dag: Dag, ordering: Ordering, + responder: Responder, alerts_for_alerter: Sender>, notifications_from_alerter: Receiver>, unit_messages_from_network: Receiver>, @@ -234,6 +223,7 @@ where { fn new(config: RunwayConfig, keychain: MK, validator: Validator) -> Self { let n_members = keychain.node_count(); + let own_id = keychain.index(); let RunwayConfig { finalization_handler, backup_units_for_saver, @@ -252,12 +242,13 @@ where let ordering = Ordering::new(finalization_handler); Runway { + own_id, store, - keychain, dag, ordering, missing_coords: HashSet::new(), missing_parents: HashSet::new(), + responder: Responder::new(keychain), resolved_requests, alerts_for_alerter, notifications_from_alerter, @@ -273,7 +264,7 @@ where } fn index(&self) -> NodeIndex { - self.keychain.index() + self.own_id } fn handle_dag_result(&mut self, result: DagResult) { @@ -314,20 +305,16 @@ where self.on_unit_received(u) } - RunwayNotificationIn::Request(request, node_id) => match request { - Request::Coord(coord) => { - trace!(target: "AlephBFT-runway", "{:?} Coords request received {:?}.", self.index(), coord); - self.on_request_coord(node_id, coord) - } - Request::Parents(u_hash) => { - trace!(target: "AlephBFT-runway", "{:?} Parents request received {:?}.", self.index(), u_hash); - self.on_request_parents(node_id, u_hash) - } - Request::NewestUnit(salt) => { - trace!(target: "AlephBFT-runway", "{:?} Newest unit request received {:?}.", self.index(), salt); - self.on_request_newest(node_id, salt) + RunwayNotificationIn::Request(request, node_id) => { + match self.responder.handle_request(request, &self.store) { + Ok(response) => self.send_message_for_network(RunwayNotificationOut::Response( + response, node_id, + )), + Err(err) => { + trace!(target: "AlephBFT-runway", "Not answering request from node {:?}: {}.", node_id, err) + } } - }, + } RunwayNotificationIn::Response(res) => match res { Response::Coord(u) => { @@ -355,72 +342,6 @@ where } } - fn on_request_coord(&mut self, node_id: NodeIndex, coord: UnitCoord) { - debug!(target: "AlephBFT-runway", "{:?} Received fetch request for coord {:?} from {:?}.", self.index(), coord, node_id); - match self.store.canonical_unit(coord).cloned() { - Some(su) => { - trace!(target: "AlephBFT-runway", "{:?} Answering fetch request for coord {:?} from {:?}.", self.index(), coord, node_id); - self.send_message_for_network(RunwayNotificationOut::Response( - Response::Coord(su.unpack().into()), - node_id, - )); - } - None => { - trace!(target: "AlephBFT-runway", "{:?} Not answering fetch request for coord {:?}. Unit not in store.", self.index(), coord); - } - } - } - - fn on_request_parents(&mut self, node_id: NodeIndex, u_hash: ::Hash) { - debug!(target: "AlephBFT-runway", "{:?} Received parents request for hash {:?} from {:?}.", self.index(), u_hash, node_id); - - match self.store.unit(&u_hash) { - Some(unit) => { - trace!(target: "AlephBFT-runway", "{:?} Answering parents request for hash {:?} from {:?}.", self.index(), u_hash, node_id); - let parents = unit - .parents() - .values() - .map(|parent_hash| { - self.store - .unit(parent_hash) - .expect("We add units to the store in order.") - .clone() - .unpack() - .into_unchecked() - }) - .collect(); - self.send_message_for_network(RunwayNotificationOut::Response( - Response::Parents(u_hash, parents), - node_id, - )); - } - None => { - trace!(target: "AlephBFT-runway", "{:?} Not answering parents request for hash {:?}. Unit not in DAG yet.", self.index(), u_hash); - } - } - } - - fn on_request_newest(&mut self, requester: NodeIndex, salt: u64) { - let unit = self - .store - .canonical_units(requester) - .last() - .map(|unit| unit.clone().unpack().into_unchecked()); - let response = NewestUnitResponse::new(requester, self.index(), unit, salt); - - let signed_response = Signed::sign(response, &self.keychain).into_unchecked(); - - if let Err(e) = - self.unit_messages_for_network - .unbounded_send(RunwayNotificationOut::Response( - Response::NewestUnit(signed_response), - requester, - )) - { - error!(target: "AlephBFT-runway", "Unable to send response to network: {}", e); - } - } - fn on_parents_response( &mut self, u_hash: ::Hash, @@ -650,7 +571,7 @@ fn initial_unit_collection<'a, H: Hasher, D: Data, MK: MultiKeychain>( resolved_requests: Sender>, ) -> Result + 'a, ()> { let (collection, salt) = Collection::new(keychain, validator); - let notification = RunwayNotificationOut::Request(Request::NewestUnit(salt)); + let notification = RunwayNotificationOut::Request(Request::NewestUnit(keychain.index(), salt)); if let Err(e) = unit_messages_for_network.unbounded_send(notification) { error!(target: "AlephBFT-runway", "Unable to send the newest unit request: {}", e); diff --git a/consensus/src/units/testing.rs b/consensus/src/units/testing.rs index 19bfa8cb..dfaa4a81 100644 --- a/consensus/src/units/testing.rs +++ b/consensus/src/units/testing.rs @@ -16,7 +16,7 @@ type PreUnit = GenericPreUnit; pub type FullUnit = GenericFullUnit; type UncheckedSignedUnit = GenericUncheckedSignedUnit; pub type SignedUnit = GenericSignedUnit; -pub type DagUnit = ReconstructedUnit; +pub type DagUnit = ReconstructedUnit; #[derive(Clone)] pub struct WrappedSignedUnit(pub SignedUnit); @@ -111,46 +111,63 @@ fn initial_preunit(n_members: NodeCount, node_id: NodeIndex) -> PreUnit { ) } -fn random_initial_reconstructed_units(n_members: NodeCount, session_id: SessionId) -> Vec { +fn random_initial_units(n_members: NodeCount, session_id: SessionId) -> Vec { n_members .into_iterator() .map(|node_id| initial_preunit(n_members, node_id)) - .map(|preunit| ReconstructedUnit::initial(preunit_to_full_unit(preunit, session_id))) + .map(|preunit| preunit_to_full_unit(preunit, session_id)) .collect() } -fn random_initial_units(n_members: NodeCount, session_id: SessionId) -> Vec { - random_initial_reconstructed_units(n_members, session_id) +fn random_initial_reconstructed_units( + n_members: NodeCount, + session_id: SessionId, + keychains: &[Keychain], +) -> Vec { + random_initial_units(n_members, session_id) .into_iter() - .map(|unit| unit.unpack()) + .map(|full_unit| { + let keychain = &keychains[full_unit.creator().0]; + ReconstructedUnit::initial(full_unit_to_signed_unit(full_unit, keychain)) + }) .collect() } -pub fn random_reconstructed_unit_with_parents>( +fn parent_map>(parents: &Vec) -> NodeMap { + let n_members = parents + .last() + .expect("there are parents") + .control_hash() + .n_members(); + let mut result = NodeMap::with_size(n_members); + for parent in parents { + result.insert(parent.creator(), parent.hash()); + } + result +} + +pub fn random_unit_with_parents>( creator: NodeIndex, parents: &Vec, -) -> DagUnit { +) -> FullUnit { let representative_parent = parents.last().expect("there are parents"); - let n_members = representative_parent.control_hash().n_members(); let session_id = representative_parent.session_id(); let round = representative_parent.round() + 1; - let mut parent_map = NodeMap::with_size(n_members); - for parent in parents { - parent_map.insert(parent.creator(), parent.hash()); - } + let parent_map = parent_map(parents); let control_hash = ControlHash::new(&parent_map); - ReconstructedUnit::with_parents( - preunit_to_full_unit(PreUnit::new(creator, round, control_hash), session_id), - parent_map, - ) - .expect("correct parents") + preunit_to_full_unit(PreUnit::new(creator, round, control_hash), session_id) } -pub fn random_unit_with_parents>( +pub fn random_reconstructed_unit_with_parents>( creator: NodeIndex, parents: &Vec, -) -> FullUnit { - random_reconstructed_unit_with_parents(creator, parents).unpack() + keychain: &Keychain, +) -> DagUnit { + ReconstructedUnit::with_parents( + full_unit_to_signed_unit(random_unit_with_parents(creator, parents), keychain), + parent_map(parents), + ) + .expect("correct parents") } pub fn random_full_parent_units_up_to( @@ -175,8 +192,11 @@ pub fn random_full_parent_reconstrusted_units_up_to( round: Round, n_members: NodeCount, session_id: SessionId, + keychains: &[Keychain], ) -> Vec> { - let mut result = vec![random_initial_reconstructed_units(n_members, session_id)]; + let mut result = vec![random_initial_reconstructed_units( + n_members, session_id, keychains, + )]; for _ in 0..round { let units = n_members .into_iterator() @@ -184,6 +204,7 @@ pub fn random_full_parent_reconstrusted_units_up_to( random_reconstructed_unit_with_parents( node_id, result.last().expect("previous round present"), + &keychains[node_id.0], ) }) .collect();