Skip to content

Commit

Permalink
bench(katana-executor): measure cached state concurrency (#2190)
Browse files Browse the repository at this point in the history
  • Loading branch information
kariy authored Jul 19, 2024
1 parent 1d3318a commit 97ea6b0
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 2 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/bench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ jobs:
steps:
- uses: actions/checkout@v3
- run: git config --global --add safe.directory "$GITHUB_WORKSPACE"
- name: Running Katana benchmarks
run: cargo bench --bench codec --bench execution -- --output-format bencher |sed 1d | tee output.txt
- name: Running Katana benchmarks
run: cargo bench --bench codec --bench execution --bench concurrent -- --output-format bencher |sed 1d | tee output.txt
- uses: benchmark-action/github-action-benchmark@v1
with:
tool: "cargo"
Expand Down
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions crates/katana/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ similar-asserts.workspace = true
tokio.workspace = true

criterion.workspace = true
oneshot = { version = "0.1.8", default-features = false, features = [ "std" ] }
pprof = { version = "0.13.0", features = [ "criterion", "flamegraph" ] }
rayon.workspace = true

[features]
blockifier = [ "dep:blockifier", "dep:katana-cairo", "dep:starknet" ]
Expand All @@ -49,3 +51,8 @@ default = [ "blockifier" ]
harness = false
name = "execution"
required-features = [ "blockifier" ]

[[bench]]
harness = false
name = "concurrent"
required-features = [ "blockifier" ]
110 changes: 110 additions & 0 deletions crates/katana/executor/benches/concurrent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
//! This benchmark is used to measure how much concurrency we can get when accessing the main
//! execution state for executing indepdenent transactions in parallel. This is useful to measure
//! how much concurrency we can get when the pending state is being accessed by multiple independent
//! requests.
use std::sync::Arc;
use std::time::Duration;

use criterion::measurement::WallTime;
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkGroup, Criterion};
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_executor::{ExecutorFactory, SimulationFlag};
use katana_primitives::env::{BlockEnv, CfgEnv};
use katana_primitives::transaction::ExecutableTxWithHash;
use katana_provider::test_utils;
use katana_provider::traits::state::StateFactoryProvider;
use pprof::criterion::{Output, PProfProfiler};
use rayon::ThreadPoolBuilder;

mod utils;
use utils::{envs, tx};

/// Right now, we guarantee that the transaction's execution will not fail/revert.
fn concurrent(c: &mut Criterion) {
const CONCURRENCY_SIZE: usize = 1000;

let mut group = c.benchmark_group("Concurrent.Simulate");
group.warm_up_time(Duration::from_millis(200));

let provider = test_utils::test_in_memory_provider();
let flags = SimulationFlag::new().skip_validate();

let tx = tx();
let envs = envs();

blockifier(&mut group, CONCURRENCY_SIZE, &provider, flags.clone(), envs.clone(), tx);
}

fn blockifier(
group: &mut BenchmarkGroup<'_, WallTime>,
concurrency_size: usize,
provider: impl StateFactoryProvider,
flags: SimulationFlag,
(block_env, cfg_env): (BlockEnv, CfgEnv),
tx: ExecutableTxWithHash,
) {
let factory = Arc::new(BlockifierFactory::new(cfg_env, flags.clone()));

group.bench_function("Blockifier.1", |b| {
b.iter_batched(
|| {
let state = provider.latest().expect("failed to get latest state");
let executor = factory.with_state_and_block_env(state, block_env.clone());
(executor, tx.clone(), flags.clone())
},
|(executor, tx, flags)| executor.simulate(vec![tx], flags),
BatchSize::SmallInput,
)
});

group.bench_function(format!("Blockifier.{concurrency_size}"), |b| {
// Setup the inputs for each thread to remove the overhead of creating the execution context
// for every thread inside the benchmark.
b.iter_batched(
|| {
let state = provider.latest().expect("failed to get latest state");
let executor = Arc::new(factory.with_state_and_block_env(state, block_env.clone()));
let pool = ThreadPoolBuilder::new().num_threads(concurrency_size).build().unwrap();

// setup inputs for each thread
let mut fxs = Vec::with_capacity(concurrency_size);
let mut handles = Vec::with_capacity(concurrency_size);

for _ in 0..concurrency_size {
let (sender, rx) = oneshot::channel();
handles.push(rx);

let tx = tx.clone();
let flags = flags.clone();
let executor = Arc::clone(&executor);

fxs.push(move || {
let _ = executor.simulate(vec![tx], flags);
sender.send(()).unwrap();
});
}

(pool, fxs, handles)
},
|(pool, fxs, handles)| {
for fx in fxs {
pool.spawn(fx);
}

for handle in handles {
handle.recv().unwrap();
}
},
BatchSize::SmallInput,
)
});
}

criterion_group! {
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = concurrent
}

criterion_main!(benches);

0 comments on commit 97ea6b0

Please sign in to comment.