Skip to content

Commit

Permalink
refactor: correctly maintain the source chain of error types (#13248)
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Nov 7, 2023
1 parent 947c4d8 commit e3c8649
Show file tree
Hide file tree
Showing 35 changed files with 413 additions and 193 deletions.
12 changes: 10 additions & 2 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,21 @@ pub enum BatchError {
Cast(&'static str, &'static str),

#[error("Array error: {0}")]
Array(#[from] ArrayError),
Array(
#[from]
#[backtrace]
ArrayError,
),

#[error("Failed to send result to channel")]
SenderError,

#[error(transparent)]
Internal(#[from] anyhow::Error),
Internal(
#[from]
#[backtrace]
anyhow::Error,
),

#[error("Prometheus error: {0}")]
Prometheus(#[from] prometheus::Error),
Expand Down
1 change: 1 addition & 0 deletions src/batch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#![feature(assert_matches)]
#![feature(lazy_cell)]
#![feature(array_methods)]
#![feature(error_generic_member_access)]

mod error;
pub mod exchange_source;
Expand Down
6 changes: 5 additions & 1 deletion src/common/src/array/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ pub enum ArrayError {
Io(#[from] std::io::Error),

#[error(transparent)]
Internal(#[from] anyhow::Error),
Internal(
#[from]
#[backtrace]
anyhow::Error,
),

#[error("Convert from arrow error: {0}")]
FromArrow(String),
Expand Down
129 changes: 77 additions & 52 deletions src/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,53 +76,92 @@ impl Display for TrackingIssue {
pub enum ErrorCode {
#[error("internal error: {0}")]
InternalError(String),
// TODO: unify with the above
#[error(transparent)]
InternalErrorAnyhow(
#[from]
#[backtrace]
anyhow::Error,
),
#[error("connector error: {0}")]
ConnectorError(BoxedError),
ConnectorError(
#[source]
#[backtrace]
BoxedError,
),
#[error("Feature is not yet implemented: {0}\n{1}")]
NotImplemented(String, TrackingIssue),
// Tips: Use this only if it's intended to reject the query
#[error("Not supported: {0}\nHINT: {1}")]
NotSupported(String, String),
#[error(transparent)]
IoError(IoError),
IoError(#[from] IoError),
#[error("Storage error: {0}")]
StorageError(
#[backtrace]
#[source]
BoxedError,
),
#[error("Expr error: {0}")]
ExprError(BoxedError),
ExprError(
#[source]
#[backtrace]
BoxedError,
),
#[error("BatchError: {0}")]
BatchError(BoxedError),
BatchError(
#[source]
#[backtrace]
BoxedError,
),
#[error("Array error: {0}")]
ArrayError(ArrayError),
ArrayError(
#[from]
#[backtrace]
ArrayError,
),
#[error("Stream error: {0}")]
StreamError(
#[backtrace]
#[source]
BoxedError,
),
#[error("RPC error: {0}")]
RpcError(BoxedError),
RpcError(
#[source]
#[backtrace]
BoxedError,
),
#[error("Bind error: {0}")]
BindError(String),
#[error("Catalog error: {0}")]
CatalogError(BoxedError),
CatalogError(
#[source]
#[backtrace]
BoxedError,
),
#[error("Protocol error: {0}")]
ProtocolError(String),
#[error("Scheduler error: {0}")]
SchedulerError(BoxedError),
SchedulerError(
#[source]
#[backtrace]
BoxedError,
),
#[error("Task not found")]
TaskNotFound,
#[error("Item not found: {0}")]
ItemNotFound(String),
#[error("Invalid input syntax: {0}")]
InvalidInputSyntax(String),
#[error("Can not compare in memory: {0}")]
MemComparableError(MemComparableError),
MemComparableError(#[from] MemComparableError),
#[error("Error while de/se values: {0}")]
ValueEncodingError(ValueEncodingError),
ValueEncodingError(
#[from]
#[backtrace]
ValueEncodingError,
),
#[error("Invalid value [{config_value:?}] for [{config_entry:?}]")]
InvalidConfigValue {
config_entry: String,
Expand All @@ -131,7 +170,11 @@ pub enum ErrorCode {
#[error("Invalid Parameter Value: {0}")]
InvalidParameterValue(String),
#[error("Sink error: {0}")]
SinkError(BoxedError),
SinkError(
#[source]
#[backtrace]
BoxedError,
),
#[error("Permission denied: {0}")]
PermissionDenied(String),
#[error("unrecognized configuration parameter \"{0}\"")]
Expand Down Expand Up @@ -211,7 +254,7 @@ impl From<std::net::AddrParseError> for RwError {

impl From<anyhow::Error> for RwError {
fn from(e: anyhow::Error) -> Self {
ErrorCode::InternalError(e.to_error_str()).into()
ErrorCode::InternalErrorAnyhow(e).into()
}
}

Expand Down Expand Up @@ -239,21 +282,6 @@ impl Debug for RwError {
}
}

impl PartialEq for RwError {
fn eq(&self, other: &Self) -> bool {
self.inner == other.inner
}
}

impl PartialEq for ErrorCode {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(ErrorCode::InternalError(msg), ErrorCode::InternalError(msg2)) => msg == msg2,
(_, _) => false,
}
}
}

impl From<PbFieldNotFound> for RwError {
fn from(err: PbFieldNotFound) -> Self {
ErrorCode::InternalError(format!(
Expand Down Expand Up @@ -328,13 +356,6 @@ impl<T> ToErrorStr for tokio::sync::mpsc::error::SendError<T> {
}
}

impl ToErrorStr for anyhow::Error {
fn to_error_str(self) -> String {
// Note: use alternate Display to print the source chain.
format!("{:#}", self)
}
}

/// Util macro for generating error when condition check failed.
///
/// # Case 1: Expression only.
Expand Down Expand Up @@ -476,8 +497,10 @@ mod tests {
use std::convert::Into;
use std::result::Result::Err;

use anyhow::anyhow;

use super::*;
use crate::error::ErrorCode::InternalError;
use crate::error::ErrorCode::InternalErrorAnyhow;

#[test]
fn test_display_internal_error() {
Expand All @@ -493,38 +516,42 @@ mod tests {
let err_msg = "a < 0";
let error = (|| {
ensure!(a < 0);
Ok(())
})();
Ok::<_, RwError>(())
})()
.unwrap_err();

assert_eq!(
Err(RwError::from(InternalError(err_msg.to_string()))),
error
RwError::from(InternalErrorAnyhow(anyhow!(err_msg))).to_string(),
error.to_string(),
);
}

{
let err_msg = "error msg without args";
let error = (|| {
ensure!(a < 0, "error msg without args");
Ok(())
})();
Ok::<_, RwError>(())
})()
.unwrap_err();
assert_eq!(
Err(RwError::from(InternalError(err_msg.to_string()))),
error
RwError::from(InternalErrorAnyhow(anyhow!(err_msg))).to_string(),
error.to_string()
);
}

{
let error = (|| {
ensure!(a < 0, "error msg with args: {}", "xx");
Ok(())
})();
Ok::<_, RwError>(())
})()
.unwrap_err();
assert_eq!(
Err(RwError::from(InternalError(format!(
RwError::from(InternalErrorAnyhow(anyhow!(
"error msg with args: {}",
"xx"
)))),
error
)))
.to_string(),
error.to_string()
);
}
}
Expand All @@ -538,10 +565,7 @@ mod tests {
Ok(())
}
let err = ensure_a_equals_b().unwrap_err();
assert_eq!(
err.to_string(),
"internal error: a == b assertion failed (a is 1, b is 2)"
);
assert_eq!(err.to_string(), "a == b assertion failed (a is 1, b is 2)");
}

#[test]
Expand All @@ -560,6 +584,7 @@ mod tests {
}

#[test]
#[ignore] // it's not a good practice to include error source in `Display`, see #13248
fn test_internal_sources() {
use anyhow::Context;

Expand Down
12 changes: 10 additions & 2 deletions src/common/src/util/value_encoding/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,17 @@ pub enum ValueEncodingError {
#[error("Invalid jsonb encoding")]
InvalidJsonbEncoding,
#[error("Invalid struct encoding: {0}")]
InvalidStructEncoding(crate::array::ArrayError),
InvalidStructEncoding(
#[source]
#[backtrace]
crate::array::ArrayError,
),
#[error("Invalid list encoding: {0}")]
InvalidListEncoding(crate::array::ArrayError),
InvalidListEncoding(
#[source]
#[backtrace]
crate::array::ArrayError,
),
#[error("Invalid flag: {0}")]
InvalidFlag(u8),
}
24 changes: 20 additions & 4 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,35 @@ pub enum ConnectorError {
Kafka(#[from] rdkafka::error::KafkaError),

#[error("Config error: {0}")]
Config(anyhow::Error),
Config(
#[source]
#[backtrace]
anyhow::Error,
),

#[error("Connection error: {0}")]
Connection(anyhow::Error),
Connection(
#[source]
#[backtrace]
anyhow::Error,
),

#[error("MySQL error: {0}")]
MySql(#[from] mysql_async::Error),

#[error("Pulsar error: {0}")]
Pulsar(anyhow::Error),
Pulsar(
#[source]
#[backtrace]
anyhow::Error,
),

#[error(transparent)]
Internal(#[from] anyhow::Error),
Internal(
#[from]
#[backtrace]
anyhow::Error,
),
}

impl From<ConnectorError> for RwError {
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#![feature(if_let_guard)]
#![feature(iterator_try_collect)]
#![feature(try_blocks)]
#![feature(error_generic_member_access)]

use std::time::Duration;

Expand Down
6 changes: 5 additions & 1 deletion src/connector/src/parser/unified/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,5 +98,9 @@ pub enum AccessError {
#[error("Unsupported data type `{ty}`")]
UnsupportedType { ty: String },
#[error(transparent)]
Other(#[from] anyhow::Error),
Other(
#[from]
#[backtrace]
anyhow::Error,
),
}
Loading

0 comments on commit e3c8649

Please sign in to comment.