Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Directly implement ComputeAggregation #2425

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 23 additions & 46 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,7 @@ impl Default for AggregateTimeInitiator {
/// Builds aggregate functions
pub(crate) struct AggregateBuilder<T> {
/// The temporality used for the returned aggregate functions.
///
/// If this is not provided, a default of cumulative will be used (except for the
/// last-value aggregate function where delta is the only appropriate
/// temporality).
temporality: Option<Temporality>,
temporality: Temporality,

/// The attribute filter the aggregate function will use on the input of
/// measurements.
Expand All @@ -116,7 +112,7 @@ pub(crate) struct AggregateBuilder<T> {
type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;

impl<T: Number> AggregateBuilder<T> {
pub(crate) fn new(temporality: Option<Temporality>, filter: Option<Filter>) -> Self {
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
AggregateBuilder {
temporality,
filter,
Expand All @@ -140,16 +136,12 @@ impl<T: Number> AggregateBuilder<T> {

/// Builds a last-value aggregate function input and output.
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
let lv = Arc::new(LastValue::new());
let lv = Arc::new(LastValue::new(self.temporality));
let agg_lv = Arc::clone(&lv);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| lv.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_lv.delta(dest),
_ => agg_lv.cumulative(dest),
},
agg_lv,
)
}

Expand All @@ -158,31 +150,23 @@ impl<T: Number> AggregateBuilder<T> {
&self,
monotonic: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(PrecomputedSum::new(monotonic));
let s = Arc::new(PrecomputedSum::new(self.temporality, monotonic));
let agg_sum = Arc::clone(&s);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_sum.delta(dest),
_ => agg_sum.cumulative(dest),
},
agg_sum,
)
}

/// Builds a sum aggregate function input and output.
pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(Sum::new(monotonic));
let s = Arc::new(Sum::new(self.temporality, monotonic));
let agg_sum = Arc::clone(&s);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_sum.delta(dest),
_ => agg_sum.cumulative(dest),
},
agg_sum,
)
}

Expand All @@ -193,17 +177,15 @@ impl<T: Number> AggregateBuilder<T> {
record_min_max: bool,
record_sum: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let h = Arc::new(Histogram::new(boundaries, record_min_max, record_sum));
let h = Arc::new(Histogram::new(
self.temporality,
boundaries,
record_min_max,
record_sum,
));
let agg_h = Arc::clone(&h);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| h.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_h.delta(dest),
_ => agg_h.cumulative(dest),
},
)
(self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), agg_h)
}

/// Builds an exponential histogram aggregate function input and output.
Expand All @@ -215,21 +197,15 @@ impl<T: Number> AggregateBuilder<T> {
record_sum: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let h = Arc::new(ExpoHistogram::new(
self.temporality,
max_size,
max_scale,
record_min_max,
record_sum,
));
let agg_h = Arc::clone(&h);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| h.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_h.delta(dest),
_ => agg_h.cumulative(dest),
},
)
(self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), agg_h)
}
}

Expand All @@ -245,7 +221,8 @@ mod tests {

#[test]
fn last_value_aggregation() {
let (measure, agg) = AggregateBuilder::<u64>::new(None, None).last_value();
let (measure, agg) =
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value();
let mut a = Gauge {
data_points: vec![GaugeDataPoint {
attributes: vec![KeyValue::new("a", 1)],
Expand All @@ -271,7 +248,7 @@ mod tests {
fn precomputed_sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) =
AggregateBuilder::<u64>::new(Some(temporality), None).precomputed_sum(true);
AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
Expand Down Expand Up @@ -312,7 +289,7 @@ mod tests {
#[test]
fn sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None).sum(true);
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None).sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
Expand Down Expand Up @@ -353,7 +330,7 @@ mod tests {
#[test]
fn explicit_bucket_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
.explicit_bucket_histogram(vec![1.0], true, true);
let mut a = Histogram {
data_points: vec![HistogramDataPoint {
Expand Down Expand Up @@ -396,7 +373,7 @@ mod tests {
#[test]
fn exponential_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
.exponential_bucket_histogram(4, 20, true, true);
let mut a = ExponentialHistogram {
data_points: vec![ExponentialHistogramDataPoint {
Expand Down
45 changes: 31 additions & 14 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex};
use std::{
f64::consts::LOG2_E,
mem::replace,
ops::DerefMut,
sync::{Arc, Mutex},
};

use opentelemetry::{otel_debug, KeyValue};
use std::sync::OnceLock;
Expand All @@ -8,7 +13,7 @@ use crate::metrics::{
Temporality,
};

use super::{aggregate::AggregateTimeInitiator, Aggregator, Number, ValueMap};
use super::{aggregate::AggregateTimeInitiator, Aggregator, ComputeAggregation, Number, ValueMap};

pub(crate) const EXPO_MAX_SCALE: i8 = 20;
pub(crate) const EXPO_MIN_SCALE: i8 = -10;
Expand Down Expand Up @@ -351,13 +356,15 @@ struct BucketConfig {
pub(crate) struct ExpoHistogram<T: Number> {
value_map: ValueMap<Mutex<ExpoHistogramDataPoint<T>>>,
init_time: AggregateTimeInitiator,
temporality: Temporality,
record_sum: bool,
record_min_max: bool,
}

impl<T: Number> ExpoHistogram<T> {
/// Create a new exponential histogram.
pub(crate) fn new(
temporality: Temporality,
max_size: u32,
max_scale: i8,
record_min_max: bool,
Expand All @@ -368,9 +375,10 @@ impl<T: Number> ExpoHistogram<T> {
max_size: max_size as i32,
max_scale,
}),
init_time: AggregateTimeInitiator::default(),
temporality,
record_sum,
record_min_max,
init_time: AggregateTimeInitiator::default(),
}
}

Expand All @@ -385,10 +393,7 @@ impl<T: Number> ExpoHistogram<T> {
self.value_map.measure(value, attrs);
}

pub(crate) fn delta(
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
let time = self.init_time.delta();

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
Expand Down Expand Up @@ -442,7 +447,7 @@ impl<T: Number> ExpoHistogram<T> {
(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
}

pub(crate) fn cumulative(
fn cumulative(
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
Expand Down Expand Up @@ -500,6 +505,18 @@ impl<T: Number> ExpoHistogram<T> {
}
}

impl<T> ComputeAggregation for Arc<ExpoHistogram<T>>
where
T: Number,
{
fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
match self.temporality {
Temporality::Delta => self.delta(dest),
_ => self.cumulative(dest),
}
}
}

#[cfg(test)]
mod tests {
use std::{ops::Neg, time::SystemTime};
Expand Down Expand Up @@ -665,7 +682,7 @@ mod tests {
];

for test in test_cases {
let h = ExpoHistogram::new(4, 20, true, true);
let h = ExpoHistogram::new(Temporality::Cumulative, 4, 20, true, true);
for v in test.values {
h.measure(v, &[]);
}
Expand Down Expand Up @@ -714,7 +731,7 @@ mod tests {
];

for test in test_cases {
let h = ExpoHistogram::new(4, 20, true, true);
let h = ExpoHistogram::new(Temporality::Cumulative, 4, 20, true, true);
for v in test.values {
h.measure(v, &[]);
}
Expand Down Expand Up @@ -1241,7 +1258,7 @@ mod tests {
name: "Delta Single",
build: Box::new(move || {
box_val(
AggregateBuilder::new(Some(Temporality::Delta), None)
AggregateBuilder::new(Temporality::Delta, None)
.exponential_bucket_histogram(
max_size,
max_scale,
Expand Down Expand Up @@ -1284,7 +1301,7 @@ mod tests {
name: "Cumulative Single",
build: Box::new(move || {
box_val(
internal::AggregateBuilder::new(Some(Temporality::Cumulative), None)
internal::AggregateBuilder::new(Temporality::Cumulative, None)
.exponential_bucket_histogram(
max_size,
max_scale,
Expand Down Expand Up @@ -1327,7 +1344,7 @@ mod tests {
name: "Delta Multiple",
build: Box::new(move || {
box_val(
internal::AggregateBuilder::new(Some(Temporality::Delta), None)
internal::AggregateBuilder::new(Temporality::Delta, None)
.exponential_bucket_histogram(
max_size,
max_scale,
Expand Down Expand Up @@ -1373,7 +1390,7 @@ mod tests {
name: "Cumulative Multiple ",
build: Box::new(move || {
box_val(
internal::AggregateBuilder::new(Some(Temporality::Cumulative), None)
internal::AggregateBuilder::new(Temporality::Cumulative, None)
.exponential_bucket_histogram(
max_size,
max_scale,
Expand Down
Loading
Loading