diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs
index b522e23289..2eb1671899 100644
--- a/crates/sequencing/papyrus_consensus/src/manager.rs
+++ b/crates/sequencing/papyrus_consensus/src/manager.rs
@@ -14,7 +14,7 @@ use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_C
 use papyrus_network::network_manager::BroadcastTopicClientTrait;
 use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
 use starknet_api::block::BlockNumber;
-use tracing::{debug, info, instrument, warn};
+use tracing::{debug, info, instrument};
 
 use crate::config::TimeoutsConfig;
 use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
@@ -88,16 +88,22 @@ where
 /// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly
 /// part of the single height consensus algorithm (e.g. messages from future heights).
 #[derive(Debug, Default)]
-struct MultiHeightManager {
+struct MultiHeightManager<ContextT: ConsensusContext> {
     validator_id: ValidatorId,
     cached_messages: BTreeMap<u64, Vec<ConsensusMessage>>,
+    cached_proposals: BTreeMap<u64, (ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)>,
     timeouts: TimeoutsConfig,
 }
 
-impl MultiHeightManager {
+impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
     /// Create a new consensus manager.
     pub fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self {
-        Self { validator_id, cached_messages: BTreeMap::new(), timeouts }
+        Self {
+            validator_id,
+            cached_messages: BTreeMap::new(),
+            cached_proposals: BTreeMap::new(),
+            timeouts,
+        }
     }
 
     /// Run the consensus algorithm for a single height.
@@ -105,16 +111,13 @@ impl MultiHeightManager {
     /// Assumes that `height` is monotonically increasing across calls for the sake of filtering
     /// `cached_messaged`.
     #[instrument(skip(self, context, broadcast_channels), level = "info")]
-    pub async fn run_height<ContextT>(
+    pub async fn run_height(
         &mut self,
         context: &mut ContextT,
         height: BlockNumber,
         broadcast_channels: &mut BroadcastConsensusMessageChannel,
         proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
-    ) -> Result<Decision, ConsensusError>
-    where
-        ContextT: ConsensusContext,
-    {
+    ) -> Result<Decision, ConsensusError> {
         let validators = context.validators(height).await;
         info!("running consensus for height {height:?} with validator set {validators:?}");
         let mut shc = SingleHeightConsensus::new(
@@ -135,14 +138,31 @@ impl MultiHeightManager {
         }
 
         let mut current_height_messages = self.get_current_height_messages(height);
+        // If there's already a cached proposal, handle that before looping.
+        if let Some((init, proposal)) = self.get_current_proposal(height) {
+            let shc_return =
+                self.handle_proposal(context, height, &mut shc, init, proposal).await?;
+            // Handle potential tasks, like validating the proposal.
+            match shc_return {
+                ShcReturn::Decision(decision) => return Ok(decision),
+                ShcReturn::Tasks(tasks) => {
+                    for task in tasks {
+                        shc_events.push(task.run());
+                    }
+                }
+            }
+        };
+
+        // No cached proposal; loop over incoming proposals, messages, cached messages, and events.
         loop {
             let shc_return = tokio::select! {
+                // TODO(Matan): remove report peer / continue propagation, as they are not cancel safe.
                 message = next_message(&mut current_height_messages, broadcast_channels) => {
                     self.handle_message(context, height, &mut shc, message?).await?
                 },
                 Some(mut content_receiver) = proposal_receiver.next() => {
                     // Get the first message to verify the init was sent.
-                    // TODO(guyn): add a timeout and panic, since StreamHandler should only send once
+                    // TODO(guyn): add a timeout and panic, since StreamHandler should only send once
+                    // the first message (message_id=0) has arrived.
                     let Some(first_part) = content_receiver.next().await else {
                         return Err(ConsensusError::InternalNetworkError(
@@ -169,37 +189,35 @@ impl MultiHeightManager {
     }
 
     // Handle a new proposal receiver from the network.
-    async fn handle_proposal<ContextT>(
+    async fn handle_proposal(
         &mut self,
         context: &mut ContextT,
         height: BlockNumber,
         shc: &mut SingleHeightConsensus,
         proposal_init: ProposalInit,
         content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
-    ) -> Result<ShcReturn, ConsensusError>
-    where
-        ContextT: ConsensusContext,
-    {
-        // TODO(guyn): what is the right thing to do if proposal's height doesn't match?
+    ) -> Result<ShcReturn, ConsensusError> {
         if proposal_init.height != height {
-            // TODO(guyn): add caching of heights for future use.
-            warn!("Received a proposal for a different height. {:?}", proposal_init);
+            debug!("Received a proposal for a different height. {:?}", proposal_init);
+            if proposal_init.height > height {
+                // Note: this will overwrite an existing content_receiver for this height!
+                self.cached_proposals
+                    .insert(proposal_init.height.0, (proposal_init, content_receiver));
+            }
+            return Ok(ShcReturn::Tasks(Vec::new()));
         }
         shc.handle_proposal(context, proposal_init, content_receiver).await
     }
 
     // Handle a single consensus message.
-    async fn handle_message<ContextT>(
+    async fn handle_message(
         &mut self,
         context: &mut ContextT,
         height: BlockNumber,
         shc: &mut SingleHeightConsensus,
         message: ConsensusMessage,
-    ) -> Result<ShcReturn, ConsensusError>
-    where
-        ContextT: ConsensusContext,
-    {
-        // TODO(matan): We need to figure out an actual cacheing strategy under 2 constraints:
+    ) -> Result<ShcReturn, ConsensusError> {
+        // TODO(matan): We need to figure out an actual caching strategy under 2 constraints:
         // 1. Malicious - must be capped so a malicious peer can't DoS us.
         // 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
         // In general I think we will want to only cache (H+1, 0) messages.
@@ -221,6 +239,26 @@ impl MultiHeightManager {
         }
     }
 
+    // Checks if a cached proposal already exists:
+    // - returns the proposal if it exists and removes it from the cache.
+    // - returns None if no proposal exists.
+    // - cleans up any proposals from earlier heights.
+    fn get_current_proposal(
+        &mut self,
+        height: BlockNumber,
+    ) -> Option<(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)> {
+        loop {
+            let entry = self.cached_proposals.first_entry()?;
+            match entry.key().cmp(&height.0) {
+                std::cmp::Ordering::Greater => return None,
+                std::cmp::Ordering::Equal => return Some(entry.remove()),
+                std::cmp::Ordering::Less => {
+                    entry.remove();
+                }
+            }
+        }
+    }
+
     // Filters the cached messages:
     // - returns all of the current height messages.
     // - drops messages from earlier heights.
diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs
index 4f03cc0a8a..eab0c706df 100644
--- a/crates/sequencing/papyrus_consensus/src/manager_test.rs
+++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -27,7 +27,7 @@ use starknet_types_core::felt::Felt;
 
 use super::{run_consensus, MultiHeightManager};
 use crate::config::TimeoutsConfig;
-use crate::test_utils::{precommit, prevote, proposal, proposal_init};
+use crate::test_utils::{precommit, prevote, proposal_init};
 use crate::types::{ConsensusContext, ConsensusError, ProposalContentId, Round, ValidatorId};
 
 lazy_static! {
@@ -93,14 +93,15 @@ async fn send(sender: &mut MockBroadcastedMessagesSender<ConsensusMessage>, msg:
 
 async fn send_proposal(
     proposal_receiver_sender: &mut mpsc::Sender<mpsc::Receiver<ProposalPart>>,
-    content: ProposalPart,
+    content: Vec<ProposalPart>,
 ) {
     let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
     proposal_receiver_sender.send(proposal_receiver).await.unwrap();
-    proposal_sender.send(content).await.unwrap();
+    for item in content {
+        proposal_sender.send(item).await.unwrap();
+    }
 }
 
-#[ignore] // TODO(guyn): return this once caching proposals is implemented.
 #[tokio::test]
 async fn manager_multiple_heights_unordered() {
     let TestSubscriberChannels { mock_network, subscriber_channels } =
@@ -108,14 +109,29 @@ async fn manager_multiple_heights_unordered() {
     let mut sender = mock_network.broadcasted_messages_sender;
 
     // TODO(guyn): refactor this test to pass proposals through the correct channels.
-    let (_proposal_receiver_sender, mut proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
+    let (mut proposal_receiver_sender, mut proposal_receiver_receiver) =
+        mpsc::channel(CHANNEL_SIZE);
 
     // Send messages for height 2 followed by those for height 1.
-    send(&mut sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await;
+    send_proposal(
+        &mut proposal_receiver_sender,
+        vec![
+            ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID)),
+            ProposalPart::Fin(ProposalFin { proposal_content_id: BlockHash(Felt::TWO) }),
+        ],
+    )
+    .await;
     send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
     send(&mut sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
-    send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;
+    send_proposal(
+        &mut proposal_receiver_sender,
+        vec![
+            ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)),
+            ProposalPart::Fin(ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }),
+        ],
+    )
+    .await;
     send(&mut sender, prevote(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
     send(&mut sender, precommit(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
@@ -178,7 +194,6 @@ async fn manager_multiple_heights_unordered() {
     assert_eq!(decision.block, BlockHash(Felt::TWO));
 }
 
-#[ignore] // TODO(guyn): return this once caching proposals is implemented.
 #[tokio::test]
 async fn run_consensus_sync() {
     // Set expectations.
@@ -209,7 +224,7 @@ async fn run_consensus_sync() {
     // Send messages for height 2.
     send_proposal(
         &mut proposal_receiver_sender,
-        ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID)),
+        vec![ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))],
     )
     .await;
     let TestSubscriberChannels { mock_network, subscriber_channels } =
@@ -304,7 +319,7 @@ async fn run_consensus_sync_cancellation_safety() {
     // Send a proposal for height 1.
     send_proposal(
         &mut proposal_receiver_sender,
-        ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)),
+        vec![ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))],
     )
     .await;
     proposal_handled_rx.await.unwrap();
@@ -336,7 +351,7 @@ async fn test_timeouts() {
 
     send_proposal(
         &mut proposal_receiver_sender,
-        ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)),
+        vec![ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))],
     )
     .await;
     send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await;
@@ -390,7 +405,7 @@ async fn test_timeouts() {
     // reach a decision.
     send_proposal(
         &mut proposal_receiver_sender,
-        ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID)),
+        vec![ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID))],
     )
     .await;
     send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *PROPOSER_ID)).await;
diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs
index 66d128b117..9d4c718e9b 100644
--- a/crates/sequencing/papyrus_consensus/src/test_utils.rs
+++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs
@@ -3,14 +3,7 @@ use std::time::Duration;
 use async_trait::async_trait;
 use futures::channel::{mpsc, oneshot};
 use mockall::mock;
-use papyrus_protobuf::consensus::{
-    ConsensusMessage,
-    Proposal,
-    ProposalFin,
-    ProposalInit,
-    Vote,
-    VoteType,
-};
+use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, ProposalInit, Vote, VoteType};
 use papyrus_protobuf::converters::ProtobufConversionError;
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_types_core::felt::Felt;
@@ -129,24 +122,6 @@ pub fn precommit(
         voter,
     })
 }
-
-pub fn proposal(
-    block_felt: Felt,
-    height: u64,
-    round: u32,
-    proposer: ValidatorId,
-) -> ConsensusMessage {
-    let block_hash = BlockHash(block_felt);
-    ConsensusMessage::Proposal(Proposal {
-        height,
-        block_hash,
-        round,
-        proposer,
-        transactions: Vec::new(),
-        valid_round: None,
-    })
-}
-
 pub fn proposal_init(height: u64, round: u32, proposer: ValidatorId) -> ProposalInit {
     ProposalInit { height: BlockNumber(height), round, proposer, valid_round: None }
 }
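
Reviewer note (not part of the patch): `get_current_proposal` leans on `BTreeMap::first_entry` to prune stale heights and pop an exact match in a single pass. Below is a minimal, self-contained sketch of that pattern; `take_current` is a hypothetical stand-in name, and plain `&str` values replace the real `(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)` tuples.

    use std::collections::BTreeMap;

    // Stand-in for `cached_proposals`: height -> cached proposal.
    fn take_current(cache: &mut BTreeMap<u64, &'static str>, height: u64) -> Option<&'static str> {
        loop {
            // `first_entry` yields the smallest key, so stale heights surface first.
            let entry = cache.first_entry()?;
            match entry.key().cmp(&height) {
                // Smallest cached height is in the future: nothing usable for `height`.
                std::cmp::Ordering::Greater => return None,
                // Exact match: remove it from the cache and hand it to the caller.
                std::cmp::Ordering::Equal => return Some(entry.remove()),
                // Earlier height: the manager has moved past it, so drop it and rescan.
                std::cmp::Ordering::Less => {
                    entry.remove();
                }
            }
        }
    }

    fn main() {
        let mut cache = BTreeMap::from([(1, "proposal@1"), (3, "proposal@3")]);
        // At height 2, the height-1 entry is pruned and the height-3 entry is kept for later.
        assert_eq!(take_current(&mut cache, 2), None);
        assert_eq!(take_current(&mut cache, 3), Some("proposal@3"));
        assert!(cache.is_empty());
    }

This is why `run_height` can check the cache once before entering its select loop: a proposal cached for the new height is consumed immediately, and anything older is garbage-collected as a side effect.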