Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: an adapter for tower service #124

Merged
merged 2 commits into from
Dec 22, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: an adapter for tower service
fundon committed Dec 22, 2023
commit 72853595a62d1b70cbd5697406bd7e0a5951216f
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ members = [
"viz-handlers",
"viz-macros",
"viz-router",
"viz-tower",
"viz-test",

"examples/hello-world",
14 changes: 14 additions & 0 deletions viz-core/src/body.rs
Original file line number Diff line number Diff line change
@@ -45,6 +45,20 @@ impl Body {
Self::Empty
}

/// Wraps a body into box.
pub fn wrap<B>(body: B) -> Self
where
B: HttpBody + Send + 'static,
B::Data: Into<Bytes>,
B::Error: Into<BoxError>,
{
Self::Boxed(SyncWrapper::new(
body.map_frame(|frame| frame.map_data(Into::into))
.map_err(Error::boxed)
.boxed_unsync(),
))
}

/// A body created from a [`Stream`].
pub fn from_stream<S>(stream: S) -> Self
where
4 changes: 2 additions & 2 deletions viz-core/src/request.rs
Original file line number Diff line number Diff line change
@@ -254,11 +254,11 @@ impl RequestExt for Request {
match state {
BodyState::Empty => Err(PayloadError::Empty)?,
BodyState::Used => Err(PayloadError::Used)?,
BodyState::Normal => unreachable!(),
BodyState::Normal => {}
}
}

let (state, result) = match std::mem::replace(self.body_mut(), Body::empty()) {
let (state, result) = match std::mem::replace(self.body_mut(), Body::Empty) {
Body::Empty => (BodyState::Empty, Err(PayloadError::Empty)),
body => (BodyState::Used, Ok(body)),
};
22 changes: 22 additions & 0 deletions viz-tower/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "viz-tower"
version = "0.1.0"
authors.workspace = true
edition.workspace = true
homepage.workspace = true
documentation.workspace = true
repository.workspace = true
license.workspace = true
rust-version.workspace = true

[dependencies]
viz-core.workspace = true
http-body-util.workspace = true
tower = { version = "0.4", features = ["util"] }

[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "test-util"] }
tower-http = { version = "0.5", features = ["limit", "request-id", "timeout"] }

[lints]
workspace = true
112 changes: 112 additions & 0 deletions viz-tower/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
//! An adapter that makes a tower [`Service`] into a [`Handler`].

use http_body_util::BodyExt;
use tower::{Service, ServiceExt};
use viz_core::{async_trait, BoxError, Bytes, Error, Handler, HttpBody, Request, Response, Result};

/// Converts a tower [`Service`] into a [`Handler`].
#[derive(Debug, Clone)]
pub struct TowerServiceHandler<S>(S);

impl<S> TowerServiceHandler<S> {
/// Creates a new [`TowerServiceHandler`].
pub fn new(s: S) -> Self {
Self(s)
}
}

#[async_trait]
impl<O, S> Handler<Request> for TowerServiceHandler<S>
where
O: HttpBody + Send + 'static,
O::Data: Into<Bytes>,
O::Error: Into<BoxError>,
S: Service<Request, Response = Response<O>> + Send + Sync + Clone + 'static,
S::Future: Send,
S::Error: Into<BoxError>,
{
type Output = Result<Response>;

async fn call(&self, req: Request) -> Self::Output {
self.0
.clone()
.oneshot(req)
.await
.map(|resp| {
resp.map(|body| {
body.map_frame(|f| f.map_data(Into::into))
.map_err(Error::boxed)
.boxed_unsync()
})
.map(Into::into)
})
.map_err(Error::boxed)
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use tower::util::{MapErrLayer, MapRequestLayer, MapResponseLayer};
use tower::{service_fn, ServiceBuilder};
use tower_http::{
limit::RequestBodyLimitLayer,
request_id::{MakeRequestId, RequestId, SetRequestIdLayer},
timeout::TimeoutLayer,
};
use viz_core::{
Body, BoxHandler, Handler, HandlerExt, IntoResponse, Request, RequestExt, Response,
};

#[derive(Clone, Default, Debug)]
struct MyMakeRequestId {
counter: Arc<AtomicU64>,
}

impl MakeRequestId for MyMakeRequestId {
fn make_request_id<B>(&mut self, _: &Request<B>) -> Option<RequestId> {
let request_id = self
.counter
.fetch_add(1, Ordering::SeqCst)
.to_string()
.parse()
.unwrap();

Some(RequestId::new(request_id))
}
}

async fn hello(mut req: Request) -> Result<Response> {
let bytes = req.bytes().await?;
Ok(bytes.into_response())
}

#[tokio::test]
async fn tower_service_into_handler() {
let hello_svc = service_fn(hello);

let svc = ServiceBuilder::new()
.layer(RequestBodyLimitLayer::new(1))
.layer(MapErrLayer::new(Error::from))
.layer(SetRequestIdLayer::x_request_id(MyMakeRequestId::default()))
.layer(MapResponseLayer::new(IntoResponse::into_response))
.layer(MapRequestLayer::new(|req: Request<_>| req.map(Body::wrap)))
.layer(TimeoutLayer::new(Duration::from_secs(10)))
.service(hello_svc);

let r0 = Request::new(Body::Full("12".into()));
let h0 = TowerServiceHandler::new(svc);
assert!(h0.call(r0).await.is_err());

let r1 = Request::new(Body::Full("1".into()));
let b0: BoxHandler = h0.boxed();
assert!(b0.call(r1).await.is_ok());
}
}