Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preallocate and keep memory for HashMap in Metric aggregation #2343

Merged
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use super::{
precomputed_sum::PrecomputedSum, sum::Sum, Number,
};

const STREAM_CARDINALITY_LIMIT: u32 = 2000;
pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000;

/// Checks whether aggregator has hit cardinality limit for metric streams
pub(crate) fn is_under_cardinality_limit(size: usize) -> bool {
size < STREAM_CARDINALITY_LIMIT as usize
size < STREAM_CARDINALITY_LIMIT
}

/// Receives measurements to be aggregated.
Expand Down
36 changes: 24 additions & 12 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ mod sum;

use core::fmt;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::mem::swap;
use std::ops::{Add, AddAssign, DerefMut, Sub};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

use aggregate::is_under_cardinality_limit;
use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT};
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -51,6 +51,11 @@ where
{
/// Trackers store the values associated with different attribute sets.
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,

/// Used by collect exclusively. The data type must match the one used in
/// `trackers` to allow mem::swap.
trackers_for_collect: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,

/// Number of different attribute set stored in the `trackers` map.
count: AtomicUsize,
/// Indicates whether a value with no attributes has been stored.
Expand All @@ -67,7 +72,10 @@ where
{
fn new(config: A::InitConfig) -> Self {
ValueMap {
trackers: RwLock::new(HashMap::new()),
trackers: RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)),
// TODO: For cumulative, this is not required, so avoid this
// pre-allocation.
trackers_for_collect: RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)),
has_no_attribute_value: AtomicBool::new(false),
no_attribute_tracker: A::create(&config),
count: AtomicUsize::new(0),
Expand Down Expand Up @@ -170,19 +178,23 @@ where
));
}

let trackers = match self.trackers.write() {
Ok(mut trackers) => {
if let Ok(mut trackers_collect) = self.trackers_for_collect.write() {
if let Ok(mut trackers_current) = self.trackers.write() {
swap(trackers_collect.deref_mut(), trackers_current.deref_mut());
self.count.store(0, Ordering::SeqCst);
take(trackers.deref_mut())
} else {
otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers lock poisoned");
return;
}
Err(_) => todo!(),
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.into_iter() {
if seen.insert(Arc::as_ptr(&tracker)) {
dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
let mut seen = HashSet::new();
for (attrs, tracker) in trackers_collect.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
}
}
} else {
otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers for collect lock poisoned");
}
}
}
Expand Down
Loading