From 50288aa3fa59e4419607e6018422f15bc938c4ea Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Fri, 1 Nov 2024 00:00:00 +0000 Subject: [PATCH] feat(app): Route request frame count metrics Signed-off-by: katelyn martin --- linkerd/http/prom/src/body_data/metrics.rs | 82 ++++++++++++++++ linkerd/http/prom/src/body_data/request.rs | 106 ++++++++++++++++++++- 2 files changed, 187 insertions(+), 1 deletion(-) diff --git a/linkerd/http/prom/src/body_data/metrics.rs b/linkerd/http/prom/src/body_data/metrics.rs index 8ae1872f33..02f46b1d16 100644 --- a/linkerd/http/prom/src/body_data/metrics.rs +++ b/linkerd/http/prom/src/body_data/metrics.rs @@ -2,6 +2,15 @@ use linkerd_metrics::prom::{self, Counter, Family, Registry}; +/// Counters for request body frames. +#[derive(Clone, Debug)] +pub struct RequestBodyFamilies { + /// Counts the number of request body frames. + req_body_frames_total: Family, + /// Counts the total number of bytes in request body frames. + req_body_frames_bytes: Family, +} + /// Counters for response body frames. #[derive(Clone, Debug)] pub struct ResponseBodyFamilies { @@ -20,6 +29,79 @@ pub struct BodyDataMetrics { pub frames_bytes: Counter, } +// === impl RequestBodyFamilies === + +impl Default for RequestBodyFamilies +where + L: Clone + std::hash::Hash + Eq, +{ + fn default() -> Self { + Self { + req_body_frames_total: Default::default(), + req_body_frames_bytes: Default::default(), + } + } +} + +impl RequestBodyFamilies +where + L: prom::encoding::EncodeLabelSet + + std::fmt::Debug + + std::hash::Hash + + Eq + + Clone + + Send + + Sync + + 'static, +{ + const REQ_BODY_FRAMES_TOTAL_NAME: &'static str = "req_body_frames_total"; + const REQ_BODY_FRAMES_TOTAL_HELP: &'static str = + "Counts the number of frames in request bodies."; + + const REQ_BODY_FRAMES_BYTES_NAME: &'static str = "req_body_frames_bytes"; + const REQ_BODY_FRAMES_BYTES_HELP: &'static str = + "Counts the total number of bytes in request bodies."; + + /// Registers and returns a new family of body data metrics. + pub fn register(registry: &mut Registry) -> Self { + let req_body_frames_total = Family::default(); + registry.register( + Self::REQ_BODY_FRAMES_TOTAL_NAME, + Self::REQ_BODY_FRAMES_TOTAL_HELP, + req_body_frames_total.clone(), + ); + + let req_body_frames_bytes = Family::default(); + registry.register_with_unit( + Self::REQ_BODY_FRAMES_BYTES_NAME, + Self::REQ_BODY_FRAMES_BYTES_HELP, + prom::Unit::Bytes, + req_body_frames_bytes.clone(), + ); + + Self { + req_body_frames_total, + req_body_frames_bytes, + } + } + + /// Returns the [`BodyDataMetrics`] for the given label set. + pub fn get(&self, labels: &L) -> BodyDataMetrics { + let Self { + req_body_frames_total, + req_body_frames_bytes, + } = self; + + let frames_total = req_body_frames_total.get_or_create(labels).clone(); + let frames_bytes = req_body_frames_bytes.get_or_create(labels).clone(); + + BodyDataMetrics { + frames_total, + frames_bytes, + } + } +} + // === impl ResponseBodyFamilies === impl Default for ResponseBodyFamilies diff --git a/linkerd/http/prom/src/body_data/request.rs b/linkerd/http/prom/src/body_data/request.rs index fb270395da..7e4bbd2e1a 100644 --- a/linkerd/http/prom/src/body_data/request.rs +++ b/linkerd/http/prom/src/body_data/request.rs @@ -1 +1,105 @@ -// TODO(kate): write a middleware for request body. +//! Tower middleware to instrument request bodies. + +pub use super::metrics::{BodyDataMetrics, RequestBodyFamilies}; + +use http::{Request, Response}; +use http_body::Body; +use linkerd_error::Error; +use linkerd_http_box::BoxBody; +use linkerd_stack::{self as svc, layer::Layer, ExtractParam, NewService, Service}; +use std::{future::Future, pin::Pin}; + +/// A [`NewService`] that creates [`RecordBodyData`] services. +#[derive(Clone, Debug)] +pub struct NewRecordBodyData { + /// The [`ExtractParam`] strategy for obtaining our parameters. + extract: X, + /// The inner [`NewService`]. + inner: N, +} + +/// Tracks body frames for an inner `S`-typed [`Service`]. +#[derive(Clone, Debug)] +pub struct RecordBodyData { + /// The inner [`Service`]. + inner: S, + /// The metrics to be affixed to the response body. + metrics: BodyDataMetrics, +} + +// === impl NewRecordBodyData === + +impl NewRecordBodyData { + /// Returns a [`Layer`] that tracks body chunks. + /// + /// This uses an `X`-typed [`ExtractParam`] implementation to extract service parameters + /// from a `T`-typed target. + pub fn layer_via(extract: X) -> impl Layer { + svc::layer::mk(move |inner| Self { + extract: extract.clone(), + inner, + }) + } +} + +impl NewService for NewRecordBodyData +where + X: ExtractParam, + N: NewService, +{ + type Service = RecordBodyData; + + fn new_service(&self, target: T) -> Self::Service { + let Self { extract, inner } = self; + + let metrics = extract.extract_param(&target); + let inner = inner.new_service(target); + + RecordBodyData { inner, metrics } + } +} + +// === impl RecordBodyData === + +impl Service> for RecordBodyData +where + S: Service, Response = Response>, + S::Future: Send + 'static, + RespB: Body + Send + 'static, + RespB::Data: Send + 'static, + RespB::Error: Into, +{ + type Response = Response; + type Error = S::Error; + type Future = Pin> + Send>>; + + #[inline] + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + use futures::{FutureExt, TryFutureExt}; + + let Self { inner, metrics } = self; + let metrics = metrics.clone(); + let instrument = Box::new(|resp| Self::instrument_response(resp, metrics)); + + inner.call(req).map_ok(instrument).boxed() + } +} + +impl RecordBodyData { + fn instrument_response(resp: Response, metrics: BodyDataMetrics) -> Response + where + B: Body + Send + 'static, + B::Data: Send + 'static, + B::Error: Into, + { + resp.map(|b| super::body::Body::new(b, metrics)) + .map(BoxBody::new) + } +}