Skip to content

Commit

Permalink
feat(app): Backend frame count metrics (#3308)
Browse files Browse the repository at this point in the history
* feat(app): Backend response frame count metrics

this introduces a new tower middleware for Prometheus metrics, used for
instrumenting HTTP and gRPC response bodies, and observing (a) the
number of frames yielded by a body, (b) the number of bytes included
in body frames, and (c) the distribution of frame sizes.

this middleware allows operators to reason about how large or small the
packets being served in a backend's response bodies are.

a route-level middleware that instruments request bodies will be added
in a follow-on PR.

 ### 📝 changes

an overview of changes made here:

* the `linkerd-http-prom` crate has a new `body_data` submodule. it exposes
  `request` and `response` halves, to be explicit about which body is
  being instrumented on a `tower::Service`.

* the `linkerd-http-prom` crate now has a collection of new
  dependencies: `bytes` is added as a dependency in order to inspect the
  data chunk when the inner body yields a new frame. `futures-util` and
  `http-body` are added as dev-dependencies for the accompanying test
  coverage.

* body metrics are affixed to the `RouteBackendMetrics<L>` structure,
  and registered at startup.

Signed-off-by: katelyn martin <[email protected]>

* review: Inline attribute to service passthrough

Signed-off-by: katelyn martin <[email protected]>

* review: Inline attribute to body passthrough

continuing this theme of inlining, we inline the passthrough methods on
`Body` as well.

Signed-off-by: katelyn martin <[email protected]>

* review: Box `<RecordBodyData as Service>::Future` values

Signed-off-by: katelyn martin <[email protected]>

* review: rename `get()` to `metrics()`

Signed-off-by: katelyn martin <[email protected]>

* review: simplify `RecordBodyData<S>` response

Signed-off-by: katelyn martin <[email protected]>

* Update ResponseBody metrics to use a histogram

Signed-off-by: Oliver Gould <[email protected]>

* refactor(tests): feature gate frame size histogram assertions

see:

* prometheus/client_rust#242
* prometheus/client_rust#241

for now, refactor this test so that it gates all use of the (proposed)
`sum()` and `count()` accessors behind a temporary feature gate.

Signed-off-by: katelyn martin <[email protected]>

---------

Signed-off-by: katelyn martin <[email protected]>
Signed-off-by: Oliver Gould <[email protected]>
Co-authored-by: Oliver Gould <[email protected]>
  • Loading branch information
cratelyn and olix0r authored Nov 20, 2024
1 parent 5df46ac commit 049e01b
Show file tree
Hide file tree
Showing 12 changed files with 457 additions and 7 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1506,7 +1506,9 @@ dependencies = [
"ahash",
"bytes",
"futures",
"futures-util",
"http",
"http-body",
"hyper",
"linkerd-app-core",
"linkerd-app-test",
Expand Down Expand Up @@ -1744,6 +1746,7 @@ dependencies = [
name = "linkerd-http-prom"
version = "0.1.0"
dependencies = [
"bytes",
"futures",
"http",
"http-body",
Expand Down
2 changes: 2 additions & 0 deletions linkerd/app/outbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ linkerd-tonic-stream = { path = "../../tonic-stream" }
linkerd-tonic-watch = { path = "../../tonic-watch" }

[dev-dependencies]
futures-util = "0.3"
http-body = "0.4"
hyper = { version = "0.14", features = ["http1", "http2"] }
tokio = { version = "1", features = ["macros", "sync", "time"] }
tokio-rustls = "0.24"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{BackendRef, ParentRef, RouteRef};
use linkerd_app_core::{metrics::prom, svc};
use linkerd_http_prom::{
body_data::response::{BodyDataMetrics, NewRecordBodyData, ResponseBodyFamilies},
record_response::{self, NewResponseDuration, StreamLabel},
NewCountRequests, RequestCount, RequestCountFamilies,
};
Expand All @@ -15,6 +16,7 @@ mod tests;
/// Metric families recorded for each route backend.
///
/// Every family is keyed by [`labels::RouteBackend`] (directly, or via the
/// `ResponseMetrics` alias), so a single value of this type serves all backends.
pub struct RouteBackendMetrics<L: StreamLabel> {
/// Request-count families, handed to `NewCountRequests` by `layer`.
requests: RequestCountFamilies<labels::RouteBackend>,
/// Response metrics, handed to the response-duration middleware by `layer`.
responses: ResponseMetrics<L>,
/// Response-body families, handed to `NewRecordBodyData` by `layer`.
body_metrics: ResponseBodyFamilies<labels::RouteBackend>,
}

type ResponseMetrics<L> = record_response::ResponseMetrics<
Expand All @@ -26,14 +28,24 @@ pub fn layer<T, N>(
metrics: &RouteBackendMetrics<T::StreamLabel>,
) -> impl svc::Layer<
N,
Service = NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
Service = NewRecordBodyData<
ExtractRecordBodyDataParams,
NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
>,
>,
> + Clone
where
T: MkStreamLabel,
N: svc::NewService<T>,
NewRecordBodyData<
ExtractRecordBodyDataParams,
NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
>,
>: svc::NewService<T>,
NewCountRequests<
ExtractRequestCount,
NewResponseDuration<T, ExtractRecordDurationParams<ResponseMetrics<T::StreamLabel>>, N>,
Expand All @@ -44,28 +56,37 @@ where
let RouteBackendMetrics {
requests,
responses,
body_metrics,
} = metrics.clone();

svc::layer::mk(move |inner| {
use svc::Layer;
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
.layer(inner),
NewRecordBodyData::layer_via(ExtractRecordBodyDataParams(body_metrics.clone())).layer(
NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer(
NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone()))
.layer(inner),
),
)
})
}

/// Newtype around the request-count metric families.
///
/// `layer` passes this to `NewCountRequests::layer_via` so that the counter for a
/// target can be looked up when a service is built for that target.
#[derive(Clone, Debug)]
pub struct ExtractRequestCount(RequestCountFamilies<labels::RouteBackend>);

/// Newtype around the response-body metric families.
///
/// `layer` passes this to `NewRecordBodyData::layer_via`; its `svc::ExtractParam`
/// implementation resolves the `BodyDataMetrics` for a target from the target's
/// parent, route, and backend references.
#[derive(Clone, Debug)]
pub struct ExtractRecordBodyDataParams(ResponseBodyFamilies<labels::RouteBackend>);

// === impl RouteBackendMetrics ===

impl<L: StreamLabel> RouteBackendMetrics<L> {
/// Registers every route-backend metric family with `reg` and returns the handles.
///
/// Families are registered in a fixed order: request counts, then response
/// metrics, then response-body metrics. `histo` is forwarded untouched to
/// `record_response::ResponseMetrics::register` (presumably the histogram bucket
/// boundaries -- confirm against that function).
pub fn register(reg: &mut prom::Registry, histo: impl IntoIterator<Item = f64>) -> Self {
    // Struct-literal fields are evaluated in the order written, so the
    // registration order is: requests, responses, body metrics.
    Self {
        requests: RequestCountFamilies::register(reg),
        responses: record_response::ResponseMetrics::register(reg, histo),
        body_metrics: ResponseBodyFamilies::register(reg),
    }
}

Expand All @@ -83,13 +104,22 @@ impl<L: StreamLabel> RouteBackendMetrics<L> {
pub(crate) fn get_statuses(&self, l: &L::StatusLabels) -> prom::Counter {
self.responses.get_statuses(l)
}

/// Test-only accessor: returns the response-body metrics for the route backend
/// identified by `l`.
#[cfg(test)]
pub(crate) fn get_response_body_metrics(
    &self,
    l: &labels::RouteBackend,
) -> linkerd_http_prom::body_data::response::BodyDataMetrics {
    let Self { body_metrics, .. } = self;
    body_metrics.metrics(l)
}
}

impl<L: StreamLabel> Default for RouteBackendMetrics<L> {
    /// Builds a metrics set from the `Default` value of each family.
    ///
    /// Unlike `register`, this takes no registry.
    fn default() -> Self {
        // Field types are inferred from the struct definition.
        let requests = Default::default();
        let responses = Default::default();
        let body_metrics = Default::default();

        Self {
            requests,
            responses,
            body_metrics,
        }
    }
}
Expand All @@ -99,6 +129,7 @@ impl<L: StreamLabel> Clone for RouteBackendMetrics<L> {
Self {
requests: self.requests.clone(),
responses: self.responses.clone(),
body_metrics: self.body_metrics.clone(),
}
}
}
Expand All @@ -114,3 +145,17 @@ where
.metrics(&labels::RouteBackend(t.param(), t.param(), t.param()))
}
}

// === impl ExtractRecordBodyDataParams ===

impl<T> svc::ExtractParam<BodyDataMetrics, T> for ExtractRecordBodyDataParams
where
    T: svc::Param<ParentRef> + svc::Param<RouteRef> + svc::Param<BackendRef>,
{
    /// Looks up the response-body metrics for the target `t`.
    ///
    /// The label set is built from the target's three `svc::Param` values; which
    /// parameter fills each position is inferred from the field types of
    /// `labels::RouteBackend`.
    fn extract_param(&self, t: &T) -> BodyDataMetrics {
        self.0
            .metrics(&labels::RouteBackend(t.param(), t.param(), t.param()))
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use super::{
LabelGrpcRouteBackendRsp, LabelHttpRouteBackendRsp, RouteBackendMetrics,
};
use crate::http::{concrete, logical::Concrete};
use bytes::Buf;
use linkerd_app_core::{
svc::{self, http::BoxBody, Layer, NewService},
transport::{Remote, ServerAddr},
Error,
};
use linkerd_proxy_client_policy as policy;

Expand Down Expand Up @@ -114,6 +116,145 @@ async fn http_request_statuses() {
assert_eq!(mixed.get(), 1);
}

/// Tests that metrics count frames in the backend response body.
///
/// The mock backend answers with a response whose body is fed through a channel, so
/// the test decides exactly when each data frame becomes available. After every
/// frame is read back through the instrumented service, the frame-size histogram is
/// checked (when the `prometheus-client-rust-242` feature is enabled).
///
/// Sanity checks on the request method, response status, and chunk contents use
/// `assert_eq!` rather than `debug_assert_eq!`, so that they are still evaluated
/// when the test is compiled without debug assertions (e.g. `cargo test --release`).
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn body_data_layer_records_frames() -> Result<(), Error> {
    use http_body::Body;
    use linkerd_app_core::proxy::http;
    use linkerd_http_prom::body_data::response::BodyDataMetrics;
    use tower::{Service, ServiceExt};

    let _trace = linkerd_tracing::test::trace_init();

    let metrics = super::RouteBackendMetrics::default();
    let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
    let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
    let backend_ref = crate::BackendRef(policy::Meta::new_default("backend"));

    let (mut svc, mut handle) =
        mock_http_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref);
    handle.allow(1);

    // Create a request.
    let req = {
        let empty = hyper::Body::empty();
        let body = BoxBody::new(empty);
        http::Request::builder().method("DOOT").body(body).unwrap()
    };

    // Call the service once it is ready to accept a request.
    tracing::info!("calling service");
    svc.ready().await.expect("ready");
    let call = svc.call(req);
    let (req, send_resp) = handle.next_request().await.unwrap();
    assert_eq!(req.method().as_str(), "DOOT");

    // Acquire the counters for this backend.
    tracing::info!("acquiring response body metrics");
    let labels = labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone());
    let BodyDataMetrics {
        // TODO(kate): currently, histograms do not expose their observation count or sum. so,
        // we're left unable to exercise these metrics until prometheus/client_rust#242 lands.
        // - https://github.com/prometheus/client_rust/pull/241
        // - https://github.com/prometheus/client_rust/pull/242
        #[cfg(feature = "prometheus-client-rust-242")]
        frame_size,
        ..
    } = metrics.get_response_body_metrics(&labels);

    // Before we've sent a response, the counter should be zero.
    #[cfg(feature = "prometheus-client-rust-242")]
    {
        assert_eq!(frame_size.count(), 0);
        assert_eq!(frame_size.sum(), 0);
    }

    // Create a response whose body is backed by a channel that we can send chunks to, send it.
    tracing::info!("sending response");
    let mut resp_tx = {
        let (tx, body) = hyper::Body::channel();
        let body = BoxBody::new(body);
        let resp = http::Response::builder()
            .status(http::StatusCode::IM_A_TEAPOT)
            .body(body)
            .unwrap();
        send_resp.send_response(resp);
        tx
    };

    // Before we've sent any bytes, the counter should be zero.
    #[cfg(feature = "prometheus-client-rust-242")]
    {
        assert_eq!(frame_size.count(), 0);
        assert_eq!(frame_size.sum(), 0);
    }

    // On the client end, poll our call future and await the response.
    tracing::info!("polling service future");
    let (parts, body) = call.await?.into_parts();
    assert_eq!(parts.status, 418);

    let mut body = Box::pin(body);

    /// Returns the next chunk from a boxed body.
    ///
    /// The body is polled exactly once with a no-op waker, so the chunk must already
    /// be available; any other poll result panics.
    async fn read_chunk(body: &mut std::pin::Pin<Box<BoxBody>>) -> Result<Vec<u8>, Error> {
        use std::task::{Context, Poll};
        let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
        let data = match body.as_mut().poll_data(&mut ctx) {
            Poll::Ready(Some(Ok(d))) => d,
            _ => panic!("next chunk should be ready"),
        };
        let chunk = data.chunk().to_vec();
        Ok(chunk)
    }

    {
        // Send a chunk, confirm that our counters are incremented.
        tracing::info!("sending first chunk");
        resp_tx.send_data("hello".into()).await?;
        let chunk = read_chunk(&mut body).await?;
        assert_eq!("hello".as_bytes(), chunk, "should get same value back out");
        #[cfg(feature = "prometheus-client-rust-242")]
        assert_eq!(frame_size.count(), 1);
        #[cfg(feature = "prometheus-client-rust-242")]
        assert_eq!(frame_size.sum(), 5);
    }

    {
        // Send another chunk, confirm that our counters are incremented once more.
        tracing::info!("sending second chunk");
        resp_tx.send_data(", world!".into()).await?;
        let chunk = read_chunk(&mut body).await?;
        assert_eq!(
            ", world!".as_bytes(),
            chunk,
            "should get same value back out"
        );
        #[cfg(feature = "prometheus-client-rust-242")]
        assert_eq!(frame_size.count(), 2);
        #[cfg(feature = "prometheus-client-rust-242")]
        assert_eq!(frame_size.sum(), 5 + 8);
    }

    {
        // Close the body, show that the counters remain at the same values.
        use std::task::{Context, Poll};
        tracing::info!("closing response body");
        drop(resp_tx);
        let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
        match body.as_mut().poll_data(&mut ctx) {
            Poll::Ready(None) => {}
            _ => panic!("got unexpected poll result"),
        };
        #[cfg(feature = "prometheus-client-rust-242")]
        assert_eq!(frame_size.count(), 2);
        #[cfg(feature = "prometheus-client-rust-242")]
        assert_eq!(frame_size.sum(), 5 + 8);
    }

    Ok(())
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn grpc_request_statuses_ok() {
let _trace = linkerd_tracing::test::trace_init();
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/prom/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Tower middleware for Prometheus metrics.
test-util = []

[dependencies]
bytes = "1"
futures = { version = "0.3", default-features = false }
http = "0.2"
http-body = "0.4"
Expand Down
5 changes: 5 additions & 0 deletions linkerd/http/prom/src/body_data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Body-data metrics, split into public `request` and `response` halves so that it
// is explicit which body of a `tower::Service` is being instrumented.
pub mod request;
pub mod response;

// Private submodules; not part of this module's public API.
mod body;
mod metrics;
Loading

0 comments on commit 049e01b

Please sign in to comment.