Skip to content

Commit

Permalink
chore(starknet_consensus_manager): set proposer address in propose bl…
Browse files Browse the repository at this point in the history
…ock input
  • Loading branch information
ArniStarkware committed Dec 3, 2024
1 parent b7934f4 commit 38d7da8
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 63 deletions.
27 changes: 16 additions & 11 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ mock! {
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
timeout: Duration,
content: mpsc::Receiver<ProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand All @@ -80,8 +81,12 @@ mock! {
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;

async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);

async fn set_height_and_round(
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId
);
}
}

Expand Down Expand Up @@ -123,7 +128,7 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 1.
context
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
.return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
Expand All @@ -136,7 +141,7 @@ async fn manager_multiple_heights_unordered() {
.times(1);
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context.expect_broadcast().returning(move |_| Ok(()));

let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone());
Expand All @@ -155,7 +160,7 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 2.
context
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
.return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
Expand Down Expand Up @@ -188,7 +193,7 @@ async fn run_consensus_sync() {
// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
context.expect_validate_proposal().return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::TWO), ProposalFin { proposal_content_id: BlockHash(Felt::TWO) }))
Expand All @@ -197,7 +202,7 @@ async fn run_consensus_sync() {
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context.expect_broadcast().returning(move |_| Ok(()));
context.expect_decision_reached().return_once(move |block, votes| {
assert_eq!(block, BlockHash(Felt::TWO));
Expand Down Expand Up @@ -258,7 +263,7 @@ async fn run_consensus_sync_cancellation_safety() {
// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
context.expect_validate_proposal().return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
Expand All @@ -267,7 +272,7 @@ async fn run_consensus_sync_cancellation_safety() {
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context.expect_broadcast().with(eq(prevote(Some(Felt::ONE), 1, 0, *VALIDATOR_ID))).return_once(
move |_| {
proposal_handled_tx.send(()).unwrap();
Expand Down Expand Up @@ -345,8 +350,8 @@ async fn test_timeouts() {
send(&mut sender, precommit(None, 1, 0, *VALIDATOR_ID_3)).await;

let mut context = MockTestContext::new();
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_validate_proposal().returning(move |_, _, _, _| {
context.expect_set_height_and_round().returning(move |_, _, _| ());
context.expect_validate_proposal().returning(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
Expand Down
25 changes: 20 additions & 5 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,15 @@ impl SingleHeightConsensus {
context: &mut ContextT,
) -> Result<ShcReturn, ConsensusError> {
info!("Starting consensus with validators {:?}", self.validators);
context.set_height_and_round(self.height, self.state_machine.round()).await;
context
.set_height_and_round(self.height, self.state_machine.round(), ValidatorId::default())
.await;
let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) };
let events = self.state_machine.start(&leader_fn);
let ret = self.handle_state_machine_events(context, events).await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
context
.set_height_and_round(self.height, self.state_machine.round(), ValidatorId::default())
.await;
ret
}

Expand Down Expand Up @@ -229,11 +233,14 @@ impl SingleHeightConsensus {
.validate_proposal(
self.height,
init.round,
init.proposer,
self.timeouts.proposal_timeout,
p2p_messages_receiver,
)
.await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
context
.set_height_and_round(self.height, self.state_machine.round(), ValidatorId::default())
.await;
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)]))
}

Expand Down Expand Up @@ -261,7 +268,13 @@ impl SingleHeightConsensus {
}
ConsensusMessage::Vote(vote) => {
let ret = self.handle_vote(context, vote).await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
context
.set_height_and_round(
self.height,
self.state_machine.round(),
ValidatorId::default(),
)
.await;
ret
}
}
Expand Down Expand Up @@ -354,7 +367,9 @@ impl SingleHeightConsensus {
}
_ => unimplemented!("Unexpected event: {:?}", event),
};
context.set_height_and_round(self.height, self.state_machine.round()).await;
context
.set_height_and_round(self.height, self.state_machine.round(), ValidatorId::default())
.await;
ret
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async fn proposer() {
block_sender.send(BLOCK.id).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context
.expect_broadcast()
.times(1)
Expand Down Expand Up @@ -171,12 +171,12 @@ async fn validator(repeat_proposal: bool) {
);

context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context
.expect_broadcast()
.times(1)
Expand Down Expand Up @@ -250,12 +250,12 @@ async fn vote_twice(same_vote: bool) {
);

context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context
.expect_broadcast()
.times(1) // Shows the repeat vote is ignored.
Expand Down Expand Up @@ -324,7 +324,7 @@ async fn rebroadcast_votes() {
block_sender.send(BLOCK.id).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context
.expect_broadcast()
.times(1)
Expand Down Expand Up @@ -386,7 +386,7 @@ async fn repropose() {
block_sender.send(BLOCK.id).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_set_height_and_round().returning(move |_, _, _| ());
context
.expect_broadcast()
.times(1)
Expand Down
8 changes: 7 additions & 1 deletion crates/sequencing/papyrus_consensus/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ mock! {
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
timeout: Duration,
content: mpsc::Receiver<MockProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand All @@ -100,7 +101,12 @@ mock! {
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;

async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);
async fn set_height_and_round(
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId
);
}
}

Expand Down
8 changes: 7 additions & 1 deletion crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub trait ConsensusContext {
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
timeout: Duration,
content: mpsc::Receiver<Self::ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand Down Expand Up @@ -113,7 +114,12 @@ pub trait ConsensusContext {

/// Update the context with the current height and round.
/// Must be called at the beginning of each height.
async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);
async fn set_height_and_round(
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
);
}

#[derive(PartialEq)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::{StorageError, StorageReader};
use starknet_api::block::BlockNumber;
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
use tracing::{debug, debug_span, info, warn, Instrument};

Expand Down Expand Up @@ -70,7 +69,7 @@ impl PapyrusConsensusContext {
storage_reader,
network_broadcast_client,
network_proposal_sender,
validators: (0..num_validators).map(ContractAddress::from).collect(),
validators: (0..num_validators).map(ValidatorId::from).collect(),
sync_broadcast_sender,
valid_proposals: Arc::new(Mutex::new(BTreeMap::new())),
}
Expand Down Expand Up @@ -167,6 +166,7 @@ impl ConsensusContext for PapyrusConsensusContext {
&mut self,
height: BlockNumber,
_round: Round,
_proposer: ValidatorId,
_timeout: Duration,
mut content: mpsc::Receiver<ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
Expand Down Expand Up @@ -305,7 +305,12 @@ impl ConsensusContext for PapyrusConsensusContext {
Ok(())
}

async fn set_height_and_round(&mut self, _height: BlockNumber, _round: Round) {
async fn set_height_and_round(
&mut self,
_height: BlockNumber,
_round: Round,
_proposer: ValidatorId,
) {
// No-op
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::ConsensusContext;
use papyrus_consensus::types::{ConsensusContext, ValidatorId};
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
BroadcastNetworkMock,
Expand All @@ -24,7 +24,6 @@ use papyrus_storage::header::HeaderStorageWriter;
use papyrus_storage::test_utils::get_test_storage;
use papyrus_test_utils::get_test_block;
use starknet_api::block::{Block, BlockHash};
use starknet_api::core::ContractAddress;

use crate::papyrus_consensus_context::PapyrusConsensusContext;

Expand All @@ -40,7 +39,7 @@ async fn build_proposal() {
let proposal_init = ProposalInit {
height: block_number,
round: 0,
proposer: ContractAddress::default(),
proposer: ValidatorId::default(),
valid_round: None,
};
// TODO(Asmaa): Test proposal content.
Expand Down Expand Up @@ -68,7 +67,13 @@ async fn validate_proposal_success() {
validate_sender.close_channel();

let fin = papyrus_context
.validate_proposal(block_number, 0, Duration::MAX, validate_receiver)
.validate_proposal(
block_number,
0,
ValidatorId::default(),
Duration::MAX,
validate_receiver,
)
.await
.await
.unwrap();
Expand All @@ -93,7 +98,13 @@ async fn validate_proposal_fail() {
validate_sender.close_channel();

let fin = papyrus_context
.validate_proposal(block_number, 0, Duration::MAX, validate_receiver)
.validate_proposal(
block_number,
0,
ValidatorId::default(),
Duration::MAX,
validate_receiver,
)
.await
.await;
assert_eq!(fin, Err(oneshot::Canceled));
Expand Down
Loading

0 comments on commit 38d7da8

Please sign in to comment.