Skip to content

Commit

Permalink
Updated code to handle hyper v1 and http v1
Browse files Browse the repository at this point in the history
  • Loading branch information
tkmcmaster committed Jul 31, 2024
1 parent 2ce140c commit 0764b01
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 18 deletions.
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use futures::{
stream, FutureExt, Stream, StreamExt,
};
use futures_timer::Delay;
use hyper::body::Incoming as Body;
use http_body_util::combinators::BoxBody;
use hyper_tls::HttpsConnector;
use hyper_util::{
client::legacy::{
Expand Down Expand Up @@ -64,6 +64,8 @@ use std::{
time::{Duration, Instant},
};

type Body = BoxBody<bytes::Bytes, std::io::Error>;

struct Endpoints {
// yaml index of the endpoint, (endpoint tags, builder)
inner: Vec<(
Expand Down
26 changes: 16 additions & 10 deletions src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ use futures::{
sink::SinkExt,
stream, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
};
use http_body_util::{combinators::BoxBody, BodyExt, Empty, StreamBody};
use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
use hyper::{
body::Incoming as HyperBody,
header::{Entry as HeaderEntry, HeaderName, HeaderValue, CONTENT_DISPOSITION},
Method, Response,
};
Expand Down Expand Up @@ -60,6 +59,8 @@ use std::{
time::{Duration, Instant},
};

type HyperBody = BoxBody<Bytes, std::io::Error>;

#[derive(Clone)]
pub struct AutoReturn {
send_option: EndpointProvidesSendOptions,
Expand Down Expand Up @@ -545,9 +546,9 @@ fn multipart_body_as_hyper_body(
.flatten()
.chain(stream::once(future::ok(closing_boundary)));

// let body = StreamBody::new(stream);
// let body: Incoming = body.into();
(bytes, HyperBody::wrap_stream(stream))
let body: HyperBody =
BodyExt::boxed(StreamBody::new(stream.map_ok(hyper::body::Frame::data)));
(bytes, body)
});
Ok(ret)
}
Expand Down Expand Up @@ -577,8 +578,9 @@ async fn create_file_hyper_body(filename: String) -> Result<(u64, HyperBody), Te
}
});

// let body = StreamBody::new(stream);
let body = HyperBody::wrap_stream(stream);
let body: HyperBody = BodyExt::boxed(StreamBody::new(
stream.map_ok(|x| hyper::body::Frame::data(x.into())),
));
Ok((bytes, body))
}

Expand All @@ -601,7 +603,7 @@ fn body_template_as_hyper_body(
);
return Either3::A(future::ready(r).and_then(|x| x));
}
BodyTemplate::None => return Either3::B(future::ok((0, Empty::<Bytes>::new()))),
BodyTemplate::None => return Either3::B(future::ok((0, BoxBody::default()))),
BodyTemplate::String(t) => t,
};
let mut body = match template.evaluate(Cow::Borrowed(template_values.as_json()), None) {
Expand All @@ -618,7 +620,10 @@ fn body_template_as_hyper_body(
if copy_body_value {
*body_value = Some(body.clone());
}
Either3::B(future::ok((body.as_bytes().len() as u64, body.into())))
Either3::B(future::ok((
body.as_bytes().len() as u64,
body.map_err(|never| match never {}).boxed(),
)))
}
}

Expand Down Expand Up @@ -928,7 +933,8 @@ mod tests {
let (_, body) = create_file_hyper_body("tests/test.jpg".to_string())
.await
.unwrap();
body.map(|b| stream::iter(b.unwrap()))
body.into_data_stream()
.map(|b| stream::iter(b.unwrap()))
.flatten()
.collect::<Vec<_>>()
.await
Expand Down
8 changes: 5 additions & 3 deletions src/request/request_maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use futures::{
FutureExt, TryFutureExt,
};
use futures_timer::Delay;
use http_body_util::{combinators::BoxBody, BodyExt};
use hyper::{
body::Incoming,
header::{HeaderMap, HeaderName, HeaderValue, CONTENT_LENGTH, CONTENT_TYPE, HOST},
Method, Request,
};
Expand Down Expand Up @@ -45,7 +45,9 @@ pub(super) struct RequestMaker {
pub(super) headers: Vec<(String, Template)>,
pub(super) body: BodyTemplate,
pub(super) rr_providers: u16,
pub(super) client: Arc<Client<HttpsConnector<HttpConnector<GaiResolver>>, Incoming>>,
pub(super) client: Arc<
Client<HttpsConnector<HttpConnector<GaiResolver>>, BoxBody<bytes::Bytes, std::io::Error>>,
>,
pub(super) stats_tx: StatsTx,
pub(super) no_auto_returns: bool,
pub(super) outgoing: Arc<Vec<Outgoing>>,
Expand Down Expand Up @@ -316,7 +318,7 @@ impl RequestMaker {
stats_tx,
tags,
};
rh.handle(response, auto_returns)
rh.handle(hyper::Response::new(response.into_body().boxed().map_err(std::io::Error::other).boxed()), auto_returns)
.map_err(TestError::from)
})
.or_else(move |r| {
Expand Down
9 changes: 5 additions & 4 deletions src/request/response_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::*;

use config::{RESPONSE_BODY, RESPONSE_HEADERS, RESPONSE_HEADERS_ALL, RESPONSE_STARTLINE, STATS};
use futures::TryStreamExt;
use http_body_util::{combinators::BoxBody, BodyExt};

pub(super) struct ResponseHandler {
pub(super) provider_delays: ProviderDelays,
Expand All @@ -19,7 +20,7 @@ impl ResponseHandler {
// https://github.com/rust-lang/rust/issues/71723
pub(super) fn handle<F>(
self,
response: hyper::Response<HyperBody>,
response: hyper::Response<BoxBody<bytes::Bytes, std::io::Error>>,
auto_returns: Option<F>,
) -> impl Future<Output = Result<(), RecoverableError>>
where
Expand Down Expand Up @@ -86,7 +87,7 @@ impl ResponseHandler {
) {
(true, Some(ce)) => {
let body = response
.into_body()
.into_data_stream()
.map_err(|e| RecoverableError::BodyErr(Arc::new(e)));
let br = body_reader::BodyReader::new(ce);
let body_buffer = bytes::BytesMut::new();
Expand All @@ -109,7 +110,7 @@ impl ResponseHandler {
_ => {
// when we don't need the body, skip parsing it, but make sure we get it all
response
.into_body()
.into_data_stream()
.map_err(|e| RecoverableError::BodyErr(Arc::new(e)))
.try_fold((), |_, _| future::ok(()))
.map_ok(|_| None)
Expand Down Expand Up @@ -143,7 +144,7 @@ fn handle_response_requirements(
bitwise: u16,
response_fields_added: &mut u16,
rp: &mut json::map::Map<String, json::Value>,
response: &Response<HyperBody>,
response: &Response<BoxBody<bytes::Bytes, std::io::Error>>,
) {
// check if we need the response startline and it hasn't already been set
if ((bitwise & RESPONSE_STARTLINE) ^ (*response_fields_added & RESPONSE_STARTLINE)) != 0 {
Expand Down

0 comments on commit 0764b01

Please sign in to comment.