Skip to content

Commit

Permalink
feat(app): Route request frame count metrics
Browse files Browse the repository at this point in the history
Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Nov 7, 2024
1 parent 2d8d0f2 commit 50288aa
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 1 deletion.
82 changes: 82 additions & 0 deletions linkerd/http/prom/src/body_data/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
use linkerd_metrics::prom::{self, Counter, Family, Registry};

/// Counters for request body frames.
#[derive(Clone, Debug)]
pub struct RequestBodyFamilies<L> {
/// Counts the number of request body frames.
req_body_frames_total: Family<L, Counter>,
/// Counts the total number of bytes in request body frames.
req_body_frames_bytes: Family<L, Counter>,
}

/// Counters for response body frames.
#[derive(Clone, Debug)]
pub struct ResponseBodyFamilies<L> {
Expand All @@ -20,6 +29,79 @@ pub struct BodyDataMetrics {
pub frames_bytes: Counter,
}

// === impl RequestBodyFamilies ===

impl<L> Default for RequestBodyFamilies<L>
where
L: Clone + std::hash::Hash + Eq,
{
fn default() -> Self {
Self {
req_body_frames_total: Default::default(),
req_body_frames_bytes: Default::default(),
}
}
}

impl<L> RequestBodyFamilies<L>
where
L: prom::encoding::EncodeLabelSet
+ std::fmt::Debug
+ std::hash::Hash
+ Eq
+ Clone
+ Send
+ Sync
+ 'static,
{
const REQ_BODY_FRAMES_TOTAL_NAME: &'static str = "req_body_frames_total";
const REQ_BODY_FRAMES_TOTAL_HELP: &'static str =
"Counts the number of frames in request bodies.";

const REQ_BODY_FRAMES_BYTES_NAME: &'static str = "req_body_frames_bytes";
const REQ_BODY_FRAMES_BYTES_HELP: &'static str =
"Counts the total number of bytes in request bodies.";

/// Registers and returns a new family of body data metrics.
pub fn register(registry: &mut Registry) -> Self {
let req_body_frames_total = Family::default();
registry.register(
Self::REQ_BODY_FRAMES_TOTAL_NAME,
Self::REQ_BODY_FRAMES_TOTAL_HELP,
req_body_frames_total.clone(),
);

let req_body_frames_bytes = Family::default();
registry.register_with_unit(
Self::REQ_BODY_FRAMES_BYTES_NAME,
Self::REQ_BODY_FRAMES_BYTES_HELP,
prom::Unit::Bytes,
req_body_frames_bytes.clone(),
);

Self {
req_body_frames_total,
req_body_frames_bytes,
}
}

/// Returns the [`BodyDataMetrics`] for the given label set.
pub fn get(&self, labels: &L) -> BodyDataMetrics {
let Self {
req_body_frames_total,
req_body_frames_bytes,
} = self;

let frames_total = req_body_frames_total.get_or_create(labels).clone();
let frames_bytes = req_body_frames_bytes.get_or_create(labels).clone();

BodyDataMetrics {
frames_total,
frames_bytes,
}
}
}

// === impl ResponseBodyFamilies ===

impl<L> Default for ResponseBodyFamilies<L>
Expand Down
106 changes: 105 additions & 1 deletion linkerd/http/prom/src/body_data/request.rs
Original file line number Diff line number Diff line change
@@ -1 +1,105 @@
// TODO(kate): write a middleware for request body.
//! Tower middleware to instrument request bodies.
pub use super::metrics::{BodyDataMetrics, RequestBodyFamilies};

use http::{Request, Response};
use http_body::Body;
use linkerd_error::Error;
use linkerd_http_box::BoxBody;
use linkerd_stack::{self as svc, layer::Layer, ExtractParam, NewService, Service};
use std::{future::Future, pin::Pin};

/// A [`NewService<T>`] that creates [`RecordBodyData`] services.
#[derive(Clone, Debug)]
pub struct NewRecordBodyData<X, N> {
/// The [`ExtractParam<P, T>`] strategy for obtaining our parameters.
extract: X,
/// The inner [`NewService<T>`].
inner: N,
}

/// Tracks body frames for an inner `S`-typed [`Service`].
#[derive(Clone, Debug)]
pub struct RecordBodyData<S> {
/// The inner [`Service<T>`].
inner: S,
/// The metrics to be affixed to the response body.
metrics: BodyDataMetrics,
}

// === impl NewRecordBodyData ===

impl<X: Clone, N> NewRecordBodyData<X, N> {
/// Returns a [`Layer<S>`] that tracks body chunks.
///
/// This uses an `X`-typed [`ExtractParam<P, T>`] implementation to extract service parameters
/// from a `T`-typed target.
pub fn layer_via(extract: X) -> impl Layer<N, Service = Self> {
svc::layer::mk(move |inner| Self {
extract: extract.clone(),
inner,
})
}
}

impl<T, X, N> NewService<T> for NewRecordBodyData<X, N>
where
X: ExtractParam<BodyDataMetrics, T>,
N: NewService<T>,
{
type Service = RecordBodyData<N::Service>;

fn new_service(&self, target: T) -> Self::Service {
let Self { extract, inner } = self;

let metrics = extract.extract_param(&target);
let inner = inner.new_service(target);

RecordBodyData { inner, metrics }
}
}

// === impl RecordBodyData ===

impl<ReqB, RespB, S> Service<Request<ReqB>> for RecordBodyData<S>
where
S: Service<Request<ReqB>, Response = Response<RespB>>,
S::Future: Send + 'static,
RespB: Body + Send + 'static,
RespB::Data: Send + 'static,
RespB::Error: Into<Error>,
{
type Response = Response<BoxBody>;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

#[inline]
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request<ReqB>) -> Self::Future {
use futures::{FutureExt, TryFutureExt};

let Self { inner, metrics } = self;
let metrics = metrics.clone();
let instrument = Box::new(|resp| Self::instrument_response(resp, metrics));

inner.call(req).map_ok(instrument).boxed()
}
}

impl<S> RecordBodyData<S> {
fn instrument_response<B>(resp: Response<B>, metrics: BodyDataMetrics) -> Response<BoxBody>
where
B: Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<Error>,
{
resp.map(|b| super::body::Body::new(b, metrics))
.map(BoxBody::new)
}
}

0 comments on commit 50288aa

Please sign in to comment.