Skip to content

Commit

Permalink
feat: support inspecting local barrier worker state (#16562)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored May 6, 2024
1 parent 16dbae3 commit fc0376c
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 110 deletions.
6 changes: 5 additions & 1 deletion dashboard/pages/await_tree.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ export default function AwaitTreeDump() {
.entries()
.map(([k, v]) => `[Barrier ${k}]\n${v}`)
.join("\n")
const barrierWorkerState = _(response.barrierWorkerState)
.entries()
.map(([k, v]) => `[BarrierWorkerState ${k}]\n${v}`)
.join("\n")

result = `${title}\n\n${actorTraces}\n${rpcTraces}\n${compactionTraces}\n${barrierTraces}`
result = `${title}\n\n${actorTraces}\n${rpcTraces}\n${compactionTraces}\n${barrierTraces}\n${barrierWorkerState}`
} catch (e: any) {
result = `${title}\n\nERROR: ${e.message}\n${e.cause}`
}
Expand Down
1 change: 1 addition & 0 deletions proto/monitor_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ message StackTraceResponse {
map<string, string> rpc_traces = 2;
map<string, string> compaction_task_traces = 3;
map<uint64, string> inflight_barrier_traces = 4;
map<uint32, string> barrier_worker_state = 5;
}

// CPU profiling
Expand Down
85 changes: 85 additions & 0 deletions src/common/src/util/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::btree_map::Entry;
use std::fmt::{Display, Formatter};
use std::ops::Deref;

use risingwave_pb::batch_plan;
use risingwave_pb::monitor_service::StackTraceResponse;
use tracing::warn;

pub trait TypeUrl {
fn type_url() -> &'static str;
Expand All @@ -23,3 +29,82 @@ impl TypeUrl for batch_plan::ExchangeNode {
"type.googleapis.com/plan.ExchangeNode"
}
}

pub struct StackTraceResponseOutput<'a>(&'a StackTraceResponse);

impl<'a> Deref for StackTraceResponseOutput<'a> {
type Target = StackTraceResponse;

fn deref(&self) -> &Self::Target {
self.0
}
}

impl<'a> Display for StackTraceResponseOutput<'a> {
fn fmt(&self, s: &mut Formatter<'_>) -> std::fmt::Result {
if !self.actor_traces.is_empty() {
writeln!(s, "--- Actor Traces ---")?;
for (actor_id, trace) in &self.actor_traces {
writeln!(s, ">> Actor {}", *actor_id)?;
writeln!(s, "{trace}")?;
}
}
if !self.rpc_traces.is_empty() {
let _ = writeln!(s, "--- RPC Traces ---");
for (name, trace) in &self.rpc_traces {
writeln!(s, ">> RPC {name}")?;
writeln!(s, "{trace}")?;
}
}
if !self.compaction_task_traces.is_empty() {
writeln!(s, "--- Compactor Traces ---")?;
for (name, trace) in &self.compaction_task_traces {
writeln!(s, ">> Compaction Task {name}")?;
writeln!(s, "{trace}")?;
}
}

if !self.inflight_barrier_traces.is_empty() {
writeln!(s, "--- Inflight Barrier Traces ---")?;
for (name, trace) in &self.inflight_barrier_traces {
writeln!(s, ">> Barrier {name}")?;
writeln!(s, "{trace}")?;
}
}

writeln!(s, "\n\n--- Barrier Worker States ---")?;
for (worker_id, state) in &self.barrier_worker_state {
writeln!(s, ">> Worker {worker_id}")?;
writeln!(s, "{state}\n")?;
}
Ok(())
}
}

#[easy_ext::ext(StackTraceResponseExt)]
impl StackTraceResponse {
pub fn merge_other(&mut self, b: StackTraceResponse) {
self.actor_traces.extend(b.actor_traces);
self.rpc_traces.extend(b.rpc_traces);
self.compaction_task_traces.extend(b.compaction_task_traces);
self.inflight_barrier_traces
.extend(b.inflight_barrier_traces);
for (worker_id, worker_state) in b.barrier_worker_state {
match self.barrier_worker_state.entry(worker_id) {
Entry::Occupied(_entry) => {
warn!(
worker_id,
worker_state, "duplicate barrier worker state. skipped"
);
}
Entry::Vacant(entry) => {
entry.insert(worker_state);
}
}
}
}

pub fn output(&self) -> StackTraceResponseOutput<'_> {
StackTraceResponseOutput(self)
}
}
7 changes: 7 additions & 0 deletions src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::ffi::CString;
use std::fs;
use std::path::Path;
Expand Down Expand Up @@ -99,11 +100,17 @@ impl MonitorService for MonitorServiceImpl {
Default::default()
};

let barrier_worker_state = self.stream_mgr.inspect_barrier_state().await?;

Ok(Response::new(StackTraceResponse {
actor_traces,
rpc_traces,
compaction_task_traces,
inflight_barrier_traces: barrier_traces,
barrier_worker_state: BTreeMap::from_iter([(
self.stream_mgr.env.worker_id(),
barrier_worker_state,
)]),
}))
}

Expand Down
40 changes: 5 additions & 35 deletions src/ctl/src/cmd_impl/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,15 @@
// limitations under the License.

use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_pb::common::WorkerType;
use risingwave_pb::monitor_service::StackTraceResponse;
use risingwave_rpc_client::{CompactorClient, ComputeClientPool};

use crate::CtlContext;

fn merge(a: &mut StackTraceResponse, b: StackTraceResponse) {
a.actor_traces.extend(b.actor_traces);
a.rpc_traces.extend(b.rpc_traces);
a.compaction_task_traces.extend(b.compaction_task_traces);
a.inflight_barrier_traces.extend(b.inflight_barrier_traces);
}

pub async fn dump(context: &CtlContext) -> anyhow::Result<()> {
let mut all = Default::default();
let mut all = StackTraceResponse::default();

let meta_client = context.meta_client().await?;

Expand All @@ -41,7 +35,7 @@ pub async fn dump(context: &CtlContext) -> anyhow::Result<()> {
for cn in compute_nodes {
let client = clients.get(&cn).await?;
let response = client.stack_trace().await?;
merge(&mut all, response);
all.merge_other(response);
}

let compactor_nodes = meta_client
Expand All @@ -52,7 +46,7 @@ pub async fn dump(context: &CtlContext) -> anyhow::Result<()> {
let addr: HostAddr = compactor.get_host().unwrap().into();
let client = CompactorClient::new(addr).await?;
let response = client.stack_trace().await?;
merge(&mut all, response);
all.merge_other(response);
}

if all.actor_traces.is_empty()
Expand All @@ -61,32 +55,8 @@ pub async fn dump(context: &CtlContext) -> anyhow::Result<()> {
&& all.inflight_barrier_traces.is_empty()
{
println!("No traces found. No actors are running, or `--async-stack-trace` not set?");
} else {
if !all.actor_traces.is_empty() {
println!("--- Actor Traces ---");
for (key, trace) in all.actor_traces {
println!(">> Actor {key}\n{trace}");
}
}
if !all.rpc_traces.is_empty() {
println!("\n\n--- RPC Traces ---");
for (key, trace) in all.rpc_traces {
println!(">> RPC {key}\n{trace}");
}
}
if !all.compaction_task_traces.is_empty() {
println!("\n\n--- Compactor Traces ---");
for (key, trace) in all.compaction_task_traces {
println!(">> Compaction Task {key}\n{trace}");
}
}
if !all.inflight_barrier_traces.is_empty() {
println!("\n\n--- Inflight Barrier Traces ---");
for (name, trace) in &all.inflight_barrier_traces {
println!(">> Barrier {name}\n{trace}");
}
}
}
println!("{}", all.output());

Ok(())
}
12 changes: 3 additions & 9 deletions src/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use axum::http::{Method, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing::get;
use axum::Router;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_rpc_client::ComputeClientPool;
use tokio::net::TcpListener;
use tower::ServiceBuilder;
Expand Down Expand Up @@ -213,20 +214,13 @@ pub(super) mod handlers {
worker_nodes: impl IntoIterator<Item = &WorkerNode>,
compute_clients: &ComputeClientPool,
) -> Result<Json<StackTraceResponse>> {
let mut all = Default::default();

fn merge(a: &mut StackTraceResponse, b: StackTraceResponse) {
a.actor_traces.extend(b.actor_traces);
a.rpc_traces.extend(b.rpc_traces);
a.compaction_task_traces.extend(b.compaction_task_traces);
a.inflight_barrier_traces.extend(b.inflight_barrier_traces);
}
let mut all = StackTraceResponse::default();

for worker_node in worker_nodes {
let client = compute_clients.get(worker_node).await.map_err(err)?;
let result = client.stack_trace().await.map_err(err)?;

merge(&mut all, result);
all.merge_other(result);
}

Ok(all.into())
Expand Down
42 changes: 4 additions & 38 deletions src/meta/src/manager/diagnose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use itertools::Itertools;
use prometheus_http_query::response::Data::Vector;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_pb::common::WorkerType;
use risingwave_pb::hummock::Level;
use risingwave_pb::meta::event_log::Event;
Expand Down Expand Up @@ -664,53 +665,18 @@ impl DiagnoseCommand {
return;
};

let mut all = Default::default();

fn merge(a: &mut StackTraceResponse, b: StackTraceResponse) {
a.actor_traces.extend(b.actor_traces);
a.rpc_traces.extend(b.rpc_traces);
a.compaction_task_traces.extend(b.compaction_task_traces);
a.inflight_barrier_traces.extend(b.inflight_barrier_traces);
}
let mut all = StackTraceResponse::default();

let compute_clients = ComputeClientPool::default();
for worker_node in &worker_nodes {
if let Ok(client) = compute_clients.get(worker_node).await
&& let Ok(result) = client.stack_trace().await
{
merge(&mut all, result);
all.merge_other(result);
}
}

if !all.actor_traces.is_empty() {
let _ = writeln!(s, "--- Actor Traces ---");
for (actor_id, trace) in &all.actor_traces {
let _ = writeln!(s, ">> Actor {}", *actor_id);
let _ = writeln!(s, "{trace}");
}
}
if !all.rpc_traces.is_empty() {
let _ = writeln!(s, "--- RPC Traces ---");
for (name, trace) in &all.rpc_traces {
let _ = writeln!(s, ">> RPC {name}");
let _ = writeln!(s, "{trace}");
}
}
if !all.compaction_task_traces.is_empty() {
let _ = writeln!(s, "--- Compactor Traces ---");
for (name, trace) in &all.compaction_task_traces {
let _ = writeln!(s, ">> Compaction Task {name}");
let _ = writeln!(s, "{trace}");
}
}

if !all.inflight_barrier_traces.is_empty() {
let _ = writeln!(s, "--- Inflight Barrier Traces ---");
for (name, trace) in &all.inflight_barrier_traces {
let _ = writeln!(s, ">> Barrier {name}");
let _ = writeln!(s, "{trace}");
}
}
write!(s, "{}", all.output()).unwrap();
}
}

Expand Down
Loading

0 comments on commit fc0376c

Please sign in to comment.