diff --git a/Cargo.lock b/Cargo.lock index 64490c0d..edb97041 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2938,7 +2938,6 @@ version = "0.1.0" dependencies = [ "bytes", "futures-util", - "http 1.0.0", "http-body-util", "hyper 1.0.1", "hyper-util", diff --git a/volo-http/Cargo.toml b/volo-http/Cargo.toml index 35d15099..4954a9da 100644 --- a/volo-http/Cargo.toml +++ b/volo-http/Cargo.toml @@ -20,7 +20,6 @@ maintenance = { status = "actively-developed" } [dependencies] volo = { version = "0.8", path = "../volo" } -http = "1" http-body-util = "0.1" hyper = { version = "1", features = ["server", "http1", "http2"] } hyper-util = { version = "0.1", features = ["tokio"] } diff --git a/volo-http/src/extract.rs b/volo-http/src/extract.rs index cbce3f04..c9688a61 100644 --- a/volo-http/src/extract.rs +++ b/volo-http/src/extract.rs @@ -1,15 +1,16 @@ +use std::convert::Infallible; + use futures_util::Future; -use http::{Method, Response, Uri}; +use hyper::http::{Method, Uri}; use volo::net::Address; -use crate::{response::IntoResponse, HttpContext, Params, State}; +use crate::{HttpContext, Params, State}; pub trait FromContext: Sized { - type Rejection: IntoResponse; fn from_context( context: &HttpContext, state: &S, - ) -> impl Future> + Send; + ) -> impl Future> + Send; } impl FromContext for Option @@ -17,9 +18,7 @@ where T: FromContext, S: Send + Sync, { - type Rejection = Response<()>; // Infallible - - async fn from_context(context: &HttpContext, state: &S) -> Result { + async fn from_context(context: &HttpContext, state: &S) -> Result { Ok(T::from_context(context, state).await.ok()) } } @@ -28,9 +27,7 @@ impl FromContext for Address where S: Send + Sync, { - type Rejection = Response<()>; // Infallible - - async fn from_context(context: &HttpContext, _state: &S) -> Result { + async fn from_context(context: &HttpContext, _state: &S) -> Result { Ok(context.peer.clone()) } } @@ -39,9 +36,7 @@ impl FromContext for Uri where S: Send + Sync, { - type Rejection = Response<()>; // Infallible - - async fn from_context(context: &HttpContext, _state: &S) -> Result { + async fn from_context(context: &HttpContext, _state: &S) -> Result { Ok(context.uri.clone()) } } @@ -50,9 +45,7 @@ impl FromContext for Method where S: Send + Sync, { - type Rejection = Response<()>; // Infallible - - async fn from_context(context: &HttpContext, _state: &S) -> Result { + async fn from_context(context: &HttpContext, _state: &S) -> Result { Ok(context.method.clone()) } } @@ -61,9 +54,7 @@ impl FromContext for Params where S: Send + Sync, { - type Rejection = Response<()>; // Infallible - - async fn from_context(context: &HttpContext, _state: &S) -> Result { + async fn from_context(context: &HttpContext, _state: &S) -> Result { Ok(context.params.clone()) } } @@ -72,9 +63,7 @@ impl FromContext for State where S: Clone + Send + Sync, { - type Rejection = Response<()>; // Infallible - - async fn from_context(_context: &HttpContext, state: &S) -> Result { + async fn from_context(_context: &HttpContext, state: &S) -> Result { Ok(State(state.clone())) } } diff --git a/volo-http/src/handler.rs b/volo-http/src/handler.rs index 0d8fa852..b6c21481 100644 --- a/volo-http/src/handler.rs +++ b/volo-http/src/handler.rs @@ -1,6 +1,5 @@ -use std::{future::Future, marker::PhantomData}; +use std::{convert::Infallible, future::Future, marker::PhantomData}; -use http::Response; use hyper::body::Incoming; use motore::Service; @@ -8,8 +7,8 @@ use crate::{ extract::FromContext, macros::{all_the_tuples, all_the_tuples_no_last_special_case}, request::FromRequest, - response::{IntoResponse, RespBody}, - DynError, DynService, HttpContext, + response::{IntoResponse, Response}, + DynService, HttpContext, }; pub trait Handler: Sized { @@ -18,7 +17,7 @@ pub trait Handler: Sized { context: &mut HttpContext, req: Incoming, state: &S, - ) -> impl Future> + Send; + ) -> impl Future + Send; fn with_state(self, state: S) -> HandlerService where @@ -39,12 +38,7 @@ where Res: IntoResponse, S: Send + Sync, { - async fn call( - self, - _context: &mut HttpContext, - _req: Incoming, - _state: &S, - ) -> Response { + async fn call(self, _context: &mut HttpContext, _req: Incoming, _state: &S) -> Response { self().await.into_response() } } @@ -63,7 +57,7 @@ macro_rules! impl_handler { $( for<'r> $ty: FromContext + Send + 'r, )* for<'r> $last: FromRequest + Send + 'r, { - async fn call(self, context: &mut HttpContext, req: Incoming, state: &S) -> Response { + async fn call(self, context: &mut HttpContext, req: Incoming, state: &S) -> Response { $( let $ty = match $ty::from_context(context, state).await { Ok(value) => value, @@ -132,7 +126,7 @@ where cx: &mut HttpContext, req: Incoming, state: S, - ) -> Result, DynError> { + ) -> Result { self.0.into_route(state).call(cx, req).await } } @@ -238,8 +232,8 @@ where for<'r> H: Handler + Clone + Send + Sync + 'r, S: Sync, { - type Response = Response; - type Error = DynError; + type Response = Response; + type Error = Infallible; async fn call<'s, 'cx>( &'s self, @@ -251,7 +245,7 @@ where } pub trait HandlerWithoutRequest: Sized { - fn call(self, context: &HttpContext) -> impl Future> + Send; + fn call(self, context: &HttpContext) -> impl Future + Send; } impl HandlerWithoutRequest<()> for F @@ -259,7 +253,7 @@ where F: FnOnce() -> Res + Clone + Send, Res: IntoResponse, { - async fn call(self, _context: &HttpContext) -> Response { + async fn call(self, _context: &HttpContext) -> Response { self().into_response() } } @@ -275,7 +269,7 @@ macro_rules! impl_handler_without_request { Res: IntoResponse, $( for<'r> $ty: FromContext<()> + Send + 'r, )* { - async fn call(self, context: &HttpContext) -> Response { + async fn call(self, context: &HttpContext) -> Response { $( let $ty = match $ty::from_context(context, &()).await { Ok(value) => value, diff --git a/volo-http/src/layer.rs b/volo-http/src/layer.rs index 93b3f199..417f2989 100644 --- a/volo-http/src/layer.rs +++ b/volo-http/src/layer.rs @@ -1,13 +1,15 @@ use std::{marker::PhantomData, time::Duration}; -use http::{Method, Request, Response, StatusCode}; -use http_body_util::Full; -use hyper::body::{Bytes, Incoming}; +use hyper::{ + body::Incoming, + http::{Method, StatusCode}, +}; use motore::{layer::Layer, service::Service}; use crate::{ handler::HandlerWithoutRequest, - response::{IntoResponse, RespBody}, + request::Request, + response::{IntoResponse, Response}, HttpContext, }; @@ -15,25 +17,23 @@ pub trait LayerExt { fn method( self, method: Method, - ) -> FilterLayer) -> Result<(), StatusCode>>> + ) -> FilterLayer Result<(), StatusCode>>> where Self: Sized, { - self.filter(Box::new( - move |cx: &mut HttpContext, _: &Request| { - if cx.method == method { - Ok(()) - } else { - Err(StatusCode::METHOD_NOT_ALLOWED) - } - }, - )) + self.filter(Box::new(move |cx: &mut HttpContext, _: &Request| { + if cx.method == method { + Ok(()) + } else { + Err(StatusCode::METHOD_NOT_ALLOWED) + } + })) } fn filter(self, f: F) -> FilterLayer where Self: Sized, - F: Fn(&mut HttpContext, &Request) -> Result<(), StatusCode>, + F: Fn(&mut HttpContext, &Request) -> Result<(), StatusCode>, { FilterLayer { f } } @@ -45,11 +45,8 @@ pub struct FilterLayer { impl Layer for FilterLayer where - S: Service, Response = Response>> - + Send - + Sync - + 'static, - F: Fn(&mut HttpContext, &Request) -> Result<(), StatusCode> + Send + Sync, + S: Service + Send + Sync + 'static, + F: Fn(&mut HttpContext, &Request) -> Result<(), StatusCode> + Send + Sync, { type Service = Filter; @@ -66,13 +63,10 @@ pub struct Filter { f: F, } -impl Service> for Filter +impl Service for Filter where - S: Service, Response = Response>> - + Send - + Sync - + 'static, - F: Fn(&mut HttpContext, &Request) -> Result<(), StatusCode> + Send + Sync, + S: Service + Send + Sync + 'static, + F: Fn(&mut HttpContext, &Request) -> Result<(), StatusCode> + Send + Sync, { type Response = S::Response; @@ -81,13 +75,10 @@ where async fn call<'s, 'cx>( &'s self, cx: &'cx mut HttpContext, - req: Request, + req: Request, ) -> Result { if let Err(status) = (self.f)(cx, &req) { - return Ok(Response::builder() - .status(status) - .body(Full::new(Bytes::new())) - .unwrap()); + return Ok(status.into_response()); } self.service.call(cx, req).await } @@ -115,7 +106,7 @@ impl TimeoutLayer { impl Layer for TimeoutLayer where - S: Service> + Send + Sync + 'static, + S: Service + Send + Sync + 'static, H: HandlerWithoutRequest + Clone + Send + Sync + 'static, T: Sync, { @@ -141,7 +132,7 @@ pub struct Timeout { impl Service for Timeout where - S: Service> + Send + Sync + 'static, + S: Service + Send + Sync + 'static, S::Error: Send, H: HandlerWithoutRequest + Clone + Send + Sync + 'static, T: Sync, @@ -161,7 +152,7 @@ where tokio::select! { resp = fut_service => resp, _ = fut_timeout => { - Ok(self.handler.clone().call(cx).await.into_response()) + Ok(self.handler.clone().call(cx).await) }, } } diff --git a/volo-http/src/lib.rs b/volo-http/src/lib.rs index 68735579..c794c92b 100644 --- a/volo-http/src/lib.rs +++ b/volo-http/src/lib.rs @@ -9,25 +9,23 @@ pub mod server; mod macros; +use std::convert::Infallible; + pub use bytes::Bytes; -use http::{Extensions, HeaderMap, HeaderValue, Version}; -pub use http::{Method, StatusCode, Uri}; -use hyper::{body::Incoming, Response}; +pub use hyper::{ + body::Incoming, + http::{Extensions, HeaderMap, HeaderName, HeaderValue, Method, StatusCode, Uri, Version}, +}; pub use volo::net::Address; -pub use crate::{param::Params, request::Json, server::Server}; - -mod private { - #[derive(Debug, Clone, Copy)] - pub enum ViaContext {} - - #[derive(Debug, Clone, Copy)] - pub enum ViaRequest {} -} +pub use crate::{ + param::Params, + request::{Json, Request}, + response::Response, + server::Server, +}; -pub type DynService = - motore::BoxCloneService, DynError>; -pub type DynError = Box; +pub type DynService = motore::BoxCloneService; #[derive(Debug, Default, Clone, Copy)] pub struct State(pub S); diff --git a/volo-http/src/request.rs b/volo-http/src/request.rs index 193c3063..111d5e34 100644 --- a/volo-http/src/request.rs +++ b/volo-http/src/request.rs @@ -1,23 +1,62 @@ +use std::ops::{Deref, DerefMut}; + use bytes::Bytes; use futures_util::Future; -use http::{header, HeaderMap, Response, StatusCode}; use http_body_util::BodyExt; -use hyper::body::Incoming; +use hyper::{ + body::Incoming, + http::{header, request::Builder, HeaderMap, StatusCode}, +}; use serde::de::DeserializeOwned; use crate::{ extract::FromContext, - private, - response::{IntoResponse, RespBody}, + response::{IntoResponse, Response}, HttpContext, }; +pub struct Request(pub(crate) hyper::http::Request); + +impl Request { + pub fn builder() -> Builder { + Builder::new() + } +} + +impl Deref for Request { + type Target = hyper::http::Request; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for Request { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl From> for Request { + fn from(value: hyper::http::Request) -> Self { + Self(value) + } +} + +mod private { + #[derive(Debug, Clone, Copy)] + pub enum ViaContext {} + + #[derive(Debug, Clone, Copy)] + pub enum ViaRequest {} +} + pub trait FromRequest: Sized { fn from( cx: &HttpContext, body: Incoming, state: &S, - ) -> impl Future>> + Send; + ) -> impl Future> + Send; } impl FromRequest for T @@ -25,11 +64,7 @@ where T: FromContext + Sync, S: Sync, { - async fn from( - cx: &HttpContext, - _body: Incoming, - state: &S, - ) -> Result> { + async fn from(cx: &HttpContext, _body: Incoming, state: &S) -> Result { match T::from_context(cx, state).await { Ok(value) => Ok(value), Err(rejection) => Err(rejection.into_response()), @@ -41,11 +76,7 @@ impl FromRequest for Incoming where S: Sync, { - async fn from( - _cx: &HttpContext, - body: Incoming, - _state: &S, - ) -> Result> { + async fn from(_cx: &HttpContext, body: Incoming, _state: &S) -> Result { Ok(body) } } @@ -57,13 +88,14 @@ impl FromRequest for Json { cx: &HttpContext, body: Incoming, _state: &S, - ) -> impl Future>> + Send { + ) -> impl Future> + Send { async move { if !json_content_type(&cx.headers) { return Err(Response::builder() .status(StatusCode::UNSUPPORTED_MEDIA_TYPE) .body(Bytes::new().into()) - .unwrap()); + .unwrap() + .into()); } match body.collect().await { @@ -76,7 +108,8 @@ impl FromRequest for Json { Err(Response::builder() .status(StatusCode::BAD_REQUEST) .body(Bytes::new().into()) - .unwrap()) + .unwrap() + .into()) } } } @@ -85,7 +118,8 @@ impl FromRequest for Json { Err(Response::builder() .status(StatusCode::BAD_REQUEST) .body(Bytes::new().into()) - .unwrap()) + .unwrap() + .into()) } } } diff --git a/volo-http/src/response.rs b/volo-http/src/response.rs index 897d2fdb..7e79fff3 100644 --- a/volo-http/src/response.rs +++ b/volo-http/src/response.rs @@ -1,57 +1,78 @@ use std::{ + convert::Infallible, + ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}, }; -use futures_util::{ready, stream}; -use http::{Response, StatusCode}; -use http_body_util::{Full, StreamBody}; -use hyper::body::{Body, Bytes, Frame}; +use futures_util::ready; +use http_body_util::Full; +use hyper::{ + body::{Body, Bytes, Frame}, + http::{response::Builder, StatusCode}, +}; use pin_project::pin_project; -use crate::DynError; +pub struct Response(hyper::http::Response); + +impl Response { + pub fn builder() -> Builder { + Builder::new() + } + + pub(crate) fn inner(self) -> hyper::http::Response { + self.0 + } +} + +impl Deref for Response { + type Target = hyper::http::Response; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for Response { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl From> for Response { + fn from(value: hyper::http::Response) -> Self { + Self(value) + } +} -#[pin_project(project = RespBodyProj)] -pub enum RespBody { - Stream { - #[pin] - inner: StreamBody< - stream::Iter, DynError>> + Send + Sync>>, - >, - }, - Full { - #[pin] - inner: Full, - }, +#[pin_project] +pub struct RespBody { + #[pin] + inner: Full, } impl Body for RespBody { type Data = Bytes; - type Error = DynError; + type Error = Infallible; fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { - match self.project() { - RespBodyProj::Stream { inner } => inner.poll_frame(cx), - RespBodyProj::Full { inner } => { - Poll::Ready(ready!(inner.poll_frame(cx)).map(|result| Ok(result.unwrap()))) - } - } + Poll::Ready(ready!(self.project().inner.poll_frame(cx)).map(|result| Ok(result.unwrap()))) } } impl From> for RespBody { fn from(value: Full) -> Self { - Self::Full { inner: value } + Self { inner: value } } } impl From for RespBody { fn from(value: Bytes) -> Self { - Self::Full { + Self { inner: Full::new(value), } } @@ -59,7 +80,7 @@ impl From for RespBody { impl From for RespBody { fn from(value: String) -> Self { - Self::Full { + Self { inner: Full::new(value.into()), } } @@ -67,7 +88,7 @@ impl From for RespBody { impl From<&'static str> for RespBody { fn from(value: &'static str) -> Self { - Self::Full { + Self { inner: Full::new(value.into()), } } @@ -75,23 +96,19 @@ impl From<&'static str> for RespBody { impl From<()> for RespBody { fn from(_: ()) -> Self { - Self::Full { + Self { inner: Full::new(Bytes::new()), } } } pub trait IntoResponse { - fn into_response(self) -> Response; + fn into_response(self) -> Response; } -impl IntoResponse for Response -where - T: Into, -{ - fn into_response(self) -> Response { - let (parts, body) = self.into_parts(); - Response::from_parts(parts, body.into()) +impl IntoResponse for Infallible { + fn into_response(self) -> Response { + StatusCode::INTERNAL_SERVER_ERROR.into_response() } } @@ -99,11 +116,12 @@ impl IntoResponse for T where T: Into, { - fn into_response(self) -> Response { + fn into_response(self) -> Response { Response::builder() .status(StatusCode::OK) .body(self.into()) .unwrap() + .into() } } @@ -112,7 +130,7 @@ where R: IntoResponse, E: IntoResponse, { - fn into_response(self) -> Response { + fn into_response(self) -> Response { match self { Ok(value) => value.into_response(), Err(err) => err.into_response(), @@ -124,18 +142,19 @@ impl IntoResponse for (StatusCode, T) where T: IntoResponse, { - fn into_response(self) -> Response { + fn into_response(self) -> Response { let mut resp = self.1.into_response(); - *resp.status_mut() = self.0; + *resp.0.status_mut() = self.0; resp } } impl IntoResponse for StatusCode { - fn into_response(self) -> Response { + fn into_response(self) -> Response { Response::builder() .status(self) .body(String::new().into()) .unwrap() + .into() } } diff --git a/volo-http/src/route.rs b/volo-http/src/route.rs index c0b0fef8..9ae2c390 100644 --- a/volo-http/src/route.rs +++ b/volo-http/src/route.rs @@ -1,13 +1,15 @@ -use std::collections::HashMap; +use std::{collections::HashMap, convert::Infallible}; -use http::{Method, Response, StatusCode}; -use hyper::body::Incoming; +use hyper::{ + body::Incoming, + http::{Method, StatusCode}, +}; use motore::{layer::Layer, service::Service}; use crate::{ handler::{DynHandler, Handler}, - response::{IntoResponse, RespBody}, - DynError, DynService, HttpContext, + response::IntoResponse, + DynService, HttpContext, Response, }; // The `matchit::Router` cannot be converted to `Iterator`, so using @@ -70,7 +72,7 @@ where pub fn layer(self, l: L) -> Self where L: Layer + Clone + Send + Sync + 'static, - L::Service: Service, Error = DynError> + L::Service: Service + Clone + Send + Sync @@ -116,9 +118,9 @@ where } impl Service for Router<()> { - type Response = Response; + type Response = Response; - type Error = DynError; + type Error = Infallible; async fn call<'s, 'cx>( &'s self, @@ -177,7 +179,7 @@ where pub fn layer(self, l: L) -> Self where L: Layer + Clone + Send + Sync + 'static, - L::Service: Service, Error = DynError> + L::Service: Service + Clone + Send + Sync @@ -244,7 +246,7 @@ where cx: &'cx mut HttpContext, req: Incoming, state: S, - ) -> Result, DynError> + ) -> Result where S: 'cx, { @@ -353,7 +355,7 @@ where pub fn from_service(srv: Srv) -> MethodEndpoint where - Srv: Service, Error = DynError> + Srv: Service + Clone + Send + Sync @@ -409,7 +411,7 @@ where pub fn from_service(srv: Srv) -> Fallback where - Srv: Service, Error = DynError> + Srv: Service + Clone + Send + Sync @@ -431,7 +433,7 @@ where pub(crate) fn layer(self, l: L) -> Self where L: Layer + Clone + Send + Sync + 'static, - L::Service: Service, Error = DynError> + L::Service: Service + Clone + Send + Sync @@ -452,7 +454,7 @@ where cx: &'cx mut HttpContext, req: Incoming, state: S, - ) -> Result, DynError> + ) -> Result where S: 'cx, { @@ -474,7 +476,7 @@ where pub fn from_service(srv: Srv) -> MethodEndpoint where - Srv: Service, Error = DynError> + Srv: Service + Clone + Send + Sync @@ -488,8 +490,8 @@ where struct RouteForStatusCode(StatusCode); impl Service for RouteForStatusCode { - type Response = Response; - type Error = DynError; + type Response = Response; + type Error = Infallible; async fn call<'s, 'cx>( &'s self, diff --git a/volo-http/src/server.rs b/volo-http/src/server.rs index 7b49df4d..49bd63e9 100644 --- a/volo-http/src/server.rs +++ b/volo-http/src/server.rs @@ -1,20 +1,21 @@ use std::{ + convert::Infallible, sync::{atomic::Ordering, Arc}, time::Duration, }; -use http::{Request, Response}; -use hyper::{ - body::{Body, Incoming as BodyIncoming}, - server::conn::http1, -}; +use hyper::{body::Incoming as BodyIncoming, server::conn::http1}; use hyper_util::rt::TokioIo; use motore::BoxError; use tokio::sync::Notify; use tracing::{info, trace}; use volo::net::{conn::Conn, incoming::Incoming, Address, MakeIncoming}; -use crate::{param::Params, DynError, HttpContext}; +use crate::{ + param::Params, + response::{IntoResponse, RespBody, Response}, + HttpContext, +}; pub struct Server { app: Arc, @@ -28,13 +29,12 @@ impl Clone for Server { } } -impl Server +impl Server where - OB: Body + Send + 'static, - OB::Data: Send, - App: - motore::Service> + Send + Sync + 'static, - App::Error: Into, + App: motore::Service + + Send + + Sync + + 'static, { pub fn new(app: App) -> Self { Self { app: Arc::new(app) } @@ -156,7 +156,7 @@ where } } -async fn handle_conn( +async fn handle_conn( conn: Conn, service: S, exit_notify: Arc, @@ -164,25 +164,23 @@ async fn handle_conn( conn_cnt: Arc, peer: Address, ) where - S: motore::Service> + S: motore::Service + Clone + Send + Sync + 'static, - S::Error: Into, - B: Body + Send + 'static, - B::Data: Send, { let notified = exit_notify.notified(); tokio::pin!(notified); let mut http_conn = http1::Builder::new().serve_connection( TokioIo::new(conn), - hyper::service::service_fn(move |req: Request| { + hyper::service::service_fn(move |req: hyper::http::Request| { let service = service.clone(); let peer = peer.clone(); async move { let (parts, req) = req.into_parts(); + let req = req.into(); let mut cx = HttpContext { peer, method: parts.method, @@ -194,7 +192,11 @@ async fn handle_conn( inner: Vec::with_capacity(0), }, }; - service.call(&mut cx, req).await + let resp = match service.call(&mut cx, req).await { + Ok(resp) => resp, + Err(inf) => inf.into_response(), + }; + Ok::, Infallible>(resp.inner()) } }), );