diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 9a6501cf3c..edfb918a6e 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -18,8 +18,8 @@ jobs: steps: - uses: actions/checkout@v3 - run: git config --global --add safe.directory "$GITHUB_WORKSPACE" - - name: Running Katana benchmarks - run: cargo bench --bench codec --bench execution -- --output-format bencher |sed 1d | tee output.txt + - name: Running Katana benchmarks + run: cargo bench --bench codec --bench execution --bench concurrent -- --output-format bencher |sed 1d | tee output.txt - uses: benchmark-action/github-action-benchmark@v1 with: tool: "cargo" diff --git a/Cargo.lock b/Cargo.lock index 0fccac086f..c347305de5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8146,8 +8146,10 @@ dependencies = [ "katana-primitives", "katana-provider", "katana-rpc-types", + "oneshot", "parking_lot 0.12.3", "pprof", + "rayon", "rstest 0.18.2", "rstest_reuse", "serde_json", @@ -9991,6 +9993,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "oneshot" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e296cf87e61c9cfc1a61c3c63a0f7f286ed4554e0e22be84e8a38e1d264a2a29" + [[package]] name = "oorandom" version = "11.1.3" diff --git a/crates/katana/executor/Cargo.toml b/crates/katana/executor/Cargo.toml index f2c66d6f33..01f6063f1b 100644 --- a/crates/katana/executor/Cargo.toml +++ b/crates/katana/executor/Cargo.toml @@ -37,7 +37,9 @@ similar-asserts.workspace = true tokio.workspace = true criterion.workspace = true +oneshot = { version = "0.1.8", default-features = false, features = [ "std" ] } pprof = { version = "0.13.0", features = [ "criterion", "flamegraph" ] } +rayon.workspace = true [features] blockifier = [ "dep:blockifier", "dep:katana-cairo", "dep:starknet" ] @@ -49,3 +51,8 @@ default = [ "blockifier" ] harness = false name = "execution" required-features = [ "blockifier" ] + +[[bench]] +harness = false +name = "concurrent" +required-features = [ "blockifier" ] diff --git a/crates/katana/executor/benches/concurrent.rs b/crates/katana/executor/benches/concurrent.rs new file mode 100644 index 0000000000..cf0df90374 --- /dev/null +++ b/crates/katana/executor/benches/concurrent.rs @@ -0,0 +1,110 @@ +//! This benchmark is used to measure how much concurrency we can get when accessing the main +//! execution state for executing indepdenent transactions in parallel. This is useful to measure +//! how much concurrency we can get when the pending state is being accessed by multiple independent +//! requests. + +use std::sync::Arc; +use std::time::Duration; + +use criterion::measurement::WallTime; +use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkGroup, Criterion}; +use katana_executor::implementation::blockifier::BlockifierFactory; +use katana_executor::{ExecutorFactory, SimulationFlag}; +use katana_primitives::env::{BlockEnv, CfgEnv}; +use katana_primitives::transaction::ExecutableTxWithHash; +use katana_provider::test_utils; +use katana_provider::traits::state::StateFactoryProvider; +use pprof::criterion::{Output, PProfProfiler}; +use rayon::ThreadPoolBuilder; + +mod utils; +use utils::{envs, tx}; + +/// Right now, we guarantee that the transaction's execution will not fail/revert. +fn concurrent(c: &mut Criterion) { + const CONCURRENCY_SIZE: usize = 1000; + + let mut group = c.benchmark_group("Concurrent.Simulate"); + group.warm_up_time(Duration::from_millis(200)); + + let provider = test_utils::test_in_memory_provider(); + let flags = SimulationFlag::new().skip_validate(); + + let tx = tx(); + let envs = envs(); + + blockifier(&mut group, CONCURRENCY_SIZE, &provider, flags.clone(), envs.clone(), tx); +} + +fn blockifier( + group: &mut BenchmarkGroup<'_, WallTime>, + concurrency_size: usize, + provider: impl StateFactoryProvider, + flags: SimulationFlag, + (block_env, cfg_env): (BlockEnv, CfgEnv), + tx: ExecutableTxWithHash, +) { + let factory = Arc::new(BlockifierFactory::new(cfg_env, flags.clone())); + + group.bench_function("Blockifier.1", |b| { + b.iter_batched( + || { + let state = provider.latest().expect("failed to get latest state"); + let executor = factory.with_state_and_block_env(state, block_env.clone()); + (executor, tx.clone(), flags.clone()) + }, + |(executor, tx, flags)| executor.simulate(vec![tx], flags), + BatchSize::SmallInput, + ) + }); + + group.bench_function(format!("Blockifier.{concurrency_size}"), |b| { + // Setup the inputs for each thread to remove the overhead of creating the execution context + // for every thread inside the benchmark. + b.iter_batched( + || { + let state = provider.latest().expect("failed to get latest state"); + let executor = Arc::new(factory.with_state_and_block_env(state, block_env.clone())); + let pool = ThreadPoolBuilder::new().num_threads(concurrency_size).build().unwrap(); + + // setup inputs for each thread + let mut fxs = Vec::with_capacity(concurrency_size); + let mut handles = Vec::with_capacity(concurrency_size); + + for _ in 0..concurrency_size { + let (sender, rx) = oneshot::channel(); + handles.push(rx); + + let tx = tx.clone(); + let flags = flags.clone(); + let executor = Arc::clone(&executor); + + fxs.push(move || { + let _ = executor.simulate(vec![tx], flags); + sender.send(()).unwrap(); + }); + } + + (pool, fxs, handles) + }, + |(pool, fxs, handles)| { + for fx in fxs { + pool.spawn(fx); + } + + for handle in handles { + handle.recv().unwrap(); + } + }, + BatchSize::SmallInput, + ) + }); +} + +criterion_group! { + name = benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = concurrent +} + +criterion_main!(benches);