diff --git a/api/test-context/src/test_context.rs b/api/test-context/src/test_context.rs index aa17398519ebb..ccd4d29e64873 100644 --- a/api/test-context/src/test_context.rs +++ b/api/test-context/src/test_context.rs @@ -17,7 +17,10 @@ use aptos_config::{ }; use aptos_crypto::{ed25519::Ed25519PrivateKey, hash::HashValue, SigningKey}; use aptos_db::AptosDB; -use aptos_executor::{block_executor::BlockExecutor, db_bootstrapper}; +use aptos_executor::{ + block_executor::{AptosVMBlockExecutor, BlockExecutor}, + db_bootstrapper, +}; use aptos_executor_types::BlockExecutorTrait; use aptos_framework::BuiltPackage; use aptos_indexer_grpc_table_info::internal_indexer_db_service::MockInternalIndexerDBService; @@ -204,7 +207,7 @@ pub fn new_test_context_inner( rng, root_key, validator_owner, - Box::new(BlockExecutor::::new(db_rw)), + Box::new(BlockExecutor::::new(db_rw)), mempool, db, test_name, diff --git a/aptos-move/block-executor/src/counters.rs b/aptos-move/block-executor/src/counters.rs index 50c2d2bb1f5df..c3f1dc61f8fd3 100644 --- a/aptos-move/block-executor/src/counters.rs +++ b/aptos-move/block-executor/src/counters.rs @@ -49,6 +49,18 @@ fn output_buckets() -> std::vec::Vec { .unwrap() } +pub static BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK: Lazy = Lazy::new(|| { + register_histogram!( + // metric name + "aptos_executor_block_executor_inner_execute_block_seconds", + // metric description + "The time spent in the most-inner part of executing a block of transactions, \ + i.e. for BlockSTM that is how long parallel or sequential execution took.", + exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 20).unwrap(), + ) + .unwrap() +}); + /// Count of times the module publishing fallback was triggered in parallel execution. pub static MODULE_PUBLISHING_FALLBACK_COUNT: Lazy = Lazy::new(|| { register_int_counter!( diff --git a/aptos-move/block-executor/src/executor.rs b/aptos-move/block-executor/src/executor.rs index eb7965ace5adb..182ff626059f0 100644 --- a/aptos-move/block-executor/src/executor.rs +++ b/aptos-move/block-executor/src/executor.rs @@ -4,10 +4,10 @@ use crate::{ code_cache_global::ImmutableModuleCache, - counters, counters::{ - PARALLEL_EXECUTION_SECONDS, RAYON_EXECUTION_SECONDS, TASK_EXECUTE_SECONDS, - TASK_VALIDATE_SECONDS, VM_INIT_SECONDS, WORK_WITH_TASK_SECONDS, + self, BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK, PARALLEL_EXECUTION_SECONDS, + RAYON_EXECUTION_SECONDS, TASK_EXECUTE_SECONDS, TASK_VALIDATE_SECONDS, VM_INIT_SECONDS, + WORK_WITH_TASK_SECONDS, }, errors::*, executor_utilities::*, @@ -1692,6 +1692,8 @@ where signature_verified_block: &[T], base_view: &S, ) -> BlockExecutionResult, E::Error> { + let _timer = BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK.start_timer(); + if self.config.local.concurrency_level > 1 { let parallel_result = self.execute_transactions_parallel(&env, signature_verified_block, base_view); diff --git a/aptos-move/e2e-benchmark/src/main.rs b/aptos-move/e2e-benchmark/src/main.rs index 7169989088d13..6ac46ffd3f785 100644 --- a/aptos-move/e2e-benchmark/src/main.rs +++ b/aptos-move/e2e-benchmark/src/main.rs @@ -15,7 +15,7 @@ use aptos_transaction_generator_lib::{ use aptos_types::{account_address::AccountAddress, transaction::TransactionPayload}; use rand::{rngs::StdRng, SeedableRng}; use serde_json::json; -use std::process::exit; +use std::{collections::HashMap, process::exit}; pub fn execute_txn( executor: &mut FakeExecutor, @@ -80,10 +80,55 @@ const ALLOWED_REGRESSION: f32 = 0.15; const ALLOWED_IMPROVEMENT: f32 = 0.15; const ABSOLUTE_BUFFER_US: f32 = 2.0; +const CALIBRATION_VALUES: &str = " +Loop { loop_count: Some(100000), loop_type: NoOp } 6 0.988 1.039 41212.4 +Loop { loop_count: Some(10000), loop_type: Arithmetic } 6 0.977 1.038 25868.8 +CreateObjects { num_objects: 10, object_payload_size: 0 } 6 0.940 1.026 152.1 +CreateObjects { num_objects: 10, object_payload_size: 10240 } 6 0.934 1.051 9731.3 +CreateObjects { num_objects: 100, object_payload_size: 0 } 6 0.966 1.051 1458.3 +CreateObjects { num_objects: 100, object_payload_size: 10240 } 6 0.969 1.077 11196.4 +InitializeVectorPicture { length: 40 } 6 0.973 1.066 75.0 +VectorPicture { length: 40 } 6 0.955 1.092 22.0 +VectorPictureRead { length: 40 } 6 0.952 1.047 21.0 +InitializeVectorPicture { length: 30720 } 6 0.969 1.071 27295.8 +VectorPicture { length: 30720 } 6 0.957 1.066 6560.2 +VectorPictureRead { length: 30720 } 6 0.948 1.053 6642.8 +SmartTablePicture { length: 30720, num_points_per_txn: 200 } 6 0.972 1.024 42660.4 +SmartTablePicture { length: 1048576, num_points_per_txn: 300 } 6 0.961 1.020 73725.5 +ResourceGroupsSenderWriteTag { string_length: 1024 } 6 0.867 1.001 15.0 +ResourceGroupsSenderMultiChange { string_length: 1024 } 6 0.966 1.069 29.0 +TokenV1MintAndTransferFT 6 0.972 1.045 356.8 +TokenV1MintAndTransferNFTSequential 6 0.991 1.067 543.7 +TokenV2AmbassadorMint { numbered: true } 6 0.987 1.052 474.4 +LiquidityPoolSwap { is_stable: true } 6 0.970 1.042 555.4 +LiquidityPoolSwap { is_stable: false } 6 0.925 1.001 535.3 +"; + +struct CalibrationInfo { + // count: usize, + expected_time: f32, +} + +fn get_parsed_calibration_values() -> HashMap { + CALIBRATION_VALUES + .trim() + .split('\n') + .map(|line| { + let parts = line.split('\t').collect::>(); + (parts[0].to_string(), CalibrationInfo { + // count: parts[1].parse().unwrap(), + expected_time: parts[parts.len() - 1].parse().unwrap(), + }) + }) + .collect() +} + fn main() { let executor = FakeExecutor::from_head_genesis(); let mut executor = executor.set_not_parallel(); + let calibration_values = get_parsed_calibration_values(); + let entry_points = vec![ // too fast for the timer // (, EntryPoints::Nop), @@ -91,59 +136,57 @@ fn main() { // data_length: Some(32), // }), // (, EntryPoints::IncGlobal), - (34651, EntryPoints::Loop { + EntryPoints::Loop { loop_count: Some(100000), loop_type: LoopType::NoOp, - }), - (21145, EntryPoints::Loop { + }, + EntryPoints::Loop { loop_count: Some(10000), loop_type: LoopType::Arithmetic, - }), + }, // This is a cheap bcs (serializing vec), so not representative of what BCS native call should cost. // (, EntryPoints::Loop { loop_count: Some(1000), loop_type: LoopType::BCS { len: 1024 }}), - (124, EntryPoints::CreateObjects { + EntryPoints::CreateObjects { num_objects: 10, object_payload_size: 0, - }), - (8090, EntryPoints::CreateObjects { + }, + EntryPoints::CreateObjects { num_objects: 10, object_payload_size: 10 * 1024, - }), - (1246, EntryPoints::CreateObjects { + }, + EntryPoints::CreateObjects { num_objects: 100, object_payload_size: 0, - }), - (9556, EntryPoints::CreateObjects { + }, + EntryPoints::CreateObjects { num_objects: 100, object_payload_size: 10 * 1024, - }), - (61, EntryPoints::InitializeVectorPicture { length: 40 }), - (16, EntryPoints::VectorPicture { length: 40 }), - (16, EntryPoints::VectorPictureRead { length: 40 }), - (23256, EntryPoints::InitializeVectorPicture { - length: 30 * 1024, - }), - (5860, EntryPoints::VectorPicture { length: 30 * 1024 }), - (5849, EntryPoints::VectorPictureRead { length: 30 * 1024 }), - (35440, EntryPoints::SmartTablePicture { + }, + EntryPoints::InitializeVectorPicture { length: 40 }, + EntryPoints::VectorPicture { length: 40 }, + EntryPoints::VectorPictureRead { length: 40 }, + EntryPoints::InitializeVectorPicture { length: 30 * 1024 }, + EntryPoints::VectorPicture { length: 30 * 1024 }, + EntryPoints::VectorPictureRead { length: 30 * 1024 }, + EntryPoints::SmartTablePicture { length: 30 * 1024, num_points_per_txn: 200, - }), - (60464, EntryPoints::SmartTablePicture { + }, + EntryPoints::SmartTablePicture { length: 1024 * 1024, num_points_per_txn: 300, - }), - (13, EntryPoints::ResourceGroupsSenderWriteTag { + }, + EntryPoints::ResourceGroupsSenderWriteTag { string_length: 1024, - }), - (27, EntryPoints::ResourceGroupsSenderMultiChange { + }, + EntryPoints::ResourceGroupsSenderMultiChange { string_length: 1024, - }), - (291, EntryPoints::TokenV1MintAndTransferFT), - (468, EntryPoints::TokenV1MintAndTransferNFTSequential), - (386, EntryPoints::TokenV2AmbassadorMint { numbered: true }), - (467, EntryPoints::LiquidityPoolSwap { is_stable: true }), - (429, EntryPoints::LiquidityPoolSwap { is_stable: false }), + }, + EntryPoints::TokenV1MintAndTransferFT, + EntryPoints::TokenV1MintAndTransferNFTSequential, + EntryPoints::TokenV2AmbassadorMint { numbered: true }, + EntryPoints::LiquidityPoolSwap { is_stable: true }, + EntryPoints::LiquidityPoolSwap { is_stable: false }, ]; let mut failures = Vec::new(); @@ -154,7 +197,12 @@ fn main() { "wall time (us)", "expected (us)", "diff(- is impr)" ); - for (index, (expected_time, entry_point)) in entry_points.into_iter().enumerate() { + for (index, entry_point) in entry_points.into_iter().enumerate() { + let entry_point_name = format!("{:?}", entry_point); + let expected_time = calibration_values + .get(&entry_point_name) + .unwrap() + .expected_time; let publisher = executor.new_account_at(AccountAddress::random()); let mut package_handler = PackageHandler::new(entry_point.package_name()); @@ -189,23 +237,23 @@ fn main() { &package, publisher.address(), &mut executor, - if expected_time > 10000 { + if expected_time > 10000.0 { 6 - } else if expected_time > 1000 { + } else if expected_time > 1000.0 { 10 } else { 100 }, ); - let diff = (elapsed_micros as f32 - expected_time as f32) / (expected_time as f32) * 100.0; + let diff = (elapsed_micros as f32 - expected_time) / expected_time * 100.0; println!( - "{:15} {:15} {:14.1}% {:?}", + "{:15} {:15.1} {:14.1}% {:?}", elapsed_micros, expected_time, diff, entry_point ); json_lines.push(json!({ "grep": "grep_json_aptos_move_vm_perf", - "transaction_type": format!("{:?}", entry_point), + "transaction_type": entry_point_name, "wall_time_us": elapsed_micros, "expected_wall_time_us": expected_time, "test_index": index, diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 667b1afb13188..cec6d3cd64353 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -48,7 +48,6 @@ aptos-temppath = { workspace = true } aptos-time-service = { workspace = true } aptos-types = { workspace = true } aptos-validator-transaction-pool = { workspace = true } -aptos-vm = { workspace = true } async-trait = { workspace = true } bcs = { workspace = true } byteorder = { workspace = true } diff --git a/consensus/src/consensus_provider.rs b/consensus/src/consensus_provider.rs index 8667c9dec1226..87764e5df2f7d 100644 --- a/consensus/src/consensus_provider.rs +++ b/consensus/src/consensus_provider.rs @@ -29,14 +29,13 @@ use aptos_channels::aptos_channel::Receiver; use aptos_config::config::NodeConfig; use aptos_consensus_notifications::ConsensusNotificationSender; use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener}; -use aptos_executor::block_executor::BlockExecutor; +use aptos_executor::block_executor::{AptosVMBlockExecutor, BlockExecutor}; use aptos_logger::prelude::*; use aptos_mempool::QuorumStoreRequest; use aptos_network::application::interface::{NetworkClient, NetworkServiceEvents}; use aptos_storage_interface::DbReaderWriter; use aptos_time_service::TimeService; use aptos_validator_transaction_pool::VTxnPoolState; -use aptos_vm::AptosVM; use futures::channel::mpsc; use move_core_types::account_address::AccountAddress; use std::{collections::HashMap, sync::Arc}; @@ -65,7 +64,7 @@ pub fn start_consensus( )); let execution_proxy = ExecutionProxy::new( - Arc::new(BlockExecutor::::new(aptos_db)), + Arc::new(BlockExecutor::::new(aptos_db)), txn_notifier, state_sync_notifier, runtime.handle(), @@ -158,7 +157,7 @@ pub fn start_consensus_observer( node_config.consensus.mempool_executed_txn_timeout_ms, )); let execution_proxy = ExecutionProxy::new( - Arc::new(BlockExecutor::::new(aptos_db.clone())), + Arc::new(BlockExecutor::::new(aptos_db.clone())), txn_notifier, state_sync_notifier, consensus_observer_runtime.handle(), diff --git a/consensus/src/execution_pipeline.rs b/consensus/src/execution_pipeline.rs index c3bbb9c2f7cfb..6cb016fa02158 100644 --- a/consensus/src/execution_pipeline.rs +++ b/consensus/src/execution_pipeline.rs @@ -34,6 +34,10 @@ use std::{ }; use tokio::sync::{mpsc, oneshot}; +/// Smallest number of transactions Rayon should put into a single worker task. +/// Same as in execution/executor-benchmark/src/block_preparation.rs +pub const SIG_VERIFY_RAYON_MIN_THRESHOLD: usize = 32; + pub type PreCommitHook = Box BoxFuture<'static, ()> + Send>; @@ -156,7 +160,7 @@ impl ExecutionPipeline { let num_txns = txns_to_execute.len(); txns_to_execute .into_par_iter() - .with_min_len(optimal_min_len(num_txns, 32)) + .with_min_len(optimal_min_len(num_txns, SIG_VERIFY_RAYON_MIN_THRESHOLD)) .map(|t| t.into()) .collect::>() }); diff --git a/ecosystem/indexer-grpc/indexer-test-transactions/src/lib.rs b/ecosystem/indexer-grpc/indexer-test-transactions/src/lib.rs index d2782bd8ee1d9..433dc44aa4573 100644 --- a/ecosystem/indexer-grpc/indexer-test-transactions/src/lib.rs +++ b/ecosystem/indexer-grpc/indexer-test-transactions/src/lib.rs @@ -14,6 +14,6 @@ mod tests { // Check that the transaction is valid JSON let transaction = serde_json::from_slice::(json_bytes).unwrap(); - assert_eq!(transaction.version, 61); + assert_eq!(transaction.version, 53); } } diff --git a/execution/executor-benchmark/src/block_preparation.rs b/execution/executor-benchmark/src/block_preparation.rs index 773a6c9f31b79..c58e428214b30 100644 --- a/execution/executor-benchmark/src/block_preparation.rs +++ b/execution/executor-benchmark/src/block_preparation.rs @@ -1,7 +1,10 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{metrics::TIMER, pipeline::ExecuteBlockMessage}; +use crate::{ + metrics::{NUM_TXNS, TIMER}, + pipeline::ExecuteBlockMessage, +}; use aptos_block_partitioner::{BlockPartitioner, PartitionerConfig}; use aptos_crypto::HashValue; use aptos_experimental_runtimes::thread_manager::optimal_min_len; @@ -10,28 +13,33 @@ use aptos_types::{ block_executor::partitioner::{ExecutableBlock, ExecutableTransactions}, transaction::{signature_verified_transaction::SignatureVerifiedTransaction, Transaction}, }; -use once_cell::sync::Lazy; use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; -use std::{sync::Arc, time::Instant}; +use std::time::Instant; -pub static SIG_VERIFY_POOL: Lazy> = Lazy::new(|| { - Arc::new( - rayon::ThreadPoolBuilder::new() - .num_threads(8) // More than 8 threads doesn't seem to help much - .thread_name(|index| format!("signature-checker-{}", index)) - .build() - .unwrap(), - ) -}); +/// Smallest number of transactions Rayon should put into a single worker task. +/// Same as in consensus/src/execution_pipeline.rs +pub const SIG_VERIFY_RAYON_MIN_THRESHOLD: usize = 32; +/// Executes preparation stage - set of operations that are +/// executed in a separate stage of the pipeline from execution, +/// like signature verificaiton or block partitioning pub(crate) struct BlockPreparationStage { - num_executor_shards: usize, + /// Number of blocks processed num_blocks_processed: usize, + /// Pool of theads for signature verification + sig_verify_pool: rayon::ThreadPool, + /// When execution sharding is enabled, number of executor shards + num_executor_shards: usize, + /// When execution sharding is enabled, partitioner that splits block into shards maybe_partitioner: Option>, } impl BlockPreparationStage { - pub fn new(num_shards: usize, partitioner_config: &dyn PartitionerConfig) -> Self { + pub fn new( + num_sig_verify_threads: usize, + num_shards: usize, + partitioner_config: &dyn PartitionerConfig, + ) -> Self { let maybe_partitioner = if num_shards == 0 { None } else { @@ -39,10 +47,16 @@ impl BlockPreparationStage { Some(partitioner) }; + let sig_verify_pool = rayon::ThreadPoolBuilder::new() + .num_threads(num_sig_verify_threads) + .thread_name(|index| format!("signature-checker-{}", index)) + .build() + .expect("couldn't create sig_verify thread pool"); Self { num_executor_shards: num_shards, num_blocks_processed: 0, maybe_partitioner, + sig_verify_pool, } } @@ -54,16 +68,26 @@ impl BlockPreparationStage { txns.len() ); let block_id = HashValue::random(); - let sig_verified_txns: Vec = SIG_VERIFY_POOL.install(|| { - let num_txns = txns.len(); - txns.into_par_iter() - .with_min_len(optimal_min_len(num_txns, 32)) - .map(|t| t.into()) - .collect::>() - }); + let sig_verified_txns: Vec = + self.sig_verify_pool.install(|| { + let _timer = TIMER.with_label_values(&["sig_verify"]).start_timer(); + + let num_txns = txns.len(); + NUM_TXNS + .with_label_values(&["sig_verify"]) + .inc_by(num_txns as u64); + + txns.into_par_iter() + .with_min_len(optimal_min_len(num_txns, SIG_VERIFY_RAYON_MIN_THRESHOLD)) + .map(|t| t.into()) + .collect::>() + }); let block: ExecutableBlock = match &self.maybe_partitioner { None => (block_id, sig_verified_txns).into(), Some(partitioner) => { + NUM_TXNS + .with_label_values(&["partition"]) + .inc_by(sig_verified_txns.len() as u64); let analyzed_transactions = sig_verified_txns.into_iter().map(|t| t.into()).collect(); let timer = TIMER.with_label_values(&["partition"]).start_timer(); diff --git a/execution/executor-benchmark/src/db_generator.rs b/execution/executor-benchmark/src/db_generator.rs index 626b07687205d..23fb2375cc525 100644 --- a/execution/executor-benchmark/src/db_generator.rs +++ b/execution/executor-benchmark/src/db_generator.rs @@ -62,7 +62,7 @@ pub fn create_db_with_accounts( ); } -fn bootstrap_with_genesis( +pub(crate) fn bootstrap_with_genesis( db_dir: impl AsRef, enable_storage_sharding: bool, init_features: Features, diff --git a/execution/executor-benchmark/src/lib.rs b/execution/executor-benchmark/src/lib.rs index 09f2a5b91558b..ff355f98a6c43 100644 --- a/execution/executor-benchmark/src/lib.rs +++ b/execution/executor-benchmark/src/lib.rs @@ -9,6 +9,7 @@ pub mod db_generator; mod db_reliable_submitter; mod ledger_update_stage; mod metrics; +pub mod native; pub mod native_executor; pub mod pipeline; pub mod transaction_committer; @@ -19,15 +20,16 @@ use crate::{ db_access::DbAccessUtil, pipeline::Pipeline, transaction_committer::TransactionCommitter, transaction_executor::TransactionExecutor, transaction_generator::TransactionGenerator, }; -use aptos_block_executor::counters::{self as block_executor_counters, GasType}; -use aptos_block_partitioner::v2::counters::BLOCK_PARTITIONING_SECONDS; +use aptos_block_executor::counters::{ + self as block_executor_counters, GasType, BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK, +}; use aptos_config::config::{NodeConfig, PrunerConfig}; use aptos_db::AptosDB; use aptos_executor::{ block_executor::{BlockExecutor, TransactionBlockExecutor}, metrics::{ - COMMIT_BLOCKS, EXECUTE_BLOCK, OTHER_TIMERS, PROCESSED_TXNS_OUTPUT_SIZE, UPDATE_LEDGER, - VM_EXECUTE_BLOCK, + COMMIT_BLOCKS, GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING, OTHER_TIMERS, + PROCESSED_TXNS_OUTPUT_SIZE, UPDATE_LEDGER, }, }; use aptos_jellyfish_merkle::metrics::{ @@ -43,6 +45,7 @@ use aptos_transaction_generator_lib::{ }; use aptos_types::on_chain_config::Features; use db_reliable_submitter::DbReliableTransactionSubmitter; +use metrics::TIMER; use pipeline::PipelineConfig; use std::{ collections::HashMap, @@ -91,16 +94,22 @@ fn create_checkpoint( .expect("db checkpoint creation fails."); } +pub enum BenchmarkWorkload { + TransactionMix(Vec<(TransactionType, usize)>), + Transfer { + connected_tx_grps: usize, + shuffle_connected_txns: bool, + hotspot_probability: Option, + }, +} + /// Runs the benchmark with given parameters. #[allow(clippy::too_many_arguments)] pub fn run_benchmark( block_size: usize, num_blocks: usize, - transaction_mix: Option>, + workload: BenchmarkWorkload, mut transactions_per_sender: usize, - connected_tx_grps: usize, - shuffle_connected_txns: bool, - hotspot_probability: Option, num_main_signer_accounts: usize, num_additional_dst_pool_accounts: usize, source_dir: impl AsRef, @@ -126,7 +135,10 @@ pub fn run_benchmark( let (db, executor) = init_db_and_executor::(&config); let root_account = TransactionGenerator::read_root_account(genesis_key, &db); let root_account = Arc::new(root_account); - let transaction_generators = transaction_mix.clone().map(|transaction_mix| { + + let transaction_generators = if let BenchmarkWorkload::TransactionMix(transaction_mix) = + &workload + { let num_existing_accounts = TransactionGenerator::read_meta(&source_dir); let num_accounts_to_be_loaded = std::cmp::min( num_existing_accounts, @@ -134,8 +146,9 @@ pub fn run_benchmark( ); let mut num_accounts_to_skip = 0; - for (transaction_type, _) in &transaction_mix { - if matches!(transaction_type, CoinTransfer { non_conflicting, .. } if *non_conflicting) { + for (transaction_type, _) in transaction_mix { + if matches!(transaction_type, CoinTransfer { non_conflicting, .. } if *non_conflicting) + { // In case of random non-conflicting coin transfer using `P2PTransactionGenerator`, // `3*block_size` addresses is required: // `block_size` number of signers, and 2 groups of burn-n-recycle recipients used alternatively. @@ -146,13 +159,16 @@ pub fn run_benchmark( } } - let accounts_cache = - TransactionGenerator::gen_user_account_cache(db.reader.clone(), num_accounts_to_be_loaded, num_accounts_to_skip); + let accounts_cache = TransactionGenerator::gen_user_account_cache( + db.reader.clone(), + num_accounts_to_be_loaded, + num_accounts_to_skip, + ); let (main_signer_accounts, burner_accounts) = accounts_cache.split(num_main_signer_accounts); let (transaction_generator_creator, phase) = init_workload::( - transaction_mix, + transaction_mix.clone(), root_account.clone(), main_signer_accounts, burner_accounts, @@ -162,15 +178,23 @@ pub fn run_benchmark( &PipelineConfig::default(), ); // need to initialize all workers and finish with all transactions before we start the timer: - ((0..pipeline_config.num_generator_workers).map(|_| transaction_generator_creator.create_transaction_generator()).collect::>(), phase) - }); + Some(( + (0..pipeline_config.num_generator_workers) + .map(|_| transaction_generator_creator.create_transaction_generator()) + .collect::>(), + phase, + )) + } else { + None + }; let start_version = db.reader.expect_synced_version(); let (pipeline, block_sender) = Pipeline::new(executor, start_version, &pipeline_config, Some(num_blocks)); let mut num_accounts_to_load = num_main_signer_accounts; - if let Some(mix) = &transaction_mix { + + if let BenchmarkWorkload::TransactionMix(mix) = &workload { for (transaction_type, _) in mix { if matches!(transaction_type, CoinTransfer { non_conflicting, .. } if *non_conflicting) { @@ -199,39 +223,44 @@ pub fn run_benchmark( let mut overall_measuring = OverallMeasuring::start(); - let num_blocks_created = if let Some((transaction_generators, phase)) = transaction_generators { - generator.run_workload( - block_size, - num_blocks, - transaction_generators, - phase, - transactions_per_sender, - ) - } else { - generator.run_transfer( - block_size, - num_blocks, - transactions_per_sender, + let (num_blocks_created, workload_name) = match workload { + BenchmarkWorkload::TransactionMix(mix) => { + let (transaction_generators, phase) = transaction_generators.unwrap(); + let num_blocks_created = generator.run_workload( + block_size, + num_blocks, + transaction_generators, + phase, + transactions_per_sender, + ); + (num_blocks_created, format!("{:?} via txn generator", mix)) + }, + BenchmarkWorkload::Transfer { connected_tx_grps, shuffle_connected_txns, hotspot_probability, - ) + } => { + let num_blocks_created = generator.run_transfer( + block_size, + num_blocks, + transactions_per_sender, + connected_tx_grps, + shuffle_connected_txns, + hotspot_probability, + ); + (num_blocks_created, "raw transfer".to_string()) + }, }; - if pipeline_config.delay_execution_start { + if pipeline_config.generate_then_execute { overall_measuring.start_time = Instant::now(); } - pipeline.start_execution(); generator.drop_sender(); + info!("Done creating workload"); + pipeline.start_pipeline_processing(); + info!("Waiting for pipeline to finish"); pipeline.join(); - info!( - "Executed workload {}", - if let Some(mix) = transaction_mix { - format!("{:?} via txn generator", mix) - } else { - "raw transfer".to_string() - } - ); + info!("Executed workload {}", workload_name); if !pipeline_config.skip_commit { let num_txns = @@ -277,7 +306,7 @@ where block_sender, }; - create_txn_generator_creator( + let result = create_txn_generator_creator( &[transaction_mix], AlwaysApproveRootAccountHandle { root_account }, &mut main_signer_accounts, @@ -287,9 +316,14 @@ where &transaction_factory, phase_clone, ) - .await + .await; + + drop(db_gen_init_transaction_executor); + + result }); + info!("Waiting for init to finish"); pipeline.join(); (txn_generator_creator, phase) @@ -376,8 +410,8 @@ fn add_accounts_impl( init_account_balance, block_size, ); - pipeline.start_execution(); generator.drop_sender(); + pipeline.start_pipeline_processing(); pipeline.join(); let elapsed = start_time.elapsed().as_secs_f32(); @@ -389,12 +423,12 @@ fn add_accounts_impl( ); if verify_sequence_numbers { - println!("Verifying sequence numbers..."); + info!("Verifying sequence numbers..."); // Do a sanity check on the sequence number to make sure all transactions are committed. generator.verify_sequence_numbers(db.reader.clone()); } - println!( + info!( "Created {} new accounts. Now at version {}, total # of accounts {}.", num_new_accounts, now_version, @@ -523,14 +557,14 @@ static OTHER_LABELS: &[(&str, bool, &str)] = &[ struct ExecutionTimeMeasurement { output_size: f64, - partitioning_total: f64, - execution_total: f64, - vm_only: f64, + sig_verify_total_time: f64, + partitioning_total_time: f64, + execution_total_time: f64, + block_executor_total_time: f64, + block_executor_inner_total_time: f64, by_other: HashMap<&'static str, f64>, ledger_update_total: f64, - commit_total: f64, - - vm_time: f64, + commit_total_time: f64, } impl ExecutionTimeMeasurement { @@ -539,9 +573,11 @@ impl ExecutionTimeMeasurement { .with_label_values(&["execution"]) .get_sample_sum(); - let partitioning_total = BLOCK_PARTITIONING_SECONDS.get_sample_sum(); - let execution_total = EXECUTE_BLOCK.get_sample_sum(); - let vm_only = VM_EXECUTE_BLOCK.get_sample_sum(); + let sig_verify_total = TIMER.with_label_values(&["sig_verify"]).get_sample_sum(); + let partitioning_total = TIMER.with_label_values(&["partition"]).get_sample_sum(); + let execution_total = TIMER.with_label_values(&["execute"]).get_sample_sum(); + let block_executor_total = GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.get_sample_sum(); + let block_executor_inner_total = BLOCK_EXECUTOR_INNER_EXECUTE_BLOCK.get_sample_sum(); let by_other = OTHER_LABELS .iter() @@ -557,17 +593,16 @@ impl ExecutionTimeMeasurement { let ledger_update_total = UPDATE_LEDGER.get_sample_sum(); let commit_total = COMMIT_BLOCKS.get_sample_sum(); - let vm_time = VM_EXECUTE_BLOCK.get_sample_sum(); - Self { output_size, - partitioning_total, - execution_total, - vm_only, + sig_verify_total_time: sig_verify_total, + partitioning_total_time: partitioning_total, + execution_total_time: execution_total, + block_executor_total_time: block_executor_total, + block_executor_inner_total_time: block_executor_inner_total, by_other, ledger_update_total, - commit_total, - vm_time, + commit_total_time: commit_total, } } @@ -576,17 +611,20 @@ impl ExecutionTimeMeasurement { Self { output_size: end.output_size - self.output_size, - partitioning_total: end.partitioning_total - self.partitioning_total, - execution_total: end.execution_total - self.execution_total, - vm_only: end.vm_only - self.vm_only, + sig_verify_total_time: end.sig_verify_total_time - self.sig_verify_total_time, + partitioning_total_time: end.partitioning_total_time - self.partitioning_total_time, + execution_total_time: end.execution_total_time - self.execution_total_time, + block_executor_total_time: end.block_executor_total_time + - self.block_executor_total_time, + block_executor_inner_total_time: end.block_executor_inner_total_time + - self.block_executor_inner_total_time, by_other: end .by_other .into_iter() .map(|(k, v)| (k, v - self.by_other.get(&k).unwrap())) .collect::>(), ledger_update_total: end.ledger_update_total - self.ledger_update_total, - commit_total: end.commit_total - self.commit_total, - vm_time: end.vm_time - self.vm_time, + commit_total_time: end.commit_total_time - self.commit_total_time, } } } @@ -620,13 +658,6 @@ impl OverallMeasuring { num_txns, elapsed ); - info!( - "{} VM execution TPS {} txn/s; ({} / {})", - prefix, - (num_txns / delta_execution.vm_time) as usize, - num_txns, - delta_execution.vm_time - ); info!("{} GPS: {} gas/s", prefix, delta_gas.gas / elapsed); info!( "{} effectiveGPS: {} gas/s ({} effective block gas, in {} s)", @@ -670,31 +701,42 @@ impl OverallMeasuring { ); info!( - "{} fraction of total: {:.3} in partitioning (component TPS: {})", + "{} fraction of total: {:.4} in signature verification (component TPS: {:.1})", prefix, - delta_execution.partitioning_total / elapsed, - num_txns / delta_execution.partitioning_total + delta_execution.sig_verify_total_time / elapsed, + num_txns / delta_execution.sig_verify_total_time ); - info!( - "{} fraction of total: {:.3} in execution (component TPS: {})", + "{} fraction of total: {:.4} in partitioning (component TPS: {:.1})", prefix, - delta_execution.execution_total / elapsed, - num_txns / delta_execution.execution_total + delta_execution.partitioning_total_time / elapsed, + num_txns / delta_execution.partitioning_total_time ); info!( - "{} fraction of execution {:.3} in VM (component TPS: {})", + "{} fraction of total: {:.4} in execution (component TPS: {:.1})", prefix, - delta_execution.vm_only / delta_execution.execution_total, - num_txns / delta_execution.vm_only + delta_execution.execution_total_time / elapsed, + num_txns / delta_execution.execution_total_time + ); + info!( + "{} fraction of execution {:.4} in get execution output by executing (component TPS: {:.1})", + prefix, + delta_execution.block_executor_total_time / delta_execution.execution_total_time, + num_txns / delta_execution.block_executor_total_time + ); + info!( + "{} fraction of execution {:.4} in inner block executor (component TPS: {:.1})", + prefix, + delta_execution.block_executor_inner_total_time / delta_execution.execution_total_time, + num_txns / delta_execution.block_executor_inner_total_time ); for (prefix, top_level, other_label) in OTHER_LABELS { let time_in_label = delta_execution.by_other.get(other_label).unwrap(); - if *top_level || time_in_label / delta_execution.execution_total > 0.01 { + if *top_level || time_in_label / delta_execution.execution_total_time > 0.01 { info!( - "{} fraction of execution {:.3} in {} {} (component TPS: {})", + "{} fraction of execution {:.4} in {} {} (component TPS: {:.1})", prefix, - time_in_label / delta_execution.execution_total, + time_in_label / delta_execution.execution_total_time, prefix, other_label, num_txns / time_in_label @@ -703,17 +745,17 @@ impl OverallMeasuring { } info!( - "{} fraction of total: {:.3} in ledger update (component TPS: {})", + "{} fraction of total: {:.4} in ledger update (component TPS: {:.1})", prefix, delta_execution.ledger_update_total / elapsed, num_txns / delta_execution.ledger_update_total ); info!( - "{} fraction of total: {:.4} in commit (component TPS: {})", + "{} fraction of total: {:.4} in commit (component TPS: {:.1})", prefix, - delta_execution.commit_total / elapsed, - num_txns / delta_execution.commit_total + delta_execution.commit_total_time / elapsed, + num_txns / delta_execution.commit_total_time ); } } @@ -726,13 +768,114 @@ fn log_total_supply(db_reader: &Arc) { #[cfg(test)] mod tests { - use crate::{native_executor::NativeExecutor, pipeline::PipelineConfig}; + use crate::{ + db_generator::bootstrap_with_genesis, init_db_and_executor, + native::native_config::NativeConfig, pipeline::PipelineConfig, + transaction_executor::BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG, + transaction_generator::TransactionGenerator, BenchmarkWorkload, + }; use aptos_config::config::NO_OP_STORAGE_PRUNER_CONFIG; - use aptos_executor::block_executor::TransactionBlockExecutor; + use aptos_crypto::HashValue; + use aptos_executor::block_executor::{AptosVMBlockExecutor, TransactionBlockExecutor}; + use aptos_executor_types::BlockExecutorTrait; + use aptos_sdk::{transaction_builder::aptos_stdlib, types::LocalAccount}; use aptos_temppath::TempPath; use aptos_transaction_generator_lib::{args::TransactionTypeArg, WorkflowProgress}; - use aptos_types::on_chain_config::Features; + use aptos_types::{ + on_chain_config::{FeatureFlag, Features}, + transaction::Transaction, + }; use aptos_vm::AptosVM; + use rand::thread_rng; + use std::fs; + + #[test] + fn test_compare_vm_and_native() { + aptos_logger::Logger::new().init(); + + let db_dir = TempPath::new(); + + fs::create_dir_all(db_dir.as_ref()).unwrap(); + + let mut init_features = Features::default(); + init_features.enable(FeatureFlag::NEW_ACCOUNTS_DEFAULT_TO_FA_APT_STORE); + init_features.enable(FeatureFlag::OPERATIONS_DEFAULT_TO_FA_APT_STORE); + + bootstrap_with_genesis(&db_dir, false, init_features.clone()); + + let (mut config, genesis_key) = + aptos_genesis::test_utils::test_config_with_custom_features(init_features); + config.storage.dir = db_dir.as_ref().to_path_buf(); + config.storage.storage_pruner_config = NO_OP_STORAGE_PRUNER_CONFIG; + config.storage.rocksdb_configs.enable_storage_sharding = false; + + let _txn = { + let (vm_db, vm_executor) = init_db_and_executor::(&config); + let root_account = TransactionGenerator::read_root_account(genesis_key, &vm_db); + let dst = LocalAccount::generate(&mut thread_rng()); + let num_coins = 1000; + + let txn_factory = TransactionGenerator::create_transaction_factory(); + let txn = Transaction::UserTransaction(root_account.sign_with_transaction_builder( + txn_factory.payload(aptos_stdlib::aptos_account_fungible_transfer_only( + dst.address(), + num_coins, + )), + )); + vm_executor + .execute_and_state_checkpoint( + (HashValue::zero(), vec![txn.clone()]).into(), + vm_executor.committed_block_id(), + BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG, + ) + .unwrap(); + + txn + }; + + // let (_native_db, native_executor) = init_db_and_executor::(&config); + // native_executor + // .execute_and_state_checkpoint( + // (HashValue::zero(), vec![txn]).into(), + // native_executor.committed_block_id(), + // BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG, + // ) + // .unwrap(); + + // let ( + // vm_txns, + // _vm_state_updates_vec, + // _vm_state_checkpoint_hashes, + // _vm_state_updates_before_last_checkpoint, + // _vm_sharded_state_cache, + // _vm_block_end_info, + // ) = vm_result.into_inner(); + // let (vm_statuses_for_input_txns, + // vm_to_commit, + // vm_to_discard, + // vm_to_retry, + // ) = vm_txns.into_inner(); + + // let ( + // native_txns, + // _native_state_updates_vec, + // _native_state_checkpoint_hashes, + // _native_state_updates_before_last_checkpoint, + // _native_sharded_state_cache, + // _native_block_end_info, + // ) = native_result.into_inner(); + // let (native_statuses_for_input_txns, + // native_to_commit, + // native_to_discard, + // native_to_retry, + // ) = native_txns.into_inner(); + + // println!("{:?}", vm_to_commit.parsed_outputs()); + // assert_eq!(vm_statuses_for_input_txns, native_statuses_for_input_txns); + // assert_eq!(vm_to_commit, native_to_commit); + // assert_eq!(vm_to_discard, native_to_discard); + // assert_eq!(vm_to_retry, native_to_retry); + } fn test_generic_benchmark( transaction_type: Option, @@ -747,7 +890,11 @@ mod tests { println!("db_generator::create_db_with_accounts"); - crate::db_generator::create_db_with_accounts::( + let mut features = Features::default(); + features.enable(FeatureFlag::NEW_ACCOUNTS_DEFAULT_TO_FA_APT_STORE); + features.enable(FeatureFlag::OPERATIONS_DEFAULT_TO_FA_APT_STORE); + + crate::db_generator::create_db_with_accounts::( 100, /* num_accounts */ // TODO(Gas): double check if this is correct 100_000_000_000, /* init_account_balance */ @@ -757,7 +904,7 @@ mod tests { verify_sequence_numbers, false, PipelineConfig::default(), - Features::default(), + features.clone(), ); println!("run_benchmark"); @@ -765,27 +912,35 @@ mod tests { super::run_benchmark::( 10, /* block_size */ 30, /* num_blocks */ - transaction_type - .map(|t| vec![(t.materialize(1, true, WorkflowProgress::MoveByPhases), 1)]), - 2, /* transactions per sender */ - 0, /* connected txn groups in a block */ - false, /* shuffle the connected txns in a block */ - None, /* maybe_hotspot_probability */ - 25, /* num_main_signer_accounts */ - 30, /* num_dst_pool_accounts */ + transaction_type.map_or_else( + || BenchmarkWorkload::Transfer { + connected_tx_grps: 0, + shuffle_connected_txns: false, + hotspot_probability: None, + }, + |t| { + BenchmarkWorkload::TransactionMix(vec![( + t.materialize(1, true, WorkflowProgress::MoveByPhases), + 1, + )]) + }, + ), + 2, /* transactions per sender */ + 25, /* num_main_signer_accounts */ + 30, /* num_dst_pool_accounts */ storage_dir.as_ref(), checkpoint_dir, verify_sequence_numbers, NO_OP_STORAGE_PRUNER_CONFIG, false, PipelineConfig::default(), - Features::default(), + features, ); } #[test] fn test_benchmark_default() { - test_generic_benchmark::(None, true); + test_generic_benchmark::(None, true); } #[test] @@ -793,16 +948,16 @@ mod tests { AptosVM::set_num_shards_once(1); AptosVM::set_concurrency_level_once(4); AptosVM::set_processed_transactions_detailed_counters(); - NativeExecutor::set_concurrency_level_once(4); - test_generic_benchmark::( + NativeConfig::set_concurrency_level_once(1); + test_generic_benchmark::( Some(TransactionTypeArg::ModifyGlobalMilestoneAggV2), true, ); } - #[test] - fn test_native_benchmark() { - // correct execution not yet implemented, so cannot be checked for validity - test_generic_benchmark::(None, false); - } + // #[test] + // fn test_native_benchmark() { + // // correct execution not yet implemented, so cannot be checked for validity + // test_generic_benchmark::(None, false); + // } } diff --git a/execution/executor-benchmark/src/main.rs b/execution/executor-benchmark/src/main.rs index 9d3e23722e68b..c3ec6c5fbb5c5 100644 --- a/execution/executor-benchmark/src/main.rs +++ b/execution/executor-benchmark/src/main.rs @@ -13,8 +13,11 @@ use aptos_block_partitioner::{ use aptos_config::config::{ EpochSnapshotPrunerConfig, LedgerPrunerConfig, PrunerConfig, StateMerklePrunerConfig, }; -use aptos_executor::block_executor::TransactionBlockExecutor; -use aptos_executor_benchmark::{native_executor::NativeExecutor, pipeline::PipelineConfig}; +use aptos_executor::block_executor::{AptosVMBlockExecutor, TransactionBlockExecutor}; +use aptos_executor_benchmark::{ + native::native_config::NativeConfig, native_executor::NativeExecutor, pipeline::PipelineConfig, + BenchmarkWorkload, +}; use aptos_executor_service::remote_executor_client; use aptos_experimental_ptx_executor::PtxBlockExecutor; #[cfg(target_os = "linux")] @@ -26,7 +29,7 @@ use aptos_transaction_generator_lib::{args::TransactionTypeArg, WorkflowProgress use aptos_types::on_chain_config::{FeatureFlag, Features}; use aptos_vm::AptosVM; use aptos_vm_environment::prod_configs::set_paranoid_type_checks; -use clap::{ArgGroup, Parser, Subcommand}; +use clap::{Parser, Subcommand, ValueEnum}; use once_cell::sync::Lazy; use std::{ net::SocketAddr, @@ -98,20 +101,43 @@ impl PrunerOpt { #[derive(Debug, Parser)] pub struct PipelineOpt { + /// First generate all transactions for all blocks (and keep them in memory), + /// and only then start the pipeline. + /// Useful when not running large number of blocks (so it can fit in memory), + /// as generation of blocks takes not-insignificant amount of CPU. #[clap(long)] generate_then_execute: bool, + /// Run each stage separately, i.e. each stage wait for previous stage to finish + /// processing all blocks, before starting. + /// Allows to see individual throughput of each stage, avoiding resource contention. #[clap(long)] split_stages: bool, + /// Skip commit stage - i.e. create executed blocks in memory, but never commit them. + /// Useful when commit is the bottleneck, to see throughput of the rest of the pipeline. #[clap(long)] skip_commit: bool, + /// Whether transactions are allowed to abort. + /// By default, workload generates transactions that are all expected to succeeded, + /// so aborts are not allowed - to catch any correctness/configuration issues. #[clap(long)] allow_aborts: bool, + /// Whether transactions are allowed to be discarded. + /// By default, workload generates transactions that are all expected to succeeded, + /// so discards are not allowed - to catch any correctness/configuration issues. #[clap(long)] allow_discards: bool, + /// Whether transactions are allowed to be retried. + /// By default, workload generates transactions that are all expected to succeeded, + /// so retries are not allowed - to catch any correctness/configuration issues. #[clap(long)] allow_retries: bool, + /// Number of worker threads transaction generation will use. #[clap(long, default_value = "4")] num_generator_workers: usize, + /// Number of worker threads signature verification will use. + #[clap(long, default_value = "8")] + num_sig_verify_threads: usize, + /// Sharding configuration. #[clap(flatten)] sharding_opt: ShardingOpt, } @@ -119,16 +145,16 @@ pub struct PipelineOpt { impl PipelineOpt { fn pipeline_config(&self) -> PipelineConfig { PipelineConfig { - delay_execution_start: self.generate_then_execute, + generate_then_execute: self.generate_then_execute, split_stages: self.split_stages, skip_commit: self.skip_commit, allow_aborts: self.allow_aborts, allow_discards: self.allow_discards, allow_retries: self.allow_retries, num_executor_shards: self.sharding_opt.num_executor_shards, - use_global_executor: self.sharding_opt.use_global_executor, num_generator_workers: self.num_generator_workers, partitioner_config: self.sharding_opt.partitioner_config(), + num_sig_verify_threads: self.num_sig_verify_threads, } } } @@ -202,17 +228,18 @@ struct ProfilerOpt { memory_profiling: bool, } -#[derive(Parser, Debug)] -#[clap(group( - ArgGroup::new("vm_selection") - .args(&["use_native_executor", "use_ptx_executor"]), -))] -pub struct VmSelectionOpt { - #[clap(long)] - use_native_executor: bool, - - #[clap(long)] - use_ptx_executor: bool, +#[derive(Parser, Debug, ValueEnum, Clone, Default)] +enum BlockExecutorTypeOpt { + /// Transaction execution: AptosVM + /// Executing conflicts: in the input order, via BlockSTM, + /// State: BlockSTM-provided MVHashMap-based view with caching + #[default] + AptosVMWithBlockSTM, + /// Transaction execution: Native rust code producing WriteSet + /// Executing conflicts: All transactions execute on the state at the beginning of the block + /// State: Raw CachedStateView + NativeLooseSpeculative, + PtxExecutor, } #[derive(Parser, Debug)] @@ -256,8 +283,8 @@ struct Opt { #[clap(long)] verify_sequence_numbers: bool, - #[clap(flatten)] - vm_selection_opt: VmSelectionOpt, + #[clap(long, value_enum, ignore_case = true)] + block_executor_type: BlockExecutorTypeOpt, #[clap(flatten)] profiler_opt: ProfilerOpt, @@ -436,8 +463,12 @@ where // disable_feature, // ); - let transaction_mix = if transaction_type.is_empty() { - None + let workload = if transaction_type.is_empty() { + BenchmarkWorkload::Transfer { + connected_tx_grps: opt.connected_tx_grps, + shuffle_connected_txns: opt.shuffle_connected_txns, + hotspot_probability: opt.hotspot_probability, + } } else { let mix_per_phase = TransactionTypeArg::args_to_transaction_mix_per_phase( &transaction_type, @@ -448,7 +479,7 @@ where WorkflowProgress::MoveByPhases, ); assert!(mix_per_phase.len() == 1); - Some(mix_per_phase[0].clone()) + BenchmarkWorkload::TransactionMix(mix_per_phase[0].clone()) }; if let Some(hotspot_probability) = opt.hotspot_probability { @@ -462,11 +493,8 @@ where aptos_executor_benchmark::run_benchmark::( opt.block_size, blocks, - transaction_mix, + workload, opt.transactions_per_sender, - opt.connected_tx_grps, - opt.shuffle_connected_txns, - opt.hotspot_probability, main_signer_accounts, additional_dst_pool_accounts, data_dir, @@ -563,7 +591,7 @@ fn main() { } AptosVM::set_num_shards_once(execution_shards); AptosVM::set_concurrency_level_once(execution_threads_per_shard); - NativeExecutor::set_concurrency_level_once(execution_threads_per_shard); + NativeConfig::set_concurrency_level_once(execution_threads_per_shard); AptosVM::set_processed_transactions_detailed_counters(); let config = ProfilerConfig::new_with_defaults(); @@ -582,14 +610,20 @@ fn main() { let _mem_start = memory_profiler.start_profiling(); } - if opt.vm_selection_opt.use_native_executor { - run::(opt); - } else if opt.vm_selection_opt.use_ptx_executor { - #[cfg(target_os = "linux")] - ThreadManagerBuilder::set_thread_config_strategy(ThreadConfigStrategy::ThreadsPriority(48)); - run::(opt); - } else { - run::(opt); + match opt.block_executor_type { + BlockExecutorTypeOpt::AptosVMWithBlockSTM => { + run::(opt); + }, + BlockExecutorTypeOpt::NativeLooseSpeculative => { + run::(opt); + }, + BlockExecutorTypeOpt::PtxExecutor => { + #[cfg(target_os = "linux")] + ThreadManagerBuilder::set_thread_config_strategy( + ThreadConfigStrategy::ThreadsPriority(48), + ); + run::(opt); + }, } if cpu_profiling { diff --git a/execution/executor-benchmark/src/native/mod.rs b/execution/executor-benchmark/src/native/mod.rs new file mode 100644 index 0000000000000..affa99e856bfb --- /dev/null +++ b/execution/executor-benchmark/src/native/mod.rs @@ -0,0 +1,4 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod native_config; diff --git a/execution/executor-benchmark/src/native/native_config.rs b/execution/executor-benchmark/src/native/native_config.rs new file mode 100644 index 0000000000000..ce552b67ef5e4 --- /dev/null +++ b/execution/executor-benchmark/src/native/native_config.rs @@ -0,0 +1,34 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use once_cell::sync::{Lazy, OnceCell}; +use rayon::{ThreadPool, ThreadPoolBuilder}; +use std::sync::Arc; + +pub static NATIVE_EXECUTOR_CONCURRENCY_LEVEL: OnceCell = OnceCell::new(); +pub static NATIVE_EXECUTOR_POOL: Lazy> = Lazy::new(|| { + Arc::new( + ThreadPoolBuilder::new() + .num_threads(NativeConfig::get_concurrency_level()) + .thread_name(|index| format!("native_exe_{}", index)) + .build() + .unwrap(), + ) +}); + +pub struct NativeConfig; + +impl NativeConfig { + pub fn set_concurrency_level_once(concurrency_level: usize) { + NATIVE_EXECUTOR_CONCURRENCY_LEVEL + .set(concurrency_level) + .ok(); + } + + pub fn get_concurrency_level() -> usize { + match NATIVE_EXECUTOR_CONCURRENCY_LEVEL.get() { + Some(concurrency_level) => *concurrency_level, + None => 1, + } + } +} diff --git a/execution/executor-benchmark/src/native_executor.rs b/execution/executor-benchmark/src/native_executor.rs index 9fc4827b20751..b289cccaf12ea 100644 --- a/execution/executor-benchmark/src/native_executor.rs +++ b/execution/executor-benchmark/src/native_executor.rs @@ -76,7 +76,7 @@ impl IncrementalOutput { } } -pub struct NativeExecutor {} +pub struct NativeExecutor; static NATIVE_EXECUTOR_CONCURRENCY_LEVEL: OnceCell = OnceCell::new(); static NATIVE_EXECUTOR_POOL: Lazy = Lazy::new(|| { @@ -449,7 +449,12 @@ impl VMExecutor for NativeExecutor { } impl TransactionBlockExecutor for NativeExecutor { + fn new() -> Self { + Self + } + fn execute_transaction_block( + &self, transactions: ExecutableTransactions, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, diff --git a/execution/executor-benchmark/src/pipeline.rs b/execution/executor-benchmark/src/pipeline.rs index c61fd79dfecbf..f08e7f357ead8 100644 --- a/execution/executor-benchmark/src/pipeline.rs +++ b/execution/executor-benchmark/src/pipeline.rs @@ -30,7 +30,7 @@ use std::{ #[derive(Debug, Derivative)] #[derivative(Default)] pub struct PipelineConfig { - pub delay_execution_start: bool, + pub generate_then_execute: bool, pub split_stages: bool, pub skip_commit: bool, pub allow_aborts: bool, @@ -38,16 +38,16 @@ pub struct PipelineConfig { pub allow_retries: bool, #[derivative(Default(value = "0"))] pub num_executor_shards: usize, - pub use_global_executor: bool, #[derivative(Default(value = "4"))] pub num_generator_workers: usize, pub partitioner_config: PartitionerV2Config, + pub num_sig_verify_threads: usize, } pub struct Pipeline { join_handles: Vec>, phantom: PhantomData, - start_execution_tx: Option>, + start_pipeline_tx: Option>, } impl Pipeline @@ -67,15 +67,21 @@ where let executor_3 = executor_1.clone(); let (raw_block_sender, raw_block_receiver) = mpsc::sync_channel::>( - if config.delay_execution_start { + if config.generate_then_execute { (num_blocks.unwrap() + 1).max(50) } else { 10 }, /* bound */ ); - // Assume the distributed executor and the distributed partitioner share the same worker set. - let num_partitioner_shards = config.num_executor_shards; + let (executable_block_sender, executable_block_receiver) = + mpsc::sync_channel::( + if config.split_stages { + (num_blocks.unwrap() + 1).max(50) + } else { + 10 + }, /* bound */ + ); let (ledger_update_sender, ledger_update_receiver) = mpsc::sync_channel::( @@ -94,24 +100,22 @@ where }, /* bound */ ); - let (start_execution_tx, start_execution_rx) = if config.delay_execution_start { - let (start_execution_tx, start_execution_rx) = mpsc::sync_channel::<()>(1); - (Some(start_execution_tx), Some(start_execution_rx)) - } else { - (None, None) - }; - - let (start_commit_tx, start_commit_rx) = if config.split_stages { - let (start_commit_tx, start_commit_rx) = mpsc::sync_channel::<()>(1); - (Some(start_commit_tx), Some(start_commit_rx)) - } else { - (None, None) - }; + let (start_pipeline_tx, start_pipeline_rx) = + create_start_tx_rx(config.generate_then_execute); + let (start_execution_tx, start_execution_rx) = create_start_tx_rx(config.split_stages); + let (start_ledger_update_tx, start_ledger_update_rx) = + create_start_tx_rx(config.split_stages); + let (start_commit_tx, start_commit_rx) = create_start_tx_rx(config.split_stages); let mut join_handles = vec![]; - let mut partitioning_stage = - BlockPreparationStage::new(num_partitioner_shards, &config.partitioner_config); + // signature verification and partitioning + let mut preparation_stage = BlockPreparationStage::new( + config.num_sig_verify_threads, + // Assume the distributed executor and the distributed partitioner share the same worker set. + config.num_executor_shards, + &config.partitioner_config, + ); let mut exe = TransactionExecutor::new(executor_1, parent_block_id, ledger_update_sender); @@ -128,22 +132,19 @@ where config.allow_retries, ); - let (executable_block_sender, executable_block_receiver) = - mpsc::sync_channel::(3); - - let partitioning_thread = std::thread::Builder::new() - .name("block_partitioning".to_string()) + let preparation_thread = std::thread::Builder::new() + .name("block_preparation".to_string()) .spawn(move || { + start_pipeline_rx.map(|rx| rx.recv()); while let Ok(txns) = raw_block_receiver.recv() { - NUM_TXNS - .with_label_values(&["partition"]) - .inc_by(txns.len() as u64); - let exe_block_msg = partitioning_stage.process(txns); + let exe_block_msg = preparation_stage.process(txns); executable_block_sender.send(exe_block_msg).unwrap(); } + info!("Done preparation"); + start_execution_tx.map(|tx| tx.send(())); }) .expect("Failed to spawn block partitioner thread."); - join_handles.push(partitioning_thread); + join_handles.push(preparation_thread); let exe_thread = std::thread::Builder::new() .name("txn_executor".to_string()) @@ -199,7 +200,7 @@ where if num_blocks.is_some() { overall_measuring.print_end("Overall execution", executed); } - start_commit_tx.map(|tx| tx.send(())); + start_ledger_update_tx.map(|tx| tx.send(())); }) .expect("Failed to spawn transaction executor thread."); join_handles.push(exe_thread); @@ -207,12 +208,15 @@ where let ledger_update_thread = std::thread::Builder::new() .name("ledger_update".to_string()) .spawn(move || { + start_ledger_update_rx.map(|rx| rx.recv()); + while let Ok(ledger_update_msg) = ledger_update_receiver.recv() { NUM_TXNS .with_label_values(&["ledger_update"]) .inc_by(ledger_update_msg.num_input_txns as u64); ledger_update_stage.ledger_update(ledger_update_msg); } + start_commit_tx.map(|tx| tx.send(())); }) .expect("Failed to spawn ledger update thread."); join_handles.push(ledger_update_thread); @@ -235,14 +239,14 @@ where Self { join_handles, phantom: PhantomData, - start_execution_tx, + start_pipeline_tx, }, raw_block_sender, ) } - pub fn start_execution(&self) { - self.start_execution_tx.as_ref().map(|tx| tx.send(())); + pub fn start_pipeline_processing(&self) { + self.start_pipeline_tx.as_ref().map(|tx| tx.send(())); } pub fn join(self) { @@ -252,6 +256,16 @@ where } } +fn create_start_tx_rx(should_wait: bool) -> (Option>, Option>) { + let (start_tx, start_rx) = if should_wait { + let (start_tx, start_rx) = mpsc::sync_channel::<()>(1); + (Some(start_tx), Some(start_rx)) + } else { + (None, None) + }; + (start_tx, start_rx) +} + /// Message from partitioning stage to execution stage. pub struct ExecuteBlockMessage { pub current_block_start_time: Instant, diff --git a/execution/executor-benchmark/src/transaction_committer.rs b/execution/executor-benchmark/src/transaction_committer.rs index a725bbe3ed15b..b235b8fcc7209 100644 --- a/execution/executor-benchmark/src/transaction_committer.rs +++ b/execution/executor-benchmark/src/transaction_committer.rs @@ -7,7 +7,9 @@ use aptos_crypto::hash::HashValue; use aptos_db::metrics::API_LATENCY_SECONDS; use aptos_executor::{ block_executor::{BlockExecutor, TransactionBlockExecutor}, - metrics::{COMMIT_BLOCKS, EXECUTE_BLOCK, VM_EXECUTE_BLOCK}, + metrics::{ + BLOCK_EXECUTION_WORKFLOW_WHOLE, COMMIT_BLOCKS, GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING, + }, }; use aptos_executor_types::BlockExecutorTrait; use aptos_logger::prelude::*; @@ -132,18 +134,18 @@ fn report_block( total_versions / first_block_start_time.elapsed().as_secs_f64(), ); info!( - "Accumulative total: VM time: {:.0} secs, executor time: {:.0} secs, commit time: {:.0} secs, DB commit time: {:.0} secs", - VM_EXECUTE_BLOCK.get_sample_sum(), - EXECUTE_BLOCK.get_sample_sum() - VM_EXECUTE_BLOCK.get_sample_sum(), + "Accumulative total: BlockSTM+VM time: {:.0} secs, executor time: {:.0} secs, commit time: {:.0} secs, DB commit time: {:.0} secs", + GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.get_sample_sum(), + BLOCK_EXECUTION_WORKFLOW_WHOLE.get_sample_sum() - GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.get_sample_sum(), COMMIT_BLOCKS.get_sample_sum(), API_LATENCY_SECONDS.get_metric_with_label_values(&["save_transactions", "Ok"]).expect("must exist.").get_sample_sum(), ); const NANOS_PER_SEC: f64 = 1_000_000_000.0; info!( - "Accumulative per transaction: VM time: {:.0} ns, executor time: {:.0} ns, commit time: {:.0} ns, DB commit time: {:.0} ns", - VM_EXECUTE_BLOCK.get_sample_sum() * NANOS_PER_SEC + "Accumulative per transaction: BlockSTM+VM time: {:.0} ns, executor time: {:.0} ns, commit time: {:.0} ns, DB commit time: {:.0} ns", + GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.get_sample_sum() * NANOS_PER_SEC / total_versions, - (EXECUTE_BLOCK.get_sample_sum() - VM_EXECUTE_BLOCK.get_sample_sum()) * NANOS_PER_SEC + (BLOCK_EXECUTION_WORKFLOW_WHOLE.get_sample_sum() - GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.get_sample_sum()) * NANOS_PER_SEC / total_versions, COMMIT_BLOCKS.get_sample_sum() * NANOS_PER_SEC / total_versions, diff --git a/execution/executor-benchmark/src/transaction_executor.rs b/execution/executor-benchmark/src/transaction_executor.rs index 2d4079a3db848..328b5fb3794b5 100644 --- a/execution/executor-benchmark/src/transaction_executor.rs +++ b/execution/executor-benchmark/src/transaction_executor.rs @@ -2,7 +2,7 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::pipeline::LedgerUpdateMessage; +use crate::{metrics::TIMER, pipeline::LedgerUpdateMessage}; use aptos_crypto::hash::HashValue; use aptos_executor::block_executor::{BlockExecutor, TransactionBlockExecutor}; use aptos_executor_types::BlockExecutorTrait; @@ -60,14 +60,16 @@ where self.num_blocks_processed, block_id ); let num_input_txns = executable_block.transactions.num_transactions(); - self.executor - .execute_and_state_checkpoint( - executable_block, - self.parent_block_id, - BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG, - ) - .unwrap(); - + { + let _timer = TIMER.with_label_values(&["execute"]).start_timer(); + self.executor + .execute_and_state_checkpoint( + executable_block, + self.parent_block_id, + BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG, + ) + .unwrap(); + } let msg = LedgerUpdateMessage { current_block_start_time, first_block_start_time: *self.maybe_first_block_start_time.as_ref().unwrap(), diff --git a/execution/executor-test-helpers/src/integration_test_impl.rs b/execution/executor-test-helpers/src/integration_test_impl.rs index 885c8be4e8a11..cd0dea1ba46aa 100644 --- a/execution/executor-test-helpers/src/integration_test_impl.rs +++ b/execution/executor-test-helpers/src/integration_test_impl.rs @@ -8,7 +8,7 @@ use aptos_cached_packages::aptos_stdlib; use aptos_config::config::DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD; use aptos_consensus_types::block::Block; use aptos_db::AptosDB; -use aptos_executor::block_executor::BlockExecutor; +use aptos_executor::block_executor::{AptosVMBlockExecutor, BlockExecutor}; use aptos_executor_types::BlockExecutorTrait; use aptos_sdk::{ move_types::account_address::AccountAddress, @@ -382,7 +382,7 @@ pub fn create_db_and_executor>( ) -> ( Arc, DbReaderWriter, - BlockExecutor, + BlockExecutor, Waypoint, ) { let (db, dbrw) = force_sharding diff --git a/execution/executor/src/block_executor/mod.rs b/execution/executor/src/block_executor/mod.rs index 2ef3b126ce34f..9d954737e6dbc 100644 --- a/execution/executor/src/block_executor/mod.rs +++ b/execution/executor/src/block_executor/mod.rs @@ -7,8 +7,9 @@ use crate::{ logging::{LogEntry, LogSchema}, metrics::{ - COMMIT_BLOCKS, CONCURRENCY_GAUGE, EXECUTE_BLOCK, OTHER_TIMERS, SAVE_TRANSACTIONS, - TRANSACTIONS_SAVED, UPDATE_LEDGER, VM_EXECUTE_BLOCK, + BLOCK_EXECUTION_WORKFLOW_WHOLE, COMMIT_BLOCKS, CONCURRENCY_GAUGE, + GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING, OTHER_TIMERS, SAVE_TRANSACTIONS, + TRANSACTIONS_SAVED, UPDATE_LEDGER, }, types::partial_state_compute_result::PartialStateComputeResult, workflow::{ @@ -40,12 +41,15 @@ use aptos_types::{ use aptos_vm::AptosVM; use block_tree::BlockTree; use fail::fail_point; -use std::{marker::PhantomData, sync::Arc}; +use std::sync::Arc; pub mod block_tree; pub trait TransactionBlockExecutor: Send + Sync { + fn new() -> Self; + fn execute_transaction_block( + &self, transactions: ExecutableTransactions, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, @@ -53,8 +57,20 @@ pub trait TransactionBlockExecutor: Send + Sync { ) -> Result; } -impl TransactionBlockExecutor for AptosVM { +/// Production implementation of TransactionBlockExecutor. +/// +/// Transaction execution: AptosVM +/// Executing conflicts: in the input order, via BlockSTM, +/// State: BlockSTM-provided MVHashMap-based view with caching +pub struct AptosVMBlockExecutor; + +impl TransactionBlockExecutor for AptosVMBlockExecutor { + fn new() -> Self { + Self + } + fn execute_transaction_block( + &self, transactions: ExecutableTransactions, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, @@ -176,7 +192,7 @@ where struct BlockExecutorInner { db: DbReaderWriter, block_tree: BlockTree, - phantom: PhantomData, + block_executor: V, } impl BlockExecutorInner @@ -188,7 +204,7 @@ where Ok(Self { db, block_tree, - phantom: PhantomData, + block_executor: V::new(), }) } } @@ -207,7 +223,7 @@ where parent_block_id: HashValue, onchain_config: BlockExecutorConfigFromOnchain, ) -> ExecutorResult<()> { - let _timer = EXECUTE_BLOCK.start_timer(); + let _timer = BLOCK_EXECUTION_WORKFLOW_WHOLE.start_timer(); let ExecutableBlock { block_id, transactions, @@ -253,13 +269,13 @@ where }; let execution_output = { - let _timer = VM_EXECUTE_BLOCK.start_timer(); - fail_point!("executor::vm_execute_block", |_| { + let _timer = GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING.start_timer(); + fail_point!("executor::block_executor_execute_block", |_| { Err(ExecutorError::from(anyhow::anyhow!( - "Injected error in vm_execute_block" + "Injected error in block_executor_execute_block" ))) }); - V::execute_transaction_block( + self.block_executor.execute_transaction_block( transactions, state_view, onchain_config.clone(), diff --git a/execution/executor/src/fuzzing.rs b/execution/executor/src/fuzzing.rs index 0430a7706d442..46554ada0a01c 100644 --- a/execution/executor/src/fuzzing.rs +++ b/execution/executor/src/fuzzing.rs @@ -68,7 +68,12 @@ pub fn fuzz_execute_and_commit_blocks( pub struct FakeVM; impl TransactionBlockExecutor for FakeVM { + fn new() -> Self { + Self + } + fn execute_transaction_block( + &self, transactions: ExecutableTransactions, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, diff --git a/execution/executor/src/metrics.rs b/execution/executor/src/metrics.rs index e6c5ee41300af..c47a31967bc8c 100644 --- a/execution/executor/src/metrics.rs +++ b/execution/executor/src/metrics.rs @@ -53,12 +53,12 @@ pub static COMMIT_CHUNK: Lazy = Lazy::new(|| { .unwrap() }); -pub static VM_EXECUTE_BLOCK: Lazy = Lazy::new(|| { +pub static GET_BLOCK_EXECUTION_OUTPUT_BY_EXECUTING: Lazy = Lazy::new(|| { register_histogram!( // metric name - "aptos_executor_vm_execute_block_seconds", + "aptos_executor_get_block_execution_output_by_executing_seconds", // metric description - "The time spent in seconds of vm block execution in Aptos executor", + "The total time spent in seconds in executing execute_and_state_checkpoint in the BlockExecutorInner.", exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 20).unwrap(), ) .unwrap() @@ -80,12 +80,12 @@ pub static EXECUTOR_ERRORS: Lazy = Lazy::new(|| { register_int_counter!("aptos_executor_error_total", "Cumulative number of errors").unwrap() }); -pub static EXECUTE_BLOCK: Lazy = Lazy::new(|| { +pub static BLOCK_EXECUTION_WORKFLOW_WHOLE: Lazy = Lazy::new(|| { register_histogram!( // metric name - "aptos_executor_execute_block_seconds", + "aptos_executor_block_execution_workflow_whole_seconds", // metric description - "The total time spent in seconds of block execution in the block executor.", + "The total time spent in seconds in executing execute_and_state_checkpoint in the BlockExecutorInner.", exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 20).unwrap(), ) .unwrap() diff --git a/execution/executor/src/tests/mock_vm/mod.rs b/execution/executor/src/tests/mock_vm/mod.rs index 4bfc0a525c99d..cc157a2b46e9a 100644 --- a/execution/executor/src/tests/mock_vm/mod.rs +++ b/execution/executor/src/tests/mock_vm/mod.rs @@ -66,7 +66,12 @@ pub static DISCARD_STATUS: Lazy = pub struct MockVM; impl TransactionBlockExecutor for MockVM { + fn new() -> Self { + Self + } + fn execute_transaction_block( + &self, transactions: ExecutableTransactions, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, diff --git a/execution/executor/tests/db_bootstrapper_test.rs b/execution/executor/tests/db_bootstrapper_test.rs index dd6dc01e537ff..1d98fa479d7a7 100644 --- a/execution/executor/tests/db_bootstrapper_test.rs +++ b/execution/executor/tests/db_bootstrapper_test.rs @@ -8,7 +8,7 @@ use aptos_cached_packages::aptos_stdlib; use aptos_crypto::{ed25519::Ed25519PrivateKey, HashValue, PrivateKey, Uniform}; use aptos_db::AptosDB; use aptos_executor::{ - block_executor::BlockExecutor, + block_executor::{AptosVMBlockExecutor, BlockExecutor}, db_bootstrapper::{generate_waypoint, maybe_bootstrap}, }; use aptos_executor_test_helpers::{ @@ -82,7 +82,7 @@ fn execute_and_commit(txns: Vec, db: &DbReaderWriter, signer: &Vali let version = li.ledger_info().version(); let epoch = li.ledger_info().next_block_epoch(); let target_version = version + txns.len() as u64 + 1; // Due to StateCheckpoint txn - let executor = BlockExecutor::::new(db.clone()); + let executor = BlockExecutor::::new(db.clone()); let output = executor .execute_block( (block_id, block(txns)).into(), diff --git a/experimental/execution/ptx-executor/src/lib.rs b/experimental/execution/ptx-executor/src/lib.rs index 84172a4f86f08..92b9d3c0f95e5 100644 --- a/experimental/execution/ptx-executor/src/lib.rs +++ b/experimental/execution/ptx-executor/src/lib.rs @@ -118,7 +118,12 @@ impl VMExecutor for PtxBlockExecutor { } impl TransactionBlockExecutor for PtxBlockExecutor { + fn new() -> Self { + Self + } + fn execute_transaction_block( + &self, transactions: ExecutableTransactions, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, diff --git a/testsuite/single_node_performance.py b/testsuite/single_node_performance.py index cd98a98953ca9..e17d8b9cef11b 100755 --- a/testsuite/single_node_performance.py +++ b/testsuite/single_node_performance.py @@ -8,7 +8,7 @@ import tempfile import json import itertools -from typing import Callable, Optional, Tuple, Mapping, Sequence, Any +from typing import Callable, Optional, Tuple, Mapping, Sequence, Any, List from tabulate import tabulate from subprocess import Popen, PIPE, CalledProcessError from dataclasses import dataclass, field @@ -53,7 +53,7 @@ class Flow(Flag): DEFAULT_MAX_BLOCK_SIZE = "10000" MAX_BLOCK_SIZE = int(os.environ.get("MAX_BLOCK_SIZE", default=DEFAULT_MAX_BLOCK_SIZE)) -NUM_BLOCKS = int(os.environ.get("NUM_BLOCKS_PER_TEST", default=15)) +NUM_BLOCKS = int(os.environ.get("NUM_BLOCKS_PER_TEST", default=30)) NUM_BLOCKS_DETAILED = 10 NUM_ACCOUNTS = max( [ @@ -73,9 +73,9 @@ class Flow(Flag): SKIP_WARNS = IS_MAINNET SKIP_PERF_IMPROVEMENT_NOTICE = IS_MAINNET -# bump after a perf improvement, so you can easily distinguish runs +# bump after a bigger test or perf change, so you can easily distinguish runs # that are on top of this commit -CODE_PERF_VERSION = "v6" +CODE_PERF_VERSION = "v8" # default to using production number of execution threads for assertions NUMBER_OF_EXECUTION_THREADS = int( @@ -87,12 +87,12 @@ class Flow(Flag): else: EXECUTION_ONLY_NUMBER_OF_THREADS = [] -if os.environ.get("RELEASE_BUILD"): - BUILD_FLAG = "--release" - BUILD_FOLDER = "target/release" -else: +if os.environ.get("PERFORMANCE_BUILD"): BUILD_FLAG = "--profile performance" BUILD_FOLDER = "target/performance" +else: + BUILD_FLAG = "--release" + BUILD_FOLDER = "target/release" if os.environ.get("PROD_DB_FLAGS"): DB_CONFIG_FLAGS = "" @@ -101,10 +101,8 @@ class Flow(Flag): if os.environ.get("DISABLE_FA_APT"): FEATURE_FLAGS = "" - SKIP_NATIVE = False else: FEATURE_FLAGS = "--enable-feature NEW_ACCOUNTS_DEFAULT_TO_FA_APT_STORE --enable-feature OPERATIONS_DEFAULT_TO_FA_APT_STORE" - SKIP_NATIVE = True if os.environ.get("ENABLE_PRUNER"): DB_PRUNER_FLAGS = "--enable-state-pruner --enable-ledger-pruner --enable-epoch-snapshot-pruner --ledger-pruning-batch-size 10000 --state-prune-window 3000000 --epoch-snapshot-prune-window 3000000 --ledger-prune-window 3000000" @@ -127,6 +125,10 @@ class RunGroupKeyExtra: transaction_type_override: Optional[str] = field(default=None) transaction_weights_override: Optional[str] = field(default=None) sharding_traffic_flags: Optional[str] = field(default=None) + sig_verify_num_threads_override: Optional[int] = field(default=None) + execution_num_threads_override: Optional[int] = field(default=None) + split_stages_override: bool = field(default=False) + single_block_dst_working_set: bool = field(default=False) @dataclass @@ -159,46 +161,47 @@ class RunGroupConfig: # transaction_type module_working_set_size executor_type count min_ratio max_ratio median CALIBRATION = """ -no-op 1 VM 34 0.841 1.086 42046.2 -no-op 1000 VM 33 0.857 1.026 23125.1 -apt-fa-transfer 1 VM 34 0.843 1.057 29851.6 -account-generation 1 VM 34 0.843 1.046 24134.9 -account-resource32-b 1 VM 34 0.803 1.089 37283.8 -modify-global-resource 1 VM 34 0.841 1.017 2854.7 -modify-global-resource 100 VM 34 0.844 1.035 36514.1 -publish-package 1 VM 34 0.915 1.049 143.4 -mix_publish_transfer 1 VM 34 0.912 1.131 2149.7 -batch100-transfer 1 VM 33 0.823 1.037 754.2 -vector-picture30k 1 VM 33 0.892 1.018 112.4 -vector-picture30k 100 VM 34 0.706 1.03 2050.1 -smart-table-picture30-k-with200-change 1 VM 34 0.959 1.057 21.5 -smart-table-picture30-k-with200-change 100 VM 34 0.9 1.021 412.2 -modify-global-resource-agg-v2 1 VM 34 0.729 1.076 39288.2 -modify-global-flag-agg-v2 1 VM 34 0.948 1.016 5598.2 -modify-global-bounded-agg-v2 1 VM 34 0.881 1.06 9968.4 -modify-global-milestone-agg-v2 1 VM 34 0.831 1.029 29575.5 -resource-groups-global-write-tag1-kb 1 VM 34 0.933 1.051 9285.8 -resource-groups-global-write-and-read-tag1-kb 1 VM 34 0.9 1.016 6353 -resource-groups-sender-write-tag1-kb 1 VM 34 0.845 1.163 20568.6 -resource-groups-sender-multi-change1-kb 1 VM 34 0.888 1.116 17029.7 -token-v1ft-mint-and-transfer 1 VM 34 0.853 1.029 1455.7 -token-v1ft-mint-and-transfer 100 VM 34 0.801 1.021 20418.7 -token-v1nft-mint-and-transfer-sequential 1 VM 34 0.881 1.023 884.4 -token-v1nft-mint-and-transfer-sequential 100 VM 34 0.85 1.021 14733.1 -coin-init-and-mint 1 VM 35 0.839 1.056 31116.2 -coin-init-and-mint 100 VM 35 0.788 1.04 25367 -fungible-asset-mint 1 VM 35 0.861 1.043 27493.2 -fungible-asset-mint 100 VM 35 0.865 1.033 22113.3 -no-op5-signers 1 VM 34 0.825 1.104 41817.6 -token-v2-ambassador-mint 1 VM 35 0.864 1.026 18187.6 -token-v2-ambassador-mint 100 VM 35 0.894 1.033 16597.8 -liquidity-pool-swap 1 VM 34 0.894 1.026 965 -liquidity-pool-swap 100 VM 35 0.893 1.026 11439.4 -liquidity-pool-swap-stable 1 VM 36 0.897 1.018 945.1 -liquidity-pool-swap-stable 100 VM 36 0.824 1.031 11196.4 -deserialize-u256 1 VM 36 0.881 1.06 41062.1 -no-op-fee-payer 1 VM 36 0.863 1.031 2141.3 -no-op-fee-payer 100 VM 36 0.898 1.02 28717.2 +no-op 1 VM 5 0.914 1.024 40987.0 +no-op 1000 VM 5 0.880 1.008 20606.2 +apt-fa-transfer 1 VM 5 0.885 1.024 27345.0 +account-generation 1 VM 5 0.956 1.035 21446.3 +account-resource32-b 1 VM 5 0.917 1.055 35479.7 +modify-global-resource 1 VM 5 0.891 1.006 2396.8 +modify-global-resource 100 VM 5 0.888 1.010 33129.7 +publish-package 1 VM 5 0.988 1.026 127.6 +mix_publish_transfer 1 VM 5 0.802 1.068 3274.2 +batch100-transfer 1 VM 5 0.835 1.011 669.0 +vector-picture30k 1 VM 5 0.977 1.002 100.2 +vector-picture30k 100 VM 5 0.707 1.024 1818.0 +smart-table-picture30-k-with200-change 1 VM 5 0.969 1.009 16.0 +smart-table-picture30-k-with200-change 100 VM 5 0.950 1.044 246.9 +modify-global-resource-agg-v2 1 VM 5 0.870 1.033 37992.0 +modify-global-flag-agg-v2 1 VM 5 0.948 1.010 4607.8 +modify-global-bounded-agg-v2 1 VM 5 0.903 1.120 8233.1 +modify-global-milestone-agg-v2 1 VM 5 0.890 1.020 26334.6 +resource-groups-global-write-tag1-kb 1 VM 5 0.929 1.019 9515.8 +resource-groups-global-write-and-read-tag1-kb 1 VM 5 0.908 1.016 5791.6 +resource-groups-sender-write-tag1-kb 1 VM 5 0.966 1.105 19103.0 +resource-groups-sender-multi-change1-kb 1 VM 5 0.860 1.037 16597.8 +token-v1ft-mint-and-transfer 1 VM 5 0.840 1.004 1227.8 +token-v1ft-mint-and-transfer 100 VM 5 0.860 1.003 17399.7 +token-v1nft-mint-and-transfer-sequential 1 VM 5 0.874 1.011 763.7 +token-v1nft-mint-and-transfer-sequential 100 VM 5 0.860 1.006 12305.8 +coin-init-and-mint 1 VM 5 0.866 1.004 30075.3 +coin-init-and-mint 100 VM 5 0.880 1.024 22797.1 +fungible-asset-mint 1 VM 5 0.879 1.013 25097.7 +fungible-asset-mint 100 VM 5 0.868 1.016 19174.6 +no-op5-signers 1 VM 5 0.875 1.017 41438.6 +token-v2-ambassador-mint 1 VM 5 0.875 1.012 16335.1 +token-v2-ambassador-mint 100 VM 5 0.880 1.012 14124.9 +liquidity-pool-swap 1 VM 5 0.843 1.005 849.9 +liquidity-pool-swap 100 VM 5 0.906 1.006 9163.3 +liquidity-pool-swap-stable 1 VM 5 0.838 1.007 817.1 +liquidity-pool-swap-stable 100 VM 5 0.876 1.003 9024.6 +deserialize-u256 1 VM 5 0.844 1.010 39288.2 +no-op-fee-payer 1 VM 5 0.877 1.010 1657.7 +no-op-fee-payer 100 VM 5 0.857 1.017 23963.8 +simple-script 1 VM 5 0.863 1.072 35274.8 """ # when adding a new test, add estimated expected_tps to it, as well as waived=True. @@ -210,19 +213,20 @@ class RunGroupConfig: RunGroupConfig(key=RunGroupKey("no-op"), included_in=LAND_BLOCKING_AND_C), RunGroupConfig(key=RunGroupKey("no-op", module_working_set_size=1000), included_in=LAND_BLOCKING_AND_C), RunGroupConfig(key=RunGroupKey("apt-fa-transfer"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE | Flow.MAINNET), - RunGroupConfig(key=RunGroupKey("apt-fa-transfer", executor_type="native"), included_in=LAND_BLOCKING_AND_C), + # RunGroupConfig(key=RunGroupKey("apt-fa-transfer", executor_type="NativeSpeculative"), included_in=Flow.CONTINUOUS), + RunGroupConfig(key=RunGroupKey("account-generation"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE | Flow.MAINNET), - RunGroupConfig(key=RunGroupKey("account-generation", executor_type="native"), included_in=Flow.CONTINUOUS), + # RunGroupConfig(key=RunGroupKey("account-generation", executor_type="NativeSpeculative"), included_in=Flow.CONTINUOUS), RunGroupConfig(key=RunGroupKey("account-resource32-b"), included_in=Flow.CONTINUOUS), RunGroupConfig(key=RunGroupKey("modify-global-resource"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE), RunGroupConfig(key=RunGroupKey("modify-global-resource", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), RunGroupConfig(key=RunGroupKey("publish-package"), included_in=LAND_BLOCKING_AND_C | Flow.REPRESENTATIVE | Flow.MAINNET), RunGroupConfig(key=RunGroupKey("mix_publish_transfer"), key_extra=RunGroupKeyExtra( transaction_type_override="publish-package apt-fa-transfer", - transaction_weights_override="1 500", - ), included_in=LAND_BLOCKING_AND_C), + transaction_weights_override="1 100", + ), included_in=LAND_BLOCKING_AND_C, waived=True), RunGroupConfig(key=RunGroupKey("batch100-transfer"), included_in=LAND_BLOCKING_AND_C), - RunGroupConfig(key=RunGroupKey("batch100-transfer", executor_type="native"), included_in=Flow.CONTINUOUS), + # RunGroupConfig(key=RunGroupKey("batch100-transfer", executor_type="NativeSpeculative"), included_in=Flow.CONTINUOUS), RunGroupConfig(expected_tps=100, key=RunGroupKey("vector-picture40"), included_in=Flow(0), waived=True), RunGroupConfig(expected_tps=1000, key=RunGroupKey("vector-picture40", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow(0), waived=True), @@ -282,6 +286,7 @@ class RunGroupConfig: # fee payer sequentializes transactions today. in these tests module publisher is the fee payer, so larger number of modules tests throughput with multiple fee payers RunGroupConfig(key=RunGroupKey("no-op-fee-payer"), included_in=LAND_BLOCKING_AND_C), RunGroupConfig(key=RunGroupKey("no-op-fee-payer", module_working_set_size=DEFAULT_MODULE_WORKING_SET_SIZE), included_in=Flow.CONTINUOUS), + RunGroupConfig(key=RunGroupKey("simple-script"), included_in=LAND_BLOCKING_AND_C, waived=True), RunGroupConfig(expected_tps=50000, key=RunGroupKey("coin_transfer_connected_components", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--connected-tx-grps 5000", transaction_type_override=""), included_in=Flow.REPRESENTATIVE, waived=True), RunGroupConfig(expected_tps=50000, key=RunGroupKey("coin_transfer_hotspot", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--hotspot-probability 0.8", transaction_type_override=""), included_in=Flow.REPRESENTATIVE, waived=True), @@ -294,7 +299,6 @@ class RunGroupConfig: # RunGroupConfig(expected_tps=17000 if NUM_ACCOUNTS < 5000000 else 28000, key=RunGroupKey("coin_transfer_connected_components", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--connected-tx-grps 5000", transaction_type_override=""), included_in=Flow.MAINNET | Flow.MAINNET_LARGE_DB, waived=True), # RunGroupConfig(expected_tps=27000 if NUM_ACCOUNTS < 5000000 else 23000, key=RunGroupKey("coin_transfer_hotspot", executor_type="sharded"), key_extra=RunGroupKeyExtra(sharding_traffic_flags="--hotspot-probability 0.8", transaction_type_override=""), included_in=Flow.MAINNET | Flow.MAINNET_LARGE_DB, waived=True), - RunGroupConfig(expected_tps=1000, key=RunGroupKey("simple-script"), included_in=LAND_BLOCKING_AND_C, waived=True), ] # fmt: on @@ -354,8 +358,11 @@ class RunResults: gpt: float storage_fee_pt: float output_bps: float + fraction_in_sig_verify: float fraction_in_execution: float - fraction_of_execution_in_vm: float + fraction_of_execution_in_block_executor: float + fraction_of_execution_in_inner_block_executor: float + fraction_in_ledger_update: float fraction_in_commit: float @@ -409,8 +416,11 @@ def extract_run_results( gpt = 0 storage_fee_pt = 0 output_bps = 0 + fraction_in_sig_verify = 0 fraction_in_execution = 0 - fraction_of_execution_in_vm = 0 + fraction_of_execution_in_block_executor = 0 + fraction_of_execution_in_inner_block_executor = 0 + fraction_in_ledger_update = 0 fraction_in_commit = 0 else: tps = float(get_only(re.findall(prefix + r" TPS: (\d+\.?\d*) txn/s", output))) @@ -434,13 +444,35 @@ def extract_run_results( output_bps = float( get_only(re.findall(prefix + r" output: (\d+\.?\d*) bytes/s", output)) ) + fraction_in_sig_verify = float( + re.findall( + prefix + r" fraction of total: (\d+\.?\d*) in signature verification", + output, + )[-1] + ) fraction_in_execution = float( re.findall( prefix + r" fraction of total: (\d+\.?\d*) in execution", output )[-1] ) - fraction_of_execution_in_vm = float( - re.findall(prefix + r" fraction of execution (\d+\.?\d*) in VM", output)[-1] + # see if useful or to remove + fraction_of_execution_in_block_executor = float( + re.findall( + prefix + + r" fraction of execution (\d+\.?\d*) in get execution output by executing", + output, + )[-1] + ) + fraction_of_execution_in_inner_block_executor = float( + re.findall( + prefix + r" fraction of execution (\d+\.?\d*) in inner block executor", + output, + )[-1] + ) + fraction_in_ledger_update = float( + re.findall( + prefix + r" fraction of total: (\d+\.?\d*) in ledger update", output + )[-1] ) fraction_in_commit = float( re.findall(prefix + r" fraction of total: (\d+\.?\d*) in commit", output)[ @@ -457,8 +489,11 @@ def extract_run_results( gpt=gpt, storage_fee_pt=storage_fee_pt, output_bps=output_bps, + fraction_in_sig_verify=fraction_in_sig_verify, fraction_in_execution=fraction_in_execution, - fraction_of_execution_in_vm=fraction_of_execution_in_vm, + fraction_of_execution_in_block_executor=fraction_of_execution_in_block_executor, + fraction_of_execution_in_inner_block_executor=fraction_of_execution_in_inner_block_executor, + fraction_in_ledger_update=fraction_in_ledger_update, fraction_in_commit=fraction_in_commit, ) @@ -466,31 +501,39 @@ def extract_run_results( def print_table( results: Sequence[RunGroupInstance], by_levels: bool, - single_field: Optional[Tuple[str, Callable[[RunResults], Any]]], + only_fields: List[Tuple[str, Callable[[RunGroupInstance], Any]]], number_of_execution_threads=EXECUTION_ONLY_NUMBER_OF_THREADS, ): headers = [ "transaction_type", "module_working_set", "executor", - "block_size", - "expected t/s", ] + + if not only_fields: + headers.extend( + [ + "block_size", + "expected t/s", + ] + ) + if by_levels: headers.extend( [f"exe_only {num_threads}" for num_threads in number_of_execution_threads] ) - assert single_field is not None + assert only_fields - if single_field is not None: - field_name, _ = single_field - headers.append(field_name) + if only_fields: + for field_name, _ in only_fields: + headers.append(field_name) else: headers.extend( [ "t/s", + "sigver/total", "exe/total", - "vm/exe", + "block_exe/exe", "commit/total", "g/s", "eff g/s", @@ -508,12 +551,16 @@ def print_table( result.key.transaction_type, result.key.module_working_set_size, result.key.executor_type, - result.block_size, - result.expected_tps, ] + if not only_fields: + row.extend( + [ + result.block_size, + result.expected_tps, + ] + ) if by_levels: - if single_field is not None: - _, field_getter = single_field + for _, field_getter in only_fields: for num_threads in number_of_execution_threads: if num_threads in result.number_of_threads_results: row.append( @@ -522,13 +569,18 @@ def print_table( else: row.append("-") - if single_field is not None: - _, field_getter = single_field - row.append(field_getter(result.single_node_result)) + if only_fields: + for _, field_getter in only_fields: + row.append(field_getter(result)) else: row.append(int(round(result.single_node_result.tps))) + row.append(round(result.single_node_result.fraction_in_sig_verify, 3)) row.append(round(result.single_node_result.fraction_in_execution, 3)) - row.append(round(result.single_node_result.fraction_of_execution_in_vm, 3)) + row.append( + round( + result.single_node_result.fraction_of_execution_in_block_executor, 3 + ) + ) row.append(round(result.single_node_result.fraction_in_commit, 3)) row.append(int(round(result.single_node_result.gps))) row.append(int(round(result.single_node_result.effective_gps))) @@ -582,7 +634,7 @@ def print_table( execute_command(f"cargo build {BUILD_FLAG} --package aptos-executor-benchmark") print(f"Warmup - creating DB with {NUM_ACCOUNTS} accounts") - create_db_command = f"RUST_BACKTRACE=1 {BUILD_FOLDER}/aptos-executor-benchmark --block-size {MAX_BLOCK_SIZE} --execution-threads {NUMBER_OF_EXECUTION_THREADS} {DB_CONFIG_FLAGS} {DB_PRUNER_FLAGS} create-db {FEATURE_FLAGS} --data-dir {tmpdirname}/db --num-accounts {NUM_ACCOUNTS}" + create_db_command = f"RUST_BACKTRACE=1 {BUILD_FOLDER}/aptos-executor-benchmark --block-executor-type aptos-vm-with-block-stm --block-size {MAX_BLOCK_SIZE} --execution-threads {NUMBER_OF_EXECUTION_THREADS} {DB_CONFIG_FLAGS} {DB_PRUNER_FLAGS} create-db {FEATURE_FLAGS} --data-dir {tmpdirname}/db --num-accounts {NUM_ACCOUNTS}" output = execute_command(create_db_command) results = [] @@ -604,9 +656,6 @@ def print_table( if SELECTED_FLOW not in test.included_in: continue - if SKIP_NATIVE and test.key.executor_type == "native": - continue - if test.expected_tps is not None: print(f"WARNING: using uncalibrated TPS for {test.key}") criteria = Criteria( @@ -641,7 +690,8 @@ def print_table( * pow(cur_calibration.max_ratio, 0.8), ) - cur_block_size = int(min([criteria.expected_tps, MAX_BLOCK_SIZE])) + # target 250ms blocks, a bit larger than prod + cur_block_size = max(4, int(min([criteria.expected_tps / 4, MAX_BLOCK_SIZE]))) print(f"Testing {test.key}") if test.key_extra.transaction_type_override == "": @@ -655,21 +705,57 @@ def print_table( ) workload_args_str = f"--transaction-type {transaction_type_list} --transaction-weights {transaction_weights_list}" + pipeline_extra_args = [] + + number_of_execution_threads = NUMBER_OF_EXECUTION_THREADS + if test.key_extra.execution_num_threads_override: + number_of_execution_threads = test.key_extra.execution_num_threads_override + + if test.key_extra.sig_verify_num_threads_override: + pipeline_extra_args.extend( + [ + "--num-sig-verify-threads", + str(test.key_extra.sig_verify_num_threads_override), + ] + ) + + if test.key_extra.split_stages_override: + pipeline_extra_args.append("--split-stages") + sharding_traffic_flags = test.key_extra.sharding_traffic_flags or "" if test.key.executor_type == "VM": - executor_type_str = "--transactions-per-sender 1" - elif test.key.executor_type == "native": - executor_type_str = "--use-native-executor --transactions-per-sender 1" + executor_type_str = "--block-executor-type aptos-vm-with-block-stm --transactions-per-sender 1" + # elif test.key.executor_type == "NativeVM": + # executor_type_str = ( + # "--block-executor-type native-vm-with-block-stm --transactions-per-sender 1" + # ) + elif test.key.executor_type == "NativeSpeculative": + executor_type_str = "--block-executor-type native-loose-speculative --transactions-per-sender 1" + # elif test.key.executor_type == "NativeValueCacheSpeculative": + # executor_type_str = ( + # "--block-executor-type native-value-cache-loose-speculative --transactions-per-sender 1" + # ) + # elif test.key.executor_type == "NativeNoStorageSpeculative": + # executor_type_str = ( + # "--block-executor-type native-no-storage-loose-speculative --transactions-per-sender 1" + # ) elif test.key.executor_type == "sharded": - executor_type_str = f"--num-executor-shards {NUMBER_OF_EXECUTION_THREADS} {sharding_traffic_flags}" + executor_type_str = f"--num-executor-shards {number_of_execution_threads} {sharding_traffic_flags}" else: raise Exception(f"executor type not supported {test.key.executor_type}") - txn_emitter_prefix_str = "" if NUM_BLOCKS > 200 else " --generate-then-execute" - ADDITIONAL_DST_POOL_ACCOUNTS = 2 * MAX_BLOCK_SIZE * NUM_BLOCKS + if NUM_BLOCKS < 200: + pipeline_extra_args.append("--generate-then-execute") - common_command_suffix = f"{executor_type_str} {txn_emitter_prefix_str} --block-size {cur_block_size} {DB_CONFIG_FLAGS} {DB_PRUNER_FLAGS} run-executor {FEATURE_FLAGS} {workload_args_str} --module-working-set-size {test.key.module_working_set_size} --main-signer-accounts {MAIN_SIGNER_ACCOUNTS} --additional-dst-pool-accounts {ADDITIONAL_DST_POOL_ACCOUNTS} --data-dir {tmpdirname}/db --checkpoint-dir {tmpdirname}/cp" + pipeline_extra_args_str = " ".join(pipeline_extra_args) + + if test.key_extra.single_block_dst_working_set: + additional_dst_pool_accounts = MAX_BLOCK_SIZE + else: + additional_dst_pool_accounts = 2 * MAX_BLOCK_SIZE * NUM_BLOCKS + + common_command_suffix = f"{executor_type_str} {pipeline_extra_args_str} --block-size {cur_block_size} {DB_CONFIG_FLAGS} {DB_PRUNER_FLAGS} run-executor {FEATURE_FLAGS} {workload_args_str} --module-working-set-size {test.key.module_working_set_size} --main-signer-accounts {MAIN_SIGNER_ACCOUNTS} --additional-dst-pool-accounts {additional_dst_pool_accounts} --data-dir {tmpdirname}/db --checkpoint-dir {tmpdirname}/cp" number_of_threads_results = {} @@ -681,7 +767,7 @@ def print_table( output, "Overall execution" ) - test_db_command = f"RUST_BACKTRACE=1 {BUILD_FOLDER}/aptos-executor-benchmark --execution-threads {NUMBER_OF_EXECUTION_THREADS} {common_command_suffix} --blocks {NUM_BLOCKS}" + test_db_command = f"RUST_BACKTRACE=1 {BUILD_FOLDER}/aptos-executor-benchmark --execution-threads {number_of_execution_threads} {common_command_suffix} --blocks {NUM_BLOCKS}" output = execute_command(test_db_command) single_node_result = extract_run_results(output, "Overall") @@ -731,7 +817,7 @@ def print_table( "module_working_set_size": test.key.module_working_set_size, "executor_type": test.key.executor_type, "block_size": cur_block_size, - "execution_threads": NUMBER_OF_EXECUTION_THREADS, + "execution_threads": number_of_execution_threads, "warmup_num_accounts": NUM_ACCOUNTS, "expected_tps": criteria.expected_tps, "expected_min_tps": criteria.min_tps, @@ -740,6 +826,12 @@ def print_table( "tps": single_node_result.tps, "gps": single_node_result.gps, "gpt": single_node_result.gpt, + "fraction_in_sig_verify": single_node_result.fraction_in_sig_verify, + "fraction_in_execution": single_node_result.fraction_in_execution, + "fraction_of_execution_in_block_executor": single_node_result.fraction_of_execution_in_block_executor, + "fraction_of_execution_in_inner_block_executor": single_node_result.fraction_of_execution_in_inner_block_executor, + "fraction_in_ledger_update": single_node_result.fraction_in_ledger_update, + "fraction_in_commit": single_node_result.fraction_in_commit, "code_perf_version": CODE_PERF_VERSION, "flow": str(SELECTED_FLOW), "test_index": test_index, @@ -751,40 +843,120 @@ def print_table( print_table( results, by_levels=True, - single_field=("t/s", lambda r: int(round(r.tps))), + only_fields=[ + ("block_size", lambda r: r.block_size), + ("expected t/s", lambda r: r.expected_tps), + ("t/s", lambda r: int(round(r.single_node_result.tps))), + ], ) print_table( - results, + results[1:], by_levels=True, - single_field=("g/s", lambda r: int(round(r.gps))), - ) - print_table( - results, - by_levels=False, - single_field=("gas/txn", lambda r: int(round(r.gpt))), - ) - print_table( - results, - by_levels=False, - single_field=( - "storage fee/txn", - lambda r: int(round(r.storage_fee_pt)), - ), + only_fields=[ + ("g/s", lambda r: int(round(r.single_node_result.gps))), + ("gas/txn", lambda r: int(round(r.single_node_result.gpt))), + ( + "storage fee/txn", + lambda r: int(round(r.single_node_result.storage_fee_pt)), + ), + ], ) print_table( - results, + results[1:], by_levels=True, - single_field=("exe/total", lambda r: round(r.fraction_in_execution, 3)), + only_fields=[ + ( + "sigver/total", + lambda r: round(r.single_node_result.fraction_in_sig_verify, 3), + ), + ( + "exe/total", + lambda r: round(r.single_node_result.fraction_in_execution, 3), + ), + ( + "block_exe/exe", + lambda r: round( + r.single_node_result.fraction_of_execution_in_block_executor, + 3, + ), + ), + ( + "ledger/total", + lambda r: round( + r.single_node_result.fraction_in_ledger_update, 3 + ), + ), + ( + "commit/total", + lambda r: round(r.single_node_result.fraction_in_commit, 3), + ), + ], ) print_table( - results, + results[1:], by_levels=True, - single_field=( - "vm/exe", - lambda r: round(r.fraction_of_execution_in_vm, 3), - ), + only_fields=[ + ( + "sigver tps", + lambda r: round( + r.single_node_result.tps + / max(r.single_node_result.fraction_in_sig_verify, 0.001), + 1, + ), + ), + ( + "exe tps", + lambda r: round( + r.single_node_result.tps + / max(r.single_node_result.fraction_in_execution, 0.001), + 1, + ), + ), + ( + "block exe tps", + lambda r: round( + r.single_node_result.tps + / max(r.single_node_result.fraction_in_execution, 0.001) + / max( + r.single_node_result.fraction_of_execution_in_block_executor, + 0.001, + ), + 1, + ), + ), + ( + "inner block exe tps", + lambda r: round( + r.single_node_result.tps + / max(r.single_node_result.fraction_in_execution, 0.001) + / max( + r.single_node_result.fraction_of_execution_in_inner_block_executor, + 0.001, + ), + 1, + ), + ), + ( + "ledger tps", + lambda r: round( + r.single_node_result.tps + / max( + r.single_node_result.fraction_in_ledger_update, 0.001 + ), + 1, + ), + ), + ( + "commit tps", + lambda r: round( + r.single_node_result.tps + / max(r.single_node_result.fraction_in_commit, 0.001), + 1, + ), + ), + ], ) - print_table(results, by_levels=False, single_field=None) + print_table(results, by_levels=False, only_fields=None) if single_node_result.tps < criteria.min_tps: text = f"regression detected {single_node_result.tps}, expected median {criteria.expected_tps}, threshold: {criteria.min_tps}), {test.key} didn't meet TPS requirements"