diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 13d5fcad5ec8c..a2f3457bb1266 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::error::v2::AsReport as _; use risingwave_compactor::CompactorOpts; use risingwave_compute::ComputeNodeOpts; use risingwave_ctl::CliOpts as CtlOpts; @@ -64,16 +63,5 @@ pub fn compactor(opts: CompactorOpts) { pub fn ctl(opts: CtlOpts) { init_risingwave_logger(LoggerSettings::new("ctl").stderr(true)); - - // Note: Use a simple current thread runtime for ctl. - // When there's a heavy workload, multiple thread runtime seems to respond slowly. May need - // further investigation. - if let Err(e) = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(risingwave_ctl::start(opts)) - { - eprintln!("Error: {:#?}", e.as_report()); - } + main_okk(risingwave_ctl::start(opts)); } diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs index 715460deb27ac..9cdc99c0d3156 100644 --- a/src/ctl/src/lib.rs +++ b/src/ctl/src/lib.rs @@ -22,6 +22,7 @@ use cmd_impl::hummock::SstDumpArgs; use risingwave_hummock_sdk::HummockEpoch; use risingwave_meta::backup_restore::RestoreOpts; use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability; +use thiserror_ext::AsReport; use crate::cmd_impl::hummock::{ build_compaction_config_vec, list_pinned_snapshots, list_pinned_versions, @@ -550,14 +551,28 @@ pub enum ProfileCommands { }, } -pub async fn start(opts: CliOpts) -> Result<()> { +/// Start `risectl` with the given options. +/// Log and abort the process if any error occurs. +/// +/// Note: use [`start_fallible`] if you want to call functionalities of `risectl` +/// in an embedded manner. +pub async fn start(opts: CliOpts) { + if let Err(e) = start_fallible(opts).await { + eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace + std::process::exit(1); + } +} + +/// Start `risectl` with the given options. +/// Return `Err` if any error occurs. +pub async fn start_fallible(opts: CliOpts) -> Result<()> { let context = CtlContext::default(); let result = start_impl(opts, &context).await; context.try_close().await; result } -pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { +async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> { match opts.command { Commands::Compute(ComputeCommands::ShowConfig { host }) => { cmd_impl::compute::show_config(&host).await? diff --git a/src/tests/simulation/src/ctl_ext.rs b/src/tests/simulation/src/ctl_ext.rs index 5a880ec05dfd9..77ae0ee414e49 100644 --- a/src/tests/simulation/src/ctl_ext.rs +++ b/src/tests/simulation/src/ctl_ext.rs @@ -15,6 +15,7 @@ #![cfg_attr(not(madsim), expect(unused_imports))] use std::collections::{HashMap, HashSet}; +use std::ffi::OsString; use std::fmt::Write; use std::sync::Arc; @@ -380,7 +381,6 @@ impl Cluster { .spawn(async move { let revision = format!("{}", revision); let mut v = vec![ - "ctl", "meta", "reschedule", "--plan", @@ -393,8 +393,7 @@ impl Cluster { v.push("--resolve-no-shuffle"); } - let opts = risingwave_ctl::CliOpts::parse_from(v); - risingwave_ctl::start(opts).await + start_ctl(v).await }) .await??; @@ -404,26 +403,14 @@ impl Cluster { /// Pause all data sources in the cluster. #[cfg_or_panic(madsim)] pub async fn pause(&mut self) -> Result<()> { - self.ctl - .spawn(async move { - let opts = risingwave_ctl::CliOpts::parse_from(["ctl", "meta", "pause"]); - risingwave_ctl::start(opts).await - }) - .await??; - + self.ctl.spawn(start_ctl(["meta", "pause"])).await??; Ok(()) } /// Resume all data sources in the cluster. #[cfg_or_panic(madsim)] pub async fn resume(&mut self) -> Result<()> { - self.ctl - .spawn(async move { - let opts = risingwave_ctl::CliOpts::parse_from(["ctl", "meta", "resume"]); - risingwave_ctl::start(opts).await - }) - .await??; - + self.ctl.spawn(start_ctl(["meta", "resume"])).await??; Ok(()) } @@ -433,7 +420,6 @@ impl Cluster { self.ctl .spawn(async move { let mut command: Vec = vec![ - "ctl".into(), "throttle".into(), "mv".into(), table_id.table_id.to_string(), @@ -441,8 +427,7 @@ impl Cluster { if let Some(rate_limit) = rate_limit { command.push(rate_limit.to_string()); } - let opts = risingwave_ctl::CliOpts::parse_from(command); - risingwave_ctl::start(opts).await + start_ctl(command).await }) .await??; Ok(()) @@ -479,3 +464,14 @@ impl Cluster { Ok(resp) } } + +#[cfg_attr(not(madsim), allow(dead_code))] +async fn start_ctl(args: I) -> Result<()> +where + S: Into, + I: IntoIterator, +{ + let args = std::iter::once("ctl".into()).chain(args.into_iter().map(|s| s.into())); + let opts = risingwave_ctl::CliOpts::parse_from(args); + risingwave_ctl::start_fallible(opts).await +}