From c0a1b83dbd7ddea244f8f8edac8e629c9d8a1a12 Mon Sep 17 00:00:00 2001
From: Li0k
Date: Sat, 7 Oct 2023 17:21:45 +0800
Subject: [PATCH 1/2] fix(storage): fix compact_fast_runner_bytes unused

---
 .../src/hummock/compactor/compactor_runner.rs | 88 +++++++++++++------
 src/storage/src/monitor/compactor_metrics.rs  |  5 +-
 2 files changed, 66 insertions(+), 27 deletions(-)

diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs
index 567edc4cdd7e9..6e90ff6a23ea1 100644
--- a/src/storage/src/hummock/compactor/compactor_runner.rs
+++ b/src/storage/src/hummock/compactor/compactor_runner.rs
@@ -324,7 +324,7 @@ pub async fn compact(
         Err(e) => {
             tracing::error!("Failed to fetch filter key extractor tables [{:?}], it may caused by some RPC error {:?}", compact_task.existing_table_ids, e);
             let task_status = TaskStatus::ExecuteFailed;
-            return compact_done(compact_task, context.clone(), vec![], task_status);
+            return compact_done(compact_task, context.clone(), vec![], task_status, false);
         }
         Ok(extractor) => extractor,
     };
@@ -338,7 +338,7 @@ pub async fn compact(
         if !removed_tables.is_empty() {
             tracing::error!("Failed to fetch filter key extractor tables [{:?}. [{:?}] may be removed by meta-service. ", compact_table_ids, removed_tables);
             let task_status = TaskStatus::ExecuteFailed;
-            return compact_done(compact_task, context.clone(), vec![], task_status);
+            return compact_done(compact_task, context.clone(), vec![], task_status, false);
         }
     }

@@ -391,7 +391,13 @@ pub async fn compact(
             Err(e) => {
                 tracing::warn!("Failed to generate_splits {:#?}", e);
                 task_status = TaskStatus::ExecuteFailed;
-                return compact_done(compact_task, context.clone(), vec![], task_status);
+                return compact_done(
+                    compact_task,
+                    context.clone(),
+                    vec![],
+                    task_status,
+                    optimize_by_copy_block,
+                );
             }
         }
     }
@@ -415,7 +421,13 @@ pub async fn compact(
         Err(err) => {
             tracing::warn!("Failed to build delete range aggregator {:#?}", err);
             task_status = TaskStatus::ExecuteFailed;
-            return compact_done(compact_task, context.clone(), vec![], task_status);
+            return compact_done(
+                compact_task,
+                context.clone(),
+                vec![],
+                task_status,
+                optimize_by_copy_block,
+            );
         }
     };

@@ -433,17 +445,17 @@ pub async fn compact(
     ) * compact_task.splits.len() as u64;

     tracing::info!(
-        "Ready to handle compaction group {} task: {} compact_task_statistics {:?} target_level {} compression_algorithm {:?} table_ids {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}",
-        compact_task.compaction_group_id,
-        compact_task.task_id,
-        compact_task_statistics,
-        compact_task.target_level,
-        compact_task.compression_algorithm,
-        compact_task.existing_table_ids,
-        parallelism,
-        task_memory_capacity_with_parallelism,
-        optimize_by_copy_block
-    );
+            "Ready to handle compaction group {} task: {} compact_task_statistics {:?} target_level {} compression_algorithm {:?} table_ids {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}",
+            compact_task.compaction_group_id,
+            compact_task.task_id,
+            compact_task_statistics,
+            compact_task.target_level,
+            compact_task.compression_algorithm,
+            compact_task.existing_table_ids,
+            parallelism,
+            task_memory_capacity_with_parallelism,
+            optimize_by_copy_block
+        );

     // If the task does not have enough memory, it should cancel the task and let the meta
     // reschedule it, so that it does not occupy the compactor's resources.
@@ -459,7 +471,13 @@ pub async fn compact(
             context.memory_limiter.quota()
         );
         task_status = TaskStatus::NoAvailResourceCanceled;
-        return compact_done(compact_task, context.clone(), output_ssts, task_status);
+        return compact_done(
+            compact_task,
+            context.clone(),
+            output_ssts,
+            task_status,
+            optimize_by_copy_block,
+        );
     }

     context.compactor_metrics.compact_task_pending_num.inc();
@@ -487,8 +505,13 @@ pub async fn compact(
     context.compactor_metrics.compact_task_pending_num.dec();

     // After a compaction is done, mutate the compaction task.
-    let (compact_task, table_stats) =
-        compact_done(compact_task, context.clone(), output_ssts, task_status);
+    let (compact_task, table_stats) = compact_done(
+        compact_task,
+        context.clone(),
+        output_ssts,
+        task_status,
+        optimize_by_copy_block,
+    );
     let cost_time = timer.stop_and_record() * 1000.0;
     tracing::info!(
         "Finished compaction task in {:?}ms: {}",
@@ -583,8 +606,13 @@ pub async fn compact(
     }

     // After a compaction is done, mutate the compaction task.
-    let (compact_task, table_stats) =
-        compact_done(compact_task, context.clone(), output_ssts, task_status);
+    let (compact_task, table_stats) = compact_done(
+        compact_task,
+        context.clone(),
+        output_ssts,
+        task_status,
+        optimize_by_copy_block,
+    );
     let cost_time = timer.stop_and_record() * 1000.0;
     tracing::info!(
         "Finished compaction task in {:?}ms: {}",
@@ -606,6 +634,7 @@ fn compact_done(
     context: CompactorContext,
     output_ssts: Vec,
     task_status: TaskStatus,
+    optimize_by_copy_block: bool,
 ) -> (CompactTask, HashMap) {
     let mut table_stats_map = TableStatsMap::default();
     compact_task.set_task_status(task_status);
@@ -630,11 +659,20 @@ fn compact_done(

     let group_label = compact_task.compaction_group_id.to_string();
     let level_label = compact_task.target_level.to_string();
-    context
-        .compactor_metrics
-        .compact_write_bytes
-        .with_label_values(&[&group_label, level_label.as_str()])
-        .inc_by(compaction_write_bytes);
+
+    if optimize_by_copy_block {
+        context
+            .compactor_metrics
+            .compact_fast_runner_bytes
+            .with_label_values(&[&group_label, level_label.as_str()])
+            .inc_by(compaction_write_bytes);
+    } else {
+        context
+            .compactor_metrics
+            .compact_write_bytes
+            .with_label_values(&[&group_label, level_label.as_str()])
+            .inc_by(compaction_write_bytes);
+    }
     context
         .compactor_metrics
         .compact_write_sstn
diff --git a/src/storage/src/monitor/compactor_metrics.rs b/src/storage/src/monitor/compactor_metrics.rs
index d71fd8ac87b7a..a59c850653940 100644
--- a/src/storage/src/monitor/compactor_metrics.rs
+++ b/src/storage/src/monitor/compactor_metrics.rs
@@ -26,7 +26,7 @@ use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
 #[derive(Debug, Clone)]
 pub struct CompactorMetrics {
     pub compaction_upload_sst_counts: GenericCounter,
-    pub compact_fast_runner_bytes: GenericCounter,
+    pub compact_fast_runner_bytes: GenericCounterVec,
     pub compact_write_bytes: GenericCounterVec,
     pub compact_read_current_level: GenericCounterVec,
     pub compact_read_next_level: GenericCounterVec,
@@ -212,9 +212,10 @@ impl CompactorMetrics {
             "Total size of compaction files size that have been written to object store from shared buffer",
             registry
         ).unwrap();
-        let compact_fast_runner_bytes = register_int_counter_with_registry!(
+        let compact_fast_runner_bytes = register_int_counter_vec_with_registry!(
             "compactor_fast_compact_bytes",
             "Total size of compaction files size of fast compactor runner",
+            &["group", "level_index"],
             registry
         )
         .unwrap();

From f6eeb7fcc0f1e167355995d99bede557564e9a22 Mon Sep 17 00:00:00 2001
From: Li0k
Date: Sun, 8 Oct 2023 17:11:49 +0800
Subject: [PATCH 2/2] fix(storage): address comments

---
 .../src/hummock/compactor/compactor_runner.rs | 66 ++++---------------
 .../compactor/fast_compactor_runner.rs        |  2 +-
 src/storage/src/monitor/compactor_metrics.rs  |  5 +-
 3 files changed, 17 insertions(+), 56 deletions(-)

diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs
index 6e90ff6a23ea1..183eac792a5ca 100644
--- a/src/storage/src/hummock/compactor/compactor_runner.rs
+++ b/src/storage/src/hummock/compactor/compactor_runner.rs
@@ -324,7 +324,7 @@ pub async fn compact(
         Err(e) => {
             tracing::error!("Failed to fetch filter key extractor tables [{:?}], it may caused by some RPC error {:?}", compact_task.existing_table_ids, e);
             let task_status = TaskStatus::ExecuteFailed;
-            return compact_done(compact_task, context.clone(), vec![], task_status, false);
+            return compact_done(compact_task, context.clone(), vec![], task_status);
         }
         Ok(extractor) => extractor,
     };
@@ -338,7 +338,7 @@ pub async fn compact(
         if !removed_tables.is_empty() {
             tracing::error!("Failed to fetch filter key extractor tables [{:?}. [{:?}] may be removed by meta-service. ", compact_table_ids, removed_tables);
             let task_status = TaskStatus::ExecuteFailed;
-            return compact_done(compact_task, context.clone(), vec![], task_status, false);
+            return compact_done(compact_task, context.clone(), vec![], task_status);
         }
     }

@@ -391,13 +391,7 @@ pub async fn compact(
             Err(e) => {
                 tracing::warn!("Failed to generate_splits {:#?}", e);
                 task_status = TaskStatus::ExecuteFailed;
-                return compact_done(
-                    compact_task,
-                    context.clone(),
-                    vec![],
-                    task_status,
-                    optimize_by_copy_block,
-                );
+                return compact_done(compact_task, context.clone(), vec![], task_status);
             }
         }
     }
@@ -421,13 +415,7 @@ pub async fn compact(
         Err(err) => {
             tracing::warn!("Failed to build delete range aggregator {:#?}", err);
             task_status = TaskStatus::ExecuteFailed;
-            return compact_done(
-                compact_task,
-                context.clone(),
-                vec![],
-                task_status,
-                optimize_by_copy_block,
-            );
+            return compact_done(compact_task, context.clone(), vec![], task_status);
         }
     };

@@ -471,13 +459,7 @@ pub async fn compact(
             context.memory_limiter.quota()
         );
         task_status = TaskStatus::NoAvailResourceCanceled;
-        return compact_done(
-            compact_task,
-            context.clone(),
-            output_ssts,
-            task_status,
-            optimize_by_copy_block,
-        );
+        return compact_done(compact_task, context.clone(), output_ssts, task_status);
     }

     context.compactor_metrics.compact_task_pending_num.inc();
@@ -505,13 +487,8 @@ pub async fn compact(
     context.compactor_metrics.compact_task_pending_num.dec();

     // After a compaction is done, mutate the compaction task.
-    let (compact_task, table_stats) = compact_done(
-        compact_task,
-        context.clone(),
-        output_ssts,
-        task_status,
-        optimize_by_copy_block,
-    );
+    let (compact_task, table_stats) =
+        compact_done(compact_task, context.clone(), output_ssts, task_status);
     let cost_time = timer.stop_and_record() * 1000.0;
     tracing::info!(
         "Finished compaction task in {:?}ms: {}",
@@ -606,13 +583,8 @@ pub async fn compact(
     }

     // After a compaction is done, mutate the compaction task.
-    let (compact_task, table_stats) = compact_done(
-        compact_task,
-        context.clone(),
-        output_ssts,
-        task_status,
-        optimize_by_copy_block,
-    );
+    let (compact_task, table_stats) =
+        compact_done(compact_task, context.clone(), output_ssts, task_status);
     let cost_time = timer.stop_and_record() * 1000.0;
     tracing::info!(
         "Finished compaction task in {:?}ms: {}",
@@ -634,7 +606,6 @@ fn compact_done(
     context: CompactorContext,
     output_ssts: Vec,
     task_status: TaskStatus,
-    optimize_by_copy_block: bool,
 ) -> (CompactTask, HashMap) {
     let mut table_stats_map = TableStatsMap::default();
     compact_task.set_task_status(task_status);
@@ -659,20 +630,11 @@ fn compact_done(

     let group_label = compact_task.compaction_group_id.to_string();
     let level_label = compact_task.target_level.to_string();
-
-    if optimize_by_copy_block {
-        context
-            .compactor_metrics
-            .compact_fast_runner_bytes
-            .with_label_values(&[&group_label, level_label.as_str()])
-            .inc_by(compaction_write_bytes);
-    } else {
-        context
-            .compactor_metrics
-            .compact_write_bytes
-            .with_label_values(&[&group_label, level_label.as_str()])
-            .inc_by(compaction_write_bytes);
-    }
+    context
+        .compactor_metrics
+        .compact_write_bytes
+        .with_label_values(&[&group_label, level_label.as_str()])
+        .inc_by(compaction_write_bytes);
     context
         .compactor_metrics
         .compact_write_sstn
diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs
index 787b12bde9c32..6dcfb0e2392cf 100644
--- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs
+++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs
@@ -482,7 +482,7 @@ impl CompactorRunner {
             total_read_bytes += sst.file_size;
         }
         self.metrics
-            .write_build_l0_bytes
+            .compact_fast_runner_bytes
             .inc_by(skip_raw_block_size);
         tracing::info!(
             "OPTIMIZATION: skip {} blocks for task-{}, optimize {}% data compression",
diff --git a/src/storage/src/monitor/compactor_metrics.rs b/src/storage/src/monitor/compactor_metrics.rs
index a59c850653940..d71fd8ac87b7a 100644
--- a/src/storage/src/monitor/compactor_metrics.rs
+++ b/src/storage/src/monitor/compactor_metrics.rs
@@ -26,7 +26,7 @@ use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
 #[derive(Debug, Clone)]
 pub struct CompactorMetrics {
     pub compaction_upload_sst_counts: GenericCounter,
-    pub compact_fast_runner_bytes: GenericCounterVec,
+    pub compact_fast_runner_bytes: GenericCounter,
     pub compact_write_bytes: GenericCounterVec,
     pub compact_read_current_level: GenericCounterVec,
     pub compact_read_next_level: GenericCounterVec,
@@ -212,10 +212,9 @@ impl CompactorMetrics {
             "Total size of compaction files size that have been written to object store from shared buffer",
             registry
         ).unwrap();
-        let compact_fast_runner_bytes = register_int_counter_vec_with_registry!(
+        let compact_fast_runner_bytes = register_int_counter_with_registry!(
            "compactor_fast_compact_bytes",
            "Total size of compaction files size of fast compactor runner",
-            &["group", "level_index"],
            registry
        )
        .unwrap();
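
Not part of either patch, just for orientation: a minimal, self-contained sketch of the metric shape the series ends up with, written against the upstream prometheus crate rather than RisingWave's metrics wrappers. After patch 2/2, compact_fast_runner_bytes is a plain, unlabeled counter that the fast compactor runner increments directly with the bytes it skipped rewriting, while compact_write_bytes keeps its group/level labels. The standalone main, the local Registry, and the sample byte count are illustrative assumptions.

// Illustrative sketch only -- not RisingWave source and not part of the patches above.
// It mirrors the final shape of `compact_fast_runner_bytes`: a plain counter (no labels)
// that the fast compactor runner bumps with the raw block bytes it copied without rewriting.
use prometheus::{register_int_counter_with_registry, IntCounter, Registry};

fn main() {
    // Hypothetical local registry; RisingWave registers into its global metrics registry instead.
    let registry = Registry::new();

    // Same metric name and help text as compactor_metrics.rs, but without the
    // ["group", "level_index"] labels that patch 1/2 briefly introduced.
    let compact_fast_runner_bytes: IntCounter = register_int_counter_with_registry!(
        "compactor_fast_compact_bytes",
        "Total size of compaction files size of fast compactor runner",
        registry
    )
    .unwrap();

    // Analogous to `self.metrics.compact_fast_runner_bytes.inc_by(skip_raw_block_size)`
    // in fast_compactor_runner.rs; the value here is made up.
    let skip_raw_block_size: u64 = 4 << 20; // 4 MiB
    compact_fast_runner_bytes.inc_by(skip_raw_block_size);

    assert_eq!(compact_fast_runner_bytes.get(), 4 << 20);
}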