From ace69fa8404eb704b504082d324ebc355a3d2948 Mon Sep 17 00:00:00 2001 From: mwtian <81660174+mwtian@users.noreply.github.com> Date: Tue, 8 Oct 2024 14:33:43 -0700 Subject: [PATCH] [Fastpath] execute transactions certified via fastpath (#19638) ## Description - Add support to execute certified transactions from consensus output. - Refactor how consensus commit handler processes blocks from consensus commits, so the logic can be reused for consensus transaction handler. - Small refactors in CommitConsumer and DagState. ## Test plan CI --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- consensus/core/src/authority_node.rs | 10 +- consensus/core/src/commit.rs | 10 + consensus/core/src/commit_consumer.rs | 40 +- consensus/core/src/commit_observer.rs | 50 ++- consensus/core/src/core.rs | 35 +- consensus/core/src/core_thread.rs | 5 +- consensus/core/src/dag_state.rs | 24 +- consensus/core/src/leader_schedule.rs | 6 + consensus/core/src/leader_scoring.rs | 1 + consensus/core/src/linearizer.rs | 5 + consensus/core/src/test_dag_builder.rs | 3 + crates/sui-core/src/authority.rs | 16 +- .../authority/authority_per_epoch_store.rs | 43 +- crates/sui-core/src/consensus_adapter.rs | 33 -- crates/sui-core/src/consensus_handler.rs | 422 ++++++++++++++---- .../consensus_manager/mysticeti_manager.rs | 33 +- .../consensus_types/consensus_output_api.rs | 112 +++-- crates/sui-core/src/consensus_types/mod.rs | 4 - crates/sui-core/src/consensus_validator.rs | 8 +- crates/sui-core/src/scoring_decision.rs | 6 +- crates/sui-core/src/transaction_manager.rs | 2 +- .../src/unit_tests/consensus_tests.rs | 69 ++- crates/sui-json-rpc/src/error.rs | 7 +- crates/sui-transaction-checks/src/deny.rs | 2 +- crates/sui-types/src/error.rs | 4 +- 25 files changed, 677 insertions(+), 273 deletions(-) diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs index b40b65a894336..46d989535b086 100644 --- a/consensus/core/src/authority_node.rs +++ b/consensus/core/src/authority_node.rs @@ -426,7 +426,7 @@ mod tests { use std::{collections::BTreeSet, sync::Arc, time::Duration}; use consensus_config::{local_committee_and_keys, Parameters}; - use mysten_metrics::monitored_mpsc::{unbounded_channel, UnboundedReceiver}; + use mysten_metrics::monitored_mpsc::UnboundedReceiver; use prometheus::Registry; use rstest::rstest; use sui_protocol_config::ProtocolConfig; @@ -457,8 +457,7 @@ mod tests { let protocol_keypair = keypairs[own_index].1.clone(); let network_keypair = keypairs[own_index].0.clone(); - let (sender, _receiver) = unbounded_channel("consensus_output"); - let commit_consumer = CommitConsumer::new(sender, 0); + let (commit_consumer, _, _) = CommitConsumer::new(0); let authority = ConsensusAuthority::start( network_type, @@ -735,8 +734,7 @@ mod tests { let protocol_keypair = keypairs[index].1.clone(); let network_keypair = keypairs[index].0.clone(); - let (sender, receiver) = unbounded_channel("consensus_output"); - let commit_consumer = CommitConsumer::new(sender, 0); + let (commit_consumer, commit_receiver, _) = CommitConsumer::new(0); let authority = ConsensusAuthority::start( network_type, @@ -753,6 +751,6 @@ mod tests { ) .await; - (authority, receiver) + (authority, commit_receiver) } } diff --git a/consensus/core/src/commit.rs b/consensus/core/src/commit.rs index d40ee0a5a4ced..458d90aff2199 100644 --- a/consensus/core/src/commit.rs +++ b/consensus/core/src/commit.rs @@ -19,6 +19,7 @@ use crate::{ block::{BlockAPI, BlockRef, BlockTimestampMs, Round, Slot, VerifiedBlock}, leader_scoring::ReputationScores, storage::Store, + TransactionIndex, }; /// Index of a commit among all consensus commits. @@ -109,6 +110,7 @@ pub(crate) struct CommitV1 { leader: BlockRef, /// Refs to committed blocks, in the commit order. blocks: Vec, + // TODO(fastpath): record rejected transactions. } impl CommitAPI for CommitV1 { @@ -293,6 +295,8 @@ pub struct CommittedSubDag { pub leader: BlockRef, /// All the committed blocks that are part of this sub-dag pub blocks: Vec, + /// Indices of rejected transactions in each block. + pub rejected_transactions_by_block: Vec>, /// The timestamp of the commit, obtained from the timestamp of the leader block. pub timestamp_ms: BlockTimestampMs, /// The reference of the commit. @@ -309,13 +313,16 @@ impl CommittedSubDag { pub fn new( leader: BlockRef, blocks: Vec, + rejected_transactions_by_block: Vec>, timestamp_ms: BlockTimestampMs, commit_ref: CommitRef, reputation_scores_desc: Vec<(AuthorityIndex, u64)>, ) -> Self { + assert_eq!(blocks.len(), rejected_transactions_by_block.len()); Self { leader, blocks, + rejected_transactions_by_block, timestamp_ms, commit_ref, reputation_scores_desc, @@ -386,11 +393,14 @@ pub fn load_committed_subdag_from_store( commit_block }) .collect::>(); + // TODO(fastpath): recover rejected transaction indices from commit. + let rejected_transactions = vec![vec![]; blocks.len()]; let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag"); let leader_block_ref = blocks[leader_block_idx].reference(); CommittedSubDag::new( leader_block_ref, blocks, + rejected_transactions, commit.timestamp_ms(), commit.reference(), reputation_scores_desc, diff --git a/consensus/core/src/commit_consumer.rs b/consensus/core/src/commit_consumer.rs index 2d06208d03964..adee1c716def1 100644 --- a/consensus/core/src/commit_consumer.rs +++ b/consensus/core/src/commit_consumer.rs @@ -4,13 +4,20 @@ use std::sync::{Arc, RwLock}; use tokio::sync::watch; -use mysten_metrics::monitored_mpsc::UnboundedSender; +use mysten_metrics::monitored_mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use crate::{CommitIndex, CommittedSubDag}; +use crate::{CommitIndex, CommittedSubDag, TransactionIndex, VerifiedBlock}; +#[derive(Clone)] pub struct CommitConsumer { - // A channel to send the committed sub dags through - pub(crate) sender: UnboundedSender, + // A channel to output the committed sub dags. + pub(crate) commit_sender: UnboundedSender, + // A channel to output certified and rejected transactions by batches of blocks. + // Each tuple contains the block containing transactions, and indices of rejected transactions. + // In each block, transactions that are not rejected are considered certified. + // Batches of blocks are sent together, to improve efficiency. + #[allow(unused)] + pub(crate) transaction_sender: UnboundedSender)>>, // Index of the last commit that the consumer has processed. This is useful for // crash/recovery so mysticeti can replay the commits from the next index. // First commit in the replayed sequence will have index last_processed_commit_index + 1. @@ -22,15 +29,26 @@ pub struct CommitConsumer { impl CommitConsumer { pub fn new( - sender: UnboundedSender, last_processed_commit_index: CommitIndex, - ) -> Self { + ) -> ( + Self, + UnboundedReceiver, + UnboundedReceiver)>>, + ) { + let (commit_sender, commit_receiver) = unbounded_channel("consensus_output"); + let (transaction_sender, transaction_receiver) = unbounded_channel("consensus_certified"); + let monitor = Arc::new(CommitConsumerMonitor::new(last_processed_commit_index)); - Self { - sender, - last_processed_commit_index, - monitor, - } + ( + Self { + commit_sender, + transaction_sender, + last_processed_commit_index, + monitor, + }, + commit_receiver, + transaction_receiver, + ) } pub fn monitor(&self) -> Arc { diff --git a/consensus/core/src/commit_observer.rs b/consensus/core/src/commit_observer.rs index a25113de387fb..9196235c3b934 100644 --- a/consensus/core/src/commit_observer.rs +++ b/consensus/core/src/commit_observer.rs @@ -55,7 +55,7 @@ impl CommitObserver { let mut observer = Self { context, commit_interpreter: Linearizer::new(dag_state.clone(), leader_schedule.clone()), - sender: commit_consumer.sender, + sender: commit_consumer.commit_sender, store, leader_schedule, }; @@ -207,7 +207,7 @@ impl CommitObserver { #[cfg(test)] mod tests { - use mysten_metrics::monitored_mpsc::{unbounded_channel, UnboundedReceiver}; + use mysten_metrics::monitored_mpsc::UnboundedReceiver; use parking_lot::RwLock; use super::*; @@ -227,7 +227,8 @@ mod tests { mem_store.clone(), ))); let last_processed_commit_index = 0; - let (sender, mut receiver) = unbounded_channel("consensus_output"); + let (commit_consumer, mut commit_receiver, _transaction_receiver) = + CommitConsumer::new(last_processed_commit_index); let leader_schedule = Arc::new(LeaderSchedule::from_store( context.clone(), @@ -236,7 +237,7 @@ mod tests { let mut observer = CommitObserver::new( context.clone(), - CommitConsumer::new(sender, last_processed_commit_index), + commit_consumer, dag_state.clone(), mem_store.clone(), leader_schedule, @@ -289,7 +290,7 @@ mod tests { // Check commits sent over consensus output channel is accurate let mut processed_subdag_index = 0; - while let Ok(subdag) = receiver.try_recv() { + while let Ok(subdag) = commit_receiver.try_recv() { assert_eq!(subdag, commits[processed_subdag_index]); assert_eq!(subdag.reputation_scores_desc, vec![]); processed_subdag_index = subdag.commit_ref.index as usize; @@ -299,7 +300,7 @@ mod tests { } assert_eq!(processed_subdag_index, leaders.len()); - verify_channel_empty(&mut receiver); + verify_channel_empty(&mut commit_receiver); // Check commits have been persisted to storage let last_commit = mem_store.read_last_commit().unwrap().unwrap(); @@ -326,8 +327,8 @@ mod tests { mem_store.clone(), ))); let last_processed_commit_index = 0; - let (sender, mut receiver) = unbounded_channel("consensus_output"); - + let (commit_consumer, mut commit_receiver, _transaction_receiver) = + CommitConsumer::new(last_processed_commit_index); let leader_schedule = Arc::new(LeaderSchedule::from_store( context.clone(), dag_state.clone(), @@ -335,7 +336,7 @@ mod tests { let mut observer = CommitObserver::new( context.clone(), - CommitConsumer::new(sender.clone(), last_processed_commit_index), + commit_consumer, dag_state.clone(), mem_store.clone(), leader_schedule.clone(), @@ -370,7 +371,7 @@ mod tests { // Check commits sent over consensus output channel is accurate let mut processed_subdag_index = 0; - while let Ok(subdag) = receiver.try_recv() { + while let Ok(subdag) = commit_receiver.try_recv() { tracing::info!("Processed {subdag}"); assert_eq!(subdag, commits[processed_subdag_index]); assert_eq!(subdag.reputation_scores_desc, vec![]); @@ -381,7 +382,7 @@ mod tests { } assert_eq!(processed_subdag_index, expected_last_processed_index); - verify_channel_empty(&mut receiver); + verify_channel_empty(&mut commit_receiver); // Check last stored commit is correct let last_commit = mem_store.read_last_commit().unwrap().unwrap(); @@ -406,7 +407,7 @@ mod tests { ); let expected_last_sent_index = num_rounds as usize; - while let Ok(subdag) = receiver.try_recv() { + while let Ok(subdag) = commit_receiver.try_recv() { tracing::info!("{subdag} was sent but not processed by consumer"); assert_eq!(subdag, commits[processed_subdag_index]); assert_eq!(subdag.reputation_scores_desc, vec![]); @@ -417,7 +418,7 @@ mod tests { } assert_eq!(processed_subdag_index, expected_last_sent_index); - verify_channel_empty(&mut receiver); + verify_channel_empty(&mut commit_receiver); // Check last stored commit is correct. We should persist the last commit // that was sent over the channel regardless of how the consumer handled @@ -427,9 +428,11 @@ mod tests { // Re-create commit observer starting from index 2 which represents the // last processed index from the consumer over consensus output channel + let (commit_consumer, mut commit_receiver, _transaction_receiver) = + CommitConsumer::new(expected_last_processed_index as CommitIndex); let _observer = CommitObserver::new( context.clone(), - CommitConsumer::new(sender, expected_last_processed_index as CommitIndex), + commit_consumer, dag_state.clone(), mem_store.clone(), leader_schedule, @@ -438,7 +441,7 @@ mod tests { // Check commits sent over consensus output channel is accurate starting // from last processed index of 2 and finishing at last sent index of 3. processed_subdag_index = expected_last_processed_index; - while let Ok(subdag) = receiver.try_recv() { + while let Ok(subdag) = commit_receiver.try_recv() { tracing::info!("Processed {subdag} on resubmission"); assert_eq!(subdag, commits[processed_subdag_index]); assert_eq!(subdag.reputation_scores_desc, vec![]); @@ -449,7 +452,7 @@ mod tests { } assert_eq!(processed_subdag_index, expected_last_sent_index); - verify_channel_empty(&mut receiver); + verify_channel_empty(&mut commit_receiver); } #[tokio::test] @@ -463,7 +466,8 @@ mod tests { mem_store.clone(), ))); let last_processed_commit_index = 0; - let (sender, mut receiver) = unbounded_channel("consensus_output"); + let (commit_consumer, mut commit_receiver, _transaction_receiver) = + CommitConsumer::new(last_processed_commit_index); let leader_schedule = Arc::new(LeaderSchedule::from_store( context.clone(), @@ -472,7 +476,7 @@ mod tests { let mut observer = CommitObserver::new( context.clone(), - CommitConsumer::new(sender.clone(), last_processed_commit_index), + commit_consumer, dag_state.clone(), mem_store.clone(), leader_schedule.clone(), @@ -499,7 +503,7 @@ mod tests { // Check commits sent over consensus output channel is accurate let mut processed_subdag_index = 0; - while let Ok(subdag) = receiver.try_recv() { + while let Ok(subdag) = commit_receiver.try_recv() { tracing::info!("Processed {subdag}"); assert_eq!(subdag, commits[processed_subdag_index]); assert_eq!(subdag.reputation_scores_desc, vec![]); @@ -510,7 +514,7 @@ mod tests { } assert_eq!(processed_subdag_index, expected_last_processed_index); - verify_channel_empty(&mut receiver); + verify_channel_empty(&mut commit_receiver); // Check last stored commit is correct let last_commit = mem_store.read_last_commit().unwrap().unwrap(); @@ -521,9 +525,11 @@ mod tests { // Re-create commit observer starting from index 3 which represents the // last processed index from the consumer over consensus output channel + let (commit_consumer, mut commit_receiver, _transaction_receiver) = + CommitConsumer::new(expected_last_processed_index as CommitIndex); let _observer = CommitObserver::new( context.clone(), - CommitConsumer::new(sender, expected_last_processed_index as CommitIndex), + commit_consumer, dag_state.clone(), mem_store.clone(), leader_schedule, @@ -531,7 +537,7 @@ mod tests { // No commits should be resubmitted as consensus store's last commit index // is equal to last processed index by consumer - verify_channel_empty(&mut receiver); + verify_channel_empty(&mut commit_receiver); } /// After receiving all expected subdags, ensure channel is empty diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index cef5242c2a525..c4dd94f3a3b9d 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -8,7 +8,7 @@ use consensus_config::{local_committee_and_keys, Stake}; use consensus_config::{AuthorityIndex, ProtocolKeyPair}; use itertools::Itertools as _; #[cfg(test)] -use mysten_metrics::monitored_mpsc::{unbounded_channel, UnboundedReceiver}; +use mysten_metrics::monitored_mpsc::UnboundedReceiver; use mysten_metrics::monitored_scope; use parking_lot::RwLock; use sui_macros::fail_point; @@ -956,10 +956,10 @@ impl CoreTextFixture { // Need at least one subscriber to the block broadcast channel. let block_receiver = signal_receivers.block_broadcast_receiver(); - let (commit_sender, commit_receiver) = unbounded_channel("consensus_output"); + let (commit_consumer, commit_receiver, _transaction_receiver) = CommitConsumer::new(0); let commit_observer = CommitObserver::new( context.clone(), - CommitConsumer::new(commit_sender.clone(), 0), + commit_consumer, dag_state.clone(), store.clone(), leader_schedule.clone(), @@ -995,7 +995,6 @@ mod test { use std::{collections::BTreeSet, time::Duration}; use consensus_config::{AuthorityIndex, Parameters}; - use mysten_metrics::monitored_mpsc::unbounded_channel; use sui_protocol_config::ProtocolConfig; use tokio::time::sleep; @@ -1055,10 +1054,10 @@ mod test { dag_state.clone(), )); - let (sender, _receiver) = unbounded_channel("consensus_output"); + let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0); let commit_observer = CommitObserver::new( context.clone(), - CommitConsumer::new(sender.clone(), 0), + commit_consumer, dag_state.clone(), store.clone(), leader_schedule.clone(), @@ -1173,10 +1172,10 @@ mod test { dag_state.clone(), )); - let (sender, _receiver) = unbounded_channel("consensus_output"); + let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0); let commit_observer = CommitObserver::new( context.clone(), - CommitConsumer::new(sender.clone(), 0), + commit_consumer, dag_state.clone(), store.clone(), leader_schedule.clone(), @@ -1270,10 +1269,10 @@ mod test { dag_state.clone(), )); - let (sender, _receiver) = unbounded_channel("consensus_output"); + let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0); let commit_observer = CommitObserver::new( context.clone(), - CommitConsumer::new(sender.clone(), 0), + commit_consumer, dag_state.clone(), store.clone(), leader_schedule.clone(), @@ -1375,10 +1374,10 @@ mod test { // Need at least one subscriber to the block broadcast channel. let _block_receiver = signal_receivers.block_broadcast_receiver(); - let (sender, _receiver) = unbounded_channel("consensus_output"); + let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0); let commit_observer = CommitObserver::new( context.clone(), - CommitConsumer::new(sender.clone(), 0), + commit_consumer, dag_state.clone(), store.clone(), leader_schedule.clone(), @@ -1464,10 +1463,10 @@ mod test { // Need at least one subscriber to the block broadcast channel. let _block_receiver = signal_receivers.block_broadcast_receiver(); - let (sender, _receiver) = unbounded_channel("consensus_output"); + let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0); let commit_observer = CommitObserver::new( context.clone(), - CommitConsumer::new(sender.clone(), 0), + commit_consumer, dag_state.clone(), store.clone(), leader_schedule.clone(), @@ -1652,10 +1651,10 @@ mod test { // Need at least one subscriber to the block broadcast channel. let _block_receiver = signal_receivers.block_broadcast_receiver(); - let (sender, _receiver) = unbounded_channel("consensus_output"); + let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0); let commit_observer = CommitObserver::new( context.clone(), - CommitConsumer::new(sender.clone(), 0), + commit_consumer, dag_state.clone(), store.clone(), leader_schedule.clone(), @@ -1717,10 +1716,10 @@ mod test { // Need at least one subscriber to the block broadcast channel. let _block_receiver = signal_receivers.block_broadcast_receiver(); - let (sender, _receiver) = unbounded_channel("consensus_output"); + let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0); let commit_observer = CommitObserver::new( context.clone(), - CommitConsumer::new(sender.clone(), 0), + commit_consumer, dag_state.clone(), store.clone(), leader_schedule.clone(), diff --git a/consensus/core/src/core_thread.rs b/consensus/core/src/core_thread.rs index 63cc514d77e29..5b2a83748672e 100644 --- a/consensus/core/src/core_thread.rs +++ b/consensus/core/src/core_thread.rs @@ -390,7 +390,6 @@ impl CoreThreadDispatcher for MockCoreThreadDispatcher { #[cfg(test)] mod test { - use mysten_metrics::monitored_mpsc::unbounded_channel; use parking_lot::RwLock; use super::*; @@ -423,14 +422,14 @@ mod test { let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone()); let (signals, signal_receivers) = CoreSignals::new(context.clone()); let _block_receiver = signal_receivers.block_broadcast_receiver(); - let (sender, _receiver) = unbounded_channel("consensus_output"); + let (commit_consumer, _commit_receiver, _transaction_receiver) = CommitConsumer::new(0); let leader_schedule = Arc::new(LeaderSchedule::from_store( context.clone(), dag_state.clone(), )); let commit_observer = CommitObserver::new( context.clone(), - CommitConsumer::new(sender.clone(), 0), + commit_consumer, dag_state.clone(), store, leader_schedule.clone(), diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index a8adc047ae4c2..0802808a10c83 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -46,9 +46,9 @@ pub(crate) struct DagState { // Note: all uncommitted blocks are kept in memory. recent_blocks: BTreeMap, - // Contains block refs of recent_blocks. - // Each element in the Vec corresponds to the authority with the index. - recent_refs: Vec>, + // Indexes recent block refs by their authorities. + // Vec position corresponds to the authority index. + recent_refs_by_authority: Vec>, // Highest round of blocks accepted. highest_accepted_round: Round, @@ -163,7 +163,7 @@ impl DagState { context, genesis, recent_blocks: BTreeMap::new(), - recent_refs: vec![BTreeSet::new(); num_authorities], + recent_refs_by_authority: vec![BTreeSet::new(); num_authorities], highest_accepted_round: 0, last_commit, last_commit_round_advancement_time: None, @@ -245,7 +245,7 @@ impl DagState { fn update_block_metadata(&mut self, block: &VerifiedBlock) { let block_ref = block.reference(); self.recent_blocks.insert(block_ref, block.clone()); - self.recent_refs[block_ref.author].insert(block_ref); + self.recent_refs_by_authority[block_ref.author].insert(block_ref); self.highest_accepted_round = max(self.highest_accepted_round, block.round()); self.context .metrics @@ -253,7 +253,7 @@ impl DagState { .highest_accepted_round .set(self.highest_accepted_round as i64); - let highest_accepted_round_for_author = self.recent_refs[block_ref.author] + let highest_accepted_round_for_author = self.recent_refs_by_authority[block_ref.author] .last() .map(|block_ref| block_ref.round) .expect("There should be by now at least one block ref"); @@ -414,7 +414,7 @@ impl DagState { /// Retrieves the last block proposed for the specified `authority`. If no block is found in cache /// then the genesis block is returned as no other block has been received from that authority. pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock { - if let Some(last) = self.recent_refs[authority].last() { + if let Some(last) = self.recent_refs_by_authority[authority].last() { return self .recent_blocks .get(last) @@ -442,7 +442,7 @@ impl DagState { start: Round, ) -> Vec { let mut blocks = vec![]; - for block_ref in self.recent_refs[authority].range(( + for block_ref in self.recent_refs_by_authority[authority].range(( Included(BlockRef::new(start, authority, BlockDigest::MIN)), Unbounded, )) { @@ -477,7 +477,7 @@ impl DagState { return blocks; } - for (authority_index, block_refs) in self.recent_refs.iter().enumerate() { + for (authority_index, block_refs) in self.recent_refs_by_authority.iter().enumerate() { let authority_index = self .context .committee @@ -527,7 +527,7 @@ impl DagState { ); } - let mut result = self.recent_refs[slot.authority].range(( + let mut result = self.recent_refs_by_authority[slot.authority].range(( Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MIN)), Included(BlockRef::new(slot.round, slot.authority, BlockDigest::MAX)), )); @@ -541,7 +541,7 @@ impl DagState { let mut missing = Vec::new(); for (index, block_ref) in block_refs.into_iter().enumerate() { - let recent_refs = &self.recent_refs[block_ref.author]; + let recent_refs = &self.recent_refs_by_authority[block_ref.author]; if recent_refs.contains(&block_ref) || self.genesis.contains_key(&block_ref) { exist[index] = true; } else if recent_refs.is_empty() || recent_refs.last().unwrap().round < block_ref.round @@ -804,7 +804,7 @@ impl DagState { // Clean up old cached data. After flushing, all cached blocks are guaranteed to be persisted. let mut total_recent_refs = 0; for (authority_refs, last_committed_round) in self - .recent_refs + .recent_refs_by_authority .iter_mut() .zip(self.last_committed_rounds.iter()) { diff --git a/consensus/core/src/leader_schedule.rs b/consensus/core/src/leader_schedule.rs index 756cc9361d86b..2c85afc93800f 100644 --- a/consensus/core/src/leader_schedule.rs +++ b/consensus/core/src/leader_schedule.rs @@ -888,6 +888,7 @@ mod tests { let unscored_subdags = vec![CommittedSubDag::new( BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN), vec![], + vec![], context.clock.timestamp_utc_ms(), CommitRef::new(1, CommitDigest::MIN), vec![], @@ -969,6 +970,7 @@ mod tests { let leader_block = leader.unwrap(); let leader_ref = leader_block.reference(); let commit_index = 1; + let rejected_transactions = vec![vec![]; blocks.len()]; let last_commit = TrustedCommit::new_for_test( commit_index, @@ -984,6 +986,7 @@ mod tests { let unscored_subdags = vec![CommittedSubDag::new( leader_ref, blocks, + rejected_transactions, context.clock.timestamp_utc_ms(), last_commit.reference(), vec![], @@ -1535,6 +1538,7 @@ mod tests { let unscored_subdags = vec![CommittedSubDag::new( BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN), vec![], + vec![], context.clock.timestamp_utc_ms(), CommitRef::new(1, CommitDigest::MIN), vec![], @@ -1622,6 +1626,7 @@ mod tests { let leader_block = leader.unwrap(); let leader_ref = leader_block.reference(); let commit_index = 1; + let rejected_transactions = vec![vec![]; blocks.len()]; let last_commit = TrustedCommit::new_for_test( commit_index, @@ -1637,6 +1642,7 @@ mod tests { let unscored_subdags = vec![CommittedSubDag::new( leader_ref, blocks, + rejected_transactions, context.clock.timestamp_utc_ms(), last_commit.reference(), vec![], diff --git a/consensus/core/src/leader_scoring.rs b/consensus/core/src/leader_scoring.rs index c7b151607cd01..89803a4a273f3 100644 --- a/consensus/core/src/leader_scoring.rs +++ b/consensus/core/src/leader_scoring.rs @@ -692,6 +692,7 @@ mod tests { let unscored_subdags = vec![CommittedSubDag::new( BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN), blocks, + vec![], context.clock.timestamp_utc_ms(), CommitRef::new(1, CommitDigest::MIN), vec![], diff --git a/consensus/core/src/linearizer.rs b/consensus/core/src/linearizer.rs index cc0254b288e35..f362d7ef2f7fc 100644 --- a/consensus/core/src/linearizer.rs +++ b/consensus/core/src/linearizer.rs @@ -107,6 +107,10 @@ impl Linearizer { // Sort the blocks of the sub-dag blocks sort_sub_dag_blocks(&mut to_commit); + // TODO(fastpath): determine rejected transactions from voting. + // Get rejected transactions. + let rejected_transactions = vec![vec![]; to_commit.len()]; + // Create the Commit. let commit = Commit::new( last_commit_index + 1, @@ -127,6 +131,7 @@ impl Linearizer { let sub_dag = CommittedSubDag::new( leader_block_ref, to_commit, + rejected_transactions, timestamp_ms, commit.reference(), reputation_scores_desc, diff --git a/consensus/core/src/test_dag_builder.rs b/consensus/core/src/test_dag_builder.rs index d669ed50d42f3..347321947c879 100644 --- a/consensus/core/src/test_dag_builder.rs +++ b/consensus/core/src/test_dag_builder.rs @@ -161,6 +161,8 @@ impl DagBuilder { sort_sub_dag_blocks(&mut to_commit); + let rejected_transactions = vec![vec![]; to_commit.len()]; + let commit = TrustedCommit::new_for_test( commit_index, CommitDigest::MIN, @@ -175,6 +177,7 @@ impl DagBuilder { let sub_dag = CommittedSubDag::new( leader_block_ref, to_commit, + rejected_transactions, timestamp_ms, commit.reference(), vec![], diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 84cc36b3e75f0..b0a47df5de3d5 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -278,7 +278,7 @@ pub struct AuthorityMetrics { post_processing_total_tx_had_event_processed: IntCounter, post_processing_total_failures: IntCounter, - /// Consensus handler metrics + /// Consensus commit and transaction handler metrics pub consensus_handler_processed: IntCounterVec, pub consensus_handler_transaction_sizes: HistogramVec, pub consensus_handler_num_low_scoring_authorities: IntGauge, @@ -292,6 +292,8 @@ pub struct AuthorityMetrics { pub consensus_committed_user_transactions: IntGaugeVec, pub consensus_calculated_throughput: IntGauge, pub consensus_calculated_throughput_profile: IntGauge, + pub consensus_transaction_handler_processed: IntCounterVec, + pub consensus_transaction_handler_fastpath_executions: IntCounter, pub limits_metrics: Arc, @@ -759,6 +761,17 @@ impl AuthorityMetrics { "The current active calculated throughput profile", registry ).unwrap(), + consensus_transaction_handler_processed: register_int_counter_vec_with_registry!( + "consensus_transaction_handler_processed", + "Number of transactions processed by consensus transaction handler, by whether they are certified or rejected.", + &["outcome"], + registry + ).unwrap(), + consensus_transaction_handler_fastpath_executions: register_int_counter_with_registry!( + "consensus_transaction_handler_fastpath_executions", + "Number of fastpath transactions sent for execution by consensus transaction handler", + registry, + ).unwrap(), execution_queueing_latency: LatencyObserver::new(), txn_ready_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))), execution_rate_tracker: Arc::new(Mutex::new(RateTracker::new(Duration::from_secs(10)))), @@ -1196,6 +1209,7 @@ impl AuthorityState { self.metrics.total_cert_attempts.inc(); + // TODO(fastpath): use a separate function to check if a transaction should be executed in fastpath. if !certificate.contains_shared_object() { // Shared object transactions need to be sequenced by the consensus before enqueueing // for execution, done in AuthorityPerEpochStore::handle_consensus_transaction(). diff --git a/crates/sui-core/src/authority/authority_per_epoch_store.rs b/crates/sui-core/src/authority/authority_per_epoch_store.rs index 25e4c1a341472..32586fb90bdb4 100644 --- a/crates/sui-core/src/authority/authority_per_epoch_store.rs +++ b/crates/sui-core/src/authority/authority_per_epoch_store.rs @@ -1878,20 +1878,26 @@ impl AuthorityPerEpochStore { .pending_consensus_transactions .multi_insert(key_value_pairs)?; - // TODO: lock once for all insert() calls. - for transaction in transactions { - if let ConsensusTransactionKind::CertifiedTransaction(cert) = &transaction.kind { - let state = lock.expect("Must pass reconfiguration lock when storing certificate"); - // Caller is responsible for performing graceful check - assert!( - state.should_accept_user_certs(), - "Reconfiguration state should allow accepting user transactions" - ); - self.pending_consensus_certificates - .write() - .insert(*cert.digest()); - } + // UserTransaction exists only when mysticeti_fastpath is enabled in protocol config. + let digests: Vec<_> = transactions + .iter() + .filter_map(|tx| match &tx.kind { + ConsensusTransactionKind::CertifiedTransaction(cert) => Some(cert.digest()), + ConsensusTransactionKind::UserTransaction(txn) => Some(txn.digest()), + _ => None, + }) + .collect(); + if !digests.is_empty() { + let state = lock.expect("Must pass reconfiguration lock when storing certificate"); + // Caller is responsible for performing graceful check + assert!( + state.should_accept_user_certs(), + "Reconfiguration state should allow accepting user transactions" + ); + let mut pending_consensus_certificates = self.pending_consensus_certificates.write(); + pending_consensus_certificates.extend(digests); } + Ok(()) } @@ -2768,7 +2774,7 @@ impl AuthorityPerEpochStore { .collect(); let ( - transactions_to_schedule, + verified_transactions, notifications, lock, final_round, @@ -2790,10 +2796,7 @@ impl AuthorityPerEpochStore { authority_metrics, ) .await?; - self.finish_consensus_certificate_process_with_batch( - &mut output, - &transactions_to_schedule, - )?; + self.finish_consensus_certificate_process_with_batch(&mut output, &verified_transactions)?; output.record_consensus_commit_stats(consensus_stats.clone()); // Create pending checkpoints if we are still accepting tx. @@ -2901,7 +2904,7 @@ impl AuthorityPerEpochStore { self.record_end_of_message_quorum_time_metric(); } - Ok(transactions_to_schedule) + Ok(verified_transactions) } // Adds the consensus commit prologue transaction to the beginning of input `transactions` to update @@ -3682,7 +3685,7 @@ impl AuthorityPerEpochStore { kind: ConsensusTransactionKind::UserTransaction(_tx), .. }) => { - // TODO: implement handling of unsigned user transactions. + // TODO(fastpath): implement handling of user transactions from consensus commits. Ok(ConsensusCertificateResult::Ignored) } SequencedConsensusTransactionKind::System(system_transaction) => { diff --git a/crates/sui-core/src/consensus_adapter.rs b/crates/sui-core/src/consensus_adapter.rs index acf7f550bc6e6..7a3e451402d46 100644 --- a/crates/sui-core/src/consensus_adapter.rs +++ b/crates/sui-core/src/consensus_adapter.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use arc_swap::{ArcSwap, ArcSwapOption}; -use bytes::Bytes; use dashmap::try_result::TryResult; use dashmap::DashMap; use futures::future::{select, Either}; @@ -10,7 +9,6 @@ use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::{pin_mut, StreamExt}; use itertools::Itertools; -use narwhal_types::{TransactionProto, TransactionsClient}; use parking_lot::RwLockReadGuard; use prometheus::Histogram; use prometheus::HistogramVec; @@ -34,7 +32,6 @@ use sui_types::base_types::TransactionDigest; use sui_types::committee::Committee; use sui_types::error::{SuiError, SuiResult}; -use tap::prelude::*; use tokio::sync::{Semaphore, SemaphorePermit}; use tokio::task::JoinHandle; use tokio::time::{self}; @@ -196,36 +193,6 @@ pub trait SubmitToConsensus: Sync + Send + 'static { epoch_store: &Arc, ) -> SuiResult; } - -#[async_trait::async_trait] -impl SubmitToConsensus for TransactionsClient { - async fn submit_to_consensus( - &self, - transactions: &[ConsensusTransaction], - _epoch_store: &Arc, - ) -> SuiResult { - let transactions_bytes = transactions - .iter() - .map(|t| { - let serialized = - bcs::to_bytes(t).expect("Serializing consensus transaction cannot fail"); - Bytes::from(serialized) - }) - .collect::>(); - self.clone() - .submit_transaction(TransactionProto { - transactions: transactions_bytes, - }) - .await - .map_err(|e| SuiError::ConsensusConnectionBroken(format!("{:?}", e))) - .tap_err(|r| { - // Will be logged by caller as well. - warn!("Submit transaction failed with: {:?}", r); - })?; - Ok(()) - } -} - /// Submit Sui certificates to the consensus. pub struct ConsensusAdapter { /// The network client connecting to the consensus node of this authority. diff --git a/crates/sui-core/src/consensus_handler.rs b/crates/sui-core/src/consensus_handler.rs index bc407592631db..d00b8273f1d02 100644 --- a/crates/sui-core/src/consensus_handler.rs +++ b/crates/sui-core/src/consensus_handler.rs @@ -10,9 +10,13 @@ use std::{ use arc_swap::ArcSwap; use consensus_config::Committee as ConsensusCommittee; -use consensus_core::CommitConsumerMonitor; +use consensus_core::{CommitConsumerMonitor, TransactionIndex, VerifiedBlock}; use lru::LruCache; -use mysten_metrics::{monitored_mpsc::UnboundedReceiver, monitored_scope, spawn_monitored_task}; +use mysten_metrics::{ + monitored_future, + monitored_mpsc::{self, UnboundedReceiver}, + monitored_scope, spawn_monitored_task, +}; use serde::{Deserialize, Serialize}; use sui_macros::{fail_point_async, fail_point_if}; use sui_protocol_config::ProtocolConfig; @@ -21,10 +25,13 @@ use sui_types::{ base_types::{AuthorityName, EpochId, ObjectID, SequenceNumber, TransactionDigest}, digests::ConsensusCommitDigest, executable_transaction::{TrustedExecutableTransaction, VerifiedExecutableTransaction}, - messages_consensus::{ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind}, + messages_consensus::{ + AuthorityIndex, ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind, + }, sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait, transaction::{SenderSignedData, VerifiedTransaction}, }; +use tokio::task::JoinSet; use tracing::{debug, error, info, instrument, trace_span, warn}; use crate::{ @@ -38,7 +45,9 @@ use crate::{ }, checkpoints::{CheckpointService, CheckpointServiceNotify}, consensus_throughput_calculator::ConsensusThroughputCalculator, - consensus_types::{consensus_output_api::ConsensusOutputAPI, AuthorityIndex}, + consensus_types::consensus_output_api::{ + parse_block_transactions, ConsensusCommitAPI, ParsedTransaction, + }, execution_cache::ObjectCacheRead, scoring_decision::update_low_scoring_authorities, transaction_manager::TransactionManager, @@ -69,7 +78,8 @@ impl ConsensusHandlerInitializer { } } - pub fn new_for_testing( + #[cfg(test)] + pub(crate) fn new_for_testing( state: Arc, checkpoint_service: Arc, ) -> Self { @@ -85,7 +95,7 @@ impl ConsensusHandlerInitializer { } } - pub fn new_consensus_handler(&self) -> ConsensusHandler { + pub(crate) fn new_consensus_handler(&self) -> ConsensusHandler { let new_epoch_start_state = self.epoch_store.epoch_start_state(); let consensus_committee = new_epoch_start_state.get_consensus_committee(); @@ -100,6 +110,10 @@ impl ConsensusHandlerInitializer { self.throughput_calculator.clone(), ) } + + pub(crate) fn metrics(&self) -> &Arc { + &self.state.metrics + } } pub struct ConsensusHandler { @@ -122,7 +136,8 @@ pub struct ConsensusHandler { metrics: Arc, /// Lru cache to quickly discard transactions processed by consensus processed_cache: LruCache, - transaction_scheduler: AsyncTransactionScheduler, + /// Enqueues transactions to the transaction manager via a separate task. + transaction_manager_sender: TransactionManagerSender, /// Using the throughput calculator to record the current consensus throughput throughput_calculator: Arc, } @@ -148,8 +163,8 @@ impl ConsensusHandler { if !last_consensus_stats.stats.is_initialized() { last_consensus_stats.stats = ConsensusStats::new(committee.size()); } - let transaction_scheduler = - AsyncTransactionScheduler::start(transaction_manager, epoch_store.clone()); + let transaction_manager_sender = + TransactionManagerSender::start(transaction_manager, epoch_store.clone()); Self { epoch_store, last_consensus_stats, @@ -159,31 +174,29 @@ impl ConsensusHandler { committee, metrics, processed_cache: LruCache::new(NonZeroUsize::new(PROCESSED_CACHE_CAP).unwrap()), - transaction_scheduler, + transaction_manager_sender, throughput_calculator, } } /// Returns the last subdag index processed by the handler. - pub fn last_processed_subdag_index(&self) -> u64 { + pub(crate) fn last_processed_subdag_index(&self) -> u64 { self.last_consensus_stats.index.sub_dag_index } + + pub(crate) fn transaction_manager_sender(&self) -> &TransactionManagerSender { + &self.transaction_manager_sender + } } impl ConsensusHandler { #[instrument(level = "debug", skip_all)] - async fn handle_consensus_output(&mut self, consensus_output: impl ConsensusOutputAPI) { + async fn handle_consensus_commit(&mut self, consensus_commit: impl ConsensusCommitAPI) { let _scope = monitored_scope("HandleConsensusOutput"); - // This code no longer supports old protocol versions. - assert!(self - .epoch_store - .protocol_config() - .consensus_order_end_of_epoch_last()); - let last_committed_round = self.last_consensus_stats.index.last_committed_round; - let round = consensus_output.leader_round(); + let round = consensus_commit.leader_round(); // TODO: Remove this once narwhal is deprecated. For now mysticeti will not return // more than one leader per round so we are not in danger of ignoring any commits. @@ -199,11 +212,11 @@ impl ConsensusHandler { return; } - /* (serialized, transaction, output_cert) */ + /* (transaction, serialized length) */ let mut transactions = vec![]; - let timestamp = consensus_output.commit_timestamp_ms(); - let leader_author = consensus_output.leader_author_index(); - let commit_sub_dag_index = consensus_output.commit_sub_dag_index(); + let timestamp = consensus_commit.commit_timestamp_ms(); + let leader_author = consensus_commit.leader_author_index(); + let commit_sub_dag_index = consensus_commit.commit_sub_dag_index(); let epoch_start = self .epoch_store @@ -219,7 +232,7 @@ impl ConsensusHandler { }; info!( - %consensus_output, + %consensus_commit, epoch = ?self.epoch_store.epoch(), "Received consensus output" ); @@ -269,7 +282,7 @@ impl ConsensusHandler { self.low_scoring_authorities.clone(), self.epoch_store.committee(), &self.committee, - consensus_output.reputation_score_sorted_desc(), + consensus_commit.reputation_score_sorted_desc(), &self.metrics, self.epoch_store .protocol_config() @@ -284,13 +297,18 @@ impl ConsensusHandler { { let span = trace_span!("process_consensus_certs"); let _guard = span.enter(); - for (authority_index, authority_transactions) in consensus_output.transactions() { + for (authority_index, parsed_transactions) in consensus_commit.transactions() { // TODO: consider only messages within 1~3 rounds of the leader? self.last_consensus_stats .stats .inc_num_messages(authority_index as usize); - for (transaction, serialized_len) in authority_transactions { - let kind = classify(&transaction); + for parsed in parsed_transactions { + // Skip executing rejected transactions. Unlocking is the responsibility of the + // consensus transaction handler. + if parsed.rejected { + continue; + } + let kind = classify(&parsed.transaction); self.metrics .consensus_handler_processed .with_label_values(&[kind]) @@ -298,22 +316,25 @@ impl ConsensusHandler { self.metrics .consensus_handler_transaction_sizes .with_label_values(&[kind]) - .observe(serialized_len as f64); + .observe(parsed.serialized_len as f64); + // UserTransaction exists only when mysticeti_fastpath is enabled in protocol config. if matches!( - &transaction.kind, + &parsed.transaction.kind, ConsensusTransactionKind::CertifiedTransaction(_) + | ConsensusTransactionKind::UserTransaction(_) ) { self.last_consensus_stats .stats .inc_num_user_transactions(authority_index as usize); } if let ConsensusTransactionKind::RandomnessStateUpdate(randomness_round, _) = - &transaction.kind + &parsed.transaction.kind { // These are deprecated and we should never see them. Log an error and eat the tx if one appears. error!("BUG: saw deprecated RandomnessStateUpdate tx for commit round {round:?}, randomness round {randomness_round:?}") } else { - let transaction = SequencedConsensusTransactionKind::External(transaction); + let transaction = + SequencedConsensusTransactionKind::External(parsed.transaction); transactions.push((transaction, authority_index)); } } @@ -383,14 +404,14 @@ impl ConsensusHandler { } } - let transactions_to_schedule = self + let executable_transactions = self .epoch_store .process_consensus_transactions_and_commit_boundary( all_transactions, &self.last_consensus_stats, &self.checkpoint_service, self.cache_reader.as_ref(), - &ConsensusCommitInfo::new(self.epoch_store.protocol_config(), &consensus_output), + &ConsensusCommitInfo::new(self.epoch_store.protocol_config(), &consensus_commit), &self.metrics, ) .await @@ -398,7 +419,7 @@ impl ConsensusHandler { // update the calculated throughput self.throughput_calculator - .add_transactions(timestamp, transactions_to_schedule.len() as u64); + .add_transactions(timestamp, executable_transactions.len() as u64); fail_point_if!("correlated-crash-after-consensus-commit-boundary", || { let key = [commit_sub_dag_index, self.epoch_store.epoch()]; @@ -409,32 +430,35 @@ impl ConsensusHandler { fail_point_async!("crash"); // for tests that produce random crashes - self.transaction_scheduler - .schedule(transactions_to_schedule) - .await; + self.transaction_manager_sender + .send(executable_transactions); } } -struct AsyncTransactionScheduler { - sender: tokio::sync::mpsc::Sender>, +/// Sends transactions to the transaction manager in a separate task, +/// to avoid blocking consensus handler. +#[derive(Clone)] +pub(crate) struct TransactionManagerSender { + // Using unbounded channel to avoid blocking consensus commit and transaction handler. + sender: monitored_mpsc::UnboundedSender>, } -impl AsyncTransactionScheduler { - pub fn start( +impl TransactionManagerSender { + fn start( transaction_manager: Arc, epoch_store: Arc, ) -> Self { - let (sender, recv) = tokio::sync::mpsc::channel(16); + let (sender, recv) = monitored_mpsc::unbounded_channel("transaction_manager_sender"); spawn_monitored_task!(Self::run(recv, transaction_manager, epoch_store)); Self { sender } } - pub async fn schedule(&self, transactions: Vec) { - self.sender.send(transactions).await.ok(); + fn send(&self, transactions: Vec) { + let _ = self.sender.send(transactions); } - pub async fn run( - mut recv: tokio::sync::mpsc::Receiver>, + async fn run( + mut recv: monitored_mpsc::UnboundedReceiver>, transaction_manager: Arc, epoch_store: Arc, ) { @@ -445,48 +469,51 @@ impl AsyncTransactionScheduler { } } -/// Consensus handler used by Mysticeti. Since Mysticeti repo is not yet integrated, we use a -/// channel to receive the consensus output from Mysticeti. -/// During initialization, the sender is passed into Mysticeti which can send consensus output -/// to the channel. -pub struct MysticetiConsensusHandler { - handle: Option>, +/// Manages the lifetime of tasks handling the commits and transactions output by consensus. +pub(crate) struct MysticetiConsensusHandler { + tasks: JoinSet<()>, } impl MysticetiConsensusHandler { - pub fn new( + pub(crate) fn new( mut consensus_handler: ConsensusHandler, - mut receiver: UnboundedReceiver, + consensus_transaction_handler: ConsensusTransactionHandler, + mut commit_receiver: UnboundedReceiver, + mut transaction_receiver: UnboundedReceiver)>>, commit_consumer_monitor: Arc, ) -> Self { - let handle = spawn_monitored_task!(async move { + let mut tasks = JoinSet::new(); + tasks.spawn(monitored_future!(async move { // TODO: pause when execution is overloaded, so consensus can detect the backpressure. - while let Some(consensus_output) = receiver.recv().await { - let commit_index = consensus_output.commit_ref.index; + while let Some(consensus_commit) = commit_receiver.recv().await { + let commit_index = consensus_commit.commit_ref.index; consensus_handler - .handle_consensus_output(consensus_output) + .handle_consensus_commit(consensus_commit) .await; commit_consumer_monitor.set_highest_handled_commit(commit_index); } - }); - Self { - handle: Some(handle), - } - } - - pub async fn abort(&mut self) { - if let Some(handle) = self.handle.take() { - handle.abort(); - let _ = handle.await; + })); + if consensus_transaction_handler.enabled() { + tasks.spawn(monitored_future!(async move { + while let Some(blocks_and_rejected_transactions) = transaction_receiver.recv().await + { + let parsed_transactions = blocks_and_rejected_transactions + .into_iter() + .flat_map(|(block, rejected_transactions)| { + parse_block_transactions(&block, &rejected_transactions) + }) + .collect::>(); + consensus_transaction_handler + .handle_consensus_transactions(parsed_transactions) + .await; + } + })); } + Self { tasks } } -} -impl Drop for MysticetiConsensusHandler { - fn drop(&mut self) { - if let Some(handle) = self.handle.take() { - handle.abort(); - } + pub(crate) async fn abort(&mut self) { + self.tasks.shutdown().await; } } @@ -749,11 +776,11 @@ pub struct ConsensusCommitInfo { } impl ConsensusCommitInfo { - fn new(protocol_config: &ProtocolConfig, consensus_output: &impl ConsensusOutputAPI) -> Self { + fn new(protocol_config: &ProtocolConfig, consensus_commit: &impl ConsensusCommitAPI) -> Self { Self { - round: consensus_output.leader_round(), - timestamp: consensus_output.commit_timestamp_ms(), - consensus_commit_digest: consensus_output.consensus_digest(protocol_config), + round: consensus_commit.leader_round(), + timestamp: consensus_commit.commit_timestamp_ms(), + consensus_commit_digest: consensus_commit.consensus_digest(protocol_config), #[cfg(any(test, feature = "test-utils"))] skip_consensus_commit_prologue_in_test: false, @@ -829,6 +856,105 @@ impl ConsensusCommitInfo { } } +/// Handles certified and rejected transactions output by consensus. +pub(crate) struct ConsensusTransactionHandler { + /// Whether to enable handling certified transactions. + enabled: bool, + /// Per-epoch store. + epoch_store: Arc, + /// Enqueues transactions to the transaction manager via a separate task. + transaction_manager_sender: TransactionManagerSender, + /// Metrics for consensus transaction handling. + metrics: Arc, +} + +impl ConsensusTransactionHandler { + pub fn new( + epoch_store: Arc, + transaction_manager_sender: TransactionManagerSender, + metrics: Arc, + ) -> Self { + Self { + enabled: epoch_store.protocol_config().mysticeti_fastpath(), + epoch_store, + transaction_manager_sender, + metrics, + } + } + + pub fn enabled(&self) -> bool { + self.enabled + } + + pub async fn handle_consensus_transactions(&self, parsed_transactions: Vec) { + let mut pending_consensus_transactions = vec![]; + let executable_transactions: Vec<_> = parsed_transactions + .into_iter() + .filter_map(|parsed| { + // TODO(fastpath): unlock rejected transactions. + // TODO(fastpath): maybe avoid parsing blocks twice between commit and transaction handling? + if parsed.rejected { + self.metrics + .consensus_transaction_handler_processed + .with_label_values(&["rejected"]) + .inc(); + return None; + } + self.metrics + .consensus_transaction_handler_processed + .with_label_values(&["certified"]) + .inc(); + match &parsed.transaction.kind { + ConsensusTransactionKind::UserTransaction(tx) => { + // TODO(fastpath): use a separate function to check if a transaction should be executed in fastpath. + if tx.contains_shared_object() { + return None; + } + pending_consensus_transactions.push(parsed.transaction.clone()); + let tx = VerifiedTransaction::new_from_verified(*tx.clone()); + Some(VerifiedExecutableTransaction::new_from_consensus( + tx, + self.epoch_store.epoch(), + parsed.round, + parsed.authority, + parsed.transaction_index, + )) + } + _ => None, + } + }) + .collect(); + + if pending_consensus_transactions.is_empty() { + return; + } + { + let reconfig_state = self.epoch_store.get_reconfig_state_read_lock_guard(); + // Stop executing fastpath transactions when epoch change starts. + if !reconfig_state.should_accept_user_certs() { + return; + } + // Otherwise, try to ensure the certified transactions get into consensus before epoch change. + // TODO(fastpath): avoid race with removals in consensus adapter, by waiting to handle commit after + // all blocks in the commit are processed via the transaction handler. Other kinds of races need to be + // avoided as well. Or we can track pending consensus transactions inside consensus instead. + self.epoch_store + .insert_pending_consensus_transactions( + &pending_consensus_transactions, + Some(&reconfig_state), + ) + .unwrap_or_else(|e| { + panic!("Failed to insert pending consensus transactions: {}", e) + }); + } + self.metrics + .consensus_transaction_handler_fastpath_executions + .inc_by(executable_transactions.len() as u64); + self.transaction_manager_sender + .send(executable_transactions); + } +} + #[cfg(test)] mod tests { use consensus_core::{ @@ -839,6 +965,7 @@ mod tests { use sui_types::{ base_types::{random_object_ref, AuthorityName, SuiAddress}, committee::Committee, + crypto::deterministic_random_account_key, messages_consensus::{ AuthorityCapabilitiesV1, ConsensusTransaction, ConsensusTransactionKind, }, @@ -856,12 +983,14 @@ mod tests { test_authority_builder::TestAuthorityBuilder, }, checkpoints::CheckpointServiceNoop, - consensus_adapter::consensus_tests::{test_certificates, test_gas_objects}, + consensus_adapter::consensus_tests::{ + test_certificates, test_gas_objects, test_user_transaction, + }, post_consensus_tx_reorder::PostConsensusTxReorder, }; #[tokio::test] - pub async fn test_consensus_handler() { + pub async fn test_consensus_commit_handler() { // GIVEN let mut objects = test_gas_objects(); let shared_object = Object::shared_for_testing(); @@ -922,6 +1051,7 @@ mod tests { let committed_sub_dag = CommittedSubDag::new( leader_block.reference(), blocks.clone(), + vec![vec![]; blocks.len()], leader_block.timestamp_ms(), CommitRef::new(10, CommitDigest::MIN), vec![], @@ -929,7 +1059,7 @@ mod tests { // AND processing the consensus output once consensus_handler - .handle_consensus_output(committed_sub_dag.clone()) + .handle_consensus_commit(committed_sub_dag.clone()) .await; // AND capturing the consensus stats @@ -956,13 +1086,139 @@ mod tests { // THEN the consensus stats do not update for _ in 0..2 { consensus_handler - .handle_consensus_output(committed_sub_dag.clone()) + .handle_consensus_commit(committed_sub_dag.clone()) .await; let last_consensus_stats_2 = consensus_handler.last_consensus_stats.clone(); assert_eq!(last_consensus_stats_1, last_consensus_stats_2); } } + #[tokio::test] + pub async fn test_consensus_transaction_handler() { + // GIVEN + // 1 account keypair + let (sender, keypair) = deterministic_random_account_key(); + // 8 gas objects. + let gas_objects: Vec = (0..8) + .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender)) + .collect(); + // 4 owned objects. + let owned_objects: Vec = (0..4) + .map(|_| Object::with_id_owner_for_testing(ObjectID::random(), sender)) + .collect(); + // 4 shared objects. + let shared_objects: Vec = (0..4) + .map(|_| Object::shared_for_testing()) + .collect::>(); + let mut all_objects = gas_objects.clone(); + all_objects.extend(owned_objects.clone()); + all_objects.extend(shared_objects.clone()); + + let network_config = + sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir() + .with_objects(all_objects.clone()) + .build(); + + let state = TestAuthorityBuilder::new() + .with_network_config(&network_config, 0) + .build() + .await; + let epoch_store = state.epoch_store_for_testing().clone(); + let transaction_manager_sender = TransactionManagerSender::start( + state.transaction_manager().clone(), + epoch_store.clone(), + ); + let transaction_handler = ConsensusTransactionHandler::new( + epoch_store, + transaction_manager_sender, + state.metrics.clone(), + ); + + // AND create test transactions alternating between owned and shared input. + let mut transactions = vec![]; + for (i, gas_object) in gas_objects.iter().enumerate() { + let input_object = if i % 2 == 0 { + owned_objects.get(i / 2).unwrap().clone() + } else { + shared_objects.get(i / 2).unwrap().clone() + }; + let transaction = test_user_transaction( + &state, + sender, + &keypair, + gas_object.clone(), + vec![input_object], + ) + .await; + transactions.push(transaction); + } + + let serialized_transactions: Vec<_> = transactions + .iter() + .map(|t| { + Transaction::new( + bcs::to_bytes(&ConsensusTransaction::new_user_transaction_message( + &state.name, + t.inner().clone(), + )) + .unwrap(), + ) + }) + .collect(); + + // AND create block for all transactions + let block = VerifiedBlock::new_for_test( + TestBlock::new(100, 1) + .set_transactions(serialized_transactions.clone()) + .build(), + ); + + // AND set rejected transactions. + let rejected_transactions = vec![0, 3, 4]; + + // AND process the transactions from consensus output. + transaction_handler + .handle_consensus_transactions(parse_block_transactions(&block, &rejected_transactions)) + .await; + + // THEN check for execution status of transactions. + for (i, t) in transactions.iter().enumerate() { + // Do not expect shared transactions or rejected transactions to be executed. + if i % 2 == 1 || rejected_transactions.contains(&(i as TransactionIndex)) { + continue; + } + let digest = t.digest(); + if let Ok(Ok(_)) = tokio::time::timeout( + std::time::Duration::from_secs(10), + state.notify_read_effects(*digest), + ) + .await + { + // Effects exist as expected. + } else { + panic!("Transaction {} {} did not execute", i, digest); + } + } + + // THEN check for no inflight or suspended transactions. + state.transaction_manager().check_empty_for_testing(); + + // THEN check that rejected transactions are not executed. + for (i, t) in transactions.iter().enumerate() { + // Expect shared transactions or rejected transactions to not have executed. + if i % 2 == 0 && !rejected_transactions.contains(&(i as TransactionIndex)) { + continue; + } + let digest = t.digest(); + assert!( + !state.is_tx_already_executed(digest).unwrap(), + "Rejected transaction {} {} should not have been executed", + i, + digest + ); + } + } + #[test] fn test_order_by_gas_price() { let mut v = vec![cap_txn(10), user_txn(42), user_txn(100), cap_txn(1)]; diff --git a/crates/sui-core/src/consensus_manager/mysticeti_manager.rs b/crates/sui-core/src/consensus_manager/mysticeti_manager.rs index 93612791e90f3..cfee957039a0d 100644 --- a/crates/sui-core/src/consensus_manager/mysticeti_manager.rs +++ b/crates/sui-core/src/consensus_manager/mysticeti_manager.rs @@ -7,7 +7,7 @@ use async_trait::async_trait; use consensus_config::{Committee, NetworkKeyPair, Parameters, ProtocolKeyPair}; use consensus_core::{CommitConsumer, CommitIndex, ConsensusAuthority}; use fastcrypto::ed25519; -use mysten_metrics::{monitored_mpsc::unbounded_channel, RegistryID, RegistryService}; +use mysten_metrics::{RegistryID, RegistryService}; use prometheus::Registry; use sui_config::NodeConfig; use sui_protocol_config::ConsensusNetwork; @@ -19,7 +19,9 @@ use tracing::info; use crate::{ authority::authority_per_epoch_store::AuthorityPerEpochStore, - consensus_handler::{ConsensusHandlerInitializer, MysticetiConsensusHandler}, + consensus_handler::{ + ConsensusHandlerInitializer, ConsensusTransactionHandler, MysticetiConsensusHandler, + }, consensus_manager::{ ConsensusManagerMetrics, ConsensusManagerTrait, Running, RunningLockGuard, }, @@ -144,17 +146,11 @@ impl ConsensusManagerTrait for MysticetiManager { let registry = Registry::new_custom(Some("consensus".to_string()), None).unwrap(); - let (commit_sender, commit_receiver) = unbounded_channel("consensus_output"); - let consensus_handler = consensus_handler_initializer.new_consensus_handler(); - let consumer = CommitConsumer::new( - commit_sender, - consensus_handler.last_processed_subdag_index() as CommitIndex, - ); - let monitor = consumer.monitor(); + let (commit_consumer, commit_receiver, transaction_receiver) = + CommitConsumer::new(consensus_handler.last_processed_subdag_index() as CommitIndex); + let monitor = commit_consumer.monitor(); - // TODO(mysticeti): Investigate if we need to return potential errors from - // AuthorityNode and add retries here? let boot_counter = *self.boot_counter.lock().await; let authority = ConsensusAuthority::start( network_type, @@ -165,7 +161,7 @@ impl ConsensusManagerTrait for MysticetiManager { self.protocol_keypair.clone(), self.network_keypair.clone(), Arc::new(tx_validator.clone()), - consumer, + commit_consumer, registry.clone(), boot_counter, ) @@ -185,7 +181,18 @@ impl ConsensusManagerTrait for MysticetiManager { self.client.set(client); // spin up the new mysticeti consensus handler to listen for committed sub dags - let handler = MysticetiConsensusHandler::new(consensus_handler, commit_receiver, monitor); + let consensus_transaction_handler = ConsensusTransactionHandler::new( + epoch_store.clone(), + consensus_handler.transaction_manager_sender().clone(), + consensus_handler_initializer.metrics().clone(), + ); + let handler = MysticetiConsensusHandler::new( + consensus_handler, + consensus_transaction_handler, + commit_receiver, + transaction_receiver, + monitor, + ); let mut consensus_handler = self.consensus_handler.lock().await; *consensus_handler = Some(handler); diff --git a/crates/sui-core/src/consensus_types/consensus_output_api.rs b/crates/sui-core/src/consensus_types/consensus_output_api.rs index 938bfbaa4546d..e0c2eeebc16e4 100644 --- a/crates/sui-core/src/consensus_types/consensus_output_api.rs +++ b/crates/sui-core/src/consensus_types/consensus_output_api.rs @@ -1,19 +1,30 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::fmt::Display; +use std::{cmp::Ordering, fmt::Display}; -use consensus_core::{BlockAPI, CommitDigest}; +use consensus_core::{BlockAPI, CommitDigest, TransactionIndex, VerifiedBlock}; use sui_protocol_config::ProtocolConfig; -use sui_types::{digests::ConsensusCommitDigest, messages_consensus::ConsensusTransaction}; +use sui_types::{ + digests::ConsensusCommitDigest, + messages_consensus::{AuthorityIndex, ConsensusTransaction, Round}, +}; -use crate::consensus_types::AuthorityIndex; - -/// A list of tuples of: -/// (block origin authority index, all transactions contained in the block). -/// For each transaction, returns deserialized transaction and its serialized size. -type ConsensusOutputTransactions = Vec<(AuthorityIndex, Vec<(ConsensusTransaction, usize)>)>; +pub(crate) struct ParsedTransaction { + // Transaction from consensus output. + pub(crate) transaction: ConsensusTransaction, + // Whether the transaction was rejected in voting. + pub(crate) rejected: bool, + // Bytes length of the serialized transaction + pub(crate) serialized_len: usize, + // Consensus round of the block containing the transaction. + pub(crate) round: Round, + // Authority index of the block containing the transaction. + pub(crate) authority: AuthorityIndex, + // Transaction index in the block. + pub(crate) transaction_index: TransactionIndex, +} -pub(crate) trait ConsensusOutputAPI: Display { +pub(crate) trait ConsensusCommitAPI: Display { fn reputation_score_sorted_desc(&self) -> Option>; fn leader_round(&self) -> u64; fn leader_author_index(&self) -> AuthorityIndex; @@ -24,14 +35,14 @@ pub(crate) trait ConsensusOutputAPI: Display { /// Returns a unique global index for each committed sub-dag. fn commit_sub_dag_index(&self) -> u64; - /// Returns all transactions in the commit. - fn transactions(&self) -> ConsensusOutputTransactions; + /// Returns all accepted and rejected transactions per block in the commit in deterministic order. + fn transactions(&self) -> Vec<(AuthorityIndex, Vec)>; /// Returns the digest of consensus output. fn consensus_digest(&self, protocol_config: &ProtocolConfig) -> ConsensusCommitDigest; } -impl ConsensusOutputAPI for consensus_core::CommittedSubDag { +impl ConsensusCommitAPI for consensus_core::CommittedSubDag { fn reputation_score_sorted_desc(&self) -> Option> { if !self.reputation_scores_desc.is_empty() { Some( @@ -62,30 +73,15 @@ impl ConsensusOutputAPI for consensus_core::CommittedSubDag { self.commit_ref.index.into() } - fn transactions(&self) -> ConsensusOutputTransactions { + fn transactions(&self) -> Vec<(AuthorityIndex, Vec)> { self.blocks .iter() - .map(|block| { - let round = block.round(); - let author = block.author().value() as AuthorityIndex; - let transactions: Vec<_> = block - .transactions() - .iter() - .flat_map(|tx| { - let transaction = bcs::from_bytes::(tx.data()); - match transaction { - Ok(transaction) => Some(( - transaction, - tx.data().len(), - )), - Err(err) => { - tracing::error!("Failed to deserialize sequenced consensus transaction(this should not happen) {} from {author} at {round}", err); - None - }, - } - }) - .collect(); - (author, transactions) + .zip(self.rejected_transactions_by_block.iter()) + .map(|(block, rejected_transactions)| { + ( + block.author().value() as AuthorityIndex, + parse_block_transactions(block, rejected_transactions), + ) }) .collect() } @@ -101,3 +97,49 @@ impl ConsensusOutputAPI for consensus_core::CommittedSubDag { } } } + +pub(crate) fn parse_block_transactions( + block: &VerifiedBlock, + rejected_transactions: &[TransactionIndex], +) -> Vec { + let round = block.round(); + let authority = block.author().value() as AuthorityIndex; + + let mut rejected_idx = 0; + block + .transactions() + .iter().enumerate() + .map(|(index, tx)| { + let transaction = match bcs::from_bytes::(tx.data()) { + Ok(transaction) => transaction, + Err(err) => { + panic!("Failed to deserialize sequenced consensus transaction(this should not happen) {err} from {authority} at {round}"); + }, + }; + let rejected = if rejected_idx < rejected_transactions.len() { + match (index as TransactionIndex).cmp(&rejected_transactions[rejected_idx]) { + Ordering::Less => { + false + }, + Ordering::Equal => { + rejected_idx += 1; + true + }, + Ordering::Greater => { + panic!("Rejected transaction indices are not in order. Block {block:?}, rejected transactions: {rejected_transactions:?}"); + }, + } + } else { + false + }; + ParsedTransaction { + transaction, + rejected, + serialized_len: tx.data().len(), + round, + authority, + transaction_index: index as TransactionIndex, + } + }) + .collect() +} diff --git a/crates/sui-core/src/consensus_types/mod.rs b/crates/sui-core/src/consensus_types/mod.rs index 5aab7d258d1bd..742bb95d2a16b 100644 --- a/crates/sui-core/src/consensus_types/mod.rs +++ b/crates/sui-core/src/consensus_types/mod.rs @@ -2,7 +2,3 @@ // SPDX-License-Identifier: Apache-2.0 pub(crate) mod consensus_output_api; - -/// An unique integer ID for a validator used by consensus. -/// In Mysticeti, this is used the same way as the AuthorityIndex type there. -pub type AuthorityIndex = u32; diff --git a/crates/sui-core/src/consensus_validator.rs b/crates/sui-core/src/consensus_validator.rs index 9c2609e36d971..ad47a537b2110 100644 --- a/crates/sui-core/src/consensus_validator.rs +++ b/crates/sui-core/src/consensus_validator.rs @@ -91,7 +91,13 @@ impl SuiTxValidator { | ConsensusTransactionKind::RandomnessStateUpdate(_, _) => {} ConsensusTransactionKind::UserTransaction(_tx) => { - // TODO: implement verification for uncertified user transactions if needed + if !self.epoch_store.protocol_config().mysticeti_fastpath() { + return Err(SuiError::UnexpectedMessage( + "ConsensusTransactionKind::UserTransaction is unsupported".to_string(), + ) + .into()); + } + // TODO(fastpath): implement verification for uncertified user transactions. } } } diff --git a/crates/sui-core/src/scoring_decision.rs b/crates/sui-core/src/scoring_decision.rs index 486da40c786bc..fd48e5bfddccb 100644 --- a/crates/sui-core/src/scoring_decision.rs +++ b/crates/sui-core/src/scoring_decision.rs @@ -4,10 +4,12 @@ use std::{collections::HashMap, sync::Arc}; use arc_swap::ArcSwap; use consensus_config::Committee as ConsensusCommittee; -use sui_types::{base_types::AuthorityName, committee::Committee}; +use sui_types::{ + base_types::AuthorityName, committee::Committee, messages_consensus::AuthorityIndex, +}; use tracing::debug; -use crate::{authority::AuthorityMetrics, consensus_types::AuthorityIndex}; +use crate::authority::AuthorityMetrics; /// Updates list of authorities that are deemed to have low reputation scores by consensus /// these may be lagging behind the network, byzantine, or not reliably participating for any reason. diff --git a/crates/sui-core/src/transaction_manager.rs b/crates/sui-core/src/transaction_manager.rs index 368e259c2dbcd..0ffd0c7a92b51 100644 --- a/crates/sui-core/src/transaction_manager.rs +++ b/crates/sui-core/src/transaction_manager.rs @@ -863,7 +863,7 @@ impl TransactionManager { // Verify TM has no pending item for tests. #[cfg(test)] - fn check_empty_for_testing(&self) { + pub(crate) fn check_empty_for_testing(&self) { let reconfig_lock = self.inner.read(); let inner = reconfig_lock.read(); assert!( diff --git a/crates/sui-core/src/unit_tests/consensus_tests.rs b/crates/sui-core/src/unit_tests/consensus_tests.rs index 59516fe5f318f..c2c60f048283b 100644 --- a/crates/sui-core/src/unit_tests/consensus_tests.rs +++ b/crates/sui-core/src/unit_tests/consensus_tests.rs @@ -12,9 +12,10 @@ use narwhal_types::Transactions; use narwhal_types::TransactionsServer; use narwhal_types::{Empty, TransactionProto}; use sui_network::tonic; -use sui_types::crypto::deterministic_random_account_key; +use sui_types::base_types::SuiAddress; +use sui_types::crypto::{deterministic_random_account_key, AccountKeyPair}; use sui_types::multiaddr::Multiaddr; -use sui_types::transaction::TEST_ONLY_GAS_UNIT_FOR_OBJECT_BASICS; +use sui_types::transaction::{VerifiedTransaction, TEST_ONLY_GAS_UNIT_FOR_OBJECT_BASICS}; use sui_types::utils::to_sender_signed_transaction; use sui_types::SUI_FRAMEWORK_PACKAGE_ID; use sui_types::{ @@ -105,6 +106,70 @@ pub async fn test_certificates( certificates } +/// Fixture: creates a transaction using the specified gas and input objects. +pub async fn test_user_transaction( + authority: &AuthorityState, + sender: SuiAddress, + keypair: &AccountKeyPair, + gas_object: Object, + input_objects: Vec, +) -> VerifiedTransaction { + let epoch_store = authority.load_epoch_store_one_call_per_task(); + let rgp = epoch_store.reference_gas_price(); + + // Object digest may be different in genesis than originally generated. + let gas_object = authority + .get_object(&gas_object.id()) + .await + .unwrap() + .unwrap(); + let mut input_objs = vec![]; + for obj in input_objects { + input_objs.push(authority.get_object(&obj.id()).await.unwrap().unwrap()); + } + + let mut object_args: Vec<_> = input_objs + .into_iter() + .map(|obj| { + if obj.is_shared() { + ObjectArg::SharedObject { + id: obj.id(), + initial_shared_version: obj.version(), + mutable: true, + } + } else { + ObjectArg::ImmOrOwnedObject(obj.compute_object_reference()) + } + }) + .map(CallArg::Object) + .collect(); + object_args.extend(vec![ + CallArg::Pure(16u64.to_le_bytes().to_vec()), + CallArg::Pure(bcs::to_bytes(&AccountAddress::from(sender)).unwrap()), + ]); + + // Make a sample transaction. + let module = "object_basics"; + let function = "create"; + + let data = TransactionData::new_move_call( + sender, + SUI_FRAMEWORK_PACKAGE_ID, + ident_str!(module).to_owned(), + ident_str!(function).to_owned(), + /* type_args */ vec![], + gas_object.compute_object_reference(), + object_args, + rgp * TEST_ONLY_GAS_UNIT_FOR_OBJECT_BASICS, + rgp, + ) + .unwrap(); + + epoch_store + .verify_transaction(to_sender_signed_transaction(data, keypair)) + .unwrap() +} + pub fn make_consensus_adapter_for_test( state: Arc, process_via_checkpoint: HashSet, diff --git a/crates/sui-json-rpc/src/error.rs b/crates/sui-json-rpc/src/error.rs index 793671849ed9e..d4abdb05dfe06 100644 --- a/crates/sui-json-rpc/src/error.rs +++ b/crates/sui-json-rpc/src/error.rs @@ -554,8 +554,9 @@ mod tests { #[test] fn test_quorum_driver_internal_error() { - let quorum_driver_error = - QuorumDriverError::QuorumDriverInternalError(SuiError::UnexpectedMessage); + let quorum_driver_error = QuorumDriverError::QuorumDriverInternalError( + SuiError::UnexpectedMessage("test".to_string()), + ); let rpc_error: RpcError = Error::QuorumDriverError(quorum_driver_error).into(); @@ -570,7 +571,7 @@ mod tests { fn test_system_overload() { let quorum_driver_error = QuorumDriverError::SystemOverload { overloaded_stake: 10, - errors: vec![(SuiError::UnexpectedMessage, 0, vec![])], + errors: vec![(SuiError::UnexpectedMessage("test".to_string()), 0, vec![])], }; let rpc_error: RpcError = Error::QuorumDriverError(quorum_driver_error).into(); diff --git a/crates/sui-transaction-checks/src/deny.rs b/crates/sui-transaction-checks/src/deny.rs index fd10c19489c07..d0b2b9247f7c2 100644 --- a/crates/sui-transaction-checks/src/deny.rs +++ b/crates/sui-transaction-checks/src/deny.rs @@ -81,7 +81,7 @@ fn check_disabled_features( deny_if_true!( filter_config.zklogin_disabled_providers().contains( &OIDCProvider::from_iss(z.get_iss()) - .map_err(|_| SuiError::UnexpectedMessage)? + .map_err(|_| SuiError::UnexpectedMessage(z.get_iss().to_string()))? .to_string() ), "zkLogin OAuth provider is temporarily disabled" diff --git a/crates/sui-types/src/error.rs b/crates/sui-types/src/error.rs index afa128dcf7bcf..4fe511b4674be 100644 --- a/crates/sui-types/src/error.rs +++ b/crates/sui-types/src/error.rs @@ -443,8 +443,8 @@ pub enum SuiError { #[error("Invalid DKG message size")] InvalidDkgMessageSize, - #[error("Unexpected message.")] - UnexpectedMessage, + #[error("Unexpected message: {0}")] + UnexpectedMessage(String), // Move module publishing related errors #[error("Failed to verify the Move module, reason: {error:?}.")]