Skip to content

Commit

Permalink
refactor(consensus): remove next_message to ensure cancellation safety (
Browse files Browse the repository at this point in the history
#2497)

Previously, we could receive a message, and then, at the `await` point for `continue_propagation` or `report_peer`, the `select` statement could complete another branch — resulting in the received message being dropped.
  • Loading branch information
matan-starkware authored Dec 9, 2024
1 parent b214d2f commit 780935a
Showing 1 changed file with 29 additions and 40 deletions.
69 changes: 29 additions & 40 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt};
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument};

Expand Down Expand Up @@ -148,9 +150,9 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
// Loop over incoming proposals, messages, and self generated events.
loop {
let shc_return = tokio::select! {
// TODO(Matan): remove report peer / continue propagation, as they are not cancel safe.
message = next_message(broadcast_channels) => {
self.handle_message(context, height, &mut shc, message?).await?
message = broadcast_channels.broadcasted_messages_receiver.next() => {
self.handle_message(
context, height, &mut shc, message, broadcast_channels).await?
},
Some(mut content_receiver) = proposal_receiver.next() => {
// Get the first message to verify the init was sent.
Expand Down Expand Up @@ -235,8 +237,29 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
message: ConsensusMessage,
message: Option<(
Result<ConsensusMessage, ProtobufConversionError>,
BroadcastedMessageMetadata,
)>,
broadcast_channels: &mut BroadcastConsensusMessageChannel,
) -> Result<ShcReturn, ConsensusError> {
let message = match message {
None => Err(ConsensusError::InternalNetworkError(
"NetworkReceiver should never be closed".to_string(),
)),
Some((Ok(msg), metadata)) => {
// TODO(matan): Hold onto report_sender for use in later errors by SHC.
let _ =
broadcast_channels.broadcast_topic_client.continue_propagation(&metadata).await;
Ok(msg)
}
Some((Err(e), metadata)) => {
// Failed to parse consensus message
let _ = broadcast_channels.broadcast_topic_client.report_peer(metadata).await;
Err(e.into())
}
}?;

// TODO(matan): We need to figure out an actual caching strategy under 2 constraints:
// 1. Malicious - must be capped so a malicious peer can't DoS us.
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
Expand All @@ -248,15 +271,8 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
}
return Ok(ShcReturn::Tasks(Vec::new()));
}
match message {
ConsensusMessage::Proposal(_) => Err(ConsensusError::InternalNetworkError(
"Proposal variant of ConsensusMessage no longer supported".to_string(),
)),
_ => {
let res = shc.handle_message(context, message).await?;
Ok(res)
}
}

shc.handle_message(context, message).await
}

// Checks if a cached proposal already exists
Expand Down Expand Up @@ -300,33 +316,6 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
}
}

// Awaits the next consensus message from the broadcast topic and performs the
// per-message network bookkeeping: successfully parsed messages are propagated
// onward to other peers; unparseable messages cause the sending peer to be
// reported.
//
// Returns `Err` if the underlying receiver stream is closed (treated as an
// internal network error) or if the received payload fails protobuf conversion.
//
// NOTE(review): per the commit description, this helper is NOT cancel safe —
// after `broadcasted_messages_receiver.next()` yields a message, the awaits on
// `continue_propagation` / `report_peer` can be cancelled by another
// `tokio::select!` branch, dropping the already-received message. That is why
// this function is being removed in this change.
async fn next_message(
broadcast_channels: &mut BroadcastConsensusMessageChannel,
) -> Result<ConsensusMessage, ConsensusError> {
// Destructure so the receiver and the topic client can be borrowed
// independently below.
let BroadcastConsensusMessageChannel { broadcasted_messages_receiver, broadcast_topic_client } =
broadcast_channels;

// `None` from the stream means the network receiver was closed, which the
// caller treats as an unrecoverable internal error.
let (msg, broadcasted_message_metadata) =
broadcasted_messages_receiver.next().await.ok_or_else(|| {
ConsensusError::InternalNetworkError(
"NetworkReceiver should never be closed".to_string(),
)
})?;
match msg {
// TODO(matan): Return report_sender for use in later errors by SHC.
Ok(msg) => {
// Best-effort propagation; the result is deliberately ignored.
let _ =
broadcast_topic_client.continue_propagation(&broadcasted_message_metadata).await;
Ok(msg)
}
Err(e) => {
// Failed to parse consensus message
// Best-effort peer report; the result is deliberately ignored.
let _ = broadcast_topic_client.report_peer(broadcasted_message_metadata).await;
Err(e.into())
}
}
}

// Return only when a height is reached that is greater than or equal to the current height.
async fn sync_height<SyncReceiverT>(
height: BlockNumber,
Expand Down

0 comments on commit 780935a

Please sign in to comment.