diff --git a/Cargo.toml b/Cargo.toml index 77ab7032..9cba898a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,33 +7,33 @@ members = [ "viz-macros", "viz-router", "viz-tower", - # "viz-test", + "viz-test", "examples/hello-world", - # "examples/unix-socket", - # "examples/static-files/embed", - # "examples/static-files/serve", - # "examples/static-files/include-dir", - # "examples/limits", - # "examples/forms/form", - # "examples/forms/multipart", - # "examples/websocket-chat", - # "examples/sse", - # "examples/session", - # "examples/csrf", - # "examples/cors", - # "examples/rustls", + "examples/unix-socket", + "examples/static-files/embed", + "examples/static-files/serve", + "examples/static-files/include-dir", + "examples/limits", + "examples/forms/form", + "examples/forms/multipart", + "examples/websocket-chat", + "examples/sse", + "examples/session", + "examples/csrf", + "examples/cors", + "examples/rustls", # "examples/static-routes", - # "examples/routing/todos", - # "examples/routing/openapi", - # "examples/otel/*", - # "examples/compression", - # "examples/templates/*", - # "examples/tracing", - # "examples/graceful-shutdown", - # "examples/databases/*", - # "examples/htmlx", - # "examples/tower", + "examples/routing/todos", + "examples/routing/openapi", + "examples/otel/*", + "examples/compression", + "examples/templates/*", + "examples/tracing", + "examples/graceful-shutdown", + "examples/databases/*", + "examples/htmlx", + "examples/tower", ] [workspace.package] diff --git a/examples/compression/src/main.rs b/examples/compression/src/main.rs index fb1d86cb..839b3a3a 100644 --- a/examples/compression/src/main.rs +++ b/examples/compression/src/main.rs @@ -1,10 +1,9 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; -use viz::{get, middleware::compression, serve, Request, Result, Router, Tree}; +use viz::{get, middleware::compression, serve, Request, Result, Router}; async fn index(_req: Request) -> Result<&'static str> { Ok("Hello, World!") @@ -19,15 +18,10 @@ async fn main() -> Result<()> { let app = Router::new() .route("/", get(index)) .with(compression::Config); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/cors/src/main.rs b/examples/cors/src/main.rs index b2178cea..f62b84df 100644 --- a/examples/cors/src/main.rs +++ b/examples/cors/src/main.rs @@ -1,9 +1,8 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; -use viz::{get, middleware::cors, serve, Method, Request, Result, Router, Tree}; +use viz::{get, middleware::cors, serve, Method, Request, Result, Router}; async fn index(_req: Request) -> Result<&'static str> { Ok("Hello, World!") @@ -27,15 +26,10 @@ async fn main() -> Result<()> { .route("/", get(index).options(options)) // .with(cors::Config::default()); // Default CORS config .with(custom_cors); // Our custom CORS config - let tree = Arc::new(Tree::from(app)); - - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/csrf/src/main.rs b/examples/csrf/src/main.rs index c133c7a9..e6878b35 100644 --- a/examples/csrf/src/main.rs +++ b/examples/csrf/src/main.rs @@ -1,7 +1,6 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{net::SocketAddr, time::Duration}; use tokio::net::TcpListener; use viz::{ @@ -10,7 +9,7 @@ use viz::{ csrf::{self, CsrfToken}, helper::CookieOptions, }, - serve, Method, Request, RequestExt, Result, Router, Tree, + serve, Method, Request, RequestExt, Result, Router, }; async fn index(mut req: Request) -> Result { @@ -39,15 +38,10 @@ async fn main() -> Result<()> { csrf::verify, )) .with(cookie::Config::default()); - let tree = Arc::new(Tree::from(app)); - - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/databases/sea-orm/src/main.rs b/examples/databases/sea-orm/src/main.rs index 8fba74a0..ced916ca 100644 --- a/examples/databases/sea-orm/src/main.rs +++ b/examples/databases/sea-orm/src/main.rs @@ -1,11 +1,10 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] //! `SeaOrm` example for Viz framework. use sea_orm_example::{api, db::init_db}; -use std::{env, net::SocketAddr, path::PathBuf, sync::Arc}; +use std::{env, net::SocketAddr, path::PathBuf}; use tokio::net::TcpListener; -use viz::{handlers::serve, middleware, serve, types::State, Result, Router, Tree}; +use viz::{handlers::serve, middleware, serve, types::State, Result, Router}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -26,11 +25,10 @@ async fn main() -> Result<(), Box> { .delete("/todos/:id", api::delete) .with(State::new(db)) .with(middleware::limits::Config::new()); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(serve(stream, tree, Some(addr))); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/forms/form/src/main.rs b/examples/forms/form/src/main.rs index d862287e..9624979f 100644 --- a/examples/forms/form/src/main.rs +++ b/examples/forms/form/src/main.rs @@ -1,12 +1,11 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] use serde::{Deserialize, Serialize}; -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; use viz::{ get, middleware::limits, serve, types::Form, IntoHandler, Request, Response, ResponseExt, - Result, Router, Tree, + Result, Router, }; #[derive(Deserialize, Serialize)] @@ -35,15 +34,10 @@ async fn main() -> Result<()> { .route("/", get(new).post(create.into_handler())) // limit body size .with(limits::Config::default()); - let tree = Arc::new(Tree::from(app)); - - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/forms/multipart/src/main.rs b/examples/forms/multipart/src/main.rs index dc31ea9e..83e661d0 100644 --- a/examples/forms/multipart/src/main.rs +++ b/examples/forms/multipart/src/main.rs @@ -1,15 +1,14 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] use futures_util::TryStreamExt; -use std::{fs::File, net::SocketAddr, sync::Arc}; +use std::{fs::File, net::SocketAddr}; use tempfile::tempdir; use tokio::net::TcpListener; use viz::{ middleware::limits, serve, types::{Multipart, PayloadError}, - IntoHandler, IntoResponse, Request, Response, ResponseExt, Result, Router, Tree, + IntoHandler, IntoResponse, Request, Response, ResponseExt, Result, Router, }; // HTML form for uploading photos @@ -54,15 +53,10 @@ async fn main() -> Result<()> { .post("/", upload.into_handler()) // limit body size .with(limits::Config::default()); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/graceful-shutdown/Cargo.toml b/examples/graceful-shutdown/Cargo.toml index 750df073..3f4e8af1 100644 --- a/examples/graceful-shutdown/Cargo.toml +++ b/examples/graceful-shutdown/Cargo.toml @@ -7,4 +7,6 @@ publish = false [dependencies] viz.workspace = true -tokio = { workspace = true, features = ["rt-multi-thread", "macros", "time"] } +tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] } +tracing.workspace = true +tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/examples/graceful-shutdown/src/main.rs b/examples/graceful-shutdown/src/main.rs index 6940c037..3a2b112a 100644 --- a/examples/graceful-shutdown/src/main.rs +++ b/examples/graceful-shutdown/src/main.rs @@ -1,13 +1,11 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] -//! Graceful shutdown server. -//! -//! See - -use std::{net::SocketAddr, sync::Arc, time::Duration}; -use tokio::{net::TcpListener, pin}; -use viz::{server::conn::http1, Io, Request, Responder, Result, Router, Tree}; +use std::net::SocketAddr; +use tokio::net::TcpListener; +use tokio::signal; +use tracing::{error, info}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use viz::{serve, Request, Result, Router}; async fn index(_: Request) -> Result<&'static str> { Ok("Hello, World!") @@ -15,55 +13,49 @@ async fn index(_: Request) -> Result<&'static str> { #[tokio::main] async fn main() -> Result<()> { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "tracing=debug,hyper=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = TcpListener::bind(addr).await?; - println!("listening on {addr}"); + info!("listening on {addr}"); let app = Router::new().get("/", index); - let tree = Arc::new(Tree::from(app)); - - // Use a 5 second timeout for incoming connections to the server. - // If a request is in progress when the 5 second timeout elapses, - // use a 2 second timeout for processing the final request and graceful shutdown. - let connection_timeouts = vec![Duration::from_secs(5), Duration::from_secs(2)]; - loop { - // Clone the connection_timeouts so they can be passed to the new task. - let connection_timeouts_clone = connection_timeouts.clone(); + let server = serve(listener, app).signal(shutdown_signal()); - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); + if let Err(e) = server.await { + error!("{e}"); + } - tokio::task::spawn(async move { - // Pin the connection object so we can use tokio::select! below. - let conn = http1::Builder::new() - .serve_connection(Io::new(stream), Responder::new(tree, Some(addr))); - pin!(conn); + Ok(()) +} - // Iterate the timeouts. Use tokio::select! to wait on the - // result of polling the connection itself, - // and also on tokio::time::sleep for the current timeout duration. - for (iter, sleep_duration) in connection_timeouts_clone.iter().enumerate() { - println!("iter = {iter} sleep_duration = {sleep_duration:?}"); - tokio::select! { - res = conn.as_mut() => { - // Polling the connection returned a result. - // In this case print either the successful or error result for the connection - // and break out of the loop. - match res { - Ok(()) => println!("after polling conn, no error"), - Err(e) => println!("error serving connection: {e:?}"), - }; - break; - } - () = tokio::time::sleep(*sleep_duration) => { - // tokio::time::sleep returned a result. - // Call graceful_shutdown on the connection and continue the loop. - println!("iter = {iter} got timeout_interval, calling conn.graceful_shutdown"); - conn.as_mut().graceful_shutdown(); - } - } - } - }); +async fn shutdown_signal() { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, } } diff --git a/examples/hello-world/src/main.rs b/examples/hello-world/src/main.rs index 8c005f59..433134ae 100644 --- a/examples/hello-world/src/main.rs +++ b/examples/hello-world/src/main.rs @@ -1,9 +1,8 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] use std::{net::SocketAddr, str::FromStr}; use tokio::net::TcpListener; -use viz::{Request, Result, Router, Server, Tree}; +use viz::{serve, Request, Result, Router}; async fn index(_: Request) -> Result { Ok(String::from("Hello, World!")) @@ -21,11 +20,9 @@ async fn main() -> Result<()> { app = app.get(&format!("/{}", n), index); } - let tree = Tree::from(app); - - let server = Server::new(listener, tree); - - if let Err(_) = server.await {} + if let Err(e) = serve(listener, app).await { + println!("{e}"); + } Ok(()) } diff --git a/examples/htmlx/src/main.rs b/examples/htmlx/src/main.rs index 3dd61a45..1a032ce4 100644 --- a/examples/htmlx/src/main.rs +++ b/examples/htmlx/src/main.rs @@ -13,7 +13,7 @@ use std::{ use tokio::net::TcpListener; use viz::{ header::HeaderValue, middleware::limits, serve, types::State, Error, IntoResponse, Request, - RequestExt, Response, ResponseExt, Result, Router, StatusCode, Tree, + RequestExt, Response, ResponseExt, Result, Router, StatusCode, }; /// In-memory todo store @@ -100,15 +100,10 @@ async fn main() -> Result<()> { .any("/*", |_| async { Ok(Response::text("Welcome!")) }) .with(State::new(DB::default())) .with(limits::Config::default()); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/limits/src/main.rs b/examples/limits/src/main.rs index 4de0df46..0f2da782 100644 --- a/examples/limits/src/main.rs +++ b/examples/limits/src/main.rs @@ -1,7 +1,6 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; use viz::{ middleware::limits, @@ -12,7 +11,6 @@ use viz::{ RequestExt, Result, Router, - Tree, }; async fn echo(mut req: Request) -> Result { @@ -36,15 +34,10 @@ async fn main() -> Result<()> { .post("/", echo) // limit body size .with(limits::Config::default().limits(limits)); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/otel/metrics/src/main.rs b/examples/otel/metrics/src/main.rs index 97fe9b1e..cbec1ce4 100644 --- a/examples/otel/metrics/src/main.rs +++ b/examples/otel/metrics/src/main.rs @@ -1,7 +1,6 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; use opentelemetry::{global, KeyValue}; @@ -13,7 +12,7 @@ use opentelemetry_sdk::{ use viz::{ handlers::prometheus::{ExporterBuilder, Prometheus, Registry}, middleware::otel, - serve, Error, Request, Result, Router, Tree, + serve, Error, Request, Result, Router, }; async fn index(_: Request) -> Result<&'static str> { @@ -59,18 +58,13 @@ async fn main() -> Result<()> { .get("/:username", index) .get("/metrics", Prometheus::new(registry)) .with(otel::metrics::Config::new(&global::meter("otel"))); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + Ok(()) + // Ensure all spans have been reported // global::shutdown_tracer_provider(); // provider.shutdown(); diff --git a/examples/otel/tracing/src/main.rs b/examples/otel/tracing/src/main.rs index b53fd236..67e98116 100644 --- a/examples/otel/tracing/src/main.rs +++ b/examples/otel/tracing/src/main.rs @@ -6,9 +6,9 @@ use opentelemetry_sdk::{ runtime::TokioCurrentThread, {propagation::TraceContextPropagator, trace::Tracer}, }; -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; -use viz::{middleware::otel, serve, Request, Result, Router, Tree}; +use viz::{middleware::otel, serve, Request, Result, Router}; fn init_tracer() -> Tracer { global::set_text_map_propagator(TraceContextPropagator::new()); @@ -34,15 +34,10 @@ async fn main() -> Result<()> { .get("/", index) .get("/:username", index) .with(otel::tracing::Config::new(tracer)); - let tree = Arc::new(Tree::from(app)); - - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/routing/openapi/src/main.rs b/examples/routing/openapi/src/main.rs index f5b8a2eb..2a22f9b0 100644 --- a/examples/routing/openapi/src/main.rs +++ b/examples/routing/openapi/src/main.rs @@ -21,7 +21,7 @@ use viz::{ middleware, serve, types::{Json, Params, Query, State, StateError}, Error, HandlerExt, IntoResponse, Request, RequestExt, Response, ResponseExt, Result, Router, - StatusCode, Tree, + StatusCode, }; /// In-memory todo store @@ -329,15 +329,10 @@ async fn main() -> Result<(), Error> { "#, )) }); - let tree = Arc::new(Tree::from(app)); - - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/routing/todos/src/main.rs b/examples/routing/todos/src/main.rs index 37fdf81d..c0d2f1fc 100644 --- a/examples/routing/todos/src/main.rs +++ b/examples/routing/todos/src/main.rs @@ -11,7 +11,6 @@ use viz::{ middleware, serve, types::{Json, Params, Query, State}, Error, IntoResponse, Request, RequestExt, Response, ResponseExt, Result, Router, StatusCode, - Tree, }; /// In-memory todo store @@ -137,15 +136,10 @@ async fn main() -> Result<()> { .with(State::new(db)) // Set limits for the payload data of request .with(middleware::limits::Config::new()); - let tree = Arc::new(Tree::from(app)); - - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/rustls/src/main.rs b/examples/rustls/src/main.rs index 0afb6b6e..66f18741 100644 --- a/examples/rustls/src/main.rs +++ b/examples/rustls/src/main.rs @@ -1,9 +1,8 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] use std::{net::SocketAddr, sync::Arc}; use tokio::net::TcpListener; -use viz::{get, serve, tls, Request, Result, Router, Tree}; +use viz::{get, serve, tls, Request, Result, Router}; async fn index(_: Request) -> Result<&'static str> { Ok("Hello, World!") @@ -16,7 +15,6 @@ async fn main() -> Result<()> { println!("listening on http://{addr}"); let app = Router::new().route("/", get(index)); - let tree = Arc::new(Tree::from(app)); let listener = tls::Listener::<_, tls::rustls::TlsAcceptor>::new( listener, @@ -28,13 +26,9 @@ async fn main() -> Result<()> { .into(), ); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/session/src/main.rs b/examples/session/src/main.rs index a3465036..44f87c06 100644 --- a/examples/session/src/main.rs +++ b/examples/session/src/main.rs @@ -1,7 +1,6 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; use sessions::MemoryStorage; @@ -15,7 +14,7 @@ use viz::{ }, serve, types::CookieKey, - Request, RequestExt, Result, Router, Tree, + Request, RequestExt, Result, Router, }; async fn index(req: Request) -> Result<&'static str> { @@ -41,15 +40,10 @@ async fn main() -> Result<()> { CookieOptions::default(), )) .with(cookie::Config::with_key(CookieKey::generate())); - let tree = Arc::new(Tree::from(app)); - - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/sse/src/main.rs b/examples/sse/src/main.rs index 5eeb2984..1fc2ffcc 100644 --- a/examples/sse/src/main.rs +++ b/examples/sse/src/main.rs @@ -1,5 +1,4 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] use futures_util::StreamExt; use std::{net::SocketAddr, sync::Arc}; @@ -13,7 +12,7 @@ use viz::{ serve, types::{Event, Sse, State}, Error, HandlerExt, IntoResponse, Request, RequestExt, Response, ResponseExt, Result, Router, - StatusCode, Tree, + StatusCode, }; type ArcSystem = Arc; @@ -64,15 +63,10 @@ async fn main() -> Result<()> { let app = Router::new() .route("/", get(index)) .route("/stats", get(stats.with(State::new(sys)))); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/static-files/embed/src/main.rs b/examples/static-files/embed/src/main.rs index 8c7cdb58..e6d7283f 100644 --- a/examples/static-files/embed/src/main.rs +++ b/examples/static-files/embed/src/main.rs @@ -1,8 +1,8 @@ #![deny(warnings)] -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; -use viz::{handlers::embed, serve, Result, Router, StatusCode, Tree}; +use viz::{handlers::embed, serve, Result, Router, StatusCode}; #[derive(rust_embed::RustEmbed)] #[folder = "public"] @@ -21,15 +21,10 @@ async fn main() -> Result<()> { .get("/", embed::File::::new("index.html")) .get("/static/*", embed::Dir::::default()) .any("/*", |_| async { Ok(StatusCode::NOT_FOUND) }); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/static-files/include-dir/src/main.rs b/examples/static-files/include-dir/src/main.rs index 64218e76..0f330ee5 100644 --- a/examples/static-files/include-dir/src/main.rs +++ b/examples/static-files/include-dir/src/main.rs @@ -3,11 +3,10 @@ use http_body_util::Full; use include_dir::{include_dir, Dir}; -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; use viz::{ serve, IntoResponse, Request, RequestExt, Response, ResponseExt, Result, Router, StatusCode, - Tree, }; const ASSETS: Dir = include_dir!("examples/static-files/include-dir/html"); // frontend dir @@ -45,11 +44,10 @@ async fn main() -> Result<()> { .any("/*", |_| async { Ok(StatusCode::NOT_FOUND) }) .get("/", index) .get("/*", assets); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(serve(stream, tree, Some(addr))); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/static-files/serve/src/main.rs b/examples/static-files/serve/src/main.rs index e34d25ac..ff197a57 100644 --- a/examples/static-files/serve/src/main.rs +++ b/examples/static-files/serve/src/main.rs @@ -1,9 +1,8 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] -use std::{env, net::SocketAddr, path::PathBuf, sync::Arc}; +use std::{env, net::SocketAddr, path::PathBuf}; use tokio::net::TcpListener; -use viz::{handlers::serve, serve, Request, Response, ResponseExt, Result, Router, Tree}; +use viz::{handlers::serve, serve, Request, Response, ResponseExt, Result, Router}; async fn index(_: Request) -> Result<&'static str> { Ok("Hello, World!") @@ -25,15 +24,10 @@ async fn main() -> Result<()> { serve::Dir::new(dir.join("../../../examples")).listing(), ) .any("/*", |_| async { Ok(Response::text("Welcome!")) }); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/static-routes/Cargo.toml b/examples/static-routes/Cargo.toml index 29f38645..b8a82479 100644 --- a/examples/static-routes/Cargo.toml +++ b/examples/static-routes/Cargo.toml @@ -7,6 +7,6 @@ publish = false [dependencies] viz.workspace = true -hyper.workspace = true +hyper.workspace = true tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } once_cell = "1.19" diff --git a/examples/static-routes/src/main.rs b/examples/static-routes/src/main.rs index de7e0012..b44b97b3 100644 --- a/examples/static-routes/src/main.rs +++ b/examples/static-routes/src/main.rs @@ -1,15 +1,16 @@ #![deny(warnings)] #![allow(clippy::unused_async)] +use hyper::server::conn::http1; use hyper::service::service_fn; use once_cell::sync::Lazy; use std::{net::SocketAddr, sync::Arc}; use tokio::net::TcpListener; + use viz::{ - server::conn::http1, types::{Params, RouteInfo}, - Body, IntoResponse, Io, Method, Request, RequestExt, Response, Result, Router, StatusCode, - Tree, + Body, Handler, IntoResponse, Io, Method, Request, RequestExt, Response, Result, Router, + StatusCode, Tree, }; /// Static Lazy Routes @@ -66,8 +67,7 @@ async fn serve(mut req: Request, mut addr: Option) -> Result::into(route.params()), })); - handler - .call(req) + Handler::call(handler, req) .await .unwrap_or_else(IntoResponse::into_response) } diff --git a/examples/templates/askama/src/main.rs b/examples/templates/askama/src/main.rs index 63cf62d8..75e3c39c 100644 --- a/examples/templates/askama/src/main.rs +++ b/examples/templates/askama/src/main.rs @@ -1,11 +1,10 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use askama::Template; use tokio::net::TcpListener; -use viz::{serve, BytesMut, Error, Request, Response, ResponseExt, Result, Router, Tree}; +use viz::{serve, BytesMut, Error, Request, Response, ResponseExt, Result, Router}; #[derive(Template)] #[template(path = "hello.html")] @@ -32,15 +31,10 @@ async fn main() -> Result<()> { println!("listening on http://{addr}"); let app = Router::new().get("/", index); - let tree = Arc::new(Tree::from(app)); - - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/templates/markup/src/main.rs b/examples/templates/markup/src/main.rs index 587343a2..9a71cf3c 100644 --- a/examples/templates/markup/src/main.rs +++ b/examples/templates/markup/src/main.rs @@ -1,11 +1,10 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] #![allow(clippy::must_use_candidate)] #![allow(clippy::inherent_to_string_shadow_display)] -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; -use viz::{serve, BytesMut, Request, Response, ResponseExt, Result, Router, Tree}; +use viz::{serve, BytesMut, Request, Response, ResponseExt, Result, Router}; pub struct Todo<'a> { id: u64, @@ -36,17 +35,12 @@ async fn main() -> Result<()> { println!("listening on http://{addr}"); let app = Router::new().get("/", index); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } markup::define! { diff --git a/examples/templates/maud/src/main.rs b/examples/templates/maud/src/main.rs index 16727200..36c4c69e 100644 --- a/examples/templates/maud/src/main.rs +++ b/examples/templates/maud/src/main.rs @@ -1,12 +1,11 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] #![allow(clippy::must_use_candidate)] #![allow(clippy::inherent_to_string_shadow_display)] use maud::{html, PreEscaped, DOCTYPE}; -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; -use viz::{serve, Request, Response, ResponseExt, Result, Router, Tree}; +use viz::{serve, Request, Response, ResponseExt, Result, Router}; pub struct Todo<'a> { id: u64, @@ -53,15 +52,10 @@ async fn main() -> Result<()> { println!("listening on http://{addr}"); let app = Router::new().get("/", index); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/templates/minijinja/src/main.rs b/examples/templates/minijinja/src/main.rs index 65d9467b..11e690a6 100644 --- a/examples/templates/minijinja/src/main.rs +++ b/examples/templates/minijinja/src/main.rs @@ -1,13 +1,13 @@ #![deny(warnings)] #![allow(clippy::unused_async)] -use std::{env, net::SocketAddr, path::PathBuf, sync::Arc}; +use std::{env, net::SocketAddr, path::PathBuf}; use minijinja::{context, path_loader, Environment}; use once_cell::sync::Lazy; use serde::Serialize; use tokio::net::TcpListener; -use viz::{serve, BytesMut, Error, Request, Response, ResponseExt, Result, Router, Tree}; +use viz::{serve, BytesMut, Error, Request, Response, ResponseExt, Result, Router}; static TPLS: Lazy = Lazy::new(|| { let dir = env::var("CARGO_MANIFEST_DIR").map(PathBuf::from).unwrap(); @@ -54,15 +54,10 @@ async fn main() -> Result<()> { println!("listening on http://{addr}"); let app = Router::new().get("/", index); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/templates/tera/src/main.rs b/examples/templates/tera/src/main.rs index dc74c56c..788a8849 100644 --- a/examples/templates/tera/src/main.rs +++ b/examples/templates/tera/src/main.rs @@ -1,13 +1,12 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use once_cell::sync::Lazy; use serde::Serialize; use tera::{Context, Tera}; use tokio::net::TcpListener; -use viz::{serve, BytesMut, Error, Request, Response, ResponseExt, Result, Router, Tree}; +use viz::{serve, BytesMut, Error, Request, Response, ResponseExt, Result, Router}; static TPLS: Lazy = Lazy::new(|| Tera::new("examples/templates/tera/templates/**/*").unwrap()); @@ -51,15 +50,10 @@ async fn main() -> Result<()> { println!("listening on http://{addr}"); let app = Router::new().get("/", index); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/tower/src/main.rs b/examples/tower/src/main.rs index 6371445a..d1ad350f 100644 --- a/examples/tower/src/main.rs +++ b/examples/tower/src/main.rs @@ -1,6 +1,6 @@ //! Viz + Tower services -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; use tower::{ service_fn, @@ -13,7 +13,7 @@ use tower_http::{ trace::TraceLayer, }; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -use viz::{serve, Body, Error, IntoResponse, Request, Response, Result, Router, Tree}; +use viz::{serve, Body, Error, IntoResponse, Request, Response, Result, Router}; use viz_tower::{Layered, ServiceHandler}; async fn index(_: Request) -> Result { @@ -61,15 +61,14 @@ async fn main() -> Result<()> { .get("/about", about) .any("/*", any_handler) .with(Layered::new(layer)); - let tree = Arc::new(Tree::from(app)); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = TcpListener::bind(addr).await?; println!("listening on http://{addr}"); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(serve(stream, tree, Some(addr))); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/examples/tracing/src/main.rs b/examples/tracing/src/main.rs index cdcbdc6c..36473542 100644 --- a/examples/tracing/src/main.rs +++ b/examples/tracing/src/main.rs @@ -1,11 +1,10 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; use tracing::{debug, error, info, instrument}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -use viz::{serve, Request, RequestExt, Result, Router, Tree}; +use viz::{serve, Request, RequestExt, Result, Router}; #[instrument] async fn index(req: Request) -> Result<&'static str> { @@ -28,15 +27,10 @@ async fn main() -> Result<()> { info!("listening on http://{addr}"); let app = Router::new().get("/", index); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, Some(addr)).await { - error!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + error!("{e}"); } + + Ok(()) } diff --git a/examples/unix-socket/src/main.rs b/examples/unix-socket/src/main.rs index 76b88e81..1294a455 100644 --- a/examples/unix-socket/src/main.rs +++ b/examples/unix-socket/src/main.rs @@ -4,15 +4,12 @@ //! curl --unix-socket /tmp/viz.sock http://localhost/ //! ``` #![deny(warnings)] -#![allow(clippy::unused_async)] #[cfg(unix)] #[tokio::main] async fn main() -> viz::Result<()> { - use std::sync::Arc; - use tokio::net::UnixListener; - use viz::{get, serve, IntoHandler, Result, Router, Tree}; + use viz::{get, serve, IntoHandler, Result, Router}; async fn index() -> Result<&'static str> { Ok("Hello world!") @@ -24,17 +21,12 @@ async fn main() -> viz::Result<()> { let listener = UnixListener::bind(path)?; let app = Router::new().route("/", get(index.into_handler())); - let tree = Arc::new(Tree::from(app)); - - loop { - let (stream, _) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve(stream, tree, None).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } #[cfg(not(unix))] diff --git a/examples/websocket-chat/src/main.rs b/examples/websocket-chat/src/main.rs index 01ec3798..888fba6b 100644 --- a/examples/websocket-chat/src/main.rs +++ b/examples/websocket-chat/src/main.rs @@ -1,15 +1,14 @@ #![deny(warnings)] -#![allow(clippy::unused_async)] use futures_util::{SinkExt, StreamExt}; -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; use tokio::sync::broadcast::{channel, Sender}; use viz::{ - get, serve_with_upgrades, + get, serve, types::{Message, Params, State, WebSocket}, HandlerExt, IntoHandler, IntoResponse, Request, RequestExt, Response, ResponseExt, Result, - Router, Tree, + Router, }; async fn index() -> Result { @@ -61,15 +60,10 @@ async fn main() -> Result<()> { let app = Router::new() .route("/", get(index.into_handler())) .route("/ws/:name", get(ws.with(State::new(channel.0)))); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(async move { - if let Err(err) = serve_with_upgrades(stream, tree, Some(addr)).await { - eprintln!("Error while serving HTTP connection: {err}"); - } - }); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } diff --git a/viz-handlers/src/embed.rs b/viz-handlers/src/embed.rs index 37089ca6..a9c80971 100644 --- a/viz-handlers/src/embed.rs +++ b/viz-handlers/src/embed.rs @@ -71,6 +71,7 @@ where } } +#[allow(clippy::unused_async)] async fn serve(path: String, req: Request) -> Result where E: RustEmbed + Send + Sync + 'static, diff --git a/viz-test/src/lib.rs b/viz-test/src/lib.rs index 99de9f2d..20764f3f 100644 --- a/viz-test/src/lib.rs +++ b/viz-test/src/lib.rs @@ -1,7 +1,7 @@ use reqwest::Client; -use std::net::SocketAddr; +use std::{future::IntoFuture, net::SocketAddr}; use tokio::net::TcpListener; -use viz::{serve, Error, Result, Router, Tree}; +use viz::{serve, Error, Result, Router}; pub use http; pub use nano_id; @@ -22,14 +22,13 @@ impl TestServer { /// Will return `Err` if the server fails to start. pub async fn new(router: Router) -> Result { let listener = TcpListener::bind("127.0.0.1:0").await?; - let tree = Tree::from(router); let addr = listener.local_addr()?; let client = reqwest::Client::builder() .redirect(reqwest::redirect::Policy::none()) .build() .map_err(Error::boxed)?; - tokio::spawn(run(listener, tree)); + tokio::spawn(serve(listener, router).into_future()); Ok(Self { addr, client }) } @@ -59,11 +58,3 @@ impl TestServer { self.client.put(self.path(url)) } } - -async fn run(listener: TcpListener, tree: Tree) -> Result<()> { - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(serve(stream, tree, Some(addr))); - } -} diff --git a/viz/Cargo.toml b/viz/Cargo.toml index 08bddc94..4ecf8f32 100644 --- a/viz/Cargo.toml +++ b/viz/Cargo.toml @@ -81,7 +81,8 @@ rustls-pemfile = { workspace = true, optional = true } tokio-native-tls = { workspace = true, optional = true } tokio-rustls = { workspace = true, optional = true } -tokio.workspace = true +tokio = { workspace = true, features = ["macros"] } +tracing.workspace = true [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] } diff --git a/viz/README.md b/viz/README.md index f8ff5042..c86a1f87 100644 --- a/viz/README.md +++ b/viz/README.md @@ -49,14 +49,14 @@ - Simple + Flexible `Handler` & `Middleware` -- Supports: Tower `Service` +- Supports Tower `Service` ## Hello Viz ```rust -use std::{net::SocketAddr, sync::Arc}; +use std::net::SocketAddr; use tokio::net::TcpListener; -use viz::{serve, Request, Result, Router, Tree}; +use viz::{serve, Request, Result, Router}; async fn index(_: Request) -> Result<&'static str> { Ok("Hello, Viz!") @@ -69,13 +69,12 @@ async fn main() -> Result<()> { println!("listening on http://{addr}"); let app = Router::new().get("/", index); - let tree = Arc::new(Tree::from(app)); - loop { - let (stream, addr) = listener.accept().await?; - let tree = tree.clone(); - tokio::task::spawn(serve(stream, tree, Some(addr))); + if let Err(e) = serve(listener, app).await { + println!("{e}"); } + + Ok(()) } ``` diff --git a/viz/src/lib.rs b/viz/src/lib.rs index 0ede8fb9..303449e9 100644 --- a/viz/src/lib.rs +++ b/viz/src/lib.rs @@ -14,12 +14,14 @@ //! //! * Robust [`Routing`](#routing) //! +//! * Supports Tower [`Service`] +//! //! # Hello Viz //! //! ```no_run -//! use std::{net::SocketAddr, sync::Arc}; +//! use std::net::SocketAddr; //! use tokio::net::TcpListener; -//! use viz::{serve, Request, Result, Router, Tree}; +//! use viz::{serve, Request, Result, Router}; //! //! async fn index(_: Request) -> Result<&'static str> { //! Ok("Hello, Viz!") @@ -32,13 +34,12 @@ //! println!("listening on http://{addr}"); //! //! let app = Router::new().get("/", index); -//! let tree = Arc::new(Tree::from(app)); //! -//! loop { -//! let (stream, addr) = listener.accept().await?; -//! let tree = tree.clone(); -//! tokio::task::spawn(serve(stream, tree, Some(addr))); +//! if let Err(e) = serve(listener, app).await { +//! println!("{e}"); //! } +//! +//! Ok(()) //! } //! ``` //! @@ -513,6 +514,7 @@ //! //! [`FutureExt`]: https://docs.rs/futures/latest/futures/future/trait.FutureExt.html //! [`StreamExt`]: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html +//! [`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html #![doc(html_logo_url = "https://viz.rs/logo.svg")] #![doc(html_favicon_url = "https://viz.rs/logo.svg")] @@ -529,7 +531,7 @@ pub use responder::Responder; #[cfg(any(feature = "http1", feature = "http2"))] mod server; #[cfg(any(feature = "http1", feature = "http2"))] -pub use server::*; +pub use server::{serve, Accept, Server}; /// TLS #[cfg(any(feature = "native_tls", feature = "rustls"))] @@ -540,8 +542,6 @@ pub mod tls; #[doc(inline)] pub use viz_handlers as handlers; -#[cfg(any(feature = "http1", feature = "http2"))] -// pub use hyper::server; #[cfg(feature = "macros")] #[cfg_attr(docsrs, doc(cfg(feature = "macros")))] #[doc(inline)] diff --git a/viz/src/responder.rs b/viz/src/responder.rs index 415c9bae..a1138dcf 100644 --- a/viz/src/responder.rs +++ b/viz/src/responder.rs @@ -28,20 +28,19 @@ where type Output = Result; fn call(&self, mut req: Request) -> BoxFuture { - let Self { remote_addr, tree } = self; let method = req.method().clone(); let path = req.uri().path().to_string(); - let matched = tree.find(&method, &path).or_else(|| { + let matched = self.tree.find(&method, &path).or_else(|| { if method == Method::HEAD { - tree.find(&Method::GET, &path) + self.tree.find(&Method::GET, &path) } else { None } }); if let Some((handler, route)) = matched { - req.extensions_mut().insert(remote_addr.clone()); + req.extensions_mut().insert(self.remote_addr.clone()); req.extensions_mut().insert(Arc::from(RouteInfo { id: *route.id, pattern: route.pattern(), @@ -73,7 +72,8 @@ where } } -#[inline(always)] +#[allow(clippy::unused_async)] +#[inline] async fn not_found() -> Result { Ok(StatusCode::NOT_FOUND.into_response()) } diff --git a/viz/src/server.rs b/viz/src/server.rs index 73ca7b53..ac8fc39f 100644 --- a/viz/src/server.rs +++ b/viz/src/server.rs @@ -1,76 +1,190 @@ -use std::future::{Future, IntoFuture}; -use std::io::Result; +use std::{ + fmt::Debug, + future::{pending, Future, IntoFuture, Pending}, + io, + sync::Arc, +}; -use futures_util::{pin_mut, TryFutureExt}; +use futures_util::FutureExt; use hyper_util::{ rt::{TokioExecutor, TokioIo}, server::conn::auto::Builder, }; -use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + pin, select, + sync::watch, +}; -use crate::{BoxFuture, Responder, Tree}; +use crate::{BoxFuture, Responder, Router, Tree}; mod accept; pub use accept::Accept; +#[cfg(any(feature = "http1", feature = "http2"))] mod tcp; -pub use tcp::*; +#[cfg(feature = "unix-socket")] mod unix; -pub use unix::*; -pub struct Server { +/// Starts a server and serves the connections. +pub fn serve(listener: L, router: Router) -> Server +where + L: Accept + Send + 'static, + L::Stream: AsyncWrite + AsyncRead + Send + Unpin, + L::Addr: Send + Sync + Debug + 'static, +{ + Server::::new(listener, router.into()) +} + +/// A listening HTTP server that accepts connections. +#[derive(Debug)] +pub struct Server> { + signal: F, tree: Tree, listener: L, builder: Builder, } -impl Server { - pub fn listener(&self) -> &L { - &self.listener +impl Server { + /// Starts a [`Server`] with a listener and a [`Tree`]. + pub fn new(listener: L, router: Router) -> Server { + Server { + listener, + signal: pending(), + tree: router.into(), + builder: Builder::new(TokioExecutor::new()), + } + } + + /// Changes the signal for graceful shutdown. + pub fn signal(self, signal: T) -> Server { + Server { + signal, + tree: self.tree, + builder: self.builder, + listener: self.listener, + } } + /// Returns the HTTP1 or HTTP2 connection builder. pub fn builder(&mut self) -> &mut Builder { &mut self.builder } } -impl Server { - pub fn new(listener: L, tree: Tree) -> Self { - Self { - tree, - listener, - builder: Builder::new(TokioExecutor::new()), - } - } -} - -impl IntoFuture for Server +/// Copied from Axum. Thanks. +impl IntoFuture for Server where L: Accept + Send + 'static, - L::Conn: AsyncWrite + AsyncRead + Send + Unpin, - L::Addr: Clone + Send + Sync + 'static, + L::Stream: AsyncWrite + AsyncRead + Send + Unpin, + L::Addr: Send + Sync + Debug + 'static, + F: Future + Send + 'static, { - type Output = Result<()>; + type Output = io::Result<()>; type IntoFuture = BoxFuture; fn into_future(self) -> Self::IntoFuture { let Self { tree, - listener, + signal, builder, + listener, } = self; + + let (shutdown_tx, shutdown_rx) = watch::channel(()); + let shutdown_tx = Arc::new(shutdown_tx); + + tokio::spawn(async move { + signal.await; + tracing::trace!("received graceful shutdown signal"); + drop(shutdown_rx); + }); + + let (close_tx, close_rx) = watch::channel(()); + Box::pin(async move { loop { - let (stream, remote_addr) = listener.accept().await?; + let (stream, remote_addr) = select! { + res = listener.accept() => { + match res { + Ok(conn) => conn, + Err(e) => { + if !is_connection_error(&e) { + // [From `hyper::Server` in 0.14](https://github.com/hyperium/hyper/blob/v0.14.27/src/server/tcp.rs#L186) + tracing::error!("listener accept error: {e}"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + continue + } + } + } + () = shutdown_tx.closed() => { + tracing::trace!("server is closing"); + break; + } + }; + + tracing::trace!("connection {:?} accepted", remote_addr); + let io = TokioIo::new(stream); + let remote_addr = Arc::new(remote_addr); let builder = builder.clone(); - let responder = Responder::::new(tree.clone(), Some(remote_addr)); + let responder = + Responder::>::new(tree.clone(), Some(remote_addr.clone())); + + let shutdown_tx = Arc::clone(&shutdown_tx); + let close_rx = close_rx.clone(); tokio::spawn(async move { - if let Err(_) = builder.serve_connection(io, responder).await {} + let conn = builder.serve_connection_with_upgrades(io, responder); + pin!(conn); + + let shutdown = shutdown_tx.closed().fuse(); + pin!(shutdown); + + loop { + select! { + res = conn.as_mut() => { + if let Err(e) = res { + tracing::error!("connection failed: {e}"); + } + break; + } + () = &mut shutdown => { + tracing::trace!("connection is starting to graceful shutdown"); + conn.as_mut().graceful_shutdown(); + } + } + } + + tracing::trace!("connection {:?} closed", remote_addr); + + drop(close_rx); }); } + + drop(close_rx); + drop(listener); + + tracing::trace!( + "waiting for {} task(s) to finish", + close_tx.receiver_count() + ); + close_tx.closed().await; + + tracing::trace!("server shutdown complete"); + + Ok(()) }) } } + +fn is_connection_error(e: &io::Error) -> bool { + matches!( + e.kind(), + io::ErrorKind::ConnectionRefused + | io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionReset + ) +} diff --git a/viz/src/server/accept.rs b/viz/src/server/accept.rs index 9111abef..8a5dea0b 100644 --- a/viz/src/server/accept.rs +++ b/viz/src/server/accept.rs @@ -1,9 +1,14 @@ -use std::future::Future; -use std::io::Result; +//! The `Accept` trait and supporting types. +use std::{future::Future, io::Result}; + +/// Asynchronously accept incoming connections. pub trait Accept { - type Conn; + /// An accepted stream of the connection. + type Stream; + /// An accepted remote address of the connection. type Addr; - fn accept(&self) -> impl Future> + Send; + /// Accepts a new incoming connection from this listener. + fn accept(&self) -> impl Future> + Send; } diff --git a/viz/src/server/tcp.rs b/viz/src/server/tcp.rs index ac072b64..b78fa75a 100644 --- a/viz/src/server/tcp.rs +++ b/viz/src/server/tcp.rs @@ -5,10 +5,10 @@ use std::net::SocketAddr; use tokio::net::{TcpListener, TcpStream}; impl super::Accept for TcpListener { - type Conn = TcpStream; + type Stream = TcpStream; type Addr = SocketAddr; - fn accept(&self) -> impl Future> + Send { + fn accept(&self) -> impl Future> + Send { TcpListener::accept(self) } } diff --git a/viz/src/server/unix.rs b/viz/src/server/unix.rs index 1fe0f942..9f658d94 100644 --- a/viz/src/server/unix.rs +++ b/viz/src/server/unix.rs @@ -4,10 +4,10 @@ use std::io::Result; use tokio::net::{unix::SocketAddr, UnixListener, UnixStream}; impl super::Accept for UnixListener { - type Conn = UnixStream; + type Stream = UnixStream; type Addr = SocketAddr; - fn accept(&self) -> impl Future> + Send { + fn accept(&self) -> impl Future> + Send { UnixListener::accept(self) } } diff --git a/viz/src/tls/listener.rs b/viz/src/tls/listener.rs index c3d36c1c..7227d7c2 100644 --- a/viz/src/tls/listener.rs +++ b/viz/src/tls/listener.rs @@ -1,5 +1,4 @@ /// Unified TLS listener type. -#[allow(dead_code)] #[derive(Debug)] pub struct Listener { pub(crate) inner: T, diff --git a/viz/src/tls/native_tls.rs b/viz/src/tls/native_tls.rs index 6c335d3a..93a51d49 100644 --- a/viz/src/tls/native_tls.rs +++ b/viz/src/tls/native_tls.rs @@ -43,15 +43,10 @@ impl Config { } impl crate::Accept for Listener { - type Conn = TlsStream; + type Stream = TlsStream; type Addr = SocketAddr; - /// A [`TlsStream`] and [`SocketAddr`] part for accepting TLS. - /// - /// # Errors - /// - /// Will return `Err` if accepting the stream fails. - async fn accept(&self) -> std::io::Result<(Self::Conn, Self::Addr)> { + async fn accept(&self) -> std::io::Result<(Self::Stream, Self::Addr)> { let (stream, addr) = self.inner.accept().await?; let tls_stream = self .acceptor diff --git a/viz/src/tls/rustls.rs b/viz/src/tls/rustls.rs index 168d2461..a2b6033e 100644 --- a/viz/src/tls/rustls.rs +++ b/viz/src/tls/rustls.rs @@ -150,15 +150,10 @@ impl Config { } impl crate::Accept for Listener { - type Conn = TlsStream; + type Stream = TlsStream; type Addr = SocketAddr; - /// A [`TlsStream`] and [`SocketAddr`] part for accepting TLS. - /// - /// # Errors - /// - /// Will return `Err` if accepting the stream fails. - async fn accept(&self) -> std::io::Result<(Self::Conn, Self::Addr)> { + async fn accept(&self) -> std::io::Result<(Self::Stream, Self::Addr)> { let (stream, addr) = self.inner.accept().await?; let tls_stream = self.acceptor.accept(stream).await?; Ok((tls_stream, addr))