From c451a4c4a7d9ebaf7056a2d070b0b74b89332bc8 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Tue, 18 Jun 2024 14:09:15 +0800 Subject: [PATCH 1/4] Improve the display of LocalBarrierManager's stats --- src/stream/src/task/barrier_manager.rs | 36 +++++++++- .../src/task/barrier_manager/managed_state.rs | 72 ++++++++++++++++++- .../src/task/barrier_manager/progress.rs | 13 ++++ 3 files changed, 115 insertions(+), 6 deletions(-) diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index c5d564d9c6ab8..6ede6d1181035 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{BTreeSet, HashMap, HashSet}; +use std::fmt::Display; use std::future::pending; use std::sync::Arc; use std::time::Duration; @@ -305,8 +306,6 @@ pub(crate) struct StreamActorManager { pub(super) runtime: BackgroundShutdownRuntime, } -#[derive(Debug)] -#[expect(dead_code)] pub(super) struct LocalBarrierWorkerDebugInfo<'a> { actor_to_send: BTreeSet, running_actors: BTreeSet, @@ -315,6 +314,36 @@ pub(super) struct LocalBarrierWorkerDebugInfo<'a> { has_control_stream_connected: bool, } +impl Display for LocalBarrierWorkerDebugInfo<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Running actors:")?; + for actor_id in &self.running_actors { + write!(f, "{}, ", actor_id)?; + } + + write!(f, "\nActors to send:")?; + for actor_id in &self.actor_to_send { + write!(f, "{}, ", actor_id)?; + } + + write!(f, "\nCreating actors:")?; + for actors in &self.creating_actors { + for actor_id in actors { + write!(f, "{}, ", actor_id)?; + } + } + + writeln!( + f, + "\nhas_control_stream_connected: {:}", + self.has_control_stream_connected + )?; + + writeln!(f, "Managed Barrier State: {}", self.managed_barrier_state)?; + Ok(()) + } +} + /// [`LocalBarrierWorker`] manages barrier control flow, used by local stream manager. /// Specifically, [`LocalBarrierWorker`] serve barrier injection from meta server, send the /// barriers to and collect them from all actors, and finally report the progress. @@ -538,7 +567,8 @@ impl LocalBarrierWorker { let _ = result_sender.send(()); } LocalActorOperation::InspectState { result_sender } => { - let _ = result_sender.send(format!("{:#?}", self.to_debug_info())); + let debug_info = self.to_debug_info(); + let _ = result_sender.send(debug_info.to_string()); } } } diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 00ff5606bde2e..ad97b1bbe6b3f 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -15,7 +15,7 @@ use std::assert_matches::assert_matches; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; -use std::fmt::{Debug, Formatter}; +use std::fmt::{Debug, Display, Formatter}; use std::future::Future; use std::mem::replace; use std::sync::Arc; @@ -143,13 +143,79 @@ fn sync_epoch( #[derive(Debug)] pub(super) struct ManagedBarrierStateDebugInfo<'a> { - #[expect(dead_code)] epoch_barrier_state_map: &'a BTreeMap, - #[expect(dead_code)] create_mview_progress: &'a HashMap>, } +impl Display for ManagedBarrierStateDebugInfo<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut prev_epoch = 0u64; + for (epoch, barrier_state) in self.epoch_barrier_state_map { + write!(f, "> Epoch {} ({:?}): ", epoch, barrier_state.kind)?; + match &barrier_state.inner { + ManagedBarrierStateInner::Stashed { .. } => { + write!(f, "Stashed")?; + } + ManagedBarrierStateInner::Issued(state) => { + write!(f, "Issued. Remaining actors: [")?; + let mut is_prev_epoch_issued = false; + if prev_epoch != 0 { + let bs = &self.epoch_barrier_state_map[&prev_epoch]; + match &bs.inner { + ManagedBarrierStateInner::Issued(IssuedState { + remaining_actors: remaining_actors_prev, + .. + }) => { + // Only show the actors that are not in the previous epoch. + is_prev_epoch_issued = true; + let mut duplicates = 0usize; + for actor_id in &state.remaining_actors { + if !remaining_actors_prev.contains(actor_id) { + write!(f, "{}, ", actor_id)?; + } else { + duplicates += 1; + } + } + if duplicates > 0 { + write!(f, "...and {} actors in prev epoch", duplicates)?; + } + } + _ => {} + }; + } + if !is_prev_epoch_issued { + for actor_id in &state.remaining_actors { + write!(f, "{}, ", actor_id)?; + } + } + write!(f, "]")?; + } + ManagedBarrierStateInner::AllCollected => { + write!(f, "AllCollected")?; + } + ManagedBarrierStateInner::Completed(_) => { + write!(f, "Completed")?; + } + } + prev_epoch = *epoch; + writeln!(f)?; + } + + if !self.create_mview_progress.is_empty() { + writeln!(f, "Create MView Progress:")?; + for (epoch, progress) in self.create_mview_progress { + write!(f, "> Epoch {}:", epoch)?; + for (actor_id, state) in progress { + write!(f, ">> Actor {}: {}, ", actor_id, state)?; + } + } + } + + Ok(()) + } +} + pub(super) struct ManagedBarrierState { /// Record barrier state for each epoch of concurrent checkpoints. /// diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index 476534967072b..69c603a4b1ab4 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::{Display, Formatter}; + use super::LocalBarrierManager; use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress; use crate::task::barrier_manager::LocalBarrierWorker; @@ -26,6 +28,17 @@ pub(crate) enum BackfillState { Done(ConsumedRows), } +impl Display for BackfillState { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + BackfillState::ConsumingUpstream(epoch, rows) => { + write!(f, "ConsumingUpstream(epoch: {}, rows: {})", epoch, rows) + } + BackfillState::Done(rows) => write!(f, "Done(rows: {})", rows), + } + } +} + impl LocalBarrierWorker { pub(crate) fn update_create_mview_progress( &mut self, From 3f9bfa2867ffbf7cfc3ea4bff8f29afdddf7c3d4 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Tue, 18 Jun 2024 17:59:56 +0800 Subject: [PATCH 2/4] format --- src/stream/src/task/barrier_manager.rs | 40 ++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 6ede6d1181035..66e6cfc7fbfa7 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -316,17 +316,17 @@ pub(super) struct LocalBarrierWorkerDebugInfo<'a> { impl Display for LocalBarrierWorkerDebugInfo<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Running actors:")?; + write!(f, "running_actors: ")?; for actor_id in &self.running_actors { write!(f, "{}, ", actor_id)?; } - write!(f, "\nActors to send:")?; + write!(f, "\nactor_to_send: ")?; for actor_id in &self.actor_to_send { write!(f, "{}, ", actor_id)?; } - write!(f, "\nCreating actors:")?; + write!(f, "\ncreating_actors: ")?; for actors in &self.creating_actors { for actor_id in actors { write!(f, "{}, ", actor_id)?; @@ -335,11 +335,11 @@ impl Display for LocalBarrierWorkerDebugInfo<'_> { writeln!( f, - "\nhas_control_stream_connected: {:}", + "\nhas_control_stream_connected: {}", self.has_control_stream_connected )?; - writeln!(f, "Managed Barrier State: {}", self.managed_barrier_state)?; + writeln!(f, "managed_barrier_state:\n{}", self.managed_barrier_state)?; Ok(()) } } @@ -1004,3 +1004,33 @@ impl LocalBarrierManager { rx.await.unwrap() } } +// #[cfg(test)] +// mod tests { +// use super::*; +// #[test] +// fn test_debug_info_display() { +// LocalBarrierWorkerDebugInfo { +// actor_to_send: BTreeSet::from([ +// 435670, 435671, 435672, 435673, 435674, 435675, 435676, 435677, 436568, 436569, +// 436570, 436571, 436572, 436573, 436574, 436575, 436984, 436985, 436986, 436987, +// 436988, 436989, 436990, 436991, 437146, 437147, 437148, 437149, 437150, 437151, +// 437152, 437153, 437755, 438195, 438196, 438197, 438198, 438199, 438200, 438201, +// 438202, 438739, 438740, 438741, 438742, 438743, 438744, 438745, 438746, 438803, +// 438804, 438805, 438806, 438807, 438808, 438809, 438810, 439188, 439189, 439190, +// 439191, 439192, 439193, 439194, 439195, 439349, 439350, 439351, 439352, 439353, +// 439354, 439355, 439356, 444797, 444798, 444799, 444800, 444801, 444802, 444803, +// 444804, 444917, 444918, 444919, 444920, 444921, 444922, 444923, 444924, 445205, +// 445206, 445207, 445208, 445209, 445210, 445211, 445212, 445261, 445262, 445263, +// 445264, 445265, 445266, 445267, 445268, 445357, 445358, 445359, 445360, 445361, +// 445362, 445363, 445364, 445437, 445438, 445439, 445440, 445441, 445442, 445443, +// 445444, 445501, 445502, 445503, 445504, 445505, 445506, 445507, 445508, 445581, +// 445582, 445583, 445584, 445585, 445586, 445587, 445588, 445949, 445950, 445951, +// 445952, 445953, 445954, 445955, 445956, +// ]), +// running_actors: BTreeSet::new(), +// creating_actors: vec![], +// managed_barrier_state: (), +// has_control_stream_connected: false, +// } +// } +// } From dc3b76c42624a3067d63722028c3a37c9c743094 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Tue, 18 Jun 2024 18:02:51 +0800 Subject: [PATCH 3/4] clean --- src/stream/src/task/barrier_manager.rs | 30 -------------------------- 1 file changed, 30 deletions(-) diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index 66e6cfc7fbfa7..8f807de9ff378 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -1004,33 +1004,3 @@ impl LocalBarrierManager { rx.await.unwrap() } } -// #[cfg(test)] -// mod tests { -// use super::*; -// #[test] -// fn test_debug_info_display() { -// LocalBarrierWorkerDebugInfo { -// actor_to_send: BTreeSet::from([ -// 435670, 435671, 435672, 435673, 435674, 435675, 435676, 435677, 436568, 436569, -// 436570, 436571, 436572, 436573, 436574, 436575, 436984, 436985, 436986, 436987, -// 436988, 436989, 436990, 436991, 437146, 437147, 437148, 437149, 437150, 437151, -// 437152, 437153, 437755, 438195, 438196, 438197, 438198, 438199, 438200, 438201, -// 438202, 438739, 438740, 438741, 438742, 438743, 438744, 438745, 438746, 438803, -// 438804, 438805, 438806, 438807, 438808, 438809, 438810, 439188, 439189, 439190, -// 439191, 439192, 439193, 439194, 439195, 439349, 439350, 439351, 439352, 439353, -// 439354, 439355, 439356, 444797, 444798, 444799, 444800, 444801, 444802, 444803, -// 444804, 444917, 444918, 444919, 444920, 444921, 444922, 444923, 444924, 445205, -// 445206, 445207, 445208, 445209, 445210, 445211, 445212, 445261, 445262, 445263, -// 445264, 445265, 445266, 445267, 445268, 445357, 445358, 445359, 445360, 445361, -// 445362, 445363, 445364, 445437, 445438, 445439, 445440, 445441, 445442, 445443, -// 445444, 445501, 445502, 445503, 445504, 445505, 445506, 445507, 445508, 445581, -// 445582, 445583, 445584, 445585, 445586, 445587, 445588, 445949, 445950, 445951, -// 445952, 445953, 445954, 445955, 445956, -// ]), -// running_actors: BTreeSet::new(), -// creating_actors: vec![], -// managed_barrier_state: (), -// has_control_stream_connected: false, -// } -// } -// } From b35b122dfd1dda8fc4e229011ed6bcf16c427214 Mon Sep 17 00:00:00 2001 From: Eric Fu Date: Fri, 21 Jun 2024 11:20:23 +0800 Subject: [PATCH 4/4] clippy --- .../src/task/barrier_manager/managed_state.rs | 36 +++++++++---------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index ad97b1bbe6b3f..0519be828f1f8 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -162,27 +162,25 @@ impl Display for ManagedBarrierStateDebugInfo<'_> { let mut is_prev_epoch_issued = false; if prev_epoch != 0 { let bs = &self.epoch_barrier_state_map[&prev_epoch]; - match &bs.inner { - ManagedBarrierStateInner::Issued(IssuedState { - remaining_actors: remaining_actors_prev, - .. - }) => { - // Only show the actors that are not in the previous epoch. - is_prev_epoch_issued = true; - let mut duplicates = 0usize; - for actor_id in &state.remaining_actors { - if !remaining_actors_prev.contains(actor_id) { - write!(f, "{}, ", actor_id)?; - } else { - duplicates += 1; - } - } - if duplicates > 0 { - write!(f, "...and {} actors in prev epoch", duplicates)?; + if let ManagedBarrierStateInner::Issued(IssuedState { + remaining_actors: remaining_actors_prev, + .. + }) = &bs.inner + { + // Only show the actors that are not in the previous epoch. + is_prev_epoch_issued = true; + let mut duplicates = 0usize; + for actor_id in &state.remaining_actors { + if !remaining_actors_prev.contains(actor_id) { + write!(f, "{}, ", actor_id)?; + } else { + duplicates += 1; } } - _ => {} - }; + if duplicates > 0 { + write!(f, "...and {} actors in prev epoch", duplicates)?; + } + } } if !is_prev_epoch_issued { for actor_id in &state.remaining_actors {