Skip to content

Commit

Permalink
feat: broadcast proposal as stream
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware committed Nov 27, 2024
1 parent 2f7526b commit b14f1c8
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,15 @@ use papyrus_consensus::types::{
ValidatorId,
};
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalFin, ProposalInit, ProposalPart, TransactionBatch, Vote};
use papyrus_protobuf::consensus::{
ConsensusMessage,
Proposal,
ProposalFin,
ProposalInit,
ProposalPart,
TransactionBatch,
Vote,
};
use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::{StorageError, StorageReader};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ async fn stream_build_proposal(
height
);
debug!("Broadcasting proposal fin: {proposal_content_id:?}");
println!("Broadcasting proposal fin: {proposal_content_id:?}");
proposal_sender
.send(ProposalPart::Fin(ProposalFin { proposal_content_id }))
.await
Expand All @@ -343,10 +344,6 @@ async fn stream_build_proposal(
.entry(height)
.or_default()
.insert(proposal_content_id, (content, proposal_id));
// proposal_sender
// .send(ProposalPart::Fin(ProposalFin { proposal_content_id }))
// .await
// .expect("Failed to broadcast proposal fin");
if fin_sender.send(proposal_content_id).is_err() {
// Consensus may exit early (e.g. sync).
warn!("Failed to send proposal content id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,7 @@ async fn repropose() {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
} = subscriber_channels;
<<<<<<< HEAD
let (outbound_internal_sender, _inbound_internal_receiver, _) =
=======
let (outbound_internal_sender, _inbound_internal_receiver) =
>>>>>>> 883a253be (feat: allow a streamed proposal channel on top of existing one)
StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);

let mut context = SequencerConsensusContext::new(
Expand Down
4 changes: 2 additions & 2 deletions crates/starknet_integration_tests/src/flow_test_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::net::SocketAddr;

use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_gateway_types::errors::GatewaySpecError;
Expand Down Expand Up @@ -32,7 +32,7 @@ pub struct FlowTestSetup {
pub sequencer_node_handle: JoinHandle<Result<(), anyhow::Error>>,

// Channels for consensus proposals, used for asserting the right transactions are proposed.
pub consensus_proposals_channels: BroadcastTopicChannels<ProposalPart>,
pub consensus_proposals_channels: BroadcastTopicChannels<StreamMessage<ProposalPart>>,
}

impl FlowTestSetup {
Expand Down
9 changes: 5 additions & 4 deletions crates/starknet_integration_tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use mempool_test_utils::starknet_api_test_utils::{
use papyrus_consensus::config::ConsensusConfig;
use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
use papyrus_storage::StorageConfig;
use reqwest::{Client, Response};
use starknet_api::block::BlockNumber;
Expand Down Expand Up @@ -49,7 +49,7 @@ pub async fn create_config(
chain_info: ChainInfo,
rpc_server_addr: SocketAddr,
batcher_storage_config: StorageConfig,
) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels<ProposalPart>) {
) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels<StreamMessage<ProposalPart>>) {
let fee_token_addresses = chain_info.fee_token_addresses.clone();
let batcher_config = create_batcher_config(batcher_storage_config, chain_info.clone());
let gateway_config = create_gateway_config(chain_info.clone()).await;
Expand Down Expand Up @@ -77,11 +77,12 @@ pub async fn create_config(
}

fn create_consensus_manager_config_and_channels()
-> (ConsensusManagerConfig, BroadcastTopicChannels<ProposalPart>) {
-> (ConsensusManagerConfig, BroadcastTopicChannels<StreamMessage<ProposalPart>>) {
let (network_config, broadcast_channels) =
create_network_config_connected_to_broadcast_channels(
papyrus_network::gossipsub_impl::Topic::new(
starknet_consensus_manager::consensus_manager::NETWORK_TOPIC,
// TODO(guyn): return this to NETWORK_TOPIC once we have integrated streaming.
starknet_consensus_manager::consensus_manager::NETWORK_TOPIC2,
),
);
let consensus_manager_config = ConsensusManagerConfig {
Expand Down
26 changes: 18 additions & 8 deletions crates/starknet_integration_tests/tests/end_to_end_flow_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ use std::collections::HashSet;
use futures::StreamExt;
use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, ProposalPart};
use papyrus_protobuf::consensus::{
ProposalFin,
ProposalInit,
ProposalPart,
StreamMessage,
StreamMessageBody,
};
use papyrus_storage::test_utils::CHAIN_ID_FOR_TESTS;
use pretty_assertions::assert_eq;
use rstest::{fixture, rstest};
Expand Down Expand Up @@ -48,7 +54,7 @@ async fn end_to_end(tx_generator: MultiAccountTransactionGenerator) {
}

async fn listen_to_broadcasted_messages(
consensus_proposals_channels: &mut BroadcastTopicChannels<ProposalPart>,
consensus_proposals_channels: &mut BroadcastTopicChannels<StreamMessage<ProposalPart>>,
expected_batched_tx_hashes: &[TransactionHash],
) {
let chain_id = CHAIN_ID_FOR_TESTS.clone();
Expand All @@ -68,24 +74,28 @@ async fn listen_to_broadcasted_messages(
)),
};
assert_eq!(
broadcasted_messages_receiver.next().await.unwrap().0.unwrap(),
ProposalPart::Init(expected_proposal_init)
broadcasted_messages_receiver.next().await.unwrap().0.unwrap().message,
StreamMessageBody::Content(ProposalPart::Init(expected_proposal_init))
);
loop {
match broadcasted_messages_receiver.next().await.unwrap().0.unwrap() {
ProposalPart::Init(init) => panic!("Unexpected init: {:?}", init),
ProposalPart::Fin(proposal_fin) => {
match broadcasted_messages_receiver.next().await.unwrap().0.unwrap().message {
StreamMessageBody::Content(ProposalPart::Init(init)) => {
panic!("Unexpected init: {:?}", init)
}
StreamMessageBody::Content(ProposalPart::Fin(proposal_fin)) => {
assert_eq!(proposal_fin, expected_proposal_fin);
break;
}
ProposalPart::Transactions(transactions) => {
StreamMessageBody::Content(ProposalPart::Transactions(transactions)) => {
received_tx_hashes.extend(
transactions
.transactions
.iter()
.map(|tx| tx.calculate_transaction_hash(&chain_id).unwrap()),
);
}
// Ignore this, in case it comes out of the network before some of the other messages.
StreamMessageBody::Fin => (),
}
}
// Using HashSet to ignore the order of the transactions (broadcast can lead to reordering).
Expand Down

0 comments on commit b14f1c8

Please sign in to comment.