diff --git a/crates/papyrus_protobuf/src/consensus.rs b/crates/papyrus_protobuf/src/consensus.rs index 5afa4522fc..f8ee721597 100644 --- a/crates/papyrus_protobuf/src/consensus.rs +++ b/crates/papyrus_protobuf/src/consensus.rs @@ -52,7 +52,7 @@ pub enum StreamMessageBody { Fin, } -#[derive(Debug, Clone, Hash, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct StreamMessage> + TryFrom, Error = ProtobufConversionError>> { pub message: StreamMessageBody, pub stream_id: u64, @@ -60,7 +60,7 @@ pub struct StreamMessage> + TryFrom, Error = ProtobufCon } /// This message must be sent first when proposing a new block. -#[derive(Default, Debug, Clone, PartialEq)] +#[derive(Clone, Copy, Debug, Default, PartialEq)] pub struct ProposalInit { /// The height of the consensus (block number). pub height: BlockNumber, diff --git a/crates/papyrus_protobuf/src/converters/consensus_test.rs b/crates/papyrus_protobuf/src/converters/consensus_test.rs index 57e328a7fa..422c6a9810 100644 --- a/crates/papyrus_protobuf/src/converters/consensus_test.rs +++ b/crates/papyrus_protobuf/src/converters/consensus_test.rs @@ -112,7 +112,7 @@ fn convert_proposal_init_to_vec_u8_and_back() { let proposal_init = ProposalInit::get_test_instance(&mut rng); - let bytes_data: Vec = proposal_init.clone().into(); + let bytes_data: Vec = proposal_init.into(); let res_data = ProposalInit::try_from(bytes_data).unwrap(); assert_eq!(proposal_init, res_data); } diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index e6ae664568..498a24afc6 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -67,9 +67,7 @@ mock! { async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - proposer: ValidatorId, + init: ProposalInit, timeout: Duration, content: mpsc::Receiver ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>; @@ -116,7 +114,7 @@ async fn send_proposal( fn expect_validate_proposal(context: &mut MockTestContext, block_hash: Felt) { context .expect_validate_proposal() - .return_once(move |_, _, _, _, _| { + .return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send(( @@ -368,7 +366,7 @@ async fn test_timeouts() { let mut context = MockTestContext::new(); context.expect_set_height_and_round().returning(move |_, _| ()); - context.expect_validate_proposal().returning(move |_, _, _, _, _| { + context.expect_validate_proposal().returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) })) diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs index b6512adc4f..30ab3e9987 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs @@ -227,13 +227,7 @@ impl SingleHeightConsensus { // twice in parallel. This could be caused by a network repeat or a malicious spam attack. proposal_entry.insert(None); let block_receiver = context - .validate_proposal( - self.height, - init.round, - init.proposer, - self.timeouts.proposal_timeout, - p2p_messages_receiver, - ) + .validate_proposal(init, self.timeouts.proposal_timeout, p2p_messages_receiver) .await; context.set_height_and_round(self.height, self.state_machine.round()).await; Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)])) diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs index ee5f2cd062..f3248bc556 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs @@ -71,7 +71,7 @@ async fn handle_proposal( let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(MockProposalPart(1)).await.unwrap(); - shc.handle_proposal(context, PROPOSAL_INIT.clone(), content_receiver).await.unwrap() + shc.handle_proposal(context, *PROPOSAL_INIT, content_receiver).await.unwrap() } #[tokio::test] @@ -173,7 +173,7 @@ async fn validator(repeat_proposal: bool) { ); context.expect_proposer().returning(move |_, _| *PROPOSER_ID); - context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| { + context.expect_validate_proposal().times(1).returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap(); block_receiver @@ -253,7 +253,7 @@ async fn vote_twice(same_vote: bool) { ); context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID); - context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| { + context.expect_validate_proposal().times(1).returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap(); block_receiver diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs index 8ee68c1427..126156423c 100644 --- a/crates/sequencing/papyrus_consensus/src/test_utils.rs +++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs @@ -74,9 +74,7 @@ mock! { async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - proposer: ValidatorId, + init: ProposalInit, timeout: Duration, content: mpsc::Receiver ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>; diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index 561aabd5e0..83e9b24914 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -77,9 +77,7 @@ pub trait ConsensusContext { /// by ConsensusContext. async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - proposer: ValidatorId, + init: ProposalInit, timeout: Duration, content: mpsc::Receiver, ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>; diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs index 5581abac18..a70e0d631f 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs @@ -89,6 +89,7 @@ impl ConsensusContext for PapyrusConsensusContext { proposal_init: ProposalInit, _timeout: Duration, ) -> oneshot::Receiver { + let height = proposal_init.height; let mut proposal_sender_sender = self.network_proposal_sender.clone(); let (fin_sender, fin_receiver) = oneshot::channel(); @@ -99,40 +100,32 @@ impl ConsensusContext for PapyrusConsensusContext { // TODO(dvir): consider fix this for the case of reverts. If between the check that // the block in storage and to getting the transaction was a revert // this flow will fail. - wait_for_block(&storage_reader, proposal_init.height) - .await - .expect("Failed to wait to block"); + wait_for_block(&storage_reader, height).await.expect("Failed to wait to block"); let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn"); let transactions = txn - .get_block_transactions(proposal_init.height) + .get_block_transactions(height) .expect("Get transactions from storage failed") .unwrap_or_else(|| { - panic!( - "Block in {} was not found in storage despite waiting for it", - proposal_init.height - ) + panic!("Block in {height} was not found in storage despite waiting for it") }); let block_hash = txn - .get_block_header(proposal_init.height) + .get_block_header(height) .expect("Get header from storage failed") .unwrap_or_else(|| { - panic!( - "Block in {} was not found in storage despite waiting for it", - proposal_init.height - ) + panic!("Block in {height} was not found in storage despite waiting for it") }) .block_hash; let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE); - let stream_id = proposal_init.height.0; + let stream_id = height.0; proposal_sender_sender .send((stream_id, proposal_receiver)) .await .expect("Failed to send proposal receiver"); proposal_sender - .send(Self::ProposalPart::Init(proposal_init.clone())) + .send(Self::ProposalPart::Init(proposal_init)) .await .expect("Failed to send proposal init"); proposal_sender @@ -150,10 +143,7 @@ impl ConsensusContext for PapyrusConsensusContext { let mut proposals = valid_proposals .lock() .expect("Lock on active proposals was poisoned due to a previous panic"); - proposals - .entry(proposal_init.height) - .or_default() - .insert(block_hash, transactions); + proposals.entry(height).or_default().insert(block_hash, transactions); } // Done after inserting the proposal into the map to avoid race conditions between // insertion and calls to `repropose`. @@ -167,12 +157,11 @@ impl ConsensusContext for PapyrusConsensusContext { async fn validate_proposal( &mut self, - height: BlockNumber, - _round: Round, - _proposer: ValidatorId, + proposal_init: ProposalInit, _timeout: Duration, mut content: mpsc::Receiver, ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> { + let height = proposal_init.height; let (fin_sender, fin_receiver) = oneshot::channel(); let storage_reader = self.storage_reader.clone(); @@ -251,18 +240,19 @@ impl ConsensusContext for PapyrusConsensusContext { } async fn repropose(&mut self, id: ProposalContentId, init: ProposalInit) { + let height = init.height; let transactions = self .valid_proposals .lock() .expect("valid_proposals lock was poisoned") - .get(&init.height) - .unwrap_or_else(|| panic!("No proposals found for height {}", init.height)) + .get(&height) + .unwrap_or_else(|| panic!("No proposals found for height {height}")) .get(&id) - .unwrap_or_else(|| panic!("No proposal found for height {} and id {}", init.height, id)) + .unwrap_or_else(|| panic!("No proposal found for height {height} and id {id}")) .clone(); let proposal = Proposal { - height: init.height.0, + height: height.0, round: init.round, proposer: init.proposer, transactions, diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs index dc83ee01c7..b27cbf77ea 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs @@ -68,9 +68,12 @@ async fn validate_proposal_success() { let fin = papyrus_context .validate_proposal( - block_number, - 0, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: block_number, + round: 0, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, Duration::MAX, validate_receiver, ) @@ -99,9 +102,12 @@ async fn validate_proposal_fail() { let fin = papyrus_context .validate_proposal( - block_number, - 0, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: block_number, + round: 0, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, Duration::MAX, validate_receiver, ) diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 92e45cd85d..0aa70cd788 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -192,7 +192,7 @@ impl ConsensusContext for SequencerConsensusContext { .await .expect("Failed to send proposal receiver"); proposal_sender - .send(ProposalPart::Init(proposal_init.clone())) + .send(ProposalPart::Init(proposal_init)) .await .expect("Failed to send proposal init"); tokio::spawn( @@ -217,25 +217,28 @@ impl ConsensusContext for SequencerConsensusContext { // That part is consumed by the caller, so it can know the height/round. async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - validator: ValidatorId, + proposal_init: ProposalInit, timeout: Duration, content_receiver: mpsc::Receiver, ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> { - assert_eq!(Some(height), self.current_height); + assert_eq!(Some(proposal_init.height), self.current_height); let (fin_sender, fin_receiver) = oneshot::channel(); - match round.cmp(&self.current_round) { + match proposal_init.round.cmp(&self.current_round) { std::cmp::Ordering::Less => fin_receiver, std::cmp::Ordering::Greater => { - self.queued_proposals - .insert(round, ((height, validator, timeout, content_receiver), fin_sender)); + self.queued_proposals.insert( + proposal_init.round, + ( + (proposal_init.height, proposal_init.proposer, timeout, content_receiver), + fin_sender, + ), + ); fin_receiver } std::cmp::Ordering::Equal => { self.validate_current_round_proposal( - height, - validator, + proposal_init.height, + proposal_init.proposer, timeout, content_receiver, fin_sender, diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index 085cfed39d..fae7c8b3a6 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -197,9 +197,12 @@ async fn validate_proposal_success() { .unwrap(); let fin_receiver = context .validate_proposal( - BlockNumber(0), - 0, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 0, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, ) @@ -255,9 +258,12 @@ async fn repropose() { .unwrap(); let fin_receiver = context .validate_proposal( - BlockNumber(0), - 0, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 0, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, ) @@ -330,9 +336,12 @@ async fn proposals_from_different_rounds() { let fin_receiver_past_round = context .validate_proposal( - BlockNumber(0), - 0, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 0, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, ) @@ -346,9 +355,12 @@ async fn proposals_from_different_rounds() { content_sender.send(prop_part_fin.clone()).await.unwrap(); let fin_receiver_curr_round = context .validate_proposal( - BlockNumber(0), - 1, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 1, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, ) @@ -361,9 +373,12 @@ async fn proposals_from_different_rounds() { content_sender.send(prop_part_fin.clone()).await.unwrap(); let fin_receiver_future_round = context .validate_proposal( - BlockNumber(0), - 2, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 2, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, ) @@ -423,9 +438,12 @@ async fn interrupt_active_proposal() { let (mut _content_sender_0, content_receiver) = mpsc::channel(CHANNEL_SIZE); let fin_receiver_0 = context .validate_proposal( - BlockNumber(0), - 0, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 0, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, ) @@ -447,9 +465,12 @@ async fn interrupt_active_proposal() { .unwrap(); let fin_receiver_1 = context .validate_proposal( - BlockNumber(0), - 1, - ValidatorId::from(DEFAULT_VALIDATOR_ID), + ProposalInit { + height: BlockNumber(0), + round: 1, + proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID), + ..Default::default() + }, TIMEOUT, content_receiver, )