diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index c5d564d9c6ab8..8f807de9ff378 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::{BTreeSet, HashMap, HashSet}; +use std::fmt::Display; use std::future::pending; use std::sync::Arc; use std::time::Duration; @@ -305,8 +306,6 @@ pub(crate) struct StreamActorManager { pub(super) runtime: BackgroundShutdownRuntime, } -#[derive(Debug)] -#[expect(dead_code)] pub(super) struct LocalBarrierWorkerDebugInfo<'a> { actor_to_send: BTreeSet, running_actors: BTreeSet, @@ -315,6 +314,36 @@ pub(super) struct LocalBarrierWorkerDebugInfo<'a> { has_control_stream_connected: bool, } +impl Display for LocalBarrierWorkerDebugInfo<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "running_actors: ")?; + for actor_id in &self.running_actors { + write!(f, "{}, ", actor_id)?; + } + + write!(f, "\nactor_to_send: ")?; + for actor_id in &self.actor_to_send { + write!(f, "{}, ", actor_id)?; + } + + write!(f, "\ncreating_actors: ")?; + for actors in &self.creating_actors { + for actor_id in actors { + write!(f, "{}, ", actor_id)?; + } + } + + writeln!( + f, + "\nhas_control_stream_connected: {}", + self.has_control_stream_connected + )?; + + writeln!(f, "managed_barrier_state:\n{}", self.managed_barrier_state)?; + Ok(()) + } +} + /// [`LocalBarrierWorker`] manages barrier control flow, used by local stream manager. /// Specifically, [`LocalBarrierWorker`] serve barrier injection from meta server, send the /// barriers to and collect them from all actors, and finally report the progress. @@ -538,7 +567,8 @@ impl LocalBarrierWorker { let _ = result_sender.send(()); } LocalActorOperation::InspectState { result_sender } => { - let _ = result_sender.send(format!("{:#?}", self.to_debug_info())); + let debug_info = self.to_debug_info(); + let _ = result_sender.send(debug_info.to_string()); } } } diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 00ff5606bde2e..0519be828f1f8 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -15,7 +15,7 @@ use std::assert_matches::assert_matches; use std::collections::btree_map::Entry; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; -use std::fmt::{Debug, Formatter}; +use std::fmt::{Debug, Display, Formatter}; use std::future::Future; use std::mem::replace; use std::sync::Arc; @@ -143,13 +143,77 @@ fn sync_epoch( #[derive(Debug)] pub(super) struct ManagedBarrierStateDebugInfo<'a> { - #[expect(dead_code)] epoch_barrier_state_map: &'a BTreeMap, - #[expect(dead_code)] create_mview_progress: &'a HashMap>, } +impl Display for ManagedBarrierStateDebugInfo<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut prev_epoch = 0u64; + for (epoch, barrier_state) in self.epoch_barrier_state_map { + write!(f, "> Epoch {} ({:?}): ", epoch, barrier_state.kind)?; + match &barrier_state.inner { + ManagedBarrierStateInner::Stashed { .. } => { + write!(f, "Stashed")?; + } + ManagedBarrierStateInner::Issued(state) => { + write!(f, "Issued. Remaining actors: [")?; + let mut is_prev_epoch_issued = false; + if prev_epoch != 0 { + let bs = &self.epoch_barrier_state_map[&prev_epoch]; + if let ManagedBarrierStateInner::Issued(IssuedState { + remaining_actors: remaining_actors_prev, + .. + }) = &bs.inner + { + // Only show the actors that are not in the previous epoch. + is_prev_epoch_issued = true; + let mut duplicates = 0usize; + for actor_id in &state.remaining_actors { + if !remaining_actors_prev.contains(actor_id) { + write!(f, "{}, ", actor_id)?; + } else { + duplicates += 1; + } + } + if duplicates > 0 { + write!(f, "...and {} actors in prev epoch", duplicates)?; + } + } + } + if !is_prev_epoch_issued { + for actor_id in &state.remaining_actors { + write!(f, "{}, ", actor_id)?; + } + } + write!(f, "]")?; + } + ManagedBarrierStateInner::AllCollected => { + write!(f, "AllCollected")?; + } + ManagedBarrierStateInner::Completed(_) => { + write!(f, "Completed")?; + } + } + prev_epoch = *epoch; + writeln!(f)?; + } + + if !self.create_mview_progress.is_empty() { + writeln!(f, "Create MView Progress:")?; + for (epoch, progress) in self.create_mview_progress { + write!(f, "> Epoch {}:", epoch)?; + for (actor_id, state) in progress { + write!(f, ">> Actor {}: {}, ", actor_id, state)?; + } + } + } + + Ok(()) + } +} + pub(super) struct ManagedBarrierState { /// Record barrier state for each epoch of concurrent checkpoints. /// diff --git a/src/stream/src/task/barrier_manager/progress.rs b/src/stream/src/task/barrier_manager/progress.rs index 476534967072b..69c603a4b1ab4 100644 --- a/src/stream/src/task/barrier_manager/progress.rs +++ b/src/stream/src/task/barrier_manager/progress.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::{Display, Formatter}; + use super::LocalBarrierManager; use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress; use crate::task::barrier_manager::LocalBarrierWorker; @@ -26,6 +28,17 @@ pub(crate) enum BackfillState { Done(ConsumedRows), } +impl Display for BackfillState { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + BackfillState::ConsumingUpstream(epoch, rows) => { + write!(f, "ConsumingUpstream(epoch: {}, rows: {})", epoch, rows) + } + BackfillState::Done(rows) => write!(f, "Done(rows: {})", rows), + } + } +} + impl LocalBarrierWorker { pub(crate) fn update_create_mview_progress( &mut self,