Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable tracking the execution phase of a server #511

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 116 additions & 7 deletions pingora-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::sync::Arc;
use std::thread;
#[cfg(unix)]
use tokio::signal::unix;
use tokio::sync::{watch, Mutex};
use tokio::sync::{broadcast, watch, Mutex};
use tokio::time::{sleep, Duration};

use crate::services::Service;
Expand All @@ -53,6 +53,49 @@ enum ShutdownType {
Quick,
}

/// The execution phase the server is currently in.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ExecutionPhase {
/// The server was created, but has not started yet.
Setup,

/// Services are being prepared.
///
/// During graceful upgrades this phase acquires the listening FDs from the old process.
Bootstrap,

/// Bootstrap has finished, listening FDs have been transferred.
BootstrapComplete,

/// The server is running and is listening for shutdown signals.
Running,

/// A QUIT signal was received, indicating that a new process wants to take over.
///
/// The server is trying to send the fds to the new process over a Unix socket.
GracefulUpgradeTransferringFds,

/// FDs have been sent to the new process.
/// Waiting a fixed amount of time to allow the new process to take the sockets.
GracefulUpgradeCloseTimeout,

/// A TERM signal was received, indicating that the server should shut down gracefully.
GracefulTerminate,

/// The server is shutting down.
ShutdownStarted,

/// Waiting for the configured grace period to end before shutting down.
ShutdownGracePeriod,

/// Wait for runtimes to finish.
ShutdownRuntimes,

/// The server has stopped.
Terminated,
}

/// The receiver for server's shutdown event. The value will turn to true once the server starts
/// to shutdown
pub type ShutdownWatch = watch::Receiver<bool>;
Expand All @@ -71,6 +114,12 @@ pub struct Server {
shutdown_watch: watch::Sender<bool>,
// TODO: we many want to drop this copy to let sender call closed()
shutdown_recv: ShutdownWatch,

/// Tracks the execution phase of the server during upgrades and graceful shutdowns.
///
/// Users can subscribe to the phase with [`Self::watch_execution_phase()`].
execution_phase_watch: broadcast::Sender<ExecutionPhase>,

/// The parsed server configuration
pub configuration: Arc<ServerConf>,
/// The parser command line options
Expand All @@ -86,13 +135,25 @@ pub struct Server {
// TODO: delete the pid when exit

impl Server {
/// Acquire a receiver for the server's execution phase.
///
/// The receiver will produce values for each transition.
pub fn watch_execution_phase(&self) -> broadcast::Receiver<ExecutionPhase> {
self.execution_phase_watch.subscribe()
}

#[cfg(unix)]
async fn main_loop(&self) -> ShutdownType {
// waiting for exit signal
// TODO: there should be a signal handling function
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();

self.execution_phase_watch
.send(ExecutionPhase::Running)
.ok();

tokio::select! {
_ = fast_shutdown_signal.recv() => {
info!("SIGINT received, exiting");
Expand All @@ -110,12 +171,18 @@ impl Server {
}
}
info!("Broadcast graceful shutdown complete");

self.execution_phase_watch.send(ExecutionPhase::GracefulTerminate).ok();

ShutdownType::Graceful
}
_ = graceful_upgrade_signal.recv() => {
// TODO: still need to select! on signals in case a fast shutdown is needed
// aka: move below to another task and only kick it off here
info!("SIGQUIT received, sending socks and gracefully exiting");

self.execution_phase_watch.send(ExecutionPhase::GracefulUpgradeTransferringFds).ok();

if let Some(fds) = &self.listen_fds {
let fds = fds.lock().await;
info!("Trying to send socks");
Expand All @@ -131,6 +198,7 @@ impl Server {
sentry::capture_error(&e);
}
}
self.execution_phase_watch.send(ExecutionPhase::GracefulUpgradeCloseTimeout).ok();
sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
info!("Broadcasting graceful shutdown");
// gracefully exiting
Expand Down Expand Up @@ -211,6 +279,7 @@ impl Server {
listen_fds: None,
shutdown_watch: tx,
shutdown_recv: rx,
execution_phase_watch: broadcast::channel(100).0,
configuration: Arc::new(conf),
options: opt,
#[cfg(feature = "sentry")]
Expand Down Expand Up @@ -253,6 +322,7 @@ impl Server {
listen_fds: None,
shutdown_watch: tx,
shutdown_recv: rx,
execution_phase_watch: broadcast::channel(100).0,
configuration: Arc::new(conf),
options: opt,
#[cfg(feature = "sentry")]
Expand Down Expand Up @@ -280,6 +350,10 @@ impl Server {
info!("Bootstrap starting");
debug!("{:#?}", self.options);

self.execution_phase_watch
.send(ExecutionPhase::Bootstrap)
.ok();

/* only init sentry in release builds */
#[cfg(all(not(debug_assertions), feature = "sentry"))]
let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
Expand All @@ -304,16 +378,23 @@ impl Server {
std::process::exit(1);
}
}

self.execution_phase_watch
.send(ExecutionPhase::BootstrapComplete)
.ok();
}

/// Start the server
/// Run the server until execution finished.
///
/// This function will block forever until the server needs to quit. So this would be the last
/// function to call for this object.
/// This function will run until the server has been instructed to shut down
/// through a signal, and will then wait for all services to finish and
/// runtimes to exit.
///
/// Note: this function may fork the process for daemonization, so any additional threads created
/// before this function will be lost to any service logic once this function is called.
pub fn run_forever(mut self) -> ! {
/// Note: if daemonization is enabled in the config, this function will
/// never return.
/// Instead it will either start the daemon process and exit, or panic
/// if daemonization fails.
pub fn run(mut self) {
info!("Server starting");

let conf = self.configuration.as_ref();
Expand Down Expand Up @@ -358,7 +439,15 @@ impl Server {
#[cfg(windows)]
let shutdown_type = ShutdownType::Graceful;

self.execution_phase_watch
.send(ExecutionPhase::ShutdownStarted)
.ok();

if matches!(shutdown_type, ShutdownType::Graceful) {
self.execution_phase_watch
.send(ExecutionPhase::ShutdownGracePeriod)
.ok();

let exit_timeout = self
.configuration
.as_ref()
Expand All @@ -379,6 +468,11 @@ impl Server {
.unwrap_or(5),
),
};

self.execution_phase_watch
.send(ExecutionPhase::ShutdownRuntimes)
.ok();

let shutdowns: Vec<_> = runtimes
.into_iter()
.map(|rt| {
Expand All @@ -395,6 +489,21 @@ impl Server {
}
}
info!("All runtimes exited, exiting now");

self.execution_phase_watch
.send(ExecutionPhase::Terminated)
.ok();
}

/// Start the server
///
/// This function will block forever until the server needs to quit. So this would be the last
/// function to call for this object.
///
/// Note: this function may fork the process for daemonization, so any additional threads created
/// before this function will be lost to any service logic once this function is called.
pub fn run_forever(self) -> ! {
self.run();
std::process::exit(0)
}

Expand Down
53 changes: 53 additions & 0 deletions pingora-core/tests/server_phase_fastshutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// NOTE: This test sends a shutdown signal to itself,
// so it needs to be in an isolated test to prevent concurrency.

use pingora_core::server::{ExecutionPhase, Server};

// Ensure that execution phases are reported correctly.
#[test]
fn test_server_execution_phase_monitor_fast_shutdown() {
let mut server = Server::new(None).unwrap();

let mut phase = server.watch_execution_phase();

let join = std::thread::spawn(move || {
server.bootstrap();
server.run();
});

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::Bootstrap
));
assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::BootstrapComplete,
));
assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::Running,
));

// Need to wait for startup, otherwise the signal handler is not
// installed yet.
unsafe {
libc::raise(libc::SIGINT);
}

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::ShutdownStarted,
));

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::ShutdownRuntimes,
));

join.join().unwrap();

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::Terminated,
));
}
69 changes: 69 additions & 0 deletions pingora-core/tests/server_phase_gracefulshutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// NOTE: This test sends a shutdown signal to itself,
// so it needs to be in an isolated test to prevent concurrency.

use pingora_core::server::{configuration::ServerConf, ExecutionPhase, Server};

// Ensure that execution phases are reported correctly.
#[test]
fn test_server_execution_phase_monitor_graceful_shutdown() {
let conf = ServerConf {
// Use small timeouts to speed up the test.
grace_period_seconds: Some(1),
graceful_shutdown_timeout_seconds: Some(1),
..Default::default()
};
let mut server = Server::new_with_opt_and_conf(None, conf);

let mut phase = server.watch_execution_phase();

let join = std::thread::spawn(move || {
server.bootstrap();
server.run();
});

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::Bootstrap
));
assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::BootstrapComplete,
));
assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::Running,
));

// Need to wait for startup, otherwise the signal handler is not
// installed yet.
unsafe {
libc::raise(libc::SIGTERM);
}

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::GracefulTerminate,
));

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::ShutdownStarted,
));

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::ShutdownGracePeriod,
));

assert!(matches!(
dbg!(phase.blocking_recv().unwrap()),
ExecutionPhase::ShutdownRuntimes,
));

join.join().unwrap();

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::Terminated,
));
}
Loading