diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index 9bc95b43b5..cd99979923 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -12,11 +12,11 @@ use super::{ precomputed_sum::PrecomputedSum, sum::Sum, Number, }; -const STREAM_CARDINALITY_LIMIT: u32 = 2000; +pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000; /// Checks whether aggregator has hit cardinality limit for metric streams pub(crate) fn is_under_cardinality_limit(size: usize) -> bool { - size < STREAM_CARDINALITY_LIMIT as usize + size < STREAM_CARDINALITY_LIMIT } /// Receives measurements to be aggregated. diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 4eaea7972c..49544948bb 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -7,12 +7,12 @@ mod sum; use core::fmt; use std::collections::{HashMap, HashSet}; -use std::mem::take; +use std::mem::swap; use std::ops::{Add, AddAssign, DerefMut, Sub}; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; -use aggregate::is_under_cardinality_limit; +use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT}; pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure}; pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; use once_cell::sync::Lazy; @@ -51,6 +51,11 @@ where { /// Trackers store the values associated with different attribute sets. trackers: RwLock, Arc>>, + + /// Used by collect exclusively. The data type must match the one used in + /// `trackers` to allow mem::swap. + trackers_for_collect: RwLock, Arc>>, + /// Number of different attribute set stored in the `trackers` map. count: AtomicUsize, /// Indicates whether a value with no attributes has been stored. @@ -67,7 +72,10 @@ where { fn new(config: A::InitConfig) -> Self { ValueMap { - trackers: RwLock::new(HashMap::new()), + trackers: RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)), + // TODO: For cumulative, this is not required, so avoid this + // pre-allocation. + trackers_for_collect: RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)), has_no_attribute_value: AtomicBool::new(false), no_attribute_tracker: A::create(&config), count: AtomicUsize::new(0), @@ -170,19 +178,23 @@ where )); } - let trackers = match self.trackers.write() { - Ok(mut trackers) => { + if let Ok(mut trackers_collect) = self.trackers_for_collect.write() { + if let Ok(mut trackers_current) = self.trackers.write() { + swap(trackers_collect.deref_mut(), trackers_current.deref_mut()); self.count.store(0, Ordering::SeqCst); - take(trackers.deref_mut()) + } else { + otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers lock poisoned"); + return; } - Err(_) => todo!(), - }; - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.into_iter() { - if seen.insert(Arc::as_ptr(&tracker)) { - dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config))); + let mut seen = HashSet::new(); + for (attrs, tracker) in trackers_collect.drain() { + if seen.insert(Arc::as_ptr(&tracker)) { + dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config))); + } } + } else { + otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers for collect lock poisoned"); } } }