diff --git a/crates/papyrus_protobuf/src/consensus.rs b/crates/papyrus_protobuf/src/consensus.rs index ae2f2abebd4..6ff6a7cd23d 100644 --- a/crates/papyrus_protobuf/src/consensus.rs +++ b/crates/papyrus_protobuf/src/consensus.rs @@ -54,7 +54,7 @@ pub enum StreamMessageBody { Fin, } -#[derive(Debug, Clone, Hash, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct StreamMessage> + TryFrom, Error = ProtobufConversionError>> { pub message: StreamMessageBody, pub stream_id: u64, @@ -62,7 +62,7 @@ pub struct StreamMessage> + TryFrom, Error = ProtobufCon } /// This message must be sent first when proposing a new block. -#[derive(Default, Debug, Clone, PartialEq)] +#[derive(Clone, Copy, Debug, Default, PartialEq)] pub struct ProposalInit { /// The height of the consensus (block number). pub height: BlockNumber, diff --git a/crates/papyrus_protobuf/src/converters/consensus_test.rs b/crates/papyrus_protobuf/src/converters/consensus_test.rs index 57e328a7fac..422c6a98107 100644 --- a/crates/papyrus_protobuf/src/converters/consensus_test.rs +++ b/crates/papyrus_protobuf/src/converters/consensus_test.rs @@ -112,7 +112,7 @@ fn convert_proposal_init_to_vec_u8_and_back() { let proposal_init = ProposalInit::get_test_instance(&mut rng); - let bytes_data: Vec = proposal_init.clone().into(); + let bytes_data: Vec = proposal_init.into(); let res_data = ProposalInit::try_from(bytes_data).unwrap(); assert_eq!(proposal_init, res_data); } diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 8853cec317f..ad3104d713c 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -56,9 +56,7 @@ mock! { async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - proposer: ValidatorId, + init: ProposalInit, timeout: Duration, content: mpsc::Receiver ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>; @@ -123,7 +121,7 @@ async fn manager_multiple_heights_unordered() { // Run the manager for height 1. context .expect_validate_proposal() - .return_once(move |_, _, _, _, _| { + .return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send(( @@ -155,7 +153,7 @@ async fn manager_multiple_heights_unordered() { // Run the manager for height 2. context .expect_validate_proposal() - .return_once(move |_, _, _, _, _| { + .return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send(( @@ -188,7 +186,7 @@ async fn run_consensus_sync() { // TODO(guyn): refactor this test to pass proposals through the correct channels. let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); - context.expect_validate_proposal().return_once(move |_, _, _, _, _| { + context.expect_validate_proposal().return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send((BlockHash(Felt::TWO), ProposalFin { proposal_content_id: BlockHash(Felt::TWO) })) @@ -258,7 +256,7 @@ async fn run_consensus_sync_cancellation_safety() { // TODO(guyn): refactor this test to pass proposals through the correct channels. let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); - context.expect_validate_proposal().return_once(move |_, _, _, _, _| { + context.expect_validate_proposal().return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) })) @@ -346,7 +344,7 @@ async fn test_timeouts() { let mut context = MockTestContext::new(); context.expect_set_height_and_round().returning(move |_, _| ()); - context.expect_validate_proposal().returning(move |_, _, _, _, _| { + context.expect_validate_proposal().returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) })) diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs index 4bbd82b1066..c96b5c5046f 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs @@ -226,13 +226,7 @@ impl SingleHeightConsensus { // twice in parallel. This could be caused by a network repeat or a malicious spam attack. proposal_entry.insert(None); let block_receiver = context - .validate_proposal( - self.height, - init.round, - init.proposer, - self.timeouts.proposal_timeout, - p2p_messages_receiver, - ) + .validate_proposal(init, self.timeouts.proposal_timeout, p2p_messages_receiver) .await; context.set_height_and_round(self.height, self.state_machine.round()).await; Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)])) diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs index 0340568b1d3..266312cac5e 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs @@ -71,7 +71,7 @@ async fn handle_proposal( let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(MockProposalPart(1)).await.unwrap(); - shc.handle_proposal(context, PROPOSAL_INIT.clone(), content_receiver).await.unwrap() + shc.handle_proposal(context, *PROPOSAL_INIT, content_receiver).await.unwrap() } #[tokio::test] @@ -171,7 +171,7 @@ async fn validator(repeat_proposal: bool) { ); context.expect_proposer().returning(move |_, _| *PROPOSER_ID); - context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| { + context.expect_validate_proposal().times(1).returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap(); block_receiver @@ -250,7 +250,7 @@ async fn vote_twice(same_vote: bool) { ); context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID); - context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| { + context.expect_validate_proposal().times(1).returning(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap(); block_receiver diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs index 45dcb09fa81..c9ef9ac81b8 100644 --- a/crates/sequencing/papyrus_consensus/src/test_utils.rs +++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs @@ -76,9 +76,7 @@ mock! { async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - proposer: ValidatorId, + init: ProposalInit, timeout: Duration, content: mpsc::Receiver ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>; diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index becc800fd95..cb7ce03274c 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -75,9 +75,7 @@ pub trait ConsensusContext { /// by ConsensusContext. async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - proposer: ValidatorId, + init: ProposalInit, timeout: Duration, content: mpsc::Receiver, ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>; diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs index 87ff7fc4211..b3b1b9a80d0 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs @@ -129,7 +129,7 @@ impl ConsensusContext for PapyrusConsensusContext { .await .expect("Failed to send proposal receiver"); proposal_sender - .send(Self::ProposalPart::Init(proposal_init.clone())) + .send(Self::ProposalPart::Init(proposal_init)) .await .expect("Failed to send proposal init"); proposal_sender @@ -164,9 +164,7 @@ impl ConsensusContext for PapyrusConsensusContext { async fn validate_proposal( &mut self, - height: BlockNumber, - _round: Round, - _proposer: ValidatorId, + proposal_init: ProposalInit, _timeout: Duration, mut content: mpsc::Receiver, ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> { @@ -179,14 +177,19 @@ impl ConsensusContext for PapyrusConsensusContext { // TODO(dvir): consider fix this for the case of reverts. If between the check that // the block in storage and to getting the transaction was a revert // this flow will fail. - wait_for_block(&storage_reader, height).await.expect("Failed to wait to block"); + wait_for_block(&storage_reader, proposal_init.height) + .await + .expect("Failed to wait to block"); let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn"); let transactions = txn - .get_block_transactions(height) + .get_block_transactions(proposal_init.height) .expect("Get transactions from storage failed") .unwrap_or_else(|| { - panic!("Block in {height} was not found in storage despite waiting for it") + panic!( + "Block in {} was not found in storage despite waiting for it", + proposal_init.height + ) }); // First gather all the non-fin transactions. @@ -220,10 +223,13 @@ impl ConsensusContext for PapyrusConsensusContext { ); let block_hash = txn - .get_block_header(height) + .get_block_header(proposal_init.height) .expect("Get header from storage failed") .unwrap_or_else(|| { - panic!("Block in {height} was not found in storage despite waiting for it") + panic!( + "Block in {} was not found in storage despite waiting for it", + proposal_init.height + ) }) .block_hash; @@ -231,14 +237,14 @@ impl ConsensusContext for PapyrusConsensusContext { .lock() .expect("Lock on active proposals was poisoned due to a previous panic"); - proposals.entry(height).or_default().insert(block_hash, transactions); + proposals.entry(proposal_init.height).or_default().insert(block_hash, transactions); // Done after inserting the proposal into the map to avoid race conditions between // insertion and calls to `repropose`. // This can happen as a result of sync interrupting `run_height`. fin_sender .send((block_hash, ProposalFin { proposal_content_id: received_block_hash })) .unwrap_or_else(|_| { - warn!("Failed to send block to consensus. height={height}"); + warn!("Failed to send block to consensus. height={}", proposal_init.height); }) } .instrument(debug_span!("consensus_validate_proposal")), diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs index 0b6aa4feb42..aad593e2fe4 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs @@ -68,9 +68,12 @@ async fn validate_proposal_success() { let fin = papyrus_context .validate_proposal( - block_number, - 0, - ValidatorId::default(), + ProposalInit { + height: block_number, + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, Duration::MAX, validate_receiver, ) @@ -99,9 +102,12 @@ async fn validate_proposal_fail() { let fin = papyrus_context .validate_proposal( - block_number, - 0, - ValidatorId::default(), + ProposalInit { + height: block_number, + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, Duration::MAX, validate_receiver, ) diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 11fc4b7b337..60377d0314f 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -192,7 +192,7 @@ impl ConsensusContext for SequencerConsensusContext { .await .expect("Failed to send proposal receiver"); proposal_sender - .send(ProposalPart::Init(proposal_init.clone())) + .send(ProposalPart::Init(proposal_init)) .await .expect("Failed to send proposal init"); tokio::spawn( @@ -217,25 +217,28 @@ impl ConsensusContext for SequencerConsensusContext { // That part is consumed by the caller, so it can know the height/round. async fn validate_proposal( &mut self, - height: BlockNumber, - round: Round, - validator: ValidatorId, + proposal_init: ProposalInit, timeout: Duration, content_receiver: mpsc::Receiver, ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> { - assert_eq!(Some(height), self.current_height); + assert_eq!(Some(proposal_init.height), self.current_height); let (fin_sender, fin_receiver) = oneshot::channel(); - match round.cmp(&self.current_round) { + match proposal_init.round.cmp(&self.current_round) { std::cmp::Ordering::Less => fin_receiver, std::cmp::Ordering::Greater => { - self.queued_proposals - .insert(round, ((height, validator, timeout, content_receiver), fin_sender)); + self.queued_proposals.insert( + proposal_init.round, + ( + (proposal_init.height, proposal_init.proposer, timeout, content_receiver), + fin_sender, + ), + ); fin_receiver } std::cmp::Ordering::Equal => { self.validate_current_round_proposal( - height, - validator, + proposal_init.height, + proposal_init.proposer, timeout, content_receiver, fin_sender, diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index 3fb1a632c6b..87677dfbbda 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -208,7 +208,16 @@ async fn validate_proposal_success() { .await .unwrap(); let fin_receiver = context - .validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; content_sender.close_channel(); assert_eq!(fin_receiver.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0); @@ -269,7 +278,16 @@ async fn repropose() { }); content_sender.send(prop_part).await.unwrap(); let fin_receiver = context - .validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; content_sender.close_channel(); assert_eq!(fin_receiver.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0); @@ -351,7 +369,16 @@ async fn proposals_from_different_rounds() { content_sender.send(prop_part_txs.clone()).await.unwrap(); let fin_receiver_past_round = context - .validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; // No fin was sent, channel remains open. assert!(fin_receiver_past_round.await.is_err()); @@ -361,7 +388,16 @@ async fn proposals_from_different_rounds() { content_sender.send(prop_part_txs.clone()).await.unwrap(); content_sender.send(prop_part_fin.clone()).await.unwrap(); let fin_receiver_curr_round = context - .validate_proposal(BlockNumber(0), 1, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 1, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0); @@ -370,7 +406,16 @@ async fn proposals_from_different_rounds() { content_sender.send(prop_part_txs.clone()).await.unwrap(); content_sender.send(prop_part_fin.clone()).await.unwrap(); let fin_receiver_future_round = context - .validate_proposal(BlockNumber(0), 2, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 2, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; content_sender.close_channel(); // Even with sending fin and closing the channel. @@ -439,7 +484,16 @@ async fn interrupt_active_proposal() { // without needing interrupt. let (mut _content_sender_0, content_receiver) = mpsc::channel(CHANNEL_SIZE); let fin_receiver_0 = context - .validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; let (mut content_sender_1, content_receiver) = mpsc::channel(CHANNEL_SIZE); @@ -457,7 +511,16 @@ async fn interrupt_active_proposal() { .await .unwrap(); let fin_receiver_1 = context - .validate_proposal(BlockNumber(0), 1, ValidatorId::default(), TIMEOUT, content_receiver) + .validate_proposal( + ProposalInit { + height: BlockNumber(0), + round: 0, + valid_round: None, + proposer: ValidatorId::default(), + }, + TIMEOUT, + content_receiver, + ) .await; // Move the context to the next round. context.set_height_and_round(BlockNumber(0), 1).await;