From b8e08c7245af2effadcfc4ab98b80156ef39bed9 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 30 Jul 2024 17:18:19 +0800 Subject: [PATCH] feat: also gracefully shutdown on SIGTERM (#17802) Signed-off-by: Bugen Zhao --- src/utils/runtime/src/lib.rs | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/src/utils/runtime/src/lib.rs b/src/utils/runtime/src/lib.rs index 6418c18ea103..75a4a5f4d00b 100644 --- a/src/utils/runtime/src/lib.rs +++ b/src/utils/runtime/src/lib.rs @@ -35,6 +35,7 @@ mod panic_hook; pub use panic_hook::*; mod prof; use prof::*; +use tokio::signal::unix::SignalKind; /// Start RisingWave components with configs from environment variable. /// @@ -100,20 +101,22 @@ where let shutdown = CancellationToken::new(); let mut fut = pin!(f(shutdown.clone())); + let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap(); + let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap(); + tokio::select! { biased; - result = tokio::signal::ctrl_c() => { - result.expect("failed to receive ctrl-c signal"); - tracing::info!("received ctrl-c, shutting down... (press ctrl-c again to force shutdown)"); - // Send shutdown signal. + // Watch SIGINT, typically originating from user pressing ctrl-c. + // Attempt to shutdown gracefully and force shutdown on the next signal. + _ = sigint.recv() => { + tracing::info!("received ctrl-c, shutting down... (press ctrl-c again to force shutdown)"); shutdown.cancel(); // While waiting for the future to finish, listen for the second ctrl-c signal. tokio::select! { biased; - result = tokio::signal::ctrl_c() => { - result.expect("failed to receive ctrl-c signal"); + _ = sigint.recv() => { tracing::warn!("forced shutdown"); // Directly exit the process **here** instead of returning from the future, since @@ -123,6 +126,16 @@ where _ = &mut fut => {}, } } + + // Watch SIGTERM, typically originating from Kubernetes. + // Attempt to shutdown gracefully. No need to force shutdown since it will send SIGKILL after a timeout. + _ = sigterm.recv() => { + tracing::info!("received SIGTERM, shutting down..."); + shutdown.cancel(); + fut.await; + } + + // Proceed with the future. _ = &mut fut => {}, } };