From 5aabf542875465e591d65f4b1ad8a4cd925a5800 Mon Sep 17 00:00:00 2001
From: Bugen Zhao
Date: Tue, 16 Jul 2024 11:11:43 +0800
Subject: [PATCH] fix(streaming): find and return root actor failure when injection failed (#17672)

---
 src/meta/src/rpc/ddl_controller_v2.rs  |  2 +-
 src/stream/src/task/barrier_manager.rs | 77 +++++++++++++++++---------
 2 files changed, 51 insertions(+), 28 deletions(-)

diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs
index 0dabc9b19022d..518d6e7b3eafb 100644
--- a/src/meta/src/rpc/ddl_controller_v2.rs
+++ b/src/meta/src/rpc/ddl_controller_v2.rs
@@ -94,7 +94,7 @@ impl DdlController {
         {
             Ok(version) => Ok(version),
             Err(err) => {
-                tracing::error!(id = job_id, error = ?err.as_report(), "failed to create streaming job");
+                tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
                 let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
                     id: streaming_job.id(),
                     name: streaming_job.name(),
diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs
index 8d6b4e5056ad7..b0ce6ad30540f 100644
--- a/src/stream/src/task/barrier_manager.rs
+++ b/src/stream/src/task/barrier_manager.rs
@@ -120,12 +120,6 @@ impl ControlStreamHandle {
         }
     }
 
-    fn inspect_result(&mut self, result: StreamResult<()>) {
-        if let Err(e) = result {
-            self.reset_stream_with_err(e.to_status_unnamed(Code::Internal));
-        }
-    }
-
     fn send_response(&mut self, response: StreamingControlStreamResponse) {
         if let Some((sender, _)) = self.pair.as_ref() {
             if sender.send(Ok(response)).is_err() {
@@ -374,7 +368,8 @@ pub(super) struct LocalBarrierWorker {
 
     actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,
 
-    root_failure: Option<StreamError>,
+    /// Cached result of [`Self::try_find_root_failure`].
+    cached_root_failure: Option<StreamError>,
 }
 
 impl LocalBarrierWorker {
@@ -403,7 +398,7 @@
             current_shared_context: shared_context,
             barrier_event_rx: event_rx,
             actor_failure_rx: failure_rx,
-            root_failure: None,
+            cached_root_failure: None,
         }
     }
 
@@ -431,14 +426,16 @@
                 }
                 completed_epoch = self.state.next_completed_epoch() => {
                     let result = self.on_epoch_completed(completed_epoch);
-                    self.control_stream_handle.inspect_result(result);
+                    if let Err(err) = result {
+                        self.notify_other_failure(err, "failed to complete epoch").await;
+                    }
                 },
                 event = self.barrier_event_rx.recv() => {
                     self.handle_barrier_event(event.expect("should not be none"));
                 },
                 failure = self.actor_failure_rx.recv() => {
                     let (actor_id, err) = failure.unwrap();
-                    self.notify_failure(actor_id, err).await;
+                    self.notify_actor_failure(actor_id, err).await;
                 },
                 actor_op = actor_op_rx.recv() => {
                     if let Some(actor_op) = actor_op {
@@ -462,7 +459,9 @@
                 },
                 request = self.control_stream_handle.next_request() => {
                     let result = self.handle_streaming_control_request(request);
-                    self.control_stream_handle.inspect_result(result);
+                    if let Err(err) = result {
+                        self.notify_other_failure(err, "failed to inject barrier").await;
+                    }
                 },
             }
         }
@@ -661,6 +660,10 @@
 
     /// Broadcast a barrier to all senders. Save a receiver which will get notified when this
     /// barrier is finished, in managed mode.
+    ///
+    /// Note that the error returned here is typically a [`StreamError::barrier_send`], which is not
+    /// the root cause of the failure. The caller should then call [`Self::try_find_root_failure`]
+    /// to find the root cause.
     fn send_barrier(
         &mut self,
         barrier: &Barrier,
@@ -668,8 +671,7 @@
         to_collect: HashSet<ActorId>,
         table_ids: HashSet<TableId>,
     ) -> StreamResult<()> {
-        #[cfg(not(test))]
-        {
+        if !cfg!(test) {
             // The barrier might be outdated and been injected after recovery in some certain extreme
             // scenarios. So some newly creating actors in the barrier are possibly not rebuilt during
             // recovery. Check it here and return an error here if some actors are not found to
@@ -702,12 +704,15 @@
         );
 
         for actor_id in &to_collect {
-            if let Some(e) = self.failure_actors.get(actor_id) {
+            if self.failure_actors.contains_key(actor_id) {
                 // The failure actors could exit before the barrier is issued, while their
                 // up-downstream actors could be stuck somehow. Return error directly to trigger the
                 // recovery.
-                // try_find_root_failure is not used merely because it requires async.
-                return Err(self.root_failure.clone().unwrap_or(e.clone()));
+                return Err(StreamError::barrier_send(
+                    barrier.clone(),
+                    *actor_id,
+                    "actor has already failed",
+                ));
             }
         }
 
@@ -763,11 +768,11 @@
         self.state.collect(actor_id, barrier)
     }
 
-    /// When a actor exit unexpectedly, it should report this event using this function, so meta
-    /// will notice actor's exit while collecting.
-    async fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) {
+    /// When an actor exits unexpectedly, the error is reported using this function. The control stream
+    /// will be reset and the meta service will then trigger recovery.
+    async fn notify_actor_failure(&mut self, actor_id: ActorId, err: StreamError) {
         self.add_failure(actor_id, err.clone());
-        let root_err = self.try_find_root_failure().await;
+        let root_err = self.try_find_root_failure().await.unwrap(); // always `Some` because we just added one
 
         let failed_epochs = self.state.epochs_await_on_actor(actor_id).collect_vec();
         if !failed_epochs.is_empty() {
@@ -782,6 +787,21 @@
         }
     }
 
+    /// When some other failure happens (like failed to send barrier), the error is reported using
+    /// this function. The control stream will be reset and the meta service will then trigger recovery.
+    ///
+    /// This is similar to [`Self::notify_actor_failure`], but since there's not always an actor failure,
+    /// the given `err` will be used if there's no root failure found.
+    async fn notify_other_failure(&mut self, err: StreamError, message: impl Into<String>) {
+        let root_err = self.try_find_root_failure().await.unwrap_or(err);
+
+        self.control_stream_handle.reset_stream_with_err(
+            anyhow!(root_err)
+                .context(message.into())
+                .to_status_unnamed(Code::Internal),
+        );
+    }
+
     fn add_failure(&mut self, actor_id: ActorId, err: StreamError) {
         if let Some(prev_err) = self.failure_actors.insert(actor_id, err) {
             warn!(
@@ -792,9 +812,12 @@
         }
     }
 
-    async fn try_find_root_failure(&mut self) -> StreamError {
-        if let Some(root_failure) = &self.root_failure {
-            return root_failure.clone();
+    /// Collect actor errors for a while and find the one that might be the root cause.
+    ///
+    /// Returns `None` if there's no actor error received.
+    async fn try_find_root_failure(&mut self) -> Option<StreamError> {
+        if self.cached_root_failure.is_some() {
+            return self.cached_root_failure.clone();
         }
         // fetch more actor errors within a timeout
         let _ = tokio::time::timeout(Duration::from_secs(3), async {
@@ -803,11 +826,9 @@
             }
         })
         .await;
-        self.root_failure = try_find_root_actor_failure(self.failure_actors.values());
+        self.cached_root_failure = try_find_root_actor_failure(self.failure_actors.values());
 
-        self.root_failure
-            .clone()
-            .expect("failure actors should not be empty")
+        self.cached_root_failure.clone()
     }
 }
 
@@ -915,6 +936,8 @@ impl LocalBarrierManager {
 }
 
 /// Tries to find the root cause of actor failures, based on hard-coded rules.
+///
+/// Returns `None` if the input is empty.
 pub fn try_find_root_actor_failure<'a>(
     actor_errors: impl IntoIterator<Item = &'a StreamError>,
 ) -> Option<StreamError> {
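
The failure-reporting pattern introduced above can be reduced to a small standalone sketch. Everything below is illustrative rather than taken from the repository: `RootCauseTracker`, the simplified `StreamError` type, and the string-matching rule are stand-ins for `LocalBarrierWorker` and the hard-coded ranking in `try_find_root_actor_failure`. The point is only the behavior the patch adds: a secondary failure (such as a failed barrier injection) is reported as the cached root actor failure when one exists, and as itself otherwise.

use std::collections::HashMap;

type ActorId = u32;

#[derive(Clone, Debug)]
struct StreamError(String);

#[derive(Default)]
struct RootCauseTracker {
    failure_actors: HashMap<ActorId, StreamError>,
    cached_root_failure: Option<StreamError>,
}

impl RootCauseTracker {
    // Record the first error reported by each failed actor.
    fn add_failure(&mut self, actor_id: ActorId, err: StreamError) {
        self.failure_actors.entry(actor_id).or_insert(err);
    }

    // Pick the error most likely to be the root cause and cache it. Returns `None`
    // when no actor error has been received, mirroring the new contract of
    // `try_find_root_failure` in the patch.
    fn try_find_root_failure(&mut self) -> Option<StreamError> {
        if self.cached_root_failure.is_none() {
            // Stand-in for the hard-coded ranking rules of `try_find_root_actor_failure`:
            // prefer errors that are not plain "channel closed" propagation noise.
            self.cached_root_failure = self
                .failure_actors
                .values()
                .find(|e| !e.0.contains("channel closed"))
                .or_else(|| self.failure_actors.values().next())
                .cloned();
        }
        self.cached_root_failure.clone()
    }

    // For non-actor failures (e.g. a failed barrier injection), report the root actor
    // failure if one exists and fall back to the given error otherwise.
    fn notify_other_failure(&mut self, err: StreamError) -> StreamError {
        self.try_find_root_failure().unwrap_or(err)
    }
}

fn main() {
    let mut tracker = RootCauseTracker::default();
    tracker.add_failure(1, StreamError("channel closed".to_string()));
    tracker.add_failure(2, StreamError("executor error: division by zero".to_string()));

    // A failed barrier injection now surfaces the actor's root cause instead of the
    // secondary "failed to inject barrier" error.
    let reported = tracker.notify_other_failure(StreamError("failed to inject barrier".to_string()));
    println!("reported: {:?}", reported);
}

Caching the ranked result, as the patch does, keeps the reported error stable across the two notification paths (actor failure and other failure) instead of re-ranking on every report.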