Skip to content

Commit

Permalink
feat(diagnosis): improve barrier manager display (#17317)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuyufjh authored Jun 21, 2024
1 parent 8a4bd3f commit fcfa573
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 6 deletions.
36 changes: 33 additions & 3 deletions src/stream/src/task/barrier_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::Display;
use std::future::pending;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -305,8 +306,6 @@ pub(crate) struct StreamActorManager {
pub(super) runtime: BackgroundShutdownRuntime,
}

#[derive(Debug)]
#[expect(dead_code)]
pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
actor_to_send: BTreeSet<ActorId>,
running_actors: BTreeSet<ActorId>,
Expand All @@ -315,6 +314,36 @@ pub(super) struct LocalBarrierWorkerDebugInfo<'a> {
has_control_stream_connected: bool,
}

impl Display for LocalBarrierWorkerDebugInfo<'_> {
    /// Renders the worker's debug info as plain text, one labelled section per line.
    ///
    /// The actor-id sections print every id followed by `", "` (so a non-empty
    /// list ends with a trailing separator). `creating_actors` is flattened
    /// into a single list. The connection flag and the managed barrier state
    /// follow, each terminated by a newline.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        /// Writes `label`, then each item of `ids` followed by `", "`.
        fn write_ids<I>(f: &mut std::fmt::Formatter<'_>, label: &str, ids: I) -> std::fmt::Result
        where
            I: IntoIterator,
            I::Item: Display,
        {
            f.write_str(label)?;
            ids.into_iter().try_for_each(|id| write!(f, "{}, ", id))
        }

        write_ids(f, "running_actors: ", &self.running_actors)?;
        write_ids(f, "\nactor_to_send: ", &self.actor_to_send)?;
        // Same traversal a nested `for` over `&self.creating_actors` performs.
        write_ids(
            f,
            "\ncreating_actors: ",
            IntoIterator::into_iter(&self.creating_actors).flatten(),
        )?;

        writeln!(
            f,
            "\nhas_control_stream_connected: {}",
            self.has_control_stream_connected
        )?;

        writeln!(f, "managed_barrier_state:\n{}", self.managed_barrier_state)
    }
}

/// [`LocalBarrierWorker`] manages barrier control flow and is used by the local stream manager.
/// Specifically, [`LocalBarrierWorker`] serves barrier injection from the meta server, sends the
/// barriers to and collects them from all actors, and finally reports the progress.
Expand Down Expand Up @@ -538,7 +567,8 @@ impl LocalBarrierWorker {
let _ = result_sender.send(());
}
LocalActorOperation::InspectState { result_sender } => {
let _ = result_sender.send(format!("{:#?}", self.to_debug_info()));
let debug_info = self.to_debug_info();
let _ = result_sender.send(debug_info.to_string());
}
}
}
Expand Down
70 changes: 67 additions & 3 deletions src/stream/src/task/barrier_manager/managed_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::assert_matches::assert_matches;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::fmt::{Debug, Display, Formatter};
use std::future::Future;
use std::mem::replace;
use std::sync::Arc;
Expand Down Expand Up @@ -143,13 +143,77 @@ fn sync_epoch(

/// Borrowed view of barrier state that is rendered for diagnostics through
/// its `Debug` and `Display` implementations.
///
/// Both fields are read by the `Display` implementation, so they no longer
/// carry `#[expect(dead_code)]`; keeping it would trigger
/// `unfulfilled_lint_expectations`.
#[derive(Debug)]
pub(super) struct ManagedBarrierStateDebugInfo<'a> {
    /// Barrier state per epoch, ordered by epoch.
    epoch_barrier_state_map: &'a BTreeMap<u64, BarrierState>,

    /// Backfill progress per actor, grouped by epoch.
    create_mview_progress: &'a HashMap<u64, HashMap<ActorId, BackfillState>>,
}

impl Display for ManagedBarrierStateDebugInfo<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut prev_epoch = 0u64;
for (epoch, barrier_state) in self.epoch_barrier_state_map {
write!(f, "> Epoch {} ({:?}): ", epoch, barrier_state.kind)?;
match &barrier_state.inner {
ManagedBarrierStateInner::Stashed { .. } => {
write!(f, "Stashed")?;
}
ManagedBarrierStateInner::Issued(state) => {
write!(f, "Issued. Remaining actors: [")?;
let mut is_prev_epoch_issued = false;
if prev_epoch != 0 {
let bs = &self.epoch_barrier_state_map[&prev_epoch];
if let ManagedBarrierStateInner::Issued(IssuedState {
remaining_actors: remaining_actors_prev,
..
}) = &bs.inner
{
// Only show the actors that are not in the previous epoch.
is_prev_epoch_issued = true;
let mut duplicates = 0usize;
for actor_id in &state.remaining_actors {
if !remaining_actors_prev.contains(actor_id) {
write!(f, "{}, ", actor_id)?;
} else {
duplicates += 1;
}
}
if duplicates > 0 {
write!(f, "...and {} actors in prev epoch", duplicates)?;
}
}
}
if !is_prev_epoch_issued {
for actor_id in &state.remaining_actors {
write!(f, "{}, ", actor_id)?;
}
}
write!(f, "]")?;
}
ManagedBarrierStateInner::AllCollected => {
write!(f, "AllCollected")?;
}
ManagedBarrierStateInner::Completed(_) => {
write!(f, "Completed")?;
}
}
prev_epoch = *epoch;
writeln!(f)?;
}

if !self.create_mview_progress.is_empty() {
writeln!(f, "Create MView Progress:")?;
for (epoch, progress) in self.create_mview_progress {
write!(f, "> Epoch {}:", epoch)?;
for (actor_id, state) in progress {
write!(f, ">> Actor {}: {}, ", actor_id, state)?;
}
}
}

Ok(())
}
}

pub(super) struct ManagedBarrierState {
/// Record barrier state for each epoch of concurrent checkpoints.
///
Expand Down
13 changes: 13 additions & 0 deletions src/stream/src/task/barrier_manager/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{Display, Formatter};

use super::LocalBarrierManager;
use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
use crate::task::barrier_manager::LocalBarrierWorker;
Expand All @@ -26,6 +28,17 @@ pub(crate) enum BackfillState {
Done(ConsumedRows),
}

impl Display for BackfillState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
BackfillState::ConsumingUpstream(epoch, rows) => {
write!(f, "ConsumingUpstream(epoch: {}, rows: {})", epoch, rows)
}
BackfillState::Done(rows) => write!(f, "Done(rows: {})", rows),
}
}
}

impl LocalBarrierWorker {
pub(crate) fn update_create_mview_progress(
&mut self,
Expand Down

0 comments on commit fcfa573

Please sign in to comment.