
Commit ce0120e

chore(starknet_consensus_manager): add proposal init into validate proposal input

ArniStarkware committed Dec 2, 2024
1 parent 0dc1db4 commit ce0120e
Showing 11 changed files with 127 additions and 59 deletions.
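In short: this commit collapses the separate `height`, `round`, and `proposer` parameters of `ConsensusContext::validate_proposal` into the single `ProposalInit` struct that already carries them. A before/after sketch of the trait method, paraphrased from the types.rs hunk below (not the full trait):

```rust
// Before: proposal metadata was passed piecewise.
async fn validate_proposal(
    &mut self,
    height: BlockNumber,
    round: Round,
    proposer: ValidatorId,
    timeout: Duration,
    content: mpsc::Receiver<Self::ProposalChunk>,
) -> oneshot::Receiver<ProposalContentId>;

// After: the same metadata travels as one value.
async fn validate_proposal(
    &mut self,
    init: ProposalInit,
    timeout: Duration,
    content: mpsc::Receiver<Self::ProposalChunk>,
) -> oneshot::Receiver<ProposalContentId>;
```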
4 changes: 2 additions & 2 deletions crates/papyrus_protobuf/src/consensus.rs
@@ -54,15 +54,15 @@ pub enum StreamMessageBody<T> {
Fin,
}

- #[derive(Debug, Clone, Hash, Eq, PartialEq)]
+ #[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct StreamMessage<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> {
pub message: StreamMessageBody<T>,
pub stream_id: u64,
pub message_id: u64,
}

/// This message must be sent first when proposing a new block.
- #[derive(Default, Debug, Clone, PartialEq)]
+ #[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct ProposalInit {
/// The height of the consensus (block number).
pub height: BlockNumber,
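Note the second derive change: `ProposalInit` gains `Copy`. That is what lets later hunks in this commit drop `.clone()` calls and pass the struct by value. A tiny stand-alone illustration (hypothetical empty struct; the real one has fields):

```rust
// Copy types are duplicated implicitly on pass-by-value, so no .clone() is needed.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct ProposalInit;

fn consume(init: ProposalInit) {
    let _ = init;
}

fn main() {
    let init = ProposalInit::default();
    consume(init); // copied implicitly
    consume(init); // `init` is still usable: no move-out occurred
}
```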
2 changes: 1 addition & 1 deletion crates/papyrus_protobuf/src/converters/consensus_test.rs
@@ -112,7 +112,7 @@ fn convert_proposal_init_to_vec_u8_and_back() {

let proposal_init = ProposalInit::get_test_instance(&mut rng);

- let bytes_data: Vec<u8> = proposal_init.clone().into();
+ let bytes_data: Vec<u8> = proposal_init.into();
let res_data = ProposalInit::try_from(bytes_data).unwrap();
assert_eq!(proposal_init, res_data);
}
14 changes: 6 additions & 8 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -50,9 +50,7 @@ mock! {

async fn validate_proposal(
&mut self,
- height: BlockNumber,
- round: Round,
- proposer: ValidatorId,
+ init: ProposalInit,
timeout: Duration,
content: mpsc::Receiver<Transaction>
) -> oneshot::Receiver<ProposalContentId>;
@@ -112,7 +110,7 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 1.
context
.expect_validate_proposal()
- .return_once(move |_, _, _, _, _| {
+ .return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_receiver
@@ -139,7 +137,7 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 2.
context
.expect_validate_proposal()
- .return_once(move |_, _, _, _, _| {
+ .return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::TWO)).unwrap();
block_receiver
@@ -166,7 +164,7 @@ async fn run_consensus_sync() {
// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

- context.expect_validate_proposal().return_once(move |_, _, _, _, _| {
+ context.expect_validate_proposal().return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::TWO)).unwrap();
block_receiver
@@ -230,7 +228,7 @@ async fn run_consensus_sync_cancellation_safety() {
// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

- context.expect_validate_proposal().return_once(move |_, _, _, _, _| {
+ context.expect_validate_proposal().return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_receiver
@@ -308,7 +306,7 @@ async fn test_timeouts() {

let mut context = MockTestContext::new();
context.expect_set_height_and_round().returning(move |_, _, _| ());
- context.expect_validate_proposal().returning(move |_, _, _, _, _| {
+ context.expect_validate_proposal().returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_receiver
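The test changes in this file are mechanical: the expectation closures generated by `mock!` must match the new three-parameter signature, so the five `_` placeholders become three. A hedged, self-contained sketch of the pattern (this assumes the mockall crate, which the `mock!`/`expect_*` idiom here appears to be; stand-in types for brevity):

```rust
use mockall::mock;

#[derive(Clone, Copy, Default)]
struct ProposalInit; // stand-in for the real struct

mock! {
    Ctx {
        fn validate_proposal(&mut self, init: ProposalInit, timeout: u64, content: u32) -> u64;
    }
}

fn main() {
    let mut ctx = MockCtx::new();
    // One wildcard per parameter: three now, instead of the previous five.
    ctx.expect_validate_proposal().return_once(move |_, _, _| 7);
    assert_eq!(ctx.validate_proposal(ProposalInit, 1, 2), 7);
}
```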
@@ -241,13 +241,7 @@ impl SingleHeightConsensus {
// twice in parallel. This could be caused by a network repeat or a malicious spam attack.
proposal_entry.insert(None);
let block_receiver = context
- .validate_proposal(
- self.height,
- init.round,
- init.proposer,
- self.timeouts.proposal_timeout,
- p2p_messages_receiver,
- )
+ .validate_proposal(init, self.timeouts.proposal_timeout, p2p_messages_receiver)
.await;
context
.set_height_and_round(self.height, self.state_machine.round(), ValidatorId::default())
@@ -69,7 +69,7 @@ async fn handle_proposal(

shc.handle_proposal(
context,
- PROPOSAL_INIT.clone(),
+ *PROPOSAL_INIT,
mpsc::channel(1).1, // content - ignored by SHC.
fin_receiver,
)
@@ -174,7 +174,7 @@ async fn validator(repeat_proposal: bool) {
);

context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
- context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
+ context.expect_validate_proposal().times(1).returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BLOCK.id).unwrap();
block_receiver
@@ -253,7 +253,7 @@ async fn vote_twice(same_vote: bool) {
);

context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID);
- context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
+ context.expect_validate_proposal().times(1).returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BLOCK.id).unwrap();
block_receiver
4 changes: 1 addition & 3 deletions crates/sequencing/papyrus_consensus/src/test_utils.rs
@@ -40,9 +40,7 @@ mock! {

async fn validate_proposal(
&mut self,
- height: BlockNumber,
- round: Round,
- proposer: ValidatorId,
+ init: ProposalInit,
timeout: Duration,
content: mpsc::Receiver<u32>
) -> oneshot::Receiver<ProposalContentId>;
4 changes: 1 addition & 3 deletions crates/sequencing/papyrus_consensus/src/types.rs
@@ -74,9 +74,7 @@ pub trait ConsensusContext {
/// by ConsensusContext.
async fn validate_proposal(
&mut self,
- height: BlockNumber,
- round: Round,
- proposer: ValidatorId,
+ init: ProposalInit,
timeout: Duration,
content: mpsc::Receiver<Self::ProposalChunk>,
) -> oneshot::Receiver<ProposalContentId>;
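With the trait updated, callers build one `ProposalInit` and hand it over whole, exactly as the context tests later in this diff do. A self-contained sketch of the construction, with stub types standing in for the real ones from papyrus_protobuf:

```rust
#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct BlockNumber(u64);
type Round = u32;
type ValidatorId = u64;

#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct ProposalInit {
    height: BlockNumber,
    round: Round,
    valid_round: Option<Round>,
    proposer: ValidatorId,
}

fn main() {
    // One value replaces the former (height, round, proposer) argument triple.
    let init = ProposalInit {
        height: BlockNumber(60),
        round: 0,
        valid_round: None,
        proposer: ValidatorId::default(),
    };
    println!("{init:?}");
}
```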
@@ -129,7 +129,7 @@ impl ConsensusContext for PapyrusConsensusContext {
.await
.expect("Failed to send proposal receiver");
proposal_sender
- .send(Self::ProposalPart::Init(proposal_init.clone()))
+ .send(Self::ProposalPart::Init(proposal_init))
.await
.expect("Failed to send proposal init");
proposal_sender
@@ -164,9 +164,7 @@ impl ConsensusContext for PapyrusConsensusContext {

async fn validate_proposal(
&mut self,
- height: BlockNumber,
- _round: Round,
- _proposer: ValidatorId,
+ proposal_init: ProposalInit,
_timeout: Duration,
mut content: mpsc::Receiver<Transaction>,
) -> oneshot::Receiver<ProposalContentId> {
@@ -179,14 +177,19 @@ impl ConsensusContext for PapyrusConsensusContext {
// TODO(dvir): consider fix this for the case of reverts. If between the check that
// the block in storage and to getting the transaction was a revert
// this flow will fail.
- wait_for_block(&storage_reader, height).await.expect("Failed to wait to block");
+ wait_for_block(&storage_reader, proposal_init.height)
+ .await
+ .expect("Failed to wait to block");

let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn");
let transactions = txn
- .get_block_transactions(height)
+ .get_block_transactions(proposal_init.height)
.expect("Get transactions from storage failed")
.unwrap_or_else(|| {
panic!("Block in {height} was not found in storage despite waiting for it")
panic!(
"Block in {} was not found in storage despite waiting for it",
proposal_init.height
)
});

for tx in transactions.iter() {
@@ -204,23 +207,26 @@ impl ConsensusContext for PapyrusConsensusContext {
}

let block_hash = txn
- .get_block_header(height)
+ .get_block_header(proposal_init.height)
.expect("Get header from storage failed")
.unwrap_or_else(|| {
panic!("Block in {height} was not found in storage despite waiting for it")
panic!(
"Block in {} was not found in storage despite waiting for it",
proposal_init.height
)
})
.block_hash;

let mut proposals = valid_proposals
.lock()
.expect("Lock on active proposals was poisoned due to a previous panic");

- proposals.entry(height).or_default().insert(block_hash, transactions);
+ proposals.entry(proposal_init.height).or_default().insert(block_hash, transactions);
// Done after inserting the proposal into the map to avoid race conditions between
// insertion and calls to `repropose`.
// This can happen as a result of sync interrupting `run_height`.
fin_sender.send(block_hash).unwrap_or_else(|_| {
warn!("Failed to send block to consensus. height={height}");
warn!("Failed to send block to consensus. height={}", proposal_init.height);
})
}
.instrument(debug_span!("consensus_validate_proposal")),
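One detail worth noting in the hunks above: the panics and the warn switch from inline captures (`{height}`) to positional `{}` arguments. Rust's inline format captures only accept bare identifiers, so `{proposal_init.height}` would not compile once the local `height` variable is gone. A minimal demonstration:

```rust
struct Init {
    height: u64,
}

fn main() {
    let proposal_init = Init { height: 7 };
    // println!("height={proposal_init.height}"); // error: field access is not a valid capture
    println!("height={}", proposal_init.height); // positional argument works
}
```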
@@ -60,9 +60,12 @@ async fn validate_proposal_success() {

let fin = papyrus_context
.validate_proposal(
- block_number,
- 0,
- ValidatorId::default(),
+ ProposalInit {
+ height: block_number,
+ round: 0,
+ valid_round: None,
+ proposer: ValidatorId::default(),
+ },
Duration::MAX,
validate_receiver,
)
@@ -87,9 +90,12 @@ async fn validate_proposal_fail() {

let fin = papyrus_context
.validate_proposal(
- block_number,
- 0,
- ValidatorId::default(),
+ ProposalInit {
+ height: block_number,
+ round: 0,
+ valid_round: None,
+ proposer: ValidatorId::default(),
+ },
Duration::MAX,
validate_receiver,
)
@@ -195,7 +195,7 @@ impl ConsensusContext for SequencerConsensusContext {
.await
.expect("Failed to send proposal receiver");
proposal_sender
- .send(ProposalPart::Init(proposal_init.clone()))
+ .send(ProposalPart::Init(proposal_init))
.await
.expect("Failed to send proposal init");
tokio::spawn(
@@ -218,23 +218,28 @@ impl ConsensusContext for SequencerConsensusContext {

async fn validate_proposal(
&mut self,
- height: BlockNumber,
- round: Round,
- proposer: ValidatorId,
+ proposal_init: ProposalInit,
timeout: Duration,
content: mpsc::Receiver<Self::ProposalChunk>,
) -> oneshot::Receiver<ProposalContentId> {
- assert_eq!(Some(height), self.current_height);
+ assert_eq!(Some(proposal_init.height), self.current_height);
let (fin_sender, fin_receiver) = oneshot::channel();
- match round.cmp(&self.current_round) {
+ match proposal_init.round.cmp(&self.current_round) {
std::cmp::Ordering::Less => fin_receiver,
std::cmp::Ordering::Greater => {
- self.queued_proposals.insert(round, ((height, timeout, content), fin_sender));
+ self.queued_proposals.insert(
+ proposal_init.round,
+ ((proposal_init.height, timeout, content), fin_sender),
+ );
fin_receiver
}
std::cmp::Ordering::Equal => {
self.validate_current_round_proposal(
- height, proposer, timeout, content, fin_sender,
+ proposal_init.height,
+ proposal_init.proposer,
+ timeout,
+ content,
+ fin_sender,
)
.await;
fin_receiver
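The round dispatch in `SequencerConsensusContext` is unchanged in substance, just re-keyed off `proposal_init.round`: proposals for an older round get back a receiver whose sender is simply dropped, future rounds are queued, and only the current round is validated immediately. A simplified, self-contained sketch of that three-way dispatch (illustrative names, not the real types):

```rust
use std::cmp::Ordering;
use std::collections::BTreeMap;

fn dispatch(round: u64, current: u64, queued: &mut BTreeMap<u64, &'static str>) -> &'static str {
    match round.cmp(&current) {
        // Stale round: drop the proposal outright.
        Ordering::Less => "dropped",
        // Future round: remember it until consensus advances.
        Ordering::Greater => {
            queued.insert(round, "pending proposal");
            "queued"
        }
        // Current round: validate right away.
        Ordering::Equal => "validated",
    }
}

fn main() {
    let mut queued = BTreeMap::new();
    assert_eq!(dispatch(1, 2, &mut queued), "dropped");
    assert_eq!(dispatch(3, 2, &mut queued), "queued");
    assert_eq!(dispatch(2, 2, &mut queued), "validated");
    assert_eq!(queued.len(), 1);
}
```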
