From 7cbebd87bfbe3b093e89e35a2309b9f1148c7945 Mon Sep 17 00:00:00 2001 From: Matan Markind Date: Mon, 29 Jul 2024 19:57:42 +0300 Subject: [PATCH] refactor(consensus): consensus takes a generic network receiver --- .../sequencing/papyrus_consensus/src/lib.rs | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/crates/sequencing/papyrus_consensus/src/lib.rs b/crates/sequencing/papyrus_consensus/src/lib.rs index 1bd16070702..2ef8355553e 100644 --- a/crates/sequencing/papyrus_consensus/src/lib.rs +++ b/crates/sequencing/papyrus_consensus/src/lib.rs @@ -6,9 +6,11 @@ use std::time::Duration; use futures::channel::{mpsc, oneshot}; +use futures::Stream; use papyrus_common::metrics as papyrus_metrics; -use papyrus_network::network_manager::BroadcastSubscriberReceiver; +use papyrus_network::network_manager::ReportSender; use papyrus_protobuf::consensus::{ConsensusMessage, Proposal}; +use papyrus_protobuf::converters::ProtobufConversionError; use single_height_consensus::SingleHeightConsensus; use starknet_api::block::{BlockHash, BlockNumber}; use tracing::{debug, info, instrument}; @@ -39,14 +41,18 @@ use futures::StreamExt; #[instrument(skip(context, validator_id, network_receiver, cached_messages), level = "info")] #[allow(missing_docs)] -async fn run_height>( +async fn run_height( context: &mut ContextT, height: BlockNumber, validator_id: ValidatorId, - network_receiver: &mut BroadcastSubscriberReceiver, + network_receiver: &mut NetworkReceiverT, cached_messages: &mut Vec, ) -> Result, ConsensusError> where + BlockT: ConsensusBlock, + ContextT: ConsensusContext, + NetworkReceiverT: + Stream, ReportSender)> + Unpin, ProposalWrapper: Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, { @@ -106,14 +112,18 @@ where // TODO(dvir): add test for this. #[instrument(skip(context, start_height, network_receiver), level = "info")] #[allow(missing_docs)] -pub async fn run_consensus>( +pub async fn run_consensus( mut context: ContextT, start_height: BlockNumber, validator_id: ValidatorId, consensus_delay: Duration, - mut network_receiver: BroadcastSubscriberReceiver, + mut network_receiver: NetworkReceiverT, ) -> Result<(), ConsensusError> where + BlockT: ConsensusBlock, + ContextT: ConsensusContext, + NetworkReceiverT: + Stream, ReportSender)> + Unpin, ProposalWrapper: Into<(ProposalInit, mpsc::Receiver, oneshot::Receiver)>, {