From b16bbcc90c82bbd0d90f7315624085cb504fa24c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 28 Nov 2023 17:44:24 +0800 Subject: [PATCH] use thiserror ext for stream error Signed-off-by: Bugen Zhao --- Cargo.lock | 1 + src/stream/Cargo.toml | 1 + src/stream/src/executor/error.rs | 113 ++----------------------------- 3 files changed, 6 insertions(+), 109 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 624b4beb9f42..856af81bf896 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8639,6 +8639,7 @@ dependencies = [ "static_assertions", "task_stats_alloc", "thiserror", + "thiserror-ext", "tokio-metrics", "tokio-stream", "tracing", diff --git a/src/stream/Cargo.toml b/src/stream/Cargo.toml index 6d81b4af85e7..c24729dfbdb2 100644 --- a/src/stream/Cargo.toml +++ b/src/stream/Cargo.toml @@ -57,6 +57,7 @@ serde_json = "1" smallvec = "1" static_assertions = "1" thiserror = "1" +thiserror-ext = { workspace = true } tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", "rt-multi-thread", diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs index b4efcac1702f..6b2f484564ac 100644 --- a/src/stream/src/executor/error.rs +++ b/src/stream/src/executor/error.rs @@ -12,10 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::backtrace::Backtrace; - use risingwave_common::array::ArrayError; -use risingwave_common::error::{BoxedError, Error, NotImplemented}; +use risingwave_common::error::{BoxedError, NotImplemented}; use risingwave_common::util::value_encoding::error::ValueEncodingError; use risingwave_connector::error::ConnectorError; use risingwave_connector::sink::SinkError; @@ -30,22 +28,9 @@ use super::Barrier; pub type StreamExecutorResult = std::result::Result; /// The error type for streaming executors. -#[derive(thiserror::Error)] -#[error("{inner}")] -pub struct StreamExecutorError { - inner: Box, -} - -#[derive(thiserror::Error, Debug)] -#[error("{kind}")] -struct Inner { - #[from] - kind: ErrorKind, - backtrace: Backtrace, -} - -#[derive(thiserror::Error, Debug)] -enum ErrorKind { +#[derive(thiserror::Error, Debug, thiserror_ext::Box, thiserror_ext::Construct)] +#[thiserror_ext(type = StreamExecutorError, backtrace)] +pub enum ErrorKind { #[error("Storage error: {0}")] Storage( #[backtrace] @@ -120,84 +105,6 @@ enum ErrorKind { ), } -impl StreamExecutorError { - fn serde_error(error: impl Error) -> Self { - ErrorKind::SerdeError(error.into()).into() - } - - pub fn channel_closed(name: impl Into) -> Self { - ErrorKind::ChannelClosed(name.into()).into() - } - - pub fn align_barrier(expected: Barrier, received: Barrier) -> Self { - ErrorKind::AlignBarrier(expected.into(), received.into()).into() - } - - pub fn connector_error(error: impl Error) -> Self { - ErrorKind::ConnectorError(error.into()).into() - } - - pub fn dml_error(error: impl Error) -> Self { - ErrorKind::DmlError(error.into()).into() - } -} - -impl std::fmt::Debug for StreamExecutorError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - use std::error::Error; - - write!(f, "{}", self.inner.kind)?; - writeln!(f)?; - if let Some(backtrace) = - std::error::request_ref::(&self.inner.kind as &dyn Error) - { - write!(f, " backtrace of inner error:\n{}", backtrace)?; - } else { - write!( - f, - " backtrace of `StreamExecutorError`:\n{}", - self.inner.backtrace - )?; - } - Ok(()) - } -} - -impl From for StreamExecutorError { - fn from(kind: ErrorKind) -> Self { - Self { - inner: Box::new(kind.into()), - } - } -} - -/// Storage error. -impl From for StreamExecutorError { - fn from(s: StorageError) -> Self { - ErrorKind::Storage(s).into() - } -} - -/// Chunk operation error. -impl From for StreamExecutorError { - fn from(e: ArrayError) -> Self { - ErrorKind::ArrayError(e).into() - } -} - -impl From for StreamExecutorError { - fn from(e: ExprError) -> Self { - ErrorKind::ExprError(e).into() - } -} - -/// Internal error. -impl From for StreamExecutorError { - fn from(a: anyhow::Error) -> Self { - ErrorKind::Internal(a).into() - } -} - /// Serialize/deserialize error. impl From for StreamExecutorError { fn from(m: memcomparable::Error) -> Self { @@ -210,12 +117,6 @@ impl From for StreamExecutorError { } } -impl From for StreamExecutorError { - fn from(e: RpcError) -> Self { - ErrorKind::RpcError(e).into() - } -} - /// Connector error. impl From for StreamExecutorError { fn from(s: ConnectorError) -> Self { @@ -223,12 +124,6 @@ impl From for StreamExecutorError { } } -impl From for StreamExecutorError { - fn from(e: SinkError) -> Self { - ErrorKind::SinkError(e).into() - } -} - impl From for StreamExecutorError { fn from(err: PbFieldNotFound) -> Self { Self::from(anyhow::anyhow!(