Skip to content

Commit

Permalink
feat(sequencing): broadcast proposal in a stream (#2286)
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware authored Dec 2, 2024
1 parent fdb3467 commit 87dcb06
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::time::Duration;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use futures::{SinkExt, StreamExt};
use papyrus_consensus::types::{
ConsensusContext,
ConsensusError,
Expand All @@ -23,7 +23,15 @@ use papyrus_consensus::types::{
ValidatorId,
};
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, ProposalInit, ProposalPart, Vote};
use papyrus_protobuf::consensus::{
ConsensusMessage,
Proposal,
ProposalFin,
ProposalInit,
ProposalPart,
TransactionBatch,
Vote,
};
use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::{StorageError, StorageReader};
Expand All @@ -36,10 +44,12 @@ use tracing::{debug, debug_span, info, warn, Instrument};

type HeightToIdToContent = BTreeMap<BlockNumber, HashMap<ProposalContentId, Vec<Transaction>>>;

const CHANNEL_SIZE: usize = 100;

pub struct PapyrusConsensusContext {
storage_reader: StorageReader,
network_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
_network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
validators: Vec<ValidatorId>,
sync_broadcast_sender: Option<BroadcastTopicClient<Vote>>,
// Proposal building/validating returns immediately, leaving the actual processing to a spawned
Expand All @@ -52,14 +62,14 @@ impl PapyrusConsensusContext {
pub fn new(
storage_reader: StorageReader,
network_broadcast_client: BroadcastTopicClient<ConsensusMessage>,
_network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
network_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
num_validators: u64,
sync_broadcast_sender: Option<BroadcastTopicClient<Vote>>,
) -> Self {
Self {
storage_reader,
network_broadcast_client,
_network_proposal_sender,
network_proposal_sender,
validators: (0..num_validators).map(ContractAddress::from).collect(),
sync_broadcast_sender,
valid_proposals: Arc::new(Mutex::new(BTreeMap::new())),
Expand All @@ -77,7 +87,7 @@ impl ConsensusContext for PapyrusConsensusContext {
proposal_init: ProposalInit,
_timeout: Duration,
) -> oneshot::Receiver<ProposalContentId> {
let mut network_broadcast_sender = self.network_broadcast_client.clone();
let mut proposal_sender_sender = self.network_proposal_sender.clone();
let (fin_sender, fin_receiver) = oneshot::channel();

let storage_reader = self.storage_reader.clone();
Expand Down Expand Up @@ -113,18 +123,27 @@ impl ConsensusContext for PapyrusConsensusContext {
})
.block_hash;

let proposal = Proposal {
height: proposal_init.height.0,
round: proposal_init.round,
proposer: proposal_init.proposer,
transactions: transactions.clone(),
block_hash,
valid_round: proposal_init.valid_round,
};
network_broadcast_sender
.broadcast_message(ConsensusMessage::Proposal(proposal))
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
let stream_id = proposal_init.height.0;
proposal_sender_sender
.send((stream_id, proposal_receiver))
.await
.expect("Failed to send proposal receiver");
proposal_sender
.send(Self::ProposalPart::Init(proposal_init.clone()))
.await
.expect("Failed to send proposal init");
proposal_sender
.send(ProposalPart::Transactions(TransactionBatch {
transactions: transactions.clone(),
tx_hashes: vec![],
}))
.await
.expect("Failed to send transactions");
proposal_sender
.send(ProposalPart::Fin(ProposalFin { proposal_content_id: block_hash }))
.await
.expect("Failed to send proposal");
.expect("Failed to send fin");
{
let mut proposals = valid_proposals
.lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ use std::time::Duration;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use futures::{SinkExt, StreamExt};
use papyrus_consensus::types::{
ConsensusContext,
ConsensusError,
ProposalContentId,
Round,
ValidatorId,
};
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_network::network_manager::BroadcastTopicClient;
use papyrus_protobuf::consensus::{
ConsensusMessage,
ProposalFin,
Expand Down Expand Up @@ -54,6 +54,8 @@ type HeightToIdToContent =
BTreeMap<BlockNumber, HashMap<ProposalContentId, (Vec<Transaction>, ProposalId)>>;
type ValidationParams = (BlockNumber, Duration, mpsc::Receiver<Vec<Transaction>>);

const CHANNEL_SIZE: usize = 100;

pub struct SequencerConsensusContext {
batcher: Arc<dyn BatcherClient>,
validators: Vec<ValidatorId>,
Expand All @@ -69,28 +71,28 @@ pub struct SequencerConsensusContext {
current_round: Round,
// Used to broadcast proposals to other consensus nodes.
// TODO(Guy) switch to the actual streaming struct.
proposal_streaming_client: BroadcastTopicClient<ProposalPart>,
_proposal_streaming_client: BroadcastTopicClient<ProposalPart>,
// The active proposal refers to the proposal being validated at the current height/round.
// Building proposals are not tracked as active, as consensus can't move on to the next
// height/round until building is done. Context only works on proposals for the
// current round.
active_proposal: Option<(Arc<Notify>, JoinHandle<()>)>,
// Stores proposals for future rounds until the round is reached.
queued_proposals: BTreeMap<Round, (ValidationParams, oneshot::Sender<ProposalContentId>)>,
_outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
}

impl SequencerConsensusContext {
pub fn new(
batcher: Arc<dyn BatcherClient>,
proposal_streaming_client: BroadcastTopicClient<ProposalPart>,
_outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
_proposal_streaming_client: BroadcastTopicClient<ProposalPart>,
outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
num_validators: u64,
) -> Self {
Self {
batcher,
proposal_streaming_client,
_outbound_proposal_sender,
_proposal_streaming_client,
outbound_proposal_sender,
validators: (0..num_validators).map(ValidatorId::from).collect(),
valid_proposals: Arc::new(Mutex::new(HeightToIdToContent::new())),
proposal_id: 0,
Expand Down Expand Up @@ -147,19 +149,24 @@ impl ConsensusContext for SequencerConsensusContext {
.await
.expect("Failed to initiate proposal build");
debug!("Broadcasting proposal init: {proposal_init:?}");
self.proposal_streaming_client
.broadcast_message(ProposalPart::Init(proposal_init.clone()))
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
let stream_id = proposal_init.height.0;
self.outbound_proposal_sender
.send((stream_id, proposal_receiver))
.await
.expect("Failed to send proposal receiver");
proposal_sender
.send(ProposalPart::Init(proposal_init.clone()))
.await
.expect("Failed to broadcast proposal init");
let broadcast_client = self.proposal_streaming_client.clone();
.expect("Failed to send proposal init");
tokio::spawn(
async move {
stream_build_proposal(
proposal_init.height,
proposal_id,
batcher,
valid_proposals,
broadcast_client,
proposal_sender,
fin_sender,
)
.await;
Expand Down Expand Up @@ -352,16 +359,16 @@ impl SequencerConsensusContext {

// Handles building a new proposal without blocking consensus:
// 1. Receive chunks of content from the batcher.
// 2. Forward these to consensus to be streamed out to the network.
// 2. Forward these to the stream handler to be streamed out to the network.
// 3. Once finished, receive the commitment from the batcher.
// 4. Store the proposal for re-proposal.
// 5. Send the commitment to consensus.
// 5. Send the commitment to the stream handler (to send fin).
async fn stream_build_proposal(
height: BlockNumber,
proposal_id: ProposalId,
batcher: Arc<dyn BatcherClient>,
valid_proposals: Arc<Mutex<HeightToIdToContent>>,
mut broadcast_client: BroadcastTopicClient<ProposalPart>,
mut proposal_sender: mpsc::Sender<ProposalPart>,
fin_sender: oneshot::Sender<ProposalContentId>,
) {
let mut content = Vec::new();
Expand All @@ -388,8 +395,8 @@ async fn stream_build_proposal(
}
debug!("Broadcasting proposal content: {transaction_hashes:?}");
trace!("Broadcasting proposal content: {transactions:?}");
broadcast_client
.broadcast_message(ProposalPart::Transactions(TransactionBatch {
proposal_sender
.send(ProposalPart::Transactions(TransactionBatch {
transactions,
tx_hashes: transaction_hashes,
}))
Expand All @@ -407,8 +414,8 @@ async fn stream_build_proposal(
height
);
debug!("Broadcasting proposal fin: {proposal_content_id:?}");
broadcast_client
.broadcast_message(ProposalPart::Fin(ProposalFin { proposal_content_id }))
proposal_sender
.send(ProposalPart::Fin(ProposalFin { proposal_content_id }))
.await
.expect("Failed to broadcast proposal fin");
// Update valid_proposals before sending fin to avoid a race condition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::ConsensusContext;
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
BroadcastNetworkMock,
TestSubscriberChannels,
};
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::{ProposalInit, ProposalPart};
use papyrus_protobuf::consensus::{ProposalInit, ProposalPart, StreamMessage};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::{ContractAddress, StateDiffCommitment};
use starknet_api::executable_transaction::{AccountTransaction, Transaction};
Expand Down Expand Up @@ -52,18 +53,20 @@ fn generate_invoke_tx(tx_hash: Felt) -> Transaction {
})))
}

fn make_streaming_channels()
-> (mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>, mpsc::Receiver<mpsc::Receiver<ProposalPart>>)
{
let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } =
fn make_streaming_channels() -> (
mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
mpsc::Receiver<mpsc::Receiver<ProposalPart>>,
BroadcastNetworkMock<StreamMessage<ProposalPart>>,
) {
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().expect("Failed to create mock network");
let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
} = subscriber_channels;
let (outbound_internal_sender, inbound_internal_receiver, _) =
StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);
(outbound_internal_sender, inbound_internal_receiver)
(outbound_internal_sender, inbound_internal_receiver, mock_network)
}

#[tokio::test]
Expand Down Expand Up @@ -99,7 +102,8 @@ async fn build_proposal() {
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;

let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels();
let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
Expand Down Expand Up @@ -160,7 +164,8 @@ async fn validate_proposal_success() {
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;

let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels();
let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
Expand Down Expand Up @@ -210,7 +215,8 @@ async fn repropose() {
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;

let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels();
let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
Expand Down Expand Up @@ -280,7 +286,8 @@ async fn proposals_from_different_rounds() {
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;

let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels();
let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
Expand Down Expand Up @@ -363,7 +370,8 @@ async fn interrupt_active_proposal() {
let BroadcastTopicChannels { broadcasted_messages_receiver: _, broadcast_topic_client } =
subscriber_channels;

let (outbound_internal_sender, _inbound_internal_receiver) = make_streaming_channels();
let (outbound_internal_sender, _inbound_internal_receiver, _mock_network) =
make_streaming_channels();

let mut context = SequencerConsensusContext::new(
Arc::new(batcher),
Expand Down
2 changes: 0 additions & 2 deletions crates/starknet_api/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,10 @@ impl From<(Transaction, TransactionHash)> for crate::executable_transaction::Tra
#[derive(Copy, Clone, Debug, Eq, PartialEq, Default)]
pub struct TransactionOptions {
/// Transaction that shouldn't be broadcasted to StarkNet. For example, users that want to
/// test the execution result of a transaction without the risk of it being rebroadcasted (the
/// signature will be different while the execution remain the same). Using this flag will
/// modify the transaction version by setting the 128-th bit to 1.
pub only_query: bool,
}

macro_rules! implement_v3_tx_getters {
($(($field:ident, $field_type:ty)),*) => {
$(pub fn $field(&self) -> $field_type {
Expand Down
4 changes: 2 additions & 2 deletions crates/starknet_integration_tests/src/flow_test_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::net::SocketAddr;

use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_gateway_types::errors::GatewaySpecError;
Expand Down Expand Up @@ -33,7 +33,7 @@ pub struct FlowTestSetup {
pub sequencer_node_handle: JoinHandle<Result<(), anyhow::Error>>,

// Channels for consensus proposals, used for asserting the right transactions are proposed.
pub consensus_proposals_channels: BroadcastTopicChannels<ProposalPart>,
pub consensus_proposals_channels: BroadcastTopicChannels<StreamMessage<ProposalPart>>,
}

impl FlowTestSetup {
Expand Down
10 changes: 6 additions & 4 deletions crates/starknet_integration_tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use mempool_test_utils::starknet_api_test_utils::{AccountId, MultiAccountTransac
use papyrus_consensus::config::ConsensusConfig;
use papyrus_network::network_manager::test_utils::create_network_configs_connected_to_broadcast_channels;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use papyrus_protobuf::consensus::{ProposalPart, StreamMessage};
use papyrus_storage::StorageConfig;
use starknet_api::block::BlockNumber;
use starknet_api::contract_address;
Expand Down Expand Up @@ -42,7 +42,7 @@ pub async fn create_config(
chain_info: ChainInfo,
rpc_server_addr: SocketAddr,
batcher_storage_config: StorageConfig,
) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels<ProposalPart>) {
) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels<StreamMessage<ProposalPart>>) {
let fee_token_addresses = chain_info.fee_token_addresses.clone();
let batcher_config = create_batcher_config(batcher_storage_config, chain_info.clone());
let gateway_config = create_gateway_config(chain_info.clone()).await;
Expand Down Expand Up @@ -72,14 +72,16 @@ pub async fn create_config(

fn create_consensus_manager_configs_and_channels(
n_managers: usize,
) -> (Vec<ConsensusManagerConfig>, BroadcastTopicChannels<ProposalPart>) {
) -> (Vec<ConsensusManagerConfig>, BroadcastTopicChannels<StreamMessage<ProposalPart>>) {
let (network_configs, broadcast_channels) =
create_network_configs_connected_to_broadcast_channels(
n_managers,
papyrus_network::gossipsub_impl::Topic::new(
starknet_consensus_manager::consensus_manager::CONSENSUS_PROPOSALS_TOPIC,
// TODO(guyn): return this to NETWORK_TOPIC once we have integrated streaming.
starknet_consensus_manager::consensus_manager::NETWORK_TOPIC2,
),
);
// TODO: Need to also add a channel for votes, in addition to the proposals channel.

let consensus_manager_configs = network_configs
.into_iter()
Expand Down
Loading

0 comments on commit 87dcb06

Please sign in to comment.