diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index bd9ee3ecb9bd1..d15046d507779 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -206,7 +206,7 @@ struct CheckpointControl { metrics: Arc, /// Get notified when we finished Create MV and collect a barrier(checkpoint = true) - finished_commands: Vec, + finished_jobs: Vec, } impl CheckpointControl { @@ -218,30 +218,40 @@ impl CheckpointControl { adding_actors: Default::default(), removing_actors: Default::default(), metrics, - finished_commands: Default::default(), + finished_jobs: Default::default(), } } /// Stash a command to finish later. fn stash_command_to_finish(&mut self, finished_job: TrackingJob) { - self.finished_commands.push(finished_job); + self.finished_jobs.push(finished_job); } - /// Finish stashed commands. If the current barrier is not a `checkpoint`, we will not finish - /// the commands that requires a checkpoint, else we will finish all the commands. + /// Finish stashed jobs. + /// If checkpoint, means all jobs can be finished. + /// If not checkpoint, jobs which do not require checkpoint can be finished. /// - /// Returns whether there are still remaining stashed commands to finish. - async fn finish_commands(&mut self, checkpoint: bool) -> MetaResult { - for command in self - .finished_commands - .extract_if(|c| checkpoint || !c.is_checkpoint()) - { - // The command is ready to finish. We can now call `pre_finish`. - command.pre_finish().await?; - command.notify_finished(); + /// Returns whether there are still remaining stashed jobs to finish. + async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult { + async fn finish_job(job: TrackingJob) -> MetaResult<()> { + job.pre_finish().await?; + job.notify_finished(); + Ok(()) + } + if checkpoint { + for job in self.finished_jobs.drain(..) { + finish_job(job).await?; + } + Ok(false) + } else { + for job in self + .finished_jobs + .extract_if(|c| !c.is_checkpoint_required()) + { + finish_job(job).await?; + } + Ok(!self.finished_jobs.is_empty()) } - - Ok(!self.finished_commands.is_empty()) } fn cancel_command(&mut self, cancelled_job: TrackingJob) { @@ -258,7 +268,7 @@ impl CheckpointControl { } fn cancel_stashed_command(&mut self, id: TableId) { - self.finished_commands + self.finished_jobs .retain(|x| x.table_to_create() != Some(id)); } @@ -476,7 +486,7 @@ impl CheckpointControl { tracing::warn!("there are some changes in dropping_tables"); self.dropping_tables.clear(); } - self.finished_commands.clear(); + self.finished_jobs.clear(); } } @@ -1092,9 +1102,7 @@ impl GlobalBarrierManager { checkpoint_control.cancel_stashed_command(table_id); } - let remaining = checkpoint_control - .finish_commands(kind.is_checkpoint()) - .await?; + let remaining = checkpoint_control.finish_jobs(kind.is_checkpoint()).await?; // If there are remaining commands (that requires checkpoint to finish), we force // the next barrier to be a checkpoint. if remaining { diff --git a/src/meta/src/barrier/progress.rs b/src/meta/src/barrier/progress.rs index 25a4d4d91dff3..2d5a745eaef5e 100644 --- a/src/meta/src/barrier/progress.rs +++ b/src/meta/src/barrier/progress.rs @@ -160,7 +160,7 @@ impl TrackingJob { } /// Returns whether the `TrackingJob` requires a checkpoint to complete. - pub(crate) fn is_checkpoint(&self) -> bool { + pub(crate) fn is_checkpoint_required(&self) -> bool { match self { // Recovered tracking job is always a streaming job, // It requires a checkpoint to complete.