Skip to content

Commit

Permalink
use thiserror ext for stream error
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Nov 28, 2023
1 parent 92a624a commit b16bbcc
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 109 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ serde_json = "1"
smallvec = "1"
static_assertions = "1"
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
Expand Down
113 changes: 4 additions & 109 deletions src/stream/src/executor/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::backtrace::Backtrace;

use risingwave_common::array::ArrayError;
use risingwave_common::error::{BoxedError, Error, NotImplemented};
use risingwave_common::error::{BoxedError, NotImplemented};
use risingwave_common::util::value_encoding::error::ValueEncodingError;
use risingwave_connector::error::ConnectorError;
use risingwave_connector::sink::SinkError;
Expand All @@ -30,22 +28,9 @@ use super::Barrier;
pub type StreamExecutorResult<T> = std::result::Result<T, StreamExecutorError>;

/// The error type for streaming executors.
#[derive(thiserror::Error)]
#[error("{inner}")]
pub struct StreamExecutorError {
inner: Box<Inner>,
}

#[derive(thiserror::Error, Debug)]
#[error("{kind}")]
struct Inner {
#[from]
kind: ErrorKind,
backtrace: Backtrace,
}

#[derive(thiserror::Error, Debug)]
enum ErrorKind {
#[derive(thiserror::Error, Debug, thiserror_ext::Box, thiserror_ext::Construct)]
#[thiserror_ext(type = StreamExecutorError, backtrace)]
pub enum ErrorKind {
#[error("Storage error: {0}")]
Storage(
#[backtrace]
Expand Down Expand Up @@ -120,84 +105,6 @@ enum ErrorKind {
),
}

impl StreamExecutorError {
fn serde_error(error: impl Error) -> Self {
ErrorKind::SerdeError(error.into()).into()
}

pub fn channel_closed(name: impl Into<String>) -> Self {
ErrorKind::ChannelClosed(name.into()).into()
}

pub fn align_barrier(expected: Barrier, received: Barrier) -> Self {
ErrorKind::AlignBarrier(expected.into(), received.into()).into()
}

pub fn connector_error(error: impl Error) -> Self {
ErrorKind::ConnectorError(error.into()).into()
}

pub fn dml_error(error: impl Error) -> Self {
ErrorKind::DmlError(error.into()).into()
}
}

impl std::fmt::Debug for StreamExecutorError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use std::error::Error;

write!(f, "{}", self.inner.kind)?;
writeln!(f)?;
if let Some(backtrace) =
std::error::request_ref::<Backtrace>(&self.inner.kind as &dyn Error)
{
write!(f, " backtrace of inner error:\n{}", backtrace)?;
} else {
write!(
f,
" backtrace of `StreamExecutorError`:\n{}",
self.inner.backtrace
)?;
}
Ok(())
}
}

impl From<ErrorKind> for StreamExecutorError {
fn from(kind: ErrorKind) -> Self {
Self {
inner: Box::new(kind.into()),
}
}
}

/// Storage error.
impl From<StorageError> for StreamExecutorError {
fn from(s: StorageError) -> Self {
ErrorKind::Storage(s).into()
}
}

/// Chunk operation error.
impl From<ArrayError> for StreamExecutorError {
fn from(e: ArrayError) -> Self {
ErrorKind::ArrayError(e).into()
}
}

impl From<ExprError> for StreamExecutorError {
fn from(e: ExprError) -> Self {
ErrorKind::ExprError(e).into()
}
}

/// Internal error.
impl From<anyhow::Error> for StreamExecutorError {
fn from(a: anyhow::Error) -> Self {
ErrorKind::Internal(a).into()
}
}

/// Serialize/deserialize error.
impl From<memcomparable::Error> for StreamExecutorError {
fn from(m: memcomparable::Error) -> Self {
Expand All @@ -210,25 +117,13 @@ impl From<ValueEncodingError> for StreamExecutorError {
}
}

impl From<RpcError> for StreamExecutorError {
fn from(e: RpcError) -> Self {
ErrorKind::RpcError(e).into()
}
}

/// Connector error.
impl From<ConnectorError> for StreamExecutorError {
fn from(s: ConnectorError) -> Self {
Self::connector_error(s)
}
}

impl From<SinkError> for StreamExecutorError {
fn from(e: SinkError) -> Self {
ErrorKind::SinkError(e).into()
}
}

impl From<PbFieldNotFound> for StreamExecutorError {
fn from(err: PbFieldNotFound) -> Self {
Self::from(anyhow::anyhow!(
Expand Down

0 comments on commit b16bbcc

Please sign in to comment.