Skip to content

Commit

Permalink
feat(memory): add jvm memory to compute node memory control (#12965)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Oct 20, 2023
1 parent bbd2852 commit 38a066c
Show file tree
Hide file tree
Showing 15 changed files with 206 additions and 85 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3220,6 +3220,26 @@ def section_memory_manager(outer_panels):
),
],
),
panels.timeseries_memory(
"The allocated memory of jvm",
"",
[
panels.target(
f"{metric('jvm_allocated_bytes')}",
"",
),
],
),
panels.timeseries_memory(
"The active memory of jvm",
"",
[
panels.target(
f"{metric('jvm_active_bytes')}",
"",
),
],
),
panels.timeseries_ms(
"LRU manager diff between current watermark and evicted watermark time (ms) for actors",
"",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ risingwave_common_heap_profiling = { workspace = true }
risingwave_common_service = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_source = { workspace = true }
Expand Down
8 changes: 8 additions & 0 deletions src/compute/src/memory_management/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ impl GlobalMemoryManager {
let mut memory_control_stats = MemoryControlStats {
jemalloc_allocated_bytes: 0,
jemalloc_active_bytes: 0,
jvm_allocated_bytes: 0,
jvm_active_bytes: 0,
lru_watermark_step: 0,
lru_watermark_time_ms: Epoch::physical_now(),
lru_physical_now_ms: Epoch::physical_now(),
Expand Down Expand Up @@ -122,6 +124,12 @@ impl GlobalMemoryManager {
self.metrics
.jemalloc_active_bytes
.set(memory_control_stats.jemalloc_active_bytes as i64);
self.metrics
.jvm_allocated_bytes
.set(memory_control_stats.jvm_allocated_bytes as i64);
self.metrics
.jvm_active_bytes
.set(memory_control_stats.jvm_active_bytes as i64);
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/compute/src/memory_management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 70;
pub struct MemoryControlStats {
pub jemalloc_allocated_bytes: usize,
pub jemalloc_active_bytes: usize,
pub jvm_allocated_bytes: usize,
pub jvm_active_bytes: usize,
pub lru_watermark_step: u64,
pub lru_watermark_time_ms: u64,
pub lru_physical_now_ms: u64,
Expand All @@ -68,9 +70,9 @@ pub trait MemoryControl: Send + Sync + std::fmt::Debug {
}

pub fn build_memory_control_policy(total_memory_bytes: usize) -> MemoryControlRef {
use self::policy::JemallocMemoryControl;
use self::policy::JemallocAndJvmMemoryControl;

Box::new(JemallocMemoryControl::new(total_memory_bytes))
Box::new(JemallocAndJvmMemoryControl::new(total_memory_bytes))
}

/// `DummyPolicy` is used for operarting systems other than Linux. It does nothing as memory control
Expand Down
24 changes: 15 additions & 9 deletions src/compute/src/memory_management/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ use std::sync::Arc;

use risingwave_batch::task::BatchManager;
use risingwave_common::util::epoch::Epoch;
use risingwave_jni_core::jvm_runtime::load_jvm_memory_stats;
use risingwave_stream::task::LocalStreamManager;
use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats};

use super::{MemoryControl, MemoryControlStats};

/// `JemallocMemoryControl` is a memory control policy that uses jemalloc statistics to control. It
/// assumes that most memory is used by streaming engine and does memory control over LRU watermark
/// based on jemalloc statistics.
pub struct JemallocMemoryControl {
/// `JemallocAndJvmMemoryControl` is a memory control policy that uses jemalloc statistics and
/// jvm memory statistics and to control. It assumes that most memory is used by streaming engine
/// and does memory control over LRU watermark based on jemalloc statistics and jvm memory statistics.
pub struct JemallocAndJvmMemoryControl {
threshold_stable: usize,
threshold_graceful: usize,
threshold_aggressive: usize,
Expand All @@ -35,7 +36,7 @@ pub struct JemallocMemoryControl {
jemalloc_active_mib: jemalloc_stats::active_mib,
}

impl JemallocMemoryControl {
impl JemallocAndJvmMemoryControl {
const THRESHOLD_AGGRESSIVE: f64 = 0.9;
const THRESHOLD_GRACEFUL: f64 = 0.8;
const THRESHOLD_STABLE: f64 = 0.7;
Expand Down Expand Up @@ -81,7 +82,7 @@ impl JemallocMemoryControl {
}
}

impl std::fmt::Debug for JemallocMemoryControl {
impl std::fmt::Debug for JemallocAndJvmMemoryControl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JemallocMemoryControl")
.field("threshold_stable", &self.threshold_stable)
Expand All @@ -91,7 +92,7 @@ impl std::fmt::Debug for JemallocMemoryControl {
}
}

impl MemoryControl for JemallocMemoryControl {
impl MemoryControl for JemallocAndJvmMemoryControl {
fn apply(
&self,
interval_ms: u32,
Expand All @@ -105,13 +106,15 @@ impl MemoryControl for JemallocMemoryControl {
prev_memory_stats.jemalloc_active_bytes,
);

let (jvm_allocated_bytes, jvm_active_bytes) = load_jvm_memory_stats();

// Streaming memory control
//
// We calculate the watermark of the LRU cache, which provides hints for streaming executors
// on cache eviction. Here we do the calculation based on jemalloc statistics.

let (lru_watermark_step, lru_watermark_time_ms, lru_physical_now) = calculate_lru_watermark(
jemalloc_allocated_bytes,
jemalloc_allocated_bytes + jvm_allocated_bytes,
self.threshold_stable,
self.threshold_graceful,
self.threshold_aggressive,
Expand All @@ -124,6 +127,8 @@ impl MemoryControl for JemallocMemoryControl {
MemoryControlStats {
jemalloc_allocated_bytes,
jemalloc_active_bytes,
jvm_allocated_bytes,
jvm_active_bytes,
lru_watermark_step,
lru_watermark_time_ms,
lru_physical_now_ms: lru_physical_now,
Expand All @@ -141,7 +146,8 @@ fn calculate_lru_watermark(
) -> (u64, u64, u64) {
let mut watermark_time_ms = prev_memory_stats.lru_watermark_time_ms;
let last_step = prev_memory_stats.lru_watermark_step;
let last_used_memory_bytes = prev_memory_stats.jemalloc_allocated_bytes;
let last_used_memory_bytes =
prev_memory_stats.jemalloc_allocated_bytes + prev_memory_stats.jvm_allocated_bytes;

// The watermark calculation works in the following way:
//
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl<R: RemoteSinkTrait> Sink for RemoteSink<R> {
}).try_collect()?;

let mut env = JVM
.as_ref()
.get_or_init()
.map_err(|err| SinkError::Internal(err.into()))?
.attach_current_thread()
.map_err(|err| SinkError::Internal(err.into()))?;
Expand Down Expand Up @@ -380,7 +380,7 @@ impl<SM, R: RemoteSinkTrait> RemoteSinkWriterInner<SM, R> {
};

std::thread::spawn(move || {
let mut env = JVM.as_ref().unwrap().attach_current_thread().unwrap();
let mut env = JVM.get_or_init().unwrap().attach_current_thread().unwrap();

let result = env.call_static_method(
"com/risingwave/connector/JniSinkWriterHandler",
Expand Down Expand Up @@ -625,7 +625,7 @@ impl<R: RemoteSinkTrait> RemoteCoordinator<R> {
};

std::thread::spawn(move || {
let mut env = JVM.as_ref().unwrap().attach_current_thread().unwrap();
let mut env = JVM.get_or_init().unwrap().attach_current_thread().unwrap();

let result = env.call_static_method(
"com/risingwave/connector/JniSinkCoordinatorHandler",
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ where
SourceType::from(T::source_type())
);

let mut env = JVM.as_ref()?.attach_current_thread()?;
let mut env = JVM.get_or_init()?.attach_current_thread()?;

let validate_source_request = ValidateSourceRequest {
source_id: context.info.source_id as u64,
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::str::FromStr;
use std::sync::LazyLock;

use anyhow::{anyhow, Result};
use async_trait::async_trait;
Expand Down Expand Up @@ -121,7 +120,8 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {

let (tx, mut rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

LazyLock::force(&JVM).as_ref()?;
// Force init, because we don't want to see initialization failure in the following thread.
JVM.get_or_init()?;

let get_event_stream_request = GetEventStreamRequest {
source_id: self.source_id,
Expand All @@ -135,7 +135,7 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
let source_type = get_event_stream_request.source_type.to_string();

std::thread::spawn(move || {
let mut env = JVM.as_ref().unwrap().attach_current_thread().unwrap();
let mut env = JVM.get_or_init().unwrap().attach_current_thread().unwrap();

let get_event_stream_request_bytes = env
.byte_array_from_slice(&Message::encode_to_vec(&get_event_stream_request))
Expand Down
Loading

0 comments on commit 38a066c

Please sign in to comment.