refactor(consensus): move the sync check into run_height (#2499)
The goal is to remove the nested selects to make cancellation logic simpler.
matan-starkware authored Dec 15, 2024
1 parent b7b51eb commit fd7bae0
Showing 2 changed files with 59 additions and 51 deletions.
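For context, the pattern this commit moves toward can be sketched roughly as follows. This is a minimal, hypothetical sketch (types, names, and timings are placeholders, not the papyrus_consensus API): instead of racing `run_height` against a separate `sync_height` future in an outer `tokio::select!`, the sync stream is passed into `run_height`, which polls it as one more branch of its single internal select and returns a `RunHeightRes` telling the caller how the height ended.

```rust
use std::time::Duration;

use futures::{Stream, StreamExt};

// Stand-ins for the crate's real types; the actual enum lives in manager.rs.
enum RunHeightRes {
    Decision(u64),
    Sync(u64),
}

// The sync stream is an argument, so the single `select!` below is the only
// place that decides how the height ends; there is no outer select whose
// cancellation could drop partially built per-height state.
async fn run_height<S>(height: u64, sync_receiver: &mut S) -> Result<RunHeightRes, String>
where
    S: Stream<Item = u64> + Unpin,
{
    loop {
        tokio::select! {
            // Stand-in for consensus progress (proposals, votes, timeouts, ...).
            _ = tokio::time::sleep(Duration::from_millis(5)) => {
                // Pretend consensus reaches a decision immediately for the sketch.
                return Ok(RunHeightRes::Decision(height));
            }
            sync_height = sync_receiver.next() => {
                match sync_height {
                    // Leave the height only if sync caught up to or passed it.
                    Some(h) if h >= height => return Ok(RunHeightRes::Sync(h)),
                    // Stale sync heights are ignored; keep running this height.
                    Some(_) => continue,
                    None => return Err("sync receiver closed".to_string()),
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    // A sync stream that never yields, mirroring the `futures::stream::pending()`
    // used in the updated tests below.
    let mut sync = futures::stream::pending::<u64>();
    match run_height(7, &mut sync).await {
        Ok(RunHeightRes::Decision(h)) => println!("decision at height {h}"),
        Ok(RunHeightRes::Sync(h)) => println!("synced to height {h}"),
        Err(e) => eprintln!("error: {e}"),
    }
}
```

Because the only select now lives inside `run_height`, the caller simply matches on the returned `RunHeightRes`, which is the simpler cancellation story the commit message refers to.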
92 changes: 45 additions & 47 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -67,32 +67,39 @@ where
metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64);

let is_observer = current_height < start_active_height;
let run_height = manager.run_height(
&mut context,
current_height,
is_observer,
&mut broadcast_channels,
&mut inbound_proposal_receiver,
);

// `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop
// it. We also cannot restart the height; when we dropped the future we dropped the state it
// built and risk equivocating. Therefore, we must only enter the other select branches if
// we are certain to leave this height.
tokio::select! {
decision = run_height => {
let decision = decision?;
match manager
.run_height(
&mut context,
current_height,
is_observer,
&mut broadcast_channels,
&mut inbound_proposal_receiver,
&mut sync_receiver,
)
.await?
{
RunHeightRes::Decision(decision) => {
context.decision_reached(decision.block, decision.precommits).await?;
current_height = current_height.unchecked_next();
},
sync_height = sync_height(current_height, &mut sync_receiver) => {
}
RunHeightRes::Sync(sync_height) => {
metrics::increment_counter!(PAPYRUS_CONSENSUS_SYNC_COUNT);
current_height = sync_height?.unchecked_next();
current_height = sync_height.unchecked_next();
}
}
}
}

/// Run height can end either when consensus reaches a decision or when we learn, via sync, of the
/// decision.
// TODO(Matan): Sync may change when Shahak actually implements.
pub enum RunHeightRes {
/// Decision reached.
Decision(Decision),
/// Sync protocol returned a future height.
Sync(BlockNumber),
}

/// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly
/// part of the single height consensus algorithm (e.g. messages from future heights).
#[derive(Debug, Default)]
@@ -118,15 +125,19 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
///
/// Assumes that `height` is monotonically increasing across calls for the sake of filtering
/// `cached_messaged`.
#[instrument(skip(self, context, broadcast_channels), level = "info")]
pub async fn run_height(
#[instrument(skip(self, context, broadcast_channels, sync_receiver), level = "info")]
pub async fn run_height<SyncReceiverT>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
is_observer: bool,
broadcast_channels: &mut BroadcastConsensusMessageChannel,
proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
) -> Result<Decision, ConsensusError> {
sync_receiver: &mut SyncReceiverT,
) -> Result<RunHeightRes, ConsensusError>
where
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
{
let validators = context.validators(height).await;
info!("running consensus for height {height:?} with validator set {validators:?}");
let mut shc = SingleHeightConsensus::new(
@@ -139,7 +150,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
let mut shc_events = FuturesUnordered::new();

match self.start_height(context, height, &mut shc).await? {
ShcReturn::Decision(decision) => return Ok(decision),
ShcReturn::Decision(decision) => return Ok(RunHeightRes::Decision(decision)),
ShcReturn::Tasks(tasks) => {
for task in tasks {
shc_events.push(task.run());
@@ -169,10 +180,21 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
Some(shc_event) = shc_events.next() => {
shc.handle_event(context, shc_event).await?
},
sync_height = sync_receiver.next() => {
let Some(sync_height) = sync_height else {
return Err(ConsensusError::SyncError("Sync receiver closed".to_string()))
};
if sync_height >= height {
info!("Sync to height: {}. current_height={}", sync_height, height);
return Ok(RunHeightRes::Sync(sync_height));
}
debug!("Ignoring sync to height: {}. current_height={}", sync_height, height);
continue;
}
};

match shc_return {
ShcReturn::Decision(decision) => return Ok(decision),
ShcReturn::Decision(decision) => return Ok(RunHeightRes::Decision(decision)),
ShcReturn::Tasks(tasks) => {
for task in tasks {
shc_events.push(task.run());
@@ -315,27 +337,3 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
}
}
}

// Return only when a height is reached that is greater than or equal to the current height.
async fn sync_height<SyncReceiverT>(
height: BlockNumber,
mut sync_receiver: SyncReceiverT,
) -> Result<BlockNumber, ConsensusError>
where
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
{
loop {
match sync_receiver.next().await {
Some(sync_height) if sync_height >= height => {
info!("Sync to height: {}. current_height={}", sync_height, height);
return Ok(sync_height);
}
Some(sync_height) => {
debug!("Ignoring sync to height: {}. current_height={}", sync_height, height);
}
None => {
return Err(ConsensusError::SyncError("Sync receiver closed".to_string()));
}
}
}
}
18 changes: 14 additions & 4 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -26,7 +26,7 @@ use starknet_api::block::{BlockHash, BlockNumber};
use starknet_types_core::felt::Felt;
use tokio::sync::Notify;

use super::{run_consensus, MultiHeightManager};
use super::{run_consensus, MultiHeightManager, RunHeightRes};
use crate::config::TimeoutsConfig;
use crate::test_utils::{precommit, prevote, proposal_init};
use crate::types::{
@@ -129,6 +129,13 @@ fn expect_validate_proposal(context: &mut MockTestContext, block_hash: Felt) {
.times(1);
}

fn assert_decision(res: RunHeightRes, id: Felt) {
match res {
RunHeightRes::Decision(decision) => assert_eq!(decision.block, BlockHash(id)),
_ => panic!("Expected decision"),
}
}

#[tokio::test]
async fn manager_multiple_heights_unordered() {
let TestSubscriberChannels { mock_network, subscriber_channels } =
@@ -179,10 +186,11 @@ async fn manager_multiple_heights_unordered() {
false,
&mut subscriber_channels,
&mut proposal_receiver_receiver,
&mut futures::stream::pending(),
)
.await
.unwrap();
assert_eq!(decision.block, BlockHash(Felt::ONE));
assert_decision(decision, Felt::ONE);

// Run the manager for height 2.
expect_validate_proposal(&mut context, Felt::TWO);
@@ -193,10 +201,11 @@
false,
&mut subscriber_channels,
&mut proposal_receiver_receiver,
&mut futures::stream::pending(),
)
.await
.unwrap();
assert_eq!(decision.block, BlockHash(Felt::TWO));
assert_decision(decision, Felt::TWO);
}

#[tokio::test]
@@ -392,10 +401,11 @@ async fn test_timeouts() {
false,
&mut subscriber_channels.into(),
&mut proposal_receiver_receiver,
&mut futures::stream::pending(),
)
.await
.unwrap();
assert_eq!(decision.block, BlockHash(Felt::ONE));
assert_decision(decision, Felt::ONE);
});

// Wait for the timeout to be triggered.