Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Directly implement ComputeAggregation #2425

Merged
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/lib.rs
Original file line number Diff line number Diff line change
@@ -118,7 +118,7 @@
#![doc(
html_logo_url = "https://raw.githubusercontent.com/open-telemetry/opentelemetry-rust/main/assets/logo.svg"
)]
#![cfg_attr(test, deny(warnings))]
// #![cfg_attr(test, deny(warnings))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this intentional?

cijothomas marked this conversation as resolved.
Show resolved Hide resolved

pub mod export;
pub(crate) mod growable_array;
69 changes: 23 additions & 46 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
@@ -100,11 +100,7 @@ impl Default for AggregateTimeInitiator {
/// Builds aggregate functions
pub(crate) struct AggregateBuilder<T> {
/// The temporality used for the returned aggregate functions.
///
/// If this is not provided, a default of cumulative will be used (except for the
/// last-value aggregate function where delta is the only appropriate
/// temporality).
temporality: Option<Temporality>,
temporality: Temporality,

/// The attribute filter the aggregate function will use on the input of
/// measurements.
@@ -116,7 +112,7 @@ pub(crate) struct AggregateBuilder<T> {
type Filter = Arc<dyn Fn(&KeyValue) -> bool + Send + Sync>;

impl<T: Number> AggregateBuilder<T> {
pub(crate) fn new(temporality: Option<Temporality>, filter: Option<Filter>) -> Self {
pub(crate) fn new(temporality: Temporality, filter: Option<Filter>) -> Self {
AggregateBuilder {
temporality,
filter,
@@ -140,16 +136,12 @@ impl<T: Number> AggregateBuilder<T> {

/// Builds a last-value aggregate function input and output.
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
let lv = Arc::new(LastValue::new());
let lv = Arc::new(LastValue::new(self.temporality));
let agg_lv = Arc::clone(&lv);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| lv.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_lv.delta(dest),
_ => agg_lv.cumulative(dest),
},
agg_lv,
)
}

@@ -158,31 +150,23 @@ impl<T: Number> AggregateBuilder<T> {
&self,
monotonic: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(PrecomputedSum::new(monotonic));
let s = Arc::new(PrecomputedSum::new(self.temporality, monotonic));
let agg_sum = Arc::clone(&s);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_sum.delta(dest),
_ => agg_sum.cumulative(dest),
},
agg_sum,
)
}

/// Builds a sum aggregate function input and output.
pub(crate) fn sum(&self, monotonic: bool) -> (impl Measure<T>, impl ComputeAggregation) {
let s = Arc::new(Sum::new(monotonic));
let s = Arc::new(Sum::new(self.temporality, monotonic));
let agg_sum = Arc::clone(&s);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| s.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_sum.delta(dest),
_ => agg_sum.cumulative(dest),
},
agg_sum,
)
}

@@ -193,17 +177,15 @@ impl<T: Number> AggregateBuilder<T> {
record_min_max: bool,
record_sum: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let h = Arc::new(Histogram::new(boundaries, record_min_max, record_sum));
let h = Arc::new(Histogram::new(
self.temporality,
boundaries,
record_min_max,
record_sum,
));
let agg_h = Arc::clone(&h);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| h.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_h.delta(dest),
_ => agg_h.cumulative(dest),
},
)
(self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), agg_h)
}

/// Builds an exponential histogram aggregate function input and output.
@@ -215,21 +197,15 @@ impl<T: Number> AggregateBuilder<T> {
record_sum: bool,
) -> (impl Measure<T>, impl ComputeAggregation) {
let h = Arc::new(ExpoHistogram::new(
self.temporality,
max_size,
max_scale,
record_min_max,
record_sum,
));
let agg_h = Arc::clone(&h);
let t = self.temporality;

(
self.filter(move |n, a: &[KeyValue]| h.measure(n, a)),
move |dest: Option<&mut dyn Aggregation>| match t {
Some(Temporality::Delta) => agg_h.delta(dest),
_ => agg_h.cumulative(dest),
},
)
(self.filter(move |n, a: &[KeyValue]| h.measure(n, a)), agg_h)
}
}

@@ -245,7 +221,8 @@ mod tests {

#[test]
fn last_value_aggregation() {
let (measure, agg) = AggregateBuilder::<u64>::new(None, None).last_value();
let (measure, agg) =
AggregateBuilder::<u64>::new(Temporality::Cumulative, None).last_value();
let mut a = Gauge {
data_points: vec![GaugeDataPoint {
attributes: vec![KeyValue::new("a", 1)],
@@ -271,7 +248,7 @@ mod tests {
fn precomputed_sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) =
AggregateBuilder::<u64>::new(Some(temporality), None).precomputed_sum(true);
AggregateBuilder::<u64>::new(temporality, None).precomputed_sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
@@ -312,7 +289,7 @@ mod tests {
#[test]
fn sum_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None).sum(true);
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None).sum(true);
let mut a = Sum {
data_points: vec![
SumDataPoint {
@@ -353,7 +330,7 @@ mod tests {
#[test]
fn explicit_bucket_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
.explicit_bucket_histogram(vec![1.0], true, true);
let mut a = Histogram {
data_points: vec![HistogramDataPoint {
@@ -396,7 +373,7 @@ mod tests {
#[test]
fn exponential_histogram_aggregation() {
for temporality in [Temporality::Delta, Temporality::Cumulative] {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None)
let (measure, agg) = AggregateBuilder::<u64>::new(temporality, None)
.exponential_bucket_histogram(4, 20, true, true);
let mut a = ExponentialHistogram {
data_points: vec![ExponentialHistogramDataPoint {
45 changes: 31 additions & 14 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{f64::consts::LOG2_E, mem::replace, ops::DerefMut, sync::Mutex};
use std::{
f64::consts::LOG2_E,
mem::replace,
ops::DerefMut,
sync::{Arc, Mutex},
};

use opentelemetry::{otel_debug, KeyValue};
use std::sync::OnceLock;
@@ -8,7 +13,7 @@ use crate::metrics::{
Temporality,
};

use super::{aggregate::AggregateTimeInitiator, Aggregator, Number, ValueMap};
use super::{aggregate::AggregateTimeInitiator, Aggregator, ComputeAggregation, Number, ValueMap};

pub(crate) const EXPO_MAX_SCALE: i8 = 20;
pub(crate) const EXPO_MIN_SCALE: i8 = -10;
@@ -351,13 +356,15 @@ struct BucketConfig {
pub(crate) struct ExpoHistogram<T: Number> {
value_map: ValueMap<Mutex<ExpoHistogramDataPoint<T>>>,
init_time: AggregateTimeInitiator,
temporality: Temporality,
record_sum: bool,
record_min_max: bool,
}

impl<T: Number> ExpoHistogram<T> {
/// Create a new exponential histogram.
pub(crate) fn new(
temporality: Temporality,
max_size: u32,
max_scale: i8,
record_min_max: bool,
@@ -368,9 +375,10 @@ impl<T: Number> ExpoHistogram<T> {
max_size: max_size as i32,
max_scale,
}),
init_time: AggregateTimeInitiator::default(),
temporality,
record_sum,
record_min_max,
init_time: AggregateTimeInitiator::default(),
}
}

@@ -385,10 +393,7 @@ impl<T: Number> ExpoHistogram<T> {
self.value_map.measure(value, attrs);
}

pub(crate) fn delta(
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
fn delta(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
let time = self.init_time.delta();

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
@@ -442,7 +447,7 @@ impl<T: Number> ExpoHistogram<T> {
(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
}

pub(crate) fn cumulative(
fn cumulative(
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
@@ -500,6 +505,18 @@ impl<T: Number> ExpoHistogram<T> {
}
}

impl<T> ComputeAggregation for Arc<ExpoHistogram<T>>
where
T: Number,
{
fn call(&self, dest: Option<&mut dyn Aggregation>) -> (usize, Option<Box<dyn Aggregation>>) {
match self.temporality {
Temporality::Delta => self.delta(dest),
_ => self.cumulative(dest),
}
}
}

#[cfg(test)]
mod tests {
use std::{ops::Neg, time::SystemTime};
@@ -665,7 +682,7 @@ mod tests {
];

for test in test_cases {
let h = ExpoHistogram::new(4, 20, true, true);
let h = ExpoHistogram::new(Temporality::Cumulative, 4, 20, true, true);
for v in test.values {
h.measure(v, &[]);
}
@@ -714,7 +731,7 @@ mod tests {
];

for test in test_cases {
let h = ExpoHistogram::new(4, 20, true, true);
let h = ExpoHistogram::new(Temporality::Cumulative, 4, 20, true, true);
for v in test.values {
h.measure(v, &[]);
}
@@ -1241,7 +1258,7 @@ mod tests {
name: "Delta Single",
build: Box::new(move || {
box_val(
AggregateBuilder::new(Some(Temporality::Delta), None)
AggregateBuilder::new(Temporality::Delta, None)
.exponential_bucket_histogram(
max_size,
max_scale,
@@ -1284,7 +1301,7 @@ mod tests {
name: "Cumulative Single",
build: Box::new(move || {
box_val(
internal::AggregateBuilder::new(Some(Temporality::Cumulative), None)
internal::AggregateBuilder::new(Temporality::Cumulative, None)
.exponential_bucket_histogram(
max_size,
max_scale,
@@ -1327,7 +1344,7 @@ mod tests {
name: "Delta Multiple",
build: Box::new(move || {
box_val(
internal::AggregateBuilder::new(Some(Temporality::Delta), None)
internal::AggregateBuilder::new(Temporality::Delta, None)
.exponential_bucket_histogram(
max_size,
max_scale,
@@ -1373,7 +1390,7 @@ mod tests {
name: "Cumulative Multiple ",
build: Box::new(move || {
box_val(
internal::AggregateBuilder::new(Some(Temporality::Cumulative), None)
internal::AggregateBuilder::new(Temporality::Cumulative, None)
.exponential_bucket_histogram(
max_size,
max_scale,
Loading