add collect methods on ValueMap
fraillt committed Nov 2, 2024
1 parent 3742953 commit 60e17bf
Showing 5 changed files with 180 additions and 440 deletions.
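
The histogram diff below replaces the hand-rolled iteration over the no-attribute tracker and the `trackers` map with two new `ValueMap` helpers, `collect_and_reset` (delta temporality) and `collect_readonly` (cumulative temporality), each taking a closure that turns one tracker into a `HistogramDataPoint`. The `ValueMap` side of the change is not part of this file, so the sketch below is only a minimal, self-contained approximation of the pattern inferred from the call sites: a toy map with string-pair attributes and a plain `HashMap`, not the SDK's actual types, and it omits the no-attribute fast path and the overflow attribute set that the real implementation handles.

```rust
use std::collections::HashMap;
use std::mem::take;

// Hypothetical stand-in for the SDK's ValueMap; names and types are assumptions.
struct ToyValueMap<A> {
    trackers: HashMap<Vec<(String, String)>, A>,
}

impl<A> ToyValueMap<A> {
    /// Delta collection: hand each tracker to the closure by value and leave
    /// the map empty for the next cycle.
    fn collect_and_reset<DP>(
        &mut self,
        dest: &mut Vec<DP>,
        mut map_fn: impl FnMut(Vec<(String, String)>, A) -> DP,
    ) {
        for (attrs, tracker) in take(&mut self.trackers) {
            dest.push(map_fn(attrs, tracker));
        }
    }

    /// Cumulative collection: borrow each tracker read-only and keep the map intact.
    fn collect_readonly<DP>(
        &self,
        dest: &mut Vec<DP>,
        mut map_fn: impl FnMut(Vec<(String, String)>, &A) -> DP,
    ) {
        for (attrs, tracker) in &self.trackers {
            dest.push(map_fn(attrs.clone(), tracker));
        }
    }
}

fn main() {
    let mut map = ToyValueMap {
        trackers: HashMap::from([(
            vec![("otel.scope".to_string(), "demo".to_string())],
            7u64, // a trivially simple "tracker": just a count
        )]),
    };

    let mut delta_points = Vec::new();
    map.collect_and_reset(&mut delta_points, |attrs, count| (attrs, count));
    assert_eq!(delta_points.len(), 1);
    assert!(map.trackers.is_empty()); // ready for the next delta cycle
}
```
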
188 changes: 40 additions & 148 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
@@ -1,6 +1,5 @@
use std::collections::HashSet;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::mem::replace;
use std::ops::DerefMut;
use std::{sync::Mutex, time::SystemTime};

use crate::metrics::data::HistogramDataPoint;
@@ -14,7 +13,7 @@ struct HistogramTracker<T> {
buckets: Mutex<Buckets<T>>,
}

impl<T> Aggregator<T> for HistogramTracker<T>
impl<T> Aggregator for HistogramTracker<T>
where
T: Number,
{
@@ -37,6 +36,14 @@ where
buckets: Mutex::new(Buckets::<T>::new(*count)),
}
}

fn clone_and_reset(&self, count: &usize) -> Self {
let mut current = self.buckets.lock().unwrap_or_else(|err| err.into_inner());
let cloned = replace(current.deref_mut(), Buckets::new(*count));
Self {
buckets: Mutex::new(cloned),
}
}
}

#[derive(Default)]
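
The new `clone_and_reset` in the hunk above snapshots the current buckets and leaves a fresh `Buckets` behind while the lock is held only once, by calling `mem::replace` through the guard's `DerefMut`. A minimal standalone illustration of that snapshot-and-reset idiom, using a `Vec<u64>` in place of `Buckets` purely as a toy:

```rust
use std::mem::replace;
use std::ops::DerefMut;
use std::sync::Mutex;

fn main() {
    let slot = Mutex::new(vec![1u64, 2, 3]);

    // Take the current contents and leave a fresh value behind, all under a
    // single lock acquisition; this mirrors how clone_and_reset hands off the
    // old Buckets while resetting the tracker in place.
    let mut guard = slot.lock().unwrap_or_else(|err| err.into_inner());
    let snapshot = replace(guard.deref_mut(), Vec::new());
    drop(guard);

    assert_eq!(snapshot, vec![1, 2, 3]);
    assert!(slot.lock().unwrap().is_empty());
}
```
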
@@ -73,22 +80,12 @@ impl<T: Number> Buckets<T> {
self.max = value
}
}

fn reset(&mut self) {
for item in &mut self.counts {
*item = 0;
}
self.count = Default::default();
self.total = Default::default();
self.min = T::max();
self.max = T::min();
}
}

/// Summarizes a set of measurements as a histogram with explicitly defined
/// buckets.
pub(crate) struct Histogram<T: Number> {
value_map: ValueMap<T, HistogramTracker<T>>,
value_map: ValueMap<HistogramTracker<T>>,
bounds: Vec<f64>,
record_min_max: bool,
record_sum: bool,
@@ -139,11 +136,6 @@ impl<T: Number> Histogram<T> {
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let start = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
@@ -155,24 +147,22 @@
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Delta;
h.data_points.clear();

// Max number of data points need to account for the special casing
// of the no attribute value + overflow attribute.
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > h.data_points.capacity() {
h.data_points.reserve_exact(n - h.data_points.capacity());
}

if self
.value_map
.has_no_attribute_value
.swap(false, Ordering::AcqRel)
{
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
let prev_start = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), t))
.unwrap_or(t);

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, aggr| {
let b = aggr
.buckets
.into_inner()
.unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
@@ -193,54 +183,8 @@ impl<T: Number> Histogram<T> {
None
},
exemplars: vec![],
});

b.reset();
}
}

let mut trackers = match self.value_map.trackers.write() {
Ok(v) => v,
Err(_) => return (0, None),
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.drain() {
if seen.insert(Arc::as_ptr(&tracker)) {
if let Ok(b) = tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
Some(b.max)
} else {
None
},
exemplars: vec![],
});
}
}
}

// The delta collection cycle resets.
if let Ok(mut start) = self.start.lock() {
*start = t;
}
self.value_map.count.store(0, Ordering::SeqCst);
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
}
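
In the delta path above, the closure receives the tracker by value, so it can consume the `Mutex` with `into_inner`; a poisoned lock is not treated as fatal, and the buckets are simply recovered from the `PoisonError`. A small standalone example of that idiom with toy data, not the SDK types:

```rust
use std::sync::Mutex;

fn main() {
    let buckets = Mutex::new(vec![1u64, 2, 3]);
    // Owning the value means the Mutex itself can be consumed; even if another
    // thread panicked while holding the lock, the data is still recoverable
    // through the PoisonError.
    let counts = buckets.into_inner().unwrap_or_else(|err| err.into_inner());
    assert_eq!(counts, vec![1, 2, 3]);
}
```
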
@@ -250,11 +194,6 @@ impl<T: Number> Histogram<T> {
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let start = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
@@ -266,24 +205,19 @@
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Cumulative;
h.data_points.clear();

// Max number of data points need to account for the special casing
// of the no attribute value + overflow attribute.
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
if n > h.data_points.capacity() {
h.data_points.reserve_exact(n - h.data_points.capacity());
}
let prev_start = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());

if self
.value_map
.has_no_attribute_value
.load(Ordering::Acquire)
{
if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: vec![],
start_time: start,
self.value_map
.collect_readonly(&mut h.data_points, |attributes, aggr| {
let b = aggr.buckets.lock().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
@@ -304,50 +238,8 @@ impl<T: Number> Histogram<T> {
None
},
exemplars: vec![],
});
}
}

let trackers = match self.value_map.trackers.write() {
Ok(v) => v,
Err(_) => return (0, None),
};

// TODO: This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
if seen.insert(Arc::as_ptr(tracker)) {
if let Ok(b) = tracker.buckets.lock() {
h.data_points.push(HistogramDataPoint {
attributes: attrs.clone(),
start_time: start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bucket_counts: b.counts.clone(),
sum: if self.record_sum {
b.total
} else {
T::default()
},
min: if self.record_min_max {
Some(b.min)
} else {
None
},
max: if self.record_min_max {
Some(b.max)
} else {
None
},
exemplars: vec![],
});
}
}
}
});

(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
}