From ca6a7be1a2c836a3c09f3277741406eaee9ac20f Mon Sep 17 00:00:00 2001 From: Arni Hod Date: Mon, 2 Dec 2024 09:56:10 +0200 Subject: [PATCH] chore(starknet_consensus_manager): add proposal init into validate proposal input --- crates/papyrus_protobuf/src/consensus.rs | 4 +- .../src/converters/consensus_test.rs | 2 +- .../papyrus_consensus/src/manager_test.rs | 8 +-- .../src/single_height_consensus.rs | 8 +-- .../src/single_height_consensus_test.rs | 6 +- .../papyrus_consensus/src/test_utils.rs | 4 +- .../sequencing/papyrus_consensus/src/types.rs | 4 +- .../src/papyrus_consensus_context.rs | 28 +++++---- .../src/papyrus_consensus_context_test.rs | 18 ++++-- .../src/sequencer_consensus_context.rs | 23 ++++--- .../src/sequencer_consensus_context_test.rs | 63 ++++++++++++------- 11 files changed, 96 insertions(+), 72 deletions(-) diff --git a/crates/papyrus_protobuf/src/consensus.rs b/crates/papyrus_protobuf/src/consensus.rs index 2a092e6fed4..726c69b5fa4 100644 --- a/crates/papyrus_protobuf/src/consensus.rs +++ b/crates/papyrus_protobuf/src/consensus.rs @@ -52,7 +52,7 @@ pub enum StreamMessageBody { Fin, } -#[derive(Debug, Clone, Hash, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct StreamMessage> + TryFrom, Error = ProtobufConversionError>> { pub message: StreamMessageBody, pub stream_id: u64, @@ -60,7 +60,7 @@ pub struct StreamMessage> + TryFrom, Error = ProtobufCon } /// This message must be sent first when proposing a new block. -#[derive(Default, Debug, Clone, PartialEq)] +#[derive(Clone, Copy, Debug, Default, PartialEq)] pub struct ProposalInit { /// The height of the consensus (block number). pub height: BlockNumber, diff --git a/crates/papyrus_protobuf/src/converters/consensus_test.rs b/crates/papyrus_protobuf/src/converters/consensus_test.rs index 57e328a7fac..422c6a98107 100644 --- a/crates/papyrus_protobuf/src/converters/consensus_test.rs +++ b/crates/papyrus_protobuf/src/converters/consensus_test.rs @@ -112,7 +112,7 @@ fn convert_proposal_init_to_vec_u8_and_back() { let proposal_init = ProposalInit::get_test_instance(&mut rng); - let bytes_data: Vec = proposal_init.clone().into(); + let bytes_data: Vec = proposal_init.into(); let res_data = ProposalInit::try_from(bytes_data).unwrap(); assert_eq!(proposal_init, res_data); } diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index e186c23f0e1..a78260b853d 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -63,9 +63,7 @@ mock! { async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - proposer: ValidatorId, + init: ProposalInit, timeout: Duration, content: mpsc::Receiver ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>; @@ -112,7 +110,7 @@ async fn send_proposal( fn expect_validate_proposal(context: &mut MockTestContext, block_hash: Felt) { context .expect_validate_proposal() - .return_once(move |_, _, _, _, _| { + .return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send(( @@ -352,7 +350,7 @@ async fn test_timeouts() { let mut context = MockTestContext::new(); context.expect_set_height_and_round().returning(move |_, _| ()); - context.expect_validate_proposal().returning(move |_, _, _, _, _| { + context.expect_validate_proposal().returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) })) diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs index b6512adc4f1..30ab3e99875 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs @@ -227,13 +227,7 @@ impl SingleHeightConsensus { // twice in parallel. This could be caused by a network repeat or a malicious spam attack. proposal_entry.insert(None); let block_receiver = context - .validate_proposal( - self.height, - init.round, - init.proposer, - self.timeouts.proposal_timeout, - p2p_messages_receiver, - ) + .validate_proposal(init, self.timeouts.proposal_timeout, p2p_messages_receiver) .await; context.set_height_and_round(self.height, self.state_machine.round()).await; Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)])) diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs index ee5f2cd0627..f3248bc5567 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs @@ -71,7 +71,7 @@ async fn handle_proposal( let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(MockProposalPart(1)).await.unwrap(); - shc.handle_proposal(context, PROPOSAL_INIT.clone(), content_receiver).await.unwrap() + shc.handle_proposal(context, *PROPOSAL_INIT, content_receiver).await.unwrap() } #[tokio::test] @@ -173,7 +173,7 @@ async fn validator(repeat_proposal: bool) { ); context.expect_proposer().returning(move |_, _| *PROPOSER_ID); - context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| { + context.expect_validate_proposal().times(1).returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap(); block_receiver @@ -253,7 +253,7 @@ async fn vote_twice(same_vote: bool) { ); context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID); - context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| { + context.expect_validate_proposal().times(1).returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap(); block_receiver diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs index 32ea37b75ff..518d03b1b79 100644 --- a/crates/sequencing/papyrus_consensus/src/test_utils.rs +++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs @@ -75,9 +75,7 @@ mock! { async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - proposer: ValidatorId, + init: ProposalInit, timeout: Duration, content: mpsc::Receiver ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>; diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index 797100b6f4d..dc561f435f8 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -79,9 +79,7 @@ pub trait ConsensusContext { /// by ConsensusContext. async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - proposer: ValidatorId, + init: ProposalInit, timeout: Duration, content: mpsc::Receiver, ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>; diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs index 9a34d1af07c..770b5061b36 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs @@ -133,7 +133,7 @@ impl ConsensusContext for PapyrusConsensusContext { .await .expect("Failed to send proposal receiver"); proposal_sender - .send(Self::ProposalPart::Init(proposal_init.clone())) + .send(Self::ProposalPart::Init(proposal_init)) .await .expect("Failed to send proposal init"); proposal_sender @@ -168,9 +168,7 @@ impl ConsensusContext for PapyrusConsensusContext { async fn validate_proposal( &mut self, - height: BlockNumber, - _round: Round, - _proposer: ValidatorId, + proposal_init: ProposalInit, _timeout: Duration, mut content: mpsc::Receiver, ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> { @@ -183,14 +181,19 @@ impl ConsensusContext for PapyrusConsensusContext { // TODO(dvir): consider fix this for the case of reverts. If between the check that // the block in storage and to getting the transaction was a revert // this flow will fail. - wait_for_block(&storage_reader, height).await.expect("Failed to wait to block"); + wait_for_block(&storage_reader, proposal_init.height) + .await + .expect("Failed to wait to block"); let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn"); let transactions = txn - .get_block_transactions(height) + .get_block_transactions(proposal_init.height) .expect("Get transactions from storage failed") .unwrap_or_else(|| { - panic!("Block in {height} was not found in storage despite waiting for it") + panic!( + "Block in {} was not found in storage despite waiting for it", + proposal_init.height + ) }); // First gather all the non-fin transactions. @@ -224,10 +227,13 @@ impl ConsensusContext for PapyrusConsensusContext { ); let block_hash = txn - .get_block_header(height) + .get_block_header(proposal_init.height) .expect("Get header from storage failed") .unwrap_or_else(|| { - panic!("Block in {height} was not found in storage despite waiting for it") + panic!( + "Block in {} was not found in storage despite waiting for it", + proposal_init.height + ) }) .block_hash; @@ -235,14 +241,14 @@ impl ConsensusContext for PapyrusConsensusContext { .lock() .expect("Lock on active proposals was poisoned due to a previous panic"); - proposals.entry(height).or_default().insert(block_hash, transactions); + proposals.entry(proposal_init.height).or_default().insert(block_hash, transactions); // Done after inserting the proposal into the map to avoid race conditions between // insertion and calls to `repropose`. // This can happen as a result of sync interrupting `run_height`. fin_sender .send((block_hash, ProposalFin { proposal_content_id: received_block_hash })) .unwrap_or_else(|_| { - warn!("Failed to send block to consensus. height={height}"); + warn!("Failed to send block to consensus. height={}", proposal_init.height); }) } .instrument(debug_span!("consensus_validate_proposal")), diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs index dc83ee01c75..b27cbf77eac 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs @@ -68,9 +68,12 @@ async fn validate_proposal_success() { let fin = papyrus_context .validate_proposal( - block_number, - 0, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: block_number, + round: 0, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, Duration::MAX, validate_receiver, ) @@ -99,9 +102,12 @@ async fn validate_proposal_fail() { let fin = papyrus_context .validate_proposal( - block_number, - 0, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: block_number, + round: 0, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, Duration::MAX, validate_receiver, ) diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 293d53414e3..c49a61b5a32 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -195,7 +195,7 @@ impl ConsensusContext for SequencerConsensusContext { .await .expect("Failed to send proposal receiver"); proposal_sender - .send(ProposalPart::Init(proposal_init.clone())) + .send(ProposalPart::Init(proposal_init)) .await .expect("Failed to send proposal init"); tokio::spawn( @@ -220,25 +220,28 @@ impl ConsensusContext for SequencerConsensusContext { // That part is consumed by the caller, so it can know the height/round. async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - validator: ValidatorId, + proposal_init: ProposalInit, timeout: Duration, content_receiver: mpsc::Receiver, ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> { - assert_eq!(Some(height), self.current_height); + assert_eq!(Some(proposal_init.height), self.current_height); let (fin_sender, fin_receiver) = oneshot::channel(); - match round.cmp(&self.current_round) { + match proposal_init.round.cmp(&self.current_round) { std::cmp::Ordering::Less => fin_receiver, std::cmp::Ordering::Greater => { - self.queued_proposals - .insert(round, ((height, validator, timeout, content_receiver), fin_sender)); + self.queued_proposals.insert( + proposal_init.round, + ( + (proposal_init.height, proposal_init.proposer, timeout, content_receiver), + fin_sender, + ), + ); fin_receiver } std::cmp::Ordering::Equal => { self.validate_current_round_proposal( - height, - validator, + proposal_init.height, + proposal_init.proposer, timeout, content_receiver, fin_sender, diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index 085cfed39dc..fae7c8b3a6f 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -197,9 +197,12 @@ async fn validate_proposal_success() { .unwrap(); let fin_receiver = context .validate_proposal( - BlockNumber(0), - 0, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 0, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, ) @@ -255,9 +258,12 @@ async fn repropose() { .unwrap(); let fin_receiver = context .validate_proposal( - BlockNumber(0), - 0, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 0, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, ) @@ -330,9 +336,12 @@ async fn proposals_from_different_rounds() { let fin_receiver_past_round = context .validate_proposal( - BlockNumber(0), - 0, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 0, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, ) @@ -346,9 +355,12 @@ async fn proposals_from_different_rounds() { content_sender.send(prop_part_fin.clone()).await.unwrap(); let fin_receiver_curr_round = context .validate_proposal( - BlockNumber(0), - 1, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 1, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, ) @@ -361,9 +373,12 @@ async fn proposals_from_different_rounds() { content_sender.send(prop_part_fin.clone()).await.unwrap(); let fin_receiver_future_round = context .validate_proposal( - BlockNumber(0), - 2, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 2, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, ) @@ -423,9 +438,12 @@ async fn interrupt_active_proposal() { let (mut _content_sender_0, content_receiver) = mpsc::channel(CHANNEL_SIZE); let fin_receiver_0 = context .validate_proposal( - BlockNumber(0), - 0, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 0, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, ) @@ -447,9 +465,12 @@ async fn interrupt_active_proposal() { .unwrap(); let fin_receiver_1 = context .validate_proposal( - BlockNumber(0), - 1, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 1, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, )