Skip to content

Commit

Permalink
refactor(consensus): change StartRound to GetProposal and pass leader…
Browse files Browse the repository at this point in the history
…_fn to sm from shc
  • Loading branch information
asmaastarkware committed Jul 23, 2024
1 parent 6403530 commit cf6508b
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 61 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sequencing/papyrus_consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ description = "Reach consensus for Starknet"
[dependencies]
async-trait.workspace = true
futures.workspace = true
lazy_static.workspace = true
metrics.workspace = true
papyrus_common = { path = "../../papyrus_common", version = "0.4.0-dev.2" }
papyrus_network = { path = "../../papyrus_network", version = "0.4.0-dev.2" }
Expand Down
30 changes: 15 additions & 15 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub(crate) struct SingleHeightConsensus<BlockT: ConsensusBlock> {
impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
pub(crate) fn new(height: BlockNumber, id: ValidatorId, validators: Vec<ValidatorId>) -> Self {
// TODO(matan): Use actual weights, not just `len`.
let state_machine = StateMachine::new(validators.len() as u32);
let state_machine = StateMachine::new(id, validators.len() as u32);
Self {
height,
validators,
Expand All @@ -57,7 +57,10 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
context: &mut ContextT,
) -> Result<Option<Decision<BlockT>>, ConsensusError> {
info!("Starting consensus with validators {:?}", self.validators);
let events = self.state_machine.start();
let leader_fn = |_round: Round| -> ValidatorId {
context.proposer(&self.validators.clone(), self.height)
};
let events = self.state_machine.start(&leader_fn);
self.handle_state_machine_events(context, events).await
}

Expand Down Expand Up @@ -180,16 +183,17 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
while let Some(event) = events.pop_front() {
trace!("Handling event: {:?}", event);
match event {
StateMachineEvent::StartRound(block_hash, round) => {
StateMachineEvent::GetProposal(block_hash, round) => {
events.append(
&mut self
.handle_state_machine_start_round(context, block_hash, round)
.handle_state_machine_get_proposal(context, block_hash, round)
.await,
);
}
StateMachineEvent::Proposal(_, _) => {
// Ignore proposals sent by the StateMachine as SingleHeightConsensus already
// sent this out when responding to a StartRound.
// sent this out when responding to a GetProposal.
// TODO(matan): How do we handle this when validValue is set?
}
StateMachineEvent::Decision(block_hash, round) => {
return self.handle_state_machine_decision(block_hash, round).await;
Expand All @@ -208,19 +212,16 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
}

#[instrument(skip(self, context), level = "debug")]
async fn handle_state_machine_start_round<ContextT: ConsensusContext<Block = BlockT>>(
async fn handle_state_machine_get_proposal<ContextT: ConsensusContext<Block = BlockT>>(
&mut self,
context: &mut ContextT,
block_hash: Option<BlockHash>,
round: Round,
) -> VecDeque<StateMachineEvent> {
// TODO(matan): Support re-proposing validValue.
assert!(block_hash.is_none(), "Reproposing is not yet supported");
let proposer_id = context.proposer(&self.validators, self.height);
if proposer_id != self.id {
debug!("Validator");
return self.state_machine.handle_event(StateMachineEvent::StartRound(None, round));
}
assert!(
block_hash.is_none(),
"BlockHash must be None since the state machine is requesting a BlockHash"
);
debug!("Proposer");

let (p2p_messages_receiver, block_receiver) = context.build_proposal(self.height).await;
Expand All @@ -242,8 +243,7 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
let old = self.proposals.insert(round, block);
assert!(old.is_none(), "There should be no entry for this round.");

// TODO(matan): Send to the state machine and handle voting.
self.state_machine.handle_event(StateMachineEvent::StartRound(Some(id), round))
self.state_machine.handle_event(StateMachineEvent::GetProposal(Some(id), round))
}

#[instrument(skip_all)]
Expand Down
63 changes: 32 additions & 31 deletions crates/sequencing/papyrus_consensus/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,16 @@ use std::collections::{HashMap, VecDeque};
use starknet_api::block::BlockHash;
use tracing::trace;

use crate::types::Round;
use crate::types::{Round, ValidatorId};

/// Events which the state machine sends/receives.
#[derive(Debug, Clone, PartialEq)]
pub enum StateMachineEvent {
/// StartRound is effective 2 questions:
/// 1. Is the local node the proposer for this round?
/// 2. If so, what value should be proposed?
/// While waiting for the response to this event, the state machine will buffer all other
/// events.
///
/// How should the caller handle this event?
/// 1. If the local node is not the proposer, the caller responds with with `None` as the block
/// hash.
/// 2. If the local node is the proposer and a block hash was supplied by the state machine,
/// the caller responds with the supplied block hash.
/// 3. If the local node is the proposer and no block hash was supplied by the state machine,
/// the caller must find/build a block to respond with.
StartRound(Option<BlockHash>, Round),
/// Sent by the state machine when a block is required to propose (BlockHash is always None).
/// While waiting for the response of GetProposal, the state machine will buffer all other
/// events. The caller must respond with a valid block hash for this height to the state
/// machine, and the same round sent out.
GetProposal(Option<BlockHash>, Round),
/// Consensus message, can be both sent from and to the state machine.
Proposal(BlockHash, Round),
/// Consensus message, can be both sent from and to the state machine.
Expand All @@ -56,6 +47,7 @@ pub enum Step {
/// 3. Only valid proposals (e.g. no NIL)
/// 4. No network failures - together with 3 this means we only support round 0.
pub struct StateMachine {
id: ValidatorId,
round: Round,
step: Step,
quorum: u32,
Expand All @@ -65,21 +57,22 @@ pub struct StateMachine {
precommits: HashMap<Round, HashMap<BlockHash, u32>>,
// When true, the state machine will wait for a GetProposal event, buffering all other input
// events in `events_queue`.
starting_round: bool,
awaiting_get_proposal: bool,
events_queue: VecDeque<StateMachineEvent>,
}

impl StateMachine {
/// total_weight - the total voting weight of all validators for this height.
pub fn new(total_weight: u32) -> Self {
pub fn new(id: ValidatorId, total_weight: u32) -> Self {
Self {
id,
round: 0,
step: Step::Propose,
quorum: (2 * total_weight / 3) + 1,
proposals: HashMap::new(),
prevotes: HashMap::new(),
precommits: HashMap::new(),
starting_round: false,
awaiting_get_proposal: false,
events_queue: VecDeque::new(),
}
}
Expand All @@ -88,16 +81,24 @@ impl StateMachine {
self.quorum
}

/// Starts the state machine, effectively calling `StartRound(0)` from the paper. This is needed
/// to trigger the first leader to propose. See [`StartRound`](StateMachineEvent::StartRound)
pub fn start(&mut self) -> VecDeque<StateMachineEvent> {
self.starting_round = true;
VecDeque::from([StateMachineEvent::StartRound(None, self.round)])
/// Starts the state machine, effectively calling `StartRound(0)` from the paper. This is
/// needed to trigger the first leader to propose.
/// See [`GetProposal`](StateMachineEvent::GetProposal)
pub fn start(
&mut self,
leader_fn: &impl Fn(Round) -> ValidatorId,
) -> VecDeque<StateMachineEvent> {
if self.id == leader_fn(self.round) {
self.awaiting_get_proposal = true;
// TODO(matan): Support re-proposing validValue.
return VecDeque::from([StateMachineEvent::GetProposal(None, self.round)]);
}
VecDeque::from([])
}

/// Process the incoming event.
///
/// If we are waiting for a response to `StartRound` all other incoming events are buffered
/// If we are waiting for a response to `GetProposal` all other incoming events are buffered
/// until that response arrives.
///
/// Returns a set of events for the caller to handle. The caller should not mirror the output
Expand All @@ -108,9 +109,9 @@ impl StateMachine {
trace!("Handling event: {:?}", event);
// Mimic LOC 18 in the paper; the state machine doesn't
// handle any events until `getValue` completes.
if self.starting_round {
if self.awaiting_get_proposal {
match event {
StateMachineEvent::StartRound(_, round) if round == self.round => {
StateMachineEvent::GetProposal(_, round) if round == self.round => {
self.events_queue.push_front(event);
}
_ => {
Expand Down Expand Up @@ -156,8 +157,8 @@ impl StateMachine {

fn handle_event_internal(&mut self, event: StateMachineEvent) -> VecDeque<StateMachineEvent> {
match event {
StateMachineEvent::StartRound(block_hash, round) => {
self.handle_start_round(block_hash, round)
StateMachineEvent::GetProposal(block_hash, round) => {
self.handle_get_proposal(block_hash, round)
}
StateMachineEvent::Proposal(block_hash, round) => {
self.handle_proposal(block_hash, round)
Expand All @@ -174,15 +175,15 @@ impl StateMachine {
}
}

fn handle_start_round(
fn handle_get_proposal(
&mut self,
block_hash: Option<BlockHash>,
round: u32,
) -> VecDeque<StateMachineEvent> {
// TODO(matan): Will we allow other events (timeoutPropose) to exit this state?
assert!(self.starting_round);
assert!(self.awaiting_get_proposal);
assert_eq!(round, self.round);
self.starting_round = false;
self.awaiting_get_proposal = false;

let Some(hash) = block_hash else {
// Validator.
Expand Down
39 changes: 24 additions & 15 deletions crates/sequencing/papyrus_consensus/src/state_machine_test.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
use lazy_static::lazy_static;
use starknet_api::block::BlockHash;
use starknet_types_core::felt::Felt;
use test_case::test_case;

use super::Round;
use crate::state_machine::{StateMachine, StateMachineEvent};
use crate::types::ValidatorId;

lazy_static! {
static ref VALIDATOR_ID: ValidatorId = 0_u32.into();
}

const BLOCK_HASH: BlockHash = BlockHash(Felt::ONE);
const ROUND: Round = 0;

#[test_case(true; "proposer")]
#[test_case(false; "validator")]
fn events_arrive_in_ideal_order(is_proposer: bool) {
let mut state_machine = StateMachine::new(4);

let mut events = state_machine.start();
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::StartRound(None, ROUND));
let mut state_machine = StateMachine::new(*VALIDATOR_ID, 4);
let leader_fn = |_: Round| *VALIDATOR_ID;
let mut events = state_machine.start(&leader_fn);
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::GetProposal(None, ROUND));
if is_proposer {
events = state_machine.handle_event(StateMachineEvent::StartRound(Some(BLOCK_HASH), ROUND));
events =
state_machine.handle_event(StateMachineEvent::GetProposal(Some(BLOCK_HASH), ROUND));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Proposal(BLOCK_HASH, ROUND));
} else {
state_machine.handle_event(StateMachineEvent::StartRound(None, ROUND));
state_machine.handle_event(StateMachineEvent::GetProposal(None, ROUND));
assert!(events.is_empty());
events = state_machine.handle_event(StateMachineEvent::Proposal(BLOCK_HASH, ROUND));
}
Expand All @@ -43,12 +50,13 @@ fn events_arrive_in_ideal_order(is_proposer: bool) {

#[test]
fn validator_receives_votes_first() {
let mut state_machine = StateMachine::new(4);
let mut state_machine = StateMachine::new(*VALIDATOR_ID, 4);

let mut events = state_machine.start();
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::StartRound(None, ROUND));
let leader_fn = |_: Round| *VALIDATOR_ID;
let mut events = state_machine.start(&leader_fn);
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::GetProposal(None, ROUND));
assert!(events.is_empty(), "{:?}", events);
events = state_machine.handle_event(StateMachineEvent::StartRound(None, ROUND));
events = state_machine.handle_event(StateMachineEvent::GetProposal(None, ROUND));
assert!(events.is_empty(), "{:?}", events);

// Receives votes from all the other nodes first (more than minimum for a quorum).
Expand All @@ -69,10 +77,11 @@ fn validator_receives_votes_first() {
}

#[test]
fn buffer_events_during_start_round() {
let mut state_machine = StateMachine::new(4);
let mut events = state_machine.start();
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::StartRound(None, 0));
fn buffer_events_during_get_proposal() {
let mut state_machine = StateMachine::new(*VALIDATOR_ID, 4);
let leader_fn = |_: Round| *VALIDATOR_ID;
let mut events = state_machine.start(&leader_fn);
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::GetProposal(None, 0));
assert!(events.is_empty(), "{:?}", events);

// TODO(matan): When we support NIL votes, we should send them. Real votes without the proposal
Expand All @@ -84,7 +93,7 @@ fn buffer_events_during_start_round() {
assert!(events.is_empty(), "{:?}", events);

// Node finishes building the proposal.
events = state_machine.handle_event(StateMachineEvent::StartRound(None, 0));
events = state_machine.handle_event(StateMachineEvent::GetProposal(None, 0));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Prevote(BLOCK_HASH, ROUND));
assert_eq!(events.pop_front().unwrap(), StateMachineEvent::Precommit(BLOCK_HASH, ROUND));
assert!(events.is_empty(), "{:?}", events);
Expand Down

0 comments on commit cf6508b

Please sign in to comment.