chore(starknet_consensus_manager): add proposal init into validate proposal input
ArniStarkware committed Dec 3, 2024
1 parent c2fba98 commit df7d345
Showing 11 changed files with 127 additions and 61 deletions.
4 changes: 2 additions & 2 deletions crates/papyrus_protobuf/src/consensus.rs
@@ -54,15 +54,15 @@ pub enum StreamMessageBody<T> {
Fin,
}

- #[derive(Debug, Clone, Hash, Eq, PartialEq)]
+ #[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct StreamMessage<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> {
pub message: StreamMessageBody<T>,
pub stream_id: u64,
pub message_id: u64,
}

/// This message must be sent first when proposing a new block.
- #[derive(Default, Debug, Clone, PartialEq)]
+ #[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct ProposalInit {
/// The height of the consensus (block number).
pub height: BlockNumber,
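Note: the new `Copy` derive on `ProposalInit` is what lets call sites later in this commit pass the struct by value and drop their `.clone()` calls. A minimal sketch of the effect, using simplified stand-in field types rather than the crate's real `BlockNumber`/`ValidatorId`:

```rust
// Sketch only: field types are simplified placeholders for this illustration.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct ProposalInit {
    height: u64,
    round: u32,
    valid_round: Option<u32>,
    proposer: u64,
}

fn send(init: ProposalInit) {
    println!("height={} round={}", init.height, init.round);
}

fn main() {
    let init = ProposalInit { height: 1, round: 0, valid_round: None, proposer: 7 };
    send(init); // copied into the call
    send(init); // still usable afterwards, no `.clone()` needed
}
```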
2 changes: 1 addition & 1 deletion crates/papyrus_protobuf/src/converters/consensus_test.rs
@@ -112,7 +112,7 @@ fn convert_proposal_init_to_vec_u8_and_back() {

let proposal_init = ProposalInit::get_test_instance(&mut rng);

- let bytes_data: Vec<u8> = proposal_init.clone().into();
+ let bytes_data: Vec<u8> = proposal_init.into();
let res_data = ProposalInit::try_from(bytes_data).unwrap();
assert_eq!(proposal_init, res_data);
}
14 changes: 6 additions & 8 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -56,9 +56,7 @@ mock! {

async fn validate_proposal(
&mut self,
- height: BlockNumber,
- round: Round,
- proposer: ValidatorId,
+ init: ProposalInit,
timeout: Duration,
content: mpsc::Receiver<ProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
@@ -123,7 +121,7 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 1.
context
.expect_validate_proposal()
- .return_once(move |_, _, _, _, _| {
+ .return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
@@ -155,7 +153,7 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 2.
context
.expect_validate_proposal()
.return_once(move |_, _, _, _, _| {
.return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
@@ -188,7 +186,7 @@ async fn run_consensus_sync() {
// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

- context.expect_validate_proposal().return_once(move |_, _, _, _, _| {
+ context.expect_validate_proposal().return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::TWO), ProposalFin { proposal_content_id: BlockHash(Felt::TWO) }))
@@ -258,7 +256,7 @@ async fn run_consensus_sync_cancellation_safety() {
// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

- context.expect_validate_proposal().return_once(move |_, _, _, _, _| {
+ context.expect_validate_proposal().return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
@@ -346,7 +344,7 @@ async fn test_timeouts() {

let mut context = MockTestContext::new();
context.expect_set_height_and_round().returning(move |_, _| ());
- context.expect_validate_proposal().returning(move |_, _, _, _, _| {
+ context.expect_validate_proposal().returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
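Note: the mock closures above shrink from five parameters to three because mockall's `returning`/`return_once` closures take one parameter per argument of the mocked method. A standalone sketch with a hypothetical `Validator` mock (not the crate's real `MockTestContext`), assuming the `mockall` crate is available:

```rust
use mockall::mock;

mock! {
    pub Validator {
        // Three arguments on the mocked method mean three closure parameters below.
        fn validate(&mut self, init: (u64, u32), timeout_ms: u64, payload: Vec<u8>) -> bool;
    }
}

fn main() {
    let mut validator = MockValidator::new();
    // One closure parameter per method argument.
    validator.expect_validate().returning(|_, _, _| true);
    assert!(validator.validate((1, 0), 5_000, vec![]));
}
```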
@@ -226,13 +226,7 @@ impl SingleHeightConsensus {
// twice in parallel. This could be caused by a network repeat or a malicious spam attack.
proposal_entry.insert(None);
let block_receiver = context
- .validate_proposal(
- self.height,
- init.round,
- init.proposer,
- self.timeouts.proposal_timeout,
- p2p_messages_receiver,
- )
+ .validate_proposal(init, self.timeouts.proposal_timeout, p2p_messages_receiver)
.await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)]))
@@ -71,7 +71,7 @@ async fn handle_proposal(
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender.send(MockProposalPart(1)).await.unwrap();

- shc.handle_proposal(context, PROPOSAL_INIT.clone(), content_receiver).await.unwrap()
+ shc.handle_proposal(context, *PROPOSAL_INIT, content_receiver).await.unwrap()
}

#[tokio::test]
@@ -171,7 +171,7 @@ async fn validator(repeat_proposal: bool) {
);

context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
- context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
+ context.expect_validate_proposal().times(1).returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
block_receiver
@@ -250,7 +250,7 @@ async fn vote_twice(same_vote: bool) {
);

context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID);
- context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
+ context.expect_validate_proposal().times(1).returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
block_receiver
4 changes: 1 addition & 3 deletions crates/sequencing/papyrus_consensus/src/test_utils.rs
@@ -76,9 +76,7 @@ mock! {

async fn validate_proposal(
&mut self,
- height: BlockNumber,
- round: Round,
- proposer: ValidatorId,
+ init: ProposalInit,
timeout: Duration,
content: mpsc::Receiver<MockProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
4 changes: 1 addition & 3 deletions crates/sequencing/papyrus_consensus/src/types.rs
@@ -75,9 +75,7 @@ pub trait ConsensusContext {
/// by ConsensusContext.
async fn validate_proposal(
&mut self,
- height: BlockNumber,
- round: Round,
- proposer: ValidatorId,
+ init: ProposalInit,
timeout: Duration,
content: mpsc::Receiver<Self::ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
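Note: with the new trait signature, callers bundle the height, round, and proposer into a single `ProposalInit` argument instead of passing them separately. A self-contained sketch of the new call shape with simplified stand-in types (not the crate's real `ConsensusContext`):

```rust
// Sketch only: simplified types stand in for BlockNumber, Round, ValidatorId, etc.
#[derive(Clone, Copy, Debug, Default)]
struct ProposalInit {
    height: u64,
    round: u32,
    valid_round: Option<u32>,
    proposer: u64,
}

trait ConsensusContext {
    // Before this commit the analogous method took height, round, and proposer
    // as separate parameters; now they arrive as one `init` value.
    fn validate_proposal(&mut self, init: ProposalInit, timeout_ms: u64) -> bool;
}

struct DummyContext;

impl ConsensusContext for DummyContext {
    fn validate_proposal(&mut self, init: ProposalInit, timeout_ms: u64) -> bool {
        println!("validating height={} round={} within {}ms", init.height, init.round, timeout_ms);
        true
    }
}

fn main() {
    let mut ctx = DummyContext;
    let init = ProposalInit { height: 10, round: 0, valid_round: None, proposer: 42 };
    assert!(ctx.validate_proposal(init, 5_000));
}
```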
@@ -129,7 +129,7 @@ impl ConsensusContext for PapyrusConsensusContext {
.await
.expect("Failed to send proposal receiver");
proposal_sender
- .send(Self::ProposalPart::Init(proposal_init.clone()))
+ .send(Self::ProposalPart::Init(proposal_init))
.await
.expect("Failed to send proposal init");
proposal_sender
@@ -164,9 +164,7 @@ impl ConsensusContext for PapyrusConsensusContext {

async fn validate_proposal(
&mut self,
- height: BlockNumber,
- _round: Round,
- _proposer: ValidatorId,
+ proposal_init: ProposalInit,
_timeout: Duration,
mut content: mpsc::Receiver<ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
@@ -179,14 +177,19 @@
// TODO(dvir): consider fix this for the case of reverts. If between the check that
// the block in storage and to getting the transaction was a revert
// this flow will fail.
- wait_for_block(&storage_reader, height).await.expect("Failed to wait to block");
+ wait_for_block(&storage_reader, proposal_init.height)
+ .await
+ .expect("Failed to wait to block");

let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn");
let transactions = txn
- .get_block_transactions(height)
+ .get_block_transactions(proposal_init.height)
.expect("Get transactions from storage failed")
.unwrap_or_else(|| {
panic!("Block in {height} was not found in storage despite waiting for it")
panic!(
"Block in {} was not found in storage despite waiting for it",
proposal_init.height
)
});

// First gather all the non-fin transactions.
@@ -220,25 +223,28 @@
);

let block_hash = txn
- .get_block_header(height)
+ .get_block_header(proposal_init.height)
.expect("Get header from storage failed")
.unwrap_or_else(|| {
panic!("Block in {height} was not found in storage despite waiting for it")
panic!(
"Block in {} was not found in storage despite waiting for it",
proposal_init.height
)
})
.block_hash;

let mut proposals = valid_proposals
.lock()
.expect("Lock on active proposals was poisoned due to a previous panic");

- proposals.entry(height).or_default().insert(block_hash, transactions);
+ proposals.entry(proposal_init.height).or_default().insert(block_hash, transactions);
// Done after inserting the proposal into the map to avoid race conditions between
// insertion and calls to `repropose`.
// This can happen as a result of sync interrupting `run_height`.
fin_sender
.send((block_hash, ProposalFin { proposal_content_id: received_block_hash }))
.unwrap_or_else(|_| {
warn!("Failed to send block to consensus. height={height}");
warn!("Failed to send block to consensus. height={}", proposal_init.height);
})
}
.instrument(debug_span!("consensus_validate_proposal")),
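Note: the `panic!` and `warn!` rewrites in this file switch from inline capture (`{height}`) to a positional argument because Rust's inline format captures accept only plain identifiers, not field expressions like `proposal_init.height`. A minimal illustration:

```rust
struct ProposalInit {
    height: u64,
}

fn main() {
    let height = 7_u64;
    // Inline capture works for a plain identifier:
    println!("Block in {height} was not found in storage");

    let proposal_init = ProposalInit { height };
    // `{proposal_init.height}` would not compile, so a positional argument is used:
    println!("Block in {} was not found in storage", proposal_init.height);
}
```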
@@ -68,9 +68,12 @@ async fn validate_proposal_success() {

let fin = papyrus_context
.validate_proposal(
- block_number,
- 0,
- ValidatorId::default(),
+ ProposalInit {
+ height: block_number,
+ round: 0,
+ valid_round: None,
+ proposer: ValidatorId::default(),
+ },
Duration::MAX,
validate_receiver,
)
@@ -99,9 +102,12 @@ async fn validate_proposal_fail() {

let fin = papyrus_context
.validate_proposal(
- block_number,
- 0,
- ValidatorId::default(),
+ ProposalInit {
+ height: block_number,
+ round: 0,
+ valid_round: None,
+ proposer: ValidatorId::default(),
+ },
Duration::MAX,
validate_receiver,
)
@@ -192,7 +192,7 @@ impl ConsensusContext for SequencerConsensusContext {
.await
.expect("Failed to send proposal receiver");
proposal_sender
- .send(ProposalPart::Init(proposal_init.clone()))
+ .send(ProposalPart::Init(proposal_init))
.await
.expect("Failed to send proposal init");
tokio::spawn(
@@ -217,25 +217,28 @@
// That part is consumed by the caller, so it can know the height/round.
async fn validate_proposal(
&mut self,
- height: BlockNumber,
- round: Round,
- validator: ValidatorId,
+ proposal_init: ProposalInit,
timeout: Duration,
content_receiver: mpsc::Receiver<Self::ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
- assert_eq!(Some(height), self.current_height);
+ assert_eq!(Some(proposal_init.height), self.current_height);
let (fin_sender, fin_receiver) = oneshot::channel();
- match round.cmp(&self.current_round) {
+ match proposal_init.round.cmp(&self.current_round) {
std::cmp::Ordering::Less => fin_receiver,
std::cmp::Ordering::Greater => {
- self.queued_proposals
- .insert(round, ((height, validator, timeout, content_receiver), fin_sender));
+ self.queued_proposals.insert(
+ proposal_init.round,
+ (
+ (proposal_init.height, proposal_init.proposer, timeout, content_receiver),
+ fin_sender,
+ ),
+ );
fin_receiver
}
std::cmp::Ordering::Equal => {
self.validate_current_round_proposal(
- height,
- validator,
+ proposal_init.height,
+ proposal_init.proposer,
timeout,
content_receiver,
fin_sender,
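Note: the round handling touched in the last hunk compares the incoming proposal's round with the context's current round and either drops, queues, or immediately validates it. A small sketch of that routing with a hypothetical `Routing` enum (not a type from the crate):

```rust
use std::cmp::Ordering;

// Hypothetical summary of the three branches above: proposals for an older
// round are ignored, future rounds are queued, and the current round is
// validated right away.
#[derive(Debug, PartialEq)]
enum Routing {
    Ignore,
    Queue,
    ValidateNow,
}

fn route(proposal_round: u32, current_round: u32) -> Routing {
    match proposal_round.cmp(&current_round) {
        Ordering::Less => Routing::Ignore,
        Ordering::Greater => Routing::Queue,
        Ordering::Equal => Routing::ValidateNow,
    }
}

fn main() {
    assert_eq!(route(0, 1), Routing::Ignore);
    assert_eq!(route(2, 1), Routing::Queue);
    assert_eq!(route(1, 1), Routing::ValidateNow);
}
```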