Skip to content

Commit

Permalink
feat: Update opentelemetry deps
Browse files Browse the repository at this point in the history
  • Loading branch information
skomski committed Dec 18, 2023
1 parent c697f4d commit 7125cca
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 98 deletions.
21 changes: 11 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@ publish = ["famedly"]


[dependencies]
axum = { version = "0.6.1", features = ["headers"] }
bytes = "1.1.0"
futures = "0.3.21"
opentelemetry = { version = "0.18.0", features = ["metrics", "rt-tokio"] }
opentelemetry-prometheus = "0.11.0"
prometheus = "0.13.0"
time = "0.3.9"
tokio = { version = "1.17.0", features = ["rt-multi-thread", "macros"] }
tower = "0.4.12"
axum = { version = "0.6.20", features = ["headers"] }
bytes = "1.5.0"
futures = "0.3.29"
opentelemetry = { version = "0.21.0", features = ["metrics"] }
opentelemetry_sdk = { version = "0.21.1", features = ["metrics", "rt-tokio"] }
opentelemetry-prometheus = "0.14.1"
prometheus = "0.13.3"
time = "0.3.30"
tokio = { version = "1.35.0", features = ["rt-multi-thread", "macros"] }
tower = "0.4.13"

[dev-dependencies]
futures = "0.3.21"
http = "0.2.6"
http = "1.0.0"
regex = "1.5.5"
5 changes: 2 additions & 3 deletions examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use std::net::SocketAddr;

use axum::{response::IntoResponse, routing::get, Extension, Router};
use opentelemetry::{metrics::Counter, Context};
use opentelemetry::metrics::Counter;
use tokio::time::{sleep, Duration};

async fn uwu() -> impl IntoResponse {
Expand All @@ -22,10 +22,9 @@ async fn uwu() -> impl IntoResponse {
/// Foxes need some time to be counted!
/// (Will increase the fox counter every 100ms until it counts 50 foxes)
async fn fooox(Extension(fox_counter): Extension<Counter<u64>>) -> impl IntoResponse {
let ctx = Context::current();
for _ in 0..50 {
sleep(Duration::from_millis(100)).await;
fox_counter.add(&ctx, 1, &[]);
fox_counter.add(1, &[]);
}
"Counted lots of foxes!!!"
}
Expand Down
91 changes: 35 additions & 56 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
fmt,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use axum::{
Expand All @@ -17,16 +16,12 @@ use axum::{
};
use futures::future::BoxFuture;
use opentelemetry::{
metrics::{Counter, Histogram, Meter, MeterProvider},
sdk::{
export::metrics::aggregation,
metrics::{controllers, processors, selectors},
Resource,
},
global,
metrics::{Counter, Histogram, Meter, MeterProvider as _},
KeyValue,
};
use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder};
use opentelemetry_sdk::{metrics::MeterProvider, Resource};
use prometheus::{Encoder, Registry, TextEncoder};
use tower::{Layer, Service};

/// Filter function type.
Expand Down Expand Up @@ -55,11 +50,8 @@ type FilterFunction = Arc<dyn Fn(&str, &str) -> bool + Send + Sync>;
/// .layer(Extension(fox_counter)); // (optional) add our own counter as an extension
/// ```
pub struct RecorderMiddlewareBuilder {
/// The prometheus exporter.
/// Eventually this should just be an
/// `opentelemetry::sdk::export::metrics::Exporter` but PrometheusExporter
/// doesn't appear to implement the Exporter trait
exporter: PrometheusExporter,
/// The prometheus registry.
registry: Registry,
/// Optional function for determining if an endpoint should be recorded or
/// not
filter_function: Option<FilterFunction>,
Expand All @@ -70,7 +62,7 @@ pub struct RecorderMiddlewareBuilder {
impl fmt::Debug for RecorderMiddlewareBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RecorderMiddlewareBuilder")
.field("exporter", &self.exporter)
.field("registry", &self.registry)
.field("meter", &self.meter)
.field("filter_function", &self.filter_function.is_some())
.finish()
Expand All @@ -81,27 +73,24 @@ impl RecorderMiddlewareBuilder {
/// Creates the builder for the optentelemetry middleware
#[must_use]
pub fn new(service_name: &str) -> Self {
let controller = controllers::basic(
processors::factory(
selectors::simple::histogram([0.1, 1.0, 5.0]),
aggregation::cumulative_temporality_selector(),
)
.with_memory(true),
)
.with_resource(Resource::new(vec![KeyValue::new("service.name", service_name.to_owned())]))
.with_collect_period(Duration::from_millis(500))
.build();

let exporter = opentelemetry_prometheus::exporter(controller).init();

let registry = Registry::new();
#[allow(clippy::expect_used)]
// If this fails then something is deeply wrong with the opentelemetry_prometheus crate
let meter = exporter
.meter_provider()
.expect("Failed to get PrometheusExporter provider")
.meter("axum-opentelemetry");

Self { exporter, meter, filter_function: None }
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build()
.expect("Exporter should build");
let provider = MeterProvider::builder()
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
service_name.to_owned(),
)]))
.with_reader(exporter)
.build();
let meter = provider.meter("axum-opentelemetry");

global::set_meter_provider(provider);

Self { registry, meter, filter_function: None }
}

/// Registers a function for filtering which endpoints should be logged and
Expand All @@ -119,16 +108,16 @@ impl RecorderMiddlewareBuilder {
#[must_use]
/// Builds the middleware data
pub fn build(self) -> RecorderMiddleware {
RecorderMiddleware::new(self.meter, self.exporter, self.filter_function)
RecorderMiddleware::new(self.meter, self.registry, self.filter_function)
}
}

/// The actual RecorderMiddleware, has to added to an axum Router via
/// `Router::layer(middleware)` See [RecorderMiddlewareBuilder] for more details
#[derive(Clone)]
pub struct RecorderMiddleware {
/// The prometheus exporter
exporter: PrometheusExporter,
/// The prometheus registry
registry: Registry,
/// Metric tracking the duration of each request
http_requests_duration_seconds: Histogram<f64>,
/// Metric tracking amounts of taken requests
Expand All @@ -153,11 +142,7 @@ impl fmt::Debug for RecorderMiddleware {
impl RecorderMiddleware {
/// Create the actual middleware struct
#[must_use]
fn new(
meter: Meter,
exporter: PrometheusExporter,
filter_function: Option<FilterFunction>,
) -> Self {
fn new(meter: Meter, registry: Registry, filter_function: Option<FilterFunction>) -> Self {
// ValueRecorder == prometheus histogram
let http_requests_duration_seconds =
meter.f64_histogram("http.requests.duration.seconds").init();
Expand All @@ -167,7 +152,7 @@ impl RecorderMiddleware {
meter.u64_counter("http.mismatched.requests.total").init();

Self {
exporter,
registry,
http_requests_duration_seconds,
http_requests_total,
http_unmatched_requests_total,
Expand Down Expand Up @@ -209,15 +194,13 @@ where
fn call(&mut self, mut req: Request<Body>) -> Self::Future {
let data = self.data.clone();
let method = req.method().as_str().to_owned();
req.extensions_mut().insert(data.exporter);
req.extensions_mut().insert(data.registry);
let matched_path =
req.extensions().get::<MatchedPath>().map(|path| path.as_str().to_owned());
let future = self.inner.call(req);
Box::pin(async move {
let ctx = opentelemetry::Context::current();

let Some(path) = matched_path else {
data.http_unmatched_requests_total.add(&ctx, 1, &[]);
data.http_unmatched_requests_total.add(1, &[]);
return future.await;
};

Expand All @@ -242,12 +225,8 @@ where
KeyValue::new("status", status),
];

data.http_requests_duration_seconds.record(
&ctx,
time_taken.as_seconds_f64(),
&attributes,
);
data.http_requests_total.add(&ctx, 1, &attributes);
data.http_requests_duration_seconds.record(time_taken.as_seconds_f64(), &attributes);
data.http_requests_total.add(1, &attributes);

Ok(resp)
})
Expand All @@ -258,10 +237,10 @@ where
/// usually should be on get("/metrics")
#[allow(clippy::unused_async)] // needs to be async else axum complains
pub async fn metrics_endpoint(
Extension(exporter): Extension<PrometheusExporter>,
Extension(registry): Extension<Registry>,
) -> Result<String, (StatusCode, String)> {
let encoder = TextEncoder::new();
let metric_families = exporter.registry().gather();
let metric_families = registry.gather();
let mut result = Vec::new();
encoder
.encode(&metric_families, &mut result)
Expand Down
46 changes: 30 additions & 16 deletions tests/basic_metrics_output.txt
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
# HELP fox_counter fox.counter
# TYPE fox_counter counter
fox_counter{service_name="🦊"} 6
# HELP http_mismatched_requests_total http.mismatched.requests.total
# TYPE http_mismatched_requests_total counter
http_mismatched_requests_total{service_name="🦊"} 10
# HELP http_requests_duration_seconds http.requests.duration.seconds
# TYPE fox_counter_total counter
fox_counter_total{otel_scope_name="axum-opentelemetry"} 6
# TYPE http_mismatched_requests_total_total counter
http_mismatched_requests_total_total{otel_scope_name="axum-opentelemetry"} 10
# TYPE http_requests_duration_seconds histogram
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",service_name="🦊",status="200",le="FLOAT"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",service_name="🦊",status="200",le="1"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",service_name="🦊",status="200",le="5"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",service_name="🦊",status="200",le="+Inf"} 5
http_requests_duration_seconds_sum{endpoint="/visible_fox",method="GET",service_name="🦊",status="200"} FLOAT
http_requests_duration_seconds_count{endpoint="/visible_fox",method="GET",service_name="🦊",status="200"} 5
# HELP http_requests_total http.requests.total
# TYPE http_requests_total counter
http_requests_total{endpoint="/visible_fox",method="GET",service_name="🦊",status="200"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="0"} 0
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="5"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="10"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="25"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="50"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="75"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="100"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="250"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="500"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="750"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="1000"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="2500"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="5000"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="7500"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="10000"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="+Inf"} 5
http_requests_duration_seconds_sum{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry"} FLOAT
http_requests_duration_seconds_count{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry"} 5
# TYPE http_requests_total_total counter
http_requests_total_total{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry"} 5
# HELP otel_scope_info Instrumentation Scope metadata
# TYPE otel_scope_info gauge
otel_scope_info{otel_scope_name="axum-opentelemetry"} 1
# HELP target_info Target metadata
# TYPE target_info gauge
target_info{service_name="🦊"} 1
26 changes: 13 additions & 13 deletions tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,27 @@
clippy::unused_async
)]

use std::future::poll_fn;

use axum::{
body::{Body, HttpBody},
response::IntoResponse,
routing::get,
Extension, Router,
};
use futures::future::poll_fn;
use opentelemetry::{metrics::Counter, Context};
use opentelemetry::metrics::Counter;
use regex::Regex;
use tower::Service;

async fn visible_fox(Extension(fox_counter): Extension<Counter<u64>>) -> impl IntoResponse {
// need to keep track of foxes
let ctx = Context::current();
fox_counter.add(&ctx, 1, &[]);
fox_counter.add(1, &[]);
"Heya!"
}

async fn shy_fox(Extension(fox_counter): Extension<Counter<u64>>) -> impl IntoResponse {
// still counts tho
let ctx = Context::current();
fox_counter.add(&ctx, 1, &[]);
fox_counter.add(1, &[]);
"*hides*"
}

Expand All @@ -40,28 +39,29 @@ async fn basic_functionality() -> Result<(), Box<dyn std::error::Error>> {
let fox_counter = metrics_middleware.meter.u64_counter("fox.counter").init();
let metrics_middleware = metrics_middleware.build();

let mut service = Router::new()
let mut service: Router = Router::new()
.route("/metrics", get(axum_opentelemetry_middleware::metrics_endpoint))
.route("/visible_fox", get(visible_fox))
.route("/shy_fox", get(shy_fox))
.layer(metrics_middleware)
.layer(Extension(fox_counter));

poll_fn(|cx| service.poll_ready(cx)).await?;
poll_fn(|cx| tower::Service::poll_ready(&mut service, cx)).await?;

for _ in 0..5 {
service.call(http::Request::get("/visible_fox").body(Body::empty())?).await?;
service.call(axum::http::Request::get("/visible_fox").body(Body::empty())?).await?;
}

service.call(http::Request::get("/shy_fox").body(Body::empty())?).await?;
service.call(axum::http::Request::get("/shy_fox").body(Body::empty())?).await?;

for _ in 0..10 {
// Endpoint doesn't exist so it should increase the unmatched requests counter
service.call(http::Request::get("/unmatched_fox").body(Body::empty())?).await?;
service.call(axum::http::Request::get("/unmatched_fox").body(Body::empty())?).await?;
}

let mut resp = service.call(http::Request::get("/metrics").body(Body::empty())?).await?;
let body = String::from_utf8(resp.data().await.unwrap()?.to_vec())?;
let resp = service.call(axum::http::Request::get("/metrics").body(Body::empty())?).await?;
let body: String =
String::from_utf8(resp.collect().await.unwrap().to_bytes().to_vec()).unwrap();
let re = Regex::new(r"[0-9]\.[0-9]*")?; // remove all floats as they will vary from run to run
let body = re.replace_all(&body, "FLOAT");

Expand Down

0 comments on commit 7125cca

Please sign in to comment.