diff --git a/Cargo.lock b/Cargo.lock index 47e85c6a612b5..d72d92004b59b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7089,6 +7089,7 @@ dependencies = [ "sysinfo", "tempfile", "thiserror", + "tikv-jemalloc-ctl", "tinyvec", "toml 0.7.8", "tower-layer", diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 233945f94eeec..b69a9a81017b4 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -85,6 +85,7 @@ strum = "0.25" strum_macros = "0.25" sysinfo = { version = "0.29", default-features = false } thiserror = "1" +tikv-jemalloc-ctl = { workspace = true } tinyvec = { version = "1", features = ["rustc_1_55", "grab_spare_slice"] } tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", diff --git a/src/common/src/heap_profiling/mod.rs b/src/common/src/heap_profiling/mod.rs index 3aacd46b30134..f6ffb66d836d7 100644 --- a/src/common/src/heap_profiling/mod.rs +++ b/src/common/src/heap_profiling/mod.rs @@ -17,3 +17,6 @@ pub const AUTO_DUMP_SUFFIX: &str = "auto.heap"; pub const COLLAPSED_SUFFIX: &str = "collapsed"; pub mod jeprof; +pub mod profiler; + +pub use profiler::HeapProfiler; diff --git a/src/common/src/heap_profiling/profiler.rs b/src/common/src/heap_profiling/profiler.rs new file mode 100644 index 0000000000000..6349f9ed74e4f --- /dev/null +++ b/src/common/src/heap_profiling/profiler.rs @@ -0,0 +1,115 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ffi::CString; +use std::fs; +use std::path::Path; + +use tikv_jemalloc_ctl::{ + epoch as jemalloc_epoch, opt as jemalloc_opt, prof as jemalloc_prof, stats as jemalloc_stats, +}; +use tokio::time::{self, Duration}; + +use super::AUTO_DUMP_SUFFIX; +use crate::config::HeapProfilingConfig; + +pub struct HeapProfiler { + config: HeapProfilingConfig, + threshold_auto_dump_heap_profile: usize, + jemalloc_dump_mib: jemalloc_prof::dump_mib, + jemalloc_allocated_mib: jemalloc_stats::allocated_mib, + jemalloc_epoch_mib: tikv_jemalloc_ctl::epoch_mib, + /// If jemalloc profiling is enabled + opt_prof: bool, +} + +impl HeapProfiler { + pub fn new(total_memory: usize, config: HeapProfilingConfig) -> Self { + let threshold_auto_dump_heap_profile = + (total_memory as f64 * config.threshold_auto as f64) as usize; + let jemalloc_dump_mib = jemalloc_prof::dump::mib().unwrap(); + let jemalloc_allocated_mib = jemalloc_stats::allocated::mib().unwrap(); + let jemalloc_epoch_mib = jemalloc_epoch::mib().unwrap(); + let opt_prof = jemalloc_opt::prof::read().unwrap(); + + fs::create_dir_all(&config.dir).unwrap(); + + Self { + config, + threshold_auto_dump_heap_profile, + jemalloc_dump_mib, + jemalloc_allocated_mib, + jemalloc_epoch_mib, + opt_prof, + } + } + + fn dump_heap_prof(&self, cur_used_memory_bytes: usize, prev_used_memory_bytes: usize) { + if !self.config.enable_auto { + return; + } + + if cur_used_memory_bytes > self.threshold_auto_dump_heap_profile + && prev_used_memory_bytes <= self.threshold_auto_dump_heap_profile + { + if !self.opt_prof { + tracing::info!("Cannot dump heap profile because Jemalloc prof is not enabled"); + return; + } + + let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S"); + let file_name = format!("{}.{}", time_prefix, AUTO_DUMP_SUFFIX); + + let file_path = Path::new(&self.config.dir) + .join(&file_name) + .to_str() + .expect("file path is not valid utf8") + .to_owned(); + let file_path_c = CString::new(file_path).expect("0 byte in file path"); + + // FIXME(yuhao): `unsafe` here because `jemalloc_dump_mib.write` requires static lifetime + if let Err(e) = self + .jemalloc_dump_mib + .write(unsafe { &*(file_path_c.as_c_str() as *const _) }) + { + tracing::warn!("Auto Jemalloc dump heap file failed! {:?}", e); + } else { + tracing::info!("Successfully dumped heap profile to {}", file_name); + } + } + } + + fn advance_jemalloc_epoch(&self, prev_jemalloc_allocated_bytes: usize) -> usize { + if let Err(e) = self.jemalloc_epoch_mib.advance() { + tracing::warn!("Jemalloc epoch advance failed! {:?}", e); + } + + self.jemalloc_allocated_mib.read().unwrap_or_else(|e| { + tracing::warn!("Jemalloc read allocated failed! {:?}", e); + prev_jemalloc_allocated_bytes + }) + } + + pub async fn start(self) { + let mut interval = time::interval(Duration::from_millis(500)); + let mut prev_jemalloc_allocated_bytes = 0; + loop { + interval.tick().await; + let jemalloc_allocated_bytes = + self.advance_jemalloc_epoch(prev_jemalloc_allocated_bytes); + self.dump_heap_prof(jemalloc_allocated_bytes, prev_jemalloc_allocated_bytes); + prev_jemalloc_allocated_bytes = jemalloc_allocated_bytes; + } + } +} diff --git a/src/compute/src/memory_management/memory_manager.rs b/src/compute/src/memory_management/memory_manager.rs index 4f011debd77d6..3e8e0fb6c0a47 100644 --- a/src/compute/src/memory_management/memory_manager.rs +++ b/src/compute/src/memory_management/memory_manager.rs @@ -12,13 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fs; use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::time::Duration; use risingwave_batch::task::BatchManager; -use risingwave_common::config::HeapProfilingConfig; use risingwave_common::system_param::local_manager::SystemParamsReaderRef; use risingwave_common::util::epoch::Epoch; use risingwave_stream::executor::monitor::StreamingMetrics; @@ -44,16 +42,10 @@ impl GlobalMemoryManager { // especially when it's 0. const MIN_TICK_INTERVAL_MS: u32 = 10; - pub fn new( - metrics: Arc, - total_memory_bytes: usize, - heap_profiling_config: HeapProfilingConfig, - ) -> Arc { - let memory_control_policy = - build_memory_control_policy(total_memory_bytes, heap_profiling_config.clone()); + pub fn new(metrics: Arc, total_memory_bytes: usize) -> Arc { + let memory_control_policy = build_memory_control_policy(total_memory_bytes); tracing::info!("memory control policy: {:?}", &memory_control_policy); - fs::create_dir_all(&heap_profiling_config.dir).unwrap(); Arc::new(Self { watermark_epoch: Arc::new(0.into()), metrics, diff --git a/src/compute/src/memory_management/mod.rs b/src/compute/src/memory_management/mod.rs index 66f105a02607b..014de7144ef05 100644 --- a/src/compute/src/memory_management/mod.rs +++ b/src/compute/src/memory_management/mod.rs @@ -22,7 +22,7 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use risingwave_batch::task::BatchManager; -use risingwave_common::config::{HeapProfilingConfig, StorageConfig, StorageMemoryConfig}; +use risingwave_common::config::{StorageConfig, StorageMemoryConfig}; use risingwave_common::util::pretty_bytes::convert; use risingwave_stream::task::LocalStreamManager; @@ -67,16 +67,10 @@ pub trait MemoryControl: Send + Sync + std::fmt::Debug { ) -> MemoryControlStats; } -pub fn build_memory_control_policy( - total_memory_bytes: usize, - heap_profiling_config: HeapProfilingConfig, -) -> MemoryControlRef { +pub fn build_memory_control_policy(total_memory_bytes: usize) -> MemoryControlRef { use self::policy::JemallocMemoryControl; - Box::new(JemallocMemoryControl::new( - total_memory_bytes, - heap_profiling_config, - )) + Box::new(JemallocMemoryControl::new(total_memory_bytes)) } /// `DummyPolicy` is used for operarting systems other than Linux. It does nothing as memory control diff --git a/src/compute/src/memory_management/policy.rs b/src/compute/src/memory_management/policy.rs index cea094b0b15d5..71e85c6149258 100644 --- a/src/compute/src/memory_management/policy.rs +++ b/src/compute/src/memory_management/policy.rs @@ -12,20 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ffi::CString; -use std::path::Path; use std::sync::atomic::AtomicU64; use std::sync::Arc; -use chrono; use risingwave_batch::task::BatchManager; -use risingwave_common::config::HeapProfilingConfig; -use risingwave_common::heap_profiling::AUTO_DUMP_SUFFIX; use risingwave_common::util::epoch::Epoch; use risingwave_stream::task::LocalStreamManager; -use tikv_jemalloc_ctl::{ - epoch as jemalloc_epoch, opt as jemalloc_opt, prof as jemalloc_prof, stats as jemalloc_stats, -}; +use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats}; use super::{MemoryControl, MemoryControlStats}; @@ -36,14 +29,10 @@ pub struct JemallocMemoryControl { threshold_stable: usize, threshold_graceful: usize, threshold_aggressive: usize, - threshold_auto_dump_heap_profile: usize, jemalloc_epoch_mib: tikv_jemalloc_ctl::epoch_mib, jemalloc_allocated_mib: jemalloc_stats::allocated_mib, jemalloc_active_mib: jemalloc_stats::active_mib, - jemalloc_dump_mib: jemalloc_prof::dump_mib, - - heap_profiling_config: HeapProfilingConfig, } impl JemallocMemoryControl { @@ -51,28 +40,22 @@ impl JemallocMemoryControl { const THRESHOLD_GRACEFUL: f64 = 0.8; const THRESHOLD_STABLE: f64 = 0.7; - pub fn new(total_memory: usize, heap_profiling_config: HeapProfilingConfig) -> Self { + pub fn new(total_memory: usize) -> Self { let threshold_stable = (total_memory as f64 * Self::THRESHOLD_STABLE) as usize; let threshold_graceful = (total_memory as f64 * Self::THRESHOLD_GRACEFUL) as usize; let threshold_aggressive = (total_memory as f64 * Self::THRESHOLD_AGGRESSIVE) as usize; - let threshold_auto_dump_heap_profile = - (total_memory as f64 * heap_profiling_config.threshold_auto as f64) as usize; let jemalloc_epoch_mib = jemalloc_epoch::mib().unwrap(); let jemalloc_allocated_mib = jemalloc_stats::allocated::mib().unwrap(); let jemalloc_active_mib = jemalloc_stats::active::mib().unwrap(); - let jemalloc_dump_mib = jemalloc_prof::dump::mib().unwrap(); Self { threshold_stable, threshold_graceful, threshold_aggressive, - threshold_auto_dump_heap_profile, jemalloc_epoch_mib, jemalloc_allocated_mib, jemalloc_active_mib, - jemalloc_dump_mib, - heap_profiling_config, } } @@ -96,42 +79,6 @@ impl JemallocMemoryControl { }), ) } - - fn dump_heap_prof(&self, cur_used_memory_bytes: usize, prev_used_memory_bytes: usize) { - if !self.heap_profiling_config.enable_auto { - return; - } - - if cur_used_memory_bytes > self.threshold_auto_dump_heap_profile - && prev_used_memory_bytes <= self.threshold_auto_dump_heap_profile - { - let opt_prof = jemalloc_opt::prof::read().unwrap(); - if !opt_prof { - tracing::info!("Cannot dump heap profile because Jemalloc prof is not enabled"); - return; - } - - let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S"); - let file_name = format!("{}.{}", time_prefix, AUTO_DUMP_SUFFIX); - - let file_path = Path::new(&self.heap_profiling_config.dir) - .join(&file_name) - .to_str() - .expect("file path is not valid utf8") - .to_owned(); - let file_path_c = CString::new(file_path).expect("0 byte in file path"); - - // FIXME(yuhao): `unsafe` here because `jemalloc_dump_mib.write` requires static lifetime - if let Err(e) = self - .jemalloc_dump_mib - .write(unsafe { &*(file_path_c.as_c_str() as *const _) }) - { - tracing::warn!("Auto Jemalloc dump heap file failed! {:?}", e); - } else { - tracing::info!("Successfully dumped heap profile to {}", file_name); - } - } - } } impl std::fmt::Debug for JemallocMemoryControl { @@ -158,11 +105,6 @@ impl MemoryControl for JemallocMemoryControl { prev_memory_stats.jemalloc_active_bytes, ); - self.dump_heap_prof( - jemalloc_allocated_bytes, - prev_memory_stats.jemalloc_allocated_bytes, - ); - // Streaming memory control // // We calculate the watermark of the LRU cache, which provides hints for streaming executors diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 28a63ddf2b7e7..bc9b9d4ab2e95 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -26,6 +26,7 @@ use risingwave_common::config::{ load_config, AsyncStackTraceOption, MetricLevel, StorageMemoryConfig, MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE, }; +use risingwave_common::heap_profiling::HeapProfiler; use risingwave_common::monitor::connection::{RouterExt, TcpConfig}; use risingwave_common::system_param::local_manager::LocalSystemParamsManager; use risingwave_common::telemetry::manager::TelemetryManager; @@ -284,12 +285,8 @@ pub async fn compute_node_serve( let batch_mgr_clone = batch_mgr.clone(); let stream_mgr_clone = stream_mgr.clone(); - let memory_mgr = GlobalMemoryManager::new( - streaming_metrics.clone(), - total_memory_bytes, - config.server.heap_profiling.clone(), - ); - // Run a background memory monitor + let memory_mgr = GlobalMemoryManager::new(streaming_metrics.clone(), total_memory_bytes); + // Run a background memory manager tokio::spawn(memory_mgr.clone().run( batch_mgr_clone, stream_mgr_clone, @@ -297,6 +294,10 @@ pub async fn compute_node_serve( system_params_manager.watch_params(), )); + let heap_profiler = HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone()); + // Run a background heap profiler + tokio::spawn(heap_profiler.start()); + let watermark_epoch = memory_mgr.get_watermark_epoch(); // Set back watermark epoch to stream mgr. Executor will read epoch from stream manager instead // of lru manager. diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 779eec208f72c..228211b30b949 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -21,6 +21,7 @@ use parking_lot::RwLock; use risingwave_common::config::{ extract_storage_memory_config, load_config, AsyncStackTraceOption, MetricLevel, RwConfig, }; +use risingwave_common::heap_profiling::HeapProfiler; use risingwave_common::monitor::connection::{RouterExt, TcpConfig}; use risingwave_common::system_param::local_manager::LocalSystemParamsManager; use risingwave_common::system_param::reader::SystemParamsReader; @@ -69,6 +70,7 @@ pub async fn prepare_start_parameters( ) -> ( Arc, Arc, + HeapProfiler, Option>>>, Arc, Arc, @@ -142,6 +144,11 @@ pub async fn prepare_start_parameters( storage_memory_config, )); + let heap_profiler = HeapProfiler::new( + total_memory_available_bytes, + config.server.heap_profiling.clone(), + ); + monitor_cache(memory_collector); let await_tree_config = match &config.streaming.async_stack_trace { @@ -157,6 +164,7 @@ pub async fn prepare_start_parameters( ( sstable_store, memory_limiter, + heap_profiler, await_tree_reg, storage_opts, compactor_metrics, @@ -199,8 +207,14 @@ pub async fn compactor_serve( hummock_metrics.clone(), )); - let (sstable_store, memory_limiter, await_tree_reg, storage_opts, compactor_metrics) = - prepare_start_parameters(config.clone(), system_params_reader.clone()).await; + let ( + sstable_store, + memory_limiter, + heap_profiler, + await_tree_reg, + storage_opts, + compactor_metrics, + ) = prepare_start_parameters(config.clone(), system_params_reader.clone()).await; let filter_key_extractor_manager = Arc::new(RpcFilterKeyExtractorManager::new(Box::new( RemoteTableAccessor::new(meta_client.clone()), @@ -213,6 +227,9 @@ pub async fn compactor_serve( let observer_manager = ObserverManager::new_with_meta_client(meta_client.clone(), compactor_observer_node).await; + // Run a background heap profiler + tokio::spawn(heap_profiler.start()); + // use half of limit because any memory which would hold in meta-cache will be allocate by // limited at first. let observer_join_handle = observer_manager.start().await; @@ -336,12 +353,22 @@ pub async fn shared_compactor_serve( .expect("Fail to get system params, the compactor pod cannot be started."); let system_params = system_params_response.into_inner().params.unwrap(); - let (sstable_store, memory_limiter, await_tree_reg, storage_opts, compactor_metrics) = - prepare_start_parameters(config.clone(), system_params.into()).await; + let ( + sstable_store, + memory_limiter, + heap_profiler, + await_tree_reg, + storage_opts, + compactor_metrics, + ) = prepare_start_parameters(config.clone(), system_params.into()).await; let (sender, receiver) = mpsc::unbounded_channel(); let compactor_srv: CompactorServiceImpl = CompactorServiceImpl::new(sender); let monitor_srv = MonitorServiceImpl::new(await_tree_reg.clone()); + + // Run a background heap profiler + tokio::spawn(heap_profiler.start()); + let (shutdown_send, mut shutdown_recv) = tokio::sync::oneshot::channel(); let compactor_context = CompactorContext { storage_opts,