From 77e14b84178099c2980d37875181accb04439dbe Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 23 Feb 2024 16:19:53 +0800 Subject: [PATCH] add comment --- src/meta/src/barrier/mod.rs | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index eb905c5ad1e8f..f6890b8118f6f 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -215,16 +215,29 @@ impl CheckpointControl { } } + fn inflight_command_num(&self) -> usize { + self.inflight_command_ctx_queue.len() + } + + fn total_command_num(&self) -> usize { + self.inflight_command_ctx_queue.len() + + if self.completing_command.is_some() { + 1 + } else { + 0 + } + } + /// Update the metrics of barrier nums. fn update_barrier_nums_metrics(&self) { self.context .metrics .in_flight_barrier_nums - .set(self.inflight_command_ctx_queue.len() as i64); + .set(self.inflight_command_num() as i64); self.context .metrics .all_barrier_nums - .set(self.inflight_command_ctx_queue.len() as i64); + .set(self.total_command_num() as i64); } /// Enqueue a barrier command, and init its state to `InFlight`. @@ -253,7 +266,7 @@ impl CheckpointControl { /// Pause inject barrier until True. fn can_inject_barrier(&self, in_flight_barrier_nums: usize) -> bool { - let in_flight_not_full = self.inflight_command_ctx_queue.len() < in_flight_barrier_nums; + let in_flight_not_full = self.inflight_command_num() < in_flight_barrier_nums; // Whether some command requires pausing concurrent barrier. If so, it must be the last one. let should_pause = self @@ -312,6 +325,9 @@ struct InflightCommand { struct CompletingCommand { command_ctx: Arc, + // The join handle of a spawned task that completes the barrier. + // The return value indicate whether there is some create streaming job command + // that has finished but not checkpointed. If there is any, we will force checkpoint on the next barrier join_handle: JoinHandle>, }