-
Notifications
You must be signed in to change notification settings - Fork 187
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore(volo-http): add TimeoutLayer and FailOnStatus layer
Since the previous `MetaService` was too coupled with the client configuration, it was very inelegant. So we removed `MetaService` and used `TimeoutLayer` and `FailOnStatus` layers to replace the previous `MetaService` with the two configs. BREAK CHANGES: - `ClientBuilder::fail_on_status` has been removed, users should use `FailOnStatus` layer instead - `ClientBuilder::set_request_timeout` has been removed, users should use `TimeoutLayer` instead TODO: - Refactor `Target`, `CallOpt` and `Config` - Support setting timeout at request level Signed-off-by: Yu Li <[email protected]>
- Loading branch information
1 parent
ff7c7af
commit 21ef067
Showing
9 changed files
with
261 additions
and
221 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,242 @@ | ||
//! Collections of some useful `Layer`s. | ||
use std::{error::Error, fmt, time::Duration}; | ||
|
||
use http::status::StatusCode; | ||
use motore::{layer::Layer, service::Service}; | ||
|
||
use crate::{ | ||
error::{client::request_error, ClientError}, | ||
response::ClientResponse, | ||
}; | ||
|
||
/// [`Layer`] for setting timeout to the request. | ||
/// | ||
/// See [`TimeoutLayer::new`] for more details. | ||
pub struct TimeoutLayer { | ||
duration: Duration, | ||
} | ||
|
||
impl TimeoutLayer { | ||
/// Create a new [`TimeoutLayer`] with given [`Duration`]. | ||
/// | ||
/// If the request times out, an error [`Timeout`] is returned. | ||
/// | ||
/// [`Timeout`]: crate::error::client::Timeout | ||
pub fn new(duration: Duration) -> Self { | ||
Self { duration } | ||
} | ||
} | ||
|
||
impl<S> Layer<S> for TimeoutLayer { | ||
type Service = TimeoutService<S>; | ||
|
||
fn layer(self, inner: S) -> Self::Service { | ||
TimeoutService { | ||
inner, | ||
duration: self.duration, | ||
} | ||
} | ||
} | ||
|
||
/// The [`Service`] generated by [`TimeoutLayer`]. | ||
/// | ||
/// See [`TimeoutLayer`] and [`TimeoutLayer::new`] for more details. | ||
pub struct TimeoutService<S> { | ||
inner: S, | ||
duration: Duration, | ||
} | ||
|
||
impl<Cx, Req, S> Service<Cx, Req> for TimeoutService<S> | ||
where | ||
Cx: Send, | ||
Req: Send, | ||
S: Service<Cx, Req, Error = ClientError> + Send + Sync, | ||
{ | ||
type Response = S::Response; | ||
type Error = S::Error; | ||
|
||
async fn call(&self, cx: &mut Cx, req: Req) -> Result<Self::Response, Self::Error> { | ||
let fut = self.inner.call(cx, req); | ||
let sleep = tokio::time::sleep(self.duration); | ||
|
||
tokio::select! { | ||
res = fut => res, | ||
_ = sleep => { | ||
tracing::error!("[Volo-HTTP] request timeout"); | ||
Err(crate::error::client::timeout()) | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// [`Layer`] for throwing service error with the response's error status code. | ||
/// | ||
/// Users can use [`FailOnStatus::all`], [`FailOnStatus::client_error`] or | ||
/// [`FailOnStatus::server_error`] for creating the [`FailOnStatus`] layer that convert all (4XX and | ||
/// 5XX), client error (4XX) or server error (5XX) to a error of service. | ||
pub struct FailOnStatus { | ||
client_error: bool, | ||
server_error: bool, | ||
} | ||
|
||
impl FailOnStatus { | ||
/// Create a [`FailOnStatus`] layer that return error [`StatusCodeError`] for all error status | ||
/// codes (4XX and 5XX). | ||
pub fn all() -> Self { | ||
Self { | ||
client_error: true, | ||
server_error: true, | ||
} | ||
} | ||
|
||
/// Create a [`FailOnStatus`] layer that return error [`StatusCodeError`] for client error | ||
/// status codes (4XX). | ||
pub fn client_error() -> Self { | ||
Self { | ||
client_error: true, | ||
server_error: false, | ||
} | ||
} | ||
|
||
/// Create a [`FailOnStatus`] layer that return error [`StatusCodeError`] for server error | ||
/// status codes (5XX). | ||
pub fn server_error() -> Self { | ||
Self { | ||
client_error: false, | ||
server_error: true, | ||
} | ||
} | ||
} | ||
|
||
impl<S> Layer<S> for FailOnStatus { | ||
type Service = FailOnStatusService<S>; | ||
|
||
fn layer(self, inner: S) -> Self::Service { | ||
FailOnStatusService { | ||
inner, | ||
fail_on: self, | ||
} | ||
} | ||
} | ||
|
||
/// The [`Service`] generated by [`FailOnStatus`] layer. | ||
/// | ||
/// See [`FailOnStatus`] for more details. | ||
pub struct FailOnStatusService<S> { | ||
inner: S, | ||
fail_on: FailOnStatus, | ||
} | ||
|
||
impl<Cx, Req, S, B> Service<Cx, Req> for FailOnStatusService<S> | ||
where | ||
Cx: Send, | ||
Req: Send, | ||
S: Service<Cx, Req, Response = ClientResponse<B>, Error = ClientError> + Send + Sync, | ||
{ | ||
type Response = S::Response; | ||
type Error = S::Error; | ||
|
||
async fn call(&self, cx: &mut Cx, req: Req) -> Result<Self::Response, Self::Error> { | ||
let resp = self.inner.call(cx, req).await?; | ||
let status = resp.status(); | ||
if (self.fail_on.client_error && status.is_client_error()) | ||
|| (self.fail_on.server_error && status.is_server_error()) | ||
{ | ||
Err(request_error(StatusCodeError::new(status))) | ||
} else { | ||
Ok(resp) | ||
} | ||
} | ||
} | ||
|
||
/// Client received a response with an error status code. | ||
pub struct StatusCodeError { | ||
/// The original status code | ||
pub status: StatusCode, | ||
} | ||
|
||
impl StatusCodeError { | ||
fn new(status: StatusCode) -> Self { | ||
Self { status } | ||
} | ||
} | ||
|
||
impl fmt::Debug for StatusCodeError { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
f.debug_struct("StatusCodeError") | ||
.field("status", &self.status) | ||
.finish() | ||
} | ||
} | ||
|
||
impl fmt::Display for StatusCodeError { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
write!(f, "client received an error status code: {}", self.status) | ||
} | ||
} | ||
|
||
impl Error for StatusCodeError {} | ||
|
||
#[cfg(test)] | ||
mod client_layers_tests { | ||
use http::status::StatusCode; | ||
use motore::service::Service; | ||
|
||
use super::FailOnStatus; | ||
use crate::{ | ||
body::Body, client::test_helpers::MockTransport, context::ClientContext, | ||
error::ClientError, request::ClientRequest, response::ClientResponse, ClientBuilder, | ||
}; | ||
|
||
struct ReturnStatus; | ||
|
||
impl Service<ClientContext, ClientRequest> for ReturnStatus { | ||
type Response = ClientResponse; | ||
type Error = ClientError; | ||
|
||
fn call( | ||
&self, | ||
_: &mut ClientContext, | ||
req: ClientRequest, | ||
) -> impl std::future::Future<Output = Result<Self::Response, Self::Error>> + Send { | ||
let path = req.uri().path(); | ||
assert_eq!(&path[..1], "/"); | ||
let status_code = path[1..].parse::<u16>().expect("invalid uri"); | ||
let status_code = StatusCode::from_u16(status_code).expect("invalid status code"); | ||
let mut resp = ClientResponse::new(Body::empty()); | ||
*resp.status_mut() = status_code; | ||
async { Ok(resp) } | ||
} | ||
} | ||
|
||
#[tokio::test] | ||
async fn fail_on_status_test() { | ||
{ | ||
// Reject all error status codes | ||
let client = ClientBuilder::new() | ||
.layer_outer_front(FailOnStatus::all()) | ||
.mock(MockTransport::service(ReturnStatus)); | ||
client.get("/400").send().await.unwrap_err(); | ||
client.get("/500").send().await.unwrap_err(); | ||
} | ||
{ | ||
// Reject client error status codes | ||
let client = ClientBuilder::new() | ||
.layer_outer_front(FailOnStatus::client_error()) | ||
.mock(MockTransport::service(ReturnStatus)); | ||
client.get("/400").send().await.unwrap_err(); | ||
// 5XX is server error, it should not be handled | ||
client.get("/500").send().await.unwrap(); | ||
} | ||
{ | ||
// Reject all error status codes | ||
let client = ClientBuilder::new() | ||
.layer_outer_front(FailOnStatus::server_error()) | ||
.mock(MockTransport::service(ReturnStatus)); | ||
// 4XX is client error, it should not be handled | ||
client.get("/400").send().await.unwrap(); | ||
client.get("/500").send().await.unwrap_err(); | ||
} | ||
} | ||
} |
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.