From 60e17bf408cff0da1b52d47b3e5a44b3c872fcc0 Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Thu, 24 Oct 2024 22:25:16 +0300 Subject: [PATCH] add collect methods on ValueMap --- .../src/metrics/internal/histogram.rs | 188 ++++-------------- .../src/metrics/internal/last_value.rs | 113 ++--------- opentelemetry-sdk/src/metrics/internal/mod.rs | 83 ++++++-- .../src/metrics/internal/precomputed_sum.rs | 119 +++-------- opentelemetry-sdk/src/metrics/internal/sum.rs | 117 +++-------- 5 files changed, 180 insertions(+), 440 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index 4da6144c2f..4456d36645 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -1,6 +1,5 @@ -use std::collections::HashSet; -use std::sync::atomic::Ordering; -use std::sync::Arc; +use std::mem::replace; +use std::ops::DerefMut; use std::{sync::Mutex, time::SystemTime}; use crate::metrics::data::HistogramDataPoint; @@ -14,7 +13,7 @@ struct HistogramTracker { buckets: Mutex>, } -impl Aggregator for HistogramTracker +impl Aggregator for HistogramTracker where T: Number, { @@ -37,6 +36,14 @@ where buckets: Mutex::new(Buckets::::new(*count)), } } + + fn clone_and_reset(&self, count: &usize) -> Self { + let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner()); + let cloned = replace(current.deref_mut(), Buckets::new(*count)); + Self { + buckets: Mutex::new(cloned), + } + } } #[derive(Default)] @@ -73,22 +80,12 @@ impl Buckets { self.max = value } } - - fn reset(&mut self) { - for item in &mut self.counts { - *item = 0; - } - self.count = Default::default(); - self.total = Default::default(); - self.min = T::max(); - self.max = T::min(); - } } /// Summarizes a set of measurements as a histogram with explicitly defined /// buckets. 
pub(crate) struct Histogram { - value_map: ValueMap>, + value_map: ValueMap>, bounds: Vec, record_min_max: bool, record_sum: bool, @@ -139,11 +136,6 @@ impl Histogram { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let start = self - .start - .lock() - .map(|s| *s) - .unwrap_or_else(|_| SystemTime::now()); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::Histogram { @@ -155,24 +147,22 @@ impl Histogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Delta; - h.data_points.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. - let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > h.data_points.capacity() { - h.data_points.reserve_exact(n - h.data_points.capacity()); - } - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() { - h.data_points.push(HistogramDataPoint { - attributes: vec![], - start_time: start, + let prev_start = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), t)) + .unwrap_or(t); + + self.value_map + .collect_and_reset(&mut h.data_points, |attributes, aggr| { + let b = aggr + .buckets + .into_inner() + .unwrap_or_else(|err| err.into_inner()); + HistogramDataPoint { + attributes, + start_time: prev_start, time: t, count: b.count, bounds: self.bounds.clone(), @@ -193,54 +183,8 @@ impl Histogram { None }, exemplars: vec![], - }); - - b.reset(); - } - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - if let Ok(b) = tracker.buckets.lock() { - h.data_points.push(HistogramDataPoint { - attributes: 
attrs.clone(), - start_time: start, - time: t, - count: b.count, - bounds: self.bounds.clone(), - bucket_counts: b.counts.clone(), - sum: if self.record_sum { - b.total - } else { - T::default() - }, - min: if self.record_min_max { - Some(b.min) - } else { - None - }, - max: if self.record_min_max { - Some(b.max) - } else { - None - }, - exemplars: vec![], - }); } - } - } - - // The delta collection cycle resets. - if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); + }); (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } @@ -250,11 +194,6 @@ impl Histogram { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let start = self - .start - .lock() - .map(|s| *s) - .unwrap_or_else(|_| SystemTime::now()); let h = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if h.is_none() { Some(data::Histogram { @@ -266,24 +205,19 @@ impl Histogram { }; let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none")); h.temporality = Temporality::Cumulative; - h.data_points.clear(); - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > h.data_points.capacity() { - h.data_points.reserve_exact(n - h.data_points.capacity()); - } + let prev_start = self + .start + .lock() + .map(|s| *s) + .unwrap_or_else(|_| SystemTime::now()); - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() { - h.data_points.push(HistogramDataPoint { - attributes: vec![], - start_time: start, + self.value_map + .collect_readonly(&mut h.data_points, |attributes, aggr| { + let b = aggr.buckets.lock().unwrap_or_else(|err| err.into_inner()); + HistogramDataPoint { + attributes, + start_time: prev_start, time: t, count: b.count, bounds: self.bounds.clone(), @@ -304,50 +238,8 @@ impl Histogram { None }, exemplars: vec![], - }); - } - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - // TODO: This will use an unbounded amount of memory if there - // are unbounded number of attribute sets being aggregated. Attribute - // sets that become "stale" need to be forgotten so this will not - // overload the system. 
- let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - if let Ok(b) = tracker.buckets.lock() { - h.data_points.push(HistogramDataPoint { - attributes: attrs.clone(), - start_time: start, - time: t, - count: b.count, - bounds: self.bounds.clone(), - bucket_counts: b.counts.clone(), - sum: if self.record_sum { - b.total - } else { - T::default() - }, - min: if self.record_min_max { - Some(b.min) - } else { - None - }, - max: if self.record_min_max { - Some(b.max) - } else { - None - }, - exemplars: vec![], - }); } - } - } + }); (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index e4c9433f9a..05fedc1489 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashSet, - sync::{atomic::Ordering, Arc, Mutex}, - time::SystemTime, -}; +use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime}; use crate::metrics::data::DataPoint; use opentelemetry::KeyValue; @@ -17,7 +13,7 @@ where pub(crate) value: T::AtomicTracker, } -impl Aggregator for Assign +impl Aggregator for Assign where T: Number, { @@ -33,11 +29,17 @@ where fn update(&self, value: T) { self.value.store(value) } + + fn clone_and_reset(&self, _: &()) -> Self { + Self { + value: T::new_atomic_tracker(self.value.get_and_reset_value()), + } + } } /// Summarizes a set of measurements as the last one made. 
pub(crate) struct LastValue { - value_map: ValueMap>, + value_map: ValueMap>, start: Mutex, } @@ -56,102 +58,31 @@ impl LastValue { pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec>) { let t = SystemTime::now(); - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - dest.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. - let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > dest.capacity() { - dest.reserve_exact(n - dest.capacity()); - } - - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - dest.push(DataPoint { - attributes: vec![], + let prev_start = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), t)) + .unwrap_or(t); + self.value_map + .collect_and_reset(dest, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self - .value_map - .no_attribute_tracker - .value - .get_and_reset_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - _ => return, - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - dest.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } - - // The delta collection cycle resets. - if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); } pub(crate) fn compute_aggregation_cumulative(&self, dest: &mut Vec>) { let t = SystemTime::now(); let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - - dest.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > dest.capacity() { - dest.reserve_exact(n - dest.capacity()); - } - - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - dest.push(DataPoint { - attributes: vec![], + self.value_map + .collect_readonly(dest, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.value.get_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - _ => return, - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - dest.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } } } diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 6b5470f633..50edda395b 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -6,8 +6,9 @@ mod precomputed_sum; mod sum; use core::fmt; -use std::collections::HashMap; -use std::ops::{Add, AddAssign, Sub}; +use std::collections::{HashMap, HashSet}; +use std::mem::take; +use std::ops::{Add, AddAssign, DerefMut, Sub}; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -22,10 +23,7 @@ use crate::metrics::AttributeSet; pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); -pub(crate) trait Aggregator -where - T: Number, -{ +pub(crate) trait Aggregator { /// A static configuration that is needed in order to initialize aggregator. /// E.g. bucket_size at creation time . type InitConfig; @@ -40,16 +38,18 @@ where /// Called for each measurement. 
fn update(&self, value: Self::PreComputedValue); + + /// Return current value and reset this instance + fn clone_and_reset(&self, init: &Self::InitConfig) -> Self; } /// The storage for sums. /// /// This structure is parametrized by an `Operation` that indicates how /// updates to the underlying value trackers should be performed. -pub(crate) struct ValueMap where - T: Number, - A: Aggregator, +pub(crate) struct ValueMap where - A: Aggregator, { /// Trackers store the values associated with different attribute sets. trackers: RwLock, Arc>>, @@ -63,10 +63,9 @@ where config: A::InitConfig, } -impl ValueMap where - T: Number, - A: Aggregator, +impl ValueMap where - A: Aggregator, { fn new(config: A::InitConfig) -> Self { ValueMap { @@ -135,6 +134,66 @@ where ); } } + + /// Iterate through all attribute sets and populate `DataPoints` in readonly mode. + pub(crate) fn collect_readonly(&self, dest: &mut Vec, mut map_fn: MapFn) + where + MapFn: FnMut(Vec, &A) -> Res, + { + prepare_data(dest, self.count.load(Ordering::SeqCst)); + if self.has_no_attribute_value.load(Ordering::Acquire) { + dest.push(map_fn(vec![], &self.no_attribute_tracker)); + } + + let Ok(trackers) = self.trackers.read() else { + return; + }; + + let mut seen = HashSet::new(); + for (attrs, tracker) in trackers.iter() { + if seen.insert(Arc::as_ptr(tracker)) { + dest.push(map_fn(attrs.clone(), tracker)); + } + } + } + + /// Iterate through all attribute sets, populate `DataPoints` and reset.
+ pub(crate) fn collect_and_reset(&self, dest: &mut Vec, mut map_fn: MapFn) + where + MapFn: FnMut(Vec, A) -> Res, + { + prepare_data(dest, self.count.load(Ordering::SeqCst)); + if self.has_no_attribute_value.swap(false, Ordering::AcqRel) { + dest.push(map_fn( + vec![], + self.no_attribute_tracker.clone_and_reset(&self.config), + )); + } + + let trackers = match self.trackers.write() { + Ok(mut trackers) => { + self.count.store(0, Ordering::SeqCst); + take(trackers.deref_mut()) + } + Err(_) => todo!(), + }; + + let mut seen = HashSet::new(); + for (attrs, tracker) in trackers.into_iter() { + if seen.insert(Arc::as_ptr(&tracker)) { + dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config))); + } + } + } +} + +/// Clear and allocate exactly required amount of space for all attribute-sets +fn prepare_data(data: &mut Vec, list_len: usize) { + data.clear(); + let total_len = list_len + 2; // to account for no_attributes case + overflow state + if total_len > data.capacity() { + data.reserve_exact(total_len - data.capacity()); + } } /// Marks a type that can have a value added and retrieved atomically. Required since diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index f08f70b73e..7bd843f4cd 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -3,15 +3,11 @@ use opentelemetry::KeyValue; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; use super::{last_value::Assign, AtomicTracker, Number, ValueMap}; -use std::{ - collections::{HashMap, HashSet}, - sync::{atomic::Ordering, Arc, Mutex}, - time::SystemTime, -}; +use std::{collections::HashMap, mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime}; /// Summarizes a set of pre-computed sums as their arithmetic sum. 
pub(crate) struct PrecomputedSum { - value_map: ValueMap>, + value_map: ValueMap>, monotonic: bool, start: Mutex, reported: Mutex, T>>, @@ -37,7 +33,6 @@ impl PrecomputedSum { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if s_data.is_none() { @@ -50,68 +45,34 @@ impl PrecomputedSum { None }; let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.data_points.clear(); s_data.temporality = Temporality::Delta; s_data.is_monotonic = self.monotonic; - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. - let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } - let mut new_reported = HashMap::with_capacity(n); + let prev_start = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), t)) + .unwrap_or(t); + let mut reported = match self.reported.lock() { Ok(r) => r, Err(_) => return (0, None), }; - - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - let value = self.value_map.no_attribute_tracker.value.get_value(); - let delta = value - *reported.get(&vec![]).unwrap_or(&T::default()); - new_reported.insert(vec![], value); - - s_data.data_points.push(DataPoint { - attributes: vec![], - start_time: Some(prev_start), - time: Some(t), - value: delta, - exemplars: vec![], - }); - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - let value = tracker.value.get_value(); - let delta = value - 
*reported.get(&attrs).unwrap_or(&T::default()); - new_reported.insert(attrs.clone(), value); - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), + let mut new_reported = HashMap::with_capacity(reported.len()); + + self.value_map + .collect_and_reset(&mut s_data.data_points, |attributes, aggr| { + let value = aggr.value.get_value(); + new_reported.insert(attributes.clone(), value); + let delta = value - *reported.get(&attributes).unwrap_or(&T::default()); + DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), value: delta, exemplars: vec![], - }); - } - } - - // The delta collection cycle resets. - if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); + } + }); *reported = new_reported; drop(reported); // drop before values guard is dropped @@ -127,7 +88,6 @@ impl PrecomputedSum { dest: Option<&mut dyn Aggregation>, ) -> (usize, Option>) { let t = SystemTime::now(); - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); let s_data = dest.and_then(|d| d.as_mut().downcast_mut::>()); let mut new_agg = if s_data.is_none() { @@ -140,50 +100,19 @@ impl PrecomputedSum { None }; let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); - s_data.data_points.clear(); s_data.temporality = Temporality::Cumulative; s_data.is_monotonic = self.monotonic; - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. 
- let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } + let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - s_data.data_points.push(DataPoint { - attributes: vec![], + self.value_map + .collect_readonly(&mut s_data.data_points, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.value.get_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } ( s_data.data_points.len(), diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 17d81ca262..40b95a5e60 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -1,6 +1,5 @@ -use std::collections::HashSet; -use std::sync::atomic::Ordering; -use std::sync::Arc; +use std::mem::replace; +use std::ops::DerefMut; use std::vec; use std::{sync::Mutex, time::SystemTime}; @@ -17,7 +16,7 @@ where value: T::AtomicTracker, } -impl Aggregator for Increment +impl Aggregator for Increment where T: Number, { @@ -33,11 +32,17 @@ where fn update(&self, value: T) { self.value.add(value) } + + fn clone_and_reset(&self, _: &()) -> Self { + Self { + value: T::new_atomic_tracker(self.value.get_and_reset_value()), + } + } } /// Summarizes a set of measurements made as their arithmetic sum. 
pub(crate) struct Sum { - value_map: ValueMap>, + value_map: ValueMap>, monotonic: bool, start: Mutex, } @@ -80,59 +85,20 @@ impl Sum { let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); s_data.temporality = Temporality::Delta; s_data.is_monotonic = self.monotonic; - s_data.data_points.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. - let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } - let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - if self - .value_map - .has_no_attribute_value - .swap(false, Ordering::AcqRel) - { - s_data.data_points.push(DataPoint { - attributes: vec![], + let prev_start = self + .start + .lock() + .map(|mut start| replace(start.deref_mut(), t)) + .unwrap_or(t); + self.value_map + .collect_and_reset(&mut s_data.data_points, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self - .value_map - .no_attribute_tracker - .value - .get_and_reset_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let mut trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.drain() { - if seen.insert(Arc::as_ptr(&tracker)) { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } - - // The delta collection cycle resets. 
- if let Ok(mut start) = self.start.lock() { - *start = t; - } - self.value_map.count.store(0, Ordering::SeqCst); ( s_data.data_points.len(), @@ -159,54 +125,17 @@ impl Sum { let s_data = s_data.unwrap_or_else(|| new_agg.as_mut().expect("present if s_data is none")); s_data.temporality = Temporality::Cumulative; s_data.is_monotonic = self.monotonic; - s_data.data_points.clear(); - - // Max number of data points need to account for the special casing - // of the no attribute value + overflow attribute. - let n = self.value_map.count.load(Ordering::SeqCst) + 2; - if n > s_data.data_points.capacity() { - s_data - .data_points - .reserve_exact(n - s_data.data_points.capacity()); - } let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); - if self - .value_map - .has_no_attribute_value - .load(Ordering::Acquire) - { - s_data.data_points.push(DataPoint { - attributes: vec![], + self.value_map + .collect_readonly(&mut s_data.data_points, |attributes, aggr| DataPoint { + attributes, start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.value.get_value(), + value: aggr.value.get_value(), exemplars: vec![], }); - } - - let trackers = match self.value_map.trackers.write() { - Ok(v) => v, - Err(_) => return (0, None), - }; - - // TODO: This will use an unbounded amount of memory if there - // are unbounded number of attribute sets being aggregated. Attribute - // sets that become "stale" need to be forgotten so this will not - // overload the system. - let mut seen = HashSet::new(); - for (attrs, tracker) in trackers.iter() { - if seen.insert(Arc::as_ptr(tracker)) { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.value.get_value(), - exemplars: vec![], - }); - } - } ( s_data.data_points.len(),