Skip to content

Commit

Permalink
feat(consensus): add streamed validation
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware committed Dec 2, 2024
1 parent 455f399 commit 89628e8
Show file tree
Hide file tree
Showing 13 changed files with 350 additions and 197 deletions.
4 changes: 1 addition & 3 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@ const DEFAULT_LEVEL: LevelFilter = LevelFilter::INFO;
// different genesis hash.
// TODO: Consider moving to a more general place.
const GENESIS_HASH: &str = "0x0";
<<<<<<< HEAD

// TODO(guyn): move this to the config.
=======
>>>>>>> 93fd11f32 (feat: allow a streamed proposal channel on top of existing one)
pub const NETWORK_TOPIC: &str = "consensus_proposals";

// TODO(dvir): add this to config.
Expand Down
4 changes: 0 additions & 4 deletions crates/papyrus_protobuf/src/converters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ pub enum ProtobufConversionError {
type_description: &'static str,
value_as_str: String,
expected: &'static str,
<<<<<<< HEAD
=======
got: &'static str,
>>>>>>> 93fd11f32 (feat: allow a streamed proposal channel on top of existing one)
},
#[error(transparent)]
DecodeError(#[from] DecodeError),
Expand Down
64 changes: 38 additions & 26 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ mod manager_test;
use std::collections::BTreeMap;
use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt};
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalWrapper};
use starknet_api::block::{BlockHash, BlockNumber};
use tracing::{debug, info, instrument};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument, warn};

use crate::config::TimeoutsConfig;
use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
Expand Down Expand Up @@ -42,8 +42,6 @@ pub async fn run_consensus<ContextT, SyncReceiverT>(
) -> Result<(), ConsensusError>
where
ContextT: ConsensusContext,
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<ContextT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
{
info!(
Expand Down Expand Up @@ -116,11 +114,6 @@ impl MultiHeightManager {
) -> Result<Decision, ConsensusError>
where
ContextT: ConsensusContext,
ProposalWrapper: Into<(
ProposalInit,
mpsc::Receiver<ContextT::ProposalChunk>,
oneshot::Receiver<BlockHash>,
)>,
{
let validators = context.validators(height).await;
info!("running consensus for height {height:?} with validator set {validators:?}");
Expand All @@ -147,6 +140,17 @@ impl MultiHeightManager {
message = next_message(&mut current_height_messages, broadcast_channels) => {
self.handle_message(context, height, &mut shc, message?).await?
},
Some(mut content_receiver) = proposal_receiver.next() => {
// Get the first message to verify the init was sent.
// TODO(guyn): what happens if the channel never sends anything?
let Some(first_part) = content_receiver.next().await else {
return Err(ConsensusError::InternalNetworkError(
"Proposal receiver closed".to_string(),
));
};
let proposal_init: ProposalInit = first_part.try_into()?;
self.handle_proposal(context, height, &mut shc, proposal_init, content_receiver).await?
},
Some(shc_event) = shc_events.next() => {
shc.handle_event(context, shc_event).await?
},
Expand All @@ -163,6 +167,26 @@ impl MultiHeightManager {
}
}

// Handle a new proposal receiver from the network.
async fn handle_proposal<ContextT>(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
proposal_init: ProposalInit,
content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
{
// TODO(guyn): what is the right thing to do if proposal's height doesn't match?
if proposal_init.height != height {
// TODO(guyn): add caching of heights for future use.
warn!("Received a proposal for a different height. {:?}", proposal_init);
}
shc.handle_proposal(context, proposal_init.into(), content_receiver).await
}

// Handle a single consensus message.
async fn handle_message<ContextT>(
&mut self,
Expand All @@ -173,11 +197,6 @@ impl MultiHeightManager {
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
ProposalWrapper: Into<(
ProposalInit,
mpsc::Receiver<ContextT::ProposalChunk>,
oneshot::Receiver<BlockHash>,
)>,
{
// TODO(matan): We need to figure out an actual cacheing strategy under 2 constraints:
// 1. Malicious - must be capped so a malicious peer can't DoS us.
Expand All @@ -191,16 +210,9 @@ impl MultiHeightManager {
return Ok(ShcReturn::Tasks(Vec::new()));
}
match message {
ConsensusMessage::Proposal(proposal) => {
// Special case due to fake streaming.
// TODO(guyn): this will be gone once we integrate the proposal channels.
let (proposal_init, content_receiver, fin_receiver) =
ProposalWrapper(proposal).into();
let res = shc
.handle_proposal(context, proposal_init, content_receiver, fin_receiver)
.await?;
Ok(res)
}
ConsensusMessage::Proposal(_) => Err(ConsensusError::InternalNetworkError(
"Proposal variant of ConsensusMessage no longer supported".to_string(),
)),
_ => {
let res = shc.handle_message(context, message).await?;
Ok(res)
Expand Down
53 changes: 35 additions & 18 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use starknet_types_core::felt::Felt;

use super::{run_consensus, MultiHeightManager};
use crate::config::TimeoutsConfig;
use crate::test_utils::{precommit, prevote, proposal};
use crate::test_utils::{precommit, prevote, proposal_fin, proposal_init};
use crate::types::{ConsensusContext, ConsensusError, ProposalContentId, Round, ValidatorId};

lazy_static! {
Expand Down Expand Up @@ -53,8 +53,8 @@ mock! {
height: BlockNumber,
round: Round,
timeout: Duration,
content: mpsc::Receiver<Transaction>
) -> oneshot::Receiver<ProposalContentId>;
content: mpsc::Receiver<ProposalPart>
) -> oneshot::Receiver<(ProposalContentId, ProposalContentId)>;

async fn repropose(
&mut self,
Expand Down Expand Up @@ -85,21 +85,29 @@ async fn send(sender: &mut MockBroadcastedMessagesSender<ConsensusMessage>, msg:
sender.send((msg, broadcasted_message_metadata)).await.unwrap();
}

#[ignore] // TODO(guyn): return this once caching proposals is implemented.
#[tokio::test]
async fn manager_multiple_heights_unordered() {
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().unwrap();
let mut sender = mock_network.broadcasted_messages_sender;

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) =
let (mut proposal_receiver_sender, mut proposal_receiver_receiver) =
mpsc::channel(CHANNEL_SIZE);

// Send messages for height 2 followed by those for height 1.
send(&mut sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await;
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))).await.unwrap();
proposal_sender.send(ProposalPart::Fin(proposal_fin(Felt::TWO))).await.unwrap();
send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
send(&mut sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;

let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))).await.unwrap();
proposal_sender.send(ProposalPart::Fin(proposal_fin(Felt::ONE))).await.unwrap();
send(&mut sender, prevote(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
send(&mut sender, precommit(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;

Expand All @@ -109,7 +117,7 @@ async fn manager_multiple_heights_unordered() {
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_sender.send((BlockHash(Felt::ONE), BlockHash(Felt::ONE))).unwrap();
block_receiver
})
.times(1);
Expand All @@ -136,7 +144,7 @@ async fn manager_multiple_heights_unordered() {
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::TWO)).unwrap();
block_sender.send((BlockHash(Felt::TWO), BlockHash(Felt::TWO))).unwrap();
block_receiver
})
.times(1);
Expand All @@ -152,18 +160,19 @@ async fn manager_multiple_heights_unordered() {
assert_eq!(decision.block, BlockHash(Felt::TWO));
}

#[ignore] // TODO(guyn): return this once caching proposals is implemented.
#[tokio::test]
async fn run_consensus_sync() {
// Set expectations.
let mut context = MockTestContext::new();
let (decision_tx, decision_rx) = oneshot::channel();

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::TWO)).unwrap();
block_sender.send((BlockHash(Felt::TWO), BlockHash(Felt::TWO))).unwrap();
block_receiver
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
Expand All @@ -181,7 +190,9 @@ async fn run_consensus_sync() {
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().unwrap();
let mut network_sender = mock_network.broadcasted_messages_sender;
send(&mut network_sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await;
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))).await.unwrap();
send(&mut network_sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
send(&mut network_sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;

Expand Down Expand Up @@ -223,11 +234,11 @@ async fn run_consensus_sync_cancellation_safety() {
let (decision_tx, decision_rx) = oneshot::channel();

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_sender.send((BlockHash(Felt::ONE), BlockHash(Felt::ONE))).unwrap();
block_receiver
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
Expand Down Expand Up @@ -267,7 +278,9 @@ async fn run_consensus_sync_cancellation_safety() {
let mut network_sender = mock_network.broadcasted_messages_sender;

// Send a proposal for height 1.
send(&mut network_sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))).await.unwrap();
proposal_handled_rx.await.unwrap();

// Send an old sync. This should not cancel the current height.
Expand All @@ -292,10 +305,12 @@ async fn test_timeouts() {
let mut sender = mock_network.broadcasted_messages_sender;

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) =
let (mut proposal_receiver_sender, mut proposal_receiver_receiver) =
mpsc::channel(CHANNEL_SIZE);

send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))).await.unwrap();
send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await;
send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_3)).await;
send(&mut sender, precommit(None, 1, 0, *VALIDATOR_ID_2)).await;
Expand All @@ -305,7 +320,7 @@ async fn test_timeouts() {
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_validate_proposal().returning(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_sender.send((BlockHash(Felt::ONE), BlockHash(Felt::ONE))).unwrap();
block_receiver
});
context
Expand Down Expand Up @@ -343,7 +358,9 @@ async fn test_timeouts() {
timeout_receive.await.unwrap();
// Show that after the timeout is triggered we can still precommit in favor of the block and
// reach a decision.
send(&mut sender, proposal(Felt::ONE, 1, 1, *PROPOSER_ID)).await;
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID))).await.unwrap();
send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *PROPOSER_ID)).await;
send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *VALIDATOR_ID_2)).await;
send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *VALIDATOR_ID_3)).await;
Expand Down
Loading

0 comments on commit 89628e8

Please sign in to comment.