diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 7aae08598d537..c42c9d592dd59 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -33,7 +33,6 @@ use crate::{ leader_schedule::LeaderSchedule, round_prober::QuorumRound, stake_aggregator::{QuorumThreshold, StakeAggregator}, - threshold_clock::ThresholdClock, transaction::TransactionConsumer, universal_committer::{ universal_committer_builder::UniversalCommitterBuilder, UniversalCommitter, @@ -51,8 +50,6 @@ const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100; pub(crate) struct Core { context: Arc, - /// The threshold clock that is used to keep track of the current round - threshold_clock: ThresholdClock, /// The consumer to use in order to pull transactions to be included for the next proposals transaction_consumer: TransactionConsumer, /// The block manager which is responsible for keeping track of the DAG dependencies when processing new blocks @@ -162,8 +159,7 @@ impl Core { ancestor_state_manager.set_propagation_scores(propagation_scores); Self { - context: context.clone(), - threshold_clock: ThresholdClock::new(0, context.clone()), + context, last_included_ancestors, last_decided_leader, leader_schedule, @@ -206,9 +202,9 @@ impl Core { ); std::thread::sleep(Duration::from_millis(wait_ms)); } - // Recover the last available quorum to correctly advance the threshold clock. - let last_quorum = self.dag_state.read().last_quorum(); - self.add_accepted_blocks(last_quorum); + // Try to set up leader timeout if needed. + // TODO: review & document the order of operations here. + self.try_notify_new_round(); // Try to commit and propose, since they may not have run after the last storage write. self.try_commit().unwrap(); @@ -269,9 +265,10 @@ impl Core { .join(",") ); - // Now add accepted blocks to the threshold clock and pending ancestors list. - self.add_accepted_blocks(accepted_blocks); + // Now set up leader timeout if needed. + self.try_notify_new_round(); + // Try to commit the new blocks if possible. self.try_commit()?; // Try to propose now since there are new blocks accepted. @@ -288,24 +285,20 @@ impl Core { Ok(missing_block_refs) } - /// Adds/processed all the newly `accepted_blocks`. We basically try to move the threshold clock and add them to the - /// pending ancestors list. - fn add_accepted_blocks(&mut self, accepted_blocks: Vec) { - // Advance the threshold clock. If advanced to a new round then send a signal that a new quorum has been received. - if let Some(new_round) = self - .threshold_clock - .add_blocks(accepted_blocks.iter().map(|b| b.reference()).collect()) - { - // notify that threshold clock advanced to new round - self.signals.new_round(new_round); - } + /// Try to set up leader timeout if needed. + fn try_notify_new_round(&mut self) { + // If advanced to a new round then send a signal to set up leader timeout. + let Some(new_clock_round) = self.new_clock_round() else { + return; + }; + self.signals.new_round(new_clock_round); // Report the threshold clock round self.context .metrics .node_metrics .threshold_clock_round - .set(self.threshold_clock.get_round() as i64); + .set(new_clock_round as i64); } /// Creating a new block for the dictated round. This is used when a leader timeout occurs, either @@ -359,13 +352,10 @@ impl Core { .with_label_values(&["Core::try_new_block"]) .start_timer(); - let clock_round = self.threshold_clock.get_round(); - if clock_round <= self.last_proposed_round() { - return None; - } + let clock_round = self.new_clock_round()?; // There must be a quorum of blocks from the previous round. - let quorum_round = self.threshold_clock.get_round().saturating_sub(1); + let quorum_round = clock_round.saturating_sub(1); // Create a new block either because we want to "forcefully" propose a block due to a leader timeout, // or because we are actually ready to produce the block (leader exists and min delay has passed). @@ -428,7 +418,7 @@ impl Core { .with_label_values(&[leader_authority]) .inc_by( Instant::now() - .saturating_duration_since(self.threshold_clock.get_quorum_ts()) + .saturating_duration_since(self.dag_state.read().threshold_clock_quorum_ts()) .as_millis() as u64, ); self.context @@ -527,9 +517,6 @@ impl Core { assert_eq!(accepted_blocks.len(), 1); assert!(missing.is_empty()); - // Internally accept the block to move the threshold clock etc - self.add_accepted_blocks(vec![verified_block.clone()]); - // Ensure the new block and its ancestors are persisted, before broadcasting it. self.dag_state.write().flush(); @@ -736,7 +723,7 @@ impl Core { /// Whether the core should propose new blocks. pub(crate) fn should_propose(&self) -> bool { - let clock_round = self.threshold_clock.get_round(); + let clock_round = self.dag_state.read().threshold_clock_round(); let core_skipped_proposals = &self.context.metrics.node_metrics.core_skipped_proposals; if !self.subscriber_exists { @@ -1048,6 +1035,16 @@ impl Core { self.leaders(round).first().unwrap().authority } + /// Returns the next clock round if it's advanced. None otherwise. + fn new_clock_round(&self) -> Option { + let dag_state = self.dag_state.read(); + let clock_round = dag_state.threshold_clock_round(); + if clock_round <= dag_state.get_last_proposed_block().round() { + return None; + } + Some(clock_round) + } + fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs { self.last_proposed_block().timestamp_ms() } diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 3ff804d02da8d..04f54786b4881 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -12,6 +12,7 @@ use std::{ use consensus_config::AuthorityIndex; use itertools::Itertools as _; +use tokio::time::Instant; use tracing::{debug, error, info}; use crate::{ @@ -25,8 +26,8 @@ use crate::{ }, context::Context, leader_scoring::{ReputationScores, ScoringSubdag}, - stake_aggregator::{QuorumThreshold, StakeAggregator}, storage::{Store, WriteBatch}, + threshold_clock::ThresholdClock, CommittedSubDag, }; @@ -56,6 +57,9 @@ pub(crate) struct DagState { // Vec position corresponds to the authority index. recent_refs_by_authority: Vec>, + // Keeps track of the threshold clock for proposing blocks. + threshold_clock: ThresholdClock, + // Keeps track of the highest round that has been evicted for each authority. Any blocks that are of round <= evict_round // should be considered evicted, and if any exist we should not consider the causauly complete in the order they appear. // The `evicted_rounds` size should be the same as the committee size. @@ -114,6 +118,8 @@ impl DagState { .map(|block| (block.reference(), block)) .collect(); + let threshold_clock = ThresholdClock::new(0, context.clone()); + let last_commit = store .read_last_commit() .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e)); @@ -168,6 +174,7 @@ impl DagState { genesis, recent_blocks: BTreeMap::new(), recent_refs_by_authority: vec![BTreeSet::new(); num_authorities], + threshold_clock, highest_accepted_round: 0, last_commit: last_commit.clone(), last_commit_round_advancement_time: None, @@ -334,6 +341,7 @@ impl DagState { self.recent_blocks .insert(block_ref, BlockInfo::new(block.clone())); self.recent_refs_by_authority[block_ref.author].insert(block_ref); + self.threshold_clock.add_block(block_ref); self.highest_accepted_round = max(self.highest_accepted_round, block.round()); self.context .metrics @@ -518,11 +526,6 @@ impl DagState { .collect() } - pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool { - let blocks = self.contains_blocks(vec![*block_ref]); - blocks.first().cloned().unwrap() - } - /// Gets the last proposed block from this authority. /// If no block is proposed yet, returns the genesis block. pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock { @@ -728,6 +731,19 @@ impl DagState { exist } + pub(crate) fn contains_block(&self, block_ref: &BlockRef) -> bool { + let blocks = self.contains_blocks(vec![*block_ref]); + blocks.first().cloned().unwrap() + } + + pub(crate) fn threshold_clock_round(&self) -> Round { + self.threshold_clock.get_round() + } + + pub(crate) fn threshold_clock_quorum_ts(&self) -> Instant { + self.threshold_clock.get_quorum_ts() + } + pub(crate) fn highest_accepted_round(&self) -> Round { self.highest_accepted_round } @@ -959,31 +975,6 @@ impl DagState { ); } - /// Detects and returns the blocks of the round that forms the last quorum. The method will return - /// the quorum even if that's genesis. - pub(crate) fn last_quorum(&self) -> Vec { - // the quorum should exist either on the highest accepted round or the one before. If we fail to detect - // a quorum then it means that our DAG has advanced with missing causal history. - for round in - (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev() - { - if round == GENESIS_ROUND { - return self.genesis_blocks(); - } - let mut quorum = StakeAggregator::::new(); - - // Since the minimum wave length is 3 we expect to find a quorum in the uncommitted rounds. - let blocks = self.get_uncommitted_blocks_at_round(round); - for block in &blocks { - if quorum.add(block.author(), &self.context.committee) { - return blocks; - } - } - } - - panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds."); - } - pub(crate) fn recover_last_commit_info(&self) -> Option<(CommitRef, CommitInfo)> { self.store .read_last_commit_info() @@ -1039,10 +1030,6 @@ impl DagState { .end() } - pub(crate) fn genesis_blocks(&self) -> Vec { - self.genesis.values().cloned().collect() - } - /// The last round that should get evicted after a cache clean up operation. After this round we are /// guaranteed to have all the produced blocks from that authority. For any round that is /// <= `last_evicted_round` we don't have such guarantees as out of order blocks might exist. @@ -1072,6 +1059,38 @@ impl DagState { gc_round.min(last_round.saturating_sub(cached_rounds)) } + /// Detects and returns the blocks of the round that forms the last quorum. The method will return + /// the quorum even if that's genesis. + #[cfg(test)] + pub(crate) fn last_quorum(&self) -> Vec { + // the quorum should exist either on the highest accepted round or the one before. If we fail to detect + // a quorum then it means that our DAG has advanced with missing causal history. + for round in + (self.highest_accepted_round.saturating_sub(1)..=self.highest_accepted_round).rev() + { + if round == GENESIS_ROUND { + return self.genesis_blocks(); + } + use crate::stake_aggregator::{QuorumThreshold, StakeAggregator}; + let mut quorum = StakeAggregator::::new(); + + // Since the minimum wave length is 3 we expect to find a quorum in the uncommitted rounds. + let blocks = self.get_uncommitted_blocks_at_round(round); + for block in &blocks { + if quorum.add(block.author(), &self.context.committee) { + return blocks; + } + } + } + + panic!("Fatal error, no quorum has been detected in our DAG on the last two rounds."); + } + + #[cfg(test)] + pub(crate) fn genesis_blocks(&self) -> Vec { + self.genesis.values().cloned().collect() + } + #[cfg(test)] pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) { self.last_commit = Some(commit); diff --git a/consensus/core/src/threshold_clock.rs b/consensus/core/src/threshold_clock.rs index ef8ca8b752973..57e9454c56e80 100644 --- a/consensus/core/src/threshold_clock.rs +++ b/consensus/core/src/threshold_clock.rs @@ -28,17 +28,8 @@ impl ThresholdClock { } } - /// Add the block references that have been successfully processed and advance the round accordingly. If the round - /// has indeed advanced then the new round is returned, otherwise None is returned. - pub(crate) fn add_blocks(&mut self, blocks: Vec) -> Option { - let previous_round = self.round; - for block_ref in blocks { - self.add_block(block_ref); - } - (self.round > previous_round).then_some(self.round) - } - - fn add_block(&mut self, block: BlockRef) { + /// Add the block reference that have been accepted and advance the round accordingly. + pub(crate) fn add_block(&mut self, block: BlockRef) { match block.round.cmp(&self.round) { // Blocks with round less then what we currently build are irrelevant here Ordering::Less => {} @@ -67,6 +58,17 @@ impl ThresholdClock { } } + /// Add the block references that have been successfully processed and advance the round accordingly. If the round + /// has indeed advanced then the new round is returned, otherwise None is returned. + #[cfg(test)] + fn add_blocks(&mut self, blocks: Vec) -> Option { + let previous_round = self.round; + for block_ref in blocks { + self.add_block(block_ref); + } + (self.round > previous_round).then_some(self.round) + } + pub(crate) fn get_round(&self) -> Round { self.round }