Skip to content

Commit

Permalink
feat: an adapter for tower service (#124)
Browse files Browse the repository at this point in the history
* feat: an adapter for tower service

* chore: short form
  • Loading branch information
fundon authored Dec 22, 2023
1 parent 5f25ac4 commit 93d2b51
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ members = [
"viz-handlers",
"viz-macros",
"viz-router",
"viz-tower",
"viz-test",

"examples/hello-world",
Expand Down
30 changes: 21 additions & 9 deletions viz-core/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,34 @@ impl Body {
Self::Empty
}

/// Wraps a body into box.
pub fn wrap<B>(body: B) -> Self
where
B: HttpBody + Send + 'static,
B::Data: Into<Bytes>,
B::Error: Into<BoxError>,
{
body.map_frame(|frame| frame.map_data(Into::into))
.map_err(Error::boxed)
.boxed_unsync()
.into()
}

/// A body created from a [`Stream`].
pub fn from_stream<S>(stream: S) -> Self
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
Self::Boxed(SyncWrapper::new(
StreamBody::new(
stream
.map_ok(Into::into)
.map_ok(Frame::data)
.map_err(Error::boxed),
)
.boxed_unsync(),
))
StreamBody::new(
stream
.map_ok(Into::into)
.map_ok(Frame::data)
.map_err(Error::boxed),
)
.boxed_unsync()
.into()
}

/// A stream created from a [`http_body::Body`].
Expand Down
2 changes: 1 addition & 1 deletion viz-core/src/handler/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ where
body.map_frame(|f| f.map_data(Into::into))
.map_err(Into::into)
.boxed_unsync()
.into()
})
.map(Into::into)
})
.map_err(Into::into)
}
Expand Down
4 changes: 2 additions & 2 deletions viz-core/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,11 @@ impl RequestExt for Request {
match state {
BodyState::Empty => Err(PayloadError::Empty)?,
BodyState::Used => Err(PayloadError::Used)?,
BodyState::Normal => unreachable!(),
BodyState::Normal => {}
}
}

let (state, result) = match std::mem::replace(self.body_mut(), Body::empty()) {
let (state, result) = match std::mem::replace(self.body_mut(), Body::Empty) {
Body::Empty => (BodyState::Empty, Err(PayloadError::Empty)),
body => (BodyState::Used, Ok(body)),
};
Expand Down
22 changes: 22 additions & 0 deletions viz-tower/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "viz-tower"
version = "0.1.0"
authors.workspace = true
edition.workspace = true
homepage.workspace = true
documentation.workspace = true
repository.workspace = true
license.workspace = true
rust-version.workspace = true

[dependencies]
viz-core.workspace = true
http-body-util.workspace = true
tower = { version = "0.4", features = ["util"] }

[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "test-util"] }
tower-http = { version = "0.5", features = ["limit", "request-id", "timeout"] }

[lints]
workspace = true
112 changes: 112 additions & 0 deletions viz-tower/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
//! An adapter that makes a tower [`Service`] into a [`Handler`].
use http_body_util::BodyExt;
use tower::{Service, ServiceExt};
use viz_core::{async_trait, BoxError, Bytes, Error, Handler, HttpBody, Request, Response, Result};

/// Converts a tower [`Service`] into a [`Handler`].
#[derive(Debug, Clone)]
pub struct TowerServiceHandler<S>(S);

impl<S> TowerServiceHandler<S> {
/// Creates a new [`TowerServiceHandler`].
pub fn new(s: S) -> Self {
Self(s)
}
}

#[async_trait]
impl<O, S> Handler<Request> for TowerServiceHandler<S>
where
O: HttpBody + Send + 'static,
O::Data: Into<Bytes>,
O::Error: Into<BoxError>,
S: Service<Request, Response = Response<O>> + Send + Sync + Clone + 'static,
S::Future: Send,
S::Error: Into<BoxError>,
{
type Output = Result<Response>;

async fn call(&self, req: Request) -> Self::Output {
self.0
.clone()
.oneshot(req)
.await
.map(|resp| {
resp.map(|body| {
body.map_frame(|f| f.map_data(Into::into))
.map_err(Error::boxed)
.boxed_unsync()
.into()
})
})
.map_err(Error::boxed)
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use tower::util::{MapErrLayer, MapRequestLayer, MapResponseLayer};
use tower::{service_fn, ServiceBuilder};
use tower_http::{
limit::RequestBodyLimitLayer,
request_id::{MakeRequestId, RequestId, SetRequestIdLayer},
timeout::TimeoutLayer,
};
use viz_core::{
Body, BoxHandler, Handler, HandlerExt, IntoResponse, Request, RequestExt, Response,
};

#[derive(Clone, Default, Debug)]
struct MyMakeRequestId {
counter: Arc<AtomicU64>,
}

impl MakeRequestId for MyMakeRequestId {
fn make_request_id<B>(&mut self, _: &Request<B>) -> Option<RequestId> {
let request_id = self
.counter
.fetch_add(1, Ordering::SeqCst)
.to_string()
.parse()
.unwrap();

Some(RequestId::new(request_id))
}
}

async fn hello(mut req: Request) -> Result<Response> {
let bytes = req.bytes().await?;
Ok(bytes.into_response())
}

#[tokio::test]
async fn tower_service_into_handler() {
let hello_svc = service_fn(hello);

let svc = ServiceBuilder::new()
.layer(RequestBodyLimitLayer::new(1))
.layer(MapErrLayer::new(Error::from))
.layer(SetRequestIdLayer::x_request_id(MyMakeRequestId::default()))
.layer(MapResponseLayer::new(IntoResponse::into_response))
.layer(MapRequestLayer::new(|req: Request<_>| req.map(Body::wrap)))
.layer(TimeoutLayer::new(Duration::from_secs(10)))
.service(hello_svc);

let r0 = Request::new(Body::Full("12".into()));
let h0 = TowerServiceHandler::new(svc);
assert!(h0.call(r0).await.is_err());

let r1 = Request::new(Body::Full("1".into()));
let b0: BoxHandler = h0.boxed();
assert!(b0.call(r1).await.is_ok());
}
}

0 comments on commit 93d2b51

Please sign in to comment.