fix: merge with current main
guy-starkware committed Dec 4, 2024
1 parent a6a227b commit 4237007
Showing 5 changed files with 124 additions and 111 deletions.
86 changes: 62 additions & 24 deletions crates/sequencing/papyrus_consensus/src/manager.rs
@@ -14,7 +14,7 @@ use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_C
use papyrus_network::network_manager::BroadcastTopicClientTrait;
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
use starknet_api::block::BlockNumber;
use tracing::{debug, info, instrument, warn};
use tracing::{debug, info, instrument};

use crate::config::TimeoutsConfig;
use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
@@ -88,33 +88,36 @@ where
/// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly
/// part of the single height consensus algorithm (e.g. messages from future heights).
#[derive(Debug, Default)]
struct MultiHeightManager {
struct MultiHeightManager<ContextT: ConsensusContext> {
validator_id: ValidatorId,
cached_messages: BTreeMap<u64, Vec<ConsensusMessage>>,
cached_proposals: BTreeMap<u64, (ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)>,
timeouts: TimeoutsConfig,
}

impl MultiHeightManager {
impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
/// Create a new consensus manager.
pub fn new(validator_id: ValidatorId, timeouts: TimeoutsConfig) -> Self {
Self { validator_id, cached_messages: BTreeMap::new(), timeouts }
Self {
validator_id,
cached_messages: BTreeMap::new(),
cached_proposals: BTreeMap::new(),
timeouts,
}
}

/// Run the consensus algorithm for a single height.
///
/// Assumes that `height` is monotonically increasing across calls for the sake of filtering
/// `cached_messages`.
#[instrument(skip(self, context, broadcast_channels), level = "info")]
pub async fn run_height<ContextT>(
pub async fn run_height(
&mut self,
context: &mut ContextT,
height: BlockNumber,
broadcast_channels: &mut BroadcastConsensusMessageChannel,
proposal_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
) -> Result<Decision, ConsensusError>
where
ContextT: ConsensusContext,
{
) -> Result<Decision, ConsensusError> {
let validators = context.validators(height).await;
info!("running consensus for height {height:?} with validator set {validators:?}");
let mut shc = SingleHeightConsensus::new(
@@ -135,14 +138,31 @@ impl MultiHeightManager {
}

let mut current_height_messages = self.get_current_height_messages(height);
// If there's already a cached proposal, handle that before looping.
if let Some((init, proposal)) = self.get_current_proposal(height) {
let shc_return =
self.handle_proposal(context, height, &mut shc, init, proposal).await?;
// Handle potential tasks, such as validating the proposal.
match shc_return {
ShcReturn::Decision(decision) => return Ok(decision),
ShcReturn::Tasks(tasks) => {
for task in tasks {
shc_events.push(task.run());
}
}
}
};

// No cached proposal, loop over incoming proposals, messages, cached messages, and events.
loop {
let shc_return = tokio::select! {
// TODO(Matan): remove report peer / continue propagation, as they are not cancel safe.
message = next_message(&mut current_height_messages, broadcast_channels) => {
self.handle_message(context, height, &mut shc, message?).await?
},
Some(mut content_receiver) = proposal_receiver.next() => {
// Get the first message to verify the init was sent.
// TODO(guyn): add a timeout and panic, since StreamHandler should only send once
// the first message (message_id=0) has arrived.
let Some(first_part) = content_receiver.next().await else {
return Err(ConsensusError::InternalNetworkError(
@@ -169,37 +189,35 @@ impl MultiHeightManager {
}

// Handle a new proposal receiver from the network.
async fn handle_proposal<ContextT>(
async fn handle_proposal(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
proposal_init: ProposalInit,
content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
{
// TODO(guyn): what is the right thing to do if proposal's height doesn't match?
) -> Result<ShcReturn, ConsensusError> {
if proposal_init.height != height {
// TODO(guyn): add caching of heights for future use.
warn!("Received a proposal for a different height. {:?}", proposal_init);
debug!("Received a proposal for a different height. {:?}", proposal_init);
if proposal_init.height > height {
// Note: this will overwrite an existing content_receiver for this height!
self.cached_proposals
.insert(proposal_init.height.0, (proposal_init, content_receiver));
}
return Ok(ShcReturn::Tasks(Vec::new()));
}
shc.handle_proposal(context, proposal_init, content_receiver).await
}

// Handle a single consensus message.
async fn handle_message<ContextT>(
async fn handle_message(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
message: ConsensusMessage,
) -> Result<ShcReturn, ConsensusError>
where
ContextT: ConsensusContext,
{
// TODO(matan): We need to figure out an actual cacheing strategy under 2 constraints:
) -> Result<ShcReturn, ConsensusError> {
// TODO(matan): We need to figure out an actual caching strategy under 2 constraints:
// 1. Malicious - must be capped so a malicious peer can't DoS us.
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
// In general I think we will want to only cache (H+1, 0) messages.
@@ -221,6 +239,26 @@ impl MultiHeightManager {
}
}

// Checks if a cached proposal already exists
// - returns the proposal if it exists and removes it from the cache.
// - returns None if no proposal exists.
// - cleans up any proposals from earlier heights.
fn get_current_proposal(
&mut self,
height: BlockNumber,
) -> Option<(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)> {
loop {
let entry = self.cached_proposals.first_entry()?;
match entry.key().cmp(&height.0) {
std::cmp::Ordering::Greater => return None,
std::cmp::Ordering::Equal => return Some(entry.remove()),
std::cmp::Ordering::Less => {
entry.remove();
}
}
}
}

// Filters the cached messages:
// - returns all of the current height messages.
// - drops messages from earlier heights.
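The core of this change in `manager.rs` is the height-keyed proposal cache: `get_current_proposal` walks `cached_proposals` from its smallest key, dropping entries from stale heights and handing back the entry for the current height. Below is a minimal standalone sketch of that `BTreeMap::first_entry` pattern; `String` stands in for the `(ProposalInit, Receiver)` value and `take_current` is a hypothetical name invented for illustration, not code from this repository.

```rust
use std::collections::BTreeMap;

// Sketch of the pruning loop in `get_current_proposal`, simplified to std types.
fn take_current(cache: &mut BTreeMap<u64, String>, height: u64) -> Option<String> {
    loop {
        // Smallest cached height; `None` when the cache is empty.
        let entry = cache.first_entry()?;
        match entry.key().cmp(&height) {
            // Everything cached is for a future height: nothing to return yet.
            std::cmp::Ordering::Greater => return None,
            // Exact match: hand it to the caller and drop it from the cache.
            std::cmp::Ordering::Equal => return Some(entry.remove()),
            // Stale entry from an earlier height: discard and keep scanning.
            std::cmp::Ordering::Less => {
                entry.remove();
            }
        }
    }
}

fn main() {
    let mut cache = BTreeMap::new();
    cache.insert(1, "old".to_string());
    cache.insert(3, "current".to_string());
    cache.insert(4, "future".to_string());
    assert_eq!(take_current(&mut cache, 3), Some("current".to_string()));
    assert_eq!(take_current(&mut cache, 3), None); // only the future entry remains
    assert_eq!(cache.len(), 1);
}
```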
99 changes: 47 additions & 52 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -27,7 +27,7 @@ use starknet_types_core::felt::Felt;

use super::{run_consensus, MultiHeightManager};
use crate::config::TimeoutsConfig;
use crate::test_utils::{precommit, prevote, proposal, proposal_init};
use crate::test_utils::{precommit, prevote, proposal_init};
use crate::types::{ConsensusContext, ConsensusError, ProposalContentId, Round, ValidatorId};

lazy_static! {
@@ -93,47 +93,67 @@ async fn send(sender: &mut MockBroadcastedMessagesSender<ConsensusMessage>, msg:

async fn send_proposal(
proposal_receiver_sender: &mut mpsc::Sender<mpsc::Receiver<ProposalPart>>,
content: ProposalPart,
content: Vec<ProposalPart>,
) {
let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
proposal_receiver_sender.send(proposal_receiver).await.unwrap();
proposal_sender.send(content).await.unwrap();
for item in content {
proposal_sender.send(item).await.unwrap();
}
}

fn expect_validate_proposal(context: &mut MockTestContext, block_hash: Felt) {
context
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
BlockHash(block_hash),
ProposalFin { proposal_content_id: BlockHash(block_hash) },
))
.unwrap();
block_receiver
})
.times(1);
}

#[ignore] // TODO(guyn): return this once caching proposals is implemented.
#[tokio::test]
async fn manager_multiple_heights_unordered() {
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().unwrap();
let mut sender = mock_network.broadcasted_messages_sender;

// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (_proposal_receiver_sender, mut proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
let (mut proposal_receiver_sender, mut proposal_receiver_receiver) =
mpsc::channel(CHANNEL_SIZE);

// Send messages for height 2 followed by those for height 1.
send(&mut sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await;
send_proposal(
&mut proposal_receiver_sender,
vec![
ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID)),
ProposalPart::Fin(ProposalFin { proposal_content_id: BlockHash(Felt::TWO) }),
],
)
.await;
send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
send(&mut sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;

send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;
send_proposal(
&mut proposal_receiver_sender,
vec![
ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)),
ProposalPart::Fin(ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }),
],
)
.await;
send(&mut sender, prevote(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
send(&mut sender, precommit(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;

let mut context = MockTestContext::new();
// Run the manager for height 1.
context
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
BlockHash(Felt::ONE),
ProposalFin { proposal_content_id: BlockHash(Felt::ONE) },
))
.unwrap();
block_receiver
})
.times(1);
expect_validate_proposal(&mut context, Felt::ONE);
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
@@ -153,19 +173,7 @@ async fn manager_multiple_heights_unordered() {
assert_eq!(decision.block, BlockHash(Felt::ONE));

// Run the manager for height 2.
context
.expect_validate_proposal()
.return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((
BlockHash(Felt::TWO),
ProposalFin { proposal_content_id: BlockHash(Felt::TWO) },
))
.unwrap();
block_receiver
})
.times(1);
expect_validate_proposal(&mut context, Felt::TWO);
let decision = manager
.run_height(
&mut context,
Expand All @@ -178,7 +186,6 @@ async fn manager_multiple_heights_unordered() {
assert_eq!(decision.block, BlockHash(Felt::TWO));
}

#[ignore] // TODO(guyn): return this once caching proposals is implemented.
#[tokio::test]
async fn run_consensus_sync() {
// Set expectations.
@@ -188,13 +195,7 @@ async fn run_consensus_sync() {
// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::TWO), ProposalFin { proposal_content_id: BlockHash(Felt::TWO) }))
.unwrap();
block_receiver
});
expect_validate_proposal(&mut context, Felt::TWO);
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
Expand All @@ -209,7 +210,7 @@ async fn run_consensus_sync() {
// Send messages for height 2.
send_proposal(
&mut proposal_receiver_sender,
ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID)),
vec![ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))],
)
.await;
let TestSubscriberChannels { mock_network, subscriber_channels } =
@@ -258,13 +259,7 @@ async fn run_consensus_sync_cancellation_safety() {
// TODO(guyn): refactor this test to pass proposals through the correct channels.
let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);

context.expect_validate_proposal().return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender
.send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
.unwrap();
block_receiver
});
expect_validate_proposal(&mut context, Felt::ONE);
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
@@ -304,7 +299,7 @@ async fn run_consensus_sync_cancellation_safety() {
// Send a proposal for height 1.
send_proposal(
&mut proposal_receiver_sender,
ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)),
vec![ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))],
)
.await;
proposal_handled_rx.await.unwrap();
@@ -336,7 +331,7 @@ async fn test_timeouts() {

send_proposal(
&mut proposal_receiver_sender,
ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)),
vec![ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await;
@@ -390,7 +385,7 @@ async fn test_timeouts() {
// reach a decision.
send_proposal(
&mut proposal_receiver_sender,
ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID)),
vec![ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *PROPOSER_ID)).await;
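The updated tests feed proposals through a stream-of-streams channel: `send_proposal` opens a fresh part channel per proposal, forwards its receiving end over an outer channel, then pushes every `ProposalPart` into it. Here is a self-contained sketch of that pattern using the same `futures` channels (with `tokio` assumed for the runtime, as in the tests); `Part` and `send_parts` are stand-ins invented for illustration, not items from the crate.

```rust
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};

// Stand-in for `ProposalPart` in this sketch.
#[derive(Debug, PartialEq)]
enum Part {
    Init(u64),
    Fin(u64),
}

// Mirror of the `send_proposal` helper: hand the consumer the receiving end
// first, then stream every part into the per-proposal channel.
async fn send_parts(outer: &mut mpsc::Sender<mpsc::Receiver<Part>>, parts: Vec<Part>) {
    let (mut part_sender, part_receiver) = mpsc::channel(16);
    outer.send(part_receiver).await.unwrap();
    for part in parts {
        part_sender.send(part).await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    let (mut outer_tx, mut outer_rx) = mpsc::channel(16);
    send_parts(&mut outer_tx, vec![Part::Init(2), Part::Fin(2)]).await;

    // The consumer first receives a per-proposal stream, then drains its parts.
    let mut parts = outer_rx.next().await.unwrap();
    assert_eq!(parts.next().await, Some(Part::Init(2)));
    assert_eq!(parts.next().await, Some(Part::Fin(2)));
}
```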
27 changes: 1 addition & 26 deletions crates/sequencing/papyrus_consensus/src/test_utils.rs
@@ -3,14 +3,7 @@ use std::time::Duration;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use mockall::mock;
use papyrus_protobuf::consensus::{
ConsensusMessage,
Proposal,
ProposalFin,
ProposalInit,
Vote,
VoteType,
};
use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, ProposalInit, Vote, VoteType};
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_types_core::felt::Felt;
@@ -129,24 +122,6 @@ pub fn precommit(
voter,
})
}

pub fn proposal(
block_felt: Felt,
height: u64,
round: u32,
proposer: ValidatorId,
) -> ConsensusMessage {
let block_hash = BlockHash(block_felt);
ConsensusMessage::Proposal(Proposal {
height,
block_hash,
round,
proposer,
transactions: Vec::new(),
valid_round: None,
})
}

pub fn proposal_init(height: u64, round: u32, proposer: ValidatorId) -> ProposalInit {
ProposalInit { height: BlockNumber(height), round, proposer, valid_round: None }
}