From 1577338242172c9574576297d967b00fcdac26fe Mon Sep 17 00:00:00 2001 From: Ammar Arif Date: Fri, 12 Apr 2024 17:04:47 +0800 Subject: [PATCH] feat(katana): add executor metrics (#1791) ref #1369 Add metrics on katana executor; tracking the total L1 **gas** and Cairo **steps** used. There were two approaches that i thought of; 1. record the metrics on every tx execution, or 2. on every block ~Decided to go with (1) as it would allow to measure it in realtime (as the tx is being executed), instead of having to wait until the block is finished being processed.~ Thought im not exactly sure which one is the ideal one. Doing (1) might be less performant bcs we have to acquire the lock to the metrics recorder more frequently (ie every tx), as opposed to only updating the metrics once every block. another thing to note, currently doing (1) would require all executor implementations to define the metrics in their own implmentations, meaning have to duplicate code. If do (2) can just define it under `block_producer` scope and be executor agnostic. EDIT: doing (2). metrics are collected upon completion of block production --- some changes are made to gather the value after block production: - simplify params on `backend::do_mine_block`, now only accept two args; `BlockEnv` and `ExecutionOutput` - add a new type `ExecutionStats` under `katana-executor`, this is where executor would store the gas and steps value --- Cargo.lock | 2 ++ crates/katana/core/Cargo.toml | 2 ++ crates/katana/core/src/backend/mod.rs | 36 ++++++++++--------- crates/katana/core/src/sequencer.rs | 8 ++--- .../katana/core/src/service/block_producer.rs | 27 ++++++-------- crates/katana/core/src/service/metrics.rs | 15 ++++++++ crates/katana/core/src/service/mod.rs | 30 ++++++++++++++++ crates/katana/executor/src/abstraction/mod.rs | 11 ++++++ .../src/implementation/blockifier/mod.rs | 14 +++++--- .../executor/src/implementation/sir/mod.rs | 11 ++++-- crates/katana/executor/tests/executor.rs | 23 +++++++++--- crates/katana/executor/tests/simulate.rs | 4 ++- crates/katana/primitives/src/receipt.rs | 12 +++++++ 13 files changed, 145 insertions(+), 50 deletions(-) create mode 100644 crates/katana/core/src/service/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 86b07d5fe3..2b16039879 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6682,6 +6682,7 @@ dependencies = [ "cairo-vm 0.9.2", "convert_case 0.6.0", "derive_more", + "dojo-metrics", "flate2", "futures", "hex", @@ -6692,6 +6693,7 @@ dependencies = [ "katana-provider", "katana-tasks", "lazy_static", + "metrics", "parking_lot 0.12.1", "rand", "reqwest", diff --git a/crates/katana/core/Cargo.toml b/crates/katana/core/Cargo.toml index aeb7c1af4e..68c395f28b 100644 --- a/crates/katana/core/Cargo.toml +++ b/crates/katana/core/Cargo.toml @@ -15,6 +15,8 @@ katana-tasks.workspace = true anyhow.workspace = true async-trait.workspace = true +dojo-metrics.workspace = true +metrics.workspace = true cairo-lang-casm = "2.3.1" cairo-lang-starknet = "2.3.1" cairo-vm.workspace = true diff --git a/crates/katana/core/src/backend/mod.rs b/crates/katana/core/src/backend/mod.rs index badd55ab36..209b58a2dc 100644 --- a/crates/katana/core/src/backend/mod.rs +++ b/crates/katana/core/src/backend/mod.rs @@ -1,12 +1,11 @@ use std::sync::Arc; -use katana_executor::ExecutorFactory; +use katana_executor::{ExecutionOutput, ExecutionResult, ExecutorFactory}; use katana_primitives::block::{ Block, FinalityStatus, GasPrices, Header, PartialHeader, SealedBlockWithStatus, }; use katana_primitives::chain::ChainId; use katana_primitives::env::BlockEnv; -use katana_primitives::state::StateUpdatesWithDeclaredClasses; use katana_primitives::version::CURRENT_STARKNET_VERSION; use katana_primitives::FieldElement; use katana_provider::providers::fork::ForkedProvider; @@ -26,7 +25,7 @@ pub mod storage; use self::config::StarknetConfig; use self::storage::Blockchain; use crate::env::BlockContextGenerator; -use crate::service::block_producer::{BlockProductionError, MinedBlockOutcome, TxWithOutcome}; +use crate::service::block_producer::{BlockProductionError, MinedBlockOutcome}; use crate::utils::get_current_timestamp; pub(crate) const LOG_TARGET: &str = "katana::core::backend"; @@ -120,17 +119,20 @@ impl Backend { pub fn do_mine_block( &self, block_env: &BlockEnv, - txs_outcomes: Vec, - state_updates: StateUpdatesWithDeclaredClasses, + execution_output: ExecutionOutput, ) -> Result { - let mut txs = vec![]; - let mut receipts = vec![]; - let mut execs = vec![]; - - for t in txs_outcomes { - txs.push(t.tx); - receipts.push(t.receipt); - execs.push(t.exec_info); + // we optimistically allocate the maximum amount possible + let mut txs = Vec::with_capacity(execution_output.transactions.len()); + let mut traces = Vec::with_capacity(execution_output.transactions.len()); + let mut receipts = Vec::with_capacity(execution_output.transactions.len()); + + // only include successful transactions in the block + for (tx, res) in execution_output.transactions { + if let ExecutionResult::Success { receipt, trace, .. } = res { + txs.push(tx); + traces.push(trace); + receipts.push(receipt); + } } let prev_hash = BlockHashProvider::latest_hash(self.blockchain.provider())?; @@ -156,9 +158,9 @@ impl Backend { BlockWriter::insert_block_with_states_and_receipts( self.blockchain.provider(), block, - state_updates, + execution_output.states, receipts, - execs, + traces, )?; info!( @@ -168,7 +170,7 @@ impl Backend { "Block mined.", ); - Ok(MinedBlockOutcome { block_number }) + Ok(MinedBlockOutcome { block_number, stats: execution_output.stats }) } pub fn update_block_env(&self, block_env: &mut BlockEnv) { @@ -192,7 +194,7 @@ impl Backend { &self, block_env: &BlockEnv, ) -> Result { - self.do_mine_block(block_env, Default::default(), Default::default()) + self.do_mine_block(block_env, Default::default()) } } diff --git a/crates/katana/core/src/sequencer.rs b/crates/katana/core/src/sequencer.rs index 790be10c69..0a0db8678d 100644 --- a/crates/katana/core/src/sequencer.rs +++ b/crates/katana/core/src/sequencer.rs @@ -85,13 +85,13 @@ impl KatanaSequencer { let block_producer = Arc::new(block_producer); - tokio::spawn(NodeService { + tokio::spawn(NodeService::new( + Arc::clone(&pool), miner, - pool: Arc::clone(&pool), - block_producer: block_producer.clone(), + block_producer.clone(), #[cfg(feature = "messaging")] messaging, - }); + )); Ok(Self { pool, config, backend, block_producer }) } diff --git a/crates/katana/core/src/service/block_producer.rs b/crates/katana/core/src/service/block_producer.rs index 09e5be0a1b..4205290389 100644 --- a/crates/katana/core/src/service/block_producer.rs +++ b/crates/katana/core/src/service/block_producer.rs @@ -8,7 +8,7 @@ use std::time::Duration; use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::stream::{Stream, StreamExt}; use futures::FutureExt; -use katana_executor::{BlockExecutor, ExecutionOutput, ExecutionResult, ExecutorFactory}; +use katana_executor::{BlockExecutor, ExecutionResult, ExecutionStats, ExecutorFactory}; use katana_primitives::block::{BlockHashOrNumber, ExecutableBlock, PartialHeader}; use katana_primitives::receipt::Receipt; use katana_primitives::trace::TxExecInfo; @@ -42,8 +42,10 @@ pub enum BlockProductionError { TransactionExecutionError(#[from] katana_executor::ExecutorError), } +#[derive(Debug, Clone)] pub struct MinedBlockOutcome { pub block_number: u64, + pub stats: ExecutionStats, } #[derive(Debug, Clone)] @@ -264,19 +266,8 @@ impl IntervalBlockProducer { trace!(target: LOG_TARGET, "Creating new block."); let block_env = executor.block_env(); - let ExecutionOutput { states, transactions } = executor.take_execution_output()?; - - let transactions = transactions - .into_iter() - .filter_map(|(tx, res)| match res { - ExecutionResult::Failed { .. } => None, - ExecutionResult::Success { receipt, trace, .. } => { - Some(TxWithOutcome { tx, receipt, exec_info: trace }) - } - }) - .collect::>(); - - let outcome = backend.do_mine_block(&block_env, transactions, states)?; + let execution_output = executor.take_execution_output()?; + let outcome = backend.do_mine_block(&block_env, execution_output)?; trace!(target: LOG_TARGET, block_number = %outcome.block_number, "Created new block."); @@ -515,8 +506,10 @@ impl InstantBlockProducer { executor.execute_block(block)?; - let ExecutionOutput { states, transactions } = executor.take_execution_output()?; - let txs_outcomes = transactions + let execution_output = executor.take_execution_output()?; + let txs_outcomes = execution_output + .transactions + .clone() .into_iter() .filter_map(|(tx, res)| match res { ExecutionResult::Success { receipt, trace, .. } => { @@ -526,7 +519,7 @@ impl InstantBlockProducer { }) .collect::>(); - let outcome = backend.do_mine_block(&block_env, txs_outcomes.clone(), states)?; + let outcome = backend.do_mine_block(&block_env, execution_output)?; trace!(target: LOG_TARGET, block_number = %outcome.block_number, "Created new block."); diff --git a/crates/katana/core/src/service/metrics.rs b/crates/katana/core/src/service/metrics.rs new file mode 100644 index 0000000000..3749ab8c03 --- /dev/null +++ b/crates/katana/core/src/service/metrics.rs @@ -0,0 +1,15 @@ +use dojo_metrics::Metrics; +use metrics::Counter; + +pub(crate) struct ServiceMetrics { + pub(crate) block_producer: BlockProducerMetrics, +} + +#[derive(Metrics)] +#[metrics(scope = "block_producer")] +pub(crate) struct BlockProducerMetrics { + /// The amount of L1 gas processed in a block. + pub(crate) l1_gas_processed_total: Counter, + /// The amount of Cairo steps processed in a block. + pub(crate) cairo_steps_processed_total: Counter, +} diff --git a/crates/katana/core/src/service/mod.rs b/crates/katana/core/src/service/mod.rs index a636764ec1..74b9fb2a18 100644 --- a/crates/katana/core/src/service/mod.rs +++ b/crates/katana/core/src/service/mod.rs @@ -13,11 +13,13 @@ use starknet::core::types::FieldElement; use tracing::{error, info}; use self::block_producer::BlockProducer; +use self::metrics::{BlockProducerMetrics, ServiceMetrics}; use crate::pool::TransactionPool; pub mod block_producer; #[cfg(feature = "messaging")] pub mod messaging; +mod metrics; #[cfg(feature = "messaging")] use self::messaging::{MessagingOutcome, MessagingService}; @@ -39,6 +41,28 @@ pub struct NodeService { /// The messaging service #[cfg(feature = "messaging")] pub(crate) messaging: Option>, + /// Metrics for recording the service operations + metrics: ServiceMetrics, +} + +impl NodeService { + pub fn new( + pool: Arc, + miner: TransactionMiner, + block_producer: Arc>, + #[cfg(feature = "messaging")] messaging: Option>, + ) -> Self { + let metrics = ServiceMetrics { block_producer: BlockProducerMetrics::default() }; + + Self { + pool, + miner, + block_producer, + metrics, + #[cfg(feature = "messaging")] + messaging, + } + } } impl Future for NodeService { @@ -68,6 +92,12 @@ impl Future for NodeService { match res { Ok(outcome) => { info!(target: LOG_TARGET, block_number = %outcome.block_number, "Mined block."); + + let metrics = &pin.metrics.block_producer; + let gas_used = outcome.stats.l1_gas_used; + let steps_used = outcome.stats.cairo_steps_used; + metrics.l1_gas_processed_total.increment(gas_used as u64); + metrics.cairo_steps_processed_total.increment(steps_used as u64); } Err(err) => { diff --git a/crates/katana/executor/src/abstraction/mod.rs b/crates/katana/executor/src/abstraction/mod.rs index 313ce8e045..5e231cc823 100644 --- a/crates/katana/executor/src/abstraction/mod.rs +++ b/crates/katana/executor/src/abstraction/mod.rs @@ -72,9 +72,20 @@ impl SimulationFlag { } } +/// Stats about the transactions execution. +#[derive(Debug, Clone, Default)] +pub struct ExecutionStats { + /// The total gas used. + pub l1_gas_used: u128, + /// The total cairo steps used. + pub cairo_steps_used: u128, +} + /// The output of a executor after a series of executions. #[derive(Debug, Default)] pub struct ExecutionOutput { + /// Statistics throughout the executions process. + pub stats: ExecutionStats, /// The state updates produced by the executions. pub states: StateUpdatesWithDeclaredClasses, /// The transactions that have been executed. diff --git a/crates/katana/executor/src/implementation/blockifier/mod.rs b/crates/katana/executor/src/implementation/blockifier/mod.rs index a18c4cb0e6..1ad7bd2094 100644 --- a/crates/katana/executor/src/implementation/blockifier/mod.rs +++ b/crates/katana/executor/src/implementation/blockifier/mod.rs @@ -19,8 +19,9 @@ use tracing::info; use self::output::receipt_from_exec_info; use self::state::CachedState; use crate::{ - BlockExecutor, EntryPointCall, ExecutionError, ExecutionOutput, ExecutionResult, ExecutorExt, - ExecutorFactory, ExecutorResult, ResultAndStates, SimulationFlag, StateProviderDb, + BlockExecutor, EntryPointCall, ExecutionError, ExecutionOutput, ExecutionResult, + ExecutionStats, ExecutorExt, ExecutorFactory, ExecutorResult, ResultAndStates, SimulationFlag, + StateProviderDb, }; pub(crate) const LOG_TARGET: &str = "katana::executor::blockifier"; @@ -69,6 +70,7 @@ pub struct StarknetVMProcessor<'a> { state: CachedState>, transactions: Vec<(TxWithHash, ExecutionResult)>, simulation_flags: SimulationFlag, + stats: ExecutionStats, } impl<'a> StarknetVMProcessor<'a> { @@ -81,7 +83,7 @@ impl<'a> StarknetVMProcessor<'a> { let transactions = Vec::new(); let block_context = utils::block_context_from_envs(&block_env, &cfg_env); let state = state::CachedState::new(StateProviderDb(state)); - Self { block_context, state, transactions, simulation_flags } + Self { block_context, state, transactions, simulation_flags, stats: Default::default() } } fn fill_block_env_from_header(&mut self, header: &PartialHeader) { @@ -159,6 +161,9 @@ impl<'a> BlockExecutor<'a> for StarknetVMProcessor<'a> { crate::utils::log_resources(&trace.actual_resources); crate::utils::log_events(receipt.events()); + self.stats.l1_gas_used += fee.gas_consumed; + self.stats.cairo_steps_used += receipt.resources_used().steps as u128; + if let Some(reason) = receipt.revert_reason() { info!(target: LOG_TARGET, reason = %reason, "Transaction reverted."); } @@ -187,7 +192,8 @@ impl<'a> BlockExecutor<'a> for StarknetVMProcessor<'a> { fn take_execution_output(&mut self) -> ExecutorResult { let states = utils::state_update_from_cached_state(&self.state); let transactions = std::mem::take(&mut self.transactions); - Ok(ExecutionOutput { states, transactions }) + let stats = std::mem::take(&mut self.stats); + Ok(ExecutionOutput { stats, states, transactions }) } fn state(&self) -> Box { diff --git a/crates/katana/executor/src/implementation/sir/mod.rs b/crates/katana/executor/src/implementation/sir/mod.rs index f73930b73d..bb6c44d704 100644 --- a/crates/katana/executor/src/implementation/sir/mod.rs +++ b/crates/katana/executor/src/implementation/sir/mod.rs @@ -23,7 +23,7 @@ use crate::abstraction::{ BlockExecutor, ExecutionOutput, ExecutorExt, ExecutorFactory, ExecutorResult, SimulationFlag, StateProviderDb, }; -use crate::{EntryPointCall, ExecutionError, ExecutionResult, ResultAndStates}; +use crate::{EntryPointCall, ExecutionError, ExecutionResult, ExecutionStats, ResultAndStates}; pub(crate) const LOG_TARGET: &str = "katana::executor::sir"; @@ -72,6 +72,7 @@ pub struct StarknetVMProcessor<'a> { state: CachedState, PermanentContractClassCache>, transactions: Vec<(TxWithHash, ExecutionResult)>, simulation_flags: SimulationFlag, + stats: ExecutionStats, } impl<'a> StarknetVMProcessor<'a> { @@ -85,7 +86,7 @@ impl<'a> StarknetVMProcessor<'a> { let block_context = utils::block_context_from_envs(&block_env, &cfg_env); let state = CachedState::new(StateProviderDb(state), PermanentContractClassCache::default()); - Self { block_context, state, transactions, simulation_flags } + Self { block_context, state, transactions, simulation_flags, stats: Default::default() } } fn fill_block_env_from_header(&mut self, header: &PartialHeader) { @@ -160,6 +161,9 @@ impl<'a> BlockExecutor<'a> for StarknetVMProcessor<'a> { crate::utils::log_resources(&trace.actual_resources); crate::utils::log_events(receipt.events()); + self.stats.l1_gas_used += fee.gas_consumed; + self.stats.cairo_steps_used += receipt.resources_used().steps as u128; + if let Some(reason) = receipt.revert_reason() { info!(target: LOG_TARGET, reason = %reason, "Transaction reverted."); } @@ -194,7 +198,8 @@ impl<'a> BlockExecutor<'a> for StarknetVMProcessor<'a> { fn take_execution_output(&mut self) -> ExecutorResult { let states = utils::state_update_from_cached_state(&self.state); let transactions = std::mem::take(&mut self.transactions); - Ok(ExecutionOutput { states, transactions }) + let stats = std::mem::take(&mut self.stats); + Ok(ExecutionOutput { stats, states, transactions }) } fn state(&self) -> Box { diff --git a/crates/katana/executor/tests/executor.rs b/crates/katana/executor/tests/executor.rs index 70a85ee6fb..3d64cf2393 100644 --- a/crates/katana/executor/tests/executor.rs +++ b/crates/katana/executor/tests/executor.rs @@ -249,13 +249,28 @@ fn test_executor_with_valid_blocks_impl( ); // assert the state updates after all the blocks are executed - // + let mut actual_total_gas: u128 = 0; + let mut actual_total_steps: u128 = 0; // assert the state updates - let ExecutionOutput { states, transactions } = executor.take_execution_output().unwrap(); - // asserts that the executed transactions are stored - let actual_txs: Vec = transactions.iter().map(|(tx, _)| tx.clone()).collect(); + let ExecutionOutput { states, transactions, stats } = executor.take_execution_output().unwrap(); + // asserts that the executed transactions are stored + let actual_txs: Vec = transactions + .iter() + .map(|(tx, res)| { + if let Some(fee) = res.fee() { + actual_total_gas += fee.gas_consumed; + } + if let Some(rec) = res.receipt() { + actual_total_steps += rec.resources_used().steps as u128; + } + tx.clone() + }) + .collect(); + + assert_eq!(actual_total_gas, stats.l1_gas_used); + assert_eq!(actual_total_steps, stats.cairo_steps_used); assert_eq!(actual_txs, expected_txs); let actual_nonce_updates = states.state_updates.nonce_updates; diff --git a/crates/katana/executor/tests/simulate.rs b/crates/katana/executor/tests/simulate.rs index 7a8905fa43..076f2ba013 100644 --- a/crates/katana/executor/tests/simulate.rs +++ b/crates/katana/executor/tests/simulate.rs @@ -62,9 +62,11 @@ fn test_simulate_tx_impl( }),); // check that the underlying state is not modified - let ExecutionOutput { states, transactions } = + let ExecutionOutput { states, transactions, stats } = executor.take_execution_output().expect("must take output"); + assert_eq!(stats.l1_gas_used, 0, "no gas usage should be recorded"); + assert_eq!(stats.cairo_steps_used, 0, "no steps usage should be recorded"); assert!(transactions.is_empty(), "simulated tx should not be stored"); assert!(states.state_updates.nonce_updates.is_empty(), "no state updates"); diff --git a/crates/katana/primitives/src/receipt.rs b/crates/katana/primitives/src/receipt.rs index b9f5bb5e9f..f17e6d9cc9 100644 --- a/crates/katana/primitives/src/receipt.rs +++ b/crates/katana/primitives/src/receipt.rs @@ -122,6 +122,7 @@ impl Receipt { } } + /// Returns the L1 messages sent. pub fn messages_sent(&self) -> &[MessageToL1] { match self { Receipt::Invoke(rct) => &rct.messages_sent, @@ -131,6 +132,7 @@ impl Receipt { } } + /// Returns the events emitted. pub fn events(&self) -> &[Event] { match self { Receipt::Invoke(rct) => &rct.events, @@ -139,6 +141,16 @@ impl Receipt { Receipt::DeployAccount(rct) => &rct.events, } } + + /// Returns the execution resources used. + pub fn resources_used(&self) -> &TxExecutionResources { + match self { + Receipt::Invoke(rct) => &rct.execution_resources, + Receipt::Declare(rct) => &rct.execution_resources, + Receipt::L1Handler(rct) => &rct.execution_resources, + Receipt::DeployAccount(rct) => &rct.execution_resources, + } + } } /// Transaction execution resources.