Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(memory): add jvm memory to compute node memory control #12965

Merged
merged 4 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3220,6 +3220,26 @@ def section_memory_manager(outer_panels):
),
],
),
panels.timeseries_memory(
"The allocated memory of jvm",
"",
[
panels.target(
f"{metric('jvm_allocated_bytes')}",
"",
),
],
),
panels.timeseries_memory(
"The active memory of jvm",
"",
[
panels.target(
f"{metric('jvm_active_bytes')}",
"",
),
],
),
panels.timeseries_ms(
"LRU manager diff between current watermark and evicted watermark time (ms) for actors",
"",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ risingwave_common_heap_profiling = { workspace = true }
risingwave_common_service = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_source = { workspace = true }
Expand Down
8 changes: 8 additions & 0 deletions src/compute/src/memory_management/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ impl GlobalMemoryManager {
let mut memory_control_stats = MemoryControlStats {
jemalloc_allocated_bytes: 0,
jemalloc_active_bytes: 0,
jvm_allocated_bytes: 0,
jvm_active_bytes: 0,
lru_watermark_step: 0,
lru_watermark_time_ms: Epoch::physical_now(),
lru_physical_now_ms: Epoch::physical_now(),
Expand Down Expand Up @@ -122,6 +124,12 @@ impl GlobalMemoryManager {
self.metrics
.jemalloc_active_bytes
.set(memory_control_stats.jemalloc_active_bytes as i64);
self.metrics
.jvm_allocated_bytes
.set(memory_control_stats.jvm_allocated_bytes as i64);
self.metrics
.jvm_active_bytes
.set(memory_control_stats.jvm_active_bytes as i64);
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/compute/src/memory_management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 70;
pub struct MemoryControlStats {
pub jemalloc_allocated_bytes: usize,
pub jemalloc_active_bytes: usize,
pub jvm_allocated_bytes: usize,
pub jvm_active_bytes: usize,
pub lru_watermark_step: u64,
pub lru_watermark_time_ms: u64,
pub lru_physical_now_ms: u64,
Expand All @@ -68,9 +70,9 @@ pub trait MemoryControl: Send + Sync + std::fmt::Debug {
}

pub fn build_memory_control_policy(total_memory_bytes: usize) -> MemoryControlRef {
use self::policy::JemallocMemoryControl;
use self::policy::JemallocAndJvmMemoryControl;

Box::new(JemallocMemoryControl::new(total_memory_bytes))
Box::new(JemallocAndJvmMemoryControl::new(total_memory_bytes))
}

/// `DummyPolicy` is used for operarting systems other than Linux. It does nothing as memory control
Expand Down
24 changes: 15 additions & 9 deletions src/compute/src/memory_management/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ use std::sync::Arc;

use risingwave_batch::task::BatchManager;
use risingwave_common::util::epoch::Epoch;
use risingwave_jni_core::jvm_runtime::load_jvm_memory_stats;
use risingwave_stream::task::LocalStreamManager;
use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats};

use super::{MemoryControl, MemoryControlStats};

/// `JemallocMemoryControl` is a memory control policy that uses jemalloc statistics to control. It
/// assumes that most memory is used by streaming engine and does memory control over LRU watermark
/// based on jemalloc statistics.
pub struct JemallocMemoryControl {
/// `JemallocAndJvmMemoryControl` is a memory control policy that uses jemalloc statistics and
/// jvm memory statistics and to control. It assumes that most memory is used by streaming engine
/// and does memory control over LRU watermark based on jemalloc statistics and jvm memory statistics.
pub struct JemallocAndJvmMemoryControl {
threshold_stable: usize,
threshold_graceful: usize,
threshold_aggressive: usize,
Expand All @@ -35,7 +36,7 @@ pub struct JemallocMemoryControl {
jemalloc_active_mib: jemalloc_stats::active_mib,
}

impl JemallocMemoryControl {
impl JemallocAndJvmMemoryControl {
const THRESHOLD_AGGRESSIVE: f64 = 0.9;
const THRESHOLD_GRACEFUL: f64 = 0.8;
const THRESHOLD_STABLE: f64 = 0.7;
Expand Down Expand Up @@ -81,7 +82,7 @@ impl JemallocMemoryControl {
}
}

impl std::fmt::Debug for JemallocMemoryControl {
impl std::fmt::Debug for JemallocAndJvmMemoryControl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JemallocMemoryControl")
.field("threshold_stable", &self.threshold_stable)
Expand All @@ -91,7 +92,7 @@ impl std::fmt::Debug for JemallocMemoryControl {
}
}

impl MemoryControl for JemallocMemoryControl {
impl MemoryControl for JemallocAndJvmMemoryControl {
fn apply(
&self,
interval_ms: u32,
Expand All @@ -105,13 +106,15 @@ impl MemoryControl for JemallocMemoryControl {
prev_memory_stats.jemalloc_active_bytes,
);

let (jvm_allocated_bytes, jvm_active_bytes) = load_jvm_memory_stats();

// Streaming memory control
//
// We calculate the watermark of the LRU cache, which provides hints for streaming executors
// on cache eviction. Here we do the calculation based on jemalloc statistics.

let (lru_watermark_step, lru_watermark_time_ms, lru_physical_now) = calculate_lru_watermark(
jemalloc_allocated_bytes,
jemalloc_allocated_bytes + jvm_allocated_bytes,
self.threshold_stable,
self.threshold_graceful,
self.threshold_aggressive,
Expand All @@ -124,6 +127,8 @@ impl MemoryControl for JemallocMemoryControl {
MemoryControlStats {
jemalloc_allocated_bytes,
jemalloc_active_bytes,
jvm_allocated_bytes,
jvm_active_bytes,
lru_watermark_step,
lru_watermark_time_ms,
lru_physical_now_ms: lru_physical_now,
Expand All @@ -141,7 +146,8 @@ fn calculate_lru_watermark(
) -> (u64, u64, u64) {
let mut watermark_time_ms = prev_memory_stats.lru_watermark_time_ms;
let last_step = prev_memory_stats.lru_watermark_step;
let last_used_memory_bytes = prev_memory_stats.jemalloc_allocated_bytes;
let last_used_memory_bytes =
prev_memory_stats.jemalloc_allocated_bytes + prev_memory_stats.jvm_allocated_bytes;

// The watermark calculation works in the following way:
//
Expand Down
34 changes: 34 additions & 0 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::fs;
use std::path::Path;
use std::sync::LazyLock;

use jni::objects::JValueOwned;
use jni::strings::JNIString;
use jni::{InitArgsBuilder, JNIVersion, JavaVM, NativeMethod};
use risingwave_common::error::{ErrorCode, RwError};
Expand Down Expand Up @@ -76,6 +77,7 @@ pub static JVM: LazyLock<Result<JavaVM, RwError>> = LazyLock::new(|| {
.option("-ea")
.option("-Dis_embedded_connector=true")
.option(format!("-Djava.class.path={}", class_vec.join(":")))
.option("-Xms16m")
.option(format!("-Xmx{}", jvm_heap_size));

tracing::info!("JVM args: {:?}", args_builder);
Expand Down Expand Up @@ -134,3 +136,35 @@ pub fn register_native_method_for_jvm(jvm: &JavaVM) -> Result<(), jni::errors::E
tracing::info!("register native methods for jvm successfully");
Ok(())
}

pub fn load_jvm_memory_stats() -> (usize, usize) {
let mut env = JVM.as_ref().unwrap().attach_current_thread().unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the JVM is not loaded before, will this trigger JVM to load?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It will, so I set the initial heap memory of the JVM to be 16M.

Copy link
Member

@BugenZhao BugenZhao Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds not good. 😕 I have no idea why LazyLock does not expose its get() -> Option<_> method. Maybe we can use a global atomic value as an indicator? cc @wangrunji0408


UPDATE: We can combine OnceLock::get_or_init with OnceLock::get as a workaround.


let runtime_instance = env
.call_static_method(
"java/lang/Runtime",
"getRuntime",
"()Ljava/lang/Runtime;",
&[],
)
.unwrap();

let runtime_instance = match runtime_instance {
JValueOwned::Object(o) => o,
_ => unreachable!(),
};

let total_memory = env
.call_method(runtime_instance.as_ref(), "totalMemory", "()J", &[])
.unwrap()
.j()
.unwrap();

let free_memory = env
.call_method(runtime_instance, "freeMemory", "()J", &[])
.unwrap()
.j()
.unwrap();

(total_memory as usize, (total_memory - free_memory) as usize)
}
18 changes: 18 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ pub struct StreamingMetrics {
pub lru_evicted_watermark_time_ms: GenericGaugeVec<AtomicI64>,
pub jemalloc_allocated_bytes: IntGauge,
pub jemalloc_active_bytes: IntGauge,
pub jvm_allocated_bytes: IntGauge,
pub jvm_active_bytes: IntGauge,

/// User compute error reporting
pub user_compute_error_count: GenericCounterVec<AtomicU64>,
Expand Down Expand Up @@ -865,6 +867,20 @@ impl StreamingMetrics {
)
.unwrap();

let jvm_allocated_bytes = register_int_gauge_with_registry!(
"jvm_allocated_bytes",
"The allocated jvm memory",
registry
)
.unwrap();

let jvm_active_bytes = register_int_gauge_with_registry!(
"jvm_active_bytes",
"The active jvm memory",
registry
)
.unwrap();

let user_compute_error_count = register_int_counter_vec_with_registry!(
"user_compute_error_count",
"Compute errors in the system, queryable by tags",
Expand Down Expand Up @@ -994,6 +1010,8 @@ impl StreamingMetrics {
lru_evicted_watermark_time_ms,
jemalloc_allocated_bytes,
jemalloc_active_bytes,
jvm_allocated_bytes,
jvm_active_bytes,
user_compute_error_count,
user_source_reader_error_count,
materialize_cache_hit_count,
Expand Down
Loading