diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 02b97bf6e618c..ddb4ed8ec1c47 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -552,6 +552,9 @@ pub struct StorageConfig { #[serde(default = "default::storage::max_preload_wait_time_mill")] pub max_preload_wait_time_mill: u64, + #[serde(default = "default::storage::max_version_pinning_duration_sec")] + pub max_version_pinning_duration_sec: u64, + #[serde(default = "default::storage::object_store_streaming_read_timeout_ms")] pub object_store_streaming_read_timeout_ms: u64, #[serde(default = "default::storage::object_store_streaming_upload_timeout_ms")] @@ -1067,6 +1070,10 @@ pub mod default { 0 } + pub fn max_version_pinning_duration_sec() -> u64 { + 3 * 3600 + } + pub fn object_store_streaming_read_timeout_ms() -> u64 { 10 * 60 * 1000 } diff --git a/src/config/example.toml b/src/config/example.toml index 8e8a2ac0bb406..c5c9bb3aef021 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -98,6 +98,7 @@ min_sst_size_for_streaming_upload = 33554432 max_sub_compaction = 4 max_concurrent_compaction_task_number = 16 max_preload_wait_time_mill = 0 +max_version_pinning_duration_sec = 10800 object_store_streaming_read_timeout_ms = 600000 object_store_streaming_upload_timeout_ms = 600000 object_store_upload_timeout_ms = 3600000 diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index a578cb571c706..5a3ba0384a52c 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -628,7 +628,7 @@ impl HummockManager { /// Unpin all pins which belongs to `context_id` and has an id which is older than /// `unpin_before`. All versions >= `unpin_before` will be treated as if they are all pinned by - /// this `context_id` so they will not be vacummed. + /// this `context_id` so they will not be vacuumed. #[named] pub async fn unpin_version_before( &self, @@ -646,6 +646,12 @@ impl HummockManager { min_pinned_id: 0, }, ); + assert!( + context_pinned_version.min_pinned_id <= unpin_before, + "val must be monotonically non-decreasing. old = {}, new = {}.", + context_pinned_version.min_pinned_id, + unpin_before + ); context_pinned_version.min_pinned_id = unpin_before; commit_multi_var!( self, diff --git a/src/storage/src/hummock/local_version/pinned_version.rs b/src/storage/src/hummock/local_version/pinned_version.rs index 636533d35403f..876a2f9a1f6e0 100644 --- a/src/storage/src/hummock/local_version/pinned_version.rs +++ b/src/storage/src/hummock/local_version/pinned_version.rs @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, HashMap}; use std::iter::empty; use std::ops::Deref; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use auto_enums::auto_enum; use risingwave_common::catalog::TableId; @@ -167,6 +167,7 @@ impl PinnedVersion { pub(crate) async fn start_pinned_version_worker( mut rx: UnboundedReceiver, hummock_meta_client: Arc, + max_version_pinning_duration_sec: u64, ) { let min_execute_interval = Duration::from_millis(1000); let max_retry_interval = Duration::from_secs(10); @@ -180,21 +181,34 @@ pub(crate) async fn start_pinned_version_worker( min_execute_interval_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); let mut need_unpin = false; - let mut version_ids_in_use: BTreeMap = BTreeMap::new(); - + let mut version_ids_in_use: BTreeMap = BTreeMap::new(); + let max_version_pinning_duration_sec = Duration::from_secs(max_version_pinning_duration_sec); // For each run in the loop, accumulate versions to unpin and call unpin RPC once. loop { min_execute_interval_tick.tick().await; + // 0. Expire versions. + while version_ids_in_use.len() > 1 && let Some(e) = version_ids_in_use.first_entry() { + if e.get().1.elapsed() < max_version_pinning_duration_sec { + break; + } + need_unpin = true; + e.remove(); + } + // 1. Collect new versions to unpin. let mut versions_to_unpin = vec![]; + let inst = Instant::now(); 'collect: loop { match rx.try_recv() { Ok(version_action) => match version_action { PinVersionAction::Pin(version_id) => { version_ids_in_use .entry(version_id) - .and_modify(|counter| *counter += 1) - .or_insert(1); + .and_modify(|e| { + e.0 += 1; + e.1 = inst; + }) + .or_insert((1, inst)); } PinVersionAction::Unpin(version_id) => { versions_to_unpin.push(version_id); @@ -220,13 +234,16 @@ pub(crate) async fn start_pinned_version_worker( for version in &versions_to_unpin { match version_ids_in_use.get_mut(version) { - Some(counter) => { + Some((counter, _)) => { *counter -= 1; if *counter == 0 { version_ids_in_use.remove(version); } } - None => tracing::warn!("version {} to unpin dose not exist", version), + None => tracing::warn!( + "version {} to unpin does not exist, may already be unpinned due to expiration", + version + ), } } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index e0125263df0a5..16aa9d474de0a 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -156,6 +156,7 @@ impl HummockStorage { tokio::spawn(start_pinned_version_worker( pin_version_rx, hummock_meta_client.clone(), + options.max_version_pinning_duration_sec, )); let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager( filter_key_extractor_manager.clone(), diff --git a/src/storage/src/opts.rs b/src/storage/src/opts.rs index 16112d1ae9f24..2fb02e61512c0 100644 --- a/src/storage/src/opts.rs +++ b/src/storage/src/opts.rs @@ -63,6 +63,7 @@ pub struct StorageOpts { /// Max sub compaction task numbers pub max_sub_compaction: u32, pub max_concurrent_compaction_task_number: u64, + pub max_version_pinning_duration_sec: u64, pub data_file_cache_dir: String, pub data_file_cache_capacity_mb: usize, @@ -164,6 +165,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt min_sst_size_for_streaming_upload: c.storage.min_sst_size_for_streaming_upload, max_sub_compaction: c.storage.max_sub_compaction, max_concurrent_compaction_task_number: c.storage.max_concurrent_compaction_task_number, + max_version_pinning_duration_sec: c.storage.max_version_pinning_duration_sec, data_file_cache_dir: c.storage.data_file_cache.dir.clone(), data_file_cache_capacity_mb: c.storage.data_file_cache.capacity_mb, data_file_cache_file_capacity_mb: c.storage.data_file_cache.file_capacity_mb,