feat(sequencing): cache proposals from greater heights
guy-starkware committed Dec 1, 2024
1 parent f515d0c · commit d5190a6
Showing 2 changed files with 59 additions and 14 deletions.
71 changes: 59 additions & 12 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -14,7 +14,7 @@ use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_C
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument, warn};
use tracing::{debug, info, instrument};

use crate::config::TimeoutsConfig;
use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
@@ -89,32 +89,37 @@ where
/// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly
/// part of the single height consensus algorithm (e.g. messages from future heights).
#[derive(Debug, Default)]
struct MultiHeightManager {
struct MultiHeightManager<ContextT: ConsensusContext> {
validator_id: ValidatorId,
cached_messages: BTreeMap<u64, Vec<ConsensusMessage>>,
cached_proposals: BTreeMap<u64, (ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)>,
timeouts: TimeoutsConfig,
}

impl MultiHeightManager {
impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
/// Create a new consensus manager.
pub fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self {
Self { validator_id, cached_messages: BTreeMap::new(), timeouts }
Self {
validator_id,
cached_messages: BTreeMap::new(),
cached_proposals: BTreeMap::new(),
timeouts,
}
}

/// Run the consensus algorithm for a single height.
///
/// Assumes that `height` is monotonically increasing across calls for the sake of filtering
/// `cached_messages`.
#[instrument(skip(self, context, broadcast_channels), level = "info")]
pub async fn run_height<ContextT>(
pub async fn run_height(
&mut self,
context: &mut ContextT,
height: BlockNumber,
broadcast_channels: &mut BroadcastConsensusMessageChannel,
proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
) -> Result<Decision, ConsensusError>
where
ContextT: ConsensusContext,
<ContextT as ConsensusContext>::ProposalPart: std::fmt::Debug,
{
let validators = context.validators(height).await;
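
Aside: the new `cached_proposals` field keys pending proposals by raw height (`u64`), and `BTreeMap` keeps its keys sorted, so the lowest cached height always comes first, a property the cleanup logic further down depends on. A minimal sketch of that behavior, with a `String` standing in for the real `(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)` payload:

```rust
use std::collections::BTreeMap;

fn main() {
    // Heights arrive out of order, as proposals from the network would.
    let mut cached_proposals: BTreeMap<u64, String> = BTreeMap::new();
    cached_proposals.insert(12, "proposal for height 12".to_string());
    cached_proposals.insert(10, "proposal for height 10".to_string());

    // BTreeMap iterates in key order, so the lowest height comes first.
    let (&lowest, _) = cached_proposals.iter().next().unwrap();
    assert_eq!(lowest, 10);
}
```
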
@@ -137,8 +142,26 @@ impl MultiHeightManager
}

let mut current_height_messages = self.get_current_height_messages(height);

// If there's already a cached proposal, handle that before looping.
if let Some((init, proposal)) = self.get_current_proposal(height) {
let shc_return =
self.handle_proposal(context, height, &mut shc, init, proposal).await?;
// Handle potential tasks, like validating the proposal.
match shc_return {
ShcReturn::Decision(decision) => return Ok(decision),
ShcReturn::Tasks(tasks) => {
for task in tasks {
shc_events.push(task.run());
}
}
}
};

// No cached proposal; loop over incoming proposals, messages, cached messages, and events.
loop {
let shc_return = tokio::select! {
// TODO(Matan): remove report peer / continue propagation, as they are not cancel safe.
message = next_message(&mut current_height_messages, broadcast_channels) => {
self.handle_message(context, height, &mut shc, message?).await?
},
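
The control flow above, which consults the cache exactly once for the current height, folds any returned tasks into the event set, and only then enters the network loop, can be sketched without the consensus types. `Outcome`, `handle`, and `run_height` below are illustrative stand-ins, not crate APIs:

```rust
#[derive(Debug, PartialEq)]
enum Outcome {
    Decision(u64),
    Tasks(Vec<&'static str>),
}

fn handle(input: &'static str) -> Outcome {
    if input == "decide" { Outcome::Decision(1) } else { Outcome::Tasks(vec!["decide"]) }
}

fn run_height(cached: Option<&'static str>) -> Option<u64> {
    let mut events: Vec<&'static str> = Vec::new();

    // A cached proposal is handled before the loop: it may decide outright,
    // or produce tasks that are polled alongside everything else.
    if let Some(proposal) = cached {
        match handle(proposal) {
            Outcome::Decision(decision) => return Some(decision),
            Outcome::Tasks(tasks) => events.extend(tasks),
        }
    }

    // The real loop also selects over network channels; this sketch only drains events.
    while let Some(event) = events.pop() {
        if let Outcome::Decision(decision) = handle(event) {
            return Some(decision);
        }
    }
    None
}

fn main() {
    assert_eq!(run_height(Some("validate")), Some(1));
}
```
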
@@ -170,7 +193,7 @@ impl MultiHeightManager
}

// Handle a new proposal receiver from the network.
async fn handle_proposal<ContextT>(
async fn handle_proposal(
&mut self,
context: &mut ContextT,
height: BlockNumber,
@@ -182,16 +205,20 @@ impl MultiHeightManager
ContextT: ConsensusContext,
<ContextT as ConsensusContext>::ProposalPart: std::fmt::Debug,
{
// TODO(guyn): what is the right thing to do if proposal's height doesn't match?
if proposal_init.height != height {
// TODO(guyn): add caching of heights for future use.
warn!("Received a proposal for a different height. {:?}", proposal_init);
debug!("Received a proposal for a different height. {:?}", proposal_init);
if proposal_init.height > height {
// Note: this will overwrite an existing content_receiver for this height!
self.cached_proposals
.insert(proposal_init.height.0, (proposal_init, content_receiver));
}
return Ok(ShcReturn::Tasks(Vec::new()));
}
shc.handle_proposal(context, proposal_init.into(), content_receiver).await
}
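
On the overwrite note above: that is ordinary `BTreeMap::insert` behavior. Inserting at an existing key replaces the previous value and returns it, so a receiver already cached for that height is dropped (and dropping an mpsc receiver closes the channel). A standalone illustration with `String` in place of the receiver:

```rust
use std::collections::BTreeMap;

fn main() {
    let mut cached_proposals: BTreeMap<u64, String> = BTreeMap::new();
    cached_proposals.insert(7, "first receiver for height 7".to_string());

    // A second proposal for height 7 displaces the first; the returned old
    // value is dropped immediately unless the caller keeps it.
    let old = cached_proposals.insert(7, "second receiver for height 7".to_string());
    assert_eq!(old.as_deref(), Some("first receiver for height 7"));
}
```
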

// Handle a single consensus message.
async fn handle_message<ContextT>(
async fn handle_message(
&mut self,
context: &mut ContextT,
height: BlockNumber,
@@ -201,7 +228,7 @@ impl MultiHeightManager
where
ContextT: ConsensusContext,
{
// TODO(matan): We need to figure out an actual cacheing strategy under 2 constraints:
// TODO(matan): We need to figure out an actual caching strategy under 2 constraints:
// 1. Malicious - must be capped so a malicious peer can't DoS us.
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
// In general I think we will want to only cache (H+1, 0) messages.
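
One hedged illustration of the first constraint in that TODO, a hard cap so a malicious peer cannot grow the cache without bound. The cap value and the `cache_message` helper are hypothetical, not what this crate currently does:

```rust
use std::collections::BTreeMap;

const MAX_CACHED_MESSAGES: usize = 64; // hypothetical cap, not a crate constant

// Refuses to cache once the cap is reached, bounding memory use;
// returns whether the message was actually cached.
fn cache_message(cache: &mut BTreeMap<u64, Vec<String>>, height: u64, message: String) -> bool {
    let total: usize = cache.values().map(Vec::len).sum();
    if total >= MAX_CACHED_MESSAGES {
        return false; // dropped: the cap bounds memory taken by malicious peers
    }
    cache.entry(height).or_default().push(message);
    true
}

fn main() {
    let mut cache = BTreeMap::new();
    assert!(cache_message(&mut cache, 11, "prevote".to_string()));
}
```
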
@@ -223,6 +250,26 @@ impl MultiHeightManager
}
}

// Checks if a cached proposal already exists
// - returns the proposal if it exists and removes it from the cache.
// - returns None if no proposal exists.
// - cleans up any proposals from earlier heights.
fn get_current_proposal(
&mut self,
height: BlockNumber,
) -> Option<(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)> {
loop {
let Some(entry) = self.cached_proposals.first_entry() else { return None };
match entry.key().cmp(&height.0) {
std::cmp::Ordering::Greater => return None,
std::cmp::Ordering::Equal => return Some(entry.remove()),
std::cmp::Ordering::Less => {
entry.remove();
}
}
}
}
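
For reference, the purge-as-you-read pattern in `get_current_proposal` relies on `BTreeMap::first_entry` (stable since Rust 1.66) always returning the entry with the smallest key. A self-contained version over `&str` payloads, with illustrative names:

```rust
use std::collections::BTreeMap;

// Mirrors get_current_proposal: take the entry for `height`, purging older ones.
fn take_current(cache: &mut BTreeMap<u64, &'static str>, height: u64) -> Option<&'static str> {
    loop {
        let entry = cache.first_entry()?; // smallest cached height, if any
        match entry.key().cmp(&height) {
            std::cmp::Ordering::Greater => return None, // only future heights remain
            std::cmp::Ordering::Equal => return Some(entry.remove()),
            std::cmp::Ordering::Less => {
                entry.remove(); // stale height; discard and keep looking
            }
        }
    }
}

fn main() {
    let mut cache = BTreeMap::from([(3, "old"), (5, "current"), (8, "future")]);
    assert_eq!(take_current(&mut cache, 5), Some("current"));
    assert!(!cache.contains_key(&3)); // the stale height was purged
    assert!(cache.contains_key(&8)); // future heights stay cached
}
```
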

// Filters the cached messages:
// - returns all of the current height messages.
// - drops messages from earlier heights.
2 changes: 0 additions & 2 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -81,7 +81,6 @@ async fn send(sender: &mut MockBroadcastedMessagesSender<ConsensusMessage>, msg:
sender.send((msg, broadcasted_message_metadata)).await.unwrap();
}

#[ignore] // TODO(guyn): return this once caching proposals is implemented.
#[tokio::test]
async fn manager_multiple_heights_unordered() {
let TestSubscriberChannels { mock_network, subscriber_channels } =
@@ -155,7 +154,6 @@ async fn manager_multiple_heights_unordered() {
assert_eq!(decision.block, BlockHash(Felt::TWO));
}

#[ignore] // TODO(guyn): return this once caching proposals is implemented.
#[tokio::test]
async fn run_consensus_sync() {
// Set expectations.
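
The two deletions in this file drop the `#[ignore]` markers, so both tests run again under a plain `cargo test`; ignored tests only run when `-- --ignored` is passed. A minimal reminder of the attribute's behavior:

```rust
#[cfg(test)]
mod tests {
    #[test]
    fn runs_by_default() {
        assert_eq!(2 + 2, 4);
    }

    // Skipped by `cargo test`; opt in with `cargo test -- --ignored`.
    #[ignore]
    #[test]
    fn runs_only_when_requested() {
        assert!(true);
    }
}
```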
