From 568a63bcc8a2e36a5a4fff127ac2c2c99c366308 Mon Sep 17 00:00:00 2001
From: Guy Nir
Date: Sun, 17 Nov 2024 15:22:44 +0200
Subject: [PATCH] feat: allow a streamed proposal channel on top of existing one

---
 crates/papyrus_node/src/run.rs                |   4 +-
 crates/papyrus_protobuf/src/converters/mod.rs |   4 -
 .../papyrus_consensus/src/manager.rs          |  65 ++++++----
 .../papyrus_consensus/src/manager_test.rs     |  85 ++++++++++---
 .../src/single_height_consensus.rs            |  69 +++++------
 .../src/single_height_consensus_test.rs       |  30 +++--
 .../papyrus_consensus/src/test_utils.rs       |  48 +++++++-
 .../sequencing/papyrus_consensus/src/types.rs |   9 +-
 .../src/papyrus_consensus_context.rs          |  46 ++++---
 .../src/papyrus_consensus_context_test.rs     |  18 ++-
 .../src/sequencer_consensus_context.rs        | 114 ++++++++++--------
 .../src/sequencer_consensus_context_test.rs   | 102 ++++++++++++----
 12 files changed, 397 insertions(+), 197 deletions(-)

diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs
index 5a8a02c135..c09cfe22a6 100644
--- a/crates/papyrus_node/src/run.rs
+++ b/crates/papyrus_node/src/run.rs
@@ -51,10 +51,8 @@ const DEFAULT_LEVEL: LevelFilter = LevelFilter::INFO;
 // different genesis hash.
 // TODO: Consider moving to a more general place.
 const GENESIS_HASH: &str = "0x0";
-<<<<<<< HEAD
+
 // TODO(guyn): move this to the config.
-=======
->>>>>>> 93fd11f32 (feat: allow a streamed proposal channel on top of existing one)
 pub const NETWORK_TOPIC: &str = "consensus_proposals";
 
 // TODO(dvir): add this to config.
diff --git a/crates/papyrus_protobuf/src/converters/mod.rs b/crates/papyrus_protobuf/src/converters/mod.rs
index ae9a23c891..ed2fee8ba5 100644
--- a/crates/papyrus_protobuf/src/converters/mod.rs
+++ b/crates/papyrus_protobuf/src/converters/mod.rs
@@ -27,10 +27,6 @@ pub enum ProtobufConversionError {
         type_description: &'static str,
         value_as_str: String,
         expected: &'static str,
-<<<<<<< HEAD
-=======
-        got: &'static str,
->>>>>>> 93fd11f32 (feat: allow a streamed proposal channel on top of existing one)
     },
     #[error(transparent)]
     DecodeError(#[from] DecodeError),
diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs
index fe5ad992da..8138ea502f 100644
--- a/crates/sequencing/papyrus_consensus/src/manager.rs
+++ b/crates/sequencing/papyrus_consensus/src/manager.rs
@@ -7,14 +7,14 @@ mod manager_test;
 use std::collections::BTreeMap;
 use std::time::Duration;
 
-use futures::channel::{mpsc, oneshot};
+use futures::channel::mpsc;
 use futures::stream::FuturesUnordered;
 use futures::{Stream, StreamExt};
 use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
 use papyrus_network::network_manager::BroadcastTopicClientTrait;
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalWrapper};
-use starknet_api::block::{BlockHash, BlockNumber};
-use tracing::{debug, info, instrument};
+use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
+use starknet_api::block::BlockNumber;
+use tracing::{debug, info, instrument, warn};
 
 use crate::config::TimeoutsConfig;
 use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
@@ -42,8 +42,6 @@ pub async fn run_consensus(
 ) -> Result<(), ConsensusError>
 where
     ContextT: ConsensusContext,
-    ProposalWrapper:
-        Into<(ProposalInit, mpsc::Receiver<ContextT::ProposalChunk>, oneshot::Receiver<BlockHash>)>,
     SyncReceiverT: Stream<Item = BlockNumber> + Unpin,
 {
     info!(
@@ -116,11 +114,6 @@ impl MultiHeightManager {
     ) -> Result<Decision, ConsensusError>
     where
         ContextT: ConsensusContext,
-        ProposalWrapper: Into<(
-            ProposalInit,
-            mpsc::Receiver<ContextT::ProposalChunk>,
-            oneshot::Receiver<BlockHash>,
-        )>,
     {
         let validators = context.validators(height).await;
         info!("running consensus for height {height:?} with validator set {validators:?}");
@@ -147,6 +140,18 @@ impl MultiHeightManager {
                 message = next_message(&mut current_height_messages, broadcast_channels) => {
                     self.handle_message(context, height, &mut shc, message?).await?
                 },
+                Some(mut content_receiver) = proposal_receiver.next() => {
+                    // Get the first message to verify that the init was sent.
+                    // TODO(guyn): add a timeout and panic if nothing comes from StreamHandler
+                    // (if it isn't sending things, that means something is wrong).
+                    let Some(first_part) = content_receiver.next().await else {
+                        return Err(ConsensusError::InternalNetworkError(
+                            "Proposal receiver closed".to_string(),
+                        ));
+                    };
+                    let proposal_init: ProposalInit = first_part.try_into()?;
+                    self.handle_proposal(context, height, &mut shc, proposal_init, content_receiver).await?
+                },
                 Some(shc_event) = shc_events.next() => {
                     shc.handle_event(context, shc_event).await?
                 },
@@ -163,6 +168,26 @@ impl MultiHeightManager {
         }
     }
 
+    // Handle a new proposal receiver from the network.
+    async fn handle_proposal<ContextT>(
+        &mut self,
+        context: &mut ContextT,
+        height: BlockNumber,
+        shc: &mut SingleHeightConsensus,
+        proposal_init: ProposalInit,
+        content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
+    ) -> Result<ShcReturn, ConsensusError>
+    where
+        ContextT: ConsensusContext,
+    {
+        // TODO(guyn): what is the right thing to do if the proposal's height doesn't match?
+        if proposal_init.height != height {
+            // TODO(guyn): add caching of heights for future use.
+            warn!("Received a proposal for a different height. {:?}", proposal_init);
+        }
+        shc.handle_proposal(context, proposal_init, content_receiver).await
+    }
+
     // Handle a single consensus message.
     async fn handle_message<ContextT>(
         &mut self,
@@ -173,11 +198,6 @@ impl MultiHeightManager {
     ) -> Result<ShcReturn, ConsensusError>
     where
         ContextT: ConsensusContext,
-        ProposalWrapper: Into<(
-            ProposalInit,
-            mpsc::Receiver<ContextT::ProposalChunk>,
-            oneshot::Receiver<BlockHash>,
-        )>,
     {
         // TODO(matan): We need to figure out an actual caching strategy under 2 constraints:
         // 1. Malicious - must be capped so a malicious peer can't DoS us.
@@ -191,16 +211,9 @@ impl MultiHeightManager {
             return Ok(ShcReturn::Tasks(Vec::new()));
         }
         match message {
-            ConsensusMessage::Proposal(proposal) => {
-                // Special case due to fake streaming.
-                // TODO(guyn): this will be gone once we integrate the proposal channels.
-                let (proposal_init, content_receiver, fin_receiver) =
-                    ProposalWrapper(proposal).into();
-                let res = shc
-                    .handle_proposal(context, proposal_init, content_receiver, fin_receiver)
-                    .await?;
-                Ok(res)
-            }
+            ConsensusMessage::Proposal(_) => Err(ConsensusError::InternalNetworkError(
+                "Proposal variant of ConsensusMessage no longer supported".to_string(),
+            )),
             _ => {
                 let res = shc.handle_message(context, message).await?;
                 Ok(res)
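For context on the new intake path: the manager now receives a stream of streams, where each inner receiver carries one proposal and its first message must decode to a `ProposalInit`. A minimal sketch of that consumption pattern, using a plain `u64` in place of `ProposalPart` and hand-built channels instead of the network's StreamHandler wiring (illustrative only, not code from this patch):

    use futures::channel::mpsc;
    use futures::{executor, SinkExt, StreamExt};

    fn main() {
        executor::block_on(async {
            // A channel of channels: each inner receiver is one streamed proposal.
            let (mut proposals_tx, mut proposals_rx) = mpsc::channel::<mpsc::Receiver<u64>>(10);
            let (mut parts_tx, parts_rx) = mpsc::channel::<u64>(10);
            parts_tx.send(42).await.unwrap(); // the first part plays the role of ProposalInit
            proposals_tx.send(parts_rx).await.unwrap();

            // The manager side: take the next proposal stream, peel off the init,
            // then hand the rest of the receiver to the SHC.
            let mut content_receiver = proposals_rx.next().await.expect("a proposal stream");
            let first = content_receiver.next().await.expect("stream must open with an init");
            assert_eq!(first, 42); // run_height instead does first_part.try_into()?
        });
    }

The design choice here is that only the init is consumed eagerly; the remainder of the content stream is passed along untouched, so validation can proceed without buffering the whole proposal.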
diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs
index e4f03e65b0..4f03cc0a8a 100644
--- a/crates/sequencing/papyrus_consensus/src/manager_test.rs
+++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs
@@ -13,7 +13,13 @@ use papyrus_network::network_manager::test_utils::{
     TestSubscriberChannels,
 };
 use papyrus_network_types::network_types::BroadcastedMessageMetadata;
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, ProposalPart, Vote};
+use papyrus_protobuf::consensus::{
+    ConsensusMessage,
+    ProposalFin,
+    ProposalInit,
+    ProposalPart,
+    Vote,
+};
 use papyrus_test_utils::{get_rng, GetTestInstance};
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_api::transaction::Transaction;
@@ -21,7 +27,7 @@ use starknet_types_core::felt::Felt;
 
 use super::{run_consensus, MultiHeightManager};
 use crate::config::TimeoutsConfig;
-use crate::test_utils::{precommit, prevote, proposal};
+use crate::test_utils::{precommit, prevote, proposal, proposal_init};
 use crate::types::{ConsensusContext, ConsensusError, ProposalContentId, Round, ValidatorId};
 
 lazy_static! {
@@ -53,8 +59,8 @@ mock! {
             height: BlockNumber,
             round: Round,
             timeout: Duration,
-            content: mpsc::Receiver<Transaction>
-        ) -> oneshot::Receiver<ProposalContentId>;
+            content: mpsc::Receiver<ProposalPart>
+        ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
 
         async fn repropose(
             &mut self,
@@ -85,6 +91,16 @@ async fn send(sender: &mut MockBroadcastedMessagesSender<ConsensusMessage>, msg:
     sender.send((msg, broadcasted_message_metadata)).await.unwrap();
 }
 
+async fn send_proposal(
+    proposal_receiver_sender: &mut mpsc::Sender<mpsc::Receiver<ProposalPart>>,
+    content: ProposalPart,
+) {
+    let (mut proposal_sender, proposal_receiver) = mpsc::channel(CHANNEL_SIZE);
+    proposal_receiver_sender.send(proposal_receiver).await.unwrap();
+    proposal_sender.send(content).await.unwrap();
+}
+
+#[ignore] // TODO(guyn): restore this test once proposal caching is implemented.
 #[tokio::test]
 async fn manager_multiple_heights_unordered() {
     let TestSubscriberChannels { mock_network, subscriber_channels } =
         mock_register_broadcast_topic().unwrap();
     let mut sender = mock_network.broadcasted_messages_sender;
 
     // TODO(guyn): refactor this test to pass proposals through the correct channels.
-    let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) =
-        mpsc::channel(CHANNEL_SIZE);
+    let (_proposal_receiver_sender, mut proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
 
     // Send messages for height 2 followed by those for height 1.
     send(&mut sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await;
     send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
     send(&mut sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
+
     send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;
     send(&mut sender, prevote(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
     send(&mut sender, precommit(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
@@ -109,7 +125,12 @@ async fn manager_multiple_heights_unordered() {
         .expect_validate_proposal()
         .return_once(move |_, _, _, _| {
             let (block_sender, block_receiver) = oneshot::channel();
-            block_sender.send(BlockHash(Felt::ONE)).unwrap();
+            block_sender
+                .send((
+                    BlockHash(Felt::ONE),
+                    ProposalFin { proposal_content_id: BlockHash(Felt::ONE) },
+                ))
+                .unwrap();
             block_receiver
         })
         .times(1);
@@ -136,7 +157,12 @@ async fn manager_multiple_heights_unordered() {
         .expect_validate_proposal()
         .return_once(move |_, _, _, _| {
             let (block_sender, block_receiver) = oneshot::channel();
-            block_sender.send(BlockHash(Felt::TWO)).unwrap();
+            block_sender
+                .send((
+                    BlockHash(Felt::TWO),
+                    ProposalFin { proposal_content_id: BlockHash(Felt::TWO) },
+                ))
+                .unwrap();
             block_receiver
         })
         .times(1);
@@ -152,6 +178,7 @@ async fn manager_multiple_heights_unordered() {
     assert_eq!(decision.block, BlockHash(Felt::TWO));
 }
 
+#[ignore] // TODO(guyn): restore this test once proposal caching is implemented.
 #[tokio::test]
 async fn run_consensus_sync() {
     // Set expectations.
     let mut context = MockTestContext::new();
     let (decision_tx, decision_rx) = oneshot::channel();
 
     // TODO(guyn): refactor this test to pass proposals through the correct channels.
-    let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
+    let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
 
     context.expect_validate_proposal().return_once(move |_, _, _, _| {
         let (block_sender, block_receiver) = oneshot::channel();
-        block_sender.send(BlockHash(Felt::TWO)).unwrap();
+        block_sender
+            .send((BlockHash(Felt::TWO), ProposalFin { proposal_content_id: BlockHash(Felt::TWO) }))
+            .unwrap();
        block_receiver
    });
    context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
@@ -178,10 +207,14 @@ async fn run_consensus_sync() {
     });
 
     // Send messages for height 2.
+    send_proposal(
+        &mut proposal_receiver_sender,
+        ProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID)),
+    )
+    .await;
     let TestSubscriberChannels { mock_network, subscriber_channels } =
         mock_register_broadcast_topic().unwrap();
     let mut network_sender = mock_network.broadcasted_messages_sender;
-    send(&mut network_sender, proposal(Felt::TWO, 2, 0, *PROPOSER_ID)).await;
     send(&mut network_sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
     send(&mut network_sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
@@ -223,11 +256,13 @@ async fn run_consensus_sync_cancellation_safety() {
     let (decision_tx, decision_rx) = oneshot::channel();
 
     // TODO(guyn): refactor this test to pass proposals through the correct channels.
-    let (mut _proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
+    let (mut proposal_receiver_sender, proposal_receiver_receiver) = mpsc::channel(CHANNEL_SIZE);
 
     context.expect_validate_proposal().return_once(move |_, _, _, _| {
         let (block_sender, block_receiver) = oneshot::channel();
-        block_sender.send(BlockHash(Felt::ONE)).unwrap();
+        block_sender
+            .send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
+            .unwrap();
         block_receiver
     });
     context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
@@ -267,7 +302,11 @@ async fn run_consensus_sync_cancellation_safety() {
     let mut network_sender = mock_network.broadcasted_messages_sender;
 
     // Send a proposal for height 1.
-    send(&mut network_sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;
+    send_proposal(
+        &mut proposal_receiver_sender,
+        ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)),
+    )
+    .await;
     proposal_handled_rx.await.unwrap();
 
     // Send an old sync. This should not cancel the current height.
@@ -292,10 +331,14 @@ async fn test_timeouts() {
     let mut sender = mock_network.broadcasted_messages_sender;
 
     // TODO(guyn): refactor this test to pass proposals through the correct channels.
-    let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) =
+    let (mut proposal_receiver_sender, mut proposal_receiver_receiver) =
         mpsc::channel(CHANNEL_SIZE);
 
-    send(&mut sender, proposal(Felt::ONE, 1, 0, *PROPOSER_ID)).await;
+    send_proposal(
+        &mut proposal_receiver_sender,
+        ProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID)),
+    )
+    .await;
     send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_2)).await;
     send(&mut sender, prevote(None, 1, 0, *VALIDATOR_ID_3)).await;
     send(&mut sender, precommit(None, 1, 0, *VALIDATOR_ID_2)).await;
 
     context.expect_set_height_and_round().returning(move |_, _| ());
     context.expect_validate_proposal().returning(move |_, _, _, _| {
         let (block_sender, block_receiver) = oneshot::channel();
-        block_sender.send(BlockHash(Felt::ONE)).unwrap();
+        block_sender
+            .send((BlockHash(Felt::ONE), ProposalFin { proposal_content_id: BlockHash(Felt::ONE) }))
+            .unwrap();
         block_receiver
     });
     context
         .expect_
@@ -343,7 +388,11 @@ async fn test_timeouts() {
     timeout_receive.await.unwrap();
     // Show that after the timeout is triggered we can still precommit in favor of the block and
     // reach a decision.
-    send(&mut sender, proposal(Felt::ONE, 1, 1, *PROPOSER_ID)).await;
+    send_proposal(
+        &mut proposal_receiver_sender,
+        ProposalPart::Init(proposal_init(1, 1, *PROPOSER_ID)),
+    )
+    .await;
     send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *PROPOSER_ID)).await;
     send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *VALIDATOR_ID_2)).await;
     send(&mut sender, prevote(Some(Felt::ONE), 1, 1, *VALIDATOR_ID_3)).await;
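A note on the mock pattern used throughout these tests: `validate_proposal` is stubbed to return a oneshot receiver that is already resolved, so the consensus task completes on its first poll. The pattern in isolation, with string literals standing in for the real `(ProposalContentId, ProposalFin)` tuple (names are illustrative):

    use futures::channel::oneshot;

    // Sending before returning means the caller's `.await` on the receiver
    // resolves immediately with the stubbed validation result.
    fn stubbed_validate_proposal() -> oneshot::Receiver<(&'static str, &'static str)> {
        let (sender, receiver) = oneshot::channel();
        sender.send(("batcher_built_id", "network_fin_id")).unwrap();
        receiver
    }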
diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
index f24ef8922c..c36c3d7e79 100644
--- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
+++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
@@ -9,7 +9,7 @@ use std::time::Duration;
 #[cfg(test)]
 use enum_as_inner::EnumAsInner;
 use futures::channel::{mpsc, oneshot};
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, Vote, VoteType};
+use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, ProposalInit, Vote, VoteType};
 use starknet_api::block::BlockNumber;
 use tracing::{debug, info, instrument, trace, warn};
 
@@ -40,7 +40,7 @@ pub enum ShcEvent {
     Precommit(StateMachineEvent),
     BuildProposal(StateMachineEvent),
     // TODO: Replace ProposalContentId with the unvalidated signature from the proposer.
-    ValidateProposal(StateMachineEvent, Option<ProposalContentId>),
+    ValidateProposal(StateMachineEvent, Option<ProposalFin>),
 }
 
 #[derive(Debug)]
@@ -66,11 +66,7 @@ pub enum ShcTask {
     /// result without blocking consensus.
     /// 3. Once validation is complete, the manager returns the built proposal to the SHC as an
     ///    event, which can be sent to the SM.
-    ValidateProposal(
-        ProposalInit,
-        oneshot::Receiver<ProposalContentId>, // Block built from the content.
-        oneshot::Receiver<ProposalContentId>, // Fin sent by the proposer.
-    ),
+    ValidateProposal(ProposalInit, oneshot::Receiver<(ProposalContentId, ProposalFin)>),
 }
 
 impl PartialEq for ShcTask {
@@ -82,9 +78,7 @@ impl PartialEq for ShcTask {
             | (ShcTask::Prevote(d1, e1), ShcTask::Prevote(d2, e2))
             | (ShcTask::Precommit(d1, e1), ShcTask::Precommit(d2, e2)) => d1 == d2 && e1 == e2,
             (ShcTask::BuildProposal(r1, _), ShcTask::BuildProposal(r2, _)) => r1 == r2,
-            (ShcTask::ValidateProposal(pi1, _, _), ShcTask::ValidateProposal(pi2, _, _)) => {
-                pi1 == pi2
-            }
+            (ShcTask::ValidateProposal(pi1, _), ShcTask::ValidateProposal(pi2, _)) => pi1 == pi2,
             _ => false,
         }
     }
@@ -117,28 +111,24 @@ impl ShcTask {
                 let proposal_id = receiver.await.ok();
                 ShcEvent::BuildProposal(StateMachineEvent::GetProposal(proposal_id, round))
             }
-            ShcTask::ValidateProposal(
-                init,
-                id_built_from_content_receiver,
-                fin_from_proposer_receiver,
-            ) => {
+            ShcTask::ValidateProposal(init, block_receiver) => {
                 // Handle the result of the block validation:
+                // The output is a tuple of the proposal id as calculated locally and as
+                // received from the network.
-                // - If successful, set it as Some.
+                // - If successful, set it as (Some, Some).
                 // - If there was an error (e.g., invalid proposal, no proposal received from the
-                //   peer, or the process was interrupted), set it to None.
+                //   peer, or the process was interrupted), set it to (None, None).
                 // TODO(Asmaa): Consider if we want to differentiate between an interrupt and other
                 // failures.
-                let proposal_id = match id_built_from_content_receiver.await {
-                    Ok(proposal_id) => Some(proposal_id),
-                    Err(_) => None,
-                };
-                let fin = match fin_from_proposer_receiver.await {
-                    Ok(fin) => Some(fin),
-                    Err(_) => None,
+                let (built_content_id, received_proposal_id) = match block_receiver.await {
+                    Ok((built_content_id, received_proposal_id)) => {
+                        (Some(built_content_id), Some(received_proposal_id))
+                    }
+                    // Proposal never received from peer.
+                    Err(_) => (None, None),
                 };
                 ShcEvent::ValidateProposal(
-                    StateMachineEvent::Proposal(proposal_id, init.round, init.valid_round),
-                    fin,
+                    StateMachineEvent::Proposal(built_content_id, init.round, init.valid_round),
+                    received_proposal_id,
                 )
             }
         }
@@ -212,8 +202,7 @@ impl SingleHeightConsensus {
         &mut self,
         context: &mut ContextT,
         init: ProposalInit,
-        p2p_messages_receiver: mpsc::Receiver<ContextT::ProposalChunk>,
-        fin_receiver: oneshot::Receiver<BlockHash>,
+        p2p_messages_receiver: mpsc::Receiver<ContextT::ProposalPart>,
     ) -> Result<ShcReturn, ConsensusError> {
         debug!(
             "Received proposal: height={}, round={}, proposer={:?}",
@@ -245,7 +234,7 @@ impl SingleHeightConsensus {
             )
             .await;
         context.set_height_and_round(self.height, self.state_machine.round()).await;
-        Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver, fin_receiver)]))
+        Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver)]))
     }
 
     async fn process_inbound_proposal(
@@ -320,20 +309,26 @@ impl SingleHeightConsensus {
             )]))
             }
             ShcEvent::ValidateProposal(
-                StateMachineEvent::Proposal(proposal_id, round, valid_round),
-                fin,
+                StateMachineEvent::Proposal(built_content_id, round, valid_round),
+                received_proposal_id,
             ) => {
                 // TODO(matan): Switch to signature validation.
-                let id = if proposal_id != fin {
+
+                let mut id = None;
+                if let (Some(built_content_id), Some(ProposalFin { proposal_content_id })) =
+                    (built_content_id, received_proposal_id.clone())
+                {
+                    if built_content_id == proposal_content_id {
+                        id = Some(built_content_id);
+                    }
+                }
+                if id.is_none() {
                     warn!(
                         "proposal_id built from content receiver does not match fin: {:#064x?} != \
                          {:#064x?}",
-                        proposal_id, fin
+                        built_content_id, received_proposal_id
                     );
-                    None
-                } else {
-                    proposal_id
-                };
+                }
                 // Retaining the entry for this round prevents us from receiving another proposal on
                 // this round. If the validations failed, which can be caused by a network issue, we
                 // may want to re-open ourselves to this round. The downside is that this may open
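The merge step above accepts a proposal only when the locally built content id matches the id the proposer sent in its `ProposalFin`. The same decision, reduced to a standalone helper (hypothetical, not in the patch; assumes `starknet_api`'s `BlockHash`):

    use starknet_api::block::BlockHash;

    // Accept the proposal only if both sides are present and agree.
    fn merge_proposal_ids(
        built_content_id: Option<BlockHash>,
        received_fin_id: Option<BlockHash>, // ProposalFin::proposal_content_id
    ) -> Option<BlockHash> {
        match (built_content_id, received_fin_id) {
            (Some(built), Some(received)) if built == received => Some(built),
            // A missing side (failed validation / no fin) or a mismatch rejects it.
            _ => None,
        }
    }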
diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
index 3f0bd44856..e60cf46d4a 100644
--- a/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
+++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus_test.rs
@@ -1,6 +1,7 @@
 use futures::channel::{mpsc, oneshot};
+use futures::SinkExt;
 use lazy_static::lazy_static;
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit};
+use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, ProposalInit};
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_types_core::felt::Felt;
 use test_case::test_case;
@@ -10,7 +11,7 @@ use super::SingleHeightConsensus;
 use crate::config::TimeoutsConfig;
 use crate::single_height_consensus::{ShcEvent, ShcReturn, ShcTask};
 use crate::state_machine::StateMachineEvent;
-use crate::test_utils::{precommit, prevote, MockTestContext, TestBlock};
+use crate::test_utils::{precommit, prevote, MockProposalPart, MockTestContext, TestBlock};
 use crate::types::{ConsensusError, ValidatorId};
 
 lazy_static! {
@@ -30,10 +31,14 @@ lazy_static! {
     static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig::default();
     static ref VALIDATE_PROPOSAL_EVENT: ShcEvent = ShcEvent::ValidateProposal(
         StateMachineEvent::Proposal(Some(BLOCK.id), PROPOSAL_INIT.round, PROPOSAL_INIT.valid_round,),
-        Some(BLOCK.id),
+        Some(ProposalFin { proposal_content_id: BLOCK.id }),
     );
+    #[derive(Debug)]
+    static ref PROPOSAL_FIN: ProposalFin = ProposalFin { proposal_content_id: BLOCK.id };
 }
 
+const CHANNEL_SIZE: usize = 1;
+
 fn prevote_task(block_felt: Option<Felt>, round: u32) -> ShcTask {
     ShcTask::Prevote(
         TIMEOUTS.prevote_timeout,
@@ -64,17 +69,10 @@ async fn handle_proposal(
     context: &mut MockTestContext,
 ) -> ShcReturn {
     // Send the proposal from the peer.
-    let (fin_sender, fin_receiver) = oneshot::channel();
-    fin_sender.send(BLOCK.id).unwrap();
-
-    shc.handle_proposal(
-        context,
-        PROPOSAL_INIT.clone(),
-        mpsc::channel(1).1, // content - ignored by SHC.
-        fin_receiver,
-    )
-    .await
-    .unwrap()
+    let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
+    content_sender.send(MockProposalPart(1)).await.unwrap();
+
+    shc.handle_proposal(context, PROPOSAL_INIT.clone(), content_receiver).await.unwrap()
 }
 
 #[tokio::test]
@@ -176,7 +174,7 @@ async fn validator(repeat_proposal: bool) {
     context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
     context.expect_validate_proposal().times(1).returning(move |_, _, _, _| {
         let (block_sender, block_receiver) = oneshot::channel();
-        block_sender.send(BLOCK.id).unwrap();
+        block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
         block_receiver
     });
     context.expect_set_height_and_round().returning(move |_, _| ());
@@ -255,7 +253,7 @@ async fn vote_twice(same_vote: bool) {
     context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID);
     context.expect_validate_proposal().times(1).returning(move |_, _, _, _| {
         let (block_sender, block_receiver) = oneshot::channel();
-        block_sender.send(BLOCK.id).unwrap();
+        block_sender.send((BLOCK.id, PROPOSAL_FIN.clone())).unwrap();
         block_receiver
     });
     context.expect_set_height_and_round().returning(move |_, _| ());
diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs
index 37e51af60d..66d128b117 100644
--- a/crates/sequencing/papyrus_consensus/src/test_utils.rs
+++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs
@@ -6,11 +6,12 @@ use mockall::mock;
 use papyrus_protobuf::consensus::{
     ConsensusMessage,
     Proposal,
+    ProposalFin,
     ProposalInit,
-    ProposalPart,
     Vote,
     VoteType,
 };
+use papyrus_protobuf::converters::ProtobufConversionError;
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_types_core::felt::Felt;
 
@@ -23,6 +24,41 @@ pub struct TestBlock {
     pub id: BlockHash,
 }
 
+#[derive(Debug, PartialEq, Clone)]
+pub struct MockProposalPart(pub u64);
+
+impl From<ProposalInit> for MockProposalPart {
+    fn from(init: ProposalInit) -> Self {
+        MockProposalPart(init.height.0)
+    }
+}
+
+impl TryFrom<MockProposalPart> for ProposalInit {
+    type Error = ProtobufConversionError;
+    fn try_from(part: MockProposalPart) -> Result<Self, Self::Error> {
+        Ok(ProposalInit {
+            height: BlockNumber(part.0),
+            round: 0,
+            proposer: ValidatorId::default(),
+            valid_round: None,
+        })
+    }
+}
+
+impl Into<Vec<u8>> for MockProposalPart {
+    fn into(self) -> Vec<u8> {
+        vec![self.0 as u8]
+    }
+}
+
+impl TryFrom<Vec<u8>> for MockProposalPart {
+    type Error = ProtobufConversionError;
+
+    fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
+        Ok(MockProposalPart(value[0].into()))
+    }
+}
+
 // TODO(matan): When QSelf is supported, switch to automocking `ConsensusContext`.
 mock! {
     pub TestContext {}
 
@@ -30,7 +66,7 @@ mock! {
     #[async_trait]
     impl ConsensusContext for TestContext {
         type ProposalChunk = u32;
-        type ProposalPart = ProposalPart;
+        type ProposalPart = MockProposalPart;
 
         async fn build_proposal(
             &mut self,
@@ -43,8 +79,8 @@ mock! {
             height: BlockNumber,
             round: Round,
             timeout: Duration,
-            content: mpsc::Receiver<u32>
-        ) -> oneshot::Receiver<ProposalContentId>;
+            content: mpsc::Receiver<MockProposalPart>
+        ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
 
         async fn repropose(
             &mut self,
@@ -110,3 +146,7 @@ pub fn proposal(
         valid_round: None,
     })
 }
+
+pub fn proposal_init(height: u64, round: u32, proposer: ValidatorId) -> ProposalInit {
+    ProposalInit { height: BlockNumber(height), round, proposer, valid_round: None }
+}
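Given the conversions above, a `MockProposalPart` should survive the byte round trip that the `ProposalPart` trait bounds (in types.rs, below) require. A sketch of such a test, assuming the items defined in this file (the test itself is hypothetical, not part of the patch):

    // Round-trip sketch: struct -> bytes -> struct -> ProposalInit.
    #[test]
    fn mock_proposal_part_round_trip() {
        let part = MockProposalPart(7);
        let bytes: Vec<u8> = part.clone().into();
        assert_eq!(bytes, vec![7u8]);
        let restored = MockProposalPart::try_from(bytes).expect("one-byte payload");
        assert_eq!(restored, part);
        let init: ProposalInit = restored.try_into().expect("mock conversion is infallible");
        assert_eq!(init.height, BlockNumber(7)); // height is the only field that survives
    }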
diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs
index e1b8ce6936..282185ed47 100644
--- a/crates/sequencing/papyrus_consensus/src/types.rs
+++ b/crates/sequencing/papyrus_consensus/src/types.rs
@@ -9,7 +9,7 @@ use papyrus_network::network_manager::{
     GenericReceiver,
 };
 use papyrus_network_types::network_types::BroadcastedMessageMetadata;
-use papyrus_protobuf::consensus::{ConsensusMessage, ProposalInit, Vote};
+use papyrus_protobuf::consensus::{ConsensusMessage, ProposalFin, ProposalInit, Vote};
 use papyrus_protobuf::converters::ProtobufConversionError;
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_api::core::ContractAddress;
@@ -35,7 +35,8 @@ pub trait ConsensusContext {
         + TryInto<ProposalInit, Error = ProtobufConversionError>
         + From<ProposalInit>
         + Clone
-        + Send;
+        + Send
+        + Debug;
 
     // TODO(matan): The oneshot for receiving the build block could be generalized to just be some
     // future which returns a block.
@@ -77,8 +78,8 @@ pub trait ConsensusContext {
         height: BlockNumber,
         round: Round,
         timeout: Duration,
-        content: mpsc::Receiver<Self::ProposalChunk>,
-    ) -> oneshot::Receiver<ProposalContentId>;
+        content: mpsc::Receiver<Self::ProposalPart>,
+    ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)>;
 
     /// This function is called by consensus to retrieve the content of a previously built or
     /// validated proposal. It broadcasts the proposal to the network.
diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
index 4d5523be74..ad3f6fa0ec 100644
--- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
+++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context.rs
@@ -168,8 +168,8 @@ impl ConsensusContext for PapyrusConsensusContext {
         height: BlockNumber,
         _round: Round,
         _timeout: Duration,
-        mut content: mpsc::Receiver<Transaction>,
-    ) -> oneshot::Receiver<ProposalContentId> {
+        mut content: mpsc::Receiver<ProposalPart>,
+    ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
         let (fin_sender, fin_receiver) = oneshot::channel();
 
         let storage_reader = self.storage_reader.clone();
@@ -189,19 +189,35 @@ impl ConsensusContext for PapyrusConsensusContext {
                     panic!("Block in {height} was not found in storage despite waiting for it")
                 });
 
-                for tx in transactions.iter() {
-                    let received_tx = content
-                        .next()
-                        .await
-                        .unwrap_or_else(|| panic!("Not received transaction equals to {tx:?}"));
+                // First gather all the non-fin transactions.
+                let mut content_transactions: Vec<Transaction> = Vec::new();
+                let received_block_hash = loop {
+                    match content.next().await {
+                        Some(ProposalPart::Transactions(batch)) => {
+                            for tx in batch.transactions {
+                                content_transactions.push(tx);
+                            }
+                        }
+                        Some(ProposalPart::Fin(fin)) => {
+                            break fin.proposal_content_id;
+                        }
+                        msg => panic!("Unexpected message: {msg:?}"),
+                    }
+                };
+
+                // Check each transaction matches the transactions in the storage.
+                for tx in transactions.iter().rev() {
+                    let received_tx = content_transactions
+                        .pop()
+                        .expect("Received fewer transactions than expected");
                     if tx != &received_tx {
                         panic!("Transactions are not equal. In storage: {tx:?}, received: {received_tx:?}");
                     }
                 }
-
-                if content.next().await.is_some() {
-                    panic!("Received more transactions than expected");
-                }
+                assert!(
+                    content_transactions.is_empty(),
+                    "Received more transactions than expected"
+                );
 
                 let block_hash = txn
                     .get_block_header(height)
@@ -219,9 +235,11 @@ impl ConsensusContext for PapyrusConsensusContext {
                 // Done after inserting the proposal into the map to avoid race conditions between
                 // insertion and calls to `repropose`.
                 // This can happen as a result of sync interrupting `run_height`.
-                fin_sender.send(block_hash).unwrap_or_else(|_| {
-                    warn!("Failed to send block to consensus. height={height}");
-                })
+                fin_sender
+                    .send((block_hash, ProposalFin { proposal_content_id: received_block_hash }))
+                    .unwrap_or_else(|_| {
+                        warn!("Failed to send block to consensus. height={height}");
+                    })
             }
             .instrument(debug_span!("consensus_validate_proposal")),
         );
diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs
index 25712fe2a7..49b1c5c7ff 100644
--- a/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs
+++ b/crates/sequencing/papyrus_consensus_orchestrator/src/papyrus_consensus_context_test.rs
@@ -12,9 +12,11 @@ use papyrus_network::network_manager::test_utils::{
 use papyrus_network::network_manager::BroadcastTopicChannels;
 use papyrus_protobuf::consensus::{
     ConsensusMessage,
+    ProposalFin,
     ProposalInit,
     ProposalPart,
     StreamMessage,
+    TransactionBatch,
     Vote,
 };
 use papyrus_storage::body::BodyStorageWriter;
@@ -55,8 +57,14 @@ async fn validate_proposal_success() {
 
     let (mut validate_sender, validate_receiver) = mpsc::channel(TEST_CHANNEL_SIZE);
     for tx in block.body.transactions.clone() {
-        validate_sender.try_send(tx).unwrap();
+        let tx_part = ProposalPart::Transactions(TransactionBatch {
+            transactions: vec![tx],
+            tx_hashes: vec![],
+        });
+        validate_sender.try_send(tx_part).unwrap();
     }
+    let fin_part = ProposalPart::Fin(ProposalFin { proposal_content_id: block.header.block_hash });
+    validate_sender.try_send(fin_part).unwrap();
     validate_sender.close_channel();
 
     let fin = papyrus_context
@@ -65,7 +73,7 @@ async fn validate_proposal_success() {
         .await
         .unwrap();
 
-    assert_eq!(fin, block.header.block_hash);
+    assert_eq!(fin.0, block.header.block_hash);
 }
 
 #[tokio::test]
@@ -76,7 +84,11 @@ async fn validate_proposal_fail() {
     let different_block = get_test_block(4, None, None, None);
     let (mut validate_sender, validate_receiver) = mpsc::channel(5000);
     for tx in different_block.body.transactions.clone() {
-        validate_sender.try_send(tx).unwrap();
+        let tx_part = ProposalPart::Transactions(TransactionBatch {
+            transactions: vec![tx],
+            tx_hashes: vec![],
+        });
+        validate_sender.try_send(tx_part).unwrap();
     }
     validate_sender.close_channel();
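The validation loop above buffers every streamed transaction until `Fin` arrives, then compares against storage back-to-front with `pop()`, so both extras and omissions surface. The comparison strategy as a standalone sketch, generic over the item type (the function is illustrative, not in the patch):

    // Walk the stored list in reverse and pop from the received buffer:
    // order, missing items, and leftovers are all checked without extra allocation.
    fn matches_storage<T: PartialEq>(stored: &[T], mut received: Vec<T>) -> bool {
        for expected in stored.iter().rev() {
            match received.pop() {
                Some(actual) if &actual == expected => {}
                _ => return false, // missing or mismatched transaction
            }
        }
        received.is_empty() // leftovers mean the peer sent more than storage holds
    }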
diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
index 41f1044c98..e5ecdc0e34 100644
--- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
+++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs
@@ -29,7 +29,8 @@ use papyrus_protobuf::consensus::{
     Vote,
 };
 use starknet_api::block::{BlockHash, BlockHashAndNumber, BlockInfo, BlockNumber, BlockTimestamp};
-use starknet_api::executable_transaction::Transaction;
+use starknet_api::executable_transaction::Transaction as ExecutableTransaction;
+use starknet_api::transaction::Transaction;
 use starknet_batcher_types::batcher_types::{
     DecisionReachedInput,
     GetProposalContent,
@@ -51,8 +52,8 @@ use tracing::{debug, debug_span, error, info, trace, warn, Instrument};
 // Note that multiple proposals IDs can be associated with the same content, but we only need to
 // store one of them.
 type HeightToIdToContent =
-    BTreeMap<BlockNumber, HashMap<ProposalContentId, (Vec<Transaction>, ProposalId)>>;
-type ValidationParams = (BlockNumber, Duration, mpsc::Receiver<Vec<Transaction>>);
+    BTreeMap<BlockNumber, HashMap<ProposalContentId, (Vec<ExecutableTransaction>, ProposalId)>>;
+type ValidationParams = (BlockNumber, Duration, mpsc::Receiver<ProposalPart>);
 
 const CHANNEL_SIZE: usize = 100;
@@ -78,7 +79,8 @@ pub struct SequencerConsensusContext {
     // current round.
     active_proposal: Option<(Arc<Notify>, JoinHandle<()>)>,
     // Stores proposals for future rounds until the round is reached.
-    queued_proposals: BTreeMap<Round, (ValidationParams, oneshot::Sender<ProposalContentId>)>,
+    queued_proposals:
+        BTreeMap<Round, (ValidationParams, oneshot::Sender<(ProposalContentId, ProposalFin)>)>,
     outbound_proposal_sender: mpsc::Sender<(u64, mpsc::Receiver<ProposalPart>)>,
 }
@@ -186,23 +188,27 @@ impl ConsensusContext for SequencerConsensusContext {
         fin_receiver
     }
 
+    // Note: this function does not receive the ProposalInit.
+    // That part is consumed by the caller, which uses it to learn the height/round.
     async fn validate_proposal(
         &mut self,
         height: BlockNumber,
         round: Round,
         timeout: Duration,
-        content: mpsc::Receiver<Vec<Transaction>>,
-    ) -> oneshot::Receiver<ProposalContentId> {
+        content_receiver: mpsc::Receiver<ProposalPart>,
+    ) -> oneshot::Receiver<(ProposalContentId, ProposalFin)> {
         assert_eq!(Some(height), self.current_height);
         let (fin_sender, fin_receiver) = oneshot::channel();
         match round.cmp(&self.current_round) {
             std::cmp::Ordering::Less => fin_receiver,
             std::cmp::Ordering::Greater => {
-                self.queued_proposals.insert(round, ((height, timeout, content), fin_sender));
+                self.queued_proposals
+                    .insert(round, ((height, timeout, content_receiver), fin_sender));
                 fin_receiver
             }
             std::cmp::Ordering::Equal => {
-                self.validate_current_round_proposal(height, timeout, content, fin_sender).await;
+                self.validate_current_round_proposal(height, timeout, content_receiver, fin_sender)
+                    .await;
                 fin_receiver
             }
         }
@@ -219,7 +225,7 @@ impl ConsensusContext for SequencerConsensusContext {
             .unwrap_or_else(|| panic!("No proposals found for height {height}"))
             .get(&id)
             .unwrap_or_else(|| panic!("No proposal found for height {height} and id {id}"));
-        // TODO: Stream the TXs to the network.
+        // TODO(guyn): Stream the TXs to the network.
     }
 
     async fn validators(&self, _height: BlockNumber) -> Vec<ValidatorId> {
@@ -310,8 +316,8 @@ impl SequencerConsensusContext {
         &mut self,
         height: BlockNumber,
         timeout: Duration,
-        content: mpsc::Receiver<Vec<Transaction>>,
-        fin_sender: oneshot::Sender<ProposalContentId>,
+        content_receiver: mpsc::Receiver<ProposalPart>,
+        fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>,
     ) {
         debug!("Validating proposal for height: {height} with timeout: {timeout:?}");
         let batcher = Arc::clone(&self.batcher);
@@ -351,7 +357,7 @@ impl SequencerConsensusContext {
                 proposal_id,
                 batcher,
                 valid_proposals,
-                content,
+                content_receiver,
                 fin_sender,
             );
             tokio::select! {
@@ -464,65 +470,79 @@ async fn stream_validate_proposal(
     proposal_id: ProposalId,
     batcher: Arc<dyn BatcherClient>,
     valid_proposals: Arc<Mutex<HeightToIdToContent>>,
-    mut content_receiver: mpsc::Receiver<Vec<Transaction>>,
-    fin_sender: oneshot::Sender<ProposalContentId>,
+    mut content_receiver: mpsc::Receiver<ProposalPart>,
+    fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>,
 ) {
     let mut content = Vec::new();
-    while let Some(txs) = content_receiver.next().await {
-        content.extend_from_slice(&txs[..]);
-        let input =
-            SendProposalContentInput { proposal_id, content: SendProposalContent::Txs(txs) };
-        let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| {
-            panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}")
-        });
-        match response.response {
-            ProposalStatus::Processing => {}
-            ProposalStatus::Finished(fin) => {
-                panic!("Batcher returned Fin before all content was sent: {proposal_id:?} {fin:?}");
-            }
-            ProposalStatus::Aborted => {
-                panic!("Unexpected abort response for proposal: {:?}", proposal_id);
-            }
-            ProposalStatus::InvalidProposal => {
-                warn!("Proposal was invalid: {:?}", proposal_id);
-                return;
-            }
-        }
-    }
-    // TODO: In the future we will receive a Fin from the network instead of the channel closing.
-    // We will just send the network Fin out along with what the batcher calculates.
+    let network_block_id = loop {
+        let Some(prop_part) = content_receiver.next().await else {
+            warn!("Failed to receive proposal content: {proposal_id:?}");
+            return;
+        };
+        match prop_part {
+            ProposalPart::Transactions(TransactionBatch { transactions: txs, tx_hashes }) => {
+                let exe_txs: Vec<ExecutableTransaction> = txs
+                    .into_iter()
+                    .zip(tx_hashes.into_iter())
+                    .map(|tx_tup| tx_tup.into())
+                    .collect();
+                content.extend_from_slice(&exe_txs[..]);
+                let input = SendProposalContentInput {
+                    proposal_id,
+                    content: SendProposalContent::Txs(exe_txs),
+                };
+                let response = batcher.send_proposal_content(input).await.unwrap_or_else(|e| {
+                    panic!("Failed to send proposal content to batcher: {proposal_id:?}. {e:?}")
+                });
+                match response.response {
+                    ProposalStatus::Processing => {}
+                    ProposalStatus::InvalidProposal => {
+                        warn!("Proposal was invalid: {:?}", proposal_id);
+                        return;
+                    }
+                    status => panic!("Unexpected status for {proposal_id:?}: {status:?}"),
+                }
+            }
+            ProposalPart::Fin(ProposalFin { proposal_content_id: id }) => {
+                // Output this along with the ID from the batcher, to compare them.
+                break id;
+            }
+            _ => panic!("Invalid proposal part: {:?}", prop_part),
+        }
+    };
     let input = SendProposalContentInput { proposal_id, content: SendProposalContent::Finish };
     let response = batcher
         .send_proposal_content(input)
         .await
         .unwrap_or_else(|e| panic!("Failed to send Fin to batcher: {proposal_id:?}. {e:?}"));
{e:?}")); - let id = match response.response { + let response_id = match response.response { ProposalStatus::Finished(id) => id, - ProposalStatus::Processing => { - panic!("Batcher failed to return Fin after all content was sent: {:?}", proposal_id); - } - ProposalStatus::Aborted => { - panic!("Unexpected abort response for proposal: {:?}", proposal_id); - } ProposalStatus::InvalidProposal => { warn!("Proposal was invalid: {:?}", proposal_id); return; } + status => panic!("Unexpected status: for {proposal_id:?}, {status:?}"), }; - let proposal_content_id = BlockHash(id.state_diff_commitment.0.0); + let batcher_block_id = BlockHash(response_id.state_diff_commitment.0.0); info!( - "Finished validating proposal {:?}: content_id = {:?}, num_txs = {:?}, height = {:?}", + "Finished validating proposal {:?}: network_block_id: {:?}, batcher_block_id = {:?}, \ + num_txs = {:?}, height = {:?}", proposal_id, - proposal_content_id, + network_block_id, + batcher_block_id, content.len(), height ); // Update valid_proposals before sending fin to avoid a race condition // with `get_proposal` being called before `valid_proposals` is updated. + // TODO(Matan): Consider validating the ProposalFin signature here. let mut valid_proposals = valid_proposals.lock().unwrap(); - valid_proposals.entry(height).or_default().insert(proposal_content_id, (content, proposal_id)); - if fin_sender.send(proposal_content_id).is_err() { + valid_proposals.entry(height).or_default().insert(batcher_block_id, (content, proposal_id)); + if fin_sender + .send((batcher_block_id, ProposalFin { proposal_content_id: network_block_id })) + .is_err() + { // Consensus may exit early (e.g. sync). - warn!("Failed to send proposal content id"); + warn!("Failed to send proposal content ids"); } } diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index 8f718df3a0..d148a68a65 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -13,13 +13,22 @@ use papyrus_network::network_manager::test_utils::{ TestSubscriberChannels, }; use papyrus_network::network_manager::BroadcastTopicChannels; -use papyrus_protobuf::consensus::{ProposalInit, ProposalPart, StreamMessage}; +use papyrus_protobuf::consensus::{ + ProposalFin, + ProposalInit, + ProposalPart, + StreamMessage, + TransactionBatch, +}; use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::core::{ContractAddress, StateDiffCommitment}; -use starknet_api::executable_transaction::{AccountTransaction, Transaction}; +use starknet_api::executable_transaction::{ + AccountTransaction, + Transaction as ExecutableTransaction, +}; use starknet_api::hash::PoseidonHash; -use starknet_api::test_utils::invoke::{executable_invoke_tx, InvokeTxArgs}; -use starknet_api::transaction::TransactionHash; +use starknet_api::test_utils::invoke::{executable_invoke_tx, invoke_tx, InvokeTxArgs}; +use starknet_api::transaction::{Transaction, TransactionHash}; use starknet_batcher_types::batcher_types::{ GetProposalContent, GetProposalContentResponse, @@ -43,11 +52,16 @@ const NUM_VALIDATORS: u64 = 4; const STATE_DIFF_COMMITMENT: StateDiffCommitment = StateDiffCommitment(PoseidonHash(Felt::ZERO)); lazy_static! 
diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
index 8f718df3a0..d148a68a65 100644
--- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
+++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs
@@ -13,7 +13,13 @@ use papyrus_network::network_manager::test_utils::{
     TestSubscriberChannels,
 };
 use papyrus_network::network_manager::BroadcastTopicChannels;
-use papyrus_protobuf::consensus::{ProposalInit, ProposalPart, StreamMessage};
+use papyrus_protobuf::consensus::{
+    ProposalFin,
+    ProposalInit,
+    ProposalPart,
+    StreamMessage,
+    TransactionBatch,
+};
 use starknet_api::block::{BlockHash, BlockNumber};
 use starknet_api::core::{ContractAddress, StateDiffCommitment};
-use starknet_api::executable_transaction::{AccountTransaction, Transaction};
+use starknet_api::executable_transaction::{
+    AccountTransaction,
+    Transaction as ExecutableTransaction,
+};
 use starknet_api::hash::PoseidonHash;
-use starknet_api::test_utils::invoke::{executable_invoke_tx, InvokeTxArgs};
-use starknet_api::transaction::TransactionHash;
+use starknet_api::test_utils::invoke::{executable_invoke_tx, invoke_tx, InvokeTxArgs};
+use starknet_api::transaction::{Transaction, TransactionHash};
 use starknet_batcher_types::batcher_types::{
     GetProposalContent,
     GetProposalContentResponse,
@@ -43,11 +52,16 @@
 const NUM_VALIDATORS: u64 = 4;
 const STATE_DIFF_COMMITMENT: StateDiffCommitment = StateDiffCommitment(PoseidonHash(Felt::ZERO));
 
 lazy_static! {
-    static ref TX_BATCH: Vec<Transaction> = vec![generate_invoke_tx(Felt::THREE)];
+    static ref TX_BATCH: Vec<ExecutableTransaction> =
+        vec![generate_executable_invoke_tx(Felt::THREE)];
+}
+
+fn generate_invoke_tx() -> Transaction {
+    Transaction::Invoke(invoke_tx(InvokeTxArgs::default()))
 }
 
-fn generate_invoke_tx(tx_hash: Felt) -> Transaction {
-    Transaction::Account(AccountTransaction::Invoke(executable_invoke_tx(InvokeTxArgs {
+fn generate_executable_invoke_tx(tx_hash: Felt) -> ExecutableTransaction {
+    ExecutableTransaction::Account(AccountTransaction::Invoke(executable_invoke_tx(InvokeTxArgs {
         tx_hash: TransactionHash(tx_hash),
         ..Default::default()
     })))
@@ -177,11 +191,26 @@ async fn validate_proposal_success() {
     context.set_height_and_round(BlockNumber(0), 0).await;
 
     let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
-    content_sender.send(TX_BATCH.clone()).await.unwrap();
+    let tx_hash = TX_BATCH.first().unwrap().tx_hash();
+    let txs =
+        TX_BATCH.clone().into_iter().map(starknet_api::transaction::Transaction::from).collect();
+    content_sender
+        .send(ProposalPart::Transactions(TransactionBatch {
+            transactions: txs,
+            tx_hashes: vec![tx_hash],
+        }))
+        .await
+        .unwrap();
+    content_sender
+        .send(ProposalPart::Fin(ProposalFin {
+            proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0),
+        }))
+        .await
+        .unwrap();
     let fin_receiver = context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
     content_sender.close_channel();
-    assert_eq!(fin_receiver.await.unwrap().0, STATE_DIFF_COMMITMENT.0.0);
+    assert_eq!(fin_receiver.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);
 }
 
 #[tokio::test]
@@ -229,12 +258,19 @@ async fn repropose() {
     // Receive a valid proposal.
     let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
-    let txs = vec![generate_invoke_tx(Felt::TWO)];
-    content_sender.send(txs.clone()).await.unwrap();
+    let prop_part = ProposalPart::Transactions(TransactionBatch {
+        transactions: vec![generate_invoke_tx()],
+        tx_hashes: vec![TransactionHash(Felt::TWO)],
+    });
+    content_sender.send(prop_part).await.unwrap();
+    let prop_part = ProposalPart::Fin(ProposalFin {
+        proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0),
+    });
+    content_sender.send(prop_part).await.unwrap();
     let fin_receiver = context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
     content_sender.close_channel();
-    assert_eq!(fin_receiver.await.unwrap().0, STATE_DIFF_COMMITMENT.0.0);
+    assert_eq!(fin_receiver.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);
 
     // Re-proposal: Just asserts this is a known valid proposal.
     context
@@ -299,28 +335,41 @@ async fn proposals_from_different_rounds() {
     context.set_height_and_round(BlockNumber(0), 0).await;
     context.set_height_and_round(BlockNumber(0), 1).await;
 
+    // Proposal parts sent in the proposals.
+    let prop_part_txs = ProposalPart::Transactions(TransactionBatch {
+        transactions: TX_BATCH.clone().into_iter().map(Transaction::from).collect(),
+        tx_hashes: vec![TX_BATCH[0].tx_hash()],
+    });
+    let prop_part_fin = ProposalPart::Fin(ProposalFin {
+        proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0),
+    });
+
     // The proposal from the past round is ignored.
     let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
-    content_sender.send(TX_BATCH.clone()).await.unwrap();
+    content_sender.send(prop_part_txs.clone()).await.unwrap();
+
     let fin_receiver_past_round =
         context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
-    content_sender.close_channel();
+    // No fin was sent, so the channel remains open.
     assert!(fin_receiver_past_round.await.is_err());
 
     // The proposal from the current round should be validated.
     let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
-    content_sender.send(TX_BATCH.clone()).await.unwrap();
+    content_sender.send(prop_part_txs.clone()).await.unwrap();
+    content_sender.send(prop_part_fin.clone()).await.unwrap();
     let fin_receiver_curr_round =
         context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;
-    content_sender.close_channel();
-    assert_eq!(fin_receiver_curr_round.await.unwrap().0, STATE_DIFF_COMMITMENT.0.0);
+
+    assert_eq!(fin_receiver_curr_round.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);
 
     // The proposal from the future round should not be processed.
     let (mut content_sender, content_receiver) = mpsc::channel(CHANNEL_SIZE);
-    content_sender.send(TX_BATCH.clone()).await.unwrap();
+    content_sender.send(prop_part_txs.clone()).await.unwrap();
+    content_sender.send(prop_part_fin.clone()).await.unwrap();
     let fin_receiver_future_round =
         context.validate_proposal(BlockNumber(0), 2, TIMEOUT, content_receiver).await;
     content_sender.close_channel();
+    // It should stay pending even though fin was sent and the channel was closed.
     assert!(fin_receiver_future_round.now_or_never().is_none());
 }
 
@@ -389,14 +438,25 @@ async fn interrupt_active_proposal() {
         context.validate_proposal(BlockNumber(0), 0, TIMEOUT, content_receiver).await;
 
     let (mut content_sender_1, content_receiver) = mpsc::channel(CHANNEL_SIZE);
-    content_sender_1.send(TX_BATCH.clone()).await.unwrap();
+    content_sender_1
+        .send(ProposalPart::Transactions(TransactionBatch {
+            transactions: TX_BATCH.clone().into_iter().map(Transaction::from).collect(),
+            tx_hashes: vec![TX_BATCH[0].tx_hash()],
+        }))
+        .await
+        .unwrap();
+    content_sender_1
+        .send(ProposalPart::Fin(ProposalFin {
+            proposal_content_id: BlockHash(STATE_DIFF_COMMITMENT.0.0),
+        }))
+        .await
+        .unwrap();
     let fin_receiver_1 =
         context.validate_proposal(BlockNumber(0), 1, TIMEOUT, content_receiver).await;
-    content_sender_1.close_channel();
 
     // Move the context to the next round.
     context.set_height_and_round(BlockNumber(0), 1).await;
 
     // Interrupt active proposal.
     assert!(fin_receiver_0.await.is_err());
-    assert_eq!(fin_receiver_1.await.unwrap().0, STATE_DIFF_COMMITMENT.0.0);
+    assert_eq!(fin_receiver_1.await.unwrap().0.0, STATE_DIFF_COMMITMENT.0.0);
 }