diff --git a/Cargo.lock b/Cargo.lock index bd64f2fb19b3..b00c7da791d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8260,6 +8260,7 @@ dependencies = [ "rustls", "spin 0.9.8", "thiserror", + "thiserror-ext", "tokio-retry", "tracing", ] @@ -8578,6 +8579,7 @@ dependencies = [ "sync-point", "tempfile", "thiserror", + "thiserror-ext", "tokio-retry", "tracing", "tracing-futures", @@ -8640,6 +8642,7 @@ dependencies = [ "static_assertions", "task_stats_alloc", "thiserror", + "thiserror-ext", "tokio-metrics", "tokio-stream", "tracing", @@ -10240,9 +10243,9 @@ dependencies = [ [[package]] name = "thiserror-ext" -version = "0.0.8" +version = "0.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51b32ea5f1a980ddd27f9ad46dc18db583f508a4d148dd2cf1ee3d81bfd767cf" +checksum = "778409884b05cc4a1b90b9c3a0f395a01c430355fe299d084152bdf50f3fa807" dependencies = [ "thiserror", "thiserror-ext-derive", @@ -10250,9 +10253,9 @@ dependencies = [ [[package]] name = "thiserror-ext-derive" -version = "0.0.8" +version = "0.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cf8fd06431d331ee33484e2c83f0309dd2ad26606c3b639d5927665a1698043" +checksum = "03162af1a5f99722388354158f3c5af20915d52a5d0d9bc4871d0fd7553e308f" dependencies = [ "either", "proc-macro2", diff --git a/Cargo.toml b/Cargo.toml index 8fd3ad975f02..35c3a590b1a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,7 +129,7 @@ arrow-flight = "49" arrow-select = "49" arrow-ord = "49" arrow-row = "49" -thiserror-ext = "0.0.8" +thiserror-ext = "0.0.10" tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [ "profiling", diff --git a/src/batch/clippy.toml b/src/batch/clippy.toml index c0ba7b7835ef..aeb91fb713fb 100644 --- a/src/batch/clippy.toml +++ b/src/batch/clippy.toml @@ -1,15 +1,10 @@ disallowed-methods = [ - { path = "risingwave_common::error::internal_err", reason = "Please use per-crate error type instead." }, - { path = "risingwave_common::error::internal_error", reason = "Please use per-crate error type instead." }, - { path = "risingwave_common::error::tonic_err", reason = "Please use per-crate error type instead." }, ] disallowed-types = [ { path = "risingwave_common::error::ErrorCode", reason = "Please use per-crate error type instead." }, { path = "risingwave_common::error::RwError", reason = "Please use per-crate error type instead." }, { path = "risingwave_common::error::Result", reason = "Please use per-crate error type instead." }, - { path = "risingwave_common::error::ToRwResult", reason = "Please use per-crate error type instead." }, - { path = "risingwave_common::error::ToErrorStr", reason = "Please use per-crate error type instead." }, ] doc-valid-idents = [ diff --git a/src/common/src/array/error.rs b/src/common/src/array/error.rs index 0cf285b56ef4..0bcebc4120a0 100644 --- a/src/common/src/array/error.rs +++ b/src/common/src/array/error.rs @@ -18,8 +18,6 @@ pub use anyhow::anyhow; use risingwave_pb::PbFieldNotFound; use thiserror::Error; -use crate::error::{ErrorCode, RwError}; - #[derive(Error, Debug)] pub enum ArrayError { #[error("Pb decode error: {0}")] @@ -42,12 +40,6 @@ pub enum ArrayError { ToArrow(String), } -impl From for RwError { - fn from(s: ArrayError) -> Self { - ErrorCode::ArrayError(s).into() - } -} - impl From for ArrayError { fn from(err: PbFieldNotFound) -> Self { anyhow!("Failed to decode prost: field not found `{}`", err.0).into() diff --git a/src/common/src/error.rs b/src/common/src/error.rs index a7b27e9dd9f8..779715763cdc 100644 --- a/src/common/src/error.rs +++ b/src/common/src/error.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::backtrace::Backtrace; use std::collections::HashSet; use std::convert::Infallible; use std::fmt::{Debug, Display, Formatter}; @@ -23,7 +22,7 @@ use memcomparable::Error as MemComparableError; use risingwave_error::tonic::{ToTonicStatus, TonicStatusWrapper}; use risingwave_pb::PbFieldNotFound; use thiserror::Error; -use thiserror_ext::Macro; +use thiserror_ext::{Box, Macro}; use tokio::task::JoinError; use crate::array::ArrayError; @@ -80,7 +79,8 @@ pub struct NotImplemented { pub issue: TrackingIssue, } -#[derive(Error, Debug)] +#[derive(Error, Debug, Box)] +#[thiserror_ext(newtype(name = RwError, backtrace, report_debug))] pub enum ErrorCode { #[error("internal error: {0}")] InternalError(String), @@ -212,30 +212,11 @@ pub enum ErrorCode { ), } -// TODO(error-handling): automatically generate this impl. -impl From for RwError { - fn from(value: NotImplemented) -> Self { - ErrorCode::from(value).into() - } -} - -pub fn internal_error(msg: impl Into) -> RwError { - ErrorCode::InternalError(msg.into()).into() -} - -#[derive(Error)] -#[error("{inner}")] -pub struct RwError { - #[source] - inner: Box, - backtrace: Box, -} - impl From for tonic::Status { fn from(err: RwError) -> Self { use tonic::Code; - let code = match &*err.inner { + let code = match err.inner() { ErrorCode::ExprError(_) => Code::InvalidArgument, ErrorCode::PermissionDenied(_) => Code::PermissionDenied, ErrorCode::InternalError(_) => Code::Internal, @@ -271,45 +252,9 @@ impl From for RwError { } } -impl RwError { - pub fn inner(&self) -> &ErrorCode { - &self.inner - } -} - -impl From for RwError { - fn from(code: ErrorCode) -> Self { - Self { - inner: Box::new(code), - backtrace: Box::new(Backtrace::capture()), - } - } -} - impl From for RwError { fn from(join_error: JoinError) -> Self { - Self { - inner: Box::new(ErrorCode::InternalError(join_error.to_string())), - backtrace: Box::new(Backtrace::capture()), - } - } -} - -impl From for RwError { - fn from(mem_comparable_error: MemComparableError) -> Self { - ErrorCode::MemComparableError(mem_comparable_error).into() - } -} - -impl From for RwError { - fn from(value_encoding_error: ValueEncodingError) -> Self { - ErrorCode::ValueEncodingError(value_encoding_error).into() - } -} - -impl From for RwError { - fn from(io_err: IoError) -> Self { - ErrorCode::IoError(io_err).into() + anyhow::anyhow!(join_error).into() } } @@ -319,12 +264,6 @@ impl From for RwError { } } -impl From for RwError { - fn from(e: anyhow::Error) -> Self { - ErrorCode::InternalErrorAnyhow(e).into() - } -} - impl From for RwError { fn from(x: Infallible) -> Self { match x {} @@ -337,18 +276,6 @@ impl From for RwError { } } -impl Debug for RwError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}\n{}", - self.inner, - // Use inner error's backtrace by default, otherwise use the generated one in `From`. - std::error::request_ref::(&self.inner).unwrap_or(&*self.backtrace) - ) - } -} - impl From for RwError { fn from(err: PbFieldNotFound) -> Self { ErrorCode::InternalError(format!( @@ -359,60 +286,14 @@ impl From for RwError { } } -impl From for RwError { - fn from(value: SessionConfigError) -> Self { - ErrorCode::SessionConfig(value).into() - } -} - impl From for RwError { fn from(err: tonic::transport::Error) -> Self { ErrorCode::RpcError(err.into()).into() } } -/// Convert `RwError` into `tonic::Status`. Generally used in `map_err`. -pub fn tonic_err(err: impl Into) -> tonic::Status { - err.into().into() -} - pub type Result = std::result::Result; -/// A helper to convert a third-party error to string. -pub trait ToErrorStr { - fn to_error_str(self) -> String; -} - -pub trait ToRwResult { - fn to_rw_result(self) -> Result; - - fn to_rw_result_with(self, func: impl FnOnce() -> String) -> Result; -} - -impl ToRwResult for std::result::Result { - fn to_rw_result(self) -> Result { - self.map_err(|e| ErrorCode::InternalError(e.to_error_str()).into()) - } - - fn to_rw_result_with(self, func: impl FnOnce() -> String) -> Result { - self.map_err(|e| { - ErrorCode::InternalError(format!("{}: {}", func(), e.to_error_str())).into() - }) - } -} - -impl ToErrorStr for std::sync::mpsc::SendError { - fn to_error_str(self) -> String { - self.to_string() - } -} - -impl ToErrorStr for tokio::sync::mpsc::error::SendError { - fn to_error_str(self) -> String { - self.to_string() - } -} - /// Util macro for generating error when condition check failed. /// /// # Case 1: Expression only. diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 1c1448b3d1e4..18bd7e427abe 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use await_tree::InstrumentAwait; use itertools::Itertools; -use risingwave_common::error::tonic_err; use risingwave_common::util::tracing::TracingContext; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::LocalSstableInfo; @@ -258,7 +257,7 @@ impl StreamService for StreamServiceImpl { .try_wait_epoch(HummockReadEpoch::Committed(epoch)) .instrument_await(format!("wait_epoch_commit (epoch {})", epoch)) .await - .map_err(tonic_err)?; + .map_err(StreamError::from)?; }); Ok(Response::new(WaitEpochCommitResponse { status: None })) diff --git a/src/frontend/src/binder/relation/mod.rs b/src/frontend/src/binder/relation/mod.rs index 856f221ec985..34ee66e45782 100644 --- a/src/frontend/src/binder/relation/mod.rs +++ b/src/frontend/src/binder/relation/mod.rs @@ -16,8 +16,9 @@ use std::collections::hash_map::Entry; use std::ops::Deref; use itertools::{EitherOrBoth, Itertools}; +use risingwave_common::bail; use risingwave_common::catalog::{Field, TableId, DEFAULT_SCHEMA_NAME}; -use risingwave_common::error::{internal_error, ErrorCode, Result, RwError}; +use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_sqlparser::ast::{ Expr as ParserExpr, FunctionArg, FunctionArgExpr, Ident, ObjectName, TableAlias, TableFactor, }; @@ -208,10 +209,7 @@ impl Binder { /// return first name in identifiers, must have only one name. fn resolve_single_name(mut identifiers: Vec, ident_desc: &str) -> Result { if identifiers.len() > 1 { - return Err(internal_error(format!( - "{} must contain 1 argument", - ident_desc - ))); + bail!("{} must contain 1 argument", ident_desc); } let name = identifiers.pop().unwrap().real_value(); diff --git a/src/frontend/src/expr/function_call.rs b/src/frontend/src/expr/function_call.rs index 87c2c6d95595..48f4f86d7af9 100644 --- a/src/frontend/src/expr/function_call.rs +++ b/src/frontend/src/expr/function_call.rs @@ -14,7 +14,7 @@ use itertools::Itertools; use risingwave_common::catalog::Schema; -use risingwave_common::error::{ErrorCode, Result as RwResult, RwError}; +use risingwave_common::error::{ErrorCode, Result as RwResult}; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::iter_util::ZipEqFast; use thiserror::Error; @@ -431,9 +431,3 @@ impl From for ErrorCode { ErrorCode::BindError(value.to_string()) } } - -impl From for RwError { - fn from(value: CastError) -> Self { - ErrorCode::from(value).into() - } -} diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index 6397b79b72dc..f1e5061677c1 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -28,7 +28,7 @@ use risingwave_object_store::object::build_remote_object_store; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_pb::java_binding::key_range::Bound; use risingwave_pb::java_binding::{KeyRange, ReadPlan}; -use risingwave_storage::error::{StorageError, StorageResult}; +use risingwave_storage::error::StorageResult; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use risingwave_storage::hummock::store::version::HummockVersionReader; use risingwave_storage::hummock::store::HummockStorageIterator; @@ -135,11 +135,7 @@ impl HummockJavaBindingIterator { Ok(match item { Some((key, value)) => Some(( key.user_key.table_key.0, - OwnedRow::new( - self.row_serde - .deserialize(&value) - .map_err(StorageError::DeserializeRow)?, - ), + OwnedRow::new(self.row_serde.deserialize(&value)?), )), None => None, }) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index f16a4e0a945b..32afb76c2159 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -403,7 +403,7 @@ pub async fn start_service_as_election_leader( let data_directory = system_params_reader.data_directory(); if !is_correct_data_directory(data_directory) { - return Err(MetaError::system_param(format!( + return Err(MetaError::system_params(format!( "The data directory {:?} is misconfigured. Please use a combination of uppercase and lowercase letters and numbers, i.e. [a-z, A-Z, 0-9]. The string cannot start or end with '/', and consecutive '/' are not allowed. diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index 8b0b2e0b5818..55313ec6f5f3 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -739,7 +739,7 @@ impl DdlService for DdlServiceImpl { .await? } else { return Err(Status::from(MetaError::unavailable( - "AWS client is not configured".into(), + "AWS client is not configured", ))); } } diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 5dd17668a12e..26e67b5e3cb5 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -94,7 +94,7 @@ impl ScheduledQueue { // we need to refine it when catalog and streaming metadata can be handled in a transactional way. if let QueueStatus::Blocked(reason) = &self.status && !matches!(scheduled.command, Command::DropStreamingJobs(_)) { - return Err(MetaError::unavailable(reason.clone())); + return Err(MetaError::unavailable(reason)); } self.queue.push_back(scheduled); Ok(()) diff --git a/src/meta/src/controller/cluster.rs b/src/meta/src/controller/cluster.rs index 3aa01637f7a1..0a2b3ec61666 100644 --- a/src/meta/src/controller/cluster.rs +++ b/src/meta/src/controller/cluster.rs @@ -519,8 +519,8 @@ impl ClusterControllerInner { Ok(()) } else { Err(MetaError::invalid_worker( - worker_id as _, - "worker not found".into(), + worker_id as u32, + "worker not found", )) } } @@ -536,8 +536,8 @@ impl ClusterControllerInner { Ok(()) } else { Err(MetaError::invalid_worker( - worker_id as _, - "worker not found".into(), + worker_id as u32, + "worker not found", )) } } @@ -546,14 +546,12 @@ impl ClusterControllerInner { self.worker_extra_info .get(&worker_id) .cloned() - .ok_or_else(|| MetaError::invalid_worker(worker_id as _, "worker not found".into())) + .ok_or_else(|| MetaError::invalid_worker(worker_id as u32, "worker not found")) } fn apply_transaction_id(&self, r#type: PbWorkerType) -> MetaResult> { match (self.available_transactional_ids.front(), r#type) { - (None, _) => Err(MetaError::unavailable( - "no available reusable machine id".to_string(), - )), + (None, _) => Err(MetaError::unavailable("no available reusable machine id")), // We only assign transactional id to compute node and frontend. (Some(id), PbWorkerType::ComputeNode | PbWorkerType::Frontend) => Ok(Some(*id)), _ => Ok(None), diff --git a/src/meta/src/controller/system_param.rs b/src/meta/src/controller/system_param.rs index e64ddf5b4ab8..ed1668e6df60 100644 --- a/src/meta/src/controller/system_param.rs +++ b/src/meta/src/controller/system_param.rs @@ -184,7 +184,7 @@ impl SystemParamsController { .one(&self.db) .await? else { - return Err(MetaError::system_param(format!( + return Err(MetaError::system_params(format!( "unrecognized system parameter {}", name ))); @@ -192,7 +192,7 @@ impl SystemParamsController { let mut params = params_guard.clone(); let mut param: system_parameter::ActiveModel = param.into(); param.value = - Set(set_system_param(&mut params, name, value).map_err(MetaError::system_param)?); + Set(set_system_param(&mut params, name, value).map_err(MetaError::system_params)?); param.update(&self.db).await?; *params_guard = params.clone(); diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 2a91f74d8011..01081ff1ca62 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -372,7 +372,7 @@ where .count(db) .await?; if count != 0 { - return Err(MetaError::permission_denied("schema is not empty".into())); + return Err(MetaError::permission_denied("schema is not empty")); } Ok(()) diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index 93c6c0cb9710..ee02b7400107 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::backtrace::Backtrace; -use std::sync::Arc; - use aws_sdk_ec2::error::DisplayErrorContext; use risingwave_common::error::BoxedError; use risingwave_connector::sink::SinkError; @@ -28,11 +25,12 @@ use crate::storage::MetaStoreError; pub type MetaResult = std::result::Result; -#[derive(thiserror::Error, Debug)] -enum MetaErrorInner { +#[derive(thiserror::Error, Debug, thiserror_ext::Arc, thiserror_ext::Construct)] +#[thiserror_ext(newtype(name = MetaError, backtrace, report_debug))] +pub enum MetaErrorInner { #[error("MetaStore transaction error: {0}")] TransactionError( - #[from] + #[source] #[backtrace] MetaStoreError, ), @@ -69,6 +67,7 @@ enum MetaErrorInner { // Used for catalog errors. #[error("{0} id not found: {1}")] + #[construct(skip)] CatalogIdNotFound(&'static str, String), #[error("table_fragment not exist: id={0}")] @@ -81,7 +80,7 @@ enum MetaErrorInner { Unavailable(String), #[error("Election failed: {0}")] - Election(String), + Election(#[source] BoxedError), #[error("Cancelled: {0}")] Cancelled(String), @@ -107,118 +106,27 @@ enum MetaErrorInner { ), } -impl From for MetaError { - fn from(inner: MetaErrorInner) -> Self { - Self { - inner: Arc::new(inner), - backtrace: Arc::new(Backtrace::capture()), - } - } -} - -#[derive(thiserror::Error, Clone)] -#[error("{inner}")] -pub struct MetaError { - inner: Arc, - backtrace: Arc, -} - -impl std::fmt::Debug for MetaError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - use std::error::Error; - - write!(f, "{}", self.inner)?; - writeln!(f)?; - if let Some(backtrace) = std::error::request_ref::(&self.inner as &dyn Error) { - write!(f, " backtrace of inner error:\n{}", backtrace)?; - } else { - write!(f, " backtrace of `MetaError`:\n{}", self.backtrace)?; - } - Ok(()) - } -} - impl MetaError { - /// Permission denied error. - pub fn permission_denied(s: String) -> Self { - MetaErrorInner::PermissionDenied(s).into() - } - - pub fn invalid_worker(worker_id: WorkerId, msg: String) -> Self { - MetaErrorInner::InvalidWorker(worker_id, msg).into() - } - pub fn is_invalid_worker(&self) -> bool { - use std::borrow::Borrow; - std::matches!(self.inner.borrow(), &MetaErrorInner::InvalidWorker(..)) - } - - pub fn invalid_parameter(s: impl Into) -> Self { - MetaErrorInner::InvalidParameter(s.into()).into() + matches!(self.inner(), MetaErrorInner::InvalidWorker(..)) } pub fn catalog_id_not_found(relation: &'static str, id: T) -> Self { MetaErrorInner::CatalogIdNotFound(relation, id.to_string()).into() } - pub fn fragment_not_found>(id: T) -> Self { - MetaErrorInner::FragmentNotFound(id.into()).into() - } - pub fn is_fragment_not_found(&self) -> bool { - matches!(self.inner.as_ref(), &MetaErrorInner::FragmentNotFound(..)) + matches!(self.inner(), MetaErrorInner::FragmentNotFound(..)) } pub fn catalog_duplicated>(relation: &'static str, name: T) -> Self { MetaErrorInner::Duplicated(relation, name.into()).into() } - - pub fn system_param(s: T) -> Self { - MetaErrorInner::SystemParams(s.to_string()).into() - } - - pub fn unavailable(s: String) -> Self { - MetaErrorInner::Unavailable(s).into() - } - - pub fn cancelled(s: String) -> Self { - MetaErrorInner::Cancelled(s).into() - } -} - -impl From for MetaError { - fn from(e: MetadataModelError) -> Self { - MetaErrorInner::MetadataModelError(e).into() - } -} - -impl From for MetaError { - fn from(e: HummockError) -> Self { - MetaErrorInner::HummockError(e).into() - } } impl From for MetaError { fn from(e: etcd_client::Error) -> Self { - MetaErrorInner::Election(e.to_string()).into() - } -} - -impl From for MetaError { - fn from(e: RpcError) -> Self { - MetaErrorInner::RpcError(e).into() - } -} - -impl From for MetaError { - fn from(e: SinkError) -> Self { - MetaErrorInner::Sink(e).into() - } -} - -impl From for MetaError { - fn from(a: anyhow::Error) -> Self { - MetaErrorInner::Internal(a).into() + MetaErrorInner::Election(e.into()).into() } } @@ -235,7 +143,7 @@ impl From for tonic::Status { fn from(err: MetaError) -> Self { use tonic::Code; - let code = match &*err.inner { + let code = match err.inner() { MetaErrorInner::PermissionDenied(_) => Code::PermissionDenied, MetaErrorInner::CatalogIdNotFound(_, _) => Code::NotFound, MetaErrorInner::Duplicated(_, _) => Code::AlreadyExists, diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index e9076a8a7c59..5557cb109e97 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -260,7 +260,7 @@ impl CatalogManager { if database_core.has_creation_in_database(database_id) { return Err(MetaError::permission_denied( - "Some relations are creating in the target database, try again later".into(), + "Some relations are creating in the target database, try again later", )); } @@ -515,12 +515,12 @@ impl CatalogManager { } if database_core.has_creation_in_schema(schema_id) { return Err(MetaError::permission_denied( - "Some relations are creating in the target schema, try again later".into(), + "Some relations are creating in the target schema, try again later", )); } if !database_core.schema_is_empty(schema_id) { return Err(MetaError::permission_denied( - "The schema is not empty, try dropping them first".into(), + "The schema is not empty, try dropping them first", )); } let mut schemas = BTreeMapTransaction::new(&mut database_core.schemas); diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index 2ca44beb64ca..c1618b117cdb 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -184,11 +184,7 @@ impl ClusterManager { .await? as WorkerId; let transactional_id = match (core.available_transactional_ids.front(), r#type) { - (None, _) => { - return Err(MetaError::unavailable( - "no available reusable machine id".to_string(), - )) - } + (None, _) => return Err(MetaError::unavailable("no available reusable machine id")), // We only assign transactional id to compute node and frontend. (Some(id), WorkerType::ComputeNode | WorkerType::Frontend) => Some(*id), _ => None, @@ -336,10 +332,7 @@ impl ClusterManager { return Ok(()); } } - Err(MetaError::invalid_worker( - worker_id, - "worker not found".into(), - )) + Err(MetaError::invalid_worker(worker_id, "worker not found")) } pub fn start_heartbeat_checker( @@ -555,7 +548,7 @@ impl ClusterManagerCore { let transactional_id = match available_transactional_ids.pop_front() { None => { return Err(MetaError::unavailable( - "no available transactional id for worker".to_string(), + "no available transactional id for worker", )) } Some(id) => id, diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index eb24e0db0f34..4eb0b703bd68 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -58,7 +58,7 @@ impl SystemParamsManager { } else if let Some(persisted) = SystemParams::get(&meta_store).await? { merge_params(persisted, init_params) } else { - return Err(MetaError::system_param( + return Err(MetaError::system_params( "cluster is not newly created but no system parameters can be found", )); }; @@ -86,7 +86,7 @@ impl SystemParamsManager { let params = params_guard.deref_mut(); let mut mem_txn = VarTransaction::new(params); - set_system_param(mem_txn.deref_mut(), name, value).map_err(MetaError::system_param)?; + set_system_param(mem_txn.deref_mut(), name, value).map_err(MetaError::system_params)?; let mut store_txn = Transaction::default(); mem_txn.apply_to_txn(&mut store_txn).await?; diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index b735db478e55..98360eee83cb 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -240,7 +240,7 @@ impl DdlController { async fn check_barrier_manager_status(&self) -> MetaResult<()> { if !self.barrier_manager.is_running().await { return Err(MetaError::unavailable( - "The cluster is starting or recovering".into(), + "The cluster is starting or recovering", )); } Ok(()) @@ -705,7 +705,7 @@ impl DdlController { ) -> MetaResult { if cluster_info.parallel_units.is_empty() { return Err(MetaError::unavailable( - "No available parallel units to schedule".to_string(), + "No available parallel units to schedule", )); } diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index b4f620f4ec4d..6f3edda0872c 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -346,7 +346,7 @@ impl GlobalStreamManager { tracing::warn!("failed to notify cancelled: {table_id}") }); self.creating_job_info.delete_job(table_id).await; - return Err(MetaError::cancelled("create".into())); + return Err(MetaError::cancelled("create")); } } CreatingState::Created => { diff --git a/src/object_store/Cargo.toml b/src/object_store/Cargo.toml index 9be44a74252d..6d45e742169a 100644 --- a/src/object_store/Cargo.toml +++ b/src/object_store/Cargo.toml @@ -32,6 +32,7 @@ risingwave_common = { workspace = true } rustls = "0.21.8" spin = "0.9" thiserror = "1" +thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = ["fs"] } tokio-retry = "0.3" tracing = "0.1" diff --git a/src/object_store/src/object/error.rs b/src/object_store/src/object/error.rs index 80cc85ae75f9..ad3f9f47807d 100644 --- a/src/object_store/src/object/error.rs +++ b/src/object_store/src/object/error.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::backtrace::Backtrace; use std::io; use std::marker::{Send, Sync}; @@ -25,10 +24,9 @@ use risingwave_common::error::BoxedError; use thiserror::Error; use tokio::sync::oneshot::error::RecvError; -use crate::object::Error; - -#[derive(Error, Debug)] -enum ObjectErrorInner { +#[derive(Error, Debug, thiserror_ext::Box, thiserror_ext::Construct)] +#[thiserror_ext(newtype(name = ObjectError, backtrace, report_debug))] +pub enum ObjectErrorInner { #[error("s3 error: {}", DisplayErrorContext(&**.0))] S3(#[source] BoxedError), #[error("disk error: {msg}")] @@ -42,43 +40,18 @@ enum ObjectErrorInner { #[error(transparent)] Mem(#[from] crate::object::mem::Error), #[error("Internal error: {0}")] + #[construct(skip)] Internal(String), } -#[derive(Error)] -#[error("{inner}")] -pub struct ObjectError { - #[from] - inner: ObjectErrorInner, - - backtrace: Backtrace, -} - -impl std::fmt::Debug for ObjectError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.inner)?; - writeln!(f)?; - if let Some(backtrace) = std::error::request_ref::(&self.inner) { - write!(f, " backtrace of inner error:\n{}", backtrace)?; - } else { - write!(f, " backtrace of `ObjectError`:\n{}", self.backtrace)?; - } - Ok(()) - } -} - impl ObjectError { pub fn internal(msg: impl ToString) -> Self { ObjectErrorInner::Internal(msg.to_string()).into() } - pub fn disk(msg: String, err: io::Error) -> Self { - ObjectErrorInner::Disk { msg, inner: err }.into() - } - /// Tells whether the error indicates the target object is not found. pub fn is_object_not_found_error(&self) -> bool { - match &self.inner { + match self.inner() { ObjectErrorInner::S3(e) => { if let Some(aws_smithy_runtime_api::client::result::SdkError::ServiceError(err)) = e .downcast_ref:: for ObjectError { - fn from(e: opendal::Error) -> Self { - ObjectErrorInner::Opendal(e).into() - } -} - impl From for ObjectError { fn from(e: RecvError) -> Self { ObjectErrorInner::Internal(e.to_string()).into() @@ -140,10 +107,4 @@ impl From for ObjectError { } } -impl From for ObjectError { - fn from(e: Error) -> Self { - ObjectErrorInner::Mem(e).into() - } -} - pub type ObjectResult = std::result::Result; diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index e0a3ac62276d..a336719503b0 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -15,14 +15,16 @@ use std::collections::HashMap; use std::sync::Arc; +use anyhow::anyhow; use futures::future::try_join_all; use futures::stream::pending; use futures::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; +use risingwave_common::bail; use risingwave_common::catalog::ColumnId; use risingwave_common::error::ErrorCode::ConnectorError; -use risingwave_common::error::{internal_error, Result, RwError}; +use risingwave_common::error::{Result, RwError}; use risingwave_common::util::select_all; use risingwave_connector::dispatch_source_prop; use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; @@ -78,10 +80,7 @@ impl ConnectorSource { .iter() .find(|c| c.column_id == *id) .ok_or_else(|| { - internal_error(format!( - "Failed to find column id: {} in source: {:?}", - id, self - )) + anyhow!("Failed to find column id: {} in source: {:?}", id, self).into() }) .map(|col| col.clone()) }) @@ -94,7 +93,7 @@ impl ConnectorSource { ConnectorProperties::S3(prop) => { S3SplitEnumerator::new(*prop, Arc::new(SourceEnumeratorContext::default())).await? } - other => return Err(internal_error(format!("Unsupported source: {:?}", other))), + other => bail!("Unsupported source: {:?}", other), }; Ok(build_fs_list_stream( diff --git a/src/source/src/fs_connector_source.rs b/src/source/src/fs_connector_source.rs index 671f5b99c5ba..7ef5ba67ac70 100644 --- a/src/source/src/fs_connector_source.rs +++ b/src/source/src/fs_connector_source.rs @@ -17,11 +17,12 @@ use std::collections::HashMap; use std::sync::Arc; +use anyhow::anyhow; use futures::stream::pending; use futures::StreamExt; use risingwave_common::catalog::ColumnId; use risingwave_common::error::ErrorCode::ConnectorError; -use risingwave_common::error::{internal_error, Result}; +use risingwave_common::error::Result; use risingwave_connector::dispatch_source_prop; use risingwave_connector::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig}; use risingwave_connector::source::{ @@ -68,10 +69,7 @@ impl FsConnectorSource { .iter() .find(|c| c.column_id == *id) .ok_or_else(|| { - internal_error(format!( - "Failed to find column id: {} in source: {:?}", - id, self - )) + anyhow!("Failed to find column id: {} in source: {:?}", id, self).into() }) .map(|col| col.clone()) }) diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 19b6c5ba55e6..0c1045fac230 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -55,6 +55,7 @@ spin = "0.9" sync-point = { path = "../utils/sync-point" } tempfile = "3" thiserror = "1" +thiserror-ext = { workspace = true } # tikv-client = { git = "https://github.com/tikv/client-rust", rev = "5714b2", optional = true } tokio = { version = "0.2", package = "madsim-tokio", features = [ "fs", diff --git a/src/storage/src/error.rs b/src/storage/src/error.rs index 585898646d20..c14c4e6809e6 100644 --- a/src/storage/src/error.rs +++ b/src/storage/src/error.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::backtrace::Backtrace; - use risingwave_common::error::{ErrorCode, RwError}; use risingwave_common::util::value_encoding::error::ValueEncodingError; use thiserror::Error; @@ -21,8 +19,9 @@ use thiserror::Error; use crate::hummock::HummockError; use crate::mem_table::MemTableError; -#[derive(Error)] -pub enum StorageError { +#[derive(Error, Debug, thiserror_ext::Box)] +#[thiserror_ext(newtype(name = StorageError, backtrace, report_debug))] +pub enum ErrorKind { #[error("Hummock error: {0}")] Hummock( #[backtrace] @@ -32,14 +31,14 @@ pub enum StorageError { #[error("Deserialize row error {0}.")] DeserializeRow( - #[source] + #[from] #[backtrace] ValueEncodingError, ), #[error("Serialize/deserialize error: {0}")] SerdeError( - #[source] + #[from] #[backtrace] memcomparable::Error, ), @@ -61,36 +60,8 @@ pub enum StorageError { pub type StorageResult = std::result::Result; -impl From for StorageError { - fn from(error: ValueEncodingError) -> Self { - StorageError::DeserializeRow(error) - } -} - -impl From for StorageError { - fn from(m: memcomparable::Error) -> Self { - StorageError::SerdeError(m) - } -} - impl From for RwError { fn from(s: StorageError) -> Self { ErrorCode::StorageError(Box::new(s)).into() } } - -impl std::fmt::Debug for StorageError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - use std::error::Error; - - write!(f, "{}", self)?; - writeln!(f)?; - if let Some(backtrace) = std::error::request_ref::(&self as &dyn Error) { - // Since we forward all backtraces from source, `self.backtrace()` is the backtrace of - // inner error. - write!(f, " backtrace of inner error:\n{}", backtrace)?; - } - - Ok(()) - } -} diff --git a/src/storage/src/hummock/error.rs b/src/storage/src/hummock/error.rs index 6f1445e5729d..7bfedd8eb8be 100644 --- a/src/storage/src/hummock/error.rs +++ b/src/storage/src/hummock/error.rs @@ -12,14 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::backtrace::Backtrace; - use risingwave_object_store::object::ObjectError; use thiserror::Error; use tokio::sync::oneshot::error::RecvError; -#[derive(Error, Debug)] -enum HummockErrorInner { +// TODO(error-handling): should prefer use error types than strings. +#[derive(Error, Debug, thiserror_ext::Box)] +#[thiserror_ext(newtype(name = HummockError, backtrace, report_debug))] +pub enum HummockErrorInner { #[error("Magic number mismatch: expected {expected}, found: {found}.")] MagicMismatch { expected: u32, found: u32 }, #[error("Invalid format version: {0}.")] @@ -32,14 +32,11 @@ enum HummockErrorInner { EncodeError(String), #[error("Decode error {0}.")] DecodeError(String), - #[expect(dead_code)] - #[error("Mock error {0}.")] - MockError(String), #[error("ObjectStore failed with IO error {0}.")] ObjectIoError( #[from] #[backtrace] - Box, + ObjectError, ), #[error("Meta error {0}.")] MetaError(String), @@ -67,19 +64,7 @@ enum HummockErrorInner { Other(String), } -#[derive(Error)] -#[error("{inner}")] -pub struct HummockError { - #[from] - inner: HummockErrorInner, - backtrace: Backtrace, -} - impl HummockError { - pub fn object_io_error(error: ObjectError) -> HummockError { - HummockErrorInner::ObjectIoError(error.into()).into() - } - pub fn invalid_format_version(v: u32) -> HummockError { HummockErrorInner::InvalidFormatVersion(v).into() } @@ -125,15 +110,15 @@ impl HummockError { } pub fn is_expired_epoch(&self) -> bool { - matches!(self.inner, HummockErrorInner::ExpiredEpoch { .. }) + matches!(self.inner(), HummockErrorInner::ExpiredEpoch { .. }) } pub fn is_meta_error(&self) -> bool { - matches!(self.inner, HummockErrorInner::MetaError(..)) + matches!(self.inner(), HummockErrorInner::MetaError(..)) } pub fn is_object_error(&self) -> bool { - matches!(self.inner, HummockErrorInner::ObjectIoError { .. }) + matches!(self.inner(), HummockErrorInner::ObjectIoError { .. }) } pub fn compaction_executor(error: impl ToString) -> HummockError { @@ -171,31 +156,10 @@ impl From for HummockError { } } -impl From for HummockError { - fn from(error: ObjectError) -> Self { - HummockErrorInner::ObjectIoError(error.into()).into() - } -} - impl From for HummockError { fn from(error: RecvError) -> Self { ObjectError::from(error).into() } } -impl std::fmt::Debug for HummockError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - use std::error::Error; - - write!(f, "{}", self.inner)?; - writeln!(f)?; - if let Some(backtrace) = std::error::request_ref::(&self.inner as &dyn Error) { - write!(f, " backtrace of inner error:\n{}", backtrace)?; - } else { - write!(f, " backtrace of `HummockError`:\n{}", self.backtrace)?; - } - Ok(()) - } -} - pub type HummockResult = std::result::Result; diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 9a623189aa02..3f32f34e1608 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -264,7 +264,7 @@ impl SstableStore { self.store .upload(&data_path, data) .await - .map_err(HummockError::object_io_error) + .map_err(Into::into) } pub async fn preload_blocks( @@ -483,10 +483,7 @@ impl SstableStore { } let now = Instant::now(); - let buf = store - .read(&meta_path, range) - .await - .map_err(HummockError::object_io_error)?; + let buf = store.read(&meta_path, range).await?; let meta = SstableMeta::decode(&buf[..])?; let sst = Sstable::new(object_id, meta); @@ -576,10 +573,7 @@ impl SstableStore { let range = start_pos..end_pos; Ok(BlockStream::new( - store - .streaming_read(&data_path, range) - .await - .map_err(HummockError::object_io_error)?, + store.streaming_read(&data_path, range).await?, block_index, metas, )) @@ -848,7 +842,7 @@ impl SstableWriter for StreamingUploadWriter { self.object_uploader .write_bytes(block_data) .await - .map_err(HummockError::object_io_error) + .map_err(Into::into) } async fn write_block_bytes(&mut self, block: Bytes, meta: &BlockMeta) -> HummockResult<()> { @@ -860,16 +854,13 @@ impl SstableWriter for StreamingUploadWriter { self.object_uploader .write_bytes(block) .await - .map_err(HummockError::object_io_error) + .map_err(Into::into) } async fn finish(mut self, meta: SstableMeta) -> HummockResult { let meta_data = Bytes::from(meta.encode_to_bytes()); - self.object_uploader - .write_bytes(meta_data) - .await - .map_err(HummockError::object_io_error)?; + self.object_uploader.write_bytes(meta_data).await?; let join_handle = tokio::spawn(async move { let uploader_memory_usage = self.object_uploader.get_memory_usage(); let _tracker = self.tracker.map(|mut t| { @@ -883,10 +874,7 @@ impl SstableWriter for StreamingUploadWriter { assert!(!meta.block_metas.is_empty() || !meta.monotonic_tombstone_events.is_empty()); // Upload data to object store. - self.object_uploader - .finish() - .await - .map_err(HummockError::object_io_error)?; + self.object_uploader.finish().await?; // Add meta cache. self.sstable_store.insert_meta_cache(self.object_id, meta); @@ -1124,9 +1112,10 @@ impl BlockStream { } let block_meta = &self.block_metas[self.block_idx]; - fail_point!("stream_read_err", |_| Err(HummockError::object_io_error( - ObjectError::internal("stream read error") - ))); + fail_point!("stream_read_err", |_| Err(ObjectError::internal( + "stream read error" + ) + .into())); let end = self.buff_offset + block_meta.len as usize; let data = if end > self.buf.len() { let (current_block, buf) = self @@ -1216,9 +1205,10 @@ impl BatchBlockStream { } let block_meta = &self.block_metas[self.block_idx]; - fail_point!("stream_batch_read_err", |_| Err( - HummockError::object_io_error(ObjectError::internal("stream read error")) - )); + fail_point!("stream_batch_read_err", |_| Err(ObjectError::internal( + "stream read error" + ) + .into())); if let Some(block) = self.blocks.pop_front() { self.block_idx += 1; return Ok(Some(block)); diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 6d81b4af85e7..c24729dfbdb2 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -57,6 +57,7 @@ serde_json = "1" smallvec = "1" static_assertions = "1" thiserror = "1" +thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", "rt-multi-thread", diff --git a/src/stream/clippy.toml b/src/stream/clippy.toml index e95b1a4c9ba7..dd26a976e1ab 100644 --- a/src/stream/clippy.toml +++ b/src/stream/clippy.toml @@ -5,18 +5,12 @@ disallowed-methods = [ { path = "risingwave_expr::expr::build_func", reason = "Expressions in streaming must be in non-strict mode. Please use `build_func_non_strict` instead." }, { path = "risingwave_expr::expr::Expression::eval", reason = "Please use `NonStrictExpression::eval_infallible` instead." }, { path = "risingwave_expr::expr::Expression::eval_row", reason = "Please use `NonStrictExpression::eval_row_infallible` instead." }, - - { path = "risingwave_common::error::internal_err", reason = "Please use per-crate error type instead." }, - { path = "risingwave_common::error::internal_error", reason = "Please use per-crate error type instead." }, - { path = "risingwave_common::error::tonic_err", reason = "Please use per-crate error type instead." }, ] disallowed-types = [ { path = "risingwave_common::error::ErrorCode", reason = "Please use per-crate error type instead." }, { path = "risingwave_common::error::RwError", reason = "Please use per-crate error type instead." }, { path = "risingwave_common::error::Result", reason = "Please use per-crate error type instead." }, - { path = "risingwave_common::error::ToRwResult", reason = "Please use per-crate error type instead." }, - { path = "risingwave_common::error::ToErrorStr", reason = "Please use per-crate error type instead." }, ] doc-valid-idents = [ diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index d72008e64aa3..bfdd50f883fd 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -39,7 +39,7 @@ use risingwave_hummock_sdk::key::{ start_bound_of_excluded_prefix, TableKey, }; use risingwave_pb::catalog::Table; -use risingwave_storage::error::{StorageError, StorageResult}; +use risingwave_storage::error::{ErrorKind, StorageError, StorageResult}; use risingwave_storage::hummock::CachePolicy; use risingwave_storage::mem_table::MemTableError; use risingwave_storage::row_serde::row_serde_util::{ @@ -689,8 +689,8 @@ where SD: ValueRowSerde, { fn handle_mem_table_error(&self, e: StorageError) { - let e = match e { - StorageError::MemTable(e) => e, + let e = match e.into_inner() { + ErrorKind::MemTable(e) => e, _ => unreachable!("should only get memtable error"), }; match *e { diff --git a/src/stream/src/error.rs b/src/stream/src/error.rs index ee0c969801c5..972081918550 100644 --- a/src/stream/src/error.rs +++ b/src/stream/src/error.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::backtrace::Backtrace; - use risingwave_common::array::ArrayError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; @@ -28,22 +26,9 @@ use crate::executor::StreamExecutorError; pub type StreamResult = std::result::Result; /// The error type for streaming tasks. -#[derive(thiserror::Error)] -#[error("{inner}")] -pub struct StreamError { - inner: Box, -} - -#[derive(thiserror::Error, Debug)] -#[error("{kind}")] -struct Inner { - #[from] - kind: ErrorKind, - backtrace: Backtrace, -} - -#[derive(thiserror::Error, Debug)] -enum ErrorKind { +#[derive(thiserror::Error, Debug, thiserror_ext::Box)] +#[thiserror_ext(newtype(name = StreamError, backtrace, report_debug))] +pub enum ErrorKind { #[error("Storage error: {0}")] Storage( #[backtrace] @@ -87,65 +72,6 @@ enum ErrorKind { ), } -impl std::fmt::Debug for StreamError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - use std::error::Error; - - write!(f, "{}", self.inner.kind)?; - writeln!(f)?; - if let Some(backtrace) = - std::error::request_ref::(&self.inner.kind as &dyn Error) - { - write!(f, " backtrace of inner error:\n{}", backtrace)?; - } else { - write!(f, " backtrace of `StreamError`:\n{}", self.inner.backtrace)?; - } - Ok(()) - } -} - -impl From for StreamError { - fn from(kind: ErrorKind) -> Self { - Self { - inner: Box::new(kind.into()), - } - } -} - -// Storage transaction error; ... -impl From for StreamError { - fn from(s: StorageError) -> Self { - ErrorKind::Storage(s).into() - } -} - -// Build expression error; ... -impl From for StreamError { - fn from(error: ExprError) -> Self { - ErrorKind::Expression(error).into() - } -} - -// Chunk compaction error; ProtoBuf ser/de error; ... -impl From for StreamError { - fn from(error: ArrayError) -> Self { - ErrorKind::Array(error).into() - } -} - -// Executor runtime error; ... -impl From for StreamError { - fn from(error: StreamExecutorError) -> Self { - ErrorKind::Executor(error).into() - } -} - -impl From for StreamError { - fn from(value: SinkError) -> Self { - ErrorKind::Sink(value).into() - } -} - impl From for StreamError { fn from(err: PbFieldNotFound) -> Self { Self::from(anyhow::anyhow!( @@ -161,13 +87,6 @@ impl From for StreamError { } } -// Internal error. -impl From for StreamError { - fn from(a: anyhow::Error) -> Self { - ErrorKind::Internal(a).into() - } -} - impl From for tonic::Status { fn from(error: StreamError) -> Self { error.to_status(tonic::Code::Internal, "stream") diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs index b4efcac1702f..283aef03f66d 100644 --- a/src/stream/src/executor/error.rs +++ b/src/stream/src/executor/error.rs @@ -12,10 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::backtrace::Backtrace; - use risingwave_common::array::ArrayError; -use risingwave_common::error::{BoxedError, Error, NotImplemented}; +use risingwave_common::error::{BoxedError, NotImplemented}; use risingwave_common::util::value_encoding::error::ValueEncodingError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; @@ -30,22 +28,9 @@ use super::Barrier; pub type StreamExecutorResult = std::result::Result; /// The error type for streaming executors. -#[derive(thiserror::Error)] -#[error("{inner}")] -pub struct StreamExecutorError { - inner: Box, -} - -#[derive(thiserror::Error, Debug)] -#[error("{kind}")] -struct Inner { - #[from] - kind: ErrorKind, - backtrace: Backtrace, -} - -#[derive(thiserror::Error, Debug)] -enum ErrorKind { +#[derive(thiserror::Error, Debug, thiserror_ext::Box, thiserror_ext::Construct)] +#[thiserror_ext(newtype(name = StreamExecutorError, backtrace, report_debug))] +pub enum ErrorKind { #[error("Storage error: {0}")] Storage( #[backtrace] @@ -120,84 +105,6 @@ enum ErrorKind { ), } -impl StreamExecutorError { - fn serde_error(error: impl Error) -> Self { - ErrorKind::SerdeError(error.into()).into() - } - - pub fn channel_closed(name: impl Into) -> Self { - ErrorKind::ChannelClosed(name.into()).into() - } - - pub fn align_barrier(expected: Barrier, received: Barrier) -> Self { - ErrorKind::AlignBarrier(expected.into(), received.into()).into() - } - - pub fn connector_error(error: impl Error) -> Self { - ErrorKind::ConnectorError(error.into()).into() - } - - pub fn dml_error(error: impl Error) -> Self { - ErrorKind::DmlError(error.into()).into() - } -} - -impl std::fmt::Debug for StreamExecutorError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - use std::error::Error; - - write!(f, "{}", self.inner.kind)?; - writeln!(f)?; - if let Some(backtrace) = - std::error::request_ref::(&self.inner.kind as &dyn Error) - { - write!(f, " backtrace of inner error:\n{}", backtrace)?; - } else { - write!( - f, - " backtrace of `StreamExecutorError`:\n{}", - self.inner.backtrace - )?; - } - Ok(()) - } -} - -impl From for StreamExecutorError { - fn from(kind: ErrorKind) -> Self { - Self { - inner: Box::new(kind.into()), - } - } -} - -/// Storage error. -impl From for StreamExecutorError { - fn from(s: StorageError) -> Self { - ErrorKind::Storage(s).into() - } -} - -/// Chunk operation error. -impl From for StreamExecutorError { - fn from(e: ArrayError) -> Self { - ErrorKind::ArrayError(e).into() - } -} - -impl From for StreamExecutorError { - fn from(e: ExprError) -> Self { - ErrorKind::ExprError(e).into() - } -} - -/// Internal error. -impl From for StreamExecutorError { - fn from(a: anyhow::Error) -> Self { - ErrorKind::Internal(a).into() - } -} - /// Serialize/deserialize error. impl From for StreamExecutorError { fn from(m: memcomparable::Error) -> Self { @@ -210,12 +117,6 @@ impl From for StreamExecutorError { } } -impl From for StreamExecutorError { - fn from(e: RpcError) -> Self { - ErrorKind::RpcError(e).into() - } -} - /// Connector error. impl From for StreamExecutorError { fn from(s: ConnectorError) -> Self { @@ -223,12 +124,6 @@ impl From for StreamExecutorError { } } -impl From for StreamExecutorError { - fn from(e: SinkError) -> Self { - ErrorKind::SinkError(e).into() - } -} - impl From for StreamExecutorError { fn from(err: PbFieldNotFound) -> Self { Self::from(anyhow::anyhow!( diff --git a/src/udf/src/error.rs b/src/udf/src/error.rs index ab6cc4af5349..efdb0e9ec6a6 100644 --- a/src/udf/src/error.rs +++ b/src/udf/src/error.rs @@ -21,7 +21,7 @@ pub type Result = std::result::Result; /// The error type for UDF operations. #[derive(Error, Debug, Box, Construct)] -#[thiserror_ext(type = Error)] +#[thiserror_ext(newtype(name = Error))] pub enum ErrorInner { #[error("failed to connect to UDF service: {0}")] Connect(#[from] tonic::transport::Error),