Skip to content

Commit

Permalink
handle single error
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Jul 16, 2024
1 parent 8ce0f40 commit 5f8639b
Showing 1 changed file with 24 additions and 14 deletions.
38 changes: 24 additions & 14 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
use std::error::request_value;
use std::fmt::Write;

use risingwave_common::error::tonic::extra::Score;

Expand All @@ -550,6 +551,18 @@ fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(
return anyhow!(message.to_owned()).into();
}

// Create the error from the single error.
let single_error = |(worker_id, e)| {
anyhow::Error::from(e)
.context(format!("{message}, in worker node {worker_id}"))
.into()
};

if errors.len() == 1 {
return single_error(errors.into_iter().next().unwrap());
}

// Find the error with the highest score.
let max_score = errors
.iter()
.map(|(_, e)| request_value::<Score>(e))
Expand All @@ -558,23 +571,20 @@ fn merge_node_rpc_errors<E: Error + Send + Sync + 'static>(

if let Some(max_score) = max_score {
let mut errors = errors;
let (worker_id, error) = errors
let max_scored = errors
.extract_if(|(_, e)| request_value::<Score>(e) == Some(max_score))
.next()
.unwrap();

anyhow::Error::from(error)
.context(format!("{message}, in worker node {worker_id}"))
.into()
} else {
use std::fmt::Write;

let concat: String = errors
.into_iter()
.fold(format!("{message}: "), |mut s, (w, e)| {
write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
s
});
anyhow!(concat).into()
return single_error(max_scored);
}

// The errors do not have scores, so simply concatenate them.
let concat: String = errors
.into_iter()
.fold(format!("{message}: "), |mut s, (w, e)| {
write!(&mut s, " in worker node {}, {};", w, e.as_report()).unwrap();
s
});
anyhow!(concat).into()
}

0 comments on commit 5f8639b

Please sign in to comment.