From 453d4a8c64d73a7757dee8e3606c838790e116f7 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 17 Jun 2024 14:56:40 +0800 Subject: [PATCH] pre-apply and purge earlier --- src/meta/src/barrier/recovery.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index f58f588e95466..7200426d78300 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -423,18 +423,6 @@ impl GlobalBarrierManager { })? }; - let mut control_stream_manager = - ControlStreamManager::new(self.context.clone()); - - control_stream_manager - .reset(prev_epoch.value().0, active_streaming_nodes.current()) - .await - .inspect_err(|err| { - warn!(error = %err.as_report(), "reset compute nodes failed"); - })?; - - self.context.sink_manager.reset().await; - if self .context .pre_apply_drop_cancel(&self.scheduled_barriers) @@ -454,6 +442,18 @@ impl GlobalBarrierManager { .await .context("purge state table from hummock")?; + let mut control_stream_manager = + ControlStreamManager::new(self.context.clone()); + + control_stream_manager + .reset(prev_epoch.value().0, active_streaming_nodes.current()) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "reset compute nodes failed"); + })?; + + self.context.sink_manager.reset().await; + // update and build all actors. self.context.update_actors(&info).await.inspect_err(|err| { warn!(error = %err.as_report(), "update actors failed");