Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Update opentelemetry deps #17

Merged
merged 1 commit into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@ publish = ["famedly"]


[dependencies]
axum = { version = "0.6.1", features = ["headers"] }
bytes = "1.1.0"
futures = "0.3.21"
opentelemetry = { version = "0.18.0", features = ["metrics", "rt-tokio"] }
opentelemetry-prometheus = "0.11.0"
prometheus = "0.13.0"
time = "0.3.9"
tokio = { version = "1.17.0", features = ["rt-multi-thread", "macros"] }
tower = "0.4.12"
axum = { version = "0.6.20", features = ["headers"] }
bytes = "1.5.0"
futures = "0.3.29"
opentelemetry = { version = "0.21.0", features = ["metrics"] }
opentelemetry_sdk = { version = "0.21.1", features = ["metrics", "rt-tokio"] }
opentelemetry-prometheus = "0.14.1"
prometheus = "0.13.3"
time = "0.3.30"
tokio = { version = "1.35.0", features = ["rt-multi-thread", "macros"] }
tower = "0.4.13"

[dev-dependencies]
futures = "0.3.21"
http = "0.2.6"
http = "1.0.0"
regex = "1.5.5"
5 changes: 2 additions & 3 deletions examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use std::net::SocketAddr;

use axum::{response::IntoResponse, routing::get, Extension, Router};
use opentelemetry::{metrics::Counter, Context};
use opentelemetry::metrics::Counter;
use tokio::time::{sleep, Duration};

async fn uwu() -> impl IntoResponse {
Expand All @@ -22,10 +22,9 @@ async fn uwu() -> impl IntoResponse {
/// Foxes need some time to be counted!
/// (Will increase the fox counter every 100ms until it counts 50 foxes)
async fn fooox(Extension(fox_counter): Extension<Counter<u64>>) -> impl IntoResponse {
let ctx = Context::current();
for _ in 0..50 {
sleep(Duration::from_millis(100)).await;
fox_counter.add(&ctx, 1, &[]);
fox_counter.add(1, &[]);
}
"Counted lots of foxes!!!"
}
Expand Down
91 changes: 35 additions & 56 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{
fmt,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use axum::{
Expand All @@ -17,16 +16,12 @@ use axum::{
};
use futures::future::BoxFuture;
use opentelemetry::{
metrics::{Counter, Histogram, Meter, MeterProvider},
sdk::{
export::metrics::aggregation,
metrics::{controllers, processors, selectors},
Resource,
},
global,
metrics::{Counter, Histogram, Meter, MeterProvider as _},
KeyValue,
};
use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder};
use opentelemetry_sdk::{metrics::MeterProvider, Resource};
use prometheus::{Encoder, Registry, TextEncoder};
use tower::{Layer, Service};

/// Filter function type.
Expand Down Expand Up @@ -55,11 +50,8 @@ type FilterFunction = Arc<dyn Fn(&str, &str) -> bool + Send + Sync>;
/// .layer(Extension(fox_counter)); // (optional) add our own counter as an extension
/// ```
pub struct RecorderMiddlewareBuilder {
/// The prometheus exporter.
/// Eventually this should just be an
/// `opentelemetry::sdk::export::metrics::Exporter` but PrometheusExporter
/// doesn't appear to implement the Exporter trait
exporter: PrometheusExporter,
/// The prometheus registry.
registry: Registry,
/// Optional function for determining if an endpoint should be recorded or
/// not
filter_function: Option<FilterFunction>,
Expand All @@ -70,7 +62,7 @@ pub struct RecorderMiddlewareBuilder {
impl fmt::Debug for RecorderMiddlewareBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RecorderMiddlewareBuilder")
.field("exporter", &self.exporter)
.field("registry", &self.registry)
skomski marked this conversation as resolved.
Show resolved Hide resolved
.field("meter", &self.meter)
.field("filter_function", &self.filter_function.is_some())
.finish()
Expand All @@ -81,27 +73,24 @@ impl RecorderMiddlewareBuilder {
/// Creates the builder for the optentelemetry middleware
#[must_use]
pub fn new(service_name: &str) -> Self {
let controller = controllers::basic(
processors::factory(
selectors::simple::histogram([0.1, 1.0, 5.0]),
aggregation::cumulative_temporality_selector(),
)
.with_memory(true),
)
.with_resource(Resource::new(vec![KeyValue::new("service.name", service_name.to_owned())]))
.with_collect_period(Duration::from_millis(500))
.build();

let exporter = opentelemetry_prometheus::exporter(controller).init();

let registry = Registry::new();
#[allow(clippy::expect_used)]
// If this fails then something is deeply wrong with the opentelemetry_prometheus crate
let meter = exporter
.meter_provider()
.expect("Failed to get PrometheusExporter provider")
.meter("axum-opentelemetry");

Self { exporter, meter, filter_function: None }
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build()
.expect("Exporter should build");
let provider = MeterProvider::builder()
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
service_name.to_owned(),
)]))
.with_reader(exporter)
.build();
let meter = provider.meter("axum-opentelemetry");

global::set_meter_provider(provider);

Self { registry, meter, filter_function: None }
}

/// Registers a function for filtering which endpoints should be logged and
Expand All @@ -119,16 +108,16 @@ impl RecorderMiddlewareBuilder {
#[must_use]
/// Builds the middleware data
pub fn build(self) -> RecorderMiddleware {
RecorderMiddleware::new(self.meter, self.exporter, self.filter_function)
RecorderMiddleware::new(self.meter, self.registry, self.filter_function)
}
}

/// The actual RecorderMiddleware, has to added to an axum Router via
/// `Router::layer(middleware)` See [RecorderMiddlewareBuilder] for more details
#[derive(Clone)]
pub struct RecorderMiddleware {
/// The prometheus exporter
exporter: PrometheusExporter,
/// The prometheus registry
registry: Registry,
/// Metric tracking the duration of each request
http_requests_duration_seconds: Histogram<f64>,
/// Metric tracking amounts of taken requests
Expand All @@ -153,11 +142,7 @@ impl fmt::Debug for RecorderMiddleware {
impl RecorderMiddleware {
/// Create the actual middleware struct
#[must_use]
fn new(
meter: Meter,
exporter: PrometheusExporter,
filter_function: Option<FilterFunction>,
) -> Self {
fn new(meter: Meter, registry: Registry, filter_function: Option<FilterFunction>) -> Self {
// ValueRecorder == prometheus histogram
let http_requests_duration_seconds =
meter.f64_histogram("http.requests.duration.seconds").init();
Expand All @@ -167,7 +152,7 @@ impl RecorderMiddleware {
meter.u64_counter("http.mismatched.requests.total").init();

Self {
exporter,
registry,
http_requests_duration_seconds,
http_requests_total,
http_unmatched_requests_total,
Expand Down Expand Up @@ -209,15 +194,13 @@ where
fn call(&mut self, mut req: Request<Body>) -> Self::Future {
let data = self.data.clone();
let method = req.method().as_str().to_owned();
req.extensions_mut().insert(data.exporter);
req.extensions_mut().insert(data.registry);
let matched_path =
req.extensions().get::<MatchedPath>().map(|path| path.as_str().to_owned());
let future = self.inner.call(req);
Box::pin(async move {
let ctx = opentelemetry::Context::current();

let Some(path) = matched_path else {
data.http_unmatched_requests_total.add(&ctx, 1, &[]);
data.http_unmatched_requests_total.add(1, &[]);
return future.await;
};

Expand All @@ -242,12 +225,8 @@ where
KeyValue::new("status", status),
];

data.http_requests_duration_seconds.record(
&ctx,
time_taken.as_seconds_f64(),
&attributes,
);
data.http_requests_total.add(&ctx, 1, &attributes);
data.http_requests_duration_seconds.record(time_taken.as_seconds_f64(), &attributes);
data.http_requests_total.add(1, &attributes);

Ok(resp)
})
Expand All @@ -258,10 +237,10 @@ where
/// usually should be on get("/metrics")
#[allow(clippy::unused_async)] // needs to be async else axum complains
pub async fn metrics_endpoint(
Extension(exporter): Extension<PrometheusExporter>,
Extension(registry): Extension<Registry>,
) -> Result<String, (StatusCode, String)> {
let encoder = TextEncoder::new();
let metric_families = exporter.registry().gather();
let metric_families = registry.gather();
let mut result = Vec::new();
encoder
.encode(&metric_families, &mut result)
Expand Down
46 changes: 30 additions & 16 deletions tests/basic_metrics_output.txt
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
# HELP fox_counter fox.counter
# TYPE fox_counter counter
fox_counter{service_name="🦊"} 6
# HELP http_mismatched_requests_total http.mismatched.requests.total
# TYPE http_mismatched_requests_total counter
http_mismatched_requests_total{service_name="🦊"} 10
# HELP http_requests_duration_seconds http.requests.duration.seconds
# TYPE fox_counter_total counter
fox_counter_total{otel_scope_name="axum-opentelemetry"} 6
# TYPE http_mismatched_requests_total_total counter
http_mismatched_requests_total_total{otel_scope_name="axum-opentelemetry"} 10
# TYPE http_requests_duration_seconds histogram
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",service_name="🦊",status="200",le="FLOAT"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",service_name="🦊",status="200",le="1"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",service_name="🦊",status="200",le="5"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",service_name="🦊",status="200",le="+Inf"} 5
http_requests_duration_seconds_sum{endpoint="/visible_fox",method="GET",service_name="🦊",status="200"} FLOAT
http_requests_duration_seconds_count{endpoint="/visible_fox",method="GET",service_name="🦊",status="200"} 5
# HELP http_requests_total http.requests.total
# TYPE http_requests_total counter
http_requests_total{endpoint="/visible_fox",method="GET",service_name="🦊",status="200"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="0"} 0
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="5"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="10"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="25"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="50"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="75"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="100"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="250"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="500"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="750"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="1000"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="2500"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="5000"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="7500"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="10000"} 5
http_requests_duration_seconds_bucket{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry",le="+Inf"} 5
http_requests_duration_seconds_sum{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry"} FLOAT
http_requests_duration_seconds_count{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry"} 5
# TYPE http_requests_total_total counter
http_requests_total_total{endpoint="/visible_fox",method="GET",status="200",otel_scope_name="axum-opentelemetry"} 5
# HELP otel_scope_info Instrumentation Scope metadata
# TYPE otel_scope_info gauge
otel_scope_info{otel_scope_name="axum-opentelemetry"} 1
# HELP target_info Target metadata
# TYPE target_info gauge
target_info{service_name="🦊"} 1
26 changes: 13 additions & 13 deletions tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,27 @@
clippy::unused_async
)]

use std::future::poll_fn;

use axum::{
body::{Body, HttpBody},
response::IntoResponse,
routing::get,
Extension, Router,
};
use futures::future::poll_fn;
use opentelemetry::{metrics::Counter, Context};
use opentelemetry::metrics::Counter;
use regex::Regex;
use tower::Service;

async fn visible_fox(Extension(fox_counter): Extension<Counter<u64>>) -> impl IntoResponse {
// need to keep track of foxes
let ctx = Context::current();
fox_counter.add(&ctx, 1, &[]);
fox_counter.add(1, &[]);
"Heya!"
}

async fn shy_fox(Extension(fox_counter): Extension<Counter<u64>>) -> impl IntoResponse {
// still counts tho
let ctx = Context::current();
fox_counter.add(&ctx, 1, &[]);
fox_counter.add(1, &[]);
"*hides*"
}

Expand All @@ -40,28 +39,29 @@ async fn basic_functionality() -> Result<(), Box<dyn std::error::Error>> {
let fox_counter = metrics_middleware.meter.u64_counter("fox.counter").init();
let metrics_middleware = metrics_middleware.build();

let mut service = Router::new()
let mut service: Router = Router::new()
.route("/metrics", get(axum_opentelemetry_middleware::metrics_endpoint))
.route("/visible_fox", get(visible_fox))
.route("/shy_fox", get(shy_fox))
.layer(metrics_middleware)
.layer(Extension(fox_counter));

poll_fn(|cx| service.poll_ready(cx)).await?;
poll_fn(|cx| tower::Service::poll_ready(&mut service, cx)).await?;

for _ in 0..5 {
service.call(http::Request::get("/visible_fox").body(Body::empty())?).await?;
service.call(axum::http::Request::get("/visible_fox").body(Body::empty())?).await?;
}

service.call(http::Request::get("/shy_fox").body(Body::empty())?).await?;
service.call(axum::http::Request::get("/shy_fox").body(Body::empty())?).await?;

for _ in 0..10 {
// Endpoint doesn't exist so it should increase the unmatched requests counter
service.call(http::Request::get("/unmatched_fox").body(Body::empty())?).await?;
service.call(axum::http::Request::get("/unmatched_fox").body(Body::empty())?).await?;
}

let mut resp = service.call(http::Request::get("/metrics").body(Body::empty())?).await?;
let body = String::from_utf8(resp.data().await.unwrap()?.to_vec())?;
let resp = service.call(axum::http::Request::get("/metrics").body(Body::empty())?).await?;
let body: String =
String::from_utf8(resp.collect().await.unwrap().to_bytes().to_vec()).unwrap();
let re = Regex::new(r"[0-9]\.[0-9]*")?; // remove all floats as they will vary from run to run
let body = re.replace_all(&body, "FLOAT");

Expand Down