diff --git a/Cargo.lock b/Cargo.lock index 3fddafbc6b..b70ac067ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7534,6 +7534,7 @@ dependencies = [ "papyrus_monitoring_gateway", "papyrus_network", "papyrus_p2p_sync", + "papyrus_protobuf", "papyrus_rpc", "papyrus_storage", "papyrus_sync", diff --git a/crates/papyrus_node/Cargo.toml b/crates/papyrus_node/Cargo.toml index 8f25b05355..cb290bfd6b 100644 --- a/crates/papyrus_node/Cargo.toml +++ b/crates/papyrus_node/Cargo.toml @@ -39,6 +39,7 @@ papyrus_consensus_orchestrator.workspace = true papyrus_monitoring_gateway.workspace = true papyrus_network.workspace = true papyrus_p2p_sync.workspace = true +papyrus_protobuf.workspace = true papyrus_rpc = { workspace = true, optional = true } papyrus_storage.workspace = true papyrus_sync.workspace = true diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs index 2ddd329799..5e2b0de3e7 100644 --- a/crates/papyrus_node/src/run.rs +++ b/crates/papyrus_node/src/run.rs @@ -13,14 +13,16 @@ use papyrus_common::pending_classes::PendingClasses; use papyrus_config::presentation::get_config_presentation; use papyrus_config::validators::config_validate; use papyrus_consensus::config::ConsensusConfig; +use papyrus_consensus::stream_handler::StreamHandler; use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext; use papyrus_monitoring_gateway::MonitoringServer; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::NetworkManager; +use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager}; use papyrus_network::{network_manager, NetworkConfig}; use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels}; use papyrus_p2p_sync::server::{P2PSyncServer, P2PSyncServerChannels}; use papyrus_p2p_sync::{Protocol, BUFFER_SIZE}; +use papyrus_protobuf::consensus::{ProposalPart, StreamMessage}; #[cfg(feature = "rpc")] use papyrus_rpc::run_server; use papyrus_storage::{open_storage, update_storage_metrics, StorageReader, StorageWriter}; @@ -49,6 +51,7 @@ const DEFAULT_LEVEL: LevelFilter = LevelFilter::INFO; // different genesis hash. // TODO: Consider moving to a more general place. const GENESIS_HASH: &str = "0x0"; +pub const NETWORK_TOPIC: &str = "consensus_proposals"; // TODO(dvir): add this to config. // Duration between updates to the storage metrics (those in the collect_storage_metrics function). @@ -185,12 +188,25 @@ fn spawn_consensus( let network_channels = network_manager .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?; + let proposal_network_channels: BroadcastTopicChannels> = + network_manager.register_broadcast_topic(Topic::new(NETWORK_TOPIC), BUFFER_SIZE)?; + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = proposal_network_channels; + + let (outbound_internal_sender, inbound_internal_receiver, _) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + let context = PapyrusConsensusContext::new( storage_reader.clone(), network_channels.broadcast_topic_client.clone(), + // outbound_network_sender.clone(), + outbound_internal_sender, config.num_validators, None, ); + Ok(tokio::spawn(async move { Ok(papyrus_consensus::run_consensus( context, @@ -199,6 +215,7 @@ fn spawn_consensus( config.consensus_delay, config.timeouts.clone(), network_channels.into(), + inbound_internal_receiver, futures::stream::pending(), ) .await?) diff --git a/crates/papyrus_protobuf/src/consensus.rs b/crates/papyrus_protobuf/src/consensus.rs index 894b035b5e..44c57a3ae0 100644 --- a/crates/papyrus_protobuf/src/consensus.rs +++ b/crates/papyrus_protobuf/src/consensus.rs @@ -6,6 +6,7 @@ use starknet_api::transaction::{Transaction, TransactionHash}; use crate::converters::ProtobufConversionError; +// TODO(guyn): remove this once we integrate ProposalPart everywhere. #[derive(Debug, Default, Hash, Clone, Eq, PartialEq)] pub struct Proposal { pub height: u64, @@ -34,7 +35,7 @@ pub struct Vote { #[derive(Debug, Clone, Hash, Eq, PartialEq)] pub enum ConsensusMessage { - Proposal(Proposal), + Proposal(Proposal), // To be deprecated Vote(Vote), } @@ -78,12 +79,12 @@ pub struct ProposalInit { pub struct TransactionBatch { /// The transactions in the batch. pub transactions: Vec, - // TODO(guyn): remove this once we settle how to convert transactions to ExecutableTransactions - /// The hashes of each transaction. + // TODO(guyn): remove this once we know how to get hashes as part of the compilation. + /// The transaction's hashes. pub tx_hashes: Vec, } -/// The propsal is done when receiving this fin message, which contains the block hash. +/// The proposal is done when receiving this fin message, which contains the block hash. #[derive(Debug, Clone, PartialEq)] pub struct ProposalFin { /// The block hash of the proposed block. @@ -102,6 +103,28 @@ pub enum ProposalPart { Fin(ProposalFin), } +impl TryInto for ProposalPart { + type Error = ProtobufConversionError; + + fn try_into(self: ProposalPart) -> Result { + match self { + ProposalPart::Init(init) => Ok(init), + _ => Err(ProtobufConversionError::WrongEnumVariant { + type_description: "ProposalPart", + value_as_str: format!("{:?}", self), + expected: "Init", + got: "Transactions or Fin", + }), + } + } +} + +impl From for ProposalPart { + fn from(value: ProposalInit) -> Self { + ProposalPart::Init(value) + } +} + impl std::fmt::Display for StreamMessage where T: Clone + Into> + TryFrom, Error = ProtobufConversionError>, diff --git a/crates/papyrus_protobuf/src/converters/consensus.rs b/crates/papyrus_protobuf/src/converters/consensus.rs index 178c7c9589..708a9ee3b9 100644 --- a/crates/papyrus_protobuf/src/converters/consensus.rs +++ b/crates/papyrus_protobuf/src/converters/consensus.rs @@ -227,6 +227,8 @@ impl From for protobuf::ProposalInit { auto_impl_into_and_try_from_vec_u8!(ProposalInit, protobuf::ProposalInit); +// TODO(guyn): remove tx_hashes once we know how to compile the hashes +// when making the executable transactions. impl TryFrom for TransactionBatch { type Error = ProtobufConversionError; fn try_from(value: protobuf::TransactionBatch) -> Result { @@ -311,6 +313,7 @@ impl From for protobuf::ProposalPart { auto_impl_into_and_try_from_vec_u8!(ProposalPart, protobuf::ProposalPart); +// TODO(guyn): remove this once we are happy with how proposals are sent separate from votes. impl TryFrom for ConsensusMessage { type Error = ProtobufConversionError; diff --git a/crates/papyrus_protobuf/src/converters/mod.rs b/crates/papyrus_protobuf/src/converters/mod.rs index a72d774d25..c2a19f9d8d 100644 --- a/crates/papyrus_protobuf/src/converters/mod.rs +++ b/crates/papyrus_protobuf/src/converters/mod.rs @@ -22,6 +22,13 @@ pub enum ProtobufConversionError { MissingField { field_description: &'static str }, #[error("Type `{type_description}` should be {num_expected} bytes but it got {value:?}.")] BytesDataLengthMismatch { type_description: &'static str, num_expected: usize, value: Vec }, + #[error("Type `{type_description}` got unexpected enum variant {value_as_str}")] + WrongEnumVariant { + type_description: &'static str, + value_as_str: String, + expected: &'static str, + got: &'static str, + }, #[error(transparent)] DecodeError(#[from] DecodeError), /// For CompressionError and serde_json::Error we put the string of the error instead of the diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 5dfa5c7ad5..fe5ad992da 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -37,13 +37,14 @@ pub async fn run_consensus( consensus_delay: Duration, timeouts: TimeoutsConfig, mut broadcast_channels: BroadcastConsensusMessageChannel, + mut inbound_proposal_receiver: mpsc::Receiver>, mut sync_receiver: SyncReceiverT, ) -> Result<(), ConsensusError> where ContextT: ConsensusContext, - SyncReceiverT: Stream + Unpin, ProposalWrapper: Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, + SyncReceiverT: Stream + Unpin, { info!( "Running consensus, start_height={}, validator_id={}, consensus_delay={}, timeouts={:?}", @@ -61,7 +62,12 @@ where loop { metrics::gauge!(PAPYRUS_CONSENSUS_HEIGHT, current_height.0 as f64); - let run_height = manager.run_height(&mut context, current_height, &mut broadcast_channels); + let run_height = manager.run_height( + &mut context, + current_height, + &mut broadcast_channels, + &mut inbound_proposal_receiver, + ); // `run_height` is not cancel safe. Our implementation doesn't enable us to start and stop // it. We also cannot restart the height; when we dropped the future we dropped the state it @@ -106,6 +112,7 @@ impl MultiHeightManager { context: &mut ContextT, height: BlockNumber, broadcast_channels: &mut BroadcastConsensusMessageChannel, + proposal_receiver: &mut mpsc::Receiver>, ) -> Result where ContextT: ConsensusContext, @@ -186,6 +193,7 @@ impl MultiHeightManager { match message { ConsensusMessage::Proposal(proposal) => { // Special case due to fake streaming. + // TODO(guyn): this will be gone once we integrate the proposal channels. let (proposal_init, content_receiver, fin_receiver) = ProposalWrapper(proposal).into(); let res = shc @@ -224,9 +232,7 @@ impl MultiHeightManager { async fn next_message( cached_messages: &mut Vec, broadcast_channels: &mut BroadcastConsensusMessageChannel, -) -> Result -where -{ +) -> Result { let BroadcastConsensusMessageChannel { broadcasted_messages_receiver, broadcast_topic_client } = broadcast_channels; if let Some(msg) = cached_messages.pop() { diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 3f94d78fb4..374d35bd88 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -13,7 +13,7 @@ use papyrus_network::network_manager::test_utils::{ TestSubscriberChannels, }; use papyrus_network_types::network_types::BroadcastedMessageMetadata; -use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, Vote}; +use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalPart, Vote}; use papyrus_test_utils::{get_rng, GetTestInstance}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::transaction::Transaction; @@ -32,12 +32,15 @@ lazy_static! { static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig::default(); } +const CHANNEL_SIZE: usize = 10; + mock! { pub TestContext {} #[async_trait] impl ConsensusContext for TestContext { type ProposalChunk = Transaction; + type ProposalPart = ProposalPart; async fn build_proposal( &mut self, @@ -83,6 +86,11 @@ async fn manager_multiple_heights_unordered() { let TestSubscriberChannels { mock_network, subscriber_channels } = mock_register_broadcast_topic().unwrap(); let mut sender = mock_network.broadcasted_messages_sender; + + // TODO(guyn): refactor this test to pass proposals through the correct channels. + let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) = + mpsc::channel(CHANNEL_SIZE); + // Send messages for height 2 followed by those for height 1. send(&mut sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await; send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await; @@ -107,8 +115,15 @@ async fn manager_multiple_heights_unordered() { let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone()); let mut subscriber_channels = subscriber_channels.into(); - let decision = - manager.run_height(&mut context, BlockNumber(1), &mut subscriber_channels).await.unwrap(); + let decision = manager + .run_height( + &mut context, + BlockNumber(1), + &mut subscriber_channels, + &mut proposal_receiver_receiver, + ) + .await + .unwrap(); assert_eq!(decision.block, BlockHash(Felt::ONE)); // Run the manager for height 2. @@ -120,8 +135,15 @@ async fn manager_multiple_heights_unordered() { block_receiver }) .times(1); - let decision = - manager.run_height(&mut context, BlockNumber(2), &mut subscriber_channels).await.unwrap(); + let decision = manager + .run_height( + &mut context, + BlockNumber(2), + &mut subscriber_channels, + &mut proposal_receiver_receiver, + ) + .await + .unwrap(); assert_eq!(decision.block, BlockHash(Felt::TWO)); } @@ -131,6 +153,9 @@ async fn run_consensus_sync() { let mut context = MockTestContext::new(); let (decision_tx, decision_rx) = oneshot::channel(); + // TODO(guyn): refactor this test to pass proposals through the correct channels. + let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); + context.expect_validate_proposal().return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send(BlockHash(Felt::TWO)).unwrap(); @@ -164,6 +189,7 @@ async fn run_consensus_sync() { Duration::ZERO, TIMEOUTS.clone(), subscriber_channels.into(), + proposal_receiver_receiver, &mut sync_receiver, ) .await @@ -190,6 +216,9 @@ async fn run_consensus_sync_cancellation_safety() { let (proposal_handled_tx, proposal_handled_rx) = oneshot::channel(); let (decision_tx, decision_rx) = oneshot::channel(); + // TODO(guyn): refactor this test to pass proposals through the correct channels. + let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE); + context.expect_validate_proposal().return_once(move |_, _, _| { let (block_sender, block_receiver) = oneshot::channel(); block_sender.send(BlockHash(Felt::ONE)).unwrap(); @@ -223,6 +252,7 @@ async fn run_consensus_sync_cancellation_safety() { Duration::ZERO, TIMEOUTS.clone(), subscriber_channels.into(), + proposal_receiver_receiver, &mut sync_receiver, ) .await @@ -253,6 +283,11 @@ async fn test_timeouts() { let TestSubscriberChannels { mock_network, subscriber_channels } = mock_register_broadcast_topic().unwrap(); let mut sender = mock_network.broadcasted_messages_sender; + + // TODO(guyn): refactor this test to pass proposals through the correct channels. + let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) = + mpsc::channel(CHANNEL_SIZE); + send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await; send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await; send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_3)).await; @@ -285,7 +320,12 @@ async fn test_timeouts() { let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone()); let manager_handle = tokio::spawn(async move { let decision = manager - .run_height(&mut context, BlockNumber(1), &mut subscriber_channels.into()) + .run_height( + &mut context, + BlockNumber(1), + &mut subscriber_channels.into(), + &mut proposal_receiver_receiver, + ) .await .unwrap(); assert_eq!(decision.block, BlockHash(Felt::ONE)); diff --git a/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs b/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs index 0bd7250f12..4bd575da8d 100644 --- a/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs +++ b/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs @@ -83,7 +83,7 @@ mod tests { mock_register_broadcast_topic().unwrap(); let network_sender_to_inbound = mock_network.broadcasted_messages_sender; - // The inbound_receiver is given to StreamHandler to inbound to mock network messages. + // The inbound_receiver is given to StreamHandler to mock network messages. let BroadcastTopicChannels { broadcasted_messages_receiver: inbound_receiver, broadcast_topic_client: _, diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs index 303e30a049..980f5ee976 100644 --- a/crates/sequencing/papyrus_consensus/src/test_utils.rs +++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs @@ -3,7 +3,14 @@ use std::time::Duration; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; use mockall::mock; -use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, Vote, VoteType}; +use papyrus_protobuf::consensus::{ + ConsensusMessage, + Proposal, + ProposalInit, + ProposalPart, + Vote, + VoteType, +}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_types_core::felt::Felt; @@ -23,6 +30,7 @@ mock! { #[async_trait] impl ConsensusContext for TestContext { type ProposalChunk = u32; + type ProposalPart = ProposalPart; async fn build_proposal( &mut self, diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index a4fcd53f31..751cb4b4ed 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -29,7 +29,13 @@ pub trait ConsensusContext { /// The chunks of content returned when iterating the proposal. // In practice I expect this to match the type sent to the network // (papyrus_protobuf::ConsensusMessage), and not to be specific to just the block's content. - type ProposalChunk; + type ProposalChunk; // TODO(guyn): deprecate this (and replace by ProposalPart) + type ProposalPart: TryFrom, Error = ProtobufConversionError> + + Into> + + TryInto + + From + + Clone + + Send; // TODO(matan): The oneshot for receiving the build block could be generalized to just be some // future which returns a block. diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs index 5779d8a156..401a661293 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs @@ -23,7 +23,7 @@ use papyrus_consensus::types::{ ValidatorId, }; use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait}; -use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, Vote}; +use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, ProposalPart, Vote}; use papyrus_storage::body::BodyStorageReader; use papyrus_storage::header::HeaderStorageReader; use papyrus_storage::{StorageError, StorageReader}; @@ -39,6 +39,7 @@ type HeightToIdToContent = BTreeMap, + _network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, validators: Vec, sync_broadcast_sender: Option>, // Proposal building/validating returns immediately, leaving the actual processing to a spawned @@ -51,12 +52,14 @@ impl PapyrusConsensusContext { pub fn new( storage_reader: StorageReader, network_broadcast_client: BroadcastTopicClient, + _network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, num_validators: u64, sync_broadcast_sender: Option>, ) -> Self { Self { storage_reader, network_broadcast_client, + _network_proposal_sender, validators: (0..num_validators).map(ContractAddress::from).collect(), sync_broadcast_sender, valid_proposals: Arc::new(Mutex::new(BTreeMap::new())), @@ -67,10 +70,11 @@ impl PapyrusConsensusContext { #[async_trait] impl ConsensusContext for PapyrusConsensusContext { type ProposalChunk = Transaction; + type ProposalPart = ProposalPart; async fn build_proposal( &mut self, - init: ProposalInit, + proposal_init: ProposalInit, _timeout: Duration, ) -> oneshot::Receiver { let mut network_broadcast_sender = self.network_broadcast_client.clone(); @@ -83,39 +87,39 @@ impl ConsensusContext for PapyrusConsensusContext { // TODO(dvir): consider fix this for the case of reverts. If between the check that // the block in storage and to getting the transaction was a revert // this flow will fail. - wait_for_block(&storage_reader, init.height) + wait_for_block(&storage_reader, proposal_init.height) .await .expect("Failed to wait to block"); let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn"); let transactions = txn - .get_block_transactions(init.height) + .get_block_transactions(proposal_init.height) .expect("Get transactions from storage failed") .unwrap_or_else(|| { panic!( "Block in {} was not found in storage despite waiting for it", - init.height + proposal_init.height ) }); let block_hash = txn - .get_block_header(init.height) + .get_block_header(proposal_init.height) .expect("Get header from storage failed") .unwrap_or_else(|| { panic!( "Block in {} was not found in storage despite waiting for it", - init.height + proposal_init.height ) }) .block_hash; let proposal = Proposal { - height: init.height.0, - round: init.round, - proposer: init.proposer, + height: proposal_init.height.0, + round: proposal_init.round, + proposer: proposal_init.proposer, transactions: transactions.clone(), block_hash, - valid_round: init.valid_round, + valid_round: proposal_init.valid_round, }; network_broadcast_sender .broadcast_message(ConsensusMessage::Proposal(proposal)) @@ -125,7 +129,10 @@ impl ConsensusContext for PapyrusConsensusContext { let mut proposals = valid_proposals .lock() .expect("Lock on active proposals was poisoned due to a previous panic"); - proposals.entry(init.height).or_default().insert(block_hash, transactions); + proposals + .entry(proposal_init.height) + .or_default() + .insert(block_hash, transactions); } // Done after inserting the proposal into the map to avoid race conditions between // insertion and calls to `repropose`. diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs index 9c7e5b9f96..c2367be02b 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs @@ -2,12 +2,21 @@ use std::time::Duration; use futures::channel::{mpsc, oneshot}; use futures::StreamExt; +use papyrus_consensus::stream_handler::StreamHandler; use papyrus_consensus::types::ConsensusContext; use papyrus_network::network_manager::test_utils::{ mock_register_broadcast_topic, BroadcastNetworkMock, + TestSubscriberChannels, +}; +use papyrus_network::network_manager::BroadcastTopicChannels; +use papyrus_protobuf::consensus::{ + ConsensusMessage, + ProposalInit, + ProposalPart, + StreamMessage, + Vote, }; -use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, Vote}; use papyrus_storage::body::BodyStorageWriter; use papyrus_storage::header::HeaderStorageWriter; use papyrus_storage::test_utils::get_test_storage; @@ -107,10 +116,21 @@ fn test_setup() -> ( .unwrap(); let network_channels = mock_register_broadcast_topic().unwrap(); + let network_proposal_channels: TestSubscriberChannels> = + mock_register_broadcast_topic().unwrap(); + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = network_proposal_channels.subscriber_channels; + let (outbound_internal_sender, _inbound_internal_receiver, _) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + let sync_channels = mock_register_broadcast_topic().unwrap(); + let papyrus_context = PapyrusConsensusContext::new( storage_reader.clone(), network_channels.subscriber_channels.broadcast_topic_client, + outbound_internal_sender, 4, Some(sync_channels.subscriber_channels.broadcast_topic_client), ); diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 8ee354a4f9..50278b5df3 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -64,17 +64,20 @@ pub struct SequencerConsensusContext { proposal_id: u64, current_height: Option, network_broadcast_client: BroadcastTopicClient, + _outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, } impl SequencerConsensusContext { pub fn new( batcher: Arc, network_broadcast_client: BroadcastTopicClient, + _outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver)>, num_validators: u64, ) -> Self { Self { batcher, network_broadcast_client, + _outbound_proposal_sender, validators: (0..num_validators).map(ValidatorId::from).collect(), valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())), proposal_id: 0, @@ -85,8 +88,9 @@ impl SequencerConsensusContext { #[async_trait] impl ConsensusContext for SequencerConsensusContext { - // TODO: Switch to ProposalPart when Guy merges the PR. + // TODO(guyn): Switch to ProposalPart when done with the streaming integration. type ProposalChunk = Vec; + type ProposalPart = ProposalPart; async fn build_proposal( &mut self, diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index c5b1c127c5..1b2ef6c171 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -5,6 +5,7 @@ use std::vec; use futures::channel::mpsc; use futures::SinkExt; use lazy_static::lazy_static; +use papyrus_consensus::stream_handler::StreamHandler; use papyrus_consensus::types::ConsensusContext; use papyrus_network::network_manager::test_utils::{ mock_register_broadcast_topic, @@ -79,12 +80,27 @@ async fn build_proposal() { }), }) }); + // TODO(guyn): remove this first set of channels once we are using only the streaming channels. let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } = mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let mut context = - SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); + + let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = subscriber_channels; + let (outbound_internal_sender, _inbound_internal_receiver, _) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + + let mut context = SequencerConsensusContext::new( + Arc::new(batcher), + broadcast_topic_client, + outbound_internal_sender, + NUM_VALIDATORS, + ); let init = ProposalInit { height: BlockNumber(0), round: 0, @@ -132,12 +148,27 @@ async fn validate_proposal_success() { }) }, ); + // TODO(guyn): remove this first set of channels once we are using only the streaming channels. let TestSubscriberChannels { mock_network: _, subscriber_channels } = mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let mut context = - SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); + + let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = subscriber_channels; + let (outbound_internal_sender, _inbound_internal_receiver, _) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + + let mut context = SequencerConsensusContext::new( + Arc::new(batcher), + broadcast_topic_client, + outbound_internal_sender, + NUM_VALIDATORS, + ); let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); content_sender.send(TX_BATCH.clone()).await.unwrap(); let fin_receiver = context.validate_proposal(BlockNumber(0), TIMEOUT, content_receiver).await; @@ -170,12 +201,27 @@ async fn repropose() { }) }, ); + // TODO(guyn): remove this first set of channels once we are using only the streaming channels. let TestSubscriberChannels { mock_network: _, subscriber_channels } = mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } = subscriber_channels; - let mut context = - SequencerConsensusContext::new(Arc::new(batcher), broadcast_topic_client, NUM_VALIDATORS); + + let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } = + mock_register_broadcast_topic().expect("Failed to create mock network"); + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = subscriber_channels; + let (outbound_internal_sender, _inbound_internal_receiver, _) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + + let mut context = SequencerConsensusContext::new( + Arc::new(batcher), + broadcast_topic_client, + outbound_internal_sender, + NUM_VALIDATORS, + ); // Receive a valid proposal. let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE); diff --git a/crates/starknet_api/src/transaction.rs b/crates/starknet_api/src/transaction.rs index 77eb7da72b..fede15d67e 100644 --- a/crates/starknet_api/src/transaction.rs +++ b/crates/starknet_api/src/transaction.rs @@ -155,12 +155,10 @@ impl From<(Transaction, TransactionHash)> for crate::executable_transaction::Tra #[derive(Copy, Clone, Debug, Eq, PartialEq, Default)] pub struct TransactionOptions { /// Transaction that shouldn't be broadcasted to StarkNet. For example, users that want to - /// test the execution result of a transaction without the risk of it being rebroadcasted (the /// signature will be different while the execution remain the same). Using this flag will /// modify the transaction version by setting the 128-th bit to 1. pub only_query: bool, } - macro_rules! implement_v3_tx_getters { ($(($field:ident, $field_type:ty)),*) => { $(pub fn $field(&self) -> $field_type { diff --git a/crates/starknet_consensus_manager/src/consensus_manager.rs b/crates/starknet_consensus_manager/src/consensus_manager.rs index 1ee091bff2..98cf580e48 100644 --- a/crates/starknet_consensus_manager/src/consensus_manager.rs +++ b/crates/starknet_consensus_manager/src/consensus_manager.rs @@ -6,12 +6,17 @@ use futures::channel::mpsc::{self, SendError}; use futures::future::Ready; use futures::SinkExt; use libp2p::PeerId; +use papyrus_consensus::stream_handler::StreamHandler; use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError}; use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext; use papyrus_network::gossipsub_impl::Topic; -use papyrus_network::network_manager::{BroadcastTopicClient, NetworkManager}; +use papyrus_network::network_manager::{ + BroadcastTopicChannels, + BroadcastTopicClient, + NetworkManager, +}; use papyrus_network_types::network_types::BroadcastedMessageMetadata; -use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart}; +use papyrus_protobuf::consensus::{ConsensusMessage, ProposalPart, StreamMessage}; use starknet_batcher_types::communication::SharedBatcherClient; use starknet_sequencer_infra::component_definitions::ComponentStarter; use starknet_sequencer_infra::errors::ComponentError; @@ -22,6 +27,8 @@ use crate::config::ConsensusManagerConfig; // TODO(Dan, Guy): move to config. pub const BROADCAST_BUFFER_SIZE: usize = 100; pub const NETWORK_TOPIC: &str = "consensus_proposals"; +// TODO(guyn): remove this once we have integrated streaming. +pub const NETWORK_TOPIC2: &str = "streamed_consensus_proposals"; #[derive(Clone)] pub struct ConsensusManager { @@ -37,15 +44,33 @@ impl ConsensusManager { pub async fn run(&self) -> Result<(), ConsensusError> { let mut network_manager = NetworkManager::new(self.config.consensus_config.network_config.clone(), None); - let proposals_broadcast_channels = network_manager + + // TODO(guyn): remove this channel once we have integrated streaming. + let old_proposals_broadcast_channels = network_manager .register_broadcast_topic::( Topic::new(NETWORK_TOPIC), BROADCAST_BUFFER_SIZE, ) .expect("Failed to register broadcast topic"); + + let proposals_broadcast_channels = network_manager + .register_broadcast_topic::>( + Topic::new(NETWORK_TOPIC2), + BROADCAST_BUFFER_SIZE, + ) + .expect("Failed to register broadcast topic"); + let BroadcastTopicChannels { + broadcasted_messages_receiver: inbound_network_receiver, + broadcast_topic_client: outbound_network_sender, + } = proposals_broadcast_channels; + + let (outbound_internal_sender, inbound_internal_receiver, _) = + StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); + let context = SequencerConsensusContext::new( Arc::clone(&self.batcher_client), - proposals_broadcast_channels.broadcast_topic_client.clone(), + old_proposals_broadcast_channels.broadcast_topic_client.clone(), + outbound_internal_sender, self.config.consensus_config.num_validators, ); @@ -57,6 +82,7 @@ impl ConsensusManager { self.config.consensus_config.consensus_delay, self.config.consensus_config.timeouts.clone(), create_fake_network_channels(), + inbound_internal_receiver, futures::stream::pending(), );