feat(sequencing): validate streamed proposals (#2305)
guy-starkware authored Dec 3, 2024
1 parent 4b0d3c5 · commit a6a227b
Showing 11 changed files with 395 additions and 190 deletions.
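At a high level, this change replaces the old ConsensusMessage::Proposal path with per-proposal content channels: the manager receives a fresh mpsc::Receiver<ProposalPart> for each streamed proposal and checks that its first message converts into a ProposalInit before handing the stream to SingleHeightConsensus. Below is a minimal, self-contained sketch of that validation step; the simplified ProposalPart/ProposalInit types and the tokio/futures wiring are illustrative assumptions, not the crate's actual definitions.

// Sketch only: simplified stand-ins for papyrus_protobuf's ProposalPart and ProposalInit.
use futures::channel::mpsc;
use futures::StreamExt;

#[derive(Debug)]
struct ProposalInit {
    height: u64,
    round: u32,
}

#[derive(Debug)]
enum ProposalPart {
    Init(ProposalInit),
    Content(Vec<u8>),
    Fin,
}

// Mirrors the new select-arm logic in manager.rs: the first part of every streamed
// proposal must be an init; an empty stream or a non-init first part is rejected.
async fn validate_streamed_proposal(
    mut content_receiver: mpsc::Receiver<ProposalPart>,
    current_height: u64,
) -> Result<ProposalInit, String> {
    let Some(first_part) = content_receiver.next().await else {
        return Err("Proposal receiver closed".to_string());
    };
    match first_part {
        ProposalPart::Init(init) => {
            if init.height != current_height {
                // The real code only warns here; caching proposals for future heights is a TODO.
                eprintln!("Received a proposal for a different height: {init:?}");
            }
            Ok(init)
        }
        other => Err(format!("Expected an init as the first part, got {other:?}")),
    }
}

#[tokio::main]
async fn main() {
    let (mut tx, rx) = mpsc::channel(8);
    tx.try_send(ProposalPart::Init(ProposalInit { height: 2, round: 0 })).unwrap();
    tx.try_send(ProposalPart::Fin).unwrap();
    println!("{:?}", validate_streamed_proposal(rx, 2).await);
}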
1 change: 1 addition & 0 deletions crates/papyrus_node/src/run.rs
@@ -51,6 +51,7 @@ const DEFAULT_LEVEL: LevelFilter = LevelFilter::INFO;
// different genesis hash.
// TODO: Consider moving to a more general place.
const GENESIS_HASH: &str = "0x0";

// TODO(guyn): move this to the config.
pub const NETWORK_TOPIC: &str = "consensus_proposals";

65 changes: 39 additions & 26 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -7,14 +7,14 @@ mod manager_test
use std::collections::BTreeMap;
use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt};
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalWrapper};
use starknet_api::block::{BlockHash, BlockNumber};
use tracing::{debug, info, instrument};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument, warn};

use crate::config::TimeoutsConfig;
use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
@@ -42,8 +42,6 @@ pub async fn run_consensus<ContextT, SyncReceiverT>(
) -> Result<(), ConsensusError>
where
ContextT: ConsensusContext,
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<ContextT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
{
info!(
@@ -116,11 +114,6 @@ impl MultiHeightManager {
) -> Result<Decision, ConsensusError>
where
ContextT: ConsensusContext,
ProposalWrapper: Into<(
ProposalInit,
mpsc::Receiver<ContextT::ProposalChunk>,
oneshot::Receiver<BlockHash>,
)>,
{
let validators = context.validators(height).await;
info!("running consensus for height {height:?} with validator set {validators:?}");
@@ -147,6 +140,18 @@ impl MultiHeightManager {
message = next_message(&mut current_height_messages, broadcast_channels) => {
self.handle_message(context, height, &mut shc, message?).await?
},
Some(mut content_receiver) = proposal_receiver.next() => {
// Get the first message to verify the init was sent.
// TODO(guyn): add a timeout and panic, since StreamHandler should only send once
// the first message (message_id=0) has arrived.
let Some(first_part) = content_receiver.next().await else {
return Err(ConsensusError::InternalNetworkError(
"Proposal receiver closed".to_string(),
));
};
let proposal_init: ProposalInit = first_part.try_into()?;
self.handle_proposal(context, height, &mut shc, proposal_init, content_receiver).await?
},
Some(shc_event) = shc_events.next() => {
shc.handle_event(context, shc_event).await?
},
@@ -163,6 +168,26 @@ impl MultiHeightManager {
}
}

// Handle a new proposal receiver from the network.
async fn handle_proposal<ContextT>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
proposal_init: ProposalInit,
content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
{
// TODO(guyn): what is the right thing to do if proposal's height doesn't match?
if proposal_init.height != height {
// TODO(guyn): add caching of heights for future use.
warn!("Received a proposal for a different height. {:?}", proposal_init);
}
shc.handle_proposal(context, proposal_init, content_receiver).await
}

// Handle a single consensus message.
async fn handle_message<ContextT>(
&mut self,
@@ -173,11 +198,6 @@ impl MultiHeightManager {
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
ProposalWrapper: Into<(
ProposalInit,
mpsc::Receiver<ContextT::ProposalChunk>,
oneshot::Receiver<BlockHash>,
)>,
{
// TODO(matan): We need to figure out an actual cacheing strategy under 2 constraints:
// 1. Malicious - must be capped so a malicious peer can't DoS us.
@@ -191,16 +211,9 @@ impl MultiHeightManager {
return Ok(ShcReturn::Tasks(Vec::new()));
}
match message {
ConsensusMessage::Proposal(proposal) => {
// Special case due to fake streaming.
// TODO(guyn): this will be gone once we integrate the proposal channels.
let (proposal_init, content_receiver, fin_receiver) =
ProposalWrapper(proposal).into();
let res = shc
.handle_proposal(context, proposal_init, content_receiver, fin_receiver)
.await?;
Ok(res)
}
ConsensusMessage::Proposal(_) => Err(ConsensusError::InternalNetworkError(
"Proposal variant of ConsensusMessage no longer supported".to_string(),
)),
_ => {
let res = shc.handle_message(context, message).await?;
Ok(res)
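The producer side of this contract (what the StreamHandler mentioned in the TODO above is expected to do) is: open a dedicated channel per proposal, forward its receiver over the manager's proposal stream, and only then send the parts, init first. A short sketch under those assumptions, again with a simplified stand-in for ProposalPart; the tests below implement the same pattern in their send_proposal helper.

use futures::channel::mpsc;
use futures::SinkExt;

// Simplified stand-in for papyrus_protobuf::consensus::ProposalPart (assumption).
enum ProposalPart {
    Init { height: u64, round: u32 },
    Content(Vec<u8>),
    Fin,
}

// Deliver one streamed proposal to the manager: hand over the receiver first,
// then push the parts in order, starting with the init (message_id = 0).
async fn deliver_proposal(
    manager_proposal_sender: &mut mpsc::Sender<mpsc::Receiver<ProposalPart>>,
    parts: Vec<ProposalPart>,
) {
    let (mut content_sender, content_receiver) = mpsc::channel(100);
    manager_proposal_sender
        .send(content_receiver)
        .await
        .expect("manager dropped its proposal stream");
    for part in parts {
        content_sender.send(part).await.expect("manager dropped this proposal's channel");
    }
}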
85 changes: 67 additions & 18 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -13,15 +13,21 @@ use papyrus_network::network_manager::test_utils::{
TestSubscriberChannels,
};
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalPart, Vote};
use papyrus_protobuf::consensus::{
ConsensusMessage,
ProposalFin,
ProposalInit,
ProposalPart,
Vote,
};
use papyrus_test_utils::{get_rng, GetTestInstance};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::transaction::Transaction;
use starknet_types_core::felt::Felt;

use super::{run_consensus, MultiHeightManager};
use crate::config::TimeoutsConfig;
use crate::test_utils::{precommit, prevote, proposal};
use crate::test_utils::{precommit, prevote, proposal, proposal_init};
use crate::types::{ConsensusContext, ConsensusError, ProposalContentId, Round, ValidatorId};

lazy_static! {
Expand Down Expand Up @@ -53,8 +59,8 @@ mock! {
height: BlockNumber,
round: Round,
timeout: Duration,
content: mpsc::Receiver<Transaction>
) -> oneshot::Receiver<ProposalContentId>;
content: mpsc::Receiver<ProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;

async fn repropose(
&mut self,
@@ -85,20 +91,30 @@ async fn send(sender: &mut MockBroadcastedMessagesSender<ConsensusMessage>, msg:
sender.send((msg, broadcasted_message_metadata)).await.unwrap();
}

async fn send_proposal(
proposal_receiver_sender: &mut mpsc::Sender<mpsc::Receiver<ProposalPart>>,
content: ProposalPart,
) {
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(content).await.unwrap();
}

#[ignore] // TODO(guyn): return this once caching proposals is implemented.
#[tokio::test]
async fn manager_multiple_heights_unordered() {
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().unwrap();
let mut sender = mock_network.broadcasted_messages_sender;

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) =
mpsc::channel(CHANNEL_SIZE);
let (_proposal_receiver_sender, mut proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

// Send messages for height 2 followed by those for height 1.
send(&mut sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await;
send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
send(&mut sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;

send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;
send(&mut sender, prevote(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
send(&mut sender, precommit(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
@@ -109,7 +125,12 @@ async fn manager_multiple_heights_unordered() {
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_sender
.send((
BlockHash(Felt::ONE),
ProposalFin { proposal_content_id: BlockHash(Felt::ONE) },
))
.unwrap();
block_receiver
})
.times(1);
@@ -136,7 +157,12 @@ async fn manager_multiple_heights_unordered() {
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::TWO)).unwrap();
block_sender
.send((
BlockHash(Felt::TWO),
ProposalFin { proposal_content_id: BlockHash(Felt::TWO) },
))
.unwrap();
block_receiver
})
.times(1);
@@ -152,18 +178,21 @@ async fn manager_multiple_heights_unordered() {
assert_eq!(decision.block, BlockHash(Felt::TWO));
}

#[ignore] // TODO(guyn): return this once caching proposals is implemented.
#[tokio::test]
async fn run_consensus_sync() {
// Set expectations.
let mut context = MockTestContext::new();
let (decision_tx, decision_rx) = oneshot::channel();

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::TWO)).unwrap();
block_sender
.send((BlockHash(Felt::TWO), ProposalFin { proposal_content_id: BlockHash(Felt::TWO) }))
.unwrap();
block_receiver
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
@@ -178,10 +207,14 @@ async fn run_consensus_sync() {
});

// Send messages for height 2.
send_proposal(
&mut proposal_receiver_sender,
ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID)),
)
.await;
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().unwrap();
let mut network_sender = mock_network.broadcasted_messages_sender;
send(&mut network_sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await;
send(&mut network_sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
send(&mut network_sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;

@@ -223,11 +256,13 @@ async fn run_consensus_sync_cancellation_safety() {
let (decision_tx, decision_rx) = oneshot::channel();

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
.unwrap();
block_receiver
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
@@ -267,7 +302,11 @@ async fn run_consensus_sync_cancellation_safety() {
let mut network_sender = mock_network.broadcasted_messages_sender;

// Send a proposal for height 1.
send(&mut network_sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;
send_proposal(
&mut proposal_receiver_sender,
ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)),
)
.await;
proposal_handled_rx.await.unwrap();

// Send an old sync. This should not cancel the current height.
@@ -292,10 +331,14 @@ async fn test_timeouts() {
let mut sender = mock_network.broadcasted_messages_sender;

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) =
let (mut proposal_receiver_sender, mut proposal_receiver_receiver) =
mpsc::channel(CHANNEL_SIZE);

send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;
send_proposal(
&mut proposal_receiver_sender,
ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)),
)
.await;
send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await;
send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_3)).await;
send(&mut sender, precommit(None, 1, 0, *VALIDATOR_ID_2)).await;
@@ -305,7 +348,9 @@ async fn test_timeouts() {
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_validate_proposal().returning(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
.unwrap();
block_receiver
});
context
@@ -343,7 +388,11 @@ async fn test_timeouts() {
timeout_receive.await.unwrap();
// Show that after the timeout is triggered we can still precommit in favor of the block and
// reach a decision.
send(&mut sender, proposal(Felt::ONE, 1, 1, *PROPOSER_ID)).await;
send_proposal(
&mut proposal_receiver_sender,
ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID)),
)
.await;
send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *PROPOSER_ID)).await;
send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *VALIDATOR_ID_2)).await;
send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *VALIDATOR_ID_3)).await;
(The remaining 8 changed files in this commit are not shown here.)
