From 496d3a3e087307ad8a133b99f358c69b7d338dae Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 8 Jul 2024 13:27:08 +0800 Subject: [PATCH] refactor: graceful shutdown on risectl (#17574) Signed-off-by: Bugen Zhao --- src/cmd/src/lib.rs | 3 +-- src/ctl/src/common/context.rs | 6 ++++-- src/ctl/src/lib.rs | 26 +++++++++++++++++++------- src/tests/simulation/src/ctl_ext.rs | 3 ++- 4 files changed, 26 insertions(+), 12 deletions(-) diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index c6bf6c849fd39..0e711458d5196 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -61,6 +61,5 @@ pub fn compactor(opts: CompactorOpts) -> ! { pub fn ctl(opts: CtlOpts) -> ! { init_risingwave_logger(LoggerSettings::new("ctl").stderr(true)); - // TODO(shutdown): pass the shutdown token - main_okk(|_| risingwave_ctl::start(opts)); + main_okk(|shutdown| risingwave_ctl::start(opts, shutdown)); } diff --git a/src/ctl/src/common/context.rs b/src/ctl/src/common/context.rs index da9f2c6c7b10a..2665a482a1074 100644 --- a/src/ctl/src/common/context.rs +++ b/src/ctl/src/common/context.rs @@ -60,8 +60,10 @@ impl CtlContext { .cloned() } - pub async fn try_close(mut self) { + pub async fn try_close(&self) { tracing::info!("clean up context"); - self.meta_client.take().unwrap().try_unregister().await; + if let Some(meta_client) = self.meta_client.get() { + meta_client.try_unregister().await; + } } } diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 8fb022f96919f..7242ba1d3c74a 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -20,6 +20,7 @@ use clap::{Args, Parser, Subcommand}; use cmd_impl::bench::BenchCommands; use cmd_impl::hummock::SstDumpArgs; use itertools::Itertools; +use risingwave_common::util::tokio_util::sync::CancellationToken; use risingwave_hummock_sdk::HummockEpoch; use risingwave_meta::backup_restore::RestoreOpts; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::CompressionAlgorithm; @@ -614,22 +615,33 @@ pub enum ProfileCommands { } /// Start `risectl` with the given options. +/// Cancel the operation when the given `shutdown` token triggers. /// Log and abort the process if any error occurs. /// /// Note: use [`start_fallible`] if you want to call functionalities of `risectl` /// in an embedded manner. -pub async fn start(opts: CliOpts) { - if let Err(e) = start_fallible(opts).await { - eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace - std::process::exit(1); +pub async fn start(opts: CliOpts, shutdown: CancellationToken) { + let context = CtlContext::default(); + + tokio::select! { + _ = shutdown.cancelled() => { + // Shutdown requested, clean up the context and return. + context.try_close().await; + } + + result = start_fallible(opts, &context) => { + if let Err(e) = result { + eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace + std::process::exit(1); + } + } } } /// Start `risectl` with the given options. /// Return `Err` if any error occurs. -pub async fn start_fallible(opts: CliOpts) -> Result<()> { - let context = CtlContext::default(); - let result = start_impl(opts, &context).await; +pub async fn start_fallible(opts: CliOpts, context: &CtlContext) -> Result<()> { + let result = start_impl(opts, context).await; context.try_close().await; result } diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs index 04a93829b88a8..39f7c7c7bc214 100644 --- a/src/tests/simulation/src/ctl_ext.rs +++ b/src/tests/simulation/src/ctl_ext.rs @@ -441,5 +441,6 @@ where { let args = std::iter::once("ctl".into()).chain(args.into_iter().map(|s| s.into())); let opts = risingwave_ctl::CliOpts::parse_from(args); - risingwave_ctl::start_fallible(opts).await + let context = risingwave_ctl::common::CtlContext::default(); + risingwave_ctl::start_fallible(opts, &context).await }