Skip to content

Commit

Permalink
feat(viz): add Server
Browse files Browse the repository at this point in the history
  • Loading branch information
fundon committed Dec 30, 2023
1 parent f33585b commit 8748177
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 50 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[workspace]
resolver = "2"
members = [
# "viz",
"viz",
"viz-core",
"viz-handlers",
"viz-macros",
"viz-router",
"viz-tower",
"viz-test",
# "viz-test",

"examples/hello-world",
# "examples/unix-socket",
Expand Down
11 changes: 6 additions & 5 deletions examples/hello-world/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::{net::SocketAddr, str::FromStr};
use tokio::net::TcpListener;
use viz::{serve, Request, Result, Router, Tree};
use viz::{Request, Result, Router, Server, Tree};

async fn index(_: Request) -> Result<String> {
Ok(String::from("Hello, World!"))
Expand All @@ -23,8 +23,9 @@ async fn main() -> Result<()> {

let tree = Tree::from(app);

loop {
let (stream, addr) = listener.accept().await?;
tokio::task::spawn(serve(stream, tree.clone(), Some(addr)));
}
let server = Server::new(listener, tree);

if let Err(_) = server.await {}

Ok(())
}
2 changes: 1 addition & 1 deletion viz-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ fn generate_handler(input: TokenStream) -> Result<TokenStream> {
type Output = viz_core::Result<viz_core::Response>;

#[allow(unused, unused_mut)]
fn call(&self, mut req: viz_core::Request) -> viz_core::future::BoxFuture<'static, Self::Output> {
fn call(&self, mut req: viz_core::Request) -> viz_core::BoxFuture<Self::Output> {
Box::pin(async move {
#ast
let res = #name(#(#extractors),*)#asyncness;
Expand Down
6 changes: 3 additions & 3 deletions viz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ otel-tracing = ["otel", "viz-core/otel-tracing"]
otel-metrics = ["otel", "viz-core/otel-metrics"]
otel-prometheus = ["handlers", "viz-handlers?/prometheus"]

rustls = ["dep:rustls-pemfile", "dep:tokio-rustls", "dep:futures-util"]
native-tls = ["dep:tokio-native-tls", "dep:futures-util"]
rustls = ["dep:rustls-pemfile", "dep:tokio-rustls"]
native-tls = ["dep:tokio-native-tls"]

[dependencies]
viz-core.workspace = true
Expand All @@ -75,10 +75,10 @@ viz-macros = { workspace = true, optional = true }

hyper.workspace = true
hyper-util.workspace = true
futures-util.workspace = true

rustls-pemfile = { workspace = true, optional = true }

futures-util = { workspace = true, optional = true }
tokio-native-tls = { workspace = true, optional = true }
tokio-rustls = { workspace = true, optional = true }
tokio.workspace = true
Expand Down
9 changes: 5 additions & 4 deletions viz/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,11 @@ mod responder;
pub use responder::Responder;

#[cfg(any(feature = "http1", feature = "http2"))]
mod serve;
// mod serve;
#[cfg(any(feature = "http1", feature = "http2"))]
pub use serve::{serve, serve_with_upgrades};
// pub use serve::{serve, serve_with_upgrades};
mod server;
pub use server::*;

/// TLS
pub mod tls;
Expand All @@ -543,8 +545,7 @@ pub use viz_router::*;
pub use viz_handlers as handlers;

#[cfg(any(feature = "http1", feature = "http2"))]
pub use hyper::server;

// pub use hyper::server;
#[cfg(feature = "macros")]
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
#[doc(inline)]
Expand Down
29 changes: 18 additions & 11 deletions viz/src/responder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{convert::Infallible, net::SocketAddr, sync::Arc};
use std::{convert::Infallible, sync::Arc};

use crate::{
future::{FutureExt, TryFutureExt},
Expand All @@ -8,36 +8,40 @@ use crate::{

/// Handles the HTTP [`Request`] and retures the HTTP [`Response`].
#[derive(Debug)]
pub struct Responder {
pub struct Responder<A> {
tree: Tree,
addr: Option<SocketAddr>,
remote_addr: Option<A>,
}

impl Responder {
impl<A> Responder<A> {
/// Creates a Responder for handling the [`Request`].
#[must_use]
pub fn new(tree: Tree, addr: Option<SocketAddr>) -> Self {
Self { tree, addr }
pub fn new(tree: Tree, remote_addr: Option<A>) -> Self {
Self { tree, remote_addr }

Check warning on line 20 in viz/src/responder.rs

View check run for this annotation

Codecov / codecov/patch

viz/src/responder.rs#L19-L20

Added lines #L19 - L20 were not covered by tests
}
}

impl Handler<Request<Incoming>> for Responder {
impl<A> Handler<Request<Incoming>> for Responder<A>
where
A: Clone + Send + Sync + 'static,
{
type Output = Result<Response, Infallible>;

fn call(&self, mut req: Request<Incoming>) -> BoxFuture<Self::Output> {
let Self { remote_addr, tree } = self;

Check warning on line 31 in viz/src/responder.rs

View check run for this annotation

Codecov / codecov/patch

viz/src/responder.rs#L30-L31

Added lines #L30 - L31 were not covered by tests
let method = req.method().clone();
let path = req.uri().path().to_string();

let matched = self.tree.find(&method, &path).or_else(|| {
let matched = tree.find(&method, &path).or_else(|| {
if method == Method::HEAD {
self.tree.find(&Method::GET, &path)
tree.find(&Method::GET, &path)

Check warning on line 37 in viz/src/responder.rs

View check run for this annotation

Codecov / codecov/patch

viz/src/responder.rs#L33-L37

Added lines #L33 - L37 were not covered by tests
} else {
None

Check warning on line 39 in viz/src/responder.rs

View check run for this annotation

Codecov / codecov/patch

viz/src/responder.rs#L39

Added line #L39 was not covered by tests
}
});

Check warning on line 41 in viz/src/responder.rs

View check run for this annotation

Codecov / codecov/patch

viz/src/responder.rs#L41

Added line #L41 was not covered by tests

if let Some((handler, route)) = matched {
req.extensions_mut().insert(self.addr);
req.extensions_mut().insert(remote_addr.clone());
req.extensions_mut().insert(Arc::from(RouteInfo {
id: *route.id,
pattern: route.pattern(),
Expand All @@ -56,7 +60,10 @@ impl Handler<Request<Incoming>> for Responder {
}
}

impl hyper::service::Service<Request<Incoming>> for Responder {
impl<A> hyper::service::Service<Request<Incoming>> for Responder<A>
where
A: Clone + Send + Sync + 'static,
{
type Response = Response;
type Error = Infallible;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
Expand Down
48 changes: 24 additions & 24 deletions viz/src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@ use crate::Responder;
/// # Errors
///
/// Will return `Err` if the connection does not be served.
pub async fn serve<I>(stream: I, tree: Tree, addr: Option<SocketAddr>) -> Result<()>
where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
Builder::new(TokioExecutor::new())
.serve_connection(Io::new(stream), Responder::new(tree, addr))
.await
.map_err(Into::into)
}

/// Serve the connections with upgrades.
///
/// # Errors
///
/// Will return `Err` if the connection does not be served.
pub async fn serve_with_upgrades<I>(stream: I, tree: Tree, addr: Option<SocketAddr>) -> Result<()>
where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
Builder::new(TokioExecutor::new())
.serve_connection_with_upgrades(Io::new(stream), Responder::new(tree, addr))
.await
.map_err(Into::into)
}
// pub async fn serve<I>(stream: I, tree: Tree, addr: Option<SocketAddr>) -> Result<()>
// where
// I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
// {
// Builder::new(TokioExecutor::new())
// .serve_connection(Io::new(stream), Responder::new(tree, addr))
// .await
// .map_err(Into::into)
// }
//
// /// Serve the connections with upgrades.
// ///
// /// # Errors
// ///
// /// Will return `Err` if the connection does not be served.
// pub async fn serve_with_upgrades<I>(stream: I, tree: Tree, addr: Option<SocketAddr>) -> Result<()>
// where
// I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
// {
// Builder::new(TokioExecutor::new())
// .serve_connection_with_upgrades(Io::new(stream), Responder::new(tree, addr))
// .await
// .map_err(Into::into)
// }
77 changes: 77 additions & 0 deletions viz/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::future::{Future, IntoFuture};
use std::io::Result;

use futures_util::{pin_mut, TryFutureExt};
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder,
};
use tokio::io::{AsyncRead, AsyncWrite};

use crate::{BoxFuture, Responder, Tree};

mod accept;
pub use accept::Accept;

mod tcp;
pub use tcp::*;

mod unix;
pub use unix::*;

pub struct Server<L, E> {
tree: Tree,
listener: L,
builder: Builder<E>,
}

impl<L, E> Server<L, E> {
pub fn listener(&self) -> &L {
&self.listener
}

Check warning on line 31 in viz/src/server.rs

View check run for this annotation

Codecov / codecov/patch

viz/src/server.rs#L29-L31

Added lines #L29 - L31 were not covered by tests

pub fn builder(&mut self) -> &mut Builder<E> {
&mut self.builder
}

Check warning on line 35 in viz/src/server.rs

View check run for this annotation

Codecov / codecov/patch

viz/src/server.rs#L33-L35

Added lines #L33 - L35 were not covered by tests
}

impl<L> Server<L, TokioExecutor> {
pub fn new(listener: L, tree: Tree) -> Self {
Self {
tree,
listener,
builder: Builder::new(TokioExecutor::new()),
}
}

Check warning on line 45 in viz/src/server.rs

View check run for this annotation

Codecov / codecov/patch

viz/src/server.rs#L39-L45

Added lines #L39 - L45 were not covered by tests
}

impl<L> IntoFuture for Server<L, TokioExecutor>
where
L: Accept + Send + 'static,
L::Conn: AsyncWrite + AsyncRead + Unpin + Send,
L::Addr: Clone + Send + Sync + 'static,
{
type Output = Result<()>;
type IntoFuture = BoxFuture<Self::Output>;

fn into_future(self) -> Self::IntoFuture {
let Self {
tree,
listener,
builder,
} = self;

Box::pin(async move {
loop {
let (stream, remote_addr) = listener.accept().await?;
let io = TokioIo::new(stream);
let builder = builder.clone();
let responder = Responder::<L::Addr>::new(tree.clone(), Some(remote_addr));

tokio::spawn(async move {
if let Err(_) = builder.serve_connection(io, responder).await {}
});

Check warning on line 73 in viz/src/server.rs

View check run for this annotation

Codecov / codecov/patch

viz/src/server.rs#L57-L73

Added lines #L57 - L73 were not covered by tests
}
})
}

Check warning on line 76 in viz/src/server.rs

View check run for this annotation

Codecov / codecov/patch

viz/src/server.rs#L75-L76

Added lines #L75 - L76 were not covered by tests
}
9 changes: 9 additions & 0 deletions viz/src/server/accept.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use std::future::Future;
use std::io::Result;

pub trait Accept {
type Conn;
type Addr;

fn accept(&self) -> impl Future<Output = Result<(Self::Conn, Self::Addr)>> + Send;
}
14 changes: 14 additions & 0 deletions viz/src/server/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use std::future::Future;
use std::io::Result;
use std::net::SocketAddr;

use tokio::net::{TcpListener, TcpStream};

impl super::Accept for TcpListener {
type Conn = TcpStream;
type Addr = SocketAddr;

fn accept(&self) -> impl Future<Output = Result<(Self::Conn, Self::Addr)>> + Send {
TcpListener::accept(self)
}

Check warning on line 13 in viz/src/server/tcp.rs

View check run for this annotation

Codecov / codecov/patch

viz/src/server/tcp.rs#L11-L13

Added lines #L11 - L13 were not covered by tests
}
13 changes: 13 additions & 0 deletions viz/src/server/unix.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use std::future::Future;
use std::io::Result;

use tokio::net::{unix::SocketAddr, UnixListener, UnixStream};

impl super::Accept for UnixListener {
type Conn = UnixStream;
type Addr = SocketAddr;

fn accept(&self) -> impl Future<Output = Result<(Self::Conn, Self::Addr)>> + Send {
UnixListener::accept(self)
}

Check warning on line 12 in viz/src/server/unix.rs

View check run for this annotation

Codecov / codecov/patch

viz/src/server/unix.rs#L10-L12

Added lines #L10 - L12 were not covered by tests
}

0 comments on commit 8748177

Please sign in to comment.