Skip to content

Commit

Permalink
feat(storage): add metrics to monitor old value size (#16559)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Apr 30, 2024
1 parent 1c65460 commit 5b40139
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 31 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2229,6 +2229,11 @@ def section_hummock_write(outer_panels):
"uploading task size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum({metric('state_store_old_value_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"old value size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
],
),
panels.timeseries_latency(
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions src/storage/hummock_test/src/hummock_read_version_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn test_read_version_basic() {
{
// single imm
let sorted_items = gen_dummy_batch(1);
let size = SharedBufferBatch::measure_batch_size(&sorted_items, None);
let size = SharedBufferBatch::measure_batch_size(&sorted_items, None).0;
let imm = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
Expand Down Expand Up @@ -87,7 +87,7 @@ async fn test_read_version_basic() {
for i in 0..5 {
epoch.inc_epoch();
let sorted_items = gen_dummy_batch(i + 2);
let size = SharedBufferBatch::measure_batch_size(&sorted_items, None);
let size = SharedBufferBatch::measure_batch_size(&sorted_items, None).0;
let imm = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
Expand Down Expand Up @@ -275,7 +275,7 @@ async fn test_read_filter_basic() {
{
// single imm
let sorted_items = gen_dummy_batch(epoch);
let size = SharedBufferBatch::measure_batch_size(&sorted_items, None);
let size = SharedBufferBatch::measure_batch_size(&sorted_items, None).0;
let imm = SharedBufferBatch::build_shared_buffer_batch_for_test(
epoch,
0,
Expand Down
28 changes: 27 additions & 1 deletion src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ use crate::hummock::event_handler::uploader::{UploadTaskOutput, UploadTaskPayloa
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchInner, SharedBufferKeyEntry, VersionedSharedBufferValue,
SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
VersionedSharedBufferValue,
};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::{
Expand Down Expand Up @@ -343,6 +344,23 @@ pub async fn merge_imms_in_memory(
// If the imm of a table id contains old value, all other imm of the same table id should have old value
assert!(imms.iter().all(|imm| imm.has_old_value() == has_old_value));

let (old_value_size, global_old_value_size) = if has_old_value {
(
imms.iter()
.map(|imm| imm.old_values().expect("has old value").size)
.sum(),
Some(
imms[0]
.old_values()
.expect("has old value")
.global_old_value_size
.clone(),
),
)
} else {
(0, None)
};

let mut imm_iters = Vec::with_capacity(imms.len());
let key_count = imms.iter().map(|imm| imm.key_count()).sum();
let value_count = imms.iter().map(|imm| imm.value_count()).sum();
Expand Down Expand Up @@ -428,6 +446,14 @@ pub async fn merge_imms_in_memory(
tokio::task::consume_budget().await;
}

let old_values = old_values.map(|old_values| {
SharedBufferBatchOldValues::new(
old_values,
old_value_size,
global_old_value_size.expect("should exist when has old value"),
)
});

SharedBufferBatch {
inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
epochs,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/event_handler/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ mod tests {
TableKey(Bytes::from(dummy_table_key())),
SharedBufferValue::Delete,
)];
let size = SharedBufferBatch::measure_batch_size(&sorted_items, None);
let size = SharedBufferBatch::measure_batch_size(&sorted_items, None).0;
let tracker = match limiter {
Some(limiter) => Some(limiter.require_memory(size as u64).await),
None => None,
Expand Down
80 changes: 58 additions & 22 deletions src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, LazyLock};

use bytes::Bytes;
use prometheus::IntGauge;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, TableKeyRange, UserKey};
Expand Down Expand Up @@ -118,13 +119,41 @@ impl SharedBufferKeyEntry {
}
}

/// Old values of a shared buffer batch together with the size accounted for them.
///
/// `size` is added to `global_old_value_size` when an instance is created via `new`, and
/// subtracted from it again when the instance is dropped.
#[derive(Debug)]
pub(crate) struct SharedBufferBatchOldValues {
/// Store the old values. The length should be the same as `new_values` of the batch holding it. It contains
/// empty `Bytes` when the corresponding `new_value` is `Insert`, and contains the old values of `Update` and `Delete`.
values: Vec<Bytes>,
/// Size attributed to `values`, as supplied by the caller of `new`. This is the amount
/// added to and later subtracted from `global_old_value_size`.
pub size: usize,
/// Gauge that `size` is added to on construction and subtracted from on drop.
pub global_old_value_size: IntGauge,
}

impl Drop for SharedBufferBatchOldValues {
    /// Subtracts this instance's `size` from `global_old_value_size`, undoing the
    /// addition performed in `new`.
    fn drop(&mut self) {
        let released = self.size as i64;
        self.global_old_value_size.sub(released);
    }
}

impl SharedBufferBatchOldValues {
    /// Creates the old-value holder and adds `size` to `global_old_value_size`.
    ///
    /// The same amount is subtracted from the gauge when the returned value is dropped.
    pub(crate) fn new(values: Vec<Bytes>, size: usize, global_old_value_size: IntGauge) -> Self {
        let tracked = Self {
            values,
            size,
            global_old_value_size,
        };
        tracked.global_old_value_size.add(tracked.size as i64);
        tracked
    }

    /// Test helper: same as `new`, but backed by a gauge created on the spot via
    /// `IntGauge::new("test", "test")`.
    ///
    /// # Panics
    ///
    /// Panics if creating that gauge fails.
    pub(crate) fn for_test(values: Vec<Bytes>, size: usize) -> Self {
        let gauge = IntGauge::new("test", "test").unwrap();
        Self::new(values, size, gauge)
    }
}

#[derive(Debug)]
pub(crate) struct SharedBufferBatchInner {
entries: Vec<SharedBufferKeyEntry>,
new_values: Vec<VersionedSharedBufferValue>,
/// Store the old values. If some, the length should be the same as `new_values`. It contains empty `Bytes` when the
/// corresponding `new_value` is `Insert`, and contains the old values of `Update` and `Delete`.
old_values: Option<Vec<Bytes>>,
old_values: Option<SharedBufferBatchOldValues>,
/// The epochs of the data in batch, sorted in ascending order (old to new)
epochs: Vec<HummockEpoch>,
/// Total size of all key-value items (excluding the `epoch` of value versions)
Expand All @@ -140,14 +169,14 @@ impl SharedBufferBatchInner {
epoch: HummockEpoch,
spill_offset: u16,
payload: Vec<SharedBufferItem>,
old_values: Option<Vec<Bytes>>,
old_values: Option<SharedBufferBatchOldValues>,
size: usize,
_tracker: Option<MemoryTracker>,
) -> Self {
assert!(!payload.is_empty());
debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
if let Some(old_values) = &old_values {
assert_eq!(old_values.len(), payload.len());
assert_eq!(old_values.values.len(), payload.len());
}

let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
Expand Down Expand Up @@ -182,7 +211,7 @@ impl SharedBufferBatchInner {
epochs: Vec<HummockEpoch>,
entries: Vec<SharedBufferKeyEntry>,
new_values: Vec<VersionedSharedBufferValue>,
old_values: Option<Vec<Bytes>>,
old_values: Option<SharedBufferBatchOldValues>,
size: usize,
imm_id: ImmId,
tracker: Option<MemoryTracker>,
Expand Down Expand Up @@ -283,7 +312,10 @@ impl SharedBufferBatch {
epoch: HummockEpoch,
table_id: TableId,
) -> Self {
let size = Self::measure_batch_size(&sorted_items, old_values.as_deref());
let (size, old_value_size) = Self::measure_batch_size(&sorted_items, old_values.as_deref());

let old_values = old_values
.map(|old_values| SharedBufferBatchOldValues::for_test(old_values, old_value_size));

Self {
inner: Arc::new(SharedBufferBatchInner::new(
Expand Down Expand Up @@ -317,12 +349,17 @@ impl SharedBufferBatch {
.sum()
}

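/// Returns `(total size, old value size)`. The total size already includes the old value size, and the old value size is 0 when `old_values` is `None`.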
pub fn measure_batch_size(
batch_items: &[SharedBufferItem],
old_values: Option<&[Bytes]>,
) -> usize {
) -> (usize, usize) {
let old_value_size = old_values
.iter()
.flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
.sum::<usize>();
// size = Sum(length of full key + length of user value)
batch_items
let kv_size = batch_items
.iter()
.map(|(k, v)| {
k.len() + {
Expand All @@ -334,11 +371,8 @@ impl SharedBufferBatch {
}
}
})
.sum::<usize>()
+ old_values
.iter()
.flat_map(|slice| slice.iter().map(|value| size_of_val(value) + value.len()))
.sum::<usize>()
.sum::<usize>();
(kv_size + old_value_size, old_value_size)
}

pub fn filter<R, B>(&self, table_id: TableId, table_key_range: &R) -> bool
Expand Down Expand Up @@ -466,6 +500,10 @@ impl SharedBufferBatch {
self.inner.size
}

/// Returns a reference to the old values stored in the inner batch, or `None` when the
/// inner batch holds no old values.
pub(crate) fn old_values(&self) -> Option<&SharedBufferBatchOldValues> {
self.inner.old_values.as_ref()
}

/// Returns the `batch_id` stored in the inner batch.
pub fn batch_id(&self) -> SharedBufferBatchId {
self.inner.batch_id
}
Expand All @@ -474,11 +512,11 @@ impl SharedBufferBatch {
&self.inner.epochs
}

pub fn build_shared_buffer_batch(
pub(crate) fn build_shared_buffer_batch(
epoch: HummockEpoch,
spill_offset: u16,
sorted_items: Vec<SharedBufferItem>,
old_values: Option<Vec<Bytes>>,
old_values: Option<SharedBufferBatchOldValues>,
size: usize,
table_id: TableId,
instance_id: LocalInstanceId,
Expand Down Expand Up @@ -693,11 +731,9 @@ impl SharedBufferBatchIterator<Forward> {
SharedBufferVersionedEntryRef {
key: &self.inner.entries[self.current_entry_idx].key,
new_values: &self.inner.new_values[self.current_value_idx..self.value_end_offset],
old_values: self
.inner
.old_values
.as_ref()
.map(|old_values| &old_values[self.current_value_idx..self.value_end_offset]),
old_values: self.inner.old_values.as_ref().map(|old_values| {
&old_values.values[self.current_value_idx..self.value_end_offset]
}),
}
}
}
Expand Down Expand Up @@ -732,7 +768,7 @@ impl<D: HummockIteratorDirection, const IS_NEW_VALUE: bool> HummockIterator
.into()
} else {
HummockValue::put(
self.inner.old_values.as_ref().unwrap()[self.current_value_idx].as_ref(),
self.inner.old_values.as_ref().unwrap().values[self.current_value_idx].as_ref(),
)
}
}
Expand Down
14 changes: 12 additions & 2 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ use crate::hummock::iterator::{
ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, UserIterator,
};
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchIterator, SharedBufferItem, SharedBufferValue,
SharedBufferBatch, SharedBufferBatchIterator, SharedBufferBatchOldValues, SharedBufferItem,
SharedBufferValue,
};
use crate::hummock::store::version::{read_filter_for_version, HummockVersionReader};
use crate::hummock::utils::{
Expand Down Expand Up @@ -496,7 +497,8 @@ impl LocalHummockStorage {
.start_timer();

let imm_size = if !sorted_items.is_empty() {
let size = SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());
let (size, old_value_size) =
SharedBufferBatch::measure_batch_size(&sorted_items, old_values.as_deref());

self.write_limiter.wait_permission(self.table_id).await;
let limiter = self.memory_limiter.as_ref();
Expand All @@ -523,6 +525,14 @@ impl LocalHummockStorage {
tracker
};

let old_values = old_values.map(|old_values| {
SharedBufferBatchOldValues::new(
old_values,
old_value_size,
self.stats.old_value_size.clone(),
)
});

let instance_id = self.instance_guard.instance_id;
let imm = SharedBufferBatch::build_shared_buffer_batch(
epoch,
Expand Down
9 changes: 9 additions & 0 deletions src/storage/src/monitor/hummock_state_store_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub struct HummockStateStoreMetrics {

// memory
pub mem_table_spill_counts: RelabeledCounterVec,
pub old_value_size: IntGauge,

// block statistics
pub block_efficiency_histogram: RelabeledHistogramVec,
Expand Down Expand Up @@ -404,6 +405,13 @@ impl HummockStateStoreMetrics {
metric_level,
);

let old_value_size = register_int_gauge_with_registry!(
"state_store_old_value_size",
"The size of old value",
registry
)
.unwrap();

let opts = histogram_opts!(
"block_efficiency_histogram",
"Access ratio of in-memory block.",
Expand Down Expand Up @@ -462,6 +470,7 @@ impl HummockStateStoreMetrics {
uploader_syncing_epoch_count,
uploader_wait_poll_latency,
mem_table_spill_counts,
old_value_size,

block_efficiency_histogram,
event_handler_pending_event,
Expand Down

0 comments on commit 5b40139

Please sign in to comment.