From d6d332eed581381bbafc5b23e3fdc6dbe99c0316 Mon Sep 17 00:00:00 2001 From: Kirill Ivanov Date: Tue, 19 Nov 2024 13:57:52 +0900 Subject: [PATCH 1/3] shutdown token that stops http+grpc+metrics servers --- libs/blockscout-service-launcher/Cargo.toml | 2 ++ .../src/launcher/launch.rs | 34 +++++++++++++++---- .../src/launcher/metrics.rs | 17 ++++++++-- 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/libs/blockscout-service-launcher/Cargo.toml b/libs/blockscout-service-launcher/Cargo.toml index 4faca6604..60f3cd92d 100644 --- a/libs/blockscout-service-launcher/Cargo.toml +++ b/libs/blockscout-service-launcher/Cargo.toml @@ -27,6 +27,7 @@ reqwest = { version = "0.11", features = ["json"], optional = true } serde = { version = "1.0", features = ["derive"], optional = true } serde_json = {version = "1", optional = true } tokio = { version = "1", optional = true } +tokio-util = { version = "0.7.12", optional = true } tonic = { version = "0.8", optional = true } tracing = { version = "0.1", optional = true } tracing-actix-web = { package = "blockscout-tracing-actix-web", version = "0.8.0", optional = true } @@ -68,6 +69,7 @@ launcher = [ "dep:prometheus", "dep:serde", "dep:tokio", + "dep:tokio-util", "dep:tonic", "dep:tracing", "dep:tracing-actix-web", diff --git a/libs/blockscout-service-launcher/src/launcher/launch.rs b/libs/blockscout-service-launcher/src/launcher/launch.rs index 5ffec42a0..3de4c6ddf 100644 --- a/libs/blockscout-service-launcher/src/launcher/launch.rs +++ b/libs/blockscout-service-launcher/src/launcher/launch.rs @@ -8,6 +8,7 @@ use super::{ use actix_web::{middleware::Condition, App, HttpServer}; use actix_web_prom::PrometheusMetrics; use std::net::SocketAddr; +use tokio_util::sync::CancellationToken; use tracing_actix_web::TracingLogger; pub struct LaunchSettings { @@ -20,6 +21,7 @@ pub async fn launch( settings: &LaunchSettings, http: R, grpc: tonic::transport::server::Router, + shutdown: Option, ) -> Result<(), anyhow::Error> where R: HttpRouter + Send + Sync + Clone + 'static, @@ -39,6 +41,7 @@ where .as_ref() .map(|metrics| metrics.http_middleware().clone()), &settings.server.http, + shutdown.clone(), ); tokio::spawn(async move { http_server_future.await.map_err(anyhow::Error::msg) }) }; @@ -47,7 +50,7 @@ where if settings.server.grpc.enabled { let grpc_server = { - let grpc_server_future = grpc_serve(grpc, settings.server.grpc.addr); + let grpc_server_future = grpc_serve(grpc, settings.server.grpc.addr, shutdown.clone()); tokio::spawn(async move { grpc_server_future.await.map_err(anyhow::Error::msg) }) }; futures.push(grpc_server) @@ -56,7 +59,7 @@ where if let Some(metrics) = metrics { let addr = settings.metrics.addr; futures.push(tokio::spawn(async move { - metrics.run_server(addr).await?; + metrics.run_server(addr, shutdown).await?; Ok(()) })); } @@ -68,10 +71,20 @@ where res? } +pub(crate) async fn stop_actix_server_on_cancel( + actix_handle: actix_web::dev::ServerHandle, + shutdown: CancellationToken, + graceful: bool, +) { + shutdown.cancelled().await; + actix_handle.stop(graceful).await; +} + fn http_serve( http: R, metrics: Option, settings: &HttpServerSettings, + shutdown: Option, ) -> actix_web::dev::Server where R: HttpRouter + Send + Sync + Clone + 'static, @@ -84,7 +97,7 @@ where let json_cfg = actix_web::web::JsonConfig::default().limit(settings.max_body_size); let cors_settings = settings.cors.clone(); let cors_enabled = cors_settings.enabled; - if let Some(metrics) = metrics { + let server = if let Some(metrics) = metrics { HttpServer::new(move || { let cors = cors_settings.clone().build(); App::new() @@ -109,13 +122,22 @@ where .bind(settings.addr) .expect("failed to bind server") .run() + }; + if let Some(shutdown) = shutdown { + tokio::spawn(stop_actix_server_on_cancel(server.handle(), shutdown, true)); } + server } -fn grpc_serve( +async fn grpc_serve( grpc: tonic::transport::server::Router, addr: SocketAddr, -) -> impl futures::Future> { + shutdown: Option, +) -> Result<(), tonic::transport::Error> { tracing::info!("starting grpc server on addr {}", addr); - grpc.serve(addr) + if let Some(shutdown) = shutdown { + grpc.serve_with_shutdown(addr, shutdown.cancelled()).await + } else { + grpc.serve(addr).await + } } diff --git a/libs/blockscout-service-launcher/src/launcher/metrics.rs b/libs/blockscout-service-launcher/src/launcher/metrics.rs index 7257ddb13..721f790f4 100644 --- a/libs/blockscout-service-launcher/src/launcher/metrics.rs +++ b/libs/blockscout-service-launcher/src/launcher/metrics.rs @@ -1,6 +1,9 @@ use actix_web::{App, HttpServer}; use actix_web_prom::{PrometheusMetrics, PrometheusMetricsBuilder}; use std::{collections::HashMap, net::SocketAddr}; +use tokio_util::sync::CancellationToken; + +use crate::launcher::launch::stop_actix_server_on_cancel; #[derive(Clone)] pub struct Metrics { @@ -33,11 +36,19 @@ impl Metrics { &self.http_middleware } - pub fn run_server(self, addr: SocketAddr) -> actix_web::dev::Server { + pub fn run_server( + self, + addr: SocketAddr, + shutdown: Option, + ) -> actix_web::dev::Server { tracing::info!(addr = ?addr, "starting metrics server"); - HttpServer::new(move || App::new().wrap(self.metrics_middleware.clone())) + let server = HttpServer::new(move || App::new().wrap(self.metrics_middleware.clone())) .bind(addr) .unwrap() - .run() + .run(); + if let Some(shutdown) = shutdown { + tokio::spawn(stop_actix_server_on_cancel(server.handle(), shutdown, true)); + } + server } } From eb3461a36e0c3d4579fb0ec0281dd4685b852130 Mon Sep 17 00:00:00 2001 From: Kirill Ivanov Date: Tue, 19 Nov 2024 14:59:27 +0900 Subject: [PATCH 2/3] shutdown timeout --- .../src/launcher/launch.rs | 16 +++++++++++++++- .../src/launcher/metrics.rs | 3 ++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/libs/blockscout-service-launcher/src/launcher/launch.rs b/libs/blockscout-service-launcher/src/launcher/launch.rs index 3de4c6ddf..76f5193ff 100644 --- a/libs/blockscout-service-launcher/src/launcher/launch.rs +++ b/libs/blockscout-service-launcher/src/launcher/launch.rs @@ -11,6 +11,8 @@ use std::net::SocketAddr; use tokio_util::sync::CancellationToken; use tracing_actix_web::TracingLogger; +pub(crate) const SHUTDOWN_TIMEOUT_SEC: u64 = 10; + pub struct LaunchSettings { pub service_name: String, pub server: ServerSettings, @@ -77,9 +79,18 @@ pub(crate) async fn stop_actix_server_on_cancel( graceful: bool, ) { shutdown.cancelled().await; + tracing::info!( + "Shutting down actix server (gracefully: {graceful}).\ + Should finish within {SHUTDOWN_TIMEOUT_SEC} seconds..." + ); actix_handle.stop(graceful).await; } +pub(crate) async fn grpc_cancel_signal(shutdown: CancellationToken) { + shutdown.cancelled().await; + tracing::info!("Shutting down grpc server..."); +} + fn http_serve( http: R, metrics: Option, @@ -107,6 +118,7 @@ where .app_data(json_cfg.clone()) .configure(configure_router(&http)) }) + .shutdown_timeout(SHUTDOWN_TIMEOUT_SEC) .bind(settings.addr) .expect("failed to bind server") .run() @@ -119,6 +131,7 @@ where .app_data(json_cfg.clone()) .configure(configure_router(&http)) }) + .shutdown_timeout(SHUTDOWN_TIMEOUT_SEC) .bind(settings.addr) .expect("failed to bind server") .run() @@ -136,7 +149,8 @@ async fn grpc_serve( ) -> Result<(), tonic::transport::Error> { tracing::info!("starting grpc server on addr {}", addr); if let Some(shutdown) = shutdown { - grpc.serve_with_shutdown(addr, shutdown.cancelled()).await + grpc.serve_with_shutdown(addr, grpc_cancel_signal(shutdown)) + .await } else { grpc.serve(addr).await } diff --git a/libs/blockscout-service-launcher/src/launcher/metrics.rs b/libs/blockscout-service-launcher/src/launcher/metrics.rs index 721f790f4..e1029f09d 100644 --- a/libs/blockscout-service-launcher/src/launcher/metrics.rs +++ b/libs/blockscout-service-launcher/src/launcher/metrics.rs @@ -3,7 +3,7 @@ use actix_web_prom::{PrometheusMetrics, PrometheusMetricsBuilder}; use std::{collections::HashMap, net::SocketAddr}; use tokio_util::sync::CancellationToken; -use crate::launcher::launch::stop_actix_server_on_cancel; +use crate::launcher::launch::{stop_actix_server_on_cancel, SHUTDOWN_TIMEOUT_SEC}; #[derive(Clone)] pub struct Metrics { @@ -43,6 +43,7 @@ impl Metrics { ) -> actix_web::dev::Server { tracing::info!(addr = ?addr, "starting metrics server"); let server = HttpServer::new(move || App::new().wrap(self.metrics_middleware.clone())) + .shutdown_timeout(SHUTDOWN_TIMEOUT_SEC) .bind(addr) .unwrap() .run(); From 965c40aa7bac928a7d95277abcfe05e63aaf1143 Mon Sep 17 00:00:00 2001 From: Kirill Ivanov Date: Tue, 19 Nov 2024 18:26:23 +0900 Subject: [PATCH 3/3] bump crate version --- libs/blockscout-service-launcher/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/blockscout-service-launcher/Cargo.toml b/libs/blockscout-service-launcher/Cargo.toml index 60f3cd92d..365b7f247 100644 --- a/libs/blockscout-service-launcher/Cargo.toml +++ b/libs/blockscout-service-launcher/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "blockscout-service-launcher" -version = "0.14.0" +version = "0.15.0" description = "Allows to launch blazingly fast blockscout rust services" license = "MIT" repository = "https://github.com/blockscout/blockscout-rs"