Skip to content

Commit

Permalink
dump all
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Oct 18, 2023
1 parent 728e62d commit 5b775e9
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ description = "Build dashboard"
condition = { env_set = [
"ENABLE_BUILD_DASHBOARD",
], files_modified = { input = [
"./dashboard/**/*.js",
"./dashboard/**/*.ts*",
"./dashboard/package.json",
"./dashboard/next.config.js",
], output = [
Expand Down
31 changes: 22 additions & 9 deletions dashboard/pages/await_tree.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,32 @@ import { getClusterInfoComputeNode } from "./api/cluster"
import useFetch from "./api/fetch"

const SIDEBAR_WIDTH = 200
const ALL_COMPUTE_NODES = ""

export default function AwaitTreeDump() {
const { response: computeNodes } = useFetch(getClusterInfoComputeNode)

const [computeNodeId, setComputeNodeId] = useState<number>()
const [dump, setDump] = useState<string | undefined>("")
const [computeNodeId, setComputeNodeId] = useState<string>()
const [dump, setDump] = useState<string>("")

useEffect(() => {
if (computeNodes && !computeNodeId && computeNodes.length > 0) {
setComputeNodeId(computeNodes[0].id)
if (computeNodes && !computeNodeId) {
setComputeNodeId(ALL_COMPUTE_NODES)
}
}, [computeNodes, computeNodeId])

const dumpTree = async () => {
const title = `Await-Tree Dump of Compute Node ${computeNodeId}:`
setDump(undefined)
if (computeNodeId === undefined) {
return
}

let title
if (computeNodeId === ALL_COMPUTE_NODES) {
title = "Await-Tree Dump of All Compute Nodes:"
} else {
title = `Await-Tree Dump of Compute Node ${computeNodeId}:`
}
setDump("Loading...")

let result

Expand Down Expand Up @@ -92,10 +102,13 @@ export default function AwaitTreeDump() {
<FormLabel>Compute Nodes</FormLabel>
<VStack>
<Select
onChange={(event) =>
setComputeNodeId(parseInt(event.target.value))
}
onChange={(event) => setComputeNodeId(event.target.value)}
>
{computeNodes && (
<option value={ALL_COMPUTE_NODES} key={ALL_COMPUTE_NODES}>
All
</option>
)}
{computeNodes &&
computeNodes.map((n) => (
<option value={n.id} key={n.id}>
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ pub mod grpc_middleware {
};

Either::Right(async move {
let root = registry.lock().await.register(key, req.uri().to_string());
let root = registry.lock().await.register(key, req.uri().path());

root.instrument(inner.call(req)).await
})
Expand Down
43 changes: 36 additions & 7 deletions src/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub(super) mod handlers {
use risingwave_common_heap_profiling::COLLAPSED_SUFFIX;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{Sink, Source, Table};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::{ActorLocation, PbTableFragments};
use risingwave_pb::monitor_service::{
HeapProfilingResponse, ListHeapProfilingResponse, StackTraceResponse,
Expand Down Expand Up @@ -101,7 +101,6 @@ pub(super) mod handlers {
Path(ty): Path<i32>,
Extension(srv): Extension<Service>,
) -> Result<Json<Vec<WorkerNode>>> {
use risingwave_pb::common::WorkerType;
let mut result = srv
.cluster_manager
.list_worker_node(
Expand Down Expand Up @@ -198,6 +197,39 @@ pub(super) mod handlers {
Ok(Json(table_fragments))
}

async fn dump_await_tree_inner(
worker_nodes: impl IntoIterator<Item = &WorkerNode>,
compute_clients: &ComputeClientPool,
) -> Result<Json<StackTraceResponse>> {
let mut all = Default::default();

fn merge(a: &mut StackTraceResponse, b: StackTraceResponse) {
a.actor_traces.extend(b.actor_traces);
a.rpc_traces.extend(b.rpc_traces);
a.compaction_task_traces.extend(b.compaction_task_traces);
}

for worker_node in worker_nodes {
let client = compute_clients.get(&worker_node).await.map_err(err)?;
let result = client.stack_trace().await.map_err(err)?;

merge(&mut all, result);
}

Ok(all.into())
}

/// Handler that dumps the stack traces of all compute nodes at once.
///
/// Asks the cluster manager for the worker nodes of type
/// `WorkerType::ComputeNode` (the second argument is `None`, presumably
/// meaning "no further filtering" — confirm against `list_worker_node`) and
/// hands them to `dump_await_tree_inner`, whose result is returned unchanged.
pub async fn dump_await_tree_all(
    Extension(srv): Extension<Service>,
) -> Result<Json<StackTraceResponse>> {
    let compute_nodes = srv
        .cluster_manager
        .list_worker_node(WorkerType::ComputeNode, None)
        .await;
    let client_pool = &srv.compute_clients;

    dump_await_tree_inner(&compute_nodes, client_pool).await
}

pub async fn dump_await_tree(
Path(worker_id): Path<WorkerId>,
Extension(srv): Extension<Service>,
Expand All @@ -210,11 +242,7 @@ pub(super) mod handlers {
.map_err(err)?
.worker_node;

let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;

let result = client.stack_trace().await.map_err(err)?;

Ok(result.into())
dump_await_tree_inner(std::iter::once(&worker_node), &srv.compute_clients).await
}

pub async fn heap_profile(
Expand Down Expand Up @@ -325,6 +353,7 @@ impl DashboardService {
get(prometheus::list_prometheus_actor_back_pressure),
)
.route("/monitor/await_tree/:worker_id", get(dump_await_tree))
.route("/monitor/await_tree/", get(dump_await_tree_all))
.route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
.route(
"/monitor/list_heap_profile/:worker_id",
Expand Down

0 comments on commit 5b775e9

Please sign in to comment.