Skip to content

Commit

Permalink
Docs and tweaks (#40)
Browse files Browse the repository at this point in the history
* Multiple documentation additions and fixes
* Better names for `testing` mocks
* `Debug` impls for public items
  • Loading branch information
dvdplm authored Oct 24, 2024
1 parent 5cfc6c1 commit cda1d26
Show file tree
Hide file tree
Showing 22 changed files with 244 additions and 141 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
/target
Cargo.lock
*.sublime-project
*.sublime-workspace
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Session` is now generic over `SessionParameters` instead of a bunch of separate types. ([#36])
- `MessageBundle` is not generic anymore. ([#36])
- `ProcessedArtifact` is now also generic on `SessionParameters`. ([#37])
- Added a `Test` prefix to `testing::Signer`/`Verifier`/`Signature`/`Hasher` and renamed `TestingSessionParams` to `TestSessionParams`. ([#40])


### Added
Expand All @@ -20,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[#32]: https://github.com/entropyxyz/manul/pull/32
[#36]: https://github.com/entropyxyz/manul/pull/36
[#37]: https://github.com/entropyxyz/manul/pull/37
[#40]: https://github.com/entropyxyz/manul/pull/40


## [0.0.1] - 2024-10-12
Expand Down
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ version = "0.0.0"
edition = "2021"
authors = ['Entropy Cryptography <[email protected]>']
license = "MIT"
description = "Examples of usage for `manul` crate"
description = "Usage examples for the `manul` crate"
repository = "https://github.com/entropyxyz/manul/examples"
readme = "README.md"

Expand Down
8 changes: 6 additions & 2 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Manul examples

This crate illustrates the usage of `manul` for implementing distributed protocols.

The library itself is the perspective of the protocol implementor, where they create a set of `Round` impls and write unit-tests for them.
The library itself takes the perspective of the protocol implementor, as they create a set of `Round` impls and write unit-tests for them.

The integration tests are written from the perspective of the protocol user, emulating an asynchronous execution of the protocol on multiple nodes.

The integration tests are the perspective of the protocol user, emulating an asynchronous execution of the protocol on multiple nodes.
To run the example, execute: `RUST_LOG=debug cargo t -p manul-example --test async_runner`
9 changes: 6 additions & 3 deletions examples/src/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,14 @@ pub struct Inputs<Id> {
pub all_ids: BTreeSet<Id>,
}

#[derive(Debug)]
pub(crate) struct Context<Id> {
pub(crate) id: Id,
pub(crate) other_ids: BTreeSet<Id>,
pub(crate) ids_to_positions: BTreeMap<Id, u8>,
}

#[derive(Debug)]
pub struct Round1<Id> {
pub(crate) context: Context<Id>,
}
Expand Down Expand Up @@ -246,6 +248,7 @@ impl<Id: 'static + Debug + Clone + Ord + Send + Sync> Round<Id> for Round1<Id> {
}
}

#[derive(Debug)]
pub(crate) struct Round2<Id> {
round1_sum: u8,
pub(crate) context: Context<Id>,
Expand Down Expand Up @@ -356,7 +359,7 @@ mod tests {

use manul::{
session::{signature::Keypair, SessionOutcome},
testing::{run_sync, Signer, TestingSessionParams, Verifier},
testing::{run_sync, TestSessionParams, TestSigner, TestVerifier},
};
use rand_core::OsRng;
use tracing_subscriber::EnvFilter;
Expand All @@ -365,7 +368,7 @@ mod tests {

#[test]
fn round() {
let signers = (0..3).map(Signer::new).collect::<Vec<_>>();
let signers = (0..3).map(TestSigner::new).collect::<Vec<_>>();
let all_ids = signers
.iter()
.map(|signer| signer.verifying_key())
Expand All @@ -386,7 +389,7 @@ mod tests {
.with_env_filter(EnvFilter::from_default_env())
.finish();
let reports = tracing::subscriber::with_default(my_subscriber, || {
run_sync::<Round1<Verifier>, TestingSessionParams>(&mut OsRng, inputs).unwrap()
run_sync::<Round1<TestVerifier>, TestSessionParams>(&mut OsRng, inputs).unwrap()
});

for (_id, report) in reports {
Expand Down
16 changes: 9 additions & 7 deletions examples/src/simple_malicious.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use manul::{
Artifact, DirectMessage, FinalizeError, FinalizeOutcome, FirstRound, LocalError, Payload, Round, SessionId,
},
session::signature::Keypair,
testing::{round_override, run_sync, RoundOverride, RoundWrapper, Signer, TestingSessionParams, Verifier},
testing::{round_override, run_sync, RoundOverride, RoundWrapper, TestSessionParams, TestSigner, TestVerifier},
};
use rand_core::{CryptoRngCore, OsRng};
use tracing_subscriber::EnvFilter;
Expand All @@ -26,6 +26,7 @@ struct MaliciousInputs<Id> {
behavior: Behavior,
}

#[derive(Debug)]
struct MaliciousRound1<Id> {
round: Round1<Id>,
behavior: Behavior,
Expand Down Expand Up @@ -105,6 +106,7 @@ impl<Id: 'static + Debug + Clone + Ord + Send + Sync> RoundOverride<Id> for Mali

round_override!(MaliciousRound1);

#[derive(Debug)]
struct MaliciousRound2<Id> {
round: Round2<Id>,
behavior: Behavior,
Expand Down Expand Up @@ -143,7 +145,7 @@ round_override!(MaliciousRound2);

#[test]
fn serialized_garbage() {
let signers = (0..3).map(Signer::new).collect::<Vec<_>>();
let signers = (0..3).map(TestSigner::new).collect::<Vec<_>>();
let all_ids = signers
.iter()
.map(|signer| signer.verifying_key())
Expand Down Expand Up @@ -172,7 +174,7 @@ fn serialized_garbage() {
.with_env_filter(EnvFilter::from_default_env())
.finish();
let mut reports = tracing::subscriber::with_default(my_subscriber, || {
run_sync::<MaliciousRound1<Verifier>, TestingSessionParams>(&mut OsRng, run_inputs).unwrap()
run_sync::<MaliciousRound1<TestVerifier>, TestSessionParams>(&mut OsRng, run_inputs).unwrap()
});

let v0 = signers[0].verifying_key();
Expand All @@ -189,7 +191,7 @@ fn serialized_garbage() {

#[test]
fn attributable_failure() {
let signers = (0..3).map(Signer::new).collect::<Vec<_>>();
let signers = (0..3).map(TestSigner::new).collect::<Vec<_>>();
let all_ids = signers
.iter()
.map(|signer| signer.verifying_key())
Expand Down Expand Up @@ -218,7 +220,7 @@ fn attributable_failure() {
.with_env_filter(EnvFilter::from_default_env())
.finish();
let mut reports = tracing::subscriber::with_default(my_subscriber, || {
run_sync::<MaliciousRound1<Verifier>, TestingSessionParams>(&mut OsRng, run_inputs).unwrap()
run_sync::<MaliciousRound1<TestVerifier>, TestSessionParams>(&mut OsRng, run_inputs).unwrap()
});

let v0 = signers[0].verifying_key();
Expand All @@ -235,7 +237,7 @@ fn attributable_failure() {

#[test]
fn attributable_failure_round2() {
let signers = (0..3).map(Signer::new).collect::<Vec<_>>();
let signers = (0..3).map(TestSigner::new).collect::<Vec<_>>();
let all_ids = signers
.iter()
.map(|signer| signer.verifying_key())
Expand Down Expand Up @@ -264,7 +266,7 @@ fn attributable_failure_round2() {
.with_env_filter(EnvFilter::from_default_env())
.finish();
let mut reports = tracing::subscriber::with_default(my_subscriber, || {
run_sync::<MaliciousRound1<Verifier>, TestingSessionParams>(&mut OsRng, run_inputs).unwrap()
run_sync::<MaliciousRound1<TestVerifier>, TestSessionParams>(&mut OsRng, run_inputs).unwrap()
});

let v0 = signers[0].verifying_key();
Expand Down
79 changes: 53 additions & 26 deletions examples/tests/async.rs → examples/tests/async_runner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
extern crate alloc;

use std::fmt::Debug;

use alloc::collections::{BTreeMap, BTreeSet};

use manul::{
Expand All @@ -8,16 +10,16 @@ use manul::{
signature::Keypair, CanFinalize, LocalError, MessageBundle, RoundOutcome, Session, SessionId,
SessionParameters, SessionReport,
},
testing::{Signer, TestingSessionParams, Verifier},
testing::{TestSessionParams, TestSigner},
};
use manul_example::simple::{Inputs, Round1};
use manul_example::simple::{Inputs, Round1, SimpleProtocol};
use rand::Rng;
use rand_core::OsRng;
use tokio::{
sync::mpsc,
time::{sleep, Duration},
};
use tracing::debug;
use tracing::{debug, trace};
use tracing_subscriber::{util::SubscriberInitExt, EnvFilter};

struct MessageOut<SP: SessionParameters> {
Expand All @@ -31,24 +33,38 @@ struct MessageIn<SP: SessionParameters> {
message: MessageBundle,
}

/// Runs a session. Simulates what each participating party would run as the protocol progresses.
async fn run_session<P, SP>(
tx: mpsc::Sender<MessageOut<SP>>,
rx: mpsc::Receiver<MessageIn<SP>>,
session: Session<P, SP>,
) -> Result<SessionReport<P, SP>, LocalError>
where
P: 'static + Protocol,
SP: 'static + SessionParameters,
SP: 'static + SessionParameters + Debug,
{
let rng = &mut OsRng;

let mut rx = rx;

let mut session = session;
// Some rounds can finalize early and put off sending messages to the next round. Such messages
// will be stored here and applied after the messages for this round are sent.
let mut cached_messages = Vec::new();

let key = session.verifier();

// Each iteration of the loop progresses the session as follows:
// - Send out messages as dictated by the session "destinations".
// - Apply any cached messages.
// - Enter a nested loop:
// - Try to finalize the session; if we're done, exit the inner loop.
// - Wait until we get an incoming message.
// - Process the message we received and continue the loop.
// - When all messages have been sent and received as specified by the protocol, finalize the
// round.
// - If the protocol outcome is a new round, go to the top of the loop and start over with a
// new session.
loop {
debug!("{key:?}: *** starting round {:?} ***", session.round_id());

Expand All @@ -67,7 +83,7 @@ where
// and the artifact will be sent back to the host task
// to be added to the accumulator.
let (message, artifact) = session.make_message(rng, destination)?;
debug!("{key:?}: sending a message to {destination:?}",);
debug!("{key:?}: Sending a message to {destination:?}",);
tx.send(MessageOut {
from: key.clone(),
to: destination.clone(),
Expand All @@ -76,16 +92,16 @@ where
.await
.unwrap();

// This will happen in a host task
// This would happen in a host task
session.add_artifact(&mut accum, artifact)?;
}

for preprocessed in cached_messages {
// In production usage, this will happen in a spawned task.
debug!("{key:?}: applying a cached message");
// In production usage, this would happen in a spawned task and relayed back to the main task.
debug!("{key:?}: Applying a cached message");
let processed = session.process_message(rng, preprocessed);

// This will happen in a host task.
// This would happen in a host task.
session.add_processed_message(&mut accum, processed)?;
}

Expand All @@ -96,26 +112,31 @@ where
// Due to already registered invalid messages from nodes,
// even if the remaining nodes send correct messages, it won't be enough.
// Terminating.
CanFinalize::Never => return session.terminate(accum),
CanFinalize::Never => {
tracing::warn!("{key:?}: This session cannot ever be finalized. Terminating.");
return session.terminate(accum);
}
}

debug!("{key:?}: waiting for a message");
debug!("{key:?}: Waiting for a message");
let incoming = rx.recv().await.unwrap();

// Perform quick checks before proceeding with the verification.
let preprocessed = session.preprocess_message(&mut accum, &incoming.from, incoming.message)?;

if let Some(preprocessed) = preprocessed {
// In production usage, this will happen in a spawned task.
debug!("{key:?}: applying a message from {:?}", incoming.from);
let processed = session.process_message(rng, preprocessed);

// This will happen in a host task.
session.add_processed_message(&mut accum, processed)?;
match session.preprocess_message(&mut accum, &incoming.from, incoming.message)? {
Some(preprocessed) => {
// In production usage, this would happen in a separate task.
debug!("{key:?}: Applying a message from {:?}", incoming.from);
let processed = session.process_message(rng, preprocessed);
// In production usage, this would be a host task.
session.add_processed_message(&mut accum, processed)?;
}
None => {
trace!("{key:?} Pre-processing complete. Current state: {accum:?}")
}
}
}

debug!("{key:?}: finalizing the round");
debug!("{key:?}: Finalizing the round");

match session.finalize_round(rng, accum)? {
RoundOutcome::Finished(report) => break Ok(report),
Expand Down Expand Up @@ -176,7 +197,7 @@ async fn message_dispatcher<SP>(
async fn run_nodes<P, SP>(sessions: Vec<Session<P, SP>>) -> Vec<SessionReport<P, SP>>
where
P: 'static + Protocol + Send,
SP: 'static + SessionParameters,
SP: 'static + SessionParameters + Debug,
P::Result: Send,
SP::Signer: Send,
{
Expand Down Expand Up @@ -204,7 +225,7 @@ where
})
.collect::<Vec<_>>();

// Drop the last copy of the dispatcher's incoming channel so that it could finish.
// Drop the last copy of the dispatcher's incoming channel so that it can finish.
drop(dispatcher_tx);

let mut results = Vec::with_capacity(num_parties);
Expand All @@ -219,20 +240,25 @@ where

#[tokio::test]
async fn async_run() {
let signers = (0..3).map(Signer::new).collect::<Vec<_>>();
// The kind of Session we need to run the `SimpleProtocol`.
type SimpleSession = Session<SimpleProtocol, TestSessionParams>;

// Create 4 parties
let signers = (0..3).map(TestSigner::new).collect::<Vec<_>>();
let all_ids = signers
.iter()
.map(|signer| signer.verifying_key())
.collect::<BTreeSet<_>>();
let session_id = SessionId::random(&mut OsRng);

// Create 4 `Session`s
let sessions = signers
.into_iter()
.map(|signer| {
let inputs = Inputs {
all_ids: all_ids.clone(),
};
Session::<_, TestingSessionParams>::new::<Round1<Verifier>>(&mut OsRng, session_id.clone(), signer, inputs)
.unwrap()
SimpleSession::new::<Round1<_>>(&mut OsRng, session_id.clone(), signer, inputs).unwrap()
})
.collect::<Vec<_>>();

Expand All @@ -242,5 +268,6 @@ async fn async_run() {
.try_init()
.unwrap();

// Run the protocol
run_nodes(sessions).await;
}
Loading

0 comments on commit cda1d26

Please sign in to comment.