Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(consensus): add documentation to consensus crate #2626

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions crates/sequencing/papyrus_consensus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
#![warn(missing_docs)]
// TODO(Matan): Add a description of the crate.
// TODO(Matan): Add links to the spec.
// TODO(Matan): fix #[allow(missing_docs)].
//! A consensus implementation for a [`Starknet`](https://www.starknet.io/) node.
//! A consensus implementation for a [Starknet](https://www.starknet.io/) node. The consensus
//! algorithm is based on [Tendermint](https://arxiv.org/pdf/1807.04938).
//!
//! Consensus communicates with other nodes via a gossip network; sending and receiving votes on one
//! topic and streaming proposals on a separate topic. [details](https://github.com/starknet-io/starknet-p2p-specs/tree/main/p2p/proto/consensus).
//!
//! In addition to the network inputs, consensus reaches out to the rest of the node via the
//! [`Context`](types::ConsensusContext) API.
//!
//! Consensus is generic over the content of the proposals, and merely requires an identifier to be
//! produced by the Context.
//!
//! Consensus operates in two modes:
//! 1. Observer - Receives consensus messages and updates the node when a decision is reached.
//! 2. Active - In addition to receiving messages, the node can also send messages to the network.
//!
//! Observer mode offers lower latency compared to sync, as Proposals and votes are processed in
//! real-time rather than after a decision has been made.
//!
//! Consensus is an active component, it doesn't follow the server/client model:
//! 1. The outbound messages are not sent as responses to the inbound messages.
//! 2. It generates and runs its own events (e.g. timeouts).

pub mod config;
pub mod manager;
#[allow(missing_docs)]
pub mod types;
pub use manager::run_consensus;
#[allow(missing_docs)]
pub mod simulation_network_receiver;
pub mod stream_handler;

mod manager;
#[allow(missing_docs)]
pub mod single_height_consensus;
mod single_height_consensus;
#[allow(missing_docs)]
pub mod state_machine;
pub mod stream_handler;
mod state_machine;
#[cfg(test)]
pub(crate) mod test_utils;
#[allow(missing_docs)]
pub mod types;

pub use manager::run_consensus;
51 changes: 42 additions & 9 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
//! Consensus manager, see Manager struct.
//! Top level of consensus, used to run multiple heights of consensus.
//!
//! [`run_consensus`] - This is the primary entrypoint for running the consensus component.
//!
//! [`MultiHeightManager`] - Runs consensus repeatedly across different heights using
//! [`run_height`](MultiHeightManager::run_height).

#[cfg(test)]
#[path = "manager_test.rs"]
Expand Down Expand Up @@ -28,7 +33,25 @@ use crate::types::{
ValidatorId,
};

/// Run consensus indefinitely.
///
/// If a decision is reached via consensus the context is updated. If a decision is learned via the
/// sync protocol, consensus silently moves on to the next height.
///
/// Inputs:
/// - `context`: The API for consensus to reach out to the rest of the node.
/// - `start_active_height`: The height at which the node may participate in consensus (if it is a
/// validator).
/// - `start_observe_height`: The height at which the node begins to run consensus.
/// - `validator_id`: The ID of this node.
/// - `consensus_delay`: delay before starting consensus; allowing the network to connect to peers.
/// - `timeouts`: The timeouts for the consensus algorithm.
/// - `vote_receiver`: The channels to receive votes from the network. These are self contained
/// messages.
/// - `proposal_receiver`: The channel to receive proposals from the network. Proposals are
/// represented as streams (ProposalInit, Content.*, ProposalFin).
// TODO(dvir): add test for this.
// TODO(Asmaa): Update documentation when we update for the real sync.
#[instrument(skip_all, level = "info")]
#[allow(missing_docs)]
#[allow(clippy::too_many_arguments)]
Expand All @@ -39,8 +62,8 @@ pub async fn run_consensus<ContextT, SyncReceiverT>(
validator_id: ValidatorId,
consensus_delay: Duration,
timeouts: TimeoutsConfig,
mut broadcast_channels: BroadcastConsensusMessageChannel,
mut inbound_proposal_receiver: mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
mut vote_receiver: BroadcastConsensusMessageChannel,
mut proposal_receiver: mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
mut sync_receiver: SyncReceiverT,
) -> Result<(), ConsensusError>
where
Expand Down Expand Up @@ -72,8 +95,8 @@ where
&mut context,
current_height,
is_observer,
&mut broadcast_channels,
&mut inbound_proposal_receiver,
&mut vote_receiver,
&mut proposal_receiver,
&mut sync_receiver,
)
.await?
Expand Down Expand Up @@ -115,7 +138,7 @@ struct MultiHeightManager<ContextT: ConsensusContext> {

impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
/// Create a new consensus manager.
pub fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self {
pub(crate) fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self {
Self {
validator_id,
cached_messages: BTreeMap::new(),
Expand All @@ -126,10 +149,20 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {

/// Run the consensus algorithm for a single height.
///
/// Assumes that `height` is monotonically increasing across calls for the sake of filtering
/// `cached_messaged`.
/// A height of consensus ends either when the node learns of a decision, either by consensus
/// directly or via the sync protocol.
/// - An error implies that consensus cannot continue, not just that the current height failed.
///
/// This is the "top level" task of consensus, which is able to multiplex across activities:
/// network messages and self generated events.
///
/// Assumes that `height` is monotonically increasing across calls.
///
/// Inputs - see [`run_consensus`].
/// - `is_observer`: Whether the node must observe or if it is allowed to be active (assuming it
/// is in the validator set).
#[instrument(skip(self, context, broadcast_channels, sync_receiver), level = "info")]
pub async fn run_height<SyncReceiverT>(
pub(crate) async fn run_height<SyncReceiverT>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,13 @@ use starknet_api::block::BlockHash;
use starknet_api::core::{ContractAddress, PatriciaKey};
use tracing::{debug, instrument};

/// Receiver used to help run simulations of consensus. It has 2 goals in mind:
/// 1. Simulate network failures.
/// 2. Make tests repeatable - This is challenging because simulations involve a noisy environment;
/// so the actual network issues experienced may differ between 2 test runs.
/// - We expect simulations to use fairly reliable networks. That means messages arriving in
/// different order between runs will make up most of the actual noise between runs, as
/// opposed to actual drops or corruption.
/// - Tendermint is, to a large extent, unaffected by minor network reorderings. For instance it
/// doesn't matter if prevotes arrive before or after the Proposal they are for.
/// - This struct is therefore also designed not to be overly sensistive to message order. If
/// message A was dropped by this struct in one run, it should be dropped in the rerun. This
/// is as opposed to using a stateful RNG where the random number is a function of all the
/// previous calls to the RNG.
/// Receiver which can simulate network issues in a repeatable manner. Simulates drops and network
/// corruption. The errors are meant to be repeatable regardless of the order of messages received.
///
/// Being indifferent to the order of messages on the network means that we don't have a state which
/// changes across all messages. If we were truly stateless though we would treat resends of
/// messages all the same, meaning that a dropped message would always be dropped. To avoid this we
/// have the cache, which allows us to treat resends of a specific message differently.
pub struct NetworkReceiver {
pub broadcasted_messages_receiver: BroadcastTopicServer<ConsensusMessage>,
// Cache is used so that repeat sends of a message can be processed differently. For example,
Expand All @@ -43,6 +37,14 @@ pub struct NetworkReceiver {
}

impl NetworkReceiver {
/// Creates a new NetworkReceiver.
///
/// Inputs:
/// - `broadcasted_messages_receiver`: The receiver to listen to.
/// - `cache_size`: Determines the size of the cache. A small cache risks acting the same across
/// resends of a given message. /// - `seed`: Seed for the random number generator.
/// - `drop_probability`: Probability of dropping a message [0, 1].
/// - `invalid_probability`: Probability of making a message invalid [0, 1].
pub fn new(
broadcasted_messages_receiver: BroadcastTopicServer<ConsensusMessage>,
cache_size: usize,
Expand Down
28 changes: 24 additions & 4 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//! Run a single height of consensus.
//!
//! [`SingleHeightConsensus`] (SHC) - run consensus for a single height.
//!
//! [`ShcTask`] - a task which should be run without blocking consensus.
//!
//! [`ShcEvent`] - an event, generated from an `ShcTask` which should be handled by the SHC.

#[cfg(test)]
#[path = "single_height_consensus_test.rs"]
mod single_height_consensus_test;
Expand All @@ -24,13 +32,16 @@ use crate::types::{
ValidatorId,
};

/// The SHC can either update the manager of a decision or return tasks that should be run without
/// blocking further calls to itself.
#[derive(Debug, PartialEq)]
#[cfg_attr(test, derive(EnumAsInner))]
pub enum ShcReturn {
Tasks(Vec<ShcTask>),
Decision(Decision),
}

/// Events produced from tasks for the SHC to handle.
#[derive(Debug, Clone)]
pub enum ShcEvent {
TimeoutPropose(StateMachineEvent),
Expand All @@ -43,6 +54,7 @@ pub enum ShcEvent {
ValidateProposal(StateMachineEvent, Option<ProposalFin>),
}

/// A task which should be run without blocking calls to SHC.
#[derive(Debug)]
#[cfg_attr(test, derive(EnumAsInner))]
pub enum ShcTask {
Expand Down Expand Up @@ -135,10 +147,18 @@ impl ShcTask {
}
}

/// Struct which represents a single height of consensus. Each height is expected to be begun with a
/// call to `start`, which is relevant if we are the proposer for this height's first round.
/// SingleHeightConsensus receives messages directly as parameters to function calls. It can send
/// out messages "directly" to the network, and returning a decision to the caller.
/// Represents a single height of consensus. It is responsible for mapping between the idealized
/// view of consensus represented in the StateMachine and the real world implementation.
///
/// Example:
/// - Timeouts: the SM returns an event timeout, but SHC then maps that to a task which can be run
/// by the Manager. The manager though unaware of the specific task as it has minimal consensus
/// logic.
///
/// Each height is begun with a call to `start`, with no further calls to it.
///
/// SHC is not a top level task, it is called directly and returns values (doesn't directly run sub
/// tasks). SHC does have side effects, such as sending messages to the network via the context.
pub(crate) struct SingleHeightConsensus {
height: BlockNumber,
validators: Vec<ValidatorId>,
Expand Down
11 changes: 6 additions & 5 deletions crates/sequencing/papyrus_consensus/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::types::{ProposalContentId, Round, ValidatorId};
pub enum StateMachineEvent {
/// Sent by the state machine when a block is required to propose (ProposalContentId is always
/// None). While waiting for the response of GetProposal, the state machine will buffer all
/// other events. The caller must respond with a valid proposal id for this height to the
/// other events. The caller *must* respond with a valid proposal id for this height to the
/// state machine, and the same round sent out.
GetProposal(Option<ProposalContentId>, Round),
/// Consensus message, can be both sent from and to the state machine.
Expand Down Expand Up @@ -48,9 +48,10 @@ pub enum Step {
}

/// State Machine. Major assumptions:
/// 1. SHC handles replays and conflicts.
/// 1. SHC handles: authentication, replays, and conflicts.
/// 2. SM must handle "out of order" messages (E.g. vote arrives before proposal).
/// 3. No network failures.
///
/// Each height is begun with a call to `start`, with no further calls to it.
pub struct StateMachine {
id: ValidatorId,
round: Round,
Expand Down Expand Up @@ -117,8 +118,8 @@ impl StateMachine {

/// Process the incoming event.
///
/// If we are waiting for a response to `GetProposal` all other incoming events are buffered
/// until that response arrives.
/// If we are waiting for a response to [`GetProposal`](`StateMachineEvent::GetProposal`) all
/// other incoming events are buffered until that response arrives.
///
/// Returns a set of events for the caller to handle. The caller should not mirror the output
/// events back to the state machine, as it makes sure to handle them before returning.
Expand Down
4 changes: 4 additions & 0 deletions crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Types for interfacing between consensus and the node.

use std::fmt::Debug;
use std::time::Duration;

Expand Down Expand Up @@ -28,6 +30,8 @@ pub type ProposalContentId = BlockHash;
pub const DEFAULT_VALIDATOR_ID: u64 = 100;

/// Interface for consensus to call out to the node.
///
/// Function calls should be assumed to not be cancel safe.
#[async_trait]
pub trait ConsensusContext {
/// The parts of the proposal that are streamed in.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use starknet_batcher_types::batcher_types::{
use starknet_batcher_types::communication::BatcherClient;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{debug, debug_span, error, info, trace, warn, Instrument};
use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument};

// TODO(Dan, Matan): Remove this once and replace with real gas prices.
const TEMPORARY_GAS_PRICES: GasPrices = GasPrices {
Expand Down
Loading