Skip to content

Commit

Permalink
make it work
Browse files Browse the repository at this point in the history
  • Loading branch information
cratelyn committed Nov 9, 2024
1 parent e1c5dee commit cc0e8f0
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,39 @@ async fn http_route_request_body_frames() {
assert_eq!(frames_total.get(), 0);
assert_eq!(frames_bytes.get(), 0);

// Now, send some request body bytes.
todo!("send request body bytes")
/// Returns the next chunk from a boxed body.
async fn read_chunk(body: &mut std::pin::Pin<Box<BoxBody>>) -> Vec<u8> {
use bytes::Buf;
use http_body::Body;
use std::task::{Context, Poll};
let mut ctx = Context::from_waker(futures_util::task::noop_waker_ref());
let data = match body.as_mut().poll_data(&mut ctx) {
Poll::Ready(Some(Ok(d))) => d,
_ => panic!("next chunk should be ready"),
};
data.chunk().to_vec()
}

// And now, send request body bytes.
tracing::info!("sending request body bytes");
{
// Get the client's sending half, and the server's receiving half of the request body.
let (mut tx, mut rx) = (tx, Box::pin(rx));

tx.send_data(b"milk".as_slice().into()).await.unwrap();
let chunk = read_chunk(&mut rx).await;
debug_assert_eq!(chunk, b"milk");
assert_eq!(frames_total.get(), 1); // bytes are counted once polled.
assert_eq!(frames_bytes.get(), 4);

tx.send_data(b"syrup".as_slice().into()).await.unwrap();
let chunk = read_chunk(&mut rx).await;
debug_assert_eq!(chunk, b"syrup");
assert_eq!(frames_total.get(), 2);
assert_eq!(frames_bytes.get(), 4 + 5);
}

tracing::info!("passed");
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
Expand Down
38 changes: 13 additions & 25 deletions linkerd/http/prom/src/body_data/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
pub use super::metrics::{BodyDataMetrics, RequestBodyFamilies};

use http::{Request, Response};
use http_body::Body;
use linkerd_error::Error;
use linkerd_http_box::BoxBody;
use linkerd_stack::{self as svc, layer::Layer, ExtractParam, NewService, Service};
Expand Down Expand Up @@ -81,11 +80,11 @@ where

impl<ReqB, RespB, S, ReqX, L> Service<Request<ReqB>> for RecordBodyData<S, ReqX, L>
where
S: Service<Request<ReqB>, Response = Response<RespB>>,
S: Service<Request<BoxBody>, Response = Response<RespB>>,
S::Future: Send + 'static,
RespB: Body + Send + 'static,
RespB::Data: Send + 'static,
RespB::Error: Into<Error>,
ReqB: http_body::Body + Send + 'static,
ReqB::Data: Send + 'static,
ReqB::Error: Into<Error>,
ReqX: ExtractParam<L, Request<ReqB>>,
L: linkerd_metrics::prom::encoding::EncodeLabelSet
+ std::fmt::Debug
Expand All @@ -96,9 +95,9 @@ where
+ Sync
+ 'static,
{
type Response = Response<BoxBody>;
type Response = S::Response;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
type Future = S::Future;

#[inline]
fn poll_ready(
Expand All @@ -109,30 +108,19 @@ where
}

fn call(&mut self, req: Request<ReqB>) -> Self::Future {
use futures::{FutureExt, TryFutureExt};

let Self {
inner,
extract,
metrics,
} = self;

let labels = extract.extract_param(&req);
let metrics = metrics.get(&labels);
let instrument = Box::new(|resp| Self::instrument_response(resp, metrics));

inner.call(req).map_ok(instrument).boxed()
}
}
let req = {
let labels = extract.extract_param(&req);
let metrics = metrics.get(&labels);
let instrument = |b| super::body::Body::new(b, metrics);
req.map(instrument).map(BoxBody::new)
};

impl<S, ReqX, L> RecordBodyData<S, ReqX, L> {
fn instrument_response<B>(resp: Response<B>, metrics: BodyDataMetrics) -> Response<BoxBody>
where
B: Body + Send + 'static,
B::Data: Send + 'static,
B::Error: Into<Error>,
{
resp.map(|b| super::body::Body::new(b, metrics))
.map(BoxBody::new)
inner.call(req)
}
}

0 comments on commit cc0e8f0

Please sign in to comment.