From 0aaf7c058788a760910d82971a9e79db2426b596 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Wed, 11 Dec 2024 01:15:25 -0500 Subject: [PATCH 1/5] chore(app/inbound): address hyper deprecations in http/1 tests this is a follow-up commit related to 24dc5d8a (#3445). see for more information on upgrading to hyper 1.0. --- this addresses hyper deprecations in the http/1 tests for the inbound proxy. prior, we made use of `tower::ServiceExt::oneshot`, which consumes a service and drops it after sending a request and polling the response future to completion. tower is not a 1.0 library yet, so `SendRequest` does not provide an implementation of `tower::Service` in hyper's 1.0 interface: - - consequentially, we must drop the sender ourselves after receiving a response now. --- this commit *also* addresses hyper deprecations in the http/1 downgrade tests for the inbound proxy. because these tests involve a http/2 client and an http/1 server, we take the choice of inlining the body of `http_util::connect_and_accept()` rather than introducing a new, third `http_util::connect_and_accept_http_downgrade()` function. we will refactor these helper functions in follow-on commits. NB: because `ContextError` is internal to the `linkerd-app-test` crate, we do not wrap the errors. these are allegedly used by the fuzzing tests (_see f.ex #986 and #989_), but for our purposes with respect to the inbound proxy we can elide them rather than making `ctx()` a public method. --- Signed-off-by: katelyn martin --- linkerd/app/inbound/src/http/tests.rs | 113 +++++++++++++++++++------- linkerd/app/test/src/http_util.rs | 45 ++++++++++ 2 files changed, 129 insertions(+), 29 deletions(-) diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index 524971713d..3a86df8fde 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -48,8 +48,7 @@ where #[tokio::test(flavor = "current_thread")] async fn unmeshed_http1_hello_world() { let server = hyper::server::conn::http1::Builder::new(); - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let _trace = trace_init(); // Build a mock "connector" that returns the upstream "server" IO. @@ -64,7 +63,7 @@ async fn unmeshed_http1_hello_world() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; let req = Request::builder() .method(http::Method::GET) @@ -72,7 +71,7 @@ async fn unmeshed_http1_hello_world() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -81,6 +80,7 @@ async fn unmeshed_http1_hello_world() { assert_eq!(body, "Hello world!"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -92,9 +92,7 @@ async fn unmeshed_http1_hello_world() { async fn downgrade_origin_form() { // Reproduces https://github.com/linkerd/linkerd2/issues/5298 let server = hyper::server::conn::http1::Builder::new(); - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); - client.http2_only(true); + let client = hyper::client::conn::http2::Builder::new(TracingExecutor); let _trace = trace_init(); // Build a mock "connector" that returns the upstream "server" IO. @@ -109,7 +107,35 @@ async fn downgrade_origin_form() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = { + tracing::info!(settings = ?client, "connecting client with"); + let (client_io, server_io) = io::duplex(4096); + + let (client, conn) = client + .handshake(client_io) + .await + .expect("Client must connect"); + + let mut bg = tokio::task::JoinSet::new(); + bg.spawn( + async move { + server.oneshot(server_io).await?; + tracing::info!("proxy serve task complete"); + Ok(()) + } + .instrument(tracing::info_span!("proxy")), + ); + bg.spawn( + async move { + conn.await?; + tracing::info!("client background complete"); + Ok(()) + } + .instrument(tracing::info_span!("client_bg")), + ); + + (client, bg) + }; let req = Request::builder() .method(http::Method::GET) @@ -119,7 +145,7 @@ async fn downgrade_origin_form() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -128,6 +154,7 @@ async fn downgrade_origin_form() { assert_eq!(body, "Hello world!"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -137,10 +164,8 @@ async fn downgrade_origin_form() { #[tokio::test(flavor = "current_thread")] async fn downgrade_absolute_form() { + let client = hyper::client::conn::http2::Builder::new(TracingExecutor); let server = hyper::server::conn::http1::Builder::new(); - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); - client.http2_only(true); let _trace = trace_init(); // Build a mock "connector" that returns the upstream "server" IO. @@ -155,7 +180,36 @@ async fn downgrade_absolute_form() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + + let (mut client, bg) = { + tracing::info!(settings = ?client, "connecting client with"); + let (client_io, server_io) = io::duplex(4096); + + let (client, conn) = client + .handshake(client_io) + .await + .expect("Client must connect"); + + let mut bg = tokio::task::JoinSet::new(); + bg.spawn( + async move { + server.oneshot(server_io).await?; + tracing::info!("proxy serve task complete"); + Ok(()) + } + .instrument(tracing::info_span!("proxy")), + ); + bg.spawn( + async move { + conn.await?; + tracing::info!("client background complete"); + Ok(()) + } + .instrument(tracing::info_span!("client_bg")), + ); + + (client, bg) + }; let req = Request::builder() .method(http::Method::GET) @@ -165,7 +219,7 @@ async fn downgrade_absolute_form() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -174,6 +228,7 @@ async fn downgrade_absolute_form() { assert_eq!(body, "Hello world!"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -190,8 +245,7 @@ async fn http1_bad_gateway_meshed_response_error_header() { // Build a client using the connect that always errors so that responses // are BAD_GATEWAY. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -199,7 +253,7 @@ async fn http1_bad_gateway_meshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_http1()); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Send a request and assert that it is a BAD_GATEWAY with the expected // header message. @@ -221,6 +275,7 @@ async fn http1_bad_gateway_meshed_response_error_header() { check_error_header(rsp.headers(), "server is not listening"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -237,8 +292,7 @@ async fn http1_bad_gateway_unmeshed_response() { // Build a client using the connect that always errors so that responses // are BAD_GATEWAY. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -246,7 +300,7 @@ async fn http1_bad_gateway_unmeshed_response() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Send a request and assert that it is a BAD_GATEWAY with the expected // header message. @@ -256,7 +310,7 @@ async fn http1_bad_gateway_unmeshed_response() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -267,6 +321,7 @@ async fn http1_bad_gateway_unmeshed_response() { ); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -285,8 +340,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { // Build a client using the connect that always sleeps so that responses // are GATEWAY_TIMEOUT. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -294,7 +348,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_http1()); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Send a request and assert that it is a GATEWAY_TIMEOUT with the // expected header message. @@ -304,7 +358,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -317,6 +371,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { check_error_header(rsp.headers(), "connect timed out after 1s"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -335,8 +390,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { // Build a client using the connect that always sleeps so that responses // are GATEWAY_TIMEOUT. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -344,7 +398,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Send a request and assert that it is a GATEWAY_TIMEOUT with the // expected header message. @@ -354,7 +408,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -365,6 +419,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { ); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index f675263b27..b4be313603 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -55,6 +55,51 @@ pub async fn connect_and_accept( (client, bg) } +/// Connects a client and server, running a proxy between them. +/// +/// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and +/// await a response, and (2) a [`JoinSet`] running background tasks. +pub async fn connect_and_accept_http1( + client_settings: &mut hyper::client::conn::http1::Builder, + server: BoxServer, +) -> ( + hyper::client::conn::http1::SendRequest, + JoinSet>, +) { + tracing::info!(settings = ?client_settings, "connecting client with"); + let (client_io, server_io) = io::duplex(4096); + + let (client, conn) = client_settings + .handshake(client_io) + .await + .expect("Client must connect"); + + let mut bg = tokio::task::JoinSet::new(); + bg.spawn( + async move { + server + .oneshot(server_io) + .await + .map_err(ContextError::ctx("proxy background task failed"))?; + tracing::info!("proxy serve task complete"); + Ok(()) + } + .instrument(tracing::info_span!("proxy")), + ); + bg.spawn( + async move { + conn.await + .map_err(ContextError::ctx("client background task failed")) + .map_err(Error::from)?; + tracing::info!("client background complete"); + Ok(()) + } + .instrument(tracing::info_span!("client_bg")), + ); + + (client, bg) +} + /// Connects a client and server, running a proxy between them. /// /// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and From 7fef948b7cf6054330be406714c91fa512db95a6 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 2/5] refactor(app/test): remove unused `http_util::connect_and_accept(..)` this removes `connect_and_accept(..)`. this will break fuzzing builds, but it is not used elsewhere. Signed-off-by: katelyn martin --- linkerd/app/test/src/http_util.rs | 46 ------------------------------- 1 file changed, 46 deletions(-) diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index b4be313603..935907b895 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -9,52 +9,6 @@ use tracing::Instrument; type BoxServer = svc::BoxTcp; -/// Connects a client and server, running a proxy between them. -/// -/// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and -/// await a response, and (2) a [`JoinSet`] running background tasks. -#[allow(deprecated)] // linkerd/linkerd2#8733 -pub async fn connect_and_accept( - client_settings: &mut hyper::client::conn::Builder, - server: BoxServer, -) -> ( - hyper::client::conn::SendRequest, - JoinSet>, -) { - tracing::info!(settings = ?client_settings, "connecting client with"); - let (client_io, server_io) = io::duplex(4096); - - let (client, conn) = client_settings - .handshake(client_io) - .await - .expect("Client must connect"); - - let mut bg = tokio::task::JoinSet::new(); - bg.spawn( - async move { - server - .oneshot(server_io) - .await - .map_err(ContextError::ctx("proxy background task failed"))?; - tracing::info!("proxy serve task complete"); - Ok(()) - } - .instrument(tracing::info_span!("proxy")), - ); - bg.spawn( - async move { - conn.await - .map_err(ContextError::ctx("client background task failed")) - .map_err(Error::from)?; - tracing::info!("client background complete"); - Ok(()) - } - .instrument(tracing::info_span!("client_bg")), - ); - - (client, bg) -} - /// Connects a client and server, running a proxy between them. /// /// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and From 71f289aec98da4b5dff793c6b9e27788152d9630 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 3/5] chore(fuzz): address hyper deprecation in inbound fuzz tests Signed-off-by: katelyn martin --- linkerd/app/inbound/src/http.rs | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/linkerd/app/inbound/src/http.rs b/linkerd/app/inbound/src/http.rs index 3de8c136d2..bd5071bc58 100644 --- a/linkerd/app/inbound/src/http.rs +++ b/linkerd/app/inbound/src/http.rs @@ -18,7 +18,7 @@ pub mod fuzz { test_util::{support::connect::Connect, *}, Config, Inbound, }; - use hyper::{client::conn::Builder as ClientBuilder, Body, Request, Response}; + use hyper::{Body, Request, Response}; use libfuzzer_sys::arbitrary::Arbitrary; use linkerd_app_core::{ identity, io, @@ -41,9 +41,8 @@ pub mod fuzz { } pub async fn fuzz_entry_raw(requests: Vec) { - let mut server = hyper::server::conn::Http::new(); - server.http1_only(true); - let mut client = ClientBuilder::new(); + let server = hyper::server::conn::http1::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let connect = support::connect().endpoint_fn_boxed(Target::addr(), hello_fuzz_server(server)); let profiles = profile::resolver(); @@ -55,7 +54,7 @@ pub mod fuzz { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_fuzz_server(cfg, rt, profiles, connect).new_service(Target::HTTP1); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Now send all of the requests for inp in requests.iter() { @@ -74,14 +73,7 @@ pub mod fuzz { .header(header_name, header_value) .body(Body::default()) { - let rsp = client - .ready() - .await - .expect("HTTP client poll_ready failed") - .call(req) - .await - .expect("HTTP client request failed"); - tracing::info!(?rsp); + let rsp = client.send_request(req).await; tracing::info!(?rsp); if let Ok(rsp) = rsp { let body = http_util::body_to_string(rsp.into_body()).await; @@ -93,18 +85,18 @@ pub mod fuzz { } } - drop(client); // It's okay if the background task returns an error, as this would // indicate that the proxy closed the connection --- which it will do on // invalid inputs. We want to ensure that the proxy doesn't crash in the // face of these inputs, and the background task will panic in this // case. - let res = bg.await; + drop(client); + let res = bg.join_all().await; tracing::info!(?res, "background tasks completed") } fn hello_fuzz_server( - http: hyper::server::conn::Http, + http: hyper::server::conn::http1::Builder, ) -> impl Fn(Remote) -> io::Result { move |_endpoint| { let (client_io, server_io) = support::io::duplex(4096); From e2eb861034b1f959bdf49f9e9e20aece724797f5 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 4/5] =?UTF-8?q?chore(fuzz):=20address=20pre=C3=ABxisting?= =?UTF-8?q?=20fuzz=20breakage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit this commit addresses other breakage found in the fuzz tests, tied to other previous work. after these changes, one can observe that the fuzz tests build and run once more by running the following: ```sh cargo +nightly fuzz run --fuzz-dir=linkerd/app/inbound/fuzz/ fuzz_target_1 ``` Signed-off-by: katelyn martin --- linkerd/app/inbound/Cargo.toml | 3 +++ linkerd/app/inbound/src/http.rs | 3 +++ 2 files changed, 6 insertions(+) diff --git a/linkerd/app/inbound/Cargo.toml b/linkerd/app/inbound/Cargo.toml index 9e152aa587..d3f7acb529 100644 --- a/linkerd/app/inbound/Cargo.toml +++ b/linkerd/app/inbound/Cargo.toml @@ -49,6 +49,9 @@ hyper = { version = "0.14", features = ["deprecated", "http1", "http2"] } linkerd-app-test = { path = "../test" } arbitrary = { version = "1", features = ["derive"] } libfuzzer-sys = { version = "0.4", features = ["arbitrary-derive"] } +linkerd-meshtls-rustls = { path = "../../meshtls/rustls", features = [ + "test-util", +] } [dev-dependencies] hyper = { version = "0.14", features = ["deprecated", "http1", "http2"] } diff --git a/linkerd/app/inbound/src/http.rs b/linkerd/app/inbound/src/http.rs index bd5071bc58..855c122ad0 100644 --- a/linkerd/app/inbound/src/http.rs +++ b/linkerd/app/inbound/src/http.rs @@ -227,6 +227,9 @@ pub mod fuzz { kind: "server".into(), name: "testsrv".into(), }), + local_rate_limit: Arc::new( + linkerd_proxy_server_policy::LocalRateLimit::default(), + ), }, ); policy From 2bfb71d1bb83d426e106dd8f7ec8c465564e798b Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH 5/5] nit(fuzz): remove stray newline from manifest Signed-off-by: katelyn martin --- linkerd/app/inbound/fuzz/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/linkerd/app/inbound/fuzz/Cargo.toml b/linkerd/app/inbound/fuzz/Cargo.toml index 9a49887f91..342d6429af 100644 --- a/linkerd/app/inbound/fuzz/Cargo.toml +++ b/linkerd/app/inbound/fuzz/Cargo.toml @@ -1,4 +1,3 @@ - [package] name = "linkerd-app-inbound-fuzz" version = "0.0.0"