From bebc963125b6bebdfabe832d14bdd6fe294a43b1 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 27 Nov 2023 11:48:28 +0800 Subject: [PATCH] Squashed commit of the following: commit a3ea7c0bc48dc26c200739f5a852638a68186487 Author: Eric Fu Date: Sat Nov 25 11:00:48 2023 +0800 refactor: memory management (#13636) commit 2348a2bdcadfd682f13503a057f251b989e4ef5d Author: Bugen Zhao Date: Fri Nov 24 18:26:48 2023 +0800 fix(streaming): use correct label for `stream_fragment_exchange_bytes` metrics (#13644) Signed-off-by: Bugen Zhao commit 3ccb249eea311eecb9df1b17e92ebf6e903c57a4 Author: Runji Wang Date: Fri Nov 24 17:39:12 2023 +0800 fix: estimate jsonb's value encoding size (#13643) Signed-off-by: Runji Wang commit 7b21e04b2d2df1b0e30bf2a0a928ce2ca7bff6e4 Author: Dylan Date: Fri Nov 24 16:54:38 2023 +0800 feat(optimizer): improve inline session timezone in exprs (#13640) Signed-off-by: Little-Wallace --- .../src/estimate_size/collections/lru.rs | 2 +- src/common/src/types/jsonb.rs | 5 + src/common/src/util/value_encoding/mod.rs | 4 +- src/compute/src/lib.rs | 2 +- .../mod.rs => memory/config.rs} | 77 +------ src/compute/src/memory/controller.rs | 198 +++++++++++++++++ src/compute/src/memory/manager.rs | 94 ++++++++ src/compute/src/memory/mod.rs | 17 ++ .../src/memory_management/memory_manager.rs | 137 ------------ src/compute/src/memory_management/policy.rs | 209 ------------------ .../src/rpc/service/exchange_service.rs | 15 +- src/compute/src/server.rs | 14 +- .../tests/testdata/output/explain.yaml | 2 +- .../tests/testdata/output/nexmark.yaml | 2 +- src/frontend/src/expr/mod.rs | 2 +- src/frontend/src/expr/session_timezone.rs | 34 ++- .../src/optimizer/logical_optimization.rs | 2 +- src/frontend/src/optimizer/mod.rs | 12 +- src/stream/src/cache/managed_lru.rs | 4 +- 19 files changed, 388 insertions(+), 444 deletions(-) rename src/compute/src/{memory_management/mod.rs => memory/config.rs} (78%) create mode 100644 src/compute/src/memory/controller.rs create mode 100644 src/compute/src/memory/manager.rs create mode 100644 src/compute/src/memory/mod.rs delete mode 100644 src/compute/src/memory_management/memory_manager.rs delete mode 100644 src/compute/src/memory_management/policy.rs diff --git a/src/common/src/estimate_size/collections/lru.rs b/src/common/src/estimate_size/collections/lru.rs index cb220f50100e6..d6889050dc108 100644 --- a/src/common/src/estimate_size/collections/lru.rs +++ b/src/common/src/estimate_size/collections/lru.rs @@ -22,7 +22,7 @@ use super::{AtomicMutGuard, MutGuard}; use crate::estimate_size::{EstimateSize, KvSize}; /// The managed cache is a lru cache that bounds the memory usage by epoch. -/// Should be used with `GlobalMemoryManager`. +/// Should be used with `MemoryManager`. pub struct EstimatedLruCache { inner: LruCache, kv_heap_size: KvSize, diff --git a/src/common/src/types/jsonb.rs b/src/common/src/types/jsonb.rs index 6e27ff5344198..71f33a1d53822 100644 --- a/src/common/src/types/jsonb.rs +++ b/src/common/src/types/jsonb.rs @@ -374,6 +374,11 @@ impl<'a> JsonbRef<'a> { Serializer::with_formatter(FmtToIoUnchecked(f), PrettyFormatter::with_indent(b" ")); self.0.serialize(&mut ser).map_err(|_| std::fmt::Error) } + + /// Returns the capacity of the underlying buffer. + pub fn capacity(self) -> usize { + self.0.capacity() + } } /// A custom implementation for [`serde_json::ser::Formatter`] to match PostgreSQL, which adds extra diff --git a/src/common/src/util/value_encoding/mod.rs b/src/common/src/util/value_encoding/mod.rs index e3c4386f39a20..7068cc427735b 100644 --- a/src/common/src/util/value_encoding/mod.rs +++ b/src/common/src/util/value_encoding/mod.rs @@ -130,7 +130,6 @@ pub fn try_get_exact_serialize_datum_size(arr: &ArrayImpl) -> Option { ArrayImpl::Float32(_) => Some(4), ArrayImpl::Float64(_) => Some(8), ArrayImpl::Bool(_) => Some(1), - ArrayImpl::Jsonb(_) => Some(8), ArrayImpl::Decimal(_) => Some(estimate_serialize_decimal_size()), ArrayImpl::Interval(_) => Some(estimate_serialize_interval_size()), ArrayImpl::Date(_) => Some(estimate_serialize_date_size()), @@ -246,7 +245,8 @@ fn estimate_serialize_scalar_size(value: ScalarRefImpl<'_>) -> usize { ScalarRefImpl::Timestamp(_) => estimate_serialize_timestamp_size(), ScalarRefImpl::Timestamptz(_) => 8, ScalarRefImpl::Time(_) => estimate_serialize_time_size(), - ScalarRefImpl::Jsonb(_) => 8, + // not exact as we use internal encoding size to estimate the json string size + ScalarRefImpl::Jsonb(v) => v.capacity(), ScalarRefImpl::Struct(s) => estimate_serialize_struct_size(s), ScalarRefImpl::List(v) => estimate_serialize_list_size(v), } diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 8bc9093274333..96110b487536e 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -25,7 +25,7 @@ #[macro_use] extern crate tracing; -pub mod memory_management; +pub mod memory; pub mod observer; pub mod rpc; pub mod server; diff --git a/src/compute/src/memory_management/mod.rs b/src/compute/src/memory/config.rs similarity index 78% rename from src/compute/src/memory_management/mod.rs rename to src/compute/src/memory/config.rs index 267a62b70e6e4..92cfc8e6004b0 100644 --- a/src/compute/src/memory_management/mod.rs +++ b/src/compute/src/memory/config.rs @@ -12,86 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod memory_manager; - -// Only enable the non-trivial policies on Linux as it relies on statistics from `jemalloc-ctl` -// which might be inaccurate on other platforms. -pub mod policy; - -use std::sync::atomic::AtomicU64; -use std::sync::Arc; - -use risingwave_batch::task::BatchManager; use risingwave_common::config::{StorageConfig, StorageMemoryConfig}; use risingwave_common::util::pretty_bytes::convert; -use risingwave_stream::task::LocalStreamManager; /// The minimal memory requirement of computing tasks in megabytes. pub const MIN_COMPUTE_MEMORY_MB: usize = 512; /// The memory reserved for system usage (stack and code segment of processes, allocation /// overhead, network buffer, etc.) in megabytes. pub const MIN_SYSTEM_RESERVED_MEMORY_MB: usize = 512; -pub const SYSTEM_RESERVED_MEMORY_PROPORTION: f64 = 0.2; - -pub const STORAGE_MEMORY_PROPORTION: f64 = 0.3; - -pub const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1; - -pub const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3; - -pub const STORAGE_META_CACHE_MAX_MEMORY_MB: usize = 4096; -pub const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35; -pub const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3; -pub const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 50; - -/// `MemoryControlStats` contains the state from previous control loop -#[derive(Default)] -pub struct MemoryControlStats { - pub jemalloc_allocated_bytes: usize, - pub jemalloc_active_bytes: usize, - pub jvm_allocated_bytes: usize, - pub jvm_active_bytes: usize, - pub lru_watermark_step: u64, - pub lru_watermark_time_ms: u64, - pub lru_physical_now_ms: u64, -} - -pub type MemoryControlRef = Box; - -pub trait MemoryControl: Send + Sync + std::fmt::Debug { - fn apply( - &self, - interval_ms: u32, - prev_memory_stats: MemoryControlStats, - batch_manager: Arc, - stream_manager: Arc, - watermark_epoch: Arc, - ) -> MemoryControlStats; -} -pub fn build_memory_control_policy(total_memory_bytes: usize) -> MemoryControlRef { - use self::policy::JemallocAndJvmMemoryControl; +const SYSTEM_RESERVED_MEMORY_PROPORTION: f64 = 0.2; - Box::new(JemallocAndJvmMemoryControl::new(total_memory_bytes)) -} - -/// `DummyPolicy` is used for operarting systems other than Linux. It does nothing as memory control -/// is disabled on non-Linux OS. -#[derive(Debug)] -pub struct DummyPolicy; +const STORAGE_MEMORY_PROPORTION: f64 = 0.3; -impl MemoryControl for DummyPolicy { - fn apply( - &self, - _interval_ms: u32, - _prev_memory_stats: MemoryControlStats, - _batch_manager: Arc, - _stream_manager: Arc, - _watermark_epoch: Arc, - ) -> MemoryControlStats { - MemoryControlStats::default() - } -} +const COMPACTOR_MEMORY_PROPORTION: f64 = 0.1; +const STORAGE_BLOCK_CACHE_MEMORY_PROPORTION: f64 = 0.3; +const STORAGE_META_CACHE_MAX_MEMORY_MB: usize = 4096; +const STORAGE_META_CACHE_MEMORY_PROPORTION: f64 = 0.35; +const STORAGE_SHARED_BUFFER_MEMORY_PROPORTION: f64 = 0.3; +const STORAGE_DEFAULT_HIGH_PRIORITY_BLOCK_CACHE_RATIO: usize = 50; /// Each compute node reserves some memory for stack and code segment of processes, allocation /// overhead, network buffer, etc. based on `SYSTEM_RESERVED_MEMORY_PROPORTION`. The reserve memory diff --git a/src/compute/src/memory/controller.rs b/src/compute/src/memory/controller.rs new file mode 100644 index 0000000000000..5c9518293f38a --- /dev/null +++ b/src/compute/src/memory/controller.rs @@ -0,0 +1,198 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use risingwave_common::util::epoch::Epoch; +use risingwave_jni_core::jvm_runtime::load_jvm_memory_stats; +use risingwave_stream::executor::monitor::StreamingMetrics; + +/// Internal state of [`LruWatermarkController`] that saves the state in previous tick. +struct State { + pub used_memory_bytes: usize, + pub lru_watermark_step: u64, + pub lru_watermark_time_ms: u64, +} + +impl Default for State { + fn default() -> Self { + let physical_now = Epoch::physical_now(); + Self { + used_memory_bytes: 0, + lru_watermark_step: 0, + lru_watermark_time_ms: physical_now, + } + } +} + +/// `LruWatermarkController` controls LRU Watermark (epoch) according to actual memory usage statistics +/// collected from Jemalloc and JVM. +/// +/// Basically, it works as a negative feedback loop: collect memory statistics, and then set the LRU watarmarking +/// according to maintain the memory usage in a proper level. +/// +/// ```text +/// ┌───────────────────┐ ┌───────────────────┐ +/// │ INPUT │ │ OUTPUT │ +/// ┌───► (observe) ├───────►│ (control) ├───┐ +/// │ │ Memory Statistics │ │ New LRU Watermark │ │ +/// │ └───────────────────┘ └───────────────────┘ │ +/// │ │ +/// └────────────────────────────────────────────────────────┘ +/// ``` +/// +/// Check the function [`Self::tick()`] to see the control policy. +pub struct LruWatermarkController { + metrics: Arc, + + threshold_stable: usize, + threshold_graceful: usize, + threshold_aggressive: usize, + + /// The state from previous tick + state: State, +} + +impl LruWatermarkController { + // TODO(eric): make them configurable + const THRESHOLD_AGGRESSIVE: f64 = 0.9; + const THRESHOLD_GRACEFUL: f64 = 0.8; + const THRESHOLD_STABLE: f64 = 0.7; + + pub fn new(total_memory: usize, metrics: Arc) -> Self { + let threshold_stable = (total_memory as f64 * Self::THRESHOLD_STABLE) as usize; + let threshold_graceful = (total_memory as f64 * Self::THRESHOLD_GRACEFUL) as usize; + let threshold_aggressive = (total_memory as f64 * Self::THRESHOLD_AGGRESSIVE) as usize; + + Self { + metrics, + threshold_stable, + threshold_graceful, + threshold_aggressive, + state: State::default(), + } + } +} + +impl std::fmt::Debug for LruWatermarkController { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LruWatermarkController") + .field("threshold_stable", &self.threshold_stable) + .field("threshold_graceful", &self.threshold_graceful) + .field("threshold_aggressive", &self.threshold_aggressive) + .finish() + } +} + +/// Get memory statistics from Jemalloc +/// +/// - `stats.allocated`: Total number of bytes allocated by the application. +/// - `stats.active`: Total number of bytes in active pages allocated by the application. This is a multiple of the page size, and greater than or equal to `stats.allocated`. This does not include `stats.arenas..pdirty`, `stats.arenas..pmuzzy`, nor pages entirely devoted to allocator metadata. +/// +/// Reference: +fn jemalloc_memory_stats() -> (usize, usize) { + if let Err(e) = tikv_jemalloc_ctl::epoch::advance() { + tracing::warn!("Jemalloc epoch advance failed! {:?}", e); + } + let allocated = tikv_jemalloc_ctl::stats::allocated::read().unwrap(); + let active = tikv_jemalloc_ctl::stats::active::read().unwrap(); + (allocated, active) +} + +impl LruWatermarkController { + pub fn tick(&mut self, interval_ms: u32) -> Epoch { + // NOTE: Be careful! The meaning of `allocated` and `active` differ in JeMalloc and JVM + let (jemalloc_allocated_bytes, jemalloc_active_bytes) = jemalloc_memory_stats(); + let (jvm_allocated_bytes, jvm_active_bytes) = load_jvm_memory_stats(); + + let cur_used_memory_bytes = jemalloc_active_bytes + jvm_allocated_bytes; + + let last_step = self.state.lru_watermark_step; + let last_used_memory_bytes = self.state.used_memory_bytes; + + // The watermark calculation works in the following way: + // + // 1. When the streaming memory usage is below the graceful threshold, we do not evict + // any caches, and simply reset the step to 0. + // + // 2. When the memory usage is between the graceful and aggressive threshold: + // - If the last eviction memory usage decreases after last eviction, we set the eviction step + // to 1. + // - Otherwise, we set the step to last_step + 1. + // + // 3. When the memory usage exceeds the aggressive threshold: + // - If the memory usage decreases after the last eviction, we set the eviction step to + // last_step. + // - Otherwise, we set the step to last_step * 2. + let mut step = if cur_used_memory_bytes < self.threshold_stable { + // Do not evict if the memory usage is lower than `threshold_stable` + 0 + } else if cur_used_memory_bytes < self.threshold_graceful { + // Evict in equal speed of time before `threshold_graceful` + 1 + } else if cur_used_memory_bytes < self.threshold_aggressive { + // Gracefully evict + if last_used_memory_bytes > cur_used_memory_bytes { + 1 + } else { + last_step + 1 + } + } else if last_used_memory_bytes < cur_used_memory_bytes { + // Aggressively evict + if last_step == 0 { + 2 + } else { + last_step * 2 + } + } else { + last_step + }; + + let physical_now = Epoch::physical_now(); + let watermark_time_ms = + if (physical_now - self.state.lru_watermark_time_ms) / (interval_ms as u64) < step { + // We do not increase the step and watermark here to prevent a too-advanced watermark. The + // original condition is `prev_watermark_time_ms + interval_ms * step > now`. + step = last_step; + physical_now + } else { + // Increase by (steps * interval) + self.state.lru_watermark_time_ms + (interval_ms as u64 * step) + }; + + self.state = State { + used_memory_bytes: cur_used_memory_bytes, + lru_watermark_step: step, + lru_watermark_time_ms: watermark_time_ms, + }; + + self.metrics + .lru_current_watermark_time_ms + .set(watermark_time_ms as i64); + self.metrics.lru_physical_now_ms.set(physical_now as i64); + self.metrics.lru_watermark_step.set(step as i64); + self.metrics + .jemalloc_allocated_bytes + .set(jemalloc_allocated_bytes as i64); + self.metrics + .jemalloc_active_bytes + .set(jemalloc_active_bytes as i64); + self.metrics + .jvm_allocated_bytes + .set(jvm_allocated_bytes as i64); + self.metrics.jvm_active_bytes.set(jvm_active_bytes as i64); + + Epoch::from_physical_time(watermark_time_ms) + } +} diff --git a/src/compute/src/memory/manager.rs b/src/compute/src/memory/manager.rs new file mode 100644 index 0000000000000..09c20d4e5d5b2 --- /dev/null +++ b/src/compute/src/memory/manager.rs @@ -0,0 +1,94 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use risingwave_common::system_param::local_manager::SystemParamsReaderRef; +use risingwave_stream::executor::monitor::StreamingMetrics; + +use super::controller::LruWatermarkController; + +/// Compute node uses [`MemoryManager`] to limit the memory usage. +pub struct MemoryManager { + /// All cached data before the watermark should be evicted. + watermark_epoch: Arc, + + metrics: Arc, + + controller: Mutex, +} + +impl MemoryManager { + // Arbitrarily set a minimal barrier interval in case it is too small, + // especially when it's 0. + const MIN_TICK_INTERVAL_MS: u32 = 10; + + pub fn new(metrics: Arc, total_memory_bytes: usize) -> Arc { + let controller = Mutex::new(LruWatermarkController::new( + total_memory_bytes, + metrics.clone(), + )); + tracing::info!("LRU watermark controller: {:?}", &controller); + + Arc::new(Self { + watermark_epoch: Arc::new(0.into()), + metrics, + controller, + }) + } + + pub fn get_watermark_epoch(&self) -> Arc { + self.watermark_epoch.clone() + } + + pub async fn run( + self: Arc, + initial_interval_ms: u32, + mut system_params_change_rx: tokio::sync::watch::Receiver, + ) { + // Loop interval of running control policy + let mut interval_ms = std::cmp::max(initial_interval_ms, Self::MIN_TICK_INTERVAL_MS); + tracing::info!( + "start running MemoryManager with interval {}ms", + interval_ms + ); + + // Keep same interval with the barrier interval + let mut tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64)); + + loop { + // Wait for a while to check if need eviction. + tokio::select! { + Ok(_) = system_params_change_rx.changed() => { + let params = system_params_change_rx.borrow().load(); + let new_interval_ms = std::cmp::max(params.barrier_interval_ms(), Self::MIN_TICK_INTERVAL_MS); + if new_interval_ms != interval_ms { + interval_ms = new_interval_ms; + tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64)); + tracing::info!("updated MemoryManager interval to {}ms", interval_ms); + } + } + + _ = tick_interval.tick() => { + let new_watermark_epoch = self.controller.lock().unwrap().tick(interval_ms); + self.watermark_epoch.store(new_watermark_epoch.0, Ordering::Relaxed); + + self.metrics.lru_runtime_loop_count.inc(); + } + } + } + } +} diff --git a/src/compute/src/memory/mod.rs b/src/compute/src/memory/mod.rs new file mode 100644 index 0000000000000..896389028fb12 --- /dev/null +++ b/src/compute/src/memory/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod config; +pub mod controller; +pub mod manager; diff --git a/src/compute/src/memory_management/memory_manager.rs b/src/compute/src/memory_management/memory_manager.rs deleted file mode 100644 index 721b39046572a..0000000000000 --- a/src/compute/src/memory_management/memory_manager.rs +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::atomic::AtomicU64; -use std::sync::Arc; -use std::time::Duration; - -use risingwave_batch::task::BatchManager; -use risingwave_common::system_param::local_manager::SystemParamsReaderRef; -use risingwave_common::util::epoch::Epoch; -use risingwave_stream::executor::monitor::StreamingMetrics; -use risingwave_stream::task::LocalStreamManager; - -use super::MemoryControlRef; -use crate::memory_management::{build_memory_control_policy, MemoryControlStats}; - -/// Compute node uses [`GlobalMemoryManager`] to limit the memory usage. -pub struct GlobalMemoryManager { - /// All cached data before the watermark should be evicted. - watermark_epoch: Arc, - - metrics: Arc, - /// The memory control policy for computing tasks. - memory_control_policy: MemoryControlRef, -} - -pub type GlobalMemoryManagerRef = Arc; - -impl GlobalMemoryManager { - // Arbitrarily set a minimal barrier interval in case it is too small, - // especially when it's 0. - const MIN_TICK_INTERVAL_MS: u32 = 10; - - pub fn new(metrics: Arc, total_memory_bytes: usize) -> Arc { - let memory_control_policy = build_memory_control_policy(total_memory_bytes); - tracing::info!("memory control policy: {:?}", &memory_control_policy); - - Arc::new(Self { - watermark_epoch: Arc::new(0.into()), - metrics, - memory_control_policy, - }) - } - - pub fn get_watermark_epoch(&self) -> Arc { - self.watermark_epoch.clone() - } - - /// Memory manager will get memory usage statistics from batch and streaming and perform memory - /// control accordingly. - pub async fn run( - self: Arc, - batch_manager: Arc, - stream_manager: Arc, - initial_interval_ms: u32, - mut system_params_change_rx: tokio::sync::watch::Receiver, - ) { - // Loop interval of running control policy - let mut interval_ms = std::cmp::max(initial_interval_ms, Self::MIN_TICK_INTERVAL_MS); - tracing::info!( - "start running GlobalMemoryManager with interval {}ms", - interval_ms - ); - - // Keep same interval with the barrier interval - let mut tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64)); - - let mut memory_control_stats = MemoryControlStats { - jemalloc_allocated_bytes: 0, - jemalloc_active_bytes: 0, - jvm_allocated_bytes: 0, - jvm_active_bytes: 0, - lru_watermark_step: 0, - lru_watermark_time_ms: Epoch::physical_now(), - lru_physical_now_ms: Epoch::physical_now(), - }; - - loop { - // Wait for a while to check if need eviction. - tokio::select! { - Ok(_) = system_params_change_rx.changed() => { - let params = system_params_change_rx.borrow().load(); - let new_interval_ms = std::cmp::max(params.barrier_interval_ms(), Self::MIN_TICK_INTERVAL_MS); - if new_interval_ms != interval_ms { - interval_ms = new_interval_ms; - tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64)); - tracing::info!("updated GlobalMemoryManager interval to {}ms", interval_ms); - } - } - - _ = tick_interval.tick() => { - memory_control_stats = self.memory_control_policy.apply( - interval_ms, - memory_control_stats, - batch_manager.clone(), - stream_manager.clone(), - self.watermark_epoch.clone(), - ); - - self.metrics - .lru_current_watermark_time_ms - .set(memory_control_stats.lru_watermark_time_ms as i64); - self.metrics - .lru_physical_now_ms - .set(memory_control_stats.lru_physical_now_ms as i64); - self.metrics - .lru_watermark_step - .set(memory_control_stats.lru_watermark_step as i64); - self.metrics.lru_runtime_loop_count.inc(); - self.metrics - .jemalloc_allocated_bytes - .set(memory_control_stats.jemalloc_allocated_bytes as i64); - self.metrics - .jemalloc_active_bytes - .set(memory_control_stats.jemalloc_active_bytes as i64); - self.metrics - .jvm_allocated_bytes - .set(memory_control_stats.jvm_allocated_bytes as i64); - self.metrics - .jvm_active_bytes - .set(memory_control_stats.jvm_active_bytes as i64); - } - } - } - } -} diff --git a/src/compute/src/memory_management/policy.rs b/src/compute/src/memory_management/policy.rs deleted file mode 100644 index 1eb5ff9832600..0000000000000 --- a/src/compute/src/memory_management/policy.rs +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::atomic::AtomicU64; -use std::sync::Arc; - -use risingwave_batch::task::BatchManager; -use risingwave_common::util::epoch::Epoch; -use risingwave_jni_core::jvm_runtime::load_jvm_memory_stats; -use risingwave_stream::task::LocalStreamManager; -use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats}; - -use super::{MemoryControl, MemoryControlStats}; - -/// `JemallocAndJvmMemoryControl` is a memory control policy that uses jemalloc statistics and -/// jvm memory statistics and to control. It assumes that most memory is used by streaming engine -/// and does memory control over LRU watermark based on jemalloc statistics and jvm memory statistics. -pub struct JemallocAndJvmMemoryControl { - threshold_stable: usize, - threshold_graceful: usize, - threshold_aggressive: usize, - - jemalloc_epoch_mib: tikv_jemalloc_ctl::epoch_mib, - jemalloc_allocated_mib: jemalloc_stats::allocated_mib, - jemalloc_active_mib: jemalloc_stats::active_mib, -} - -impl JemallocAndJvmMemoryControl { - const THRESHOLD_AGGRESSIVE: f64 = 0.9; - const THRESHOLD_GRACEFUL: f64 = 0.8; - const THRESHOLD_STABLE: f64 = 0.7; - - pub fn new(total_memory: usize) -> Self { - let threshold_stable = (total_memory as f64 * Self::THRESHOLD_STABLE) as usize; - let threshold_graceful = (total_memory as f64 * Self::THRESHOLD_GRACEFUL) as usize; - let threshold_aggressive = (total_memory as f64 * Self::THRESHOLD_AGGRESSIVE) as usize; - - let jemalloc_epoch_mib = jemalloc_epoch::mib().unwrap(); - let jemalloc_allocated_mib = jemalloc_stats::allocated::mib().unwrap(); - let jemalloc_active_mib = jemalloc_stats::active::mib().unwrap(); - - Self { - threshold_stable, - threshold_graceful, - threshold_aggressive, - jemalloc_epoch_mib, - jemalloc_allocated_mib, - jemalloc_active_mib, - } - } - - fn advance_jemalloc_epoch( - &self, - prev_jemalloc_allocated_bytes: usize, - prev_jemalloc_active_bytes: usize, - ) -> (usize, usize) { - if let Err(e) = self.jemalloc_epoch_mib.advance() { - tracing::warn!("Jemalloc epoch advance failed! {:?}", e); - } - - ( - self.jemalloc_allocated_mib.read().unwrap_or_else(|e| { - tracing::warn!("Jemalloc read allocated failed! {:?}", e); - prev_jemalloc_allocated_bytes - }), - self.jemalloc_active_mib.read().unwrap_or_else(|e| { - tracing::warn!("Jemalloc read active failed! {:?}", e); - prev_jemalloc_active_bytes - }), - ) - } -} - -impl std::fmt::Debug for JemallocAndJvmMemoryControl { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("JemallocMemoryControl") - .field("threshold_stable", &self.threshold_stable) - .field("threshold_graceful", &self.threshold_graceful) - .field("threshold_aggressive", &self.threshold_aggressive) - .finish() - } -} - -impl MemoryControl for JemallocAndJvmMemoryControl { - fn apply( - &self, - interval_ms: u32, - prev_memory_stats: MemoryControlStats, - _batch_manager: Arc, - _stream_manager: Arc, - watermark_epoch: Arc, - ) -> MemoryControlStats { - let (jemalloc_allocated_bytes, jemalloc_active_bytes) = self.advance_jemalloc_epoch( - prev_memory_stats.jemalloc_allocated_bytes, - prev_memory_stats.jemalloc_active_bytes, - ); - - let (jvm_allocated_bytes, jvm_active_bytes) = load_jvm_memory_stats(); - - // Streaming memory control - // - // We calculate the watermark of the LRU cache, which provides hints for streaming executors - // on cache eviction. Here we do the calculation based on jemalloc statistics. - - let (lru_watermark_step, lru_watermark_time_ms, lru_physical_now) = calculate_lru_watermark( - jemalloc_allocated_bytes + jvm_allocated_bytes, - self.threshold_stable, - self.threshold_graceful, - self.threshold_aggressive, - interval_ms, - prev_memory_stats, - ); - - set_lru_watermark_time_ms(watermark_epoch, lru_watermark_time_ms); - - MemoryControlStats { - jemalloc_allocated_bytes, - jemalloc_active_bytes, - jvm_allocated_bytes, - jvm_active_bytes, - lru_watermark_step, - lru_watermark_time_ms, - lru_physical_now_ms: lru_physical_now, - } - } -} - -fn calculate_lru_watermark( - cur_used_memory_bytes: usize, - threshold_stable: usize, - threshold_graceful: usize, - threshold_aggressive: usize, - interval_ms: u32, - prev_memory_stats: MemoryControlStats, -) -> (u64, u64, u64) { - let mut watermark_time_ms = prev_memory_stats.lru_watermark_time_ms; - let last_step = prev_memory_stats.lru_watermark_step; - let last_used_memory_bytes = - prev_memory_stats.jemalloc_allocated_bytes + prev_memory_stats.jvm_allocated_bytes; - - // The watermark calculation works in the following way: - // - // 1. When the streaming memory usage is below the graceful threshold, we do not evict - // any caches, and simply reset the step to 0. - // - // 2. When the memory usage is between the graceful and aggressive threshold: - // - If the last eviction memory usage decreases after last eviction, we set the eviction step - // to 1. - // - Otherwise, we set the step to last_step + 1. - // - // 3. When the memory usage exceeds the aggressive threshold: - // - If the memory usage decreases after the last eviction, we set the eviction step to - // last_step. - // - Otherwise, we set the step to last_step * 2. - - let mut step = if cur_used_memory_bytes < threshold_stable { - // Do not evict if the memory usage is lower than `threshold_stable` - 0 - } else if cur_used_memory_bytes < threshold_graceful { - // Evict in equal speed of time before `threshold_graceful` - 1 - } else if cur_used_memory_bytes < threshold_aggressive { - // Gracefully evict - if last_used_memory_bytes > cur_used_memory_bytes { - 1 - } else { - last_step + 1 - } - } else if last_used_memory_bytes < cur_used_memory_bytes { - // Aggressively evict - if last_step == 0 { - 2 - } else { - last_step * 2 - } - } else { - last_step - }; - - let physical_now = Epoch::physical_now(); - if (physical_now - prev_memory_stats.lru_watermark_time_ms) / (interval_ms as u64) < step { - // We do not increase the step and watermark here to prevent a too-advanced watermark. The - // original condition is `prev_watermark_time_ms + interval_ms * step > now`. - step = last_step; - watermark_time_ms = physical_now; - } else { - watermark_time_ms += interval_ms as u64 * step; - } - - (step, watermark_time_ms, physical_now) -} - -fn set_lru_watermark_time_ms(watermark_epoch: Arc, time_ms: u64) { - use std::sync::atomic::Ordering; - - let epoch = Epoch::from_physical_time(time_ms).0; - watermark_epoch.as_ref().store(epoch, Ordering::Relaxed); -} diff --git a/src/compute/src/rpc/service/exchange_service.rs b/src/compute/src/rpc/service/exchange_service.rs index 6225cef2a7e30..955eaa729cdb5 100644 --- a/src/compute/src/rpc/service/exchange_service.rs +++ b/src/compute/src/rpc/service/exchange_service.rs @@ -83,7 +83,12 @@ impl ExchangeService for ExchangeServiceImpl { let mut request_stream: Streaming = request.into_inner(); // Extract the first `Get` request from the stream. - let get_req = { + let Get { + up_actor_id, + down_actor_id, + up_fragment_id, + down_fragment_id, + } = { let req = request_stream .next() .await @@ -94,8 +99,10 @@ impl ExchangeService for ExchangeServiceImpl { } }; - let up_down_actor_ids = (get_req.up_actor_id, get_req.down_actor_id); - let receiver = self.stream_mgr.take_receiver(up_down_actor_ids).await?; + let receiver = self + .stream_mgr + .take_receiver((up_actor_id, down_actor_id)) + .await?; // Map the remaining stream to add-permits. let add_permits_stream = request_stream.map_ok(|req| match req.value.unwrap() { @@ -108,7 +115,7 @@ impl ExchangeService for ExchangeServiceImpl { peer_addr, receiver, add_permits_stream, - up_down_actor_ids, + (up_fragment_id, down_fragment_id), ))) } } diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 48a9be89c952d..63cfc0b2519ce 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -66,10 +66,8 @@ use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; use tower::Layer; -use crate::memory_management::memory_manager::GlobalMemoryManager; -use crate::memory_management::{ - reserve_memory_bytes, storage_memory_config, MIN_COMPUTE_MEMORY_MB, -}; +use crate::memory::config::{reserve_memory_bytes, storage_memory_config, MIN_COMPUTE_MEMORY_MB}; +use crate::memory::manager::MemoryManager; use crate::observer::observer_manager::ComputeObserverNode; use crate::rpc::service::config_service::ConfigServiceImpl; use crate::rpc::service::exchange_metrics::GLOBAL_EXCHANGE_SERVICE_METRICS; @@ -272,10 +270,6 @@ pub async fn compute_node_serve( await_tree_config.clone(), )); - // Spawn LRU Manager that have access to collect memory from batch mgr and stream mgr. - let batch_mgr_clone = batch_mgr.clone(); - let stream_mgr_clone = stream_mgr.clone(); - // NOTE: Due to some limits, we use `compute_memory_bytes + storage_memory_bytes` as // `total_compute_memory_bytes` for memory control. This is just a workaround for some // memory control issues and should be modified as soon as we figure out a better solution. @@ -283,15 +277,13 @@ pub async fn compute_node_serve( // Related issues: // - https://github.com/risingwavelabs/risingwave/issues/8696 // - https://github.com/risingwavelabs/risingwave/issues/8822 - let memory_mgr = GlobalMemoryManager::new( + let memory_mgr = MemoryManager::new( streaming_metrics.clone(), compute_memory_bytes + storage_memory_bytes, ); // Run a background memory manager tokio::spawn(memory_mgr.clone().run( - batch_mgr_clone, - stream_mgr_clone, system_params.barrier_interval_ms(), system_params_manager.watch_params(), )); diff --git a/src/frontend/planner_test/tests/testdata/output/explain.yaml b/src/frontend/planner_test/tests/testdata/output/explain.yaml index 585d1bca15f77..d0ebf04140059 100644 --- a/src/frontend/planner_test/tests/testdata/output/explain.yaml +++ b/src/frontend/planner_test/tests/testdata/output/explain.yaml @@ -50,7 +50,7 @@ "stages": { "0": { "root": { - "plan_node_id": 10013, + "plan_node_id": 10010, "plan_node_type": "BatchValues", "schema": [ { diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index 592696190a862..579e87e17e021 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -1127,7 +1127,7 @@ JOIN side_input FOR SYSTEM_TIME AS OF PROCTIME() S ON mod(B.auction, 10000) = S.key sink_plan: |- - StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10022(hidden), side_input.key(hidden)] } + StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10018(hidden), side_input.key(hidden)] } └─StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├─StreamExchange { dist: HashShard($expr1) } │ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] } diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index ae4e0f84bc07c..2e8b87d89ee88 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -62,7 +62,7 @@ pub use now::{InlineNowProcTime, Now, NowProcTimeFinder}; pub use parameter::Parameter; pub use pure::*; pub use risingwave_pb::expr::expr_node::Type as ExprType; -pub use session_timezone::SessionTimezone; +pub use session_timezone::{SessionTimezone, TimestamptzExprFinder}; pub use subquery::{Subquery, SubqueryKind}; pub use table_function::{TableFunction, TableFunctionType}; pub use type_inference::{ diff --git a/src/frontend/src/expr/session_timezone.rs b/src/frontend/src/expr/session_timezone.rs index 96f5f1d69148d..a411ddfcc5738 100644 --- a/src/frontend/src/expr/session_timezone.rs +++ b/src/frontend/src/expr/session_timezone.rs @@ -17,7 +17,7 @@ pub use risingwave_pb::expr::expr_node::Type as ExprType; pub use crate::expr::expr_rewriter::ExprRewriter; pub use crate::expr::function_call::FunctionCall; -use crate::expr::{Expr, ExprImpl}; +use crate::expr::{Expr, ExprImpl, ExprVisitor}; use crate::session::current; /// `SessionTimezone` will be used to resolve session @@ -264,3 +264,35 @@ impl SessionTimezone { .into() } } + +#[derive(Default)] +pub struct TimestamptzExprFinder { + has: bool, +} + +impl TimestamptzExprFinder { + pub fn has(&self) -> bool { + self.has + } +} + +impl ExprVisitor for TimestamptzExprFinder { + fn visit_function_call(&mut self, func_call: &FunctionCall) { + if func_call.return_type() == DataType::Timestamptz { + self.has = true; + return; + } + + for input in &func_call.inputs { + if input.return_type() == DataType::Timestamptz { + self.has = true; + return; + } + } + + func_call + .inputs() + .iter() + .for_each(|expr| self.visit_expr(expr)); + } +} diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs index abb007355815f..2c8410cae22ca 100644 --- a/src/frontend/src/optimizer/logical_optimization.rs +++ b/src/frontend/src/optimizer/logical_optimization.rs @@ -489,7 +489,7 @@ impl LogicalOptimizer { } pub fn inline_now_proc_time(plan: PlanRef, ctx: &OptimizerContextRef) -> PlanRef { - // If now() and proctime() are no found, bail out. + // If now() and proctime() are not found, bail out. let mut v = NowProcTimeFinder::default(); plan.visit_exprs_recursive(&mut v); if !v.has() { diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index a81f29a60c932..de16b1893de10 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -60,10 +60,11 @@ use self::plan_visitor::{has_batch_exchange, CardinalityVisitor}; use self::property::{Cardinality, RequiredDist}; use self::rule::*; use crate::catalog::table_catalog::{TableType, TableVersion}; +use crate::expr::TimestamptzExprFinder; use crate::optimizer::plan_node::generic::Union; use crate::optimizer::plan_node::{ BatchExchange, PlanNodeType, PlanTreeNode, RewriteExprsRecursive, StreamExchange, StreamUnion, - ToStream, + ToStream, VisitExprsRecursive, }; use crate::optimizer::plan_visitor::TemporalJoinValidator; use crate::optimizer::property::Distribution; @@ -739,8 +740,13 @@ fn const_eval_exprs(plan: PlanRef) -> Result { } fn inline_session_timezone_in_exprs(ctx: OptimizerContextRef, plan: PlanRef) -> Result { - let plan = plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut()); - Ok(plan) + let mut v = TimestamptzExprFinder::default(); + plan.visit_exprs_recursive(&mut v); + if v.has() { + Ok(plan.rewrite_exprs_recursive(ctx.session_timezone().deref_mut())) + } else { + Ok(plan) + } } fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bool) -> bool { diff --git a/src/stream/src/cache/managed_lru.rs b/src/stream/src/cache/managed_lru.rs index 0608e429f4bb7..85b4e0081fc0b 100644 --- a/src/stream/src/cache/managed_lru.rs +++ b/src/stream/src/cache/managed_lru.rs @@ -30,11 +30,11 @@ use crate::common::metrics::MetricsInfo; const REPORT_SIZE_EVERY_N_KB_CHANGE: usize = 4096; /// The managed cache is a lru cache that bounds the memory usage by epoch. -/// Should be used with `GlobalMemoryManager`. +/// Should be used with `MemoryManager`. pub struct ManagedLruCache { inner: LruCache, /// The entry with epoch less than water should be evicted. - /// Should only be updated by the `GlobalMemoryManager`. + /// Should only be updated by the `MemoryManager`. watermark_epoch: Arc, /// The heap size of keys/values kv_heap_size: usize,