diff --git a/Cargo.lock b/Cargo.lock index 247dde691177..c63a80f591ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9250,6 +9250,7 @@ dependencies = [ "rand", "risingwave_batch", "risingwave_common", + "risingwave_common_heap_profiling", "risingwave_common_service", "risingwave_connector", "risingwave_expr", diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 3c9a1b62f94d..31f0450389c9 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -55,6 +55,7 @@ prometheus = { version = "0.13", features = ["process"] } rand = "0.8" risingwave_batch = { workspace = true } risingwave_common = { workspace = true } +risingwave_common_heap_profiling = { workspace = true } risingwave_common_service = { workspace = true } risingwave_connector = { workspace = true } risingwave_expr = { workspace = true } diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index ee7dc4e0965a..aad2c79558cb 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -50,8 +50,10 @@ use risingwave_common::telemetry::telemetry_env_enabled; use risingwave_common::types::DataType; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; +use risingwave_common::util::resource_util; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_common::{GIT_SHA, RW_VERSION}; +use risingwave_common_heap_profiling::HeapProfiler; use risingwave_common_service::observer_manager::ObserverManager; use risingwave_common_service::MetricsManager; use risingwave_connector::source::monitor::{SourceMetrics, GLOBAL_SOURCE_METRICS}; @@ -365,6 +367,12 @@ impl FrontendEnv { }); join_handles.push(join_handle); + let total_memory_bytes = resource_util::memory::system_memory_available_bytes(); + let heap_profiler = + HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone()); + // Run a background heap profiler + heap_profiler.start(); + Ok(( Self { catalog_reader,