Skip to content

Commit

Permalink
chore(consensus): update logs in shc (#3100)
Browse files Browse the repository at this point in the history
  • Loading branch information
matan-starkware authored Jan 6, 2025
1 parent 434c5e9 commit 7ea07f7
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 43 deletions.
68 changes: 28 additions & 40 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl PartialEq for ShcTask {

impl ShcTask {
pub async fn run(self) -> ShcEvent {
trace!("Running task: {:?}", self);
match self {
ShcTask::TimeoutPropose(duration, event) => {
tokio::time::sleep(duration).await;
Expand Down Expand Up @@ -198,12 +199,11 @@ impl SingleHeightConsensus {
}
}

#[instrument(skip_all, fields(height=self.height.0), level = "debug")]
#[instrument(skip_all)]
pub(crate) async fn start<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
) -> Result<ShcReturn, ConsensusError> {
info!("Starting consensus with validators {:?}", self.validators);
context.set_height_and_round(self.height, self.state_machine.round()).await;
let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) };
let events = self.state_machine.start(&leader_fn);
Expand All @@ -214,11 +214,7 @@ impl SingleHeightConsensus {

/// Process the proposal init and initiate block validation. See [`ShcTask::ValidateProposal`]
/// for more details on the full proposal flow.
#[instrument(
skip_all,
fields(height = %self.height),
level = "debug",
)]
#[instrument(skip_all)]
pub(crate) async fn handle_proposal<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
Expand All @@ -235,9 +231,8 @@ impl SingleHeightConsensus {
return Err(ConsensusError::InvalidProposal(proposer_id, self.height, msg));
}
if init.proposer != proposer_id {
let msg =
format!("invalid proposer: expected {:?}, got {:?}", proposer_id, init.proposer);
return Err(ConsensusError::InvalidProposal(proposer_id, self.height, msg));
warn!("invalid proposer: expected {:?}, got {:?}", proposer_id, init.proposer);
return Ok(ShcReturn::Tasks(Vec::new()));
}
let Entry::Vacant(proposal_entry) = self.proposals.entry(init.round) else {
warn!("Round {} already has a proposal, ignoring", init.round);
Expand All @@ -253,6 +248,7 @@ impl SingleHeightConsensus {
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)]))
}

#[instrument(skip_all)]
pub async fn handle_event<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
Expand Down Expand Up @@ -301,7 +297,7 @@ impl SingleHeightConsensus {
// TODO(matan): Switch to signature validation.
if built_id != received_fin.as_ref().map(|fin| fin.proposal_content_id) {
warn!(
"proposal_id built from content receiver does not match fin: {:#064x?} != \
"proposal_id built from content received does not match fin: {:#064x?} != \
{:#064x?}",
built_id, received_fin
);
Expand All @@ -311,11 +307,10 @@ impl SingleHeightConsensus {
// this round. While this prevents spam attacks it also prevents re-receiving after
// a network issue.
let old = self.proposals.insert(round, built_id);
assert!(
old.expect("Should exist from init").is_none(),
"Proposal already exists for this round: {:?}",
round
);
let old = old
.unwrap_or_else(|| panic!("Proposal should exist from init. round: {round}"));
assert!(old.is_none(), "Proposal already exists for this round: {round}");
info!("Validated proposal: {:?} in round {round}", built_id);
let leader_fn =
|round: Round| -> ValidatorId { context.proposer(self.height, round) };
let sm_events = self.state_machine.handle_event(
Expand All @@ -326,7 +321,8 @@ impl SingleHeightConsensus {
}
ShcEvent::BuildProposal(StateMachineEvent::GetProposal(proposal_id, round)) => {
let old = self.proposals.insert(round, proposal_id);
assert!(old.is_none(), "There should be no entry for this round.");
assert!(old.is_none(), "There should be no entry for round {round} when proposing");
info!("Built proposal: {:?} in round {round}", proposal_id);
let leader_fn =
|round: Round| -> ValidatorId { context.proposer(self.height, round) };
let sm_events = self
Expand All @@ -347,8 +343,9 @@ impl SingleHeightConsensus {
context: &mut ContextT,
vote: Vote,
) -> Result<ShcReturn, ConsensusError> {
debug!("Received vote: {:?}", vote);
if !self.validators.contains(&vote.voter) {
debug!("Ignoring vote from voter not in validators: vote={:?}", vote);
debug!("Ignoring vote from non validator: vote={:?}", vote);
return Ok(ShcReturn::Tasks(Vec::new()));
}

Expand All @@ -368,13 +365,14 @@ impl SingleHeightConsensus {
Entry::Occupied(entry) => {
let old = entry.get();
if old.block_hash != vote.block_hash {
return Err(ConsensusError::Equivocation(self.height, old.clone(), vote));
return Err(ConsensusError::Equivocation(old.clone(), vote));
} else {
// Replay, ignore.
return Ok(ShcReturn::Tasks(Vec::new()));
}
}
}
info!("Received vote: {:?}", vote);
let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) };
let sm_events = self.state_machine.handle_event(sm_vote, &leader_fn);
let ret = self.handle_state_machine_events(context, sm_events).await;
Expand All @@ -383,15 +381,14 @@ impl SingleHeightConsensus {
}

// Handle events output by the state machine.
#[instrument(skip_all)]
async fn handle_state_machine_events<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
mut events: VecDeque<StateMachineEvent>,
) -> Result<ShcReturn, ConsensusError> {
let mut ret_val = Vec::new();
while let Some(event) = events.pop_front() {
trace!("Handling event: {:?}", event);
trace!("Handling sm event: {:?}", event);
match event {
StateMachineEvent::GetProposal(proposal_id, round) => {
ret_val.extend(
Expand Down Expand Up @@ -443,7 +440,6 @@ impl SingleHeightConsensus {

/// Initiate block building. See [`ShcTask::BuildProposal`] for more details on the full
/// proposal flow.
#[instrument(skip(self, context), level = "debug")]
async fn handle_state_machine_get_proposal<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
Expand All @@ -465,7 +461,6 @@ impl SingleHeightConsensus {
vec![ShcTask::BuildProposal(round, fin_receiver)]
}

#[instrument(skip(self, context), level = "debug")]
async fn handle_state_machine_proposal<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
Expand All @@ -474,17 +469,19 @@ impl SingleHeightConsensus {
valid_round: Option<Round>,
) {
let Some(valid_round) = valid_round else {
// Newly built so already streamed out.
// Newly built proposals are handled by the BuildProposal flow.
return;
};
let proposal_id = proposal_id.expect("Reproposal must have a valid ID");

let id = self
.proposals
.get(&valid_round)
.expect("proposals should have proposal for valid_round")
.expect("proposal should not be None");
assert_eq!(id, proposal_id, "proposal should match the stored proposal");
.unwrap_or_else(|| panic!("A proposal should exist for valid_round: {valid_round}"))
.unwrap_or_else(|| {
panic!("A valid proposal should exist for valid_round: {valid_round}")
});
assert_eq!(id, proposal_id, "reproposal should match the stored proposal");
let init = ProposalInit {
height: self.height,
round,
Expand All @@ -493,10 +490,9 @@ impl SingleHeightConsensus {
};
context.repropose(id, init).await;
let old = self.proposals.insert(round, Some(proposal_id));
assert!(old.is_none(), "There should be no entry for this round.");
assert!(old.is_none(), "There should be no proposal for round {round}.");
}

#[instrument(skip_all)]
async fn handle_state_machine_vote<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
Expand Down Expand Up @@ -553,30 +549,22 @@ impl SingleHeightConsensus {
Ok(vec![task])
}

#[instrument(skip_all)]
async fn handle_state_machine_decision(
&mut self,
proposal_id: ProposalContentId,
round: Round,
) -> Result<ShcReturn, ConsensusError> {
let invalid_decision = |msg: String| {
ConsensusError::InternalInconsistency(format!(
"Invalid decision: sm_proposal_id: {:?}, round: {:?}. {}",
proposal_id, round, msg
"Invalid decision: sm_proposal_id: {proposal_id}, round: {round}. {msg}",
))
};
let block = self
.proposals
.remove(&round)
.ok_or_else(|| invalid_decision("No ProposalInit for this round".to_string()))?
.ok_or_else(|| {
// No ProposalInit received for this round.
invalid_decision("Decided on an unknown proposal".to_string())
})?
.ok_or_else(|| {
// Either invalid or validations haven't yet completed.
invalid_decision(
"Decided on a proposal which was not succesfully validated".to_string(),
)
invalid_decision("Invalid or validations haven't yet completed".to_string())
})?;
if block != proposal_id {
return Err(invalid_decision(format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ async fn vote_twice(same_vote: bool) {
if same_vote {
assert_eq!(res, Ok(ShcReturn::Tasks(Vec::new())));
} else {
assert!(matches!(res, Err(ConsensusError::Equivocation(_, _, _))));
assert!(matches!(res, Err(ConsensusError::Equivocation(_, _))));
}

let ShcReturn::Decision(decision) = shc
Expand Down
4 changes: 2 additions & 2 deletions crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ pub enum ConsensusError {
InvalidProposal(ValidatorId, BlockNumber, String),
#[error(transparent)]
SendError(#[from] mpsc::SendError),
#[error("Conflicting messages for block {0}. Old: {1:?}, New: {2:?}")]
Equivocation(BlockNumber, Vote, Vote),
#[error("Conflicting votes. Old: {0:?}, New: {1:?}")]
Equivocation(Vote, Vote),
// Indicates an error in communication between consensus and the node's networking component.
// As opposed to an error between this node and peer nodes.
#[error("{0}")]
Expand Down

0 comments on commit 7ea07f7

Please sign in to comment.