Skip to content

Commit

Permalink
feat(examples): tower
Browse files Browse the repository at this point in the history
  • Loading branch information
fundon committed Dec 23, 2023
1 parent ac273e4 commit 0e70cec
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 40 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ members = [
"examples/graceful-shutdown",
"examples/databases/*",
"examples/htmlx",
"examples/tower",
]

[workspace.package]
Expand All @@ -52,6 +53,7 @@ viz-router = { version = "0.7.1", path = "viz-router" }
viz-handlers = { version = "0.7.1", path = "viz-handlers", default-features = false }
viz-macros = { version = "0.2.0", path = "viz-macros" }
viz-test = { version = "0.2.0", path = "viz-test" }
viz-tower = { version = "0.1.0", path = "viz-tower" }

async-trait = "0.1"
dyn-clone = "1.0"
Expand Down Expand Up @@ -103,6 +105,10 @@ prometheus = "0.13"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

# Tower
tower = "0.4"
tower-http = "0.5"

[workspace.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
Expand Down
18 changes: 18 additions & 0 deletions examples/tower/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "tower-example"
version = "0.1.0"
edition.workspace = true
publish = false

[dependencies]
viz.workspace = true
viz-tower.workspace = true

tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tower.workspace = true
tower-http = { workspace = true, features = ["full"] }

[lints]
workspace = true
70 changes: 70 additions & 0 deletions examples/tower/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//! Viz + Tower services
use std::{net::SocketAddr, sync::Arc};
use tokio::net::TcpListener;
use tower::util::{MapErrLayer, MapRequestLayer, MapResponseLayer};
use tower::{service_fn, ServiceBuilder};
use tower_http::limit::RequestBodyLimitLayer;
use tower_http::request_id::{MakeRequestUuid, SetRequestIdLayer};
use tower_http::trace::TraceLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use viz::{serve, Body, Error, IntoResponse, Request, Response, Result, Router, Tree};
use viz_tower::{Layered, TowerServiceHandler};

async fn index(_: Request) -> Result<Response> {
Ok("Hello, World!".into_response())
}

async fn about(_: Request) -> Result<&'static str> {
Ok("About me!")
}

async fn any(_: Request) -> Result<String> {
Ok("std::any::Any".to_string())
}

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "tower-example=debug,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();

let index_svc = ServiceBuilder::new()
.layer(MapRequestLayer::new(|req: Request<_>| req.map(Body::wrap)))
.service(service_fn(index));
let index_handler = TowerServiceHandler::new(index_svc);

let any_svc = ServiceBuilder::new()
.layer(MapResponseLayer::new(IntoResponse::into_response))
.service_fn(any);
let any_handler = TowerServiceHandler::new(any_svc);

let layer = ServiceBuilder::new()
.layer(TraceLayer::new_for_http())
.layer(RequestBodyLimitLayer::new(1024))
.layer(SetRequestIdLayer::x_request_id(MakeRequestUuid))
.layer(MapErrLayer::new(Error::from))
.layer(MapResponseLayer::new(IntoResponse::into_response))
.layer(MapRequestLayer::new(|req: Request<_>| req.map(Body::wrap)));

let app = Router::new()
.get("/", index_handler)
.get("/about", about)
.any("/*", any_handler)
.with(Layered::new(layer));
let tree = Arc::new(Tree::from(app));

let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = TcpListener::bind(addr).await?;
println!("listening on http://{addr}");

loop {
let (stream, addr) = listener.accept().await?;
let tree = tree.clone();
tokio::task::spawn(serve(stream, tree, Some(addr)));
}
}
2 changes: 1 addition & 1 deletion examples/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ publish = false
[dependencies]
viz.workspace = true

tokio = { workspace = true, features = [ "rt-multi-thread", "macros" ] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter"] }
13 changes: 10 additions & 3 deletions viz-core/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,16 @@ impl Body {
B::Data: Into<Bytes>,
B::Error: Into<BoxError>,
{
body.map_frame(|frame| frame.map_data(Into::into))
.map_err(Error::boxed)
.boxed_unsync()
// Copied from Axum, thanks.
let mut body = Some(body);
<dyn std::any::Any>::downcast_mut::<Option<UnsyncBoxBody<Bytes, Error>>>(&mut body)
.and_then(Option::take)
.unwrap_or_else(|| {
body.unwrap()
.map_frame(|frame| frame.map_data(Into::into))
.map_err(Error::boxed)
.boxed_unsync()
})
.into()
}

Expand Down
30 changes: 11 additions & 19 deletions viz-core/src/handler/service.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,38 @@
use http_body_util::BodyExt;
use hyper::service::Service;

use crate::{async_trait, Bytes, Error, Handler, HttpBody, Request, Response, Result};
use crate::{
async_trait, Body, BoxError, Bytes, Error, Handler, HttpBody, Request, Response, Result,
};

/// Converts a hyper [`Service`] to a viz [`Handler`].
#[derive(Debug, Clone)]
pub struct ServiceHandler<S> {
s: S,
}
pub struct ServiceHandler<S>(S);

impl<S> ServiceHandler<S> {
/// Creates a new [`ServiceHandler`].
pub fn new(s: S) -> Self {
Self { s }
Self(s)
}
}

#[async_trait]
impl<I, O, S> Handler<Request<I>> for ServiceHandler<S>
where
I: HttpBody + Send + Unpin + 'static,
I: HttpBody + Send + 'static,
O: HttpBody + Send + 'static,
O::Data: Into<Bytes>,
O::Error: Into<Error>,
O::Error: Into<BoxError>,
S: Service<Request<I>, Response = Response<O>> + Send + Sync + Clone + 'static,
S::Future: Send,
S::Error: Into<Error>,
S::Error: Into<BoxError>,
{
type Output = Result<Response>;

async fn call(&self, req: Request<I>) -> Self::Output {
self.s
self.0
.call(req)
.await
.map(|resp| {
resp.map(|body| {
body.map_frame(|f| f.map_data(Into::into))
.map_err(Into::into)
.boxed_unsync()
.into()
})
})
.map_err(Into::into)
.map(|resp| resp.map(Body::wrap))
.map_err(Error::boxed)
}
}
10 changes: 5 additions & 5 deletions viz-router/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ mod tests {
req.extensions_mut().insert(Arc::from(RouteInfo {
id: *route.id,
pattern: route.pattern(),
params: Into::<Params>::into(route.params()),
params: route.params().into(),
}));
assert_eq!(
h.call(req).await?.into_body().collect().await?.to_bytes(),
Expand Down Expand Up @@ -396,7 +396,7 @@ mod tests {
req.extensions_mut().insert(Arc::from(RouteInfo {
id: *route.id,
pattern: route.pattern(),
params: Into::<Params>::into(route.params()),
params: route.params().into(),
}));
assert_eq!(
h.call(req).await?.into_body().collect().await?.to_bytes(),
Expand Down Expand Up @@ -431,7 +431,7 @@ mod tests {
req.extensions_mut().insert(Arc::from(RouteInfo {
id: *route.id,
pattern: route.pattern(),
params: Into::<Params>::into(route.params()),
params: route.params().into(),
}));
assert_eq!(
h.call(req).await?.into_body().collect().await?.to_bytes(),
Expand All @@ -446,7 +446,7 @@ mod tests {
let route_info = Arc::from(RouteInfo {
id: *route.id,
pattern: route.pattern(),
params: Into::<Params>::into(route.params()),
params: route.params().into(),
});
assert_eq!(route.pattern(), "/posts/:post_id/users/:user_id");
assert_eq!(route_info.pattern, "/posts/:post_id/users/:user_id");
Expand All @@ -464,7 +464,7 @@ mod tests {
req.extensions_mut().insert(Arc::from(RouteInfo {
id: *route.id,
pattern: route.pattern(),
params: Into::<Params>::into(route.params()),
params: route.params().into(),
}));
assert_eq!(
h.call(req).await?.into_body().collect().await?.to_bytes(),
Expand Down
5 changes: 3 additions & 2 deletions viz-tower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name = "viz-tower"
version = "0.1.0"
documentation = "https://docs.rs/viz-tower"
description = "An adapter for tower service"
readme = "README.md"
authors.workspace = true
edition.workspace = true
homepage.workspace = true
Expand All @@ -13,11 +14,11 @@ rust-version.workspace = true
[dependencies]
viz-core.workspace = true
http-body-util.workspace = true
tower = { version = "0.4", features = ["util"] }
tower = { workspace = true, features = ["util"] }

[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "test-util"] }
tower-http = { version = "0.5", features = ["limit", "request-id", "timeout"] }
tower-http = { workspace = true, features = ["limit", "request-id", "timeout"] }

[lints]
workspace = true
9 changes: 9 additions & 0 deletions viz-tower/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
viz-tower
---------

An adapter that makes a tower [`Service`] into a [`Handler`].

See [tower example](../examples/tower/).

[`Service`]: https://docs.rs/tower/latest/tower/trait.Service.html
[`Handler`]: https://docs.rs/viz/latest/viz/trait.Handler.html
20 changes: 10 additions & 10 deletions viz-tower/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
//! An adapter that makes a tower [`Service`] into a [`Handler`].
use http_body_util::BodyExt;
use tower::{Service, ServiceExt};
use viz_core::{async_trait, BoxError, Bytes, Error, Handler, HttpBody, Request, Response, Result};
use viz_core::{
async_trait, Body, BoxError, Bytes, Error, Handler, HttpBody, Request, Response, Result,
};

mod service;
pub use service::HandlerService;

mod middleware;
pub use middleware::{Layered, Middleware};

/// Converts a tower [`Service`] into a [`Handler`].
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -32,14 +39,7 @@ where
.clone()
.oneshot(req)
.await
.map(|resp| {
resp.map(|body| {
body.map_frame(|f| f.map_data(Into::into))
.map_err(Error::boxed)
.boxed_unsync()
.into()
})
})
.map(|resp| resp.map(Body::wrap))
.map_err(Error::boxed)
}
}
Expand Down
68 changes: 68 additions & 0 deletions viz-tower/src/middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use tower::{Layer, Service, ServiceExt};
use viz_core::{
async_trait, Body, BoxError, Bytes, Error, Handler, HttpBody, Request, Response, Result,
Transform,
};

use crate::HandlerService;

/// Transforms a Tower layer into Viz Middleware.
#[derive(Debug)]
pub struct Layered<L>(L);

impl<L> Layered<L> {
/// Creates a new tower layer.
pub fn new(l: L) -> Self {
Self(l)
}
}

impl<L, H> Transform<H> for Layered<L>
where
L: Clone,
{
type Output = Middleware<L, H>;

fn transform(&self, h: H) -> Self::Output {
Middleware::new(self.0.clone(), h)
}
}

/// A [`Service`] created from a [`Handler`] by applying a Tower middleware.
#[derive(Debug, Clone)]
pub struct Middleware<L, H> {
l: L,
h: H,
}

impl<L, H> Middleware<L, H> {
/// Creates a new tower middleware.
pub fn new(l: L, h: H) -> Self {
Self { l, h }
}
}

#[async_trait]
impl<O, L, H> Handler<Request> for Middleware<L, H>
where
L: Layer<HandlerService<H>> + Send + Sync + Clone + 'static,
H: Handler<Request, Output = Result<Response>> + Send + Sync + Clone + 'static,
O: HttpBody + Send + 'static,
O::Data: Into<Bytes>,
O::Error: Into<BoxError>,
L::Service: Service<Request, Response = Response<O>> + Send + Sync + Clone + 'static,
<L::Service as Service<Request>>::Future: Send,
<L::Service as Service<Request>>::Error: Into<BoxError>,
{
type Output = Result<Response>;

async fn call(&self, req: Request) -> Self::Output {
self.l
.clone()
.layer(HandlerService::new(self.h.clone()))
.oneshot(req)
.await
.map(|resp| resp.map(Body::wrap))
.map_err(Error::boxed)
}
}
Loading

0 comments on commit 0e70cec

Please sign in to comment.