Skip to content

Commit

Permalink
[consensus] advance threshold clock from dag state
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed Jan 16, 2025
1 parent 39e1000 commit 0020c99
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 78 deletions.
61 changes: 29 additions & 32 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use crate::{
leader_schedule::LeaderSchedule,
round_prober::QuorumRound,
stake_aggregator::{QuorumThreshold, StakeAggregator},
threshold_clock::ThresholdClock,
transaction::TransactionConsumer,
universal_committer::{
universal_committer_builder::UniversalCommitterBuilder, UniversalCommitter,
Expand All @@ -51,8 +50,6 @@ const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;

pub(crate) struct Core {
context: Arc<Context>,
/// The threshold clock that is used to keep track of the current round
threshold_clock: ThresholdClock,
/// The consumer to use in order to pull transactions to be included for the next proposals
transaction_consumer: TransactionConsumer,
/// The block manager which is responsible for keeping track of the DAG dependencies when processing new blocks
Expand Down Expand Up @@ -162,8 +159,7 @@ impl Core {
ancestor_state_manager.set_propagation_scores(propagation_scores);

Self {
context: context.clone(),
threshold_clock: ThresholdClock::new(0, context.clone()),
context,
last_included_ancestors,
last_decided_leader,
leader_schedule,
Expand Down Expand Up @@ -206,9 +202,9 @@ impl Core {
);
std::thread::sleep(Duration::from_millis(wait_ms));
}
// Recover the last available quorum to correctly advance the threshold clock.
let last_quorum = self.dag_state.read().last_quorum();
self.add_accepted_blocks(last_quorum);
// Try to set up leader timeout if needed.
// TODO: review & document the order of operations here.
self.try_notify_new_round();
// Try to commit and propose, since they may not have run after the last storage write.
self.try_commit().unwrap();

Expand Down Expand Up @@ -269,9 +265,10 @@ impl Core {
.join(",")
);

// Now add accepted blocks to the threshold clock and pending ancestors list.
self.add_accepted_blocks(accepted_blocks);
// Now set up leader timeout if needed.
self.try_notify_new_round();

// Try to commit the new blocks if possible.
self.try_commit()?;

// Try to propose now since there are new blocks accepted.
Expand All @@ -288,24 +285,20 @@ impl Core {
Ok(missing_block_refs)
}

/// Adds/processed all the newly `accepted_blocks`. We basically try to move the threshold clock and add them to the
/// pending ancestors list.
fn add_accepted_blocks(&mut self, accepted_blocks: Vec<VerifiedBlock>) {
// Advance the threshold clock. If advanced to a new round then send a signal that a new quorum has been received.
if let Some(new_round) = self
.threshold_clock
.add_blocks(accepted_blocks.iter().map(|b| b.reference()).collect())
{
// notify that threshold clock advanced to new round
self.signals.new_round(new_round);
}
/// Try to set up leader timeout if needed.
fn try_notify_new_round(&mut self) {
// If advanced to a new round then send a signal to set up leader timeout.
let Some(new_clock_round) = self.new_clock_round() else {
return;
};
self.signals.new_round(new_clock_round);

// Report the threshold clock round
self.context
.metrics
.node_metrics
.threshold_clock_round
.set(self.threshold_clock.get_round() as i64);
.set(new_clock_round as i64);
}

/// Creating a new block for the dictated round. This is used when a leader timeout occurs, either
Expand Down Expand Up @@ -359,13 +352,10 @@ impl Core {
.with_label_values(&["Core::try_new_block"])
.start_timer();

let clock_round = self.threshold_clock.get_round();
if clock_round <= self.last_proposed_round() {
return None;
}
let clock_round = self.new_clock_round()?;

// There must be a quorum of blocks from the previous round.
let quorum_round = self.threshold_clock.get_round().saturating_sub(1);
let quorum_round = clock_round.saturating_sub(1);

// Create a new block either because we want to "forcefully" propose a block due to a leader timeout,
// or because we are actually ready to produce the block (leader exists and min delay has passed).
Expand Down Expand Up @@ -428,7 +418,7 @@ impl Core {
.with_label_values(&[leader_authority])
.inc_by(
Instant::now()
.saturating_duration_since(self.threshold_clock.get_quorum_ts())
.saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts())
.as_millis() as u64,
);
self.context
Expand Down Expand Up @@ -527,9 +517,6 @@ impl Core {
assert_eq!(accepted_blocks.len(), 1);
assert!(missing.is_empty());

// Internally accept the block to move the threshold clock etc
self.add_accepted_blocks(vec![verified_block.clone()]);

// Ensure the new block and its ancestors are persisted, before broadcasting it.
self.dag_state.write().flush();

Expand Down Expand Up @@ -736,7 +723,7 @@ impl Core {

/// Whether the core should propose new blocks.
pub(crate) fn should_propose(&self) -> bool {
let clock_round = self.threshold_clock.get_round();
let clock_round = self.dag_state.read().threshold_clock_round();
let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals;

if !self.subscriber_exists {
Expand Down Expand Up @@ -1048,6 +1035,16 @@ impl Core {
self.leaders(round).first().unwrap().authority
}

/// Returns the threshold clock round when it is ahead of the round of the last
/// proposed block. Returns `None` when a block has already been proposed at or
/// beyond the clock round.
fn new_clock_round(&self) -> Option<Round> {
    // Take the read lock once so both values come from the same DAG state snapshot.
    let state = self.dag_state.read();
    let round = state.threshold_clock_round();
    let last_proposed_round = state.get_last_proposed_block().round();
    (round > last_proposed_round).then_some(round)
}

fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
self.last_proposed_block().timestamp_ms()
}
Expand Down
89 changes: 54 additions & 35 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{

use consensus_config::AuthorityIndex;
use itertools::Itertools as _;
use tokio::time::Instant;
use tracing::{debug, error, info};

use crate::{
Expand All @@ -25,8 +26,8 @@ use crate::{
},
context::Context,
leader_scoring::{ReputationScores, ScoringSubdag},
stake_aggregator::{QuorumThreshold, StakeAggregator},
storage::{Store, WriteBatch},
threshold_clock::ThresholdClock,
CommittedSubDag,
};

Expand Down Expand Up @@ -56,6 +57,9 @@ pub(crate) struct DagState {
// Vec position corresponds to the authority index.
recent_refs_by_authority: Vec<BTreeSet<BlockRef>>,

// Keeps track of the threshold clock for proposing blocks.
threshold_clock: ThresholdClock,

// Keeps track of the highest round that has been evicted for each authority. Any blocks that are of round <= evict_round
// should be considered evicted, and if any exist we should not consider them causally complete in the order they appear.
// The `evicted_rounds` size should be the same as the committee size.
Expand Down Expand Up @@ -114,6 +118,8 @@ impl DagState {
.map(|block| (block.reference(), block))
.collect();

let threshold_clock = ThresholdClock::new(0, context.clone());

let last_commit = store
.read_last_commit()
.unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
Expand Down Expand Up @@ -168,6 +174,7 @@ impl DagState {
genesis,
recent_blocks: BTreeMap::new(),
recent_refs_by_authority: vec![BTreeSet::new(); num_authorities],
threshold_clock,
highest_accepted_round: 0,
last_commit: last_commit.clone(),
last_commit_round_advancement_time: None,
Expand Down Expand Up @@ -334,6 +341,7 @@ impl DagState {
self.recent_blocks
.insert(block_ref, BlockInfo::new(block.clone()));
self.recent_refs_by_authority[block_ref.author].insert(block_ref);
self.threshold_clock.add_block(block_ref);
self.highest_accepted_round = max(self.highest_accepted_round, block.round());
self.context
.metrics
Expand Down Expand Up @@ -518,11 +526,6 @@ impl DagState {
.collect()
}

pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
let blocks = self.contains_blocks(vec![*block_ref]);
blocks.first().cloned().unwrap()
}

/// Gets the last proposed block from this authority.
/// If no block is proposed yet, returns the genesis block.
pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock {
Expand Down Expand Up @@ -728,6 +731,19 @@ impl DagState {
exist
}

/// Single-reference convenience wrapper around `contains_blocks`.
///
/// Passes a one-element vector to `contains_blocks` and returns the first flag
/// of the result. Panics if `contains_blocks` returns an empty result.
pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool {
    let found = self.contains_blocks(vec![*block_ref]);
    found.first().copied().unwrap()
}

/// Returns the current round of the threshold clock owned by this DAG state.
pub(crate) fn threshold_clock_round(&self) -> Round {
self.threshold_clock.get_round()
}

/// Returns the quorum timestamp reported by the threshold clock.
// NOTE(review): presumably the instant at which the clock last observed a quorum;
// confirm against `ThresholdClock::get_quorum_ts`.
pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant {
self.threshold_clock.get_quorum_ts()
}

/// Returns the highest round recorded so far among the blocks added to this DAG state.
pub(crate) fn highest_accepted_round(&self) -> Round {
self.highest_accepted_round
}
Expand Down Expand Up @@ -959,31 +975,6 @@ impl DagState {
);
}

/// Detects and returns the blocks of the round that forms the last quorum. The method will return
/// the quorum even if that's genesis.
pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
// the quorum should exist either on the highest accepted round or the one before. If we fail to detect
// a quorum then it means that our DAG has advanced with missing causal history.
for round in
(self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev()
{
if round == GENESIS_ROUND {
return self.genesis_blocks();
}
let mut quorum = StakeAggregator::<QuorumThreshold>::new();

// Since the minimum wave length is 3 we expect to find a quorum in the uncommitted rounds.
let blocks = self.get_uncommitted_blocks_at_round(round);
for block in &blocks {
if quorum.add(block.author(), &self.context.committee) {
return blocks;
}
}
}

panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
}

pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> {
self.store
.read_last_commit_info()
Expand Down Expand Up @@ -1039,10 +1030,6 @@ impl DagState {
.end()
}

pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
self.genesis.values().cloned().collect()
}

/// The last round that should get evicted after a cache clean up operation. After this round we are
/// guaranteed to have all the produced blocks from that authority. For any round that is
/// <= `last_evicted_round` we don't have such guarantees as out of order blocks might exist.
Expand Down Expand Up @@ -1072,6 +1059,38 @@ impl DagState {
gc_round.min(last_round.saturating_sub(cached_rounds))
}

/// Finds the most recent round whose blocks form a quorum and returns that
/// round's blocks. Genesis counts: if the search reaches the genesis round, the
/// genesis blocks are returned.
///
/// Panics when no quorum is found on the highest accepted round or the round
/// before it.
#[cfg(test)]
pub(crate) fn last_quorum(&self) -> Vec<VerifiedBlock> {
    use crate::stake_aggregator::{QuorumThreshold, StakeAggregator};

    // The quorum should exist either on the highest accepted round or the one before it.
    // Failing to find one means the DAG has advanced with missing causal history.
    let highest = self.highest_accepted_round;
    for round in (highest.saturating_sub(1)..=highest).rev() {
        if round == GENESIS_ROUND {
            return self.genesis_blocks();
        }

        // Since the minimum wave length is 3 we expect to find a quorum in the uncommitted rounds.
        let blocks = self.get_uncommitted_blocks_at_round(round);
        let mut quorum = StakeAggregator::<QuorumThreshold>::new();
        // `any` stops at the first author whose stake completes the quorum.
        let reached = blocks
            .iter()
            .any(|block| quorum.add(block.author(), &self.context.committee));
        if reached {
            return blocks;
        }
    }

    panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds.");
}

/// Returns clones of all genesis blocks held by this DAG state. Test-only.
#[cfg(test)]
pub(crate) fn genesis_blocks(&self) -> Vec<VerifiedBlock> {
self.genesis.values().cloned().collect()
}

#[cfg(test)]
pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) {
self.last_commit = Some(commit);
Expand Down
24 changes: 13 additions & 11 deletions consensus/core/src/threshold_clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,8 @@ impl ThresholdClock {
}
}

/// Add the block references that have been successfully processed and advance the round accordingly. If the round
/// has indeed advanced then the new round is returned, otherwise None is returned.
pub(crate) fn add_blocks(&mut self, blocks: Vec<BlockRef>) -> Option<Round> {
let previous_round = self.round;
for block_ref in blocks {
self.add_block(block_ref);
}
(self.round > previous_round).then_some(self.round)
}

fn add_block(&mut self, block: BlockRef) {
/// Adds the block reference that has been accepted and advances the round accordingly.
pub(crate) fn add_block(&mut self, block: BlockRef) {
match block.round.cmp(&self.round) {
// Blocks with round less than what we currently build are irrelevant here
Ordering::Less => {}
Expand Down Expand Up @@ -67,6 +58,17 @@ impl ThresholdClock {
}
}

/// Feeds every given block reference into the clock, one at a time.
///
/// Returns `Some(round)` with the clock's round after processing if that round
/// is higher than it was before the call, and `None` otherwise. Test-only helper.
#[cfg(test)]
fn add_blocks(&mut self, blocks: Vec<BlockRef>) -> Option<Round> {
    let round_before = self.round;
    blocks
        .into_iter()
        .for_each(|block_ref| self.add_block(block_ref));
    if self.round > round_before {
        Some(self.round)
    } else {
        None
    }
}

/// Returns the round the clock is currently at.
pub(crate) fn get_round(&self) -> Round {
self.round
}
Expand Down

0 comments on commit 0020c99

Please sign in to comment.