Skip to content

Commit

Permalink
chore(core): use sync_wrapper to wrap boxed body
Browse files Browse the repository at this point in the history
  • Loading branch information
fundon committed Dec 16, 2023
1 parent 68d5dc5 commit a1292b1
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 28 deletions.
1 change: 1 addition & 0 deletions viz-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ tokio = { workspace = true, optional = true }
tokio-tungstenite = { workspace = true, optional = true }
tokio-stream = { workspace = true, optional = true }
tokio-util = { workspace = true, optional = true }
sync_wrapper = { version = "0.1.2", git = "https://github.com/fundon/sync_wrapper.git", rev = "e2ce8d8" }

[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
Expand Down
31 changes: 16 additions & 15 deletions viz-core/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::{
};

use futures_util::{Stream, TryStreamExt};
use http_body_util::{combinators::BoxBody, BodyExt, Full, StreamBody};
use http_body_util::{combinators::UnsyncBoxBody, BodyExt, Full, StreamBody};
use hyper::body::{Body, Frame, Incoming, SizeHint};
use sync_wrapper::SyncWrapper;

use crate::{BoxError, Bytes, Error, Result};

Expand Down Expand Up @@ -123,26 +124,26 @@ pub enum OutgoingBody<D = Bytes> {
/// A body that consists of a single chunk.
Full(Full<D>),
/// A boxed [`Body`] trait object.
Boxed(BoxBody<D, Error>),
Boxed(SyncWrapper<UnsyncBoxBody<D, Error>>),
}

impl OutgoingBody {
/// A body created from a [`Stream`].
pub fn streaming<S, D, E>(stream: S) -> Self
pub fn stream<S, D, E>(stream: S) -> Self
where
S: Stream<Item = Result<D, E>> + Send + Sync + 'static,
S: Stream<Item = Result<D, E>> + Send + 'static,
D: Into<Bytes> + 'static,
E: Into<Error> + 'static,
{
Self::Boxed(
Self::Boxed(SyncWrapper::new(
StreamBody::new(
stream
.map_ok(Into::into)
.map_ok(Frame::data)
.map_err(Into::into),
)
.boxed(),
)
.boxed_unsync(),
))
}
}

Expand All @@ -164,7 +165,7 @@ impl Body for OutgoingBody {
match self.get_mut() {
Self::Empty => Poll::Ready(None),
Self::Full(full) => Pin::new(full).poll_frame(cx).map_err(Error::from),
Self::Boxed(body) => Pin::new(body).poll_frame(cx),
Self::Boxed(body) => Pin::new(body.get_mut()).poll_frame(cx),
}
}

Expand All @@ -173,7 +174,7 @@ impl Body for OutgoingBody {
match self {
Self::Empty => true,
Self::Full(full) => full.is_end_stream(),
Self::Boxed(body) => body.is_end_stream(),
Self::Boxed(wrapper) => wrapper.get_ref().is_end_stream(),
}
}

Expand All @@ -182,7 +183,7 @@ impl Body for OutgoingBody {
match self {
Self::Empty => SizeHint::with_exact(0),
Self::Full(full) => full.size_hint(),
Self::Boxed(body) => body.size_hint(),
Self::Boxed(wrapper) => wrapper.get_ref().size_hint(),
}
}
}
Expand All @@ -197,7 +198,7 @@ impl Stream for OutgoingBody {
Self::Full(full) => Pin::new(full)
.poll_frame(cx)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
Self::Boxed(body) => Pin::new(body)
Self::Boxed(wrapper) => Pin::new(wrapper.get_mut())
.poll_frame(cx)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
} {
Expand All @@ -212,7 +213,7 @@ impl Stream for OutgoingBody {
let sh = match self {
Self::Empty => return (0, Some(0)),
Self::Full(full) => full.size_hint(),
Self::Boxed(body) => body.size_hint(),
Self::Boxed(wrapper) => wrapper.get_ref().size_hint(),
};
(
usize::try_from(sh.lower()).unwrap_or(usize::MAX),
Expand All @@ -233,8 +234,8 @@ impl<D> From<Full<D>> for OutgoingBody<D> {
}
}

impl<D> From<BoxBody<D, Error>> for OutgoingBody<D> {
fn from(value: BoxBody<D, Error>) -> Self {
Self::Boxed(value)
impl<D> From<UnsyncBoxBody<D, Error>> for OutgoingBody<D> {
fn from(value: UnsyncBoxBody<D, Error>) -> Self {
Self::Boxed(SyncWrapper::new(value))
}
}
10 changes: 3 additions & 7 deletions viz-core/src/middleware/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,11 @@ impl<T: IntoResponse> IntoResponse for Compress<T> {
res = res.map(|body| {
let body = StreamReader::new(body);
if self.algo == ContentCoding::Gzip {
OutgoingBody::streaming(ReaderStream::new(bufread::GzipEncoder::new(body)))
OutgoingBody::stream(ReaderStream::new(bufread::GzipEncoder::new(body)))
} else if self.algo == ContentCoding::Deflate {
OutgoingBody::streaming(ReaderStream::new(bufread::DeflateEncoder::new(
body,
)))
OutgoingBody::stream(ReaderStream::new(bufread::DeflateEncoder::new(body)))
} else {
OutgoingBody::streaming(ReaderStream::new(bufread::BrotliEncoder::new(
body,
)))
OutgoingBody::stream(ReaderStream::new(bufread::BrotliEncoder::new(body)))
}
});
res.headers_mut()
Expand Down
12 changes: 9 additions & 3 deletions viz-core/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ pub trait ResponseExt: private::Sealed + Sized {
/// [mdn]: <https://developer.mozilla.org/en-US/docs/Web/API/Response/ok>
fn ok(&self) -> bool;

/// Creates a response with an empty body.
#[must_use]
fn empty() -> Response {
Response::new(OutgoingBody::default())
}

/// The response with the specified [`Content-Type`][mdn].
///
/// [mdn]: <https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type>
Expand Down Expand Up @@ -81,11 +87,11 @@ pub trait ResponseExt: private::Sealed + Sized {
/// Responds to a stream.
fn stream<S, D, E>(stream: S) -> Response
where
S: Stream<Item = Result<D, E>> + Send + Sync + 'static,
S: Stream<Item = Result<D, E>> + Send + 'static,
D: Into<Bytes> + 'static,
E: Into<Error> + 'static,
{
Response::new(OutgoingBody::streaming(stream))
Response::new(OutgoingBody::stream(stream))
}

/// Downloads transfers the file from path as an attachment.
Expand Down Expand Up @@ -223,7 +229,7 @@ impl ResponseExt for Response {
.escape_default();

let mut resp = Self::attachment(&format!("attachment; filename=\"{value}\""));
*resp.body_mut() = OutgoingBody::streaming(tokio_util::io::ReaderStream::new(
*resp.body_mut() = OutgoingBody::stream(tokio_util::io::ReaderStream::new(
tokio::fs::File::open(path).await.map_err(Error::from)?,
));
Ok(resp)
Expand Down
4 changes: 2 additions & 2 deletions viz-core/src/types/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct Sse<S> {

impl<S> Sse<S>
where
S: Stream<Item = Event> + Send + Sync + 'static,
S: Stream<Item = Event> + Send + 'static,
{
/// Creates a new Server-Sent Event.
#[must_use]
Expand All @@ -46,7 +46,7 @@ where

impl<S> IntoResponse for Sse<S>
where
S: Stream<Item = Event> + Send + Sync + 'static,
S: Stream<Item = Event> + Send + 'static,
{
fn into_response(self) -> Response {
let stream = self.stream.map(|e| Ok::<Bytes, std::io::Error>(e.into()));
Expand Down
2 changes: 1 addition & 1 deletion viz-router/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl FromIterator<(Method, BoxHandler)> for Route {
pub fn on<H, O>(method: Method, handler: H) -> Route
where
H: Handler<Request, Output = Result<O>> + Clone,
O: IntoResponse + Send + Sync + 'static,
O: IntoResponse + Send + 'static,
{
Route::new().on(method, handler)
}
Expand Down

0 comments on commit a1292b1

Please sign in to comment.