From 4e0b9d74c081a4a7f709d57e1af90bc4f92e2df8 Mon Sep 17 00:00:00 2001
From: August
Date: Thu, 21 Mar 2024 19:32:19 +0800
Subject: [PATCH] fix: avoid flush panic when recovery encountered

---
 src/meta/src/barrier/mod.rs      | 13 ++++++++++---
 src/meta/src/barrier/schedule.rs |  4 +++-
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 074565f54620e..17031bc12404c 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -745,12 +745,19 @@ impl GlobalBarrierManager {
 
         send_latency_timer.observe_duration();
 
-        let node_to_collect = self
+        let node_to_collect = match self
             .control_stream_manager
             .inject_barrier(command_ctx.clone())
-            .inspect_err(|_| {
+        {
+            Ok(node_to_collect) => node_to_collect,
+            Err(err) => {
+                for notifier in notifiers {
+                    notifier.notify_failed(err.clone());
+                }
                 fail_point!("inject_barrier_err_success");
-            })?;
+                return Err(err);
+            }
+        };
 
         // Notify about the injection.
         let prev_paused_reason = self.state.paused_reason();
diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs
index f282f99061b4c..662a1006baae9 100644
--- a/src/meta/src/barrier/schedule.rs
+++ b/src/meta/src/barrier/schedule.rs
@@ -241,7 +241,9 @@ impl BarrierScheduler {
             ..Default::default()
         };
         self.attach_notifiers(vec![notifier], checkpoint)?;
-        rx.await.unwrap()
+        rx.await
+            .ok()
+            .context("failed to wait for barrier collect")?
     }
 
     /// Run multiple commands and return when they're all completely finished. It's ensured that