From 36070fc4a1cbc1d6a119b316366c167d4a6e0ee3 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 25 Nov 2024 16:18:55 -0800 Subject: [PATCH 1/8] Preallocate and keep memory for HashMap in Metric aggregation --- .../src/metrics/internal/aggregate.rs | 2 +- opentelemetry-sdk/src/metrics/internal/mod.rs | 31 ++++++++++++++----- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index 9bc95b43b5..da67241dde 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -12,7 +12,7 @@ use super::{ precomputed_sum::PrecomputedSum, sum::Sum, Number, }; -const STREAM_CARDINALITY_LIMIT: u32 = 2000; +pub(crate) const STREAM_CARDINALITY_LIMIT: u32 = 2000; /// Checks whether aggregator has hit cardinality limit for metric streams pub(crate) fn is_under_cardinality_limit(size: usize) -> bool { diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 4eaea7972c..6cd5dd8c8e 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -7,12 +7,12 @@ mod sum; use core::fmt; use std::collections::{HashMap, HashSet}; -use std::mem::take; +use std::mem::swap; use std::ops::{Add, AddAssign, DerefMut, Sub}; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; -use aggregate::is_under_cardinality_limit; +use aggregate::{is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT}; pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure}; pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; use once_cell::sync::Lazy; @@ -51,6 +51,11 @@ where { /// Trackers store the values associated with different attribute sets. trackers: RwLock, Arc>>, + + /// Used by collect exclusively. The data type must match the one used in + /// `trackers` to allow mem::swap. + trackers_for_collect: RwLock, Arc>>, + /// Number of different attribute set stored in the `trackers` map. count: AtomicUsize, /// Indicates whether a value with no attributes has been stored. @@ -67,7 +72,8 @@ where { fn new(config: A::InitConfig) -> Self { ValueMap { - trackers: RwLock::new(HashMap::new()), + trackers: RwLock::new(HashMap::with_capacity(STREAM_CARDINALITY_LIMIT as usize)), + trackers_for_collect: RwLock::new(HashMap::with_capacity(STREAM_CARDINALITY_LIMIT as usize)), has_no_attribute_value: AtomicBool::new(false), no_attribute_tracker: A::create(&config), count: AtomicUsize::new(0), @@ -170,16 +176,25 @@ where )); } - let trackers = match self.trackers.write() { - Ok(mut trackers) => { + let mut trackers = if let Ok(mut trackers_guard) = self.trackers.write() { + if let Ok(mut trackers_for_collect_guard) = self.trackers_for_collect.write() { + swap( + trackers_guard.deref_mut(), + trackers_for_collect_guard.deref_mut(), + ); self.count.store(0, Ordering::SeqCst); - take(trackers.deref_mut()) + trackers_for_collect_guard + } else { + otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers for collect lock poisoned"); + return; } - Err(_) => todo!(), + } else { + otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers lock poisoned"); + return; }; let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.into_iter() { + for (attrs, tracker) in trackers.drain() { if seen.insert(Arc::as_ptr(&tracker)) { dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config))); } From f32378332fceeedc0db7450c487d7fce3e0401b7 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 25 Nov 2024 16:22:15 -0800 Subject: [PATCH 2/8] one more --- opentelemetry-sdk/src/metrics/internal/mod.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 6cd5dd8c8e..767290b640 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -72,8 +72,12 @@ where { fn new(config: A::InitConfig) -> Self { ValueMap { - trackers: RwLock::new(HashMap::with_capacity(STREAM_CARDINALITY_LIMIT as usize)), - trackers_for_collect: RwLock::new(HashMap::with_capacity(STREAM_CARDINALITY_LIMIT as usize)), + trackers: RwLock::new(HashMap::with_capacity( + 1 + STREAM_CARDINALITY_LIMIT as usize, + )), + trackers_for_collect: RwLock::new(HashMap::with_capacity( + 1 + STREAM_CARDINALITY_LIMIT as usize, + )), has_no_attribute_value: AtomicBool::new(false), no_attribute_tracker: A::create(&config), count: AtomicUsize::new(0), From a151d582c5070eda181d3a2a6ec90579a3fb995f Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 25 Nov 2024 16:27:55 -0800 Subject: [PATCH 3/8] add todo comment --- opentelemetry-sdk/src/metrics/internal/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 767290b640..33f2fa53d5 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -75,6 +75,8 @@ where trackers: RwLock::new(HashMap::with_capacity( 1 + STREAM_CARDINALITY_LIMIT as usize, )), + // TODO: For cumulative, this is not required, so avoid this + // pre-allocation. trackers_for_collect: RwLock::new(HashMap::with_capacity( 1 + STREAM_CARDINALITY_LIMIT as usize, )), From ac07e478a70a80e7e1ba95593ddb16e1b8af209c Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 25 Nov 2024 16:38:44 -0800 Subject: [PATCH 4/8] bump sysinfo --- stress/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 90bb6e2889..9b1c1e21d5 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -50,7 +50,7 @@ rand = { version = "0.8.4", features = ["small_rng"] } tracing = { workspace = true, features = ["std"]} tracing-subscriber = { workspace = true, features = ["registry", "std"] } num-format = "0.4.4" -sysinfo = { version = "0.30.12", optional = true } +sysinfo = { version = "0.32.00", optional = true } [features] stats = ["sysinfo"] \ No newline at end of file From 883ba84852e5eed5cc256350b699874eb4647b89 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 25 Nov 2024 16:39:43 -0800 Subject: [PATCH 5/8] remove o --- stress/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 9b1c1e21d5..6bfb820227 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -50,7 +50,7 @@ rand = { version = "0.8.4", features = ["small_rng"] } tracing = { workspace = true, features = ["std"]} tracing-subscriber = { workspace = true, features = ["registry", "std"] } num-format = "0.4.4" -sysinfo = { version = "0.32.00", optional = true } +sysinfo = { version = "0.32.0", optional = true } [features] stats = ["sysinfo"] \ No newline at end of file From 167d465c76361f50d76a681cca42a34abf97dbde Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 26 Nov 2024 07:02:55 -0800 Subject: [PATCH 6/8] reverrt sysino --- stress/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stress/Cargo.toml b/stress/Cargo.toml index 6bfb820227..90bb6e2889 100644 --- a/stress/Cargo.toml +++ b/stress/Cargo.toml @@ -50,7 +50,7 @@ rand = { version = "0.8.4", features = ["small_rng"] } tracing = { workspace = true, features = ["std"]} tracing-subscriber = { workspace = true, features = ["registry", "std"] } num-format = "0.4.4" -sysinfo = { version = "0.32.0", optional = true } +sysinfo = { version = "0.30.12", optional = true } [features] stats = ["sysinfo"] \ No newline at end of file From a5f17b4825adbaff364b8bc2ca806612a4adb7d7 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 26 Nov 2024 07:20:56 -0800 Subject: [PATCH 7/8] rearrange to reduce time inside lock to minimal --- .../src/metrics/internal/aggregate.rs | 2 +- opentelemetry-sdk/src/metrics/internal/mod.rs | 35 +++++++------------ 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index da67241dde..747f47263d 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -12,7 +12,7 @@ use super::{ precomputed_sum::PrecomputedSum, sum::Sum, Number, }; -pub(crate) const STREAM_CARDINALITY_LIMIT: u32 = 2000; +pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000; /// Checks whether aggregator has hit cardinality limit for metric streams pub(crate) fn is_under_cardinality_limit(size: usize) -> bool { diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 33f2fa53d5..49544948bb 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -72,14 +72,10 @@ where { fn new(config: A::InitConfig) -> Self { ValueMap { - trackers: RwLock::new(HashMap::with_capacity( - 1 + STREAM_CARDINALITY_LIMIT as usize, - )), + trackers: RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)), // TODO: For cumulative, this is not required, so avoid this // pre-allocation. - trackers_for_collect: RwLock::new(HashMap::with_capacity( - 1 + STREAM_CARDINALITY_LIMIT as usize, - )), + trackers_for_collect: RwLock::new(HashMap::with_capacity(1 + STREAM_CARDINALITY_LIMIT)), has_no_attribute_value: AtomicBool::new(false), no_attribute_tracker: A::create(&config), count: AtomicUsize::new(0), @@ -182,28 +178,23 @@ where )); } - let mut trackers = if let Ok(mut trackers_guard) = self.trackers.write() { - if let Ok(mut trackers_for_collect_guard) = self.trackers_for_collect.write() { - swap( - trackers_guard.deref_mut(), - trackers_for_collect_guard.deref_mut(), - ); + if let Ok(mut trackers_collect) = self.trackers_for_collect.write() { + if let Ok(mut trackers_current) = self.trackers.write() { + swap(trackers_collect.deref_mut(), trackers_current.deref_mut()); self.count.store(0, Ordering::SeqCst); - trackers_for_collect_guard } else { - otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers for collect lock poisoned"); + otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers lock poisoned"); return; } - } else { - otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers lock poisoned"); - return; - }; - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config))); + let mut seen = HashSet::new(); + for (attrs, tracker) in trackers_collect.drain() { + if seen.insert(Arc::as_ptr(&tracker)) { + dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config))); + } } + } else { + otel_warn!(name: "MeterProvider.InternalError", message = "Metric collection failed. Report this issue in OpenTelemetry repo.", details ="ValueMap trackers for collect lock poisoned"); } } } From 7bd92ad8cb87dc23851bda718a4354fd3b833778 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 26 Nov 2024 07:23:17 -0800 Subject: [PATCH 8/8] unwanted cast removed --- opentelemetry-sdk/src/metrics/internal/aggregate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index 747f47263d..cd99979923 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -16,7 +16,7 @@ pub(crate) const STREAM_CARDINALITY_LIMIT: usize = 2000; /// Checks whether aggregator has hit cardinality limit for metric streams pub(crate) fn is_under_cardinality_limit(size: usize) -> bool { - size < STREAM_CARDINALITY_LIMIT as usize + size < STREAM_CARDINALITY_LIMIT } /// Receives measurements to be aggregated.