Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(starknet_consensus_manager): set proposer address in propose block input #2346

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ mock! {
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
timeout: Duration,
content: mpsc::Receiver<ProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand All @@ -81,7 +82,6 @@ mock! {
) -> Result<(), ConsensusError>;

async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);

}
}

Expand Down Expand Up @@ -123,7 +123,7 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 1.
context
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
.return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
Expand Down Expand Up @@ -155,7 +155,7 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 2.
context
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
.return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
Expand Down Expand Up @@ -188,7 +188,7 @@ async fn run_consensus_sync() {
// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
context.expect_validate_proposal().return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::TWO), ProposalFin { proposal_content_id: BlockHash(Felt::TWO) }))
Expand Down Expand Up @@ -258,7 +258,7 @@ async fn run_consensus_sync_cancellation_safety() {
// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
context.expect_validate_proposal().return_once(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
Expand Down Expand Up @@ -346,7 +346,7 @@ async fn test_timeouts() {

let mut context = MockTestContext::new();
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_validate_proposal().returning(move |_, _, _, _| {
context.expect_validate_proposal().returning(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ impl SingleHeightConsensus {
.validate_proposal(
self.height,
init.round,
init.proposer,
self.timeouts.proposal_timeout,
p2p_messages_receiver,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ async fn validator(repeat_proposal: bool) {
);

context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
block_receiver
Expand Down Expand Up @@ -250,7 +250,7 @@ async fn vote_twice(same_vote: bool) {
);

context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
block_receiver
Expand Down
1 change: 1 addition & 0 deletions crates/sequencing/papyrus_consensus/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ mock! {
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
timeout: Duration,
content: mpsc::Receiver<MockProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand Down
1 change: 1 addition & 0 deletions crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub trait ConsensusContext {
&mut self,
height: BlockNumber,
round: Round,
proposer: ValidatorId,
timeout: Duration,
content: mpsc::Receiver<Self::ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::{StorageError, StorageReader};
use starknet_api::block::BlockNumber;
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
use tracing::{debug, debug_span, info, warn, Instrument};

Expand Down Expand Up @@ -70,7 +69,7 @@ impl PapyrusConsensusContext {
storage_reader,
network_broadcast_client,
network_proposal_sender,
validators: (0..num_validators).map(ContractAddress::from).collect(),
validators: (0..num_validators).map(ValidatorId::from).collect(),
sync_broadcast_sender,
valid_proposals: Arc::new(Mutex::new(BTreeMap::new())),
}
Expand Down Expand Up @@ -167,6 +166,7 @@ impl ConsensusContext for PapyrusConsensusContext {
&mut self,
height: BlockNumber,
_round: Round,
_proposer: ValidatorId,
_timeout: Duration,
mut content: mpsc::Receiver<ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::ConsensusContext;
use papyrus_consensus::types::{ConsensusContext, ValidatorId};
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
BroadcastNetworkMock,
Expand All @@ -24,7 +24,6 @@ use papyrus_storage::header::HeaderStorageWriter;
use papyrus_storage::test_utils::get_test_storage;
use papyrus_test_utils::get_test_block;
use starknet_api::block::{Block, BlockHash};
use starknet_api::core::ContractAddress;

use crate::papyrus_consensus_context::PapyrusConsensusContext;

Expand All @@ -40,7 +39,7 @@ async fn build_proposal() {
let proposal_init = ProposalInit {
height: block_number,
round: 0,
proposer: ContractAddress::default(),
proposer: ValidatorId::default(),
valid_round: None,
};
// TODO(Asmaa): Test proposal content.
Expand Down Expand Up @@ -68,7 +67,13 @@ async fn validate_proposal_success() {
validate_sender.close_channel();

let fin = papyrus_context
.validate_proposal(block_number, 0, Duration::MAX, validate_receiver)
.validate_proposal(
block_number,
0,
ValidatorId::default(),
Duration::MAX,
validate_receiver,
)
.await
.await
.unwrap();
Expand All @@ -93,7 +98,13 @@ async fn validate_proposal_fail() {
validate_sender.close_channel();

let fin = papyrus_context
.validate_proposal(block_number, 0, Duration::MAX, validate_receiver)
.validate_proposal(
block_number,
0,
ValidatorId::default(),
Duration::MAX,
validate_receiver,
)
.await
.await;
assert_eq!(fin, Err(oneshot::Canceled));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ const TEMPORARY_GAS_PRICES: GasPrices = GasPrices {
// store one of them.
type HeightToIdToContent =
BTreeMap<BlockNumber, HashMap<ProposalContentId, (Vec<ExecutableTransaction>, ProposalId)>>;
type ValidationParams = (BlockNumber, Duration, mpsc::Receiver<ProposalPart>);
type ValidationParams = (BlockNumber, ValidatorId, Duration, mpsc::Receiver<ProposalPart>);

const CHANNEL_SIZE: usize = 100;

Expand Down Expand Up @@ -173,7 +173,7 @@ impl ConsensusContext for SequencerConsensusContext {
now.timestamp().try_into().expect("Failed to convert timestamp"),
),
use_kzg_da: true,
..Default::default()
sequencer_address: proposal_init.proposer,
},
};
// TODO: Should we be returning an error?
Expand Down Expand Up @@ -219,6 +219,7 @@ impl ConsensusContext for SequencerConsensusContext {
&mut self,
height: BlockNumber,
round: Round,
validator: ValidatorId,
timeout: Duration,
content_receiver: mpsc::Receiver<Self::ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
Expand All @@ -228,12 +229,18 @@ impl ConsensusContext for SequencerConsensusContext {
std::cmp::Ordering::Less => fin_receiver,
std::cmp::Ordering::Greater => {
self.queued_proposals
.insert(round, ((height, timeout, content_receiver), fin_sender));
.insert(round, ((height, validator, timeout, content_receiver), fin_sender));
fin_receiver
}
std::cmp::Ordering::Equal => {
self.validate_current_round_proposal(height, timeout, content_receiver, fin_sender)
.await;
self.validate_current_round_proposal(
height,
validator,
timeout,
content_receiver,
fin_sender,
)
.await;
fin_receiver
}
}
Expand Down Expand Up @@ -334,17 +341,18 @@ impl ConsensusContext for SequencerConsensusContext {
}
}
// Validate the proposal for the current round if exists.
let Some(((height, timeout, content), fin_sender)) = to_process else {
let Some(((height, validator, timeout, content), fin_sender)) = to_process else {
return;
};
self.validate_current_round_proposal(height, timeout, content, fin_sender).await;
self.validate_current_round_proposal(height, validator, timeout, content, fin_sender).await;
}
}

impl SequencerConsensusContext {
async fn validate_current_round_proposal(
&mut self,
height: BlockNumber,
proposer: ValidatorId,
timeout: Duration,
content_receiver: mpsc::Receiver<ProposalPart>,
fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>,
Expand Down Expand Up @@ -374,7 +382,7 @@ impl SequencerConsensusContext {
now.timestamp().try_into().expect("Failed to convert timestamp"),
),
use_kzg_da: true,
..Default::default()
sequencer_address: proposer,
},
};
batcher.validate_block(input).await.expect("Failed to initiate proposal validation");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::channel::mpsc;
use futures::{FutureExt, SinkExt};
use lazy_static::lazy_static;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::ConsensusContext;
use papyrus_consensus::types::{ConsensusContext, ValidatorId};
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
BroadcastNetworkMock,
Expand All @@ -21,7 +21,7 @@ use papyrus_protobuf::consensus::{
TransactionBatch,
};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::{ContractAddress, StateDiffCommitment};
use starknet_api::core::StateDiffCommitment;
use starknet_api::executable_transaction::{
AccountTransaction,
Transaction as ExecutableTransaction,
Expand Down Expand Up @@ -128,7 +128,7 @@ async fn build_proposal() {
let init = ProposalInit {
height: BlockNumber(0),
round: 0,
proposer: ContractAddress::default(),
proposer: ValidatorId::default(),
valid_round: None,
};
// TODO(Asmaa): Test proposal content.
Expand Down Expand Up @@ -207,8 +207,9 @@ async fn validate_proposal_success() {
}))
.await
.unwrap();
let fin_receiver =
context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
let fin_receiver = context
.validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver)
.await;
content_sender.close_channel();
assert_eq!(fin_receiver.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);
}
Expand Down Expand Up @@ -267,8 +268,9 @@ async fn repropose() {
proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0),
});
content_sender.send(prop_part).await.unwrap();
let fin_receiver =
context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
let fin_receiver = context
.validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver)
.await;
content_sender.close_channel();
assert_eq!(fin_receiver.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);

Expand Down Expand Up @@ -348,25 +350,28 @@ async fn proposals_from_different_rounds() {
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender.send(prop_part_txs.clone()).await.unwrap();

let fin_receiver_past_round =
context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
let fin_receiver_past_round = context
.validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver)
.await;
// No fin was sent, channel remains open.
assert!(fin_receiver_past_round.await.is_err());

// The proposal from the current round should be validated.
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender.send(prop_part_txs.clone()).await.unwrap();
content_sender.send(prop_part_fin.clone()).await.unwrap();
let fin_receiver_curr_round =
context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;
let fin_receiver_curr_round = context
.validate_proposal(BlockNumber(0), 1, ValidatorId::default(), TIMEOUT, content_receiver)
.await;
assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);

// The proposal from the future round should not be processed.
let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender.send(prop_part_txs.clone()).await.unwrap();
content_sender.send(prop_part_fin.clone()).await.unwrap();
let fin_receiver_future_round =
context.validate_proposal(BlockNumber(0), 2, TIMEOUT, content_receiver).await;
let fin_receiver_future_round = context
.validate_proposal(BlockNumber(0), 2, ValidatorId::default(), TIMEOUT, content_receiver)
.await;
content_sender.close_channel();
// Even with sending fin and closing the channel.
assert!(fin_receiver_future_round.now_or_never().is_none());
Expand Down Expand Up @@ -433,8 +438,9 @@ async fn interrupt_active_proposal() {
// Keep the sender open, as closing it or sending Fin would cause the validate to complete
// without needing interrupt.
let (mut _content_sender_0, content_receiver) = mpsc::channel(CHANNEL_SIZE);
let fin_receiver_0 =
context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
let fin_receiver_0 = context
.validate_proposal(BlockNumber(0), 0, ValidatorId::default(), TIMEOUT, content_receiver)
.await;

let (mut content_sender_1, content_receiver) = mpsc::channel(CHANNEL_SIZE);
content_sender_1
Expand All @@ -450,8 +456,9 @@ async fn interrupt_active_proposal() {
}))
.await
.unwrap();
let fin_receiver_1 =
context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;
let fin_receiver_1 = context
.validate_proposal(BlockNumber(0), 1, ValidatorId::default(), TIMEOUT, content_receiver)
.await;
// Move the context to the next round.
context.set_height_and_round(BlockNumber(0), 1).await;

Expand Down
Loading