Skip to content

Commit

Permalink
refactor and refine docs
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jul 15, 2024
1 parent a67ffd2 commit 1b4ccf5
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 95 deletions.
32 changes: 3 additions & 29 deletions src/error/src/tonic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod extra;

use std::borrow::Cow;
use std::error::Error;
use std::sync::Arc;
Expand All @@ -24,37 +26,9 @@ use tonic::metadata::{MetadataMap, MetadataValue};
const ERROR_KEY: &str = "risingwave-error-bin";

/// The service name that the error is from. Used to provide better error message.
// TODO: also make it a field of `Extra`?
type ServiceName = Cow<'static, str>;

pub mod extra {
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Score(pub i32);

#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub(super) struct Extra {
pub score: Option<Score>,
}

impl Extra {
pub fn new<T>(error: &T) -> Self
where
T: ?Sized + std::error::Error,
{
Self {
score: std::error::request_value(error),
}
}

pub fn provide<'a>(&'a self, request: &mut std::error::Request<'a>) {
if let Some(score) = self.score {
request.provide_value(score);
}
}
}
}

/// The error produced by the gRPC server and sent to the client on the wire.
#[derive(Debug, Serialize, Deserialize)]
struct ServerError {
Expand Down
50 changes: 50 additions & 0 deletions src/error/src/tonic/extra.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

/// The score of the error.
///
/// Currently, it's used to identify the root cause of streaming pipeline failures, i.e., which actor
/// led to the failure.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Score(pub i32);

/// Extra fields in errors that can be passed through the gRPC boundary.
///
/// - Field being set to `None` means it is not available.
/// - To add a new field, also update the `provide` method.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub(super) struct Extra {
pub score: Option<Score>,
}

impl Extra {
/// Create a new [`Extra`] by [requesting](std::error::request_ref) each field from the given error.
pub fn new<T>(error: &T) -> Self
where
T: ?Sized + std::error::Error,
{
Self {
score: std::error::request_value(error),
}
}

/// Provide each field to the given [request](std::error::Request).
pub fn provide<'a>(&'a self, request: &mut std::error::Request<'a>) {
if let Some(score) = self.score {
request.provide_value(score);
}
}
}
3 changes: 1 addition & 2 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,7 @@ fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
// Find the error with the highest score.
let max_score = errors
.iter()
.map(|(_, e)| request_value::<Score>(e))
.flatten()
.filter_map(|(_, e)| request_value::<Score>(e))
.max();

if let Some(max_score) = max_score {
Expand Down
3 changes: 3 additions & 0 deletions src/rpc_client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ pub enum RpcError {
#[error(transparent)]
GrpcStatus(
#[from]
// Typically it does not have a backtrace,
// but this is to let `thiserror` generate `provide` implementation to make `Extra` work.
// See `risingwave_error::tonic::extra`.
#[backtrace]
Box<TonicStatusWrapper>,
),
Expand Down
123 changes: 59 additions & 64 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use futures::stream::{BoxStream, FuturesUnordered};
use futures::StreamExt;
use itertools::Itertools;
use parking_lot::Mutex;
use risingwave_common::error::tonic::extra::Score;
use risingwave_pb::stream_service::barrier_complete_response::{
GroupedSstableInfo, PbCreateMviewProgress,
};
Expand Down Expand Up @@ -799,7 +800,7 @@ impl LocalBarrierWorker {
let root_err = self
.try_find_root_failure()
.await
.unwrap_or(ScoredStreamError::fallback(err));
.unwrap_or_else(|| ScoredStreamError::new(err));

self.control_stream_handle.reset_stream_with_err(
anyhow!(root_err)
Expand All @@ -825,14 +826,21 @@ impl LocalBarrierWorker {
if self.cached_root_failure.is_some() {
return self.cached_root_failure.clone();
}

// fetch more actor errors within a timeout
let _ = tokio::time::timeout(Duration::from_secs(3), async {
while let Some((actor_id, error)) = self.actor_failure_rx.recv().await {
self.add_failure(actor_id, error);
}
})
.await;
self.cached_root_failure = try_find_root_actor_failure(self.failure_actors.values());

// Find the error with highest score.
self.cached_root_failure = self
.failure_actors
.values()
.map(|e| ScoredStreamError::new(e.clone()))
.max_by_key(|e| e.score);

self.cached_root_failure.clone()
}
Expand Down Expand Up @@ -941,16 +949,11 @@ impl LocalBarrierManager {
}
}

/// A [`StreamError`] with a score, used to find the root cause of actor failures.
#[derive(Debug, Clone)]
struct ScoredStreamError {
error: StreamError,
score: i32,
}

impl ScoredStreamError {
const fn fallback(error: StreamError) -> Self {
Self { error, score: 0 }
}
score: Score,
}

impl std::fmt::Display for ScoredStreamError {
Expand All @@ -965,72 +968,64 @@ impl std::error::Error for ScoredStreamError {
}

fn provide<'a>(&'a self, request: &mut std::error::Request<'a>) {
use risingwave_common::error::tonic::extra::Score;

self.error.provide(request);
request.provide_value(Score(self.score));
// HIGHLIGHT: Provide the score to make it retrievable from meta service.
request.provide_value(self.score);
}
}

/// Tries to find the root cause of actor failures, based on hard-coded rules.
///
/// Returns `None` if the input is empty.
fn try_find_root_actor_failure<'a>(
actor_errors: impl IntoIterator<Item = &'a StreamError>,
) -> Option<ScoredStreamError> {
// Explicitly list all error kinds here to notice developers to update this function when
// there are changes in error kinds.

fn stream_executor_error_score(e: &StreamExecutorError) -> i32 {
use crate::executor::error::ErrorKind;
match e.inner() {
// `ChannelClosed` or `ExchangeChannelClosed` is likely to be caused by actor exit
// and not the root cause.
ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 1,

// Normal errors.
ErrorKind::Uncategorized(_)
| ErrorKind::Storage(_)
| ErrorKind::ArrayError(_)
| ErrorKind::ExprError(_)
| ErrorKind::SerdeError(_)
| ErrorKind::SinkError(_)
| ErrorKind::RpcError(_)
| ErrorKind::AlignBarrier(_, _)
| ErrorKind::ConnectorError(_)
| ErrorKind::DmlError(_)
| ErrorKind::NotImplemented(_) => 999,
impl ScoredStreamError {
/// Score the given error based on hard-coded rules.
fn new(error: StreamError) -> Self {
// Explicitly list all error kinds here to notice developers to update this function when
// there are changes in error kinds.

fn stream_executor_error_score(e: &StreamExecutorError) -> i32 {
use crate::executor::error::ErrorKind;
match e.inner() {
// `ChannelClosed` or `ExchangeChannelClosed` is likely to be caused by actor exit
// and not the root cause.
ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 1,

// Normal errors.
ErrorKind::Uncategorized(_)
| ErrorKind::Storage(_)
| ErrorKind::ArrayError(_)
| ErrorKind::ExprError(_)
| ErrorKind::SerdeError(_)
| ErrorKind::SinkError(_)
| ErrorKind::RpcError(_)
| ErrorKind::AlignBarrier(_, _)
| ErrorKind::ConnectorError(_)
| ErrorKind::DmlError(_)
| ErrorKind::NotImplemented(_) => 999,
}
}
}

fn stream_error_score(e: &StreamError) -> i32 {
use crate::error::ErrorKind;
match e.inner() {
// `UnexpectedExit` wraps the original error. Score on the inner error.
ErrorKind::UnexpectedExit { source, .. } => stream_error_score(source),
fn stream_error_score(e: &StreamError) -> i32 {
use crate::error::ErrorKind;
match e.inner() {
// `UnexpectedExit` wraps the original error. Score on the inner error.
ErrorKind::UnexpectedExit { source, .. } => stream_error_score(source),

// `BarrierSend` is likely to be caused by actor exit and not the root cause.
ErrorKind::BarrierSend { .. } => 1,
// `BarrierSend` is likely to be caused by actor exit and not the root cause.
ErrorKind::BarrierSend { .. } => 1,

// Executor errors first.
ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),
// Executor errors first.
ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),

// Then other errors.
ErrorKind::Uncategorized(_)
| ErrorKind::Storage(_)
| ErrorKind::Expression(_)
| ErrorKind::Array(_)
| ErrorKind::Sink(_) => 1000,
// Then other errors.
ErrorKind::Uncategorized(_)
| ErrorKind::Storage(_)
| ErrorKind::Expression(_)
| ErrorKind::Array(_)
| ErrorKind::Sink(_) => 1000,
}
}
}

actor_errors
.into_iter()
.map(|e| ScoredStreamError {
error: e.clone(),
score: stream_error_score(e),
})
.max_by_key(|e| e.score)
let score = Score(stream_error_score(&error));
Self { error, score }
}
}

#[cfg(test)]
Expand Down

0 comments on commit 1b4ccf5

Please sign in to comment.