Skip to content

Commit

Permalink
fix(risectl): exit with non-zero code when there's an error (#15325)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Mar 5, 2024
1 parent 71504d7 commit a835506
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 35 deletions.
14 changes: 1 addition & 13 deletions src/cmd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::error::v2::AsReport as _;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
use risingwave_ctl::CliOpts as CtlOpts;
Expand Down Expand Up @@ -64,16 +63,5 @@ pub fn compactor(opts: CompactorOpts) {

pub fn ctl(opts: CtlOpts) {
init_risingwave_logger(LoggerSettings::new("ctl").stderr(true));

// Note: Use a simple current thread runtime for ctl.
// When there's a heavy workload, multiple thread runtime seems to respond slowly. May need
// further investigation.
if let Err(e) = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(risingwave_ctl::start(opts))
{
eprintln!("Error: {:#?}", e.as_report());
}
main_okk(risingwave_ctl::start(opts));
}
19 changes: 17 additions & 2 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use cmd_impl::hummock::SstDumpArgs;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_meta::backup_restore::RestoreOpts;
use risingwave_pb::meta::update_worker_node_schedulability_request::Schedulability;
use thiserror_ext::AsReport;

use crate::cmd_impl::hummock::{
build_compaction_config_vec, list_pinned_snapshots, list_pinned_versions,
Expand Down Expand Up @@ -550,14 +551,28 @@ pub enum ProfileCommands {
},
}

pub async fn start(opts: CliOpts) -> Result<()> {
/// Start `risectl` with the given options.
/// Log and abort the process if any error occurs.
///
/// Note: use [`start_fallible`] if you want to call functionalities of `risectl`
/// in an embedded manner.
pub async fn start(opts: CliOpts) {
if let Err(e) = start_fallible(opts).await {
eprintln!("Error: {:#?}", e.as_report()); // pretty with backtrace
std::process::exit(1);
}
}

/// Start `risectl` with the given options.
/// Return `Err` if any error occurs.
pub async fn start_fallible(opts: CliOpts) -> Result<()> {
let context = CtlContext::default();
let result = start_impl(opts, &context).await;
context.try_close().await;
result
}

pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
match opts.command {
Commands::Compute(ComputeCommands::ShowConfig { host }) => {
cmd_impl::compute::show_config(&host).await?
Expand Down
36 changes: 16 additions & 20 deletions src/tests/simulation/src/ctl_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#![cfg_attr(not(madsim), expect(unused_imports))]

use std::collections::{HashMap, HashSet};
use std::ffi::OsString;
use std::fmt::Write;
use std::sync::Arc;

Expand Down Expand Up @@ -380,7 +381,6 @@ impl Cluster {
.spawn(async move {
let revision = format!("{}", revision);
let mut v = vec![
"ctl",
"meta",
"reschedule",
"--plan",
Expand All @@ -393,8 +393,7 @@ impl Cluster {
v.push("--resolve-no-shuffle");
}

let opts = risingwave_ctl::CliOpts::parse_from(v);
risingwave_ctl::start(opts).await
start_ctl(v).await
})
.await??;

Expand All @@ -404,26 +403,14 @@ impl Cluster {
/// Pause all data sources in the cluster.
#[cfg_or_panic(madsim)]
pub async fn pause(&mut self) -> Result<()> {
self.ctl
.spawn(async move {
let opts = risingwave_ctl::CliOpts::parse_from(["ctl", "meta", "pause"]);
risingwave_ctl::start(opts).await
})
.await??;

self.ctl.spawn(start_ctl(["meta", "pause"])).await??;
Ok(())
}

/// Resume all data sources in the cluster.
#[cfg_or_panic(madsim)]
pub async fn resume(&mut self) -> Result<()> {
self.ctl
.spawn(async move {
let opts = risingwave_ctl::CliOpts::parse_from(["ctl", "meta", "resume"]);
risingwave_ctl::start(opts).await
})
.await??;

self.ctl.spawn(start_ctl(["meta", "resume"])).await??;
Ok(())
}

Expand All @@ -433,16 +420,14 @@ impl Cluster {
self.ctl
.spawn(async move {
let mut command: Vec<String> = vec![
"ctl".into(),
"throttle".into(),
"mv".into(),
table_id.table_id.to_string(),
];
if let Some(rate_limit) = rate_limit {
command.push(rate_limit.to_string());
}
let opts = risingwave_ctl::CliOpts::parse_from(command);
risingwave_ctl::start(opts).await
start_ctl(command).await
})
.await??;
Ok(())
Expand Down Expand Up @@ -479,3 +464,14 @@ impl Cluster {
Ok(resp)
}
}

#[cfg_attr(not(madsim), allow(dead_code))]
async fn start_ctl<S, I>(args: I) -> Result<()>
where
S: Into<OsString>,
I: IntoIterator<Item = S>,
{
let args = std::iter::once("ctl".into()).chain(args.into_iter().map(|s| s.into()));
let opts = risingwave_ctl::CliOpts::parse_from(args);
risingwave_ctl::start_fallible(opts).await
}

0 comments on commit a835506

Please sign in to comment.