diff --git a/dashboard/pages/await_tree.tsx b/dashboard/pages/await_tree.tsx index 2fb03afa3fc46..8163d4acc23c6 100644 --- a/dashboard/pages/await_tree.tsx +++ b/dashboard/pages/await_tree.tsx @@ -86,8 +86,12 @@ export default function AwaitTreeDump() { .entries() .map(([k, v]) => `[Barrier ${k}]\n${v}`) .join("\n") + const barrierWorkerState = _(response.barrierWorkerState) + .entries() + .map(([k, v]) => `[BarrierWorkerState ${k}]\n${v}`) + .join("\n") - result = `${title}\n\n${actorTraces}\n${rpcTraces}\n${compactionTraces}\n${barrierTraces}` + result = `${title}\n\n${actorTraces}\n${rpcTraces}\n${compactionTraces}\n${barrierTraces}\n${barrierWorkerState}` } catch (e: any) { result = `${title}\n\nERROR: ${e.message}\n${e.cause}` } diff --git a/proto/monitor_service.proto b/proto/monitor_service.proto index 6a531e1bbab93..b9238c8d3686d 100644 --- a/proto/monitor_service.proto +++ b/proto/monitor_service.proto @@ -12,6 +12,7 @@ message StackTraceResponse { map rpc_traces = 2; map compaction_task_traces = 3; map inflight_barrier_traces = 4; + map barrier_worker_state = 5; } // CPU profiling diff --git a/src/common/src/util/prost.rs b/src/common/src/util/prost.rs index 4a4f846f543a5..d5d8501b8b819 100644 --- a/src/common/src/util/prost.rs +++ b/src/common/src/util/prost.rs @@ -12,7 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::btree_map::Entry; +use std::fmt::{Display, Formatter}; +use std::ops::Deref; + use risingwave_pb::batch_plan; +use risingwave_pb::monitor_service::StackTraceResponse; +use tracing::warn; pub trait TypeUrl { fn type_url() -> &'static str; @@ -23,3 +29,82 @@ impl TypeUrl for batch_plan::ExchangeNode { "type.googleapis.com/plan.ExchangeNode" } } + +pub struct StackTraceResponseOutput<'a>(&'a StackTraceResponse); + +impl<'a> Deref for StackTraceResponseOutput<'a> { + type Target = StackTraceResponse; + + fn deref(&self) -> &Self::Target { + self.0 + } +} + +impl<'a> Display for StackTraceResponseOutput<'a> { + fn fmt(&self, s: &mut Formatter<'_>) -> std::fmt::Result { + if !self.actor_traces.is_empty() { + writeln!(s, "--- Actor Traces ---")?; + for (actor_id, trace) in &self.actor_traces { + writeln!(s, ">> Actor {}", *actor_id)?; + writeln!(s, "{trace}")?; + } + } + if !self.rpc_traces.is_empty() { + let _ = writeln!(s, "--- RPC Traces ---"); + for (name, trace) in &self.rpc_traces { + writeln!(s, ">> RPC {name}")?; + writeln!(s, "{trace}")?; + } + } + if !self.compaction_task_traces.is_empty() { + writeln!(s, "--- Compactor Traces ---")?; + for (name, trace) in &self.compaction_task_traces { + writeln!(s, ">> Compaction Task {name}")?; + writeln!(s, "{trace}")?; + } + } + + if !self.inflight_barrier_traces.is_empty() { + writeln!(s, "--- Inflight Barrier Traces ---")?; + for (name, trace) in &self.inflight_barrier_traces { + writeln!(s, ">> Barrier {name}")?; + writeln!(s, "{trace}")?; + } + } + + writeln!(s, "\n\n--- Barrier Worker States ---")?; + for (worker_id, state) in &self.barrier_worker_state { + writeln!(s, ">> Worker {worker_id}")?; + writeln!(s, "{state}\n")?; + } + Ok(()) + } +} + +#[easy_ext::ext(StackTraceResponseExt)] +impl StackTraceResponse { + pub fn merge_other(&mut self, b: StackTraceResponse) { + self.actor_traces.extend(b.actor_traces); + self.rpc_traces.extend(b.rpc_traces); + self.compaction_task_traces.extend(b.compaction_task_traces); + self.inflight_barrier_traces + .extend(b.inflight_barrier_traces); + for (worker_id, worker_state) in b.barrier_worker_state { + match self.barrier_worker_state.entry(worker_id) { + Entry::Occupied(_entry) => { + warn!( + worker_id, + worker_state, "duplicate barrier worker state. skipped" + ); + } + Entry::Vacant(entry) => { + entry.insert(worker_state); + } + } + } + } + + pub fn output(&self) -> StackTraceResponseOutput<'_> { + StackTraceResponseOutput(self) + } +} diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs index d2542ca9bd085..6723528fcd1b8 100644 --- a/src/compute/src/rpc/service/monitor_service.rs +++ b/src/compute/src/rpc/service/monitor_service.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; use std::ffi::CString; use std::fs; use std::path::Path; @@ -99,11 +100,17 @@ impl MonitorService for MonitorServiceImpl { Default::default() }; + let barrier_worker_state = self.stream_mgr.inspect_barrier_state().await?; + Ok(Response::new(StackTraceResponse { actor_traces, rpc_traces, compaction_task_traces, inflight_barrier_traces: barrier_traces, + barrier_worker_state: BTreeMap::from_iter([( + self.stream_mgr.env.worker_id(), + barrier_worker_state, + )]), })) } diff --git a/src/ctl/src/cmd_impl/await_tree.rs b/src/ctl/src/cmd_impl/await_tree.rs index 09c7c36119262..1c4ff98562791 100644 --- a/src/ctl/src/cmd_impl/await_tree.rs +++ b/src/ctl/src/cmd_impl/await_tree.rs @@ -13,21 +13,15 @@ // limitations under the License. use risingwave_common::util::addr::HostAddr; +use risingwave_common::util::StackTraceResponseExt; use risingwave_pb::common::WorkerType; use risingwave_pb::monitor_service::StackTraceResponse; use risingwave_rpc_client::{CompactorClient, ComputeClientPool}; use crate::CtlContext; -fn merge(a: &mut StackTraceResponse, b: StackTraceResponse) { - a.actor_traces.extend(b.actor_traces); - a.rpc_traces.extend(b.rpc_traces); - a.compaction_task_traces.extend(b.compaction_task_traces); - a.inflight_barrier_traces.extend(b.inflight_barrier_traces); -} - pub async fn dump(context: &CtlContext) -> anyhow::Result<()> { - let mut all = Default::default(); + let mut all = StackTraceResponse::default(); let meta_client = context.meta_client().await?; @@ -41,7 +35,7 @@ pub async fn dump(context: &CtlContext) -> anyhow::Result<()> { for cn in compute_nodes { let client = clients.get(&cn).await?; let response = client.stack_trace().await?; - merge(&mut all, response); + all.merge_other(response); } let compactor_nodes = meta_client @@ -52,7 +46,7 @@ pub async fn dump(context: &CtlContext) -> anyhow::Result<()> { let addr: HostAddr = compactor.get_host().unwrap().into(); let client = CompactorClient::new(addr).await?; let response = client.stack_trace().await?; - merge(&mut all, response); + all.merge_other(response); } if all.actor_traces.is_empty() @@ -61,32 +55,8 @@ pub async fn dump(context: &CtlContext) -> anyhow::Result<()> { && all.inflight_barrier_traces.is_empty() { println!("No traces found. No actors are running, or `--async-stack-trace` not set?"); - } else { - if !all.actor_traces.is_empty() { - println!("--- Actor Traces ---"); - for (key, trace) in all.actor_traces { - println!(">> Actor {key}\n{trace}"); - } - } - if !all.rpc_traces.is_empty() { - println!("\n\n--- RPC Traces ---"); - for (key, trace) in all.rpc_traces { - println!(">> RPC {key}\n{trace}"); - } - } - if !all.compaction_task_traces.is_empty() { - println!("\n\n--- Compactor Traces ---"); - for (key, trace) in all.compaction_task_traces { - println!(">> Compaction Task {key}\n{trace}"); - } - } - if !all.inflight_barrier_traces.is_empty() { - println!("\n\n--- Inflight Barrier Traces ---"); - for (name, trace) in &all.inflight_barrier_traces { - println!(">> Barrier {name}\n{trace}"); - } - } } + println!("{}", all.output()); Ok(()) } diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index 6d05666819437..b72854c6a3985 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -24,6 +24,7 @@ use axum::http::{Method, StatusCode}; use axum::response::{IntoResponse, Response}; use axum::routing::get; use axum::Router; +use risingwave_common::util::StackTraceResponseExt; use risingwave_rpc_client::ComputeClientPool; use tokio::net::TcpListener; use tower::ServiceBuilder; @@ -213,20 +214,13 @@ pub(super) mod handlers { worker_nodes: impl IntoIterator, compute_clients: &ComputeClientPool, ) -> Result> { - let mut all = Default::default(); - - fn merge(a: &mut StackTraceResponse, b: StackTraceResponse) { - a.actor_traces.extend(b.actor_traces); - a.rpc_traces.extend(b.rpc_traces); - a.compaction_task_traces.extend(b.compaction_task_traces); - a.inflight_barrier_traces.extend(b.inflight_barrier_traces); - } + let mut all = StackTraceResponse::default(); for worker_node in worker_nodes { let client = compute_clients.get(worker_node).await.map_err(err)?; let result = client.stack_trace().await.map_err(err)?; - merge(&mut all, result); + all.merge_other(result); } Ok(all.into()) diff --git a/src/meta/src/manager/diagnose.rs b/src/meta/src/manager/diagnose.rs index fbc04f9324002..06c76c47c5daa 100644 --- a/src/meta/src/manager/diagnose.rs +++ b/src/meta/src/manager/diagnose.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use itertools::Itertools; use prometheus_http_query::response::Data::Vector; use risingwave_common::types::Timestamptz; +use risingwave_common::util::StackTraceResponseExt; use risingwave_pb::common::WorkerType; use risingwave_pb::hummock::Level; use risingwave_pb::meta::event_log::Event; @@ -664,53 +665,18 @@ impl DiagnoseCommand { return; }; - let mut all = Default::default(); - - fn merge(a: &mut StackTraceResponse, b: StackTraceResponse) { - a.actor_traces.extend(b.actor_traces); - a.rpc_traces.extend(b.rpc_traces); - a.compaction_task_traces.extend(b.compaction_task_traces); - a.inflight_barrier_traces.extend(b.inflight_barrier_traces); - } + let mut all = StackTraceResponse::default(); let compute_clients = ComputeClientPool::default(); for worker_node in &worker_nodes { if let Ok(client) = compute_clients.get(worker_node).await && let Ok(result) = client.stack_trace().await { - merge(&mut all, result); + all.merge_other(result); } } - if !all.actor_traces.is_empty() { - let _ = writeln!(s, "--- Actor Traces ---"); - for (actor_id, trace) in &all.actor_traces { - let _ = writeln!(s, ">> Actor {}", *actor_id); - let _ = writeln!(s, "{trace}"); - } - } - if !all.rpc_traces.is_empty() { - let _ = writeln!(s, "--- RPC Traces ---"); - for (name, trace) in &all.rpc_traces { - let _ = writeln!(s, ">> RPC {name}"); - let _ = writeln!(s, "{trace}"); - } - } - if !all.compaction_task_traces.is_empty() { - let _ = writeln!(s, "--- Compactor Traces ---"); - for (name, trace) in &all.compaction_task_traces { - let _ = writeln!(s, ">> Compaction Task {name}"); - let _ = writeln!(s, "{trace}"); - } - } - - if !all.inflight_barrier_traces.is_empty() { - let _ = writeln!(s, "--- Inflight Barrier Traces ---"); - for (name, trace) in &all.inflight_barrier_traces { - let _ = writeln!(s, ">> Barrier {name}"); - let _ = writeln!(s, "{trace}"); - } - } + write!(s, "{}", all.output()).unwrap(); } } diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index a6fe55a512bb9..8e9778d82bb29 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::future::pending; use std::sync::Arc; use std::time::Duration; @@ -61,6 +61,7 @@ use risingwave_pb::stream_service::{ use crate::executor::exchange::permit::Receiver; use crate::executor::monitor::StreamingMetrics; use crate::executor::{Actor, Barrier, DispatchExecutor, Mutation, StreamExecutorError}; +use crate::task::barrier_manager::managed_state::ManagedBarrierStateDebugInfo; use crate::task::barrier_manager::progress::BackfillState; /// If enabled, all actors will be grouped in the same tracing span within one epoch. @@ -99,6 +100,10 @@ impl ControlStreamHandle { } } + pub(super) fn connected(&self) -> bool { + self.pair.is_some() + } + fn reset_stream_with_err(&mut self, err: Status) { if let Some((sender, _)) = self.pair.take() { // Note: `TonicStatusWrapper` provides a better error report. @@ -231,6 +236,9 @@ pub(super) enum LocalActorOperation { senders: Vec>, result_sender: oneshot::Sender<()>, }, + InspectState { + result_sender: oneshot::Sender, + }, } pub(super) struct CreateActorOutput { @@ -254,7 +262,7 @@ pub(crate) struct StreamActorManagerState { pub(super) creating_actors: FuturesUnordered< AttachedFuture< JoinHandle>, - oneshot::Sender>, + (BTreeSet, oneshot::Sender>), >, >, } @@ -275,7 +283,7 @@ impl StreamActorManagerState { oneshot::Sender>, StreamResult, ) { - let (join_result, sender) = pending_on_none(self.creating_actors.next()).await; + let (join_result, (_, sender)) = pending_on_none(self.creating_actors.next()).await; ( sender, try { join_result.context("failed to join creating actors futures")?? }, @@ -297,6 +305,16 @@ pub(crate) struct StreamActorManager { pub(super) runtime: BackgroundShutdownRuntime, } +#[derive(Debug)] +#[expect(dead_code)] +pub(super) struct LocalBarrierWorkerDebugInfo<'a> { + actor_to_send: BTreeSet, + running_actors: BTreeSet, + creating_actors: Vec>, + managed_barrier_state: ManagedBarrierStateDebugInfo<'a>, + has_control_stream_connected: bool, +} + /// [`LocalBarrierWorker`] manages barrier control flow, used by local stream manager. /// Specifically, [`LocalBarrierWorker`] serve barrier injection from meta server, send the /// barriers to and collect them from all actors, and finally report the progress. @@ -355,6 +373,21 @@ impl LocalBarrierWorker { } } + fn to_debug_info(&self) -> LocalBarrierWorkerDebugInfo<'_> { + LocalBarrierWorkerDebugInfo { + actor_to_send: self.barrier_senders.keys().cloned().collect(), + running_actors: self.actor_manager_state.handles.keys().cloned().collect(), + creating_actors: self + .actor_manager_state + .creating_actors + .iter() + .map(|fut| fut.item().0.clone()) + .collect(), + managed_barrier_state: self.state.to_debug_info(), + has_control_stream_connected: self.control_stream_handle.connected(), + } + } + async fn run(mut self, mut actor_op_rx: UnboundedReceiver) { loop { select! { @@ -504,6 +537,9 @@ impl LocalBarrierWorker { self.register_sender(actor_id, senders); let _ = result_sender.send(()); } + LocalActorOperation::InspectState { result_sender } => { + let _ = result_sender.send(format!("{:#?}", self.to_debug_info())); + } } } } diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 4bb39ddabff1e..62bfe279f34af 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -14,7 +14,8 @@ use std::assert_matches::assert_matches; use std::collections::btree_map::Entry; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::fmt::{Debug, Formatter}; use std::future::Future; use std::mem::replace; use std::sync::Arc; @@ -40,6 +41,23 @@ use crate::executor::monitor::StreamingMetrics; use crate::executor::{Barrier, Mutation}; use crate::task::{await_tree_key, ActorId}; +struct IssuedState { + pub mutation: Option>, + /// Actor ids remaining to be collected. + pub remaining_actors: BTreeSet, + + pub barrier_inflight_latency: HistogramTimer, +} + +impl Debug for IssuedState { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IssuedState") + .field("mutation", &self.mutation) + .field("remaining_actors", &self.remaining_actors) + .finish() + } +} + /// The state machine of local barrier manager. #[derive(Debug)] enum ManagedBarrierStateInner { @@ -51,13 +69,7 @@ enum ManagedBarrierStateInner { }, /// Meta service has issued a `send_barrier` request. We're collecting barriers now. - Issued { - mutation: Option>, - /// Actor ids remaining to be collected. - remaining_actors: HashSet, - - barrier_inflight_latency: HistogramTimer, - }, + Issued(IssuedState), /// The barrier has been collected by all remaining actors AllCollected, @@ -124,6 +136,15 @@ fn sync_epoch( } } +#[derive(Debug)] +pub(super) struct ManagedBarrierStateDebugInfo<'a> { + #[expect(dead_code)] + epoch_barrier_state_map: &'a BTreeMap, + + #[expect(dead_code)] + create_mview_progress: &'a HashMap>, +} + pub(super) struct ManagedBarrierState { /// Record barrier state for each epoch of concurrent checkpoints. /// @@ -170,6 +191,13 @@ impl ManagedBarrierState { } } + pub(super) fn to_debug_info(&self) -> ManagedBarrierStateDebugInfo<'_> { + ManagedBarrierStateDebugInfo { + epoch_barrier_state_map: &self.epoch_barrier_state_map, + create_mview_progress: &self.create_mview_progress, + } + } + pub fn read_barrier_mutation( &mut self, barrier: &Barrier, @@ -193,7 +221,7 @@ impl ManagedBarrierState { } => { mutation_senders.push(sender); } - ManagedBarrierStateInner::Issued { mutation, .. } => { + ManagedBarrierStateInner::Issued(IssuedState { mutation, .. }) => { let _ = sender.send(mutation.clone()); } _ => { @@ -219,14 +247,13 @@ impl ManagedBarrierState { for (prev_epoch, barrier_state) in &mut self.epoch_barrier_state_map { let prev_epoch = *prev_epoch; match &barrier_state.inner { - ManagedBarrierStateInner::Issued { + ManagedBarrierStateInner::Issued(IssuedState { remaining_actors, .. - } if remaining_actors.is_empty() => {} + }) if remaining_actors.is_empty() => {} ManagedBarrierStateInner::AllCollected | ManagedBarrierStateInner::Completed(_) => { continue; } - ManagedBarrierStateInner::Stashed { .. } - | ManagedBarrierStateInner::Issued { .. } => { + ManagedBarrierStateInner::Stashed { .. } | ManagedBarrierStateInner::Issued(_) => { break; } } @@ -236,10 +263,10 @@ impl ManagedBarrierState { ManagedBarrierStateInner::AllCollected, ); - must_match!(prev_state, ManagedBarrierStateInner::Issued { + must_match!(prev_state, ManagedBarrierStateInner::Issued(IssuedState { barrier_inflight_latency: timer, .. - } => { + }) => { timer.observe_duration(); }); @@ -317,10 +344,10 @@ impl ManagedBarrierState { .filter_map(move |(prev_epoch, barrier_state)| { #[allow(clippy::single_match)] match barrier_state.inner { - ManagedBarrierStateInner::Issued { + ManagedBarrierStateInner::Issued(IssuedState { ref remaining_actors, .. - } => { + }) => { if remaining_actors.contains(&actor_id) { Some(*prev_epoch) } else { @@ -357,10 +384,10 @@ impl ManagedBarrierState { Some(&mut BarrierState { curr_epoch, inner: - ManagedBarrierStateInner::Issued { + ManagedBarrierStateInner::Issued(IssuedState { ref mut remaining_actors, .. - }, + }), .. }) => { let exist = remaining_actors.remove(&actor_id); @@ -416,11 +443,11 @@ impl ManagedBarrierState { barrier.epoch.prev, BarrierState { curr_epoch: barrier.epoch.curr, - inner: ManagedBarrierStateInner::Issued { - remaining_actors: actor_ids_to_collect, + inner: ManagedBarrierStateInner::Issued(IssuedState { + remaining_actors: BTreeSet::from_iter(actor_ids_to_collect), mutation: barrier.mutation.clone(), barrier_inflight_latency: timer, - }, + }), kind: barrier.kind, }, ); diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index e9c00da238f54..346f200362b2d 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -18,6 +18,7 @@ use std::fmt::Debug; use std::mem::take; use std::sync::atomic::AtomicU64; use std::sync::Arc; +use std::time::Instant; use anyhow::anyhow; use async_recursion::async_recursion; @@ -255,6 +256,21 @@ impl LocalStreamManager { }) .await? } + + pub async fn inspect_barrier_state(&self) -> StreamResult { + info!("start inspecting barrier state"); + let start = Instant::now(); + self.actor_op_tx + .send_and_await(|result_sender| LocalActorOperation::InspectState { result_sender }) + .inspect(|result| { + info!( + ok = result.is_ok(), + time = ?start.elapsed(), + "finish inspecting barrier state" + ); + }) + .await + } } impl LocalBarrierWorker { @@ -310,7 +326,7 @@ impl LocalBarrierWorker { actors: &[ActorId], result_sender: oneshot::Sender>, ) { - let actors = { + let actors: Vec<_> = { let actor_result = actors .iter() .map(|actor_id| { @@ -328,6 +344,10 @@ impl LocalBarrierWorker { } } }; + let actor_ids = actors + .iter() + .map(|actor| actor.actor.as_ref().unwrap().actor_id) + .collect(); let actor_manager = self.actor_manager.clone(); let create_actors_fut = crate::CONFIG.scope( self.actor_manager.env.config().clone(), @@ -336,7 +356,7 @@ impl LocalBarrierWorker { let join_handle = self.actor_manager.runtime.spawn(create_actors_fut); self.actor_manager_state .creating_actors - .push(AttachedFuture::new(join_handle, result_sender)); + .push(AttachedFuture::new(join_handle, (actor_ids, result_sender))); } } diff --git a/src/utils/futures_util/src/misc.rs b/src/utils/futures_util/src/misc.rs index f224b58214ddf..c6f5bcf07d23d 100644 --- a/src/utils/futures_util/src/misc.rs +++ b/src/utils/futures_util/src/misc.rs @@ -103,6 +103,12 @@ impl AttachedFuture { self.item.expect("should not be called after polled ready"), ) } + + pub fn item(&self) -> &T { + self.item + .as_ref() + .expect("should not be called after polled ready") + } } impl Future for AttachedFuture {