Skip to content

Commit

Permalink
feat(risectl): risectl heap dump (#11444)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su authored Aug 9, 2023
1 parent 6d4d5ac commit 77c0b1c
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 14 deletions.
2 changes: 1 addition & 1 deletion docs/cpu-profiling.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ nperf flamegraph --merge-threads perf.data > perf.svg
## Profiling remote compute nodes
You can profile remote compute nodes from a local machine by simply typing the following command.
```shell
./risedev ctl profile --sleep [seconds]
./risedev ctl profile cpu --sleep [seconds]
```
All compute nodes will be profiled for the given number of `seconds`, and the generated flame graphs will be transferred to your local machine under `.risingwave/profiling/`.

Expand Down
15 changes: 15 additions & 0 deletions docs/memory-profiling.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,21 @@ Afterwards, the memory dump should be outputted to the specified folder. Use `ku

</details>

<details>
<summary>1.4. Dump memory profile with risectl</summary>

You can manually dump a heap profile with risectl for a compute node that has Jemalloc profiling enabled (`MALLOC_CONF=prof:true`).

```shell
./risedev ctl profile heap --dir [dumped_file_dir]
```

The dumped files will be saved in the directory you specified, on each compute node.

Note: To profile compute nodes remotely, please make sure all remote nodes have a public IP address accessible from your local machine (where you are running `risedev`).

</details>



## Step 2 - Analyze with `jeprof`
Expand Down
10 changes: 10 additions & 0 deletions proto/monitor_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ message StackTraceResponse {
map<string, string> compaction_task_traces = 3;
}

// CPU profiling
message ProfilingRequest {
// How long the profiling should last.
uint64 sleep_s = 1;
Expand All @@ -22,7 +23,16 @@ message ProfilingResponse {
bytes result = 1;
}

// Heap profiling
//
// Request asking a node to dump a heap profile into `dir`.
message HeapProfilingRequest {
// The directory, on the node serving the request, in which the dumped file
// is saved.
string dir = 1;
}

// Empty response: the dump is written to a file on the node and is not
// returned in the RPC response.
message HeapProfilingResponse {}

// Monitoring RPCs: stack traces, CPU profiling and heap profiling.
service MonitorService {
rpc StackTrace(StackTraceRequest) returns (StackTraceResponse);
// CPU profiling; see `ProfilingRequest`.
rpc Profiling(ProfilingRequest) returns (ProfilingResponse);
// Heap profiling; see `HeapProfilingRequest`.
rpc HeapProfiling(HeapProfilingRequest) returns (HeapProfilingResponse);
}
1 change: 0 additions & 1 deletion src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ tracing = "0.1"

[target.'cfg(target_os = "linux")'.dependencies]
tikv-jemalloc-ctl = { git = "https://github.com/yuhao-su/jemallocator.git", rev = "a0911601bb7bb263ca55c7ea161ef308fdc623f8" }

[target.'cfg(not(madsim))'.dependencies]
workspace-hack = { path = "../workspace-hack" }

Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/memory_management/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl JemallocMemoryControl {
.jemalloc_dump_mib
.write(CStr::from_bytes_with_nul(file_path_bytes).unwrap())
{
tracing::warn!("Jemalloc dump heap file failed! {:?}", e);
tracing::warn!("Auto Jemalloc dump heap file failed! {:?}", e);
}
unsafe { Box::from_raw(file_path_ptr) };
}
Expand Down
58 changes: 56 additions & 2 deletions src/compute/src/rpc/service/monitor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ use std::time::Duration;

use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
use risingwave_pb::monitor_service::{
ProfilingRequest, ProfilingResponse, StackTraceRequest, StackTraceResponse,
HeapProfilingRequest, HeapProfilingResponse, ProfilingRequest, ProfilingResponse,
StackTraceRequest, StackTraceResponse,
};
use risingwave_stream::task::LocalStreamManager;
use tonic::{Request, Response, Status};
Expand Down Expand Up @@ -90,7 +91,6 @@ impl MonitorService for MonitorServiceImpl {
.build()
.unwrap();
tokio::time::sleep(Duration::from_secs(time)).await;
// let buf = SharedWriter::new(vec![]);
let mut buf = vec![];
match guard.report().build() {
Ok(report) => {
Expand All @@ -104,6 +104,60 @@ impl MonitorService for MonitorServiceImpl {
}
}
}

#[cfg(target_os = "linux")]
#[cfg_attr(coverage, no_coverage)]
async fn heap_profiling(
&self,
request: Request<HeapProfilingRequest>,
) -> Result<Response<HeapProfilingResponse>, Status> {
use std::ffi::CStr;
use std::fs::create_dir_all;
use std::path::PathBuf;

use tikv_jemalloc_ctl;

if !tikv_jemalloc_ctl::opt::prof::read().unwrap() {
return Err(Status::failed_precondition(
"Jemalloc profiling is not enabled on the node. Try start the node with `MALLOC_CONF=prof:true`",
));
}

let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S").to_string();
let file_name = format!("{}.risectl-dump-heap-prof.compute.dump\0", time_prefix,);
let dir = PathBuf::from(request.into_inner().get_dir());
create_dir_all(&dir)?;

let file_path_buf = dir.join(file_name);
let file_path = file_path_buf
.to_str()
.ok_or_else(|| Status::internal("The file dir is not a UTF-8 String"))?;

let file_path_str = Box::leak(file_path.to_string().into_boxed_str());
let file_path_bytes = unsafe { file_path_str.as_bytes_mut() };
let file_path_ptr = file_path_bytes.as_mut_ptr();
let response = if let Err(e) = tikv_jemalloc_ctl::prof::dump::write(
CStr::from_bytes_with_nul(file_path_bytes).unwrap(),
) {
tracing::warn!("Risectl Jemalloc dump heap file failed! {:?}", e);
Err(Status::internal(e.to_string()))
} else {
Ok(Response::new(HeapProfilingResponse {}))
};
unsafe { Box::from_raw(file_path_ptr) };
response
}

/// Heap profiling stub for non-Linux targets: always answers with an
/// `unimplemented` status and ignores the request.
#[cfg(not(target_os = "linux"))]
#[cfg_attr(coverage, no_coverage)]
async fn heap_profiling(
    &self,
    _request: Request<HeapProfilingRequest>,
) -> Result<Response<HeapProfilingResponse>, Status> {
    let unsupported = Status::unimplemented("heap profiling is only implemented on Linux");
    Err(unsupported)
}
}

pub use grpc_middleware::*;
Expand Down
52 changes: 51 additions & 1 deletion src/ctl/src/cmd_impl/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::io::AsyncWriteExt;

use crate::CtlContext;

pub async fn profile(context: &CtlContext, sleep_s: u64) -> anyhow::Result<()> {
pub async fn cpu_profile(context: &CtlContext, sleep_s: u64) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;

let workers = meta_client.get_cluster_info().await?.worker_nodes;
Expand Down Expand Up @@ -81,3 +81,53 @@ pub async fn profile(context: &CtlContext, sleep_s: u64) -> anyhow::Result<()> {

Ok(())
}

pub async fn heap_profile(context: &CtlContext, dir: String) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;

let workers = meta_client.get_cluster_info().await?.worker_nodes;
let compute_nodes = workers
.into_iter()
.filter(|w| w.r#type() == WorkerType::ComputeNode);

let clients = ComputeClientPool::default();

let mut profile_futs = vec![];

// FIXME: the compute node may not be accessible directly from risectl, we may let the meta
// service collect the reports from all compute nodes in the future.
for cn in compute_nodes {
let client = clients.get(&cn).await?;
let dir = &dir;

let fut = async move {
let response = client.heap_profile(dir.clone()).await;
let host_addr = cn.get_host().expect("Should have host address");

let node_name = format!(
"compute-node-{}-{}",
host_addr.get_host().replace('.', "-"),
host_addr.get_port()
);

if let Err(err) = response {
tracing::error!(
"Failed to dump profile on {} with error {}",
node_name,
err.to_string()
);
}
Ok::<_, anyhow::Error>(())
};
profile_futs.push(fut);
}

try_join_all(profile_futs).await?;

println!(
"Profiling results are saved at {} on each compute nodes",
PathBuf::from(dir).display()
);

Ok(())
}
29 changes: 24 additions & 5 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,8 @@ enum Commands {
Trace,
// TODO(yuhao): profile other nodes
/// Commands for profiling the compute nodes
Profile {
#[clap(short, long = "sleep")]
sleep: u64,
},
#[clap(subcommand)]
Profile(ProfileCommands),
}

#[derive(clap::ValueEnum, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
Expand Down Expand Up @@ -416,6 +414,22 @@ enum MetaCommands {
},
}

// Subcommands of `profile`. `start_impl` dispatches `Cpu` to
// `cmd_impl::profile::cpu_profile` and `Heap` to
// `cmd_impl::profile::heap_profile`.
#[derive(Subcommand, Clone, Debug)]
pub enum ProfileCommands {
/// CPU profile
Cpu {
/// The time to active profiling for (in seconds)
// Passed as `-s` / `--sleep`.
#[clap(short, long = "sleep")]
sleep: u64,
},
/// Heap profile
Heap {
/// The output directory of the dumped file
// Passed as `--dir`; the value is forwarded unchanged to `heap_profile`.
#[clap(long = "dir")]
dir: String,
},
}

pub async fn start(opts: CliOpts) -> Result<()> {
let context = CtlContext::default();
let result = start_impl(opts, &context).await;
Expand Down Expand Up @@ -579,7 +593,12 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
cmd_impl::meta::validate_source(context, props).await?
}
Commands::Trace => cmd_impl::trace::trace(context).await?,
Commands::Profile { sleep } => cmd_impl::profile::profile(context, sleep).await?,
Commands::Profile(ProfileCommands::Cpu { sleep }) => {
cmd_impl::profile::cpu_profile(context, sleep).await?
}
Commands::Profile(ProfileCommands::Heap { dir }) => {
cmd_impl::profile::heap_profile(context, dir).await?
}
Commands::Scale(ScaleCommands::Resize(resize)) => {
cmd_impl::scale::resize(context, resize).await?
}
Expand Down
12 changes: 11 additions & 1 deletion src/rpc_client/src/compute_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use risingwave_pb::compute::config_service_client::ConfigServiceClient;
use risingwave_pb::compute::{ShowConfigRequest, ShowConfigResponse};
use risingwave_pb::monitor_service::monitor_service_client::MonitorServiceClient;
use risingwave_pb::monitor_service::{
ProfilingRequest, ProfilingResponse, StackTraceRequest, StackTraceResponse,
HeapProfilingRequest, HeapProfilingResponse, ProfilingRequest, ProfilingResponse,
StackTraceRequest, StackTraceResponse,
};
use risingwave_pb::task_service::exchange_service_client::ExchangeServiceClient;
use risingwave_pb::task_service::task_service_client::TaskServiceClient;
Expand Down Expand Up @@ -192,6 +193,15 @@ impl ComputeClient {
.into_inner())
}

/// Sends a `HeapProfilingRequest` for `dir` through the monitor service
/// client and returns the unwrapped response.
pub async fn heap_profile(&self, dir: String) -> Result<HeapProfilingResponse> {
    let mut monitor = self.monitor_client.to_owned();
    let request = HeapProfilingRequest { dir };
    let response = monitor.heap_profiling(request).await?;
    Ok(response.into_inner())
}

pub async fn show_config(&self) -> Result<ShowConfigResponse> {
Ok(self
.config_client
Expand Down
14 changes: 12 additions & 2 deletions src/storage/compactor/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use risingwave_pb::compactor::compactor_service_server::CompactorService;
use risingwave_pb::compactor::{EchoRequest, EchoResponse};
use risingwave_pb::monitor_service::monitor_service_server::MonitorService;
use risingwave_pb::monitor_service::{
ProfilingRequest, ProfilingResponse, StackTraceRequest, StackTraceResponse,
HeapProfilingRequest, HeapProfilingResponse, ProfilingRequest, ProfilingResponse,
StackTraceRequest, StackTraceResponse,
};
use tonic::{Request, Response, Status};

Expand Down Expand Up @@ -70,7 +71,16 @@ impl MonitorService for MonitorServiceImpl {
_request: Request<ProfilingRequest>,
) -> Result<Response<ProfilingResponse>, Status> {
Err(Status::unimplemented(
"profiling unimplemented in compactor",
"CPU profiling unimplemented in compactor",
))
}

/// Heap profiling is not available on the compactor: the request is ignored
/// and an `unimplemented` status is returned.
async fn heap_profiling(
    &self,
    _request: Request<HeapProfilingRequest>,
) -> Result<Response<HeapProfilingResponse>, Status> {
    let unsupported = Status::unimplemented("Heap profiling unimplemented in compactor");
    Err(unsupported)
}
}

0 comments on commit 77c0b1c

Please sign in to comment.