diff --git a/Cargo.lock b/Cargo.lock index 1c074e276553a..ba037f31ef9d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8685,6 +8685,7 @@ dependencies = [ "strum_macros 0.26.1", "task_stats_alloc", "tempfile", + "thiserror-ext", "tikv-jemallocator", "tracing", "vergen", diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index ce110c9effc17..13d5fcad5ec8c 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use risingwave_common::error::v2::AsReport as _; use risingwave_compactor::CompactorOpts; use risingwave_compute::ComputeNodeOpts; use risingwave_ctl::CliOpts as CtlOpts; @@ -67,13 +68,12 @@ pub fn ctl(opts: CtlOpts) { // Note: Use a simple current thread runtime for ctl. // When there's a heavy workload, multiple thread runtime seems to respond slowly. May need // further investigation. - tokio::runtime::Builder::new_current_thread() + if let Err(e) = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap() .block_on(risingwave_ctl::start(opts)) - .inspect_err(|e| { - eprintln!("{:#?}", e); - }) - .unwrap(); + { + eprintln!("Error: {:#?}", e.as_report()); + } } diff --git a/src/cmd_all/Cargo.toml b/src/cmd_all/Cargo.toml index bb57fbfe88a09..c5f193ef8a2a3 100644 --- a/src/cmd_all/Cargo.toml +++ b/src/cmd_all/Cargo.toml @@ -58,6 +58,7 @@ workspace-hack = { path = "../workspace-hack" } expect-test = "1" [build-dependencies] +thiserror-ext = { workspace = true } vergen = { version = "8", default-features = false, features = [ "build", "git", diff --git a/src/cmd_all/build.rs b/src/cmd_all/build.rs index a4a7c27e65685..38d9f2d7107a6 100644 --- a/src/cmd_all/build.rs +++ b/src/cmd_all/build.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use thiserror_ext::AsReport; use vergen::EmitBuilder; fn main() { if let Err(e) = EmitBuilder::builder().git_sha(true).fail_on_error().emit() { // Leave the environment variable unset if error occurs. - println!("cargo:warning={}", e) + println!("cargo:warning={}", e.as_report()) } } diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 971fb28d208c2..b1415a00b1362 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -391,9 +391,9 @@ impl<'de> Deserialize<'de> for DefaultParallelism { VirtualNode::COUNT )))? } else { - NonZeroUsize::new(i) - .context("default parallelism should be greater than 0") - .map_err(|e| serde::de::Error::custom(e.to_string()))? + NonZeroUsize::new(i).ok_or_else(|| { + serde::de::Error::custom("default parallelism should be greater than 0") + })? })), } } diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 2bc4160e7a263..fc6afc379eb76 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -107,12 +107,9 @@ impl Sink for NatsSink { "Nats sink only support append-only mode" ))); } - let _client = self - .config - .common - .build_client() - .await - .context("validate nats sink error")?; + let _client = (self.config.common.build_client().await) + .context("validate nats sink error") + .map_err(SinkError::Nats)?; Ok(()) } diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 43753dad599c7..902f113526a2e 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -14,7 +14,7 @@ use std::str::FromStr; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use async_trait::async_trait; use futures_async_stream::try_stream; use itertools::Itertools; @@ -79,8 +79,8 @@ impl SplitReader for CdcSplitReader { if matches!(T::source_type(), CdcSourceType::Citus) && let Some(server_addr) = split.server_addr() { - let host_addr = HostAddr::from_str(&server_addr) - .map_err(|err| anyhow!("invalid server address for cdc split. {}", err))?; + let host_addr = + HostAddr::from_str(&server_addr).context("invalid server address for cdc split")?; properties.insert("hostname".to_string(), host_addr.host); properties.insert("port".to_string(), host_addr.port.to_string()); // rewrite table name with suffix to capture all shards in the split @@ -218,7 +218,7 @@ impl CommonSplitReader for CdcSplitReader { GLOBAL_ERROR_METRICS.cdc_source_error.report([ source_type.as_str_name().into(), source_id.clone(), - e.to_string(), + e.to_report_string(), ]); Err(e)?; } diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index 16314d21dbc1e..4441be8c9db21 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::time::Duration; -use anyhow::{anyhow, Context as _}; +use anyhow::{anyhow, Context}; use async_trait::async_trait; use rdkafka::consumer::{BaseConsumer, Consumer}; use rdkafka::error::KafkaResult; @@ -112,6 +112,7 @@ impl SplitEnumerator for KafkaSplitEnumerator { self.broker_address ) })?; + let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?; let mut start_offsets = self .fetch_start_offset(topic_partitions.as_ref(), &watermarks) diff --git a/src/error/src/lib.rs b/src/error/src/lib.rs index d3364485e8f2f..f7a2611b84a65 100644 --- a/src/error/src/lib.rs +++ b/src/error/src/lib.rs @@ -23,3 +23,6 @@ pub mod anyhow; pub mod tonic; + +// Re-export the `thiserror-ext` crate. +pub use thiserror_ext::*; diff --git a/src/risedevtool/src/bin/risedev-dev.rs b/src/risedevtool/src/bin/risedev-dev.rs index ea9099eae319c..9723ee89fbd51 100644 --- a/src/risedevtool/src/bin/risedev-dev.rs +++ b/src/risedevtool/src/bin/risedev-dev.rs @@ -31,6 +31,7 @@ use risedev::{ RISEDEV_SESSION_NAME, }; use tempfile::tempdir; +use thiserror_ext::AsReport; use yaml_rust::YamlEmitter; #[derive(Default)] @@ -444,9 +445,9 @@ fn main() -> Result<()> { } Err(err) => { println!( - "{} - Failed to start: {:?}", // with `Caused by` + "{} - Failed to start: {:#}", // pretty with `Caused by` style("ERROR").red().bold(), - err, + err.as_report(), ); println!(); println!( diff --git a/src/risedevtool/src/preflight_check.rs b/src/risedevtool/src/preflight_check.rs index 17dc48884fea8..9b25d39423566 100644 --- a/src/risedevtool/src/preflight_check.rs +++ b/src/risedevtool/src/preflight_check.rs @@ -17,6 +17,7 @@ use std::process::Command; use anyhow::Result; use console::style; +use thiserror_ext::AsReport; fn preflight_check_proxy() -> Result<()> { if env::var("http_proxy").is_ok() @@ -72,7 +73,7 @@ pub fn preflight_check() -> Result<()> { "[{}] {} - failed to run proxy preflight check: {}", style("risedev-preflight-check").bold(), style("WARN").yellow().bold(), - e + e.as_report() ); } @@ -81,7 +82,7 @@ pub fn preflight_check() -> Result<()> { "[{}] {} - failed to run ulimit preflight check: {}", style("risedev-preflight-check").bold(), style("WARN").yellow().bold(), - e + e.as_report() ); } diff --git a/src/tests/regress/src/lib.rs b/src/tests/regress/src/lib.rs index efdd0f1422c00..a18afdebc843f 100644 --- a/src/tests/regress/src/lib.rs +++ b/src/tests/regress/src/lib.rs @@ -27,6 +27,9 @@ #![deny(rustdoc::broken_intra_doc_links)] #![feature(path_file_prefix)] #![feature(let_chains)] +#![feature(register_tool)] +#![register_tool(rw)] +#![allow(rw::format_error)] mod opts;