Skip to content

Commit

Permalink
chore(starknet_consensus_manager): set proposer address in propose bl…
Browse files Browse the repository at this point in the history
…ock input
  • Loading branch information
ArniStarkware committed Dec 3, 2024
1 parent dee0a42 commit c2fba98
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 40 deletions.
12 changes: 6 additions & 6 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ mock! {
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
timeout: Duration,
content: mpsc::Receiver<ProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand All @@ -81,7 +82,6 @@ mock! {
) -> Result<(), ConsensusError>;

async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);

}
}

Expand Down Expand Up @@ -123,7 +123,7 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 1.
context
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
.return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
Expand Down Expand Up @@ -155,7 +155,7 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 2.
context
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
.return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
Expand Down Expand Up @@ -188,7 +188,7 @@ async fn run_consensus_sync() {
// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
context.expect_validate_proposal().return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::TWO), ProposalFin { proposal_content_id: BlockHash(Felt::TWO) }))
Expand Down Expand Up @@ -258,7 +258,7 @@ async fn run_consensus_sync_cancellation_safety() {
// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
context.expect_validate_proposal().return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
Expand Down Expand Up @@ -346,7 +346,7 @@ async fn test_timeouts() {

let mut context = MockTestContext::new();
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_validate_proposal().returning(move |_, _, _, _| {
context.expect_validate_proposal().returning(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ impl SingleHeightConsensus {
.validate_proposal(
self.height,
init.round,
init.proposer,
self.timeouts.proposal_timeout,
p2p_messages_receiver,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ async fn validator(repeat_proposal: bool) {
);

context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
block_receiver
Expand Down Expand Up @@ -250,7 +250,7 @@ async fn vote_twice(same_vote: bool) {
);

context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
block_receiver
Expand Down
1 change: 1 addition & 0 deletions crates/sequencing/papyrus_consensus/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ mock! {
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
timeout: Duration,
content: mpsc::Receiver<MockProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand Down
1 change: 1 addition & 0 deletions crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub trait ConsensusContext {
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
timeout: Duration,
content: mpsc::Receiver<Self::ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::{StorageError, StorageReader};
use starknet_api::block::BlockNumber;
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
use tracing::{debug, debug_span, info, warn, Instrument};

Expand Down Expand Up @@ -70,7 +69,7 @@ impl PapyrusConsensusContext {
storage_reader,
network_broadcast_client,
network_proposal_sender,
validators: (0..num_validators).map(ContractAddress::from).collect(),
validators: (0..num_validators).map(ValidatorId::from).collect(),
sync_broadcast_sender,
valid_proposals: Arc::new(Mutex::new(BTreeMap::new())),
}
Expand Down Expand Up @@ -167,6 +166,7 @@ impl ConsensusContext for PapyrusConsensusContext {
&mut self,
height: BlockNumber,
_round: Round,
_proposer: ValidatorId,
_timeout: Duration,
mut content: mpsc::Receiver<ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::ConsensusContext;
use papyrus_consensus::types::{ConsensusContext, ValidatorId};
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
BroadcastNetworkMock,
Expand All @@ -24,7 +24,6 @@ use papyrus_storage::header::HeaderStorageWriter;
use papyrus_storage::test_utils::get_test_storage;
use papyrus_test_utils::get_test_block;
use starknet_api::block::{Block, BlockHash};
use starknet_api::core::ContractAddress;

use crate::papyrus_consensus_context::PapyrusConsensusContext;

Expand All @@ -40,7 +39,7 @@ async fn build_proposal() {
let proposal_init = ProposalInit {
height: block_number,
round: 0,
proposer: ContractAddress::default(),
proposer: ValidatorId::default(),
valid_round: None,
};
// TODO(Asmaa): Test proposal content.
Expand Down Expand Up @@ -68,7 +67,13 @@ async fn validate_proposal_success() {
validate_sender.close_channel();

let fin = papyrus_context
.validate_proposal(block_number, 0, Duration::MAX, validate_receiver)
.validate_proposal(
block_number,
0,
ValidatorId::default(),
Duration::MAX,
validate_receiver,
)
.await
.await
.unwrap();
Expand All @@ -93,7 +98,13 @@ async fn validate_proposal_fail() {
validate_sender.close_channel();

let fin = papyrus_context
.validate_proposal(block_number, 0, Duration::MAX, validate_receiver)
.validate_proposal(
block_number,
0,
ValidatorId::default(),
Duration::MAX,
validate_receiver,
)
.await
.await;
assert_eq!(fin, Err(oneshot::Canceled));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ const TEMPORARY_GAS_PRICES: GasPrices = GasPrices {
// store one of them.
type HeightToIdToContent =
BTreeMap<BlockNumber, HashMap<ProposalContentId, (Vec<ExecutableTransaction>, ProposalId)>>;
type ValidationParams = (BlockNumber, Duration, mpsc::Receiver<ProposalPart>);
type ValidationParams = (BlockNumber, ValidatorId, Duration, mpsc::Receiver<ProposalPart>);

const CHANNEL_SIZE: usize = 100;

Expand Down Expand Up @@ -173,7 +173,7 @@ impl ConsensusContext for SequencerConsensusContext {
now.timestamp().try_into().expect("Failed to convert timestamp"),
),
use_kzg_da: true,
..Default::default()
sequencer_address: proposal_init.proposer,
},
};
// TODO: Should we be returning an error?
Expand Down Expand Up @@ -219,6 +219,7 @@ impl ConsensusContext for SequencerConsensusContext {
&mut self,
height: BlockNumber,
round: Round,
validator: ValidatorId,
timeout: Duration,
content_receiver: mpsc::Receiver<Self::ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
Expand All @@ -228,12 +229,18 @@ impl ConsensusContext for SequencerConsensusContext {
std::cmp::Ordering::Less => fin_receiver,
std::cmp::Ordering::Greater => {
self.queued_proposals
.insert(round, ((height, timeout, content_receiver), fin_sender));
.insert(round, ((height, validator, timeout, content_receiver), fin_sender));
fin_receiver
}
std::cmp::Ordering::Equal => {
self.validate_current_round_proposal(height, timeout, content_receiver, fin_sender)
.await;
self.validate_current_round_proposal(
height,
validator,
timeout,
content_receiver,
fin_sender,
)
.await;
fin_receiver
}
}
Expand Down Expand Up @@ -334,17 +341,18 @@ impl ConsensusContext for SequencerConsensusContext {
}
}
// Validate the proposal for the current round if exists.
let Some(((height, timeout, content), fin_sender)) = to_process else {
let Some(((height, validator, timeout, content), fin_sender)) = to_process else {
return;
};
self.validate_current_round_proposal(height, timeout, content, fin_sender).await;
self.validate_current_round_proposal(height, validator, timeout, content, fin_sender).await;
}
}

impl SequencerConsensusContext {
async fn validate_current_round_proposal(
&mut self,
height: BlockNumber,
proposer: ValidatorId,
timeout: Duration,
content_receiver: mpsc::Receiver<ProposalPart>,
fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>,
Expand Down Expand Up @@ -374,7 +382,7 @@ impl SequencerConsensusContext {
now.timestamp().try_into().expect("Failed to convert timestamp"),
),
use_kzg_da: true,
..Default::default()
sequencer_address: proposer,
},
};
batcher.validate_block(input).await.expect("Failed to initiate proposal validation");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::channel::mpsc;
use futures::{FutureExt, SinkExt};
use lazy_static::lazy_static;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::ConsensusContext;
use papyrus_consensus::types::{ConsensusContext, ValidatorId};
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
BroadcastNetworkMock,
Expand All @@ -21,7 +21,7 @@ use papyrus_protobuf::consensus::{
TransactionBatch,
};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::{ContractAddress, StateDiffCommitment};
use starknet_api::core::StateDiffCommitment;
use starknet_api::executable_transaction::{
AccountTransaction,
Transaction as ExecutableTransaction,
Expand Down Expand Up @@ -128,7 +128,7 @@ async fn build_proposal() {
let init = ProposalInit {
height: BlockNumber(0),
round: 0,
proposer: ContractAddress::default(),
proposer: ValidatorId::default(),
valid_round: None,
};
// TODO(Asmaa): Test proposal content.
Expand Down Expand Up @@ -207,8 +207,9 @@ async fn validate_proposal_success() {
}))
.await
.unwrap();
let fin_receiver =
context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
let fin_receiver = context
.validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver)
.await;
content_sender.close_channel();
assert_eq!(fin_receiver.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);
}
Expand Down Expand Up @@ -267,8 +268,9 @@ async fn repropose() {
proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0),
});
content_sender.send(prop_part).await.unwrap();
let fin_receiver =
context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
let fin_receiver = context
.validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver)
.await;
content_sender.close_channel();
assert_eq!(fin_receiver.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);

Expand Down Expand Up @@ -348,25 +350,28 @@ async fn proposals_from_different_rounds() {
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender.send(prop_part_txs.clone()).await.unwrap();

let fin_receiver_past_round =
context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
let fin_receiver_past_round = context
.validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver)
.await;
// No fin was sent, channel remains open.
assert!(fin_receiver_past_round.await.is_err());

// The proposal from the current round should be validated.
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender.send(prop_part_txs.clone()).await.unwrap();
content_sender.send(prop_part_fin.clone()).await.unwrap();
let fin_receiver_curr_round =
context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;
let fin_receiver_curr_round = context
.validate_proposal(BlockNumber(0), 1, ValidatorId::default(), TIMEOUT, content_receiver)
.await;
assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);

// The proposal from the future round should not be processed.
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender.send(prop_part_txs.clone()).await.unwrap();
content_sender.send(prop_part_fin.clone()).await.unwrap();
let fin_receiver_future_round =
context.validate_proposal(BlockNumber(0), 2, TIMEOUT, content_receiver).await;
let fin_receiver_future_round = context
.validate_proposal(BlockNumber(0), 2, ValidatorId::default(), TIMEOUT, content_receiver)
.await;
content_sender.close_channel();
// Even with sending fin and closing the channel.
assert!(fin_receiver_future_round.now_or_never().is_none());
Expand Down Expand Up @@ -433,8 +438,9 @@ async fn interrupt_active_proposal() {
// Keep the sender open, as closing it or sending Fin would cause the validate to complete
// without needing interrupt.
let (mut _content_sender_0, content_receiver) = mpsc::channel(CHANNEL_SIZE);
let fin_receiver_0 =
context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
let fin_receiver_0 = context
.validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver)
.await;

let (mut content_sender_1, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender_1
Expand All @@ -450,8 +456,9 @@ async fn interrupt_active_proposal() {
}))
.await
.unwrap();
let fin_receiver_1 =
context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;
let fin_receiver_1 = context
.validate_proposal(BlockNumber(0), 1, ValidatorId::default(), TIMEOUT, content_receiver)
.await;
// Move the context to the next round.
context.set_height_and_round(BlockNumber(0), 1).await;

Expand Down

0 comments on commit c2fba98

Please sign in to comment.