Skip to content

Commit

Permalink
Measure metrics latency
Browse files Browse the repository at this point in the history
  • Loading branch information
fraillt committed Nov 21, 2024
1 parent 465fcc2 commit e0df038
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 0 deletions.
5 changes: 5 additions & 0 deletions stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ name = "metrics_histogram"
path = "src/metrics_histogram.rs"
doc = false

[[bin]] # Bin to run measure latency in various conditions
name = "metrics_latency"
path = "src/metrics_latency.rs"
doc = false

[[bin]] # Bin to run the metrics overflow stress tests
name = "metrics_overflow"
path = "src/metrics_overflow.rs"
Expand Down
229 changes: 229 additions & 0 deletions stress/src/metrics_latency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
use std::{
collections::{BTreeMap, HashMap},
sync::{Arc, Mutex, Weak},
time::{Duration, Instant},
};

use opentelemetry::{metrics::MeterProvider, KeyValue};
use opentelemetry_sdk::{
metrics::{
data::{self, ResourceMetrics},
reader::MetricReader,
InstrumentKind, ManualReader, MetricError, Pipeline, SdkMeterProvider, Temporality,
},
Resource,
};

// copy/paste from opentelemetry-sdk/benches/metric.rs
#[derive(Clone, Debug)]
pub struct SharedReader(Arc<dyn MetricReader>);

impl SharedReader {
pub fn new<R>(reader: R) -> Self
where
R: MetricReader,
{
Self(Arc::new(reader))
}
}

impl MetricReader for SharedReader {
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
self.0.register_pipeline(pipeline)
}

fn collect(&self, rm: &mut ResourceMetrics) -> Result<(), MetricError> {
self.0.collect(rm)
}

fn force_flush(&self) -> Result<(), MetricError> {
self.0.force_flush()
}

fn shutdown(&self) -> Result<(), MetricError> {
self.0.shutdown()
}

fn temporality(&self, kind: InstrumentKind) -> Temporality {
self.0.temporality(kind)
}
}

const ITERATIONS_PER_THREAD: usize = 10000;

fn main() {
let available_threads: usize = std::thread::available_parallelism().map_or(1, |p| p.get());

measure_update_latency(
&format!("half threads = {available_threads}/2, lots of inserts"),
available_threads / 2,
|_i, j| format!("{}", j % 2000),
);
measure_update_latency(
&format!("half threads = {available_threads}/2, updates only"),
available_threads / 2,
|_i, _j| String::new(),
);
measure_update_latency(
&format!("twice threads = {available_threads}*2, lots of inserts"),
available_threads * 2,
|_i, j| format!("{}", j % 2000),
);
measure_update_latency(
&format!("twice threads = {available_threads}*2, updates only"),
available_threads * 2,
|_i, _j| String::new(),
);
}

fn measure_update_latency(msg: &str, threads_count: usize, keyname: fn(usize, usize) -> String) {
let reader = SharedReader::new(
ManualReader::builder()
.with_temporality(Temporality::Delta)
.build(),
);
let provider = SdkMeterProvider::builder()
.with_reader(reader.clone())
.build();
let histogram = provider.meter("test").u64_counter("hello").build();
let mut threads = Vec::new();
let mut stats = Vec::new();
stats.resize_with(threads_count, || {
Arc::new(Mutex::new(HashMap::<u64, u64>::new()))
});
// run multiple threads and measure how time it takes to update metric
for thread_idx in 0..threads_count {
let hist = histogram.clone();
let stat = stats[thread_idx].clone();
threads.push(std::thread::spawn(move || {
let mut stat = stat.lock().unwrap();
for iter_idx in 0..ITERATIONS_PER_THREAD {
let kv = KeyValue::new(keyname(thread_idx, iter_idx), 1);
let start = Instant::now();
hist.add(1, &[kv]);
let curr = stat.entry(start.elapsed().as_nanos() as u64).or_default();
*curr += 1;
}
}));
}
// collect threads in short intervals
let mut total_count = 0;
while threads.iter().any(|t| !t.is_finished()) {
// collect agressively so we could measure inserts properly,
// but not loop, as we'll see no difference in updates vs inserts due to super high contention
std::thread::sleep(Duration::from_micros(500));
total_count += collect_and_return_count(&reader);
}
threads.into_iter().for_each(|t| {
t.join().unwrap();
});
total_count += collect_and_return_count(&reader);

let total_measurements = (threads_count * ITERATIONS_PER_THREAD) as u64;
assert_eq!(total_count, total_measurements);

let stats = stats
.into_iter()
.map(|s| Arc::into_inner(s).unwrap().into_inner().unwrap())
.flat_map(|s| s.into_iter())
.fold(BTreeMap::<u64, u64>::default(), |mut acc, (time, count)| {
*acc.entry(time).or_default() += count;
acc
});

let sum = stats.iter().fold(0, |mut acc, (&time, &count)| {
acc += time * count;
acc
});

println!("{msg}");
println!("avg {}", format_time(sum / total_measurements as u64));
println!(
"p50 {}",
format_time(get_percentile_value(total_measurements, &stats, 50))
);
println!(
"p95 {}",
format_time(get_percentile_value(total_measurements, &stats, 95))
);
println!(
"p99 {}",
format_time(get_percentile_value(total_measurements, &stats, 99))
);
}

fn collect_and_return_count(reader: &SharedReader) -> u64 {
let mut rm = ResourceMetrics {
resource: Resource::empty(),
scope_metrics: Vec::new(),
};
reader.collect(&mut rm).unwrap();
rm.scope_metrics
.into_iter()
.flat_map(|sm| sm.metrics.into_iter())
.flat_map(|m| {
m.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.unwrap()
.data_points
.clone()
.into_iter()
})
.map(|dp| dp.value)
.sum()
}

fn get_percentile_value(
total_measurements: u64,
stats: &BTreeMap<u64, u64>,
percentile: u64,
) -> u64 {
assert!(percentile > 0 && percentile < 100);
let break_point = ((total_measurements as f64 * percentile as f64) / 100.0) as u64;
let mut iter = stats.iter().peekable();
let mut sum = 0;
while let Some(left) = iter.next() {
sum += left.1;
if let Some(&right) = iter.peek() {
let next_sum = sum + right.1;
if next_sum > break_point {
// interpolate
let diff = (next_sum - sum) as f32;
let ratio = (break_point - sum) as f32 / diff;
let time_diff = (right.0 - left.0) as f32;
return *left.0 + (time_diff * ratio) as u64;
}
}
}
0
}

fn format_time(nanos: u64) -> String {
let nanos = nanos as f64;
let (val, symbol) = if nanos > 1000000.0 {
(nanos / 1000000.0, "ms")
} else if nanos > 1000.0 {
(nanos / 1000.0, "μs")
} else {
(nanos, "ns")
};
if val > 100.0 {
format!("{val:>5.1}{symbol}")
} else if val > 10.0 {
format!("{val:>5.2}{symbol}")
} else {
format!("{val:>5.3}{symbol}")
}
}

#[test]
fn format_number() {
assert_eq!("12.00ns", format_time(12));
assert_eq!("123.0ns", format_time(123));
assert_eq!("1.234μs", format_time(1234));
assert_eq!("12.35μs", format_time(12349));
assert_eq!("123.4μs", format_time(123400));
assert_eq!("1.235ms", format_time(1234900));
assert_eq!("12.34ms", format_time(12340000));
}

0 comments on commit e0df038

Please sign in to comment.