diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 427cc392adef9..9addc79389c9a 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -585,7 +585,7 @@ impl GlobalBarrierManager { .checkpoint_control .can_inject_barrier(self.in_flight_barrier_nums) => { self.active_streaming_nodes.sync().await; - self.handle_new_barrier(scheduled); + self.handle_new_barrier(scheduled).await; } } self.checkpoint_control.update_barrier_nums_metrics(); @@ -593,7 +593,7 @@ impl GlobalBarrierManager { } /// Handle the new barrier from the scheduled queue and inject it. - fn handle_new_barrier(&mut self, scheduled: Scheduled) { + async fn handle_new_barrier(&mut self, scheduled: Scheduled) { let Scheduled { command, mut notifiers, @@ -630,7 +630,7 @@ impl GlobalBarrierManager { send_latency_timer.observe_duration(); - self.rpc_manager.inject_barrier(command_ctx.clone()); + self.rpc_manager.inject_barrier(command_ctx.clone()).await; // Notify about the injection. let prev_paused_reason = self.state.paused_reason(); diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 35964d3933b7a..8ebee011d395d 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -483,6 +483,7 @@ impl GlobalBarrierManagerContext { let res = match self .inject_barrier(command_ctx.clone(), None, None) .await + .await .result { Ok(response) => { diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 877f935f25207..78fef8b1526b8 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -66,15 +66,16 @@ impl BarrierRpcManager { self.prev_injecting_barrier = None; } - pub(super) fn inject_barrier(&mut self, command_context: Arc) { + pub(super) async fn inject_barrier(&mut self, command_context: Arc) { // this is to notify that the barrier has been injected so that the next // barrier can be injected to avoid out of order barrier injection. // TODO: can be removed when bidi-stream control in implemented. let (inject_tx, inject_rx) = oneshot::channel(); let prev_inject_rx = self.prev_injecting_barrier.replace(inject_rx); - let await_complete_future = - self.context - .inject_barrier(command_context, Some(inject_tx), prev_inject_rx); + let await_complete_future = self + .context + .inject_barrier(command_context, Some(inject_tx), prev_inject_rx) + .await; self.injected_in_progress_barrier .push(await_complete_future); } @@ -88,7 +89,7 @@ pub(super) type BarrierCompletionFuture = impl Future, inject_tx: Option>, @@ -97,22 +98,16 @@ impl GlobalBarrierManagerContext { let (tx, rx) = oneshot::channel(); let prev_epoch = command_context.prev_epoch.value().0; let stream_rpc_manager = self.stream_rpc_manager.clone(); + let span = command_context.span.clone(); + if let Some(prev_inject_rx) = prev_inject_rx { + let _ = prev_inject_rx.await; + } + let result = stream_rpc_manager + .inject_barrier(command_context.clone()) + .instrument(span.clone()) + .await; // todo: the collect handler should be abort when recovery. let _join_handle = tokio::spawn(async move { - let span = command_context.span.clone(); - if let Some(prev_inject_rx) = prev_inject_rx { - if prev_inject_rx.await.is_err() { - let _ = tx.send(BarrierCompletion { - prev_epoch, - result: Err(anyhow!("prev barrier failed to be injected").into()), - }); - return; - } - } - let result = stream_rpc_manager - .inject_barrier(command_context.clone()) - .instrument(span.clone()) - .await; match result { Ok(node_need_collect) => { if let Some(inject_tx) = inject_tx {