diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs
index 70fff41bb7..71d3e5f350 100644
--- a/crates/papyrus_node/src/main.rs
+++ b/crates/papyrus_node/src/main.rs
@@ -125,7 +125,7 @@ fn run_consensus(
     let start_height = config.start_height;
 
     Ok(tokio::spawn(papyrus_consensus::run_consensus(
-        Arc::new(context),
+        context,
         start_height,
         validator_id,
         consensus_channels.broadcasted_messages_receiver,
diff --git a/crates/sequencing/papyrus_consensus/src/lib.rs b/crates/sequencing/papyrus_consensus/src/lib.rs
index a14bb0043c..bddadd70aa 100644
--- a/crates/sequencing/papyrus_consensus/src/lib.rs
+++ b/crates/sequencing/papyrus_consensus/src/lib.rs
@@ -3,8 +3,6 @@
 // TODO(Matan): fix #[allow(missing_docs)].
 //! A consensus implementation for a [`Starknet`](https://www.starknet.io/) node.
 
-use std::sync::Arc;
-
 use futures::channel::{mpsc, oneshot};
 use papyrus_common::metrics as papyrus_metrics;
 use papyrus_network::network_manager::BroadcastSubscriberReceiver;
@@ -37,8 +35,8 @@ use futures::StreamExt;
 
 #[instrument(skip(context, validator_id, network_receiver, cached_messages), level = "info")]
 #[allow(missing_docs)]
-async fn run_height<BlockT: ConsensusBlock>(
-    context: Arc<dyn ConsensusContext<Block = BlockT>>,
+async fn run_height<BlockT: ConsensusBlock, ContextT: ConsensusContext<Block = BlockT>>(
+    context: &ContextT,
     height: BlockNumber,
     validator_id: ValidatorId,
     network_receiver: &mut BroadcastSubscriberReceiver<ConsensusMessage>,
@@ -49,10 +47,9 @@ where
         Into<(ProposalInit, mpsc::Receiver<BlockT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
 {
     let validators = context.validators(height).await;
-    let mut shc =
-        SingleHeightConsensus::new(Arc::clone(&context), height, validator_id, validators);
+    let mut shc = SingleHeightConsensus::new(height, validator_id, validators);
 
-    if let Some(decision) = shc.start().await? {
+    if let Some(decision) = shc.start(context).await? {
         return Ok(decision);
     }
@@ -89,9 +86,9 @@ where
                 // Special case due to fake streaming.
                 let (proposal_init, content_receiver, fin_receiver) =
                     ProposalWrapper(proposal).into();
-                shc.handle_proposal(proposal_init, content_receiver, fin_receiver).await?
+                shc.handle_proposal(context, proposal_init, content_receiver, fin_receiver).await?
             }
-            _ => shc.handle_message(message).await?,
+            _ => shc.handle_message(context, message).await?,
         };
 
         if let Some(decision) = maybe_decision {
@@ -103,8 +100,8 @@ where
 // TODO(dvir): add test for this.
 #[instrument(skip(context, start_height, network_receiver), level = "info")]
 #[allow(missing_docs)]
-pub async fn run_consensus<BlockT: ConsensusBlock>(
-    context: Arc<dyn ConsensusContext<Block = BlockT>>,
+pub async fn run_consensus<BlockT: ConsensusBlock, ContextT: ConsensusContext<Block = BlockT>>(
+    context: ContextT,
     start_height: BlockNumber,
     validator_id: ValidatorId,
     mut network_receiver: BroadcastSubscriberReceiver<ConsensusMessage>,
@@ -117,7 +114,7 @@ where
     let mut future_messages = Vec::new();
     loop {
         let decision = run_height(
-            Arc::clone(&context),
+            &context,
             current_height,
             validator_id,
             &mut network_receiver,
diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
index 34f69d9ab9..5eca5ee6cb 100644
--- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
+++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
@@ -3,7 +3,6 @@ mod single_height_consensus_test;
 
 use std::collections::{HashMap, VecDeque};
-use std::sync::Arc;
 
 use futures::channel::{mpsc, oneshot};
 use papyrus_protobuf::consensus::{ConsensusMessage, Vote, VoteType};
@@ -29,7 +28,6 @@ const ROUND_ZERO: Round = 0;
 /// out messages "directly" to the network, and returning a decision to the caller.
 pub(crate) struct SingleHeightConsensus<BlockT: ConsensusBlock> {
     height: BlockNumber,
-    context: Arc<dyn ConsensusContext<Block = BlockT>>,
     validators: Vec<ValidatorId>,
     id: ValidatorId,
     state_machine: StateMachine,
@@ -39,17 +37,11 @@ pub(crate) struct SingleHeightConsensus<BlockT: ConsensusBlock> {
 }
 
 impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
-    pub(crate) fn new(
-        context: Arc<dyn ConsensusContext<Block = BlockT>>,
-        height: BlockNumber,
-        id: ValidatorId,
-        validators: Vec<ValidatorId>,
-    ) -> Self {
+    pub(crate) fn new(height: BlockNumber, id: ValidatorId, validators: Vec<ValidatorId>) -> Self {
         // TODO(matan): Use actual weights, not just `len`.
         let state_machine = StateMachine::new(validators.len() as u32);
         Self {
             height,
-            context,
             validators,
             id,
             state_machine,
@@ -59,22 +51,26 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
         }
     }
 
-    #[instrument(skip(self), fields(height=self.height.0), level = "debug")]
-    pub(crate) async fn start(&mut self) -> Result<Option<Decision<BlockT>>, ConsensusError> {
+    #[instrument(skip_all, fields(height=self.height.0), level = "debug")]
+    pub(crate) async fn start<ContextT: ConsensusContext<Block = BlockT>>(
+        &mut self,
+        context: &ContextT,
+    ) -> Result<Option<Decision<BlockT>>, ConsensusError> {
         info!("Starting consensus with validators {:?}", self.validators);
         let events = self.state_machine.start();
-        self.handle_state_machine_events(events).await
+        self.handle_state_machine_events(context, events).await
     }
 
     /// Receive a proposal from a peer node. Returns only once the proposal has been fully received
     /// and processed.
     #[instrument(
-        skip(self, init, p2p_messages_receiver, fin_receiver),
+        skip_all,
         fields(height = %self.height),
         level = "debug",
     )]
-    pub(crate) async fn handle_proposal(
+    pub(crate) async fn handle_proposal<ContextT: ConsensusContext<Block = BlockT>>(
         &mut self,
+        context: &ContextT,
         init: ProposalInit,
         p2p_messages_receiver: mpsc::Receiver<<BlockT as ConsensusBlock>::ProposalChunk>,
         fin_receiver: oneshot::Receiver<BlockHash>,
@@ -83,7 +79,7 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
             "Received proposal: proposal_height={}, proposer={:?}",
             init.height.0, init.proposer
         );
-        let proposer_id = self.context.proposer(&self.validators, self.height);
+        let proposer_id = context.proposer(&self.validators, self.height);
         if init.height != self.height {
             let msg = format!("invalid height: expected {:?}, got {:?}", self.height, init.height);
             return Err(ConsensusError::InvalidProposal(proposer_id, self.height, msg));
@@ -94,8 +90,7 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
             return Err(ConsensusError::InvalidProposal(proposer_id, self.height, msg));
         }
 
-        let block_receiver =
-            self.context.validate_proposal(self.height, p2p_messages_receiver).await;
+        let block_receiver = context.validate_proposal(self.height, p2p_messages_receiver).await;
         // TODO(matan): Actual Tendermint should handle invalid proposals.
         let block = block_receiver.await.map_err(|_| {
             ConsensusError::InvalidProposal(
@@ -124,13 +119,14 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
         // TODO(matan): Handle multiple rounds.
         self.proposals.insert(ROUND_ZERO, block);
         let sm_events = self.state_machine.handle_event(sm_proposal);
-        self.handle_state_machine_events(sm_events).await
+        self.handle_state_machine_events(context, sm_events).await
     }
 
     /// Handle messages from peer nodes.
     #[instrument(skip_all)]
-    pub(crate) async fn handle_message(
+    pub(crate) async fn handle_message<ContextT: ConsensusContext<Block = BlockT>>(
         &mut self,
+        context: &ContextT,
         message: ConsensusMessage,
     ) -> Result<Option<Decision<BlockT>>, ConsensusError> {
         debug!("Received message: {:?}", message);
@@ -138,13 +134,14 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
             ConsensusMessage::Proposal(_) => {
                 unimplemented!("Proposals should use `handle_proposal` due to fake streaming")
             }
-            ConsensusMessage::Vote(vote) => self.handle_vote(vote).await,
+            ConsensusMessage::Vote(vote) => self.handle_vote(context, vote).await,
         }
     }
 
     #[instrument(skip_all)]
-    async fn handle_vote(
+    async fn handle_vote<ContextT: ConsensusContext<Block = BlockT>>(
         &mut self,
+        context: &ContextT,
         vote: Vote,
     ) -> Result<Option<Decision<BlockT>>, ConsensusError> {
         let (votes, sm_vote) = match vote.vote_type {
@@ -170,13 +167,14 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
 
         votes.insert((ROUND_ZERO, vote.voter), vote);
         let sm_events = self.state_machine.handle_event(sm_vote);
-        self.handle_state_machine_events(sm_events).await
+        self.handle_state_machine_events(context, sm_events).await
     }
 
     // Handle events output by the state machine.
     #[instrument(skip_all)]
-    async fn handle_state_machine_events(
+    async fn handle_state_machine_events<ContextT: ConsensusContext<Block = BlockT>>(
         &mut self,
+        context: &ContextT,
         mut events: VecDeque<StateMachineEvent>,
     ) -> Result<Option<Decision<BlockT>>, ConsensusError> {
         while let Some(event) = events.pop_front() {
@@ -184,7 +182,9 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
             match event {
                 StateMachineEvent::StartRound(block_hash, round) => {
                     events.append(
-                        &mut self.handle_state_machine_start_round(block_hash, round).await,
+                        &mut self
+                            .handle_state_machine_start_round(context, block_hash, round)
+                            .await,
                     );
                 }
                 StateMachineEvent::Proposal(_, _) => {
@@ -195,37 +195,39 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
                     return self.handle_state_machine_decision(block_hash, round).await;
                 }
                 StateMachineEvent::Prevote(block_hash, round) => {
-                    self.handle_state_machine_vote(block_hash, round, VoteType::Prevote).await?;
+                    self.handle_state_machine_vote(context, block_hash, round, VoteType::Prevote)
+                        .await?;
                 }
                 StateMachineEvent::Precommit(block_hash, round) => {
-                    self.handle_state_machine_vote(block_hash, round, VoteType::Precommit).await?;
+                    self.handle_state_machine_vote(context, block_hash, round, VoteType::Precommit)
+                        .await?;
                 }
             }
         }
         Ok(None)
     }
 
-    #[instrument(skip(self), level = "debug")]
-    async fn handle_state_machine_start_round(
+    #[instrument(skip(self, context), level = "debug")]
+    async fn handle_state_machine_start_round<ContextT: ConsensusContext<Block = BlockT>>(
         &mut self,
+        context: &ContextT,
         block_hash: Option<BlockHash>,
         round: Round,
     ) -> VecDeque<StateMachineEvent> {
         // TODO(matan): Support re-proposing validValue.
         assert!(block_hash.is_none(), "Reproposing is not yet supported");
-        let proposer_id = self.context.proposer(&self.validators, self.height);
+        let proposer_id = context.proposer(&self.validators, self.height);
         if proposer_id != self.id {
             debug!("Validator");
             return self.state_machine.handle_event(StateMachineEvent::StartRound(None, round));
         }
         debug!("Proposer");
 
-        let (p2p_messages_receiver, block_receiver) =
-            self.context.build_proposal(self.height).await;
+        let (p2p_messages_receiver, block_receiver) = context.build_proposal(self.height).await;
         let (fin_sender, fin_receiver) = oneshot::channel();
         let init = ProposalInit { height: self.height, proposer: self.id };
         // Peering is a permanent component, so if sending to it fails we cannot continue.
-        self.context
+        context
             .propose(init, p2p_messages_receiver, fin_receiver)
             .await
             .expect("Failed sending Proposal to Peering");
@@ -245,8 +247,9 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
     }
 
     #[instrument(skip_all)]
-    async fn handle_state_machine_vote(
+    async fn handle_state_machine_vote<ContextT: ConsensusContext<Block = BlockT>>(
         &mut self,
+        context: &ContextT,
         block_hash: BlockHash,
         round: Round,
         vote_type: VoteType,
@@ -260,7 +263,7 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
             // TODO(matan): Consider refactoring not to panic, rather log and return the error.
             panic!("State machine should not send repeat votes: old={:?}, new={:?}", old, vote);
         }
-        self.context.broadcast(ConsensusMessage::Vote(vote)).await?;
+        context.broadcast(ConsensusMessage::Vote(vote)).await?;
         Ok(None)
     }
 
diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
index 57f28443a7..e04681637b 100644
--- a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
+++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
@@ -54,17 +54,16 @@ async fn proposer() {
         .returning(move |_| Ok(()));
 
     let mut shc = SingleHeightConsensus::new(
-        Arc::new(context),
         BlockNumber(0),
         node_id,
         vec![node_id, 2_u32.into(), 3_u32.into(), 4_u32.into()],
     );
 
     // Sends proposal and prevote.
-    assert!(matches!(shc.start().await, Ok(None)));
+    assert!(matches!(shc.start(&context).await, Ok(None)));
 
-    assert_eq!(shc.handle_message(prevote(block.id(), 0, 2_u32.into())).await, Ok(None));
-    assert_eq!(shc.handle_message(prevote(block.id(), 0, 3_u32.into())).await, Ok(None));
+    assert_eq!(shc.handle_message(&context, prevote(block.id(), 0, 2_u32.into())).await, Ok(None));
+    assert_eq!(shc.handle_message(&context, prevote(block.id(), 0, 3_u32.into())).await, Ok(None));
 
     let precommits = vec![
         precommit(block.id(), 0, 1_u32.into()),
@@ -72,9 +71,9 @@ async fn proposer() {
         precommit(block.id(), 0, 2_u32.into()),
         precommit(block.id(), 0, 3_u32.into()),
     ];
-    assert_eq!(shc.handle_message(precommits[1].clone()).await, Ok(None));
-    assert_eq!(shc.handle_message(precommits[2].clone()).await, Ok(None));
-    let decision = shc.handle_message(precommits[3].clone()).await.unwrap().unwrap();
+    assert_eq!(shc.handle_message(&context, precommits[1].clone()).await, Ok(None));
+    assert_eq!(shc.handle_message(&context, precommits[2].clone()).await, Ok(None));
+    let decision = shc.handle_message(&context, precommits[3].clone()).await.unwrap().unwrap();
     assert_eq!(decision.block, block);
     assert!(
         decision
@@ -116,7 +115,6 @@ async fn validator() {
 
     // Creation calls to `context.validators`.
     let mut shc = SingleHeightConsensus::new(
-        Arc::new(context),
         BlockNumber(0),
         node_id,
         vec![node_id, proposer, 3_u32.into(), 4_u32.into()],
@@ -128,6 +126,7 @@ async fn validator() {
     let res = shc
         .handle_proposal(
+            &context,
             ProposalInit { height: BlockNumber(0), proposer },
             mpsc::channel(1).1, // content - ignored by SHC.
             fin_receiver,
         )
@@ -135,16 +134,16 @@ async fn validator() {
         .await;
     assert_eq!(res, Ok(None));
 
-    assert_eq!(shc.handle_message(prevote(block.id(), 0, 2_u32.into())).await, Ok(None));
-    assert_eq!(shc.handle_message(prevote(block.id(), 0, 3_u32.into())).await, Ok(None));
+    assert_eq!(shc.handle_message(&context, prevote(block.id(), 0, 2_u32.into())).await, Ok(None));
+    assert_eq!(shc.handle_message(&context, prevote(block.id(), 0, 3_u32.into())).await, Ok(None));
 
     let precommits = vec![
         precommit(block.id(), 0, 2_u32.into()),
         precommit(block.id(), 0, 3_u32.into()),
         precommit(block.id(), 0, node_id),
     ];
-    assert_eq!(shc.handle_message(precommits[0].clone()).await, Ok(None));
-    let decision = shc.handle_message(precommits[1].clone()).await.unwrap().unwrap();
+    assert_eq!(shc.handle_message(&context, precommits[0].clone()).await, Ok(None));
+    let decision = shc.handle_message(&context, precommits[1].clone()).await.unwrap().unwrap();
     assert_eq!(decision.block, block);
     assert!(
         decision
diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs
index 17efce9736..b2e2c55a30 100644
--- a/crates/sequencing/papyrus_consensus/src/types.rs
+++ b/crates/sequencing/papyrus_consensus/src/types.rs
@@ -1,7 +1,3 @@
-#[cfg(test)]
-#[path = "types_test.rs"]
-mod types_test;
-
 use std::fmt::Debug;
 
 use async_trait::async_trait;
@@ -68,14 +64,8 @@ pub trait ConsensusBlock: Send {
 }
 
 /// Interface for consensus to call out to the node.
-// Why `Send + Sync`?
-// 1. We expect multiple components within consensus to concurrently access the context.
-// 2. The other option is for each component to have its own copy (i.e. clone) of the context, but
-//    this is object unsafe (Clone requires Sized).
-// 3. Given that we see the context as basically a connector to other components in the node, the
-//    limitation of Sync to keep functions `&self` shouldn't be a problem.
 #[async_trait]
-pub trait ConsensusContext: Send + Sync {
+pub trait ConsensusContext {
     /// The [block](`ConsensusBlock`) type built by `ConsensusContext` from a proposal.
     // We use an associated type since consensus is indifferent to the actual content of a proposal,
     // but we cannot use generics due to object safety.
diff --git a/crates/sequencing/papyrus_consensus/src/types_test.rs b/crates/sequencing/papyrus_consensus/src/types_test.rs
deleted file mode 100644
index 4305c70817..0000000000
--- a/crates/sequencing/papyrus_consensus/src/types_test.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-use crate::types::ConsensusContext;
-
-// This should cause compilation to fail if `ConsensusContext` is not object safe. Note that
-// `ConsensusBlock` need not be object safe for this to work.
-#[test]
-fn check_object_safety() {
-    fn _check_context() -> Box<dyn ConsensusContext<Block = crate::test_utils::TestBlock>> {
-        todo!()
-    }
-}