diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 97c90efd2a..0acf176517 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -73,11 +73,11 @@ jobs: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@nightly with: - toolchain: nightly-2024-05-01 + toolchain: nightly-2024-06-30 components: rustfmt - name: external-type-check run: | - cargo install cargo-check-external-types + cargo install cargo-check-external-types@0.1.13 cd ${{ matrix.example }} cargo check-external-types --config allowed-external-types.toml non-default-examples: diff --git a/Cargo.toml b/Cargo.toml index e08de8187d..a91ecbb4e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ hyper-util = "0.1" log = "0.4.21" once_cell = "1.13" ordered-float = "4.0" -pin-project-lite = "=0.2.14" # 0.2.15 is failing for cargo-check-external-types +pin-project-lite = "0.2" prost = "0.13" prost-build = "0.13" prost-types = "0.13" diff --git a/README.md b/README.md index 0780b84d47..27bc095c22 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ observability tools. *OpenTelemetry Rust is not introducing a new end user callable Logging API. Instead, it provides [Logs Bridge -API](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/bridge-api.md), +API](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/api.md), that allows one to write log appenders that can bridge existing logging libraries to the OpenTelemetry log data model. The following log appenders are available: diff --git a/opentelemetry-http/Cargo.toml b/opentelemetry-http/Cargo.toml index ff1d5f0a72..95ba0613bc 100644 --- a/opentelemetry-http/Cargo.toml +++ b/opentelemetry-http/Cargo.toml @@ -20,7 +20,7 @@ bytes = { workspace = true } http = { workspace = true } http-body-util = { workspace = true, optional = true } hyper = { workspace = true, optional = true } -hyper-util = { workspace = true, features = ["client-legacy", "http2"], optional = true } +hyper-util = { workspace = true, features = ["client-legacy", "http1", "http2"], optional = true } opentelemetry = { version = "0.26", path = "../opentelemetry", features = ["trace"] } reqwest = { workspace = true, features = ["blocking"], optional = true } tokio = { workspace = true, features = ["time"], optional = true } diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index bed95cd389..094fdb4664 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -108,7 +108,10 @@ pub mod hyper { use http::HeaderValue; use http_body_util::{BodyExt, Full}; use hyper::body::{Body as HttpBody, Frame}; - use hyper_util::client::legacy::{connect::Connect, Client}; + use hyper_util::client::legacy::{ + connect::{Connect, HttpConnector}, + Client, + }; use std::fmt::Debug; use std::pin::Pin; use std::task::{self, Poll}; @@ -116,39 +119,42 @@ pub mod hyper { use tokio::time; #[derive(Debug, Clone)] - pub struct HyperClient { + pub struct HyperClient + where + C: Connect + Clone + Send + Sync + 'static, + { inner: Client, timeout: Duration, authorization: Option, } - impl HyperClient { - pub fn new_with_timeout(inner: Client, timeout: Duration) -> Self { + impl HyperClient + where + C: Connect + Clone + Send + Sync + 'static, + { + pub fn new(connector: C, timeout: Duration, authorization: Option) -> Self { + // TODO - support custom executor + let inner = Client::builder(hyper_util::rt::TokioExecutor::new()).build(connector); Self { inner, timeout, - authorization: None, + authorization, } } + } - pub fn new_with_timeout_and_authorization_header( - inner: Client, + impl HyperClient { + /// Creates a new `HyperClient` with a default `HttpConnector`. + pub fn with_default_connector( timeout: Duration, - authorization: HeaderValue, + authorization: Option, ) -> Self { - Self { - inner, - timeout, - authorization: Some(authorization), - } + Self::new(HttpConnector::new(), timeout, authorization) } } #[async_trait] - impl HttpClient for HyperClient - where - C: Connect + Send + Sync + Clone + Debug + 'static, - { + impl HttpClient for HyperClient { async fn send(&self, request: Request>) -> Result, HttpError> { let (parts, body) = request.into_parts(); let mut request = Request::from_parts(parts, Body(Full::from(body))); diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index fce91cbb61..02d6b145cc 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -53,6 +53,10 @@ Released 2024-Sep-30 - `MetricsExporter` -> `MetricExporter` - `MetricsExporterBuilder` -> `MetricExporterBuilder` + - [#2263](https://github.com/open-telemetry/opentelemetry-rust/pull/2263) + Support `hyper` client for opentelemetry-otlp. This can be enabled using flag `hyper-client`. + Refer example: https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-otlp/examples/basic-otlp-http + ## v0.25.0 - Update `opentelemetry` dependency version to 0.25 diff --git a/opentelemetry-otlp/Cargo.toml b/opentelemetry-otlp/Cargo.toml index 272481053e..9dcf9547c7 100644 --- a/opentelemetry-otlp/Cargo.toml +++ b/opentelemetry-otlp/Cargo.toml @@ -80,6 +80,7 @@ reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"] reqwest-client = ["reqwest", "opentelemetry-http/reqwest"] reqwest-rustls = ["reqwest", "opentelemetry-http/reqwest-rustls"] reqwest-rustls-webpki-roots = ["reqwest", "opentelemetry-http/reqwest-rustls-webpki-roots"] +hyper-client = ["opentelemetry-http/hyper"] # test integration-testing = ["tonic", "prost", "tokio/full", "trace"] diff --git a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml index ccbe22e960..89df249f2d 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml +++ b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml @@ -8,15 +8,15 @@ publish = false [features] default = ["reqwest"] reqwest = ["opentelemetry-otlp/reqwest-client"] -hyper = ["dep:async-trait", "dep:http", "dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:opentelemetry-http", "dep:bytes"] +hyper = ["opentelemetry-otlp/hyper-client"] [dependencies] once_cell = { workspace = true } opentelemetry = { path = "../../../opentelemetry" } -opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs"] } -opentelemetry-http = { path = "../../../opentelemetry-http", optional = true } -opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "reqwest-client", "logs"] } +opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "metrics", "logs"]} +opentelemetry-http = { path = "../../../opentelemetry-http", optional = true, default-features = false} +opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "logs"] , default-features = false} opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false} opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-conventions" } @@ -24,8 +24,6 @@ async-trait = { workspace = true, optional = true } bytes = { workspace = true, optional = true } http = { workspace = true, optional = true } http-body-util = { workspace = true, optional = true } -hyper = { workspace = true, features = ["client"], optional = true } -hyper-util = { workspace = true, features = ["client-legacy"], optional = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true, features = ["std"]} tracing-core = { workspace = true } diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs deleted file mode 100644 index 80a28ae62d..0000000000 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs +++ /dev/null @@ -1,49 +0,0 @@ -use async_trait::async_trait; -use bytes::Bytes; -use http::{Request, Response}; -use http_body_util::{BodyExt, Full}; -use hyper_util::{ - client::legacy::{ - connect::{Connect, HttpConnector}, - Client, - }, - rt::TokioExecutor, -}; -use opentelemetry_http::{HttpClient, HttpError, ResponseExt}; - -pub struct HyperClient { - inner: hyper_util::client::legacy::Client>, -} - -impl Default for HyperClient { - fn default() -> Self { - Self { - inner: Client::builder(TokioExecutor::new()).build_http(), - } - } -} - -impl std::fmt::Debug for HyperClient { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("HyperClient") - .field("inner", &self.inner) - .finish() - } -} - -#[async_trait] -impl HttpClient for HyperClient { - async fn send(&self, request: Request>) -> Result, HttpError> { - let request = request.map(|body| Full::new(Bytes::from(body))); - - let (parts, body) = self - .inner - .request(request) - .await? - .error_for_status()? - .into_parts(); - let body = body.collect().await?.to_bytes(); - - Ok(Response::from_parts(parts, body)) - } -} diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs index 1ff36c7549..c67d6b21c5 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs @@ -1,3 +1,4 @@ +/// To use hyper as the HTTP client - cargo run --features="hyper" --no-default-features use once_cell::sync::Lazy; use opentelemetry::{ global, @@ -23,12 +24,6 @@ use tracing::info; use tracing_subscriber::prelude::*; use tracing_subscriber::EnvFilter; -#[cfg(feature = "hyper")] -use opentelemetry_otlp::WithHttpConfig; - -#[cfg(feature = "hyper")] -mod hyper; - static RESOURCE: Lazy = Lazy::new(|| { Resource::new(vec![KeyValue::new( opentelemetry_semantic_conventions::resource::SERVICE_NAME, @@ -37,15 +32,11 @@ static RESOURCE: Lazy = Lazy::new(|| { }); fn init_logs() -> Result { - let exporter_builder = LogExporter::builder() + let exporter = LogExporter::builder() .with_http() .with_endpoint("http://localhost:4318/v1/logs") - .with_protocol(Protocol::HttpBinary); - - #[cfg(feature = "hyper")] - let exporter_builder = exporter_builder.with_http_client(hyper::HyperClient::default()); - - let exporter = exporter_builder.build()?; + .with_protocol(Protocol::HttpBinary) + .build()?; Ok(LoggerProvider::builder() .with_batch_exporter(exporter, runtime::Tokio) @@ -59,6 +50,7 @@ fn init_tracer_provider() -> Result { .with_protocol(Protocol::HttpBinary) //can be changed to `Protocol::HttpJson` to export in JSON format .with_endpoint("http://localhost:4318/v1/traces") .build()?; + Ok(TracerProvider::builder() .with_batch_exporter(exporter, runtime::Tokio) .with_config(Config::default().with_resource(RESOURCE.clone())) diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index 74ae46817d..154871887b 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -33,12 +33,20 @@ mod logs; #[cfg(feature = "trace")] mod trace; +#[cfg(all( + not(feature = "reqwest-client"), + not(feature = "reqwest-blocking-client"), + feature = "hyper-client" +))] +use opentelemetry_http::hyper::HyperClient; + /// Configuration of the http transport #[derive(Debug)] #[cfg_attr( all( not(feature = "reqwest-client"), - not(feature = "reqwest-blocking-client") + not(feature = "reqwest-blocking-client"), + not(feature = "hyper-client") ), derive(Default) )] @@ -50,19 +58,36 @@ pub struct HttpConfig { headers: Option>, } -#[cfg(any(feature = "reqwest-blocking-client", feature = "reqwest-client",))] +#[cfg(any( + feature = "reqwest-blocking-client", + feature = "reqwest-client", + feature = "hyper-client" +))] impl Default for HttpConfig { fn default() -> Self { + #[cfg(feature = "reqwest-blocking-client")] + let default_client = + Some(Arc::new(reqwest::blocking::Client::new()) as Arc); + #[cfg(all(not(feature = "reqwest-blocking-client"), feature = "reqwest-client"))] + let default_client = Some(Arc::new(reqwest::Client::new()) as Arc); + #[cfg(all( + not(feature = "reqwest-client"), + not(feature = "reqwest-blocking-client"), + feature = "hyper-client" + ))] + // TODO - support configuring custom connector and executor + let default_client = Some(Arc::new(HyperClient::with_default_connector( + Duration::from_secs(10), + None, + )) as Arc); + #[cfg(all( + not(feature = "reqwest-client"), + not(feature = "reqwest-blocking-client"), + not(feature = "hyper-client") + ))] + let default_client = None; HttpConfig { - #[cfg(feature = "reqwest-blocking-client")] - client: Some(Arc::new(reqwest::blocking::Client::new())), - #[cfg(all(not(feature = "reqwest-blocking-client"), feature = "reqwest-client"))] - client: Some(Arc::new(reqwest::Client::new())), - #[cfg(all( - not(feature = "reqwest-client"), - not(feature = "reqwest-blocking-client") - ))] - client: None, + client: default_client, headers: None, } } @@ -140,13 +165,11 @@ impl HttpExporterBuilder { }, None => self.exporter_config.timeout, }; - let http_client = self .http_config .client .take() .ok_or(crate::Error::NoHttpClient)?; - #[allow(clippy::mutable_key_type)] // http headers are not mutated let mut headers: HashMap = self .http_config diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs index e85e0ece55..13f4200112 100644 --- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs @@ -352,6 +352,7 @@ impl ExpoHistogram { pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) { let f_value = value.into_float(); // Ignore NaN and infinity. + // Only makes sense if T is f64, maybe this could be no-op for other cases? if f_value.is_infinite() || f_value.is_nan() { return; } diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index 089415ba7c..4da6144c2f 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -7,23 +7,22 @@ use crate::metrics::data::HistogramDataPoint; use crate::metrics::data::{self, Aggregation, Temporality}; use opentelemetry::KeyValue; -use super::Number; -use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap}; - -struct HistogramUpdate; - -impl Operation for HistogramUpdate { - fn update_tracker>(tracker: &AT, value: T, index: usize) { - tracker.update_histogram(index, value); - } -} +use super::ValueMap; +use super::{Aggregator, Number}; struct HistogramTracker { buckets: Mutex>, } -impl AtomicTracker for HistogramTracker { - fn update_histogram(&self, index: usize, value: T) { +impl Aggregator for HistogramTracker +where + T: Number, +{ + type InitConfig = usize; + /// Value and bucket index + type PreComputedValue = (T, usize); + + fn update(&self, (value, index): (T, usize)) { let mut buckets = match self.buckets.lock() { Ok(guard) => guard, Err(_) => return, @@ -32,15 +31,10 @@ impl AtomicTracker for HistogramTracker { buckets.bin(index, value); buckets.sum(value); } -} - -impl AtomicallyUpdate for HistogramTracker { - type AtomicTracker = HistogramTracker; - fn new_atomic_tracker(buckets_count: Option) -> Self::AtomicTracker { - let count = buckets_count.unwrap(); + fn create(count: &usize) -> Self { HistogramTracker { - buckets: Mutex::new(Buckets::::new(count)), + buckets: Mutex::new(Buckets::::new(*count)), } } } @@ -94,7 +88,7 @@ impl Buckets { /// Summarizes a set of measurements as a histogram with explicitly defined /// buckets. pub(crate) struct Histogram { - value_map: ValueMap, T, HistogramUpdate>, + value_map: ValueMap>, bounds: Vec, record_min_max: bool, record_sum: bool, @@ -103,9 +97,11 @@ pub(crate) struct Histogram { impl Histogram { pub(crate) fn new(boundaries: Vec, record_min_max: bool, record_sum: bool) -> Self { + // TODO fix the bug, by first removing NaN and only then getting buckets_count + // once we know the reason for performance degradation let buckets_count = boundaries.len() + 1; let mut histogram = Histogram { - value_map: ValueMap::new_with_buckets_count(buckets_count), + value_map: ValueMap::new(buckets_count), bounds: boundaries, record_min_max, record_sum, @@ -122,14 +118,20 @@ impl Histogram { pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { let f = measurement.into_float(); - + // Ignore NaN and infinity. + // Only makes sense if T is f64, maybe this could be no-op for other cases? + // TODO: uncomment once we know the reason for performance degradation + // if f.is_infinite() || f.is_nan() { + // return; + // } // This search will return an index in the range `[0, bounds.len()]`, where // it will return `bounds.len()` if value is greater than the last element // of `bounds`. This aligns with the buckets in that the length of buckets // is `bounds.len()+1`, with the last bucket representing: // `(bounds[bounds.len()-1], +∞)`. let index = self.bounds.partition_point(|&x| x < f); - self.value_map.measure(measurement, attrs, index); + + self.value_map.measure((measurement, index), attrs); } pub(crate) fn delta( @@ -350,3 +352,69 @@ impl Histogram { (h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>)) } } + +// TODO: uncomment once we know the reason for performance degradation +// #[cfg(test)] +// mod tests { + +// use super::*; + +// #[test] +// fn when_f64_is_nan_or_infinity_then_ignore() { +// struct Expected { +// min: f64, +// max: f64, +// sum: f64, +// count: u64, +// } +// impl Expected { +// fn new(min: f64, max: f64, sum: f64, count: u64) -> Self { +// Expected { +// min, +// max, +// sum, +// count, +// } +// } +// } +// struct TestCase { +// values: Vec, +// expected: Expected, +// } + +// let test_cases = vec![ +// TestCase { +// values: vec![2.0, 4.0, 1.0], +// expected: Expected::new(1.0, 4.0, 7.0, 3), +// }, +// TestCase { +// values: vec![2.0, 4.0, 1.0, f64::INFINITY], +// expected: Expected::new(1.0, 4.0, 7.0, 3), +// }, +// TestCase { +// values: vec![2.0, 4.0, 1.0, -f64::INFINITY], +// expected: Expected::new(1.0, 4.0, 7.0, 3), +// }, +// TestCase { +// values: vec![2.0, f64::NAN, 4.0, 1.0], +// expected: Expected::new(1.0, 4.0, 7.0, 3), +// }, +// TestCase { +// values: vec![4.0, 4.0, 4.0, 2.0, 16.0, 1.0], +// expected: Expected::new(1.0, 16.0, 31.0, 6), +// }, +// ]; + +// for test in test_cases { +// let h = Histogram::new(vec![], true, true); +// for v in test.values { +// h.measure(v, &[]); +// } +// let res = h.value_map.no_attribute_tracker.buckets.lock().unwrap(); +// assert_eq!(test.expected.max, res.max); +// assert_eq!(test.expected.min, res.min); +// assert_eq!(test.expected.sum, res.total); +// assert_eq!(test.expected.count, res.count); +// } +// } +// } diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index d1eab4fada..e4c9433f9a 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -7,25 +7,51 @@ use std::{ use crate::metrics::data::DataPoint; use opentelemetry::KeyValue; -use super::{Assign, AtomicTracker, Number, ValueMap}; +use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap}; + +/// this is reused by PrecomputedSum +pub(crate) struct Assign +where + T: AtomicallyUpdate, +{ + pub(crate) value: T::AtomicTracker, +} + +impl Aggregator for Assign +where + T: Number, +{ + type InitConfig = (); + type PreComputedValue = T; + + fn create(_init: &()) -> Self { + Self { + value: T::new_atomic_tracker(T::default()), + } + } + + fn update(&self, value: T) { + self.value.store(value) + } +} /// Summarizes a set of measurements as the last one made. pub(crate) struct LastValue { - value_map: ValueMap, + value_map: ValueMap>, start: Mutex, } impl LastValue { pub(crate) fn new() -> Self { LastValue { - value_map: ValueMap::new(), + value_map: ValueMap::new(()), start: Mutex::new(SystemTime::now()), } } pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { // The argument index is not applicable to LastValue. - self.value_map.measure(measurement, attrs, 0); + self.value_map.measure(measurement, attrs); } pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec>) { @@ -49,7 +75,11 @@ impl LastValue { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_and_reset_value(), + value: self + .value_map + .no_attribute_tracker + .value + .get_and_reset_value(), exemplars: vec![], }); } @@ -66,7 +96,7 @@ impl LastValue { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } @@ -101,7 +131,7 @@ impl LastValue { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), + value: self.value_map.no_attribute_tracker.value.get_value(), exemplars: vec![], }); } @@ -118,7 +148,7 @@ impl LastValue { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 6d3b012d1d..6b5470f633 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -7,7 +7,6 @@ mod sum; use core::fmt; use std::collections::HashMap; -use std::marker::PhantomData; use std::ops::{Add, AddAssign, Sub}; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -23,79 +22,65 @@ use crate::metrics::AttributeSet; pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); -/// Abstracts the update operation for a measurement. -pub(crate) trait Operation { - fn update_tracker>(tracker: &AT, value: T, index: usize); -} - -struct Increment; +pub(crate) trait Aggregator +where + T: Number, +{ + /// A static configuration that is needed in order to initialize aggregator. + /// E.g. bucket_size at creation time . + type InitConfig; -impl Operation for Increment { - fn update_tracker>(tracker: &AT, value: T, _: usize) { - tracker.add(value); - } -} + /// Some aggregators can do some computations before updating aggregator. + /// This helps to reduce contention for aggregators because it makes + /// [`Aggregator::update`] as short as possible. + type PreComputedValue; -struct Assign; + /// Called everytime a new attribute-set is stored. + fn create(init: &Self::InitConfig) -> Self; -impl Operation for Assign { - fn update_tracker>(tracker: &AT, value: T, _: usize) { - tracker.store(value); - } + /// Called for each measurement. + fn update(&self, value: Self::PreComputedValue); } /// The storage for sums. /// /// This structure is parametrized by an `Operation` that indicates how /// updates to the underlying value trackers should be performed. -pub(crate) struct ValueMap, T: Number, O> { +pub(crate) struct ValueMap +where + T: Number, + A: Aggregator, +{ /// Trackers store the values associated with different attribute sets. - trackers: RwLock, Arc>>, + trackers: RwLock, Arc>>, /// Number of different attribute set stored in the `trackers` map. count: AtomicUsize, /// Indicates whether a value with no attributes has been stored. has_no_attribute_value: AtomicBool, /// Tracker for values with no attributes attached. - no_attribute_tracker: AU::AtomicTracker, - /// Buckets Count is only used by Histogram. - buckets_count: Option, - phantom: PhantomData, + no_attribute_tracker: A, + /// Configuration for an Aggregator + config: A::InitConfig, } -impl, T: Number, O> Default for ValueMap { - fn default() -> Self { - ValueMap::new() - } -} - -impl, T: Number, O> ValueMap { - fn new() -> Self { - ValueMap { - trackers: RwLock::new(HashMap::new()), - has_no_attribute_value: AtomicBool::new(false), - no_attribute_tracker: AU::new_atomic_tracker(None), - count: AtomicUsize::new(0), - buckets_count: None, - phantom: PhantomData, - } - } - - fn new_with_buckets_count(buckets_count: usize) -> Self { +impl ValueMap +where + T: Number, + A: Aggregator, +{ + fn new(config: A::InitConfig) -> Self { ValueMap { trackers: RwLock::new(HashMap::new()), has_no_attribute_value: AtomicBool::new(false), - no_attribute_tracker: AU::new_atomic_tracker(Some(buckets_count)), + no_attribute_tracker: A::create(&config), count: AtomicUsize::new(0), - buckets_count: Some(buckets_count), - phantom: PhantomData, + config, } } -} -impl, T: Number, O: Operation> ValueMap { - fn measure(&self, measurement: T, attributes: &[KeyValue], index: usize) { + fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) { if attributes.is_empty() { - O::update_tracker(&self.no_attribute_tracker, measurement, index); + self.no_attribute_tracker.update(value); self.has_no_attribute_value.store(true, Ordering::Release); return; } @@ -106,14 +91,14 @@ impl, T: Number, O: Operation> ValueMap { // Try to retrieve and update the tracker with the attributes in the provided order first if let Some(tracker) = trackers.get(attributes) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(value); return; } // Try to retrieve and update the tracker with the attributes sorted. let sorted_attrs = AttributeSet::from(attributes).into_vec(); if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(value); return; } @@ -127,12 +112,12 @@ impl, T: Number, O: Operation> ValueMap { // Recheck both the provided and sorted orders after acquiring the write lock // in case another thread has pushed an update in the meantime. if let Some(tracker) = trackers.get(attributes) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(value); } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { - O::update_tracker(&**tracker, measurement, index); + tracker.update(value); } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { - let new_tracker = Arc::new(AU::new_atomic_tracker(self.buckets_count)); - O::update_tracker(&*new_tracker, measurement, index); + let new_tracker = Arc::new(A::create(&self.config)); + new_tracker.update(value); // Insert tracker with the attributes in the provided and sorted orders trackers.insert(attributes.to_vec(), new_tracker.clone()); @@ -140,10 +125,10 @@ impl, T: Number, O: Operation> ValueMap { self.count.fetch_add(1, Ordering::SeqCst); } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { - O::update_tracker(&**overflow_value, measurement, index); + overflow_value.update(value); } else { - let new_tracker = AU::new_atomic_tracker(self.buckets_count); - O::update_tracker(&new_tracker, measurement, index); + let new_tracker = A::create(&self.config); + new_tracker.update(value); trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker)); otel_warn!( name: "ValueMap.measure", message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged." @@ -154,22 +139,17 @@ impl, T: Number, O: Operation> ValueMap { /// Marks a type that can have a value added and retrieved atomically. Required since /// different types have different backing atomic mechanisms -pub(crate) trait AtomicTracker: Sync + Send + 'static { - fn store(&self, _value: T) {} - fn add(&self, _value: T) {} - fn get_value(&self) -> T { - T::default() - } - fn get_and_reset_value(&self) -> T { - T::default() - } - fn update_histogram(&self, _index: usize, _value: T) {} +pub(crate) trait AtomicTracker: Sync + Send + 'static { + fn store(&self, _value: T); + fn add(&self, _value: T); + fn get_value(&self) -> T; + fn get_and_reset_value(&self) -> T; } /// Marks a type that can have an atomic tracker generated for it -pub(crate) trait AtomicallyUpdate { +pub(crate) trait AtomicallyUpdate { type AtomicTracker: AtomicTracker; - fn new_atomic_tracker(buckets_count: Option) -> Self::AtomicTracker; + fn new_atomic_tracker(init: T) -> Self::AtomicTracker; } pub(crate) trait Number: @@ -256,8 +236,8 @@ impl AtomicTracker for AtomicU64 { impl AtomicallyUpdate for u64 { type AtomicTracker = AtomicU64; - fn new_atomic_tracker(_: Option) -> Self::AtomicTracker { - AtomicU64::new(0) + fn new_atomic_tracker(init: u64) -> Self::AtomicTracker { + AtomicU64::new(init) } } @@ -282,8 +262,8 @@ impl AtomicTracker for AtomicI64 { impl AtomicallyUpdate for i64 { type AtomicTracker = AtomicI64; - fn new_atomic_tracker(_: Option) -> Self::AtomicTracker { - AtomicI64::new(0) + fn new_atomic_tracker(init: i64) -> Self::AtomicTracker { + AtomicI64::new(init) } } @@ -292,10 +272,10 @@ pub(crate) struct F64AtomicTracker { } impl F64AtomicTracker { - fn new() -> Self { - let zero_as_u64 = 0.0_f64.to_bits(); + fn new(init: f64) -> Self { + let value_as_u64 = init.to_bits(); F64AtomicTracker { - inner: AtomicU64::new(zero_as_u64), + inner: AtomicU64::new(value_as_u64), } } } @@ -344,8 +324,8 @@ impl AtomicTracker for F64AtomicTracker { impl AtomicallyUpdate for f64 { type AtomicTracker = F64AtomicTracker; - fn new_atomic_tracker(_: Option) -> Self::AtomicTracker { - F64AtomicTracker::new() + fn new_atomic_tracker(init: f64) -> Self::AtomicTracker { + F64AtomicTracker::new(init) } } @@ -355,7 +335,7 @@ mod tests { #[test] fn can_store_u64_atomic_value() { - let atomic = u64::new_atomic_tracker(None); + let atomic = u64::new_atomic_tracker(0); let atomic_tracker = &atomic as &dyn AtomicTracker; let value = atomic.get_value(); @@ -368,7 +348,7 @@ mod tests { #[test] fn can_add_and_get_u64_atomic_value() { - let atomic = u64::new_atomic_tracker(None); + let atomic = u64::new_atomic_tracker(0); atomic.add(15); atomic.add(10); @@ -378,7 +358,7 @@ mod tests { #[test] fn can_reset_u64_atomic_value() { - let atomic = u64::new_atomic_tracker(None); + let atomic = u64::new_atomic_tracker(0); atomic.add(15); let value = atomic.get_and_reset_value(); @@ -390,7 +370,7 @@ mod tests { #[test] fn can_store_i64_atomic_value() { - let atomic = i64::new_atomic_tracker(None); + let atomic = i64::new_atomic_tracker(0); let atomic_tracker = &atomic as &dyn AtomicTracker; let value = atomic.get_value(); @@ -407,7 +387,7 @@ mod tests { #[test] fn can_add_and_get_i64_atomic_value() { - let atomic = i64::new_atomic_tracker(None); + let atomic = i64::new_atomic_tracker(0); atomic.add(15); atomic.add(-10); @@ -417,7 +397,7 @@ mod tests { #[test] fn can_reset_i64_atomic_value() { - let atomic = i64::new_atomic_tracker(None); + let atomic = i64::new_atomic_tracker(0); atomic.add(15); let value = atomic.get_and_reset_value(); @@ -429,7 +409,7 @@ mod tests { #[test] fn can_store_f64_atomic_value() { - let atomic = f64::new_atomic_tracker(None); + let atomic = f64::new_atomic_tracker(0.0); let atomic_tracker = &atomic as &dyn AtomicTracker; let value = atomic.get_value(); @@ -446,7 +426,7 @@ mod tests { #[test] fn can_add_and_get_f64_atomic_value() { - let atomic = f64::new_atomic_tracker(None); + let atomic = f64::new_atomic_tracker(0.0); atomic.add(15.3); atomic.add(10.4); @@ -457,7 +437,7 @@ mod tests { #[test] fn can_reset_f64_atomic_value() { - let atomic = f64::new_atomic_tracker(None); + let atomic = f64::new_atomic_tracker(0.0); atomic.add(15.5); let value = atomic.get_and_reset_value(); diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index 060c7baaa6..f08f70b73e 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -2,7 +2,7 @@ use opentelemetry::KeyValue; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; -use super::{Assign, AtomicTracker, Number, ValueMap}; +use super::{last_value::Assign, AtomicTracker, Number, ValueMap}; use std::{ collections::{HashMap, HashSet}, sync::{atomic::Ordering, Arc, Mutex}, @@ -11,7 +11,7 @@ use std::{ /// Summarizes a set of pre-computed sums as their arithmetic sum. pub(crate) struct PrecomputedSum { - value_map: ValueMap, + value_map: ValueMap>, monotonic: bool, start: Mutex, reported: Mutex, T>>, @@ -20,7 +20,7 @@ pub(crate) struct PrecomputedSum { impl PrecomputedSum { pub(crate) fn new(monotonic: bool) -> Self { PrecomputedSum { - value_map: ValueMap::new(), + value_map: ValueMap::new(()), monotonic, start: Mutex::new(SystemTime::now()), reported: Mutex::new(Default::default()), @@ -29,7 +29,7 @@ impl PrecomputedSum { pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { // The argument index is not applicable to PrecomputedSum. - self.value_map.measure(measurement, attrs, 0); + self.value_map.measure(measurement, attrs); } pub(crate) fn delta( @@ -73,7 +73,7 @@ impl PrecomputedSum { .has_no_attribute_value .swap(false, Ordering::AcqRel) { - let value = self.value_map.no_attribute_tracker.get_value(); + let value = self.value_map.no_attribute_tracker.value.get_value(); let delta = value - *reported.get(&vec![]).unwrap_or(&T::default()); new_reported.insert(vec![], value); @@ -94,7 +94,7 @@ impl PrecomputedSum { let mut seen = HashSet::new(); for (attrs, tracker) in trackers.drain() { if seen.insert(Arc::as_ptr(&tracker)) { - let value = tracker.get_value(); + let value = tracker.value.get_value(); let delta = value - *reported.get(&attrs).unwrap_or(&T::default()); new_reported.insert(attrs.clone(), value); s_data.data_points.push(DataPoint { @@ -162,7 +162,7 @@ impl PrecomputedSum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), + value: self.value_map.no_attribute_tracker.value.get_value(), exemplars: vec![], }); } @@ -179,7 +179,7 @@ impl PrecomputedSum { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index 66af75734d..17d81ca262 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -7,12 +7,37 @@ use std::{sync::Mutex, time::SystemTime}; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; use opentelemetry::KeyValue; -use super::{AtomicTracker, Number}; -use super::{Increment, ValueMap}; +use super::{Aggregator, AtomicTracker, Number}; +use super::{AtomicallyUpdate, ValueMap}; + +struct Increment +where + T: AtomicallyUpdate, +{ + value: T::AtomicTracker, +} + +impl Aggregator for Increment +where + T: Number, +{ + type InitConfig = (); + type PreComputedValue = T; + + fn create(_init: &()) -> Self { + Self { + value: T::new_atomic_tracker(T::default()), + } + } + + fn update(&self, value: T) { + self.value.add(value) + } +} /// Summarizes a set of measurements made as their arithmetic sum. pub(crate) struct Sum { - value_map: ValueMap, + value_map: ValueMap>, monotonic: bool, start: Mutex, } @@ -25,7 +50,7 @@ impl Sum { /// were made in. pub(crate) fn new(monotonic: bool) -> Self { Sum { - value_map: ValueMap::new(), + value_map: ValueMap::new(()), monotonic, start: Mutex::new(SystemTime::now()), } @@ -33,7 +58,7 @@ impl Sum { pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) { // The argument index is not applicable to Sum. - self.value_map.measure(measurement, attrs, 0); + self.value_map.measure(measurement, attrs); } pub(crate) fn delta( @@ -76,7 +101,11 @@ impl Sum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_and_reset_value(), + value: self + .value_map + .no_attribute_tracker + .value + .get_and_reset_value(), exemplars: vec![], }); } @@ -93,7 +122,7 @@ impl Sum { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } @@ -152,7 +181,7 @@ impl Sum { attributes: vec![], start_time: Some(prev_start), time: Some(t), - value: self.value_map.no_attribute_tracker.get_value(), + value: self.value_map.no_attribute_tracker.value.get_value(), exemplars: vec![], }); } @@ -173,7 +202,7 @@ impl Sum { attributes: attrs.clone(), start_time: Some(prev_start), time: Some(t), - value: tracker.get_value(), + value: tracker.value.get_value(), exemplars: vec![], }); } diff --git a/opentelemetry/README.md b/opentelemetry/README.md index 102e8431d4..d96e2851f2 100644 --- a/opentelemetry/README.md +++ b/opentelemetry/README.md @@ -56,7 +56,7 @@ Here's a breakdown of its components: Allows for the attachment of metadata (baggage) to telemetry, which can be used for sharing application-specific information across service boundaries. - **[Logs Bridge - API](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/bridge-api.md):** + API](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/api.md):** Allows to bridge existing logging mechanisms with OpenTelemetry logging. This is **NOT** meant for end users to call, instead it is meant to enable writing bridges/appenders for existing logging mechanisms such as