Skip to content

Commit

Permalink
chore(examples/hyper): port to hyper 1.x (#195)
Browse files Browse the repository at this point in the history
Based on #184

Co-authored-by: Jun Kurihara <[email protected]>
Signed-off-by: Tamas Levai <[email protected]>
  • Loading branch information
levaitamas and junkurihara authored May 13, 2024
1 parent b462e61 commit bf196d7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 21 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ rand = "0.8.4"
tide = "0.16"
actix-web = "4"
tokio = { version = "1", features = ["rt-multi-thread", "net", "macros", "signal"] }
hyper = { version = "0.14.16", features = ["server", "http1", "tcp"] }
hyper = { version = "1.3.1", features = ["server", "http1"] }
hyper-util = { version = "0.1.3", features = ["tokio"] }
http-body-util = "0.1.1"

[build-dependencies]
prost-build = { version = "0.11.0", optional = true }
Expand Down
62 changes: 42 additions & 20 deletions examples/hyper.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use http_body_util::{combinators, BodyExt, Full};
use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
body::{Bytes, Incoming},
server::conn::http1,
service::service_fn,
Request, Response,
};
use hyper_util::rt::TokioIo;
use prometheus_client::{encoding::text::encode, metrics::counter::Counter, registry::Registry};
use std::{
future::Future,
Expand All @@ -10,7 +14,11 @@ use std::{
pin::Pin,
sync::Arc,
};
use tokio::signal::unix::{signal, SignalKind};
use tokio::{
net::TcpListener,
pin,
signal::unix::{signal, SignalKind},
};

#[tokio::main]
async fn main() {
Expand All @@ -31,39 +39,48 @@ async fn main() {

/// Start a HTTP server to report metrics.
pub async fn start_metrics_server(metrics_addr: SocketAddr, registry: Registry) {
let mut shutdown_stream = signal(SignalKind::terminate()).unwrap();

eprintln!("Starting metrics server on {metrics_addr}");

let registry = Arc::new(registry);
Server::bind(&metrics_addr)
.serve(make_service_fn(move |_conn| {
let registry = registry.clone();
async move {
let handler = make_handler(registry);
Ok::<_, io::Error>(service_fn(handler))

let tcp_listener = TcpListener::bind(metrics_addr).await.unwrap();
let server = http1::Builder::new();
while let Ok((stream, _)) = tcp_listener.accept().await {
let mut shutdown_stream = signal(SignalKind::terminate()).unwrap();
let io = TokioIo::new(stream);
let server_clone = server.clone();
let registry_clone = registry.clone();
tokio::task::spawn(async move {
let conn = server_clone.serve_connection(io, service_fn(make_handler(registry_clone)));
pin!(conn);
tokio::select! {
_ = conn.as_mut() => {}
_ = shutdown_stream.recv() => {
conn.as_mut().graceful_shutdown();
}
}
}))
.with_graceful_shutdown(async move {
shutdown_stream.recv().await;
})
.await
.unwrap();
});
}
}

/// Boxed HTTP body for responses
type BoxBody = combinators::BoxBody<Bytes, hyper::Error>;

/// This function returns a HTTP handler (i.e. another function)
pub fn make_handler(
registry: Arc<Registry>,
) -> impl Fn(Request<Body>) -> Pin<Box<dyn Future<Output = io::Result<Response<Body>>> + Send>> {
) -> impl Fn(Request<Incoming>) -> Pin<Box<dyn Future<Output = io::Result<Response<BoxBody>>> + Send>>
{
// This closure accepts a request and responds with the OpenMetrics encoding of our metrics.
move |_req: Request<Body>| {
move |_req: Request<Incoming>| {
let reg = registry.clone();

Box::pin(async move {
let mut buf = String::new();
encode(&mut buf, &reg.clone())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
.map(|_| {
let body = Body::from(buf);
let body = full(Bytes::from(buf));
Response::builder()
.header(
hyper::header::CONTENT_TYPE,
Expand All @@ -75,3 +92,8 @@ pub fn make_handler(
})
}
}

/// helper function to build a full boxed body
pub fn full(body: Bytes) -> BoxBody {
Full::new(body).map_err(|never| match never {}).boxed()
}

0 comments on commit bf196d7

Please sign in to comment.