diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 555710a13d466..fd2ede833a6b2 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -31,7 +31,6 @@ use risingwave_pb::hummock::{ compact_task, BloomFilterType, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo, TableSchema, }; -use thiserror_ext::AsReport; use tokio::time::Instant; pub use super::context::CompactorContext; @@ -176,11 +175,10 @@ pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompact } const MAX_FILE_COUNT: usize = 32; - fn generate_splits_fast( sstable_infos: &Vec, compaction_size: u64, - context: CompactorContext, + context: &CompactorContext, ) -> HummockResult> { let worker_num = context.compaction_executor.worker_num(); let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; @@ -214,9 +212,6 @@ fn generate_splits_fast( } indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref())); indexes.dedup(); - if indexes.len() <= parallelism { - return Ok(vec![]); - } let mut splits = vec![]; splits.push(KeyRange_vec::new(vec![], vec![])); let parallel_key_count = indexes.len() / parallelism; @@ -235,7 +230,7 @@ fn generate_splits_fast( pub async fn generate_splits( sstable_infos: &Vec, compaction_size: u64, - context: CompactorContext, + context: &CompactorContext, ) -> HummockResult> { let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; if compaction_size > parallel_compact_size { @@ -560,7 +555,7 @@ pub async fn generate_splits_for_task( compact_task: &mut CompactTask, context: &CompactorContext, optimize_by_copy_block: bool, -) -> bool { +) -> HummockResult<()> { let sstable_infos = compact_task .input_ssts .iter() @@ -579,22 +574,14 @@ pub async fn generate_splits_for_task( .sum::(); if !optimize_by_copy_block { - match generate_splits(&sstable_infos, compaction_size, context.clone()).await { - Ok(splits) => { - if !splits.is_empty() { - compact_task.splits = splits; - } - - return true; - } - Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to generate_splits"); - return false; - } + let splits = generate_splits(&sstable_infos, compaction_size, context).await?; + if !splits.is_empty() { + compact_task.splits = splits; } + return Ok(()); } - true + Ok(()) } pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) { @@ -640,3 +627,32 @@ pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorCo .with_label_values(&[&group_label, next_level_label.as_str()]) .inc_by(target_table_infos.len() as u64); } + +pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize { + let optimize_by_copy_block = optimize_by_copy_block(compact_task, context); + + if optimize_by_copy_block { + return 1; + } + + let sstable_infos = compact_task + .input_ssts + .iter() + .flat_map(|level| level.table_infos.iter()) + .filter(|table_info| { + let table_ids = &table_info.table_ids; + table_ids + .iter() + .any(|table_id| compact_task.existing_table_ids.contains(table_id)) + }) + .cloned() + .collect_vec(); + let compaction_size = sstable_infos + .iter() + .map(|table_info| table_info.file_size) + .sum::(); + + generate_splits_fast(&sstable_infos, compaction_size, context) + .unwrap() + .len() +} diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 9d7f45cbd08d2..74338084f41ed 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::{BinaryHeap, HashMap, HashSet}; -use std::sync::atomic::Ordering; use std::sync::Arc; use await_tree::InstrumentAwait; @@ -40,7 +39,7 @@ use super::task_progress::TaskProgress; use super::{CompactionStatistics, TaskConfig}; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager}; use crate::hummock::compactor::compaction_utils::{ - build_multi_compaction_filter, estimate_task_output_capacity, generate_splits, + build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task, metrics_report_for_task, optimize_by_copy_block, }; use crate::hummock::compactor::iterator::ConcatSstableIterator; @@ -347,71 +346,23 @@ pub async fn compact( }; let mut task_status = TaskStatus::Success; - // skip sst related to non-existent able_id to reduce io - let sstable_infos = compact_task - .input_ssts - .iter() - .flat_map(|level| level.table_infos.iter()) - .filter(|table_info| { - let table_ids = &table_info.table_ids; - table_ids - .iter() - .any(|table_id| existing_table_ids.contains(table_id)) - }) - .cloned() - .collect_vec(); - let compaction_size = sstable_infos - .iter() - .map(|table_info| table_info.file_size) - .sum::(); - let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context); - if !optimize_by_copy_block { - match generate_splits(&sstable_infos, compaction_size, context.clone()).await { - Ok(splits) => { - if !splits.is_empty() { - compact_task.splits = splits; - } - } - Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to generate_splits"); - task_status = TaskStatus::ExecuteFailed; - return ( - compact_done(compact_task, context.clone(), vec![], task_status), - None, - ); - } - } - } - let compact_task_statistics = statistics_compact_task(&compact_task); - // Number of splits (key ranges) is equal to number of compaction tasks - let parallelism = compact_task.splits.len(); - assert_ne!(parallelism, 0, "splits cannot be empty"); - if !context.acquire_task_quota(parallelism as u32) { - tracing::warn!( - "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}", - compact_task.task_id, - parallelism, - context.running_task_parallelism.load(Ordering::Relaxed), - context.max_task_parallelism.load(Ordering::Relaxed), - ); + if let Err(e) = + generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await + { + tracing::warn!(error = %e.as_report(), "Failed to generate_splits"); + task_status = TaskStatus::ExecuteFailed; return ( - compact_done( - compact_task, - context.clone(), - vec![], - TaskStatus::NoAvailCpuResourceCanceled, - ), + compact_done(compact_task, context.clone(), vec![], task_status), None, ); } - let _release_quota_guard = - scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| { - context.release_task_quota(parallelism as u32); - }); - + let compact_task_statistics = statistics_compact_task(&compact_task); + // Number of splits (key ranges) is equal to number of compaction tasks + let parallelism = compact_task.splits.len(); + assert_ne!(parallelism, 0, "splits cannot be empty"); let mut output_ssts = Vec::with_capacity(parallelism); let mut compaction_futures = vec![]; let mut abort_handles = vec![]; diff --git a/src/storage/src/hummock/compactor/context.rs b/src/storage/src/hummock/compactor/context.rs index 4525dcdc773fb..22e11e4430a29 100644 --- a/src/storage/src/hummock/compactor/context.rs +++ b/src/storage/src/hummock/compactor/context.rs @@ -102,7 +102,7 @@ impl CompactorContext { } } - pub fn acquire_task_quota(&self, parallelism: u32) -> bool { + pub fn acquire_task_quota(&self, parallelism: u32) -> Option { let mut running_u32 = self.running_task_parallelism.load(Ordering::SeqCst); let max_u32 = self.max_task_parallelism.load(Ordering::SeqCst); @@ -114,7 +114,10 @@ impl CompactorContext { Ordering::SeqCst, ) { Ok(_) => { - return true; + return Some(ReleaseGuard::new( + self.running_task_parallelism.clone(), + parallelism, + )); } Err(old_running_u32) => { running_u32 = old_running_u32; @@ -122,7 +125,7 @@ impl CompactorContext { } } - false + None } pub fn release_task_quota(&self, parallelism: u32) { @@ -150,3 +153,33 @@ impl CompactorContext { } } } + +pub struct ReleaseGuard { + running_task_parallelism: Arc, + release_parallelism: u32, +} + +impl ReleaseGuard { + fn new(running_task_parallelism: Arc, release_parallelism: u32) -> Self { + Self { + running_task_parallelism, + release_parallelism, + } + } +} + +impl Drop for ReleaseGuard { + fn drop(&mut self) { + let prev = self + .running_task_parallelism + .fetch_sub(self.release_parallelism, Ordering::SeqCst); + + assert_ge!( + prev, + self.release_parallelism, + "running {} parallelism {}", + prev, + self.release_parallelism + ); + } +} diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index c81ed65d99cfc..19abc78331b54 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -80,9 +80,7 @@ use super::{ use crate::filter_key_extractor::{ FilterKeyExtractorImpl, FilterKeyExtractorManager, StaticFilterKeyExtractorManager, }; -use crate::hummock::compactor::compaction_utils::{ - generate_splits_for_task, optimize_by_copy_block, -}; +use crate::hummock::compactor::compaction_utils::calculate_task_parallelism; use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done}; use crate::hummock::iterator::{Forward, HummockIterator}; use crate::hummock::multi_builder::SplitTableOutput; @@ -459,61 +457,58 @@ pub fn start_compactor( let filter_key_extractor_manager = filter_key_extractor_manager.clone(); match event { - ResponseEvent::CompactTask(mut compact_task) => { - // let mut is_task_fail = false; - let optimize_by_copy_block = - optimize_by_copy_block(&compact_task, &context); - if !generate_splits_for_task( - &mut compact_task, - &context, - optimize_by_copy_block, - ) - .await - { - let (compact_task, table_stats) = compact_done( - compact_task, - context.clone(), - vec![], - TaskStatus::ExecuteFailed, - ); - if let Err(e) = - request_sender.send(SubscribeCompactionEventRequest { - event: Some(RequestEvent::ReportTask(ReportTask { - task_id: compact_task.task_id, - task_status: compact_task.task_status, - sorted_output_ssts: compact_task - .sorted_output_ssts - .clone(), - table_stats_change: to_prost_table_stats_map( - table_stats, - ), - })), - create_at: SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .expect("Clock may have gone backwards") - .as_millis() - as u64, - }) - { - let task_id = compact_task.task_id; - tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); - } - - continue 'start_stream; - } + ResponseEvent::CompactTask(compact_task) => { + let parallelism = + calculate_task_parallelism(&compact_task, &context); - let scopeguard = scopeguard::guard( - (compact_task.clone(), context.clone()), - |(compact_task, context)| { - context.running_task_parallelism.fetch_sub( - compact_task.splits.len() as u32, - Ordering::SeqCst, + assert_ne!(parallelism, 0, "splits cannot be empty"); + let release_guard = match context + .acquire_task_quota(parallelism as u32) + { + Some(release_guard) => release_guard, + None => { + tracing::warn!( + "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}", + compact_task.task_id, + parallelism, + context.running_task_parallelism.load(Ordering::Relaxed), + context.max_task_parallelism.load(Ordering::Relaxed), + ); + let (compact_task, table_stats) = compact_done( + compact_task, + context.clone(), + vec![], + TaskStatus::NoAvailCpuResourceCanceled, ); - }, - ); + if let Err(e) = + request_sender.send(SubscribeCompactionEventRequest { + event: Some(RequestEvent::ReportTask(ReportTask { + task_id: compact_task.task_id, + task_status: compact_task.task_status, + sorted_output_ssts: compact_task + .sorted_output_ssts + .clone(), + table_stats_change: to_prost_table_stats_map( + table_stats, + ), + })), + create_at: SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Clock may have gone backwards") + .as_millis() + as u64, + }) + { + let task_id = compact_task.task_id; + tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); + } + + continue 'start_stream; + } + }; executor.spawn(async move { - let _scopeguard = scopeguard; + let _release_guard = release_guard; let (tx, rx) = tokio::sync::oneshot::channel(); let task_id = compact_task.task_id; shutdown.lock().unwrap().insert(task_id, tx);