Skip to content

Commit

Permalink
fix(streaming): find and return root actor failure when injection fai…
Browse files Browse the repository at this point in the history
…led (#17672)
  • Loading branch information
BugenZhao authored Jul 16, 2024
1 parent 46b4ccd commit 5aabf54
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/meta/src/rpc/ddl_controller_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl DdlController {
{
Ok(version) => Ok(version),
Err(err) => {
tracing::error!(id = job_id, error = ?err.as_report(), "failed to create streaming job");
tracing::error!(id = job_id, error = %err.as_report(), "failed to create streaming job");
let event = risingwave_pb::meta::event_log::EventCreateStreamJobFail {
id: streaming_job.id(),
name: streaming_job.name(),
Expand Down
77 changes: 50 additions & 27 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,6 @@ impl ControlStreamHandle {
}
}

fn inspect_result(&mut self, result: StreamResult<()>) {
if let Err(e) = result {
self.reset_stream_with_err(e.to_status_unnamed(Code::Internal));
}
}

fn send_response(&mut self, response: StreamingControlStreamResponse) {
if let Some((sender, _)) = self.pair.as_ref() {
if sender.send(Ok(response)).is_err() {
Expand Down Expand Up @@ -374,7 +368,8 @@ pub(super) struct LocalBarrierWorker {

actor_failure_rx: UnboundedReceiver<(ActorId, StreamError)>,

root_failure: Option<StreamError>,
/// Cached result of [`Self::try_find_root_failure`].
cached_root_failure: Option<StreamError>,
}

impl LocalBarrierWorker {
Expand Down Expand Up @@ -403,7 +398,7 @@ impl LocalBarrierWorker {
current_shared_context: shared_context,
barrier_event_rx: event_rx,
actor_failure_rx: failure_rx,
root_failure: None,
cached_root_failure: None,
}
}

Expand Down Expand Up @@ -431,14 +426,16 @@ impl LocalBarrierWorker {
}
completed_epoch = self.state.next_completed_epoch() => {
let result = self.on_epoch_completed(completed_epoch);
self.control_stream_handle.inspect_result(result);
if let Err(err) = result {
self.notify_other_failure(err, "failed to complete epoch").await;
}
},
event = self.barrier_event_rx.recv() => {
self.handle_barrier_event(event.expect("should not be none"));
},
failure = self.actor_failure_rx.recv() => {
let (actor_id, err) = failure.unwrap();
self.notify_failure(actor_id, err).await;
self.notify_actor_failure(actor_id, err).await;
},
actor_op = actor_op_rx.recv() => {
if let Some(actor_op) = actor_op {
Expand All @@ -462,7 +459,9 @@ impl LocalBarrierWorker {
},
request = self.control_stream_handle.next_request() => {
let result = self.handle_streaming_control_request(request);
self.control_stream_handle.inspect_result(result);
if let Err(err) = result {
self.notify_other_failure(err, "failed to inject barrier").await;
}
},
}
}
Expand Down Expand Up @@ -661,15 +660,18 @@ impl LocalBarrierWorker {

/// Broadcast a barrier to all senders. Save a receiver which will get notified when this
/// barrier is finished, in managed mode.
///
/// Note that the error returned here is typically a [`StreamError::barrier_send`], which is not
/// the root cause of the failure. The caller should then call [`Self::try_find_root_failure`]
/// to find the root cause.
fn send_barrier(
&mut self,
barrier: &Barrier,
to_send: HashSet<ActorId>,
to_collect: HashSet<ActorId>,
table_ids: HashSet<TableId>,
) -> StreamResult<()> {
#[cfg(not(test))]
{
if !cfg!(test) {
// The barrier might be outdated and been injected after recovery in some certain extreme
// scenarios. So some newly creating actors in the barrier are possibly not rebuilt during
// recovery. Check it here and return an error here if some actors are not found to
Expand Down Expand Up @@ -702,12 +704,15 @@ impl LocalBarrierWorker {
);

for actor_id in &to_collect {
if let Some(e) = self.failure_actors.get(actor_id) {
if self.failure_actors.contains_key(actor_id) {
// The failure actors could exit before the barrier is issued, while their
// up-downstream actors could be stuck somehow. Return error directly to trigger the
// recovery.
// try_find_root_failure is not used merely because it requires async.
return Err(self.root_failure.clone().unwrap_or(e.clone()));
return Err(StreamError::barrier_send(
barrier.clone(),
*actor_id,
"actor has already failed",
));
}
}

Expand Down Expand Up @@ -763,11 +768,11 @@ impl LocalBarrierWorker {
self.state.collect(actor_id, barrier)
}

/// When a actor exit unexpectedly, it should report this event using this function, so meta
/// will notice actor's exit while collecting.
async fn notify_failure(&mut self, actor_id: ActorId, err: StreamError) {
/// When a actor exit unexpectedly, the error is reported using this function. The control stream
/// will be reset and the meta service will then trigger recovery.
async fn notify_actor_failure(&mut self, actor_id: ActorId, err: StreamError) {
self.add_failure(actor_id, err.clone());
let root_err = self.try_find_root_failure().await;
let root_err = self.try_find_root_failure().await.unwrap(); // always `Some` because we just added one

let failed_epochs = self.state.epochs_await_on_actor(actor_id).collect_vec();
if !failed_epochs.is_empty() {
Expand All @@ -782,6 +787,21 @@ impl LocalBarrierWorker {
}
}

/// When some other failure happens (like failed to send barrier), the error is reported using
/// this function. The control stream will be reset and the meta service will then trigger recovery.
///
/// This is similar to [`Self::notify_actor_failure`], but since there's not always an actor failure,
/// the given `err` will be used if there's no root failure found.
async fn notify_other_failure(&mut self, err: StreamError, message: impl Into<String>) {
let root_err = self.try_find_root_failure().await.unwrap_or(err);

self.control_stream_handle.reset_stream_with_err(
anyhow!(root_err)
.context(message.into())
.to_status_unnamed(Code::Internal),
);
}

fn add_failure(&mut self, actor_id: ActorId, err: StreamError) {
if let Some(prev_err) = self.failure_actors.insert(actor_id, err) {
warn!(
Expand All @@ -792,9 +812,12 @@ impl LocalBarrierWorker {
}
}

async fn try_find_root_failure(&mut self) -> StreamError {
if let Some(root_failure) = &self.root_failure {
return root_failure.clone();
/// Collect actor errors for a while and find the one that might be the root cause.
///
/// Returns `None` if there's no actor error received.
async fn try_find_root_failure(&mut self) -> Option<StreamError> {
if self.cached_root_failure.is_some() {
return self.cached_root_failure.clone();
}
// fetch more actor errors within a timeout
let _ = tokio::time::timeout(Duration::from_secs(3), async {
Expand All @@ -803,11 +826,9 @@ impl LocalBarrierWorker {
}
})
.await;
self.root_failure = try_find_root_actor_failure(self.failure_actors.values());
self.cached_root_failure = try_find_root_actor_failure(self.failure_actors.values());

self.root_failure
.clone()
.expect("failure actors should not be empty")
self.cached_root_failure.clone()
}
}

Expand Down Expand Up @@ -915,6 +936,8 @@ impl LocalBarrierManager {
}

/// Tries to find the root cause of actor failures, based on hard-coded rules.
///
/// Returns `None` if the input is empty.
pub fn try_find_root_actor_failure<'a>(
actor_errors: impl IntoIterator<Item = &'a StreamError>,
) -> Option<StreamError> {
Expand Down

0 comments on commit 5aabf54

Please sign in to comment.