Skip to content

Commit

Permalink
feat: allow a streamed proposal channel on top of existing one
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware committed Dec 2, 2024
1 parent 88ff3ce commit 6036394
Show file tree
Hide file tree
Showing 16 changed files with 276 additions and 49 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/papyrus_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ papyrus_consensus_orchestrator.workspace = true
papyrus_monitoring_gateway.workspace = true
papyrus_network.workspace = true
papyrus_p2p_sync.workspace = true
papyrus_protobuf.workspace = true
papyrus_rpc = { workspace = true, optional = true }
papyrus_storage.workspace = true
papyrus_sync.workspace = true
Expand Down
20 changes: 19 additions & 1 deletion crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ use papyrus_common::pending_classes::PendingClasses;
use papyrus_config::presentation::get_config_presentation;
use papyrus_config::validators::config_validate;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::NetworkManager;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_network::{network_manager, NetworkConfig};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels};
use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels};
use papyrus_p2p_sync::{Protocol, BUFFER_SIZE};
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
#[cfg(feature = "rpc")]
use papyrus_rpc::run_server;
use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter};
Expand Down Expand Up @@ -49,6 +51,8 @@ const DEFAULT_LEVEL: LevelFilter = LevelFilter::INFO;
// different genesis hash.
// TODO: Consider moving to a more general place.
const GENESIS_HASH: &str = "0x0";
// TODO(guyn): move this to the config.
pub const NETWORK_TOPIC: &str = "consensus_proposals";

// TODO(dvir): add this to config.
// Duration between updates to the storage metrics (those in the collect_storage_metrics function).
Expand Down Expand Up @@ -185,12 +189,25 @@ fn spawn_consensus(

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?;
let proposal_network_channels: BroadcastTopicChannels<StreamMessage<ProposalPart>> =
network_manager.register_broadcast_topic(Topic::new(NETWORK_TOPIC), BUFFER_SIZE)?;
let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
} = proposal_network_channels;

// TODO(Matan): receive the handle for the StreamHandler and pass it into run_consensus below.
let (outbound_internal_sender, inbound_internal_receiver, _) =
StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);

let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.broadcast_topic_client.clone(),
outbound_internal_sender,
config.num_validators,
None,
);

Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
Expand All @@ -199,6 +216,7 @@ fn spawn_consensus(
config.consensus_delay,
config.timeouts.clone(),
network_channels.into(),
inbound_internal_receiver,
futures::stream::pending(),
)
.await?)
Expand Down
30 changes: 26 additions & 4 deletions crates/papyrus_protobuf/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use starknet_api::transaction::{Transaction, TransactionHash};

use crate::converters::ProtobufConversionError;

// TODO(guyn): remove this once we integrate ProposalPart everywhere.
#[derive(Debug, Default, Hash, Clone, Eq, PartialEq)]
pub struct Proposal {
pub height: u64,
Expand Down Expand Up @@ -34,7 +35,7 @@ pub struct Vote {

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum ConsensusMessage {
Proposal(Proposal),
Proposal(Proposal), // To be deprecated
Vote(Vote),
}

Expand Down Expand Up @@ -78,12 +79,12 @@ pub struct ProposalInit {
pub struct TransactionBatch {
/// The transactions in the batch.
pub transactions: Vec<Transaction>,
// TODO(guyn): remove this once we settle how to convert transactions to ExecutableTransactions
/// The hashes of each transaction.
// TODO(guyn): remove this once we know how to get hashes as part of the compilation.
/// The transaction's hashes.
pub tx_hashes: Vec<TransactionHash>,
}

/// The propsal is done when receiving this fin message, which contains the block hash.
/// The proposal is done when receiving this fin message, which contains the block hash.
#[derive(Debug, Clone, PartialEq)]
pub struct ProposalFin {
/// The block hash of the proposed block.
Expand All @@ -102,6 +103,27 @@ pub enum ProposalPart {
Fin(ProposalFin),
}

impl TryInto<ProposalInit> for ProposalPart {
type Error = ProtobufConversionError;

fn try_into(self: ProposalPart) -> Result<ProposalInit, Self::Error> {
match self {
ProposalPart::Init(init) => Ok(init),
_ => Err(ProtobufConversionError::WrongEnumVariant {
type_description: "ProposalPart",
expected: "Init",
value_as_str: format!("{:?}", self),
}),
}
}
}

impl From<ProposalInit> for ProposalPart {
fn from(value: ProposalInit) -> Self {
ProposalPart::Init(value)
}
}

impl<T> std::fmt::Display for StreamMessage<T>
where
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
Expand Down
3 changes: 3 additions & 0 deletions crates/papyrus_protobuf/src/converters/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ impl From<ProposalInit> for protobuf::ProposalInit {

auto_impl_into_and_try_from_vec_u8!(ProposalInit, protobuf::ProposalInit);

// TODO(guyn): remove tx_hashes once we know how to compile the hashes
// when making the executable transactions.
impl TryFrom<protobuf::TransactionBatch> for TransactionBatch {
type Error = ProtobufConversionError;
fn try_from(value: protobuf::TransactionBatch) -> Result<Self, Self::Error> {
Expand Down Expand Up @@ -311,6 +313,7 @@ impl From<ProposalPart> for protobuf::ProposalPart {

auto_impl_into_and_try_from_vec_u8!(ProposalPart, protobuf::ProposalPart);

// TODO(guyn): remove this once we are happy with how proposals are sent separate from votes.
impl TryFrom<protobuf::ConsensusMessage> for ConsensusMessage {
type Error = ProtobufConversionError;

Expand Down
6 changes: 6 additions & 0 deletions crates/papyrus_protobuf/src/converters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ pub enum ProtobufConversionError {
MissingField { field_description: &'static str },
#[error("Type `{type_description}` should be {num_expected} bytes but it got {value:?}.")]
BytesDataLengthMismatch { type_description: &'static str, num_expected: usize, value: Vec<u8> },
#[error("Type `{type_description}` got unexpected enum variant {value_as_str}")]
WrongEnumVariant {
type_description: &'static str,
value_as_str: String,
expected: &'static str,
},
#[error(transparent)]
DecodeError(#[from] DecodeError),
/// For CompressionError and serde_json::Error we put the string of the error instead of the
Expand Down
16 changes: 11 additions & 5 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ pub async fn run_consensus<ContextT, SyncReceiverT>(
consensus_delay: Duration,
timeouts: TimeoutsConfig,
mut broadcast_channels: BroadcastConsensusMessageChannel,
mut inbound_proposal_receiver: mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
mut sync_receiver: SyncReceiverT,
) -> Result<(), ConsensusError>
where
ContextT: ConsensusContext,
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
ProposalWrapper:
Into<(ProposalInit, mpsc::Receiver<ContextT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
{
info!(
"Running consensus, start_height={}, validator_id={}, consensus_delay={}, timeouts={:?}",
Expand All @@ -61,7 +62,12 @@ where
loop {
metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64);

let run_height = manager.run_height(&mut context, current_height, &mut broadcast_channels);
let run_height = manager.run_height(
&mut context,
current_height,
&mut broadcast_channels,
&mut inbound_proposal_receiver,
);

// `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop
// it. We also cannot restart the height; when we dropped the future we dropped the state it
Expand Down Expand Up @@ -106,6 +112,7 @@ impl MultiHeightManager {
context: &mut ContextT,
height: BlockNumber,
broadcast_channels: &mut BroadcastConsensusMessageChannel,
proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
) -> Result<Decision, ConsensusError>
where
ContextT: ConsensusContext,
Expand Down Expand Up @@ -186,6 +193,7 @@ impl MultiHeightManager {
match message {
ConsensusMessage::Proposal(proposal) => {
// Special case due to fake streaming.
// TODO(guyn): this will be gone once we integrate the proposal channels.
let (proposal_init, content_receiver, fin_receiver) =
ProposalWrapper(proposal).into();
let res = shc
Expand Down Expand Up @@ -224,9 +232,7 @@ impl MultiHeightManager {
async fn next_message(
cached_messages: &mut Vec<ConsensusMessage>,
broadcast_channels: &mut BroadcastConsensusMessageChannel,
) -> Result<ConsensusMessage, ConsensusError>
where
{
) -> Result<ConsensusMessage, ConsensusError> {
let BroadcastConsensusMessageChannel { broadcasted_messages_receiver, broadcast_topic_client } =
broadcast_channels;
if let Some(msg) = cached_messages.pop() {
Expand Down
52 changes: 46 additions & 6 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use papyrus_network::network_manager::test_utils::{
TestSubscriberChannels,
};
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, Vote};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalPart, Vote};
use papyrus_test_utils::{get_rng, GetTestInstance};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::transaction::Transaction;
Expand All @@ -32,12 +32,15 @@ lazy_static! {
static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig::default();
}

const CHANNEL_SIZE: usize = 10;

mock! {
pub TestContext {}

#[async_trait]
impl ConsensusContext for TestContext {
type ProposalChunk = Transaction;
type ProposalPart = ProposalPart;

async fn build_proposal(
&mut self,
Expand Down Expand Up @@ -87,6 +90,11 @@ async fn manager_multiple_heights_unordered() {
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().unwrap();
let mut sender = mock_network.broadcasted_messages_sender;

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) =
mpsc::channel(CHANNEL_SIZE);

// Send messages for height 2 followed by those for height 1.
send(&mut sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await;
send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
Expand All @@ -112,8 +120,15 @@ async fn manager_multiple_heights_unordered() {

let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone());
let mut subscriber_channels = subscriber_channels.into();
let decision =
manager.run_height(&mut context, BlockNumber(1), &mut subscriber_channels).await.unwrap();
let decision = manager
.run_height(
&mut context,
BlockNumber(1),
&mut subscriber_channels,
&mut proposal_receiver_receiver,
)
.await
.unwrap();
assert_eq!(decision.block, BlockHash(Felt::ONE));

// Run the manager for height 2.
Expand All @@ -125,8 +140,15 @@ async fn manager_multiple_heights_unordered() {
block_receiver
})
.times(1);
let decision =
manager.run_height(&mut context, BlockNumber(2), &mut subscriber_channels).await.unwrap();
let decision = manager
.run_height(
&mut context,
BlockNumber(2),
&mut subscriber_channels,
&mut proposal_receiver_receiver,
)
.await
.unwrap();
assert_eq!(decision.block, BlockHash(Felt::TWO));
}

Expand All @@ -136,6 +158,9 @@ async fn run_consensus_sync() {
let mut context = MockTestContext::new();
let (decision_tx, decision_rx) = oneshot::channel();

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::TWO)).unwrap();
Expand Down Expand Up @@ -170,6 +195,7 @@ async fn run_consensus_sync() {
Duration::ZERO,
TIMEOUTS.clone(),
subscriber_channels.into(),
proposal_receiver_receiver,
&mut sync_receiver,
)
.await
Expand All @@ -196,6 +222,9 @@ async fn run_consensus_sync_cancellation_safety() {
let (proposal_handled_tx, proposal_handled_rx) = oneshot::channel();
let (decision_tx, decision_rx) = oneshot::channel();

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
Expand Down Expand Up @@ -230,6 +259,7 @@ async fn run_consensus_sync_cancellation_safety() {
Duration::ZERO,
TIMEOUTS.clone(),
subscriber_channels.into(),
proposal_receiver_receiver,
&mut sync_receiver,
)
.await
Expand Down Expand Up @@ -260,6 +290,11 @@ async fn test_timeouts() {
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().unwrap();
let mut sender = mock_network.broadcasted_messages_sender;

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) =
mpsc::channel(CHANNEL_SIZE);

send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;
send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await;
send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_3)).await;
Expand Down Expand Up @@ -293,7 +328,12 @@ async fn test_timeouts() {
let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone());
let manager_handle = tokio::spawn(async move {
let decision = manager
.run_height(&mut context, BlockNumber(1), &mut subscriber_channels.into())
.run_height(
&mut context,
BlockNumber(1),
&mut subscriber_channels.into(),
&mut proposal_receiver_receiver,
)
.await
.unwrap();
assert_eq!(decision.block, BlockHash(Felt::ONE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ mod tests {
mock_register_broadcast_topic().unwrap();
let network_sender_to_inbound = mock_network.broadcasted_messages_sender;

// The inbound_receiver is given to StreamHandler to inbound to mock network messages.
// The inbound_receiver is given to StreamHandler to mock network messages.
let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_receiver,
broadcast_topic_client: _,
Expand Down
10 changes: 9 additions & 1 deletion crates/sequencing/papyrus_consensus/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ use std::time::Duration;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use mockall::mock;
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, Vote, VoteType};
use papyrus_protobuf::consensus::{
ConsensusMessage,
Proposal,
ProposalInit,
ProposalPart,
Vote,
VoteType,
};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_types_core::felt::Felt;

Expand All @@ -23,6 +30,7 @@ mock! {
#[async_trait]
impl ConsensusContext for TestContext {
type ProposalChunk = u32;
type ProposalPart = ProposalPart;

async fn build_proposal(
&mut self,
Expand Down
Loading

0 comments on commit 6036394

Please sign in to comment.