diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 4f03cc0a8a..8853cec317 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -58,6 +58,7 @@ mock! { &mut self, height: BlockNumber, round: Round, + proposer: ValidatorId, timeout: Duration, content: mpsc::Receiver ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>; @@ -81,7 +82,6 @@ mock! { ) -> Result<(), ConsensusError>; async fn set_height_and_round(&mut self, height: BlockNumber, round: Round); - } } @@ -123,7 +123,7 @@ async fn manager_multiple_heights_unordered() { // Run the manager for height 1. context .expect_validate_proposal() - .return_once(move |_, _, _, _| { + .return_once(move |_, _, _, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send(( @@ -155,7 +155,7 @@ async fn manager_multiple_heights_unordered() { // Run the manager for height 2. context .expect_validate_proposal() - .return_once(move |_, _, _, _| { + .return_once(move |_, _, _, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send(( @@ -188,7 +188,7 @@ async fn run_consensus_sync() { // TODO(guyn): refactor this test to pass proposals through the correct channels. let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); - context.expect_validate_proposal().return_once(move |_, _, _, _| { + context.expect_validate_proposal().return_once(move |_, _, _, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send((BlockHash(Felt::TWO), ProposalFin { proposal_content_id: BlockHash(Felt::TWO) })) @@ -258,7 +258,7 @@ async fn run_consensus_sync_cancellation_safety() { // TODO(guyn): refactor this test to pass proposals through the correct channels. let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); - context.expect_validate_proposal().return_once(move |_, _, _, _| { + context.expect_validate_proposal().return_once(move |_, _, _, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) })) @@ -346,7 +346,7 @@ async fn test_timeouts() { let mut context = MockTestContext::new(); context.expect_set_height_and_round().returning(move |_, _| ()); - context.expect_validate_proposal().returning(move |_, _, _, _| { + context.expect_validate_proposal().returning(move |_, _, _, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender .send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) })) diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs index c36c3d7e79..4bbd82b106 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs @@ -229,6 +229,7 @@ impl SingleHeightConsensus { .validate_proposal( self.height, init.round, + init.proposer, self.timeouts.proposal_timeout, p2p_messages_receiver, ) diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs index 2d3d14fac4..0340568b1d 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs @@ -171,7 +171,7 @@ async fn validator(repeat_proposal: bool) { ); context.expect_proposer().returning(move |_, _| *PROPOSER_ID); - context.expect_validate_proposal().times(1).returning(move |_, _, _, _| { + context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap(); block_receiver @@ -250,7 +250,7 @@ async fn vote_twice(same_vote: bool) { ); context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID); - context.expect_validate_proposal().times(1).returning(move |_, _, _, _| { + context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap(); block_receiver diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs index 66d128b117..45dcb09fa8 100644 --- a/crates/sequencing/papyrus_consensus/src/test_utils.rs +++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs @@ -78,6 +78,7 @@ mock! { &mut self, height: BlockNumber, round: Round, + proposer: ValidatorId, timeout: Duration, content: mpsc::Receiver ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>; diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index 282185ed47..becc800fd9 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -77,6 +77,7 @@ pub trait ConsensusContext { &mut self, height: BlockNumber, round: Round, + proposer: ValidatorId, timeout: Duration, content: mpsc::Receiver, ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>; diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs index ad3f6fa0ec..87ff7fc421 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs @@ -36,7 +36,6 @@ use papyrus_storage::body::BodyStorageReader; use papyrus_storage::header::HeaderStorageReader; use papyrus_storage::{StorageError, StorageReader}; use starknet_api::block::BlockNumber; -use starknet_api::core::ContractAddress; use starknet_api::transaction::Transaction; use tracing::{debug, debug_span, info, warn, Instrument}; @@ -70,7 +69,7 @@ impl PapyrusConsensusContext { storage_reader, network_broadcast_client, network_proposal_sender, - validators: (0..num_validators).map(ContractAddress::from).collect(), + validators: (0..num_validators).map(ValidatorId::from).collect(), sync_broadcast_sender, valid_proposals: Arc::new(Mutex::new(BTreeMap::new())), } @@ -167,6 +166,7 @@ impl ConsensusContext for PapyrusConsensusContext { &mut self, height: BlockNumber, _round: Round, + _proposer: ValidatorId, _timeout: Duration, mut content: mpsc::Receiver, ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> { diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs index 49b1c5c7ff..0b6aa4feb4 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs @@ -3,7 +3,7 @@ use std::time::Duration; use futures::channel::{mpsc, oneshot}; use futures::StreamExt; use papyrus_consensus::stream_handler::StreamHandler; -use papyrus_consensus::types::ConsensusContext; +use papyrus_consensus::types::{ConsensusContext, ValidatorId}; use papyrus_network::network_manager::test_utils::{ mock_register_broadcast_topic, BroadcastNetworkMock, @@ -24,7 +24,6 @@ use papyrus_storage::header::HeaderStorageWriter; use papyrus_storage::test_utils::get_test_storage; use papyrus_test_utils::get_test_block; use starknet_api::block::{Block, BlockHash}; -use starknet_api::core::ContractAddress; use crate::papyrus_consensus_context::PapyrusConsensusContext; @@ -40,7 +39,7 @@ async fn build_proposal() { let proposal_init = ProposalInit { height: block_number, round: 0, - proposer: ContractAddress::default(), + proposer: ValidatorId::default(), valid_round: None, }; // TODO(Asmaa): Test proposal content. @@ -68,7 +67,13 @@ async fn validate_proposal_success() { validate_sender.close_channel(); let fin = papyrus_context - .validate_proposal(block_number, 0, Duration::MAX, validate_receiver) + .validate_proposal( + block_number, + 0, + ValidatorId::default(), + Duration::MAX, + validate_receiver, + ) .await .await .unwrap(); @@ -93,7 +98,13 @@ async fn validate_proposal_fail() { validate_sender.close_channel(); let fin = papyrus_context - .validate_proposal(block_number, 0, Duration::MAX, validate_receiver) + .validate_proposal( + block_number, + 0, + ValidatorId::default(), + Duration::MAX, + validate_receiver, + ) .await .await; assert_eq!(fin, Err(oneshot::Canceled)); diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 7ffe207133..11fc4b7b33 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -76,7 +76,7 @@ const TEMPORARY_GAS_PRICES: GasPrices = GasPrices { // store one of them. type HeightToIdToContent = BTreeMap, ProposalId)>>; -type ValidationParams = (BlockNumber, Duration, mpsc::Receiver); +type ValidationParams = (BlockNumber, ValidatorId, Duration, mpsc::Receiver); const CHANNEL_SIZE: usize = 100; @@ -173,7 +173,7 @@ impl ConsensusContext for SequencerConsensusContext { now.timestamp().try_into().expect("Failed to convert timestamp"), ), use_kzg_da: true, - ..Default::default() + sequencer_address: proposal_init.proposer, }, }; // TODO: Should we be returning an error? @@ -219,6 +219,7 @@ impl ConsensusContext for SequencerConsensusContext { &mut self, height: BlockNumber, round: Round, + validator: ValidatorId, timeout: Duration, content_receiver: mpsc::Receiver, ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> { @@ -228,12 +229,18 @@ impl ConsensusContext for SequencerConsensusContext { std::cmp::Ordering::Less => fin_receiver, std::cmp::Ordering::Greater => { self.queued_proposals - .insert(round, ((height, timeout, content_receiver), fin_sender)); + .insert(round, ((height, validator, timeout, content_receiver), fin_sender)); fin_receiver } std::cmp::Ordering::Equal => { - self.validate_current_round_proposal(height, timeout, content_receiver, fin_sender) - .await; + self.validate_current_round_proposal( + height, + validator, + timeout, + content_receiver, + fin_sender, + ) + .await; fin_receiver } } @@ -334,10 +341,10 @@ impl ConsensusContext for SequencerConsensusContext { } } // Validate the proposal for the current round if exists. - let Some(((height, timeout, content), fin_sender)) = to_process else { + let Some(((height, validator, timeout, content), fin_sender)) = to_process else { return; }; - self.validate_current_round_proposal(height, timeout, content, fin_sender).await; + self.validate_current_round_proposal(height, validator, timeout, content, fin_sender).await; } } @@ -345,6 +352,7 @@ impl SequencerConsensusContext { async fn validate_current_round_proposal( &mut self, height: BlockNumber, + proposer: ValidatorId, timeout: Duration, content_receiver: mpsc::Receiver, fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>, @@ -374,7 +382,7 @@ impl SequencerConsensusContext { now.timestamp().try_into().expect("Failed to convert timestamp"), ), use_kzg_da: true, - ..Default::default() + sequencer_address: proposer, }, }; batcher.validate_block(input).await.expect("Failed to initiate proposal validation"); diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index b6c1b0788f..3fb1a632c6 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -6,7 +6,7 @@ use futures::channel::mpsc; use futures::{FutureExt, SinkExt}; use lazy_static::lazy_static; use papyrus_consensus::stream_handler::StreamHandler; -use papyrus_consensus::types::ConsensusContext; +use papyrus_consensus::types::{ConsensusContext, ValidatorId}; use papyrus_network::network_manager::test_utils::{ mock_register_broadcast_topic, BroadcastNetworkMock, @@ -21,7 +21,7 @@ use papyrus_protobuf::consensus::{ TransactionBatch, }; use starknet_api::block::{BlockHash, BlockNumber}; -use starknet_api::core::{ContractAddress, StateDiffCommitment}; +use starknet_api::core::StateDiffCommitment; use starknet_api::executable_transaction::{ AccountTransaction, Transaction as ExecutableTransaction, @@ -128,7 +128,7 @@ async fn build_proposal() { let init = ProposalInit { height: BlockNumber(0), round: 0, - proposer: ContractAddress::default(), + proposer: ValidatorId::default(), valid_round: None, }; // TODO(Asmaa): Test proposal content. @@ -207,8 +207,9 @@ async fn validate_proposal_success() { })) .await .unwrap(); - let fin_receiver = - context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await; + let fin_receiver = context + .validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver) + .await; content_sender.close_channel(); assert_eq!(fin_receiver.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0); } @@ -267,8 +268,9 @@ async fn repropose() { proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0), }); content_sender.send(prop_part).await.unwrap(); - let fin_receiver = - context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await; + let fin_receiver = context + .validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver) + .await; content_sender.close_channel(); assert_eq!(fin_receiver.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0); @@ -348,8 +350,9 @@ async fn proposals_from_different_rounds() { let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(prop_part_txs.clone()).await.unwrap(); - let fin_receiver_past_round = - context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await; + let fin_receiver_past_round = context + .validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver) + .await; // No fin was sent, channel remains open. assert!(fin_receiver_past_round.await.is_err()); @@ -357,16 +360,18 @@ async fn proposals_from_different_rounds() { let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(prop_part_txs.clone()).await.unwrap(); content_sender.send(prop_part_fin.clone()).await.unwrap(); - let fin_receiver_curr_round = - context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await; + let fin_receiver_curr_round = context + .validate_proposal(BlockNumber(0), 1, ValidatorId::default(), TIMEOUT, content_receiver) + .await; assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0); // The proposal from the future round should not be processed. let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(prop_part_txs.clone()).await.unwrap(); content_sender.send(prop_part_fin.clone()).await.unwrap(); - let fin_receiver_future_round = - context.validate_proposal(BlockNumber(0), 2, TIMEOUT, content_receiver).await; + let fin_receiver_future_round = context + .validate_proposal(BlockNumber(0), 2, ValidatorId::default(), TIMEOUT, content_receiver) + .await; content_sender.close_channel(); // Even with sending fin and closing the channel. assert!(fin_receiver_future_round.now_or_never().is_none()); @@ -433,8 +438,9 @@ async fn interrupt_active_proposal() { // Keep the sender open, as closing it or sending Fin would cause the validate to complete // without needing interrupt. let (mut _content_sender_0, content_receiver) = mpsc::channel(CHANNEL_SIZE); - let fin_receiver_0 = - context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await; + let fin_receiver_0 = context + .validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver) + .await; let (mut content_sender_1, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender_1 @@ -450,8 +456,9 @@ async fn interrupt_active_proposal() { })) .await .unwrap(); - let fin_receiver_1 = - context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await; + let fin_receiver_1 = context + .validate_proposal(BlockNumber(0), 1, ValidatorId::default(), TIMEOUT, content_receiver) + .await; // Move the context to the next round. context.set_height_and_round(BlockNumber(0), 1).await;