Skip to content

Commit

Permalink
chore(starknet_consensus_manager): add proposal init into validate pr…
Browse files Browse the repository at this point in the history
…oposal input
  • Loading branch information
ArniStarkware committed Dec 9, 2024
1 parent b15b075 commit 514e2e7
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 72 deletions.
4 changes: 2 additions & 2 deletions crates/papyrus_protobuf/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ pub enum StreamMessageBody<T> {
Fin,
}

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct StreamMessage<T: Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> {
pub message: StreamMessageBody<T>,
pub stream_id: u64,
pub message_id: u64,
}

/// This message must be sent first when proposing a new block.
#[derive(Default, Debug, Clone, PartialEq)]
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct ProposalInit {
/// The height of the consensus (block number).
pub height: BlockNumber,
Expand Down
2 changes: 1 addition & 1 deletion crates/papyrus_protobuf/src/converters/consensus_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ fn convert_proposal_init_to_vec_u8_and_back() {

let proposal_init = ProposalInit::get_test_instance(&mut rng);

let bytes_data: Vec<u8> = proposal_init.clone().into();
let bytes_data: Vec<u8> = proposal_init.into();
let res_data = ProposalInit::try_from(bytes_data).unwrap();
assert_eq!(proposal_init, res_data);
}
Expand Down
8 changes: 3 additions & 5 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ mock! {

async fn validate_proposal(
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
init: ProposalInit,
timeout: Duration,
content: mpsc::Receiver<ProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand Down Expand Up @@ -112,7 +110,7 @@ async fn send_proposal(
fn expect_validate_proposal(context: &mut MockTestContext, block_hash: Felt) {
context
.expect_validate_proposal()
.return_once(move |_, _, _, _, _| {
.return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
Expand Down Expand Up @@ -352,7 +350,7 @@ async fn test_timeouts() {

let mut context = MockTestContext::new();
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_validate_proposal().returning(move |_, _, _, _, _| {
context.expect_validate_proposal().returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,7 @@ impl SingleHeightConsensus {
// twice in parallel. This could be caused by a network repeat or a malicious spam attack.
proposal_entry.insert(None);
let block_receiver = context
.validate_proposal(
self.height,
init.round,
init.proposer,
self.timeouts.proposal_timeout,
p2p_messages_receiver,
)
.validate_proposal(init, self.timeouts.proposal_timeout, p2p_messages_receiver)
.await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async fn handle_proposal(
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender.send(MockProposalPart(1)).await.unwrap();

shc.handle_proposal(context, PROPOSAL_INIT.clone(), content_receiver).await.unwrap()
shc.handle_proposal(context, *PROPOSAL_INIT, content_receiver).await.unwrap()
}

#[tokio::test]
Expand Down Expand Up @@ -173,7 +173,7 @@ async fn validator(repeat_proposal: bool) {
);

context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
block_receiver
Expand Down Expand Up @@ -253,7 +253,7 @@ async fn vote_twice(same_vote: bool) {
);

context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
block_receiver
Expand Down
4 changes: 1 addition & 3 deletions crates/sequencing/papyrus_consensus/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ mock! {

async fn validate_proposal(
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
init: ProposalInit,
timeout: Duration,
content: mpsc::Receiver<MockProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand Down
4 changes: 1 addition & 3 deletions crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ pub trait ConsensusContext {
/// by ConsensusContext.
async fn validate_proposal(
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
init: ProposalInit,
timeout: Duration,
content: mpsc::Receiver<Self::ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl ConsensusContext for PapyrusConsensusContext {
.await
.expect("Failed to send proposal receiver");
proposal_sender
.send(Self::ProposalPart::Init(proposal_init.clone()))
.send(Self::ProposalPart::Init(proposal_init))
.await
.expect("Failed to send proposal init");
proposal_sender
Expand Down Expand Up @@ -168,9 +168,7 @@ impl ConsensusContext for PapyrusConsensusContext {

async fn validate_proposal(
&mut self,
height: BlockNumber,
_round: Round,
_proposer: ValidatorId,
proposal_init: ProposalInit,
_timeout: Duration,
mut content: mpsc::Receiver<ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
Expand All @@ -183,14 +181,19 @@ impl ConsensusContext for PapyrusConsensusContext {
// TODO(dvir): consider fix this for the case of reverts. If between the check that
// the block in storage and to getting the transaction was a revert
// this flow will fail.
wait_for_block(&storage_reader, height).await.expect("Failed to wait to block");
wait_for_block(&storage_reader, proposal_init.height)
.await
.expect("Failed to wait to block");

let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn");
let transactions = txn
.get_block_transactions(height)
.get_block_transactions(proposal_init.height)
.expect("Get transactions from storage failed")
.unwrap_or_else(|| {
panic!("Block in {height} was not found in storage despite waiting for it")
panic!(
"Block in {} was not found in storage despite waiting for it",
proposal_init.height
)
});

// First gather all the non-fin transactions.
Expand Down Expand Up @@ -224,25 +227,28 @@ impl ConsensusContext for PapyrusConsensusContext {
);

let block_hash = txn
.get_block_header(height)
.get_block_header(proposal_init.height)
.expect("Get header from storage failed")
.unwrap_or_else(|| {
panic!("Block in {height} was not found in storage despite waiting for it")
panic!(
"Block in {} was not found in storage despite waiting for it",
proposal_init.height
)
})
.block_hash;

let mut proposals = valid_proposals
.lock()
.expect("Lock on active proposals was poisoned due to a previous panic");

proposals.entry(height).or_default().insert(block_hash, transactions);
proposals.entry(proposal_init.height).or_default().insert(block_hash, transactions);
// Done after inserting the proposal into the map to avoid race conditions between
// insertion and calls to `repropose`.
// This can happen as a result of sync interrupting `run_height`.
fin_sender
.send((block_hash, ProposalFin { proposal_content_id: received_block_hash }))
.unwrap_or_else(|_| {
warn!("Failed to send block to consensus. height={height}");
warn!("Failed to send block to consensus. height={}", proposal_init.height);
})
}
.instrument(debug_span!("consensus_validate_proposal")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,12 @@ async fn validate_proposal_success() {

let fin = papyrus_context
.validate_proposal(
block_number,
0,
ValidatorId::from(DEFAULT_VALIDATOR_ID),
ProposalInit {
height: block_number,
round: 0,
proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID),
..Default::default()
},
Duration::MAX,
validate_receiver,
)
Expand Down Expand Up @@ -99,9 +102,12 @@ async fn validate_proposal_fail() {

let fin = papyrus_context
.validate_proposal(
block_number,
0,
ValidatorId::from(DEFAULT_VALIDATOR_ID),
ProposalInit {
height: block_number,
round: 0,
proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID),
..Default::default()
},
Duration::MAX,
validate_receiver,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl ConsensusContext for SequencerConsensusContext {
.await
.expect("Failed to send proposal receiver");
proposal_sender
.send(ProposalPart::Init(proposal_init.clone()))
.send(ProposalPart::Init(proposal_init))
.await
.expect("Failed to send proposal init");
tokio::spawn(
Expand All @@ -220,25 +220,28 @@ impl ConsensusContext for SequencerConsensusContext {
// That part is consumed by the caller, so it can know the height/round.
async fn validate_proposal(
&mut self,
height: BlockNumber,
round: Round,
validator: ValidatorId,
proposal_init: ProposalInit,
timeout: Duration,
content_receiver: mpsc::Receiver<Self::ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
assert_eq!(Some(height), self.current_height);
assert_eq!(Some(proposal_init.height), self.current_height);
let (fin_sender, fin_receiver) = oneshot::channel();
match round.cmp(&self.current_round) {
match proposal_init.round.cmp(&self.current_round) {
std::cmp::Ordering::Less => fin_receiver,
std::cmp::Ordering::Greater => {
self.queued_proposals
.insert(round, ((height, validator, timeout, content_receiver), fin_sender));
self.queued_proposals.insert(
proposal_init.round,
(
(proposal_init.height, proposal_init.proposer, timeout, content_receiver),
fin_sender,
),
);
fin_receiver
}
std::cmp::Ordering::Equal => {
self.validate_current_round_proposal(
height,
validator,
proposal_init.height,
proposal_init.proposer,
timeout,
content_receiver,
fin_sender,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,12 @@ async fn validate_proposal_success() {
.unwrap();
let fin_receiver = context
.validate_proposal(
BlockNumber(0),
0,
ValidatorId::from(DEFAULT_VALIDATOR_ID),
ProposalInit {
height: BlockNumber(0),
round: 0,
proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID),
..Default::default()
},
TIMEOUT,
content_receiver,
)
Expand Down Expand Up @@ -255,9 +258,12 @@ async fn repropose() {
.unwrap();
let fin_receiver = context
.validate_proposal(
BlockNumber(0),
0,
ValidatorId::from(DEFAULT_VALIDATOR_ID),
ProposalInit {
height: BlockNumber(0),
round: 0,
proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID),
..Default::default()
},
TIMEOUT,
content_receiver,
)
Expand Down Expand Up @@ -330,9 +336,12 @@ async fn proposals_from_different_rounds() {

let fin_receiver_past_round = context
.validate_proposal(
BlockNumber(0),
0,
ValidatorId::from(DEFAULT_VALIDATOR_ID),
ProposalInit {
height: BlockNumber(0),
round: 0,
proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID),
..Default::default()
},
TIMEOUT,
content_receiver,
)
Expand All @@ -346,9 +355,12 @@ async fn proposals_from_different_rounds() {
content_sender.send(prop_part_fin.clone()).await.unwrap();
let fin_receiver_curr_round = context
.validate_proposal(
BlockNumber(0),
1,
ValidatorId::from(DEFAULT_VALIDATOR_ID),
ProposalInit {
height: BlockNumber(0),
round: 1,
proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID),
..Default::default()
},
TIMEOUT,
content_receiver,
)
Expand All @@ -361,9 +373,12 @@ async fn proposals_from_different_rounds() {
content_sender.send(prop_part_fin.clone()).await.unwrap();
let fin_receiver_future_round = context
.validate_proposal(
BlockNumber(0),
2,
ValidatorId::from(DEFAULT_VALIDATOR_ID),
ProposalInit {
height: BlockNumber(0),
round: 2,
proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID),
..Default::default()
},
TIMEOUT,
content_receiver,
)
Expand Down Expand Up @@ -423,9 +438,12 @@ async fn interrupt_active_proposal() {
let (mut _content_sender_0, content_receiver) = mpsc::channel(CHANNEL_SIZE);
let fin_receiver_0 = context
.validate_proposal(
BlockNumber(0),
0,
ValidatorId::from(DEFAULT_VALIDATOR_ID),
ProposalInit {
height: BlockNumber(0),
round: 0,
proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID),
..Default::default()
},
TIMEOUT,
content_receiver,
)
Expand All @@ -447,9 +465,12 @@ async fn interrupt_active_proposal() {
.unwrap();
let fin_receiver_1 = context
.validate_proposal(
BlockNumber(0),
1,
ValidatorId::from(DEFAULT_VALIDATOR_ID),
ProposalInit {
height: BlockNumber(0),
round: 1,
proposer: ValidatorId::from(DEFAULT_VALIDATOR_ID),
..Default::default()
},
TIMEOUT,
content_receiver,
)
Expand Down

0 comments on commit 514e2e7

Please sign in to comment.