From 437c8038c89e171150a094dff7c772a2be2fb789 Mon Sep 17 00:00:00 2001 From: Bogdan Opanchuk Date: Wed, 6 Nov 2024 17:04:32 -0800 Subject: [PATCH] Add proper support for rounds where echo broadcasts are only sent to a subset of nodes --- Cargo.lock | 1 + manul/Cargo.toml | 2 + manul/src/combinators/chain.rs | 21 ++- manul/src/combinators/misbehave.rs | 17 ++- manul/src/lib.rs | 3 + manul/src/protocol.rs | 3 +- manul/src/protocol/object_safe.rs | 20 ++- manul/src/protocol/round.rs | 40 ++++- manul/src/session/echo.rs | 35 ++--- manul/src/session/session.rs | 52 +++++-- manul/src/tests.rs | 1 + manul/src/tests/partial_echo.rs | 232 +++++++++++++++++++++++++++++ 12 files changed, 369 insertions(+), 58 deletions(-) create mode 100644 manul/src/tests.rs create mode 100644 manul/src/tests/partial_echo.rs diff --git a/Cargo.lock b/Cargo.lock index a5dc428..ee99c2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -494,6 +494,7 @@ dependencies = [ "signature", "tinyvec", "tracing", + "tracing-subscriber", ] [[package]] diff --git a/manul/Cargo.toml b/manul/Cargo.toml index ede1380..8bb38b8 100644 --- a/manul/Cargo.toml +++ b/manul/Cargo.toml @@ -29,12 +29,14 @@ serde_json = { version = "1", default-features = false, features = ["alloc"], op [dev-dependencies] impls = "1" +rand_core = { version = "0.6.4", default-features = false, features = ["getrandom"] } rand = { version = "0.8", default-features = false } serde_asn1_der = "0.8" criterion = "0.5" serde-persistent-deserializer = "0.3" postcard = { version = "1", default-features = false, features = ["alloc"] } serde_json = { version = "1", default-features = false, features = ["alloc"] } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } [features] testing = ["rand", "postcard", "serde_json", "serde-persistent-deserializer"] diff --git a/manul/src/combinators/chain.rs b/manul/src/combinators/chain.rs index 13450d9..32ef8c9 100644 --- a/manul/src/combinators/chain.rs +++ b/manul/src/combinators/chain.rs @@ -382,6 +382,20 @@ where } } + fn expecting_messages_from(&self) -> &BTreeSet { + match &self.state { + ChainState::Protocol1 { round, .. } => round.as_ref().expecting_messages_from(), + ChainState::Protocol2(round) => round.as_ref().expecting_messages_from(), + } + } + + fn echo_round_participation(&self) -> EchoRoundParticipation { + match &self.state { + ChainState::Protocol1 { round, .. } => round.as_ref().echo_round_participation(), + ChainState::Protocol2(round) => round.as_ref().echo_round_participation(), + } + } + fn make_direct_message( &self, rng: &mut dyn CryptoRngCore, @@ -514,11 +528,4 @@ where }, } } - - fn expecting_messages_from(&self) -> &BTreeSet { - match &self.state { - ChainState::Protocol1 { round, .. } => round.as_ref().expecting_messages_from(), - ChainState::Protocol2(round) => round.as_ref().expecting_messages_from(), - } - } } diff --git a/manul/src/combinators/misbehave.rs b/manul/src/combinators/misbehave.rs index d71ddc2..311995a 100644 --- a/manul/src/combinators/misbehave.rs +++ b/manul/src/combinators/misbehave.rs @@ -29,8 +29,9 @@ use core::fmt::Debug; use rand_core::CryptoRngCore; use crate::protocol::{ - Artifact, BoxedRng, BoxedRound, Deserializer, DirectMessage, EchoBroadcast, EntryPoint, FinalizeError, - FinalizeOutcome, LocalError, NormalBroadcast, ObjectSafeRound, PartyId, Payload, ReceiveError, RoundId, Serializer, + Artifact, BoxedRng, BoxedRound, Deserializer, DirectMessage, EchoBroadcast, EchoRoundParticipation, EntryPoint, + FinalizeError, FinalizeOutcome, LocalError, NormalBroadcast, ObjectSafeRound, PartyId, Payload, ReceiveError, + RoundId, Serializer, }; /// A trait describing required properties for a behavior type. @@ -173,6 +174,14 @@ where self.round.as_ref().message_destinations() } + fn expecting_messages_from(&self) -> &BTreeSet { + self.round.as_ref().expecting_messages_from() + } + + fn echo_round_participation(&self) -> EchoRoundParticipation { + self.round.as_ref().echo_round_participation() + } + fn make_direct_message( &self, rng: &mut dyn CryptoRngCore, @@ -284,8 +293,4 @@ where Err(err) => Err(err), } } - - fn expecting_messages_from(&self) -> &BTreeSet { - self.round.as_ref().expecting_messages_from() - } } diff --git a/manul/src/lib.rs b/manul/src/lib.rs index c3e9b8b..438b56b 100644 --- a/manul/src/lib.rs +++ b/manul/src/lib.rs @@ -22,3 +22,6 @@ pub(crate) mod utils; #[cfg(any(test, feature = "testing"))] pub mod testing; + +#[cfg(test)] +mod tests; diff --git a/manul/src/protocol.rs b/manul/src/protocol.rs index 46b356f..f6aa234 100644 --- a/manul/src/protocol.rs +++ b/manul/src/protocol.rs @@ -24,7 +24,8 @@ pub use errors::{ pub use message::{DirectMessage, EchoBroadcast, NormalBroadcast, ProtocolMessagePart}; pub use object_safe::BoxedRound; pub use round::{ - Artifact, CorrectnessProof, EntryPoint, FinalizeOutcome, PartyId, Payload, Protocol, ProtocolError, Round, RoundId, + Artifact, CorrectnessProof, EchoRoundParticipation, EntryPoint, FinalizeOutcome, PartyId, Payload, Protocol, + ProtocolError, Round, RoundId, }; pub use serialization::{Deserializer, Serializer}; diff --git a/manul/src/protocol/object_safe.rs b/manul/src/protocol/object_safe.rs index 910abc9..8678951 100644 --- a/manul/src/protocol/object_safe.rs +++ b/manul/src/protocol/object_safe.rs @@ -10,7 +10,7 @@ use rand_core::{CryptoRng, CryptoRngCore, RngCore}; use super::{ errors::{FinalizeError, LocalError, ReceiveError}, message::{DirectMessage, EchoBroadcast, NormalBroadcast}, - round::{Artifact, FinalizeOutcome, PartyId, Payload, Protocol, Round, RoundId}, + round::{Artifact, EchoRoundParticipation, FinalizeOutcome, PartyId, Payload, Protocol, Round, RoundId}, serialization::{Deserializer, Serializer}, }; @@ -48,6 +48,10 @@ pub(crate) trait ObjectSafeRound: 'static + Debug + Send + Sync { fn message_destinations(&self) -> &BTreeSet; + fn expecting_messages_from(&self) -> &BTreeSet; + + fn echo_round_participation(&self) -> EchoRoundParticipation; + fn make_direct_message( &self, rng: &mut dyn CryptoRngCore, @@ -87,8 +91,6 @@ pub(crate) trait ObjectSafeRound: 'static + Debug + Send + Sync { artifacts: BTreeMap, ) -> Result, FinalizeError>; - fn expecting_messages_from(&self) -> &BTreeSet; - /// Returns the type ID of the implementing type. fn get_type_id(&self) -> core::any::TypeId { core::any::TypeId::of::() @@ -135,6 +137,14 @@ where self.round.message_destinations() } + fn expecting_messages_from(&self) -> &BTreeSet { + self.round.expecting_messages_from() + } + + fn echo_round_participation(&self) -> EchoRoundParticipation { + self.round.echo_round_participation() + } + fn make_direct_message( &self, rng: &mut dyn CryptoRngCore, @@ -195,10 +205,6 @@ where let mut boxed_rng = BoxedRng(rng); self.round.finalize(&mut boxed_rng, payloads, artifacts) } - - fn expecting_messages_from(&self) -> &BTreeSet { - self.round.expecting_messages_from() - } } // We do not want to expose `ObjectSafeRound` to the user, so it is hidden in a struct. diff --git a/manul/src/protocol/round.rs b/manul/src/protocol/round.rs index bf6f6e7..9a306cc 100644 --- a/manul/src/protocol/round.rs +++ b/manul/src/protocol/round.rs @@ -368,6 +368,25 @@ pub trait PartyId: 'static + Debug + Clone + Ord + Send + Sync + Serialize + for impl PartyId for T where T: 'static + Debug + Clone + Ord + Send + Sync + Serialize + for<'de> Deserialize<'de> {} +/// The specific way the node participates in the echo round (if any). +#[derive(Debug, Clone)] +pub enum EchoRoundParticipation { + /// The default behavior: sends broadcasts and receives echoed messages, or does neither. + /// + /// That is, this node will be a part of the echo round if [`Round::make_echo_broadcast`] generates a message. + Default, + + /// This node sends broadcasts that will be echoed, but does not receive any. + Send, + + /// This node receives broadcasts that it needs to echo, but does not send any itself. + Receive { + /// The other participants of the echo round + /// (that is, the nodes to which echoed messages will be sent). + echo_targets: BTreeSet, + }, +} + /** A type representing a single round of a protocol. @@ -400,6 +419,21 @@ pub trait Round: 'static + Debug + Send + Sync { /// for each element of the returned set. fn message_destinations(&self) -> &BTreeSet; + /// Returns the set of node IDs from which this round expects messages. + /// + /// The execution layer will not call [`finalize`](`Self::finalize`) until all these nodes have responded + /// (and the corresponding [`receive_message`](`Self::receive_message`) finished successfully). + fn expecting_messages_from(&self) -> &BTreeSet; + + /// Returns the specific way the node participates in the echo round following this round. + /// + /// Returns [`EchoRoundParticipation::Default`] by default; this works fine when every node + /// sends messages to every other one, or do not send or receive any echo broadcasts. + /// Otherwise, review the options in [`EchoRoundParticipation`] and pick the appropriate one. + fn echo_round_participation(&self) -> EchoRoundParticipation { + EchoRoundParticipation::Default + } + /// Returns the direct message to the given destination and (maybe) an accompanying artifact. /// /// Return [`DirectMessage::none`] if this round does not send direct messages. @@ -472,10 +506,4 @@ pub trait Round: 'static + Debug + Send + Sync { payloads: BTreeMap, artifacts: BTreeMap, ) -> Result, FinalizeError>; - - /// Returns the set of node IDs from which this round expects messages. - /// - /// The execution layer will not call [`finalize`](`Self::finalize`) until all these nodes have responded - /// (and the corresponding [`receive_message`](`Self::receive_message`) finished successfully). - fn expecting_messages_from(&self) -> &BTreeSet; } diff --git a/manul/src/session/echo.rs b/manul/src/session/echo.rs index c4fbcb9..a57117c 100644 --- a/manul/src/session/echo.rs +++ b/manul/src/session/echo.rs @@ -12,7 +12,7 @@ use tracing::debug; use super::{ message::{MessageVerificationError, SignedMessagePart}, - session::SessionParameters, + session::{EchoRoundInfo, SessionParameters}, LocalError, }; use crate::{ @@ -77,8 +77,7 @@ pub(crate) struct EchoRoundMessage { pub struct EchoRound { verifier: SP::Verifier, echo_broadcasts: BTreeMap>, - destinations: BTreeSet, - expected_echos: BTreeSet, + echo_round_info: EchoRoundInfo, main_round: BoxedRound, payloads: BTreeMap, artifacts: BTreeMap, @@ -93,25 +92,19 @@ where verifier: SP::Verifier, my_echo_broadcast: SignedMessagePart, echo_broadcasts: BTreeMap>, + echo_round_info: EchoRoundInfo, main_round: BoxedRound, payloads: BTreeMap, artifacts: BTreeMap, ) -> Self { - let destinations = echo_broadcasts.keys().cloned().collect::>(); - - // Add our own echo message because we expect it to be sent back from other nodes. - let mut expected_echos = destinations.clone(); - expected_echos.insert(verifier.clone()); - let mut echo_broadcasts = echo_broadcasts; echo_broadcasts.insert(verifier.clone(), my_echo_broadcast); - debug!("{:?}: initialized echo round with {:?}", verifier, destinations); + debug!("{:?}: initialized echo round with {:?}", verifier, echo_round_info); Self { verifier, echo_broadcasts, - destinations, - expected_echos, + echo_round_info, main_round, payloads, artifacts, @@ -154,7 +147,7 @@ where } fn message_destinations(&self) -> &BTreeSet { - &self.destinations + &self.echo_round_info.message_destinations } fn make_normal_broadcast( @@ -180,7 +173,7 @@ where } fn expecting_messages_from(&self) -> &BTreeSet { - &self.destinations + &self.echo_round_info.expecting_messages_from } fn receive_message( @@ -199,16 +192,14 @@ where let message = normal_broadcast.deserialize::>(deserializer)?; - // Check that the received message contains entries from `destinations` sans `from` + // Check that the received message contains entries from `expected_echos`. // It is an unprovable fault. - let mut expected_keys = self.expected_echos.clone(); - if !expected_keys.remove(from) { - return Err(ReceiveError::local(format!( - "The message sender {from:?} is missing from the expected senders {:?}", - self.destinations - ))); - } + let mut expected_keys = self.echo_round_info.expected_echos.clone(); + + // We don't expect the node to send its echo the second time. + expected_keys.remove(from); + let message_keys = message.echo_broadcasts.keys().cloned().collect::>(); let missing_keys = expected_keys.difference(&message_keys).collect::>(); diff --git a/manul/src/session/session.rs b/manul/src/session/session.rs index db81183..ab1a27c 100644 --- a/manul/src/session/session.rs +++ b/manul/src/session/session.rs @@ -23,9 +23,9 @@ use super::{ LocalError, RemoteError, }; use crate::protocol::{ - Artifact, BoxedRound, Deserializer, DirectMessage, EchoBroadcast, EntryPoint, FinalizeError, FinalizeOutcome, - NormalBroadcast, PartyId, Payload, Protocol, ProtocolMessagePart, ReceiveError, ReceiveErrorType, RoundId, - Serializer, + Artifact, BoxedRound, Deserializer, DirectMessage, EchoBroadcast, EchoRoundParticipation, EntryPoint, + FinalizeError, FinalizeOutcome, NormalBroadcast, PartyId, Payload, Protocol, ProtocolMessagePart, ReceiveError, + ReceiveErrorType, RoundId, Serializer, }; /// A set of types needed to execute a session. @@ -97,6 +97,13 @@ impl AsRef<[u8]> for SessionId { } } +#[derive(Debug)] +pub(crate) struct EchoRoundInfo { + pub(crate) message_destinations: BTreeSet, + pub(crate) expecting_messages_from: BTreeSet, + pub(crate) expected_echos: BTreeSet, +} + /// An object encapsulating the currently active round, transport protocol, /// and the database of messages and errors from the previous rounds. #[derive(Debug)] @@ -108,6 +115,7 @@ pub struct Session { deserializer: Deserializer, round: BoxedRound, message_destinations: BTreeSet, + echo_round_info: Option>, echo_broadcast: SignedMessagePart, normal_broadcast: SignedMessagePart, possible_next_rounds: BTreeSet, @@ -177,10 +185,36 @@ where let message_destinations = round.as_ref().message_destinations().clone(); - let possible_next_rounds = if echo_broadcast.payload().is_none() { - round.as_ref().possible_next_rounds() - } else { + let echo_round_participation = round.as_ref().echo_round_participation(); + + let round_sends_echo_broadcast = !echo_broadcast.payload().is_none(); + let echo_round_info = match echo_round_participation { + EchoRoundParticipation::Default => { + if round_sends_echo_broadcast { + // Add our own echo message to the expected list because we expect it to be sent back from other nodes. + let mut expected_echos = round.as_ref().expecting_messages_from().clone(); + expected_echos.insert(verifier.clone()); + Some(EchoRoundInfo { + message_destinations: message_destinations.clone(), + expecting_messages_from: message_destinations.clone(), + expected_echos, + }) + } else { + None + } + } + EchoRoundParticipation::Send => None, + EchoRoundParticipation::Receive { echo_targets } => Some(EchoRoundInfo { + message_destinations: echo_targets.clone(), + expecting_messages_from: echo_targets, + expected_echos: round.as_ref().expecting_messages_from().clone(), + }), + }; + + let possible_next_rounds = if echo_round_info.is_some() { BTreeSet::from([round.id().echo()]) + } else { + round.as_ref().possible_next_rounds() }; Ok(Self { @@ -194,6 +228,7 @@ where normal_broadcast, possible_next_rounds, message_destinations, + echo_round_info, transcript, }) } @@ -455,13 +490,12 @@ where accum.still_have_not_sent_messages, )?; - let echo_round_needed = !self.echo_broadcast.payload().is_none(); - - if echo_round_needed { + if let Some(echo_round_info) = self.echo_round_info { let round = BoxedRound::new_dynamic(EchoRound::::new( verifier, self.echo_broadcast, transcript.echo_broadcasts(round_id)?, + echo_round_info, self.round, accum.payloads, accum.artifacts, diff --git a/manul/src/tests.rs b/manul/src/tests.rs new file mode 100644 index 0000000..310f393 --- /dev/null +++ b/manul/src/tests.rs @@ -0,0 +1 @@ +mod partial_echo; diff --git a/manul/src/tests/partial_echo.rs b/manul/src/tests/partial_echo.rs new file mode 100644 index 0000000..4408b9c --- /dev/null +++ b/manul/src/tests/partial_echo.rs @@ -0,0 +1,232 @@ +use alloc::{ + collections::{BTreeMap, BTreeSet}, + format, + string::String, + vec, + vec::Vec, +}; +use core::fmt::Debug; + +use rand_core::{CryptoRngCore, OsRng}; +use serde::{Deserialize, Serialize}; +use tracing_subscriber::EnvFilter; + +use crate::{ + protocol::*, + session::{signature::Keypair, SessionOutcome}, + testing::{run_sync, BinaryFormat, TestSessionParams, TestSigner, TestVerifier}, +}; + +#[derive(Debug)] +struct PartialEchoProtocol; + +impl Protocol for PartialEchoProtocol { + type Result = (); + type ProtocolError = PartialEchoProtocolError; + type CorrectnessProof = (); +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct PartialEchoProtocolError; + +impl ProtocolError for PartialEchoProtocolError { + fn description(&self) -> String { + format!("{:?}", self) + } + + fn verify_messages_constitute_error( + &self, + _deserializer: &Deserializer, + _echo_broadcast: &EchoBroadcast, + _normal_broadcast: &NormalBroadcast, + _direct_message: &DirectMessage, + _echo_broadcasts: &BTreeMap, + _normal_broadcasts: &BTreeMap, + _direct_messages: &BTreeMap, + _combined_echos: &BTreeMap>, + ) -> Result<(), ProtocolValidationError> { + unimplemented!() + } +} + +#[derive(Debug, Clone)] +struct Inputs { + message_destinations: Vec, + expecting_messages_from: Vec, + echo_round_participation: EchoRoundParticipation, +} + +#[derive(Debug)] +struct Round1 { + id: Id, + message_destinations: BTreeSet, + expecting_messages_from: BTreeSet, + echo_round_participation: EchoRoundParticipation, +} + +#[derive(Debug, Serialize, Deserialize)] +struct Round1Echo { + sender: Id, +} + +impl Deserialize<'de>> EntryPoint for Round1 { + type Inputs = Inputs; + type Protocol = PartialEchoProtocol; + fn new( + _rng: &mut impl CryptoRngCore, + _shared_randomness: &[u8], + id: Id, + inputs: Self::Inputs, + ) -> Result, LocalError> { + let message_destinations = BTreeSet::from_iter(inputs.message_destinations); + let expecting_messages_from = BTreeSet::from_iter(inputs.expecting_messages_from); + Ok(BoxedRound::new_dynamic(Self { + id, + message_destinations, + expecting_messages_from, + echo_round_participation: inputs.echo_round_participation, + })) + } +} + +impl Deserialize<'de>> Round for Round1 { + type Protocol = PartialEchoProtocol; + + fn id(&self) -> RoundId { + RoundId::new(1) + } + + fn possible_next_rounds(&self) -> BTreeSet { + BTreeSet::new() + } + + fn message_destinations(&self) -> &BTreeSet { + &self.message_destinations + } + + fn expecting_messages_from(&self) -> &BTreeSet { + &self.expecting_messages_from + } + + fn echo_round_participation(&self) -> EchoRoundParticipation { + self.echo_round_participation.clone() + } + + fn make_echo_broadcast( + &self, + _rng: &mut impl CryptoRngCore, + serializer: &Serializer, + ) -> Result { + if self.message_destinations.is_empty() { + Ok(EchoBroadcast::none()) + } else { + EchoBroadcast::new( + serializer, + Round1Echo { + sender: self.id.clone(), + }, + ) + } + } + + fn receive_message( + &self, + _rng: &mut impl CryptoRngCore, + deserializer: &Deserializer, + from: &Id, + echo_broadcast: EchoBroadcast, + normal_broadcast: NormalBroadcast, + direct_message: DirectMessage, + ) -> Result> { + normal_broadcast.assert_is_none()?; + direct_message.assert_is_none()?; + + if self.expecting_messages_from.is_empty() { + echo_broadcast.assert_is_none()?; + } else { + let echo = echo_broadcast.deserialize::>(deserializer)?; + assert_eq!(&echo.sender, from); + assert!(self.expecting_messages_from.contains(from)); + } + + Ok(Payload::new(())) + } + + fn finalize( + self, + _rng: &mut impl CryptoRngCore, + _payloads: BTreeMap, + _artifacts: BTreeMap, + ) -> Result, FinalizeError> { + Ok(FinalizeOutcome::Result(())) + } +} + +#[test] +fn partial_echo() { + let signers = (0..5).map(TestSigner::new).collect::>(); + let ids = signers.iter().map(|signer| signer.verifying_key()).collect::>(); + + // Nodes 0, 1 send an echo broadcast to nodes 1, 2, 3 + // The echo round happens between the nodes 1, 2, 3 + // Node 0 only sends the broadcasts, but doesn't receive any, so it skips the echo round + // Node 4 doesn't send or receive any broadcasts, so it skips the echo round + + let node0 = ( + signers[0], + Inputs { + message_destinations: [ids[1], ids[2], ids[3]].into(), + expecting_messages_from: [].into(), + echo_round_participation: EchoRoundParticipation::Send, + }, + ); + let node1 = ( + signers[1], + Inputs { + message_destinations: [ids[2], ids[3]].into(), + expecting_messages_from: [ids[0]].into(), + echo_round_participation: EchoRoundParticipation::Default, + }, + ); + let node2 = ( + signers[2], + Inputs { + message_destinations: [].into(), + expecting_messages_from: [ids[0], ids[1]].into(), + echo_round_participation: EchoRoundParticipation::Receive { + echo_targets: BTreeSet::from([ids[1], ids[3]]), + }, + }, + ); + let node3 = ( + signers[3], + Inputs { + message_destinations: [].into(), + expecting_messages_from: [ids[0], ids[1]].into(), + echo_round_participation: EchoRoundParticipation::Receive { + echo_targets: BTreeSet::from([ids[1], ids[2]]), + }, + }, + ); + let node4 = ( + signers[4], + Inputs { + message_destinations: [].into(), + expecting_messages_from: [].into(), + echo_round_participation: EchoRoundParticipation::::Default, + }, + ); + + let inputs = vec![node0, node1, node2, node3, node4]; + + let my_subscriber = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .finish(); + let reports = tracing::subscriber::with_default(my_subscriber, || { + run_sync::, TestSessionParams>(&mut OsRng, inputs).unwrap() + }); + + for (_id, report) in reports { + assert!(matches!(report.outcome, SessionOutcome::Result(_))); + } +}