Skip to content

Commit

Permalink
compactor
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su committed Oct 9, 2023
1 parent 437d36e commit ea3aa99
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 89 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ strum = "0.25"
strum_macros = "0.25"
sysinfo = { version = "0.29", default-features = false }
thiserror = "1"
tikv-jemalloc-ctl = { workspace = true }
tinyvec = { version = "1", features = ["rustc_1_55", "grab_spare_slice"] }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
Expand Down
3 changes: 3 additions & 0 deletions src/common/src/heap_profiling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ pub const AUTO_DUMP_SUFFIX: &str = "auto.heap";
pub const COLLAPSED_SUFFIX: &str = "collapsed";

pub mod jeprof;
pub mod profiler;

pub use profiler::HeapProfiler;
115 changes: 115 additions & 0 deletions src/common/src/heap_profiling/profiler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ffi::CString;
use std::fs;
use std::path::Path;

use tikv_jemalloc_ctl::{
epoch as jemalloc_epoch, opt as jemalloc_opt, prof as jemalloc_prof, stats as jemalloc_stats,
};
use tokio::time::{self, Duration};

use super::AUTO_DUMP_SUFFIX;
use crate::config::HeapProfilingConfig;

/// Background helper that watches jemalloc's allocated-bytes statistic and
/// asks jemalloc to write a heap profile when usage rises above a configured
/// fraction of the total memory.
pub struct HeapProfiler {
    /// Heap profiling settings; this type reads `enable_auto`, `threshold_auto`
    /// and `dir` from it.
    config: HeapProfilingConfig,
    /// Absolute dump threshold, computed once at construction as
    /// `total_memory * config.threshold_auto`.
    threshold_auto_dump_heap_profile: usize,
    /// If jemalloc profiling is enabled (value of `opt.prof` read at construction).
    opt_prof: bool,

    /// Pre-resolved MIB used to advance the jemalloc epoch.
    jemalloc_epoch_mib: tikv_jemalloc_ctl::epoch_mib,
    /// Pre-resolved MIB used to read jemalloc's `stats.allocated`.
    jemalloc_allocated_mib: jemalloc_stats::allocated_mib,
    /// Pre-resolved MIB used to trigger `prof.dump`.
    jemalloc_dump_mib: jemalloc_prof::dump_mib,
}

impl HeapProfiler {
    /// Creates a profiler for a process that may use `total_memory` bytes.
    ///
    /// The auto-dump threshold is `total_memory * config.threshold_auto`. The
    /// jemalloc MIBs and the `opt.prof` flag are resolved once here so the
    /// polling loop does not have to look them up again. The dump directory
    /// `config.dir` is created if it does not exist.
    ///
    /// # Panics
    ///
    /// Panics if any jemalloc control lookup fails or if `config.dir` cannot
    /// be created.
    pub fn new(total_memory: usize, config: HeapProfilingConfig) -> Self {
        let threshold_auto_dump_heap_profile =
            (total_memory as f64 * config.threshold_auto as f64) as usize;
        let jemalloc_dump_mib = jemalloc_prof::dump::mib().unwrap();
        let jemalloc_allocated_mib = jemalloc_stats::allocated::mib().unwrap();
        let jemalloc_epoch_mib = jemalloc_epoch::mib().unwrap();
        let opt_prof = jemalloc_opt::prof::read().unwrap();

        fs::create_dir_all(&config.dir).unwrap();

        Self {
            config,
            threshold_auto_dump_heap_profile,
            jemalloc_dump_mib,
            jemalloc_allocated_mib,
            jemalloc_epoch_mib,
            opt_prof,
        }
    }

    /// Dumps a heap profile into `config.dir` when memory usage has just
    /// crossed the auto-dump threshold from below.
    ///
    /// Does nothing when auto dumping is disabled or when usage did not cross
    /// the threshold between the previous and the current sample. Failures
    /// (profiling not enabled in jemalloc, an unrepresentable file path, or a
    /// jemalloc error) are logged and never panic, so the polling task that
    /// calls this keeps running.
    fn dump_heap_prof(&self, cur_used_memory_bytes: usize, prev_used_memory_bytes: usize) {
        if !self.config.enable_auto {
            return;
        }

        // Only react to the rising edge; staying above the threshold must not
        // produce a new dump on every tick.
        let threshold = self.threshold_auto_dump_heap_profile;
        if cur_used_memory_bytes <= threshold || prev_used_memory_bytes > threshold {
            return;
        }

        if !self.opt_prof {
            tracing::info!("Cannot dump heap profile because Jemalloc prof is not enabled");
            return;
        }

        let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S");
        let file_name = format!("{}.{}", time_prefix, AUTO_DUMP_SUFFIX);
        let file_path = Path::new(&self.config.dir).join(&file_name);

        // jemalloc needs a NUL-terminated C string. A path that cannot be
        // converted is reported and skipped instead of panicking the task.
        let file_path_str = match file_path.to_str() {
            Some(s) => s,
            None => {
                tracing::warn!(
                    "Auto Jemalloc dump heap file failed! file path is not valid utf8: {:?}",
                    file_path
                );
                return;
            }
        };
        let file_path_c = match CString::new(file_path_str) {
            Ok(c) => c,
            Err(e) => {
                tracing::warn!(
                    "Auto Jemalloc dump heap file failed! 0 byte in file path: {:?}",
                    e
                );
                return;
            }
        };

        // FIXME(yuhao): `unsafe` here because `jemalloc_dump_mib.write` requires static lifetime
        //
        // SAFETY: the reference is only handed to `write`, and `file_path_c`
        // stays alive until after that call returns. This relies on jemalloc
        // reading the file name only during the `prof.dump` control call and
        // not retaining the pointer afterwards -- NOTE(review): confirm against
        // the jemalloc `prof.dump` documentation.
        if let Err(e) = self
            .jemalloc_dump_mib
            .write(unsafe { &*(file_path_c.as_c_str() as *const _) })
        {
            tracing::warn!("Auto Jemalloc dump heap file failed! {:?}", e);
        } else {
            tracing::info!("Successfully dumped heap profile to {}", file_name);
        }
    }

    /// Advances the jemalloc epoch and returns the current allocated bytes.
    ///
    /// A failed epoch advance is only logged. If reading the statistic fails,
    /// the failure is logged and `prev_jemalloc_allocated_bytes` is returned
    /// so the caller sees "no change".
    fn advance_jemalloc_epoch(&self, prev_jemalloc_allocated_bytes: usize) -> usize {
        if let Err(e) = self.jemalloc_epoch_mib.advance() {
            tracing::warn!("Jemalloc epoch advance failed! {:?}", e);
        }

        self.jemalloc_allocated_mib.read().unwrap_or_else(|e| {
            tracing::warn!("Jemalloc read allocated failed! {:?}", e);
            prev_jemalloc_allocated_bytes
        })
    }

    /// Runs the polling loop: every 500 ms, sample jemalloc's allocated bytes
    /// and dump a heap profile if the threshold was just crossed.
    ///
    /// Returns immediately when auto dumping is disabled in the config, since
    /// in that case no sample could ever trigger a dump; otherwise it never
    /// returns and is meant to be spawned as a background task.
    pub async fn start(self) {
        if !self.config.enable_auto {
            tracing::info!("Auto heap profile dump is disabled");
            return;
        }

        let mut interval = time::interval(Duration::from_millis(500));
        // Starting from 0 lets the first sample count as a crossing if usage
        // is already above the threshold.
        let mut prev_jemalloc_allocated_bytes = 0;
        loop {
            interval.tick().await;
            let jemalloc_allocated_bytes =
                self.advance_jemalloc_epoch(prev_jemalloc_allocated_bytes);
            self.dump_heap_prof(jemalloc_allocated_bytes, prev_jemalloc_allocated_bytes);
            prev_jemalloc_allocated_bytes = jemalloc_allocated_bytes;
        }
    }
}
12 changes: 2 additions & 10 deletions src/compute/src/memory_management/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fs;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;

use risingwave_batch::task::BatchManager;
use risingwave_common::config::HeapProfilingConfig;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::util::epoch::Epoch;
use risingwave_stream::executor::monitor::StreamingMetrics;
Expand All @@ -44,16 +42,10 @@ impl GlobalMemoryManager {
// especially when it's 0.
const MIN_TICK_INTERVAL_MS: u32 = 10;

pub fn new(
metrics: Arc<StreamingMetrics>,
total_memory_bytes: usize,
heap_profiling_config: HeapProfilingConfig,
) -> Arc<Self> {
let memory_control_policy =
build_memory_control_policy(total_memory_bytes, heap_profiling_config.clone());
pub fn new(metrics: Arc<StreamingMetrics>, total_memory_bytes: usize) -> Arc<Self> {
let memory_control_policy = build_memory_control_policy(total_memory_bytes);
tracing::info!("memory control policy: {:?}", &memory_control_policy);

fs::create_dir_all(&heap_profiling_config.dir).unwrap();
Arc::new(Self {
watermark_epoch: Arc::new(0.into()),
metrics,
Expand Down
12 changes: 3 additions & 9 deletions src/compute/src/memory_management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use risingwave_batch::task::BatchManager;
use risingwave_common::config::{HeapProfilingConfig, StorageConfig, StorageMemoryConfig};
use risingwave_common::config::{StorageConfig, StorageMemoryConfig};
use risingwave_common::util::pretty_bytes::convert;
use risingwave_stream::task::LocalStreamManager;

Expand Down Expand Up @@ -67,16 +67,10 @@ pub trait MemoryControl: Send + Sync + std::fmt::Debug {
) -> MemoryControlStats;
}

pub fn build_memory_control_policy(
total_memory_bytes: usize,
heap_profiling_config: HeapProfilingConfig,
) -> MemoryControlRef {
pub fn build_memory_control_policy(total_memory_bytes: usize) -> MemoryControlRef {
use self::policy::JemallocMemoryControl;

Box::new(JemallocMemoryControl::new(
total_memory_bytes,
heap_profiling_config,
))
Box::new(JemallocMemoryControl::new(total_memory_bytes))
}

/// `DummyPolicy` is used for operating systems other than Linux. It does nothing as memory control
Expand Down
62 changes: 2 additions & 60 deletions src/compute/src/memory_management/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ffi::CString;
use std::path::Path;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use chrono;
use risingwave_batch::task::BatchManager;
use risingwave_common::config::HeapProfilingConfig;
use risingwave_common::heap_profiling::AUTO_DUMP_SUFFIX;
use risingwave_common::util::epoch::Epoch;
use risingwave_stream::task::LocalStreamManager;
use tikv_jemalloc_ctl::{
epoch as jemalloc_epoch, opt as jemalloc_opt, prof as jemalloc_prof, stats as jemalloc_stats,
};
use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats};

use super::{MemoryControl, MemoryControlStats};

Expand All @@ -36,43 +29,33 @@ pub struct JemallocMemoryControl {
threshold_stable: usize,
threshold_graceful: usize,
threshold_aggressive: usize,
threshold_auto_dump_heap_profile: usize,

jemalloc_epoch_mib: tikv_jemalloc_ctl::epoch_mib,
jemalloc_allocated_mib: jemalloc_stats::allocated_mib,
jemalloc_active_mib: jemalloc_stats::active_mib,
jemalloc_dump_mib: jemalloc_prof::dump_mib,

heap_profiling_config: HeapProfilingConfig,
}

impl JemallocMemoryControl {
const THRESHOLD_AGGRESSIVE: f64 = 0.9;
const THRESHOLD_GRACEFUL: f64 = 0.8;
const THRESHOLD_STABLE: f64 = 0.7;

pub fn new(total_memory: usize, heap_profiling_config: HeapProfilingConfig) -> Self {
pub fn new(total_memory: usize) -> Self {
let threshold_stable = (total_memory as f64 * Self::THRESHOLD_STABLE) as usize;
let threshold_graceful = (total_memory as f64 * Self::THRESHOLD_GRACEFUL) as usize;
let threshold_aggressive = (total_memory as f64 * Self::THRESHOLD_AGGRESSIVE) as usize;
let threshold_auto_dump_heap_profile =
(total_memory as f64 * heap_profiling_config.threshold_auto as f64) as usize;

let jemalloc_epoch_mib = jemalloc_epoch::mib().unwrap();
let jemalloc_allocated_mib = jemalloc_stats::allocated::mib().unwrap();
let jemalloc_active_mib = jemalloc_stats::active::mib().unwrap();
let jemalloc_dump_mib = jemalloc_prof::dump::mib().unwrap();

Self {
threshold_stable,
threshold_graceful,
threshold_aggressive,
threshold_auto_dump_heap_profile,
jemalloc_epoch_mib,
jemalloc_allocated_mib,
jemalloc_active_mib,
jemalloc_dump_mib,
heap_profiling_config,
}
}

Expand All @@ -96,42 +79,6 @@ impl JemallocMemoryControl {
}),
)
}

fn dump_heap_prof(&self, cur_used_memory_bytes: usize, prev_used_memory_bytes: usize) {
if !self.heap_profiling_config.enable_auto {
return;
}

if cur_used_memory_bytes > self.threshold_auto_dump_heap_profile
&& prev_used_memory_bytes <= self.threshold_auto_dump_heap_profile
{
let opt_prof = jemalloc_opt::prof::read().unwrap();
if !opt_prof {
tracing::info!("Cannot dump heap profile because Jemalloc prof is not enabled");
return;
}

let time_prefix = chrono::Local::now().format("%Y-%m-%d-%H-%M-%S");
let file_name = format!("{}.{}", time_prefix, AUTO_DUMP_SUFFIX);

let file_path = Path::new(&self.heap_profiling_config.dir)
.join(&file_name)
.to_str()
.expect("file path is not valid utf8")
.to_owned();
let file_path_c = CString::new(file_path).expect("0 byte in file path");

// FIXME(yuhao): `unsafe` here because `jemalloc_dump_mib.write` requires static lifetime
if let Err(e) = self
.jemalloc_dump_mib
.write(unsafe { &*(file_path_c.as_c_str() as *const _) })
{
tracing::warn!("Auto Jemalloc dump heap file failed! {:?}", e);
} else {
tracing::info!("Successfully dumped heap profile to {}", file_name);
}
}
}
}

impl std::fmt::Debug for JemallocMemoryControl {
Expand All @@ -158,11 +105,6 @@ impl MemoryControl for JemallocMemoryControl {
prev_memory_stats.jemalloc_active_bytes,
);

self.dump_heap_prof(
jemalloc_allocated_bytes,
prev_memory_stats.jemalloc_allocated_bytes,
);

// Streaming memory control
//
// We calculate the watermark of the LRU cache, which provides hints for streaming executors
Expand Down
13 changes: 7 additions & 6 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use risingwave_common::config::{
load_config, AsyncStackTraceOption, MetricLevel, StorageMemoryConfig,
MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE,
};
use risingwave_common::heap_profiling::HeapProfiler;
use risingwave_common::monitor::connection::{RouterExt, TcpConfig};
use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
use risingwave_common::telemetry::manager::TelemetryManager;
Expand Down Expand Up @@ -284,19 +285,19 @@ pub async fn compute_node_serve(
let batch_mgr_clone = batch_mgr.clone();
let stream_mgr_clone = stream_mgr.clone();

let memory_mgr = GlobalMemoryManager::new(
streaming_metrics.clone(),
total_memory_bytes,
config.server.heap_profiling.clone(),
);
// Run a background memory monitor
let memory_mgr = GlobalMemoryManager::new(streaming_metrics.clone(), total_memory_bytes);
// Run a background memory manager
tokio::spawn(memory_mgr.clone().run(
batch_mgr_clone,
stream_mgr_clone,
system_params.barrier_interval_ms(),
system_params_manager.watch_params(),
));

let heap_profiler = HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
// Run a background heap profiler
tokio::spawn(heap_profiler.start());

let watermark_epoch = memory_mgr.get_watermark_epoch();
// Set back watermark epoch to stream mgr. Executor will read epoch from stream manager instead
// of lru manager.
Expand Down
Loading

0 comments on commit ea3aa99

Please sign in to comment.