Skip to content

Commit

Permalink
improve readability
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Nov 3, 2023
1 parent e168f22 commit 3d17200
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 22 deletions.
50 changes: 29 additions & 21 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ struct CheckpointControl {
metrics: Arc<MetaMetrics>,

/// Get notified when we finished Create MV and collect a barrier(checkpoint = true)
finished_commands: Vec<TrackingJob>,
finished_jobs: Vec<TrackingJob>,
}

impl CheckpointControl {
Expand All @@ -218,30 +218,40 @@ impl CheckpointControl {
adding_actors: Default::default(),
removing_actors: Default::default(),
metrics,
finished_commands: Default::default(),
finished_jobs: Default::default(),
}
}

/// Stash a command to finish later.
fn stash_command_to_finish(&mut self, finished_job: TrackingJob) {
self.finished_commands.push(finished_job);
self.finished_jobs.push(finished_job);
}

/// Finish stashed commands. If the current barrier is not a `checkpoint`, we will not finish
/// the commands that requires a checkpoint, else we will finish all the commands.
/// Finish stashed jobs.
/// If checkpoint, means all jobs can be finished.
/// If not checkpoint, jobs which do not require checkpoint can be finished.
///
/// Returns whether there are still remaining stashed commands to finish.
async fn finish_commands(&mut self, checkpoint: bool) -> MetaResult<bool> {
for command in self
.finished_commands
.extract_if(|c| checkpoint || !c.is_checkpoint())
{
// The command is ready to finish. We can now call `pre_finish`.
command.pre_finish().await?;
command.notify_finished();
/// Returns whether there are still remaining stashed jobs to finish.
async fn finish_jobs(&mut self, checkpoint: bool) -> MetaResult<bool> {
async fn finish_job(job: TrackingJob) -> MetaResult<()> {
job.pre_finish().await?;
job.notify_finished();
Ok(())
}
if checkpoint {
for job in self.finished_jobs.drain(..) {
finish_job(job).await?;
}
Ok(false)
} else {
for job in self
.finished_jobs
.extract_if(|c| !c.is_checkpoint_required())
{
finish_job(job).await?;
}
Ok(!self.finished_jobs.is_empty())
}

Ok(!self.finished_commands.is_empty())
}

fn cancel_command(&mut self, cancelled_job: TrackingJob) {
Expand All @@ -258,7 +268,7 @@ impl CheckpointControl {
}

fn cancel_stashed_command(&mut self, id: TableId) {
self.finished_commands
self.finished_jobs
.retain(|x| x.table_to_create() != Some(id));
}

Expand Down Expand Up @@ -476,7 +486,7 @@ impl CheckpointControl {
tracing::warn!("there are some changes in dropping_tables");
self.dropping_tables.clear();
}
self.finished_commands.clear();
self.finished_jobs.clear();
}
}

Expand Down Expand Up @@ -1092,9 +1102,7 @@ impl GlobalBarrierManager {
checkpoint_control.cancel_stashed_command(table_id);
}

let remaining = checkpoint_control
.finish_commands(kind.is_checkpoint())
.await?;
let remaining = checkpoint_control.finish_jobs(kind.is_checkpoint()).await?;
// If there are remaining commands (that requires checkpoint to finish), we force
// the next barrier to be a checkpoint.
if remaining {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl TrackingJob {
}

/// Returns whether the `TrackingJob` requires a checkpoint to complete.
pub(crate) fn is_checkpoint(&self) -> bool {
pub(crate) fn is_checkpoint_required(&self) -> bool {
match self {
// Recovered tracking job is always a streaming job,
// It requires a checkpoint to complete.
Expand Down

0 comments on commit 3d17200

Please sign in to comment.