Skip to content

Commit

Permalink
feat(storage): collect and report iter log local metrics (#17862)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Jul 31, 2024
1 parent a9859ed commit 8df1aec
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/storage/src/hummock/event_handler/refiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ impl CacheRefillTask {

let now = Instant::now();
let res = context.sstable_store.sstable(info, &mut stats).await;
stats.discard();
GLOBAL_CACHE_REFILL_METRICS
.meta_refill_success_duration
.observe(now.elapsed().as_secs_f64());
Expand Down
15 changes: 15 additions & 0 deletions src/storage/src/hummock/iterator/change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::error::StorageResult;
use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator};
use crate::hummock::value::HummockValue;
use crate::hummock::{HummockResult, SstableIterator};
use crate::monitor::IterLocalMetricsGuard;
use crate::store::{ChangeLogValue, StateStoreReadLogItem, StateStoreReadLogItemRef};
use crate::StateStoreIter;

Expand Down Expand Up @@ -341,9 +342,21 @@ impl<NI: HummockIterator<Direction = Forward>, OI: HummockIterator<Direction = F
}
}

impl Drop for ChangeLogIterator {
fn drop(&mut self) {
self.inner
.new_value_iter
.collect_local_statistic(&mut self.stats_guard.local_stats);
self.inner
.old_value_iter
.collect_local_statistic(&mut self.stats_guard.local_stats);
}
}

pub struct ChangeLogIterator {
inner: ChangeLogIteratorInner<MergeIterator<SstableIterator>, MergeIterator<SstableIterator>>,
initial_read: bool,
stats_guard: IterLocalMetricsGuard,
}

impl ChangeLogIterator {
Expand All @@ -353,6 +366,7 @@ impl ChangeLogIterator {
new_value_iter: MergeIterator<SstableIterator>,
old_value_iter: MergeIterator<SstableIterator>,
table_id: TableId,
stats_guard: IterLocalMetricsGuard,
) -> HummockResult<Self> {
let user_key_range_ref = bound_table_key_range(table_id, &table_key_range);
let (start_bound, end_bound) = (
Expand All @@ -369,6 +383,7 @@ impl ChangeLogIterator {
Ok(Self {
inner,
initial_read: false,
stats_guard,
})
}
}
Expand Down
29 changes: 23 additions & 6 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ use crate::hummock::{
use crate::mem_table::{
ImmId, ImmutableMemtable, MemTableHummockIterator, MemTableHummockRevIterator,
};
use crate::monitor::{GetLocalMetricsGuard, HummockStateStoreMetrics, StoreLocalStatistic};
use crate::monitor::{
GetLocalMetricsGuard, HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic,
};
use crate::store::{gen_min_epoch, ReadLogOptions, ReadOptions};

pub type CommittedVersion = PinnedVersion;
Expand Down Expand Up @@ -995,30 +997,39 @@ impl HummockVersionReader {
ssts: impl Iterator<Item = &SstableInfo>,
sstable_store: &SstableStoreRef,
read_options: Arc<SstableIteratorReadOptions>,
local_stat: &mut StoreLocalStatistic,
) -> HummockResult<MergeIterator<SstableIterator>> {
let iters = try_join_all(ssts.map(|sst| {
let sstable_store = sstable_store.clone();
let read_options = read_options.clone();
async move {
let mut local_stat = StoreLocalStatistic::default();
let table_holder = sstable_store.sstable(sst, &mut local_stat).await?;
Ok::<_, HummockError>(SstableIterator::new(
table_holder,
sstable_store,
read_options,
Ok::<_, HummockError>((
SstableIterator::new(table_holder, sstable_store, read_options),
local_stat,
))
}
}))
.await?;
Ok::<_, HummockError>(MergeIterator::new(iters))
Ok::<_, HummockError>(MergeIterator::new(iters.into_iter().map(
|(iter, stats)| {
local_stat.add(&stats);
iter
},
)))
}

let mut local_stat = StoreLocalStatistic::default();

let new_value_iter = make_iter(
change_log
.iter()
.flat_map(|log| log.new_value.iter())
.filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
&self.sstable_store,
read_options.clone(),
&mut local_stat,
)
.await?;
let old_value_iter = make_iter(
Expand All @@ -1028,6 +1039,7 @@ impl HummockVersionReader {
.filter(|sst| filter_single_sst(sst, options.table_id, &key_range)),
&self.sstable_store,
read_options.clone(),
&mut local_stat,
)
.await?;
ChangeLogIterator::new(
Expand All @@ -1036,6 +1048,11 @@ impl HummockVersionReader {
new_value_iter,
old_value_iter,
options.table_id,
IterLocalMetricsGuard::new(
self.state_store_metrics.clone(),
options.table_id,
local_stat,
),
)
.await
}
Expand Down
7 changes: 7 additions & 0 deletions src/storage/src/monitor/local_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ impl StoreLocalStatistic {
}
}

pub fn discard(self) {
#[cfg(all(debug_assertions, not(any(madsim, test, feature = "test"))))]
{
self.reported.fetch_or(true, Ordering::Relaxed);
}
}

pub fn report_compactor(&self, metrics: &CompactorMetrics) {
let t = self.remote_io_time.load(Ordering::Relaxed) as f64;
if t > 0.0 {
Expand Down

0 comments on commit 8df1aec

Please sign in to comment.