Skip to content

Commit

Permalink
feat(core): Graceful shutdown and refactor Handler (#126)
Browse files Browse the repository at this point in the history
* feat(core): remove Sync bounds on Handler trait

* chore: MapErr

* chore: MapIntoResponse

* refactor: remove async_trait on Handler

* feat: remove dyn-clone

* feat: remove redundant 'static

* feat: remove redundant 'static

* chore(core): short form

* chore(tower): improve

* chore(core): improve hyper Service

* chore(viz): implement Handler for Responder

* fix: remove Arc on Tree

* feat(viz): add Server

* chore(viz): remove serve

* feat(viz): implement Accept for tls

* chore(viz): cfg rustls and native_tls

* chore(viz): add not_found

* fix: examples

* fix: useless into

* fix: make Cloneable pub crate

* docs: TryHandler

* fix(doc): links

* chore: revert Sync

* chore: revert Sync

* chore: revert lint

* chore: revert async_trait

* fix: doc tests

* chore: revert static-routes

* fix: boxed bounds

* fix: FnExt bounds

* chore(core): remove redundance bounds on MapErr

* chore(core): remove redundance bounds on Map

* chore(core): remove redundance bounds on Either

* chore(core): remove redundance bounds on CatchUnwind

* chore(core): improve

* chore(core): remove redundance allow attrs

* chore(core): remove redundance bounds on Cookie Middleware

* chore(core): remove redundance bounds on Cors middleware

* chore(core): remove redundance bounds on Cors middleware

* chore(core): remove redundance scope on Csrf middleware

* chore(core): remove redundance bounds on Limits middleware

* chore(core): remove redundance bounds on tests

* chore(core): remove redundance bounds on Router

* chore(core): remove redundance bounds on Route

* chore(core): remove redundance bounds on Resources

* chore(core): remove redundance bounds on Cloneable

* chore(core): remove redundance bounds on docs

* chore(viz): use tokio_util Listener

* fix(ci): coverage
  • Loading branch information
fundon authored Jan 1, 2024
1 parent 49fcdd9 commit d663b55
Show file tree
Hide file tree
Showing 99 changed files with 855 additions and 833 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ jobs:
coverage:
name: Coverage
runs-on: ubuntu-latest
env:
RUSTFLAGS: --cfg hyper_unstable_tracing
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
Expand Down
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ viz-tower = { version = "0.1.0", path = "viz-tower" }
anyhow = "1.0"
async-trait = "0.1"
bytes = "1.5"
dyn-clone = "1.0"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
Expand All @@ -67,7 +66,7 @@ sync_wrapper = "0.1.2"
thiserror = "1.0"

# router
path-tree = "0.7"
path-tree = "0.7.3"

# http
headers = "0.4"
Expand Down
18 changes: 6 additions & 12 deletions examples/compression/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
#![deny(warnings)]
#![allow(clippy::unused_async)]

use std::{net::SocketAddr, sync::Arc};
use std::net::SocketAddr;
use tokio::net::TcpListener;

use viz::{get, middleware::compression, serve, Request, Result, Router, Tree};
use viz::{get, middleware::compression, serve, Request, Result, Router};

async fn index(_req: Request) -> Result<&'static str> {
Ok("Hello, World!")
Expand All @@ -19,15 +18,10 @@ async fn main() -> Result<()> {
let app = Router::new()
.route("/", get(index))
.with(compression::Config);
let tree = Arc::new(Tree::from(app));

loop {
let (stream, addr) = listener.accept().await?;
let tree = tree.clone();
tokio::task::spawn(async move {
if let Err(err) = serve(stream, tree, Some(addr)).await {
eprintln!("Error while serving HTTP connection: {err}");
}
});
if let Err(e) = serve(listener, app).await {
println!("{e}");
}

Ok(())
}
20 changes: 7 additions & 13 deletions examples/cors/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#![deny(warnings)]
#![allow(clippy::unused_async)]

use std::{net::SocketAddr, sync::Arc};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use viz::{get, middleware::cors, serve, Method, Request, Result, Router, Tree};
use viz::{get, middleware::cors, serve, Method, Request, Result, Router};

async fn index(_req: Request) -> Result<&'static str> {
Ok("Hello, World!")
Expand All @@ -27,15 +26,10 @@ async fn main() -> Result<()> {
.route("/", get(index).options(options))
// .with(cors::Config::default()); // Default CORS config
.with(custom_cors); // Our custom CORS config
let tree = Arc::new(Tree::from(app));

loop {
let (stream, addr) = listener.accept().await?;
let tree = tree.clone();
tokio::task::spawn(async move {
if let Err(err) = serve(stream, tree, Some(addr)).await {
eprintln!("Error while serving HTTP connection: {err}");
}
});

if let Err(e) = serve(listener, app).await {
println!("{e}");
}

Ok(())
}
20 changes: 7 additions & 13 deletions examples/csrf/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![deny(warnings)]
#![allow(clippy::unused_async)]

use std::{net::SocketAddr, sync::Arc, time::Duration};
use std::{net::SocketAddr, time::Duration};
use tokio::net::TcpListener;

use viz::{
Expand All @@ -10,7 +9,7 @@ use viz::{
csrf::{self, CsrfToken},
helper::CookieOptions,
},
serve, Method, Request, RequestExt, Result, Router, Tree,
serve, Method, Request, RequestExt, Result, Router,
};

async fn index(mut req: Request) -> Result<String> {
Expand Down Expand Up @@ -39,15 +38,10 @@ async fn main() -> Result<()> {
csrf::verify,
))
.with(cookie::Config::default());
let tree = Arc::new(Tree::from(app));

loop {
let (stream, addr) = listener.accept().await?;
let tree = tree.clone();
tokio::task::spawn(async move {
if let Err(err) = serve(stream, tree, Some(addr)).await {
eprintln!("Error while serving HTTP connection: {err}");
}
});

if let Err(e) = serve(listener, app).await {
println!("{e}");
}

Ok(())
}
14 changes: 6 additions & 8 deletions examples/databases/sea-orm/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#![deny(warnings)]
#![allow(clippy::unused_async)]

//! `SeaOrm` example for Viz framework.
use sea_orm_example::{api, db::init_db};
use std::{env, net::SocketAddr, path::PathBuf, sync::Arc};
use std::{env, net::SocketAddr, path::PathBuf};
use tokio::net::TcpListener;
use viz::{handlers::serve, middleware, serve, types::State, Result, Router, Tree};
use viz::{handlers::serve, middleware, serve, types::State, Result, Router};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -26,11 +25,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.delete("/todos/:id", api::delete)
.with(State::new(db))
.with(middleware::limits::Config::new());
let tree = Arc::new(Tree::from(app));

loop {
let (stream, addr) = listener.accept().await?;
let tree = tree.clone();
tokio::task::spawn(serve(stream, tree, Some(addr)));
if let Err(e) = serve(listener, app).await {
println!("{e}");
}

Ok(())
}
20 changes: 7 additions & 13 deletions examples/forms/form/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#![deny(warnings)]
#![allow(clippy::unused_async)]

use serde::{Deserialize, Serialize};
use std::{net::SocketAddr, sync::Arc};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use viz::{
get, middleware::limits, serve, types::Form, IntoHandler, Request, Response, ResponseExt,
Result, Router, Tree,
Result, Router,
};

#[derive(Deserialize, Serialize)]
Expand Down Expand Up @@ -35,15 +34,10 @@ async fn main() -> Result<()> {
.route("/", get(new).post(create.into_handler()))
// limit body size
.with(limits::Config::default());
let tree = Arc::new(Tree::from(app));

loop {
let (stream, addr) = listener.accept().await?;
let tree = tree.clone();
tokio::task::spawn(async move {
if let Err(err) = serve(stream, tree, Some(addr)).await {
eprintln!("Error while serving HTTP connection: {err}");
}
});

if let Err(e) = serve(listener, app).await {
println!("{e}");
}

Ok(())
}
18 changes: 6 additions & 12 deletions examples/forms/multipart/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
#![deny(warnings)]
#![allow(clippy::unused_async)]

use futures_util::TryStreamExt;
use std::{fs::File, net::SocketAddr, sync::Arc};
use std::{fs::File, net::SocketAddr};
use tempfile::tempdir;
use tokio::net::TcpListener;
use viz::{
middleware::limits,
serve,
types::{Multipart, PayloadError},
IntoHandler, IntoResponse, Request, Response, ResponseExt, Result, Router, Tree,
IntoHandler, IntoResponse, Request, Response, ResponseExt, Result, Router,
};

// HTML form for uploading photos
Expand Down Expand Up @@ -54,15 +53,10 @@ async fn main() -> Result<()> {
.post("/", upload.into_handler())
// limit body size
.with(limits::Config::default());
let tree = Arc::new(Tree::from(app));

loop {
let (stream, addr) = listener.accept().await?;
let tree = tree.clone();
tokio::task::spawn(async move {
if let Err(err) = serve(stream, tree, Some(addr)).await {
eprintln!("Error while serving HTTP connection: {err}");
}
});
if let Err(e) = serve(listener, app).await {
println!("{e}");
}

Ok(())
}
4 changes: 3 additions & 1 deletion examples/graceful-shutdown/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ publish = false
[dependencies]
viz.workspace = true

tokio = { workspace = true, features = ["rt-multi-thread", "macros", "time"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal"] }
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter"] }
92 changes: 42 additions & 50 deletions examples/graceful-shutdown/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,69 +1,61 @@
#![deny(warnings)]
#![allow(clippy::unused_async)]

//! Graceful shutdown server.
//!
//! See <https://github.com/hyperium/hyper/blob/master/examples/graceful_shutdown.rs>
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::{net::TcpListener, pin};
use viz::{server::conn::http1, Io, Request, Responder, Result, Router, Tree};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tokio::signal;
use tracing::{error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use viz::{serve, Request, Result, Router};

async fn index(_: Request) -> Result<&'static str> {
Ok("Hello, World!")
}

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "tracing=debug,hyper=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();

let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = TcpListener::bind(addr).await?;
println!("listening on {addr}");
info!("listening on {addr}");

let app = Router::new().get("/", index);
let tree = Arc::new(Tree::from(app));

// Use a 5 second timeout for incoming connections to the server.
// If a request is in progress when the 5 second timeout elapses,
// use a 2 second timeout for processing the final request and graceful shutdown.
let connection_timeouts = vec![Duration::from_secs(5), Duration::from_secs(2)];

loop {
// Clone the connection_timeouts so they can be passed to the new task.
let connection_timeouts_clone = connection_timeouts.clone();
let server = serve(listener, app).signal(shutdown_signal());

let (stream, addr) = listener.accept().await?;
let tree = tree.clone();
if let Err(e) = server.await {
error!("{e}");
}

tokio::task::spawn(async move {
// Pin the connection object so we can use tokio::select! below.
let conn = http1::Builder::new()
.serve_connection(Io::new(stream), Responder::new(tree, Some(addr)));
pin!(conn);
Ok(())
}

// Iterate the timeouts. Use tokio::select! to wait on the
// result of polling the connection itself,
// and also on tokio::time::sleep for the current timeout duration.
for (iter, sleep_duration) in connection_timeouts_clone.iter().enumerate() {
println!("iter = {iter} sleep_duration = {sleep_duration:?}");
tokio::select! {
res = conn.as_mut() => {
// Polling the connection returned a result.
// In this case print either the successful or error result for the connection
// and break out of the loop.
match res {
Ok(()) => println!("after polling conn, no error"),
Err(e) => println!("error serving connection: {e:?}"),
};
break;
}
() = tokio::time::sleep(*sleep_duration) => {
// tokio::time::sleep returned a result.
// Call graceful_shutdown on the connection and continue the loop.
println!("iter = {iter} got timeout_interval, calling conn.graceful_shutdown");
conn.as_mut().graceful_shutdown();
}
}
}
});
async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
}
26 changes: 14 additions & 12 deletions examples/hello-world/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
#![deny(warnings)]
#![allow(clippy::unused_async)]

use std::{net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, str::FromStr};
use tokio::net::TcpListener;
use viz::{serve, Request, Result, Router, Tree};
use viz::{serve, Request, Result, Router};

async fn index(_: Request) -> Result<&'static str> {
Ok("Hello, World!")
async fn index(_: Request) -> Result<String> {
Ok(String::from("Hello, World!"))
}

#[tokio::main]
async fn main() -> Result<()> {
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let addr = SocketAddr::from_str("[::1]:3000").unwrap();
let listener = TcpListener::bind(addr).await?;
println!("listening on http://{addr}");

let app = Router::new().get("/", index);
let tree = Arc::new(Tree::from(app));
let mut app = Router::new().get("/", |_| async { Ok("Hello, World!") });

loop {
let (stream, addr) = listener.accept().await?;
let tree = tree.clone();
tokio::task::spawn(serve(stream, tree, Some(addr)));
for n in 0..1000 {
app = app.get(&format!("/{}", n), index);
}

if let Err(e) = serve(listener, app).await {
println!("{e}");
}

Ok(())
}
Loading

0 comments on commit d663b55

Please sign in to comment.