Skip to content

Commit

Permalink
refactor: update metrics Result to be MetricResult (#2241)
Browse files Browse the repository at this point in the history
Co-authored-by: Cijo Thomas <[email protected]>
  • Loading branch information
pitoniak32 and cijothomas authored Oct 26, 2024
1 parent 9382bfb commit f3be05b
Show file tree
Hide file tree
Showing 25 changed files with 162 additions and 159 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ For a deeper discussion, see:

Currently, the Opentelemetry Rust SDK has two ways to handle errors. In the situation where errors are not allowed to return. One should call global error handler to process the errors. Otherwise, one should return the errors.

The Opentelemetry Rust SDK comes with an error type `opentelemetry::Error`. For different function, one error has been defined. All error returned by trace module MUST be wrapped in `opentelemetry::trace::TraceError`. All errors returned by metrics module MUST be wrapped in `opentelemetry::metrics::MetricsError`. All errors returned by logs module MUST be wrapped in `opentelemetry::logs::LogsError`.
The Opentelemetry Rust SDK comes with an error type `opentelemetry::Error`. For different function, one error has been defined. All error returned by trace module MUST be wrapped in `opentelemetry::trace::TraceError`. All errors returned by metrics module MUST be wrapped in `opentelemetry::metrics::MetricError`. All errors returned by logs module MUST be wrapped in `opentelemetry::logs::LogsError`.

For users that want to implement their own exporters. It's RECOMMENDED to wrap all errors from the exporter into a crate-level error type, and implement `ExporterError` trait.

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use once_cell::sync::Lazy;
use opentelemetry::{
global,
metrics::MetricsError,
metrics::MetricError,
trace::{TraceContextExt, TraceError, Tracer},
InstrumentationScope, KeyValue,
};
Expand Down Expand Up @@ -65,7 +65,7 @@ fn init_tracer_provider() -> Result<sdktrace::TracerProvider, TraceError> {
.build())
}

fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, MetricsError> {
fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, MetricError> {
let exporter = MetricsExporter::builder()
.with_http()
.with_protocol(Protocol::HttpBinary) //can be changed to `Protocol::HttpJson` to export in JSON format
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use once_cell::sync::Lazy;
use opentelemetry::logs::LogError;
use opentelemetry::metrics::MetricsError;
use opentelemetry::metrics::MetricError;
use opentelemetry::trace::{TraceContextExt, TraceError, Tracer};
use opentelemetry::KeyValue;
use opentelemetry::{global, InstrumentationScope};
Expand Down Expand Up @@ -33,7 +33,7 @@ fn init_tracer_provider() -> Result<sdktrace::TracerProvider, TraceError> {
.build())
}

fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, MetricsError> {
fn init_metrics() -> Result<opentelemetry_sdk::metrics::SdkMeterProvider, MetricError> {
let exporter = MetricsExporter::builder().with_tonic().build()?;

let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-otlp/src/exporter/http/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::metrics::{MetricsError, Result};
use opentelemetry::metrics::{MetricError, MetricResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;

use crate::{metric::MetricsClient, Error};
Expand All @@ -11,14 +11,14 @@ use super::OtlpHttpClient;

#[async_trait]
impl MetricsClient for OtlpHttpClient {
async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
let client = self
.client
.lock()
.map_err(Into::into)
.and_then(|g| match &*g {
Some(client) => Ok(Arc::clone(client)),
_ => Err(MetricsError::Other("exporter is already shut down".into())),
_ => Err(MetricError::Other("exporter is already shut down".into())),
})?;

let (body, content_type) = self.build_metrics_export_body(metrics)?;
Expand All @@ -36,12 +36,12 @@ impl MetricsClient for OtlpHttpClient {
client
.send(request)
.await
.map_err(|e| MetricsError::ExportErr(Box::new(Error::RequestFailed(e))))?;
.map_err(|e| MetricError::ExportErr(Box::new(Error::RequestFailed(e))))?;

Ok(())
}

fn shutdown(&self) -> Result<()> {
fn shutdown(&self) -> MetricResult<()> {
let _ = self.client.lock()?.take();

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl HttpExporterBuilder {
pub fn build_metrics_exporter(
mut self,
temporality: opentelemetry_sdk::metrics::data::Temporality,
) -> opentelemetry::metrics::Result<crate::MetricsExporter> {
) -> opentelemetry::metrics::MetricResult<crate::MetricsExporter> {
use crate::{
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_HEADERS,
OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
Expand Down Expand Up @@ -311,7 +311,7 @@ impl OtlpHttpClient {
fn build_metrics_export_body(
&self,
metrics: &mut opentelemetry_sdk::metrics::data::ResourceMetrics,
) -> opentelemetry::metrics::Result<(Vec<u8>, &'static str)> {
) -> opentelemetry::metrics::MetricResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;

let req: ExportMetricsServiceRequest = (&*metrics).into();
Expand All @@ -320,7 +320,7 @@ impl OtlpHttpClient {
#[cfg(feature = "http-json")]
Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
Ok(json) => Ok((json.into(), "application/json")),
Err(e) => Err(opentelemetry::metrics::MetricsError::Other(e.to_string())),
Err(e) => Err(opentelemetry::metrics::MetricError::Other(e.to_string())),
},
_ => Ok((req.encode_to_vec(), "application/x-protobuf")),
}
Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-otlp/src/exporter/tonic/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use core::fmt;
use std::sync::Mutex;

use async_trait::async_trait;
use opentelemetry::metrics::{MetricsError, Result};
use opentelemetry::metrics::{MetricError, MetricResult};
use opentelemetry_proto::tonic::collector::metrics::v1::{
metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
};
Expand Down Expand Up @@ -51,7 +51,7 @@ impl TonicMetricsClient {

#[async_trait]
impl MetricsClient for TonicMetricsClient {
async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
let (mut client, metadata, extensions) =
self.inner
.lock()
Expand All @@ -62,14 +62,14 @@ impl MetricsClient for TonicMetricsClient {
.interceptor
.call(Request::new(()))
.map_err(|e| {
MetricsError::Other(format!(
MetricError::Other(format!(
"unexpected status while exporting {e:?}"
))
})?
.into_parts();
Ok((inner.client.clone(), m, e))
}
None => Err(MetricsError::Other("exporter is already shut down".into())),
None => Err(MetricError::Other("exporter is already shut down".into())),
})?;

client
Expand All @@ -84,7 +84,7 @@ impl MetricsClient for TonicMetricsClient {
Ok(())
}

fn shutdown(&self) -> Result<()> {
fn shutdown(&self) -> MetricResult<()> {
let _ = self.inner.lock()?.take();

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/tonic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl TonicExporterBuilder {
pub(crate) fn build_metrics_exporter(
self,
temporality: opentelemetry_sdk::metrics::data::Temporality,
) -> opentelemetry::metrics::Result<crate::MetricsExporter> {
) -> opentelemetry::metrics::MetricResult<crate::MetricsExporter> {
use crate::MetricsExporter;
use metrics::TonicMetricsClient;

Expand Down
16 changes: 8 additions & 8 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::NoExporterBuilderSet;

use async_trait::async_trait;
use core::fmt;
use opentelemetry::metrics::Result;
use opentelemetry::metrics::MetricResult;

use opentelemetry_sdk::metrics::{
data::{ResourceMetrics, Temporality},
Expand Down Expand Up @@ -77,15 +77,15 @@ impl<C> MetricsExporterBuilder<C> {

#[cfg(feature = "grpc-tonic")]
impl MetricsExporterBuilder<TonicExporterBuilderSet> {
pub fn build(self) -> Result<MetricsExporter> {
pub fn build(self) -> MetricResult<MetricsExporter> {
let exporter = self.client.0.build_metrics_exporter(self.temporality)?;
Ok(exporter)
}
}

#[cfg(any(feature = "http-proto", feature = "http-json"))]
impl MetricsExporterBuilder<HttpExporterBuilderSet> {
pub fn build(self) -> Result<MetricsExporter> {
pub fn build(self) -> MetricResult<MetricsExporter> {
let exporter = self.client.0.build_metrics_exporter(self.temporality)?;
Ok(exporter)
}
Expand Down Expand Up @@ -122,8 +122,8 @@ impl HasHttpConfig for MetricsExporterBuilder<HttpExporterBuilderSet> {
/// An interface for OTLP metrics clients
#[async_trait]
pub trait MetricsClient: fmt::Debug + Send + Sync + 'static {
async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()>;
fn shutdown(&self) -> Result<()>;
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
fn shutdown(&self) -> MetricResult<()>;
}

/// Export metrics in OTEL format.
Expand All @@ -140,16 +140,16 @@ impl Debug for MetricsExporter {

#[async_trait]
impl PushMetricsExporter for MetricsExporter {
async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
self.client.export(metrics).await
}

async fn force_flush(&self) -> Result<()> {
async fn force_flush(&self) -> MetricResult<()> {
// this component is stateless
Ok(())
}

fn shutdown(&self) -> Result<()> {
fn shutdown(&self) -> MetricResult<()> {
self.client.shutdown()
}

Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-proto/src/transform/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub mod tonic {
use std::any::Any;
use std::fmt;

use opentelemetry::{global, metrics::MetricsError, Key, Value};
use opentelemetry::{global, metrics::MetricError, Key, Value};
use opentelemetry_sdk::metrics::data::{
self, Exemplar as SdkExemplar, ExponentialHistogram as SdkExponentialHistogram,
Gauge as SdkGauge, Histogram as SdkHistogram, Metric as SdkMetric,
Expand Down Expand Up @@ -97,7 +97,7 @@ pub mod tonic {
Temporality::Cumulative => AggregationTemporality::Cumulative,
Temporality::Delta => AggregationTemporality::Delta,
other => {
opentelemetry::global::handle_error(MetricsError::Other(format!(
opentelemetry::global::handle_error(MetricError::Other(format!(
"Unknown temporality {:?}, using default instead.",
other
)));
Expand Down Expand Up @@ -184,7 +184,7 @@ pub mod tonic {
} else if let Some(gauge) = data.downcast_ref::<SdkGauge<f64>>() {
Ok(TonicMetricData::Gauge(gauge.into()))
} else {
global::handle_error(MetricsError::Other("unknown aggregator".into()));
global::handle_error(MetricError::Other("unknown aggregator".into()));
Err(())
}
}
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::{Arc, Weak};

use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use opentelemetry::{
metrics::{Counter, Histogram, MeterProvider as _, Result},
metrics::{Counter, Histogram, MeterProvider as _, MetricResult},
Key, KeyValue,
};
use opentelemetry_sdk::{
Expand All @@ -25,15 +25,15 @@ impl MetricReader for SharedReader {
self.0.register_pipeline(pipeline)
}

fn collect(&self, rm: &mut ResourceMetrics) -> Result<()> {
fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
self.0.collect(rm)
}

fn force_flush(&self) -> Result<()> {
fn force_flush(&self) -> MetricResult<()> {
self.0.force_flush()
}

fn shutdown(&self) -> Result<()> {
fn shutdown(&self) -> MetricResult<()> {
self.0.shutdown()
}

Expand Down
18 changes: 9 additions & 9 deletions opentelemetry-sdk/src/metrics/aggregation.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt;

use crate::metrics::internal::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use opentelemetry::metrics::{MetricsError, Result};
use opentelemetry::metrics::{MetricError, MetricResult};

/// The way recorded measurements are summarized.
#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -109,7 +109,7 @@ impl fmt::Display for Aggregation {

impl Aggregation {
/// Validate that this aggregation has correct configuration
pub fn validate(&self) -> Result<()> {
pub fn validate(&self) -> MetricResult<()> {
match self {
Aggregation::Drop => Ok(()),
Aggregation::Default => Ok(()),
Expand All @@ -118,7 +118,7 @@ impl Aggregation {
Aggregation::ExplicitBucketHistogram { boundaries, .. } => {
for x in boundaries.windows(2) {
if x[0] >= x[1] {
return Err(MetricsError::Config(format!(
return Err(MetricError::Config(format!(
"aggregation: explicit bucket histogram: non-monotonic boundaries: {:?}",
boundaries,
)));
Expand All @@ -129,13 +129,13 @@ impl Aggregation {
}
Aggregation::Base2ExponentialHistogram { max_scale, .. } => {
if *max_scale > EXPO_MAX_SCALE {
return Err(MetricsError::Config(format!(
return Err(MetricError::Config(format!(
"aggregation: exponential histogram: max scale ({}) is greater than 20",
max_scale,
)));
}
if *max_scale < EXPO_MIN_SCALE {
return Err(MetricsError::Config(format!(
return Err(MetricError::Config(format!(
"aggregation: exponential histogram: max scale ({}) is less than -10",
max_scale,
)));
Expand All @@ -153,17 +153,17 @@ mod tests {
internal::{EXPO_MAX_SCALE, EXPO_MIN_SCALE},
Aggregation,
};
use opentelemetry::metrics::{MetricsError, Result};
use opentelemetry::metrics::{MetricError, MetricResult};

#[test]
fn validate_aggregation() {
struct TestCase {
name: &'static str,
input: Aggregation,
check: Box<dyn Fn(Result<()>) -> bool>,
check: Box<dyn Fn(MetricResult<()>) -> bool>,
}
let ok = Box::new(|result: Result<()>| result.is_ok());
let config_error = Box::new(|result| matches!(result, Err(MetricsError::Config(_))));
let ok = Box::new(|result: MetricResult<()>| result.is_ok());
let config_error = Box::new(|result| matches!(result, Err(MetricError::Config(_))));

let test_cases: Vec<TestCase> = vec![
TestCase {
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Interfaces for exporting metrics
use async_trait::async_trait;

use opentelemetry::metrics::Result;
use opentelemetry::metrics::MetricResult;

use crate::metrics::data::ResourceMetrics;

Expand All @@ -18,16 +18,16 @@ pub trait PushMetricsExporter: Send + Sync + 'static {
/// implement any retry logic. All errors returned by this function are
/// considered unrecoverable and will be reported to a configured error
/// Handler.
async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()>;
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;

/// Flushes any metric data held by an exporter.
async fn force_flush(&self) -> Result<()>;
async fn force_flush(&self) -> MetricResult<()>;

/// Releases any held computational resources.
///
/// After Shutdown is called, calls to Export will perform no operation and
/// instead will return an error indicating the shutdown state.
fn shutdown(&self) -> Result<()>;
fn shutdown(&self) -> MetricResult<()>;

/// Access the [Temporality] of the MetricsExporter.
fn temporality(&self) -> Temporality;
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use aggregate::is_under_cardinality_limit;
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use once_cell::sync::Lazy;
use opentelemetry::metrics::MetricsError;
use opentelemetry::metrics::MetricError;
use opentelemetry::{global, otel_warn, KeyValue};

use crate::metrics::AttributeSet;
Expand Down Expand Up @@ -146,7 +146,7 @@ impl<AU: AtomicallyUpdate<T>, T: Number, O: Operation> ValueMap<AU, T, O> {
let new_tracker = AU::new_atomic_tracker(self.buckets_count);
O::update_tracker(&new_tracker, measurement, index);
trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
global::handle_error(MetricError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged.".into()));
otel_warn!( name: "ValueMap.measure",
message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
);
Expand Down
Loading

0 comments on commit f3be05b

Please sign in to comment.