diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 074565f54620e..17031bc12404c 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -745,12 +745,19 @@ impl GlobalBarrierManager { send_latency_timer.observe_duration(); - let node_to_collect = self + let node_to_collect = match self .control_stream_manager .inject_barrier(command_ctx.clone()) - .inspect_err(|_| { + { + Ok(node_to_collect) => node_to_collect, + Err(err) => { + for notifier in notifiers { + notifier.notify_failed(err.clone()); + } fail_point!("inject_barrier_err_success"); - })?; + return Err(err); + } + }; // Notify about the injection. let prev_paused_reason = self.state.paused_reason(); diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index f282f99061b4c..662a1006baae9 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -241,7 +241,9 @@ impl BarrierScheduler { ..Default::default() }; self.attach_notifiers(vec![notifier], checkpoint)?; - rx.await.unwrap() + rx.await + .ok() + .context("failed to wait for barrier collect")? } /// Run multiple commands and return when they're all completely finished. It's ensured that