Skip to content

Commit

Permalink
feat(katana): add executor metrics (#1791)
Browse files Browse the repository at this point in the history
ref #1369 

Add metrics on katana executor; tracking the total L1 **gas** and Cairo **steps** used.

There were two approaches that i thought of; 
1. record the metrics on every tx execution, or
2. on every block

~Decided to go with (1) as it would allow to measure it in realtime (as the tx is being executed), instead of having to wait until the block is finished being processed.~

Thought im not exactly sure which one is the ideal one.

Doing (1) might be less performant bcs we have to acquire the lock to the metrics recorder more frequently (ie every tx), as opposed to only updating the metrics once every block.

another thing to note, currently doing (1) would require all executor implementations to define the metrics in their own implmentations, meaning have to duplicate code. If do (2) can just define it under `block_producer` scope and be executor agnostic.

EDIT: doing (2). metrics are collected upon completion of block production

---

some changes are made to gather the value after block production:
- simplify params on `backend::do_mine_block`, now only accept two args; `BlockEnv` and `ExecutionOutput`
- add a new type `ExecutionStats` under `katana-executor`, this is where executor would store the gas and steps value
  • Loading branch information
kariy authored and matzayonc committed Apr 12, 2024
1 parent adc691e commit b448892
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 50 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/katana/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ katana-tasks.workspace = true

anyhow.workspace = true
async-trait.workspace = true
dojo-metrics.workspace = true
metrics.workspace = true
cairo-lang-casm = "2.3.1"
cairo-lang-starknet = "2.3.1"
cairo-vm.workspace = true
Expand Down
36 changes: 19 additions & 17 deletions crates/katana/core/src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::sync::Arc;

use katana_executor::ExecutorFactory;
use katana_executor::{ExecutionOutput, ExecutionResult, ExecutorFactory};
use katana_primitives::block::{
Block, FinalityStatus, GasPrices, Header, PartialHeader, SealedBlockWithStatus,
};
use katana_primitives::chain::ChainId;
use katana_primitives::env::BlockEnv;
use katana_primitives::state::StateUpdatesWithDeclaredClasses;
use katana_primitives::version::CURRENT_STARKNET_VERSION;
use katana_primitives::FieldElement;
use katana_provider::providers::fork::ForkedProvider;
Expand All @@ -26,7 +25,7 @@ pub mod storage;
use self::config::StarknetConfig;
use self::storage::Blockchain;
use crate::env::BlockContextGenerator;
use crate::service::block_producer::{BlockProductionError, MinedBlockOutcome, TxWithOutcome};
use crate::service::block_producer::{BlockProductionError, MinedBlockOutcome};
use crate::utils::get_current_timestamp;

pub(crate) const LOG_TARGET: &str = "katana::core::backend";
Expand Down Expand Up @@ -120,17 +119,20 @@ impl<EF: ExecutorFactory> Backend<EF> {
pub fn do_mine_block(
&self,
block_env: &BlockEnv,
txs_outcomes: Vec<TxWithOutcome>,
state_updates: StateUpdatesWithDeclaredClasses,
execution_output: ExecutionOutput,
) -> Result<MinedBlockOutcome, BlockProductionError> {
let mut txs = vec![];
let mut receipts = vec![];
let mut execs = vec![];

for t in txs_outcomes {
txs.push(t.tx);
receipts.push(t.receipt);
execs.push(t.exec_info);
// we optimistically allocate the maximum amount possible
let mut txs = Vec::with_capacity(execution_output.transactions.len());
let mut traces = Vec::with_capacity(execution_output.transactions.len());
let mut receipts = Vec::with_capacity(execution_output.transactions.len());

// only include successful transactions in the block
for (tx, res) in execution_output.transactions {
if let ExecutionResult::Success { receipt, trace, .. } = res {
txs.push(tx);
traces.push(trace);
receipts.push(receipt);
}
}

let prev_hash = BlockHashProvider::latest_hash(self.blockchain.provider())?;
Expand All @@ -156,9 +158,9 @@ impl<EF: ExecutorFactory> Backend<EF> {
BlockWriter::insert_block_with_states_and_receipts(
self.blockchain.provider(),
block,
state_updates,
execution_output.states,
receipts,
execs,
traces,
)?;

info!(
Expand All @@ -168,7 +170,7 @@ impl<EF: ExecutorFactory> Backend<EF> {
"Block mined.",
);

Ok(MinedBlockOutcome { block_number })
Ok(MinedBlockOutcome { block_number, stats: execution_output.stats })
}

pub fn update_block_env(&self, block_env: &mut BlockEnv) {
Expand All @@ -192,7 +194,7 @@ impl<EF: ExecutorFactory> Backend<EF> {
&self,
block_env: &BlockEnv,
) -> Result<MinedBlockOutcome, BlockProductionError> {
self.do_mine_block(block_env, Default::default(), Default::default())
self.do_mine_block(block_env, Default::default())
}
}

Expand Down
8 changes: 4 additions & 4 deletions crates/katana/core/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ impl<EF: ExecutorFactory> KatanaSequencer<EF> {

let block_producer = Arc::new(block_producer);

tokio::spawn(NodeService {
tokio::spawn(NodeService::new(
Arc::clone(&pool),
miner,
pool: Arc::clone(&pool),
block_producer: block_producer.clone(),
block_producer.clone(),
#[cfg(feature = "messaging")]
messaging,
});
));

Ok(Self { pool, config, backend, block_producer })
}
Expand Down
27 changes: 10 additions & 17 deletions crates/katana/core/src/service/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::time::Duration;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::stream::{Stream, StreamExt};
use futures::FutureExt;
use katana_executor::{BlockExecutor, ExecutionOutput, ExecutionResult, ExecutorFactory};
use katana_executor::{BlockExecutor, ExecutionResult, ExecutionStats, ExecutorFactory};
use katana_primitives::block::{BlockHashOrNumber, ExecutableBlock, PartialHeader};
use katana_primitives::receipt::Receipt;
use katana_primitives::trace::TxExecInfo;
Expand Down Expand Up @@ -42,8 +42,10 @@ pub enum BlockProductionError {
TransactionExecutionError(#[from] katana_executor::ExecutorError),
}

#[derive(Debug, Clone)]
pub struct MinedBlockOutcome {
pub block_number: u64,
pub stats: ExecutionStats,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -264,19 +266,8 @@ impl<EF: ExecutorFactory> IntervalBlockProducer<EF> {
trace!(target: LOG_TARGET, "Creating new block.");

let block_env = executor.block_env();
let ExecutionOutput { states, transactions } = executor.take_execution_output()?;

let transactions = transactions
.into_iter()
.filter_map(|(tx, res)| match res {
ExecutionResult::Failed { .. } => None,
ExecutionResult::Success { receipt, trace, .. } => {
Some(TxWithOutcome { tx, receipt, exec_info: trace })
}
})
.collect::<Vec<_>>();

let outcome = backend.do_mine_block(&block_env, transactions, states)?;
let execution_output = executor.take_execution_output()?;
let outcome = backend.do_mine_block(&block_env, execution_output)?;

trace!(target: LOG_TARGET, block_number = %outcome.block_number, "Created new block.");

Expand Down Expand Up @@ -515,8 +506,10 @@ impl<EF: ExecutorFactory> InstantBlockProducer<EF> {

executor.execute_block(block)?;

let ExecutionOutput { states, transactions } = executor.take_execution_output()?;
let txs_outcomes = transactions
let execution_output = executor.take_execution_output()?;
let txs_outcomes = execution_output
.transactions
.clone()
.into_iter()
.filter_map(|(tx, res)| match res {
ExecutionResult::Success { receipt, trace, .. } => {
Expand All @@ -526,7 +519,7 @@ impl<EF: ExecutorFactory> InstantBlockProducer<EF> {
})
.collect::<Vec<_>>();

let outcome = backend.do_mine_block(&block_env, txs_outcomes.clone(), states)?;
let outcome = backend.do_mine_block(&block_env, execution_output)?;

trace!(target: LOG_TARGET, block_number = %outcome.block_number, "Created new block.");

Expand Down
15 changes: 15 additions & 0 deletions crates/katana/core/src/service/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use dojo_metrics::Metrics;
use metrics::Counter;

pub(crate) struct ServiceMetrics {
pub(crate) block_producer: BlockProducerMetrics,
}

#[derive(Metrics)]
#[metrics(scope = "block_producer")]
pub(crate) struct BlockProducerMetrics {
/// The amount of L1 gas processed in a block.
pub(crate) l1_gas_processed_total: Counter,
/// The amount of Cairo steps processed in a block.
pub(crate) cairo_steps_processed_total: Counter,
}
30 changes: 30 additions & 0 deletions crates/katana/core/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ use starknet::core::types::FieldElement;
use tracing::{error, info};

use self::block_producer::BlockProducer;
use self::metrics::{BlockProducerMetrics, ServiceMetrics};
use crate::pool::TransactionPool;

pub mod block_producer;
#[cfg(feature = "messaging")]
pub mod messaging;
mod metrics;

#[cfg(feature = "messaging")]
use self::messaging::{MessagingOutcome, MessagingService};
Expand All @@ -39,6 +41,28 @@ pub struct NodeService<EF: ExecutorFactory> {
/// The messaging service
#[cfg(feature = "messaging")]
pub(crate) messaging: Option<MessagingService<EF>>,
/// Metrics for recording the service operations
metrics: ServiceMetrics,
}

impl<EF: ExecutorFactory> NodeService<EF> {
pub fn new(
pool: Arc<TransactionPool>,
miner: TransactionMiner,
block_producer: Arc<BlockProducer<EF>>,
#[cfg(feature = "messaging")] messaging: Option<MessagingService<EF>>,
) -> Self {
let metrics = ServiceMetrics { block_producer: BlockProducerMetrics::default() };

Self {
pool,
miner,
block_producer,
metrics,
#[cfg(feature = "messaging")]
messaging,
}
}
}

impl<EF: ExecutorFactory> Future for NodeService<EF> {
Expand Down Expand Up @@ -68,6 +92,12 @@ impl<EF: ExecutorFactory> Future for NodeService<EF> {
match res {
Ok(outcome) => {
info!(target: LOG_TARGET, block_number = %outcome.block_number, "Mined block.");

let metrics = &pin.metrics.block_producer;
let gas_used = outcome.stats.l1_gas_used;
let steps_used = outcome.stats.cairo_steps_used;
metrics.l1_gas_processed_total.increment(gas_used as u64);
metrics.cairo_steps_processed_total.increment(steps_used as u64);
}

Err(err) => {
Expand Down
11 changes: 11 additions & 0 deletions crates/katana/executor/src/abstraction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,20 @@ impl SimulationFlag {
}
}

/// Stats about the transactions execution.
#[derive(Debug, Clone, Default)]
pub struct ExecutionStats {
/// The total gas used.
pub l1_gas_used: u128,
/// The total cairo steps used.
pub cairo_steps_used: u128,
}

/// The output of a executor after a series of executions.
#[derive(Debug, Default)]
pub struct ExecutionOutput {
/// Statistics throughout the executions process.
pub stats: ExecutionStats,
/// The state updates produced by the executions.
pub states: StateUpdatesWithDeclaredClasses,
/// The transactions that have been executed.
Expand Down
14 changes: 10 additions & 4 deletions crates/katana/executor/src/implementation/blockifier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use tracing::info;
use self::output::receipt_from_exec_info;
use self::state::CachedState;
use crate::{
BlockExecutor, EntryPointCall, ExecutionError, ExecutionOutput, ExecutionResult, ExecutorExt,
ExecutorFactory, ExecutorResult, ResultAndStates, SimulationFlag, StateProviderDb,
BlockExecutor, EntryPointCall, ExecutionError, ExecutionOutput, ExecutionResult,
ExecutionStats, ExecutorExt, ExecutorFactory, ExecutorResult, ResultAndStates, SimulationFlag,
StateProviderDb,
};

pub(crate) const LOG_TARGET: &str = "katana::executor::blockifier";
Expand Down Expand Up @@ -69,6 +70,7 @@ pub struct StarknetVMProcessor<'a> {
state: CachedState<StateProviderDb<'a>>,
transactions: Vec<(TxWithHash, ExecutionResult)>,
simulation_flags: SimulationFlag,
stats: ExecutionStats,
}

impl<'a> StarknetVMProcessor<'a> {
Expand All @@ -81,7 +83,7 @@ impl<'a> StarknetVMProcessor<'a> {
let transactions = Vec::new();
let block_context = utils::block_context_from_envs(&block_env, &cfg_env);
let state = state::CachedState::new(StateProviderDb(state));
Self { block_context, state, transactions, simulation_flags }
Self { block_context, state, transactions, simulation_flags, stats: Default::default() }
}

fn fill_block_env_from_header(&mut self, header: &PartialHeader) {
Expand Down Expand Up @@ -159,6 +161,9 @@ impl<'a> BlockExecutor<'a> for StarknetVMProcessor<'a> {
crate::utils::log_resources(&trace.actual_resources);
crate::utils::log_events(receipt.events());

self.stats.l1_gas_used += fee.gas_consumed;
self.stats.cairo_steps_used += receipt.resources_used().steps as u128;

if let Some(reason) = receipt.revert_reason() {
info!(target: LOG_TARGET, reason = %reason, "Transaction reverted.");
}
Expand Down Expand Up @@ -187,7 +192,8 @@ impl<'a> BlockExecutor<'a> for StarknetVMProcessor<'a> {
fn take_execution_output(&mut self) -> ExecutorResult<ExecutionOutput> {
let states = utils::state_update_from_cached_state(&self.state);
let transactions = std::mem::take(&mut self.transactions);
Ok(ExecutionOutput { states, transactions })
let stats = std::mem::take(&mut self.stats);
Ok(ExecutionOutput { stats, states, transactions })
}

fn state(&self) -> Box<dyn StateProvider + 'a> {
Expand Down
11 changes: 8 additions & 3 deletions crates/katana/executor/src/implementation/sir/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::abstraction::{
BlockExecutor, ExecutionOutput, ExecutorExt, ExecutorFactory, ExecutorResult, SimulationFlag,
StateProviderDb,
};
use crate::{EntryPointCall, ExecutionError, ExecutionResult, ResultAndStates};
use crate::{EntryPointCall, ExecutionError, ExecutionResult, ExecutionStats, ResultAndStates};

pub(crate) const LOG_TARGET: &str = "katana::executor::sir";

Expand Down Expand Up @@ -72,6 +72,7 @@ pub struct StarknetVMProcessor<'a> {
state: CachedState<StateProviderDb<'a>, PermanentContractClassCache>,
transactions: Vec<(TxWithHash, ExecutionResult)>,
simulation_flags: SimulationFlag,
stats: ExecutionStats,
}

impl<'a> StarknetVMProcessor<'a> {
Expand All @@ -85,7 +86,7 @@ impl<'a> StarknetVMProcessor<'a> {
let block_context = utils::block_context_from_envs(&block_env, &cfg_env);
let state =
CachedState::new(StateProviderDb(state), PermanentContractClassCache::default());
Self { block_context, state, transactions, simulation_flags }
Self { block_context, state, transactions, simulation_flags, stats: Default::default() }
}

fn fill_block_env_from_header(&mut self, header: &PartialHeader) {
Expand Down Expand Up @@ -160,6 +161,9 @@ impl<'a> BlockExecutor<'a> for StarknetVMProcessor<'a> {
crate::utils::log_resources(&trace.actual_resources);
crate::utils::log_events(receipt.events());

self.stats.l1_gas_used += fee.gas_consumed;
self.stats.cairo_steps_used += receipt.resources_used().steps as u128;

if let Some(reason) = receipt.revert_reason() {
info!(target: LOG_TARGET, reason = %reason, "Transaction reverted.");
}
Expand Down Expand Up @@ -194,7 +198,8 @@ impl<'a> BlockExecutor<'a> for StarknetVMProcessor<'a> {
fn take_execution_output(&mut self) -> ExecutorResult<ExecutionOutput> {
let states = utils::state_update_from_cached_state(&self.state);
let transactions = std::mem::take(&mut self.transactions);
Ok(ExecutionOutput { states, transactions })
let stats = std::mem::take(&mut self.stats);
Ok(ExecutionOutput { stats, states, transactions })
}

fn state(&self) -> Box<dyn StateProvider + 'a> {
Expand Down
Loading

0 comments on commit b448892

Please sign in to comment.