From dd4fbcdb6e1d6f4b92684e7cc9517f2f3c5bb3c3 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Fri, 24 Jan 2025 15:40:00 -0500 Subject: [PATCH] feat(http/retry): model `PeekTrailersBody` with `Frame` (#3559) this branch contains a sequence of commits that refactor `PeekTrailersBody`. this branch is specifically focused on making this body middleware forward-compatible with the 1.0 interface(s) of `http_body::Body` and `http_body_util::BodyExt`. it does this in two main steps: (1) temporarily vendoring `http_body::Frame` and providing a compatibility shim that provides a `frame()` method for a body, and (2) modeling `PeekTrailersBody` and its peeking logic in terms of this `Frame<'a, T>` future. --- * feat(http/retry): add `Frame` compatibility facilities this commit introduces a `compat` submodule to `linkerd-http-retry`. this helps us frontrun the task of replacing all of the finicky control flow in `PeekTrailersBody` using the antiquated `data()` and `trailers()` future combinators. instead, we can perform our peeking in terms of an approximation of `http_body_util::BodyExt::frame()`. to accomplish this, this commit vendors a copy of the `Frame` type. we can use this to preemptively model our peek body in terms of this type, and move to the "real" version of it when we're upgrading in pr #3504. additionally, this commit includes a type called `ForwardCompatibleBody`, and a variant of the `Frame<'a, T>` combinator. these are a bit boilerplate-y, admittedly, but the pleasant part of this is that we have, in effect, migrated the trickiest body middleware in advance of #3504. once we upgrade to http-body 1.0, all of these types can be removed. https://docs.rs/http-body-util/latest/http_body_util/trait.BodyExt.html#method.frame https://docs.rs/http-body-util/0.1.2/src/http_body_util/combinators/frame.rs.html#10 Signed-off-by: katelyn martin * refactor(http/retry): `PeekTrailersBody` uses `BodyExt::frame()` this commit reworks `PeekTrailersBody`. the most important goal here is replacing the control flow of `read_body()`, which polls a body using `BodyExt` future combinators `data()` and `frame()` for up to two frames, with varying levels of persistence depending on outcomes. to quote #3556: > the intent of this type is to only yield the asynchronous task > responsible for reading the body once. depending on what the inner > body yields, and when, this can result in combinations of: no data > frames and no trailers, no data frames with trailers, one data frame > and no trailers, one data frame with trailers, or two data frames. > moreover, depending on which of these are yielded, the body will call > .await some scenarios, and only poll functions once in others. > > migrating this to the Frame and poll_frame() style of the 1.0 Body > interface, away from the 0.4 interface that provides distinct > poll_data() and poll_trailers() methods, is fundamentally tricky. this means that `PeekTrailersBody` is notably difficult to port across the http-body 0.4/1.0 upgrade boundary. this body middleware must navigate a number of edge conditions, and once it _has_ obtained a `Frame`, make use of conversion methods to ascertain whether it is a data or trailers frame, due to the fact that its internal enum representation is not exposed publicly. one it has done all of that, it must do the same thing once more to examine the second frame. this commit uses the compatibility facilities and backported `Frame` introduced in the previous commit, and rewrites this control flow using a form of the `BodyExt::frame()` combinator. this means that this middleware is forward-compatible with http-body 1.0, which will dramatically simplify the remaining migration work to be done in #3504. see https://github.com/linkerd/linkerd2/issues/8733 for more information and other links related to this ongoing migration work. Signed-off-by: katelyn martin * refactor(http/retry): mock body enforces `poll_trailers()` contract this commit addresses a `TODO` note, and tightens the enforcement of a rule defined by the v0.4 signature of the `Body` trait. this commit changes the mock body type, used in tests, so that it will panic if the caller improperly polls for a trailers frame before the final data frame has been yielded. previously, a comment indicated that we were "fairly sure" this was okay. while that may have been acceptable in practice, the changes in the previous commit mean that we now properly respect these rules. thus, a panic can be placed here, to enforce that "[is] only be called once `poll_data()` returns `None`", per the documentation. Signed-off-by: katelyn martin * refactor(http/retry): rename `PeekTrailersBody::Buffered` this is a nicer name than `Unknown` for this case. not to mention, we'll want that name shortly to address the possibility of unknown frame variants. Signed-off-by: katelyn martin * refactor(http/retry): encapsulate `Inner` enum variants this commit makes the inner enum variants private. https://github.com/linkerd/linkerd2-proxy/pull/3559#discussion_r1928946521 Signed-off-by: katelyn martin * refactor(http/retry): gracefully ignore unknown frames https://github.com/linkerd/linkerd2-proxy/pull/3559#discussion_r1928963019 Signed-off-by: katelyn martin --------- Signed-off-by: katelyn martin --- linkerd/http/retry/src/compat.rs | 115 ++++++++ linkerd/http/retry/src/compat/frame.rs | 131 +++++++++ linkerd/http/retry/src/lib.rs | 2 + linkerd/http/retry/src/peek_trailers.rs | 338 ++++++++++++++++-------- 4 files changed, 475 insertions(+), 111 deletions(-) create mode 100644 linkerd/http/retry/src/compat.rs create mode 100644 linkerd/http/retry/src/compat/frame.rs diff --git a/linkerd/http/retry/src/compat.rs b/linkerd/http/retry/src/compat.rs new file mode 100644 index 0000000000..1294319496 --- /dev/null +++ b/linkerd/http/retry/src/compat.rs @@ -0,0 +1,115 @@ +//! Compatibility utilities for upgrading to http-body 1.0. + +use http_body::Body; + +pub(crate) use self::frame::Frame; + +mod frame; + +#[derive(Debug)] +pub(crate) struct ForwardCompatibleBody { + inner: B, + data_finished: bool, + trailers_finished: bool, +} + +// === impl ForwardCompatibleBody === + +impl ForwardCompatibleBody { + pub(crate) fn new(body: B) -> Self { + if body.is_end_stream() { + Self { + inner: body, + data_finished: true, + trailers_finished: true, + } + } else { + Self { + inner: body, + data_finished: false, + trailers_finished: false, + } + } + } + + pub(crate) fn into_inner(self) -> B { + self.inner + } + + /// Returns a future that resolves to the next frame. + pub(crate) fn frame(&mut self) -> combinators::Frame<'_, B> { + combinators::Frame(self) + } +} + +/// Future that resolves to the next frame from a `Body`. +/// +/// NB: This is a vendored stand-in for [`Frame<'a, T>`][frame], and and can be replaced once +/// we upgrade from http-body 0.4 to 1.0. This file was vendored, and subsequently adapted to this +/// project, at commit 86fdf00. +/// +/// See linkerd/linkerd2#8733 for more information. +/// +/// [frame]: https://docs.rs/http-body-util/0.1.2/http_body_util/combinators/struct.Frame.html +mod combinators { + use core::future::Future; + use core::pin::Pin; + use core::task; + use http_body::Body; + use std::ops::Not; + use std::task::ready; + + use super::ForwardCompatibleBody; + + #[must_use = "futures don't do anything unless polled"] + #[derive(Debug)] + /// Future that resolves to the next frame from a [`Body`]. + pub struct Frame<'a, T>(pub(super) &'a mut super::ForwardCompatibleBody); + + impl Future for Frame<'_, T> { + type Output = Option, T::Error>>; + + fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll { + let Self(ForwardCompatibleBody { + inner, + data_finished, + trailers_finished, + }) = self.get_mut(); + let mut pinned = Pin::new(inner); + + // We have already yielded the trailers, the body is done. + if *trailers_finished { + return task::Poll::Ready(None); + } + + // We are still yielding data frames. + if data_finished.not() { + match ready!(pinned.as_mut().poll_data(ctx)) { + Some(Ok(data)) => { + // We yielded a frame. + return task::Poll::Ready(Some(Ok(super::Frame::data(data)))); + } + Some(Err(error)) => { + // If we encountered an error, we are finished. + *data_finished = true; + *trailers_finished = true; + return task::Poll::Ready(Some(Err(error))); + } + None => { + // We are done yielding data frames. Mark the corresponding flag, and fall + // through to poll the trailers... + *data_finished = true; + } + }; + } + + // We have yielded all of the data frames but have not yielded the trailers. + let trailers = ready!(pinned.poll_trailers(ctx)); + *trailers_finished = true; + let trailers = trailers + .transpose() + .map(|res| res.map(super::Frame::trailers)); + task::Poll::Ready(trailers) + } + } +} diff --git a/linkerd/http/retry/src/compat/frame.rs b/linkerd/http/retry/src/compat/frame.rs new file mode 100644 index 0000000000..3e03ca0ea2 --- /dev/null +++ b/linkerd/http/retry/src/compat/frame.rs @@ -0,0 +1,131 @@ +#![allow(unused, reason = "this code is vendored from `http-body v1.0.1")] + +//! A frame of any kind related to an HTTP stream (body). +//! +//! NB: This is a vendored stand-in for [`Frame`][frame], and and can be replaced once +//! we upgrade from http-body 0.4 to 1.0. This file was vendored at commit 86fdf00. +//! +//! See linkerd/linkerd2#8733 for more information. +//! +//! [frame]: https://docs.rs/http-body/1.0.1/http_body/struct.Frame.html> + +use http::HeaderMap; + +/// A frame of any kind related to an HTTP stream (body). +#[derive(Debug)] +pub struct Frame { + kind: Kind, +} + +#[derive(Debug)] +enum Kind { + // The first two variants are "inlined" since they are undoubtedly + // the most common. This saves us from having to allocate a + // boxed trait object for them. + Data(T), + Trailers(HeaderMap), + //Unknown(Box), +} + +impl Frame { + /// Create a DATA frame with the provided `Buf`. + pub fn data(buf: T) -> Self { + Self { + kind: Kind::Data(buf), + } + } + + /// Create a trailers frame. + pub fn trailers(map: HeaderMap) -> Self { + Self { + kind: Kind::Trailers(map), + } + } + + /// Maps this frame's data to a different type. + pub fn map_data(self, f: F) -> Frame + where + F: FnOnce(T) -> D, + { + match self.kind { + Kind::Data(data) => Frame { + kind: Kind::Data(f(data)), + }, + Kind::Trailers(trailers) => Frame { + kind: Kind::Trailers(trailers), + }, + } + } + + /// Returns whether this is a DATA frame. + pub fn is_data(&self) -> bool { + matches!(self.kind, Kind::Data(..)) + } + + /// Consumes self into the buf of the DATA frame. + /// + /// Returns an [`Err`] containing the original [`Frame`] when frame is not a DATA frame. + /// `Frame::is_data` can also be used to determine if the frame is a DATA frame. + pub fn into_data(self) -> Result { + match self.kind { + Kind::Data(data) => Ok(data), + _ => Err(self), + } + } + + /// If this is a DATA frame, returns a reference to it. + /// + /// Returns `None` if not a DATA frame. + pub fn data_ref(&self) -> Option<&T> { + match self.kind { + Kind::Data(ref data) => Some(data), + _ => None, + } + } + + /// If this is a DATA frame, returns a mutable reference to it. + /// + /// Returns `None` if not a DATA frame. + pub fn data_mut(&mut self) -> Option<&mut T> { + match self.kind { + Kind::Data(ref mut data) => Some(data), + _ => None, + } + } + + /// Returns whether this is a trailers frame. + pub fn is_trailers(&self) -> bool { + matches!(self.kind, Kind::Trailers(..)) + } + + /// Consumes self into the buf of the trailers frame. + /// + /// Returns an [`Err`] containing the original [`Frame`] when frame is not a trailers frame. + /// `Frame::is_trailers` can also be used to determine if the frame is a trailers frame. + pub fn into_trailers(self) -> Result { + match self.kind { + Kind::Trailers(trailers) => Ok(trailers), + _ => Err(self), + } + } + + /// If this is a trailers frame, returns a reference to it. + /// + /// Returns `None` if not a trailers frame. + pub fn trailers_ref(&self) -> Option<&HeaderMap> { + match self.kind { + Kind::Trailers(ref trailers) => Some(trailers), + _ => None, + } + } + + /// If this is a trailers frame, returns a mutable reference to it. + /// + /// Returns `None` if not a trailers frame. + pub fn trailers_mut(&mut self) -> Option<&mut HeaderMap> { + match self.kind { + Kind::Trailers(ref mut trailers) => Some(trailers), + _ => None, + } + } +} diff --git a/linkerd/http/retry/src/lib.rs b/linkerd/http/retry/src/lib.rs index 5aedecf8bf..5384a55f62 100644 --- a/linkerd/http/retry/src/lib.rs +++ b/linkerd/http/retry/src/lib.rs @@ -4,6 +4,8 @@ pub mod peek_trailers; pub mod replay; +mod compat; + pub use self::{peek_trailers::PeekTrailersBody, replay::ReplayBody}; pub use tower::retry::budget::Budget; diff --git a/linkerd/http/retry/src/peek_trailers.rs b/linkerd/http/retry/src/peek_trailers.rs index 267dcf8946..b11bb583ec 100644 --- a/linkerd/http/retry/src/peek_trailers.rs +++ b/linkerd/http/retry/src/peek_trailers.rs @@ -2,6 +2,7 @@ use futures::{ future::{self, Either}, FutureExt, }; +use http::HeaderMap; use http_body::Body; use linkerd_http_box::BoxBody; use pin_project::pin_project; @@ -16,32 +17,39 @@ use std::{ /// /// If the first frame of the body stream was *not* a `TRAILERS` frame, this /// behaves identically to a normal body. +#[pin_project] +pub struct PeekTrailersBody(#[pin] Inner); + #[pin_project(project = Projection)] -pub struct PeekTrailersBody { - /// The inner [`Body`]. +enum Inner { + /// An empty body. + Empty, + /// A body that contains zero or one DATA frame. /// - /// This is the request or response body whose trailers are being peeked. - #[pin] - inner: B, - - /// The first DATA frame received from the inner body, or an error that - /// occurred while polling for data. + /// This variant MAY have trailers that can be peeked. + Unary { + data: Option>, + trailers: Option>, + }, + /// A body that (potentially) contains more than one DATA frame. /// - /// If this is `None`, then the body has completed without any DATA frames. - first_data: Option>, - - /// The inner body's trailers, if it was terminated by a `TRAILERS` frame - /// after 0-1 DATA frames, or an error if polling for trailers failed. + /// This variant indicates that the inner body's trailers could not be observed, with some + /// frames that were buffered. + Buffered { + first: Option>, + second: Option>, + /// The inner [`Body`]. + #[pin] + inner: B, + }, + /// A transparent, inert body. /// - /// Yes, this is a bit of a complex type, so let's break it down: - /// - the outer `Option` indicates whether any trailers were received by - /// `WithTrailers`; if it's `None`, then we don't *know* if the response - /// had trailers, as it is not yet complete. - /// - the inner `Result` and `Option` are the `Result` and `Option` returned - /// by `HttpBody::trailers` on the inner body. If this is `Ok(None)`, then - /// the body has terminated without trailers --- it is *known* to not have - /// trailers. - trailers: Option, B::Error>>, + /// This variant will not attempt to peek the inner body's trailers. + Passthru { + /// The inner [`Body`]. + #[pin] + inner: B, + }, } /// A future that yields a response instrumented with [`PeekTrailersBody`]. @@ -60,9 +68,20 @@ impl PeekTrailersBody { /// This function will return `None` if the body's trailers could not be peeked, or if there /// were no trailers included. pub fn peek_trailers(&self) -> Option<&http::HeaderMap> { - self.trailers - .as_ref() - .and_then(|trls| trls.as_ref().ok()?.as_ref()) + let Self(inner) = self; + match inner { + Inner::Unary { + trailers: Some(Ok(trailers)), + .. + } => Some(trailers), + Inner::Unary { + trailers: None | Some(Err(_)), + .. + } + | Inner::Empty + | Inner::Buffered { .. } + | Inner::Passthru { .. } => None, + } } pub fn map_response(rsp: http::Response) -> WithPeekTrailersBody @@ -76,14 +95,16 @@ impl PeekTrailersBody { // If the response isn't an HTTP version that has trailers, skip trying // to read a trailers frame. if let Version::HTTP_09 | Version::HTTP_10 | Version::HTTP_11 = rsp.version() { - return Either::Left(future::ready(Self::no_trailers(rsp))); + return Either::Left(future::ready( + rsp.map(|inner| Self(Inner::Passthru { inner })), + )); } // If the response doesn't have a body stream, also skip trying to read // a trailers frame. if rsp.is_end_stream() { tracing::debug!("Skipping trailers for empty body"); - return Either::Left(future::ready(Self::no_trailers(rsp))); + return Either::Left(future::ready(rsp.map(|_| Self(Inner::Empty)))); } // Otherwise, return a future that tries to read the next frame. @@ -94,55 +115,123 @@ impl PeekTrailersBody { })) } - async fn read_body(mut body: B) -> Self + async fn read_body(body: B) -> Self where B: Send + Unpin, B::Data: Send + Unpin, B::Error: Send, { + // XXX(kate): for now, wrap this in a compatibility adapter that yields `Frame`s. + // this can be removed when we upgrade to http-body 1.0. + use crate::compat::ForwardCompatibleBody; + let mut body = ForwardCompatibleBody::new(body); + // First, poll the body for its first frame. tracing::debug!("Buffering first data frame"); - let first_data = body.data().await; - - // Now, inspect the frame yielded. If the body yielded a data frame, we will only peek - // the trailers if they are immediately available. If the body did not yield a data frame, - // we will poll a future to yield the trailers. - let trailers = if first_data.is_some() { - // The body has data; stop waiting for trailers. Peek to see if there's immediately a - // trailers frame, and grab it if so. Otherwise, bail. - // - // XXX(eliza): the documentation for the `http::Body` trait says that `poll_trailers` - // should only be called after `poll_data` returns `None`...but, in practice, I'm - // fairly sure that this just means that it *will not return `Ready`* until there are - // no data frames left, which is fine for us here, because we `now_or_never` it. - body.trailers().now_or_never() - } else { - // Okay, `poll_data` has returned `None`, so there are no data frames left. Let's see - // if there's trailers... - let trls = body.trailers().await; - Some(trls) - }; - - if trailers.is_some() { + let first_frame = body + .frame() + .map(|f| f.map(|r| r.map(Self::split_frame))) + .await; + + let body = Self(match first_frame { + // The body has no frames. It is empty. + None => Inner::Empty, + // The body yielded an error. We are done. + Some(Err(error)) => Inner::Unary { + data: Some(Err(error)), + trailers: None, + }, + // The body yielded a TRAILERS frame. We are done. + Some(Ok(Some(Either::Right(trailers)))) => Inner::Unary { + data: None, + trailers: Some(Ok(trailers)), + }, + // The body yielded an unknown kind of frame. + Some(Ok(None)) => Inner::Buffered { + first: None, + second: None, + inner: body.into_inner(), + }, + // The body yielded a DATA frame. Check for a second frame, without yielding again. + Some(Ok(Some(Either::Left(first)))) => { + if let Some(second) = body + .frame() + .map(|f| f.map(|r| r.map(Self::split_frame))) + .now_or_never() + { + // The second frame is available. Let's inspect it and determine what to do. + match second { + // The body is finished. There is not a TRAILERS frame. + None => Inner::Unary { + data: Some(Ok(first)), + trailers: None, + }, + // We immediately yielded a result, but it was an error. Alas! + Some(Err(error)) => Inner::Unary { + data: Some(Ok(first)), + trailers: Some(Err(error)), + }, + // We immediately yielded another frame, but it was a second DATA frame. + // We hold on to each frame, but we cannot wait for the TRAILERS. + Some(Ok(Some(Either::Left(second)))) => Inner::Buffered { + first: Some(Ok(first)), + second: Some(Ok(second)), + inner: body.into_inner(), + }, + // The body immediately yielded a second TRAILERS frame. Nice! + Some(Ok(Some(Either::Right(trailers)))) => Inner::Unary { + data: Some(Ok(first)), + trailers: Some(Ok(trailers)), + }, + // The body yielded an unknown kind of frame. + Some(Ok(None)) => Inner::Buffered { + first: None, + second: None, + inner: body.into_inner(), + }, + } + } else { + // If we are here, the second frame is not yet available. We cannot be sure + // that a second DATA frame is on the way, and we are no longer willing to + // await additional frames. There are no trailers to peek. + Inner::Buffered { + first: None, + second: None, + inner: body.into_inner(), + } + } + } + }); + + if body.peek_trailers().is_some() { tracing::debug!("Buffered trailers frame"); } - Self { - inner: body, - first_data, - trailers, - } + body } - /// Returns a response with an inert [`PeekTrailersBody`]. + /// Splits a `Frame` into a chunk of data or a header map. + /// + /// Frames do not expose their inner enums, and instead expose `into_data()` and + /// `into_trailers()` methods. This function breaks the frame into either `Some(Left(data))` + /// if it is given a DATA frame, and `Some(Right(trailers))` if it is given a TRAILERS frame. /// - /// This will not peek the inner body's trailers. - fn no_trailers(rsp: http::Response) -> http::Response { - rsp.map(|inner| Self { - inner, - first_data: None, - trailers: None, - }) + /// This returns `None` if an unknown frame is provided, that is neither. + /// + /// This is an internal helper to facilitate pattern matching in `read_body(..)`, above. + fn split_frame( + frame: crate::compat::Frame, + ) -> Option> { + use {crate::compat::Frame, futures::future::Either}; + match frame.into_data().map_err(Frame::into_trailers) { + Ok(data) => Some(Either::Left(data)), + Err(Ok(trailers)) => Some(Either::Right(trailers)), + Err(Err(_unknown)) => { + // It's possible that some sort of unknown frame could be encountered. + tracing::warn!("an unknown body frame has been buffered"); + None + } + } } } @@ -159,68 +248,97 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - let Projection { - inner, - first_data, - trailers: _, - } = self.project(); - - if let Some(first_data) = first_data.take() { - return Poll::Ready(Some(first_data)); + let this = self.project().0.project(); + match this { + Projection::Empty => Poll::Ready(None), + Projection::Passthru { inner } => inner.poll_data(cx), + Projection::Unary { data, .. } => Poll::Ready(data.take()), + Projection::Buffered { + first, + second, + inner, + } => { + if let data @ Some(_) = first.take().or_else(|| second.take()) { + Poll::Ready(data) + } else { + inner.poll_data(cx) + } + } } - - inner.poll_data(cx) } fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>> { - let Projection { - inner, - first_data: _, - trailers, - } = self.project(); - - if let Some(trailers) = trailers.take() { - return Poll::Ready(trailers); + let this = self.project().0.project(); + match this { + Projection::Empty => Poll::Ready(Ok(None)), + Projection::Passthru { inner } => inner.poll_trailers(cx), + Projection::Unary { trailers, .. } => Poll::Ready(trailers.take().transpose()), + Projection::Buffered { inner, .. } => inner.poll_trailers(cx), } - - inner.poll_trailers(cx) } #[inline] fn is_end_stream(&self) -> bool { - let Self { - inner, - first_data, - trailers, - } = self; - - let trailers_finished = match trailers { - Some(Ok(Some(_)) | Err(_)) => false, - None | Some(Ok(None)) => true, - }; - - first_data.is_none() && trailers_finished && inner.is_end_stream() + let Self(inner) = self; + match inner { + Inner::Empty => true, + Inner::Passthru { inner } => inner.is_end_stream(), + Inner::Unary { + data: None, + trailers: None, + } => true, + Inner::Unary { .. } => false, + Inner::Buffered { + inner, + first: None, + second: None, + } => inner.is_end_stream(), + Inner::Buffered { .. } => false, + } } #[inline] fn size_hint(&self) -> http_body::SizeHint { use bytes::Buf; - - let mut hint = self.inner.size_hint(); - // If we're holding onto a chunk of data, add its length to the inner - // `Body`'s size hint. - if let Some(Ok(chunk)) = self.first_data.as_ref() { - let buffered = chunk.remaining() as u64; - if let Some(upper) = hint.upper() { - hint.set_upper(upper + buffered); + let Self(inner) = self; + match inner { + Inner::Empty => http_body::SizeHint::new(), + Inner::Passthru { inner } => inner.size_hint(), + Inner::Unary { + data: Some(Ok(data)), + .. + } => { + let size = data.remaining() as u64; + http_body::SizeHint::with_exact(size) + } + Inner::Unary { + data: None | Some(Err(_)), + .. + } => http_body::SizeHint::new(), + Inner::Buffered { + first, + second, + inner, + } => { + // Add any frames we've buffered to the inner `Body`'s size hint. + let mut hint = inner.size_hint(); + let mut add_to_hint = |frame: &Option>| { + if let Some(Ok(buf)) = frame { + let size = buf.remaining() as u64; + if let Some(upper) = hint.upper() { + hint.set_upper(upper + size); + } + hint.set_lower(hint.lower() + size); + } + }; + add_to_hint(first); + add_to_hint(second); + hint } - hint.set_lower(hint.lower() + buffered); } - - hint } } @@ -383,12 +501,10 @@ mod tests { let poll = if data_polls.is_empty() { trailer_polls.pop_front().unwrap_or(Poll::Ready(Ok(None))) } else { - // If the data frames have not all been yielded, yield `Pending`. - // - // TODO(kate): this arm should panic. it indicates `PeekTrailersBody` isn't - // respecting the contract outlined in + // The called has polled for trailers before exhausting the stream of DATA frames. + // This indicates `PeekTrailersBody` isn't respecting the contract outlined in // . - Poll::Pending + panic!("`poll_trailers()` was called before `poll_data()` returned `Poll::Ready(None)`"); }; // If we return `Poll::Pending`, we must schedule the task to be awoken.