diff --git a/src/lib.rs b/src/lib.rs index 287170f0..f7251590 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ use futures::{ stream, FutureExt, Stream, StreamExt, }; use futures_timer::Delay; -use hyper::body::Incoming as Body; +use http_body_util::combinators::BoxBody; use hyper_tls::HttpsConnector; use hyper_util::{ client::legacy::{ @@ -64,6 +64,8 @@ use std::{ time::{Duration, Instant}, }; +type Body = BoxBody; + struct Endpoints { // yaml index of the endpoint, (endpoint tags, builder) inner: Vec<( diff --git a/src/request.rs b/src/request.rs index ae746520..e2d1970a 100644 --- a/src/request.rs +++ b/src/request.rs @@ -18,9 +18,8 @@ use futures::{ sink::SinkExt, stream, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, }; -use http_body_util::{combinators::BoxBody, BodyExt, Empty, StreamBody}; +use http_body_util::{combinators::BoxBody, BodyExt, StreamBody}; use hyper::{ - body::Incoming as HyperBody, header::{Entry as HeaderEntry, HeaderName, HeaderValue, CONTENT_DISPOSITION}, Method, Response, }; @@ -60,6 +59,8 @@ use std::{ time::{Duration, Instant}, }; +type HyperBody = BoxBody; + #[derive(Clone)] pub struct AutoReturn { send_option: EndpointProvidesSendOptions, @@ -545,9 +546,9 @@ fn multipart_body_as_hyper_body( .flatten() .chain(stream::once(future::ok(closing_boundary))); - // let body = StreamBody::new(stream); - // let body: Incoming = body.into(); - (bytes, HyperBody::wrap_stream(stream)) + let body: HyperBody = + BodyExt::boxed(StreamBody::new(stream.map_ok(hyper::body::Frame::data))); + (bytes, body) }); Ok(ret) } @@ -577,8 +578,9 @@ async fn create_file_hyper_body(filename: String) -> Result<(u64, HyperBody), Te } }); - // let body = StreamBody::new(stream); - let body = HyperBody::wrap_stream(stream); + let body: HyperBody = BodyExt::boxed(StreamBody::new( + stream.map_ok(|x| hyper::body::Frame::data(x.into())), + )); Ok((bytes, body)) } @@ -601,7 +603,7 @@ fn body_template_as_hyper_body( ); return Either3::A(future::ready(r).and_then(|x| x)); } - BodyTemplate::None => return Either3::B(future::ok((0, Empty::::new()))), + BodyTemplate::None => return Either3::B(future::ok((0, BoxBody::default()))), BodyTemplate::String(t) => t, }; let mut body = match template.evaluate(Cow::Borrowed(template_values.as_json()), None) { @@ -618,7 +620,10 @@ fn body_template_as_hyper_body( if copy_body_value { *body_value = Some(body.clone()); } - Either3::B(future::ok((body.as_bytes().len() as u64, body.into()))) + Either3::B(future::ok(( + body.as_bytes().len() as u64, + body.map_err(|never| match never {}).boxed(), + ))) } } @@ -928,7 +933,8 @@ mod tests { let (_, body) = create_file_hyper_body("tests/test.jpg".to_string()) .await .unwrap(); - body.map(|b| stream::iter(b.unwrap())) + body.into_data_stream() + .map(|b| stream::iter(b.unwrap())) .flatten() .collect::>() .await diff --git a/src/request/request_maker.rs b/src/request/request_maker.rs index 883b503e..e7bf39d3 100644 --- a/src/request/request_maker.rs +++ b/src/request/request_maker.rs @@ -11,8 +11,8 @@ use futures::{ FutureExt, TryFutureExt, }; use futures_timer::Delay; +use http_body_util::{combinators::BoxBody, BodyExt}; use hyper::{ - body::Incoming, header::{HeaderMap, HeaderName, HeaderValue, CONTENT_LENGTH, CONTENT_TYPE, HOST}, Method, Request, }; @@ -45,7 +45,9 @@ pub(super) struct RequestMaker { pub(super) headers: Vec<(String, Template)>, pub(super) body: BodyTemplate, pub(super) rr_providers: u16, - pub(super) client: Arc>, Incoming>>, + pub(super) client: Arc< + Client>, BoxBody>, + >, pub(super) stats_tx: StatsTx, pub(super) no_auto_returns: bool, pub(super) outgoing: Arc>, @@ -316,7 +318,7 @@ impl RequestMaker { stats_tx, tags, }; - rh.handle(response, auto_returns) + rh.handle(hyper::Response::new(response.into_body().boxed().map_err(std::io::Error::other).boxed()), auto_returns) .map_err(TestError::from) }) .or_else(move |r| { diff --git a/src/request/response_handler.rs b/src/request/response_handler.rs index a70f1112..0062fe30 100644 --- a/src/request/response_handler.rs +++ b/src/request/response_handler.rs @@ -2,6 +2,7 @@ use super::*; use config::{RESPONSE_BODY, RESPONSE_HEADERS, RESPONSE_HEADERS_ALL, RESPONSE_STARTLINE, STATS}; use futures::TryStreamExt; +use http_body_util::{combinators::BoxBody, BodyExt}; pub(super) struct ResponseHandler { pub(super) provider_delays: ProviderDelays, @@ -19,7 +20,7 @@ impl ResponseHandler { // https://github.com/rust-lang/rust/issues/71723 pub(super) fn handle( self, - response: hyper::Response, + response: hyper::Response>, auto_returns: Option, ) -> impl Future> where @@ -86,7 +87,7 @@ impl ResponseHandler { ) { (true, Some(ce)) => { let body = response - .into_body() + .into_data_stream() .map_err(|e| RecoverableError::BodyErr(Arc::new(e))); let br = body_reader::BodyReader::new(ce); let body_buffer = bytes::BytesMut::new(); @@ -109,7 +110,7 @@ impl ResponseHandler { _ => { // when we don't need the body, skip parsing it, but make sure we get it all response - .into_body() + .into_data_stream() .map_err(|e| RecoverableError::BodyErr(Arc::new(e))) .try_fold((), |_, _| future::ok(())) .map_ok(|_| None) @@ -143,7 +144,7 @@ fn handle_response_requirements( bitwise: u16, response_fields_added: &mut u16, rp: &mut json::map::Map, - response: &Response, + response: &Response>, ) { // check if we need the response startline and it hasn't already been set if ((bitwise & RESPONSE_STARTLINE) ^ (*response_fields_added & RESPONSE_STARTLINE)) != 0 {