Skip to content

Commit

Permalink
fix(streaming): assign lower score to BarrierSend error
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed May 10, 2024
1 parent c52cb9c commit 90eb6f1
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/stream/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub enum ErrorKind {
},

#[error(transparent)]
Internal(
Uncategorized(
#[from]
#[backtrace]
anyhow::Error,
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub enum ErrorKind {
NotImplemented(#[from] NotImplemented),

#[error(transparent)]
Internal(
Uncategorized(
#[from]
#[backtrace]
anyhow::Error,
Expand Down Expand Up @@ -150,7 +150,7 @@ impl From<PbFieldNotFound> for StreamExecutorError {

impl From<String> for StreamExecutorError {
fn from(s: String) -> Self {
ErrorKind::Internal(anyhow::anyhow!(s)).into()
ErrorKind::Uncategorized(anyhow::anyhow!(s)).into()
}
}

Expand Down
35 changes: 30 additions & 5 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,12 +888,28 @@ impl LocalBarrierManager {
pub fn try_find_root_actor_failure<'a>(
actor_errors: impl IntoIterator<Item = &'a StreamError>,
) -> Option<StreamError> {
// Explicitly list all error kinds here to notice developers to update this function when
// there are changes in error kinds.

fn stream_executor_error_score(e: &StreamExecutorError) -> i32 {
use crate::executor::error::ErrorKind;
match e.inner() {
ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 0,
ErrorKind::Internal(_) => 1,
_ => 999,
// `ChannelClosed` or `ExchangeChannelClosed` is likely to be caused by actor exit
// and not the root cause.
ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 1,

// Normal errors.
ErrorKind::Uncategorized(_)
| ErrorKind::Storage(_)
| ErrorKind::ArrayError(_)
| ErrorKind::ExprError(_)
| ErrorKind::SerdeError(_)
| ErrorKind::SinkError(_)
| ErrorKind::RpcError(_)
| ErrorKind::AlignBarrier(_, _)
| ErrorKind::ConnectorError(_)
| ErrorKind::DmlError(_)
| ErrorKind::NotImplemented(_) => 999,
}
}

Expand All @@ -903,9 +919,18 @@ pub fn try_find_root_actor_failure<'a>(
// `UnexpectedExit` wraps the original error. Score on the inner error.
ErrorKind::UnexpectedExit { source, .. } => stream_error_score(source),

ErrorKind::Internal(_) => 1000,
// `BarrierSend` is likely to be caused by actor exit and not the root cause.
ErrorKind::BarrierSend { .. } => 1,

// Executor errors first.
ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),
_ => 3000,

// Then other errors.
ErrorKind::Uncategorized(_)
| ErrorKind::Storage(_)
| ErrorKind::Expression(_)
| ErrorKind::Array(_)
| ErrorKind::Sink(_) => 1000,
}
}

Expand Down

0 comments on commit 90eb6f1

Please sign in to comment.