Skip to content

Commit

Permalink
refactor(streaming): decouple memory manager tick interval with barrier
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <[email protected]>
  • Loading branch information
MrCroxx committed Nov 20, 2024
1 parent c325f42 commit fa99e7b
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 41 deletions.
11 changes: 9 additions & 2 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,9 @@ pub struct StreamingDeveloperConfig {
#[serde(default = "default::developer::memory_controller_eviction_factor_stable")]
pub memory_controller_eviction_factor_stable: f64,

#[serde(default = "default::developer::memory_controller_update_interval_ms")]
pub memory_controller_update_interval_ms: usize,

#[serde(default = "default::developer::memory_controller_sequence_tls_step")]
pub memory_controller_sequence_tls_step: u64,

Expand Down Expand Up @@ -1203,11 +1206,11 @@ pub struct ObjectStoreConfig {
#[serde(default)]
pub s3: S3ObjectStoreConfig,

// TODO: the following field will be deprecated after opendal is stablized
// TODO: the following field will be deprecated after opendal is stabilized
#[serde(default = "default::object_store_config::opendal_upload_concurrency")]
pub opendal_upload_concurrency: usize,

// TODO: the following field will be deprecated after opendal is stablized
// TODO: the following field will be deprecated after opendal is stabilized
#[serde(default)]
pub opendal_writer_abort_on_err: bool,

Expand Down Expand Up @@ -2056,6 +2059,10 @@ pub mod default {
1.0
}

pub fn memory_controller_update_interval_ms() -> usize {
100
}

pub fn memory_controller_sequence_tls_step() -> u64 {
128
}
Expand Down
47 changes: 13 additions & 34 deletions src/compute/src/memory/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use risingwave_common::sequence::AtomicSequence;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_stream::executor::monitor::StreamingMetrics;

use super::controller::LruWatermarkController;
Expand Down Expand Up @@ -50,7 +48,7 @@ pub struct MemoryManager {
impl MemoryManager {
// Arbitrarily set a minimal barrier interval in case it is too small,
// especially when it's 0.
const MIN_TICK_INTERVAL_MS: u32 = 10;
const MIN_INTERVAL: Duration = Duration::from_millis(10);

pub fn new(config: MemoryManagerConfig) -> Arc<Self> {
let controller = Mutex::new(LruWatermarkController::new(&config));
Expand All @@ -67,42 +65,23 @@ impl MemoryManager {
self.watermark_sequence.clone()
}

pub async fn run(
self: Arc<Self>,
initial_interval_ms: u32,
mut system_params_change_rx: tokio::sync::watch::Receiver<SystemParamsReaderRef>,
) {
pub async fn run(self: Arc<Self>, interval: Duration) {
// Loop interval of running control policy
let mut interval_ms = std::cmp::max(initial_interval_ms, Self::MIN_TICK_INTERVAL_MS);
tracing::info!(
"start running MemoryManager with interval {}ms",
interval_ms
);
let interval = std::cmp::max(interval, Self::MIN_INTERVAL);
tracing::info!("start running MemoryManager with interval {interval:?}",);

// Keep same interval with the barrier interval
let mut tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64));
let mut tick_interval = tokio::time::interval(interval);

loop {
// Wait for a while to check if need eviction.
tokio::select! {
Ok(_) = system_params_change_rx.changed() => {
let params = system_params_change_rx.borrow().load();
let new_interval_ms = std::cmp::max(params.barrier_interval_ms(), Self::MIN_TICK_INTERVAL_MS);
if new_interval_ms != interval_ms {
interval_ms = new_interval_ms;
tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64));
tracing::info!("updated MemoryManager interval to {}ms", interval_ms);
}
}

_ = tick_interval.tick() => {
let new_watermark_sequence = self.controller.lock().unwrap().tick();

self.watermark_sequence.store(new_watermark_sequence, Ordering::Relaxed);

self.metrics.lru_runtime_loop_count.inc();
}
}
tick_interval.tick().await;

let new_watermark_sequence = self.controller.lock().unwrap().tick();

self.watermark_sequence
.store(new_watermark_sequence, Ordering::Relaxed);

self.metrics.lru_runtime_loop_count.inc();
}
}
}
12 changes: 8 additions & 4 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,14 @@ pub async fn compute_node_serve(
});

// Run a background memory manager
tokio::spawn(memory_mgr.clone().run(
system_params.barrier_interval_ms(),
system_params_manager.watch_params(),
));
tokio::spawn(
memory_mgr.clone().run(Duration::from_millis(
config
.streaming
.developer
.memory_controller_update_interval_ms as _,
)),
);

let heap_profiler = HeapProfiler::new(
opts.total_memory_bytes,
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ stream_memory_controller_threshold_stable = 0.72
stream_memory_controller_eviction_factor_aggressive = 2.0
stream_memory_controller_eviction_factor_graceful = 1.5
stream_memory_controller_eviction_factor_stable = 1.0
stream_memory_controller_update_interval_ms = 100
stream_memory_controller_sequence_tls_step = 128
stream_memory_controller_sequence_tls_lag = 32
stream_enable_arrangement_backfill = true
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/expr/type_inference/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ fn implicit_ok(source: &DataType, target: &SigDataType, eq_ok: bool) -> bool {
/// Find the top `candidates` that match `inputs` on most non-null positions. This covers Rule 2,
/// 4a, 4c and 4d in [`PostgreSQL`](https://www.postgresql.org/docs/current/typeconv-func.html).
///
/// * Rule 2 & 4c: Keep candidates that have most exact type matches. Exact match on all posistions
/// * Rule 2 & 4c: Keep candidates that have most exact type matches. Exact match on all positions
/// is just a special case.
/// * Rule 4d: Break ties by selecting those that accept preferred types at most positions.
/// * Rule 4a: If the input cannot implicit cast to expected type at any position, this candidate is
Expand Down

0 comments on commit fa99e7b

Please sign in to comment.