Skip to content

Commit

Permalink
refactor(meta): break hummock manager Versioning into smaller structs
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed May 13, 2024
1 parent cb8a92a commit 9d21f18
Show file tree
Hide file tree
Showing 13 changed files with 412 additions and 404 deletions.
16 changes: 0 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 14 additions & 11 deletions src/common/metrics/src/monitor/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,34 @@ use prometheus::HistogramVec;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

pub struct MonitoredRwLock<T> {
// labels: [lock_name, lock_type]
metrics: HistogramVec,
inner: RwLock<T>,
lock_name: &'static str,
}

impl<T> MonitoredRwLock<T> {
pub fn new(metrics: HistogramVec, val: T) -> Self {
pub fn new(metrics: HistogramVec, val: T, lock_name: &'static str) -> Self {
Self {
metrics,
inner: RwLock::new(val),
lock_name,
}
}

pub async fn read<'a, 'b>(
&'a self,
label_values: &'b [&'static str],
) -> RwLockReadGuard<'a, T> {
let _timer = self.metrics.with_label_values(label_values).start_timer();
pub async fn read(&self) -> RwLockReadGuard<'_, T> {
let _timer = self
.metrics
.with_label_values(&[self.lock_name, "read"])
.start_timer();
self.inner.read().await
}

pub async fn write<'a, 'b>(
&'a self,
label_values: &'b [&'static str],
) -> RwLockWriteGuard<'a, T> {
let _timer = self.metrics.with_label_values(label_values).start_timer();
pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
let _timer = self
.metrics
.with_label_values(&[self.lock_name, "write"])
.start_timer();
self.inner.write().await
}
}
1 change: 0 additions & 1 deletion src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ either = "1"
enum-as-inner = "0.6"
etcd-client = { workspace = true }
fail = "0.5"
function_name = "0.3.0"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
hex = "0.4"
hyper = "0.14" # required by tonic
Expand Down
22 changes: 10 additions & 12 deletions src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::ops::Bound::{Excluded, Included};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::Ordering;

use function_name::named;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
object_size_map, summarize_group_deltas,
};
Expand All @@ -32,7 +31,6 @@ use tracing::warn;

use crate::hummock::error::Result;
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::{read_lock, write_lock};
use crate::hummock::metrics_utils::trigger_gc_stat;
use crate::hummock::HummockManager;

Expand Down Expand Up @@ -122,11 +120,10 @@ impl HummockManager {
/// Returns the diff between new and old checkpoint id.
/// Note that this method must not be called concurrently, because internally it doesn't hold
/// lock throughout the method.
#[named]
pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result<u64> {
let timer = self.metrics.version_checkpoint_latency.start_timer();
// 1. hold read lock and create new checkpoint
let versioning_guard = read_lock!(self, versioning).await;
let versioning_guard = self.versioning.read().await;
let versioning: &Versioning = versioning_guard.deref();
let current_version: &HummockVersion = &versioning.current_version;
let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint;
Expand All @@ -137,9 +134,10 @@ impl HummockManager {
}
if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
drop(versioning_guard);
let mut versioning = write_lock!(self, versioning).await;
versioning.mark_objects_for_deletion();
let min_pinned_version_id = versioning.min_pinned_version_id();
let versioning = self.versioning.read().await;
let context_info = self.context_info.read().await;
versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker);
let min_pinned_version_id = context_info.min_pinned_version_id();
trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
return Ok(0);
}
Expand Down Expand Up @@ -224,16 +222,17 @@ impl HummockManager {
}
}
// 3. hold write lock and update in memory state
let mut versioning_guard = write_lock!(self, versioning).await;
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let context_info = self.context_info.read().await;
assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
versioning.checkpoint = new_checkpoint;
// Not delete stale objects when archive is enabled
if !self.env.opts.enable_hummock_data_archive {
versioning.mark_objects_for_deletion();
versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker);
}

let min_pinned_version_id = versioning.min_pinned_version_id();
let min_pinned_version_id = context_info.min_pinned_version_id();
trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
drop(versioning_guard);
timer.observe_duration();
Expand All @@ -259,9 +258,8 @@ impl HummockManager {
self.pause_version_checkpoint.load(Ordering::Relaxed)
}

#[named]
pub async fn get_checkpoint_version(&self) -> HummockVersion {
let versioning_guard = read_lock!(self, versioning).await;
let versioning_guard = self.versioning.read().await;
versioning_guard.checkpoint.version.clone()
}
}
20 changes: 6 additions & 14 deletions src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;

use function_name::named;
use futures::future::Shared;
use itertools::Itertools;
use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId};
Expand All @@ -37,7 +36,7 @@ use tokio::sync::oneshot::Receiver as OneShotReceiver;
use crate::hummock::compaction::selector::level_selector::PickerInfo;
use crate::hummock::compaction::selector::DynamicLevelSelectorCore;
use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector};
use crate::hummock::manager::{init_selectors, read_lock};
use crate::hummock::manager::init_selectors;
use crate::hummock::HummockManager;

const MAX_SKIP_TIMES: usize = 8;
Expand All @@ -54,17 +53,12 @@ pub struct Compaction {
}

impl HummockManager {
#[named]
pub async fn get_assigned_compact_task_num(&self) -> u64 {
read_lock!(self, compaction)
.await
.compact_task_assignment
.len() as u64
self.compaction.read().await.compact_task_assignment.len() as u64
}

#[named]
pub async fn list_all_tasks_ids(&self) -> Vec<HummockCompactionTaskId> {
let compaction = read_lock!(self, compaction).await;
let compaction = self.compaction.read().await;

compaction
.compaction_statuses
Expand All @@ -77,11 +71,10 @@ impl HummockManager {
.collect_vec()
}

#[named]
pub async fn list_compaction_status(
&self,
) -> (Vec<PbCompactStatus>, Vec<CompactTaskAssignment>) {
let compaction = read_lock!(self, compaction).await;
let compaction = self.compaction.read().await;
(
compaction.compaction_statuses.values().map_into().collect(),
compaction
Expand All @@ -92,14 +85,13 @@ impl HummockManager {
)
}

#[named]
pub async fn get_compaction_scores(
&self,
compaction_group_id: CompactionGroupId,
) -> Vec<PickerInfo> {
let (status, levels, group) = {
let compaction = read_lock!(self, compaction).await;
let versioning = read_lock!(self, versioning).await;
let compaction = self.compaction.read().await;
let versioning = self.versioning.read().await;
let config_manager = self.compaction_group_manager.read().await;
match (
compaction.compaction_statuses.get(&compaction_group_id),
Expand Down
Loading

0 comments on commit 9d21f18

Please sign in to comment.