Skip to content

Commit

Permalink
add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 23, 2024
1 parent 800b7fb commit 77e14b8
Showing 1 changed file with 19 additions and 3 deletions.
22 changes: 19 additions & 3 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,16 +215,29 @@ impl CheckpointControl {
}
}

fn inflight_command_num(&self) -> usize {
self.inflight_command_ctx_queue.len()
}

fn total_command_num(&self) -> usize {
self.inflight_command_ctx_queue.len()
+ if self.completing_command.is_some() {
1
} else {
0
}
}

/// Update the metrics of barrier nums.
fn update_barrier_nums_metrics(&self) {
self.context
.metrics
.in_flight_barrier_nums
.set(self.inflight_command_ctx_queue.len() as i64);
.set(self.inflight_command_num() as i64);
self.context
.metrics
.all_barrier_nums
.set(self.inflight_command_ctx_queue.len() as i64);
.set(self.total_command_num() as i64);
}

/// Enqueue a barrier command, and init its state to `InFlight`.
Expand Down Expand Up @@ -253,7 +266,7 @@ impl CheckpointControl {

/// Pause inject barrier until True.
fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool {
let in_flight_not_full = self.inflight_command_ctx_queue.len() < in_flight_barrier_nums;
let in_flight_not_full = self.inflight_command_num() < in_flight_barrier_nums;

// Whether some command requires pausing concurrent barrier. If so, it must be the last one.
let should_pause = self
Expand Down Expand Up @@ -312,6 +325,9 @@ struct InflightCommand {
struct CompletingCommand {
command_ctx: Arc<CommandContext>,

// The join handle of a spawned task that completes the barrier.
// The return value indicate whether there is some create streaming job command
// that has finished but not checkpointed. If there is any, we will force checkpoint on the next barrier
join_handle: JoinHandle<MetaResult<bool>>,
}

Expand Down

0 comments on commit 77e14b8

Please sign in to comment.