feat(meta): try to report the root cause of recovery #13441

Merged
merged 25 commits on Feb 27, 2024
Changes from 23 commits
73 changes: 65 additions & 8 deletions src/meta/src/barrier/rpc.rs
@@ -15,12 +15,12 @@
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use fail::fail_point;
use futures::future::try_join_all;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use futures::{pin_mut, FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::hash::ActorId;
@@ -35,13 +35,14 @@ use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::StreamClient;
use rw_futures_util::pending_on_none;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tracing::Instrument;
use uuid::Uuid;

use super::command::CommandContext;
use super::{BarrierCompletion, GlobalBarrierManagerContext};
use crate::manager::{MetaSrvEnv, WorkerId};
use crate::MetaResult;
use crate::{MetaError, MetaResult};

pub(super) struct BarrierRpcManager {
context: GlobalBarrierManagerContext,
@@ -294,11 +295,12 @@ impl StreamRpcManager {
) -> MetaResult<Vec<RSP>> {
let pool = self.env.stream_client_pool();
let f = &f;
Ok(try_join_all(request.map(|(node, input)| async move {
let client = pool.get(node).await?;
f(client, input).await
}))
.await?)
let iters = request.map(|(node, input)| async move {
let client = pool.get(node).await.map_err(|e| (node.id, e))?;
f(client, input).await.map_err(|e| (node.id, e))
});
let result = try_join_all_with_error_timeout(iters, Duration::from_secs(3)).await;
result.map_err(|results_err| merge_node_rpc_errors("merged RPC Error", results_err))
}

async fn broadcast<RSP, Fut: Future<Output = Result<RSP, RpcError>> + 'static>(
@@ -419,3 +421,58 @@ impl StreamRpcManager {
Ok(())
}
}

/// This function is similar to `try_join_all`, but it attempts to collect as many errors as possible within `error_timeout`.
async fn try_join_all_with_error_timeout<I, RSP, E, F>(
iters: I,
error_timeout: Duration,
) -> Result<Vec<RSP>, Vec<E>>
where
I: IntoIterator<Item = F>,
F: Future<Output = Result<RSP, E>>,
{
let stream = FuturesUnordered::from_iter(iters);
pin_mut!(stream);
let mut results_ok = vec![];
let mut results_err = vec![];
while let Some(result) = stream.next().await {
match result {
Ok(rsp) => {
results_ok.push(rsp);
}
Err(err) => {
results_err.push(err);
break;
}
}
}
if results_err.is_empty() {
return Ok(results_ok);
}
let _ = timeout(error_timeout, async {
while let Some(result) = stream.next().await {
if let Err(err) = result {
results_err.push(err);
}
}
})
.await;
Err(results_err)
}

fn merge_node_rpc_errors(
message: &str,
errors: impl IntoIterator<Item = (WorkerId, RpcError)>,
) -> MetaError {
use std::fmt::Write;

use thiserror_ext::AsReport;

let concat: String = errors
.into_iter()
.fold(format!("{message}:"), |mut s, (w, e)| {
write!(&mut s, " worker node {}, {};", w, e.as_report()).unwrap();
s
});
anyhow::anyhow!(concat).into()
}
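
For illustration, here is a minimal standalone sketch of how the new helper behaves when several worker RPCs fail. It assumes a copy of `try_join_all_with_error_timeout` (which is private to this module) is in scope, and uses plain `String` errors in place of the real `(WorkerId, RpcError)` pairs. Unlike `try_join_all`, the helper keeps draining the remaining futures for up to `error_timeout` after the first failure, so both errors are returned and can then be folded by `merge_node_rpc_errors` into a single message of the form `merged RPC Error: worker node 2, ...; worker node 3, ...;`.

```rust
use std::time::Duration;

use futures::FutureExt;

#[tokio::main]
async fn main() {
    // Three requests: one succeeds, two fail. `try_join_all` would return as
    // soon as the first error is observed; the helper instead keeps draining
    // the remaining futures for up to `error_timeout`, so both errors are
    // collected and can be merged into a single report.
    let requests = vec![
        async { Ok::<i32, String>(1) }.boxed(),
        async { Err::<i32, String>("worker node 2: channel closed".into()) }.boxed(),
        async { Err::<i32, String>("worker node 3: storage error".into()) }.boxed(),
    ];
    match try_join_all_with_error_timeout(requests, Duration::from_secs(3)).await {
        Ok(responses) => println!("all succeeded: {responses:?}"),
        Err(errors) => println!("collected {} errors: {errors:?}", errors.len()),
    }
}
```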
2 changes: 1 addition & 1 deletion src/stream/src/executor/mod.rs
@@ -62,7 +62,7 @@ mod dedup;
mod dispatch;
pub mod dml;
mod dynamic_filter;
mod error;
pub mod error;
mod expand;
mod filter;
mod flow_control;
81 changes: 73 additions & 8 deletions src/stream/src/task/barrier_manager.rs
@@ -14,6 +14,7 @@

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::stream::FuturesUnordered;
@@ -212,6 +213,8 @@ pub(super) struct LocalBarrierWorker {
barrier_event_rx: UnboundedReceiver<LocalBarrierEvent>,

actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,

root_failure: Option<StreamError>,
}

impl LocalBarrierWorker {
@@ -239,6 +242,7 @@ impl LocalBarrierWorker {
current_shared_context: shared_context,
barrier_event_rx: event_rx,
actor_failure_rx: failure_rx,
root_failure: None,
}
}

@@ -260,7 +264,7 @@
},
failure = self.actor_failure_rx.recv() => {
let (actor_id, err) = failure.unwrap();
self.notify_failure(actor_id, err);
self.notify_failure(actor_id, err).await;
},
actor_op = actor_op_rx.recv() => {
if let Some(actor_op) = actor_op {
@@ -451,7 +455,8 @@ impl LocalBarrierWorker {
// The failure actors could exit before the barrier is issued, while their
// up-downstream actors could be stuck somehow. Return error directly to trigger the
// recovery.
return Err(e.clone());
// `try_find_root_failure` is not used here only because it is async and this method is not.
return Err(self.root_failure().unwrap_or(e.clone()));
}
}

@@ -538,22 +543,55 @@

/// When an actor exits unexpectedly, it should report this event using this function, so meta
/// will notice the actor's exit while collecting.
fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) {
async fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) {
self.add_failure(actor_id, err.clone());
let root_err = self.try_find_root_failure(err).await;
for fail_epoch in self.state.epochs_await_on_actor(actor_id) {
if let Some(result_sender) = self.epoch_result_sender.remove(&fail_epoch) {
if result_sender.send(Err(root_err.clone())).is_err() {
warn!(fail_epoch, actor_id, err = %root_err.as_report(), "fail to notify actor failure");
}
}
}
}

fn add_failure(&mut self, actor_id: ActorId, err: StreamError) {
let err = err.into_unexpected_exit(actor_id);
if let Some(prev_err) = self.failure_actors.insert(actor_id, err.clone()) {
if let Some(prev_err) = self.failure_actors.insert(actor_id, err) {
warn!(
actor_id,
prev_err = %prev_err.as_report(),
"actor error overwritten"
);
}
for fail_epoch in self.state.epochs_await_on_actor(actor_id) {
if let Some(result_sender) = self.epoch_result_sender.remove(&fail_epoch) {
if result_sender.send(Err(err.clone())).is_err() {
warn!(fail_epoch, actor_id, err = %err.as_report(), "fail to notify actor failure");
}

fn root_failure(&self) -> Option<StreamError> {
self.root_failure.clone()
}

async fn try_find_root_failure(&mut self, default_err: StreamError) -> StreamError {
if let Some(root_failure) = &self.root_failure {
return root_failure.clone();
}
// fetch more actor errors within a timeout
let mut timeout = tokio::time::interval(Duration::from_secs(3));
timeout.reset();
loop {
select! {
biased;
_ = timeout.tick() => {
break;
}
result = self.actor_failure_rx.recv() => {
let Some((actor_id, error)) = result else {
break;
};
self.add_failure(actor_id, error);
}
}
}
try_find_root_actor_failure(self.failure_actors.values()).unwrap_or(default_err)
}
}
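
The bounded wait in `try_find_root_failure` exists so that the first failure to arrive (often just a `ChannelClosed` from a neighboring actor) does not become the reported root cause by default: the worker keeps draining the failure channel for a short window and only then picks the highest-scoring error. Below is a minimal standalone sketch of that bounded-drain pattern, using `tokio::time::sleep` as the deadline instead of the `interval` the PR uses; the channel, error strings, and `drain_for` helper here are hypothetical.

```rust
use std::time::Duration;

use tokio::sync::mpsc;

// Hypothetical helper: after a first failure has been seen, keep draining the
// failure channel for a fixed window so later (possibly more informative)
// errors are also considered, then stop.
async fn drain_for(rx: &mut mpsc::UnboundedReceiver<String>, window: Duration) -> Vec<String> {
    let mut collected = vec![];
    let deadline = tokio::time::sleep(window);
    tokio::pin!(deadline);
    loop {
        tokio::select! {
            biased;
            _ = &mut deadline => break, // window elapsed, stop waiting
            msg = rx.recv() => {
                match msg {
                    Some(err) => collected.push(err), // another actor failure arrived
                    None => break,                    // all senders dropped
                }
            }
        }
    }
    collected
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    tx.send("actor 1: channel closed".to_string()).unwrap();
    tx.send("actor 2: storage error".to_string()).unwrap();
    drop(tx);
    // Both failures are collected within the window before a root cause is picked.
    let errors = drain_for(&mut rx, Duration::from_secs(3)).await;
    println!("collected {} errors", errors.len());
}
```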

@@ -681,6 +719,33 @@ impl LocalBarrierManager {
}
}

/// Tries to find the root cause of actor failures, based on hard-coded rules.
pub fn try_find_root_actor_failure<'a>(
actor_errors: impl IntoIterator<Item = &'a StreamError>,
) -> Option<StreamError> {
use crate::executor::StreamExecutorError;
let stream_executor_error_score = |e: &StreamExecutorError| {
use crate::executor::error::ErrorKind;
match e.inner() {
ErrorKind::ChannelClosed(_) => 0,
ErrorKind::Internal(_) => 1,
_ => 999,
}
};
let stream_error_score = |e: &&StreamError| {
use crate::error::ErrorKind;
match e.inner() {
ErrorKind::Internal(_) => 1000,
ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee),
_ => 3000,
}
};
actor_errors
.into_iter()
.max_by_key(stream_error_score)
.cloned()
}
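
The scores encode a simple heuristic: a `ChannelClosed` error is usually just a secondary effect of some other actor exiting first, so it ranks lowest, while a concrete executor failure is the most informative and wins. A simplified stand-in for this ranking, using hypothetical mock error types rather than the real `StreamError`/`StreamExecutorError`:

```rust
// Hypothetical mock error types standing in for `StreamError` and its kinds.
#[derive(Debug, Clone)]
enum MockError {
    ChannelClosed,  // usually a secondary effect, almost never the root cause
    Internal,       // somewhat informative
    StorageFailure, // a concrete failure, the most informative
}

fn score(e: &MockError) -> u32 {
    match e {
        MockError::ChannelClosed => 0,
        MockError::Internal => 1,
        MockError::StorageFailure => 999,
    }
}

// Same shape as `try_find_root_actor_failure`: pick the highest-scoring error.
fn root_cause(errors: &[MockError]) -> Option<MockError> {
    errors.iter().max_by_key(|e| score(e)).cloned()
}

fn main() {
    let errors = vec![
        MockError::ChannelClosed,
        MockError::StorageFailure,
        MockError::ChannelClosed,
    ];
    // Prints `Some(StorageFailure)`, not one of the channel-closed errors.
    println!("{:?}", root_cause(&errors));
}
```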

#[cfg(test)]
impl LocalBarrierManager {
pub(super) async fn spawn_for_test() -> (EventSender<LocalActorOperation>, Self) {