diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs index 89945cec95..1637c19322 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs @@ -23,7 +23,15 @@ use papyrus_consensus::types::{ ValidatorId, }; use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait}; -use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalFin, ProposalInit, ProposalPart, TransactionBatch, Vote}; +use papyrus_protobuf::consensus::{ + ConsensusMessage, + Proposal, + ProposalFin, + ProposalInit, + ProposalPart, + TransactionBatch, + Vote, +}; use papyrus_storage::body::BodyStorageReader; use papyrus_storage::header::HeaderStorageReader; use papyrus_storage::{StorageError, StorageReader}; diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 83ce87067f..47207cee13 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -332,6 +332,7 @@ async fn stream_build_proposal( height ); debug!("Broadcasting proposal fin: {proposal_content_id:?}"); + println!("Broadcasting proposal fin: {proposal_content_id:?}"); proposal_sender .send(ProposalPart::Fin(ProposalFin { proposal_content_id })) .await @@ -343,10 +344,6 @@ async fn stream_build_proposal( .entry(height) .or_default() .insert(proposal_content_id, (content, proposal_id)); - // proposal_sender - // .send(ProposalPart::Fin(ProposalFin { proposal_content_id })) - // .await - // .expect("Failed to broadcast proposal fin"); if fin_sender.send(proposal_content_id).is_err() { // Consensus may exit early (e.g. sync). warn!("Failed to send proposal content id"); diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index aced9b6ef6..1b2ef6c171 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -213,11 +213,7 @@ async fn repropose() { broadcasted_messages_receiver: inbound_network_receiver, broadcast_topic_client: outbound_network_sender, } = subscriber_channels; -<<<<<<< HEAD let (outbound_internal_sender, _inbound_internal_receiver, _) = -======= - let (outbound_internal_sender, _inbound_internal_receiver) = ->>>>>>> 883a253be (feat: allow a streamed proposal channel on top of existing one) StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender); let mut context = SequencerConsensusContext::new( diff --git a/crates/starknet_integration_tests/src/flow_test_setup.rs b/crates/starknet_integration_tests/src/flow_test_setup.rs index cecefd60cd..d2cc04d95a 100644 --- a/crates/starknet_integration_tests/src/flow_test_setup.rs +++ b/crates/starknet_integration_tests/src/flow_test_setup.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; use papyrus_network::network_manager::BroadcastTopicChannels; -use papyrus_protobuf::consensus::ProposalPart; +use papyrus_protobuf::consensus::{ProposalPart, StreamMessage}; use starknet_api::rpc_transaction::RpcTransaction; use starknet_api::transaction::TransactionHash; use starknet_gateway_types::errors::GatewaySpecError; @@ -32,7 +32,7 @@ pub struct FlowTestSetup { pub sequencer_node_handle: JoinHandle>, // Channels for consensus proposals, used for asserting the right transactions are proposed. - pub consensus_proposals_channels: BroadcastTopicChannels, + pub consensus_proposals_channels: BroadcastTopicChannels>, } impl FlowTestSetup { diff --git a/crates/starknet_integration_tests/src/utils.rs b/crates/starknet_integration_tests/src/utils.rs index fb89e98051..814920f375 100644 --- a/crates/starknet_integration_tests/src/utils.rs +++ b/crates/starknet_integration_tests/src/utils.rs @@ -14,7 +14,7 @@ use mempool_test_utils::starknet_api_test_utils::{ use papyrus_consensus::config::ConsensusConfig; use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels; use papyrus_network::network_manager::BroadcastTopicChannels; -use papyrus_protobuf::consensus::ProposalPart; +use papyrus_protobuf::consensus::{ProposalPart, StreamMessage}; use papyrus_storage::StorageConfig; use reqwest::{Client, Response}; use starknet_api::block::BlockNumber; @@ -49,7 +49,7 @@ pub async fn create_config( chain_info: ChainInfo, rpc_server_addr: SocketAddr, batcher_storage_config: StorageConfig, -) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels) { +) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels>) { let fee_token_addresses = chain_info.fee_token_addresses.clone(); let batcher_config = create_batcher_config(batcher_storage_config, chain_info.clone()); let gateway_config = create_gateway_config(chain_info.clone()).await; @@ -77,11 +77,12 @@ pub async fn create_config( } fn create_consensus_manager_config_and_channels() --> (ConsensusManagerConfig, BroadcastTopicChannels) { +-> (ConsensusManagerConfig, BroadcastTopicChannels>) { let (network_config, broadcast_channels) = create_network_config_connected_to_broadcast_channels( papyrus_network::gossipsub_impl::Topic::new( - starknet_consensus_manager::consensus_manager::NETWORK_TOPIC, + // TODO(guyn): return this to NETWORK_TOPIC once we have integrated streaming. + starknet_consensus_manager::consensus_manager::NETWORK_TOPIC2, ), ); let consensus_manager_config = ConsensusManagerConfig { diff --git a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs index 367a97c138..e97679bb16 100644 --- a/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs +++ b/crates/starknet_integration_tests/tests/end_to_end_flow_test.rs @@ -3,7 +3,13 @@ use std::collections::HashSet; use futures::StreamExt; use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; use papyrus_network::network_manager::BroadcastTopicChannels; -use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, ProposalPart}; +use papyrus_protobuf::consensus::{ + ProposalFin, + ProposalInit, + ProposalPart, + StreamMessage, + StreamMessageBody, +}; use papyrus_storage::test_utils::CHAIN_ID_FOR_TESTS; use pretty_assertions::assert_eq; use rstest::{fixture, rstest}; @@ -48,7 +54,7 @@ async fn end_to_end(tx_generator: MultiAccountTransactionGenerator) { } async fn listen_to_broadcasted_messages( - consensus_proposals_channels: &mut BroadcastTopicChannels, + consensus_proposals_channels: &mut BroadcastTopicChannels>, expected_batched_tx_hashes: &[TransactionHash], ) { let chain_id = CHAIN_ID_FOR_TESTS.clone(); @@ -68,17 +74,19 @@ async fn listen_to_broadcasted_messages( )), }; assert_eq!( - broadcasted_messages_receiver.next().await.unwrap().0.unwrap(), - ProposalPart::Init(expected_proposal_init) + broadcasted_messages_receiver.next().await.unwrap().0.unwrap().message, + StreamMessageBody::Content(ProposalPart::Init(expected_proposal_init)) ); loop { - match broadcasted_messages_receiver.next().await.unwrap().0.unwrap() { - ProposalPart::Init(init) => panic!("Unexpected init: {:?}", init), - ProposalPart::Fin(proposal_fin) => { + match broadcasted_messages_receiver.next().await.unwrap().0.unwrap().message { + StreamMessageBody::Content(ProposalPart::Init(init)) => { + panic!("Unexpected init: {:?}", init) + } + StreamMessageBody::Content(ProposalPart::Fin(proposal_fin)) => { assert_eq!(proposal_fin, expected_proposal_fin); break; } - ProposalPart::Transactions(transactions) => { + StreamMessageBody::Content(ProposalPart::Transactions(transactions)) => { received_tx_hashes.extend( transactions .transactions @@ -86,6 +94,8 @@ async fn listen_to_broadcasted_messages( .map(|tx| tx.calculate_transaction_hash(&chain_id).unwrap()), ); } + // Ignore this, in case it comes out of the network before some of the other messages. + StreamMessageBody::Fin => (), } } // Using HashSet to ignore the order of the transactions (broadcast can lead to reordering).