From 17212b3bc09d31ddedb764dcbeaa9cbc4b407f14 Mon Sep 17 00:00:00 2001
From: zwang28 <70626450+zwang28@users.noreply.github.com>
Date: Tue, 27 Feb 2024 17:12:49 +0800
Subject: [PATCH] feat(meta): try to report the root cause of recovery (#13441)

---
 src/meta/src/barrier/rpc.rs            | 73 +++++++++++++++++++++++---
 src/stream/src/executor/mod.rs         |  2 +-
 src/stream/src/task/barrier_manager.rs | 72 +++++++++++++++++++++----
 3 files changed, 128 insertions(+), 19 deletions(-)

diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs
index 8690435b4ebc0..755d8f4c2061f 100644
--- a/src/meta/src/barrier/rpc.rs
+++ b/src/meta/src/barrier/rpc.rs
@@ -15,12 +15,12 @@
 use std::collections::HashMap;
 use std::future::Future;
 use std::sync::Arc;
+use std::time::Duration;
 
 use anyhow::anyhow;
 use fail::fail_point;
-use futures::future::try_join_all;
 use futures::stream::FuturesUnordered;
-use futures::{FutureExt, StreamExt};
+use futures::{pin_mut, FutureExt, StreamExt};
 use itertools::Itertools;
 use risingwave_common::bail;
 use risingwave_common::hash::ActorId;
@@ -35,13 +35,14 @@ use risingwave_rpc_client::error::RpcError;
 use risingwave_rpc_client::StreamClient;
 use rw_futures_util::pending_on_none;
 use tokio::sync::oneshot;
+use tokio::time::timeout;
 use tracing::Instrument;
 use uuid::Uuid;
 
 use super::command::CommandContext;
 use super::{BarrierCompletion, GlobalBarrierManagerContext};
 use crate::manager::{MetaSrvEnv, WorkerId};
-use crate::MetaResult;
+use crate::{MetaError, MetaResult};
 
 pub(super) struct BarrierRpcManager {
     context: GlobalBarrierManagerContext,
@@ -294,11 +295,12 @@ impl StreamRpcManager {
     ) -> MetaResult<Vec<RSP>> {
         let pool = self.env.stream_client_pool();
         let f = &f;
-        Ok(try_join_all(request.map(|(node, input)| async move {
-            let client = pool.get(node).await?;
-            f(client, input).await
-        }))
-        .await?)
+        let iters = request.map(|(node, input)| async move {
+            let client = pool.get(node).await.map_err(|e| (node.id, e))?;
+            f(client, input).await.map_err(|e| (node.id, e))
+        });
+        let result = try_join_all_with_error_timeout(iters, Duration::from_secs(3)).await;
+        result.map_err(|results_err| merge_node_rpc_errors("merged RPC Error", results_err))
     }
 
     async fn broadcast<RSP, Fut: Future<Output = Result<RSP, RpcError>> + 'static>(
@@ -419,3 +421,58 @@ impl StreamRpcManager {
         Ok(())
     }
 }
+
+/// This function is similar to `try_join_all`, but it attempts to collect as many errors as possible within `error_timeout`.
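+///
+/// A usage sketch mirroring the call site in the hunk above (illustrative only;
+/// `futs` stands in for the per-worker request futures, whose errors are already
+/// mapped to `(worker_id, error)` pairs):
+///
+/// ```ignore
+/// let result = try_join_all_with_error_timeout(futs, Duration::from_secs(3))
+///     .await
+///     .map_err(|errs| merge_node_rpc_errors("merged RPC Error", errs));
+/// ```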
+async fn try_join_all_with_error_timeout<I, F, T, E>(
+    iters: I,
+    error_timeout: Duration,
+) -> Result<Vec<T>, Vec<E>>
+where
+    I: IntoIterator<Item = F>,
+    F: Future<Output = Result<T, E>>,
+{
+    let stream = FuturesUnordered::from_iter(iters);
+    pin_mut!(stream);
+    let mut results_ok = vec![];
+    let mut results_err = vec![];
+    while let Some(result) = stream.next().await {
+        match result {
+            Ok(rsp) => {
+                results_ok.push(rsp);
+            }
+            Err(err) => {
+                results_err.push(err);
+                break;
+            }
+        }
+    }
+    if results_err.is_empty() {
+        return Ok(results_ok);
+    }
+    let _ = timeout(error_timeout, async {
+        while let Some(result) = stream.next().await {
+            if let Err(err) = result {
+                results_err.push(err);
+            }
+        }
+    })
+    .await;
+    Err(results_err)
+}
+
+fn merge_node_rpc_errors(
+    message: &str,
+    errors: impl IntoIterator<Item = (WorkerId, RpcError)>,
+) -> MetaError {
+    use std::fmt::Write;
+
+    use thiserror_ext::AsReport;
+
+    let concat: String = errors
+        .into_iter()
+        .fold(format!("{message}:"), |mut s, (w, e)| {
+            write!(&mut s, " worker node {}, {};", w, e.as_report()).unwrap();
+            s
+        });
+    anyhow::anyhow!(concat).into()
+}
diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs
index 1e82768096f36..7ec645e3b0dc7 100644
--- a/src/stream/src/executor/mod.rs
+++ b/src/stream/src/executor/mod.rs
@@ -62,7 +62,7 @@ mod dedup;
 mod dispatch;
 pub mod dml;
 mod dynamic_filter;
-mod error;
+pub mod error;
 mod expand;
 mod filter;
 mod flow_control;
diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs
index d9917cdf554f0..faabc8f266ac8 100644
--- a/src/stream/src/task/barrier_manager.rs
+++ b/src/stream/src/task/barrier_manager.rs
@@ -14,6 +14,7 @@
 
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
+use std::time::Duration;
 
 use anyhow::anyhow;
 use futures::stream::FuturesUnordered;
@@ -212,6 +213,8 @@ pub(super) struct LocalBarrierWorker {
     barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,
 
     actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
+
+    root_failure: Option<StreamError>,
 }
 
 impl LocalBarrierWorker {
@@ -239,6 +242,7 @@ impl LocalBarrierWorker {
             current_shared_context: shared_context,
             barrier_event_rx: event_rx,
             actor_failure_rx: failure_rx,
+            root_failure: None,
         }
     }
 
@@ -260,7 +264,7 @@ impl LocalBarrierWorker {
                 },
                 failure = self.actor_failure_rx.recv() => {
                     let (actor_id, err) = failure.unwrap();
-                    self.notify_failure(actor_id, err);
+                    self.notify_failure(actor_id, err).await;
                 },
                 actor_op = actor_op_rx.recv() => {
                     if let Some(actor_op) = actor_op {
@@ -451,7 +455,8 @@ impl LocalBarrierWorker {
                 // The failure actors could exit before the barrier is issued, while their
                 // up-downstream actors could be stuck somehow. Return error directly to trigger the
                 // recovery.
-                return Err(e.clone());
+                // The only reason `try_find_root_failure` is not used here is that it requires async.
+                return Err(self.root_failure.clone().unwrap_or(e.clone()));
             }
         }
 
@@ -538,22 +543,42 @@ impl LocalBarrierWorker {
 
     /// When an actor exits unexpectedly, it should report this event using this function, so meta
    /// will notice the actor's exit while collecting.
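+    ///
+    /// With this change, the failure is first recorded via `add_failure`; then
+    /// `try_find_root_failure` waits briefly for further actor errors and picks
+    /// the most likely root cause, which is what the awaiting epochs are failed
+    /// with.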
-    fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) {
+    async fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) {
+        self.add_failure(actor_id, err.clone());
+        let root_err = self.try_find_root_failure(err).await;
+        for fail_epoch in self.state.epochs_await_on_actor(actor_id) {
+            if let Some(result_sender) = self.epoch_result_sender.remove(&fail_epoch) {
+                if result_sender.send(Err(root_err.clone())).is_err() {
+                    warn!(fail_epoch, actor_id, err = %root_err.as_report(), "fail to notify actor failure");
+                }
+            }
+        }
+    }
+
+    fn add_failure(&mut self, actor_id: ActorId, err: StreamError) {
         let err = err.into_unexpected_exit(actor_id);
-        if let Some(prev_err) = self.failure_actors.insert(actor_id, err.clone()) {
+        if let Some(prev_err) = self.failure_actors.insert(actor_id, err) {
             warn!(
                 actor_id,
                 prev_err = %prev_err.as_report(),
                 "actor error overwritten"
             );
         }
-        for fail_epoch in self.state.epochs_await_on_actor(actor_id) {
-            if let Some(result_sender) = self.epoch_result_sender.remove(&fail_epoch) {
-                if result_sender.send(Err(err.clone())).is_err() {
-                    warn!(fail_epoch, actor_id, err = %err.as_report(), "fail to notify actor failure");
-                }
-            }
+    }
+
+    async fn try_find_root_failure(&mut self, default_err: StreamError) -> StreamError {
+        if let Some(root_failure) = &self.root_failure {
+            return root_failure.clone();
         }
+        // fetch more actor errors within a timeout
+        let _ = tokio::time::timeout(Duration::from_secs(3), async {
+            while let Some((actor_id, error)) = self.actor_failure_rx.recv().await {
+                self.add_failure(actor_id, error);
+            }
+        })
+        .await;
+        self.root_failure = try_find_root_actor_failure(self.failure_actors.values());
+        self.root_failure.clone().unwrap_or(default_err)
     }
 }
 
@@ -681,6 +706,33 @@ impl LocalBarrierManager {
     }
 }
 
+/// Tries to find the root cause of actor failures, based on hard-coded rules.
+pub fn try_find_root_actor_failure<'a>(
+    actor_errors: impl IntoIterator<Item = &'a StreamError>,
+) -> Option<StreamError> {
+    use crate::executor::StreamExecutorError;
+    let stream_executor_error_score = |e: &StreamExecutorError| {
+        use crate::executor::error::ErrorKind;
+        match e.inner() {
+            ErrorKind::ChannelClosed(_) => 0,
+            ErrorKind::Internal(_) => 1,
+            _ => 999,
+        }
+    };
+    let stream_error_score = |e: &&StreamError| {
+        use crate::error::ErrorKind;
+        match e.inner() {
+            ErrorKind::Internal(_) => 1000,
+            ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),
+            _ => 3000,
+        }
+    };
+    actor_errors
+        .into_iter()
+        .max_by_key(stream_error_score)
+        .cloned()
+}
+
 #[cfg(test)]
 impl LocalBarrierManager {
     pub(super) async fn spawn_for_test() -> (EventSender, Self) {
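
To illustrate the hard-coded ranking above: `try_find_root_actor_failure` scores
every collected error and returns the highest-scoring one, so an executor
`ChannelClosed` error (score 2000) loses to an executor `Internal` error (score
2001), presumably because a closed channel is usually a symptom of another
actor's exit rather than the root cause. A minimal sketch, assuming
`channel_closed_err` and `internal_err` are `StreamError` values wrapping
executor errors of those kinds:

    // channel_closed_err scores 2000 + 0, internal_err scores 2000 + 1,
    // so the Internal error is selected as the root cause to report.
    let errors = vec![channel_closed_err, internal_err];
    let root = try_find_root_actor_failure(errors.iter());
    // `root` now holds the executor Internal error.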