Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated Rust hyper and http #237

Merged
merged 12 commits into from
Jul 31, 2024
Merged
253 changes: 176 additions & 77 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ for_each_parallel = { path = "./lib/for_each_parallel" }
futures = "0.3"
futures-timer = "3"
hdrhistogram = "7"
http = "0.2"
hyper = { version = "0.14", features = ["client", "http1", "http2", "stream"] }
hyper-tls = "0.5"
http = "1"
hyper = { version = "1", features = ["client", "http1", "http2"] }
hyper-tls = "0.6"
hyper-util = { version = "0.1", features = ["tokio", "client", "http1", "http2"] }
http-body-util = "0.1"
itertools = "0.13"
mod_interval = { path = "./lib/mod_interval" }
native-tls = "0.2"
Expand Down
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ private = { ignore = true }
version = 2
allow = [
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause",
"MIT",
"Unicode-DFS-2016",
Expand Down
2 changes: 1 addition & 1 deletion lib/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ doctest = false
base64 = "0.22"
ether = { path = "../either" }
futures = "0.3"
http = "0.2"
http = "1"
itertools = "0.13"
jsonpath_lib = "0.3.0"
percent-encoding = "2"
Expand Down
7 changes: 5 additions & 2 deletions lib/test_common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ doctest = false
path = "test_common.rs"

[dependencies]
bytes = "1"
futures = "0.3"
futures-timer = "3"
hyper = { version = "0.14", features = ["server"] }
http = "0.2"
hyper = { version = "1", features = ["http1", "http2"] }
hyper-util = { version = "0.1", features = ["tokio", "server", "http1", "http2"] }
http = "1"
http-body-util = "0.1"
parking_lot = "0.12"
tokio = { version = "1", features = ["full"] }
url = "2"
Expand Down
58 changes: 39 additions & 19 deletions lib/test_common/test_common.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
use log::{debug, info};
use std::{future::Future, io, str::FromStr, sync::Arc, time::Duration};
use std::{future::Future, io, net::SocketAddr, str::FromStr, sync::Arc, time::Duration};
use tokio::net::TcpListener;

use bytes::Bytes;
use futures::{channel::oneshot, future::select, FutureExt};
use futures_timer::Delay;
use http::{header, StatusCode};
use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Body, Error, Request, Response, Server,
use http_body_util::{BodyExt, Empty};
use hyper::{body::Incoming as Body, service::service_fn, Error, Request, Response};
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder as HyperBuilder,
};
use parking_lot::Mutex;
use url::Url;

async fn echo_route(req: Request<Body>) -> Response<Body> {
type HyperBody = http_body_util::combinators::BoxBody<Bytes, Error>;

async fn echo_route(req: Request<Body>) -> Response<HyperBody> {
let headers = req.headers();
let content_type = headers
.get(header::CONTENT_TYPE)
Expand All @@ -36,24 +41,24 @@ async fn echo_route(req: Request<Body>) -> Response<Body> {
if echo.is_some() {
debug!("Echo Body = {}", echo.clone().unwrap_or_default());
}
let mut response = match (req.method(), echo) {
let mut response: Response<HyperBody> = match (req.method(), echo) {
(&http::Method::GET, Some(b)) => Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, content_type)
.body(b.into())
.body(b.map_err(|never| match never {}).boxed())
.unwrap(),
(&http::Method::POST, _) | (&http::Method::PUT, _) => Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, content_type)
.body(req.into_body())
.body(req.into_body().boxed())
.unwrap(),
_ => Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::empty())
.body(empty())
.unwrap(),
};
let ms = wait.and_then(|c| FromStr::from_str(&c).ok()).unwrap_or(0);
let old_body = std::mem::replace(response.body_mut(), Body::empty());
let old_body = std::mem::replace(response.body_mut(), empty());
if ms > 0 {
debug!("waiting {} ms", ms);
}
Expand All @@ -62,13 +67,22 @@ async fn echo_route(req: Request<Body>) -> Response<Body> {
response
}

pub fn start_test_server(
fn empty() -> HyperBody {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}

pub async fn start_test_server(
port: Option<u16>,
) -> (u16, oneshot::Sender<()>, impl Future<Output = ()>) {
let port = port.unwrap_or(0);
let address = ([127, 0, 0, 1], port).into();
let address: SocketAddr = ([127, 0, 0, 1], port).into();

let listener = TcpListener::bind(address).await.unwrap();
let local_addr = listener.local_addr().unwrap();

let make_svc = make_service_fn(|_: &AddrStream| async {
let server = tokio::spawn(async move {
let service = service_fn(|req: Request<Body>| async {
debug!("{:?}", req);
let method = req.method().to_string();
Expand All @@ -78,7 +92,7 @@ pub fn start_test_server(
"/" => echo_route(req).await,
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.body(empty())
.unwrap(),
};
debug!("{:?}", response);
Expand All @@ -92,14 +106,20 @@ pub fn start_test_server(
);
Ok::<_, Error>(response)
});
Ok::<_, Error>(service)

loop {
let (stream, _) = listener.accept().await.unwrap();
let stream = TokioIo::new(stream);
tokio::task::spawn(async move {
let builder = HyperBuilder::new(TokioExecutor::new());
builder.serve_connection(stream, service).await.unwrap();
});
}
});

let (tx, rx) = oneshot::channel();

let server = Server::bind(&address).serve(make_svc);

let port = server.local_addr().port();
let port = local_addr.port();

let future = select(server, rx);

Expand Down
2 changes: 1 addition & 1 deletion src/bin/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() {
rt.block_on(async {
let port = std::env::var("PORT").ok().and_then(|s| s.parse().ok());
debug!("port = {}", port.unwrap_or_default());
let (port, rx, handle) = start_test_server(port);
let (port, rx, handle) = start_test_server(port).await;

println!("Listening on port {port}");

Expand Down
20 changes: 14 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,15 @@ use futures::{
stream, FutureExt, Stream, StreamExt,
};
use futures_timer::Delay;
use hyper::{client::HttpConnector, Body, Client};
use http_body_util::combinators::BoxBody;
use hyper_tls::HttpsConnector;
use hyper_util::{
client::legacy::{
connect::{dns::GaiResolver, HttpConnector},
Client,
},
rt::TokioExecutor,
};
use itertools::Itertools;
use line_writer::{blocking_writer, MsgType};
use log::{debug, error, info, warn};
Expand Down Expand Up @@ -57,6 +64,8 @@ use std::{
time::{Duration, Instant},
};

type Body = BoxBody<bytes::Bytes, std::io::Error>;

struct Endpoints {
// yaml index of the endpoint, (endpoint tags, builder)
inner: Vec<(
Expand Down Expand Up @@ -1139,16 +1148,15 @@ fn create_load_test_future(

pub(crate) fn create_http_client(
keepalive: Duration,
) -> Result<
Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>,
TestError,
> {
) -> Result<Client<HttpsConnector<HttpConnector<GaiResolver>>, Body>, TestError> {
let mut http = HttpConnector::new();
http.set_keepalive(Some(keepalive));
http.set_reuse_address(true);
http.enforce_http(false);
let https = HttpsConnector::from((http, TlsConnector::new()?.into()));
Ok(Client::builder().set_host(false).build::<_, Body>(https))
Ok(Client::builder(TokioExecutor::new())
.set_host(false)
.build::<_, Body>(https))
}

type ProvidersResult = Result<(BTreeMap<String, providers::Provider>, BTreeSet<String>), TestError>;
Expand Down
35 changes: 24 additions & 11 deletions src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ use futures::{
sink::SinkExt,
stream, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
};
use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
use hyper::{
client::HttpConnector,
header::{Entry as HeaderEntry, HeaderName, HeaderValue, CONTENT_DISPOSITION},
Body as HyperBody, Client, Method, Response,
Method, Response,
};
use hyper_tls::HttpsConnector;
use hyper_util::client::legacy::{
connect::{dns::GaiResolver, HttpConnector},
Client,
};
use rand::distributions::{Alphanumeric, Distribution};
use select_any::select_any;
use serde_json as json;
Expand Down Expand Up @@ -55,6 +59,8 @@ use std::{
time::{Duration, Instant},
};

type HyperBody = BoxBody<Bytes, std::io::Error>;

#[derive(Clone)]
pub struct AutoReturn {
send_option: EndpointProvidesSendOptions,
Expand Down Expand Up @@ -203,8 +209,7 @@ pub struct BuilderContext {
#[allow(dead_code)]
pub config_path: PathBuf,
// the http client
pub client:
Arc<Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>>,
pub client: Arc<Client<HttpsConnector<HttpConnector<GaiResolver>>, HyperBody>>,
// a mapping of names to their prospective providers
pub providers: Arc<BTreeMap<String, providers::Provider>>,
// a mapping of names to their prospective loggers
Expand Down Expand Up @@ -498,7 +503,7 @@ fn multipart_body_as_hyper_body(
let piece_stream = future::ok(Bytes::from(piece_data)).into_stream();
tweak_path(&mut body, &multipart_body.path);
let a = create_file_hyper_body(body).map_ok(move |(bytes, body)| {
let stream = piece_stream.chain(body).a();
let stream = piece_stream.chain(body.into_data_stream()).a();
(bytes + piece_data_bytes, stream)
});
Either::A(a)
Expand Down Expand Up @@ -539,7 +544,9 @@ fn multipart_body_as_hyper_body(
.flatten()
.chain(stream::once(future::ok(closing_boundary)));

(bytes, HyperBody::wrap_stream(stream))
let body: HyperBody =
BodyExt::boxed(StreamBody::new(stream.map_ok(hyper::body::Frame::data)));
(bytes, body)
});
Ok(ret)
}
Expand Down Expand Up @@ -569,7 +576,9 @@ async fn create_file_hyper_body(filename: String) -> Result<(u64, HyperBody), Te
}
});

let body = HyperBody::wrap_stream(stream);
let body: HyperBody = BodyExt::boxed(StreamBody::new(
stream.map_ok(|x| hyper::body::Frame::data(x.into())),
));
Ok((bytes, body))
}

Expand All @@ -592,7 +601,7 @@ fn body_template_as_hyper_body(
);
return Either3::A(future::ready(r).and_then(|x| x));
}
BodyTemplate::None => return Either3::B(future::ok((0, HyperBody::empty()))),
BodyTemplate::None => return Either3::B(future::ok((0, BoxBody::default()))),
BodyTemplate::String(t) => t,
};
let mut body = match template.evaluate(Cow::Borrowed(template_values.as_json()), None) {
Expand All @@ -609,7 +618,10 @@ fn body_template_as_hyper_body(
if copy_body_value {
*body_value = Some(body.clone());
}
Either3::B(future::ok((body.as_bytes().len() as u64, body.into())))
Either3::B(future::ok((
body.as_bytes().len() as u64,
body.map_err(|never| match never {}).boxed(),
)))
}
}

Expand All @@ -622,7 +634,7 @@ pub type StatsTx = futures_channel::UnboundedSender<stats::StatsMessage>;

pub struct Endpoint {
body: BodyTemplate,
client: Arc<Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>>,
client: Arc<Client<HttpsConnector<HttpConnector<GaiResolver>>, HyperBody>>,
headers: Vec<(String, Template)>,
max_parallel_requests: Option<NonZeroUsize>,
method: Method,
Expand Down Expand Up @@ -919,7 +931,8 @@ mod tests {
let (_, body) = create_file_hyper_body("tests/test.jpg".to_string())
.await
.unwrap();
body.map(|b| stream::iter(b.unwrap()))
body.into_data_stream()
.map(|b| stream::iter(b.unwrap()))
.flatten()
.collect::<Vec<_>>()
.await
Expand Down
Loading