diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs index 007657d506..1bd231f8fb 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs @@ -244,6 +244,105 @@ async fn http_request_hostnames() { .await; } +// XXX(kate) + +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn http_route_request_body_frames() { + let _trace = linkerd_tracing::test::trace_init(); + + use linkerd_http_prom::body_data::request::BodyDataMetrics; + + let super::HttpRouteMetrics { + requests, + body_data, + .. + } = super::HttpRouteMetrics::default(); + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let (mut svc, mut handle) = + mock_http_route_metrics(&requests, &body_data, &parent_ref, &route_ref); + handle.allow(1); + + let BodyDataMetrics { + frames_total, + frames_bytes, + } = { + let labels = labels::Route::new( + parent_ref, + route_ref, + &http::uri::Uri::from_static("http://frame.count.test/"), + ); + body_data.get(&labels) + }; + + // Create a request whose body is backed by a channel that we can send chunks to. + tracing::info!("creating request"); + let (req, tx) = { + let (tx, body) = hyper::Body::channel(); + let body = BoxBody::new(body); + let req = http::Request::builder() + .uri("http://frame.count.test") + .method("BARK") + .body(body) + .unwrap(); + (req, tx) + }; + + // Before the service has been called, the counters should be zero. + assert_eq!(frames_total.get(), 0); + assert_eq!(frames_bytes.get(), 0); + + // Call the service. + tracing::info!("sending request to service"); + let (fut, resp_tx, rx) = { + use tower::{Service, ServiceExt}; + tracing::info!("calling service"); + let fut = svc.ready().await.expect("ready").call(req); + let (req, send_resp) = handle.next_request().await.unwrap(); + let (parts, rx) = req.into_parts(); + debug_assert_eq!(parts.method.as_str(), "BARK"); + (fut, send_resp, rx) + }; + + // Before the client has sent any body chunks, the counters should be zero. + assert_eq!(frames_total.get(), 0); + assert_eq!(frames_bytes.get(), 0); + + // Send a response back to the client. + tracing::info!("sending request to service"); + let resp = { + use http::{Response, StatusCode}; + let body = BoxBody::new("earl grey".to_owned()); + let resp = Response::builder() + .status(StatusCode::IM_A_TEAPOT) + .body(body) + .unwrap(); + resp_tx.send_response(resp); + fut.await.expect("resp") + }; + + // The counters should still be zero. + assert_eq!(frames_total.get(), 0); + assert_eq!(frames_bytes.get(), 0); + + // Read the response body. + tracing::info!("reading response body"); + { + use http_body::Body; + let (parts, body) = resp.into_parts(); + debug_assert_eq!(parts.status, 418); + let bytes = body.collect().await.expect("resp body").to_bytes(); + debug_assert_eq!(bytes, "earl grey"); + } + + // Reading the response body should not affect the counters should still be zero. + assert_eq!(frames_total.get(), 0); + assert_eq!(frames_bytes.get(), 0); + + // Now, send some request body bytes. + todo!("send request body bytes") +} + #[tokio::test(flavor = "current_thread", start_paused = true)] async fn grpc_request_statuses_ok() { let _trace = linkerd_tracing::test::trace_init();