From f2e9df28dd26134ae6ff278cb5a03de38b6827e3 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Tue, 13 Aug 2024 12:54:56 -0700 Subject: [PATCH] Fix metrics aggregation bug for Sum and Precomputed Sum to avoid duplicate export (#2018) Co-authored-by: Cijo Thomas --- opentelemetry-sdk/src/metrics/internal/sum.rs | 58 ++-- opentelemetry-sdk/src/metrics/mod.rs | 260 +++++++++--------- 2 files changed, 169 insertions(+), 149 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 36108c86dc..b62c7cc108 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -165,14 +165,17 @@ impl> Sum { // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not // overload the system. + let mut seen = HashSet::new(); for (attrs, tracker) in trackers.iter() { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.get_value(), - exemplars: vec![], - }); + if seen.insert(Arc::as_ptr(tracker)) { + s_data.data_points.push(DataPoint { + attributes: attrs.clone(), + start_time: Some(prev_start), + time: Some(t), + value: tracker.get_value(), + exemplars: vec![], + }); + } } ( @@ -263,17 +266,20 @@ impl> PrecomputedSum { Err(_) => return (0, None), }; + let mut seen = HashSet::new(); for (attrs, tracker) in trackers.drain() { - let value = tracker.get_value(); - let delta = value - *reported.get(&attrs).unwrap_or(&T::default()); - new_reported.insert(attrs.clone(), value); - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: delta, - exemplars: vec![], - }); + if seen.insert(Arc::as_ptr(&tracker)) { + let value = tracker.get_value(); + let delta = value - *reported.get(&attrs).unwrap_or(&T::default()); + new_reported.insert(attrs.clone(), value); + s_data.data_points.push(DataPoint { + attributes: attrs.clone(), + start_time: Some(prev_start), + time: Some(t), + value: delta, + exemplars: vec![], + }); + } } // The delta collection cycle resets. @@ -340,14 +346,18 @@ impl> PrecomputedSum { Ok(v) => v, Err(_) => return (0, None), }; + + let mut seen = HashSet::new(); for (attrs, tracker) in trackers.iter() { - s_data.data_points.push(DataPoint { - attributes: attrs.clone(), - start_time: Some(prev_start), - time: Some(t), - value: tracker.get_value(), - exemplars: vec![], - }); + if seen.insert(Arc::as_ptr(tracker)) { + s_data.data_points.push(DataPoint { + attributes: attrs.clone(), + start_time: Some(prev_start), + time: Some(t), + value: tracker.get_value(), + exemplars: vec![], + }); + } } ( diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 7bd5b3fef8..c784641dec 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -835,72 +835,77 @@ mod tests { // Run this test with stdout enabled to see output. // cargo test counter_aggregation_attribute_order_sorted_first --features=testing -- --nocapture - // Arrange - let mut test_context = TestContext::new(Temporality::Delta); - let counter = test_context.u64_counter("test", "my_counter", None); - - // Act - // Add the same set of attributes in different order. (they are expected - // to be treated as same attributes) - // start with sorted order - counter.add( - 1, - &[ - KeyValue::new("A", "a"), - KeyValue::new("B", "b"), - KeyValue::new("C", "c"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("A", "a"), - KeyValue::new("C", "c"), - KeyValue::new("B", "b"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("B", "b"), - KeyValue::new("A", "a"), - KeyValue::new("C", "c"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("B", "b"), - KeyValue::new("C", "c"), - KeyValue::new("A", "a"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("C", "c"), - KeyValue::new("B", "b"), - KeyValue::new("A", "a"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("C", "c"), - KeyValue::new("A", "a"), - KeyValue::new("B", "b"), - ], - ); - test_context.flush_metrics(); + counter_aggregation_attribute_order_sorted_first_helper(Temporality::Delta); + counter_aggregation_attribute_order_sorted_first_helper(Temporality::Cumulative); + + fn counter_aggregation_attribute_order_sorted_first_helper(temporality: Temporality) { + // Arrange + let mut test_context = TestContext::new(temporality); + let counter = test_context.u64_counter("test", "my_counter", None); + + // Act + // Add the same set of attributes in different order. (they are expected + // to be treated as same attributes) + // start with sorted order + counter.add( + 1, + &[ + KeyValue::new("A", "a"), + KeyValue::new("B", "b"), + KeyValue::new("C", "c"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("A", "a"), + KeyValue::new("C", "c"), + KeyValue::new("B", "b"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("B", "b"), + KeyValue::new("A", "a"), + KeyValue::new("C", "c"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("B", "b"), + KeyValue::new("C", "c"), + KeyValue::new("A", "a"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("C", "c"), + KeyValue::new("B", "b"), + KeyValue::new("A", "a"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("C", "c"), + KeyValue::new("A", "a"), + KeyValue::new("B", "b"), + ], + ); + test_context.flush_metrics(); - let sum = test_context.get_aggregation::>("my_counter", None); + let sum = test_context.get_aggregation::>("my_counter", None); - // Expecting 1 time-series. - assert_eq!(sum.data_points.len(), 1); + // Expecting 1 time-series. + assert_eq!(sum.data_points.len(), 1); - // validate the sole datapoint - let data_point1 = &sum.data_points[0]; - assert_eq!(data_point1.value, 6); + // validate the sole datapoint + let data_point1 = &sum.data_points[0]; + assert_eq!(data_point1.value, 6); + } } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -908,72 +913,77 @@ mod tests { // Run this test with stdout enabled to see output. // cargo test counter_aggregation_attribute_order_unsorted_first --features=testing -- --nocapture - // Arrange - let mut test_context = TestContext::new(Temporality::Delta); - let counter = test_context.u64_counter("test", "my_counter", None); + counter_aggregation_attribute_order_unsorted_first_helper(Temporality::Delta); + counter_aggregation_attribute_order_unsorted_first_helper(Temporality::Cumulative); - // Act - // Add the same set of attributes in different order. (they are expected - // to be treated as same attributes) - // start with unsorted order - counter.add( - 1, - &[ - KeyValue::new("A", "a"), - KeyValue::new("C", "c"), - KeyValue::new("B", "b"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("A", "a"), - KeyValue::new("B", "b"), - KeyValue::new("C", "c"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("B", "b"), - KeyValue::new("A", "a"), - KeyValue::new("C", "c"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("B", "b"), - KeyValue::new("C", "c"), - KeyValue::new("A", "a"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("C", "c"), - KeyValue::new("B", "b"), - KeyValue::new("A", "a"), - ], - ); - counter.add( - 1, - &[ - KeyValue::new("C", "c"), - KeyValue::new("A", "a"), - KeyValue::new("B", "b"), - ], - ); - test_context.flush_metrics(); + fn counter_aggregation_attribute_order_unsorted_first_helper(temporality: Temporality) { + // Arrange + let mut test_context = TestContext::new(temporality); + let counter = test_context.u64_counter("test", "my_counter", None); - let sum = test_context.get_aggregation::>("my_counter", None); + // Act + // Add the same set of attributes in different order. (they are expected + // to be treated as same attributes) + // start with unsorted order + counter.add( + 1, + &[ + KeyValue::new("A", "a"), + KeyValue::new("C", "c"), + KeyValue::new("B", "b"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("A", "a"), + KeyValue::new("B", "b"), + KeyValue::new("C", "c"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("B", "b"), + KeyValue::new("A", "a"), + KeyValue::new("C", "c"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("B", "b"), + KeyValue::new("C", "c"), + KeyValue::new("A", "a"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("C", "c"), + KeyValue::new("B", "b"), + KeyValue::new("A", "a"), + ], + ); + counter.add( + 1, + &[ + KeyValue::new("C", "c"), + KeyValue::new("A", "a"), + KeyValue::new("B", "b"), + ], + ); + test_context.flush_metrics(); - // Expecting 1 time-series. - assert_eq!(sum.data_points.len(), 1); + let sum = test_context.get_aggregation::>("my_counter", None); - // validate the sole datapoint - let data_point1 = &sum.data_points[0]; - assert_eq!(data_point1.value, 6); + // Expecting 1 time-series. + assert_eq!(sum.data_points.len(), 1); + + // validate the sole datapoint + let data_point1 = &sum.data_points[0]; + assert_eq!(data_point1.value, 6); + } } #[tokio::test(flavor = "multi_thread", worker_threads = 1)]