From 706a067b12dc7754e587d1ff7b59e6e04ed81fc2 Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Fri, 1 Nov 2024 18:31:47 +0200 Subject: [PATCH 1/2] ValueMap interface change (#2117) Co-authored-by: Lalit Kumar Bhasin Co-authored-by: Cijo Thomas --- .../metrics/internal/exponential_histogram.rs | 1 + .../src/metrics/internal/histogram.rs | 114 ++++++++++--- .../src/metrics/internal/last_value.rs | 46 +++++- opentelemetry-sdk/src/metrics/internal/mod.rs | 156 ++++++++---------- .../src/metrics/internal/precomputed_sum.rs | 16 +- opentelemetry-sdk/src/metrics/internal/sum.rs | 47 +++++- 6 files changed, 244 insertions(+), 136 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs index e85e0ece55..13f4200112 100644 --- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs @@ -352,6 +352,7 @@ impl ExpoHistogram { pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) { let f_value = value.into_float(); // Ignore NaN and infinity. + // Only makes sense if T is f64, maybe this could be no-op for other cases? if f_value.is_infinite() || f_value.is_nan() { return; } diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index 089415ba7c..4da6144c2f 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -7,23 +7,22 @@ use crate::metrics::data::HistogramDataPoint; use crate::metrics::data::{self, Aggregation, Temporality}; use opentelemetry::KeyValue; -use super::Number; -use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap}; - -struct HistogramUpdate; - -impl Operation for HistogramUpdate { - fn update_tracker>(tracker: &AT, value: T, index: usize) { - tracker.update_histogram(index, value); - } -} +use super::ValueMap; +use super::{Aggregator, Number}; struct HistogramTracker { buckets: Mutex>, } -impl AtomicTracker for HistogramTracker { - fn update_histogram(&self, index: usize, value: T) { +impl Aggregator for HistogramTracker +where + T: Number, +{ + type InitConfig = usize; + /// Value and bucket index + type PreComputedValue = (T, usize); + + fn update(&self, (value, index): (T, usize)) { let mut buckets = match self.buckets.lock() { Ok(guard) => guard, Err(_) => return, @@ -32,15 +31,10 @@ impl AtomicTracker for HistogramTracker { buckets.bin(index, value); buckets.sum(value); } -} - -impl AtomicallyUpdate for HistogramTracker { - type AtomicTracker = HistogramTracker; - fn new_atomic_tracker(buckets_count: Option) -> Self::AtomicTracker { - let count = buckets_count.unwrap(); + fn create(count: &usize) -> Self { HistogramTracker { - buckets: Mutex::new(Buckets::::new(count)), + buckets: Mutex::new(Buckets::::new(*count)), } } } @@ -94,7 +88,7 @@ impl Buckets { /// Summarizes a set of measurements as a histogram with explicitly defined /// buckets. pub(crate) struct Histogram { - value_map: ValueMap, T, HistogramUpdate>, + value_map: ValueMap>, bounds: Vec, record_min_max: bool, record_sum: bool, @@ -103,9 +97,11 @@ pub(crate) struct Histogram { impl Histogram { pub(crate) fn new(boundaries: Vec, record_min_max: bool, record_sum: bool) -> Self { + // TODO fix the bug, by first removing NaN and only then getting buckets_count + // once we know the reason for performance degradation let buckets_count = boundaries.len() + 1; let mut histogram = Histogram { - value_map: ValueMap::new_with_buckets_count(buckets_count), + value_map: ValueMap::new(buckets_count), bounds: boundaries, record_min_max, record_sum, @@ -122,14 +118,20 @@ impl Histogram { pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { let f = measurement.into_float(); - + // Ignore NaN and infinity. + // Only makes sense if T is f64, maybe this could be no-op for other cases? + // TODO: uncomment once we know the reason for performance degradation + // if f.is_infinite() || f.is_nan() { + // return; + // } // This search will return an index in the range `[0, bounds.len()]`, where // it will return `bounds.len()` if value is greater than the last element // of `bounds`. This aligns with the buckets in that the length of buckets // is `bounds.len()+1`, with the last bucket representing: // `(bounds[bounds.len()-1], +∞)`. let index = self.bounds.partition_point(|&x| x < f); - self.value_map.measure(measurement, attrs, index); + + self.value_map.measure((measurement, index), attrs); } pub(crate) fn delta( @@ -350,3 +352,69 @@ impl Histogram { (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } } + +// TODO: uncomment once we know the reason for performance degradation +// #[cfg(test)] +// mod tests { + +// use super::*; + +// #[test] +// fn when_f64_is_nan_or_infinity_then_ignore() { +// struct Expected { +// min: f64, +// max: f64, +// sum: f64, +// count: u64, +// } +// impl Expected { +// fn new(min: f64, max: f64, sum: f64, count: u64) -> Self { +// Expected { +// min, +// max, +// sum, +// count, +// } +// } +// } +// struct TestCase { +// values: Vec, +// expected: Expected, +// } + +// let test_cases = vec![ +// TestCase { +// values: vec![2.0, 4.0, 1.0], +// expected: Expected::new(1.0, 4.0, 7.0, 3), +// }, +// TestCase { +// values: vec![2.0, 4.0, 1.0, f64::INFINITY], +// expected: Expected::new(1.0, 4.0, 7.0, 3), +// }, +// TestCase { +// values: vec![2.0, 4.0, 1.0, -f64::INFINITY], +// expected: Expected::new(1.0, 4.0, 7.0, 3), +// }, +// TestCase { +// values: vec![2.0, f64::NAN, 4.0, 1.0], +// expected: Expected::new(1.0, 4.0, 7.0, 3), +// }, +// TestCase { +// values: vec![4.0, 4.0, 4.0, 2.0, 16.0, 1.0], +// expected: Expected::new(1.0, 16.0, 31.0, 6), +// }, +// ]; + +// for test in test_cases { +// let h = Histogram::new(vec![], true, true); +// for v in test.values { +// h.measure(v, &[]); +// } +// let res = h.value_map.no_attribute_tracker.buckets.lock().unwrap(); +// assert_eq!(test.expected.max, res.max); +// assert_eq!(test.expected.min, res.min); +// assert_eq!(test.expected.sum, res.total); +// assert_eq!(test.expected.count, res.count); +// } +// } +// } diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index d1eab4fada..e4c9433f9a 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -7,25 +7,51 @@ use std::{ use crate::metrics::data::DataPoint; use opentelemetry::KeyValue; -use super::{Assign, AtomicTracker, Number, ValueMap}; +use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap}; + +/// this is reused by PrecomputedSum +pub(crate) struct Assign +where + T: AtomicallyUpdate, +{ + pub(crate) value: T::AtomicTracker, +} + +impl Aggregator for Assign +where + T: Number, +{ + type InitConfig = (); + type PreComputedValue = T; + + fn create(_init: &()) -> Self { + Self { + value: T::new_atomic_tracker(T::default()), + } + } + + fn update(&self, value: T) { + self.value.store(value) + } +} /// Summarizes a set of measurements as the last one made. pub(crate) struct LastValue { - value_map: ValueMap, + value_map: ValueMap>, start: Mutex, } impl LastValue { pub(crate) fn new() -> Self { LastValue { - value_map: ValueMap::new(), + value_map: ValueMap::new(()), start: Mutex::new(SystemTime::now()), } } pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { // The argument index is not applicable to LastValue. - self.value_map.measure(measurement, attrs, 0); + self.value_map.measure(measurement, attrs); } pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec>) { @@ -49,7 +75,11 @@ impl LastValue { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_and_reset_value(), + value: self + .value_map + .no_attribute_tracker + .value + .get_and_reset_value(), exemplars: vec![], }); } @@ -66,7 +96,7 @@ impl LastValue { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } @@ -101,7 +131,7 @@ impl LastValue { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), + value: self.value_map.no_attribute_tracker.value.get_value(), exemplars: vec![], }); } @@ -118,7 +148,7 @@ impl LastValue { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 6d3b012d1d..6b5470f633 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -7,7 +7,6 @@ mod sum; use core::fmt; use std::collections::HashMap; -use std::marker::PhantomData; use std::ops::{Add, AddAssign, Sub}; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -23,79 +22,65 @@ use crate::metrics::AttributeSet; pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); -/// Abstracts the update operation for a measurement. -pub(crate) trait Operation { - fn update_tracker>(tracker: &AT, value: T, index: usize); -} - -struct Increment; +pub(crate) trait Aggregator +where + T: Number, +{ + /// A static configuration that is needed in order to initialize aggregator. + /// E.g. bucket_size at creation time . + type InitConfig; -impl Operation for Increment { - fn update_tracker>(tracker: &AT, value: T, _: usize) { - tracker.add(value); - } -} + /// Some aggregators can do some computations before updating aggregator. + /// This helps to reduce contention for aggregators because it makes + /// [`Aggregator::update`] as short as possible. + type PreComputedValue; -struct Assign; + /// Called everytime a new attribute-set is stored. + fn create(init: &Self::InitConfig) -> Self; -impl Operation for Assign { - fn update_tracker>(tracker: &AT, value: T, _: usize) { - tracker.store(value); - } + /// Called for each measurement. + fn update(&self, value: Self::PreComputedValue); } /// The storage for sums. /// /// This structure is parametrized by an `Operation` that indicates how /// updates to the underlying value trackers should be performed. -pub(crate) struct ValueMap, T: Number, O> { +pub(crate) struct ValueMap +where + T: Number, + A: Aggregator, +{ /// Trackers store the values associated with different attribute sets. - trackers: RwLock, Arc>>, + trackers: RwLock, Arc>>, /// Number of different attribute set stored in the `trackers` map. count: AtomicUsize, /// Indicates whether a value with no attributes has been stored. has_no_attribute_value: AtomicBool, /// Tracker for values with no attributes attached. - no_attribute_tracker: AU::AtomicTracker, - /// Buckets Count is only used by Histogram. - buckets_count: Option, - phantom: PhantomData, + no_attribute_tracker: A, + /// Configuration for an Aggregator + config: A::InitConfig, } -impl, T: Number, O> Default for ValueMap { - fn default() -> Self { - ValueMap::new() - } -} - -impl, T: Number, O> ValueMap { - fn new() -> Self { - ValueMap { - trackers: RwLock::new(HashMap::new()), - has_no_attribute_value: AtomicBool::new(false), - no_attribute_tracker: AU::new_atomic_tracker(None), - count: AtomicUsize::new(0), - buckets_count: None, - phantom: PhantomData, - } - } - - fn new_with_buckets_count(buckets_count: usize) -> Self { +impl ValueMap +where + T: Number, + A: Aggregator, +{ + fn new(config: A::InitConfig) -> Self { ValueMap { trackers: RwLock::new(HashMap::new()), has_no_attribute_value: AtomicBool::new(false), - no_attribute_tracker: AU::new_atomic_tracker(Some(buckets_count)), + no_attribute_tracker: A::create(&config), count: AtomicUsize::new(0), - buckets_count: Some(buckets_count), - phantom: PhantomData, + config, } } -} -impl, T: Number, O: Operation> ValueMap { - fn measure(&self, measurement: T, attributes: &[KeyValue], index: usize) { + fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) { if attributes.is_empty() { - O::update_tracker(&self.no_attribute_tracker, measurement, index); + self.no_attribute_tracker.update(value); self.has_no_attribute_value.store(true, Ordering::Release); return; } @@ -106,14 +91,14 @@ impl, T: Number, O: Operation> ValueMap { // Try to retrieve and update the tracker with the attributes in the provided order first if let Some(tracker) = trackers.get(attributes) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(value); return; } // Try to retrieve and update the tracker with the attributes sorted. let sorted_attrs = AttributeSet::from(attributes).into_vec(); if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(value); return; } @@ -127,12 +112,12 @@ impl, T: Number, O: Operation> ValueMap { // Recheck both the provided and sorted orders after acquiring the write lock // in case another thread has pushed an update in the meantime. if let Some(tracker) = trackers.get(attributes) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(value); } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(value); } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { - let new_tracker = Arc::new(AU::new_atomic_tracker(self.buckets_count)); - O::update_tracker(&*new_tracker, measurement, index); + let new_tracker = Arc::new(A::create(&self.config)); + new_tracker.update(value); // Insert tracker with the attributes in the provided and sorted orders trackers.insert(attributes.to_vec(), new_tracker.clone()); @@ -140,10 +125,10 @@ impl, T: Number, O: Operation> ValueMap { self.count.fetch_add(1, Ordering::SeqCst); } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { - O::update_tracker(&**overflow_value, measurement, index); + overflow_value.update(value); } else { - let new_tracker = AU::new_atomic_tracker(self.buckets_count); - O::update_tracker(&new_tracker, measurement, index); + let new_tracker = A::create(&self.config); + new_tracker.update(value); trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker)); otel_warn!( name: "ValueMap.measure", message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged." @@ -154,22 +139,17 @@ impl, T: Number, O: Operation> ValueMap { /// Marks a type that can have a value added and retrieved atomically. Required since /// different types have different backing atomic mechanisms -pub(crate) trait AtomicTracker: Sync + Send + 'static { - fn store(&self, _value: T) {} - fn add(&self, _value: T) {} - fn get_value(&self) -> T { - T::default() - } - fn get_and_reset_value(&self) -> T { - T::default() - } - fn update_histogram(&self, _index: usize, _value: T) {} +pub(crate) trait AtomicTracker: Sync + Send + 'static { + fn store(&self, _value: T); + fn add(&self, _value: T); + fn get_value(&self) -> T; + fn get_and_reset_value(&self) -> T; } /// Marks a type that can have an atomic tracker generated for it -pub(crate) trait AtomicallyUpdate { +pub(crate) trait AtomicallyUpdate { type AtomicTracker: AtomicTracker; - fn new_atomic_tracker(buckets_count: Option) -> Self::AtomicTracker; + fn new_atomic_tracker(init: T) -> Self::AtomicTracker; } pub(crate) trait Number: @@ -256,8 +236,8 @@ impl AtomicTracker for AtomicU64 { impl AtomicallyUpdate for u64 { type AtomicTracker = AtomicU64; - fn new_atomic_tracker(_: Option) -> Self::AtomicTracker { - AtomicU64::new(0) + fn new_atomic_tracker(init: u64) -> Self::AtomicTracker { + AtomicU64::new(init) } } @@ -282,8 +262,8 @@ impl AtomicTracker for AtomicI64 { impl AtomicallyUpdate for i64 { type AtomicTracker = AtomicI64; - fn new_atomic_tracker(_: Option) -> Self::AtomicTracker { - AtomicI64::new(0) + fn new_atomic_tracker(init: i64) -> Self::AtomicTracker { + AtomicI64::new(init) } } @@ -292,10 +272,10 @@ pub(crate) struct F64AtomicTracker { } impl F64AtomicTracker { - fn new() -> Self { - let zero_as_u64 = 0.0_f64.to_bits(); + fn new(init: f64) -> Self { + let value_as_u64 = init.to_bits(); F64AtomicTracker { - inner: AtomicU64::new(zero_as_u64), + inner: AtomicU64::new(value_as_u64), } } } @@ -344,8 +324,8 @@ impl AtomicTracker for F64AtomicTracker { impl AtomicallyUpdate for f64 { type AtomicTracker = F64AtomicTracker; - fn new_atomic_tracker(_: Option) -> Self::AtomicTracker { - F64AtomicTracker::new() + fn new_atomic_tracker(init: f64) -> Self::AtomicTracker { + F64AtomicTracker::new(init) } } @@ -355,7 +335,7 @@ mod tests { #[test] fn can_store_u64_atomic_value() { - let atomic = u64::new_atomic_tracker(None); + let atomic = u64::new_atomic_tracker(0); let atomic_tracker = &atomic as &dyn AtomicTracker; let value = atomic.get_value(); @@ -368,7 +348,7 @@ mod tests { #[test] fn can_add_and_get_u64_atomic_value() { - let atomic = u64::new_atomic_tracker(None); + let atomic = u64::new_atomic_tracker(0); atomic.add(15); atomic.add(10); @@ -378,7 +358,7 @@ mod tests { #[test] fn can_reset_u64_atomic_value() { - let atomic = u64::new_atomic_tracker(None); + let atomic = u64::new_atomic_tracker(0); atomic.add(15); let value = atomic.get_and_reset_value(); @@ -390,7 +370,7 @@ mod tests { #[test] fn can_store_i64_atomic_value() { - let atomic = i64::new_atomic_tracker(None); + let atomic = i64::new_atomic_tracker(0); let atomic_tracker = &atomic as &dyn AtomicTracker; let value = atomic.get_value(); @@ -407,7 +387,7 @@ mod tests { #[test] fn can_add_and_get_i64_atomic_value() { - let atomic = i64::new_atomic_tracker(None); + let atomic = i64::new_atomic_tracker(0); atomic.add(15); atomic.add(-10); @@ -417,7 +397,7 @@ mod tests { #[test] fn can_reset_i64_atomic_value() { - let atomic = i64::new_atomic_tracker(None); + let atomic = i64::new_atomic_tracker(0); atomic.add(15); let value = atomic.get_and_reset_value(); @@ -429,7 +409,7 @@ mod tests { #[test] fn can_store_f64_atomic_value() { - let atomic = f64::new_atomic_tracker(None); + let atomic = f64::new_atomic_tracker(0.0); let atomic_tracker = &atomic as &dyn AtomicTracker; let value = atomic.get_value(); @@ -446,7 +426,7 @@ mod tests { #[test] fn can_add_and_get_f64_atomic_value() { - let atomic = f64::new_atomic_tracker(None); + let atomic = f64::new_atomic_tracker(0.0); atomic.add(15.3); atomic.add(10.4); @@ -457,7 +437,7 @@ mod tests { #[test] fn can_reset_f64_atomic_value() { - let atomic = f64::new_atomic_tracker(None); + let atomic = f64::new_atomic_tracker(0.0); atomic.add(15.5); let value = atomic.get_and_reset_value(); diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index 060c7baaa6..f08f70b73e 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -2,7 +2,7 @@ use opentelemetry::KeyValue; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; -use super::{Assign, AtomicTracker, Number, ValueMap}; +use super::{last_value::Assign, AtomicTracker, Number, ValueMap}; use std::{ collections::{HashMap, HashSet}, sync::{atomic::Ordering, Arc, Mutex}, @@ -11,7 +11,7 @@ use std::{ /// Summarizes a set of pre-computed sums as their arithmetic sum. pub(crate) struct PrecomputedSum { - value_map: ValueMap, + value_map: ValueMap>, monotonic: bool, start: Mutex, reported: Mutex, T>>, @@ -20,7 +20,7 @@ pub(crate) struct PrecomputedSum { impl PrecomputedSum { pub(crate) fn new(monotonic: bool) -> Self { PrecomputedSum { - value_map: ValueMap::new(), + value_map: ValueMap::new(()), monotonic, start: Mutex::new(SystemTime::now()), reported: Mutex::new(Default::default()), @@ -29,7 +29,7 @@ impl PrecomputedSum { pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { // The argument index is not applicable to PrecomputedSum. - self.value_map.measure(measurement, attrs, 0); + self.value_map.measure(measurement, attrs); } pub(crate) fn delta( @@ -73,7 +73,7 @@ impl PrecomputedSum { .has_no_attribute_value .swap(false, Ordering::AcqRel) { - let value = self.value_map.no_attribute_tracker.get_value(); + let value = self.value_map.no_attribute_tracker.value.get_value(); let delta = value - *reported.get(&vec![]).unwrap_or(&T::default()); new_reported.insert(vec![], value); @@ -94,7 +94,7 @@ impl PrecomputedSum { let mut seen = HashSet::new(); for (attrs, tracker) in trackers.drain() { if seen.insert(Arc::as_ptr(&tracker)) { - let value = tracker.get_value(); + let value = tracker.value.get_value(); let delta = value - *reported.get(&attrs).unwrap_or(&T::default()); new_reported.insert(attrs.clone(), value); s_data.data_points.push(DataPoint { @@ -162,7 +162,7 @@ impl PrecomputedSum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), + value: self.value_map.no_attribute_tracker.value.get_value(), exemplars: vec![], }); } @@ -179,7 +179,7 @@ impl PrecomputedSum { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 66af75734d..17d81ca262 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -7,12 +7,37 @@ use std::{sync::Mutex, time::SystemTime}; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; use opentelemetry::KeyValue; -use super::{AtomicTracker, Number}; -use super::{Increment, ValueMap}; +use super::{Aggregator, AtomicTracker, Number}; +use super::{AtomicallyUpdate, ValueMap}; + +struct Increment +where + T: AtomicallyUpdate, +{ + value: T::AtomicTracker, +} + +impl Aggregator for Increment +where + T: Number, +{ + type InitConfig = (); + type PreComputedValue = T; + + fn create(_init: &()) -> Self { + Self { + value: T::new_atomic_tracker(T::default()), + } + } + + fn update(&self, value: T) { + self.value.add(value) + } +} /// Summarizes a set of measurements made as their arithmetic sum. pub(crate) struct Sum { - value_map: ValueMap, + value_map: ValueMap>, monotonic: bool, start: Mutex, } @@ -25,7 +50,7 @@ impl Sum { /// were made in. pub(crate) fn new(monotonic: bool) -> Self { Sum { - value_map: ValueMap::new(), + value_map: ValueMap::new(()), monotonic, start: Mutex::new(SystemTime::now()), } @@ -33,7 +58,7 @@ impl Sum { pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { // The argument index is not applicable to Sum. - self.value_map.measure(measurement, attrs, 0); + self.value_map.measure(measurement, attrs); } pub(crate) fn delta( @@ -76,7 +101,11 @@ impl Sum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_and_reset_value(), + value: self + .value_map + .no_attribute_tracker + .value + .get_and_reset_value(), exemplars: vec![], }); } @@ -93,7 +122,7 @@ impl Sum { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } @@ -152,7 +181,7 @@ impl Sum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), + value: self.value_map.no_attribute_tracker.value.get_value(), exemplars: vec![], }); } @@ -173,7 +202,7 @@ impl Sum { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } From 9f6c725cd34fb1611ff92870db2be2fcf2c99fb6 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Fri, 1 Nov 2024 09:40:14 -0700 Subject: [PATCH 2/2] Update links to log api spec (#2265) --- README.md | 2 +- opentelemetry/README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 0780b84d47..27bc095c22 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ observability tools. *OpenTelemetry Rust is not introducing a new end user callable Logging API. Instead, it provides [Logs Bridge -API](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/bridge-api.md), +API](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/api.md), that allows one to write log appenders that can bridge existing logging libraries to the OpenTelemetry log data model. The following log appenders are available: diff --git a/opentelemetry/README.md b/opentelemetry/README.md index 102e8431d4..d96e2851f2 100644 --- a/opentelemetry/README.md +++ b/opentelemetry/README.md @@ -56,7 +56,7 @@ Here's a breakdown of its components: Allows for the attachment of metadata (baggage) to telemetry, which can be used for sharing application-specific information across service boundaries. - **[Logs Bridge - API](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/bridge-api.md):** + API](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/api.md):** Allows to bridge existing logging mechanisms with OpenTelemetry logging. This is **NOT** meant for end users to call, instead it is meant to enable writing bridges/appenders for existing logging mechanisms such as