From b37a19ccd4a78238683e2ff712c2ad2d796f9a01 Mon Sep 17 00:00:00 2001
From: Yuhao Su <31772373+yuhao-su@users.noreply.github.com>
Date: Mon, 18 Sep 2023 16:18:01 +0800
Subject: [PATCH] feat(dashboard): add memory profiling (#12052)
---
Cargo.lock | 11 +
dashboard/components/Layout.tsx | 1 +
dashboard/package-lock.json | 14 +
dashboard/package.json | 1 +
dashboard/pages/heap_profiling.tsx | 267 ++++++++++++++++++
proto/monitor_service.proto | 19 ++
src/common/Cargo.toml | 1 +
src/common/src/config.rs | 37 ++-
src/common/src/heap_profiling/jeprof.rs | 49 ++++
src/common/src/heap_profiling/mod.rs | 19 ++
src/common/src/lib.rs | 1 +
src/compute/src/lib.rs | 6 +-
.../src/memory_management/memory_manager.rs | 10 +-
src/compute/src/memory_management/mod.rs | 6 +-
src/compute/src/memory_management/policy.rs | 38 +--
.../src/rpc/service/monitor_service.rs | 82 +++++-
src/compute/src/server.rs | 8 +-
src/config/example.toml | 8 +-
src/ctl/src/cmd_impl/profile.rs | 3 +-
src/ctl/src/lib.rs | 2 +-
src/meta/Cargo.toml | 1 +
src/meta/src/dashboard/mod.rs | 110 +++++++-
src/meta/src/rpc/server.rs | 5 +-
src/rpc_client/src/compute_client.rs | 21 +-
src/storage/compactor/src/rpc.rs | 21 +-
25 files changed, 663 insertions(+), 78 deletions(-)
create mode 100644 dashboard/pages/heap_profiling.tsx
create mode 100644 src/common/src/heap_profiling/jeprof.rs
create mode 100644 src/common/src/heap_profiling/mod.rs
diff --git a/Cargo.lock b/Cargo.lock
index 6cb0662b32155..a7e376422e966 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1076,6 +1076,15 @@ dependencies = [
"vsimd",
]
+[[package]]
+name = "base64-url"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c5b0a88aa36e9f095ee2e2b13fb8c5e4313e022783aedacc123328c0084916d"
+dependencies = [
+ "base64 0.21.3",
+]
+
[[package]]
name = "base64ct"
version = "1.6.0"
@@ -6704,6 +6713,7 @@ dependencies = [
"sysinfo",
"tempfile",
"thiserror",
+ "tikv-jemalloc-ctl",
"tinyvec",
"toml 0.7.8",
"tower-layer",
@@ -7205,6 +7215,7 @@ dependencies = [
"aws-config",
"aws-sdk-ec2",
"axum",
+ "base64-url",
"bytes",
"clap",
"crepe",
diff --git a/dashboard/components/Layout.tsx b/dashboard/components/Layout.tsx
index 184e17ac1e535..6d6b17cdc7d80 100644
--- a/dashboard/components/Layout.tsx
+++ b/dashboard/components/Layout.tsx
@@ -140,6 +140,7 @@ function Layout({ children }: { children: React.ReactNode }) {
Debug
Await Tree Dump
+ Heap Profiling
Settings
diff --git a/dashboard/package-lock.json b/dashboard/package-lock.json
index 482996c302099..e72946b00bff7 100644
--- a/dashboard/package-lock.json
+++ b/dashboard/package-lock.json
@@ -13,6 +13,7 @@
"@monaco-editor/react": "^4.4.6",
"@types/d3": "^7.4.0",
"@types/lodash": "^4.14.184",
+ "base64url": "^3.0.1",
"bootstrap-icons": "^1.9.1",
"d3": "^7.6.1",
"d3-axis": "^3.0.0",
@@ -3615,6 +3616,14 @@
"resolved": "https://registry.npmjs.org/base16/-/base16-1.0.0.tgz",
"integrity": "sha512-pNdYkNPiJUnEhnfXV56+sQy8+AaPcG3POZAUnwr4EeqCUZFz4u2PePbo3e5Gj4ziYPCWGUZT9RHisvJKnwFuBQ=="
},
+ "node_modules/base64url": {
+ "version": "3.0.1",
+ "resolved": "https://registry.npmjs.org/base64url/-/base64url-3.0.1.tgz",
+ "integrity": "sha512-ir1UPr3dkwexU7FdV8qBBbNDRUhMmIekYMFZfi+C/sLNnRESKPl23nB9b2pltqfOQNnGzsDdId90AEtG5tCx4A==",
+ "engines": {
+ "node": ">=6.0.0"
+ }
+ },
"node_modules/big-integer": {
"version": "1.6.51",
"resolved": "https://registry.npmjs.org/big-integer/-/big-integer-1.6.51.tgz",
@@ -13631,6 +13640,11 @@
"resolved": "https://registry.npmjs.org/base16/-/base16-1.0.0.tgz",
"integrity": "sha512-pNdYkNPiJUnEhnfXV56+sQy8+AaPcG3POZAUnwr4EeqCUZFz4u2PePbo3e5Gj4ziYPCWGUZT9RHisvJKnwFuBQ=="
},
+ "base64url": {
+ "version": "3.0.1",
+ "resolved": "https://registry.npmjs.org/base64url/-/base64url-3.0.1.tgz",
+ "integrity": "sha512-ir1UPr3dkwexU7FdV8qBBbNDRUhMmIekYMFZfi+C/sLNnRESKPl23nB9b2pltqfOQNnGzsDdId90AEtG5tCx4A=="
+ },
"big-integer": {
"version": "1.6.51",
"resolved": "https://registry.npmjs.org/big-integer/-/big-integer-1.6.51.tgz",
diff --git a/dashboard/package.json b/dashboard/package.json
index 94e8fccdf6138..a0642de4380df 100644
--- a/dashboard/package.json
+++ b/dashboard/package.json
@@ -19,6 +19,7 @@
"@monaco-editor/react": "^4.4.6",
"@types/d3": "^7.4.0",
"@types/lodash": "^4.14.184",
+ "base64url": "^3.0.1",
"bootstrap-icons": "^1.9.1",
"d3": "^7.6.1",
"d3-axis": "^3.0.0",
diff --git a/dashboard/pages/heap_profiling.tsx b/dashboard/pages/heap_profiling.tsx
new file mode 100644
index 0000000000000..88dce6a6a4d09
--- /dev/null
+++ b/dashboard/pages/heap_profiling.tsx
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2023 RisingWave Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import {
+ Box,
+ Button,
+ Flex,
+ FormControl,
+ FormLabel,
+ Select,
+ VStack,
+} from "@chakra-ui/react"
+import Editor from "@monaco-editor/react"
+import base64url from "base64url"
+import { randomUUID } from "crypto"
+import Head from "next/head"
+import path from "path"
+import { Fragment, useEffect, useState } from "react"
+import SpinnerOverlay from "../components/SpinnerOverlay"
+import Title from "../components/Title"
+import { WorkerNode } from "../proto/gen/common"
+import { ListHeapProfilingResponse } from "../proto/gen/monitor_service"
+import api from "./api/api"
+import { getClusterInfoComputeNode } from "./api/cluster"
+import useFetch from "./api/fetch"
+
+const SIDEBAR_WIDTH = 200
+
+interface FileList {
+ dir: string
+ name: string[]
+}
+
+export default function HeapProfiling() {
+ const { response: computeNodes } = useFetch(getClusterInfoComputeNode)
+
+ const [computeNodeId, setComputeNodeId] = useState<number>()
+ const [displayInfo, setDisplayInfo] = useState("")
+ const [profileList, setProfileList] = useState<
+ ListHeapProfilingResponse | undefined
+ >()
+ const [selectedProfileList, setSelectedProfileList] = useState<
+ FileList | undefined
+ >()
+ const [profileType, setProfileType] = useState("Auto")
+ const [analyzeTargetFileName, setAnalyzeTargetFileName] = useState<
+ string | undefined
+ >()
+
+ useEffect(() => {
+ if (computeNodes && !computeNodeId && computeNodes.length > 0) {
+ setComputeNodeId(computeNodes[0].id)
+ }
+ }, [computeNodes, computeNodeId])
+
+ async function getProfileList(
+ computeNodes: WorkerNode[] | undefined,
+ computeNodeId: number | undefined
+ ) {
+ if (computeNodes && computeNodeId && computeNodes.length > 0) {
+ try {
+ let list: ListHeapProfilingResponse =
+ ListHeapProfilingResponse.fromJSON(
+ await api.get(`/api/monitor/list_heap_profile/${computeNodeId}`)
+ )
+ setProfileList(list)
+ } catch (e: any) {
+ console.error(e)
+ let result = `Getting Profiling File List\nError: ${e.message}`
+ setDisplayInfo(result)
+ }
+ }
+ }
+
+ useEffect(() => {
+ getProfileList(computeNodes, computeNodeId)
+ }, [computeNodes, computeNodeId])
+
+ useEffect(() => {
+ if (!profileList) {
+ return
+ }
+ if (profileType === "Auto") {
+ setSelectedProfileList({
+ dir: profileList.dir,
+ name: profileList.nameAuto,
+ })
+ } else if (profileType === "Manually") {
+ setSelectedProfileList({
+ dir: profileList.dir,
+ name: profileList.nameManually,
+ })
+ } else {
+ console.error(`Bad profileType ${profileType}`)
+ }
+ }, [profileType, profileList])
+
+ useEffect(() => {
+ if (!selectedProfileList) {
+ return
+ }
+ if (selectedProfileList.name.length > 0) {
+ setAnalyzeTargetFileName(selectedProfileList.name[0])
+ }
+ }, [selectedProfileList])
+
+ async function dumpProfile() {
+ await api.get(`/api/monitor/dump_heap_profile/${computeNodeId}`)
+ getProfileList(computeNodes, computeNodeId)
+ }
+
+ async function analyzeHeapFile() {
+ if (
+ selectedProfileList === undefined ||
+ analyzeTargetFileName === undefined
+ ) {
+ console.log(
+ `selectedProfileList: ${selectedProfileList}, analyzeTargetFileName: ${analyzeTargetFileName}`
+ )
+ return
+ }
+
+ let analyzeFilePath = path.join(
+ selectedProfileList.dir,
+ analyzeTargetFileName
+ )
+
+ setDisplayInfo(
+ `Analyzing ${analyzeTargetFileName} from Compute Node ${computeNodeId}`
+ )
+
+ const title = `Collapsed Profiling of Compute Node ${computeNodeId} for ${analyzeTargetFileName}`
+
+ let result
+ try {
+ let analyzeFilePathBase64 = base64url(analyzeFilePath)
+ let resObj = await fetch(
+ `/api/monitor/analyze/${computeNodeId}/${analyzeFilePathBase64}`
+ ).then(async (res) => ({
+ filename: res.headers.get("content-disposition"),
+ blob: await res.blob(),
+ }))
+ let objUrl = window.URL.createObjectURL(resObj.blob)
+ let link = document.createElement("a")
+ link.href = objUrl
+ link.download = resObj.filename || randomUUID()
+ link.click()
+ result = `${title}\n\nDownloaded!`
+ } catch (e: any) {
+ result = `${title}\n\nError: ${e.message}`
+ }
+
+ setDisplayInfo(result)
+ }
+
+ const retVal = (
+
+ Heap Profiling
+
+
+
+ Dump Heap Profile
+
+ Compute Nodes
+
+
+
+
+
+ Analyze Heap Profile
+
+ Dumped By
+
+ Dumped Files
+
+
+
+
+
+
+ {displayInfo === undefined ? (
+
+ ) : (
+
+ )}
+
+
+
+ )
+
+ return (
+
+
+ Heap Profiling
+
+ {retVal}
+
+ )
+}
diff --git a/proto/monitor_service.proto b/proto/monitor_service.proto
index e364a2bff7704..7c7769da6b7ff 100644
--- a/proto/monitor_service.proto
+++ b/proto/monitor_service.proto
@@ -31,8 +31,27 @@ message HeapProfilingRequest {
message HeapProfilingResponse {}
+message ListHeapProfilingRequest {}
+message ListHeapProfilingResponse {
+ string dir = 1;
+ repeated string name_manually = 2;
+ repeated string name_auto = 3;
+}
+
+// Analyze dumped files
+message AnalyzeHeapRequest {
+ // The file path
+ string path = 1;
+}
+
+message AnalyzeHeapResponse {
+ bytes result = 1;
+}
+
service MonitorService {
rpc StackTrace(StackTraceRequest) returns (StackTraceResponse);
rpc Profiling(ProfilingRequest) returns (ProfilingResponse);
rpc HeapProfiling(HeapProfilingRequest) returns (HeapProfilingResponse);
+ rpc ListHeapProfiling(ListHeapProfilingRequest) returns (ListHeapProfilingResponse);
+ rpc AnalyzeHeap(AnalyzeHeapRequest) returns (AnalyzeHeapResponse);
}
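The two new RPCs mirror the existing profiling endpoints: ListHeapProfiling enumerates previously dumped files, and AnalyzeHeap runs jeprof on one of them and returns the collapsed output. As an illustrative aside (not part of the patch), a caller using the tonic-generated client could exercise them roughly like this, assuming tonic's transport feature and a hypothetical compute-node address:

use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::{AnalyzeHeapRequest, ListHeapProfilingRequest};

async fn demo_heap_rpcs() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical compute-node gRPC address.
    let mut client = MonitorServiceClient::connect("http://127.0.0.1:6789").await?;

    // Enumerate dumped profiles, split by how they were produced.
    let list = client
        .list_heap_profiling(ListHeapProfilingRequest {})
        .await?
        .into_inner();

    // Collapse the most recent auto dump on the compute node and fetch the result.
    if let Some(name) = list.name_auto.last() {
        let collapsed = client
            .analyze_heap(AnalyzeHeapRequest {
                path: format!("{}/{}", list.dir, name),
            })
            .await?
            .into_inner()
            .result;
        println!("collapsed profile: {} bytes", collapsed.len());
    }
    Ok(())
}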
diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml
index 5484b2b88c412..92992687c6c2e 100644
--- a/src/common/Cargo.toml
+++ b/src/common/Cargo.toml
@@ -84,6 +84,7 @@ strum = "0.25"
strum_macros = "0.25"
sysinfo = { version = "0.29", default-features = false }
thiserror = "1"
+tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
tinyvec = { version = "1", features = ["rustc_1_55", "grab_spare_slice"] }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index 7ad9a29e429c1..2924b5dcdbf6b 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -366,12 +366,12 @@ pub struct ServerConfig {
#[serde(default = "default::server::telemetry_enabled")]
pub telemetry_enabled: bool,
- #[serde(default, flatten)]
- pub unrecognized: Unrecognized,
-
/// Enable heap profile dump when memory usage is high.
#[serde(default)]
- pub auto_dump_heap_profile: AutoDumpHeapProfileConfig,
+ pub heap_profiling: HeapProfilingConfig,
+
+ #[serde(default, flatten)]
+ pub unrecognized: Unrecognized,
}
/// The section `[batch]` in `risingwave.toml`.
@@ -657,18 +657,18 @@ impl AsyncStackTraceOption {
}
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
-pub struct AutoDumpHeapProfileConfig {
+pub struct HeapProfilingConfig {
/// Whether to automatically dump a heap profile when memory usage is high
- #[serde(default = "default::auto_dump_heap_profile::enabled")]
- pub enabled: bool,
+ #[serde(default = "default::heap_profiling::enable_auto")]
+ pub enable_auto: bool,
+
+ /// The proportion (number between 0 and 1) of memory usage to trigger heap profile dump
+ #[serde(default = "default::heap_profiling::threshold_auto")]
+ pub threshold_auto: f32,
/// The directory to dump heap profile. If empty, the prefix in `MALLOC_CONF` will be used
- #[serde(default = "default::auto_dump_heap_profile::dir")]
+ #[serde(default = "default::heap_profiling::dir")]
pub dir: String,
-
- /// The proportion (number between 0 and 1) of memory usage to trigger heap profile dump
- #[serde(default = "default::auto_dump_heap_profile::threshold")]
- pub threshold: f32,
}
serde_with::with_prefix!(streaming_prefix "stream_");
@@ -736,7 +736,6 @@ pub struct BatchDeveloperConfig {
#[serde(default = "default::developer::batch_chunk_size")]
pub chunk_size: usize,
}
-
/// The section `[system]` in `risingwave.toml`. All these fields are used to initialize the system
/// parameters persisted in Meta store. Most fields are for testing purpose only and should not be
/// documented.
@@ -1125,17 +1124,17 @@ pub mod default {
}
}
- pub mod auto_dump_heap_profile {
- pub fn enabled() -> bool {
+ pub mod heap_profiling {
+ pub fn enable_auto() -> bool {
true
}
- pub fn dir() -> String {
- "".to_string()
+ pub fn threshold_auto() -> f32 {
+ 0.9
}
- pub fn threshold() -> f32 {
- 0.9
+ pub fn dir() -> String {
+ "./".to_string()
}
}
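For reference, a minimal sketch of what the renamed section resolves to, assuming the `DefaultFromSerde` derive produces a `Default` impl from the serde defaults above, as it does for the other config structs in this file:

use risingwave_common::config::HeapProfilingConfig;

fn heap_profiling_defaults() {
    let cfg = HeapProfilingConfig::default();
    assert!(cfg.enable_auto);                         // auto dump is on by default
    assert!((cfg.threshold_auto - 0.9).abs() < 1e-6); // dump at ~90% of total memory
    assert_eq!(cfg.dir, "./");                        // dump into the working directory
}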
diff --git a/src/common/src/heap_profiling/jeprof.rs b/src/common/src/heap_profiling/jeprof.rs
new file mode 100644
index 0000000000000..5c79c67604418
--- /dev/null
+++ b/src/common/src/heap_profiling/jeprof.rs
@@ -0,0 +1,49 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::path::Path;
+use std::process::Command;
+use std::{env, fs};
+
+use anyhow::anyhow;
+
+use crate::error::Result;
+
+pub async fn run(profile_path: String, collapsed_path: String) -> Result<()> {
+ let executable_path = env::current_exe()?;
+
+ let prof_cmd = move || {
+ Command::new("jeprof")
+ .arg("--collapsed")
+ .arg(executable_path)
+ .arg(Path::new(&profile_path))
+ .output()
+ };
+ match tokio::task::spawn_blocking(prof_cmd).await.unwrap() {
+ Ok(output) => {
+ if output.status.success() {
+ fs::write(Path::new(&collapsed_path), &output.stdout)?;
+ Ok(())
+ } else {
+ Err(anyhow!(
+ "jeprof exit with an error. stdout: {}, stderr: {}",
+ String::from_utf8_lossy(&output.stdout),
+ String::from_utf8_lossy(&output.stderr)
+ )
+ .into())
+ }
+ }
+ Err(e) => Err(e.into()),
+ }
+}
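A hedged usage sketch for the helper above: collapse a dumped profile so it can be fed to a flamegraph tool. It assumes `jeprof` is on `PATH`; the paths are illustrative only.

use risingwave_common::error::Result;
use risingwave_common::heap_profiling::{jeprof, COLLAPSED_SUFFIX};

async fn collapse_one_dump() -> Result<()> {
    let dumped = "./2023-09-18-16-18-01.auto-dump.compute.heap.0".to_string();
    let collapsed = format!("{}.{}", dumped, COLLAPSED_SUFFIX);
    // Spawns `jeprof --collapsed <current-exe> <dumped>` and writes its stdout to `collapsed`.
    jeprof::run(dumped, collapsed).await
}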
diff --git a/src/common/src/heap_profiling/mod.rs b/src/common/src/heap_profiling/mod.rs
new file mode 100644
index 0000000000000..7ab270bdecca7
--- /dev/null
+++ b/src/common/src/heap_profiling/mod.rs
@@ -0,0 +1,19 @@
+// Copyright 2023 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub const MANUALLY_DUMP_MID_NAME: &str = "manually-dump.compute.heap";
+pub const AUTO_DUMP_MID_NAME: &str = "auto-dump.compute.heap";
+pub const COLLAPSED_SUFFIX: &str = "collapsed";
+
+pub mod jeprof;
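These constants encode the file-naming convention shared by the auto-dump path (policy.rs), the manual-dump RPC (monitor_service.rs), and the dashboard's file listing: `<timestamp>.<mid-name>[.<seq>]`, with `.collapsed` appended once jeprof has processed a file. A small illustrative sketch (not part of the patch):

use risingwave_common::heap_profiling::{
    AUTO_DUMP_MID_NAME, COLLAPSED_SUFFIX, MANUALLY_DUMP_MID_NAME,
};

fn example_file_names() {
    let ts = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S").to_string();
    let auto = format!("{}.{}.{}", ts, AUTO_DUMP_MID_NAME, 3); // 4th auto dump
    let manual = format!("{}.{}", ts, MANUALLY_DUMP_MID_NAME); // manual dump via RPC
    let collapsed = format!("{}.{}", auto, COLLAPSED_SUFFIX);  // produced by jeprof
    // ListHeapProfiling returns only raw dumps, i.e. names that contain a
    // mid-name but do not end with the collapsed suffix.
    assert!(auto.contains(AUTO_DUMP_MID_NAME) && !auto.ends_with(COLLAPSED_SUFFIX));
    assert!(collapsed.ends_with(COLLAPSED_SUFFIX));
    let _ = manual;
}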
diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs
index da58e53b8c52d..86d29213374a0 100644
--- a/src/common/src/lib.rs
+++ b/src/common/src/lib.rs
@@ -77,6 +77,7 @@ pub mod test_utils;
pub mod types;
pub mod vnode_mapping;
+pub mod heap_profiling;
pub mod range;
pub mod test_prelude {
diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs
index e19bdaeb1427a..ba53ebb2d3aef 100644
--- a/src/compute/src/lib.rs
+++ b/src/compute/src/lib.rs
@@ -123,9 +123,9 @@ pub struct ComputeNodeOpts {
pub async_stack_trace: Option,
/// Enable heap profile dump when memory usage is high.
- #[clap(long, env = "RW_AUTO_DUMP_HEAP_PROFILE_DIR")]
- #[override_opts(path = server.auto_dump_heap_profile.dir)]
- pub auto_dump_heap_profile_dir: Option<String>,
+ #[clap(long, env = "RW_HEAP_PROFILING_DIR")]
+ #[override_opts(path = server.heap_profiling.dir)]
+ pub heap_profiling_dir: Option<String>,
#[clap(long, env = "RW_OBJECT_STORE_STREAMING_READ_TIMEOUT_MS", value_enum)]
#[override_opts(path = storage.object_store_streaming_read_timeout_ms)]
diff --git a/src/compute/src/memory_management/memory_manager.rs b/src/compute/src/memory_management/memory_manager.rs
index 656d42ed54422..4f011debd77d6 100644
--- a/src/compute/src/memory_management/memory_manager.rs
+++ b/src/compute/src/memory_management/memory_manager.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
use std::time::Duration;
use risingwave_batch::task::BatchManager;
-use risingwave_common::config::AutoDumpHeapProfileConfig;
+use risingwave_common::config::HeapProfilingConfig;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::util::epoch::Epoch;
use risingwave_stream::executor::monitor::StreamingMetrics;
@@ -47,15 +47,13 @@ impl GlobalMemoryManager {
pub fn new(
metrics: Arc<StreamingMetrics>,
total_memory_bytes: usize,
- auto_dump_heap_profile_config: AutoDumpHeapProfileConfig,
+ heap_profiling_config: HeapProfilingConfig,
) -> Arc<Self> {
let memory_control_policy =
- build_memory_control_policy(total_memory_bytes, auto_dump_heap_profile_config.clone());
+ build_memory_control_policy(total_memory_bytes, heap_profiling_config.clone());
tracing::info!("memory control policy: {:?}", &memory_control_policy);
- if auto_dump_heap_profile_config.enabled {
- fs::create_dir_all(&auto_dump_heap_profile_config.dir).unwrap();
- }
+ fs::create_dir_all(&heap_profiling_config.dir).unwrap();
Arc::new(Self {
watermark_epoch: Arc::new(0.into()),
metrics,
diff --git a/src/compute/src/memory_management/mod.rs b/src/compute/src/memory_management/mod.rs
index 0b45f92bfb4a1..6d41eaac9955f 100644
--- a/src/compute/src/memory_management/mod.rs
+++ b/src/compute/src/memory_management/mod.rs
@@ -22,7 +22,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use risingwave_batch::task::BatchManager;
-use risingwave_common::config::{AutoDumpHeapProfileConfig, StorageConfig, StorageMemoryConfig};
+use risingwave_common::config::{HeapProfilingConfig, StorageConfig, StorageMemoryConfig};
use risingwave_common::util::pretty_bytes::convert;
use risingwave_stream::task::LocalStreamManager;
@@ -69,14 +69,14 @@ pub trait MemoryControl: Send + Sync + std::fmt::Debug {
pub fn build_memory_control_policy(
total_memory_bytes: usize,
- auto_dump_heap_profile_config: AutoDumpHeapProfileConfig,
+ heap_profiling_config: HeapProfilingConfig,
) -> MemoryControlRef {
use self::policy::JemallocMemoryControl;
if cfg!(target_os = "linux") {
Box::new(JemallocMemoryControl::new(
total_memory_bytes,
- auto_dump_heap_profile_config,
+ heap_profiling_config,
))
} else {
// We disable memory control on operating systems other than Linux now because jemalloc
diff --git a/src/compute/src/memory_management/policy.rs b/src/compute/src/memory_management/policy.rs
index 005edb813f823..9d450a4f38fc4 100644
--- a/src/compute/src/memory_management/policy.rs
+++ b/src/compute/src/memory_management/policy.rs
@@ -19,7 +19,8 @@ use std::sync::Arc;
use chrono;
use risingwave_batch::task::BatchManager;
-use risingwave_common::config::AutoDumpHeapProfileConfig;
+use risingwave_common::config::HeapProfilingConfig;
+use risingwave_common::heap_profiling::AUTO_DUMP_MID_NAME;
use risingwave_common::util::epoch::Epoch;
use risingwave_stream::task::LocalStreamManager;
use tikv_jemalloc_ctl::{
@@ -43,7 +44,7 @@ pub struct JemallocMemoryControl {
jemalloc_dump_mib: jemalloc_prof::dump_mib,
dump_seq: u64,
- auto_dump_heap_profile_config: AutoDumpHeapProfileConfig,
+ heap_profiling_config: HeapProfilingConfig,
}
impl JemallocMemoryControl {
@@ -51,15 +52,12 @@ impl JemallocMemoryControl {
const THRESHOLD_GRACEFUL: f64 = 0.8;
const THRESHOLD_STABLE: f64 = 0.7;
- pub fn new(
- total_memory: usize,
- auto_dump_heap_profile_config: AutoDumpHeapProfileConfig,
- ) -> Self {
+ pub fn new(total_memory: usize, heap_profiling_config: HeapProfilingConfig) -> Self {
let threshold_stable = (total_memory as f64 * Self::THRESHOLD_STABLE) as usize;
let threshold_graceful = (total_memory as f64 * Self::THRESHOLD_GRACEFUL) as usize;
let threshold_aggressive = (total_memory as f64 * Self::THRESHOLD_AGGRESSIVE) as usize;
let threshold_auto_dump_heap_profile =
- (total_memory as f64 * auto_dump_heap_profile_config.threshold as f64) as usize;
+ (total_memory as f64 * heap_profiling_config.threshold_auto as f64) as usize;
let jemalloc_epoch_mib = jemalloc_epoch::mib().unwrap();
let jemalloc_allocated_mib = jemalloc_stats::allocated::mib().unwrap();
@@ -76,7 +74,7 @@ impl JemallocMemoryControl {
jemalloc_active_mib,
jemalloc_dump_mib,
dump_seq: 0,
- auto_dump_heap_profile_config,
+ heap_profiling_config,
}
}
@@ -102,7 +100,7 @@ impl JemallocMemoryControl {
}
fn dump_heap_prof(&self, cur_used_memory_bytes: usize, prev_used_memory_bytes: usize) {
- if !self.auto_dump_heap_profile_config.enabled {
+ if !self.heap_profiling_config.enable_auto {
return;
}
@@ -116,23 +114,13 @@ impl JemallocMemoryControl {
}
let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S").to_string();
- let file_name = format!(
- "{}.exceed-threshold-aggressive-heap-prof.compute.dump.{}\0",
- time_prefix, self.dump_seq,
- );
+ let file_name = format!("{}.{}.{}\0", time_prefix, AUTO_DUMP_MID_NAME, self.dump_seq,);
- let file_path = if !self.auto_dump_heap_profile_config.dir.is_empty() {
- Path::new(&self.auto_dump_heap_profile_config.dir)
- .join(Path::new(&file_name))
- .to_str()
- .unwrap()
- .to_string()
- } else {
- let prof_prefix = jemalloc_opt::prof_prefix::read().unwrap();
- let mut file_path = prof_prefix.to_str().unwrap().to_string();
- file_path.push_str(&file_name);
- file_path
- };
+ let file_path = Path::new(&self.heap_profiling_config.dir)
+ .join(Path::new(&file_name))
+ .to_str()
+ .unwrap()
+ .to_string();
let file_path_str = Box::leak(file_path.into_boxed_str());
let file_path_bytes = unsafe { file_path_str.as_bytes_mut() };
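The trailing `\0` in the file name above is deliberate: jemalloc's `prof.dump` control takes a C-style, NUL-terminated path, so the Rust string keeps the terminator and is later reinterpreted as a `CStr` (the manual-dump RPC below does the same). A minimal sketch of that conversion, with an illustrative helper name:

use std::ffi::CStr;

fn as_prof_dump_arg(path_with_nul: &str) -> &CStr {
    // Panics if the path is missing the trailing NUL or contains interior NULs.
    CStr::from_bytes_with_nul(path_with_nul.as_bytes()).unwrap()
}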
diff --git a/src/compute/src/rpc/service/monitor_service.rs b/src/compute/src/rpc/service/monitor_service.rs
index 01f1ed6d1c4cf..02b89f5bdf4bf 100644
--- a/src/compute/src/rpc/service/monitor_service.rs
+++ b/src/compute/src/rpc/service/monitor_service.rs
@@ -12,12 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::fs;
+use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
+use itertools::Itertools;
+use risingwave_common::config::ServerConfig;
+use risingwave_common::heap_profiling::{
+ self, AUTO_DUMP_MID_NAME, COLLAPSED_SUFFIX, MANUALLY_DUMP_MID_NAME,
+};
use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
use risingwave_pb::monitor_service::{
- HeapProfilingRequest, HeapProfilingResponse, ProfilingRequest, ProfilingResponse,
+ AnalyzeHeapRequest, AnalyzeHeapResponse, HeapProfilingRequest, HeapProfilingResponse,
+ ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse,
StackTraceRequest, StackTraceResponse,
};
use risingwave_stream::task::LocalStreamManager;
@@ -27,16 +35,19 @@ use tonic::{Request, Response, Status};
pub struct MonitorServiceImpl {
stream_mgr: Arc<LocalStreamManager>,
grpc_await_tree_reg: Option,
+ server_config: ServerConfig,
}
impl MonitorServiceImpl {
pub fn new(
stream_mgr: Arc<LocalStreamManager>,
grpc_await_tree_reg: Option,
+ server_config: ServerConfig,
) -> Self {
Self {
stream_mgr,
grpc_await_tree_reg,
+ server_config,
}
}
}
@@ -129,8 +140,13 @@ impl MonitorService for MonitorServiceImpl {
}
let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S").to_string();
- let file_name = format!("{}.risectl-dump-heap-prof.compute.dump\0", time_prefix,);
- let dir = PathBuf::from(request.into_inner().get_dir());
+ let file_name = format!("{}.{}\0", time_prefix, MANUALLY_DUMP_MID_NAME);
+ let arg_dir = request.into_inner().get_dir().clone();
+ let dir = PathBuf::from(if arg_dir.is_empty() {
+ &self.server_config.heap_profiling.dir
+ } else {
+ &arg_dir
+ });
create_dir_all(&dir)?;
let file_path_buf = dir.join(file_name);
@@ -144,7 +160,7 @@ impl MonitorService for MonitorServiceImpl {
let response = if let Err(e) = tikv_jemalloc_ctl::prof::dump::write(
CStr::from_bytes_with_nul(file_path_bytes).unwrap(),
) {
- tracing::warn!("Risectl Jemalloc dump heap file failed! {:?}", e);
+ tracing::warn!("Manually Jemalloc dump heap file failed! {:?}", e);
Err(Status::internal(e.to_string()))
} else {
Ok(Response::new(HeapProfilingResponse {}))
@@ -152,6 +168,64 @@ impl MonitorService for MonitorServiceImpl {
let _ = unsafe { Box::from_raw(file_path_ptr) };
response
}
+
+ #[cfg_attr(coverage, no_coverage)]
+ async fn list_heap_profiling(
+ &self,
+ _request: Request<ListHeapProfilingRequest>,
+ ) -> Result<Response<ListHeapProfilingResponse>, Status> {
+ let dump_dir = self.server_config.heap_profiling.dir.clone();
+ let auto_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
+ .map(|entry| {
+ let entry = entry?;
+ Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
+ })
+ .filter(|name| {
+ if let Ok(name) = name {
+ name.contains(AUTO_DUMP_MID_NAME) && !name.ends_with(COLLAPSED_SUFFIX)
+ } else {
+ true
+ }
+ })
+ .try_collect()?;
+ let manually_dump_files_name: Vec<_> = fs::read_dir(dump_dir.clone())?
+ .map(|entry| {
+ let entry = entry?;
+ Ok::<_, Status>(entry.file_name().to_string_lossy().to_string())
+ })
+ .filter(|name| {
+ if let Ok(name) = name {
+ name.contains(MANUALLY_DUMP_MID_NAME) && !name.ends_with(COLLAPSED_SUFFIX)
+ } else {
+ true
+ }
+ })
+ .try_collect()?;
+
+ Ok(Response::new(ListHeapProfilingResponse {
+ dir: dump_dir,
+ name_auto: auto_dump_files_name,
+ name_manually: manually_dump_files_name,
+ }))
+ }
+
+ #[cfg_attr(coverage, no_coverage)]
+ async fn analyze_heap(
+ &self,
+ request: Request<AnalyzeHeapRequest>,
+ ) -> Result<Response<AnalyzeHeapResponse>, Status> {
+ let dumped_path_str = request.into_inner().get_path().clone();
+ let collapsed_path_str = format!("{}.{}", dumped_path_str, COLLAPSED_SUFFIX);
+ let collapsed_path = Path::new(&collapsed_path_str);
+
+ // run jeprof if the target was not analyzed before
+ if !collapsed_path.exists() {
+ heap_profiling::jeprof::run(dumped_path_str, collapsed_path_str.clone()).await?;
+ }
+
+ let file = fs::read(Path::new(&collapsed_path_str))?;
+ Ok(Response::new(AnalyzeHeapResponse { result: file }))
+ }
}
pub use grpc_middleware::*;
diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs
index aef6895ad6b5b..a86bf198efaf4 100644
--- a/src/compute/src/server.rs
+++ b/src/compute/src/server.rs
@@ -287,7 +287,7 @@ pub async fn compute_node_serve(
let memory_mgr = GlobalMemoryManager::new(
streaming_metrics.clone(),
total_memory_bytes,
- config.server.auto_dump_heap_profile.clone(),
+ config.server.heap_profiling.clone(),
);
// Run a background memory monitor
tokio::spawn(memory_mgr.clone().run(
@@ -372,7 +372,11 @@ pub async fn compute_node_serve(
let exchange_srv =
ExchangeServiceImpl::new(batch_mgr.clone(), stream_mgr.clone(), exchange_srv_metrics);
let stream_srv = StreamServiceImpl::new(stream_mgr.clone(), stream_env.clone());
- let monitor_srv = MonitorServiceImpl::new(stream_mgr.clone(), grpc_await_tree_reg.clone());
+ let monitor_srv = MonitorServiceImpl::new(
+ stream_mgr.clone(),
+ grpc_await_tree_reg.clone(),
+ config.server.clone(),
+ );
let config_srv = ConfigServiceImpl::new(batch_mgr, stream_mgr);
let health_srv = HealthServiceImpl::new();
diff --git a/src/config/example.toml b/src/config/example.toml
index 603d0b5cc2bba..c2456a59e45b3 100644
--- a/src/config/example.toml
+++ b/src/config/example.toml
@@ -6,10 +6,10 @@ connection_pool_size = 16
metrics_level = "Info"
telemetry_enabled = true
-[server.auto_dump_heap_profile]
-enabled = true
-dir = ""
-threshold = 0.8999999761581421
+[server.heap_profiling]
+enable_auto = true
+threshold_auto = 0.8999999761581421
+dir = "./"
[meta]
min_sst_retention_time_sec = 86400
diff --git a/src/ctl/src/cmd_impl/profile.rs b/src/ctl/src/cmd_impl/profile.rs
index 0069adee37687..fc487f1d2c17d 100644
--- a/src/ctl/src/cmd_impl/profile.rs
+++ b/src/ctl/src/cmd_impl/profile.rs
@@ -82,7 +82,8 @@ pub async fn cpu_profile(context: &CtlContext, sleep_s: u64) -> anyhow::Result<(
Ok(())
}
-pub async fn heap_profile(context: &CtlContext, dir: String) -> anyhow::Result<()> {
+pub async fn heap_profile(context: &CtlContext, dir: Option<String>) -> anyhow::Result<()> {
+ let dir = dir.unwrap_or_default();
let meta_client = context.meta_client().await?;
let workers = meta_client.get_cluster_info().await?.worker_nodes;
diff --git a/src/ctl/src/lib.rs b/src/ctl/src/lib.rs
index adcb6cf3b3472..45e61f80b5fae 100644
--- a/src/ctl/src/lib.rs
+++ b/src/ctl/src/lib.rs
@@ -466,7 +466,7 @@ pub enum ProfileCommands {
Heap {
/// The output directory of the dumped file
#[clap(long = "dir")]
- dir: String,
+ dir: Option<String>,
},
}
diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml
index d8401cc5a7f71..cd81265e5f733 100644
--- a/src/meta/Cargo.toml
+++ b/src/meta/Cargo.toml
@@ -20,6 +20,7 @@ assert_matches = "1"
async-trait = "0.1"
aws-config = { workspace = true }
aws-sdk-ec2 = { workspace = true }
+base64-url = { version = "2.0.0" }
bytes = { version = "1", features = ["serde"] }
clap = { version = "4", features = ["derive", "env"] }
crepe = "0.1"
diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs
index a08eaa70cf515..2ebc9d924f50b 100644
--- a/src/meta/src/dashboard/mod.rs
+++ b/src/meta/src/dashboard/mod.rs
@@ -17,13 +17,14 @@ mod proxy;
use std::collections::HashMap;
use std::net::SocketAddr;
+use std::path::Path as FilePath;
use std::sync::Arc;
use anyhow::{anyhow, Result};
-use axum::body::Body;
+use axum::body::{boxed, Body};
use axum::extract::{Extension, Path};
use axum::http::{Method, StatusCode};
-use axum::response::IntoResponse;
+use axum::response::{IntoResponse, Response};
use axum::routing::{get, get_service};
use axum::Router;
use hyper::Request;
@@ -45,8 +46,7 @@ pub struct DashboardService {
pub cluster_manager: ClusterManagerRef,
pub fragment_manager: FragmentManagerRef,
pub compute_clients: ComputeClientPool,
-
- // TODO: replace with catalog manager.
+ pub ui_path: Option<String>,
pub meta_store: MetaStoreRef,
}
@@ -56,11 +56,15 @@ pub(super) mod handlers {
use anyhow::Context;
use axum::Json;
use itertools::Itertools;
+ use risingwave_common::bail;
+ use risingwave_common::heap_profiling::COLLAPSED_SUFFIX;
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::catalog::{Sink, Source, Table};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::{ActorLocation, PbTableFragments};
- use risingwave_pb::monitor_service::StackTraceResponse;
+ use risingwave_pb::monitor_service::{
+ HeapProfilingResponse, ListHeapProfilingResponse, StackTraceResponse,
+ };
use serde_json::json;
use super::*;
@@ -75,6 +79,12 @@ pub(super) mod handlers {
DashboardError(err.into())
}
+ impl From<anyhow::Error> for DashboardError {
+ fn from(value: anyhow::Error) -> Self {
+ DashboardError(value)
+ }
+ }
+
impl IntoResponse for DashboardError {
fn into_response(self) -> axum::response::Response {
let mut resp = Json(json!({
@@ -206,11 +216,93 @@ pub(super) mod handlers {
Ok(result.into())
}
+
+ pub async fn heap_profile(
+ Path(worker_id): Path<WorkerId>,
+ Extension(srv): Extension<Service>,
+ ) -> Result<Json<HeapProfilingResponse>> {
+ let worker_node = srv
+ .cluster_manager
+ .get_worker_by_id(worker_id)
+ .await
+ .context("worker node not found")
+ .map_err(err)?
+ .worker_node;
+
+ let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
+
+ let result = client.heap_profile("".to_string()).await.map_err(err)?;
+
+ Ok(result.into())
+ }
+
+ pub async fn list_heap_profile(
+ Path(worker_id): Path<WorkerId>,
+ Extension(srv): Extension<Service>,
+ ) -> Result<Json<ListHeapProfilingResponse>> {
+ let worker_node = srv
+ .cluster_manager
+ .get_worker_by_id(worker_id)
+ .await
+ .context("worker node not found")
+ .map_err(err)?
+ .worker_node;
+
+ let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
+
+ let result = client.list_heap_profile().await.map_err(err)?;
+ Ok(result.into())
+ }
+
+ pub async fn analyze_heap(
+ Path((worker_id, file_path)): Path<(WorkerId, String)>,
+ Extension(srv): Extension<Service>,
+ ) -> Result<Response> {
+ if srv.ui_path.is_none() {
+ bail!("Should provide ui_path");
+ }
+
+ let file_path =
+ String::from_utf8(base64_url::decode(&file_path).map_err(err)?).map_err(err)?;
+
+ let file_name = FilePath::new(&file_path)
+ .file_name()
+ .unwrap()
+ .to_string_lossy()
+ .to_string();
+
+ let collapsed_file_name = format!("{}.{}", file_name, COLLAPSED_SUFFIX);
+
+ let worker_node = srv
+ .cluster_manager
+ .get_worker_by_id(worker_id)
+ .await
+ .context("worker node not found")
+ .map_err(err)?
+ .worker_node;
+
+ let client = srv.compute_clients.get(&worker_node).await.map_err(err)?;
+
+ let collapsed_bin = client
+ .analyze_heap(file_path.clone())
+ .await
+ .map_err(err)?
+ .result;
+ let collapsed_str = String::from_utf8_lossy(&collapsed_bin).to_string();
+
+ let response = Response::builder()
+ .header("Content-Type", "application/octet-stream")
+ .header("Content-Disposition", collapsed_file_name)
+ .body(boxed(collapsed_str));
+
+ response.map_err(err)
+ }
}
impl DashboardService {
- pub async fn serve(self, ui_path: Option<String>) -> Result<()> {
+ pub async fn serve(self) -> Result<()> {
use handlers::*;
+ let ui_path = self.ui_path.clone();
let srv = Arc::new(self);
let cors_layer = CorsLayer::new()
@@ -233,6 +325,12 @@ impl DashboardService {
get(prometheus::list_prometheus_actor_back_pressure),
)
.route("/monitor/await_tree/:worker_id", get(dump_await_tree))
+ .route("/monitor/dump_heap_profile/:worker_id", get(heap_profile))
+ .route(
+ "/monitor/list_heap_profile/:worker_id",
+ get(list_heap_profile),
+ )
+ .route("/monitor/analyze/:worker_id/*path", get(analyze_heap))
.layer(
ServiceBuilder::new()
.layer(AddExtensionLayer::new(srv.clone()))
diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs
index cb64adc13dae9..53000dee54f40 100644
--- a/src/meta/src/rpc/server.rs
+++ b/src/meta/src/rpc/server.rs
@@ -352,7 +352,7 @@ pub async fn start_service_as_election_leader(
) -> MetaResult<()> {
tracing::info!("Defining leader services");
let prometheus_endpoint = opts.prometheus_endpoint.clone();
- let env = MetaSrvEnv::new(opts, init_system_params, meta_store.clone()).await?;
+ let env = MetaSrvEnv::new(opts.clone(), init_system_params, meta_store.clone()).await?;
let fragment_manager = Arc::new(FragmentManager::new(env.clone()).await.unwrap());
let system_params_manager = env.system_params_manager_ref();
@@ -426,8 +426,9 @@ pub async fn start_service_as_election_leader(
fragment_manager: fragment_manager.clone(),
compute_clients: ComputeClientPool::default(),
meta_store: env.meta_store_ref(),
+ ui_path: address_info.ui_path,
};
- let task = tokio::spawn(dashboard_service.serve(address_info.ui_path));
+ let task = tokio::spawn(dashboard_service.serve());
Some(task)
} else {
None
diff --git a/src/rpc_client/src/compute_client.rs b/src/rpc_client/src/compute_client.rs
index aac767570052e..15516380bd418 100644
--- a/src/rpc_client/src/compute_client.rs
+++ b/src/rpc_client/src/compute_client.rs
@@ -27,7 +27,8 @@ use risingwave_pb::compute::config_service_client::ConfigServiceClient;
use risingwave_pb::compute::{ShowConfigRequest, ShowConfigResponse};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::{
- HeapProfilingRequest, HeapProfilingResponse, ProfilingRequest, ProfilingResponse,
+ AnalyzeHeapRequest, AnalyzeHeapResponse, HeapProfilingRequest, HeapProfilingResponse,
+ ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse,
StackTraceRequest, StackTraceResponse,
};
use risingwave_pb::task_service::exchange_service_client::ExchangeServiceClient;
@@ -211,6 +212,24 @@ impl ComputeClient {
.into_inner())
}
+ pub async fn list_heap_profile(&self) -> Result<ListHeapProfilingResponse> {
+ Ok(self
+ .monitor_client
+ .to_owned()
+ .list_heap_profiling(ListHeapProfilingRequest {})
+ .await?
+ .into_inner())
+ }
+
+ pub async fn analyze_heap(&self, path: String) -> Result<AnalyzeHeapResponse> {
+ Ok(self
+ .monitor_client
+ .to_owned()
+ .analyze_heap(AnalyzeHeapRequest { path })
+ .await?
+ .into_inner())
+ }
+
pub async fn show_config(&self) -> Result<ShowConfigResponse> {
Ok(self
.config_client
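Taken together, the two helpers let a caller such as the meta dashboard first enumerate dumps and then fetch a collapsed profile. A hedged sketch (not part of the patch), assuming the `ComputeClient` was obtained from a `ComputeClientPool` as in src/meta/src/dashboard/mod.rs:

use risingwave_rpc_client::ComputeClient;

async fn analyze_latest_auto_dump(client: &ComputeClient) -> anyhow::Result<()> {
    let list = client.list_heap_profile().await?;
    if let Some(name) = list.name_auto.last() {
        let path = std::path::Path::new(&list.dir).join(name);
        // The compute node runs jeprof lazily and caches the ".collapsed" file.
        let collapsed = client
            .analyze_heap(path.to_string_lossy().into_owned())
            .await?
            .result;
        println!("collapsed profile: {} bytes", collapsed.len());
    }
    Ok(())
}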
diff --git a/src/storage/compactor/src/rpc.rs b/src/storage/compactor/src/rpc.rs
index 80f146d30f5ed..d7f01115610f1 100644
--- a/src/storage/compactor/src/rpc.rs
+++ b/src/storage/compactor/src/rpc.rs
@@ -20,7 +20,8 @@ use risingwave_pb::compactor::compactor_service_server::CompactorService;
use risingwave_pb::compactor::{EchoRequest, EchoResponse};
use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
use risingwave_pb::monitor_service::{
- HeapProfilingRequest, HeapProfilingResponse, ProfilingRequest, ProfilingResponse,
+ AnalyzeHeapRequest, AnalyzeHeapResponse, HeapProfilingRequest, HeapProfilingResponse,
+ ListHeapProfilingRequest, ListHeapProfilingResponse, ProfilingRequest, ProfilingResponse,
StackTraceRequest, StackTraceResponse,
};
use tonic::{Request, Response, Status};
@@ -83,4 +84,22 @@ impl MonitorService for MonitorServiceImpl {
"Heap profiling unimplemented in compactor",
))
}
+
+ async fn list_heap_profiling(
+ &self,
+ _request: Request<ListHeapProfilingRequest>,
+ ) -> Result<Response<ListHeapProfilingResponse>, Status> {
+ Err(Status::unimplemented(
+ "Heap profiling unimplemented in compactor",
+ ))
+ }
+
+ async fn analyze_heap(
+ &self,
+ _request: Request<AnalyzeHeapRequest>,
+ ) -> Result<Response<AnalyzeHeapResponse>, Status> {
+ Err(Status::unimplemented(
+ "Heap profiling unimplemented in compactor",
+ ))
+ }
}