Preallocate and keep memory for HashMap in Metric aggregation (open-t…
cijothomas authored and pitoniak32 committed Dec 4, 2024
1 parent 3c96a34 commit e7b1d86
Showing 2 changed files with 26 additions and 14 deletions.
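In short, the commit preallocates the trackers map and, at collection time, swaps it with a second preallocated map instead of taking it, so the allocated memory is kept across collection cycles. Below is a minimal standalone sketch of that pattern, not the SDK code itself; the Recorder type, field names, and capacity value are illustrative only.

use std::collections::HashMap;
use std::mem::swap;

// Illustrative sketch of the preallocate-and-swap pattern used by this commit.
struct Recorder {
    active: HashMap<String, u64>,  // receives measurements
    scratch: HashMap<String, u64>, // used only during collection
}

impl Recorder {
    fn new(capacity: usize) -> Self {
        Recorder {
            // Preallocate both maps once, up front.
            active: HashMap::with_capacity(capacity),
            scratch: HashMap::with_capacity(capacity),
        }
    }

    fn record(&mut self, key: String, value: u64) {
        *self.active.entry(key).or_insert(0) += value;
    }

    fn collect(&mut self) -> Vec<(String, u64)> {
        // Swap instead of take: both maps keep their allocated capacity,
        // and drain() empties the scratch map without freeing its memory.
        swap(&mut self.active, &mut self.scratch);
        self.scratch.drain().collect()
    }
}

fn main() {
    let mut r = Recorder::new(2000);
    r.record("route=/home".to_string(), 1);
    r.record("route=/home".to_string(), 1);
    println!("{:?}", r.collect()); // [("route=/home", 2)]
}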
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
@@ -12,11 +12,11 @@ use super::{
     precomputed_sum::PrecomputedSum, sum::Sum, Number,
 };
 
-const STREAM_CARDINALITY_LIMIT: u32 = 2000;
+pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;
 
 /// Checks whether aggregator has hit cardinality limit for metric streams
 pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {
-    size < STREAM_CARDINALITY_LIMIT as usize
+    size < STREAM_CARDINALITY_LIMIT
 }
 
 /// Receives measurements to be aggregated.
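A side note on this first file: HashMap::with_capacity takes a usize, so widening the constant from u32 to usize (and exporting it as pub(crate)) lets mod.rs below pass it to the allocation directly, without the previous as-usize cast. A hedged sketch of the consuming side; the preallocated helper is hypothetical and only illustrates the type fit.

use std::collections::HashMap;

// Mirrors the widened constant from aggregate.rs.
pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;

pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {
    // No `as usize` cast needed once the constant itself is a usize.
    size < STREAM_CARDINALITY_LIMIT
}

// Hypothetical helper: with_capacity expects a usize, matching the new
// declaration exactly, so the constant can be used for preallocation as-is.
fn preallocated<V>() -> HashMap<String, V> {
    HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)
}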
36 changes: 24 additions & 12 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -7,12 +7,12 @@ mod sum;
 
 use core::fmt;
 use std::collections::{HashMap, HashSet};
-use std::mem::take;
+use std::mem::swap;
 use std::ops::{Add, AddAssign, DerefMut, Sub};
 use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
 
-use aggregate::is_under_cardinality_limit;
+use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT};
 pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
 pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
 use once_cell::sync::Lazy;
@@ -51,6 +51,11 @@ where
 {
     /// Trackers store the values associated with different attribute sets.
     trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
+
+    /// Used by collect exclusively. The data type must match the one used in
+    /// `trackers` to allow mem::swap.
+    trackers_for_collect: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
+
     /// Number of different attribute set stored in the `trackers` map.
     count: AtomicUsize,
     /// Indicates whether a value with no attributes has been stored.
@@ -67,7 +72,10 @@
 {
     fn new(config: A::InitConfig) -> Self {
         ValueMap {
-            trackers: RwLock::new(HashMap::new()),
+            trackers: RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)),
+            // TODO: For cumulative, this is not required, so avoid this
+            // pre-allocation.
+            trackers_for_collect: RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)),
             has_no_attribute_value: AtomicBool::new(false),
             no_attribute_tracker: A::create(&config),
             count: AtomicUsize::new(0),
@@ -170,19 +178,23 @@
             ));
         }
 
-        let trackers = match self.trackers.write() {
-            Ok(mut trackers) => {
-                self.count.store(0, Ordering::SeqCst);
-                take(trackers.deref_mut())
-            }
-            Err(_) => todo!(),
-        };
-
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.into_iter() {
-            if seen.insert(Arc::as_ptr(&tracker)) {
-                dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
-            }
-        }
-    }
+        if let Ok(mut trackers_collect) = self.trackers_for_collect.write() {
+            if let Ok(mut trackers_current) = self.trackers.write() {
+                swap(trackers_collect.deref_mut(), trackers_current.deref_mut());
+                self.count.store(0, Ordering::SeqCst);
+            } else {
+                otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers lock poisoned");
+                return;
+            }
+
+            let mut seen = HashSet::new();
+            for (attrs, tracker) in trackers_collect.drain() {
+                if seen.insert(Arc::as_ptr(&tracker)) {
+                    dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
+                }
+            }
+        } else {
+            otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers for collect lock poisoned");
+        }
+    }
 }
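For reference, a small standalone check (not part of the commit) of the allocation behavior the new collect path relies on: mem::take leaves a fresh zero-capacity map behind, while swap followed by drain keeps both preallocated maps' memory for the next cycle. Exact capacity values are allocator-dependent, hence the greater-or-equal checks.

use std::collections::HashMap;
use std::mem::{swap, take};

fn main() {
    // Old approach: take() replaces the map with Default::default(),
    // so the map kept by the aggregator drops back to capacity 0.
    let mut active: HashMap<u32, u32> = HashMap::with_capacity(2000);
    let _drained = take(&mut active);
    assert_eq!(active.capacity(), 0);

    // New approach: swap with a preallocated scratch map, then drain it.
    // drain() keeps the allocated memory, so both maps retain capacity.
    let mut active: HashMap<u32, u32> = HashMap::with_capacity(2000);
    let mut scratch: HashMap<u32, u32> = HashMap::with_capacity(2000);
    active.insert(1, 1);
    swap(&mut active, &mut scratch);
    let collected: Vec<(u32, u32)> = scratch.drain().collect();
    assert_eq!(collected, vec![(1, 1)]);
    assert!(active.capacity() >= 2000);
    assert!(scratch.capacity() >= 2000);
    println!("capacity retained on both maps");
}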
