From 9ee4a5338fd4675cc0e457228de1874d6aad032d Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Wed, 28 Feb 2024 13:35:04 +0800 Subject: [PATCH 1/3] fix(risectl): exit with non-zero code when there's an error Signed-off-by: Bugen Zhao --- src/cmd/src/lib.rs | 13 +------------ src/ctl/src/lib.rs | 8 ++++++-- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 13d5fcad5ec8c..543907e741378 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -64,16 +64,5 @@ pub fn compactor(opts: CompactorOpts) { pub fn ctl(opts: CtlOpts) { init_risingwave_logger(LoggerSettings::new("ctl").stderr(true)); - - // Note: Use a simple current thread runtime for ctl. - // When there's a heavy workload, multiple thread runtime seems to respond slowly. May need - // further investigation. - if let Err(e) = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(risingwave_ctl::start(opts)) - { - eprintln!("Error: {:#?}", e.as_report()); - } + main_okk(risingwave_ctl::start(opts)); } diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 715460deb27ac..43267c094ba1d 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -22,6 +22,7 @@ use cmd_impl::hummock::SstDumpArgs; use risingwave_hummock_sdk::HummockEpoch; use risingwave_meta::backup_restore::RestoreOpts; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; +use thiserror_ext::AsReport; use crate::cmd_impl::hummock::{ build_compaction_config_vec, list_pinned_snapshots, list_pinned_versions, @@ -550,11 +551,14 @@ pub enum ProfileCommands { }, } -pub async fn start(opts: CliOpts) -> Result<()> { +pub async fn start(opts: CliOpts) { let context = CtlContext::default(); let result = start_impl(opts, &context).await; context.try_close().await; - result + if let Err(e) = result { + eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace + std::process::exit(1); + } } pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { From 311a3eadd9dd41f4973ae88147266732771a2318 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Thu, 29 Feb 2024 13:16:04 +0800 Subject: [PATCH 2/3] fix simulation and clippy Signed-off-by: Bugen Zhao --- src/cmd/src/lib.rs | 1 - src/tests/simulation/src/ctl_ext.rs | 16 ++++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 543907e741378..a2f3457bb1266 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::error::v2::AsReport as _; use risingwave_compactor::CompactorOpts; use risingwave_compute::ComputeNodeOpts; use risingwave_ctl::CliOpts as CtlOpts; diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs index 5a880ec05dfd9..be0793c0fdc37 100644 --- a/src/tests/simulation/src/ctl_ext.rs +++ b/src/tests/simulation/src/ctl_ext.rs @@ -394,9 +394,9 @@ impl Cluster { } let opts = risingwave_ctl::CliOpts::parse_from(v); - risingwave_ctl::start(opts).await + risingwave_ctl::start(opts).await; }) - .await??; + .await?; Ok(()) } @@ -407,9 +407,9 @@ impl Cluster { self.ctl .spawn(async move { let opts = risingwave_ctl::CliOpts::parse_from(["ctl", "meta", "pause"]); - risingwave_ctl::start(opts).await + risingwave_ctl::start(opts).await; }) - .await??; + .await?; Ok(()) } @@ -420,9 +420,9 @@ impl Cluster { self.ctl .spawn(async move { let opts = risingwave_ctl::CliOpts::parse_from(["ctl", "meta", "resume"]); - risingwave_ctl::start(opts).await + risingwave_ctl::start(opts).await; }) - .await??; + .await?; Ok(()) } @@ -442,9 +442,9 @@ impl Cluster { command.push(rate_limit.to_string()); } let opts = risingwave_ctl::CliOpts::parse_from(command); - risingwave_ctl::start(opts).await + risingwave_ctl::start(opts).await; }) - .await??; + .await?; Ok(()) } From f005f09a909cef3c6147c4a15cd0cff8d6c59884 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 5 Mar 2024 15:09:09 +0800 Subject: [PATCH 3/3] use fallible start in sim ctl Signed-off-by: Bugen Zhao --- src/ctl/src/lib.rs | 21 +++++++++++---- src/tests/simulation/src/ctl_ext.rs | 40 +++++++++++++---------------- 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 43267c094ba1d..9cdc99c0d3156 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -551,17 +551,28 @@ pub enum ProfileCommands { }, } +/// Start `risectl` with the given options. +/// Log and abort the process if any error occurs. +/// +/// Note: use [`start_fallible`] if you want to call functionalities of `risectl` +/// in an embedded manner. pub async fn start(opts: CliOpts) { - let context = CtlContext::default(); - let result = start_impl(opts, &context).await; - context.try_close().await; - if let Err(e) = result { + if let Err(e) = start_fallible(opts).await { eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace std::process::exit(1); } } -pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { +/// Start `risectl` with the given options. +/// Return `Err` if any error occurs. +pub async fn start_fallible(opts: CliOpts) -> Result<()> { + let context = CtlContext::default(); + let result = start_impl(opts, &context).await; + context.try_close().await; + result +} + +async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { match opts.command { Commands::Compute(ComputeCommands::ShowConfig { host }) => { cmd_impl::compute::show_config(&host).await? diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs index be0793c0fdc37..77ae0ee414e49 100644 --- a/src/tests/simulation/src/ctl_ext.rs +++ b/src/tests/simulation/src/ctl_ext.rs @@ -15,6 +15,7 @@ #![cfg_attr(not(madsim), expect(unused_imports))] use std::collections::{HashMap, HashSet}; +use std::ffi::OsString; use std::fmt::Write; use std::sync::Arc; @@ -380,7 +381,6 @@ impl Cluster { .spawn(async move { let revision = format!("{}", revision); let mut v = vec![ - "ctl", "meta", "reschedule", "--plan", @@ -393,10 +393,9 @@ impl Cluster { v.push("--resolve-no-shuffle"); } - let opts = risingwave_ctl::CliOpts::parse_from(v); - risingwave_ctl::start(opts).await; + start_ctl(v).await }) - .await?; + .await??; Ok(()) } @@ -404,26 +403,14 @@ impl Cluster { /// Pause all data sources in the cluster. #[cfg_or_panic(madsim)] pub async fn pause(&mut self) -> Result<()> { - self.ctl - .spawn(async move { - let opts = risingwave_ctl::CliOpts::parse_from(["ctl", "meta", "pause"]); - risingwave_ctl::start(opts).await; - }) - .await?; - + self.ctl.spawn(start_ctl(["meta", "pause"])).await??; Ok(()) } /// Resume all data sources in the cluster. #[cfg_or_panic(madsim)] pub async fn resume(&mut self) -> Result<()> { - self.ctl - .spawn(async move { - let opts = risingwave_ctl::CliOpts::parse_from(["ctl", "meta", "resume"]); - risingwave_ctl::start(opts).await; - }) - .await?; - + self.ctl.spawn(start_ctl(["meta", "resume"])).await??; Ok(()) } @@ -433,7 +420,6 @@ impl Cluster { self.ctl .spawn(async move { let mut command: Vec = vec![ - "ctl".into(), "throttle".into(), "mv".into(), table_id.table_id.to_string(), @@ -441,10 +427,9 @@ impl Cluster { if let Some(rate_limit) = rate_limit { command.push(rate_limit.to_string()); } - let opts = risingwave_ctl::CliOpts::parse_from(command); - risingwave_ctl::start(opts).await; + start_ctl(command).await }) - .await?; + .await??; Ok(()) } @@ -479,3 +464,14 @@ impl Cluster { Ok(resp) } } + +#[cfg_attr(not(madsim), allow(dead_code))] +async fn start_ctl(args: I) -> Result<()> +where + S: Into, + I: IntoIterator, +{ + let args = std::iter::once("ctl".into()).chain(args.into_iter().map(|s| s.into())); + let opts = risingwave_ctl::CliOpts::parse_from(args); + risingwave_ctl::start_fallible(opts).await +}