diff --git a/crates/sui-core/src/consensus_adapter.rs b/crates/sui-core/src/consensus_adapter.rs index 87ea9e4c61793..4254beeac9b08 100644 --- a/crates/sui-core/src/consensus_adapter.rs +++ b/crates/sui-core/src/consensus_adapter.rs @@ -49,6 +49,7 @@ use sui_types::base_types::AuthorityName; use sui_types::fp_ensure; use sui_types::messages_consensus::ConsensusTransactionKind; use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKey}; +use sui_types::transaction::TransactionDataAPI; use tokio::time::Duration; use tracing::{debug, info, trace, warn}; @@ -351,7 +352,7 @@ impl ConsensusAdapter { fn await_submit_delay( &self, - committee: &Committee, + epoch_store: &Arc, transactions: &[ConsensusTransaction], ) -> (impl Future, usize, usize, usize) { if transactions.iter().any(|tx| tx.is_user_transaction()) { @@ -361,23 +362,35 @@ impl ConsensusAdapter { } // Use the minimum digest to compute submit delay. - let min_digest = transactions + let min_digest_and_gas_price = transactions .iter() .filter_map(|tx| match &tx.kind { ConsensusTransactionKind::CertifiedTransaction(certificate) => { - Some(certificate.digest()) - } - ConsensusTransactionKind::UserTransaction(transaction) => { - Some(transaction.digest()) + Some((certificate.digest(), certificate.gas_price())) } + ConsensusTransactionKind::UserTransaction(transaction) => Some(( + transaction.digest(), + transaction.data().transaction_data().gas_price(), + )), _ => None, }) .min(); - let (duration, position, positions_moved, preceding_disconnected) = match min_digest { - Some(digest) => self.await_submit_delay_user_transaction(committee, digest), - _ => (Duration::ZERO, 0, 0, 0), - }; + let (duration, position, positions_moved, preceding_disconnected) = + match min_digest_and_gas_price { + Some((digest, gas_price)) => { + // TODO: Make this configurable. + let k = 5; + let multipler = gas_price / epoch_store.reference_gas_price(); + let amplification_factor = if multipler >= k { multipler } else { 0 }; + self.await_submit_delay_user_transaction( + epoch_store.committee(), + digest, + amplification_factor as usize, + ) + } + _ => (Duration::ZERO, 0, 0, 0), + }; ( tokio::time::sleep(duration), position, @@ -390,9 +403,13 @@ impl ConsensusAdapter { &self, committee: &Committee, tx_digest: &TransactionDigest, + amplification_factor: usize, ) -> (Duration, usize, usize, usize) { - let (position, positions_moved, preceding_disconnected) = + let (mut position, positions_moved, preceding_disconnected) = self.submission_position(committee, tx_digest); + if amplification_factor > 0 { + position = (position + 1).saturating_sub(amplification_factor); + } const DEFAULT_LATENCY: Duration = Duration::from_secs(1); // > p50 consensus latency with global deployment const MIN_LATENCY: Duration = Duration::from_millis(150); @@ -687,7 +704,7 @@ impl ConsensusAdapter { // Create the waiter until the node's turn comes to submit to consensus let (await_submit, position, positions_moved, preceding_disconnected) = - self.await_submit_delay(epoch_store.committee(), &transactions[..]); + self.await_submit_delay(epoch_store, &transactions[..]); // Create the waiter until the transaction is processed by consensus or via checkpoint let processed_via_consensus_or_checkpoint = @@ -1325,7 +1342,7 @@ mod adapter_tests { // Make sure that position is set to max value 0 let (delay_step, position, positions_moved, _) = - consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest); + consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 0); assert_eq!(position, 1); assert_eq!(delay_step, Duration::from_secs(2)); @@ -1345,13 +1362,25 @@ mod adapter_tests { ); let (delay_step, position, positions_moved, _) = - consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest); + consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 0); assert_eq!(position, 7); // delay_step * position * 2 = 1 * 7 * 2 = 14 assert_eq!(delay_step, Duration::from_secs(14)); assert!(!positions_moved > 0); + + // With an amplification factor of 7, the position should be moved to 1. + let (delay_step, position, _, _) = + consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 7); + assert_eq!(position, 1); + assert_eq!(delay_step, Duration::from_secs(2)); + + // With an amplification factor > 7, the position should become 0. + let (delay_step, position, _, _) = + consensus_adapter.await_submit_delay_user_transaction(&committee, &tx_digest, 8); + assert_eq!(position, 0); + assert_eq!(delay_step, Duration::ZERO); } #[test]