From f515d0cd314c4d72dd1d74cbc3d25306cfea1618 Mon Sep 17 00:00:00 2001 From: Guy Nir Date: Wed, 27 Nov 2024 12:08:50 +0200 Subject: [PATCH] feat(consensus): add streamed validation --- .../papyrus_consensus/src/manager.rs | 67 ++++++++------ .../papyrus_consensus/src/manager_test.rs | 53 +++++++---- .../src/single_height_consensus.rs | 50 ++++------- .../src/single_height_consensus_test.rs | 19 ++-- .../papyrus_consensus/src/test_utils.rs | 72 +++++++++------ .../sequencing/papyrus_consensus/src/types.rs | 4 +- .../src/papyrus_consensus_context.rs | 41 ++++++--- .../src/papyrus_consensus_context_test.rs | 18 +++- .../src/sequencer_consensus_context.rs | 89 ++++++++++++------- .../src/sequencer_consensus_context_test.rs | 28 ++++-- crates/starknet_api/src/transaction.rs | 2 +- 11 files changed, 277 insertions(+), 166 deletions(-) diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index fe5ad992da..488f706697 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -7,14 +7,14 @@ mod manager_test; use std::collections::BTreeMap; use std::time::Duration; -use futures::channel::{mpsc, oneshot}; +use futures::channel::mpsc; use futures::stream::FuturesUnordered; use futures::{Stream, StreamExt}; use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT}; use papyrus_network::network_manager::BroadcastTopicClientTrait; -use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalWrapper}; -use starknet_api::block::{BlockHash, BlockNumber}; -use tracing::{debug, info, instrument}; +use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit}; +use starknet_api::block::BlockNumber; +use tracing::{debug, info, instrument, warn}; use crate::config::TimeoutsConfig; use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus}; @@ -42,8 +42,7 @@ pub async fn run_consensus( ) -> Result<(), ConsensusError> where ContextT: ConsensusContext, - ProposalWrapper: - Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, + ::ProposalPart: std::fmt::Debug, SyncReceiverT: Stream + Unpin, { info!( @@ -116,11 +115,7 @@ impl MultiHeightManager { ) -> Result where ContextT: ConsensusContext, - ProposalWrapper: Into<( - ProposalInit, - mpsc::Receiver, - oneshot::Receiver, - )>, + ::ProposalPart: std::fmt::Debug, { let validators = context.validators(height).await; info!("running consensus for height {height:?} with validator set {validators:?}"); @@ -147,6 +142,17 @@ impl MultiHeightManager { message = next_message(&mut current_height_messages, broadcast_channels) => { self.handle_message(context, height, &mut shc, message?).await? }, + Some(mut content_receiver) = proposal_receiver.next() => { + // Get the first message to verify the init was sent. + // TODO(guyn): what happens if the channel never sends anything? + let Some(first_part) = content_receiver.next().await else { + return Err(ConsensusError::InternalNetworkError( + "Proposal receiver closed".to_string(), + )); + }; + let proposal_init: ProposalInit = first_part.try_into()?; + self.handle_proposal(context, height, &mut shc, proposal_init, content_receiver).await? + }, Some(shc_event) = shc_events.next() => { shc.handle_event(context, shc_event).await? }, @@ -163,6 +169,27 @@ impl MultiHeightManager { } } + // Handle a new proposal receiver from the network. + async fn handle_proposal( + &mut self, + context: &mut ContextT, + height: BlockNumber, + shc: &mut SingleHeightConsensus, + proposal_init: ProposalInit, + content_receiver: mpsc::Receiver, + ) -> Result + where + ContextT: ConsensusContext, + ::ProposalPart: std::fmt::Debug, + { + // TODO(guyn): what is the right thing to do if proposal's height doesn't match? + if proposal_init.height != height { + // TODO(guyn): add caching of heights for future use. + warn!("Received a proposal for a different height. {:?}", proposal_init); + } + shc.handle_proposal(context, proposal_init.into(), content_receiver).await + } + // Handle a single consensus message. async fn handle_message( &mut self, @@ -173,11 +200,6 @@ impl MultiHeightManager { ) -> Result where ContextT: ConsensusContext, - ProposalWrapper: Into<( - ProposalInit, - mpsc::Receiver, - oneshot::Receiver, - )>, { // TODO(matan): We need to figure out an actual cacheing strategy under 2 constraints: // 1. Malicious - must be capped so a malicious peer can't DoS us. @@ -191,16 +213,9 @@ impl MultiHeightManager { return Ok(ShcReturn::Tasks(Vec::new())); } match message { - ConsensusMessage::Proposal(proposal) => { - // Special case due to fake streaming. - // TODO(guyn): this will be gone once we integrate the proposal channels. - let (proposal_init, content_receiver, fin_receiver) = - ProposalWrapper(proposal).into(); - let res = shc - .handle_proposal(context, proposal_init, content_receiver, fin_receiver) - .await?; - Ok(res) - } + ConsensusMessage::Proposal(_) => Err(ConsensusError::InternalNetworkError( + "Proposal variant of ConsensusMessage no longer supported".to_string(), + )), _ => { let res = shc.handle_message(context, message).await?; Ok(res) diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 374d35bd88..44fd3e81f0 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -21,7 +21,7 @@ use starknet_types_core::felt::Felt; use super::{run_consensus, MultiHeightManager}; use crate::config::TimeoutsConfig; -use crate::test_utils::{precommit, prevote, proposal}; +use crate::test_utils::{precommit, prevote, proposal_fin, proposal_init}; use crate::types::{ConsensusContext, ConsensusError, ProposalContentId, Round, ValidatorId}; lazy_static! { @@ -52,8 +52,8 @@ mock! { &mut self, height: BlockNumber, timeout: Duration, - content: mpsc::Receiver - ) -> oneshot::Receiver; + content: mpsc::Receiver + ) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)>; async fn repropose( &mut self, @@ -81,6 +81,7 @@ async fn send(sender: &mut MockBroadcastedMessagesSender, msg: sender.send((msg, broadcasted_message_metadata)).await.unwrap(); } +#[ignore] // TODO(guyn): return this once caching proposals is implemented. #[tokio::test] async fn manager_multiple_heights_unordered() { let TestSubscriberChannels { mock_network, subscriber_channels } = @@ -88,14 +89,21 @@ async fn manager_multiple_heights_unordered() { let mut sender = mock_network.broadcasted_messages_sender; // TODO(guyn): refactor this test to pass proposals through the correct channels. - let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) = + let (mut proposal_receiver_sender, mut proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); // Send messages for height 2 followed by those for height 1. - send(&mut sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await; + let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE); + proposal_receiver_sender.send(proposal_receiver).await.unwrap(); + proposal_sender.send(ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))).await.unwrap(); + proposal_sender.send(ProposalPart::Fin(proposal_fin(Felt::TWO))).await.unwrap(); send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await; send(&mut sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await; - send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await; + + let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE); + proposal_receiver_sender.send(proposal_receiver).await.unwrap(); + proposal_sender.send(ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))).await.unwrap(); + proposal_sender.send(ProposalPart::Fin(proposal_fin(Felt::ONE))).await.unwrap(); send(&mut sender, prevote(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await; send(&mut sender, precommit(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await; @@ -105,7 +113,7 @@ async fn manager_multiple_heights_unordered() { .expect_validate_proposal() .return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); - block_sender.send(BlockHash(Felt::ONE)).unwrap(); + block_sender.send((BlockHash(Felt::ONE), BlockHash(Felt::ONE))).unwrap(); block_receiver }) .times(1); @@ -131,7 +139,7 @@ async fn manager_multiple_heights_unordered() { .expect_validate_proposal() .return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); - block_sender.send(BlockHash(Felt::TWO)).unwrap(); + block_sender.send((BlockHash(Felt::TWO), BlockHash(Felt::TWO))).unwrap(); block_receiver }) .times(1); @@ -147,6 +155,7 @@ async fn manager_multiple_heights_unordered() { assert_eq!(decision.block, BlockHash(Felt::TWO)); } +#[ignore] // TODO(guyn): return this once caching proposals is implemented. #[tokio::test] async fn run_consensus_sync() { // Set expectations. @@ -154,11 +163,11 @@ async fn run_consensus_sync() { let (decision_tx, decision_rx) = oneshot::channel(); // TODO(guyn): refactor this test to pass proposals through the correct channels. - let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); + let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); context.expect_validate_proposal().return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); - block_sender.send(BlockHash(Felt::TWO)).unwrap(); + block_sender.send((BlockHash(Felt::TWO), BlockHash(Felt::TWO))).unwrap(); block_receiver }); context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); @@ -175,7 +184,9 @@ async fn run_consensus_sync() { let TestSubscriberChannels { mock_network, subscriber_channels } = mock_register_broadcast_topic().unwrap(); let mut network_sender = mock_network.broadcasted_messages_sender; - send(&mut network_sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await; + let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE); + proposal_receiver_sender.send(proposal_receiver).await.unwrap(); + proposal_sender.send(ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))).await.unwrap(); send(&mut network_sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await; send(&mut network_sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await; @@ -217,11 +228,11 @@ async fn run_consensus_sync_cancellation_safety() { let (decision_tx, decision_rx) = oneshot::channel(); // TODO(guyn): refactor this test to pass proposals through the correct channels. - let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); + let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); context.expect_validate_proposal().return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); - block_sender.send(BlockHash(Felt::ONE)).unwrap(); + block_sender.send((BlockHash(Felt::ONE), BlockHash(Felt::ONE))).unwrap(); block_receiver }); context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); @@ -260,7 +271,9 @@ async fn run_consensus_sync_cancellation_safety() { let mut network_sender = mock_network.broadcasted_messages_sender; // Send a proposal for height 1. - send(&mut network_sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await; + let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE); + proposal_receiver_sender.send(proposal_receiver).await.unwrap(); + proposal_sender.send(ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))).await.unwrap(); proposal_handled_rx.await.unwrap(); // Send an old sync. This should not cancel the current height. @@ -285,10 +298,12 @@ async fn test_timeouts() { let mut sender = mock_network.broadcasted_messages_sender; // TODO(guyn): refactor this test to pass proposals through the correct channels. - let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) = + let (mut proposal_receiver_sender, mut proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); - send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await; + let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE); + proposal_receiver_sender.send(proposal_receiver).await.unwrap(); + proposal_sender.send(ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))).await.unwrap(); send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await; send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_3)).await; send(&mut sender, precommit(None, 1, 0, *VALIDATOR_ID_2)).await; @@ -297,7 +312,7 @@ async fn test_timeouts() { let mut context = MockTestContext::new(); context.expect_validate_proposal().returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); - block_sender.send(BlockHash(Felt::ONE)).unwrap(); + block_sender.send((BlockHash(Felt::ONE), BlockHash(Felt::ONE))).unwrap(); block_receiver }); context @@ -335,7 +350,9 @@ async fn test_timeouts() { timeout_receive.await.unwrap(); // Show that after the timeout is triggered we can still precommit in favor of the block and // reach a decision. - send(&mut sender, proposal(Felt::ONE, 1, 1, *PROPOSER_ID)).await; + let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE); + proposal_receiver_sender.send(proposal_receiver).await.unwrap(); + proposal_sender.send(ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID))).await.unwrap(); send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *PROPOSER_ID)).await; send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *VALIDATOR_ID_2)).await; send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *VALIDATOR_ID_3)).await; diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs index f2ee23c7d3..2a3ad54445 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs @@ -66,11 +66,7 @@ pub enum ShcTask { /// result without blocking consensus. /// 3. Once validation is complete, the manager returns the built proposal to the SHC as an /// event, which can be sent to the SM. - ValidateProposal( - ProposalInit, - oneshot::Receiver, // Block built from the content. - oneshot::Receiver, // Fin sent by the proposer. - ), + ValidateProposal(ProposalInit, oneshot::Receiver<(ProposalContentId, ProposalContentId)>), } impl PartialEq for ShcTask { @@ -82,9 +78,7 @@ impl PartialEq for ShcTask { | (ShcTask::Prevote(d1, e1), ShcTask::Prevote(d2, e2)) | (ShcTask::Precommit(d1, e1), ShcTask::Precommit(d2, e2)) => d1 == d2 && e1 == e2, (ShcTask::BuildProposal(r1, _), ShcTask::BuildProposal(r2, _)) => r1 == r2, - (ShcTask::ValidateProposal(pi1, _, _), ShcTask::ValidateProposal(pi2, _, _)) => { - pi1 == pi2 - } + (ShcTask::ValidateProposal(pi1, _), ShcTask::ValidateProposal(pi2, _)) => pi1 == pi2, _ => false, } } @@ -118,24 +112,17 @@ impl ShcTask { let proposal_id = receiver.await.expect("Block building failed."); ShcEvent::BuildProposal(StateMachineEvent::GetProposal(Some(proposal_id), round)) } - ShcTask::ValidateProposal( - init, - id_built_from_content_receiver, - fin_from_proposer_receiver, - ) => { - let proposal_id = match id_built_from_content_receiver.await { - Ok(proposal_id) => Some(proposal_id), + ShcTask::ValidateProposal(init, block_receiver) => { + let (block_proposal_id, network_proposal_id) = match block_receiver.await { + Ok((block_proposal_id, network_proposal_id)) => { + (Some(block_proposal_id), Some(network_proposal_id)) + } // Proposal never received from peer. - Err(_) => None, - }; - let fin = match fin_from_proposer_receiver.await { - Ok(fin) => Some(fin), - // ProposalFin never received from peer. - Err(_) => None, + Err(_) => (None, None), }; ShcEvent::ValidateProposal( - StateMachineEvent::Proposal(proposal_id, init.round, init.valid_round), - fin, + StateMachineEvent::Proposal(block_proposal_id, init.round, init.valid_round), + network_proposal_id, ) } } @@ -206,8 +193,7 @@ impl SingleHeightConsensus { &mut self, context: &mut ContextT, init: ProposalInit, - p2p_messages_receiver: mpsc::Receiver, - fin_receiver: oneshot::Receiver, + p2p_messages_receiver: mpsc::Receiver, ) -> Result { debug!( "Received proposal: height={}, round={}, proposer={:?}", @@ -230,10 +216,10 @@ impl SingleHeightConsensus { // Since validating the proposal is non-blocking, we want to avoid validating the same round // twice in parallel. This could be caused by a network repeat or a malicious spam attack. proposal_entry.insert(None); - let block_receiver = context + let validation_receiver = context .validate_proposal(self.height, self.timeouts.proposal_timeout, p2p_messages_receiver) .await; - Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver, fin_receiver)])) + Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, validation_receiver)])) } async fn process_inbound_proposal( @@ -304,19 +290,19 @@ impl SingleHeightConsensus { )])) } ShcEvent::ValidateProposal( - StateMachineEvent::Proposal(proposal_id, round, valid_round), - fin, + StateMachineEvent::Proposal(block_proposal_id, round, valid_round), + network_proposal_id, ) => { // TODO(matan): Switch to signature validation. - let id = if proposal_id != fin { + let id = if block_proposal_id != network_proposal_id { warn!( "proposal_id built from content receiver does not match fin: {:#064x?} != \ {:#064x?}", - proposal_id, fin + block_proposal_id, network_proposal_id ); None } else { - proposal_id + block_proposal_id }; // Retaining the entry for this round prevents us from receiving another proposal on // this round. If the validations failed, which can be caused by a network issue, we diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs index deb0d937f8..0cdef9c582 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs @@ -1,4 +1,5 @@ use futures::channel::{mpsc, oneshot}; +use futures::SinkExt; use lazy_static::lazy_static; use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit}; use starknet_api::block::{BlockHash, BlockNumber}; @@ -10,7 +11,7 @@ use super::SingleHeightConsensus; use crate::config::TimeoutsConfig; use crate::single_height_consensus::{ShcEvent, ShcReturn, ShcTask}; use crate::state_machine::StateMachineEvent; -use crate::test_utils::{precommit, prevote, MockTestContext, TestBlock}; +use crate::test_utils::{precommit, prevote, MockProposalPart, MockTestContext, TestBlock}; use crate::types::{ConsensusError, ValidatorId}; lazy_static! { @@ -34,6 +35,8 @@ lazy_static! { ); } +const CHANNEL_SIZE: usize = 1; + fn prevote_task(block_felt: Option, round: u32) -> ShcTask { ShcTask::Prevote( TIMEOUTS.prevote_timeout, @@ -64,14 +67,16 @@ async fn handle_proposal( context: &mut MockTestContext, ) -> ShcReturn { // Send the proposal from the peer. - let (fin_sender, fin_receiver) = oneshot::channel(); - fin_sender.send(BLOCK.id).unwrap(); + let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); + content_sender.send(MockProposalPart(1)).await.unwrap(); + // let (fin_sender, fin_receiver) = oneshot::channel(); + // fin_sender.send(BLOCK.id).unwrap(); shc.handle_proposal( context, PROPOSAL_INIT.clone(), - mpsc::channel(1).1, // content - ignored by SHC. - fin_receiver, + content_receiver, /* Note: we are only sending the fin, there's no actual content. + * mpsc::channel(1).1, // content - ignored by SHC. */ ) .await .unwrap() @@ -175,7 +180,7 @@ async fn validator(repeat_proposal: bool) { context.expect_proposer().returning(move |_, _| *PROPOSER_ID); context.expect_validate_proposal().times(1).returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); - block_sender.send(BLOCK.id).unwrap(); + block_sender.send((BLOCK.id, BLOCK.id)).unwrap(); block_receiver }); context @@ -253,7 +258,7 @@ async fn vote_twice(same_vote: bool) { context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID); context.expect_validate_proposal().times(1).returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); - block_sender.send(BLOCK.id).unwrap(); + block_sender.send((BLOCK.id, BLOCK.id)).unwrap(); block_receiver }); context diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs index 980f5ee976..7bde14744a 100644 --- a/crates/sequencing/papyrus_consensus/src/test_utils.rs +++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs @@ -3,14 +3,8 @@ use std::time::Duration; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; use mockall::mock; -use papyrus_protobuf::consensus::{ - ConsensusMessage, - Proposal, - ProposalInit, - ProposalPart, - Vote, - VoteType, -}; +use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, ProposalInit, Vote, VoteType}; +use papyrus_protobuf::converters::ProtobufConversionError; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_types_core::felt::Felt; @@ -23,6 +17,41 @@ pub struct TestBlock { pub id: BlockHash, } +#[derive(Debug, PartialEq, Clone)] +pub struct MockProposalPart(pub u64); + +impl From for MockProposalPart { + fn from(init: ProposalInit) -> Self { + MockProposalPart(init.height.0) + } +} + +impl TryFrom for ProposalInit { + type Error = ProtobufConversionError; + fn try_from(part: MockProposalPart) -> Result { + Ok(ProposalInit { + height: BlockNumber(part.0 as u64), + round: 0, + proposer: ValidatorId::default(), + valid_round: None, + }) + } +} + +impl Into> for MockProposalPart { + fn into(self) -> Vec { + vec![self.0 as u8] + } +} + +impl TryFrom> for MockProposalPart { + type Error = ProtobufConversionError; + + fn try_from(value: Vec) -> Result { + Ok(MockProposalPart(value[0].into())) + } +} + // TODO(matan): When QSelf is supported, switch to automocking `ConsensusContext`. mock! { pub TestContext {} @@ -30,7 +59,7 @@ mock! { #[async_trait] impl ConsensusContext for TestContext { type ProposalChunk = u32; - type ProposalPart = ProposalPart; + type ProposalPart = MockProposalPart; async fn build_proposal( &mut self, @@ -42,8 +71,8 @@ mock! { &mut self, height: BlockNumber, timeout: Duration, - content: mpsc::Receiver - ) -> oneshot::Receiver; + content: mpsc::Receiver + ) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)>; async fn repropose( &mut self, @@ -91,19 +120,10 @@ pub fn precommit( }) } -pub fn proposal( - block_felt: Felt, - height: u64, - round: u32, - proposer: ValidatorId, -) -> ConsensusMessage { - let block_hash = BlockHash(block_felt); - ConsensusMessage::Proposal(Proposal { - height, - block_hash, - round, - proposer, - transactions: Vec::new(), - valid_round: None, - }) +pub fn proposal_init(height: u64, round: u32, proposer: ValidatorId) -> ProposalInit { + ProposalInit { height: BlockNumber(height), round, proposer, valid_round: None } +} + +pub fn proposal_fin(block_felt: Felt) -> ProposalFin { + ProposalFin { proposal_content_id: BlockHash(block_felt) } } diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index 751cb4b4ed..1add4edc0b 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -75,8 +75,8 @@ pub trait ConsensusContext { &mut self, height: BlockNumber, timeout: Duration, - content: mpsc::Receiver, - ) -> oneshot::Receiver; + content: mpsc::Receiver, + ) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)>; /// This function is called by consensus to retrieve the content of a previously built or /// validated proposal. It broadcasts the proposal to the network. diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs index 401a661293..b1cc1ccebb 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs @@ -148,8 +148,8 @@ impl ConsensusContext for PapyrusConsensusContext { &mut self, height: BlockNumber, _timeout: Duration, - mut content: mpsc::Receiver, - ) -> oneshot::Receiver { + mut content: mpsc::Receiver, + ) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)> { let (fin_sender, fin_receiver) = oneshot::channel(); let storage_reader = self.storage_reader.clone(); @@ -169,20 +169,37 @@ impl ConsensusContext for PapyrusConsensusContext { panic!("Block in {height} was not found in storage despite waiting for it") }); - for tx in transactions.iter() { - let received_tx = content - .next() - .await - .unwrap_or_else(|| panic!("Not received transaction equals to {tx:?}")); + // First gather all the non-fin transactions. + let mut content_transactions: Vec = Vec::new(); + let received_block_hash = loop { + match content.next().await { + Some(ProposalPart::Init(_)) => { + panic!("Should not have ProposalInit at this point"); + } + Some(ProposalPart::Transactions(batch)) => { + for tx in batch.transactions { + content_transactions.push(tx); + } + } + Some(ProposalPart::Fin(fin)) => { + break fin.proposal_content_id; + } + None => { + panic!("Did not receive a Fin message"); + } + } + }; + + // Check each transaction matches the transactions in the storage. + for tx in transactions.iter().rev() { + let received_tx = content_transactions + .pop() + .expect("Received less transactions than expected"); if tx != &received_tx { panic!("Transactions are not equal. In storage: {tx:?}, : {received_tx:?}"); } } - if content.next().await.is_some() { - panic!("Received more transactions than expected"); - } - let block_hash = txn .get_block_header(height) .expect("Get header from storage failed") @@ -199,7 +216,7 @@ impl ConsensusContext for PapyrusConsensusContext { // Done after inserting the proposal into the map to avoid race conditions between // insertion and calls to `repropose`. // This can happen as a result of sync interrupting `run_height`. - fin_sender.send(block_hash).unwrap_or_else(|_| { + fin_sender.send((block_hash, received_block_hash)).unwrap_or_else(|_| { warn!("Failed to send block to consensus. height={height}"); }) } diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs index c2367be02b..d98cecae96 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs @@ -12,9 +12,11 @@ use papyrus_network::network_manager::test_utils::{ use papyrus_network::network_manager::BroadcastTopicChannels; use papyrus_protobuf::consensus::{ ConsensusMessage, + ProposalFin, ProposalInit, ProposalPart, StreamMessage, + TransactionBatch, Vote, }; use papyrus_storage::body::BodyStorageWriter; @@ -55,8 +57,14 @@ async fn validate_proposal_success() { let (mut validate_sender, validate_receiver) = mpsc::channel(TEST_CHANNEL_SIZE); for tx in block.body.transactions.clone() { - validate_sender.try_send(tx).unwrap(); + let tx_part = ProposalPart::Transactions(TransactionBatch { + transactions: vec![tx], + tx_hashes: vec![], + }); + validate_sender.try_send(tx_part).unwrap(); } + let fin_part = ProposalPart::Fin(ProposalFin { proposal_content_id: block.header.block_hash }); + validate_sender.try_send(fin_part).unwrap(); validate_sender.close_channel(); let fin = papyrus_context @@ -65,7 +73,7 @@ async fn validate_proposal_success() { .await .unwrap(); - assert_eq!(fin, block.header.block_hash); + assert_eq!(fin.0, block.header.block_hash); } #[tokio::test] @@ -76,7 +84,11 @@ async fn validate_proposal_fail() { let different_block = get_test_block(4, None, None, None); let (mut validate_sender, validate_receiver) = mpsc::channel(5000); for tx in different_block.body.transactions.clone() { - validate_sender.try_send(tx).unwrap(); + let tx_part = ProposalPart::Transactions(TransactionBatch { + transactions: vec![tx], + tx_hashes: vec![], + }); + validate_sender.try_send(tx_part).unwrap(); } validate_sender.close_channel(); diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 50278b5df3..23c89b09e4 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -29,7 +29,8 @@ use papyrus_protobuf::consensus::{ Vote, }; use starknet_api::block::{BlockHash, BlockHashAndNumber, BlockNumber}; -use starknet_api::executable_transaction::Transaction; +use starknet_api::executable_transaction::Transaction as ExecutableTransaction; +use starknet_api::transaction::Transaction; use starknet_batcher_types::batcher_types::{ DecisionReachedInput, GetProposalContent, @@ -49,7 +50,7 @@ use tracing::{debug, debug_span, error, info, trace, warn, Instrument}; // Note that multiple proposals IDs can be associated with the same content, but we only need to // store one of them. type HeightToIdToContent = - BTreeMap, ProposalId)>>; + BTreeMap, ProposalId)>>; pub struct SequencerConsensusContext { batcher: Arc, @@ -157,8 +158,8 @@ impl ConsensusContext for SequencerConsensusContext { &mut self, height: BlockNumber, timeout: Duration, - content: mpsc::Receiver, - ) -> oneshot::Receiver { + content_receiver: mpsc::Receiver, + ) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)> { debug!("Validating proposal for height: {height} with timeout: {timeout:?}"); let (fin_sender, fin_receiver) = oneshot::channel(); let batcher = Arc::clone(&self.batcher); @@ -186,7 +187,7 @@ impl ConsensusContext for SequencerConsensusContext { proposal_id, batcher, valid_proposals, - content, + content_receiver, fin_sender, ); if let Err(e) = tokio::time::timeout(timeout, validate_fut).await { @@ -210,7 +211,7 @@ impl ConsensusContext for SequencerConsensusContext { .unwrap_or_else(|| panic!("No proposals found for height {height}")) .get(&id) .unwrap_or_else(|| panic!("No proposal found for height {height} and id {id}")); - // TODO: Stream the TXs to the network. + // TODO(guyn): Stream the TXs to the network. } async fn validators(&self, _height: BlockNumber) -> Vec { @@ -357,39 +358,59 @@ async fn stream_validate_proposal( proposal_id: ProposalId, batcher: Arc, valid_proposals: Arc>, - mut content_receiver: mpsc::Receiver>, - fin_sender: oneshot::Sender, + mut content_receiver: mpsc::Receiver, + fin_sender: oneshot::Sender<(ProposalContentId, ProposalContentId)>, ) { let mut content = Vec::new(); - while let Some(txs) = content_receiver.next().await { - content.extend_from_slice(&txs[..]); - let input = - SendProposalContentInput { proposal_id, content: SendProposalContent::Txs(txs) }; - let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| { - panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}") - }); - match response.response { - ProposalStatus::Processing => {} - ProposalStatus::Finished(fin) => { - panic!("Batcher returned Fin before all content was sent: {proposal_id:?} {fin:?}"); + let mut network_block_id = BlockHash::default(); + while let Some(prop_part) = content_receiver.next().await { + match prop_part { + ProposalPart::Transactions(TransactionBatch { transactions: txs, tx_hashes }) => { + let exe_txs: Vec = txs + .into_iter() + .zip(tx_hashes.into_iter()) + .map(|tx_tup| tx_tup.into()) + .collect(); + content.extend_from_slice(&exe_txs[..]); + let input = SendProposalContentInput { + proposal_id, + content: SendProposalContent::Txs(exe_txs), + }; + let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| { + panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}") + }); + match response.response { + ProposalStatus::Processing => {} + ProposalStatus::Finished(fin) => { + panic!( + "Batcher returned Fin before all content was sent: {proposal_id:?} \ + {fin:?}" + ); + } + ProposalStatus::Aborted => { + panic!("Unexpected abort response for proposal: {:?}", proposal_id); + } + ProposalStatus::InvalidProposal => { + warn!("Proposal was invalid: {:?}", proposal_id); + return; + } + } } - ProposalStatus::Aborted => { - panic!("Unexpected abort response for proposal: {:?}", proposal_id); + ProposalPart::Fin(ProposalFin { proposal_content_id: id }) => { + network_block_id = id; // Output this along with the ID from batcher, to compare them. + break; } - ProposalStatus::InvalidProposal => { - warn!("Proposal was invalid: {:?}", proposal_id); - return; + _ => { + panic!("Invalid proposal part: {:?}", prop_part); } } } - // TODO: In the future we will receive a Fin from the network instead of the channel closing. - // We will just send the network Fin out along with what the batcher calculates. let input = SendProposalContentInput { proposal_id, content: SendProposalContent::Finish }; let response = batcher .send_proposal_content(input) .await .unwrap_or_else(|e| panic!("Failed to send Fin to batcher: {proposal_id:?}. {e:?}")); - let id = match response.response { + let response_id = match response.response { ProposalStatus::Finished(id) => id, ProposalStatus::Processing => { panic!("Batcher failed to return Fin after all content was sent: {:?}", proposal_id); @@ -402,20 +423,22 @@ async fn stream_validate_proposal( return; } }; - let proposal_content_id = BlockHash(id.state_diff_commitment.0.0); + let batcher_block_id = BlockHash(response_id.state_diff_commitment.0.0); info!( - "Finished validating proposal {:?}: content_id = {:?}, num_txs = {:?}, height = {:?}", + "Finished validating proposal {:?}: network_block_id: {:?}, batcher_block_id = {:?}, \ + num_txs = {:?}, height = {:?}", proposal_id, - proposal_content_id, + network_block_id, + batcher_block_id, content.len(), height ); // Update valid_proposals before sending fin to avoid a race condition // with `get_proposal` being called before `valid_proposals` is updated. let mut valid_proposals = valid_proposals.lock().unwrap(); - valid_proposals.entry(height).or_default().insert(proposal_content_id, (content, proposal_id)); - if fin_sender.send(proposal_content_id).is_err() { + valid_proposals.entry(height).or_default().insert(batcher_block_id, (content, proposal_id)); + if fin_sender.send((batcher_block_id, network_block_id)).is_err() { // Consensus may exit early (e.g. sync). - warn!("Failed to send proposal content id"); + warn!("Failed to send proposal content ids"); } } diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index 1b2ef6c171..2519c2c990 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -12,7 +12,7 @@ use papyrus_network::network_manager::test_utils::{ TestSubscriberChannels, }; use papyrus_network::network_manager::BroadcastTopicChannels; -use papyrus_protobuf::consensus::ProposalInit; +use papyrus_protobuf::consensus::{ProposalInit, ProposalPart, TransactionBatch}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::{ContractAddress, StateDiffCommitment}; use starknet_api::executable_transaction::{AccountTransaction, Transaction}; @@ -170,10 +170,19 @@ async fn validate_proposal_success() { NUM_VALIDATORS, ); let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); - content_sender.send(TX_BATCH.clone()).await.unwrap(); + let tx_hash = TX_BATCH.first().unwrap().tx_hash(); + let txs = + TX_BATCH.clone().into_iter().map(starknet_api::transaction::Transaction::from).collect(); + content_sender + .send(ProposalPart::Transactions(TransactionBatch { + transactions: txs, + tx_hashes: vec![tx_hash], + })) + .await + .unwrap(); let fin_receiver = context.validate_proposal(BlockNumber(0), TIMEOUT, content_receiver).await; content_sender.close_channel(); - assert_eq!(fin_receiver.await.unwrap().0, STATE_DIFF_COMMITMENT.0.0); + assert_eq!(fin_receiver.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0); } #[tokio::test] @@ -225,11 +234,18 @@ async fn repropose() { // Receive a valid proposal. let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); - let txs = vec![generate_invoke_tx(Felt::TWO)]; - content_sender.send(txs.clone()).await.unwrap(); + let txs = vec![generate_invoke_tx(Felt::TWO)] + .into_iter() + .map(starknet_api::transaction::Transaction::from) + .collect(); + let prop_part = ProposalPart::Transactions(TransactionBatch { + transactions: txs, + tx_hashes: vec![TransactionHash(Felt::ZERO)], + }); + content_sender.send(prop_part).await.unwrap(); let fin_receiver = context.validate_proposal(BlockNumber(0), TIMEOUT, content_receiver).await; content_sender.close_channel(); - assert_eq!(fin_receiver.await.unwrap().0, STATE_DIFF_COMMITMENT.0.0); + assert_eq!(fin_receiver.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0); // Re-proposal: Just asserts this is a known valid proposal. context diff --git a/crates/starknet_api/src/transaction.rs b/crates/starknet_api/src/transaction.rs index f2eea93dad..fede15d67e 100644 --- a/crates/starknet_api/src/transaction.rs +++ b/crates/starknet_api/src/transaction.rs @@ -147,7 +147,7 @@ impl From<(Transaction, TransactionHash)> for crate::executable_transaction::Tra ), _ => { unimplemented!("Unsupported transaction type. Only Invoke is currently supported.") - }, + } } } }