diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs index fac2dce9e2..70994fe7a7 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs @@ -98,6 +98,7 @@ impl PartialEq for ShcTask { impl ShcTask { pub async fn run(self) -> ShcEvent { + trace!("Running task: {:?}", self); match self { ShcTask::TimeoutPropose(duration, event) => { tokio::time::sleep(duration).await; @@ -198,12 +199,11 @@ impl SingleHeightConsensus { } } - #[instrument(skip_all, fields(height=self.height.0), level = "debug")] + #[instrument(skip_all)] pub(crate) async fn start( &mut self, context: &mut ContextT, ) -> Result { - info!("Starting consensus with validators {:?}", self.validators); context.set_height_and_round(self.height, self.state_machine.round()).await; let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) }; let events = self.state_machine.start(&leader_fn); @@ -214,11 +214,7 @@ impl SingleHeightConsensus { /// Process the proposal init and initiate block validation. See [`ShcTask::ValidateProposal`] /// for more details on the full proposal flow. - #[instrument( - skip_all, - fields(height = %self.height), - level = "debug", - )] + #[instrument(skip_all)] pub(crate) async fn handle_proposal( &mut self, context: &mut ContextT, @@ -235,9 +231,8 @@ impl SingleHeightConsensus { return Err(ConsensusError::InvalidProposal(proposer_id, self.height, msg)); } if init.proposer != proposer_id { - let msg = - format!("invalid proposer: expected {:?}, got {:?}", proposer_id, init.proposer); - return Err(ConsensusError::InvalidProposal(proposer_id, self.height, msg)); + warn!("invalid proposer: expected {:?}, got {:?}", proposer_id, init.proposer); + return Ok(ShcReturn::Tasks(Vec::new())); } let Entry::Vacant(proposal_entry) = self.proposals.entry(init.round) else { warn!("Round {} already has a proposal, ignoring", init.round); @@ -253,6 +248,7 @@ impl SingleHeightConsensus { Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)])) } + #[instrument(skip_all)] pub async fn handle_event( &mut self, context: &mut ContextT, @@ -301,7 +297,7 @@ impl SingleHeightConsensus { // TODO(matan): Switch to signature validation. if built_id != received_fin.as_ref().map(|fin| fin.proposal_content_id) { warn!( - "proposal_id built from content receiver does not match fin: {:#064x?} != \ + "proposal_id built from content received does not match fin: {:#064x?} != \ {:#064x?}", built_id, received_fin ); @@ -311,11 +307,10 @@ impl SingleHeightConsensus { // this round. While this prevents spam attacks it also prevents re-receiving after // a network issue. let old = self.proposals.insert(round, built_id); - assert!( - old.expect("Should exist from init").is_none(), - "Proposal already exists for this round: {:?}", - round - ); + let old = old + .unwrap_or_else(|| panic!("Proposal should exist from init. round: {round}")); + assert!(old.is_none(), "Proposal already exists for this round: {round}"); + info!("Validated proposal: {:?} in round {round}", built_id); let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) }; let sm_events = self.state_machine.handle_event( @@ -326,7 +321,8 @@ impl SingleHeightConsensus { } ShcEvent::BuildProposal(StateMachineEvent::GetProposal(proposal_id, round)) => { let old = self.proposals.insert(round, proposal_id); - assert!(old.is_none(), "There should be no entry for this round."); + assert!(old.is_none(), "There should be no entry for round {round} when proposing"); + info!("Built proposal: {:?} in round {round}", proposal_id); let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) }; let sm_events = self @@ -347,8 +343,9 @@ impl SingleHeightConsensus { context: &mut ContextT, vote: Vote, ) -> Result { + debug!("Received vote: {:?}", vote); if !self.validators.contains(&vote.voter) { - debug!("Ignoring vote from voter not in validators: vote={:?}", vote); + debug!("Ignoring vote from non validator: vote={:?}", vote); return Ok(ShcReturn::Tasks(Vec::new())); } @@ -368,13 +365,14 @@ impl SingleHeightConsensus { Entry::Occupied(entry) => { let old = entry.get(); if old.block_hash != vote.block_hash { - return Err(ConsensusError::Equivocation(self.height, old.clone(), vote)); + return Err(ConsensusError::Equivocation(old.clone(), vote)); } else { // Replay, ignore. return Ok(ShcReturn::Tasks(Vec::new())); } } } + info!("Received vote: {:?}", vote); let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) }; let sm_events = self.state_machine.handle_event(sm_vote, &leader_fn); let ret = self.handle_state_machine_events(context, sm_events).await; @@ -383,7 +381,6 @@ impl SingleHeightConsensus { } // Handle events output by the state machine. - #[instrument(skip_all)] async fn handle_state_machine_events( &mut self, context: &mut ContextT, @@ -391,7 +388,7 @@ impl SingleHeightConsensus { ) -> Result { let mut ret_val = Vec::new(); while let Some(event) = events.pop_front() { - trace!("Handling event: {:?}", event); + trace!("Handling sm event: {:?}", event); match event { StateMachineEvent::GetProposal(proposal_id, round) => { ret_val.extend( @@ -443,7 +440,6 @@ impl SingleHeightConsensus { /// Initiate block building. See [`ShcTask::BuildProposal`] for more details on the full /// proposal flow. - #[instrument(skip(self, context), level = "debug")] async fn handle_state_machine_get_proposal( &mut self, context: &mut ContextT, @@ -465,7 +461,6 @@ impl SingleHeightConsensus { vec![ShcTask::BuildProposal(round, fin_receiver)] } - #[instrument(skip(self, context), level = "debug")] async fn handle_state_machine_proposal( &mut self, context: &mut ContextT, @@ -474,7 +469,7 @@ impl SingleHeightConsensus { valid_round: Option, ) { let Some(valid_round) = valid_round else { - // Newly built so already streamed out. + // Newly built proposals are handled by the BuildProposal flow. return; }; let proposal_id = proposal_id.expect("Reproposal must have a valid ID"); @@ -482,9 +477,11 @@ impl SingleHeightConsensus { let id = self .proposals .get(&valid_round) - .expect("proposals should have proposal for valid_round") - .expect("proposal should not be None"); - assert_eq!(id, proposal_id, "proposal should match the stored proposal"); + .unwrap_or_else(|| panic!("A proposal should exist for valid_round: {valid_round}")) + .unwrap_or_else(|| { + panic!("A valid proposal should exist for valid_round: {valid_round}") + }); + assert_eq!(id, proposal_id, "reproposal should match the stored proposal"); let init = ProposalInit { height: self.height, round, @@ -493,10 +490,9 @@ impl SingleHeightConsensus { }; context.repropose(id, init).await; let old = self.proposals.insert(round, Some(proposal_id)); - assert!(old.is_none(), "There should be no entry for this round."); + assert!(old.is_none(), "There should be no proposal for round {round}."); } - #[instrument(skip_all)] async fn handle_state_machine_vote( &mut self, context: &mut ContextT, @@ -553,7 +549,6 @@ impl SingleHeightConsensus { Ok(vec![task]) } - #[instrument(skip_all)] async fn handle_state_machine_decision( &mut self, proposal_id: ProposalContentId, @@ -561,22 +556,15 @@ impl SingleHeightConsensus { ) -> Result { let invalid_decision = |msg: String| { ConsensusError::InternalInconsistency(format!( - "Invalid decision: sm_proposal_id: {:?}, round: {:?}. {}", - proposal_id, round, msg + "Invalid decision: sm_proposal_id: {proposal_id}, round: {round}. {msg}", )) }; let block = self .proposals .remove(&round) + .ok_or_else(|| invalid_decision("No ProposalInit for this round".to_string()))? .ok_or_else(|| { - // No ProposalInit received for this round. - invalid_decision("Decided on an unknown proposal".to_string()) - })? - .ok_or_else(|| { - // Either invalid or validations haven't yet completed. - invalid_decision( - "Decided on a proposal which was not succesfully validated".to_string(), - ) + invalid_decision("Invalid or validations haven't yet completed".to_string()) })?; if block != proposal_id { return Err(invalid_decision(format!( diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs index 420d434a29..a115f449fe 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs @@ -275,7 +275,7 @@ async fn vote_twice(same_vote: bool) { if same_vote { assert_eq!(res, Ok(ShcReturn::Tasks(Vec::new()))); } else { - assert!(matches!(res, Err(ConsensusError::Equivocation(_, _, _)))); + assert!(matches!(res, Err(ConsensusError::Equivocation(_, _)))); } let ShcReturn::Decision(decision) = shc diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index 19dd87bee1..e5c4109233 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -166,8 +166,8 @@ pub enum ConsensusError { InvalidProposal(ValidatorId, BlockNumber, String), #[error(transparent)] SendError(#[from] mpsc::SendError), - #[error("Conflicting messages for block {0}. Old: {1:?}, New: {2:?}")] - Equivocation(BlockNumber, Vote, Vote), + #[error("Conflicting votes. Old: {0:?}, New: {1:?}")] + Equivocation(Vote, Vote), // Indicates an error in communication between consensus and the node's networking component. // As opposed to an error between this node and peer nodes. #[error("{0}")]