Skip to content

Commit

Permalink
Update rust dependencies scripting 2024-08-01 hyper/http (#242)
Browse files Browse the repository at this point in the history
* Updated hyper and http (#237)

* Updated cargo lock file to latest versions

* Fixed clippy warnings

* Updated dashmap to new release

* Updated hyper and http in test_common

- Major changes. Many of the utility functions have been moved to the new hyper-util
- See https://hyper.rs/guides/1/upgrading/

* Updated root Cargo.toml to latest hyper and http

- Broken build. Still more to go.

* Updated root Cargo.toml to latest hyper and http

- Broken build. Still more to go.

* Updated code to handle hyper v1 and http v1

* Ignore clippy warning that breaks code if fixed as suggested

* Updated hyper, http, and lock file to latest

- Removed minor version constraint on hyper and http
- Added approved license BSD-2 Clause to the deny.toml

* Cleaned up code

- Removed unneeded dependencies
- Removed commented out code
- Cleaned up BoxBody type in test_common

* Fixed comments from merge
  • Loading branch information
tkmcmaster authored Aug 1, 2024
1 parent f6edae8 commit adc9dc8
Show file tree
Hide file tree
Showing 12 changed files with 312 additions and 159 deletions.
279 changes: 189 additions & 90 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 10 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ path = "src/bin/test_server.rs"
[dependencies]
base64.workspace = true
body_reader = { path = "./lib/body_reader" }
bytes = "1"
bytes.workspace = true
channel = { path = "./lib/channel" }
clap = { version = "4", features = ["derive", "cargo", "std", "help", "usage", "error-context", "wrap_help"], default-features = false }
config.workspace = true
Expand All @@ -33,8 +33,10 @@ futures.workspace = true
futures-timer.workspace = true
hdrhistogram = "7"
http.workspace = true
hyper = { workspace = true, features = ["client", "http1", "http2", "stream"] }
hyper-tls = "0.5"
hyper = { workspace = true, features = ["client", "http1", "http2"] }
hyper-tls = "0.6"
hyper-util = { workspace = true, features = ["tokio", "client", "http1", "http2"] }
http-body-util.workspace = true
itertools.workspace = true
mod_interval = { path = "./lib/mod_interval" }
native-tls = "0.2"
Expand Down Expand Up @@ -101,12 +103,15 @@ license = "Apache 2.0"

[workspace.dependencies]
base64 = "0.22"
bytes = "1"
config = { path = "./lib/config" }
ether = { path = "./lib/either" }
futures = "0.3"
futures-timer = "3"
http = "0.2"
hyper = "0.14"
http = "1"
http-body-util = "0.1"
hyper = "1"
hyper-util = "0.1"
itertools = "0.13"
js-sys = "0.3.64"
log = "0.4"
Expand Down
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ private = { ignore = true }
version = 2
allow = [
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause",
"MIT",
"MPL-2.0", # MPL is only copyleft in regards to modifications to the original, adding as a dependency is fine
Expand Down
5 changes: 4 additions & 1 deletion lib/test_common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ doctest = false
path = "test_common.rs"

[dependencies]
bytes.workspace = true
futures.workspace = true
futures-timer.workspace = true
hyper = { workspace = true, features = ["server"] }
hyper = { workspace = true, features = ["http1", "http2"] }
hyper-util = { workspace = true, features = ["tokio", "server", "http1", "http2"] }
http.workspace = true
http-body-util.workspace = true
parking_lot = "0.12"
tokio = { workspace = true, features = ["full"] }
url.workspace = true
Expand Down
58 changes: 39 additions & 19 deletions lib/test_common/test_common.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
use log::{debug, info};
use std::{future::Future, io, str::FromStr, sync::Arc, time::Duration};
use std::{future::Future, io, net::SocketAddr, str::FromStr, sync::Arc, time::Duration};
use tokio::net::TcpListener;

use bytes::Bytes;
use futures::{channel::oneshot, future::select, FutureExt};
use futures_timer::Delay;
use http::{header, StatusCode};
use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Body, Error, Request, Response, Server,
use http_body_util::{BodyExt, Empty};
use hyper::{body::Incoming as Body, service::service_fn, Error, Request, Response};
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder as HyperBuilder,
};
use parking_lot::Mutex;
use url::Url;

async fn echo_route(req: Request<Body>) -> Response<Body> {
type HyperBody = http_body_util::combinators::BoxBody<Bytes, Error>;

async fn echo_route(req: Request<Body>) -> Response<HyperBody> {
let headers = req.headers();
let content_type = headers
.get(header::CONTENT_TYPE)
Expand All @@ -36,24 +41,24 @@ async fn echo_route(req: Request<Body>) -> Response<Body> {
if echo.is_some() {
debug!("Echo Body = {}", echo.clone().unwrap_or_default());
}
let mut response = match (req.method(), echo) {
let mut response: Response<HyperBody> = match (req.method(), echo) {
(&http::Method::GET, Some(b)) => Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, content_type)
.body(b.into())
.body(b.map_err(|never| match never {}).boxed())
.unwrap(),
(&http::Method::POST, _) | (&http::Method::PUT, _) => Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, content_type)
.body(req.into_body())
.body(req.into_body().boxed())
.unwrap(),
_ => Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::empty())
.body(empty())
.unwrap(),
};
let ms = wait.and_then(|c| FromStr::from_str(&c).ok()).unwrap_or(0);
let old_body = std::mem::replace(response.body_mut(), Body::empty());
let old_body = std::mem::replace(response.body_mut(), empty());
if ms > 0 {
debug!("waiting {} ms", ms);
}
Expand All @@ -62,13 +67,22 @@ async fn echo_route(req: Request<Body>) -> Response<Body> {
response
}

pub fn start_test_server(
fn empty() -> HyperBody {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}

pub async fn start_test_server(
port: Option<u16>,
) -> (u16, oneshot::Sender<()>, impl Future<Output = ()>) {
let port = port.unwrap_or(0);
let address = ([127, 0, 0, 1], port).into();
let address: SocketAddr = ([127, 0, 0, 1], port).into();

let listener = TcpListener::bind(address).await.unwrap();
let local_addr = listener.local_addr().unwrap();

let make_svc = make_service_fn(|_: &AddrStream| async {
let server = tokio::spawn(async move {
let service = service_fn(|req: Request<Body>| async {
debug!("{:?}", req);
let method = req.method().to_string();
Expand All @@ -78,7 +92,7 @@ pub fn start_test_server(
"/" => echo_route(req).await,
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.body(empty())
.unwrap(),
};
debug!("{:?}", response);
Expand All @@ -92,14 +106,20 @@ pub fn start_test_server(
);
Ok::<_, Error>(response)
});
Ok::<_, Error>(service)

loop {
let (stream, _) = listener.accept().await.unwrap();
let stream = TokioIo::new(stream);
tokio::task::spawn(async move {
let builder = HyperBuilder::new(TokioExecutor::new());
builder.serve_connection(stream, service).await.unwrap();
});
}
});

let (tx, rx) = oneshot::channel();

let server = Server::bind(&address).serve(make_svc);

let port = server.local_addr().port();
let port = local_addr.port();

let future = select(server, rx);

Expand Down
2 changes: 1 addition & 1 deletion src/bin/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() {
rt.block_on(async {
let port = std::env::var("PORT").ok().and_then(|s| s.parse().ok());
debug!("port = {}", port.unwrap_or_default());
let (port, rx, handle) = start_test_server(port);
let (port, rx, handle) = start_test_server(port).await;

println!("Listening on port {port}");

Expand Down
20 changes: 14 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,15 @@ use futures::{
stream, FutureExt, Stream, StreamExt,
};
use futures_timer::Delay;
use hyper::{client::HttpConnector, Body, Client};
use http_body_util::combinators::BoxBody;
use hyper_tls::HttpsConnector;
use hyper_util::{
client::legacy::{
connect::{dns::GaiResolver, HttpConnector},
Client,
},
rt::TokioExecutor,
};
use itertools::Itertools;
use line_writer::{blocking_writer, MsgType};
use log::{debug, error, info, warn};
Expand Down Expand Up @@ -60,6 +67,8 @@ use std::{
time::{Duration, Instant},
};

type Body = BoxBody<bytes::Bytes, std::io::Error>;

struct Endpoints {
inner: Vec<(
BTreeMap<Arc<str>, String>,
Expand Down Expand Up @@ -1175,16 +1184,15 @@ fn create_load_test_future(

pub(crate) fn create_http_client(
keepalive: Duration,
) -> Result<
Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>,
TestError,
> {
) -> Result<Client<HttpsConnector<HttpConnector<GaiResolver>>, Body>, TestError> {
let mut http = HttpConnector::new();
http.set_keepalive(Some(keepalive));
http.set_reuse_address(true);
http.enforce_http(false);
let https = HttpsConnector::from((http, TlsConnector::new()?.into()));
Ok(Client::builder().set_host(false).build::<_, Body>(https))
Ok(Client::builder(TokioExecutor::new())
.set_host(false)
.build::<_, Body>(https))
}

type ProvidersResult =
Expand Down
35 changes: 24 additions & 11 deletions src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ use futures::{
sink::SinkExt,
stream, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
};
use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
use hyper::{
client::HttpConnector,
header::{Entry as HeaderEntry, HeaderName, HeaderValue, CONTENT_DISPOSITION},
Body as HyperBody, Client, Method, Response,
Method, Response,
};
use hyper_tls::HttpsConnector;
use hyper_util::client::legacy::{
connect::{dns::GaiResolver, HttpConnector},
Client,
};
use rand::distributions::{Alphanumeric, Distribution};
use select_any::select_any;
use serde_json as json;
Expand Down Expand Up @@ -62,6 +66,8 @@ use std::{
time::{Duration, Instant},
};

type HyperBody = BoxBody<Bytes, std::io::Error>;

#[derive(Clone)]
pub struct AutoReturn {
send_option: ProviderSend,
Expand Down Expand Up @@ -206,8 +212,7 @@ pub struct BuilderContext {
#[allow(dead_code)]
pub config_path: PathBuf,
// the http client
pub client:
Arc<Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>>,
pub client: Arc<Client<HttpsConnector<HttpConnector<GaiResolver>>, HyperBody>>,
// a mapping of names to their prospective providers
pub providers: Arc<BTreeMap<Arc<str>, providers::Provider>>,
// a mapping of names to their prospective loggers
Expand Down Expand Up @@ -504,7 +509,7 @@ fn multipart_body_as_hyper_body(
let piece_stream = future::ok(Bytes::from(piece_data)).into_stream();
tweak_path(&mut body, path);
let a = create_file_hyper_body(body).map_ok(move |(bytes, body)| {
let stream = piece_stream.chain(body).a();
let stream = piece_stream.chain(body.into_data_stream()).a();
(bytes + piece_data_bytes, stream)
});
Either::A(a)
Expand Down Expand Up @@ -542,7 +547,9 @@ fn multipart_body_as_hyper_body(
.flatten()
.chain(stream::once(future::ok(closing_boundary)));

(bytes, HyperBody::wrap_stream(stream))
let body: HyperBody =
BodyExt::boxed(StreamBody::new(stream.map_ok(hyper::body::Frame::data)));
(bytes, body)
});
Ok(ret)
}
Expand Down Expand Up @@ -572,7 +579,9 @@ async fn create_file_hyper_body(filename: String) -> Result<(u64, HyperBody), Te
}
});

let body = HyperBody::wrap_stream(stream);
let body: HyperBody = BodyExt::boxed(StreamBody::new(
stream.map_ok(|x| hyper::body::Frame::data(x.into())),
));
Ok((bytes, body))
}

Expand All @@ -590,7 +599,7 @@ fn body_template_as_hyper_body(
return Either3::A(future::ready(r).and_then(|x| x));
}
Some(EndPointBody::String(t)) => t,
None => return Either3::B(future::ok((0, HyperBody::empty()))),
None => return Either3::B(future::ok((0, BoxBody::default()))),
};
let mut body = match template.evaluate(Cow::Borrowed(template_values.as_json()) /*, None*/) {
Ok(b) => b,
Expand All @@ -605,7 +614,10 @@ fn body_template_as_hyper_body(
Either3::C(create_file_hyper_body(body))
} else {
*body_value = Some(body.clone());
Either3::B(future::ok((body.as_bytes().len() as u64, body.into())))
Either3::B(future::ok((
body.as_bytes().len() as u64,
body.map_err(|never| match never {}).boxed(),
)))
}
}

Expand All @@ -618,7 +630,7 @@ pub type StatsTx = futures_channel::UnboundedSender<stats::StatsMessage>;

pub struct Endpoint {
body: Option<EndPointBody>,
client: Arc<Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>>,
client: Arc<Client<HttpsConnector<HttpConnector<GaiResolver>>, HyperBody>>,
headers: config::Headers<True>,
max_parallel_requests: Option<NonZeroUsize>,
method: Method,
Expand Down Expand Up @@ -909,7 +921,8 @@ mod tests {
let (_, body) = create_file_hyper_body("tests/test.jpg".to_string())
.await
.unwrap();
body.map(|b| stream::iter(b.unwrap()))
body.into_data_stream()
.map(|b| stream::iter(b.unwrap()))
.flatten()
.collect::<Vec<_>>()
.await
Expand Down
Loading

0 comments on commit adc9dc8

Please sign in to comment.