Skip to content

Commit

Permalink
Merge branch 'main' into fix/local_idl_path
Browse files Browse the repository at this point in the history
  • Loading branch information
Ggiggle authored Sep 29, 2024
2 parents ba5d4cd + 566fcb5 commit 2974395
Show file tree
Hide file tree
Showing 17 changed files with 691 additions and 569 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ webpki-roots = "0.26"
tokio-rustls = "0.25"
native-tls = "0.2"
tokio-native-tls = "0.3"
tokio-tungstenite = "0.23"

tungstenite = "0.24"
tokio-tungstenite = "0.24"

[profile.release]
opt-level = 3
Expand Down
1 change: 0 additions & 1 deletion volo-grpc/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ impl Status {
}

/// Create a new [`Status`] with the associated code and message.
pub fn new(code: Code, message: impl Into<String>) -> Self {
Self {
code,
Expand Down
4 changes: 3 additions & 1 deletion volo-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ tokio-util = { workspace = true, features = ["io"] }
tracing.workspace = true

# =====optional=====

# server optional
matchit = { workspace = true, optional = true }

# protocol optional
tungstenite = { workspace = true, optional = true }
tokio-tungstenite = { workspace = true, optional = true }

# tls optional
Expand Down Expand Up @@ -95,7 +97,7 @@ full = ["client", "server", "rustls", "cookie", "query", "form", "json", "tls",
client = ["hyper/client", "hyper/http1"] # client core
server = ["hyper/server", "hyper/http1", "dep:matchit"] # server core

ws = ["dep:tokio-tungstenite"]
ws = ["dep:tungstenite", "dep:tokio-tungstenite"]

tls = ["rustls"]
__tls = []
Expand Down
102 changes: 72 additions & 30 deletions volo-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,36 @@ use std::{

use bytes::Bytes;
use faststr::FastStr;
use futures_util::{ready, Stream};
use futures_util::stream::Stream;
use http_body::{Frame, SizeHint};
use http_body_util::{combinators::BoxBody, BodyExt, Full, StreamBody};
pub use hyper::body::Incoming;
use motore::BoxError;
use hyper::body::Incoming;
use pin_project::pin_project;
#[cfg(feature = "json")]
use serde::de::DeserializeOwned;

use crate::error::BoxError;

// The `futures_util::stream::BoxStream` does not have `Sync`
type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;

/// An implementation for [`http_body::Body`].
#[pin_project]
pub struct Body {
#[pin]
repr: BodyRepr,
}

#[pin_project(project = BodyProj)]
pub enum Body {
enum BodyRepr {
/// Complete [`Bytes`], with a certain size and content
Full(#[pin] Full<Bytes>),
/// Wrapper of [`hyper::body::Incoming`], it usually appers in request of server or response of
/// client.
///
/// Althrough [`hyper::body::Incoming`] implements [`http_body::Body`], the type is so commonly
/// used, we wrap it here as [`Body::Hyper`] to avoid cost of [`Box`] with dynamic dispatch.
Hyper(#[pin] Incoming),
/// Boxed stream with `Item = Result<Frame<Bytes>, BoxError>`
Stream(#[pin] StreamBody<BoxStream<'static, Result<Frame<Bytes>, BoxError>>>),
/// Boxed [`http_body::Body`]
Expand All @@ -44,15 +57,29 @@ impl Default for Body {
impl Body {
/// Create an empty body.
pub fn empty() -> Self {
Self::Full(Full::new(Bytes::new()))
Self {
repr: BodyRepr::Full(Full::new(Bytes::new())),
}
}

/// Create a body by [`hyper::body::Incoming`].
///
/// Compared to [`Body::from_body`], this function avoids overhead of allocating by [`Box`]
/// and dynamic dispatch by [`dyn http_body::Body`][http_body::Body].
pub fn from_incoming(incoming: Incoming) -> Self {
Self {
repr: BodyRepr::Hyper(incoming),
}
}

/// Create a body by a [`Stream`] with `Item = Result<Frame<Bytes>, BoxError>`.
pub fn from_stream<S>(stream: S) -> Self
where
S: Stream<Item = Result<Frame<Bytes>, BoxError>> + Send + Sync + 'static,
{
Self::Stream(StreamBody::new(Box::pin(stream)))
Self {
repr: BodyRepr::Stream(StreamBody::new(Box::pin(stream))),
}
}

/// Create a body by another [`http_body::Body`] instance.
Expand All @@ -61,7 +88,9 @@ impl Body {
B: http_body::Body<Data = Bytes> + Send + Sync + 'static,
B::Error: Into<BoxError>,
{
Self::Body(BoxBody::new(body.map_err(Into::into)))
Self {
repr: BodyRepr::Body(BoxBody::new(body.map_err(Into::into))),
}
}
}

Expand All @@ -73,39 +102,42 @@ impl http_body::Body for Body {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.project() {
BodyProj::Full(full) => {
// Convert `Infallible` to `BoxError`
Poll::Ready(ready!(full.poll_frame(cx)).map(|res| Ok(res?)))
match self.project().repr.project() {
BodyProj::Full(full) => http_body::Body::poll_frame(full, cx).map_err(BoxError::from),
BodyProj::Hyper(incoming) => {
http_body::Body::poll_frame(incoming, cx).map_err(BoxError::from)
}
BodyProj::Stream(stream) => stream.poll_frame(cx),
BodyProj::Body(body) => body.poll_frame(cx),
BodyProj::Stream(stream) => http_body::Body::poll_frame(stream, cx),
BodyProj::Body(body) => http_body::Body::poll_frame(body, cx),
}
}

fn is_end_stream(&self) -> bool {
match self {
Self::Full(full) => full.is_end_stream(),
Self::Stream(stream) => stream.is_end_stream(),
Self::Body(body) => body.is_end_stream(),
match &self.repr {
BodyRepr::Full(full) => http_body::Body::is_end_stream(full),
BodyRepr::Hyper(incoming) => http_body::Body::is_end_stream(incoming),
BodyRepr::Stream(stream) => http_body::Body::is_end_stream(stream),
BodyRepr::Body(body) => http_body::Body::is_end_stream(body),
}
}

fn size_hint(&self) -> SizeHint {
match self {
Self::Full(full) => full.size_hint(),
Self::Stream(stream) => http_body::Body::size_hint(stream),
Self::Body(body) => body.size_hint(),
match &self.repr {
BodyRepr::Full(full) => http_body::Body::size_hint(full),
BodyRepr::Hyper(incoming) => http_body::Body::size_hint(incoming),
BodyRepr::Stream(stream) => http_body::Body::size_hint(stream),
BodyRepr::Body(body) => http_body::Body::size_hint(body),
}
}
}

impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Full(_) => f.write_str("Body::Full"),
Self::Stream(_) => f.write_str("Body::Stream"),
Self::Body(_) => f.write_str("Body::Body"),
match &self.repr {
BodyRepr::Full(_) => f.write_str("Body::Full"),
BodyRepr::Hyper(_) => f.write_str("Body::Hyper"),
BodyRepr::Stream(_) => f.write_str("Body::Stream"),
BodyRepr::Body(_) => f.write_str("Body::Body"),
}
}
}
Expand Down Expand Up @@ -256,30 +288,40 @@ impl From<()> for Body {

impl From<&'static str> for Body {
fn from(value: &'static str) -> Self {
Self::Full(Full::new(Bytes::from_static(value.as_bytes())))
Self {
repr: BodyRepr::Full(Full::new(Bytes::from_static(value.as_bytes()))),
}
}
}

impl From<Vec<u8>> for Body {
fn from(value: Vec<u8>) -> Self {
Self::Full(Full::new(Bytes::from(value)))
Self {
repr: BodyRepr::Full(Full::new(Bytes::from(value))),
}
}
}

impl From<Bytes> for Body {
fn from(value: Bytes) -> Self {
Self::Full(Full::new(value))
Self {
repr: BodyRepr::Full(Full::new(value)),
}
}
}

impl From<FastStr> for Body {
fn from(value: FastStr) -> Self {
Self::Full(Full::new(value.into_bytes()))
Self {
repr: BodyRepr::Full(Full::new(value.into_bytes())),
}
}
}

impl From<String> for Body {
fn from(value: String) -> Self {
Self::Full(Full::new(Bytes::from(value)))
Self {
repr: BodyRepr::Full(Full::new(Bytes::from(value))),
}
}
}
7 changes: 3 additions & 4 deletions volo-http/src/client/transport.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::error::Error;

use http_body::Body;
use hyper::client::conn::http1;
use hyper_util::rt::TokioIo;
use motore::{make::MakeConnection, service::Service};
Expand Down Expand Up @@ -116,7 +115,7 @@ impl ClientTransport {
req: ClientRequest<B>,
) -> Result<ClientResponse, ClientError>
where
B: Body + Send + 'static,
B: http_body::Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
{
Expand All @@ -132,13 +131,13 @@ impl ClientTransport {
tracing::error!("[Volo-HTTP] failed to send request, error: {err}");
request_error(err)
})?;
Ok(resp)
Ok(resp.map(crate::body::Body::from_incoming))
}
}

impl<B> Service<ClientContext, ClientRequest<B>> for ClientTransport
where
B: Body + Send + 'static,
B: http_body::Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
{
Expand Down
66 changes: 0 additions & 66 deletions volo-http/src/error/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,69 +99,3 @@ pub fn body_collection_error() -> ExtractBodyError {
pub fn invalid_content_type() -> ExtractBodyError {
ExtractBodyError::Generic(GenericRejectionError::InvalidContentType)
}

/// Rejection used for [`WebSocketUpgrade`](crate::server::utils::ws::WebSocketUpgrade).
#[derive(Debug)]
#[non_exhaustive]
pub enum WebSocketUpgradeRejectionError {
/// The request method must be `GET`
MethodNotGet,
/// The HTTP version is not supported
InvalidHttpVersion,
/// The `Connection` header is invalid
InvalidConnectionHeader,
/// The `Upgrade` header is invalid
InvalidUpgradeHeader,
/// The `Sec-WebSocket-Version` header is invalid
InvalidWebSocketVersionHeader,
/// The `Sec-WebSocket-Key` header is missing
WebSocketKeyHeaderMissing,
/// The connection is not upgradable
ConnectionNotUpgradable,
}

impl WebSocketUpgradeRejectionError {
/// Convert the [`WebSocketUpgradeRejectionError`] to the corresponding [`StatusCode`]
fn to_status_code(&self) -> StatusCode {
match self {
Self::MethodNotGet => StatusCode::METHOD_NOT_ALLOWED,
Self::InvalidHttpVersion => StatusCode::HTTP_VERSION_NOT_SUPPORTED,
Self::InvalidConnectionHeader => StatusCode::BAD_REQUEST,
Self::InvalidUpgradeHeader => StatusCode::BAD_REQUEST,
Self::InvalidWebSocketVersionHeader => StatusCode::BAD_REQUEST,
Self::WebSocketKeyHeaderMissing => StatusCode::BAD_REQUEST,
Self::ConnectionNotUpgradable => StatusCode::UPGRADE_REQUIRED,
}
}
}

impl Error for WebSocketUpgradeRejectionError {}

impl fmt::Display for WebSocketUpgradeRejectionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::MethodNotGet => write!(f, "Request method must be 'GET'"),
Self::InvalidHttpVersion => {
write!(f, "Http version not support, only support HTTP 1.1 for now")
}
Self::InvalidConnectionHeader => {
write!(f, "Connection header did not include 'upgrade'")
}
Self::InvalidUpgradeHeader => write!(f, "`Upgrade` header did not include 'websocket'"),
Self::InvalidWebSocketVersionHeader => {
write!(f, "`Sec-WebSocket-Version` header did not include '13'")
}
Self::WebSocketKeyHeaderMissing => write!(f, "`Sec-WebSocket-Key` header missing"),
Self::ConnectionNotUpgradable => write!(
f,
"WebSocket request couldn't be upgraded since no upgrade state was present"
),
}
}
}

impl IntoResponse for WebSocketUpgradeRejectionError {
fn into_response(self) -> ServerResponse {
self.to_status_code().into_response()
}
}
2 changes: 1 addition & 1 deletion volo-http/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub type ClientRequest<B = crate::body::Body> = Request<B>;
///
/// [`Incoming`]: hyper::body::Incoming
#[cfg(feature = "server")]
pub type ServerRequest<B = hyper::body::Incoming> = Request<B>;
pub type ServerRequest<B = crate::body::Body> = Request<B>;

/// HTTP header [`X-Forwarded-For`][mdn].
///
Expand Down
Loading

0 comments on commit 2974395

Please sign in to comment.