feat(storage): add metrics for uploader spill task (#9951)
wenym1 authored May 24, 2023
1 parent 93d2fed commit a0acfdf
Showing 6 changed files with 123 additions and 37 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

22 changes: 17 additions & 5 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -1785,22 +1785,30 @@ def section_hummock(panels):
         ),
         panels.timeseries_count(
-            "Merge Imm - Finished Tasks Count",
+            "Uploader - Tasks Count",
             "",
             [
                 panels.target(
                     f"sum(irate({table_metric('state_store_merge_imm_task_counts')}[$__rate_interval])) by (job,instance,table_id)",
                     "merge imm tasks - {{table_id}} @ {{instance}} ",
                 ),
+                panels.target(
+                    f"sum(irate({metric('state_store_spill_task_counts')}[$__rate_interval])) by (job,instance,uploader_stage)",
+                    "Uploader spill tasks - {{uploader_stage}} @ {{instance}} ",
+                ),
             ],
         ),
         panels.timeseries_bytes(
-            "Merge Imm - Finished Task Memory Size",
+            "Uploader - Task Size",
             "",
             [
                 panels.target(
                     f"sum(rate({table_metric('state_store_merge_imm_memory_sz')}[$__rate_interval])) by (job,instance,table_id)",
-                    "tasks memory size - {{table_id}} @ {{instance}} ",
+                    "Merging tasks memory size - {{table_id}} @ {{instance}} ",
                 ),
+                panels.target(
+                    f"sum(rate({metric('state_store_spill_task_size')}[$__rate_interval])) by (job,instance,uploader_stage)",
+                    "Uploading tasks size - {{uploader_stage}} @ {{instance}} ",
                 ),
             ],
         ),
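
Note: the two new series queried above, state_store_spill_task_counts and state_store_spill_task_size, are counters labeled by uploader_stage, so a single panel can break spills down by where they originate. Below is a minimal sketch of how such labeled counters can be registered with the prometheus crate; the helper name, registry wiring, and label values are assumptions for illustration, not RisingWave's actual metrics code.

use prometheus::{IntCounterVec, Opts, Registry};

// Hypothetical helper; the real fields live on HummockStateStoreMetrics.
fn register_spill_metrics(registry: &Registry) -> (IntCounterVec, IntCounterVec) {
    // Counter of spill tasks, partitioned by the uploader stage that spilled.
    let counts = IntCounterVec::new(
        Opts::new("state_store_spill_task_counts", "number of uploader spill tasks"),
        &["uploader_stage"],
    )
    .unwrap();
    // Total bytes submitted by spill tasks, same label scheme.
    let size = IntCounterVec::new(
        Opts::new("state_store_spill_task_size", "total bytes of uploader spill tasks"),
        &["uploader_stage"],
    )
    .unwrap();
    registry.register(Box::new(counts.clone())).unwrap();
    registry.register(Box::new(size.clone())).unwrap();
    (counts, size)
}

Per-stage handles such as the spill_task_counts_from_unsealed field used later in this diff can then be carved out once with counts.with_label_values(&["unsealed"]) and bumped cheaply on the hot path.
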
@@ -1864,11 +1872,11 @@ def section_hummock(panels):
             "",
             [
                 panels.target(
-                    f"sum(rate({table_metric('state_store_write_batch_size_sum')}[$__rate_interval]))by(job,instance) / sum(rate({table_metric('state_store_write_batch_size_count')}[$__rate_interval]))by(job,instance,table_id)",
+                    f"sum(rate({table_metric('state_store_write_batch_size_sum')}[$__rate_interval]))by(job,instance,table_id) / sum(rate({table_metric('state_store_write_batch_size_count')}[$__rate_interval]))by(job,instance,table_id)",
                     "shared_buffer - {{table_id}} @ {{job}} @ {{instance}}",
                 ),
                 panels.target(
-                    f"sum(rate({metric('compactor_shared_buffer_to_sstable_size')}[$__rate_interval]))by(job,instance) / sum(rate({metric('state_store_shared_buffer_to_sstable_size_count')}[$__rate_interval]))by(job,instance)",
+                    f"sum(rate({metric('compactor_shared_buffer_to_sstable_size_sum')}[$__rate_interval]))by(job,instance) / sum(rate({metric('compactor_shared_buffer_to_sstable_size_count')}[$__rate_interval]))by(job,instance)",
                     "sync - {{job}} @ {{instance}}",
                 ),
             ],
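
Note on the two fixes above: an average batch size is computed as the rate of a histogram's _sum divided by the rate of its _count, and PromQL division only matches series whose label sets are identical. The old shared_buffer query aggregated the numerator by (job,instance) but the denominator by (job,instance,table_id), so the per-table series could never match. The sync query additionally pointed its numerator and denominator at different metric families; both sides now read the _sum and _count series of the same compactor_shared_buffer_to_sstable_size histogram.
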
@@ -1905,6 +1913,10 @@ def section_hummock(panels):
                 ),
                 panels.target(
                     f"sum({metric('state_store_limit_memory_size')}) by (job,instance)",
                     "Memory limiter usage - {{job}} @ {{instance}}",
                 ),
+                panels.target(
+                    f"sum({metric('state_store_uploader_uploading_task_size')}) by (job,instance)",
+                    "uploading memory - {{job}} @ {{instance}}",
+                ),
             ],
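
Note: the added uploading memory target reads state_store_uploader_uploading_task_size, presumably the registered name of the uploader_uploading_task_size gauge that the BufferTracker changes below keep up to date, so in-flight upload bytes now show up next to the memory limiter usage.
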
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

34 changes: 23 additions & 11 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
@@ -14,14 +14,14 @@
 
 use std::collections::{BTreeMap, HashMap};
 use std::ops::DerefMut;
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
 use arc_swap::ArcSwap;
 use await_tree::InstrumentAwait;
 use futures::future::{select, Either};
 use futures::FutureExt;
 use parking_lot::RwLock;
+use prometheus::core::{AtomicU64, GenericGauge};
 use risingwave_hummock_sdk::compaction_group::hummock_version_ext::HummockVersionUpdateExt;
 use risingwave_hummock_sdk::{info_in_release, HummockEpoch, LocalSstableInfo};
 use risingwave_pb::hummock::version_update_payload::Payload;
@@ -53,27 +53,37 @@ use crate::store::SyncResult;
 pub struct BufferTracker {
     flush_threshold: usize,
     global_buffer: Arc<MemoryLimiter>,
-    global_upload_task_size: Arc<AtomicUsize>,
+    global_upload_task_size: GenericGauge<AtomicU64>,
 }
 
 impl BufferTracker {
-    pub fn from_storage_opts(config: &StorageOpts) -> Self {
+    pub fn from_storage_opts(
+        config: &StorageOpts,
+        global_upload_task_size: GenericGauge<AtomicU64>,
+    ) -> Self {
         let capacity = config.shared_buffer_capacity_mb * (1 << 20);
         let flush_threshold = capacity * 4 / 5;
-        Self::new(capacity, flush_threshold)
+        Self::new(capacity, flush_threshold, global_upload_task_size)
     }
 
-    pub fn new(capacity: usize, flush_threshold: usize) -> Self {
+    pub fn new(
+        capacity: usize,
+        flush_threshold: usize,
+        global_upload_task_size: GenericGauge<AtomicU64>,
+    ) -> Self {
         assert!(capacity >= flush_threshold);
         Self {
             flush_threshold,
             global_buffer: Arc::new(MemoryLimiter::new(capacity as u64)),
-            global_upload_task_size: Arc::new(AtomicUsize::new(0)),
+            global_upload_task_size,
         }
     }
 
     pub fn for_test() -> Self {
-        Self::from_storage_opts(&StorageOpts::default())
+        Self::from_storage_opts(
+            &StorageOpts::default(),
+            GenericGauge::new("test", "test").unwrap(),
+        )
     }
 
     pub fn get_buffer_size(&self) -> usize {
@@ -84,15 +94,14 @@ impl BufferTracker {
         &self.global_buffer
     }
 
-    pub fn global_upload_task_size(&self) -> &Arc<AtomicUsize> {
+    pub fn global_upload_task_size(&self) -> &GenericGauge<AtomicU64> {
         &self.global_upload_task_size
     }
 
     /// Return true when the buffer size minus current upload task size is still greater than the
     /// flush threshold.
     pub fn need_more_flush(&self) -> bool {
-        self.get_buffer_size()
-            > self.flush_threshold + self.global_upload_task_size.load(Ordering::Relaxed)
+        self.get_buffer_size() > self.flush_threshold + self.global_upload_task_size.get() as usize
     }
 }
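
Note: switching global_upload_task_size from Arc<AtomicUsize> to GenericGauge<AtomicU64> removes a layer of bookkeeping. The gauge is itself backed by an atomic and is cheaply clonable, so the tracker's working counter and the exported Prometheus metric are the same value, with no separate sampling step. A minimal standalone sketch of the gauge API relied on here, not wired to any registry:

use prometheus::core::{AtomicU64, GenericGauge};

fn main() {
    // Cloning a gauge shares the same underlying atomic value.
    let gauge: GenericGauge<AtomicU64> =
        GenericGauge::new("uploading_task_size", "bytes held by in-flight upload tasks").unwrap();
    let handle = gauge.clone();

    handle.add(4096); // a task starts: account for its payload
    handle.sub(4096); // the task is dropped: release the bytes
    assert_eq!(gauge.get(), 0); // both handles observe the same value
}
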

@@ -145,7 +154,10 @@ impl HummockEventHandler {
             tokio::sync::watch::channel(pinned_version.max_committed_epoch());
         let version_update_notifier_tx = Arc::new(version_update_notifier_tx);
         let read_version_mapping = Arc::new(RwLock::new(HashMap::default()));
-        let buffer_tracker = BufferTracker::from_storage_opts(&compactor_context.storage_opts);
+        let buffer_tracker = BufferTracker::from_storage_opts(
+            &compactor_context.storage_opts,
+            state_store_metrics.uploader_uploading_task_size.clone(),
+        );
         let max_preload_wait_time_mill = compactor_context.storage_opts.max_preload_wait_time_mill;
         let write_conflict_detector =
             ConflictDetector::new_from_config(&compactor_context.storage_opts);
54 changes: 36 additions & 18 deletions src/storage/src/hummock/event_handler/uploader.rs
@@ -18,14 +18,13 @@ use std::future::Future;
 use std::mem::swap;
 use std::ops::DerefMut;
 use std::pin::Pin;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::Relaxed;
 use std::sync::Arc;
 use std::task::{ready, Context, Poll};
 
 use futures::future::{try_join_all, TryJoinAll};
 use futures::FutureExt;
 use itertools::Itertools;
+use prometheus::core::{AtomicU64, GenericGauge};
 use risingwave_common::catalog::TableId;
 use risingwave_hummock_sdk::key::EPOCH_LEN;
 use risingwave_hummock_sdk::{info_in_release, CompactionGroupId, HummockEpoch, LocalSstableInfo};
@@ -78,7 +77,7 @@ struct UploadingTask {
     join_handle: JoinHandle<HummockResult<UploadTaskOutput>>,
     task_info: UploadTaskInfo,
     spawn_upload_task: SpawnUploadTask,
-    task_size_guard: Arc<AtomicUsize>,
+    task_size_guard: GenericGauge<AtomicU64>,
 }
 
 pub struct MergeImmTaskOutput {
@@ -138,8 +137,7 @@ impl Future for MergingImmTask {
 
 impl Drop for UploadingTask {
     fn drop(&mut self) {
-        self.task_size_guard
-            .fetch_sub(self.task_info.task_size, Relaxed);
+        self.task_size_guard.sub(self.task_info.task_size as u64);
     }
 }
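
Note: decrementing the gauge in Drop rather than at an explicit completion point is what keeps the accounting correct. Whether the upload succeeds, fails, or the task future is cancelled mid-flight, the bytes are released exactly once, when the UploadingTask value is destroyed.
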

@@ -175,7 +173,7 @@ impl UploadingTask {
             context
                 .buffer_tracker
                 .global_upload_task_size()
-                .fetch_add(task_size, Relaxed);
+                .add(task_size as u64);
             info_in_release!("start upload task: {:?}", task_info);
             let join_handle = (context.spawn_upload_task)(payload.clone(), task_info.clone());
             Self {
@@ -291,8 +289,13 @@ impl UnsealedEpochData {
     fn flush(&mut self, context: &UploaderContext) {
         let imms = self.imms.drain(..).collect_vec();
         if !imms.is_empty() {
-            self.spilled_data
-                .add_task(UploadingTask::new(imms, context));
+            let task = UploadingTask::new(imms, context);
+            context.stats.spill_task_counts_from_unsealed.inc();
+            context
+                .stats
+                .spill_task_size_from_unsealed
+                .inc_by(task.task_info.task_size as u64);
+            self.spilled_data.add_task(task);
         }
     }
 }
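
Note: the unsealed stage appears to flush only under memory pressure, so every task created here is counted as a spill unconditionally; the sealed stage below also flushes as part of a normal epoch sync, which is why it needs an is_spilled flag instead.
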
@@ -407,7 +410,7 @@ impl SealedData {
     }
 
     // Flush can be triggered by either a sync_epoch or a spill (`may_flush`) request.
-    fn flush(&mut self, context: &UploaderContext) {
+    fn flush(&mut self, context: &UploaderContext, is_spilled: bool) {
         // drop unfinished merging tasks
         self.drop_merging_tasks();
 
@@ -436,8 +439,15 @@ impl SealedData {
             .collect_vec();
 
         if !payload.is_empty() {
-            self.spilled_data
-                .add_task(UploadingTask::new(payload, context));
+            let task = UploadingTask::new(payload, context);
+            if is_spilled {
+                context.stats.spill_task_counts_from_sealed.inc();
+                context
+                    .stats
+                    .spill_task_size_from_sealed
+                    .inc_by(task.task_info.task_size as u64);
+            }
+            self.spilled_data.add_task(task);
         }
     }
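
Note: the is_spilled flag exists because this method serves two callers, as the later hunks show: the sync path passes false, since uploading sealed data during an epoch sync is normal operation, while may_flush passes true, marking a flush forced by memory pressure as a genuine spill worth counting.
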

@@ -516,6 +526,8 @@ struct UploaderContext {
     imm_merge_threshold: usize,
 
     compaction_executor: Arc<CompactionExecutor>,
+
+    stats: Arc<HummockStateStoreMetrics>,
 }
 
 impl UploaderContext {
@@ -525,13 +537,15 @@ impl UploaderContext {
         buffer_tracker: BufferTracker,
         config: &StorageOpts,
         compaction_executor: Arc<CompactionExecutor>,
+        stats: Arc<HummockStateStoreMetrics>,
     ) -> Self {
         UploaderContext {
             pinned_version,
             spawn_upload_task,
             buffer_tracker,
             imm_merge_threshold: config.imm_merge_threshold,
             compaction_executor,
+            stats,
         }
     }
 }
@@ -550,7 +564,6 @@ impl UploaderContext {
 /// Data are mostly stored in `VecDeque`, and the order stored in the `VecDeque` indicates the data
 /// order. Data at the front represents ***newer*** data.
 pub struct HummockUploader {
-    stats: Arc<HummockStateStoreMetrics>,
     /// The maximum epoch that is sealed
     max_sealed_epoch: HummockEpoch,
     /// The maximum epoch that has started syncing
@@ -588,7 +601,6 @@ impl HummockUploader {
     ) -> Self {
         let initial_epoch = pinned_version.version().max_committed_epoch;
         Self {
-            stats: state_store_metrics,
             max_sealed_epoch: initial_epoch,
             max_syncing_epoch: initial_epoch,
             max_synced_epoch: initial_epoch,
@@ -602,6 +614,7 @@ impl HummockUploader {
                 buffer_tracker,
                 config,
                 compaction_executor,
+                state_store_metrics,
             ),
         }
     }
@@ -748,7 +761,7 @@ impl HummockUploader {
 
         // flush imms to SST file, the output SSTs will be uploaded to object store
         // return unfinished merging task
-        self.sealed_data.flush(&self.context);
+        self.sealed_data.flush(&self.context, false);
 
         let SealedData {
             epochs,
@@ -837,7 +850,7 @@ impl HummockUploader {
 
     pub(crate) fn may_flush(&mut self) {
         if self.context.buffer_tracker.need_more_flush() {
-            self.sealed_data.flush(&self.context);
+            self.sealed_data.flush(&self.context, true);
         }
 
         if self.context.buffer_tracker.need_more_flush() {
@@ -939,13 +952,15 @@ impl HummockUploader {
         let shard_id_label = output.instance_id.to_string();
 
         // monitor finished task
-        self.stats
+        self.context
+            .stats
             .merge_imm_task_counts
             .with_label_values(&[table_id_label.as_str(), shard_id_label.as_str()])
             .inc();
         // monitor merge imm memory size
        // we should also add up the size of EPOCH stored in each entry
-        self.stats
+        self.context
+            .stats
             .merge_imm_batch_memory_sz
             .with_label_values(&[table_id_label.as_str(), shard_id_label.as_str()])
             .inc_by((output.merged_imm.size() + output.merged_imm.kv_count() * EPOCH_LEN) as _);
@@ -1006,6 +1021,7 @@ mod tests {
     use bytes::Bytes;
     use futures::future::BoxFuture;
     use futures::FutureExt;
+    use prometheus::core::GenericGauge;
     use risingwave_common::catalog::TableId;
     use risingwave_hummock_sdk::key::{FullKey, TableKey};
     use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo};
@@ -1121,6 +1137,7 @@ mod tests {
             BufferTracker::for_test(),
             &config,
             compaction_executor,
+            Arc::new(HummockStateStoreMetrics::unused()),
         )
     }

@@ -1578,7 +1595,8 @@ mod tests {
         impl Fn(Vec<ImmId>) -> (BoxFuture<'static, ()>, oneshot::Sender<()>),
     ) {
         // flush threshold is 0. Flush anyway
-        let buffer_tracker = BufferTracker::new(usize::MAX, 0);
+        let buffer_tracker =
+            BufferTracker::new(usize::MAX, 0, GenericGauge::new("test", "test").unwrap());
         // (the started task send the imm ids of payload, the started task wait for finish notify)
         #[allow(clippy::type_complexity)]
         let task_notifier_holder: Arc<
