Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit a3ea7c0
Author: Eric Fu <[email protected]>
Date:   Sat Nov 25 11:00:48 2023 +0800

    refactor: memory management (#13636)

commit 2348a2b
Author: Bugen Zhao <[email protected]>
Date:   Fri Nov 24 18:26:48 2023 +0800

    fix(streaming): use correct label for `stream_fragment_exchange_bytes` metrics (#13644)

    Signed-off-by: Bugen Zhao <[email protected]>

commit 3ccb249
Author: Runji Wang <[email protected]>
Date:   Fri Nov 24 17:39:12 2023 +0800

    fix: estimate jsonb's value encoding size (#13643)

    Signed-off-by: Runji Wang <[email protected]>

commit 7b21e04
Author: Dylan <[email protected]>
Date:   Fri Nov 24 16:54:38 2023 +0800

    feat(optimizer): improve inline session timezone in exprs (#13640)

Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Nov 27, 2023
1 parent ba49e30 commit bebc963
Show file tree
Hide file tree
Showing 19 changed files with 388 additions and 444 deletions.
2 changes: 1 addition & 1 deletion src/common/src/estimate_size/collections/lru.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use super::{AtomicMutGuard, MutGuard};
use crate::estimate_size::{EstimateSize, KvSize};

/// The managed cache is a lru cache that bounds the memory usage by epoch.
/// Should be used with `GlobalMemoryManager`.
/// Should be used with `MemoryManager`.
pub struct EstimatedLruCache<K, V, S = DefaultHasher, A: Clone + Allocator = Global> {
inner: LruCache<K, V, S, A>,
kv_heap_size: KvSize,
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/types/jsonb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ impl<'a> JsonbRef<'a> {
Serializer::with_formatter(FmtToIoUnchecked(f), PrettyFormatter::with_indent(b" "));
self.0.serialize(&mut ser).map_err(|_| std::fmt::Error)
}

/// Returns the capacity of the underlying buffer.
pub fn capacity(self) -> usize {
self.0.capacity()
}
}

/// A custom implementation for [`serde_json::ser::Formatter`] to match PostgreSQL, which adds extra
Expand Down
4 changes: 2 additions & 2 deletions src/common/src/util/value_encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ pub fn try_get_exact_serialize_datum_size(arr: &ArrayImpl) -> Option<usize> {
ArrayImpl::Float32(_) => Some(4),
ArrayImpl::Float64(_) => Some(8),
ArrayImpl::Bool(_) => Some(1),
ArrayImpl::Jsonb(_) => Some(8),
ArrayImpl::Decimal(_) => Some(estimate_serialize_decimal_size()),
ArrayImpl::Interval(_) => Some(estimate_serialize_interval_size()),
ArrayImpl::Date(_) => Some(estimate_serialize_date_size()),
Expand Down Expand Up @@ -246,7 +245,8 @@ fn estimate_serialize_scalar_size(value: ScalarRefImpl<'_>) -> usize {
ScalarRefImpl::Timestamp(_) => estimate_serialize_timestamp_size(),
ScalarRefImpl::Timestamptz(_) => 8,
ScalarRefImpl::Time(_) => estimate_serialize_time_size(),
ScalarRefImpl::Jsonb(_) => 8,
// not exact as we use internal encoding size to estimate the json string size
ScalarRefImpl::Jsonb(v) => v.capacity(),
ScalarRefImpl::Struct(s) => estimate_serialize_struct_size(s),
ScalarRefImpl::List(v) => estimate_serialize_list_size(v),
}
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#[macro_use]
extern crate tracing;

pub mod memory_management;
pub mod memory;
pub mod observer;
pub mod rpc;
pub mod server;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,86 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod memory_manager;

// Only enable the non-trivial policies on Linux as it relies on statistics from `jemalloc-ctl`
// which might be inaccurate on other platforms.
pub mod policy;

use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use risingwave_batch::task::BatchManager;
use risingwave_common::config::{StorageConfig, StorageMemoryConfig};
use risingwave_common::util::pretty_bytes::convert;
use risingwave_stream::task::LocalStreamManager;

/// The minimal memory requirement of computing tasks in megabytes.
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
/// The memory reserved for system usage (stack and code segment of processes, allocation
/// overhead, network buffer, etc.) in megabytes.
pub const MIN_SYSTEM_RESERVED_MEMORY_MB: usize = 512;
pub const SYSTEM_RESERVED_MEMORY_PROPORTION: f64 = 0.2;

pub const STORAGE_MEMORY_PROPORTION: f64 = 0.3;

pub const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1;

pub const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3;

pub const STORAGE_META_CACHE_MAX_MEMORY_MB: usize = 4096;
pub const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35;
pub const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;
pub const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 50;

/// `MemoryControlStats` contains the state from previous control loop
#[derive(Default)]
pub struct MemoryControlStats {
pub jemalloc_allocated_bytes: usize,
pub jemalloc_active_bytes: usize,
pub jvm_allocated_bytes: usize,
pub jvm_active_bytes: usize,
pub lru_watermark_step: u64,
pub lru_watermark_time_ms: u64,
pub lru_physical_now_ms: u64,
}

pub type MemoryControlRef = Box<dyn MemoryControl>;

pub trait MemoryControl: Send + Sync + std::fmt::Debug {
fn apply(
&self,
interval_ms: u32,
prev_memory_stats: MemoryControlStats,
batch_manager: Arc<BatchManager>,
stream_manager: Arc<LocalStreamManager>,
watermark_epoch: Arc<AtomicU64>,
) -> MemoryControlStats;
}

pub fn build_memory_control_policy(total_memory_bytes: usize) -> MemoryControlRef {
use self::policy::JemallocAndJvmMemoryControl;
const SYSTEM_RESERVED_MEMORY_PROPORTION: f64 = 0.2;

Box::new(JemallocAndJvmMemoryControl::new(total_memory_bytes))
}

/// `DummyPolicy` is used for operarting systems other than Linux. It does nothing as memory control
/// is disabled on non-Linux OS.
#[derive(Debug)]
pub struct DummyPolicy;
const STORAGE_MEMORY_PROPORTION: f64 = 0.3;

impl MemoryControl for DummyPolicy {
fn apply(
&self,
_interval_ms: u32,
_prev_memory_stats: MemoryControlStats,
_batch_manager: Arc<BatchManager>,
_stream_manager: Arc<LocalStreamManager>,
_watermark_epoch: Arc<AtomicU64>,
) -> MemoryControlStats {
MemoryControlStats::default()
}
}
const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1;
const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3;
const STORAGE_META_CACHE_MAX_MEMORY_MB: usize = 4096;
const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35;
const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3;
const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 50;

/// Each compute node reserves some memory for stack and code segment of processes, allocation
/// overhead, network buffer, etc. based on `SYSTEM_RESERVED_MEMORY_PROPORTION`. The reserve memory
Expand Down
198 changes: 198 additions & 0 deletions src/compute/src/memory/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use risingwave_common::util::epoch::Epoch;
use risingwave_jni_core::jvm_runtime::load_jvm_memory_stats;
use risingwave_stream::executor::monitor::StreamingMetrics;

/// Internal state of [`LruWatermarkController`] that saves the state in previous tick.
struct State {
pub used_memory_bytes: usize,
pub lru_watermark_step: u64,
pub lru_watermark_time_ms: u64,
}

impl Default for State {
fn default() -> Self {
let physical_now = Epoch::physical_now();
Self {
used_memory_bytes: 0,
lru_watermark_step: 0,
lru_watermark_time_ms: physical_now,
}
}
}

/// `LruWatermarkController` controls LRU Watermark (epoch) according to actual memory usage statistics
/// collected from Jemalloc and JVM.
///
/// Basically, it works as a negative feedback loop: collect memory statistics, and then set the LRU watarmarking
/// according to maintain the memory usage in a proper level.
///
/// ```text
/// ┌───────────────────┐ ┌───────────────────┐
/// │ INPUT │ │ OUTPUT │
/// ┌───► (observe) ├───────►│ (control) ├───┐
/// │ │ Memory Statistics │ │ New LRU Watermark │ │
/// │ └───────────────────┘ └───────────────────┘ │
/// │ │
/// └────────────────────────────────────────────────────────┘
/// ```
///
/// Check the function [`Self::tick()`] to see the control policy.
pub struct LruWatermarkController {
metrics: Arc<StreamingMetrics>,

threshold_stable: usize,
threshold_graceful: usize,
threshold_aggressive: usize,

/// The state from previous tick
state: State,
}

impl LruWatermarkController {
// TODO(eric): make them configurable
const THRESHOLD_AGGRESSIVE: f64 = 0.9;
const THRESHOLD_GRACEFUL: f64 = 0.8;
const THRESHOLD_STABLE: f64 = 0.7;

pub fn new(total_memory: usize, metrics: Arc<StreamingMetrics>) -> Self {
let threshold_stable = (total_memory as f64 * Self::THRESHOLD_STABLE) as usize;
let threshold_graceful = (total_memory as f64 * Self::THRESHOLD_GRACEFUL) as usize;
let threshold_aggressive = (total_memory as f64 * Self::THRESHOLD_AGGRESSIVE) as usize;

Self {
metrics,
threshold_stable,
threshold_graceful,
threshold_aggressive,
state: State::default(),
}
}
}

impl std::fmt::Debug for LruWatermarkController {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LruWatermarkController")
.field("threshold_stable", &self.threshold_stable)
.field("threshold_graceful", &self.threshold_graceful)
.field("threshold_aggressive", &self.threshold_aggressive)
.finish()
}
}

/// Get memory statistics from Jemalloc
///
/// - `stats.allocated`: Total number of bytes allocated by the application.
/// - `stats.active`: Total number of bytes in active pages allocated by the application. This is a multiple of the page size, and greater than or equal to `stats.allocated`. This does not include `stats.arenas.<i>.pdirty`, `stats.arenas.<i>.pmuzzy`, nor pages entirely devoted to allocator metadata.
///
/// Reference: <https://jemalloc.net/jemalloc.3.html>
fn jemalloc_memory_stats() -> (usize, usize) {
if let Err(e) = tikv_jemalloc_ctl::epoch::advance() {
tracing::warn!("Jemalloc epoch advance failed! {:?}", e);
}
let allocated = tikv_jemalloc_ctl::stats::allocated::read().unwrap();
let active = tikv_jemalloc_ctl::stats::active::read().unwrap();
(allocated, active)
}

impl LruWatermarkController {
pub fn tick(&mut self, interval_ms: u32) -> Epoch {
// NOTE: Be careful! The meaning of `allocated` and `active` differ in JeMalloc and JVM
let (jemalloc_allocated_bytes, jemalloc_active_bytes) = jemalloc_memory_stats();
let (jvm_allocated_bytes, jvm_active_bytes) = load_jvm_memory_stats();

let cur_used_memory_bytes = jemalloc_active_bytes + jvm_allocated_bytes;

let last_step = self.state.lru_watermark_step;
let last_used_memory_bytes = self.state.used_memory_bytes;

// The watermark calculation works in the following way:
//
// 1. When the streaming memory usage is below the graceful threshold, we do not evict
// any caches, and simply reset the step to 0.
//
// 2. When the memory usage is between the graceful and aggressive threshold:
// - If the last eviction memory usage decreases after last eviction, we set the eviction step
// to 1.
// - Otherwise, we set the step to last_step + 1.
//
// 3. When the memory usage exceeds the aggressive threshold:
// - If the memory usage decreases after the last eviction, we set the eviction step to
// last_step.
// - Otherwise, we set the step to last_step * 2.
let mut step = if cur_used_memory_bytes < self.threshold_stable {
// Do not evict if the memory usage is lower than `threshold_stable`
0
} else if cur_used_memory_bytes < self.threshold_graceful {
// Evict in equal speed of time before `threshold_graceful`
1
} else if cur_used_memory_bytes < self.threshold_aggressive {
// Gracefully evict
if last_used_memory_bytes > cur_used_memory_bytes {
1
} else {
last_step + 1
}
} else if last_used_memory_bytes < cur_used_memory_bytes {
// Aggressively evict
if last_step == 0 {
2
} else {
last_step * 2
}
} else {
last_step
};

let physical_now = Epoch::physical_now();
let watermark_time_ms =
if (physical_now - self.state.lru_watermark_time_ms) / (interval_ms as u64) < step {
// We do not increase the step and watermark here to prevent a too-advanced watermark. The
// original condition is `prev_watermark_time_ms + interval_ms * step > now`.
step = last_step;
physical_now
} else {
// Increase by (steps * interval)
self.state.lru_watermark_time_ms + (interval_ms as u64 * step)
};

self.state = State {
used_memory_bytes: cur_used_memory_bytes,
lru_watermark_step: step,
lru_watermark_time_ms: watermark_time_ms,
};

self.metrics
.lru_current_watermark_time_ms
.set(watermark_time_ms as i64);
self.metrics.lru_physical_now_ms.set(physical_now as i64);
self.metrics.lru_watermark_step.set(step as i64);
self.metrics
.jemalloc_allocated_bytes
.set(jemalloc_allocated_bytes as i64);
self.metrics
.jemalloc_active_bytes
.set(jemalloc_active_bytes as i64);
self.metrics
.jvm_allocated_bytes
.set(jvm_allocated_bytes as i64);
self.metrics.jvm_active_bytes.set(jvm_active_bytes as i64);

Epoch::from_physical_time(watermark_time_ms)
}
}
Loading

0 comments on commit bebc963

Please sign in to comment.