From c8136d9b46cd3dcc7ff9dc79a2cb6f48963dbf85 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Tue, 24 Sep 2024 10:55:40 -0700 Subject: [PATCH] Add `with_boundaries` hint API for explicit bucket histograms (#2135) Co-authored-by: Cijo Thomas --- examples/metrics-basic/src/main.rs | 4 ++ opentelemetry-sdk/src/metrics/meter.rs | 22 +++++++- opentelemetry-sdk/src/metrics/mod.rs | 59 ++++++++++++++++++++ opentelemetry-sdk/src/metrics/pipeline.rs | 24 ++++++-- opentelemetry/CHANGELOG.md | 2 + opentelemetry/src/metrics/instruments/mod.rs | 11 ++++ 6 files changed, 116 insertions(+), 6 deletions(-) diff --git a/examples/metrics-basic/src/main.rs b/examples/metrics-basic/src/main.rs index ecd5083d7c..91f981b9c5 100644 --- a/examples/metrics-basic/src/main.rs +++ b/examples/metrics-basic/src/main.rs @@ -3,6 +3,7 @@ use opentelemetry::KeyValue; use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; use opentelemetry_sdk::{runtime, Resource}; use std::error::Error; +use std::vec; fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider { let exporter = opentelemetry_stdout::MetricsExporterBuilder::default() @@ -90,6 +91,9 @@ async fn main() -> Result<(), Box> { let histogram = meter .f64_histogram("my_histogram") .with_description("My histogram example description") + // Setting boundaries is optional. By default, the boundaries are set to + // [0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, 5000.0, 7500.0, 10000.0] + .with_boundaries(vec![0.0, 5.0, 10.0, 15.0, 20.0, 25.0]) .init(); // Record measurements using the histogram instrument. diff --git a/opentelemetry-sdk/src/metrics/meter.rs b/opentelemetry-sdk/src/metrics/meter.rs index 6e1081c761..975297a112 100644 --- a/opentelemetry-sdk/src/metrics/meter.rs +++ b/opentelemetry-sdk/src/metrics/meter.rs @@ -84,6 +84,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, ) .map(|i| Counter::new(Arc::new(i))) } @@ -96,6 +97,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, ) .map(|i| Counter::new(Arc::new(i))) } @@ -111,6 +113,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, )?; if ms.is_empty() { return Ok(ObservableCounter::new(Arc::new(NoopAsyncInstrument::new()))); @@ -138,6 +141,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, )?; if ms.is_empty() { return Ok(ObservableCounter::new(Arc::new(NoopAsyncInstrument::new()))); @@ -164,6 +168,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, ) .map(|i| UpDownCounter::new(Arc::new(i))) } @@ -179,6 +184,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, ) .map(|i| UpDownCounter::new(Arc::new(i))) } @@ -194,6 +200,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, )?; if ms.is_empty() { return Ok(ObservableUpDownCounter::new(Arc::new( @@ -223,6 +230,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, )?; if ms.is_empty() { return Ok(ObservableUpDownCounter::new(Arc::new( @@ -249,6 +257,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, ) .map(|i| Gauge::new(Arc::new(i))) } @@ -261,6 +270,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, ) .map(|i| Gauge::new(Arc::new(i))) } @@ -273,6 +283,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, ) .map(|i| Gauge::new(Arc::new(i))) } @@ -288,6 +299,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, )?; if ms.is_empty() { return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new()))); @@ -315,6 +327,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, )?; if ms.is_empty() { return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new()))); @@ -342,6 +355,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + None, )?; if ms.is_empty() { return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new()))); @@ -366,6 +380,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + builder.boundaries, ) .map(|i| Histogram::new(Arc::new(i))) } @@ -378,6 +393,7 @@ impl InstrumentProvider for SdkMeter { builder.name, builder.description, builder.unit, + builder.boundaries, ) .map(|i| Histogram::new(Arc::new(i))) } @@ -479,8 +495,9 @@ where name: Cow<'static, str>, description: Option>, unit: Option>, + boundaries: Option>, ) -> Result> { - let aggregators = self.measures(kind, name, description, unit)?; + let aggregators = self.measures(kind, name, description, unit, boundaries)?; Ok(ResolvedMeasures { measures: aggregators, }) @@ -492,6 +509,7 @@ where name: Cow<'static, str>, description: Option>, unit: Option>, + boundaries: Option>, ) -> Result>>> { let inst = Instrument { name, @@ -501,7 +519,7 @@ where scope: self.meter.scope.clone(), }; - self.resolve.measures(inst) + self.resolve.measures(inst, boundaries) } } diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index cf6e3fb928..cd6cc84643 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -259,6 +259,14 @@ mod tests { histogram_aggregation_helper(Temporality::Delta); } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn histogram_aggregation_with_custom_bounds() { + // Run this test with stdout enabled to see output. + // cargo test histogram_aggregation_with_custom_bounds --features=testing -- --nocapture + histogram_aggregation_with_custom_bounds_helper(Temporality::Delta); + histogram_aggregation_with_custom_bounds_helper(Temporality::Cumulative); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn updown_counter_aggregation_cumulative() { // Run this test with stdout enabled to see output. @@ -1790,6 +1798,57 @@ mod tests { } } + fn histogram_aggregation_with_custom_bounds_helper(temporality: Temporality) { + let mut test_context = TestContext::new(temporality); + let histogram = test_context + .meter() + .u64_histogram("test_histogram") + .with_boundaries(vec![1.0, 2.5, 5.5]) + .init(); + histogram.record(1, &[KeyValue::new("key1", "value1")]); + histogram.record(2, &[KeyValue::new("key1", "value1")]); + histogram.record(3, &[KeyValue::new("key1", "value1")]); + histogram.record(4, &[KeyValue::new("key1", "value1")]); + histogram.record(5, &[KeyValue::new("key1", "value1")]); + + test_context.flush_metrics(); + + // Assert + let histogram_data = + test_context.get_aggregation::>("test_histogram", None); + // Expecting 2 time-series. + assert_eq!(histogram_data.data_points.len(), 1); + if let Temporality::Cumulative = temporality { + assert_eq!( + histogram_data.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); + } else { + assert_eq!( + histogram_data.temporality, + Temporality::Delta, + "Should produce delta" + ); + } + + // find and validate key1=value1 datapoint + let data_point = + find_histogram_datapoint_with_key_value(&histogram_data.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); + + assert_eq!(data_point.count, 5); + assert_eq!(data_point.sum, 15); + + // Check the bucket counts + // -∞ to 1.0: 1 + // 1.0 to 2.5: 1 + // 2.5 to 5.5: 3 + // 5.5 to +∞: 0 + + assert_eq!(vec![1.0, 2.5, 5.5], data_point.bounds); + assert_eq!(vec![1, 1, 3, 0], data_point.bucket_counts); + } fn gauge_aggregation_helper(temporality: Temporality) { // Arrange let mut test_context = TestContext::new(temporality); diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index c4638eb28c..e3b2b43729 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -244,7 +244,11 @@ where /// /// If an instrument is determined to use a [aggregation::Aggregation::Drop], /// that instrument is not inserted nor returned. - fn instrument(&self, inst: Instrument) -> Result>>> { + fn instrument( + &self, + inst: Instrument, + boundaries: Option<&[f64]>, + ) -> Result>>> { let mut matched = false; let mut measures = vec![]; let mut errs = vec![]; @@ -288,7 +292,7 @@ where } // Apply implicit default view if no explicit matched. - let stream = Stream { + let mut stream = Stream { name: inst.name, description: inst.description, unit: inst.unit, @@ -296,6 +300,14 @@ where allowed_attribute_keys: None, }; + // Override default histogram boundaries if provided. + if let Some(boundaries) = boundaries { + stream.aggregation = Some(Aggregation::ExplicitBucketHistogram { + boundaries: boundaries.to_vec(), + record_min_max: true, + }); + } + match self.cached_aggregator(&inst.scope, kind, stream) { Ok(agg) => { if errs.is_empty() { @@ -682,11 +694,15 @@ where } /// The measures that must be updated by the instrument defined by key. - pub(crate) fn measures(&self, id: Instrument) -> Result>>> { + pub(crate) fn measures( + &self, + id: Instrument, + boundaries: Option>, + ) -> Result>>> { let (mut measures, mut errs) = (vec![], vec![]); for inserter in &self.inserters { - match inserter.instrument(id.clone()) { + match inserter.instrument(id.clone(), boundaries.as_deref()) { Ok(ms) => measures.extend(ms), Err(err) => errs.push(err), } diff --git a/opentelemetry/CHANGELOG.md b/opentelemetry/CHANGELOG.md index 32a512ba7d..c3421f3a83 100644 --- a/opentelemetry/CHANGELOG.md +++ b/opentelemetry/CHANGELOG.md @@ -9,6 +9,8 @@ - **Modified**: `MeterProvider.meter()` and `MeterProvider.versioned_meter()` argument types have been updated to `&'static str` instead of `impl Into>>` [#2112](https://github.com/open-telemetry/opentelemetry-rust/pull/2112). These APIs were modified to enforce the Meter `name`, `version`, and `schema_url` to be `&'static str`. +- Added `with_boundaries` API to allow users to provide custom bounds for Histogram instruments. [#2135](https://github.com/open-telemetry/opentelemetry-rust/pull/2135) + ## v0.25.0 - **BREAKING** [#1993](https://github.com/open-telemetry/opentelemetry-rust/pull/1993) Box complex types in AnyValue enum diff --git a/opentelemetry/src/metrics/instruments/mod.rs b/opentelemetry/src/metrics/instruments/mod.rs index cf4bab2d1f..c9319d6615 100644 --- a/opentelemetry/src/metrics/instruments/mod.rs +++ b/opentelemetry/src/metrics/instruments/mod.rs @@ -39,6 +39,9 @@ pub struct HistogramBuilder<'a, T> { /// Unit of the Histogram. pub unit: Option>, + /// Bucket boundaries for the histogram. + pub boundaries: Option>, + // boundaries: Vec, _marker: marker::PhantomData, } @@ -51,6 +54,7 @@ impl<'a, T> HistogramBuilder<'a, T> { name, description: None, unit: None, + boundaries: None, _marker: marker::PhantomData, } } @@ -72,6 +76,12 @@ impl<'a, T> HistogramBuilder<'a, T> { self.unit = Some(unit.into()); self } + + /// Set the boundaries for this histogram. + pub fn with_boundaries(mut self, boundaries: Vec) -> Self { + self.boundaries = Some(boundaries); + self + } } impl<'a> HistogramBuilder<'a, f64> { @@ -198,6 +208,7 @@ impl fmt::Debug for HistogramBuilder<'_, T> { .field("name", &self.name) .field("description", &self.description) .field("unit", &self.unit) + .field("boundaries", &self.boundaries) .field( "kind", &format!("Histogram<{}>", &std::any::type_name::()),