Skip to content

Commit

Permalink
feat: support smol runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
fundon committed Jan 3, 2024
1 parent 98ecceb commit ed275ef
Show file tree
Hide file tree
Showing 18 changed files with 435 additions and 178 deletions.
9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ members = [
"examples/databases/*",
"examples/htmlx",
"examples/tower",
"examples/smol",
]

[workspace.package]
Expand Down Expand Up @@ -113,6 +114,14 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tower = "0.4"
tower-http = "0.5"

# soml
async-channel = "2.1"
async-executor = "1.8"
async-io = "2.2"
async-net = "2.0"
smol-hyper = "0.1.1"
futures-lite = { version = "2.1.0", default-features = false, features = ["std"] }

[workspace.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
Expand Down
16 changes: 16 additions & 0 deletions examples/smol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "smol-example"
version = "0.1.0"
edition.workspace = true
publish = false

[dependencies]
viz = { workspace = true, features = ["smol"] }

# smol
async-executor = "1.8"
async-io = "2.2"
async-net = "2.0"
smol-hyper = "0.1.1"
smol-macros = "0.1"
macro_rules_attribute = "0.2"
23 changes: 23 additions & 0 deletions examples/smol/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::io;
use std::sync::Arc;

use async_net::TcpListener;
use macro_rules_attribute::apply;
use viz::{IntoResponse, Request, Response, Result, Router};

#[apply(smol_macros::main!)]
async fn main(ex: &Arc<smol_macros::Executor<'_>>) -> io::Result<()> {
// Build our application with a route.
let app = Router::new().get("/", handler);

// Create a `smol`-based TCP listener.
let listener = TcpListener::bind(("127.0.0.1", 3000)).await.unwrap();
println!("listening on {}", listener.local_addr().unwrap());

// Run it
viz::serve(ex.clone(), listener, app).await
}

async fn handler(_: Request) -> Result<Response> {
Ok("<h1>Hello, World!</h1>".into_response())
}
17 changes: 17 additions & 0 deletions viz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ otel-prometheus = ["handlers", "viz-handlers?/prometheus"]
rustls = ["dep:rustls-pemfile", "dep:tokio-rustls", "dep:futures-util"]
native-tls = ["dep:tokio-native-tls", "dep:futures-util"]

smol = [
# "dep:async-channel",
"dep:async-executor",
# "dep:async-io",
"dep:async-net",
"dep:smol-hyper",
"dep:futures-lite"
]

[dependencies]
viz-core.workspace = true
viz-router.workspace = true
Expand All @@ -86,6 +95,14 @@ tokio = { workspace = true, features = ["macros"] }
tokio-util = { workspace = true, features = ["net"] }
tracing.workspace = true

# smol
# async-channel = { workspace = true, optional = true }
async-executor = { workspace = true, optional = true }
# async-io = { workspace = true, optional = true }
async-net = { workspace = true, optional = true }
smol-hyper = { workspace = true, optional = true }
futures-lite = { workspace = true, optional = true }

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] }

Expand Down
3 changes: 1 addition & 2 deletions viz/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,8 @@ pub use responder::Responder;
mod server;
pub use server::{serve, Listener, Server};

/// TLS
#[cfg(any(feature = "native_tls", feature = "rustls"))]
pub mod tls;
pub use server::tls;

pub use viz_core::*;
pub use viz_router::*;
Expand Down
199 changes: 32 additions & 167 deletions viz/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,188 +1,53 @@
use std::{
fmt::Debug,
future::{pending, Future, IntoFuture, Pending},
io,
pin::Pin,
sync::Arc,
};

use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder,
};
use tokio::{pin, select, sync::watch};

use crate::{future::FutureExt, Responder, Router, Tree};
use std::{fmt::Debug, future::Pending};

mod listener;
pub use listener::Listener;

#[cfg(any(feature = "http1", feature = "http2"))]
mod tcp;
#[cfg(not(feature = "smol"))]
mod tokio;
#[cfg(not(feature = "smol"))]
pub use self::tokio::serve;

#[cfg(feature = "smol")]
mod smol;
#[cfg(feature = "smol")]
pub use self::smol::serve;

#[cfg(any(feature = "native_tls", feature = "rustls"))]
#[path = "server/tls.rs"]
pub(super) mod internal;

/// TLS
#[cfg(any(feature = "native_tls", feature = "rustls"))]
pub mod tls {
pub use super::internal::*;

#[cfg(all(unix, feature = "unix-socket"))]
mod unix;
#[cfg(not(feature = "smol"))]
pub use super::tokio::tls::*;

/// Starts a server and serves the connections.
pub fn serve<L>(listener: L, router: Router) -> Server<L>
where
L: Listener + Send + 'static,
L::Io: Send + Unpin,
L::Addr: Send + Sync + Debug,
{
Server::<L>::new(listener, router)
#[cfg(feature = "smol")]
pub use super::smol::tls::*;
}

/// A listening HTTP server that accepts connections.
#[derive(Debug)]
pub struct Server<L, E = TokioExecutor, F = Pending<()>> {
signal: F,
tree: Tree,
pub struct Server<L, E, F, S = Pending<()>> {
signal: S,
tree: crate::Tree,
executor: E,
listener: L,
builder: Builder<E>,
build: F,
}

impl<L, E, F> Server<L, E, F> {
/// Starts a [`Server`] with a listener and a [`Tree`].
pub fn new(listener: L, router: Router) -> Server<L> {
Server {
listener,
signal: pending(),
tree: router.into(),
builder: Builder::new(TokioExecutor::new()),
}
}

impl<L, E, F, S> Server<L, E, F, S> {
/// Changes the signal for graceful shutdown.
pub fn signal<T>(self, signal: T) -> Server<L, E, T> {
pub fn signal<X>(self, signal: X) -> Server<L, E, F, X> {
Server {
signal,
tree: self.tree,
builder: self.builder,
build: self.build,
executor: self.executor,
listener: self.listener,
}
}

/// Returns the HTTP1 or HTTP2 connection builder.
pub fn builder(&mut self) -> &mut Builder<E> {
&mut self.builder
}
}

/// Copied from Axum. Thanks.
impl<L, F> IntoFuture for Server<L, TokioExecutor, F>
where
L: Listener + Send + 'static,
L::Io: Send + Unpin,
L::Addr: Send + Sync + Debug,
F: Future + Send + 'static,
{
type Output = io::Result<()>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;

fn into_future(self) -> Self::IntoFuture {
let Self {
tree,
signal,
builder,
listener,
} = self;

let (shutdown_tx, shutdown_rx) = watch::channel(());
let shutdown_tx = Arc::new(shutdown_tx);

tokio::spawn(async move {
signal.await;
tracing::trace!("received graceful shutdown signal");
drop(shutdown_rx);
});

let (close_tx, close_rx) = watch::channel(());

let tree = Arc::new(tree);

Box::pin(async move {
loop {
let (stream, remote_addr) = select! {
res = listener.accept() => {
match res {
Ok(conn) => conn,
Err(e) => {
if !is_connection_error(&e) {
// [From `hyper::Server` in 0.14](https://github.com/hyperium/hyper/blob/v0.14.27/src/server/tcp.rs#L186)
tracing::error!("listener accept error: {e}");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
continue
}
}
}
() = shutdown_tx.closed() => {
tracing::trace!("server is closing");
break;
}
};

tracing::trace!("connection {:?} accepted", remote_addr);

let io = TokioIo::new(stream);
let remote_addr = Arc::new(remote_addr);
let builder = builder.clone();
let responder =
Responder::<Arc<L::Addr>>::new(tree.clone(), Some(remote_addr.clone()));

let shutdown_tx = Arc::clone(&shutdown_tx);
let close_rx = close_rx.clone();

tokio::spawn(async move {
let conn = builder.serve_connection_with_upgrades(io, responder);
pin!(conn);

let shutdown = shutdown_tx.closed().fuse();
pin!(shutdown);

loop {
select! {
res = conn.as_mut() => {
if let Err(e) = res {
tracing::error!("connection failed: {e}");
}
break;
}
() = &mut shutdown => {
tracing::trace!("connection is starting to graceful shutdown");
conn.as_mut().graceful_shutdown();
}
}
}

tracing::trace!("connection {:?} closed", remote_addr);

drop(close_rx);
});
}

drop(close_rx);
drop(listener);

tracing::trace!(
"waiting for {} task(s) to finish",
close_tx.receiver_count()
);
close_tx.closed().await;

tracing::trace!("server shutdown complete");

Ok(())
})
}
}

fn is_connection_error(e: &io::Error) -> bool {
matches!(
e.kind(),
io::ErrorKind::ConnectionRefused
| io::ErrorKind::ConnectionAborted
| io::ErrorKind::ConnectionReset
)
}
4 changes: 1 addition & 3 deletions viz/src/server/listener.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::{future::Future, io::Result};

use tokio::io::{AsyncRead, AsyncWrite};

/// A trait for a listener: `TcpListener` and `UnixListener`.
pub trait Listener {
/// The stream's type of this listener.
type Io: AsyncRead + AsyncWrite;
type Io;
/// The socket address type of this listener.
type Addr;

Expand Down
Loading

0 comments on commit ed275ef

Please sign in to comment.