From 2d1bb5a0972f500bdde22ada420b1f6c5fcfaed8 Mon Sep 17 00:00:00 2001 From: Ivan Chepurnyi Date: Mon, 1 Apr 2024 23:09:29 +0200 Subject: [PATCH] Continue working on reporters - Benchmarks for decisions on aggregate storage data structure (choosen smallvec instead of hashmap) - Added scale for aggregate report precision in histogram to prevent high values --- Cargo.toml | 4 + benches/aggregator_benchmark.rs | 8 +- benches/aggregator_metric_lookup.rs | 183 +++++++++++++++++++++++++++ src/metric/reporter/mod.rs | 1 + src/metric/reporter/scale.rs | 153 ++++++++++++++++++++++ src/metric/reporter/test_reporter.rs | 2 +- 6 files changed, 349 insertions(+), 2 deletions(-) create mode 100644 benches/aggregator_metric_lookup.rs create mode 100644 src/metric/reporter/scale.rs diff --git a/Cargo.toml b/Cargo.toml index 27e7174..d2a0482 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,5 +38,9 @@ lto = false name = "aggregator_benchmark" harness = false +[[bench]] +name = "aggregator_metric_lookup" +harness = false + [features] test_util = [] \ No newline at end of file diff --git a/benches/aggregator_benchmark.rs b/benches/aggregator_benchmark.rs index e995005..cb4627b 100644 --- a/benches/aggregator_benchmark.rs +++ b/benches/aggregator_benchmark.rs @@ -1,3 +1,9 @@ +/* + * Copyright © 2024. EcomDev B.V. + * All rights reserved. + * See LICENSE for license details. + */ + use criterion::{black_box, criterion_group, criterion_main, Criterion}; use hdrhistogram::Histogram; use smallvec::SmallVec; @@ -106,7 +112,7 @@ impl FlusherRecorder { } fn criterion_benchmark(c: &mut Criterion) { - let mut group = c.benchmark_group("measure_performance"); + let mut group = c.benchmark_group("decision_on_aggregate"); let values = black_box(0..2000); group.bench_with_input("accumulator", &values, |bench, values| { diff --git a/benches/aggregator_metric_lookup.rs b/benches/aggregator_metric_lookup.rs new file mode 100644 index 0000000..bbe8a88 --- /dev/null +++ b/benches/aggregator_metric_lookup.rs @@ -0,0 +1,183 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use hdrhistogram::Histogram; +use profusion::metric::{Metric, MetricRecordError, MetricReporter}; +use smallvec::SmallVec; +use std::collections::HashMap; +use std::time::Duration; + +#[derive(Hash, PartialEq, Eq, Debug, Clone, Copy)] +enum BenchMetric { + MetricOne, + MetricTwo, + MetricThree, + MetricFour, + MetricFive, + MetricSix, +} + +impl Metric for BenchMetric { + fn name(&self) -> &'static str { + match self { + Self::MetricOne => "metric_one", + Self::MetricTwo => "metric_two", + Self::MetricThree => "metric_three", + Self::MetricFour => "metric_four", + Self::MetricFive => "metric_five", + Self::MetricSix => "metric_six", + } + } +} + +struct ReportVec { + storage: Vec<(BenchMetric, Histogram)>, +} + +struct ReportSmallVec { + storage: SmallVec<[(BenchMetric, Histogram); 10]>, +} + +struct ReportHashMap { + storage: HashMap>, +} + +impl MetricReporter for ReportHashMap { + type Metric = BenchMetric; + + fn add_entry( + &mut self, + metric: Self::Metric, + latency: Duration, + error: Option<&MetricRecordError>, + ) { + let entry = self + .storage + .entry(metric) + .or_insert_with(|| Histogram::new(3).unwrap()); + + entry.record(latency.as_nanos() as u64).unwrap_or_default(); + } + + fn aggregate_into(self, other: &mut Self) { + todo!() + } +} + +impl MetricReporter for ReportVec { + type Metric = BenchMetric; + + fn add_entry( + &mut self, + metric: Self::Metric, + latency: Duration, + error: Option<&MetricRecordError>, + ) { + let position = self.storage.iter().position(|(metric, _)| metric == metric); + let (_, histogram) = match position { + Some(value) => &mut self.storage[value], + None => { + self.storage.push((metric, Histogram::new(3).unwrap())); + let index = self.storage.len() - 1; + &mut self.storage[index] + } + }; + + histogram + .record(latency.as_nanos() as u64) + .unwrap_or_default(); + } + + fn aggregate_into(self, other: &mut Self) { + todo!() + } +} + +impl MetricReporter for ReportSmallVec { + type Metric = BenchMetric; + + fn add_entry( + &mut self, + metric: Self::Metric, + latency: Duration, + error: Option<&MetricRecordError>, + ) { + let position = self.storage.iter().position(|(metric, _)| metric == metric); + let (_, histogram) = match position { + Some(value) => &mut self.storage[value], + None => { + self.storage.push((metric, Histogram::new(3).unwrap())); + let index = self.storage.len() - 1; + &mut self.storage[index] + } + }; + + histogram + .record(latency.as_nanos() as u64) + .unwrap_or_default(); + } + + fn aggregate_into(self, other: &mut Self) { + todo!() + } +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("decision_on_collection"); + + let values = black_box( + (0..=2000usize) + .into_iter() + .map(|index| { + ( + match index % 6 { + 0 => BenchMetric::MetricOne, + 1 => BenchMetric::MetricTwo, + 2 => BenchMetric::MetricThree, + 3 => BenchMetric::MetricFour, + 4 => BenchMetric::MetricFive, + _ => BenchMetric::MetricSix, + }, + Duration::from_micros((index % 100 * 1000) as u64), + ) + }) + .collect::>(), + ); + + group.bench_with_input("hashmap", &values, |bench, values| { + bench.iter(move || { + let mut reporter = ReportHashMap { + storage: HashMap::new(), + }; + + values + .iter() + .for_each(|(metric, value)| reporter.add_entry(*metric, *value, None)); + }); + }); + + group.bench_with_input("vec", &values, |bench, values| { + bench.iter(move || { + let mut reporter = ReportVec { + storage: Vec::new(), + }; + + values + .iter() + .for_each(|(metric, value)| reporter.add_entry(*metric, *value, None)); + }); + }); + + group.bench_with_input("small_vec", &values, |bench, values| { + bench.iter(move || { + let mut reporter = ReportSmallVec { + storage: SmallVec::new(), + }; + + values + .iter() + .for_each(|(metric, value)| reporter.add_entry(*metric, *value, None)); + }); + }); +} + +criterion_group!(metric_benches, criterion_benchmark); +criterion_main!(metric_benches); diff --git a/src/metric/reporter/mod.rs b/src/metric/reporter/mod.rs index 5d17a3e..d0030f8 100644 --- a/src/metric/reporter/mod.rs +++ b/src/metric/reporter/mod.rs @@ -1,3 +1,4 @@ +mod scale; #[cfg(any(feature = "test_util", test))] mod test_reporter; diff --git a/src/metric/reporter/scale.rs b/src/metric/reporter/scale.rs new file mode 100644 index 0000000..e68022e --- /dev/null +++ b/src/metric/reporter/scale.rs @@ -0,0 +1,153 @@ +use std::time::Duration; + +/// Scale for reports +/// +/// Defaults to microseconds + +#[derive(Default, Debug, PartialEq, Eq)] +pub enum ReporterScale { + Nanoseconds, + #[default] + Microseconds, + Milliseconds, + Seconds, +} + +const NANOS_PER_SEC: u64 = 1_000_000_000; +const MICROS_PER_SEC: u64 = 1_000_000; +const MILLIS_PER_SEC: u64 = 1_000; + +impl ReporterScale { + /// Converts duration into single u64 value + /// + /// It is used by report aggregator to calculate stats based on chosen scale + pub fn duration_to_value(&self, duration: Duration) -> u64 { + match self { + Self::Nanoseconds => { + duration.as_secs() * NANOS_PER_SEC + duration.subsec_nanos() as u64 + } + Self::Microseconds => { + duration.as_secs() * MICROS_PER_SEC + duration.subsec_micros() as u64 + } + Self::Milliseconds => { + duration.as_secs() * MILLIS_PER_SEC + duration.subsec_millis() as u64 + } + Self::Seconds => duration.as_secs(), + } + } + + /// Converts aggregated value into duration + /// + /// Can be used to generate duration based on aggregate report + pub fn value_to_duration(&self, value: u64) -> Duration { + match self { + ReporterScale::Nanoseconds => Duration::from_nanos(value), + ReporterScale::Microseconds => Duration::from_micros(value), + ReporterScale::Milliseconds => Duration::from_millis(value), + ReporterScale::Seconds => Duration::from_secs(value), + } + } + + pub fn aggregate_to_duration(&self, value: f64) -> Duration { + let (seconds, nanos) = match self { + ReporterScale::Nanoseconds => ( + value as u64 / NANOS_PER_SEC, + (value as u64 % NANOS_PER_SEC) as u32, + ), + ReporterScale::Microseconds => ( + value as u64 / MICROS_PER_SEC, + ((value % MICROS_PER_SEC as f64) * 1_000f64) as u32, + ), + ReporterScale::Milliseconds => ( + value as u64 / MILLIS_PER_SEC, + ((value % MILLIS_PER_SEC as f64) * 1_000_000f64) as u32, + ), + ReporterScale::Seconds => ( + value as u64, + (value * NANOS_PER_SEC as f64 % NANOS_PER_SEC as f64) as u32, + ), + }; + + Duration::new(seconds, nanos) + } +} + +#[cfg(test)] +mod tests { + use crate::metric::reporter::scale::ReporterScale; + use std::time::Duration; + + #[test] + fn defaults_to_microseconds() { + assert_eq!(ReporterScale::default(), ReporterScale::Microseconds) + } + + #[test] + fn converts_duration_to_value() { + assert_eq!( + ReporterScale::Nanoseconds.duration_to_value(Duration::new(2, 100_000_000)), + 2_100_000_000 + ); + + assert_eq!( + ReporterScale::Microseconds.duration_to_value(Duration::new(29, 20_000)), + 29_000_020 + ); + + assert_eq!( + ReporterScale::Milliseconds.duration_to_value(Duration::new(25, 100_000_000)), + 25_100 + ); + + assert_eq!( + ReporterScale::Seconds.duration_to_value(Duration::new(25, 20)), + 25 + ); + } + + #[test] + fn converts_value_to_duration() { + assert_eq!( + ReporterScale::Nanoseconds.value_to_duration(2_100_000_000), + Duration::new(2, 100_000_000) + ); + + assert_eq!( + ReporterScale::Microseconds.value_to_duration(29_000_020), + Duration::new(29, 20_000) + ); + + assert_eq!( + ReporterScale::Milliseconds.value_to_duration(25_100), + Duration::new(25, 100_000_000) + ); + + assert_eq!( + ReporterScale::Seconds.value_to_duration(25), + Duration::new(25, 0) + ); + } + + #[test] + fn converts_aggregate_to_duration() { + assert_eq!( + ReporterScale::Nanoseconds.aggregate_to_duration(1_000_002_100.001), + Duration::new(1, 2_100) + ); + + assert_eq!( + ReporterScale::Microseconds.aggregate_to_duration(29_000_020.01), + Duration::new(29, 20_010) + ); + + assert_eq!( + ReporterScale::Milliseconds.aggregate_to_duration(25_100.04), + Duration::new(25, 100_040_000) + ); + + assert_eq!( + ReporterScale::Seconds.aggregate_to_duration(25.0122), + Duration::new(25, 12_200_000) + ); + } +} diff --git a/src/metric/reporter/test_reporter.rs b/src/metric/reporter/test_reporter.rs index 2305e13..92a2705 100644 --- a/src/metric/reporter/test_reporter.rs +++ b/src/metric/reporter/test_reporter.rs @@ -3,7 +3,7 @@ use std::{marker::PhantomData, time::Duration}; /// Test reporter builder /// -/// Creates instances of test reporter for testing +/// Creates instances of reporter for testing pub struct TestReporterBuilder(PhantomData); /// Test reporter