Skip to content

Commit

Permalink
[consensus] Get highest accepted rounds from RoundProber and use in a…
Browse files Browse the repository at this point in the history
…ncestor selection (#20336)

## Description 

For ancestor selection we want to ensure the highest quality ancestors
are included in the proposal, therefore if an authority is excluded we
should start including them once they have caught up to the network with
their accepted blocks not just received as its possible for blocks to be
received but suspended on many hosts which can cause performance
degradation

Additionally switched to using high quorum round of the high quorum
round of accepted rounds for inclusion metric.

Also bumped protocol version to 70 to include the new feature.

## Test plan 

How did you test the new or updated feature?

Injected Latency Test - https://metrics.sui.io/goto/Jowh4F7Hg?orgId=1
Hoarded Block Test - Pending

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [X] Protocol: v70 Enable probing for accepted rounds in round prober
in consensus.
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
arun-koshy authored Dec 5, 2024
1 parent cb0016d commit b4d6dc1
Show file tree
Hide file tree
Showing 23 changed files with 1,688 additions and 221 deletions.
5 changes: 0 additions & 5 deletions consensus/config/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ impl Committee {
self.total_stake
}

pub fn n_percent_stake_threshold(&self, n: u64) -> Stake {
assert!(n <= 100, "n must be between 0 and 100");
self.total_stake * n / 100
}

pub fn quorum_threshold(&self) -> Stake {
self.quorum_threshold
}
Expand Down
331 changes: 272 additions & 59 deletions consensus/core/src/ancestor.rs

Large diffs are not rendered by default.

27 changes: 19 additions & 8 deletions consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,18 +409,28 @@ impl<C: CoreThreadDispatcher> NetworkService for AuthorityService<C> {
Ok(result)
}

async fn handle_get_latest_rounds(&self, _peer: AuthorityIndex) -> ConsensusResult<Vec<Round>> {
async fn handle_get_latest_rounds(
&self,
_peer: AuthorityIndex,
) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
fail_point_async!("consensus-rpc-response");

let mut highest_received_rounds = self.core_dispatcher.highest_received_rounds();
// Own blocks do not go through the core dispatcher, so they need to be set separately.
highest_received_rounds[self.context.own_index] = self

let blocks = self
.dag_state
.read()
.get_last_block_for_authority(self.context.own_index)
.round();
.get_last_cached_block_per_authority(Round::MAX);
let highest_accepted_rounds = blocks
.into_iter()
.map(|block| block.round())
.collect::<Vec<_>>();

// Own blocks do not go through the core dispatcher, so they need to be set separately.
highest_received_rounds[self.context.own_index] =
highest_accepted_rounds[self.context.own_index];

Ok(highest_received_rounds)
Ok((highest_received_rounds, highest_accepted_rounds))
}
}

Expand Down Expand Up @@ -669,7 +679,8 @@ mod tests {
fn set_propagation_delay_and_quorum_rounds(
&self,
_delay: Round,
_quorum_rounds: Vec<QuorumRound>,
_received_quorum_rounds: Vec<QuorumRound>,
_accepted_quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
todo!()
}
Expand Down Expand Up @@ -740,7 +751,7 @@ mod tests {
&self,
_peer: AuthorityIndex,
_timeout: Duration,
) -> ConsensusResult<Vec<Round>> {
) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
unimplemented!("Unimplemented")
}
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/core/src/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ mod test {
&self,
_peer: AuthorityIndex,
_timeout: Duration,
) -> ConsensusResult<Vec<Round>> {
) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
unimplemented!("Unimplemented")
}
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/core/src/commit_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ mod tests {
&self,
_peer: AuthorityIndex,
_timeout: Duration,
) -> ConsensusResult<Vec<Round>> {
) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
unimplemented!("Unimplemented")
}
}
Expand Down
30 changes: 17 additions & 13 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,9 @@ impl Core {
}

/// Processes the provided blocks and accepts them if possible when their causal history exists.
/// The method returns the references of parents that are unknown and need to be fetched.
/// The method returns:
/// - The references of accepted blocks
/// - The references of ancestors missing their block
pub(crate) fn add_blocks(
&mut self,
blocks: Vec<VerifiedBlock>,
Expand All @@ -266,7 +268,7 @@ impl Core {
.observe(blocks.len() as f64);

// Try to accept them via the block manager
let (accepted_blocks, missing_blocks) = self.block_manager.try_accept_blocks(blocks);
let (accepted_blocks, missing_block_refs) = self.block_manager.try_accept_blocks(blocks);

if !accepted_blocks.is_empty() {
debug!(
Expand All @@ -284,13 +286,13 @@ impl Core {

// Try to propose now since there are new blocks accepted.
self.try_propose(false)?;
}
};

if !missing_blocks.is_empty() {
debug!("Missing blocks: {:?}", missing_blocks);
if !missing_block_refs.is_empty() {
debug!("Missing block refs: {:?}", missing_block_refs);
}

Ok(missing_blocks)
Ok(missing_block_refs)
}

/// Adds/processed all the newly `accepted_blocks`. We basically try to move the threshold clock and add them to the
Expand Down Expand Up @@ -691,16 +693,18 @@ impl Core {
self.subscriber_exists = exists;
}

/// Sets the delay by round for propagating blocks to a quorum and the
/// quorum round per authority for ancestor state manager.
/// Sets the delay by round for propagating blocks to a quorum and the received
/// & accepted quorum rounds per authority for ancestor state manager.
pub(crate) fn set_propagation_delay_and_quorum_rounds(
&mut self,
delay: Round,
quorum_rounds: Vec<QuorumRound>,
received_quorum_rounds: Vec<QuorumRound>,
accepted_quorum_rounds: Vec<QuorumRound>,
) {
info!("Quorum round per authority in ancestor state manager set to: {quorum_rounds:?}");
info!("Received quorum round per authority in ancestor state manager set to: {received_quorum_rounds:?}");
info!("Accepted quorum round per authority in ancestor state manager set to: {accepted_quorum_rounds:?}");
self.ancestor_state_manager
.set_quorum_round_per_authority(quorum_rounds);
.set_quorum_rounds_per_authority(received_quorum_rounds, accepted_quorum_rounds);
info!("Propagation round delay set to: {delay}");
self.propagation_delay = delay;
}
Expand Down Expand Up @@ -2332,7 +2336,7 @@ mod test {
);

// Use a large propagation delay to disable proposing.
core.set_propagation_delay_and_quorum_rounds(1000, vec![]);
core.set_propagation_delay_and_quorum_rounds(1000, vec![], vec![]);

// Make propagation delay the only reason for not proposing.
core.set_subscriber_exists(true);
Expand All @@ -2341,7 +2345,7 @@ mod test {
assert!(core.try_propose(true).unwrap().is_none());

// Let Core know there is no propagation delay.
core.set_propagation_delay_and_quorum_rounds(0, vec![]);
core.set_propagation_delay_and_quorum_rounds(0, vec![], vec![]);

// Proposing now would succeed.
assert!(core.try_propose(true).unwrap().is_some());
Expand Down
61 changes: 44 additions & 17 deletions consensus/core/src/core_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,13 @@ pub trait CoreThreadDispatcher: Sync + Send + 'static {
fn set_subscriber_exists(&self, exists: bool) -> Result<(), CoreError>;

/// Sets the estimated delay to propagate a block to a quorum of peers, in
/// number of rounds, and the quorum rounds for all authorities.
/// number of rounds, and the received & accepted quorum rounds for all
/// authorities.
fn set_propagation_delay_and_quorum_rounds(
&self,
delay: Round,
quorum_rounds: Vec<QuorumRound>,
received_quorum_rounds: Vec<QuorumRound>,
accepted_quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError>;

fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError>;
Expand All @@ -97,7 +99,7 @@ struct CoreThread {
core: Core,
receiver: Receiver<CoreThreadCommand>,
rx_subscriber_exists: watch::Receiver<bool>,
rx_propagation_delay_and_quorum_rounds: watch::Receiver<(Round, Vec<QuorumRound>)>,
rx_propagation_delay_and_quorum_rounds: watch::Receiver<PropagationDelayAndQuorumRounds>,
rx_last_known_proposed_round: watch::Receiver<Round>,
context: Arc<Context>,
}
Expand All @@ -116,8 +118,8 @@ impl CoreThread {
match command {
CoreThreadCommand::AddBlocks(blocks, sender) => {
let _scope = monitored_scope("CoreThread::loop::add_blocks");
let missing_blocks = self.core.add_blocks(blocks)?;
sender.send(missing_blocks).ok();
let missing_block_refs = self.core.add_blocks(blocks)?;
sender.send(missing_block_refs).ok();
}
CoreThreadCommand::NewBlock(round, sender, force) => {
let _scope = monitored_scope("CoreThread::loop::new_block");
Expand Down Expand Up @@ -150,8 +152,12 @@ impl CoreThread {
_ = self.rx_propagation_delay_and_quorum_rounds.changed() => {
let _scope = monitored_scope("CoreThread::loop::set_propagation_delay_and_quorum_rounds");
let should_propose_before = self.core.should_propose();
let (delay, quorum_rounds) = self.rx_propagation_delay_and_quorum_rounds.borrow().clone();
self.core.set_propagation_delay_and_quorum_rounds(delay, quorum_rounds);
let state = self.rx_propagation_delay_and_quorum_rounds.borrow().clone();
self.core.set_propagation_delay_and_quorum_rounds(
state.delay,
state.received_quorum_rounds,
state.accepted_quorum_rounds
);
if !should_propose_before && self.core.should_propose() {
// If core cannnot propose before but can propose now, try to produce a new block to ensure liveness,
// because block proposal could have been skipped.
Expand All @@ -170,7 +176,7 @@ pub(crate) struct ChannelCoreThreadDispatcher {
context: Arc<Context>,
sender: WeakSender<CoreThreadCommand>,
tx_subscriber_exists: Arc<watch::Sender<bool>>,
tx_propagation_delay_and_quorum_rounds: Arc<watch::Sender<(Round, Vec<QuorumRound>)>>,
tx_propagation_delay_and_quorum_rounds: Arc<watch::Sender<PropagationDelayAndQuorumRounds>>,
tx_last_known_proposed_round: Arc<watch::Sender<Round>>,
highest_received_rounds: Arc<Vec<AtomicU32>>,
}
Expand All @@ -181,23 +187,29 @@ impl ChannelCoreThreadDispatcher {
dag_state: &RwLock<DagState>,
core: Core,
) -> (Self, CoreThreadHandle) {
// Initialize highest received rounds to last accepted rounds.
// Initialize highest received rounds.
let highest_received_rounds = {
let dag_state = dag_state.read();
context
let highest_received_rounds = context
.committee
.authorities()
.map(|(index, _)| {
AtomicU32::new(dag_state.get_last_block_for_authority(index).round())
})
.collect()
.collect();

highest_received_rounds
};

let (sender, receiver) =
channel("consensus_core_commands", CORE_THREAD_COMMANDS_CHANNEL_SIZE);
let (tx_subscriber_exists, mut rx_subscriber_exists) = watch::channel(false);
let (tx_propagation_delay_and_quorum_rounds, mut rx_propagation_delay_and_quorum_rounds) =
watch::channel((0, vec![(0, 0); context.committee.size()]));
watch::channel(PropagationDelayAndQuorumRounds {
delay: 0,
received_quorum_rounds: vec![(0, 0); context.committee.size()],
accepted_quorum_rounds: vec![(0, 0); context.committee.size()],
});
let (tx_last_known_proposed_round, mut rx_last_known_proposed_round) = watch::channel(0);
rx_subscriber_exists.mark_unchanged();
rx_propagation_delay_and_quorum_rounds.mark_unchanged();
Expand Down Expand Up @@ -264,9 +276,11 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
self.highest_received_rounds[block.author()].fetch_max(block.round(), Ordering::AcqRel);
}
let (sender, receiver) = oneshot::channel();
self.send(CoreThreadCommand::AddBlocks(blocks, sender))
self.send(CoreThreadCommand::AddBlocks(blocks.clone(), sender))
.await;
receiver.await.map_err(|e| Shutdown(e.to_string()))
let missing_block_refs = receiver.await.map_err(|e| Shutdown(e.to_string()))?;

Ok(missing_block_refs)
}

async fn new_block(&self, round: Round, force: bool) -> Result<(), CoreError> {
Expand All @@ -291,10 +305,15 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
fn set_propagation_delay_and_quorum_rounds(
&self,
delay: Round,
quorum_rounds: Vec<QuorumRound>,
received_quorum_rounds: Vec<QuorumRound>,
accepted_quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
self.tx_propagation_delay_and_quorum_rounds
.send((delay, quorum_rounds))
.send(PropagationDelayAndQuorumRounds {
delay,
received_quorum_rounds,
accepted_quorum_rounds,
})
.map_err(|e| Shutdown(e.to_string()))
}

Expand All @@ -312,6 +331,13 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
}
}

#[derive(Clone)]
struct PropagationDelayAndQuorumRounds {
delay: Round,
received_quorum_rounds: Vec<QuorumRound>,
accepted_quorum_rounds: Vec<QuorumRound>,
}

// TODO: complete the Mock for thread dispatcher to be used from several tests
#[cfg(test)]
#[derive(Default)]
Expand Down Expand Up @@ -372,7 +398,8 @@ impl CoreThreadDispatcher for MockCoreThreadDispatcher {
fn set_propagation_delay_and_quorum_rounds(
&self,
_delay: Round,
_quorum_rounds: Vec<QuorumRound>,
_received_quorum_rounds: Vec<QuorumRound>,
_accepted_quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
todo!()
}
Expand Down
3 changes: 2 additions & 1 deletion consensus/core/src/leader_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ mod tests {
fn set_propagation_delay_and_quorum_rounds(
&self,
_delay: Round,
_quorum_rounds: Vec<QuorumRound>,
_received_quorum_rounds: Vec<QuorumRound>,
_accepted_quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
todo!()
}
Expand Down
Loading

0 comments on commit b4d6dc1

Please sign in to comment.