feat(meta): try to report the root cause of recovery (#13441)
zwang28 authored Feb 27, 2024
1 parent de3696f commit 17212b3
Showing 3 changed files with 128 additions and 19 deletions.
73 changes: 65 additions & 8 deletions src/meta/src/barrier/rpc.rs
@@ -15,12 +15,12 @@
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use futures::{pin_mut, FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::hash::ActorId;
@@ -35,13 +35,14 @@ use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::StreamClient;
use rw_futures_util::pending_on_none;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tracing::Instrument;
use uuid::Uuid;

use super::command::CommandContext;
use super::{BarrierCompletion, GlobalBarrierManagerContext};
use crate::manager::{MetaSrvEnv, WorkerId};
use crate::MetaResult;
use crate::{MetaError, MetaResult};

pub(super) struct BarrierRpcManager {
context: GlobalBarrierManagerContext,
@@ -294,11 +295,12 @@ impl StreamRpcManager
) -> MetaResult<Vec<RSP>> {
let pool = self.env.stream_client_pool();
let f = &f;
Ok(try_join_all(request.map(|(node, input)| async move {
let client = pool.get(node).await?;
f(client, input).await
}))
.await?)
let iters = request.map(|(node, input)| async move {
let client = pool.get(node).await.map_err(|e| (node.id, e))?;
f(client, input).await.map_err(|e| (node.id, e))
});
let result = try_join_all_with_error_timeout(iters, Duration::from_secs(3)).await;
result.map_err(|results_err| merge_node_rpc_errors("merged RPC Error", results_err))
}

async fn broadcast<RSP, Fut: Future<Output = Result<RSP, RpcError>> + 'static>(
@@ -419,3 +421,58 @@ impl StreamRpcManager
Ok(())
}
}

/// This function is similar to `try_join_all`, but it attempts to collect as many errors as possible within `error_timeout`.
async fn try_join_all_with_error_timeout<I, RSP, E, F>(
iters: I,
error_timeout: Duration,
) -> Result<Vec<RSP>, Vec<E>>
where
I: IntoIterator<Item = F>,
F: Future<Output = Result<RSP, E>>,
{
let stream = FuturesUnordered::from_iter(iters);
pin_mut!(stream);
let mut results_ok = vec![];
let mut results_err = vec![];
while let Some(result) = stream.next().await {
match result {
Ok(rsp) => {
results_ok.push(rsp);
}
Err(err) => {
results_err.push(err);
break;
}
}
}
if results_err.is_empty() {
return Ok(results_ok);
}
let _ = timeout(error_timeout, async {
while let Some(result) = stream.next().await {
if let Err(err) = result {
results_err.push(err);
}
}
})
.await;
Err(results_err)
}

fn merge_node_rpc_errors(
message: &str,
errors: impl IntoIterator<Item = (WorkerId, RpcError)>,
) -> MetaError {
use std::fmt::Write;

use thiserror_ext::AsReport;

let concat: String = errors
.into_iter()
.fold(format!("{message}:"), |mut s, (w, e)| {
write!(&mut s, " worker node {}, {};", w, e.as_report()).unwrap();
s
});
anyhow::anyhow!(concat).into()
}
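
For context, here is a minimal standalone sketch of the pattern the new `try_join_all_with_error_timeout` and `merge_node_rpc_errors` implement above: run the per-node RPCs concurrently, and once the first failure arrives, keep gathering further failures for a short grace period before folding them into a single report. The function names and the plain `String` error type below are illustrative, not part of the crate.

```rust
use std::time::Duration;

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::timeout;

async fn join_all_collecting_errors<I, T, F>(
    futures: I,
    grace: Duration,
) -> Result<Vec<T>, Vec<String>>
where
    I: IntoIterator<Item = F>,
    F: std::future::Future<Output = Result<T, String>>,
{
    let mut stream: FuturesUnordered<F> = futures.into_iter().collect();
    let (mut oks, mut errs) = (Vec::new(), Vec::new());

    // Drive all futures until the first error shows up.
    while let Some(result) = stream.next().await {
        match result {
            Ok(v) => oks.push(v),
            Err(e) => {
                errs.push(e);
                break;
            }
        }
    }
    if errs.is_empty() {
        return Ok(oks);
    }
    // Give the remaining futures a bounded window to surface their own
    // errors, so the merged report can point at the real root cause.
    let _ = timeout(grace, async {
        while let Some(result) = stream.next().await {
            if let Err(e) = result {
                errs.push(e);
            }
        }
    })
    .await;
    Err(errs)
}

// Fold all collected errors into one message, mirroring the formatting
// idea of `merge_node_rpc_errors`.
fn merge_errors(message: &str, errs: &[String]) -> String {
    errs.iter().fold(format!("{message}:"), |mut s, e| {
        s.push_str(&format!(" {e};"));
        s
    })
}
```
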
2 changes: 1 addition & 1 deletion src/stream/src/executor/mod.rs
@@ -62,7 +62,7 @@ mod dedup;
mod dispatch;
pub mod dml;
mod dynamic_filter;
mod error;
pub mod error;
mod expand;
mod filter;
mod flow_control;
72 changes: 62 additions & 10 deletions src/stream/src/task/barrier_manager.rs
@@ -14,6 +14,7 @@

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::stream::FuturesUnordered;
@@ -212,6 +213,8 @@ pub(super) struct LocalBarrierWorker
barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,

actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,

root_failure: Option<StreamError>,
}

impl LocalBarrierWorker {
@@ -239,6 +242,7 @@ impl LocalBarrierWorker
current_shared_context: shared_context,
barrier_event_rx: event_rx,
actor_failure_rx: failure_rx,
root_failure: None,
}
}

@@ -260,7 +264,7 @@ impl LocalBarrierWorker
},
failure = self.actor_failure_rx.recv() => {
let (actor_id, err) = failure.unwrap();
self.notify_failure(actor_id, err);
self.notify_failure(actor_id, err).await;
},
actor_op = actor_op_rx.recv() => {
if let Some(actor_op) = actor_op {
@@ -451,7 +455,8 @@ impl LocalBarrierWorker
// The failure actors could exit before the barrier is issued, while their
// up-downstream actors could be stuck somehow. Return error directly to trigger the
// recovery.
return Err(e.clone());
// `try_find_root_failure` is not used here solely because it requires async.
return Err(self.root_failure.clone().unwrap_or(e.clone()));
}
}

@@ -538,22 +543,42 @@ impl LocalBarrierWorker

/// When an actor exits unexpectedly, it should report this event using this function, so meta
/// will notice the actor's exit while collecting.
fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) {
async fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) {
self.add_failure(actor_id, err.clone());
let root_err = self.try_find_root_failure(err).await;
for fail_epoch in self.state.epochs_await_on_actor(actor_id) {
if let Some(result_sender) = self.epoch_result_sender.remove(&fail_epoch) {
if result_sender.send(Err(root_err.clone())).is_err() {
warn!(fail_epoch, actor_id, err = %root_err.as_report(), "fail to notify actor failure");
}
}
}
}

fn add_failure(&mut self, actor_id: ActorId, err: StreamError) {
let err = err.into_unexpected_exit(actor_id);
if let Some(prev_err) = self.failure_actors.insert(actor_id, err.clone()) {
if let Some(prev_err) = self.failure_actors.insert(actor_id, err) {
warn!(
actor_id,
prev_err = %prev_err.as_report(),
"actor error overwritten"
);
}
for fail_epoch in self.state.epochs_await_on_actor(actor_id) {
if let Some(result_sender) = self.epoch_result_sender.remove(&fail_epoch) {
if result_sender.send(Err(err.clone())).is_err() {
warn!(fail_epoch, actor_id, err = %err.as_report(), "fail to notify actor failure");
}
}
}

async fn try_find_root_failure(&mut self, default_err: StreamError) -> StreamError {
if let Some(root_failure) = &self.root_failure {
return root_failure.clone();
}
// fetch more actor errors within a timeout
let _ = tokio::time::timeout(Duration::from_secs(3), async {
while let Some((actor_id, error)) = self.actor_failure_rx.recv().await {
self.add_failure(actor_id, error);
}
})
.await;
self.root_failure = try_find_root_actor_failure(self.failure_actors.values());
self.root_failure.clone().unwrap_or(default_err)
}
}
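
The `try_find_root_failure` change above applies the same idea on the compute-node side: after the first actor failure is observed, the worker keeps draining the failure channel for a few more seconds before deciding on a root cause. A minimal sketch of that draining step, using an illustrative `(u32, String)` pair instead of `(ActorId, StreamError)`:

```rust
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::timeout;

// Drain whatever additional failures arrive within `window`; anything that
// shows up after the window elapses simply stays in the channel.
async fn drain_failures_for(
    rx: &mut mpsc::UnboundedReceiver<(u32, String)>, // (actor id, error), illustrative types
    window: Duration,
) -> Vec<(u32, String)> {
    let mut drained = Vec::new();
    let _ = timeout(window, async {
        while let Some(failure) = rx.recv().await {
            drained.push(failure);
        }
    })
    .await;
    drained
}
```
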

@@ -681,6 +706,33 @@ impl LocalBarrierManager
}
}

/// Tries to find the root cause of actor failures, based on hard-coded rules.
pub fn try_find_root_actor_failure<'a>(
actor_errors: impl IntoIterator<Item = &'a StreamError>,
) -> Option<StreamError> {
use crate::executor::StreamExecutorError;
let stream_executor_error_score = |e: &StreamExecutorError| {
use crate::executor::error::ErrorKind;
match e.inner() {
ErrorKind::ChannelClosed(_) => 0,
ErrorKind::Internal(_) => 1,
_ => 999,
}
};
let stream_error_score = |e: &&StreamError| {
use crate::error::ErrorKind;
match e.inner() {
ErrorKind::Internal(_) => 1000,
ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),
_ => 3000,
}
};
actor_errors
.into_iter()
.max_by_key(stream_error_score)
.cloned()
}

#[cfg(test)]
impl LocalBarrierManager {
pub(super) async fn spawn_for_test() -> (EventSender<LocalActorOperation>, Self) {
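
The root-cause selection in `try_find_root_actor_failure` boils down to ranking the collected errors by how informative their kind is and picking the maximum: channel-closed errors are usually just a symptom of some other actor exiting, while executor-specific errors are the most likely cause. A toy sketch of that ranking, with a simplified enum standing in for `StreamError`:

```rust
// Illustrative only: a stand-in error type and coarse scores, not the
// actual StreamError kinds or score values used in the commit.
#[derive(Clone, Debug)]
enum ActorError {
    ChannelClosed(String),
    Internal(String),
    Executor(String),
}

fn error_score(e: &ActorError) -> u32 {
    match e {
        ActorError::ChannelClosed(_) => 0, // a symptom, rarely the cause
        ActorError::Internal(_) => 1,
        ActorError::Executor(_) => 2, // most specific, most likely the root cause
    }
}

fn find_root_cause<'a>(errors: impl IntoIterator<Item = &'a ActorError>) -> Option<ActorError> {
    errors.into_iter().max_by_key(|e| error_score(e)).cloned()
}

fn main() {
    let errors = vec![
        ActorError::ChannelClosed("exchange channel closed".into()),
        ActorError::Executor("source split failed".into()),
    ];
    // Prints the executor error, not the channel-closed symptom.
    println!("{:?}", find_root_cause(&errors));
}
```
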
