Skip to content

Commit

Permalink
refactor(storage): add metrics for uploader memory and no instance id…
Browse files Browse the repository at this point in the history
… in imm (#17104)
  • Loading branch information
wenym1 authored Jun 5, 2024
1 parent 25a9b45 commit e905000
Show file tree
Hide file tree
Showing 13 changed files with 262 additions and 117 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2270,6 +2270,23 @@ def section_hummock_write(outer_panels):
"uploading task size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum({metric('state_store_uploader_imm_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"uploader imm size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum({metric('state_store_uploader_imm_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL}) - "
f"sum({metric('state_store_uploader_uploading_task_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"unflushed imm size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum({metric('uploading_memory_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL}) - "
f"sum({metric('state_store_uploader_imm_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"orphan imm size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum({metric('state_store_old_value_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"old value size - {{%s}} @ {{%s}}"
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/storage/benches/bench_imm_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ fn criterion_benchmark(c: &mut Criterion) {
&batches,
|b, batches| {
b.to_async(FuturesExecutor).iter(|| async {
let imm = merge_imms_in_memory(TableId::default(), 0, batches.clone(), None).await;
let imm = merge_imms_in_memory(TableId::default(), batches.clone(), None).await;
assert_eq!(imm.key_count(), 10000 * 100);
assert_eq!(imm.value_count(), 10000 * 100);
})
Expand All @@ -72,7 +72,7 @@ fn criterion_benchmark(c: &mut Criterion) {
&later_batches,
|b, batches| {
b.to_async(FuturesExecutor).iter(|| async {
let imm = merge_imms_in_memory(TableId::default(), 0, batches.clone(), None).await;
let imm = merge_imms_in_memory(TableId::default(), batches.clone(), None).await;
assert_eq!(imm.key_count(), 2000 * 100);
assert_eq!(imm.value_count(), 2000 * 100 * 5);
})
Expand Down
13 changes: 5 additions & 8 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorMana
use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
use crate::hummock::compactor::context::{await_tree_key, CompactorContext};
use crate::hummock::compactor::{check_flush_result, CompactOutput, Compactor};
use crate::hummock::event_handler::uploader::{UploadTaskOutput, UploadTaskPayload};
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::event_handler::uploader::UploadTaskOutput;
use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
Expand All @@ -59,11 +58,11 @@ const GC_WATERMARK_FOR_FLUSH: u64 = 0;
pub async fn compact(
context: CompactorContext,
sstable_object_id_manager: SstableObjectIdManagerRef,
payload: UploadTaskPayload,
payload: Vec<ImmutableMemtable>,
compaction_group_index: Arc<HashMap<TableId, CompactionGroupId>>,
filter_key_extractor_manager: FilterKeyExtractorManager,
) -> HummockResult<UploadTaskOutput> {
let mut grouped_payload: HashMap<CompactionGroupId, UploadTaskPayload> = HashMap::new();
let mut grouped_payload: HashMap<CompactionGroupId, Vec<ImmutableMemtable>> = HashMap::new();
for imm in &payload {
let compaction_group_id = match compaction_group_index.get(&imm.table_id) {
// compaction group id is used only as a hint for grouping different data.
Expand Down Expand Up @@ -144,7 +143,7 @@ async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
context: CompactorContext,
sstable_object_id_manager: SstableObjectIdManagerRef,
filter_key_extractor_manager: FilterKeyExtractorManager,
mut payload: UploadTaskPayload,
mut payload: Vec<ImmutableMemtable>,
) -> HummockResult<Vec<LocalSstableInfo>> {
if !IS_NEW_VALUE {
assert!(payload.iter().all(|imm| imm.has_old_value()));
Expand Down Expand Up @@ -321,7 +320,6 @@ async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
/// Merge multiple batches into a larger one
pub async fn merge_imms_in_memory(
table_id: TableId,
instance_id: LocalInstanceId,
imms: Vec<ImmutableMemtable>,
memory_tracker: Option<MemoryTracker>,
) -> ImmutableMemtable {
Expand Down Expand Up @@ -456,13 +454,12 @@ pub async fn merge_imms_in_memory(
memory_tracker,
)),
table_id,
instance_id,
}
}

/// Based on the incoming payload and opts, calculate the sharding method and sstable size of shared buffer compaction.
fn generate_splits(
payload: &UploadTaskPayload,
payload: &Vec<ImmutableMemtable>,
existing_table_ids: &HashSet<u32>,
storage_opts: &StorageOpts,
) -> (Vec<KeyRange>, u64, BTreeMap<u32, u32>) {
Expand Down
19 changes: 9 additions & 10 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ use crate::hummock::compactor::{await_tree_key, compact, CompactorContext};
use crate::hummock::conflict_detector::ConflictDetector;
use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask};
use crate::hummock::event_handler::uploader::{
HummockUploader, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskOutput,
UploadTaskPayload, UploaderEvent,
HummockUploader, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskOutput, UploaderEvent,
};
use crate::hummock::event_handler::{
HummockEvent, HummockReadVersionRef, HummockVersionUpdate, ReadOnlyReadVersionMapping,
Expand All @@ -58,6 +57,7 @@ use crate::hummock::utils::validate_table_key_range;
use crate::hummock::{
HummockError, HummockResult, MemoryLimiter, SstableObjectIdManager, SstableStoreRef, TrackerId,
};
use crate::mem_table::ImmutableMemtable;
use crate::monitor::HummockStateStoreMetrics;
use crate::opts::StorageOpts;

Expand Down Expand Up @@ -213,7 +213,7 @@ pub struct HummockEventHandler {
}

async fn flush_imms(
payload: UploadTaskPayload,
payload: Vec<ImmutableMemtable>,
task_info: UploadTaskInfo,
compactor_context: CompactorContext,
filter_key_extractor_manager: FilterKeyExtractorManager,
Expand Down Expand Up @@ -278,8 +278,8 @@ impl HummockEventHandler {
let _timer = upload_task_latency.start_timer();
let mut output = flush_imms(
payload
.values()
.flat_map(|imms| imms.iter().cloned())
.into_values()
.flat_map(|imms| imms.into_iter())
.collect(),
task_info,
upload_compactor_context.clone(),
Expand Down Expand Up @@ -872,15 +872,14 @@ impl HummockEventHandler {
HummockEvent::Shutdown => {
unreachable!("shutdown is handled specially")
}
HummockEvent::ImmToUploader(imm) => {
HummockEvent::ImmToUploader { instance_id, imm } => {
assert!(
self.local_read_version_mapping
.contains_key(&imm.instance_id),
self.local_read_version_mapping.contains_key(&instance_id),
"add imm from non-existing read version instance: instance_id: {}, table_id {}",
imm.instance_id,
instance_id,
imm.table_id,
);
self.uploader.add_imm(imm);
self.uploader.add_imm(instance_id, imm);
self.uploader.may_flush();
}

Expand Down
11 changes: 7 additions & 4 deletions src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
use thiserror_ext::AsReport;
use tokio::sync::oneshot;

use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;
use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
use crate::hummock::HummockResult;
use crate::mem_table::ImmutableMemtable;
use crate::store::SealCurrentEpochOptions;
Expand Down Expand Up @@ -67,7 +67,10 @@ pub enum HummockEvent {

Shutdown,

ImmToUploader(ImmutableMemtable),
ImmToUploader {
instance_id: SharedBufferBatchId,
imm: ImmutableMemtable,
},

SealEpoch {
epoch: HummockEpoch,
Expand Down Expand Up @@ -113,8 +116,8 @@ impl HummockEvent {

HummockEvent::Shutdown => "Shutdown".to_string(),

HummockEvent::ImmToUploader(imm) => {
format!("ImmToUploader {:?}", imm)
HummockEvent::ImmToUploader { instance_id, imm } => {
format!("ImmToUploader {} {}", instance_id, imm.batch_id())
}

HummockEvent::SealEpoch {
Expand Down
Loading

0 comments on commit e905000

Please sign in to comment.