Skip to content

Commit

Permalink
feat(blockifier): add comprehensive state diff versioned constant (#2407
Browse files Browse the repository at this point in the history
)
  • Loading branch information
yoavGrs authored and guy-starkware committed Dec 5, 2024
1 parent af955af commit ef40e92
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"segment_arena_cells": true,
"disable_cairo0_redeclaration": false,
"enable_stateful_compression": false,
"comprehensive_state_diff": false,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"segment_arena_cells": true,
"disable_cairo0_redeclaration": false,
"enable_stateful_compression": false,
"comprehensive_state_diff": false,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"segment_arena_cells": true,
"disable_cairo0_redeclaration": false,
"enable_stateful_compression": false,
"comprehensive_state_diff": false,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
},
"disable_cairo0_redeclaration": true,
"enable_stateful_compression": false,
"comprehensive_state_diff": false,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
},
"disable_cairo0_redeclaration": true,
"enable_stateful_compression": false,
"comprehensive_state_diff": false,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
},
"disable_cairo0_redeclaration": true,
"enable_stateful_compression": false,
"comprehensive_state_diff": false,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
},
"disable_cairo0_redeclaration": true,
"enable_stateful_compression": true,
"comprehensive_state_diff": true,
"allocation_cost": {
"blob_cost": {
"l1_gas": 0,
Expand Down
1 change: 1 addition & 0 deletions crates/blockifier/src/versioned_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ pub struct VersionedConstants {
// Transactions settings.
pub disable_cairo0_redeclaration: bool,
pub enable_stateful_compression: bool,
pub comprehensive_state_diff: bool,
pub ignore_inner_event_resources: bool,

// Compiler settings.
Expand Down
86 changes: 62 additions & 24 deletions crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_C
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument, warn};
use tracing::{debug, info, instrument};

use crate::config::TimeoutsConfig;
use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
Expand Down Expand Up @@ -94,34 +94,37 @@ where
/// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly
/// part of the single height consensus algorithm (e.g. messages from future heights).
#[derive(Debug, Default)]
struct MultiHeightManager {
struct MultiHeightManager<ContextT: ConsensusContext> {
validator_id: ValidatorId,
cached_messages: BTreeMap<u64, Vec<ConsensusMessage>>,
cached_proposals: BTreeMap<u64, (ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)>,
timeouts: TimeoutsConfig,
}

impl MultiHeightManager {
impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
/// Create a new consensus manager.
pub fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self {
Self { validator_id, cached_messages: BTreeMap::new(), timeouts }
Self {
validator_id,
cached_messages: BTreeMap::new(),
cached_proposals: BTreeMap::new(),
timeouts,
}
}

/// Run the consensus algorithm for a single height.
///
/// Assumes that `height` is monotonically increasing across calls for the sake of filtering
/// `cached_messaged`.
#[instrument(skip(self, context, broadcast_channels), level = "info")]
pub async fn run_height<ContextT>(
pub async fn run_height(
&mut self,
context: &mut ContextT,
height: BlockNumber,
is_observer: bool,
broadcast_channels: &mut BroadcastConsensusMessageChannel,
proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
) -> Result<Decision, ConsensusError>
where
ContextT: ConsensusContext,
{
) -> Result<Decision, ConsensusError> {
let validators = context.validators(height).await;
info!("running consensus for height {height:?} with validator set {validators:?}");
let mut shc = SingleHeightConsensus::new(
Expand All @@ -143,14 +146,31 @@ impl MultiHeightManager {
}

let mut current_height_messages = self.get_current_height_messages(height);
// If there's already a cached proposal, handle that before looping.
if let Some((init, proposal)) = self.get_current_proposal(height) {
let shc_return =
self.handle_proposal(context, height, &mut shc, init, proposal).await?;
// Handle potential tasks like validate the proposal.
match shc_return {
ShcReturn::Decision(decision) => return Ok(decision),
ShcReturn::Tasks(tasks) => {
for task in tasks {
shc_events.push(task.run());
}
}
}
};

// No cached proposal, loop over incoming proposals, messages, cached messages, and events.
loop {
let shc_return = tokio::select! {
// TODO(Matan): remove report peer / continue propagation, as they are not cancel safe.
message = next_message(&mut current_height_messages, broadcast_channels) => {
self.handle_message(context, height, &mut shc, message?).await?
},
Some(mut content_receiver) = proposal_receiver.next() => {
// Get the first message to verify the init was sent.
// TODO(guyn): add a timeout and panic, since StreamHandler should only send once
// TODO(guyn): add a timeout and panic, since StreamHandler should only send once
// the first message (message_id=0) has arrived.
let Some(first_part) = content_receiver.next().await else {
return Err(ConsensusError::InternalNetworkError(
Expand All @@ -177,37 +197,35 @@ impl MultiHeightManager {
}

// Handle a new proposal receiver from the network.
async fn handle_proposal<ContextT>(
async fn handle_proposal(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
proposal_init: ProposalInit,
content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
{
// TODO(guyn): what is the right thing to do if proposal's height doesn't match?
) -> Result<ShcReturn, ConsensusError> {
if proposal_init.height != height {
// TODO(guyn): add caching of heights for future use.
warn!("Received a proposal for a different height. {:?}", proposal_init);
debug!("Received a proposal for a different height. {:?}", proposal_init);
if proposal_init.height > height {
// Note: this will overwrite an existing content_receiver for this height!
self.cached_proposals
.insert(proposal_init.height.0, (proposal_init, content_receiver));
}
return Ok(ShcReturn::Tasks(Vec::new()));
}
shc.handle_proposal(context, proposal_init, content_receiver).await
}

// Handle a single consensus message.
async fn handle_message<ContextT>(
async fn handle_message(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
message: ConsensusMessage,
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
{
// TODO(matan): We need to figure out an actual cacheing strategy under 2 constraints:
) -> Result<ShcReturn, ConsensusError> {
// TODO(matan): We need to figure out an actual caching strategy under 2 constraints:
// 1. Malicious - must be capped so a malicious peer can't DoS us.
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
// In general I think we will want to only cache (H+1, 0) messages.
Expand All @@ -229,6 +247,26 @@ impl MultiHeightManager {
}
}

// Checks if a cached proposal already exists
// - returns the proposal if it exists and removes it from the cache.
// - returns None if no proposal exists.
// - cleans up any proposals from earlier heights.
fn get_current_proposal(
&mut self,
height: BlockNumber,
) -> Option<(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)> {
loop {
let entry = self.cached_proposals.first_entry()?;
match entry.key().cmp(&height.0) {
std::cmp::Ordering::Greater => return None,
std::cmp::Ordering::Equal => return Some(entry.remove()),
std::cmp::Ordering::Less => {
entry.remove();
}
}
}
}

// Filters the cached messages:
// - returns all of the current height messages.
// - drops messages from earlier heights.
Expand Down
Loading

0 comments on commit ef40e92

Please sign in to comment.