Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): decouple memory manager tick interval with barrier (#19494) #19502

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,9 @@ pub struct StreamingDeveloperConfig {
#[serde(default = "default::developer::memory_controller_eviction_factor_stable")]
pub memory_controller_eviction_factor_stable: f64,

#[serde(default = "default::developer::memory_controller_update_interval_ms")]
pub memory_controller_update_interval_ms: usize,

#[serde(default = "default::developer::memory_controller_sequence_tls_step")]
pub memory_controller_sequence_tls_step: u64,

Expand Down Expand Up @@ -1146,11 +1149,11 @@ pub struct ObjectStoreConfig {
#[serde(default)]
pub s3: S3ObjectStoreConfig,

// TODO: the following field will be deprecated after opendal is stablized
// TODO: the following field will be deprecated after opendal is stabilized
#[serde(default = "default::object_store_config::opendal_upload_concurrency")]
pub opendal_upload_concurrency: usize,

// TODO: the following field will be deprecated after opendal is stablized
// TODO: the following field will be deprecated after opendal is stabilized
#[serde(default)]
pub opendal_writer_abort_on_err: bool,

Expand Down Expand Up @@ -1973,6 +1976,10 @@ pub mod default {
1.0
}

pub fn memory_controller_update_interval_ms() -> usize {
100
}

pub fn memory_controller_sequence_tls_step() -> u64 {
128
}
Expand Down
47 changes: 13 additions & 34 deletions src/compute/src/memory/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use risingwave_common::sequence::AtomicSequence;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_stream::executor::monitor::StreamingMetrics;

use super::controller::LruWatermarkController;
Expand Down Expand Up @@ -50,7 +48,7 @@ pub struct MemoryManager {
impl MemoryManager {
// Arbitrarily set a minimal barrier interval in case it is too small,
// especially when it's 0.
const MIN_TICK_INTERVAL_MS: u32 = 10;
const MIN_INTERVAL: Duration = Duration::from_millis(10);

pub fn new(config: MemoryManagerConfig) -> Arc<Self> {
let controller = Mutex::new(LruWatermarkController::new(&config));
Expand All @@ -67,42 +65,23 @@ impl MemoryManager {
self.watermark_sequence.clone()
}

pub async fn run(
self: Arc<Self>,
initial_interval_ms: u32,
mut system_params_change_rx: tokio::sync::watch::Receiver<SystemParamsReaderRef>,
) {
pub async fn run(self: Arc<Self>, interval: Duration) {
// Loop interval of running control policy
let mut interval_ms = std::cmp::max(initial_interval_ms, Self::MIN_TICK_INTERVAL_MS);
tracing::info!(
"start running MemoryManager with interval {}ms",
interval_ms
);
let interval = std::cmp::max(interval, Self::MIN_INTERVAL);
tracing::info!("start running MemoryManager with interval {interval:?}",);

// Keep same interval with the barrier interval
let mut tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64));
let mut tick_interval = tokio::time::interval(interval);

loop {
// Wait for a while to check if need eviction.
tokio::select! {
Ok(_) = system_params_change_rx.changed() => {
let params = system_params_change_rx.borrow().load();
let new_interval_ms = std::cmp::max(params.barrier_interval_ms(), Self::MIN_TICK_INTERVAL_MS);
if new_interval_ms != interval_ms {
interval_ms = new_interval_ms;
tick_interval = tokio::time::interval(Duration::from_millis(interval_ms as u64));
tracing::info!("updated MemoryManager interval to {}ms", interval_ms);
}
}

_ = tick_interval.tick() => {
let new_watermark_sequence = self.controller.lock().unwrap().tick();

self.watermark_sequence.store(new_watermark_sequence, Ordering::Relaxed);

self.metrics.lru_runtime_loop_count.inc();
}
}
tick_interval.tick().await;

let new_watermark_sequence = self.controller.lock().unwrap().tick();

self.watermark_sequence
.store(new_watermark_sequence, Ordering::Relaxed);

self.metrics.lru_runtime_loop_count.inc();
}
}
}
12 changes: 8 additions & 4 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,14 @@ pub async fn compute_node_serve(
});

// Run a background memory manager
tokio::spawn(memory_mgr.clone().run(
system_params.barrier_interval_ms(),
system_params_manager.watch_params(),
));
tokio::spawn(
memory_mgr.clone().run(Duration::from_millis(
config
.streaming
.developer
.memory_controller_update_interval_ms as _,
)),
);

let heap_profiler = HeapProfiler::new(
opts.total_memory_bytes,
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ stream_memory_controller_threshold_stable = 0.72
stream_memory_controller_eviction_factor_aggressive = 2.0
stream_memory_controller_eviction_factor_graceful = 1.5
stream_memory_controller_eviction_factor_stable = 1.0
stream_memory_controller_update_interval_ms = 100
stream_memory_controller_sequence_tls_step = 128
stream_memory_controller_sequence_tls_lag = 32
stream_enable_arrangement_backfill = true
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/expr/type_inference/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ fn implicit_ok(source: &DataType, target: &SigDataType, eq_ok: bool) -> bool {
/// Find the top `candidates` that match `inputs` on most non-null positions. This covers Rule 2,
/// 4a, 4c and 4d in [`PostgreSQL`](https://www.postgresql.org/docs/current/typeconv-func.html).
///
/// * Rule 2 & 4c: Keep candidates that have most exact type matches. Exact match on all posistions
/// * Rule 2 & 4c: Keep candidates that have most exact type matches. Exact match on all positions
/// is just a special case.
/// * Rule 4d: Break ties by selecting those that accept preferred types at most positions.
/// * Rule 4a: If the input cannot implicit cast to expected type at any position, this candidate is
Expand Down
Loading