From 998b1c90a52b3c0e44ea32a9a9510a75bb197198 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Wed, 15 Nov 2023 16:14:13 +0800 Subject: [PATCH 01/20] feat(meta): try to report the root cause of recovery --- proto/stream_service.proto | 1 + src/compute/src/rpc/service/stream_service.rs | 3 +- src/meta/src/barrier/recovery.rs | 10 ++++- src/stream/src/error.rs | 8 +++- src/stream/src/executor/error.rs | 8 +++- src/stream/src/executor/mod.rs | 2 +- src/stream/src/task/barrier_manager.rs | 10 ++--- .../src/task/barrier_manager/managed_state.rs | 7 ++- src/stream/src/task/stream_manager.rs | 45 ++++++++++++++++--- 9 files changed, 75 insertions(+), 19 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index d7b3edd3fe7d4..81949d93a2d5d 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -51,6 +51,7 @@ message ForceStopActorsRequest { message ForceStopActorsResponse { string request_id = 1; common.Status status = 2; + string previous_actor_failure_cause = 3; } message InjectBarrierRequest { diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 1c1448b3d1e45..b03be418c86f6 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -124,11 +124,12 @@ impl StreamService for StreamServiceImpl { request: Request, ) -> std::result::Result, Status> { let req = request.into_inner(); - self.mgr.stop_all_actors().await?; + let previous_actor_failure_cause = self.mgr.stop_all_actors().await; self.env.dml_manager_ref().clear(); Ok(Response::new(ForceStopActorsResponse { request_id: req.request_id, status: None, + previous_actor_failure_cause, })) } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index fc18ba4fbb612..a133b41a91252 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -663,9 +663,17 @@ impl GlobalBarrierManager { request_id: Uuid::new_v4().to_string(), }) .await + .map(|resp| (worker_node.id, resp)) }); - try_join_all(futures).await?; + let results = try_join_all(futures).await?; + for (worker_id, result) in results { + tracing::info!( + "cause of previous actor failure in worker node {}: {}", + worker_id, + result.previous_actor_failure_cause + ); + } debug!("all compute nodes have been reset."); Ok(()) diff --git a/src/stream/src/error.rs b/src/stream/src/error.rs index ee0c969801c54..c7613be1233a0 100644 --- a/src/stream/src/error.rs +++ b/src/stream/src/error.rs @@ -34,6 +34,12 @@ pub struct StreamError { inner: Box, } +impl StreamError { + pub(crate) fn kind(&self) -> &ErrorKind { + &self.inner.kind + } +} + #[derive(thiserror::Error, Debug)] #[error("{kind}")] struct Inner { @@ -43,7 +49,7 @@ struct Inner { } #[derive(thiserror::Error, Debug)] -enum ErrorKind { +pub(crate) enum ErrorKind { #[error("Storage error: {0}")] Storage( #[backtrace] diff --git a/src/stream/src/executor/error.rs b/src/stream/src/executor/error.rs index fc2f02f0aede0..7abf5344046cb 100644 --- a/src/stream/src/executor/error.rs +++ b/src/stream/src/executor/error.rs @@ -36,6 +36,12 @@ pub struct StreamExecutorError { inner: Box, } +impl StreamExecutorError { + pub(crate) fn kind(&self) -> &ErrorKind { + &self.inner.kind + } +} + #[derive(thiserror::Error, Debug)] #[error("{kind}")] struct Inner { @@ -45,7 +51,7 @@ struct Inner { } #[derive(thiserror::Error, Debug)] -enum ErrorKind { +pub(crate) enum ErrorKind { #[error("Storage error: {0}")] Storage( #[backtrace] diff 
--git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index b01bfb42a3c09..9c5899da947ee 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -61,7 +61,7 @@ mod dedup; mod dispatch; pub mod dml; mod dynamic_filter; -mod error; +pub(crate) mod error; mod expand; mod filter; mod flow_control; diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 996881d3ff4b0..2e634b95f4369 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -210,17 +210,17 @@ impl LocalBarrierManager { } /// Reset all internal states. - pub fn reset(&mut self) { + pub fn reset(&mut self) -> Vec { self.senders.clear(); self.collect_complete_receiver.clear(); match &mut self.state { #[cfg(test)] - BarrierState::Local => {} - - BarrierState::Managed(managed_state) => { - managed_state.clear_all_states(); + BarrierState::Local => { + vec![] } + + BarrierState::Managed(managed_state) => managed_state.clear_all_states(), } } diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 9c1a02d43d0ca..bf98c22ccaca8 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -157,10 +157,13 @@ impl ManagedBarrierState { } /// Clear and reset all states. - pub(crate) fn clear_all_states(&mut self) { + pub(crate) fn clear_all_states(&mut self) -> Vec { tracing::debug!("clear all states in local barrier manager"); - + let errors = std::mem::take(&mut self.failure_actors) + .into_values() + .collect(); *self = Self::new(self.state_store.clone()); + errors } /// Notify unexpected actor exit with given `actor_id`. diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index a237e790588ed..481c3f49bc7b2 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -13,6 +13,7 @@ // limitations under the License. use core::time::Duration; +use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::io::Write; @@ -42,7 +43,7 @@ use tokio::sync::Mutex; use tokio::task::JoinHandle; use super::{unique_executor_id, unique_operator_id, CollectResult}; -use crate::error::StreamResult; +use crate::error::{StreamError, StreamResult}; use crate::executor::exchange::permit::Receiver; use crate::executor::monitor::StreamingMetrics; use crate::executor::subtask::SubtaskHandle; @@ -266,8 +267,8 @@ impl LocalStreamManager { } /// Reset the state of the barrier manager. - pub fn reset_barrier_manager(&self) { - self.context.lock_barrier_manager().reset(); + pub fn reset_barrier_manager(&self) -> Vec { + self.context.lock_barrier_manager().reset() } /// Use `epoch` to find collect rx. And wait for all actor to be collected before @@ -347,13 +348,13 @@ impl LocalStreamManager { } /// Force stop all actors on this worker, and then drop their resources. - pub async fn stop_all_actors(&self) -> StreamResult<()> { + /// Returns the root cause of previous actor failure. 
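+    /// An empty string is returned if no actor failure has been recorded (see `unwrap_or_default` in `try_find_root_cause`).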
+ pub async fn stop_all_actors(&self) -> String { self.core.lock().await.stop_all_actors().await; - self.reset_barrier_manager(); + let actor_errors = self.reset_barrier_manager(); // Clear shared buffer in storage to release memory self.clear_storage_buffer().await; - - Ok(()) + try_find_root_cause(actor_errors) } pub async fn take_receiver(&self, ids: UpDownActorIds) -> StreamResult { @@ -890,6 +891,36 @@ impl LocalStreamManagerCore { } } +/// Tries to find the root cause of last actor failures, based on hard-coded rules. +fn try_find_root_cause(actor_errors: Vec) -> String { + let stream_executor_error_score = |e: &StreamExecutorError| { + use crate::executor::error::ErrorKind; + match e.kind() { + ErrorKind::ChannelClosed(_) => 0, + ErrorKind::Internal(_) => 1, + _ => 999, + } + }; + let stream_error_score = |e: &StreamError| { + use crate::error::ErrorKind; + match e.kind() { + ErrorKind::Internal(_) => 1000, + ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee), + _ => 3000, + } + }; + let cmp_stream_error = |a: &StreamError, b: &StreamError| -> Ordering { + stream_error_score(a).cmp(&stream_error_score(b)) + }; + actor_errors + .into_iter() + .sorted_by(cmp_stream_error) + .next_back() + // change to {:#?} to include backtrace + .map(|e| format!("{:#}", e)) + .unwrap_or_default() +} + #[cfg(test)] pub mod test_utils { use risingwave_pb::common::HostAddress; From 54e642e7fcb803daf0ce0bda8107913dc5e7c0fd Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 21 Nov 2023 15:24:56 +0800 Subject: [PATCH 02/20] refactor compute node side --- proto/stream_service.proto | 1 - src/compute/src/rpc/service/stream_service.rs | 20 ++++++++++---- src/meta/src/barrier/recovery.rs | 10 +------ src/stream/src/task/barrier_manager.rs | 19 ++++++++++---- .../src/task/barrier_manager/managed_state.rs | 4 +++ src/stream/src/task/stream_manager.rs | 26 +++++++++---------- 6 files changed, 47 insertions(+), 33 deletions(-) diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 81949d93a2d5d..d7b3edd3fe7d4 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -51,7 +51,6 @@ message ForceStopActorsRequest { message ForceStopActorsResponse { string request_id = 1; common.Status status = 2; - string previous_actor_failure_cause = 3; } message InjectBarrierRequest { diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index b03be418c86f6..a6350266eebe3 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::iter; use std::sync::Arc; use await_tree::InstrumentAwait; @@ -27,7 +28,9 @@ use risingwave_pb::stream_service::*; use risingwave_storage::dispatch_state_store; use risingwave_stream::error::StreamError; use risingwave_stream::executor::Barrier; -use risingwave_stream::task::{CollectResult, LocalStreamManager, StreamEnvironment}; +use risingwave_stream::task::{ + try_find_root_cause, CollectResult, LocalStreamManager, StreamEnvironment, +}; use tonic::{Code, Request, Response, Status}; use tracing::Instrument; @@ -124,12 +127,11 @@ impl StreamService for StreamServiceImpl { request: Request, ) -> std::result::Result, Status> { let req = request.into_inner(); - let previous_actor_failure_cause = self.mgr.stop_all_actors().await; + self.mgr.stop_all_actors().await; self.env.dml_manager_ref().clear(); Ok(Response::new(ForceStopActorsResponse { request_id: req.request_id, status: None, - previous_actor_failure_cause, })) } @@ -166,7 +168,11 @@ impl StreamService for StreamServiceImpl { self.mgr .send_barrier(&barrier, req.actor_ids_to_send, req.actor_ids_to_collect) - .await?; + .await + .map_err(|e| { + let actor_failures = self.mgr.take_actor_failures(); + try_find_root_cause(actor_failures.into_iter().chain(iter::once(e))).unwrap() + })?; Ok(Response::new(InjectBarrierResponse { request_id: req.request_id, @@ -188,7 +194,11 @@ impl StreamService for StreamServiceImpl { .collect_barrier(req.prev_epoch) .instrument_await(format!("collect_barrier (epoch {})", req.prev_epoch)) .await - .inspect_err(|err| tracing::error!("failed to collect barrier: {}", err))?; + .inspect_err(|err| tracing::error!("failed to collect barrier: {}", err)) + .map_err(|e| { + let actor_failures = self.mgr.take_actor_failures(); + try_find_root_cause(actor_failures.into_iter().chain(iter::once(e))).unwrap() + })?; let synced_sstables = match kind { BarrierKind::Unspecified => unreachable!(), diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index a133b41a91252..fc18ba4fbb612 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -663,17 +663,9 @@ impl GlobalBarrierManager { request_id: Uuid::new_v4().to_string(), }) .await - .map(|resp| (worker_node.id, resp)) }); - let results = try_join_all(futures).await?; - for (worker_id, result) in results { - tracing::info!( - "cause of previous actor failure in worker node {}: {}", - worker_id, - result.previous_actor_failure_cause - ); - } + try_join_all(futures).await?; debug!("all compute nodes have been reset."); Ok(()) diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 2e634b95f4369..98896b2024a28 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -210,17 +210,17 @@ impl LocalBarrierManager { } /// Reset all internal states. 
- pub fn reset(&mut self) -> Vec { + pub fn reset(&mut self) { self.senders.clear(); self.collect_complete_receiver.clear(); match &mut self.state { #[cfg(test)] - BarrierState::Local => { - vec![] - } + BarrierState::Local => {} - BarrierState::Managed(managed_state) => managed_state.clear_all_states(), + BarrierState::Managed(managed_state) => { + managed_state.clear_all_states(); + } } } @@ -249,6 +249,15 @@ impl LocalBarrierManager { } } } + + pub fn take_actor_failures(&mut self) -> Vec { + match &mut self.state { + #[cfg(test)] + BarrierState::Local => vec![], + + BarrierState::Managed(managed_state) => managed_state.take_actor_failures(), + } + } } #[cfg(test)] diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index bf98c22ccaca8..20d96bd3a82ab 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -316,6 +316,10 @@ impl ManagedBarrierState { Ok(()) } + + pub fn take_actor_failures(&mut self) -> Vec { + self.failure_actors.drain().map(|(_k, v)| v).collect() + } } #[cfg(test)] diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 481c3f49bc7b2..029320273350e 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -13,7 +13,6 @@ // limitations under the License. use core::time::Duration; -use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::io::Write; @@ -267,8 +266,8 @@ impl LocalStreamManager { } /// Reset the state of the barrier manager. - pub fn reset_barrier_manager(&self) -> Vec { - self.context.lock_barrier_manager().reset() + pub fn reset_barrier_manager(&self) { + self.context.lock_barrier_manager().reset(); } /// Use `epoch` to find collect rx. And wait for all actor to be collected before @@ -348,13 +347,10 @@ impl LocalStreamManager { } /// Force stop all actors on this worker, and then drop their resources. - /// Returns the root cause of previous actor failure. - pub async fn stop_all_actors(&self) -> String { + pub async fn stop_all_actors(&self) { self.core.lock().await.stop_all_actors().await; - let actor_errors = self.reset_barrier_manager(); // Clear shared buffer in storage to release memory self.clear_storage_buffer().await; - try_find_root_cause(actor_errors) } pub async fn take_receiver(&self, ids: UpDownActorIds) -> StreamResult { @@ -400,6 +396,11 @@ impl LocalStreamManager { pub fn total_mem_usage(&self) -> usize { self.total_mem_val.get() as usize } + + pub fn take_actor_failures(&self) -> Vec { + let mut barrier_manager = self.context.lock_barrier_manager(); + barrier_manager.take_actor_failures() + } } impl LocalStreamManagerCore { @@ -891,8 +892,10 @@ impl LocalStreamManagerCore { } } -/// Tries to find the root cause of last actor failures, based on hard-coded rules. -fn try_find_root_cause(actor_errors: Vec) -> String { +/// Tries to find the root cause of previous actor failures, based on hard-coded rules. 
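+/// The error with the highest score is picked: `ChannelClosed` ranks lowest, since an actor
+/// usually sees its channel close only after some peer actor has already failed; `Internal`
+/// ranks next; any other concrete error kind (e.g. a storage failure) ranks highest.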
+pub fn try_find_root_cause( + actor_errors: impl IntoIterator<Item = StreamError>, +) -> Option<StreamError> { let stream_executor_error_score = |e: &StreamExecutorError| { use crate::executor::error::ErrorKind; match e.kind() {
@@ -909,16 +912,13 @@ fn try_find_root_cause(actor_errors: Vec<StreamError>) -> String { _ => 3000, } }; - let cmp_stream_error = |a: &StreamError, b: &StreamError| -> Ordering { + let cmp_stream_error = |a: &StreamError, b: &StreamError| -> core::cmp::Ordering { stream_error_score(a).cmp(&stream_error_score(b)) }; actor_errors .into_iter() .sorted_by(cmp_stream_error) .next_back() - // change to {:#?} to include backtrace - .map(|e| format!("{:#}", e)) - .unwrap_or_default() } #[cfg(test)]
From dfd3e24c4f484b316ac586d3e9ca3a2b15e36551 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 21 Nov 2023 17:46:15 +0800 Subject: [PATCH 03/20] refactor meta node side --- src/compute/src/rpc/service/stream_service.rs | 2 + src/meta/src/barrier/mod.rs | 67 ++++++++++++++++--- src/stream/src/task/stream_manager.rs | 2 +- 3 files changed, 62 insertions(+), 9 deletions(-)
diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index a6350266eebe3..fb4ed6e6188ed 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs
@@ -170,6 +170,7 @@ impl StreamService for StreamServiceImpl { .send_barrier(&barrier, req.actor_ids_to_send, req.actor_ids_to_collect) .await .map_err(|e| { + // There is no guarantee that all actors have reported their failures, including the one that could potentially be the root cause. let actor_failures = self.mgr.take_actor_failures(); try_find_root_cause(actor_failures.into_iter().chain(iter::once(e))).unwrap() })?;
@@ -196,6 +197,7 @@ impl StreamService for StreamServiceImpl { .await .inspect_err(|err| tracing::error!("failed to collect barrier: {}", err)) .map_err(|e| { + // There is no guarantee that all actors have reported their failures, including the one that could potentially be the root cause. let actor_failures = self.mgr.take_actor_failures(); try_find_root_cause(actor_failures.into_iter().chain(iter::once(e))).unwrap() })?;
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index c0752bd484055..5f8b268673e06 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs
@@ -21,7 +21,7 @@ use std::sync::Arc; use std::time::Duration; use fail::fail_point; -use futures::future::try_join_all; +use futures::future::join_all; use itertools::Itertools; use prometheus::HistogramTimer; use risingwave_common::bail;
@@ -39,6 +39,7 @@ use risingwave_pb::stream_plan::Barrier; use risingwave_pb::stream_service::{ BarrierCompleteRequest, BarrierCompleteResponse, InjectBarrierRequest, }; +use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::StreamClientPoolRef; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot::{Receiver, Sender};
@@ -832,7 +833,12 @@ impl GlobalBarrierManager { passed_actors: vec![], }; async move { - let client = self.env.stream_client_pool().get(node).await?; + let client = self + .env + .stream_client_pool() + .get(node) + .await + .map_err(|e| (*node_id, e))?; let request = InjectBarrierRequest { request_id,
@@ -846,12 +852,29 @@ impl GlobalBarrierManager { ); // This RPC returns only if this worker node has injected this barrier.
- client.inject_barrier(request).await + client + .inject_barrier(request) + .await + .map_err(|e| (*node_id, e)) } .into() } }); - try_join_all(inject_futures).await?; + // join_all rather than try_join_all, in order to gather errors from all compute nodes. + let inject_results = join_all(inject_futures).await; + let mut errors = inject_results + .into_iter() + .filter_map(|r| match r { + Ok(_) => None, + Err(e) => Some(e), + }) + .peekable(); + if errors.peek().is_some() { + return Err(merge_compute_node_rpc_error( + "inject barrier failure", + errors, + )); + } Ok(node_need_collect) } @@ -876,7 +899,7 @@ impl GlobalBarrierManager { let request_id = Uuid::new_v4().to_string(); let tracing_context = tracing_context.clone(); async move { - let client = client_pool.get(node).await?; + let client = client_pool.get(node).await.map_err(|e| (*node_id, e))?; let request = BarrierCompleteRequest { request_id, prev_epoch, @@ -888,13 +911,29 @@ impl GlobalBarrierManager { ); // This RPC returns only if this worker node has collected this barrier. - client.barrier_complete(request).await + client + .barrier_complete(request) + .await + .map_err(|e| (*node_id, e)) } .into() } }); - let result = try_join_all(collect_futures).await.map_err(Into::into); + // join_all rather than try_join_all, in order to gather errors from all compute nodes. + let collect_result = join_all(collect_futures).await; + let result = if collect_result.iter().all(|r| r.is_ok()) { + Ok(collect_result.into_iter().map(|r| r.unwrap()).collect_vec()) + } else { + let errors = collect_result.into_iter().filter_map(|r| match r { + Ok(_) => None, + Err(e) => Some(e), + }); + Err(merge_compute_node_rpc_error( + "collect barrier failure", + errors, + )) + }; let _ = barrier_complete_tx .send(BarrierCompletion { prev_epoch, result }) .inspect_err(|err| tracing::warn!("failed to complete barrier: {err}")); @@ -927,7 +966,7 @@ impl GlobalBarrierManager { // back to frontend fail_point!("inject_barrier_err_success"); let fail_node = checkpoint_control.barrier_failed(); - tracing::warn!("Failed to complete epoch {}: {:?}", prev_epoch, err); + tracing::warn!("Failed to complete epoch {}: {}", prev_epoch, err); self.failure_recovery(err, fail_node, state, checkpoint_control) .await; return; @@ -1195,3 +1234,15 @@ fn collect_synced_ssts( } (sst_to_worker, synced_ssts) } + +fn merge_compute_node_rpc_error( + message: &str, + errors: impl IntoIterator, +) -> MetaError { + use std::fmt::Write; + let concat: String = errors.into_iter().fold(String::new(), |mut s, (w, e)| { + write!(&mut s, " worker {}, {};", w, e).unwrap(); + s + }); + anyhow::anyhow!(format!("{}:{}", message, concat)).into() +} diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 029320273350e..45109f5d33fb6 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -892,7 +892,7 @@ impl LocalStreamManagerCore { } } -/// Tries to find the root cause of previous actor failures, based on hard-coded rules. +/// Tries to find the root cause of previous failures, based on hard-coded rules. 
pub fn try_find_root_cause( actor_errors: impl IntoIterator, ) -> Option { From 325cc835e7a9f430202c46e55f59d81f11a5ee8f Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 21 Nov 2023 20:22:05 +0800 Subject: [PATCH 04/20] fix --- src/stream/src/task/barrier_manager/managed_state.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 20d96bd3a82ab..9bf5725503757 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -157,13 +157,9 @@ impl ManagedBarrierState { } /// Clear and reset all states. - pub(crate) fn clear_all_states(&mut self) -> Vec { + pub(crate) fn clear_all_states(&mut self) { tracing::debug!("clear all states in local barrier manager"); - let errors = std::mem::take(&mut self.failure_actors) - .into_values() - .collect(); *self = Self::new(self.state_store.clone()); - errors } /// Notify unexpected actor exit with given `actor_id`. From 1629bcd80d3eb5eb879abf3765770597c8a408e0 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Thu, 23 Nov 2023 16:15:18 +0800 Subject: [PATCH 05/20] fix --- src/meta/src/barrier/mod.rs | 36 +++++++++++++++------------ src/meta/src/rpc/ddl_controller.rs | 6 +++-- src/stream/src/task/stream_manager.rs | 1 + 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 38fb27993cc52..0a1e7820c719a 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -20,6 +20,7 @@ use std::ops::Deref; use std::sync::Arc; use std::time::Duration; +use either::Either; use fail::fail_point; use futures::future::join_all; use itertools::Itertools; @@ -864,10 +865,7 @@ impl GlobalBarrierManager { let inject_results = join_all(inject_futures).await; let mut errors = inject_results .into_iter() - .filter_map(|r| match r { - Ok(_) => None, - Err(e) => Some(e), - }) + .filter_map(Result::err) .peekable(); if errors.peek().is_some() { return Err(merge_compute_node_rpc_error( @@ -922,16 +920,17 @@ impl GlobalBarrierManager { // join_all rather than try_join_all, in order to gather errors from all compute nodes. 
let collect_result = join_all(collect_futures).await; - let result = if collect_result.iter().all(|r| r.is_ok()) { - Ok(collect_result.into_iter().map(|r| r.unwrap()).collect_vec()) - } else { - let errors = collect_result.into_iter().filter_map(|r| match r { - Ok(_) => None, - Err(e) => Some(e), + let (successes, failures): (Vec<_>, Vec<_>) = + collect_result.into_iter().partition_map(|r| match r { + Ok(v) => Either::Left(v), + Err(v) => Either::Right(v), }); + let result = if failures.is_empty() { + Ok(successes) + } else { Err(merge_compute_node_rpc_error( "collect barrier failure", - errors, + failures, )) }; let _ = barrier_complete_tx @@ -1233,9 +1232,14 @@ fn merge_compute_node_rpc_error( errors: impl IntoIterator, ) -> MetaError { use std::fmt::Write; - let concat: String = errors.into_iter().fold(String::new(), |mut s, (w, e)| { - write!(&mut s, " worker {}, {};", w, e).unwrap(); - s - }); - anyhow::anyhow!(format!("{}:{}", message, concat)).into() + + use thiserror_ext::AsReport; + + let concat: String = errors + .into_iter() + .fold(format!("{message}:"), |mut s, (w, e)| { + write!(&mut s, " worker node {}, {};", w, e.as_report()).unwrap(); + s + }); + anyhow::anyhow!(concat).into() } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index f421b7cadc31b..dfbc3fb879692 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -769,9 +769,11 @@ impl DdlController { &self, stream_job: &StreamingJob, internal_tables: Vec, - error: Option<&impl ToString>, + error: Option<&impl thiserror_ext::AsReport>, ) -> MetaResult<()> { - let error = error.map(ToString::to_string).unwrap_or_default(); + let error = error + .map(thiserror_ext::AsReport::to_report_string) + .unwrap_or_default(); let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail { id: stream_job.id(), name: stream_job.name(), diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 9841400b0e9df..b216a09a30a08 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -344,6 +344,7 @@ impl LocalStreamManager { /// Force stop all actors on this worker, and then drop their resources. 
pub async fn stop_all_actors(&self) { self.core.lock().await.stop_all_actors().await; + self.reset_barrier_manager(); // Clear shared buffer in storage to release memory self.clear_storage_buffer().await; } From 94cd65e04501690bf8d34e8fbc2197b11cdad824 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Fri, 12 Jan 2024 14:50:27 +0800 Subject: [PATCH 06/20] minor refactor --- src/meta/src/barrier/mod.rs | 7 +++---- src/stream/src/task/stream_manager.rs | 8 +------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 0a1e7820c719a..10694e41f36e5 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -75,6 +75,8 @@ mod recovery; mod schedule; mod trace; +use thiserror_ext::AsReport; + pub use self::command::{Command, Reschedule}; pub use self::schedule::BarrierScheduler; pub use self::trace::TracedEpoch; @@ -965,7 +967,7 @@ impl GlobalBarrierManager { // back to frontend fail_point!("inject_barrier_err_success"); let fail_node = checkpoint_control.barrier_failed(); - tracing::warn!("Failed to complete epoch {}: {}", prev_epoch, err); + tracing::warn!(prev_epoch, error = %err.as_report(), "failed to complete epoch"); self.failure_recovery(err, fail_node, state, checkpoint_control) .await; return; @@ -1232,9 +1234,6 @@ fn merge_compute_node_rpc_error( errors: impl IntoIterator, ) -> MetaError { use std::fmt::Write; - - use thiserror_ext::AsReport; - let concat: String = errors .into_iter() .fold(format!("{message}:"), |mut s, (w, e)| { diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index b216a09a30a08..c68c24931784f 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -910,13 +910,7 @@ pub fn try_find_root_cause( _ => 3000, } }; - let cmp_stream_error = |a: &StreamError, b: &StreamError| -> core::cmp::Ordering { - stream_error_score(a).cmp(&stream_error_score(b)) - }; - actor_errors - .into_iter() - .sorted_by(cmp_stream_error) - .next_back() + actor_errors.into_iter().max_by_key(stream_error_score) } #[cfg(test)] From 63b0d4fcfccdf103cd2fc131000c5413737550f7 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Mon, 15 Jan 2024 11:49:33 +0800 Subject: [PATCH 07/20] revert try_find_root_cause --- src/compute/src/rpc/service/stream_service.rs | 2 +- src/stream/src/executor/mod.rs | 2 +- src/stream/src/task/barrier_manager.rs | 9 ------ src/stream/src/task/stream_manager.rs | 30 +------------------ 4 files changed, 3 insertions(+), 40 deletions(-) diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 22da99e58a414..7003602c7c49b 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -125,7 +125,7 @@ impl StreamService for StreamServiceImpl { request: Request, ) -> std::result::Result, Status> { let req = request.into_inner(); - self.mgr.stop_all_actors().await?; + self.mgr.stop_all_actors().await; self.env.dml_manager_ref().clear(); Ok(Response::new(ForceStopActorsResponse { request_id: req.request_id, diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 8ea51f0f2fda3..96244cabf0b13 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -63,7 +63,7 @@ mod dedup; mod dispatch; pub mod dml; mod dynamic_filter; -pub(crate) mod error; +mod error; mod expand; mod filter; mod flow_control; diff --git 
a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 3b375109b5c50..b706d7a5c3537 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -314,15 +314,6 @@ impl LocalBarrierWorker { } } } - - pub fn take_actor_failures(&mut self) -> Vec { - match &mut self.state { - #[cfg(test)] - BarrierState::Local => vec![], - - BarrierState::Managed(managed_state) => managed_state.take_actor_failures(), - } - } } #[derive(Clone)] diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 301c33fcfbca4..9d90c038d08cb 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -43,7 +43,7 @@ use tokio::sync::Mutex; use tokio::task::JoinHandle; use super::{unique_executor_id, unique_operator_id, CollectResult}; -use crate::error::{StreamError, StreamResult}; +use crate::error::StreamResult; use crate::executor::exchange::permit::Receiver; use crate::executor::monitor::StreamingMetrics; use crate::executor::subtask::SubtaskHandle; @@ -362,11 +362,6 @@ impl LocalStreamManager { pub fn total_mem_usage(&self) -> usize { self.total_mem_val.get() as usize } - - pub fn take_actor_failures(&self) -> Vec { - let mut barrier_manager = self.context.lock_barrier_manager(); - barrier_manager.take_actor_failures() - } } impl LocalStreamManagerCore { @@ -860,29 +855,6 @@ impl LocalStreamManagerCore { } } -/// Tries to find the root cause of previous failures, based on hard-coded rules. -pub fn try_find_root_cause( - actor_errors: impl IntoIterator, -) -> Option { - let stream_executor_error_score = |e: &StreamExecutorError| { - use crate::executor::error::ErrorKind; - match e.kind() { - ErrorKind::ChannelClosed(_) => 0, - ErrorKind::Internal(_) => 1, - _ => 999, - } - }; - let stream_error_score = |e: &StreamError| { - use crate::error::ErrorKind; - match e.kind() { - ErrorKind::Internal(_) => 1000, - ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee), - _ => 3000, - } - }; - actor_errors.into_iter().max_by_key(stream_error_score) -} - #[cfg(test)] pub mod test_utils { use risingwave_pb::common::HostAddress; From 3e7382d60c884086da2744509eb392916c01bc04 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Mon, 15 Jan 2024 12:10:51 +0800 Subject: [PATCH 08/20] resolve conflict --- src/meta/src/barrier/mod.rs | 3 +- src/meta/src/barrier/rpc.rs | 95 +++++++++++++++++++++++++++++-------- 2 files changed, 78 insertions(+), 20 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index fc84edd77417f..bf7aa9a013bbd 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -38,6 +38,7 @@ use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::PausedReason; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_service::BarrierCompleteResponse; +use thiserror_ext::AsReport; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::Mutex; use tokio::task::JoinHandle; @@ -856,7 +857,7 @@ impl GlobalBarrierManager { // back to frontend fail_point!("inject_barrier_err_success"); let fail_node = self.checkpoint_control.barrier_failed(); - tracing::warn!("Failed to complete epoch {}: {:?}", prev_epoch, err); + tracing::warn!(prev_epoch, error = %err.as_report(), "Failed to complete epoch"); self.failure_recovery(err, fail_node).await; return; } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 
e79ffadf3d991..454508d7bde09 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -18,16 +18,19 @@ use std::ops::Deref; use std::sync::Arc; use anyhow::anyhow; +use either::Either; use fail::fail_point; -use futures::future::try_join_all; +use futures::future::join_all; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::bail; +use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::pending_on_none; use risingwave_common::util::tracing::TracingContext; use risingwave_pb::stream_plan::{Barrier, BarrierMutation}; use risingwave_pb::stream_service::{BarrierCompleteRequest, InjectBarrierRequest}; +use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::StreamClientPoolRef; use tokio::sync::oneshot; use uuid::Uuid; @@ -35,7 +38,7 @@ use uuid::Uuid; use super::command::CommandContext; use super::{BarrierCompletion, GlobalBarrierManagerContext}; use crate::manager::{MetaSrvEnv, WorkerId}; -use crate::MetaResult; +use crate::{MetaError, MetaResult}; pub(super) struct BarrierRpcManager { context: GlobalBarrierManagerContext, @@ -157,7 +160,26 @@ impl GlobalBarrierManagerContext { .into() } }); - try_join_all(inject_futures).await.inspect_err(|e| { + + // join_all rather than try_join_all, in order to gather errors from all compute nodes. + let inject_results = join_all(inject_futures).await; + let mut errors = inject_results + .into_iter() + .zip_eq_debug(node_need_collect.iter().filter_map(|(id, v)| { + if *v { + return Some(*id); + } + None + })) + .filter_map(|(result, id)| { + if let Err(e) = result { + return Some((id, e)); + } + None + }) + .peekable(); + if errors.peek().is_some() { + let e = merge_compute_node_rpc_error("inject barrier failure", errors); // Record failure in event log. use risingwave_pb::meta::event_log; use thiserror_ext::AsReport; @@ -169,7 +191,9 @@ impl GlobalBarrierManagerContext { self.env .event_log_manager_ref() .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]); - })?; + return Err(e); + } + drop(errors); Ok(node_need_collect) } @@ -213,23 +237,56 @@ impl GlobalBarrierManagerContext { } }); - let result = try_join_all(collect_futures) - .await - .inspect_err(|e| { - // Record failure in event log. - use risingwave_pb::meta::event_log; - use thiserror_ext::AsReport; - let event = event_log::EventCollectBarrierFail { - prev_epoch: command_context.prev_epoch.value().0, - cur_epoch: command_context.curr_epoch.value().0, - error: e.to_report_string(), - }; - env.event_log_manager_ref() - .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]); - }) - .map_err(Into::into); + // join_all rather than try_join_all, in order to gather errors from all compute nodes. + let collect_result = join_all(collect_futures).await; + let (successes, failures): (Vec<_>, Vec<_>) = collect_result + .into_iter() + .zip_eq_debug(node_need_collect.iter().filter_map(|(id, v)| { + if *v { + return Some(*id); + } + None + })) + .partition_map(|(r, id)| match r { + Ok(v) => Either::Left(v), + Err(e) => Either::Right((id, e)), + }); + let result = if failures.is_empty() { + Ok(successes) + } else { + let e = merge_compute_node_rpc_error("collect barrier failure", failures); + // Record failure in event log. 
+ use risingwave_pb::meta::event_log; + use thiserror_ext::AsReport; + let event = event_log::EventCollectBarrierFail { + prev_epoch: command_context.prev_epoch.value().0, + cur_epoch: command_context.curr_epoch.value().0, + error: e.to_report_string(), + }; + env.event_log_manager_ref() + .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]); + Err(e) + }; + let _ = barrier_complete_tx .send(BarrierCompletion { prev_epoch, result }) .inspect_err(|_| tracing::warn!(prev_epoch, "failed to notify barrier completion")); } } + +fn merge_compute_node_rpc_error( + message: &str, + errors: impl IntoIterator, +) -> MetaError { + use std::fmt::Write; + + use thiserror_ext::AsReport; + + let concat: String = errors + .into_iter() + .fold(format!("{message}:"), |mut s, (w, e)| { + write!(&mut s, " worker node {}, {};", w, e.as_report()).unwrap(); + s + }); + anyhow::anyhow!(concat).into() +} From a40320d41f8bf7f01a5581cc1e11809d14845d2e Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Sat, 17 Feb 2024 13:52:58 +0800 Subject: [PATCH 09/20] Merge branch 'main' into wangzheng/recovery_cause # Conflicts: # src/compute/src/rpc/service/stream_service.rs # src/meta/src/barrier/mod.rs # src/meta/src/barrier/rpc.rs # src/stream/src/task/stream_manager.rs --- src/meta/src/barrier/rpc.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 625e66f6b40ba..db4910a9a449e 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -17,7 +17,6 @@ use std::future::Future; use std::sync::Arc; use anyhow::anyhow; -use either::Either; use fail::fail_point; use futures::future::try_join_all; use futures::stream::FuturesUnordered; @@ -25,7 +24,6 @@ use futures::{FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::hash::ActorId; -use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_common::util::tracing::TracingContext; use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor}; @@ -394,6 +392,7 @@ impl StreamRpcManager { } } +#[allow(dead_code)] fn merge_compute_node_rpc_error( message: &str, errors: impl IntoIterator, From 3b3287d6ff9e1cad504155a926689f84caf3f83e Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Sat, 17 Feb 2024 11:54:57 +0800 Subject: [PATCH 10/20] use join_all with timeout on error --- src/meta/src/barrier/rpc.rs | 54 +++++++++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index db4910a9a449e..28e1183f51e16 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -15,12 +15,13 @@ use std::collections::HashMap; use std::future::Future; use std::sync::Arc; +use std::time::Duration; use anyhow::anyhow; use fail::fail_point; -use futures::future::try_join_all; +use futures::future::{select, Either}; use futures::stream::FuturesUnordered; -use futures::{FutureExt, StreamExt}; +use futures::{pin_mut, FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::bail; use risingwave_common::hash::ActorId; @@ -268,11 +269,49 @@ impl StreamRpcManager { ) -> MetaResult> { let pool = self.env.stream_client_pool(); let f = &f; - Ok(try_join_all(request.map(|(node, input)| async move { - let client = pool.get(node).await?; - f(client, input).await - })) - .await?) 
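+ // Gather results from all nodes instead of failing fast on the first error: every request is
+ // polled to completion, and the first error arms a one-shot timer that gives the remaining
+ // nodes a short grace period to report their errors before everything is merged into one error.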
+ let iters = request.map(|(node, input)| async move { + let client = pool.get(node).await.map_err(|e| (node.id, e))?; + f(client, input).await.map_err(|e| (node.id, e)) + }); + + let stream = FuturesUnordered::from_iter(iters); + pin_mut!(stream); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let mut results_ok = vec![]; + let mut results_err = vec![]; + let mut is_err_timeout = false; + loop { + let rx = rx.recv(); + pin_mut!(rx); + match select(rx, stream.next()).await { + Either::Left((_, _)) => { + break; + } + Either::Right((None, _)) => { + break; + } + Either::Right((Some(Ok(rsp)), _)) => { + results_ok.push(rsp); + } + Either::Right((Some(Err(err)), _)) => { + results_err.push(err); + if is_err_timeout { + continue; + } + is_err_timeout = true; + let tx = tx.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(5)).await; + let _ = tx.send(()); + }); + } + } + } + if results_err.is_empty() { + return Ok(results_ok); + } + let merged_error = merge_compute_node_rpc_error("merged RPC Error", results_err); + Err(merged_error) } async fn broadcast> + 'static>(
@@ -392,7 +431,6 @@ impl StreamRpcManager { } } -#[allow(dead_code)] fn merge_compute_node_rpc_error( message: &str, errors: impl IntoIterator<Item = (WorkerId, RpcError)>,
From 82106cafee536c31564b68cff612f48f1075bc06 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Sun, 18 Feb 2024 19:05:30 +0800 Subject: [PATCH 11/20] fmt --- src/meta/src/barrier/rpc.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 28e1183f51e16..3eb3794c3ac88 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs
@@ -274,6 +274,7 @@ impl StreamRpcManager { f(client, input).await.map_err(|e| (node.id, e)) }); + // Similar to `join_all`, but returns early once a timeout has elapsed since the first error. let stream = FuturesUnordered::from_iter(iters); pin_mut!(stream); let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
@@ -296,12 +297,12 @@ impl StreamRpcManager { Either::Right((Some(Err(err)), _)) => { results_err.push(err); if is_err_timeout { - continue; + continue; } is_err_timeout = true; let tx = tx.clone(); tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs(3)).await; let _ = tx.send(()); }); }
From 9a6aeb69682d8655990dba75ae9b05fa427402af Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Sun, 18 Feb 2024 23:44:03 +0800 Subject: [PATCH 12/20] refactor --- src/meta/src/barrier/rpc.rs | 87 ++++++++++++++++++++++--------------- 1 file changed, 51 insertions(+), 36 deletions(-)
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 3eb3794c3ac88..fd16fffbface6 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs
@@ -274,44 +274,12 @@ impl StreamRpcManager { f(client, input).await.map_err(|e| (node.id, e)) }); - // Similar to `join_all`, but returns early once a timeout has elapsed since the first error.
- let stream = FuturesUnordered::from_iter(iters); - pin_mut!(stream); - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let mut results_ok = vec![]; - let mut results_err = vec![]; - let mut is_err_timeout = false; - loop { - let rx = rx.recv(); - pin_mut!(rx); - match select(rx, stream.next()).await { - Either::Left((_, _)) => { - break; - } - Either::Right((None, _)) => { - break; - } - Either::Right((Some(Ok(rsp)), _)) => { - results_ok.push(rsp); - } - Either::Right((Some(Err(err)), _)) => { - results_err.push(err); - if is_err_timeout { - continue; - } - is_err_timeout = true; - let tx = tx.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(3)).await; - let _ = tx.send(()); - }); - } - } - } + let (results_ok, results_err) = + try_join_all_with_error_timeout(iters, Duration::from_secs(3)).await; if results_err.is_empty() { return Ok(results_ok); } - let merged_error = merge_compute_node_rpc_error("merged RPC Error", results_err); + let merged_error = merge_node_rpc_errors("merged RPC Error", results_err); Err(merged_error) } async fn broadcast> + 'static>(
@@ -432,7 +400,54 @@ impl StreamRpcManager { } } -fn merge_compute_node_rpc_error( +/// This function is similar to `try_join_all`, but it attempts to collect as many errors as possible within `error_timeout`. +async fn try_join_all_with_error_timeout<T, E, F, I>( + iters: I, + error_timeout: Duration, +) -> (Vec<T>, Vec<E>) +where + I: IntoIterator<Item = F>, + F: Future<Output = Result<T, E>>, +{ + let stream = FuturesUnordered::from_iter(iters); + pin_mut!(stream); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let mut results_ok = vec![]; + let mut results_err = vec![]; + let mut is_err_timeout = false; + loop { + let rx = rx.recv(); + pin_mut!(rx); + match select(rx, stream.next()).await { + Either::Left((_, _)) => { + // `error_timeout` has elapsed since the first error. + break; + } + Either::Right((None, _)) => { + break; + } + Either::Right((Some(Ok(rsp)), _)) => { + results_ok.push(rsp); + } + Either::Right((Some(Err(err)), _)) => { + results_err.push(err); + if is_err_timeout { + continue; + } + // Start `error_timeout` when the first error occurs.
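+ // Errors that arrive while the timer is running are still collected; the `continue` above only avoids re-arming it.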
+ is_err_timeout = true; + let tx = tx.clone(); + tokio::spawn(async move { + tokio::time::sleep(error_timeout).await; + let _ = tx.send(()); + }); + } + } + } + (results_ok, results_err) +} +fn merge_node_rpc_errors( message: &str, errors: impl IntoIterator<Item = (WorkerId, RpcError)>, ) -> MetaError { use std::fmt::Write;
From 11f688df36982f45d72d29c433889fe800da1483 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Mon, 19 Feb 2024 14:12:23 +0800 Subject: [PATCH 13/20] refactor --- src/meta/src/barrier/rpc.rs | 59 ++++++++++++++----------------- 1 file changed, 22 insertions(+), 37 deletions(-)
diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index fd16fffbface6..6447b2a5e9775 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs
@@ -19,7 +19,6 @@ use std::time::Duration; use anyhow::anyhow; use fail::fail_point; -use futures::future::{select, Either}; use futures::stream::FuturesUnordered; use futures::{pin_mut, FutureExt, StreamExt}; use itertools::Itertools;
@@ -36,6 +35,7 @@ use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::StreamClient; use rw_futures_util::pending_on_none; use tokio::sync::oneshot; +use tokio::time::timeout; use uuid::Uuid; use super::command::CommandContext;
@@ -273,14 +273,8 @@ impl StreamRpcManager { let client = pool.get(node).await.map_err(|e| (node.id, e))?; f(client, input).await.map_err(|e| (node.id, e)) }); - - let (results_ok, results_err) = - try_join_all_with_error_timeout(iters, Duration::from_secs(3)).await; - if results_err.is_empty() { - return Ok(results_ok); - } - let merged_error = merge_node_rpc_errors("merged RPC Error", results_err); - Err(merged_error) + let result = try_join_all_with_error_timeout(iters, Duration::from_secs(3)).await; + result.map_err(|results_err| merge_node_rpc_errors("merged RPC Error", results_err)) } async fn broadcast> + 'static>(
@@ -400,51 +394,42 @@ impl StreamRpcManager { } } /// This function is similar to `try_join_all`, but it attempts to collect as many errors as possible within `error_timeout`. async fn try_join_all_with_error_timeout<T, E, F, I>( iters: I, error_timeout: Duration, -) -> (Vec<T>, Vec<E>) +) -> Result<Vec<T>, Vec<E>> where I: IntoIterator<Item = F>, F: Future<Output = Result<T, E>>, { let stream = FuturesUnordered::from_iter(iters); pin_mut!(stream); - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let mut results_ok = vec![]; let mut results_err = vec![]; - let mut is_err_timeout = false; - loop { - let rx = rx.recv(); - pin_mut!(rx); - match select(rx, stream.next()).await { - Either::Left((_, _)) => { - // `error_timeout` has elapsed since the first error. - break; - } - Either::Right((None, _)) => { - break; - } - Either::Right((Some(Ok(rsp)), _)) => { + while let Some(result) = stream.next().await { + match result { + Ok(rsp) => { results_ok.push(rsp); } - Either::Right((Some(Err(err)), _)) => { + Err(err) => { results_err.push(err); - if is_err_timeout { - continue; - } - // Start `error_timeout` when the first error occurs.
- is_err_timeout = true; - let tx = tx.clone(); - tokio::spawn(async move { - tokio::time::sleep(error_timeout).await; - let _ = tx.send(()); - }); + break; } } } - (results_ok, results_err) + if results_err.is_empty() { + return Ok(results_ok); + } + let _ = timeout(error_timeout, async { + while let Some(result) = stream.next().await { + if let Err(err) = result { + results_err.push(err); + } + } + }) + .await; + Err(results_err) } fn merge_node_rpc_errors( From 936be263d4759cc268365e2499aea7a1f4055e7f Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Mon, 19 Feb 2024 16:01:44 +0800 Subject: [PATCH 14/20] try_find_root_actor_failure in compute node --- src/compute/src/rpc/service/stream_service.rs | 29 +++++++++++--- src/stream/src/executor/mod.rs | 2 +- src/stream/src/task/barrier_manager.rs | 17 ++++++++- src/stream/src/task/stream_manager.rs | 38 ++++++++++++++++++- 4 files changed, 76 insertions(+), 10 deletions(-) diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index def9a534586bb..d6452703fe804 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Duration; + use await_tree::InstrumentAwait; use itertools::Itertools; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; @@ -36,6 +38,14 @@ impl StreamServiceImpl { pub fn new(mgr: LocalStreamManager, env: StreamEnvironment) -> Self { StreamServiceImpl { mgr, env } } + + async fn try_get_root_actor_failure(&self) -> Option { + tokio::time::sleep(Duration::from_secs(3)).await; + self.mgr + .local_barrier_manager + .try_get_root_actor_failure() + .await + } } #[async_trait::async_trait] @@ -132,9 +142,13 @@ impl StreamService for StreamServiceImpl { let barrier = Barrier::from_protobuf(req.get_barrier().unwrap()).map_err(StreamError::from)?; - self.mgr + if let Err(e) = self + .mgr .send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect) - .await?; + .await + { + return Err(self.try_get_root_actor_failure().await.unwrap_or(e).into()); + } Ok(Response::new(InjectBarrierResponse { request_id: req.request_id, @@ -151,14 +165,17 @@ impl StreamService for StreamServiceImpl { let BarrierCompleteResult { create_mview_progress, sync_result, - } = self + } = match self .mgr .collect_barrier(req.prev_epoch) .instrument_await(format!("collect_barrier (epoch {})", req.prev_epoch)) .await - .inspect_err( - |err| tracing::error!(error = %err.as_report(), "failed to collect barrier"), - )?; + { + Ok(result) => result, + Err(e) => { + return Err(self.try_get_root_actor_failure().await.unwrap_or(e).into()); + } + }; let (synced_sstables, table_watermarks) = sync_result .map(|sync_result| (sync_result.uncommitted_ssts, sync_result.table_watermarks)) diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 40ae252c03cf0..38ce0d15d1a17 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -63,7 +63,7 @@ mod dedup; mod dispatch; pub mod dml; mod dynamic_filter; -mod error; +pub mod error; mod expand; mod filter; mod flow_control; diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index b838314729ad3..a43bc9a1f7ede 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -102,6 +102,9 @@ pub(super) enum LocalBarrierEvent { 
actors: Vec, result_sender: oneshot::Sender>, }, + TryGetRootActorFailure { + result_sender: oneshot::Sender>, + }, #[cfg(test)] Flush(oneshot::Sender<()>), } @@ -187,7 +190,7 @@ pub(super) struct LocalBarrierWorker { state: ManagedBarrierState, /// Record all unexpected exited actors. - failure_actors: HashMap, + pub(super) failure_actors: HashMap, epoch_result_sender: HashMap>>, @@ -308,6 +311,9 @@ impl LocalBarrierWorker { actors, result_sender, } => self.start_create_actors(&actors, result_sender), + LocalBarrierEvent::TryGetRootActorFailure { result_sender } => { + self.try_get_root_actor_failure(result_sender) + } } } } @@ -593,6 +599,15 @@ impl LocalBarrierManager { .await? } + pub async fn try_get_root_actor_failure(&self) -> Option { + self.send_and_await(|result_sender| LocalBarrierEvent::TryGetRootActorFailure { + result_sender, + }) + .await + .ok() + .flatten() + } + /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report /// and collect this barrier with its own `actor_id` using this function. pub fn collect(&self, actor_id: ActorId, barrier: &Barrier) { diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 5a2bde99da491..9f7016dfc391e 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -41,7 +41,7 @@ use tokio::sync::oneshot; use tokio::task::JoinHandle; use super::{unique_executor_id, unique_operator_id, BarrierCompleteResult}; -use crate::error::StreamResult; +use crate::error::{StreamError, StreamResult}; use crate::executor::monitor::StreamingMetrics; use crate::executor::subtask::SubtaskHandle; use crate::executor::*; @@ -67,7 +67,7 @@ pub struct LocalStreamManager { context: Arc, - local_barrier_manager: LocalBarrierManager, + pub local_barrier_manager: LocalBarrierManager, } /// Report expression evaluation errors to the actor context. @@ -342,6 +342,14 @@ impl LocalBarrierWorker { .creating_actors .push(AttachedFuture::new(join_handle, result_sender)); } + + pub(super) fn try_get_root_actor_failure( + &mut self, + result_sender: oneshot::Sender>, + ) { + let result = try_find_root_actor_failure(self.failure_actors.values()); + let _ = result_sender.send(result); + } } impl StreamActorManager { @@ -746,6 +754,32 @@ impl StreamActorManagerState { } } +/// Tries to find the root cause of actor failures, based on hard-coded rules. 
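+/// The highest-scoring error is returned as the root cause: `ChannelClosed` ranks lowest
+/// (usually a symptom of another actor's exit rather than the cause), `Internal` next, and
+/// any other concrete error kind highest.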
+pub fn try_find_root_actor_failure<'a>( + actor_errors: impl IntoIterator, +) -> Option { + let stream_executor_error_score = |e: &StreamExecutorError| { + use crate::executor::error::ErrorKind; + match e.inner() { + ErrorKind::ChannelClosed(_) => 0, + ErrorKind::Internal(_) => 1, + _ => 999, + } + }; + let stream_error_score = |e: &&StreamError| { + use crate::error::ErrorKind; + match e.inner() { + ErrorKind::Internal(_) => 1000, + ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee), + _ => 3000, + } + }; + actor_errors + .into_iter() + .max_by_key(stream_error_score) + .cloned() +} + #[cfg(test)] pub mod test_utils { use risingwave_pb::common::HostAddress; From 1cf2e1cd73b61a24ad543d6b01447bade86a9d59 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Mon, 19 Feb 2024 22:21:38 +0800 Subject: [PATCH 15/20] minor fix --- src/compute/src/rpc/service/stream_service.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index d6452703fe804..209ff93b5ec56 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -173,7 +173,9 @@ impl StreamService for StreamServiceImpl { { Ok(result) => result, Err(e) => { - return Err(self.try_get_root_actor_failure().await.unwrap_or(e).into()); + let err = self.try_get_root_actor_failure().await.unwrap_or(e); + tracing::error!(error = %err.as_report(), "failed to collect barrier"); + return Err(err.into()); } }; From d2a87d885a0b8f4e83346b509258731d0f477744 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 27 Feb 2024 15:39:06 +0800 Subject: [PATCH 16/20] Revert "minor fix" This reverts commit 1cf2e1cd73b61a24ad543d6b01447bade86a9d59. --- src/compute/src/rpc/service/stream_service.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index 209ff93b5ec56..d6452703fe804 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -173,9 +173,7 @@ impl StreamService for StreamServiceImpl { { Ok(result) => result, Err(e) => { - let err = self.try_get_root_actor_failure().await.unwrap_or(e); - tracing::error!(error = %err.as_report(), "failed to collect barrier"); - return Err(err.into()); + return Err(self.try_get_root_actor_failure().await.unwrap_or(e).into()); } }; From 2224b4006da5674551f7bc08ff023208f964a195 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 27 Feb 2024 15:39:11 +0800 Subject: [PATCH 17/20] Revert "try_find_root_actor_failure in compute node" This reverts commit 936be263d4759cc268365e2499aea7a1f4055e7f. --- src/compute/src/rpc/service/stream_service.rs | 29 +++----------- src/stream/src/executor/mod.rs | 2 +- src/stream/src/task/barrier_manager.rs | 17 +-------- src/stream/src/task/stream_manager.rs | 38 +------------------ 4 files changed, 10 insertions(+), 76 deletions(-) diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index d6452703fe804..def9a534586bb 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::time::Duration; - use await_tree::InstrumentAwait; use itertools::Itertools; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; @@ -38,14 +36,6 @@ impl StreamServiceImpl { pub fn new(mgr: LocalStreamManager, env: StreamEnvironment) -> Self { StreamServiceImpl { mgr, env } } - - async fn try_get_root_actor_failure(&self) -> Option { - tokio::time::sleep(Duration::from_secs(3)).await; - self.mgr - .local_barrier_manager - .try_get_root_actor_failure() - .await - } } #[async_trait::async_trait] @@ -142,13 +132,9 @@ impl StreamService for StreamServiceImpl { let barrier = Barrier::from_protobuf(req.get_barrier().unwrap()).map_err(StreamError::from)?; - if let Err(e) = self - .mgr + self.mgr .send_barrier(barrier, req.actor_ids_to_send, req.actor_ids_to_collect) - .await - { - return Err(self.try_get_root_actor_failure().await.unwrap_or(e).into()); - } + .await?; Ok(Response::new(InjectBarrierResponse { request_id: req.request_id, @@ -165,17 +151,14 @@ impl StreamService for StreamServiceImpl { let BarrierCompleteResult { create_mview_progress, sync_result, - } = match self + } = self .mgr .collect_barrier(req.prev_epoch) .instrument_await(format!("collect_barrier (epoch {})", req.prev_epoch)) .await - { - Ok(result) => result, - Err(e) => { - return Err(self.try_get_root_actor_failure().await.unwrap_or(e).into()); - } - }; + .inspect_err( + |err| tracing::error!(error = %err.as_report(), "failed to collect barrier"), + )?; let (synced_sstables, table_watermarks) = sync_result .map(|sync_result| (sync_result.uncommitted_ssts, sync_result.table_watermarks)) diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 38ce0d15d1a17..40ae252c03cf0 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -63,7 +63,7 @@ mod dedup; mod dispatch; pub mod dml; mod dynamic_filter; -pub mod error; +mod error; mod expand; mod filter; mod flow_control; diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index a43bc9a1f7ede..b838314729ad3 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -102,9 +102,6 @@ pub(super) enum LocalBarrierEvent { actors: Vec, result_sender: oneshot::Sender>, }, - TryGetRootActorFailure { - result_sender: oneshot::Sender>, - }, #[cfg(test)] Flush(oneshot::Sender<()>), } @@ -190,7 +187,7 @@ pub(super) struct LocalBarrierWorker { state: ManagedBarrierState, /// Record all unexpected exited actors. - pub(super) failure_actors: HashMap, + failure_actors: HashMap, epoch_result_sender: HashMap>>, @@ -311,9 +308,6 @@ impl LocalBarrierWorker { actors, result_sender, } => self.start_create_actors(&actors, result_sender), - LocalBarrierEvent::TryGetRootActorFailure { result_sender } => { - self.try_get_root_actor_failure(result_sender) - } } } } @@ -599,15 +593,6 @@ impl LocalBarrierManager { .await? } - pub async fn try_get_root_actor_failure(&self) -> Option { - self.send_and_await(|result_sender| LocalBarrierEvent::TryGetRootActorFailure { - result_sender, - }) - .await - .ok() - .flatten() - } - /// When a [`crate::executor::StreamConsumer`] (typically [`crate::executor::DispatchExecutor`]) get a barrier, it should report /// and collect this barrier with its own `actor_id` using this function. 
pub fn collect(&self, actor_id: ActorId, barrier: &Barrier) { diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 9f7016dfc391e..5a2bde99da491 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -41,7 +41,7 @@ use tokio::sync::oneshot; use tokio::task::JoinHandle; use super::{unique_executor_id, unique_operator_id, BarrierCompleteResult}; -use crate::error::{StreamError, StreamResult}; +use crate::error::StreamResult; use crate::executor::monitor::StreamingMetrics; use crate::executor::subtask::SubtaskHandle; use crate::executor::*; @@ -67,7 +67,7 @@ pub struct LocalStreamManager { context: Arc, - pub local_barrier_manager: LocalBarrierManager, + local_barrier_manager: LocalBarrierManager, } /// Report expression evaluation errors to the actor context. @@ -342,14 +342,6 @@ impl LocalBarrierWorker { .creating_actors .push(AttachedFuture::new(join_handle, result_sender)); } - - pub(super) fn try_get_root_actor_failure( - &mut self, - result_sender: oneshot::Sender>, - ) { - let result = try_find_root_actor_failure(self.failure_actors.values()); - let _ = result_sender.send(result); - } } impl StreamActorManager { @@ -754,32 +746,6 @@ impl StreamActorManagerState { } } -/// Tries to find the root cause of actor failures, based on hard-coded rules. -pub fn try_find_root_actor_failure<'a>( - actor_errors: impl IntoIterator, -) -> Option { - let stream_executor_error_score = |e: &StreamExecutorError| { - use crate::executor::error::ErrorKind; - match e.inner() { - ErrorKind::ChannelClosed(_) => 0, - ErrorKind::Internal(_) => 1, - _ => 999, - } - }; - let stream_error_score = |e: &&StreamError| { - use crate::error::ErrorKind; - match e.inner() { - ErrorKind::Internal(_) => 1000, - ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee), - _ => 3000, - } - }; - actor_errors - .into_iter() - .max_by_key(stream_error_score) - .cloned() -} - #[cfg(test)] pub mod test_utils { use risingwave_pb::common::HostAddress; From 941350ab4c77ca2ed56abcc1656b4575ebf68686 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 27 Feb 2024 15:47:29 +0800 Subject: [PATCH 18/20] try_find_root_actor_failure in compute node --- src/stream/src/executor/mod.rs | 2 +- src/stream/src/task/barrier_manager.rs | 81 +++++++++++++++++++++++--- 2 files changed, 74 insertions(+), 9 deletions(-) diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 1e82768096f36..7ec645e3b0dc7 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -62,7 +62,7 @@ mod dedup; mod dispatch; pub mod dml; mod dynamic_filter; -mod error; +pub mod error; mod expand; mod filter; mod flow_control; diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index d9917cdf554f0..cac7e4a7f8d2c 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -14,6 +14,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use std::time::Duration; use anyhow::anyhow; use futures::stream::FuturesUnordered; @@ -212,6 +213,8 @@ pub(super) struct LocalBarrierWorker { barrier_event_rx: UnboundedReceiver, actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>, + + root_failure: Option, } impl LocalBarrierWorker { @@ -239,6 +242,7 @@ impl LocalBarrierWorker { current_shared_context: shared_context, barrier_event_rx: event_rx, actor_failure_rx: failure_rx, + root_failure: None, } } @@ -260,7 +264,7 @@ impl 
LocalBarrierWorker { }, failure = self.actor_failure_rx.recv() => { let (actor_id, err) = failure.unwrap(); - self.notify_failure(actor_id, err); + self.notify_failure(actor_id, err).await; }, actor_op = actor_op_rx.recv() => { if let Some(actor_op) = actor_op { @@ -451,7 +455,8 @@ impl LocalBarrierWorker { // The failure actors could exit before the barrier is issued, while their // up-downstream actors could be stuck somehow. Return error directly to trigger the // recovery. - return Err(e.clone()); + // try_find_root_failure is not used merely because it requires async. + return Err(self.root_failure().unwrap_or(e.clone())); } } @@ -538,22 +543,55 @@ impl LocalBarrierWorker { /// When a actor exit unexpectedly, it should report this event using this function, so meta /// will notice actor's exit while collecting. - fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) { + async fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) { + self.add_failure(actor_id, err.clone()); + let root_err = self.try_find_root_failure(err).await; + for fail_epoch in self.state.epochs_await_on_actor(actor_id) { + if let Some(result_sender) = self.epoch_result_sender.remove(&fail_epoch) { + if result_sender.send(Err(root_err.clone())).is_err() { + warn!(fail_epoch, actor_id, err = %root_err.as_report(), "fail to notify actor failure"); + } + } + } + } + + fn add_failure(&mut self, actor_id: ActorId, err: StreamError) { let err = err.into_unexpected_exit(actor_id); - if let Some(prev_err) = self.failure_actors.insert(actor_id, err.clone()) { + if let Some(prev_err) = self.failure_actors.insert(actor_id, err) { warn!( actor_id, prev_err = %prev_err.as_report(), "actor error overwritten" ); } - for fail_epoch in self.state.epochs_await_on_actor(actor_id) { - if let Some(result_sender) = self.epoch_result_sender.remove(&fail_epoch) { - if result_sender.send(Err(err.clone())).is_err() { - warn!(fail_epoch, actor_id, err = %err.as_report(), "fail to notify actor failure"); + } + + fn root_failure(&self) -> Option { + self.root_failure.clone() + } + + async fn try_find_root_failure(&mut self, default_err: StreamError) -> StreamError { + if let Some(root_failure) = &self.root_failure { + return root_failure.clone(); + } + // fetch more actor errors within a timeout + let mut timeout = tokio::time::interval(Duration::from_secs(3)); + timeout.reset(); + loop { + select! { + biased; + _ = timeout.tick() => { + break; + } + result = self.actor_failure_rx.recv() => { + let Some((actor_id, error)) = result else { + break; + }; + self.add_failure(actor_id, error); } } } + try_find_root_actor_failure(self.failure_actors.values()).unwrap_or(default_err) } } @@ -681,6 +719,33 @@ impl LocalBarrierManager { } } +/// Tries to find the root cause of actor failures, based on hard-coded rules. 
+pub fn try_find_root_actor_failure<'a>( + actor_errors: impl IntoIterator, +) -> Option { + use crate::executor::StreamExecutorError; + let stream_executor_error_score = |e: &StreamExecutorError| { + use crate::executor::error::ErrorKind; + match e.inner() { + ErrorKind::ChannelClosed(_) => 0, + ErrorKind::Internal(_) => 1, + _ => 999, + } + }; + let stream_error_score = |e: &&StreamError| { + use crate::error::ErrorKind; + match e.inner() { + ErrorKind::Internal(_) => 1000, + ErrorKind::Executor(ee) => 2000 + stream_executor_error_score(ee), + _ => 3000, + } + }; + actor_errors + .into_iter() + .max_by_key(stream_error_score) + .cloned() +} + #[cfg(test)] impl LocalBarrierManager { pub(super) async fn spawn_for_test() -> (EventSender, Self) { From bcea4b9da45a69b5a92ef9a3977fcfb0b2a1ed09 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 27 Feb 2024 16:24:24 +0800 Subject: [PATCH 19/20] minor refactor --- src/stream/src/task/barrier_manager.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index cac7e4a7f8d2c..1a12293785457 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -456,7 +456,7 @@ impl LocalBarrierWorker { // up-downstream actors could be stuck somehow. Return error directly to trigger the // recovery. // try_find_root_failure is not used merely because it requires async. - return Err(self.root_failure().unwrap_or(e.clone())); + return Err(self.root_failure.clone().unwrap_or(e.clone())); } } @@ -566,10 +566,6 @@ impl LocalBarrierWorker { } } - fn root_failure(&self) -> Option { - self.root_failure.clone() - } - async fn try_find_root_failure(&mut self, default_err: StreamError) -> StreamError { if let Some(root_failure) = &self.root_failure { return root_failure.clone(); From c8809c42ba6dfcee211812e4277109369a5a58c9 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Tue, 27 Feb 2024 16:42:41 +0800 Subject: [PATCH 20/20] bugfix and refactor --- src/stream/src/task/barrier_manager.rs | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 1a12293785457..faabc8f266ac8 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -571,23 +571,14 @@ impl LocalBarrierWorker { return root_failure.clone(); } // fetch more actor errors within a timeout - let mut timeout = tokio::time::interval(Duration::from_secs(3)); - timeout.reset(); - loop { - select! { - biased; - _ = timeout.tick() => { - break; - } - result = self.actor_failure_rx.recv() => { - let Some((actor_id, error)) = result else { - break; - }; - self.add_failure(actor_id, error); - } + let _ = tokio::time::timeout(Duration::from_secs(3), async { + while let Some((actor_id, error)) = self.actor_failure_rx.recv().await { + self.add_failure(actor_id, error); } - } - try_find_root_actor_failure(self.failure_actors.values()).unwrap_or(default_err) + }) + .await; + self.root_failure = try_find_root_actor_failure(self.failure_actors.values()); + self.root_failure.clone().unwrap_or(default_err) } }
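
The final shape of `try_find_root_failure` in PATCH 20 is the cleanest of the three attempts: instead of a manually reset `interval` inside a `select!` loop, a single `tokio::time::timeout` bounds a plain `recv` loop, stating the intent directly: absorb actor failures for a grace period, then rank whatever was collected. Here is a minimal sketch of that drain pattern, assuming a hypothetical `ActorFailure` type and a tokio runtime with the `time`, `sync`, `rt`, and `macros` features enabled:

use std::time::Duration;
use tokio::sync::mpsc;

// Hypothetical stand-in for the `(ActorId, StreamError)` pairs in the patch.
#[derive(Debug)]
struct ActorFailure {
    actor_id: u32,
    cause: String,
}

// Drain whatever failures arrive within the grace period, then stop.
// `tokio::time::timeout` simply cancels the inner future at the deadline.
async fn collect_failures_for(
    rx: &mut mpsc::UnboundedReceiver<ActorFailure>,
    grace: Duration,
) -> Vec<ActorFailure> {
    let mut collected = Vec::new();
    let _ = tokio::time::timeout(grace, async {
        while let Some(failure) = rx.recv().await {
            collected.push(failure);
        }
    })
    .await;
    collected
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    tx.send(ActorFailure { actor_id: 1, cause: "channel closed".into() }).unwrap();
    tx.send(ActorFailure { actor_id: 2, cause: "storage error".into() }).unwrap();
    drop(tx); // closes the channel, ending the loop before the deadline
    let failures = collect_failures_for(&mut rx, Duration::from_secs(3)).await;
    println!("collected {} failure(s)", failures.len());
}

`UnboundedReceiver::recv` is documented as cancel-safe, so cancelling the inner future at the deadline loses no messages: everything received before the cutoff has already been recorded, which is the property the refactored code relies on.
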
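
For contrast, the `TryGetRootActorFailure` event that the series first added and then reverted follows the usual actor-style request/response shape: the caller packs a `oneshot::Sender` into the event, the worker loop answers through it, and `.ok().flatten()` degrades a vanished worker into `None`. A hypothetical miniature of that round trip (the names are illustrative, not the real API):

use tokio::sync::{mpsc, oneshot};

// Illustrative only: `Event` and the `String` payload stand in for
// `LocalBarrierEvent` and `StreamError`.
enum Event {
    TryGetRootFailure {
        result_sender: oneshot::Sender<Option<String>>,
    },
}

async fn worker(mut rx: mpsc::UnboundedReceiver<Event>, root_failure: Option<String>) {
    while let Some(event) = rx.recv().await {
        match event {
            Event::TryGetRootFailure { result_sender } => {
                // If the caller gave up and dropped the receiver, the send
                // fails; that is fine to ignore.
                let _ = result_sender.send(root_failure.clone());
            }
        }
    }
}

async fn try_get_root_failure(tx: &mpsc::UnboundedSender<Event>) -> Option<String> {
    let (result_sender, result_receiver) = oneshot::channel();
    tx.send(Event::TryGetRootFailure { result_sender }).ok()?;
    // A dropped sender (worker already gone) becomes `None`, mirroring the
    // `.ok().flatten()` in the reverted patch.
    result_receiver.await.ok().flatten()
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    tokio::spawn(worker(rx, Some("storage error in actor 42".into())));
    println!("{:?}", try_get_root_failure(&tx).await);
}

The final design trades this round trip away by computing and caching the root failure inside the worker itself; that is also why the synchronous barrier-send path can fall back to `self.root_failure.clone()` directly, as the comment about `try_find_root_failure` requiring async explains.
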