From cf0d5db823b6f761329512d1fd75d571f58f07be Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Wed, 21 Feb 2024 13:28:26 +0100 Subject: [PATCH] Use `DDSketch` to compute percentiles in stresstest --- Cargo.lock | 7 +++++ crates/symbolicator-stress/Cargo.toml | 1 + crates/symbolicator-stress/src/stresstest.rs | 31 +++++++++++++------- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 02d07091c..1a3ecfd6b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3928,6 +3928,12 @@ dependencies = [ "walkdir", ] +[[package]] +name = "sketches-ddsketch" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" + [[package]] name = "slab" version = "0.4.9" @@ -4554,6 +4560,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "sketches-ddsketch", "symbolicator-js", "symbolicator-native", "symbolicator-proguard", diff --git a/crates/symbolicator-stress/Cargo.toml b/crates/symbolicator-stress/Cargo.toml index 23c1c4e0b..bd3abc31a 100644 --- a/crates/symbolicator-stress/Cargo.toml +++ b/crates/symbolicator-stress/Cargo.toml @@ -14,6 +14,7 @@ sentry = { version = "0.32.1", features = ["anyhow", "debug-images", "tracing"] serde = { version = "1.0.137", features = ["derive"] } serde_json = "1.0.81" serde_yaml = "0.9.14" +sketches-ddsketch = "0.2.2" symbolicator-js = { path = "../symbolicator-js" } symbolicator-native = { path = "../symbolicator-native" } symbolicator-proguard = { path = "../symbolicator-proguard" } diff --git a/crates/symbolicator-stress/src/stresstest.rs b/crates/symbolicator-stress/src/stresstest.rs index d04189042..43f73c13b 100644 --- a/crates/symbolicator-stress/src/stresstest.rs +++ b/crates/symbolicator-stress/src/stresstest.rs @@ -1,10 +1,10 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use anyhow::{Context, Result}; use sentry::SentryFutureExt; +use sketches_ddsketch::DDSketch; use symbolicator_js::SourceMapService; use symbolicator_native::SymbolicationActor; use symbolicator_service::config::Config as SymbolicatorConfig; @@ -68,7 +68,7 @@ pub async fn perform_stresstest( let workload = Arc::clone(&workload); let task = tokio::spawn(async move { - let finished_tasks = Arc::new(AtomicUsize::new(0)); + let task_durations = Arc::new(Mutex::new(DDSketch::default())); let semaphore = Arc::new(Semaphore::new(concurrency)); // See @@ -83,7 +83,8 @@ pub async fn perform_stresstest( permit = semaphore.clone().acquire_owned() => { let workload = Arc::clone(&workload); let symbolication = Arc::clone(&symbolication); - let finished_tasks = Arc::clone(&finished_tasks); + let task_durations = Arc::clone(&task_durations); + let task_start = Instant::now(); let hub = sentry::Hub::new_from_top(sentry::Hub::current()); let ctx = sentry::TransactionContext::new("stresstest", "stresstest"); @@ -92,11 +93,10 @@ pub async fn perform_stresstest( let future = async move { process_payload(&symbolication, &workload).await; - // TODO: maybe maintain a histogram? - finished_tasks.fetch_add(1, Ordering::Relaxed); - transaction.finish(); + task_durations.lock().unwrap().add(task_start.elapsed().as_secs_f64()); + drop(permit); }; let future = future.bind_hub(hub); @@ -109,13 +109,15 @@ pub async fn perform_stresstest( } } - // we only count finished tasks - let ops = finished_tasks.load(Ordering::Relaxed); + let task_durations: DDSketch = { + let mut task_durations = task_durations.lock().unwrap(); + std::mem::take(&mut task_durations) + }; // by acquiring *all* the semaphores, we essentially wait for all outstanding tasks to finish let _permits = semaphore.acquire_many(concurrency as u32).await; - (concurrency, ops) + (concurrency, task_durations) }); tasks.push(task); } @@ -123,10 +125,17 @@ pub async fn perform_stresstest( let finished_tasks = futures::future::join_all(tasks).await; for (i, task) in finished_tasks.into_iter().enumerate() { - let (concurrency, ops) = task.unwrap(); + let (concurrency, task_durations) = task.unwrap(); + let ops = task_durations.count(); let ops_ps = ops as f32 / duration.as_secs() as f32; println!("Workload {i} (concurrency: {concurrency}): {ops} operations, {ops_ps:.2} ops/s"); + + let avg = Duration::from_secs_f64(task_durations.sum().unwrap() / ops as f64); + let p50 = Duration::from_secs_f64(task_durations.quantile(0.5).unwrap().unwrap()); + let p90 = Duration::from_secs_f64(task_durations.quantile(0.9).unwrap().unwrap()); + let p99 = Duration::from_secs_f64(task_durations.quantile(0.99).unwrap().unwrap()); + println!(" avg: {avg:.2?}; p50: {p50:.2?}; p90: {p90:.2?}; p99: {p99:.2?}"); } Ok(())