diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs index 1bd231f8fb..ae132e94af 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs @@ -339,8 +339,39 @@ async fn http_route_request_body_frames() { assert_eq!(frames_total.get(), 0); assert_eq!(frames_bytes.get(), 0); - // Now, send some request body bytes. - todo!("send request body bytes") + /// Returns the next chunk from a boxed body. + async fn read_chunk(body: &mut std::pin::Pin>) -> Vec { + use bytes::Buf; + use http_body::Body; + use std::task::{Context, Poll}; + let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref()); + let data = match body.as_mut().poll_data(&mut ctx) { + Poll::Ready(Some(Ok(d))) => d, + _ => panic!("next chunk should be ready"), + }; + data.chunk().to_vec() + } + + // And now, send request body bytes. + tracing::info!("sending request body bytes"); + { + // Get the client's sending half, and the server's receiving half of the request body. + let (mut tx, mut rx) = (tx, Box::pin(rx)); + + tx.send_data(b"milk".as_slice().into()).await.unwrap(); + let chunk = read_chunk(&mut rx).await; + debug_assert_eq!(chunk, b"milk"); + assert_eq!(frames_total.get(), 1); // bytes are counted once polled. + assert_eq!(frames_bytes.get(), 4); + + tx.send_data(b"syrup".as_slice().into()).await.unwrap(); + let chunk = read_chunk(&mut rx).await; + debug_assert_eq!(chunk, b"syrup"); + assert_eq!(frames_total.get(), 2); + assert_eq!(frames_bytes.get(), 4 + 5); + } + + tracing::info!("passed"); } #[tokio::test(flavor = "current_thread", start_paused = true)] diff --git a/linkerd/http/prom/src/body_data/request.rs b/linkerd/http/prom/src/body_data/request.rs index c3e228f60b..e2d0811f9d 100644 --- a/linkerd/http/prom/src/body_data/request.rs +++ b/linkerd/http/prom/src/body_data/request.rs @@ -3,7 +3,6 @@ pub use super::metrics::{BodyDataMetrics, RequestBodyFamilies}; use http::{Request, Response}; -use http_body::Body; use linkerd_error::Error; use linkerd_http_box::BoxBody; use linkerd_stack::{self as svc, layer::Layer, ExtractParam, NewService, Service}; @@ -81,11 +80,11 @@ where impl Service> for RecordBodyData where - S: Service, Response = Response>, + S: Service, Response = Response>, S::Future: Send + 'static, - RespB: Body + Send + 'static, - RespB::Data: Send + 'static, - RespB::Error: Into, + ReqB: http_body::Body + Send + 'static, + ReqB::Data: Send + 'static, + ReqB::Error: Into, ReqX: ExtractParam>, L: linkerd_metrics::prom::encoding::EncodeLabelSet + std::fmt::Debug @@ -96,9 +95,9 @@ where + Sync + 'static, { - type Response = Response; + type Response = S::Response; type Error = S::Error; - type Future = Pin> + Send>>; + type Future = S::Future; #[inline] fn poll_ready( @@ -109,30 +108,19 @@ where } fn call(&mut self, req: Request) -> Self::Future { - use futures::{FutureExt, TryFutureExt}; - let Self { inner, extract, metrics, } = self; - let labels = extract.extract_param(&req); - let metrics = metrics.get(&labels); - let instrument = Box::new(|resp| Self::instrument_response(resp, metrics)); - - inner.call(req).map_ok(instrument).boxed() - } -} + let req = { + let labels = extract.extract_param(&req); + let metrics = metrics.get(&labels); + let instrument = |b| super::body::Body::new(b, metrics); + req.map(instrument).map(BoxBody::new) + }; -impl RecordBodyData { - fn instrument_response(resp: Response, metrics: BodyDataMetrics) -> Response - where - B: Body + Send + 'static, - B::Data: Send + 'static, - B::Error: Into, - { - resp.map(|b| super::body::Body::new(b, metrics)) - .map(BoxBody::new) + inner.call(req) } }