diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index aa366cdc4b..dbd4f3aa76 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -1,6 +1,6 @@ use std::{ collections::HashSet, - sync::{atomic::Ordering, Arc}, + sync::{atomic::Ordering, Arc, Mutex}, time::SystemTime, }; @@ -12,12 +12,14 @@ use super::{Assign, AtomicTracker, Number, ValueMap}; /// Summarizes a set of measurements as the last one made. pub(crate) struct LastValue> { value_map: ValueMap, + start: Mutex, } impl> LastValue { pub(crate) fn new() -> Self { LastValue { value_map: ValueMap::new(), + start: Mutex::new(SystemTime::now()), } } @@ -27,6 +29,7 @@ impl> LastValue { pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec>) { let t = SystemTime::now(); + let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); dest.clear(); // Max number of data points need to account for the special casing @@ -43,7 +46,7 @@ impl> LastValue { { dest.push(DataPoint { attributes: vec![], - start_time: None, + start_time: Some(prev_start), time: Some(t), value: self.value_map.no_attribute_tracker.get_and_reset_value(), exemplars: vec![], @@ -60,17 +63,25 @@ impl> LastValue { if seen.insert(Arc::as_ptr(&tracker)) { dest.push(DataPoint { attributes: attrs.clone(), - start_time: None, + start_time: Some(prev_start), time: Some(t), value: tracker.get_value(), exemplars: vec![], }); } } + + // The delta collection cycle resets. + if let Ok(mut start) = self.start.lock() { + *start = t; + } + self.value_map.count.store(0, Ordering::SeqCst); } pub(crate) fn compute_aggregation_cumulative(&self, dest: &mut Vec>) { let t = SystemTime::now(); + let prev_start = self.start.lock().map(|start| *start).unwrap_or(t); + dest.clear(); // Max number of data points need to account for the special casing @@ -87,7 +98,7 @@ impl> LastValue { { dest.push(DataPoint { attributes: vec![], - start_time: None, + start_time: Some(prev_start), time: Some(t), value: self.value_map.no_attribute_tracker.get_value(), exemplars: vec![], @@ -104,7 +115,7 @@ impl> LastValue { if seen.insert(Arc::as_ptr(tracker)) { dest.push(DataPoint { attributes: attrs.clone(), - start_time: None, + start_time: Some(prev_start), time: Some(t), value: tracker.get_value(), exemplars: vec![],