Skip to content

Commit

Permalink
Continue working on reporters
Browse files Browse the repository at this point in the history
- Benchmarks for decisions on aggregate storage data structure (chosen
  smallvec instead of hashmap)
- Added scale for aggregate report precision in histogram to prevent
  high values
  • Loading branch information
IvanChepurnyi committed Apr 1, 2024
1 parent 974d5d6 commit 2d1bb5a
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 2 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,9 @@ lto = false
name = "aggregator_benchmark"
harness = false

[[bench]]
name = "aggregator_metric_lookup"
harness = false

[features]
test_util = []
8 changes: 7 additions & 1 deletion benches/aggregator_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
/*
* Copyright © 2024. EcomDev B.V.
* All rights reserved.
* See LICENSE for license details.
*/

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use hdrhistogram::Histogram;
use smallvec::SmallVec;
Expand Down Expand Up @@ -106,7 +112,7 @@ impl<M: Metric> FlusherRecorder<M> {
}

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("measure_performance");
let mut group = c.benchmark_group("decision_on_aggregate");
let values = black_box(0..2000);

group.bench_with_input("accumulator", &values, |bench, values| {
Expand Down
183 changes: 183 additions & 0 deletions benches/aggregator_metric_lookup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use hdrhistogram::Histogram;
use profusion::metric::{Metric, MetricRecordError, MetricReporter};
use smallvec::SmallVec;
use std::collections::HashMap;
use std::time::Duration;

/// Fixed set of metric identifiers used as lookup keys by the storage
/// benchmarks in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum BenchMetric {
    MetricOne,
    MetricTwo,
    MetricThree,
    MetricFour,
    MetricFive,
    MetricSix,
}

impl Metric for BenchMetric {
fn name(&self) -> &'static str {
match self {
Self::MetricOne => "metric_one",
Self::MetricTwo => "metric_two",
Self::MetricThree => "metric_three",
Self::MetricFour => "metric_four",
Self::MetricFive => "metric_five",
Self::MetricSix => "metric_six",
}
}
}

/// Benchmark candidate: keeps one histogram per metric as `(metric, histogram)`
/// pairs in a plain `Vec`.
struct ReportVec {
    storage: Vec<(BenchMetric, Histogram<u64>)>,
}

/// Benchmark candidate: keeps one histogram per metric as `(metric, histogram)`
/// pairs in a `SmallVec` backed by a 10-element array type.
struct ReportSmallVec {
    storage: SmallVec<[(BenchMetric, Histogram<u64>); 10]>,
}

/// Benchmark candidate: keeps one histogram per metric in a `HashMap` keyed by
/// the metric itself.
struct ReportHashMap {
    storage: HashMap<BenchMetric, Histogram<u64>>,
}

impl MetricReporter for ReportHashMap {
    type Metric = BenchMetric;

    /// Records `latency`, expressed in nanoseconds, into the histogram stored
    /// under `metric`, creating that histogram on first use.
    ///
    /// The `error` argument is not used by this benchmark reporter, and a
    /// failed histogram recording is silently ignored.
    fn add_entry(
        &mut self,
        metric: Self::Metric,
        latency: Duration,
        _error: Option<&MetricRecordError>,
    ) {
        let nanos = latency.as_nanos() as u64;
        let histogram = self
            .storage
            .entry(metric)
            .or_insert_with(|| Histogram::new(3).unwrap());

        histogram.record(nanos).unwrap_or_default();
    }

    /// Not needed by the lookup benchmark; intentionally left unimplemented.
    fn aggregate_into(self, _other: &mut Self) {
        todo!()
    }
}

impl MetricReporter for ReportVec {
    type Metric = BenchMetric;

    /// Records `latency`, expressed in nanoseconds, into the histogram that
    /// belongs to `metric`, appending a new `(metric, histogram)` pair when the
    /// metric has not been seen yet.
    ///
    /// The `error` argument is not used by this benchmark reporter, and a
    /// failed histogram recording is silently ignored.
    fn add_entry(
        &mut self,
        metric: Self::Metric,
        latency: Duration,
        _error: Option<&MetricRecordError>,
    ) {
        // The closure binding must not be called `metric`: that would shadow
        // the argument and turn the predicate into an always-true
        // self-comparison, funnelling every sample into the first entry.
        let index = match self
            .storage
            .iter()
            .position(|(stored, _)| *stored == metric)
        {
            Some(index) => index,
            None => {
                self.storage.push((
                    metric,
                    Histogram::new(3).expect("3 significant figures is a valid precision"),
                ));
                self.storage.len() - 1
            }
        };

        self.storage[index]
            .1
            .record(latency.as_nanos() as u64)
            .unwrap_or_default();
    }

    /// Not needed by the lookup benchmark; intentionally left unimplemented.
    fn aggregate_into(self, _other: &mut Self) {
        todo!()
    }
}

impl MetricReporter for ReportSmallVec {
    type Metric = BenchMetric;

    /// Records `latency`, expressed in nanoseconds, into the histogram that
    /// belongs to `metric`, appending a new `(metric, histogram)` pair when the
    /// metric has not been seen yet.
    ///
    /// The `error` argument is not used by this benchmark reporter, and a
    /// failed histogram recording is silently ignored.
    fn add_entry(
        &mut self,
        metric: Self::Metric,
        latency: Duration,
        _error: Option<&MetricRecordError>,
    ) {
        // The closure binding must not be called `metric`: that would shadow
        // the argument and turn the predicate into an always-true
        // self-comparison, funnelling every sample into the first entry.
        let index = match self
            .storage
            .iter()
            .position(|(stored, _)| *stored == metric)
        {
            Some(index) => index,
            None => {
                self.storage.push((
                    metric,
                    Histogram::new(3).expect("3 significant figures is a valid precision"),
                ));
                self.storage.len() - 1
            }
        };

        self.storage[index]
            .1
            .record(latency.as_nanos() as u64)
            .unwrap_or_default();
    }

    /// Not needed by the lookup benchmark; intentionally left unimplemented.
    fn aggregate_into(self, _other: &mut Self) {
        todo!()
    }
}

/// Compares `HashMap`, `Vec` and `SmallVec` as per-metric histogram storage.
///
/// Each benchmark builds a fresh reporter and feeds it the same 2001
/// pre-generated `(metric, latency)` samples, cycling through the six
/// `BenchMetric` variants.
fn criterion_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("decision_on_collection");

    // Samples are generated once, outside the measured closures.
    let values = black_box(
        (0..=2000usize)
            .map(|index| {
                let metric = match index % 6 {
                    0 => BenchMetric::MetricOne,
                    1 => BenchMetric::MetricTwo,
                    2 => BenchMetric::MetricThree,
                    3 => BenchMetric::MetricFour,
                    4 => BenchMetric::MetricFive,
                    _ => BenchMetric::MetricSix,
                };

                (
                    metric,
                    Duration::from_micros((index % 100 * 1000) as u64),
                )
            })
            .collect::<Vec<_>>(),
    );

    group.bench_with_input("hashmap", &values, |bench, values| {
        bench.iter(move || {
            let mut reporter = ReportHashMap {
                storage: HashMap::new(),
            };

            values
                .iter()
                .for_each(|(metric, value)| reporter.add_entry(*metric, *value, None));
        });
    });

    group.bench_with_input("vec", &values, |bench, values| {
        bench.iter(move || {
            let mut reporter = ReportVec {
                storage: Vec::new(),
            };

            values
                .iter()
                .for_each(|(metric, value)| reporter.add_entry(*metric, *value, None));
        });
    });

    group.bench_with_input("small_vec", &values, |bench, values| {
        bench.iter(move || {
            let mut reporter = ReportSmallVec {
                storage: SmallVec::new(),
            };

            values
                .iter()
                .for_each(|(metric, value)| reporter.add_entry(*metric, *value, None));
        });
    });

    // Finish the group explicitly rather than relying on its drop.
    group.finish();
}

criterion_group!(metric_benches, criterion_benchmark);
criterion_main!(metric_benches);
1 change: 1 addition & 0 deletions src/metric/reporter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod scale;
#[cfg(any(feature = "test_util", test))]
mod test_reporter;

Expand Down
153 changes: 153 additions & 0 deletions src/metric/reporter/scale.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use std::time::Duration;

/// Scale for reports
///
/// Selects the time unit in which durations are stored as plain integers in
/// aggregate reports. Defaults to microseconds.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReporterScale {
    /// One unit is one nanosecond.
    Nanoseconds,
    /// One unit is one microsecond (the default).
    #[default]
    Microseconds,
    /// One unit is one millisecond.
    Milliseconds,
    /// One unit is one second.
    Seconds,
}

const NANOS_PER_SEC: u64 = 1_000_000_000;
const MICROS_PER_SEC: u64 = 1_000_000;
const MILLIS_PER_SEC: u64 = 1_000;

impl ReporterScale {
    /// Converts duration into single u64 value
    ///
    /// It is used by report aggregator to calculate stats based on chosen scale.
    /// Anything finer than the chosen unit is dropped. Durations too large to
    /// be represented in the chosen unit saturate at `u64::MAX` instead of
    /// overflowing.
    pub fn duration_to_value(&self, duration: Duration) -> u64 {
        let (units_per_sec, subsec_units) = match self {
            Self::Nanoseconds => (NANOS_PER_SEC, duration.subsec_nanos()),
            Self::Microseconds => (MICROS_PER_SEC, duration.subsec_micros()),
            Self::Milliseconds => (MILLIS_PER_SEC, duration.subsec_millis()),
            Self::Seconds => (1, 0),
        };

        duration
            .as_secs()
            .saturating_mul(units_per_sec)
            .saturating_add(u64::from(subsec_units))
    }

    /// Converts aggregated value into duration
    ///
    /// Can be used to generate duration based on aggregate report
    pub fn value_to_duration(&self, value: u64) -> Duration {
        match self {
            Self::Nanoseconds => Duration::from_nanos(value),
            Self::Microseconds => Duration::from_micros(value),
            Self::Milliseconds => Duration::from_millis(value),
            Self::Seconds => Duration::from_secs(value),
        }
    }

    /// Converts a fractional aggregate value into duration
    ///
    /// Intended for values expressed in this scale that are no longer whole
    /// numbers. The fraction of a unit is kept down to nanosecond resolution,
    /// and anything finer is truncated.
    ///
    /// The float-to-integer `as` casts saturate, so negative or NaN input
    /// yields a zero duration.
    pub fn aggregate_to_duration(&self, value: f64) -> Duration {
        let (seconds, nanos) = match self {
            Self::Nanoseconds => (
                value as u64 / NANOS_PER_SEC,
                (value as u64 % NANOS_PER_SEC) as u32,
            ),
            Self::Microseconds => (
                value as u64 / MICROS_PER_SEC,
                // Remainder is in microseconds; scale it up to nanoseconds.
                ((value % MICROS_PER_SEC as f64) * 1_000f64) as u32,
            ),
            Self::Milliseconds => (
                value as u64 / MILLIS_PER_SEC,
                // Remainder is in milliseconds; scale it up to nanoseconds.
                ((value % MILLIS_PER_SEC as f64) * 1_000_000f64) as u32,
            ),
            Self::Seconds => (
                value as u64,
                // Express the value in nanoseconds, then keep the sub-second part.
                (value * NANOS_PER_SEC as f64 % NANOS_PER_SEC as f64) as u32,
            ),
        };

        Duration::new(seconds, nanos)
    }
}

#[cfg(test)]
mod tests {
    use crate::metric::reporter::scale::ReporterScale;
    use std::time::Duration;

    /// The derived `Default` must pick the microsecond scale.
    #[test]
    fn defaults_to_microseconds() {
        let scale = ReporterScale::default();

        assert_eq!(scale, ReporterScale::Microseconds)
    }

    /// Each scale flattens a duration into an integer count of its own unit.
    #[test]
    fn converts_duration_to_value() {
        let cases = [
            (
                ReporterScale::Nanoseconds,
                Duration::new(2, 100_000_000),
                2_100_000_000,
            ),
            (
                ReporterScale::Microseconds,
                Duration::new(29, 20_000),
                29_000_020,
            ),
            (
                ReporterScale::Milliseconds,
                Duration::new(25, 100_000_000),
                25_100,
            ),
            (ReporterScale::Seconds, Duration::new(25, 20), 25),
        ];

        for (scale, duration, expected) in &cases {
            assert_eq!(scale.duration_to_value(*duration), *expected);
        }
    }

    /// Each scale turns an integer count of its own unit back into a duration.
    #[test]
    fn converts_value_to_duration() {
        let cases = [
            (
                ReporterScale::Nanoseconds,
                2_100_000_000,
                Duration::new(2, 100_000_000),
            ),
            (
                ReporterScale::Microseconds,
                29_000_020,
                Duration::new(29, 20_000),
            ),
            (
                ReporterScale::Milliseconds,
                25_100,
                Duration::new(25, 100_000_000),
            ),
            (ReporterScale::Seconds, 25, Duration::new(25, 0)),
        ];

        for (scale, value, expected) in &cases {
            assert_eq!(scale.value_to_duration(*value), *expected);
        }
    }

    /// Fractional aggregates keep their sub-unit part down to nanoseconds.
    #[test]
    fn converts_aggregate_to_duration() {
        let cases = [
            (
                ReporterScale::Nanoseconds,
                1_000_002_100.001,
                Duration::new(1, 2_100),
            ),
            (
                ReporterScale::Microseconds,
                29_000_020.01,
                Duration::new(29, 20_010),
            ),
            (
                ReporterScale::Milliseconds,
                25_100.04,
                Duration::new(25, 100_040_000),
            ),
            (
                ReporterScale::Seconds,
                25.0122,
                Duration::new(25, 12_200_000),
            ),
        ];

        for (scale, aggregate, expected) in &cases {
            assert_eq!(scale.aggregate_to_duration(*aggregate), *expected);
        }
    }
}
2 changes: 1 addition & 1 deletion src/metric/reporter/test_reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{marker::PhantomData, time::Duration};

/// Test reporter builder
///
/// Creates instances of test reporter for testing
/// Creates instances of reporter for testing
pub struct TestReporterBuilder<T>(PhantomData<T>);

/// Test reporter
Expand Down

0 comments on commit 2d1bb5a

Please sign in to comment.