Skip to content

Commit

Permalink
Fix metrics aggregation bug for Sum and Precomputed Sum to avoid duplicate export (#2018)

Browse files Browse the repository at this point in the history

Co-authored-by: Cijo Thomas <[email protected]>
  • Loading branch information
utpilla and cijothomas authored Aug 13, 2024
1 parent 07b918d commit f2e9df2
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 149 deletions.
58 changes: 34 additions & 24 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,17 @@ impl<T: Number<T>> Sum<T> {
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: tracker.get_value(),
exemplars: vec![],
});
if seen.insert(Arc::as_ptr(tracker)) {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: tracker.get_value(),
exemplars: vec![],
});
}
}

(
Expand Down Expand Up @@ -263,17 +266,20 @@ impl<T: Number<T>> PrecomputedSum<T> {
Err(_) => return (0, None),
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.drain() {
let value = tracker.get_value();
let delta = value - *reported.get(&attrs).unwrap_or(&T::default());
new_reported.insert(attrs.clone(), value);
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
if seen.insert(Arc::as_ptr(&tracker)) {
let value = tracker.get_value();
let delta = value - *reported.get(&attrs).unwrap_or(&T::default());
new_reported.insert(attrs.clone(), value);
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: delta,
exemplars: vec![],
});
}
}

// The delta collection cycle resets.
Expand Down Expand Up @@ -340,14 +346,18 @@ impl<T: Number<T>> PrecomputedSum<T> {
Ok(v) => v,
Err(_) => return (0, None),
};

let mut seen = HashSet::new();
for (attrs, tracker) in trackers.iter() {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: tracker.get_value(),
exemplars: vec![],
});
if seen.insert(Arc::as_ptr(tracker)) {
s_data.data_points.push(DataPoint {
attributes: attrs.clone(),
start_time: Some(prev_start),
time: Some(t),
value: tracker.get_value(),
exemplars: vec![],
});
}
}

(
Expand Down
260 changes: 135 additions & 125 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,145 +835,155 @@ mod tests {
// Run this test with stdout enabled to see output.
// cargo test counter_aggregation_attribute_order_sorted_first --features=testing -- --nocapture

// Arrange
let mut test_context = TestContext::new(Temporality::Delta);
let counter = test_context.u64_counter("test", "my_counter", None);

// Act
// Add the same set of attributes in different order. (they are expected
// to be treated as same attributes)
// start with sorted order
counter.add(
1,
&[
KeyValue::new("A", "a"),
KeyValue::new("B", "b"),
KeyValue::new("C", "c"),
],
);
counter.add(
1,
&[
KeyValue::new("A", "a"),
KeyValue::new("C", "c"),
KeyValue::new("B", "b"),
],
);
counter.add(
1,
&[
KeyValue::new("B", "b"),
KeyValue::new("A", "a"),
KeyValue::new("C", "c"),
],
);
counter.add(
1,
&[
KeyValue::new("B", "b"),
KeyValue::new("C", "c"),
KeyValue::new("A", "a"),
],
);
counter.add(
1,
&[
KeyValue::new("C", "c"),
KeyValue::new("B", "b"),
KeyValue::new("A", "a"),
],
);
counter.add(
1,
&[
KeyValue::new("C", "c"),
KeyValue::new("A", "a"),
KeyValue::new("B", "b"),
],
);
test_context.flush_metrics();
counter_aggregation_attribute_order_sorted_first_helper(Temporality::Delta);
counter_aggregation_attribute_order_sorted_first_helper(Temporality::Cumulative);

fn counter_aggregation_attribute_order_sorted_first_helper(temporality: Temporality) {
// Arrange
let mut test_context = TestContext::new(temporality);
let counter = test_context.u64_counter("test", "my_counter", None);

// Act
// Add the same set of attributes in different order. (they are expected
// to be treated as same attributes)
// start with sorted order
counter.add(
1,
&[
KeyValue::new("A", "a"),
KeyValue::new("B", "b"),
KeyValue::new("C", "c"),
],
);
counter.add(
1,
&[
KeyValue::new("A", "a"),
KeyValue::new("C", "c"),
KeyValue::new("B", "b"),
],
);
counter.add(
1,
&[
KeyValue::new("B", "b"),
KeyValue::new("A", "a"),
KeyValue::new("C", "c"),
],
);
counter.add(
1,
&[
KeyValue::new("B", "b"),
KeyValue::new("C", "c"),
KeyValue::new("A", "a"),
],
);
counter.add(
1,
&[
KeyValue::new("C", "c"),
KeyValue::new("B", "b"),
KeyValue::new("A", "a"),
],
);
counter.add(
1,
&[
KeyValue::new("C", "c"),
KeyValue::new("A", "a"),
KeyValue::new("B", "b"),
],
);
test_context.flush_metrics();

let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);

// Expecting 1 time-series.
assert_eq!(sum.data_points.len(), 1);
// Expecting 1 time-series.
assert_eq!(sum.data_points.len(), 1);

// validate the sole datapoint
let data_point1 = &sum.data_points[0];
assert_eq!(data_point1.value, 6);
// validate the sole datapoint
let data_point1 = &sum.data_points[0];
assert_eq!(data_point1.value, 6);
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_aggregation_attribute_order_unsorted_first() {
// Run this test with stdout enabled to see output.
// cargo test counter_aggregation_attribute_order_unsorted_first --features=testing -- --nocapture

// Arrange
let mut test_context = TestContext::new(Temporality::Delta);
let counter = test_context.u64_counter("test", "my_counter", None);
counter_aggregation_attribute_order_unsorted_first_helper(Temporality::Delta);
counter_aggregation_attribute_order_unsorted_first_helper(Temporality::Cumulative);

// Act
// Add the same set of attributes in different order. (they are expected
// to be treated as same attributes)
// start with unsorted order
counter.add(
1,
&[
KeyValue::new("A", "a"),
KeyValue::new("C", "c"),
KeyValue::new("B", "b"),
],
);
counter.add(
1,
&[
KeyValue::new("A", "a"),
KeyValue::new("B", "b"),
KeyValue::new("C", "c"),
],
);
counter.add(
1,
&[
KeyValue::new("B", "b"),
KeyValue::new("A", "a"),
KeyValue::new("C", "c"),
],
);
counter.add(
1,
&[
KeyValue::new("B", "b"),
KeyValue::new("C", "c"),
KeyValue::new("A", "a"),
],
);
counter.add(
1,
&[
KeyValue::new("C", "c"),
KeyValue::new("B", "b"),
KeyValue::new("A", "a"),
],
);
counter.add(
1,
&[
KeyValue::new("C", "c"),
KeyValue::new("A", "a"),
KeyValue::new("B", "b"),
],
);
test_context.flush_metrics();
fn counter_aggregation_attribute_order_unsorted_first_helper(temporality: Temporality) {
// Arrange
let mut test_context = TestContext::new(temporality);
let counter = test_context.u64_counter("test", "my_counter", None);

let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
// Act
// Add the same set of attributes in different order. (they are expected
// to be treated as same attributes)
// start with unsorted order
counter.add(
1,
&[
KeyValue::new("A", "a"),
KeyValue::new("C", "c"),
KeyValue::new("B", "b"),
],
);
counter.add(
1,
&[
KeyValue::new("A", "a"),
KeyValue::new("B", "b"),
KeyValue::new("C", "c"),
],
);
counter.add(
1,
&[
KeyValue::new("B", "b"),
KeyValue::new("A", "a"),
KeyValue::new("C", "c"),
],
);
counter.add(
1,
&[
KeyValue::new("B", "b"),
KeyValue::new("C", "c"),
KeyValue::new("A", "a"),
],
);
counter.add(
1,
&[
KeyValue::new("C", "c"),
KeyValue::new("B", "b"),
KeyValue::new("A", "a"),
],
);
counter.add(
1,
&[
KeyValue::new("C", "c"),
KeyValue::new("A", "a"),
KeyValue::new("B", "b"),
],
);
test_context.flush_metrics();

// Expecting 1 time-series.
assert_eq!(sum.data_points.len(), 1);
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);

// validate the sole datapoint
let data_point1 = &sum.data_points[0];
assert_eq!(data_point1.value, 6);
// Expecting 1 time-series.
assert_eq!(sum.data_points.len(), 1);

// validate the sole datapoint
let data_point1 = &sum.data_points[0];
assert_eq!(data_point1.value, 6);
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down

0 comments on commit f2e9df2

Please sign in to comment.