diff --git a/linkerd/app/outbound/src/http/logical/policy/route.rs b/linkerd/app/outbound/src/http/logical/policy/route.rs index fd6cbdebff..f11bc9f65d 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route.rs @@ -133,7 +133,11 @@ where // Set request extensions based on the route configuration // AND/OR headers .push(extensions::NewSetExtensions::layer()) - .push(metrics::layer(&metrics.requests)) + .push(metrics::layer( + &metrics.requests, + Self::label_extractor, + &metrics.body_data, + )) .check_new::() .check_new_service::>() // Configure a classifier to use in the endpoint stack. diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs index 2ed3fa03a9..c943e6c3d5 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics.rs @@ -3,7 +3,10 @@ use linkerd_app_core::{ metrics::prom::{self, EncodeLabelSetMut}, svc, }; -use linkerd_http_prom::record_response::{self, StreamLabel}; +use linkerd_http_prom::{ + body_data::request::{NewRecordBodyData, RequestBodyFamilies}, + record_response::{self, StreamLabel}, +}; pub use linkerd_http_prom::record_response::MkStreamLabel; @@ -23,6 +26,7 @@ pub struct RouteMetrics { pub(super) retry: retry::RouteRetryMetrics, pub(super) requests: RequestMetrics, pub(super) backend: backend::RouteBackendMetrics, + pub(super) body_data: RequestBodyFamilies, } pub type HttpRouteMetrics = RouteMetrics; @@ -56,13 +60,30 @@ pub type NewRecordDuration = #[derive(Clone, Debug)] pub struct ExtractRecordDurationParams(pub M); -pub fn layer( +#[derive(Clone, Debug)] +pub struct ExtractRecordBodyDataParams(RequestBodyFamilies); + +pub fn layer( metrics: &RequestMetrics, -) -> impl svc::Layer, N>> + Clone + mk: X, + body_data: &RequestBodyFamilies, +) -> impl svc::Layer< + N, + Service = NewRecordBodyData< + NewRecordDuration, N>, + X, + labels::RouteLabelExtract, + labels::Route, + >, +> where T: Clone + MkStreamLabel, + X: Clone, { - NewRecordDuration::layer_via(ExtractRecordDurationParams(metrics.clone())) + let record = NewRecordDuration::layer_via(ExtractRecordDurationParams(metrics.clone())); + let body_data = NewRecordBodyData::new(mk, body_data.clone()); + + svc::layers().push(record).push(body_data) } // === impl RouteMetrics === @@ -89,6 +110,7 @@ impl Default for RouteMetrics { requests: Default::default(), backend: Default::default(), retry: Default::default(), + body_data: Default::default(), } } } @@ -99,6 +121,7 @@ impl Clone for RouteMetrics { requests: self.requests.clone(), backend: self.backend.clone(), retry: self.retry.clone(), + body_data: self.body_data.clone(), } } } @@ -113,11 +136,13 @@ impl RouteMetrics { ); let retry = retry::RouteRetryMetrics::register(reg.sub_registry_with_prefix("retry")); + let body_data = RequestBodyFamilies::register(reg); Self { requests, backend, retry, + body_data, } } diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs index 6c825796ee..3f318b897d 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs @@ -1,3 +1,5 @@ +use crate::http::policy::route::MatchedRoute; + use super::{ super::{Grpc, Http, Route}, labels, @@ -12,19 +14,25 @@ use linkerd_app_core::{ Layer, NewService, }, }; +use linkerd_http_prom::body_data::request::RequestBodyFamilies; use linkerd_proxy_client_policy as policy; #[tokio::test(flavor = "current_thread", start_paused = true)] async fn http_request_statuses() { let _trace = linkerd_tracing::test::trace_init(); - let metrics = super::HttpRouteMetrics::default().requests; + let super::HttpRouteMetrics { + requests, + body_data, + .. + } = super::HttpRouteMetrics::default(); let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); let route_ref = crate::RouteRef(policy::Meta::new_default("route")); - let (mut svc, mut handle) = mock_http_route_metrics(&metrics, &parent_ref, &route_ref); + let (mut svc, mut handle) = + mock_http_route_metrics(&requests, &body_data, &parent_ref, &route_ref); // Send one request and ensure it's counted. - let ok = metrics.get_statuses(&labels::Rsp( + let ok = requests.get_statuses(&labels::Rsp( labels::Route::new(parent_ref.clone(), route_ref.clone(), &Uri::default()), labels::HttpRsp { status: Some(http::StatusCode::OK), @@ -43,7 +51,7 @@ async fn http_request_statuses() { // Send another request and ensure it's counted with a different response // status. - let no_content = metrics.get_statuses(&labels::Rsp( + let no_content = requests.get_statuses(&labels::Rsp( labels::Route::new(parent_ref.clone(), route_ref.clone(), &Uri::default()), labels::HttpRsp { status: Some(http::StatusCode::NO_CONTENT), @@ -67,7 +75,7 @@ async fn http_request_statuses() { .await; // Emit a response with an error and ensure it's counted. - let unknown = metrics.get_statuses(&labels::Rsp( + let unknown = requests.get_statuses(&labels::Rsp( labels::Route::new(parent_ref.clone(), route_ref.clone(), &Uri::default()), labels::HttpRsp { status: None, @@ -81,7 +89,7 @@ async fn http_request_statuses() { // Emit a successful response with a body that fails and ensure that both // the status and error are recorded. - let mixed = metrics.get_statuses(&labels::Rsp( + let mixed = requests.get_statuses(&labels::Rsp( labels::Route::new(parent_ref, route_ref, &Uri::default()), labels::HttpRsp { status: Some(http::StatusCode::OK), @@ -117,13 +125,18 @@ async fn http_request_hostnames() { let _trace = linkerd_tracing::test::trace_init(); - let metrics = super::HttpRouteMetrics::default().requests; + let super::HttpRouteMetrics { + requests, + body_data, + .. + } = super::HttpRouteMetrics::default(); let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); let route_ref = crate::RouteRef(policy::Meta::new_default("route")); - let (mut svc, mut handle) = mock_http_route_metrics(&metrics, &parent_ref, &route_ref); + let (mut svc, mut handle) = + mock_http_route_metrics(&requests, &body_data, &parent_ref, &route_ref); let get_counter = |host: Option<&'static str>, status: Option| { - metrics.get_statuses(&labels::Rsp( + requests.get_statuses(&labels::Rsp( labels::Route::new_with_name( parent_ref.clone(), route_ref.clone(), @@ -231,17 +244,150 @@ async fn http_request_hostnames() { .await; } +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn http_route_request_body_frames() { + let _trace = linkerd_tracing::test::trace_init(); + + use linkerd_http_prom::body_data::request::BodyDataMetrics; + + let super::HttpRouteMetrics { + requests, + body_data, + .. + } = super::HttpRouteMetrics::default(); + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let (mut svc, mut handle) = + mock_http_route_metrics(&requests, &body_data, &parent_ref, &route_ref); + handle.allow(1); + + let BodyDataMetrics { + frames_total, + frames_bytes, + } = { + let labels = labels::Route::new( + parent_ref, + route_ref, + &http::uri::Uri::from_static("http://frame.count.test/"), + ); + body_data.get(&labels) + }; + + // Create a request whose body is backed by a channel that we can send chunks to. + tracing::info!("creating request"); + let (req, tx) = { + let (tx, body) = hyper::Body::channel(); + let body = BoxBody::new(body); + let req = http::Request::builder() + .uri("http://frame.count.test") + .method("BARK") + .body(body) + .unwrap(); + (req, tx) + }; + + // Before the service has been called, the counters should be zero. + assert_eq!(frames_total.get(), 0); + assert_eq!(frames_bytes.get(), 0); + + // Call the service. + tracing::info!("sending request to service"); + let (fut, resp_tx, rx) = { + use tower::{Service, ServiceExt}; + tracing::info!("calling service"); + let fut = svc.ready().await.expect("ready").call(req); + let (req, send_resp) = handle.next_request().await.unwrap(); + let (parts, rx) = req.into_parts(); + debug_assert_eq!(parts.method.as_str(), "BARK"); + (fut, send_resp, rx) + }; + + // Before the client has sent any body chunks, the counters should be zero. + assert_eq!(frames_total.get(), 0); + assert_eq!(frames_bytes.get(), 0); + + // Send a response back to the client. + tracing::info!("sending request to service"); + let resp = { + use http::{Response, StatusCode}; + let body = BoxBody::new("earl grey".to_owned()); + let resp = Response::builder() + .status(StatusCode::IM_A_TEAPOT) + .body(body) + .unwrap(); + resp_tx.send_response(resp); + fut.await.expect("resp") + }; + + // The counters should still be zero. + assert_eq!(frames_total.get(), 0); + assert_eq!(frames_bytes.get(), 0); + + // Read the response body. + tracing::info!("reading response body"); + { + use http_body::Body; + let (parts, body) = resp.into_parts(); + debug_assert_eq!(parts.status, 418); + let bytes = body.collect().await.expect("resp body").to_bytes(); + debug_assert_eq!(bytes, "earl grey"); + } + + // Reading the response body should not affect the counters should still be zero. + assert_eq!(frames_total.get(), 0); + assert_eq!(frames_bytes.get(), 0); + + /// Returns the next chunk from a boxed body. + async fn read_chunk(body: &mut std::pin::Pin>) -> Vec { + use bytes::Buf; + use http_body::Body; + use std::task::{Context, Poll}; + let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref()); + let data = match body.as_mut().poll_data(&mut ctx) { + Poll::Ready(Some(Ok(d))) => d, + _ => panic!("next chunk should be ready"), + }; + data.chunk().to_vec() + } + + // And now, send request body bytes. + tracing::info!("sending request body bytes"); + { + // Get the client's sending half, and the server's receiving half of the request body. + let (mut tx, mut rx) = (tx, Box::pin(rx)); + + tx.send_data(b"milk".as_slice().into()).await.unwrap(); + let chunk = read_chunk(&mut rx).await; + debug_assert_eq!(chunk, b"milk"); + assert_eq!(frames_total.get(), 1); // bytes are counted once polled. + assert_eq!(frames_bytes.get(), 4); + + tx.send_data(b"syrup".as_slice().into()).await.unwrap(); + let chunk = read_chunk(&mut rx).await; + debug_assert_eq!(chunk, b"syrup"); + assert_eq!(frames_total.get(), 2); + assert_eq!(frames_bytes.get(), 4 + 5); + } + + tracing::info!("passed"); +} + #[tokio::test(flavor = "current_thread", start_paused = true)] async fn grpc_request_statuses_ok() { let _trace = linkerd_tracing::test::trace_init(); - let metrics = super::GrpcRouteMetrics::default().requests; + let super::GrpcRouteMetrics { + requests, + body_data, + .. + } = super::GrpcRouteMetrics::default(); let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); let route_ref = crate::RouteRef(policy::Meta::new_default("route")); - let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref); + let (mut svc, mut handle) = + mock_grpc_route_metrics(&requests, &body_data, &parent_ref, &route_ref); // Send one request and ensure it's counted. - let ok = metrics.get_statuses(&labels::Rsp( + let ok = requests.get_statuses(&labels::Rsp( labels::Route::new( parent_ref.clone(), route_ref.clone(), @@ -280,14 +426,19 @@ async fn grpc_request_statuses_ok() { async fn grpc_request_statuses_not_found() { let _trace = linkerd_tracing::test::trace_init(); - let metrics = super::GrpcRouteMetrics::default().requests; + let super::GrpcRouteMetrics { + requests, + body_data, + .. + } = super::GrpcRouteMetrics::default(); let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); let route_ref = crate::RouteRef(policy::Meta::new_default("route")); - let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref); + let (mut svc, mut handle) = + mock_grpc_route_metrics(&requests, &body_data, &parent_ref, &route_ref); // Send another request and ensure it's counted with a different response // status. - let not_found = metrics.get_statuses(&labels::Rsp( + let not_found = requests.get_statuses(&labels::Rsp( labels::Route::new( parent_ref.clone(), route_ref.clone(), @@ -326,12 +477,17 @@ async fn grpc_request_statuses_not_found() { async fn grpc_request_statuses_error_response() { let _trace = linkerd_tracing::test::trace_init(); - let metrics = super::GrpcRouteMetrics::default().requests; + let super::GrpcRouteMetrics { + requests, + body_data, + .. + } = super::GrpcRouteMetrics::default(); let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); let route_ref = crate::RouteRef(policy::Meta::new_default("route")); - let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref); + let (mut svc, mut handle) = + mock_grpc_route_metrics(&requests, &body_data, &parent_ref, &route_ref); - let unknown = metrics.get_statuses(&labels::Rsp( + let unknown = requests.get_statuses(&labels::Rsp( labels::Route::new( parent_ref.clone(), route_ref.clone(), @@ -360,12 +516,17 @@ async fn grpc_request_statuses_error_response() { async fn grpc_request_statuses_error_body() { let _trace = linkerd_tracing::test::trace_init(); - let metrics = super::GrpcRouteMetrics::default().requests; + let super::GrpcRouteMetrics { + requests, + body_data, + .. + } = super::GrpcRouteMetrics::default(); let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); let route_ref = crate::RouteRef(policy::Meta::new_default("route")); - let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref); + let (mut svc, mut handle) = + mock_grpc_route_metrics(&requests, &body_data, &parent_ref, &route_ref); - let unknown = metrics.get_statuses(&labels::Rsp( + let unknown = requests.get_statuses(&labels::Rsp( labels::Route::new( parent_ref.clone(), route_ref.clone(), @@ -404,6 +565,7 @@ const MOCK_GRPC_REQ_URI: &str = "http://host/svc/method"; pub fn mock_http_route_metrics( metrics: &RequestMetrics, + body_data: &RequestBodyFamilies, parent_ref: &crate::ParentRef, route_ref: &crate::RouteRef, ) -> (svc::BoxHttp, Handle) { @@ -425,8 +587,9 @@ pub fn mock_http_route_metrics( ) .expect("find default route"); + let extract = MatchedRoute::label_extractor; let (tx, handle) = tower_test::mock::pair::, http::Response>(); - let svc = super::layer(metrics) + let svc = super::layer(metrics, extract, body_data) .layer(move |_t: Http<()>| tx.clone()) .new_service(Http { r#match, @@ -446,6 +609,7 @@ pub fn mock_http_route_metrics( pub fn mock_grpc_route_metrics( metrics: &RequestMetrics, + body_data: &RequestBodyFamilies, parent_ref: &crate::ParentRef, route_ref: &crate::RouteRef, ) -> (svc::BoxHttp, Handle) { @@ -471,8 +635,9 @@ pub fn mock_grpc_route_metrics( ) .expect("find default route"); + let extract = MatchedRoute::label_extractor; let (tx, handle) = tower_test::mock::pair::, http::Response>(); - let svc = super::layer(metrics) + let svc = super::layer(metrics, extract, body_data) .layer(move |_t: Grpc<()>| tx.clone()) .new_service(Grpc { r#match, diff --git a/linkerd/http/prom/src/body_data/metrics.rs b/linkerd/http/prom/src/body_data/metrics.rs index 8ae1872f33..02f46b1d16 100644 --- a/linkerd/http/prom/src/body_data/metrics.rs +++ b/linkerd/http/prom/src/body_data/metrics.rs @@ -2,6 +2,15 @@ use linkerd_metrics::prom::{self, Counter, Family, Registry}; +/// Counters for request body frames. +#[derive(Clone, Debug)] +pub struct RequestBodyFamilies { + /// Counts the number of request body frames. + req_body_frames_total: Family, + /// Counts the total number of bytes in request body frames. + req_body_frames_bytes: Family, +} + /// Counters for response body frames. #[derive(Clone, Debug)] pub struct ResponseBodyFamilies { @@ -20,6 +29,79 @@ pub struct BodyDataMetrics { pub frames_bytes: Counter, } +// === impl RequestBodyFamilies === + +impl Default for RequestBodyFamilies +where + L: Clone + std::hash::Hash + Eq, +{ + fn default() -> Self { + Self { + req_body_frames_total: Default::default(), + req_body_frames_bytes: Default::default(), + } + } +} + +impl RequestBodyFamilies +where + L: prom::encoding::EncodeLabelSet + + std::fmt::Debug + + std::hash::Hash + + Eq + + Clone + + Send + + Sync + + 'static, +{ + const REQ_BODY_FRAMES_TOTAL_NAME: &'static str = "req_body_frames_total"; + const REQ_BODY_FRAMES_TOTAL_HELP: &'static str = + "Counts the number of frames in request bodies."; + + const REQ_BODY_FRAMES_BYTES_NAME: &'static str = "req_body_frames_bytes"; + const REQ_BODY_FRAMES_BYTES_HELP: &'static str = + "Counts the total number of bytes in request bodies."; + + /// Registers and returns a new family of body data metrics. + pub fn register(registry: &mut Registry) -> Self { + let req_body_frames_total = Family::default(); + registry.register( + Self::REQ_BODY_FRAMES_TOTAL_NAME, + Self::REQ_BODY_FRAMES_TOTAL_HELP, + req_body_frames_total.clone(), + ); + + let req_body_frames_bytes = Family::default(); + registry.register_with_unit( + Self::REQ_BODY_FRAMES_BYTES_NAME, + Self::REQ_BODY_FRAMES_BYTES_HELP, + prom::Unit::Bytes, + req_body_frames_bytes.clone(), + ); + + Self { + req_body_frames_total, + req_body_frames_bytes, + } + } + + /// Returns the [`BodyDataMetrics`] for the given label set. + pub fn get(&self, labels: &L) -> BodyDataMetrics { + let Self { + req_body_frames_total, + req_body_frames_bytes, + } = self; + + let frames_total = req_body_frames_total.get_or_create(labels).clone(); + let frames_bytes = req_body_frames_bytes.get_or_create(labels).clone(); + + BodyDataMetrics { + frames_total, + frames_bytes, + } + } +} + // === impl ResponseBodyFamilies === impl Default for ResponseBodyFamilies diff --git a/linkerd/http/prom/src/body_data/request.rs b/linkerd/http/prom/src/body_data/request.rs index fb270395da..d115671da8 100644 --- a/linkerd/http/prom/src/body_data/request.rs +++ b/linkerd/http/prom/src/body_data/request.rs @@ -1 +1,126 @@ -// TODO(kate): write a middleware for request body. +//! Tower middleware to instrument request bodies. + +pub use super::metrics::{BodyDataMetrics, RequestBodyFamilies}; + +use http::{Request, Response}; +use linkerd_error::Error; +use linkerd_http_box::BoxBody; +use linkerd_stack::{self as svc, layer::Layer, ExtractParam, NewService, Service}; +use std::marker::PhantomData; + +/// A [`NewService`] that creates [`RecordBodyData`] services. +#[derive(Clone, Debug)] +pub struct NewRecordBodyData { + /// The inner [`NewService`]. + inner: N, + extract: X, + metrics: RequestBodyFamilies, + marker: PhantomData, +} + +/// Tracks body frames for an inner `S`-typed [`Service`]. +#[derive(Clone, Debug)] +pub struct RecordBodyData { + /// The inner [`Service`]. + inner: S, + extract: ReqX, + metrics: RequestBodyFamilies, +} + +// === impl NewRecordBodyData === + +impl NewRecordBodyData +where + X: Clone, + L: Clone, +{ + /// Returns a [`Layer`] that tracks body chunks. + /// + /// This uses an `X`-typed [`ExtractParam`] implementation to extract service parameters + /// from a `T`-typed target. + pub fn new(extract: X, metrics: RequestBodyFamilies) -> impl Layer { + svc::layer::mk(move |inner| Self { + inner, + extract: extract.clone(), + metrics: metrics.clone(), + marker: PhantomData, + }) + } +} + +impl NewService for NewRecordBodyData +where + N: NewService, + X: ExtractParam, + L: Clone, +{ + type Service = RecordBodyData; + + fn new_service(&self, target: T) -> Self::Service { + let Self { + inner, + extract, + metrics, + marker: _, + } = self; + + let extract = extract.extract_param(&target); + let inner = inner.new_service(target); + let metrics = metrics.clone(); + + RecordBodyData { + inner, + extract, + metrics, + } + } +} + +// === impl RecordBodyData === + +impl Service> for RecordBodyData +where + S: Service, Response = Response>, + S::Future: Send + 'static, + ReqB: http_body::Body + Send + 'static, + ReqB::Data: Send + 'static, + ReqB::Error: Into, + ReqX: ExtractParam>, + L: linkerd_metrics::prom::encoding::EncodeLabelSet + + std::fmt::Debug + + std::hash::Hash + + Eq + + Clone + + Send + + Sync + + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + #[inline] + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + let Self { + inner, + extract, + metrics, + } = self; + + let req = { + let labels = extract.extract_param(&req); + let metrics = metrics.get(&labels); + let instrument = |b| super::body::Body::new(b, metrics); + req.map(instrument).map(BoxBody::new) + }; + + inner.call(req) + } +}