Skip to content

Commit

Permalink
refactor: graceful shutdown on risectl (#17574)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Jul 8, 2024
1 parent 739d005 commit 496d3a3
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 12 deletions.
3 changes: 1 addition & 2 deletions src/cmd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,5 @@ pub fn compactor(opts: CompactorOpts) -> ! {

pub fn ctl(opts: CtlOpts) -> ! {
init_risingwave_logger(LoggerSettings::new("ctl").stderr(true));
// TODO(shutdown): pass the shutdown token
main_okk(|_| risingwave_ctl::start(opts));
main_okk(|shutdown| risingwave_ctl::start(opts, shutdown));
}
6 changes: 4 additions & 2 deletions src/ctl/src/common/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ impl CtlContext {
.cloned()
}

pub async fn try_close(mut self) {
pub async fn try_close(&self) {
tracing::info!("clean up context");
self.meta_client.take().unwrap().try_unregister().await;
if let Some(meta_client) = self.meta_client.get() {
meta_client.try_unregister().await;
}
}
}
26 changes: 19 additions & 7 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use clap::{Args, Parser, Subcommand};
use cmd_impl::bench::BenchCommands;
use cmd_impl::hummock::SstDumpArgs;
use itertools::Itertools;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_meta::backup_restore::RestoreOpts;
use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm;
Expand Down Expand Up @@ -614,22 +615,33 @@ pub enum ProfileCommands {
}

/// Start `risectl` with the given options.
/// Cancel the operation when the given `shutdown` token triggers.
/// Log and abort the process if any error occurs.
///
/// Note: use [`start_fallible`] if you want to call functionalities of `risectl`
/// in an embedded manner.
pub async fn start(opts: CliOpts) {
if let Err(e) = start_fallible(opts).await {
eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
std::process::exit(1);
pub async fn start(opts: CliOpts, shutdown: CancellationToken) {
let context = CtlContext::default();

tokio::select! {
_ = shutdown.cancelled() => {
// Shutdown requested, clean up the context and return.
context.try_close().await;
}

result = start_fallible(opts, &context) => {
if let Err(e) = result {
eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
std::process::exit(1);
}
}
}
}

/// Start `risectl` with the given options.
/// Return `Err` if any error occurs.
pub async fn start_fallible(opts: CliOpts) -> Result<()> {
let context = CtlContext::default();
let result = start_impl(opts, &context).await;
pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> {
let result = start_impl(opts, context).await;
context.try_close().await;
result
}
Expand Down
3 changes: 2 additions & 1 deletion src/tests/simulation/src/ctl_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,5 +441,6 @@ where
{
let args = std::iter::once("ctl".into()).chain(args.into_iter().map(|s| s.into()));
let opts = risingwave_ctl::CliOpts::parse_from(args);
risingwave_ctl::start_fallible(opts).await
let context = risingwave_ctl::common::CtlContext::default();
risingwave_ctl::start_fallible(opts, &context).await
}

0 comments on commit 496d3a3

Please sign in to comment.