Skip to content

Commit

Permalink
feat(app): Backend response frame count metrics
Browse files Browse the repository at this point in the history
this introduces a new tower middleware for Prometheus metrics, used for
instrumenting HTTP and gRPC response bodies, and observing (a) the
number of frames yielded by a body, and (b) the number of bytes included
in body frames.

this middleware allows operators to reason about how large or small the
packets being served in a backend's response bodies are.

a route-level middleware that instruments request bodies will be added
in a follow-on PR.

 ### 📝 changes

an overview of changes made here:

* the `linkerd-http-prom` has a new `body_data` submodule. it exposes
  `request` and `response` halves, to be explicit about which body is
  being instrumented on a `tower::Service`.

* the `linkerd-http-prom` crate now has a collection of new
  dependencies: `bytes` is added as a dependency in order to inspect the
  data chunk when the inner body yields a new frame. `futures-util` and
  `http-body` are added as dev-dependencies for the accompanying test
  coverage.

* body metrics are affixed to the `RouteBackendMetrics<L>` structure,
  and registered at startup.

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Nov 1, 2024
1 parent c268774 commit c08a6b5
Show file tree
Hide file tree
Showing 11 changed files with 467 additions and 6 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,9 @@ dependencies = [
"ahash",
"bytes",
"futures",
"futures-util",
"http",
"http-body",
"hyper",
"linkerd-app-core",
"linkerd-app-test",
Expand Down Expand Up @@ -1554,6 +1556,7 @@ dependencies = [
name = "linkerd-http-prom"
version = "0.1.0"
dependencies = [
"bytes",
"futures",
"http",
"http-body",
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ linkerd-tonic-stream = { path = "../../tonic-stream" }
linkerd-tonic-watch = { path = "../../tonic-watch" }

[dev-dependencies]
futures-util = "0.3"
http-body = "0.4"
hyper = { version = "0.14", features = ["http1", "http2"] }
tokio = { version = "1", features = ["macros", "sync", "time"] }
tokio-rustls = "0.24"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{BackendRef, ParentRef, RouteRef};
use linkerd_app_core::{metrics::prom, svc};
use linkerd_http_prom::{
body_data::response::{BodyDataMetrics, NewRecordBodyData, ResponseBodyFamilies},
record_response::{self, NewResponseDuration, StreamLabel},
NewCountRequests, RequestCount, RequestCountFamilies,
};
Expand All @@ -15,6 +16,7 @@ mod tests;
pub struct RouteBackendMetrics<L: StreamLabel> {
requests: RequestCountFamilies<labels::RouteBackend>,
responses: ResponseMetrics<L>,
body_metrics: ResponseBodyFamilies<labels::RouteBackend>,
}

type ResponseMetrics<L> = record_response::ResponseMetrics<
Expand All @@ -26,14 +28,24 @@ pub fn layer<T, N>(
metrics: &RouteBackendMetrics<T::StreamLabel>,
) -> impl svc::Layer<
N,
Service = NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
Service = NewRecordBodyData<
ExtractRecordBodyDataParams,
NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
>,
>,
> + Clone
where
T: MkStreamLabel,
N: svc::NewService<T>,
NewRecordBodyData<
ExtractRecordBodyDataParams,
NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
>,
>: svc::NewService<T>,
NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
Expand All @@ -44,28 +56,37 @@ where
let RouteBackendMetrics {
requests,
responses,
body_metrics,
} = metrics.clone();

svc::layer::mk(move |inner| {
use svc::Layer;
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
.layer(inner),
NewRecordBodyData::layer_via(ExtractRecordBodyDataParams(body_metrics.clone())).layer(
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
.layer(inner),
),
)
})
}

#[derive(Clone, Debug)]
pub struct ExtractRequestCount(RequestCountFamilies<labels::RouteBackend>);

#[derive(Clone, Debug)]
pub struct ExtractRecordBodyDataParams(ResponseBodyFamilies<labels::RouteBackend>);

// === impl RouteBackendMetrics ===

impl<L: StreamLabel> RouteBackendMetrics<L> {
pub fn register(reg: &mut prom::Registry, histo: impl IntoIterator<Item = f64>) -> Self {
let requests = RequestCountFamilies::register(reg);
let responses = record_response::ResponseMetrics::register(reg, histo);
let body_metrics = ResponseBodyFamilies::register(reg);
Self {
requests,
responses,
body_metrics,
}
}

Expand All @@ -83,13 +104,22 @@ impl<L: StreamLabel> RouteBackendMetrics<L> {
pub(crate) fn get_statuses(&self, l: &L::StatusLabels) -> prom::Counter {
self.responses.get_statuses(l)
}

#[cfg(test)]
pub(crate) fn get_response_body_metrics(
&self,
l: &labels::RouteBackend,
) -> linkerd_http_prom::body_data::response::BodyDataMetrics {
self.body_metrics.get(l)
}
}

impl<L: StreamLabel> Default for RouteBackendMetrics<L> {
fn default() -> Self {
Self {
requests: Default::default(),
responses: Default::default(),
body_metrics: Default::default(),
}
}
}
Expand All @@ -99,6 +129,7 @@ impl<L: StreamLabel> Clone for RouteBackendMetrics<L> {
Self {
requests: self.requests.clone(),
responses: self.responses.clone(),
body_metrics: self.body_metrics.clone(),
}
}
}
Expand All @@ -114,3 +145,17 @@ where
.metrics(&labels::RouteBackend(t.param(), t.param(), t.param()))
}
}

// === impl ExtractRecordBodyDataParams ===

impl<T> svc::ExtractParam<BodyDataMetrics, T> for ExtractRecordBodyDataParams
where
T: svc::Param<ParentRef> + svc::Param<RouteRef> + svc::Param<BackendRef>,
{
fn extract_param(&self, t: &T) -> BodyDataMetrics {
let Self(families) = self;
let labels = labels::RouteBackend(t.param(), t.param(), t.param());

families.get(&labels)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use super::{
LabelGrpcRouteBackendRsp, LabelHttpRouteBackendRsp, RouteBackendMetrics,
};
use crate::http::{concrete, logical::Concrete};
use bytes::Buf;
use linkerd_app_core::{
svc::{self, http::BoxBody, Layer, NewService},
transport::{Remote, ServerAddr},
Error,
};
use linkerd_proxy_client_policy as policy;

Expand Down Expand Up @@ -116,6 +118,128 @@ async fn http_request_statuses() {
assert_eq!(mixed.get(), 1);
}

/// Tests that metrics count frames in the backend response body.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn body_data_layer_records_frames() -> Result<(), Error> {
use http_body::Body;
use linkerd_app_core::proxy::http;
use linkerd_http_prom::body_data::response::BodyDataMetrics;
use tower::{Service, ServiceExt};

let _trace = linkerd_tracing::test::trace_init();

let metrics = super::RouteBackendMetrics::default();
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
let backend_ref = crate::BackendRef(policy::Meta::new_default("backend"));

let (mut svc, mut handle) =
mock_http_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref);
handle.allow(1);

// Create a request.
let req = {
let empty = hyper::Body::empty();
let body = BoxBody::new(empty);
http::Request::builder().method("DOOT").body(body).unwrap()
};

// Call the service once it is ready to accept a request.
tracing::info!("calling service");
svc.ready().await.expect("ready");
let call = svc.call(req);
let (req, send_resp) = handle.next_request().await.unwrap();
debug_assert_eq!(req.method().as_str(), "DOOT");

// Acquire the counters for this backend.
tracing::info!("acquiring response body metrics");
let labels = labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone());
let BodyDataMetrics {
frames_total,
frames_bytes,
} = metrics.get_response_body_metrics(&labels);

// Before we've sent a response, the counter should be zero.
assert_eq!(frames_total.get(), 0);
assert_eq!(frames_bytes.get(), 0);

// Create a response whose body is backed by a channel that we can send chunks to, send it.
tracing::info!("sending response");
let mut resp_tx = {
let (tx, body) = hyper::Body::channel();
let body = BoxBody::new(body);
let resp = http::Response::builder()
.status(http::StatusCode::IM_A_TEAPOT)
.body(body)
.unwrap();
send_resp.send_response(resp);
tx
};

// Before we've sent any bytes, the counter should be zero.
assert_eq!(frames_total.get(), 0);
assert_eq!(frames_bytes.get(), 0);

// On the client end, poll our call future and await the response.
tracing::info!("polling service future");
let (parts, body) = call.await?.into_parts();
debug_assert_eq!(parts.status, 418);

let mut body = Box::pin(body);

/// Returns the next chunk from a boxed body.
async fn read_chunk(body: &mut std::pin::Pin<Box<BoxBody>>) -> Result<Vec<u8>, Error> {
use std::task::{Context, Poll};
let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
let data = match body.as_mut().poll_data(&mut ctx) {
Poll::Ready(Some(Ok(d))) => d,
_ => panic!("next chunk should be ready"),
};
let chunk = data.chunk().to_vec();
Ok(chunk)
}

{
// Send a chunk, confirm that our counters are incremented.
tracing::info!("sending first chunk");
resp_tx.send_data("hello".into()).await?;
let chunk = read_chunk(&mut body).await?;
debug_assert_eq!("hello".as_bytes(), chunk, "should get same value back out");
assert_eq!(frames_total.get(), 1);
assert_eq!(frames_bytes.get(), 5);
}

{
// Send another chunk, confirm that our counters are incremented once more.
tracing::info!("sending second chunk");
resp_tx.send_data(", world!".into()).await?;
let chunk = read_chunk(&mut body).await?;
debug_assert_eq!(
", world!".as_bytes(),
chunk,
"should get same value back out"
);
assert_eq!(frames_total.get(), 2);
assert_eq!(frames_bytes.get(), 5 + 8);
}

{
// Close the body, show that the counters remain at the same values.
use std::task::{Context, Poll};
tracing::info!("closing response body");
drop(resp_tx);
let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
match body.as_mut().poll_data(&mut ctx) {
Poll::Ready(None) => {}
_ => panic!("got unexpected poll result"),
};
assert_eq!(frames_total.get(), 2);
assert_eq!(frames_bytes.get(), 5 + 8);
}

Ok(())
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn grpc_request_statuses_ok() {
let _trace = linkerd_tracing::test::trace_init();
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/prom/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Tower middleware for Prometheus metrics.
test-util = []

[dependencies]
bytes = "1"
futures = { version = "0.3", default-features = false }
http = "0.2"
http-body = "0.4"
Expand Down
5 changes: 5 additions & 0 deletions linkerd/http/prom/src/body_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod request;
pub mod response;

mod body;
mod metrics;
80 changes: 80 additions & 0 deletions linkerd/http/prom/src/body_data/body.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use super::metrics::BodyDataMetrics;
use http::HeaderMap;
use http_body::SizeHint;
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
};

/// An instrumented body.
#[pin_project]
pub struct Body<B> {
/// The inner body.
#[pin]
inner: B,
/// Metrics with which the inner body will be instrumented.
metrics: BodyDataMetrics,
}

impl<B> Body<B> {
/// Returns a new, instrumented body.
pub(crate) fn new(body: B, metrics: BodyDataMetrics) -> Self {
Self {
inner: body,
metrics,
}
}
}

impl<B> http_body::Body for Body<B>
where
B: http_body::Body,
{
type Data = B::Data;
type Error = B::Error;

/// Attempt to pull out the next data buffer of this stream.
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let this = self.project();
let inner = this.inner;
let BodyDataMetrics {
frames_total,
frames_bytes,
} = this.metrics;

let data = std::task::ready!(inner.poll_data(cx));

if let Some(Ok(data)) = data.as_ref() {
// We've polled and yielded a new chunk! Increment our telemetry.
//
// NB: We're careful to call `remaining()` rather than `chunk()`, which
// "can return a shorter slice (this allows non-continuous internal representation)."
let bytes = <B::Data as bytes::Buf>::remaining(data)
.try_into()
.unwrap_or(u64::MAX);
frames_bytes.inc_by(bytes);
frames_total.inc();
}

Poll::Ready(data)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
self.project().inner.poll_trailers(cx)
}

fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

fn size_hint(&self) -> SizeHint {
self.inner.size_hint()
}
}
Loading

0 comments on commit c08a6b5

Please sign in to comment.