From 09e0bca9453d291d3f24ab17f9d7e277a713fb08 Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Wed, 6 Nov 2024 17:04:32 -0800 Subject: [PATCH] Add proper support for rounds where echo broadcasts are only sent to a subset of nodes --- Cargo.lock | 1 + manul/Cargo.toml | 2 + manul/src/lib.rs | 3 + manul/src/protocol.rs | 3 +- manul/src/protocol/object_safe.rs | 20 ++- manul/src/protocol/round.rs | 40 ++++- manul/src/session/echo.rs | 35 ++--- manul/src/session/session.rs | 52 +++++-- manul/src/tests.rs | 1 + manul/src/tests/partial_echo.rs | 235 ++++++++++++++++++++++++++++++ 10 files changed, 347 insertions(+), 45 deletions(-) create mode 100644 manul/src/tests.rs create mode 100644 manul/src/tests/partial_echo.rs diff --git a/Cargo.lock b/Cargo.lock index c31e3f0..44b9893 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -493,6 +493,7 @@ dependencies = [ "serde_json", "signature", "tracing", + "tracing-subscriber", ] [[package]] diff --git a/manul/Cargo.toml b/manul/Cargo.toml index 893157b..c8f6a7b 100644 --- a/manul/Cargo.toml +++ b/manul/Cargo.toml @@ -28,12 +28,14 @@ serde_json = { version = "1", default-features = false, features = ["alloc"], op [dev-dependencies] impls = "1" +rand_core = { version = "0.6.4", default-features = false, features = ["getrandom"] } rand = { version = "0.8", default-features = false } serde_asn1_der = "0.8" criterion = "0.5" serde-persistent-deserializer = "0.3" postcard = { version = "1", default-features = false, features = ["alloc"] } serde_json = { version = "1", default-features = false, features = ["alloc"] } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } [features] testing = ["rand", "postcard", "serde_json", "serde-persistent-deserializer"] diff --git a/manul/src/lib.rs b/manul/src/lib.rs index 9f3a842..23131f1 100644 --- a/manul/src/lib.rs +++ b/manul/src/lib.rs @@ -21,3 +21,6 @@ pub(crate) mod utils; #[cfg(any(test, feature = "testing"))] pub mod testing; + +#[cfg(test)] +mod tests; diff --git a/manul/src/protocol.rs b/manul/src/protocol.rs index 0e53744..463490b 100644 --- a/manul/src/protocol.rs +++ b/manul/src/protocol.rs @@ -23,7 +23,8 @@ pub use errors::{ }; pub use message::{DirectMessage, EchoBroadcast, NormalBroadcast, ProtocolMessagePart}; pub use round::{ - AnotherRound, Artifact, FinalizeOutcome, FirstRound, PartyId, Payload, Protocol, ProtocolError, Round, RoundId, + AnotherRound, Artifact, EchoRoundParticipation, FinalizeOutcome, FirstRound, PartyId, Payload, Protocol, + ProtocolError, Round, RoundId, }; pub use serialization::{Deserializer, Serializer}; diff --git a/manul/src/protocol/object_safe.rs b/manul/src/protocol/object_safe.rs index fd65f95..e140afc 100644 --- a/manul/src/protocol/object_safe.rs +++ b/manul/src/protocol/object_safe.rs @@ -10,7 +10,7 @@ use rand_core::{CryptoRng, CryptoRngCore, RngCore}; use super::{ errors::{FinalizeError, LocalError, ReceiveError}, message::{DirectMessage, EchoBroadcast, NormalBroadcast}, - round::{Artifact, FinalizeOutcome, PartyId, Payload, Protocol, Round, RoundId}, + round::{Artifact, EchoRoundParticipation, FinalizeOutcome, PartyId, Payload, Protocol, Round, RoundId}, serialization::{Deserializer, Serializer}, }; @@ -48,6 +48,10 @@ pub(crate) trait ObjectSafeRound: 'static + Debug + Send + Sync { fn message_destinations(&self) -> &BTreeSet; + fn expecting_messages_from(&self) -> &BTreeSet; + + fn echo_round_participation(&self) -> EchoRoundParticipation; + fn make_direct_message_with_artifact( &self, rng: &mut dyn CryptoRngCore, @@ -84,8 +88,6 @@ pub(crate) trait ObjectSafeRound: 'static + Debug + Send + Sync { artifacts: BTreeMap, ) -> Result, FinalizeError>; - fn expecting_messages_from(&self) -> &BTreeSet; - /// Returns the type ID of the implementing type. fn get_type_id(&self) -> core::any::TypeId; } @@ -130,6 +132,14 @@ where self.round.message_destinations() } + fn expecting_messages_from(&self) -> &BTreeSet { + self.round.expecting_messages_from() + } + + fn echo_round_participation(&self) -> EchoRoundParticipation { + self.round.echo_round_participation() + } + fn make_direct_message_with_artifact( &self, rng: &mut dyn CryptoRngCore, @@ -189,10 +199,6 @@ where self.round.finalize(&mut boxed_rng, payloads, artifacts) } - fn expecting_messages_from(&self) -> &BTreeSet { - self.round.expecting_messages_from() - } - fn get_type_id(&self) -> core::any::TypeId { core::any::TypeId::of::() } diff --git a/manul/src/protocol/round.rs b/manul/src/protocol/round.rs index e2f113b..8e97634 100644 --- a/manul/src/protocol/round.rs +++ b/manul/src/protocol/round.rs @@ -322,6 +322,25 @@ pub trait PartyId: 'static + Debug + Clone + Ord + Send + Sync {} impl PartyId for T where T: 'static + Debug + Clone + Ord + Send + Sync {} +/// The specific way the node participates in the echo round (if any). +#[derive(Debug, Clone)] +pub enum EchoRoundParticipation { + /// The default behavior: sends broadcasts and receives echoed messages, or does neither. + /// + /// That is, this node will be a part of the echo round if [`Round::make_echo_broadcast`] generates a message. + Default, + + /// This node sends broadcasts that will be echoed, but does not receive any. + Send, + + /// This node receives broadcasts that it needs to echo, but does not send any itself. + Receive { + /// The other participants of the echo round + /// (that is, the nodes to which echoed messages will be sent). + echo_targets: BTreeSet, + }, +} + /** A type representing a single round of a protocol. @@ -354,6 +373,21 @@ pub trait Round: 'static + Debug + Send + Sync { /// for each element of the returned set. fn message_destinations(&self) -> &BTreeSet; + /// Returns the set of node IDs from which this round expects messages. + /// + /// The execution layer will not call [`finalize`](`Self::finalize`) until all these nodes have responded + /// (and the corresponding [`receive_message`](`Self::receive_message`) finished successfully). + fn expecting_messages_from(&self) -> &BTreeSet; + + /// Returns the specific way the node participates in the echo round following this round. + /// + /// Returns [`EchoRoundParticipation::Default`] by default; this works fine when every node + /// sends messages to every other one, or do not send or receive any echo broadcasts. + /// Otherwise, review the options in [`EchoRoundParticipation`] and pick the appropriate one. + fn echo_round_participation(&self) -> EchoRoundParticipation { + EchoRoundParticipation::Default + } + /// Returns the direct message to the given destination and (maybe) an accompanying artifact. /// /// Return [`DirectMessage::none`] if this round does not send direct messages. @@ -445,10 +479,4 @@ pub trait Round: 'static + Debug + Send + Sync { payloads: BTreeMap, artifacts: BTreeMap, ) -> Result, FinalizeError>; - - /// Returns the set of node IDs from which this round expects messages. - /// - /// The execution layer will not call [`finalize`](`Self::finalize`) until all these nodes have responded - /// (and the corresponding [`receive_message`](`Self::receive_message`) finished successfully). - fn expecting_messages_from(&self) -> &BTreeSet; } diff --git a/manul/src/session/echo.rs b/manul/src/session/echo.rs index 5bbd124..eb37240 100644 --- a/manul/src/session/echo.rs +++ b/manul/src/session/echo.rs @@ -13,7 +13,7 @@ use tracing::debug; use super::{ message::{MessageVerificationError, SignedMessagePart}, - session::SessionParameters, + session::{EchoRoundInfo, SessionParameters}, LocalError, }; use crate::{ @@ -78,8 +78,7 @@ pub(crate) struct EchoRoundMessage { pub struct EchoRound { verifier: SP::Verifier, echo_broadcasts: BTreeMap>, - destinations: BTreeSet, - expected_echos: BTreeSet, + echo_round_info: EchoRoundInfo, main_round: Box>, payloads: BTreeMap, artifacts: BTreeMap, @@ -94,25 +93,19 @@ where verifier: SP::Verifier, my_echo_broadcast: SignedMessagePart, echo_broadcasts: BTreeMap>, + echo_round_info: EchoRoundInfo, main_round: Box>, payloads: BTreeMap, artifacts: BTreeMap, ) -> Self { - let destinations = echo_broadcasts.keys().cloned().collect::>(); - - // Add our own echo message because we expect it to be sent back from other nodes. - let mut expected_echos = destinations.clone(); - expected_echos.insert(verifier.clone()); - let mut echo_broadcasts = echo_broadcasts; echo_broadcasts.insert(verifier.clone(), my_echo_broadcast); - debug!("{:?}: initialized echo round with {:?}", verifier, destinations); + debug!("{:?}: initialized echo round with {:?}", verifier, echo_round_info); Self { verifier, echo_broadcasts, - destinations, - expected_echos, + echo_round_info, main_round, payloads, artifacts, @@ -155,7 +148,7 @@ where } fn message_destinations(&self) -> &BTreeSet { - &self.destinations + &self.echo_round_info.message_destinations } fn make_normal_broadcast( @@ -181,7 +174,7 @@ where } fn expecting_messages_from(&self) -> &BTreeSet { - &self.destinations + &self.echo_round_info.expecting_messages_from } fn receive_message( @@ -200,16 +193,14 @@ where let message = normal_broadcast.deserialize::>(deserializer)?; - // Check that the received message contains entries from `destinations` sans `from` + // Check that the received message contains entries from `expected_echos`. // It is an unprovable fault. - let mut expected_keys = self.expected_echos.clone(); - if !expected_keys.remove(from) { - return Err(ReceiveError::local(format!( - "The message sender {from:?} is missing from the expected senders {:?}", - self.destinations - ))); - } + let mut expected_keys = self.echo_round_info.expected_echos.clone(); + + // We don't expect the node to send its echo the second time. + expected_keys.remove(from); + let message_keys = message.echo_broadcasts.keys().cloned().collect::>(); let missing_keys = expected_keys.difference(&message_keys).collect::>(); diff --git a/manul/src/session/session.rs b/manul/src/session/session.rs index 5c1845e..da55a69 100644 --- a/manul/src/session/session.rs +++ b/manul/src/session/session.rs @@ -23,9 +23,9 @@ use super::{ LocalError, RemoteError, }; use crate::protocol::{ - Artifact, Deserializer, DirectMessage, EchoBroadcast, FinalizeError, FinalizeOutcome, FirstRound, NormalBroadcast, - ObjectSafeRound, ObjectSafeRoundWrapper, PartyId, Payload, Protocol, ProtocolMessagePart, ReceiveError, - ReceiveErrorType, Round, RoundId, Serializer, + Artifact, Deserializer, DirectMessage, EchoBroadcast, EchoRoundParticipation, FinalizeError, FinalizeOutcome, + FirstRound, NormalBroadcast, ObjectSafeRound, ObjectSafeRoundWrapper, PartyId, Payload, Protocol, + ProtocolMessagePart, ReceiveError, ReceiveErrorType, Round, RoundId, Serializer, }; /// A set of types needed to execute a session. @@ -97,6 +97,13 @@ impl AsRef<[u8]> for SessionId { } } +#[derive(Debug)] +pub(crate) struct EchoRoundInfo { + pub(crate) message_destinations: BTreeSet, + pub(crate) expecting_messages_from: BTreeSet, + pub(crate) expected_echos: BTreeSet, +} + /// An object encapsulating the currently active round, transport protocol, /// and the database of messages and errors from the previous rounds. #[derive(Debug)] @@ -108,6 +115,7 @@ pub struct Session { deserializer: Deserializer, round: Box>, message_destinations: BTreeSet, + echo_round_info: Option>, echo_broadcast: SignedMessagePart, normal_broadcast: SignedMessagePart, possible_next_rounds: BTreeSet, @@ -182,10 +190,36 @@ where let message_destinations = round.message_destinations().clone(); - let possible_next_rounds = if echo_broadcast.payload().is_none() { - round.possible_next_rounds() - } else { + let echo_round_participation = round.echo_round_participation(); + + let round_sends_echo_broadcast = !echo_broadcast.payload().is_none(); + let echo_round_info = match echo_round_participation { + EchoRoundParticipation::Default => { + if round_sends_echo_broadcast { + // Add our own echo message to the expected list because we expect it to be sent back from other nodes. + let mut expected_echos = round.expecting_messages_from().clone(); + expected_echos.insert(verifier.clone()); + Some(EchoRoundInfo { + message_destinations: message_destinations.clone(), + expecting_messages_from: message_destinations.clone(), + expected_echos, + }) + } else { + None + } + } + EchoRoundParticipation::Send => None, + EchoRoundParticipation::Receive { echo_targets } => Some(EchoRoundInfo { + message_destinations: echo_targets.clone(), + expecting_messages_from: echo_targets, + expected_echos: round.expecting_messages_from().clone(), + }), + }; + + let possible_next_rounds = if echo_round_info.is_some() { BTreeSet::from([round.id().echo()]) + } else { + round.possible_next_rounds() }; Ok(Self { @@ -199,6 +233,7 @@ where normal_broadcast, possible_next_rounds, message_destinations, + echo_round_info, transcript, }) } @@ -459,13 +494,12 @@ where accum.still_have_not_sent_messages, )?; - let echo_round_needed = !self.echo_broadcast.payload().is_none(); - - if echo_round_needed { + if let Some(echo_round_info) = self.echo_round_info { let round = Box::new(ObjectSafeRoundWrapper::new(EchoRound::::new( verifier, self.echo_broadcast, transcript.echo_broadcasts(round_id)?, + echo_round_info, self.round, accum.payloads, accum.artifacts, diff --git a/manul/src/tests.rs b/manul/src/tests.rs new file mode 100644 index 0000000..310f393 --- /dev/null +++ b/manul/src/tests.rs @@ -0,0 +1 @@ +mod partial_echo; diff --git a/manul/src/tests/partial_echo.rs b/manul/src/tests/partial_echo.rs new file mode 100644 index 0000000..5b9d8c1 --- /dev/null +++ b/manul/src/tests/partial_echo.rs @@ -0,0 +1,235 @@ +use alloc::{ + collections::{BTreeMap, BTreeSet}, + format, + string::String, + vec, + vec::Vec, +}; +use core::fmt::Debug; + +use rand_core::{CryptoRngCore, OsRng}; +use serde::{Deserialize, Serialize}; +use tracing_subscriber::EnvFilter; + +use crate::{ + protocol::*, + session::{signature::Keypair, SessionOutcome}, + testing::{run_sync, BinaryFormat, TestSessionParams, TestSigner, TestVerifier}, +}; + +#[derive(Debug)] +struct PartialEchoProtocol; + +impl Protocol for PartialEchoProtocol { + type Result = (); + type ProtocolError = PartialEchoProtocolError; + type CorrectnessProof = (); +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct PartialEchoProtocolError; + +impl ProtocolError for PartialEchoProtocolError { + fn description(&self) -> String { + format!("{:?}", self) + } + + fn verify_messages_constitute_error( + &self, + _deserializer: &Deserializer, + _echo_broadcast: &EchoBroadcast, + _normal_broadcast: &NormalBroadcast, + _direct_message: &DirectMessage, + _echo_broadcasts: &BTreeMap, + _normal_broadcasts: &BTreeMap, + _direct_messages: &BTreeMap, + _combined_echos: &BTreeMap>, + ) -> Result<(), ProtocolValidationError> { + unimplemented!() + } +} + +#[derive(Debug, Clone)] +struct Inputs { + message_destinations: Vec, + expecting_messages_from: Vec, + echo_round_participation: EchoRoundParticipation, +} + +#[derive(Debug)] +struct Round1 { + id: Id, + message_destinations: BTreeSet, + expecting_messages_from: BTreeSet, + echo_round_participation: EchoRoundParticipation, +} + +#[derive(Debug, Serialize, Deserialize)] +struct Round1Echo { + sender: Id, +} + +impl Deserialize<'de>> FirstRound for Round1 { + type Inputs = Inputs; + fn new( + _rng: &mut impl CryptoRngCore, + _shared_randomness: &[u8], + id: Id, + inputs: Self::Inputs, + ) -> Result { + let message_destinations = BTreeSet::from_iter(inputs.message_destinations); + let expecting_messages_from = BTreeSet::from_iter(inputs.expecting_messages_from); + Ok(Self { + id, + message_destinations, + expecting_messages_from, + echo_round_participation: inputs.echo_round_participation, + }) + } +} + +impl Deserialize<'de>> Round for Round1 { + type Protocol = PartialEchoProtocol; + + fn id(&self) -> RoundId { + RoundId::new(1) + } + + fn possible_next_rounds(&self) -> BTreeSet { + BTreeSet::new() + } + + fn message_destinations(&self) -> &BTreeSet { + &self.message_destinations + } + + fn expecting_messages_from(&self) -> &BTreeSet { + &self.expecting_messages_from + } + + fn echo_round_participation(&self) -> EchoRoundParticipation { + self.echo_round_participation.clone() + } + + fn make_echo_broadcast( + &self, + _rng: &mut impl CryptoRngCore, + serializer: &Serializer, + ) -> Result { + if self.message_destinations.is_empty() { + Ok(EchoBroadcast::none()) + } else { + EchoBroadcast::new( + serializer, + Round1Echo { + sender: self.id.clone(), + }, + ) + } + } + + fn receive_message( + &self, + _rng: &mut impl CryptoRngCore, + deserializer: &Deserializer, + from: &Id, + echo_broadcast: EchoBroadcast, + normal_broadcast: NormalBroadcast, + direct_message: DirectMessage, + ) -> Result> { + normal_broadcast.assert_is_none()?; + direct_message.assert_is_none()?; + + if self.expecting_messages_from.is_empty() { + echo_broadcast.assert_is_none()?; + } else { + let echo = echo_broadcast.deserialize::>(deserializer)?; + assert_eq!(&echo.sender, from); + assert!(self.expecting_messages_from.contains(from)); + } + + Ok(Payload::new(())) + } + + fn finalize( + self, + _rng: &mut impl CryptoRngCore, + _payloads: BTreeMap, + _artifacts: BTreeMap, + ) -> Result, FinalizeError> { + Ok(FinalizeOutcome::Result(())) + } +} + +#[test] +fn partial_echo() { + let signers = (0..5).map(TestSigner::new).collect::>(); + let ids = signers.iter().map(|signer| signer.verifying_key()).collect::>(); + + // Nodes 0, 1 send an echo broadcast to nodes 1, 2, 3 + // The echo round happens between the nodes 1, 2, 3 + // Node 0 only sends the broadcasts, but doesn't receive any, so it skips the echo round + // Node 4 doesn't send or receive any broadcasts, so it skips the echo round + + let node0 = ( + signers[0], + Inputs { + message_destinations: [ids[1], ids[2], ids[3]].into(), + expecting_messages_from: [].into(), + echo_round_participation: EchoRoundParticipation::Send, + }, + ); + let node1 = ( + signers[1], + Inputs { + message_destinations: [ids[2], ids[3]].into(), + expecting_messages_from: [ids[0]].into(), + echo_round_participation: EchoRoundParticipation::Default, + }, + ); + let node2 = ( + signers[2], + Inputs { + message_destinations: [].into(), + expecting_messages_from: [ids[0], ids[1]].into(), + echo_round_participation: EchoRoundParticipation::Receive { + echo_targets: BTreeSet::from([ids[1], ids[3]]), + }, + }, + ); + let node3 = ( + signers[3], + Inputs { + message_destinations: [].into(), + expecting_messages_from: [ids[0], ids[1]].into(), + echo_round_participation: EchoRoundParticipation::Receive { + echo_targets: BTreeSet::from([ids[1], ids[2]]), + }, + }, + ); + let node4 = ( + signers[4], + Inputs { + message_destinations: [].into(), + expecting_messages_from: [].into(), + echo_round_participation: EchoRoundParticipation::::Default, + }, + ); + + let inputs = vec![node0, node1, node2, node3, node4]; + + let my_subscriber = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .finish(); + let reports = tracing::subscriber::with_default(my_subscriber, || { + run_sync::, TestSessionParams>(&mut OsRng, inputs).unwrap() + }); + + for (id, report) in reports { + if !matches!(report.outcome, SessionOutcome::Result(_)) { + extern crate std; + std::println!("{:?}: {:?}", id, report); + panic!(); + }; + } +}