Skip to content

Commit

Permalink
feat(consensus): implement ComponentStarter for the consensus manager…
Browse files Browse the repository at this point in the history
… component (#1199)

Run node with:
```
cargo run --package starknet_mempool_node --bin starknet_mempool_node -- \
  --consensus_manager_config.consensus_config.validator_id 0x0 \
  --batcher_config.storage.db_config.enforce_file_exists false
```
  • Loading branch information
matan-starkware authored Oct 13, 2024
1 parent a1b7a01 commit c161cca
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 1 deletion.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions crates/consensus_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,17 @@ workspace = true

[dependencies]
async-trait.workspace = true
futures.workspace = true
libp2p.workspace = true
papyrus_config.workspace = true
papyrus_consensus.workspace = true
papyrus_consensus_orchestrator.workspace = true
papyrus_network.workspace = true
papyrus_network_types.workspace = true
papyrus_protobuf.workspace = true
serde.workspace = true
starknet_batcher_types.workspace = true
starknet_consensus_manager_types.workspace = true
starknet_mempool_infra.workspace = true
tracing.workspace = true
validator.workspace = true
66 changes: 65 additions & 1 deletion crates/consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
use std::any::type_name;
use std::sync::Arc;

use async_trait::async_trait;
use futures::channel::mpsc::{self, SendError};
use futures::future::Ready;
use futures::SinkExt;
use libp2p::PeerId;
use papyrus_consensus::types::{BroadcastConsensusMessageChannel, ConsensusError};
use papyrus_consensus_orchestrator::sequencer_consensus_context::SequencerConsensusContext;
use papyrus_network::network_manager::BroadcastTopicClient;
use papyrus_network_types::network_types::BroadcastedMessageManager;
use papyrus_protobuf::consensus::ConsensusMessage;
use starknet_batcher_types::communication::SharedBatcherClient;
use starknet_mempool_infra::component_definitions::ComponentStarter;
use starknet_mempool_infra::errors::ComponentError;
use tracing::{error, info};

use crate::config::ConsensusManagerConfig;

Expand All @@ -15,6 +30,46 @@ impl ConsensusManager {
pub fn new(config: ConsensusManagerConfig, batcher_client: SharedBatcherClient) -> Self {
Self { config, batcher_client }
}

pub async fn run(&self) -> Result<(), ConsensusError> {
let context = SequencerConsensusContext::new(
Arc::clone(&self.batcher_client),
self.config.consensus_config.num_validators,
);

papyrus_consensus::run_consensus(
context,
self.config.consensus_config.start_height,
self.config.consensus_config.validator_id,
self.config.consensus_config.consensus_delay,
self.config.consensus_config.timeouts.clone(),
create_fake_network_channels(),
futures::stream::pending(),
)
.await
}
}

// Milestone 1:
// We want to only run 1 node (e.g. no network), implying the local node can reach a quorum
// alone and is always the proposer. Actually connecting to the network will require an external
// dependency.
fn create_fake_network_channels() -> BroadcastConsensusMessageChannel {
let messages_to_broadcast_fn: fn(ConsensusMessage) -> Ready<Result<Vec<u8>, SendError>> =
|_| todo!("messages_to_broadcast_sender should not be used");
let reported_messages_sender_fn: fn(
BroadcastedMessageManager,
) -> Ready<Result<PeerId, SendError>> =
|_| todo!("messages_to_broadcast_sender should not be used");
let broadcast_topic_client = BroadcastTopicClient::new(
mpsc::channel(0).0.with(messages_to_broadcast_fn),
mpsc::channel(0).0.with(reported_messages_sender_fn),
mpsc::channel(0).0,
);
BroadcastConsensusMessageChannel {
broadcasted_messages_receiver: Box::new(futures::stream::pending()),
broadcast_topic_client,
}
}

pub fn create_consensus_manager(
Expand All @@ -24,4 +79,13 @@ pub fn create_consensus_manager(
ConsensusManager::new(config, batcher_client)
}

impl ComponentStarter for ConsensusManager {}
#[async_trait]
impl ComponentStarter for ConsensusManager {
async fn start(&mut self) -> Result<(), ComponentError> {
info!("Starting component {}.", type_name::<Self>());
self.run().await.map_err(|e| {
error!("Error running component ConsensusManager: {:?}", e);
ComponentError::InternalComponentError
})
}
}
23 changes: 23 additions & 0 deletions crates/papyrus_protobuf/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use futures::channel::{mpsc, oneshot};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::executable_transaction::Transaction as ExecutableTransaction;
use starknet_api::transaction::Transaction;

use crate::converters::ProtobufConversionError;
Expand Down Expand Up @@ -111,3 +112,25 @@ impl From<ProposalWrapper>
(proposal_init, content_receiver, fin_receiver)
}
}

impl From<ProposalWrapper>
for (
(BlockNumber, u32, ContractAddress, Option<u32>),
mpsc::Receiver<Vec<ExecutableTransaction>>,
oneshot::Receiver<BlockHash>,
)
{
fn from(val: ProposalWrapper) -> Self {
let proposal_init =
(BlockNumber(val.0.height), val.0.round, val.0.proposer, val.0.valid_round);

let (_, content_receiver) = mpsc::channel(0);
// This should only be used for Milestone 1, and then removed once streaming is supported.
println!("Cannot build ExecutableTransaction from Transaction.");

let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(val.0.block_hash).expect("Send should succeed");

(proposal_init, content_receiver, fin_receiver)
}
}

0 comments on commit c161cca

Please sign in to comment.