From 5b06e2cb94e6ef94d600cb91969c9f7526f58940 Mon Sep 17 00:00:00 2001 From: Yanhao Date: Thu, 21 Sep 2023 13:51:26 +0800 Subject: [PATCH] support shutdown hooks (#228) * support shutdown hooks * fix: cargo fmt --- volo-thrift/src/server.rs | 28 +++++++++++++++++++ volo-thrift/src/transport/multiplex/client.rs | 5 +--- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/volo-thrift/src/server.rs b/volo-thrift/src/server.rs index 865f3e12..42b94500 100644 --- a/volo-thrift/src/server.rs +++ b/volo-thrift/src/server.rs @@ -4,6 +4,7 @@ use std::{ time::Duration, }; +use futures::future::BoxFuture; use motore::{ layer::{Identity, Layer, Stack}, service::Service, @@ -42,6 +43,7 @@ pub struct Server { #[cfg(feature = "multiplex")] multiplex: bool, span_provider: SP, + shutdown_hooks: Vec BoxFuture<'static, ()>>>, _marker: PhantomData, } @@ -66,12 +68,25 @@ impl #[cfg(feature = "multiplex")] multiplex: false, span_provider: DefaultProvider {}, + shutdown_hooks: Vec::new(), _marker: PhantomData, } } } impl Server { + /// Register shutdown hook. + /// + /// Hook functions will be called just before volo's own gracefull existing code starts, + /// in reverse order of registration. + pub fn register_shutdown_hook( + mut self, + hook: impl FnOnce() -> BoxFuture<'static, ()> + 'static, + ) -> Self { + self.shutdown_hooks.push(Box::new(hook)); + self + } + /// Adds a new inner layer to the server. /// /// The layer's `Service` should be `Send + Sync + Clone + 'static`. @@ -92,6 +107,7 @@ impl Server { #[cfg(feature = "multiplex")] multiplex: self.multiplex, span_provider: self.span_provider, + shutdown_hooks: self.shutdown_hooks, _marker: PhantomData, } } @@ -116,6 +132,7 @@ impl Server { #[cfg(feature = "multiplex")] multiplex: self.multiplex, span_provider: self.span_provider, + shutdown_hooks: self.shutdown_hooks, _marker: PhantomData, } } @@ -144,6 +161,7 @@ impl Server { #[cfg(feature = "multiplex")] multiplex: self.multiplex, span_provider: self.span_provider, + shutdown_hooks: self.shutdown_hooks, _marker: PhantomData, } } @@ -291,6 +309,14 @@ impl Server { } } + if !self.shutdown_hooks.is_empty() { + info!("[VOLO] call shutdown hooks"); + + for hook in self.shutdown_hooks { + (hook)().await; + } + } + // received signal, graceful shutdown now info!("[VOLO] received signal, gracefully exiting now"); *exit_flag.write() = true; @@ -330,6 +356,7 @@ impl Server { stat_tracer: self.stat_tracer, multiplex, span_provider: self.span_provider, + shutdown_hooks: self.shutdown_hooks, _marker: PhantomData, } } @@ -343,6 +370,7 @@ impl Server { #[cfg(feature = "multiplex")] multiplex: self.multiplex, span_provider: provider, + shutdown_hooks: self.shutdown_hooks, _marker: PhantomData, } } diff --git a/volo-thrift/src/transport/multiplex/client.rs b/volo-thrift/src/transport/multiplex/client.rs index 6e2f6d67..e1068df9 100644 --- a/volo-thrift/src/transport/multiplex/client.rs +++ b/volo-thrift/src/transport/multiplex/client.rs @@ -157,10 +157,7 @@ where if !oneway { return Err(Error::Transport(pilota::thrift::TransportError::new( TransportErrorKind::EndOfFile, - format!( - "an unexpected end of file from server, cx: {:?}", - cx - ), + format!("an unexpected end of file from server, cx: {:?}", cx), ))); } }