Skip to content

Commit

Permalink
add jvm memory to compute node memory control
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Oct 20, 2023
1 parent d7e8c7e commit 5b151f1
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3220,6 +3220,26 @@ def section_memory_manager(outer_panels):
),
],
),
panels.timeseries_memory(
"The allocated memory of jvm",
"",
[
panels.target(
f"{metric('jvm_allocated_bytes')}",
"",
),
],
),
panels.timeseries_memory(
"The active memory of jvm",
"",
[
panels.target(
f"{metric('jvm_active_bytes')}",
"",
),
],
),
panels.timeseries_ms(
"LRU manager diff between current watermark and evicted watermark time (ms) for actors",
"",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ risingwave_common_heap_profiling = { workspace = true }
risingwave_common_service = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_hummock_sdk = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_source = { workspace = true }
Expand Down
8 changes: 8 additions & 0 deletions src/compute/src/memory_management/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ impl GlobalMemoryManager {
let mut memory_control_stats = MemoryControlStats {
jemalloc_allocated_bytes: 0,
jemalloc_active_bytes: 0,
jvm_allocated_bytes: 0,
jvm_active_bytes: 0,
lru_watermark_step: 0,
lru_watermark_time_ms: Epoch::physical_now(),
lru_physical_now_ms: Epoch::physical_now(),
Expand Down Expand Up @@ -122,6 +124,12 @@ impl GlobalMemoryManager {
self.metrics
.jemalloc_active_bytes
.set(memory_control_stats.jemalloc_active_bytes as i64);
self.metrics
.jvm_allocated_bytes
.set(memory_control_stats.jvm_allocated_bytes as i64);
self.metrics
.jvm_active_bytes
.set(memory_control_stats.jvm_active_bytes as i64);
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/compute/src/memory_management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 70;
pub struct MemoryControlStats {
pub jemalloc_allocated_bytes: usize,
pub jemalloc_active_bytes: usize,
pub jvm_allocated_bytes: usize,
pub jvm_active_bytes: usize,
pub lru_watermark_step: u64,
pub lru_watermark_time_ms: u64,
pub lru_physical_now_ms: u64,
Expand All @@ -68,9 +70,9 @@ pub trait MemoryControl: Send + Sync + std::fmt::Debug {
}

pub fn build_memory_control_policy(total_memory_bytes: usize) -> MemoryControlRef {
use self::policy::JemallocMemoryControl;
use self::policy::JemallocAndJvmMemoryControl;

Box::new(JemallocMemoryControl::new(total_memory_bytes))
Box::new(JemallocAndJvmMemoryControl::new(total_memory_bytes))
}

/// `DummyPolicy` is used for operarting systems other than Linux. It does nothing as memory control
Expand Down
24 changes: 15 additions & 9 deletions src/compute/src/memory_management/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ use std::sync::Arc;

use risingwave_batch::task::BatchManager;
use risingwave_common::util::epoch::Epoch;
use risingwave_jni_core::jvm_runtime::load_jvm_memory_stats;
use risingwave_stream::task::LocalStreamManager;
use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats};

use super::{MemoryControl, MemoryControlStats};

/// `JemallocMemoryControl` is a memory control policy that uses jemalloc statistics to control. It
/// assumes that most memory is used by streaming engine and does memory control over LRU watermark
/// based on jemalloc statistics.
pub struct JemallocMemoryControl {
/// `JemallocAndJvmMemoryControl` is a memory control policy that uses jemalloc statistics and
/// jvm memory statistics and to control. It assumes that most memory is used by streaming engine
/// and does memory control over LRU watermark based on jemalloc statistics and jvm memory statistics.
pub struct JemallocAndJvmMemoryControl {
threshold_stable: usize,
threshold_graceful: usize,
threshold_aggressive: usize,
Expand All @@ -35,7 +36,7 @@ pub struct JemallocMemoryControl {
jemalloc_active_mib: jemalloc_stats::active_mib,
}

impl JemallocMemoryControl {
impl JemallocAndJvmMemoryControl {
const THRESHOLD_AGGRESSIVE: f64 = 0.9;
const THRESHOLD_GRACEFUL: f64 = 0.8;
const THRESHOLD_STABLE: f64 = 0.7;
Expand Down Expand Up @@ -81,7 +82,7 @@ impl JemallocMemoryControl {
}
}

impl std::fmt::Debug for JemallocMemoryControl {
impl std::fmt::Debug for JemallocAndJvmMemoryControl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("JemallocMemoryControl")
.field("threshold_stable", &self.threshold_stable)
Expand All @@ -91,7 +92,7 @@ impl std::fmt::Debug for JemallocMemoryControl {
}
}

impl MemoryControl for JemallocMemoryControl {
impl MemoryControl for JemallocAndJvmMemoryControl {
fn apply(
&self,
interval_ms: u32,
Expand All @@ -105,13 +106,15 @@ impl MemoryControl for JemallocMemoryControl {
prev_memory_stats.jemalloc_active_bytes,
);

let (jvm_allocated_bytes, jvm_active_bytes) = load_jvm_memory_stats();

// Streaming memory control
//
// We calculate the watermark of the LRU cache, which provides hints for streaming executors
// on cache eviction. Here we do the calculation based on jemalloc statistics.

let (lru_watermark_step, lru_watermark_time_ms, lru_physical_now) = calculate_lru_watermark(
jemalloc_allocated_bytes,
jemalloc_allocated_bytes + jvm_allocated_bytes,
self.threshold_stable,
self.threshold_graceful,
self.threshold_aggressive,
Expand All @@ -124,6 +127,8 @@ impl MemoryControl for JemallocMemoryControl {
MemoryControlStats {
jemalloc_allocated_bytes,
jemalloc_active_bytes,
jvm_allocated_bytes,
jvm_active_bytes,
lru_watermark_step,
lru_watermark_time_ms,
lru_physical_now_ms: lru_physical_now,
Expand All @@ -141,7 +146,8 @@ fn calculate_lru_watermark(
) -> (u64, u64, u64) {
let mut watermark_time_ms = prev_memory_stats.lru_watermark_time_ms;
let last_step = prev_memory_stats.lru_watermark_step;
let last_used_memory_bytes = prev_memory_stats.jemalloc_allocated_bytes;
let last_used_memory_bytes =
prev_memory_stats.jemalloc_allocated_bytes + prev_memory_stats.jvm_allocated_bytes;

// The watermark calculation works in the following way:
//
Expand Down
34 changes: 34 additions & 0 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::fs;
use std::path::Path;
use std::sync::LazyLock;

use jni::objects::JValueOwned;
use jni::strings::JNIString;
use jni::{InitArgsBuilder, JNIVersion, JavaVM, NativeMethod};
use risingwave_common::error::{ErrorCode, RwError};
Expand Down Expand Up @@ -76,6 +77,7 @@ pub static JVM: LazyLock<Result<JavaVM, RwError>> = LazyLock::new(|| {
.option("-ea")
.option("-Dis_embedded_connector=true")
.option(format!("-Djava.class.path={}", class_vec.join(":")))
.option("-Xms16m")
.option(format!("-Xmx{}", jvm_heap_size));

tracing::info!("JVM args: {:?}", args_builder);
Expand Down Expand Up @@ -134,3 +136,35 @@ pub fn register_native_method_for_jvm(jvm: &JavaVM) -> Result<(), jni::errors::E
tracing::info!("register native methods for jvm successfully");
Ok(())
}

pub fn load_jvm_memory_stats() -> (usize, usize) {
let mut env = JVM.as_ref().unwrap().attach_current_thread().unwrap();

let runtime_instance = env
.call_static_method(
"java/lang/Runtime",
"getRuntime",
"()Ljava/lang/Runtime;",
&[],
)
.unwrap();

let runtime_instance = match runtime_instance {
JValueOwned::Object(o) => o,
_ => unreachable!(),
};

let total_memory = env
.call_method(runtime_instance.as_ref(), "totalMemory", "()J", &[])
.unwrap()
.j()
.unwrap();

let free_memory = env
.call_method(runtime_instance, "freeMemory", "()J", &[])
.unwrap()
.j()
.unwrap();

(total_memory as usize, (total_memory - free_memory) as usize)
}
18 changes: 18 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ pub struct StreamingMetrics {
pub lru_evicted_watermark_time_ms: GenericGaugeVec<AtomicI64>,
pub jemalloc_allocated_bytes: IntGauge,
pub jemalloc_active_bytes: IntGauge,
pub jvm_allocated_bytes: IntGauge,
pub jvm_active_bytes: IntGauge,

/// User compute error reporting
pub user_compute_error_count: GenericCounterVec<AtomicU64>,
Expand Down Expand Up @@ -865,6 +867,20 @@ impl StreamingMetrics {
)
.unwrap();

let jvm_allocated_bytes = register_int_gauge_with_registry!(
"jvm_allocated_bytes",
"The allocated jvm memory",
registry
)
.unwrap();

let jvm_active_bytes = register_int_gauge_with_registry!(
"jvm_active_bytes",
"The active jvm memory",
registry
)
.unwrap();

let user_compute_error_count = register_int_counter_vec_with_registry!(
"user_compute_error_count",
"Compute errors in the system, queryable by tags",
Expand Down Expand Up @@ -994,6 +1010,8 @@ impl StreamingMetrics {
lru_evicted_watermark_time_ms,
jemalloc_allocated_bytes,
jemalloc_active_bytes,
jvm_allocated_bytes,
jvm_active_bytes,
user_compute_error_count,
user_source_reader_error_count,
materialize_cache_hit_count,
Expand Down

0 comments on commit 5b151f1

Please sign in to comment.