From 9268c77be0dd528c42b0d665585f50edf17f9724 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 22 Apr 2024 14:02:06 +0800 Subject: [PATCH] refactor(streaming): do not print backtrace for exchange channel closed error (#16362) Signed-off-by: Bugen Zhao --- src/stream/src/error.rs | 7 ++ src/stream/src/executor/error.rs | 8 ++ src/stream/src/executor/exchange/error.rs | 96 ++++++++++++++++++++++ src/stream/src/executor/exchange/input.rs | 19 +++-- src/stream/src/executor/exchange/mod.rs | 1 + src/stream/src/executor/exchange/output.rs | 21 +---- src/stream/src/executor/merge.rs | 17 ++-- src/stream/src/task/barrier_manager.rs | 2 +- 8 files changed, 134 insertions(+), 37 deletions(-) create mode 100644 src/stream/src/executor/exchange/error.rs diff --git a/src/stream/src/error.rs b/src/stream/src/error.rs index ea957086f038b..7aaffc7824d5e 100644 --- a/src/stream/src/error.rs +++ b/src/stream/src/error.rs @@ -20,6 +20,7 @@ use risingwave_pb::PbFieldNotFound; use risingwave_rpc_client::error::ToTonicStatus; use risingwave_storage::error::StorageError; +use crate::executor::exchange::error::ExchangeChannelClosed; use crate::executor::{Barrier, StreamExecutorError}; use crate::task::ActorId; @@ -108,6 +109,12 @@ impl From for StreamError { } } +impl From<ExchangeChannelClosed> for StreamError { + fn from(err: ExchangeChannelClosed) -> Self { + StreamExecutorError::from(err).into() + } +} + impl From<StreamError> for tonic::Status { fn from(error: StreamError) -> Self { error.to_status(tonic::Code::Internal, "stream") diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs index 48d093f9883ea..db70dcffa7fdd 100644 --- a/src/stream/src/executor/error.rs +++ b/src/stream/src/executor/error.rs @@ -26,6 +26,7 @@ use risingwave_rpc_client::error::RpcError; use risingwave_storage::error::StorageError; use strum_macros::AsRefStr; +use super::exchange::error::ExchangeChannelClosed; use super::Barrier; /// A specialized Result type for streaming executors. 
@@ -84,6 +85,13 @@ pub enum ErrorKind { #[error("Channel closed: {0}")] ChannelClosed(String), + #[error(transparent)] + ExchangeChannelClosed( + #[from] + #[backtrace] + ExchangeChannelClosed, + ), + #[error("Failed to align barrier: expected `{0:?}` but got `{1:?}`")] AlignBarrier(Box, Box), diff --git a/src/stream/src/executor/exchange/error.rs b/src/stream/src/executor/exchange/error.rs new file mode 100644 index 0000000000000..926cd0a61d702 --- /dev/null +++ b/src/stream/src/executor/exchange/error.rs @@ -0,0 +1,96 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_rpc_client::error::TonicStatusWrapper; + +use crate::task::ActorId; + +/// The error type for the exchange channel closed unexpectedly. +/// +/// In most cases, this error happens when the upstream or downstream actor +/// exits or panics on other errors, or the network connection is broken. +/// Therefore, this error is usually not the root cause of the failure in the +/// streaming graph. +#[derive(Debug)] +pub struct ExchangeChannelClosed { + message: String, + + /// `Some` if there is a gRPC error from the remote actor. 
+ source: Option<TonicStatusWrapper>, +} + +impl std::fmt::Display for ExchangeChannelClosed { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.message) + } +} + +impl std::error::Error for ExchangeChannelClosed { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.source.as_ref().map(|s| s as _) + } + + fn provide<'a>(&'a self, request: &mut std::error::Request<'a>) { + use std::backtrace::Backtrace; + + // Always provide a fake disabled backtrace, so that the upper layer will + // not capture any other backtraces or include the backtrace in the error + // log. + // + // Otherwise, when an actor exits on a significant error, all connected + // actors will then exit with the `ExchangeChannelClosed` error, resulting + // in a very noisy log with a flood of useless backtraces. + static DISABLED_BACKTRACE: Backtrace = Backtrace::disabled(); + request.provide_ref::<Backtrace>(&DISABLED_BACKTRACE); + + if let Some(source) = &self.source { + source.provide(request); + } + } +} + +impl ExchangeChannelClosed { + /// Creates a new error indicating that the exchange channel from the local + /// upstream actor is closed unexpectedly. + pub fn local_input(upstream: ActorId) -> Self { + Self { + message: format!( + "exchange channel from local upstream actor {upstream} closed unexpectedly" + ), + source: None, + } + } + + /// Creates a new error indicating that the exchange channel from the remote + /// upstream actor is closed unexpectedly, with an optional gRPC error as the cause. + pub fn remote_input(upstream: ActorId, source: Option<tonic::Status>) -> Self { + Self { + message: format!( + "exchange channel from remote upstream actor {upstream} closed unexpectedly" + ), + source: source.map(Into::into), + } + } + + /// Creates a new error indicating that the exchange channel to the downstream + /// actor is closed unexpectedly. 
+ pub fn output(downstream: ActorId) -> Self { + Self { + message: format!( + "exchange channel to downstream actor {downstream} closed unexpectedly" + ), + source: None, + } + } +} diff --git a/src/stream/src/executor/exchange/input.rs b/src/stream/src/executor/exchange/input.rs index 98ac3d278f0a3..11796441326aa 100644 --- a/src/stream/src/executor/exchange/input.rs +++ b/src/stream/src/executor/exchange/input.rs @@ -21,10 +21,9 @@ use futures_async_stream::try_stream; use pin_project::pin_project; use risingwave_common::util::addr::{is_local_address, HostAddr}; use risingwave_pb::task_service::{permits, GetStreamResponse}; -use risingwave_rpc_client::error::TonicStatusWrapper; use risingwave_rpc_client::ComputeClientPool; -use thiserror_ext::AsReport; +use super::error::ExchangeChannelClosed; use super::permit::Receiver; use crate::error::StreamResult; use crate::executor::error::StreamExecutorError; @@ -82,6 +81,9 @@ impl LocalInput { while let Some(msg) = channel.recv().verbose_instrument_await(span.clone()).await { yield msg; } + // Always emit an error outside the loop. This is because we use barrier as the control + // message to stop the stream. Reaching here means the channel is closed unexpectedly. + Err(ExchangeChannelClosed::local_input(actor_id))? } } @@ -213,15 +215,14 @@ impl RemoteInput { } yield msg; } - Err(e) => { - // TODO(error-handling): maintain the source chain - return Err(StreamExecutorError::channel_closed(format!( - "RemoteInput tonic error: {}", - TonicStatusWrapper::new(e).as_report() - ))); - } + + Err(e) => Err(ExchangeChannelClosed::remote_input(up_down_ids.0, Some(e)))?, } } + + // Always emit an error outside the loop. This is because we use barrier as the control + // message to stop the stream. Reaching here means the channel is closed unexpectedly. + Err(ExchangeChannelClosed::remote_input(up_down_ids.0, None))? 
} } diff --git a/src/stream/src/executor/exchange/mod.rs b/src/stream/src/executor/exchange/mod.rs index 92a9297ca6a06..3c0d6d83e89c3 100644 --- a/src/stream/src/executor/exchange/mod.rs +++ b/src/stream/src/executor/exchange/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod error; pub mod input; pub mod output; pub mod permit; diff --git a/src/stream/src/executor/exchange/output.rs b/src/stream/src/executor/exchange/output.rs index a5124a44a020f..41b4b5b844759 100644 --- a/src/stream/src/executor/exchange/output.rs +++ b/src/stream/src/executor/exchange/output.rs @@ -14,13 +14,12 @@ use std::fmt::Debug; -use anyhow::anyhow; use async_trait::async_trait; use await_tree::InstrumentAwait; use educe::Educe; use risingwave_common::util::addr::is_local_address; -use tokio::sync::mpsc::error::SendError; +use super::error::ExchangeChannelClosed; use super::permit::Sender; use crate::error::StreamResult; use crate::executor::Message; @@ -74,14 +73,7 @@ impl Output for LocalOutput { .send(message) .verbose_instrument_await(self.span.clone()) .await - .map_err(|SendError(message)| { - anyhow!( - "failed to send message to actor {}, message: {:?}", - self.actor_id, - message - ) - .into() - }) + .map_err(|_| ExchangeChannelClosed::output(self.actor_id).into()) } fn actor_id(&self) -> ActorId { @@ -128,14 +120,7 @@ impl Output for RemoteOutput { .send(message) .verbose_instrument_await(self.span.clone()) .await - .map_err(|SendError(message)| { - anyhow!( - "failed to send message to actor {}, message: {:?}", - self.actor_id, - message - ) - .into() - }) + .map_err(|_| ExchangeChannelClosed::output(self.actor_id).into()) } fn actor_id(&self) -> ActorId { diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index 0f27386ffc836..7a5450716f9ac 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -305,15 +305,14 @@ impl Stream for 
SelectReceivers { } } } - // If one upstream is finished, we finish the whole stream with an error. This - // should not happen normally as we use the barrier as the control message. - Some((None, r)) => { - return Poll::Ready(Some(Err(StreamExecutorError::channel_closed(format!( - "exchange from actor {} to actor {} closed unexpectedly", - r.actor_id(), - self.actor_id - ))))) - } + // We use barrier as the control message of the stream. That is, we always stop the + // actors actively when we receive a `Stop` mutation, instead of relying on the stream + // termination. + // + // Besides, in abnormal cases when the other side of the `Input` closes unexpectedly, + // we also yield an `Err(ExchangeChannelClosed)`, which will hit the `Err` arm above. + // So this branch will never be reached in all cases. + Some((None, _)) => unreachable!(), // There's no active upstreams. Process the barrier and resume the blocked ones. None => break, } diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 6e7b6f70e2421..4549552bab607 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -846,7 +846,7 @@ pub fn try_find_root_actor_failure<'a>( fn stream_executor_error_score(e: &StreamExecutorError) -> i32 { use crate::executor::error::ErrorKind; match e.inner() { - ErrorKind::ChannelClosed(_) => 0, + ErrorKind::ChannelClosed(_) | ErrorKind::ExchangeChannelClosed(_) => 0, ErrorKind::Internal(_) => 1, _ => 999, }