Skip to content

Commit

Permalink
chore(http/upgrade): upgrade to hyper 1.x
Browse files Browse the repository at this point in the history
NOTE: there is a comment noting that the upgrade middleware does not
expect to be cloneable. it is unfortunately, however, at odds with the
new bounds expected of extensions.

so, `Http11Upgrade` is now Clone'able, but a comment is left in place
noting this weakened invariant.

it's worth investigating how upgrades have changed since, in more
detail, but for the current moment we are interested in being
especially conservative about changing behavior, and focusing on api
changes like `Body::poll_frame(..)`.

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Jan 7, 2025
1 parent 00d8030 commit dd6f80f
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1894,6 +1894,7 @@ dependencies = [
"http 1.2.0",
"http-body",
"hyper",
"hyper-util",
"linkerd-duplex",
"linkerd-error",
"linkerd-http-box",
Expand Down
4 changes: 4 additions & 0 deletions linkerd/http/upgrade/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ futures = { version = "0.3", default-features = false }
http = { workspace = true }
http-body = { workspace = true }
hyper = { workspace = true, default-features = false, features = ["client"] }
hyper-util = { workspace = true, default-features = false, features = [
"client",
"client-legacy",
] }
pin-project = "1"
tokio = { version = "1", default-features = false }
tower = { version = "0.4", default-features = false }
Expand Down
33 changes: 8 additions & 25 deletions linkerd/http/upgrade/src/glue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::upgrade::Http11Upgrade;
use futures::{ready, TryFuture};
use http_body::Body;
use hyper::client::connect as hyper_connect;
use http_body::{Body, Frame};
use hyper_util::client::legacy::connect as hyper_connect;
use linkerd_error::{Error, Result};
use linkerd_http_box::BoxBody;
use linkerd_io::{self as io, AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -63,38 +63,21 @@ where
self.body.is_end_stream()
}

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
// Poll the next chunk from the body.
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
// Poll the next frame from the body.
let this = self.project();
let body = this.body;
let data = ready!(body.poll_data(cx));
let frame = ready!(body.poll_frame(cx));

// Log errors.
if let Some(Err(e)) = &data {
if let Some(Err(e)) = &frame {
debug!("http body error: {}", e);
}

Poll::Ready(data)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
// Poll the trailers from the body.
let this = self.project();
let body = this.body;
let trailers = ready!(body.poll_trailers(cx));

// Log errors.
if let Err(e) = &trailers {
debug!("http trailers error: {}", e);
}

Poll::Ready(trailers)
Poll::Ready(frame)
}

#[inline]
Expand Down
11 changes: 10 additions & 1 deletion linkerd/http/upgrade/src/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ use try_lock::TryLock;
/// inserted into the `Request::extensions()`. If the HTTP1 client service
/// also detects an upgrade, the two `OnUpgrade` futures will be joined
/// together with the glue in this type.
//
// Note: this relies on their only having been 2 Inner clones, so don't
// implement `Clone` for this type.
// XXX(kate): to satisfy new trait bounds when upgrading to hyper 1.x, this type must now be
// Clone'able.
#[derive(Clone)]
pub struct Http11Upgrade {
half: Half,
inner: Arc<Inner>,
Expand All @@ -50,7 +54,9 @@ struct Inner {
upgrade_drain_signal: Option<drain::Watch>,
}

#[derive(Debug)]
// XXX(kate): to satisfy new trait bounds when upgrading to hyper 1.x, this type must now be
// Clone'able.
#[derive(Clone, Debug)]
enum Half {
Server,
Client,
Expand Down Expand Up @@ -139,6 +145,9 @@ impl Drop for Inner {
let both_upgrades = async move {
let (server_conn, client_conn) = tokio::try_join!(server_upgrade, client_upgrade)?;
trace!("HTTP upgrade successful");
use hyper_util::rt::TokioIo;
let client_conn = TokioIo::new(client_conn);
let server_conn = TokioIo::new(server_conn);
if let Err(e) = Duplex::new(client_conn, server_conn).await {
info!("tcp duplex error: {}", e)
}
Expand Down

0 comments on commit dd6f80f

Please sign in to comment.