diff --git a/viz-core/Cargo.toml b/viz-core/Cargo.toml index b740ef11..3b82b253 100644 --- a/viz-core/Cargo.toml +++ b/viz-core/Cargo.toml @@ -99,6 +99,7 @@ tokio = { workspace = true, optional = true } tokio-tungstenite = { workspace = true, optional = true } tokio-stream = { workspace = true, optional = true } tokio-util = { workspace = true, optional = true } +sync_wrapper = { version = "0.1.2", git = "https://github.com/fundon/sync_wrapper.git", rev = "e2ce8d8" } [dev-dependencies] tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/viz-core/src/body.rs b/viz-core/src/body.rs index d00a40ef..2727c13e 100644 --- a/viz-core/src/body.rs +++ b/viz-core/src/body.rs @@ -4,8 +4,9 @@ use std::{ }; use futures_util::{Stream, TryStreamExt}; -use http_body_util::{combinators::BoxBody, BodyExt, Full, StreamBody}; +use http_body_util::{combinators::UnsyncBoxBody, BodyExt, Full, StreamBody}; use hyper::body::{Body, Frame, Incoming, SizeHint}; +use sync_wrapper::SyncWrapper; use crate::{BoxError, Bytes, Error, Result}; @@ -123,26 +124,26 @@ pub enum OutgoingBody { /// A body that consists of a single chunk. Full(Full), /// A boxed [`Body`] trait object. - Boxed(BoxBody), + Boxed(SyncWrapper>), } impl OutgoingBody { /// A body created from a [`Stream`]. - pub fn streaming(stream: S) -> Self + pub fn stream(stream: S) -> Self where - S: Stream> + Send + Sync + 'static, + S: Stream> + Send + 'static, D: Into + 'static, E: Into + 'static, { - Self::Boxed( + Self::Boxed(SyncWrapper::new( StreamBody::new( stream .map_ok(Into::into) .map_ok(Frame::data) .map_err(Into::into), ) - .boxed(), - ) + .boxed_unsync(), + )) } } @@ -164,7 +165,7 @@ impl Body for OutgoingBody { match self.get_mut() { Self::Empty => Poll::Ready(None), Self::Full(full) => Pin::new(full).poll_frame(cx).map_err(Error::from), - Self::Boxed(body) => Pin::new(body).poll_frame(cx), + Self::Boxed(body) => Pin::new(body.get_mut()).poll_frame(cx), } } @@ -173,7 +174,7 @@ impl Body for OutgoingBody { match self { Self::Empty => true, Self::Full(full) => full.is_end_stream(), - Self::Boxed(body) => body.is_end_stream(), + Self::Boxed(wrapper) => wrapper.get_ref().is_end_stream(), } } @@ -182,7 +183,7 @@ impl Body for OutgoingBody { match self { Self::Empty => SizeHint::with_exact(0), Self::Full(full) => full.size_hint(), - Self::Boxed(body) => body.size_hint(), + Self::Boxed(wrapper) => wrapper.get_ref().size_hint(), } } } @@ -197,7 +198,7 @@ impl Stream for OutgoingBody { Self::Full(full) => Pin::new(full) .poll_frame(cx) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?, - Self::Boxed(body) => Pin::new(body) + Self::Boxed(wrapper) => Pin::new(wrapper.get_mut()) .poll_frame(cx) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?, } { @@ -212,7 +213,7 @@ impl Stream for OutgoingBody { let sh = match self { Self::Empty => return (0, Some(0)), Self::Full(full) => full.size_hint(), - Self::Boxed(body) => body.size_hint(), + Self::Boxed(wrapper) => wrapper.get_ref().size_hint(), }; ( usize::try_from(sh.lower()).unwrap_or(usize::MAX), @@ -233,8 +234,8 @@ impl From> for OutgoingBody { } } -impl From> for OutgoingBody { - fn from(value: BoxBody) -> Self { - Self::Boxed(value) +impl From> for OutgoingBody { + fn from(value: UnsyncBoxBody) -> Self { + Self::Boxed(SyncWrapper::new(value)) } } diff --git a/viz-core/src/middleware/compression.rs b/viz-core/src/middleware/compression.rs index f41a0e46..e79c952e 100644 --- a/viz-core/src/middleware/compression.rs +++ b/viz-core/src/middleware/compression.rs @@ -80,15 +80,11 @@ impl IntoResponse for Compress { res = res.map(|body| { let body = StreamReader::new(body); if self.algo == ContentCoding::Gzip { - OutgoingBody::streaming(ReaderStream::new(bufread::GzipEncoder::new(body))) + OutgoingBody::stream(ReaderStream::new(bufread::GzipEncoder::new(body))) } else if self.algo == ContentCoding::Deflate { - OutgoingBody::streaming(ReaderStream::new(bufread::DeflateEncoder::new( - body, - ))) + OutgoingBody::stream(ReaderStream::new(bufread::DeflateEncoder::new(body))) } else { - OutgoingBody::streaming(ReaderStream::new(bufread::BrotliEncoder::new( - body, - ))) + OutgoingBody::stream(ReaderStream::new(bufread::BrotliEncoder::new(body))) } }); res.headers_mut() diff --git a/viz-core/src/response.rs b/viz-core/src/response.rs index 0701c6c8..2993ad86 100644 --- a/viz-core/src/response.rs +++ b/viz-core/src/response.rs @@ -23,6 +23,12 @@ pub trait ResponseExt: private::Sealed + Sized { /// [mdn]: fn ok(&self) -> bool; + /// Creates a response with an empty body. + #[must_use] + fn empty() -> Response { + Response::new(OutgoingBody::default()) + } + /// The response with the specified [`Content-Type`][mdn]. /// /// [mdn]: @@ -81,11 +87,11 @@ pub trait ResponseExt: private::Sealed + Sized { /// Responds to a stream. fn stream(stream: S) -> Response where - S: Stream> + Send + Sync + 'static, + S: Stream> + Send + 'static, D: Into + 'static, E: Into + 'static, { - Response::new(OutgoingBody::streaming(stream)) + Response::new(OutgoingBody::stream(stream)) } /// Downloads transfers the file from path as an attachment. @@ -223,7 +229,7 @@ impl ResponseExt for Response { .escape_default(); let mut resp = Self::attachment(&format!("attachment; filename=\"{value}\"")); - *resp.body_mut() = OutgoingBody::streaming(tokio_util::io::ReaderStream::new( + *resp.body_mut() = OutgoingBody::stream(tokio_util::io::ReaderStream::new( tokio::fs::File::open(path).await.map_err(Error::from)?, )); Ok(resp) diff --git a/viz-core/src/types/sse.rs b/viz-core/src/types/sse.rs index 5a2a2f37..29a1672e 100644 --- a/viz-core/src/types/sse.rs +++ b/viz-core/src/types/sse.rs @@ -25,7 +25,7 @@ pub struct Sse { impl Sse where - S: Stream + Send + Sync + 'static, + S: Stream + Send + 'static, { /// Creates a new Server-Sent Event. #[must_use] @@ -46,7 +46,7 @@ where impl IntoResponse for Sse where - S: Stream + Send + Sync + 'static, + S: Stream + Send + 'static, { fn into_response(self) -> Response { let stream = self.stream.map(|e| Ok::(e.into())); diff --git a/viz-router/src/route.rs b/viz-router/src/route.rs index de380b5d..c86334d7 100644 --- a/viz-router/src/route.rs +++ b/viz-router/src/route.rs @@ -167,7 +167,7 @@ impl FromIterator<(Method, BoxHandler)> for Route { pub fn on(method: Method, handler: H) -> Route where H: Handler> + Clone, - O: IntoResponse + Send + Sync + 'static, + O: IntoResponse + Send + 'static, { Route::new().on(method, handler) }