From fd7bae05fd914ff055267eb0570bbb55e5a1bb64 Mon Sep 17 00:00:00 2001
From: matan-starkware <97523054+matan-starkware@users.noreply.github.com>
Date: Sun, 15 Dec 2024 11:55:02 +0200
Subject: [PATCH] refactor(consensus): move the sync check into run_height
 (#2499)

The goal is to remove the nested selects to make cancellation logic
simpler.
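Before, run_consensus raced run_height against a separate sync_height future
in an outer tokio::select!, even though run_height is not cancel safe. After
this change, run_height itself polls the sync receiver inside its inner select
and reports which event ended the height. Roughly (a sketch of the shape only;
the exact code is in the diff below):

    // Before: outer select racing a non-cancel-safe future.
    tokio::select! {
        decision = manager.run_height(/* ... */) => { /* decision reached */ },
        sync_height = sync_height(current_height, &mut sync_receiver) => { /* sync */ },
    }

    // After: one select inside run_height; the caller matches on the outcome.
    match manager.run_height(/* ..., */ &mut sync_receiver).await? {
        RunHeightRes::Decision(decision) => { /* decision reached */ }
        RunHeightRes::Sync(sync_height) => { /* sync to a future height */ }
    }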
---
 .../papyrus_consensus/src/manager.rs          | 92 +++++++++----------
 .../papyrus_consensus/src/manager_test.rs     | 18 +++-
 2 files changed, 59 insertions(+), 51 deletions(-)

diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs
index 880d71f877..5128f9efe2 100644
--- a/crates/sequencing/papyrus_consensus/src/manager.rs
+++ b/crates/sequencing/papyrus_consensus/src/manager.rs
@@ -67,32 +67,39 @@ where
         metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64);
 
         let is_observer = current_height < start_active_height;
-        let run_height = manager.run_height(
-            &mut context,
-            current_height,
-            is_observer,
-            &mut broadcast_channels,
-            &mut inbound_proposal_receiver,
-        );
-
-        // `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop
-        // it. We also cannot restart the height; when we dropped the future we dropped the state it
-        // built and risk equivocating. Therefore, we must only enter the other select branches if
-        // we are certain to leave this height.
-        tokio::select! {
-            decision = run_height => {
-                let decision = decision?;
+        match manager
+            .run_height(
+                &mut context,
+                current_height,
+                is_observer,
+                &mut broadcast_channels,
+                &mut inbound_proposal_receiver,
+                &mut sync_receiver,
+            )
+            .await?
+        {
+            RunHeightRes::Decision(decision) => {
                 context.decision_reached(decision.block, decision.precommits).await?;
                 current_height = current_height.unchecked_next();
-            },
-            sync_height = sync_height(current_height, &mut sync_receiver) => {
+            }
+            RunHeightRes::Sync(sync_height) => {
                 metrics::increment_counter!(PAPYRUS_CONSENSUS_SYNC_COUNT);
-                current_height = sync_height?.unchecked_next();
+                current_height = sync_height.unchecked_next();
             }
         }
     }
 }
 
+/// Run height can end either when consensus reaches a decision or when we learn, via sync, of the
+/// decision.
+// TODO(Matan): Sync may change when Shahak actually implements.
+pub enum RunHeightRes {
+    /// Decision reached.
+    Decision(Decision),
+    /// Sync protocol returned a future height.
+    Sync(BlockNumber),
+}
+
 /// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly
 /// part of the single height consensus algorithm (e.g. messages from future heights).
 #[derive(Debug, Default)]
@@ -118,15 +125,19 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
     ///
     /// Assumes that `height` is monotonically increasing across calls for the sake of filtering
     /// `cached_messaged`.
-    #[instrument(skip(self, context, broadcast_channels), level = "info")]
-    pub async fn run_height(
+    #[instrument(skip(self, context, broadcast_channels, sync_receiver), level = "info")]
+    pub async fn run_height<SyncReceiverT>(
         &mut self,
         context: &mut ContextT,
         height: BlockNumber,
         is_observer: bool,
         broadcast_channels: &mut BroadcastConsensusMessageChannel,
         proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
-    ) -> Result<Decision, ConsensusError> {
+        sync_receiver: &mut SyncReceiverT,
+    ) -> Result<RunHeightRes, ConsensusError>
+    where
+        SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
+    {
         let validators = context.validators(height).await;
         info!("running consensus for height {height:?} with validator set {validators:?}");
         let mut shc = SingleHeightConsensus::new(
@@ -139,7 +150,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         let mut shc_events = FuturesUnordered::new();
 
         match self.start_height(context, height, &mut shc).await? {
-            ShcReturn::Decision(decision) => return Ok(decision),
+            ShcReturn::Decision(decision) => return Ok(RunHeightRes::Decision(decision)),
             ShcReturn::Tasks(tasks) => {
                 for task in tasks {
                     shc_events.push(task.run());
@@ -169,10 +180,21 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
             Some(shc_event) = shc_events.next() => {
                 shc.handle_event(context, shc_event).await?
            },
+            sync_height = sync_receiver.next() => {
+                let Some(sync_height) = sync_height else {
+                    return Err(ConsensusError::SyncError("Sync receiver closed".to_string()))
+                };
+                if sync_height >= height {
+                    info!("Sync to height: {}. current_height={}", sync_height, height);
+                    return Ok(RunHeightRes::Sync(sync_height));
+                }
+                debug!("Ignoring sync to height: {}. current_height={}", sync_height, height);
+                continue;
+            }
         };
 
         match shc_return {
-            ShcReturn::Decision(decision) => return Ok(decision),
+            ShcReturn::Decision(decision) => return Ok(RunHeightRes::Decision(decision)),
             ShcReturn::Tasks(tasks) => {
                 for task in tasks {
                     shc_events.push(task.run());
@@ -315,27 +337,3 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         }
     }
 }
-
-// Return only when a height is reached that is greater than or equal to the current height.
-async fn sync_height<SyncReceiverT>(
-    height: BlockNumber,
-    mut sync_receiver: SyncReceiverT,
-) -> Result<BlockNumber, ConsensusError>
-where
-    SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
-{
-    loop {
-        match sync_receiver.next().await {
-            Some(sync_height) if sync_height >= height => {
-                info!("Sync to height: {}. current_height={}", sync_height, height);
-                return Ok(sync_height);
-            }
-            Some(sync_height) => {
-                debug!("Ignoring sync to height: {}. current_height={}", sync_height, height);
-            }
-            None => {
-                return Err(ConsensusError::SyncError("Sync receiver closed".to_string()));
-            }
-        }
-    }
-}
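Note (editorial sketch, not part of the patch): run_height is now generic over
its sync receiver, requiring only `Stream<Item = BlockNumber> + Unpin`. Any
such stream works, e.g. a futures mpsc channel whose sender side lives in the
sync task; the names and channel size below are illustrative:

    // Hypothetical wiring; the buffer size 16 is arbitrary.
    let (mut sync_sender, mut sync_receiver) =
        futures::channel::mpsc::channel::<BlockNumber>(16);
    // In the sync task: sync_sender.try_send(BlockNumber(7))?;
    // In consensus:     manager.run_height(/* ..., */ &mut sync_receiver).await?;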
current_height={}", sync_height, height); - } - None => { - return Err(ConsensusError::SyncError("Sync receiver closed".to_string())); - } - } - } -} diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 1e3ceaee64..e6ae664568 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -26,7 +26,7 @@ use starknet_api::block::{BlockHash, BlockNumber}; use starknet_types_core::felt::Felt; use tokio::sync::Notify; -use super::{run_consensus, MultiHeightManager}; +use super::{run_consensus, MultiHeightManager, RunHeightRes}; use crate::config::TimeoutsConfig; use crate::test_utils::{precommit, prevote, proposal_init}; use crate::types::{ @@ -129,6 +129,13 @@ fn expect_validate_proposal(context: &mut MockTestContext, block_hash: Felt) { .times(1); } +fn assert_decision(res: RunHeightRes, id: Felt) { + match res { + RunHeightRes::Decision(decision) => assert_eq!(decision.block, BlockHash(id)), + _ => panic!("Expected decision"), + } +} + #[tokio::test] async fn manager_multiple_heights_unordered() { let TestSubscriberChannels { mock_network, subscriber_channels } = @@ -179,10 +186,11 @@ async fn manager_multiple_heights_unordered() { false, &mut subscriber_channels, &mut proposal_receiver_receiver, + &mut futures::stream::pending(), ) .await .unwrap(); - assert_eq!(decision.block, BlockHash(Felt::ONE)); + assert_decision(decision, Felt::ONE); // Run the manager for height 2. expect_validate_proposal(&mut context, Felt::TWO); @@ -193,10 +201,11 @@ async fn manager_multiple_heights_unordered() { false, &mut subscriber_channels, &mut proposal_receiver_receiver, + &mut futures::stream::pending(), ) .await .unwrap(); - assert_eq!(decision.block, BlockHash(Felt::TWO)); + assert_decision(decision, Felt::TWO); } #[tokio::test] @@ -392,10 +401,11 @@ async fn test_timeouts() { false, &mut subscriber_channels.into(), &mut proposal_receiver_receiver, + &mut futures::stream::pending(), ) .await .unwrap(); - assert_eq!(decision.block, BlockHash(Felt::ONE)); + assert_decision(decision, Felt::ONE); }); // Wait for the timeout to be triggered.