Skip to content

Commit

Permalink
pre-apply and purge earlier
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jun 17, 2024
1 parent 0da9bf9 commit 453d4a8
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,18 +423,6 @@ impl GlobalBarrierManager {
})?
};

let mut control_stream_manager =
ControlStreamManager::new(self.context.clone());

control_stream_manager
.reset(prev_epoch.value().0, active_streaming_nodes.current())
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "reset compute nodes failed");
})?;

self.context.sink_manager.reset().await;

if self
.context
.pre_apply_drop_cancel(&self.scheduled_barriers)
Expand All @@ -454,6 +442,18 @@ impl GlobalBarrierManager {
.await
.context("purge state table from hummock")?;

let mut control_stream_manager =
ControlStreamManager::new(self.context.clone());

control_stream_manager
.reset(prev_epoch.value().0, active_streaming_nodes.current())
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "reset compute nodes failed");
})?;

self.context.sink_manager.reset().await;

// update and build all actors.
self.context.update_actors(&info).await.inspect_err(|err| {
warn!(error = %err.as_report(), "update actors failed");
Expand Down

0 comments on commit 453d4a8

Please sign in to comment.