Skip to content

Commit

Permalink
feat(storage): support max version pinning duration (#13340)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Nov 14, 2023
1 parent 69ed227 commit 4a5a331
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 8 deletions.
7 changes: 7 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,9 @@ pub struct StorageConfig {
#[serde(default = "default::storage::max_preload_wait_time_mill")]
pub max_preload_wait_time_mill: u64,

#[serde(default = "default::storage::max_version_pinning_duration_sec")]
pub max_version_pinning_duration_sec: u64,

#[serde(default = "default::storage::object_store_streaming_read_timeout_ms")]
pub object_store_streaming_read_timeout_ms: u64,
#[serde(default = "default::storage::object_store_streaming_upload_timeout_ms")]
Expand Down Expand Up @@ -1067,6 +1070,10 @@ pub mod default {
0
}

pub fn max_version_pinning_duration_sec() -> u64 {
3 * 3600
}

pub fn object_store_streaming_read_timeout_ms() -> u64 {
10 * 60 * 1000
}
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ min_sst_size_for_streaming_upload = 33554432
max_sub_compaction = 4
max_concurrent_compaction_task_number = 16
max_preload_wait_time_mill = 0
max_version_pinning_duration_sec = 10800
object_store_streaming_read_timeout_ms = 600000
object_store_streaming_upload_timeout_ms = 600000
object_store_upload_timeout_ms = 3600000
Expand Down
8 changes: 7 additions & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ impl HummockManager {

/// Unpin all pins which belongs to `context_id` and has an id which is older than
/// `unpin_before`. All versions >= `unpin_before` will be treated as if they are all pinned by
/// this `context_id` so they will not be vacummed.
/// this `context_id` so they will not be vacuumed.
#[named]
pub async fn unpin_version_before(
&self,
Expand All @@ -646,6 +646,12 @@ impl HummockManager {
min_pinned_id: 0,
},
);
assert!(
context_pinned_version.min_pinned_id <= unpin_before,
"val must be monotonically non-decreasing. old = {}, new = {}.",
context_pinned_version.min_pinned_id,
unpin_before
);
context_pinned_version.min_pinned_id = unpin_before;
commit_multi_var!(
self,
Expand Down
31 changes: 24 additions & 7 deletions src/storage/src/hummock/local_version/pinned_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, HashMap};
use std::iter::empty;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use auto_enums::auto_enum;
use risingwave_common::catalog::TableId;
Expand Down Expand Up @@ -167,6 +167,7 @@ impl PinnedVersion {
pub(crate) async fn start_pinned_version_worker(
mut rx: UnboundedReceiver<PinVersionAction>,
hummock_meta_client: Arc<dyn HummockMetaClient>,
max_version_pinning_duration_sec: u64,
) {
let min_execute_interval = Duration::from_millis(1000);
let max_retry_interval = Duration::from_secs(10);
Expand All @@ -180,21 +181,34 @@ pub(crate) async fn start_pinned_version_worker(
min_execute_interval_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut need_unpin = false;

let mut version_ids_in_use: BTreeMap<u64, usize> = BTreeMap::new();

let mut version_ids_in_use: BTreeMap<u64, (usize, Instant)> = BTreeMap::new();
let max_version_pinning_duration_sec = Duration::from_secs(max_version_pinning_duration_sec);
// For each run in the loop, accumulate versions to unpin and call unpin RPC once.
loop {
min_execute_interval_tick.tick().await;
// 0. Expire versions.
while version_ids_in_use.len() > 1 && let Some(e) = version_ids_in_use.first_entry() {
if e.get().1.elapsed() < max_version_pinning_duration_sec {
break;
}
need_unpin = true;
e.remove();
}

// 1. Collect new versions to unpin.
let mut versions_to_unpin = vec![];
let inst = Instant::now();
'collect: loop {
match rx.try_recv() {
Ok(version_action) => match version_action {
PinVersionAction::Pin(version_id) => {
version_ids_in_use
.entry(version_id)
.and_modify(|counter| *counter += 1)
.or_insert(1);
.and_modify(|e| {
e.0 += 1;
e.1 = inst;
})
.or_insert((1, inst));
}
PinVersionAction::Unpin(version_id) => {
versions_to_unpin.push(version_id);
Expand All @@ -220,13 +234,16 @@ pub(crate) async fn start_pinned_version_worker(

for version in &versions_to_unpin {
match version_ids_in_use.get_mut(version) {
Some(counter) => {
Some((counter, _)) => {
*counter -= 1;
if *counter == 0 {
version_ids_in_use.remove(version);
}
}
None => tracing::warn!("version {} to unpin dose not exist", version),
None => tracing::warn!(
"version {} to unpin does not exist, may already be unpinned due to expiration",
version
),
}
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl HummockStorage {
tokio::spawn(start_pinned_version_worker(
pin_version_rx,
hummock_meta_client.clone(),
options.max_version_pinning_duration_sec,
));
let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
filter_key_extractor_manager.clone(),
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct StorageOpts {
/// Max sub compaction task numbers
pub max_sub_compaction: u32,
pub max_concurrent_compaction_task_number: u64,
pub max_version_pinning_duration_sec: u64,

pub data_file_cache_dir: String,
pub data_file_cache_capacity_mb: usize,
Expand Down Expand Up @@ -164,6 +165,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
min_sst_size_for_streaming_upload: c.storage.min_sst_size_for_streaming_upload,
max_sub_compaction: c.storage.max_sub_compaction,
max_concurrent_compaction_task_number: c.storage.max_concurrent_compaction_task_number,
max_version_pinning_duration_sec: c.storage.max_version_pinning_duration_sec,
data_file_cache_dir: c.storage.data_file_cache.dir.clone(),
data_file_cache_capacity_mb: c.storage.data_file_cache.capacity_mb,
data_file_cache_file_capacity_mb: c.storage.data_file_cache.file_capacity_mb,
Expand Down

0 comments on commit 4a5a331

Please sign in to comment.