feat(consensus): add streamed validation
guy-starkware committed Nov 28, 2024
1 parent c5f0159 commit f515d0c
Showing 11 changed files with 277 additions and 166 deletions.
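At a high level, the commit replaces the single, fully-materialized ConsensusMessage::Proposal with a streamed channel of proposal parts: the manager receives a per-proposal mpsc::Receiver of ProposalPart values, expects the first part to decode into a ProposalInit, and the context's validate_proposal reports a pair of ids, the one built from the streamed content and the one carried by the proposer's fin. The sketch below illustrates that receive-side contract; it is not code from this commit, and the types are simplified stand-ins for the crate's real ProposalPart, ProposalInit, and ProposalContentId.

use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};

// Simplified stand-ins (hypothetical) for the crate types used in the diffs below.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ProposalContentId(u64);
#[derive(Debug, Clone, Copy)]
struct ProposalInit { height: u64, round: u32 }
#[derive(Debug)]
enum ProposalPart { Init(ProposalInit), Fin(ProposalContentId) }

// Receive side: the first streamed part must be the init; the rest of the
// stream (content parts and the fin) is handed on for validation.
async fn expect_init(parts: &mut mpsc::Receiver<ProposalPart>) -> Result<ProposalInit, String> {
    match parts.next().await {
        Some(ProposalPart::Init(init)) => Ok(init),
        Some(other) => Err(format!("expected ProposalInit, got {other:?}")),
        None => Err("proposal receiver closed".to_string()),
    }
}

#[tokio::main]
async fn main() {
    // Send side: stream the init first, then the fin (content parts elided).
    let (mut tx, mut rx) = mpsc::channel(8);
    tx.send(ProposalPart::Init(ProposalInit { height: 2, round: 0 })).await.unwrap();
    tx.send(ProposalPart::Fin(ProposalContentId(7))).await.unwrap();
    let init = expect_init(&mut rx).await.unwrap();
    assert_eq!(init.height, 2);
}

In manager.rs below, the same first-part check is what the new select branch performs via first_part.try_into()?.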
67 changes: 41 additions & 26 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -7,14 +7,14 @@ mod manager_test
use std::collections::BTreeMap;
use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt};
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalWrapper};
use starknet_api::block::{BlockHash, BlockNumber};
use tracing::{debug, info, instrument};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument, warn};

use crate::config::TimeoutsConfig;
use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
@@ -42,8 +42,7 @@ pub async fn run_consensus<ContextT, SyncReceiverT>(
) -> Result<(), ConsensusError>
where
ContextT: ConsensusContext,
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<ContextT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
<ContextT as ConsensusContext>::ProposalPart: std::fmt::Debug,
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
{
info!(
@@ -116,11 +115,7 @@ impl MultiHeightManager {
) -> Result<Decision, ConsensusError>
where
ContextT: ConsensusContext,
ProposalWrapper: Into<(
ProposalInit,
mpsc::Receiver<ContextT::ProposalChunk>,
oneshot::Receiver<BlockHash>,
)>,
<ContextT as ConsensusContext>::ProposalPart: std::fmt::Debug,
{
let validators = context.validators(height).await;
info!("running consensus for height {height:?} with validator set {validators:?}");
@@ -147,6 +142,17 @@ impl MultiHeightManager {
message = next_message(&mut current_height_messages, broadcast_channels) => {
self.handle_message(context, height, &mut shc, message?).await?
},
Some(mut content_receiver) = proposal_receiver.next() => {
// Get the first message to verify the init was sent.
// TODO(guyn): what happens if the channel never sends anything?
let Some(first_part) = content_receiver.next().await else {
return Err(ConsensusError::InternalNetworkError(
"Proposal receiver closed".to_string(),
));
};
let proposal_init: ProposalInit = first_part.try_into()?;
self.handle_proposal(context, height, &mut shc, proposal_init, content_receiver).await?
},
Some(shc_event) = shc_events.next() => {
shc.handle_event(context, shc_event).await?
},
@@ -163,6 +169,27 @@ impl MultiHeightManager {
}
}

// Handle a new proposal receiver from the network.
async fn handle_proposal<ContextT>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
proposal_init: ProposalInit,
content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
<ContextT as ConsensusContext>::ProposalPart: std::fmt::Debug,
{
// TODO(guyn): what is the right thing to do if proposal's height doesn't match?
if proposal_init.height != height {
// TODO(guyn): add caching of heights for future use.
warn!("Received a proposal for a different height. {:?}", proposal_init);
}
shc.handle_proposal(context, proposal_init.into(), content_receiver).await
}

// Handle a single consensus message.
async fn handle_message<ContextT>(
&mut self,
@@ -173,11 +200,6 @@ impl MultiHeightManager {
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
ProposalWrapper: Into<(
ProposalInit,
mpsc::Receiver<ContextT::ProposalChunk>,
oneshot::Receiver<BlockHash>,
)>,
{
// TODO(matan): We need to figure out an actual caching strategy under 2 constraints:
// 1. Malicious - must be capped so a malicious peer can't DoS us.
@@ -191,16 +213,9 @@ impl MultiHeightManager {
return Ok(ShcReturn::Tasks(Vec::new()));
}
match message {
ConsensusMessage::Proposal(proposal) => {
// Special case due to fake streaming.
// TODO(guyn): this will be gone once we integrate the proposal channels.
let (proposal_init, content_receiver, fin_receiver) =
ProposalWrapper(proposal).into();
let res = shc
.handle_proposal(context, proposal_init, content_receiver, fin_receiver)
.await?;
Ok(res)
}
ConsensusMessage::Proposal(_) => Err(ConsensusError::InternalNetworkError(
"Proposal variant of ConsensusMessage no longer supported".to_string(),
)),
_ => {
let res = shc.handle_message(context, message).await?;
Ok(res)
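The new select branch above relies on first_part.try_into()? to turn the first streamed part into a ProposalInit. That conversion lives in papyrus_protobuf and is not shown in this diff; the following is a hypothetical sketch of its shape, with the error type and variant set assumed rather than taken from the real API.

// Hypothetical sketch only; papyrus_protobuf's actual conversion and error type may differ.
#[derive(Debug, Clone, Copy)]
pub struct ProposalInit { pub height: u64, pub round: u32 }

#[derive(Debug)]
pub enum ProposalPart { Init(ProposalInit), Fin(u64) } // content variants elided

#[derive(Debug)]
pub struct WrongProposalPart(pub String);

impl TryFrom<ProposalPart> for ProposalInit {
    type Error = WrongProposalPart;

    fn try_from(part: ProposalPart) -> Result<Self, Self::Error> {
        match part {
            ProposalPart::Init(init) => Ok(init),
            other => Err(WrongProposalPart(format!("expected Init, got {other:?}"))),
        }
    }
}

Keeping the conversion fallible is what lets the manager surface a malformed first part as an error through the ? operator instead of panicking.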
53 changes: 35 additions & 18 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -21,7 +21,7 @@ use starknet_types_core::felt::Felt;

use super::{run_consensus, MultiHeightManager};
use crate::config::TimeoutsConfig;
use crate::test_utils::{precommit, prevote, proposal};
use crate::test_utils::{precommit, prevote, proposal_fin, proposal_init};
use crate::types::{ConsensusContext, ConsensusError, ProposalContentId, Round, ValidatorId};

lazy_static! {
@@ -52,8 +52,8 @@ mock! {
&mut self,
height: BlockNumber,
timeout: Duration,
content: mpsc::Receiver<Transaction>
) -> oneshot::Receiver<ProposalContentId>;
content: mpsc::Receiver<ProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)>;

async fn repropose(
&mut self,
@@ -81,21 +81,29 @@ async fn send(sender: &mut MockBroadcastedMessagesSender<ConsensusMessage>, msg:
sender.send((msg, broadcasted_message_metadata)).await.unwrap();
}

#[ignore] // TODO(guyn): return this once caching proposals is implemented.
#[tokio::test]
async fn manager_multiple_heights_unordered() {
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().unwrap();
let mut sender = mock_network.broadcasted_messages_sender;

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) =
let (mut proposal_receiver_sender, mut proposal_receiver_receiver) =
mpsc::channel(CHANNEL_SIZE);

// Send messages for height 2 followed by those for height 1.
send(&mut sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await;
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))).await.unwrap();
proposal_sender.send(ProposalPart::Fin(proposal_fin(Felt::TWO))).await.unwrap();
send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
send(&mut sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;

let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))).await.unwrap();
proposal_sender.send(ProposalPart::Fin(proposal_fin(Felt::ONE))).await.unwrap();
send(&mut sender, prevote(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
send(&mut sender, precommit(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;

@@ -105,7 +113,7 @@ async fn manager_multiple_heights_unordered() {
.expect_validate_proposal()
.return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_sender.send((BlockHash(Felt::ONE), BlockHash(Felt::ONE))).unwrap();
block_receiver
})
.times(1);
@@ -131,7 +139,7 @@ async fn manager_multiple_heights_unordered() {
.expect_validate_proposal()
.return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::TWO)).unwrap();
block_sender.send((BlockHash(Felt::TWO), BlockHash(Felt::TWO))).unwrap();
block_receiver
})
.times(1);
@@ -147,18 +155,19 @@ async fn manager_multiple_heights_unordered() {
assert_eq!(decision.block, BlockHash(Felt::TWO));
}

#[ignore] // TODO(guyn): return this once caching proposals is implemented.
#[tokio::test]
async fn run_consensus_sync() {
// Set expectations.
let mut context = MockTestContext::new();
let (decision_tx, decision_rx) = oneshot::channel();

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::TWO)).unwrap();
block_sender.send((BlockHash(Felt::TWO), BlockHash(Felt::TWO))).unwrap();
block_receiver
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
@@ -175,7 +184,9 @@ async fn run_consensus_sync() {
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().unwrap();
let mut network_sender = mock_network.broadcasted_messages_sender;
send(&mut network_sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await;
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))).await.unwrap();
send(&mut network_sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
send(&mut network_sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;

@@ -217,11 +228,11 @@ async fn run_consensus_sync_cancellation_safety() {
let (decision_tx, decision_rx) = oneshot::channel();

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_sender.send((BlockHash(Felt::ONE), BlockHash(Felt::ONE))).unwrap();
block_receiver
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
@@ -260,7 +271,9 @@ async fn run_consensus_sync_cancellation_safety() {
let mut network_sender = mock_network.broadcasted_messages_sender;

// Send a proposal for height 1.
send(&mut network_sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))).await.unwrap();
proposal_handled_rx.await.unwrap();

// Send an old sync. This should not cancel the current height.
@@ -285,10 +298,12 @@ async fn test_timeouts() {
let mut sender = mock_network.broadcasted_messages_sender;

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) =
let (mut proposal_receiver_sender, mut proposal_receiver_receiver) =
mpsc::channel(CHANNEL_SIZE);

send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))).await.unwrap();
send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await;
send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_3)).await;
send(&mut sender, precommit(None, 1, 0, *VALIDATOR_ID_2)).await;
@@ -297,7 +312,7 @@ async fn test_timeouts() {
let mut context = MockTestContext::new();
context.expect_validate_proposal().returning(move |_, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_sender.send((BlockHash(Felt::ONE), BlockHash(Felt::ONE))).unwrap();
block_receiver
});
context
@@ -335,7 +350,9 @@ async fn test_timeouts() {
timeout_receive.await.unwrap();
// Show that after the timeout is triggered we can still precommit in favor of the block and
// reach a decision.
send(&mut sender, proposal(Felt::ONE, 1, 1, *PROPOSER_ID)).await;
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID))).await.unwrap();
send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *PROPOSER_ID)).await;
send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *VALIDATOR_ID_2)).await;
send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *VALIDATOR_ID_3)).await;
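Each updated test repeats the same three steps: open a per-proposal channel, hand its receiver to the manager through proposal_receiver_sender, and stream the parts. A helper along the following lines could fold that into one call; it is a sketch rather than part of the commit, and it assumes the items the test module already has in scope (mpsc, SinkExt, ProposalPart, CHANNEL_SIZE, and the proposal_init/proposal_fin utilities).

// Sketch of a possible test helper (not in this commit): streams one proposal
// into the manager through the proposal-receiver channel.
async fn send_proposal(
    proposal_receiver_sender: &mut mpsc::Sender<mpsc::Receiver<ProposalPart>>,
    parts: Vec<ProposalPart>,
) -> mpsc::Sender<ProposalPart> {
    let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
    proposal_receiver_sender.send(proposal_receiver).await.unwrap();
    for part in parts {
        proposal_sender.send(part).await.unwrap();
    }
    // Returned so the caller can keep the content stream open for the rest of the test.
    proposal_sender
}

// Example usage in manager_multiple_heights_unordered:
// let _sender = send_proposal(
//     &mut proposal_receiver_sender,
//     vec![
//         ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID)),
//         ProposalPart::Fin(proposal_fin(Felt::TWO)),
//     ],
// ).await;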
50 changes: 18 additions & 32 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
@@ -66,11 +66,7 @@ pub enum ShcTask {
/// result without blocking consensus.
/// 3. Once validation is complete, the manager returns the built proposal to the SHC as an
/// event, which can be sent to the SM.
ValidateProposal(
ProposalInit,
oneshot::Receiver<ProposalContentId>, // Block built from the content.
oneshot::Receiver<ProposalContentId>, // Fin sent by the proposer.
),
ValidateProposal(ProposalInit, oneshot::Receiver<(ProposalContentId, ProposalContentId)>),
}

impl PartialEq for ShcTask {
@@ -82,9 +78,7 @@ impl PartialEq for ShcTask {
| (ShcTask::Prevote(d1, e1), ShcTask::Prevote(d2, e2))
| (ShcTask::Precommit(d1, e1), ShcTask::Precommit(d2, e2)) => d1 == d2 && e1 == e2,
(ShcTask::BuildProposal(r1, _), ShcTask::BuildProposal(r2, _)) => r1 == r2,
(ShcTask::ValidateProposal(pi1, _, _), ShcTask::ValidateProposal(pi2, _, _)) => {
pi1 == pi2
}
(ShcTask::ValidateProposal(pi1, _), ShcTask::ValidateProposal(pi2, _)) => pi1 == pi2,
_ => false,
}
}
@@ -118,24 +112,17 @@ impl ShcTask {
let proposal_id = receiver.await.expect("Block building failed.");
ShcEvent::BuildProposal(StateMachineEvent::GetProposal(Some(proposal_id), round))
}
ShcTask::ValidateProposal(
init,
id_built_from_content_receiver,
fin_from_proposer_receiver,
) => {
let proposal_id = match id_built_from_content_receiver.await {
Ok(proposal_id) => Some(proposal_id),
ShcTask::ValidateProposal(init, block_receiver) => {
let (block_proposal_id, network_proposal_id) = match block_receiver.await {
Ok((block_proposal_id, network_proposal_id)) => {
(Some(block_proposal_id), Some(network_proposal_id))
}
// Proposal never received from peer.
Err(_) => None,
};
let fin = match fin_from_proposer_receiver.await {
Ok(fin) => Some(fin),
// ProposalFin never received from peer.
Err(_) => None,
Err(_) => (None, None),
};
ShcEvent::ValidateProposal(
StateMachineEvent::Proposal(proposal_id, init.round, init.valid_round),
fin,
StateMachineEvent::Proposal(block_proposal_id, init.round, init.valid_round),
network_proposal_id,
)
}
}
@@ -206,8 +193,7 @@ impl SingleHeightConsensus {
&mut self,
context: &mut ContextT,
init: ProposalInit,
p2p_messages_receiver: mpsc::Receiver<ContextT::ProposalChunk>,
fin_receiver: oneshot::Receiver<ProposalContentId>,
p2p_messages_receiver: mpsc::Receiver<ContextT::ProposalPart>,
) -> Result<ShcReturn, ConsensusError> {
debug!(
"Received proposal: height={}, round={}, proposer={:?}",
@@ -230,10 +216,10 @@ impl SingleHeightConsensus {
// Since validating the proposal is non-blocking, we want to avoid validating the same round
// twice in parallel. This could be caused by a network repeat or a malicious spam attack.
proposal_entry.insert(None);
let block_receiver = context
let validation_receiver = context
.validate_proposal(self.height, self.timeouts.proposal_timeout, p2p_messages_receiver)
.await;
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver, fin_receiver)]))
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, validation_receiver)]))
}

async fn process_inbound_proposal<ContextT: ConsensusContext>(
@@ -304,19 +290,19 @@ impl SingleHeightConsensus {
)]))
}
ShcEvent::ValidateProposal(
StateMachineEvent::Proposal(proposal_id, round, valid_round),
fin,
StateMachineEvent::Proposal(block_proposal_id, round, valid_round),
network_proposal_id,
) => {
// TODO(matan): Switch to signature validation.
let id = if proposal_id != fin {
let id = if block_proposal_id != network_proposal_id {
warn!(
"proposal_id built from content receiver does not match fin: {:#064x?} != \
{:#064x?}",
proposal_id, fin
block_proposal_id, network_proposal_id
);
None
} else {
proposal_id
block_proposal_id
};
// Retaining the entry for this round prevents us from receiving another proposal on
// this round. If the validations failed, which can be caused by a network issue, we
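The reworked ShcTask::ValidateProposal awaits a single oneshot carrying (ProposalContentId, ProposalContentId): the id built locally from the streamed content and the id the proposer sent in its fin. The context implementation that produces that pair is outside this excerpt; the sketch below shows one way it could look, using stand-in types and a stand-in id computation (names not visible in the diff are assumptions).

use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::StreamExt;

// Stand-in types for the sketch (hypothetical, not the crate's definitions).
#[derive(Debug, Clone, Copy, PartialEq)]
struct ProposalContentId(u64);
#[derive(Debug)]
enum ProposalPart { Init, Content(Vec<u64>), Fin(ProposalContentId) }

// Sketch of a context's validate_proposal under the new signature: consume the
// streamed parts, derive an id from the content, and report (built_id, fin_id).
fn validate_proposal(
    _timeout: Duration, // timeout handling elided in this sketch
    mut content: mpsc::Receiver<ProposalPart>,
) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)> {
    let (tx, rx) = oneshot::channel();
    tokio::spawn(async move {
        let mut items = Vec::new();
        let mut fin_id = None;
        while let Some(part) = content.next().await {
            match part {
                ProposalPart::Content(mut batch) => items.append(&mut batch),
                ProposalPart::Fin(id) => {
                    fin_id = Some(id);
                    break;
                }
                ProposalPart::Init => {}
            }
        }
        // Stand-in for building/executing the block from the streamed content.
        let built_id = ProposalContentId(items.iter().sum());
        if let Some(fin_id) = fin_id {
            let _ = tx.send((built_id, fin_id));
        }
        // If the fin never arrives, tx is dropped and the SHC maps the closed
        // oneshot to (None, None), i.e. "proposal never received from peer".
    });
    rx
}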
