From 049e01bfdb02ca30b4d06704b31071991158477c Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Wed, 20 Nov 2024 15:45:27 -0500 Subject: [PATCH] feat(app): Backend frame count metrics (#3308) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(app): Backend response frame count metrics this introduces a new tower middleware for Prometheus metrics, used for instrumenting HTTP and gRPC response bodies, and observing (a) the number of frames yielded by a body, (b) the number of bytes included in body frames, and (c) the distribution of frame sizes. this middleware allows operators to reason about how large or small the packets being served in a backend's response bodies are. a route-level middleware that instruments request bodies will be added in a follow-on PR. ### 📝 changes an overview of changes made here: * the `linkerd-http-prom` crate has a new `body_data` submodule. it exposes `request` and `response` halves, to be explicit about which body is being instrumented on a `tower::Service`. * the `linkerd-http-prom` crate now depends on `bytes`, in order to inspect the data chunk when the inner body yields a new frame. `futures-util` and `http-body` are added as dev-dependencies of the outbound app crate (`linkerd/app/outbound`) for the accompanying test coverage. * body metrics are affixed to the `RouteBackendMetrics` structure, and registered at startup. Signed-off-by: katelyn martin * review: Inline attribute to service passthrough Signed-off-by: katelyn martin * review: Inline attribute to body passthrough continuing this theme of inlining, we inline the passthrough methods on `Body` as well. 
Signed-off-by: katelyn martin * review: Box `::Future` values Signed-off-by: katelyn martin * review: rename `get()` to `metrics()` Signed-off-by: katelyn martin * review: simplify `RecordBodyData` response Signed-off-by: katelyn martin * Update ResponseBody metrics to use a histogram Signed-off-by: Oliver Gould * refactor(tests): feature gate frame size histogram assertions see: * https://github.com/prometheus/client_rust/pull/242 * https://github.com/prometheus/client_rust/pull/241 for now, refactor this test so that it gates all use of the (proposed) `sum()` and `count()` accessors behind a temporary feature gate. Signed-off-by: katelyn martin --------- Signed-off-by: katelyn martin Signed-off-by: Oliver Gould Co-authored-by: Oliver Gould --- Cargo.lock | 3 + linkerd/app/outbound/Cargo.toml | 2 + .../logical/policy/route/backend/metrics.rs | 57 ++++++- .../policy/route/backend/metrics/tests.rs | 141 ++++++++++++++++++ linkerd/http/prom/Cargo.toml | 1 + linkerd/http/prom/src/body_data.rs | 5 + linkerd/http/prom/src/body_data/body.rs | 77 ++++++++++ linkerd/http/prom/src/body_data/metrics.rs | 75 ++++++++++ linkerd/http/prom/src/body_data/request.rs | 1 + linkerd/http/prom/src/body_data/response.rs | 94 ++++++++++++ linkerd/http/prom/src/lib.rs | 1 + linkerd/metrics/src/lib.rs | 7 +- 12 files changed, 457 insertions(+), 7 deletions(-) create mode 100644 linkerd/http/prom/src/body_data.rs create mode 100644 linkerd/http/prom/src/body_data/body.rs create mode 100644 linkerd/http/prom/src/body_data/metrics.rs create mode 100644 linkerd/http/prom/src/body_data/request.rs create mode 100644 linkerd/http/prom/src/body_data/response.rs diff --git a/Cargo.lock b/Cargo.lock index cf9ceec32e..c6d20f12c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1506,7 +1506,9 @@ dependencies = [ "ahash", "bytes", "futures", + "futures-util", "http", + "http-body", "hyper", "linkerd-app-core", "linkerd-app-test", @@ -1744,6 +1746,7 @@ dependencies = [ name = "linkerd-http-prom" version = 
"0.1.0" dependencies = [ + "bytes", "futures", "http", "http-body", diff --git a/linkerd/app/outbound/Cargo.toml b/linkerd/app/outbound/Cargo.toml index e932718509..a61d6d988e 100644 --- a/linkerd/app/outbound/Cargo.toml +++ b/linkerd/app/outbound/Cargo.toml @@ -50,6 +50,8 @@ linkerd-tonic-stream = { path = "../../tonic-stream" } linkerd-tonic-watch = { path = "../../tonic-watch" } [dev-dependencies] +futures-util = "0.3" +http-body = "0.4" hyper = { version = "0.14", features = ["http1", "http2"] } tokio = { version = "1", features = ["macros", "sync", "time"] } tokio-rustls = "0.24" diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs index 3e2684bed1..ed7f0cded0 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics.rs @@ -1,6 +1,7 @@ use crate::{BackendRef, ParentRef, RouteRef}; use linkerd_app_core::{metrics::prom, svc}; use linkerd_http_prom::{ + body_data::response::{BodyDataMetrics, NewRecordBodyData, ResponseBodyFamilies}, record_response::{self, NewResponseDuration, StreamLabel}, NewCountRequests, RequestCount, RequestCountFamilies, }; @@ -15,6 +16,7 @@ mod tests; pub struct RouteBackendMetrics { requests: RequestCountFamilies, responses: ResponseMetrics, + body_metrics: ResponseBodyFamilies, } type ResponseMetrics = record_response::ResponseMetrics< @@ -26,14 +28,24 @@ pub fn layer( metrics: &RouteBackendMetrics, ) -> impl svc::Layer< N, - Service = NewCountRequests< - ExtractRequestCount, - NewResponseDuration>, N>, + Service = NewRecordBodyData< + ExtractRecordBodyDataParams, + NewCountRequests< + ExtractRequestCount, + NewResponseDuration>, N>, + >, >, > + Clone where T: MkStreamLabel, N: svc::NewService, + NewRecordBodyData< + ExtractRecordBodyDataParams, + NewCountRequests< + ExtractRequestCount, + NewResponseDuration>, N>, + >, + >: svc::NewService, 
NewCountRequests< ExtractRequestCount, NewResponseDuration>, N>, @@ -44,12 +56,16 @@ where let RouteBackendMetrics { requests, responses, + body_metrics, } = metrics.clone(); + svc::layer::mk(move |inner| { use svc::Layer; - NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer( - NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone())) - .layer(inner), + NewRecordBodyData::layer_via(ExtractRecordBodyDataParams(body_metrics.clone())).layer( + NewCountRequests::layer_via(ExtractRequestCount(requests.clone())).layer( + NewRecordDuration::layer_via(ExtractRecordDurationParams(responses.clone())) + .layer(inner), + ), ) }) } @@ -57,15 +73,20 @@ where #[derive(Clone, Debug)] pub struct ExtractRequestCount(RequestCountFamilies); +#[derive(Clone, Debug)] +pub struct ExtractRecordBodyDataParams(ResponseBodyFamilies); + // === impl RouteBackendMetrics === impl RouteBackendMetrics { pub fn register(reg: &mut prom::Registry, histo: impl IntoIterator) -> Self { let requests = RequestCountFamilies::register(reg); let responses = record_response::ResponseMetrics::register(reg, histo); + let body_metrics = ResponseBodyFamilies::register(reg); Self { requests, responses, + body_metrics, } } @@ -83,6 +104,14 @@ impl RouteBackendMetrics { pub(crate) fn get_statuses(&self, l: &L::StatusLabels) -> prom::Counter { self.responses.get_statuses(l) } + + #[cfg(test)] + pub(crate) fn get_response_body_metrics( + &self, + l: &labels::RouteBackend, + ) -> linkerd_http_prom::body_data::response::BodyDataMetrics { + self.body_metrics.metrics(l) + } } impl Default for RouteBackendMetrics { @@ -90,6 +119,7 @@ impl Default for RouteBackendMetrics { Self { requests: Default::default(), responses: Default::default(), + body_metrics: Default::default(), } } } @@ -99,6 +129,7 @@ impl Clone for RouteBackendMetrics { Self { requests: self.requests.clone(), responses: self.responses.clone(), + body_metrics: self.body_metrics.clone(), } } } @@ -114,3 +145,17 @@ where 
.metrics(&labels::RouteBackend(t.param(), t.param(), t.param())) } } + +// === impl ExtractRecordBodyDataParams === + +impl svc::ExtractParam for ExtractRecordBodyDataParams +where + T: svc::Param + svc::Param + svc::Param, +{ + fn extract_param(&self, t: &T) -> BodyDataMetrics { + let Self(families) = self; + let labels = labels::RouteBackend(t.param(), t.param(), t.param()); + + families.metrics(&labels) + } +} diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs index adeb4bfd9f..5883bc73e1 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/backend/metrics/tests.rs @@ -5,9 +5,11 @@ use super::{ LabelGrpcRouteBackendRsp, LabelHttpRouteBackendRsp, RouteBackendMetrics, }; use crate::http::{concrete, logical::Concrete}; +use bytes::Buf; use linkerd_app_core::{ svc::{self, http::BoxBody, Layer, NewService}, transport::{Remote, ServerAddr}, + Error, }; use linkerd_proxy_client_policy as policy; @@ -114,6 +116,145 @@ async fn http_request_statuses() { assert_eq!(mixed.get(), 1); } +/// Tests that metrics count frames in the backend response body. 
+#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn body_data_layer_records_frames() -> Result<(), Error> { + use http_body::Body; + use linkerd_app_core::proxy::http; + use linkerd_http_prom::body_data::response::BodyDataMetrics; + use tower::{Service, ServiceExt}; + + let _trace = linkerd_tracing::test::trace_init(); + + let metrics = super::RouteBackendMetrics::default(); + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let backend_ref = crate::BackendRef(policy::Meta::new_default("backend")); + + let (mut svc, mut handle) = + mock_http_route_backend_metrics(&metrics, &parent_ref, &route_ref, &backend_ref); + handle.allow(1); + + // Create a request. + let req = { + let empty = hyper::Body::empty(); + let body = BoxBody::new(empty); + http::Request::builder().method("DOOT").body(body).unwrap() + }; + + // Call the service once it is ready to accept a request. + tracing::info!("calling service"); + svc.ready().await.expect("ready"); + let call = svc.call(req); + let (req, send_resp) = handle.next_request().await.unwrap(); + debug_assert_eq!(req.method().as_str(), "DOOT"); + + // Acquire the counters for this backend. + tracing::info!("acquiring response body metrics"); + let labels = labels::RouteBackend(parent_ref.clone(), route_ref.clone(), backend_ref.clone()); + let BodyDataMetrics { + // TODO(kate): currently, histograms do not expose their observation count or sum. so, + // we're left unable to exercise these metrics until prometheus/client_rust#242 lands. + // - https://github.com/prometheus/client_rust/pull/241 + // - https://github.com/prometheus/client_rust/pull/242 + #[cfg(feature = "prometheus-client-rust-242")] + frame_size, + .. + } = metrics.get_response_body_metrics(&labels); + + // Before we've sent a response, the counter should be zero. 
+ #[cfg(feature = "prometheus-client-rust-242")] + { + assert_eq!(frame_size.count(), 0); + assert_eq!(frame_size.sum(), 0); + } + + // Create a response whose body is backed by a channel that we can send chunks to, send it. + tracing::info!("sending response"); + let mut resp_tx = { + let (tx, body) = hyper::Body::channel(); + let body = BoxBody::new(body); + let resp = http::Response::builder() + .status(http::StatusCode::IM_A_TEAPOT) + .body(body) + .unwrap(); + send_resp.send_response(resp); + tx + }; + + // Before we've sent any bytes, the counter should be zero. + #[cfg(feature = "prometheus-client-rust-242")] + { + assert_eq!(frame_size.count(), 0); + assert_eq!(frame_size.sum(), 0); + } + + // On the client end, poll our call future and await the response. + tracing::info!("polling service future"); + let (parts, body) = call.await?.into_parts(); + debug_assert_eq!(parts.status, 418); + + let mut body = Box::pin(body); + + /// Returns the next chunk from a boxed body. + async fn read_chunk(body: &mut std::pin::Pin>) -> Result, Error> { + use std::task::{Context, Poll}; + let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref()); + let data = match body.as_mut().poll_data(&mut ctx) { + Poll::Ready(Some(Ok(d))) => d, + _ => panic!("next chunk should be ready"), + }; + let chunk = data.chunk().to_vec(); + Ok(chunk) + } + + { + // Send a chunk, confirm that our counters are incremented. + tracing::info!("sending first chunk"); + resp_tx.send_data("hello".into()).await?; + let chunk = read_chunk(&mut body).await?; + debug_assert_eq!("hello".as_bytes(), chunk, "should get same value back out"); + #[cfg(feature = "prometheus-client-rust-242")] + assert_eq!(frame_size.count(), 1); + #[cfg(feature = "prometheus-client-rust-242")] + assert_eq!(frame_size.sum(), 5); + } + + { + // Send another chunk, confirm that our counters are incremented once more. 
+ tracing::info!("sending second chunk"); + resp_tx.send_data(", world!".into()).await?; + let chunk = read_chunk(&mut body).await?; + debug_assert_eq!( + ", world!".as_bytes(), + chunk, + "should get same value back out" + ); + #[cfg(feature = "prometheus-client-rust-242")] + assert_eq!(frame_size.count(), 2); + #[cfg(feature = "prometheus-client-rust-242")] + assert_eq!(frame_size.sum(), 5 + 8); + } + + { + // Close the body, show that the counters remain at the same values. + use std::task::{Context, Poll}; + tracing::info!("closing response body"); + drop(resp_tx); + let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref()); + match body.as_mut().poll_data(&mut ctx) { + Poll::Ready(None) => {} + _ => panic!("got unexpected poll result"), + }; + #[cfg(feature = "prometheus-client-rust-242")] + assert_eq!(frame_size.count(), 2); + #[cfg(feature = "prometheus-client-rust-242")] + assert_eq!(frame_size.sum(), 5 + 8); + } + + Ok(()) +} + #[tokio::test(flavor = "current_thread", start_paused = true)] async fn grpc_request_statuses_ok() { let _trace = linkerd_tracing::test::trace_init(); diff --git a/linkerd/http/prom/Cargo.toml b/linkerd/http/prom/Cargo.toml index 53c5a5496e..6f7ed8110d 100644 --- a/linkerd/http/prom/Cargo.toml +++ b/linkerd/http/prom/Cargo.toml @@ -13,6 +13,7 @@ Tower middleware for Prometheus metrics. 
test-util = [] [dependencies] +bytes = "1" futures = { version = "0.3", default-features = false } http = "0.2" http-body = "0.4" diff --git a/linkerd/http/prom/src/body_data.rs b/linkerd/http/prom/src/body_data.rs new file mode 100644 index 0000000000..237e811e36 --- /dev/null +++ b/linkerd/http/prom/src/body_data.rs @@ -0,0 +1,5 @@ +pub mod request; +pub mod response; + +mod body; +mod metrics; diff --git a/linkerd/http/prom/src/body_data/body.rs b/linkerd/http/prom/src/body_data/body.rs new file mode 100644 index 0000000000..d14a895610 --- /dev/null +++ b/linkerd/http/prom/src/body_data/body.rs @@ -0,0 +1,77 @@ +use super::metrics::BodyDataMetrics; +use http::HeaderMap; +use http_body::SizeHint; +use pin_project::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +/// An instrumented body. +#[pin_project] +pub struct Body { + /// The inner body. + #[pin] + inner: B, + /// Metrics with which the inner body will be instrumented. + metrics: BodyDataMetrics, +} + +impl Body { + /// Returns a new, instrumented body. + pub(crate) fn new(body: B, metrics: BodyDataMetrics) -> Self { + Self { + inner: body, + metrics, + } + } +} + +impl http_body::Body for Body +where + B: http_body::Body, +{ + type Data = B::Data; + type Error = B::Error; + + /// Attempt to pull out the next data buffer of this stream. + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let this = self.project(); + let inner = this.inner; + let BodyDataMetrics { frame_size } = this.metrics; + + let data = std::task::ready!(inner.poll_data(cx)); + + if let Some(Ok(data)) = data.as_ref() { + // We've polled and yielded a new chunk! Increment our telemetry. + // + // NB: We're careful to call `remaining()` rather than `chunk()`, which + // "can return a shorter slice (this allows non-continuous internal representation)." 
+ let bytes = bytes::Buf::remaining(data); + frame_size.observe(linkerd_metrics::to_f64(bytes as u64)); + } + + Poll::Ready(data) + } + + #[inline] + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + self.project().inner.poll_trailers(cx) + } + + #[inline] + fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } + + #[inline] + fn size_hint(&self) -> SizeHint { + self.inner.size_hint() + } +} diff --git a/linkerd/http/prom/src/body_data/metrics.rs b/linkerd/http/prom/src/body_data/metrics.rs new file mode 100644 index 0000000000..3c74fc819c --- /dev/null +++ b/linkerd/http/prom/src/body_data/metrics.rs @@ -0,0 +1,75 @@ +//! Prometheus counters for request and response bodies. + +use linkerd_metrics::prom::{ + self, metrics::family::MetricConstructor, Family, Histogram, Registry, Unit, +}; + +/// Counters for response body frames. +#[derive(Clone, Debug)] +pub struct ResponseBodyFamilies { + /// Counts the number of response body frames by size. + frame_sizes: Family, +} + +/// Counters to instrument a request or response body. +#[derive(Clone, Debug)] +pub struct BodyDataMetrics { + /// Counts the number of request body frames. + pub frame_size: Histogram, +} + +#[derive(Clone, Copy)] +struct NewHisto; + +impl MetricConstructor for NewHisto { + fn new_metric(&self) -> Histogram { + Histogram::new([128.0, 1024.0, 10240.0].into_iter()) + } +} + +// === impl ResponseBodyFamilies === + +impl Default for ResponseBodyFamilies +where + L: Clone + std::hash::Hash + Eq, +{ + fn default() -> Self { + Self { + frame_sizes: Family::new_with_constructor(NewHisto), + } + } +} + +impl ResponseBodyFamilies +where + L: prom::encoding::EncodeLabelSet + + std::fmt::Debug + + std::hash::Hash + + Eq + + Clone + + Send + + Sync + + 'static, +{ + /// Registers and returns a new family of body data metrics. 
+ pub fn register(registry: &mut Registry) -> Self { + let frame_sizes = Family::new_with_constructor(NewHisto); + registry.register_with_unit( + "response_frame_size", + "Response data frame sizes", + Unit::Bytes, + frame_sizes.clone(), + ); + + Self { frame_sizes } + } + + /// Returns the [`BodyDataMetrics`] for the given label set. + pub fn metrics(&self, labels: &L) -> BodyDataMetrics { + let Self { frame_sizes } = self; + + let frame_size = frame_sizes.get_or_create(labels).clone(); + + BodyDataMetrics { frame_size } + } +} diff --git a/linkerd/http/prom/src/body_data/request.rs b/linkerd/http/prom/src/body_data/request.rs new file mode 100644 index 0000000000..fb270395da --- /dev/null +++ b/linkerd/http/prom/src/body_data/request.rs @@ -0,0 +1 @@ +// TODO(kate): write a middleware for request body. diff --git a/linkerd/http/prom/src/body_data/response.rs b/linkerd/http/prom/src/body_data/response.rs new file mode 100644 index 0000000000..6463f3ba44 --- /dev/null +++ b/linkerd/http/prom/src/body_data/response.rs @@ -0,0 +1,94 @@ +//! Tower middleware to instrument response bodies. + +pub use super::metrics::{BodyDataMetrics, ResponseBodyFamilies}; + +use http::{Request, Response}; +use http_body::Body; +use linkerd_error::Error; +use linkerd_http_box::BoxBody; +use linkerd_stack::{self as svc, layer::Layer, ExtractParam, NewService, Service}; +use std::{future::Future, pin::Pin}; + +/// A [`NewService`] that creates [`RecordBodyData`] services. +#[derive(Clone, Debug)] +pub struct NewRecordBodyData { + /// The [`ExtractParam`] strategy for obtaining our parameters. + extract: X, + /// The inner [`NewService`]. + inner: N, +} + +/// Tracks body frames for an inner `S`-typed [`Service`]. +#[derive(Clone, Debug)] +pub struct RecordBodyData { + /// The inner [`Service`]. + inner: S, + /// The metrics to be affixed to the response body. 
+ metrics: BodyDataMetrics, +} + +// === impl NewRecordBodyData === + +impl NewRecordBodyData { + /// Returns a [`Layer`] that tracks body chunks. + /// + /// This uses an `X`-typed [`ExtractParam`] implementation to extract service parameters + /// from a `T`-typed target. + pub fn layer_via(extract: X) -> impl Layer { + svc::layer::mk(move |inner| Self { + extract: extract.clone(), + inner, + }) + } +} + +impl NewService for NewRecordBodyData +where + X: ExtractParam, + N: NewService, +{ + type Service = RecordBodyData; + + fn new_service(&self, target: T) -> Self::Service { + let Self { extract, inner } = self; + + let metrics = extract.extract_param(&target); + let inner = inner.new_service(target); + + RecordBodyData { inner, metrics } + } +} + +// === impl RecordBodyData === + +impl Service> for RecordBodyData +where + S: Service, Response = Response>, + S::Future: Send + 'static, + RespB: Body + Send + 'static, + RespB::Data: Send + 'static, + RespB::Error: Into, +{ + type Response = Response; + type Error = S::Error; + type Future = Pin> + Send>>; + + #[inline] + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + use futures::{FutureExt, TryFutureExt}; + + let Self { inner, metrics } = self; + let metrics = metrics.clone(); + inner + .call(req) + .map_ok(|rsp| rsp.map(|b| BoxBody::new(super::body::Body::new(b, metrics)))) + .boxed() + } +} diff --git a/linkerd/http/prom/src/lib.rs b/linkerd/http/prom/src/lib.rs index 4f00f842b4..51ee223934 100644 --- a/linkerd/http/prom/src/lib.rs +++ b/linkerd/http/prom/src/lib.rs @@ -1,6 +1,7 @@ #![deny(rust_2018_idioms, clippy::disallowed_methods, clippy::disallowed_types)] #![forbid(unsafe_code)] +pub mod body_data; mod count_reqs; pub mod record_response; diff --git a/linkerd/metrics/src/lib.rs b/linkerd/metrics/src/lib.rs index 1f1dcc3cb6..58f2259642 100644 --- a/linkerd/metrics/src/lib.rs +++ 
b/linkerd/metrics/src/lib.rs @@ -85,7 +85,12 @@ pub trait Factor { const MAX_PRECISE_UINT64: u64 = 0x20_0000_0000_0000; impl Factor for () { + #[inline] fn factor(n: u64) -> f64 { - n.wrapping_rem(MAX_PRECISE_UINT64 + 1) as f64 + to_f64(n) } } + +pub fn to_f64(n: u64) -> f64 { + n.wrapping_rem(MAX_PRECISE_UINT64 + 1) as f64 +}