From de6db46e1d306cb635723bf1c626cc887e707a5f Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 18 Apr 2024 19:38:43 +0800 Subject: [PATCH 1/9] refactore(storage): refactor --- .../src/hummock/compactor/compaction_utils.rs | 203 +++++++++++++++++- .../src/hummock/compactor/compactor_runner.rs | 160 +++++--------- src/storage/src/hummock/compactor/mod.rs | 3 +- 3 files changed, 256 insertions(+), 110 deletions(-) diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 9594d7295a03..bf0fdceeb19b 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -26,9 +26,12 @@ use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_hummock_sdk::table_stats::TableStatsMap; use risingwave_hummock_sdk::{can_concat, EpochWithGap, KeyComparator}; +use risingwave_pb::hummock::compact_task::TaskType; use risingwave_pb::hummock::{ - compact_task, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo, TableSchema, + compact_task, BloomFilterType, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo, + TableSchema, }; +use thiserror_ext::AsReport; use tokio::time::Instant; pub use super::context::CompactorContext; @@ -489,3 +492,201 @@ async fn check_result< } Ok(true) } + +pub fn optimize_by_copy_block(compact_task: &CompactTask, context: CompactorContext) -> bool { + let sstable_infos = compact_task + .input_ssts + .iter() + .flat_map(|level| level.table_infos.iter()) + .filter(|table_info| { + let table_ids = &table_info.table_ids; + table_ids + .iter() + .any(|table_id| compact_task.existing_table_ids.contains(table_id)) + }) + .cloned() + .collect_vec(); + let compaction_size = sstable_infos + .iter() + .map(|table_info| table_info.file_size) + .sum::(); + + let all_ssts_are_blocked_filter = sstable_infos + .iter() + .all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked); + + let delete_key_count = sstable_infos + .iter() + .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count) + .sum::(); + let total_key_count = sstable_infos + .iter() + .map(|table_info| table_info.total_key_count) + .sum::(); + + let has_tombstone = compact_task + .input_ssts + .iter() + .flat_map(|level| level.table_infos.iter()) + .any(|sst| sst.range_tombstone_count > 0); + let has_ttl = compact_task + .table_options + .iter() + .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0)); + + let mut compact_table_ids: HashSet = HashSet::from_iter( + compact_task + .input_ssts + .iter() + .flat_map(|level| level.table_infos.iter()) + .flat_map(|sst| sst.table_ids.clone()), + ); + let single_table = compact_table_ids.len() == 1; + + context.storage_opts.enable_fast_compaction + && all_ssts_are_blocked_filter + && !has_tombstone + && !has_ttl + && single_table + && compact_task.target_level > 0 + && compact_task.input_ssts.len() == 2 + && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size + && delete_key_count * 100 + < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count + && compact_task.task_type() == TaskType::Dynamic +} + +pub async fn generate_splits_for_task( + compact_task: &mut CompactTask, + context: CompactorContext, +) -> bool { + let sstable_infos = compact_task + .input_ssts + .iter() + .flat_map(|level| level.table_infos.iter()) + .filter(|table_info| { + let table_ids = &table_info.table_ids; + table_ids + .iter() + .any(|table_id| compact_task.existing_table_ids.contains(table_id)) + }) + .cloned() + .collect_vec(); + let compaction_size = sstable_infos + .iter() + .map(|table_info| table_info.file_size) + .sum::(); + + let all_ssts_are_blocked_filter = sstable_infos + .iter() + .all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked); + + let delete_key_count = sstable_infos + .iter() + .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count) + .sum::(); + let total_key_count = sstable_infos + .iter() + .map(|table_info| table_info.total_key_count) + .sum::(); + + let has_tombstone = compact_task + .input_ssts + .iter() + .flat_map(|level| level.table_infos.iter()) + .any(|sst| sst.range_tombstone_count > 0); + let has_ttl = compact_task + .table_options + .iter() + .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0)); + + let mut compact_table_ids = compact_task + .input_ssts + .iter() + .flat_map(|level| level.table_infos.iter()) + .flat_map(|sst| sst.table_ids.clone()) + .collect_vec(); + compact_table_ids.sort(); + compact_table_ids.dedup(); + let single_table = compact_table_ids.len() == 1; + + let optimize_by_copy_block = context.storage_opts.enable_fast_compaction + && all_ssts_are_blocked_filter + && !has_tombstone + && !has_ttl + && single_table + && compact_task.target_level > 0 + && compact_task.input_ssts.len() == 2 + && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size + && delete_key_count * 100 + < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count + && compact_task.task_type() == TaskType::Dynamic; + + if !optimize_by_copy_block { + match generate_splits(&sstable_infos, compaction_size, context.clone()).await { + Ok(splits) => { + if !splits.is_empty() { + compact_task.splits = splits; + } + + return true; + } + Err(e) => { + tracing::warn!(error = %e.as_report(), "Failed to generate_splits"); + // task_status = TaskStatus::ExecuteFailed; + // return ( + // compact_done(compact_task, context.clone(), vec![], task_status), + // None, + // ); + + return false; + } + } + } + + return true; +} + +pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) { + let group_label = compact_task.compaction_group_id.to_string(); + let cur_level_label = compact_task.input_ssts[0].level_idx.to_string(); + let select_table_infos = compact_task + .input_ssts + .iter() + .filter(|level| level.level_idx != compact_task.target_level) + .flat_map(|level| level.table_infos.iter()) + .collect_vec(); + let target_table_infos = compact_task + .input_ssts + .iter() + .filter(|level| level.level_idx == compact_task.target_level) + .flat_map(|level| level.table_infos.iter()) + .collect_vec(); + let select_size = select_table_infos + .iter() + .map(|table| table.file_size) + .sum::(); + context + .compactor_metrics + .compact_read_current_level + .with_label_values(&[&group_label, &cur_level_label]) + .inc_by(select_size); + context + .compactor_metrics + .compact_read_sstn_current_level + .with_label_values(&[&group_label, &cur_level_label]) + .inc_by(select_table_infos.len() as u64); + + let target_level_read_bytes = target_table_infos.iter().map(|t| t.file_size).sum::(); + let next_level_label = compact_task.target_level.to_string(); + context + .compactor_metrics + .compact_read_next_level + .with_label_values(&[&group_label, next_level_label.as_str()]) + .inc_by(target_level_read_bytes); + context + .compactor_metrics + .compact_read_sstn_next_level + .with_label_values(&[&group_label, next_level_label.as_str()]) + .inc_by(target_table_infos.len() as u64); +} diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 46ccaf0ebb94..54b39f3d4a19 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -30,8 +30,8 @@ use risingwave_hummock_sdk::table_stats::{add_table_stats_map, TableStats, Table use risingwave_hummock_sdk::{ can_concat, compact_task_output_to_string, HummockSstableObjectId, KeyComparator, }; -use risingwave_pb::hummock::compact_task::{TaskStatus, TaskType}; -use risingwave_pb::hummock::{BloomFilterType, CompactTask, LevelType, SstableInfo}; +use risingwave_pb::hummock::compact_task::TaskStatus; +use risingwave_pb::hummock::{CompactTask, LevelType, SstableInfo}; use thiserror_ext::AsReport; use tokio::sync::oneshot::Receiver; @@ -41,6 +41,7 @@ use super::{CompactionStatistics, TaskConfig}; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager}; use crate::hummock::compactor::compaction_utils::{ build_multi_compaction_filter, estimate_task_output_capacity, generate_splits, + metrics_report_for_task, optimize_by_copy_block, }; use crate::hummock::compactor::iterator::ConcatSstableIterator; use crate::hummock::compactor::task_progress::TaskProgressGuard; @@ -304,46 +305,7 @@ pub async fn compact( ) { let context = compactor_context.clone(); let group_label = compact_task.compaction_group_id.to_string(); - let cur_level_label = compact_task.input_ssts[0].level_idx.to_string(); - let select_table_infos = compact_task - .input_ssts - .iter() - .filter(|level| level.level_idx != compact_task.target_level) - .flat_map(|level| level.table_infos.iter()) - .collect_vec(); - let target_table_infos = compact_task - .input_ssts - .iter() - .filter(|level| level.level_idx == compact_task.target_level) - .flat_map(|level| level.table_infos.iter()) - .collect_vec(); - let select_size = select_table_infos - .iter() - .map(|table| table.file_size) - .sum::(); - context - .compactor_metrics - .compact_read_current_level - .with_label_values(&[&group_label, &cur_level_label]) - .inc_by(select_size); - context - .compactor_metrics - .compact_read_sstn_current_level - .with_label_values(&[&group_label, &cur_level_label]) - .inc_by(select_table_infos.len() as u64); - - let target_level_read_bytes = target_table_infos.iter().map(|t| t.file_size).sum::(); - let next_level_label = compact_task.target_level.to_string(); - context - .compactor_metrics - .compact_read_next_level - .with_label_values(&[&group_label, next_level_label.as_str()]) - .inc_by(target_level_read_bytes); - context - .compactor_metrics - .compact_read_sstn_next_level - .with_label_values(&[&group_label, next_level_label.as_str()]) - .inc_by(target_table_infos.len() as u64); + metrics_report_for_task(&compact_task, &context); let timer = context .compactor_metrics @@ -356,64 +318,34 @@ pub async fn compact( let multi_filter = build_multi_compaction_filter(&compact_task); - let mut compact_table_ids = compact_task - .input_ssts - .iter() - .flat_map(|level| level.table_infos.iter()) - .flat_map(|sst| sst.table_ids.clone()) - .collect_vec(); - compact_table_ids.sort(); - compact_table_ids.dedup(); - let single_table = compact_table_ids.len() == 1; - let existing_table_ids: HashSet = HashSet::from_iter(compact_task.existing_table_ids.clone()); let compact_table_ids = HashSet::from_iter( - compact_table_ids - .into_iter() + compact_task + .input_ssts + .iter() + .flat_map(|level| level.table_infos.iter()) + .flat_map(|sst| sst.table_ids.clone()) .filter(|table_id| existing_table_ids.contains(table_id)), ); - let multi_filter_key_extractor = match filter_key_extractor_manager - .acquire(compact_table_ids.clone()) - .await + + let multi_filter_key_extractor = match build_filter_key_extractor( + &compact_task, + filter_key_extractor_manager.clone(), + &compact_table_ids, + ) + .await { - Err(e) => { - tracing::error!(error = %e.as_report(), "Failed to fetch filter key extractor tables [{:?}], it may caused by some RPC error", compact_task.existing_table_ids); + Some(multi_filter_key_extractor) => multi_filter_key_extractor, + None => { let task_status = TaskStatus::ExecuteFailed; return ( compact_done(compact_task, context.clone(), vec![], task_status), None, ); } - Ok(extractor) => extractor, }; - if let FilterKeyExtractorImpl::Multi(multi) = &multi_filter_key_extractor { - let found_tables = multi.get_existing_table_ids(); - let removed_tables = compact_table_ids - .iter() - .filter(|table_id| !found_tables.contains(table_id)) - .collect_vec(); - if !removed_tables.is_empty() { - tracing::error!("Failed to fetch filter key extractor tables [{:?}. [{:?}] may be removed by meta-service. ", compact_table_ids, removed_tables); - let task_status = TaskStatus::ExecuteFailed; - return ( - compact_done(compact_task, context.clone(), vec![], task_status), - None, - ); - } - } - - let multi_filter_key_extractor = Arc::new(multi_filter_key_extractor); - let has_tombstone = compact_task - .input_ssts - .iter() - .flat_map(|level| level.table_infos.iter()) - .any(|sst| sst.range_tombstone_count > 0); - let has_ttl = compact_task - .table_options - .iter() - .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0)); let mut task_status = TaskStatus::Success; // skip sst related to non-existent able_id to reduce io let sstable_infos = compact_task @@ -432,29 +364,8 @@ pub async fn compact( .iter() .map(|table_info| table_info.file_size) .sum::(); - let all_ssts_are_blocked_filter = sstable_infos - .iter() - .all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked); - let delete_key_count = sstable_infos - .iter() - .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count) - .sum::(); - let total_key_count = sstable_infos - .iter() - .map(|table_info| table_info.total_key_count) - .sum::(); - let optimize_by_copy_block = context.storage_opts.enable_fast_compaction - && all_ssts_are_blocked_filter - && !has_tombstone - && !has_ttl - && single_table - && compact_task.target_level > 0 - && compact_task.input_ssts.len() == 2 - && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size - && delete_key_count * 100 - < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count - && compact_task.task_type() == TaskType::Dynamic; + let optimize_by_copy_block = optimize_by_copy_block(&compact_task, context.clone()); if !optimize_by_copy_block { match generate_splits(&sstable_infos, compaction_size, context.clone()).await { @@ -927,6 +838,39 @@ where Ok(compaction_statistics) } +async fn build_filter_key_extractor( + compact_task: &CompactTask, + filter_key_extractor_manager: FilterKeyExtractorManager, + compact_table_ids: &HashSet, +) -> Option> { + let multi_filter_key_extractor = match filter_key_extractor_manager + .acquire(compact_table_ids.clone()) + .await + { + Err(e) => { + tracing::error!(error = %e.as_report(), "Failed to fetch filter key extractor tables [{:?}], it may caused by some RPC error", compact_task.existing_table_ids); + return None; + } + Ok(extractor) => extractor, + }; + + if let FilterKeyExtractorImpl::Multi(multi) = &multi_filter_key_extractor { + let found_tables = multi.get_existing_table_ids(); + let removed_tables = compact_table_ids + .iter() + .filter(|table_id| !found_tables.contains(table_id)) + .collect_vec(); + if !removed_tables.is_empty() { + tracing::error!("Failed to fetch filter key extractor tables [{:?}. [{:?}] may be removed by meta-service. ", compact_table_ids, removed_tables); + return None; + } + } + + let multi_filter_key_extractor = Arc::new(multi_filter_key_extractor); + + Some(multi_filter_key_extractor) +} + #[cfg(test)] pub mod tests { use risingwave_hummock_sdk::can_concat; diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index c18828bc93e7..654581449b1d 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -470,6 +470,7 @@ pub fn start_compactor( sstable_object_id_manager.remove_watermark_object_id(tracker_id); }, ); + compactor_runner::compact(context.clone(), compact_task, rx, Box::new(sstable_object_id_manager.clone()), filter_key_extractor_manager.clone()).await }, Err(err) => { @@ -791,4 +792,4 @@ fn get_task_progress( progress_list.push(progress.snapshot(task_id)); } progress_list -} +} \ No newline at end of file From b8666cded7799210a6bfe96a882e44fb71d2f3fd Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 18 Apr 2024 21:33:29 +0800 Subject: [PATCH 2/9] fix(storage): fix task parallelism race --- .../src/hummock/compactor/compaction_utils.rs | 60 +--- .../src/hummock/compactor/compactor_runner.rs | 4 +- src/storage/src/hummock/compactor/mod.rs | 275 ++++++++++++------ 3 files changed, 193 insertions(+), 146 deletions(-) diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index bf0fdceeb19b..555710a13d46 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -493,7 +493,7 @@ async fn check_result< Ok(true) } -pub fn optimize_by_copy_block(compact_task: &CompactTask, context: CompactorContext) -> bool { +pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool { let sstable_infos = compact_task .input_ssts .iter() @@ -534,7 +534,7 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: CompactorCont .iter() .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0)); - let mut compact_table_ids: HashSet = HashSet::from_iter( + let compact_table_ids: HashSet = HashSet::from_iter( compact_task .input_ssts .iter() @@ -558,7 +558,8 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: CompactorCont pub async fn generate_splits_for_task( compact_task: &mut CompactTask, - context: CompactorContext, + context: &CompactorContext, + optimize_by_copy_block: bool, ) -> bool { let sstable_infos = compact_task .input_ssts @@ -577,51 +578,6 @@ pub async fn generate_splits_for_task( .map(|table_info| table_info.file_size) .sum::(); - let all_ssts_are_blocked_filter = sstable_infos - .iter() - .all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked); - - let delete_key_count = sstable_infos - .iter() - .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count) - .sum::(); - let total_key_count = sstable_infos - .iter() - .map(|table_info| table_info.total_key_count) - .sum::(); - - let has_tombstone = compact_task - .input_ssts - .iter() - .flat_map(|level| level.table_infos.iter()) - .any(|sst| sst.range_tombstone_count > 0); - let has_ttl = compact_task - .table_options - .iter() - .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0)); - - let mut compact_table_ids = compact_task - .input_ssts - .iter() - .flat_map(|level| level.table_infos.iter()) - .flat_map(|sst| sst.table_ids.clone()) - .collect_vec(); - compact_table_ids.sort(); - compact_table_ids.dedup(); - let single_table = compact_table_ids.len() == 1; - - let optimize_by_copy_block = context.storage_opts.enable_fast_compaction - && all_ssts_are_blocked_filter - && !has_tombstone - && !has_ttl - && single_table - && compact_task.target_level > 0 - && compact_task.input_ssts.len() == 2 - && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size - && delete_key_count * 100 - < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count - && compact_task.task_type() == TaskType::Dynamic; - if !optimize_by_copy_block { match generate_splits(&sstable_infos, compaction_size, context.clone()).await { Ok(splits) => { @@ -633,18 +589,12 @@ pub async fn generate_splits_for_task( } Err(e) => { tracing::warn!(error = %e.as_report(), "Failed to generate_splits"); - // task_status = TaskStatus::ExecuteFailed; - // return ( - // compact_done(compact_task, context.clone(), vec![], task_status), - // None, - // ); - return false; } } } - return true; + true } pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) { diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 54b39f3d4a19..9d7f45cbd08d 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -365,7 +365,7 @@ pub async fn compact( .map(|table_info| table_info.file_size) .sum::(); - let optimize_by_copy_block = optimize_by_copy_block(&compact_task, context.clone()); + let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context); if !optimize_by_copy_block { match generate_splits(&sstable_infos, compaction_size, context.clone()).await { @@ -618,7 +618,7 @@ pub async fn compact( } /// Fills in the compact task and tries to report the task result to meta node. -fn compact_done( +pub(crate) fn compact_done( mut compact_task: CompactTask, context: CompactorContext, output_ssts: Vec, diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 654581449b1d..c81ed65d99cf 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -80,7 +80,10 @@ use super::{ use crate::filter_key_extractor::{ FilterKeyExtractorImpl, FilterKeyExtractorManager, StaticFilterKeyExtractorManager, }; -use crate::hummock::compactor::compactor_runner::compact_and_build_sst; +use crate::hummock::compactor::compaction_utils::{ + generate_splits_for_task, optimize_by_copy_block, +}; +use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done}; use crate::hummock::iterator::{Forward, HummockIterator}; use crate::hummock::multi_builder::SplitTableOutput; use crate::hummock::vacuum::Vacuum; @@ -454,121 +457,215 @@ pub fn start_compactor( let meta_client = hummock_meta_client.clone(); let sstable_object_id_manager = sstable_object_id_manager.clone(); let filter_key_extractor_manager = filter_key_extractor_manager.clone(); - executor.spawn(async move { - match event { - ResponseEvent::CompactTask(compact_task) => { - let (tx, rx) = tokio::sync::oneshot::channel(); + + match event { + ResponseEvent::CompactTask(mut compact_task) => { + // let mut is_task_fail = false; + let optimize_by_copy_block = + optimize_by_copy_block(&compact_task, &context); + if !generate_splits_for_task( + &mut compact_task, + &context, + optimize_by_copy_block, + ) + .await + { + let (compact_task, table_stats) = compact_done( + compact_task, + context.clone(), + vec![], + TaskStatus::ExecuteFailed, + ); + if let Err(e) = + request_sender.send(SubscribeCompactionEventRequest { + event: Some(RequestEvent::ReportTask(ReportTask { + task_id: compact_task.task_id, + task_status: compact_task.task_status, + sorted_output_ssts: compact_task + .sorted_output_ssts + .clone(), + table_stats_change: to_prost_table_stats_map( + table_stats, + ), + })), + create_at: SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Clock may have gone backwards") + .as_millis() + as u64, + }) + { let task_id = compact_task.task_id; - shutdown.lock().unwrap().insert(task_id, tx); - let ((compact_task, table_stats), _memory_tracker) = match sstable_object_id_manager.add_watermark_object_id(None).await + tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); + } + + continue 'start_stream; + } + + let scopeguard = scopeguard::guard( + (compact_task.clone(), context.clone()), + |(compact_task, context)| { + context.running_task_parallelism.fetch_sub( + compact_task.splits.len() as u32, + Ordering::SeqCst, + ); + }, + ); + + executor.spawn(async move { + let _scopeguard = scopeguard; + let (tx, rx) = tokio::sync::oneshot::channel(); + let task_id = compact_task.task_id; + shutdown.lock().unwrap().insert(task_id, tx); + let ((compact_task, table_stats), _memory_tracker) = + match sstable_object_id_manager + .add_watermark_object_id(None) + .await { Ok(tracker_id) => { - let sstable_object_id_manager_clone = sstable_object_id_manager.clone(); + let sstable_object_id_manager_clone = + sstable_object_id_manager.clone(); let _guard = scopeguard::guard( (tracker_id, sstable_object_id_manager_clone), |(tracker_id, sstable_object_id_manager)| { - sstable_object_id_manager.remove_watermark_object_id(tracker_id); + sstable_object_id_manager + .remove_watermark_object_id(tracker_id); }, ); - compactor_runner::compact(context.clone(), compact_task, rx, Box::new(sstable_object_id_manager.clone()), filter_key_extractor_manager.clone()).await - }, + compactor_runner::compact( + context.clone(), + compact_task, + rx, + Box::new(sstable_object_id_manager.clone()), + filter_key_extractor_manager.clone(), + ) + .await + } Err(err) => { tracing::warn!(error = %err.as_report(), "Failed to track pending SST object id"); let mut compact_task = compact_task; - compact_task.set_task_status(TaskStatus::TrackSstObjectIdFailed); - ((compact_task, HashMap::default()),None) + compact_task.set_task_status( + TaskStatus::TrackSstObjectIdFailed, + ); + ((compact_task, HashMap::default()), None) } }; - shutdown.lock().unwrap().remove(&task_id); - - let enable_check_compaction_result = context.storage_opts.check_compaction_result; - let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status() == TaskStatus::Success; - if let Err(e) = request_sender.send(SubscribeCompactionEventRequest { - event: Some(RequestEvent::ReportTask( - ReportTask { - task_id: compact_task.task_id, - task_status: compact_task.task_status, - sorted_output_ssts: compact_task.sorted_output_ssts.clone(), - table_stats_change:to_prost_table_stats_map(table_stats), - } - )), + shutdown.lock().unwrap().remove(&task_id); + + if let Err(e) = + request_sender.send(SubscribeCompactionEventRequest { + event: Some(RequestEvent::ReportTask(ReportTask { + task_id: compact_task.task_id, + task_status: compact_task.task_status, + sorted_output_ssts: compact_task + .sorted_output_ssts + .clone(), + table_stats_change: to_prost_table_stats_map( + table_stats, + ), + })), create_at: SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("Clock may have gone backwards") - .as_millis() as u64, - }) { - tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); - } - if enable_check_compaction_result && need_check_task { - match check_compaction_result(&compact_task, context.clone()).await { - Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id); - }, - Ok(true) => (), - Ok(false) => { - panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task)); - } - } - } + .as_millis() + as u64, + }) + { + tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); } - ResponseEvent::VacuumTask(vacuum_task) => { - match Vacuum::handle_vacuum_task( - context.sstable_store.clone(), - &vacuum_task.sstable_object_ids, - ) - .await{ - Ok(_) => { - Vacuum::report_vacuum_task(vacuum_task, meta_client).await; - } + + let enable_check_compaction_result = + context.storage_opts.check_compaction_result; + let need_check_task = !compact_task.sorted_output_ssts.is_empty() + && compact_task.task_status() == TaskStatus::Success; + + if enable_check_compaction_result && need_check_task { + match check_compaction_result(&compact_task, context.clone()) + .await + { Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to vacuum task") - } - } - } - ResponseEvent::FullScanTask(full_scan_task) => { - match Vacuum::handle_full_scan_task(full_scan_task, context.sstable_store.clone()).await { - Ok((object_ids, total_object_count, total_object_size)) => { - Vacuum::report_full_scan_task(object_ids, total_object_count, total_object_size, meta_client).await; + tracing::warn!(error = %e.as_report(), "Failed to check compaction task {}",compact_task.task_id); } - Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to iter object"); + Ok(true) => (), + Ok(false) => { + panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_task)); } } } - ResponseEvent::ValidationTask(validation_task) => { - validate_ssts( - validation_task, - context.sstable_store.clone(), - ) - .await; + }); + } + ResponseEvent::VacuumTask(vacuum_task) => { + executor.spawn(async move { + match Vacuum::handle_vacuum_task( + context.sstable_store.clone(), + &vacuum_task.sstable_object_ids, + ) + .await + { + Ok(_) => { + Vacuum::report_vacuum_task(vacuum_task, meta_client).await; + } + Err(e) => { + tracing::warn!(error = %e.as_report(), "Failed to vacuum task") + } } - ResponseEvent::CancelCompactTask(cancel_compact_task) => { - if let Some(tx) = shutdown - .lock() - .unwrap() - .remove(&cancel_compact_task.task_id) - { - if tx.send(()).is_err() { - tracing::warn!( - "Cancellation of compaction task failed. task_id: {}", - cancel_compact_task.task_id - ); - } - } else { - tracing::warn!( - "Attempting to cancel non-existent compaction task. task_id: {}", - cancel_compact_task.task_id - ); + }); + } + ResponseEvent::FullScanTask(full_scan_task) => { + executor.spawn(async move { + match Vacuum::handle_full_scan_task( + full_scan_task, + context.sstable_store.clone(), + ) + .await + { + Ok((object_ids, total_object_count, total_object_size)) => { + Vacuum::report_full_scan_task( + object_ids, + total_object_count, + total_object_size, + meta_client, + ) + .await; + } + Err(e) => { + tracing::warn!(error = %e.as_report(), "Failed to iter object"); } } - - ResponseEvent::PullTaskAck(_pull_task_ack) => { - // set flag - pull_task_ack.store(true, Ordering::SeqCst); + }); + } + ResponseEvent::ValidationTask(validation_task) => { + executor.spawn(async move { + validate_ssts(validation_task, context.sstable_store.clone()) + .await; + }); + } + ResponseEvent::CancelCompactTask(cancel_compact_task) => { + if let Some(tx) = shutdown + .lock() + .unwrap() + .remove(&cancel_compact_task.task_id) + { + if tx.send(()).is_err() { + tracing::warn!( + "Cancellation of compaction task failed. task_id: {}", + cancel_compact_task.task_id + ); } + } else { + tracing::warn!( + "Attempting to cancel non-existent compaction task. task_id: {}", + cancel_compact_task.task_id + ); } - }); + } + + ResponseEvent::PullTaskAck(_pull_task_ack) => { + // set flag + pull_task_ack.store(true, Ordering::SeqCst); + } + } } Some(Err(e)) => { tracing::warn!("Failed to consume stream. {}", e.message()); @@ -792,4 +889,4 @@ fn get_task_progress( progress_list.push(progress.snapshot(task_id)); } progress_list -} \ No newline at end of file +} From 874ec0384e13ed10839cfc0c6c434537986d74ba Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 19 Apr 2024 15:41:00 +0800 Subject: [PATCH 3/9] fix(storage): fix panic --- .../src/hummock/compactor/compaction_utils.rs | 58 ++++++---- .../src/hummock/compactor/compactor_runner.rs | 71 ++---------- src/storage/src/hummock/compactor/context.rs | 39 ++++++- src/storage/src/hummock/compactor/mod.rs | 103 +++++++++--------- 4 files changed, 133 insertions(+), 138 deletions(-) diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 555710a13d46..fd2ede833a6b 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -31,7 +31,6 @@ use risingwave_pb::hummock::{ compact_task, BloomFilterType, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo, TableSchema, }; -use thiserror_ext::AsReport; use tokio::time::Instant; pub use super::context::CompactorContext; @@ -176,11 +175,10 @@ pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompact } const MAX_FILE_COUNT: usize = 32; - fn generate_splits_fast( sstable_infos: &Vec, compaction_size: u64, - context: CompactorContext, + context: &CompactorContext, ) -> HummockResult> { let worker_num = context.compaction_executor.worker_num(); let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; @@ -214,9 +212,6 @@ fn generate_splits_fast( } indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref())); indexes.dedup(); - if indexes.len() <= parallelism { - return Ok(vec![]); - } let mut splits = vec![]; splits.push(KeyRange_vec::new(vec![], vec![])); let parallel_key_count = indexes.len() / parallelism; @@ -235,7 +230,7 @@ fn generate_splits_fast( pub async fn generate_splits( sstable_infos: &Vec, compaction_size: u64, - context: CompactorContext, + context: &CompactorContext, ) -> HummockResult> { let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; if compaction_size > parallel_compact_size { @@ -560,7 +555,7 @@ pub async fn generate_splits_for_task( compact_task: &mut CompactTask, context: &CompactorContext, optimize_by_copy_block: bool, -) -> bool { +) -> HummockResult<()> { let sstable_infos = compact_task .input_ssts .iter() @@ -579,22 +574,14 @@ pub async fn generate_splits_for_task( .sum::(); if !optimize_by_copy_block { - match generate_splits(&sstable_infos, compaction_size, context.clone()).await { - Ok(splits) => { - if !splits.is_empty() { - compact_task.splits = splits; - } - - return true; - } - Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to generate_splits"); - return false; - } + let splits = generate_splits(&sstable_infos, compaction_size, context).await?; + if !splits.is_empty() { + compact_task.splits = splits; } + return Ok(()); } - true + Ok(()) } pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) { @@ -640,3 +627,32 @@ pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorCo .with_label_values(&[&group_label, next_level_label.as_str()]) .inc_by(target_table_infos.len() as u64); } + +pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize { + let optimize_by_copy_block = optimize_by_copy_block(compact_task, context); + + if optimize_by_copy_block { + return 1; + } + + let sstable_infos = compact_task + .input_ssts + .iter() + .flat_map(|level| level.table_infos.iter()) + .filter(|table_info| { + let table_ids = &table_info.table_ids; + table_ids + .iter() + .any(|table_id| compact_task.existing_table_ids.contains(table_id)) + }) + .cloned() + .collect_vec(); + let compaction_size = sstable_infos + .iter() + .map(|table_info| table_info.file_size) + .sum::(); + + generate_splits_fast(&sstable_infos, compaction_size, context) + .unwrap() + .len() +} diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 9d7f45cbd08d..74338084f41e 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::{BinaryHeap, HashMap, HashSet}; -use std::sync::atomic::Ordering; use std::sync::Arc; use await_tree::InstrumentAwait; @@ -40,7 +39,7 @@ use super::task_progress::TaskProgress; use super::{CompactionStatistics, TaskConfig}; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager}; use crate::hummock::compactor::compaction_utils::{ - build_multi_compaction_filter, estimate_task_output_capacity, generate_splits, + build_multi_compaction_filter, estimate_task_output_capacity, generate_splits_for_task, metrics_report_for_task, optimize_by_copy_block, }; use crate::hummock::compactor::iterator::ConcatSstableIterator; @@ -347,71 +346,23 @@ pub async fn compact( }; let mut task_status = TaskStatus::Success; - // skip sst related to non-existent able_id to reduce io - let sstable_infos = compact_task - .input_ssts - .iter() - .flat_map(|level| level.table_infos.iter()) - .filter(|table_info| { - let table_ids = &table_info.table_ids; - table_ids - .iter() - .any(|table_id| existing_table_ids.contains(table_id)) - }) - .cloned() - .collect_vec(); - let compaction_size = sstable_infos - .iter() - .map(|table_info| table_info.file_size) - .sum::(); - let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context); - if !optimize_by_copy_block { - match generate_splits(&sstable_infos, compaction_size, context.clone()).await { - Ok(splits) => { - if !splits.is_empty() { - compact_task.splits = splits; - } - } - Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to generate_splits"); - task_status = TaskStatus::ExecuteFailed; - return ( - compact_done(compact_task, context.clone(), vec![], task_status), - None, - ); - } - } - } - let compact_task_statistics = statistics_compact_task(&compact_task); - // Number of splits (key ranges) is equal to number of compaction tasks - let parallelism = compact_task.splits.len(); - assert_ne!(parallelism, 0, "splits cannot be empty"); - if !context.acquire_task_quota(parallelism as u32) { - tracing::warn!( - "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}", - compact_task.task_id, - parallelism, - context.running_task_parallelism.load(Ordering::Relaxed), - context.max_task_parallelism.load(Ordering::Relaxed), - ); + if let Err(e) = + generate_splits_for_task(&mut compact_task, &context, optimize_by_copy_block).await + { + tracing::warn!(error = %e.as_report(), "Failed to generate_splits"); + task_status = TaskStatus::ExecuteFailed; return ( - compact_done( - compact_task, - context.clone(), - vec![], - TaskStatus::NoAvailCpuResourceCanceled, - ), + compact_done(compact_task, context.clone(), vec![], task_status), None, ); } - let _release_quota_guard = - scopeguard::guard((parallelism, context.clone()), |(parallelism, context)| { - context.release_task_quota(parallelism as u32); - }); - + let compact_task_statistics = statistics_compact_task(&compact_task); + // Number of splits (key ranges) is equal to number of compaction tasks + let parallelism = compact_task.splits.len(); + assert_ne!(parallelism, 0, "splits cannot be empty"); let mut output_ssts = Vec::with_capacity(parallelism); let mut compaction_futures = vec![]; let mut abort_handles = vec![]; diff --git a/src/storage/src/hummock/compactor/context.rs b/src/storage/src/hummock/compactor/context.rs index 4525dcdc773f..22e11e4430a2 100644 --- a/src/storage/src/hummock/compactor/context.rs +++ b/src/storage/src/hummock/compactor/context.rs @@ -102,7 +102,7 @@ impl CompactorContext { } } - pub fn acquire_task_quota(&self, parallelism: u32) -> bool { + pub fn acquire_task_quota(&self, parallelism: u32) -> Option { let mut running_u32 = self.running_task_parallelism.load(Ordering::SeqCst); let max_u32 = self.max_task_parallelism.load(Ordering::SeqCst); @@ -114,7 +114,10 @@ impl CompactorContext { Ordering::SeqCst, ) { Ok(_) => { - return true; + return Some(ReleaseGuard::new( + self.running_task_parallelism.clone(), + parallelism, + )); } Err(old_running_u32) => { running_u32 = old_running_u32; @@ -122,7 +125,7 @@ impl CompactorContext { } } - false + None } pub fn release_task_quota(&self, parallelism: u32) { @@ -150,3 +153,33 @@ impl CompactorContext { } } } + +pub struct ReleaseGuard { + running_task_parallelism: Arc, + release_parallelism: u32, +} + +impl ReleaseGuard { + fn new(running_task_parallelism: Arc, release_parallelism: u32) -> Self { + Self { + running_task_parallelism, + release_parallelism, + } + } +} + +impl Drop for ReleaseGuard { + fn drop(&mut self) { + let prev = self + .running_task_parallelism + .fetch_sub(self.release_parallelism, Ordering::SeqCst); + + assert_ge!( + prev, + self.release_parallelism, + "running {} parallelism {}", + prev, + self.release_parallelism + ); + } +} diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index c81ed65d99cf..19abc78331b5 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -80,9 +80,7 @@ use super::{ use crate::filter_key_extractor::{ FilterKeyExtractorImpl, FilterKeyExtractorManager, StaticFilterKeyExtractorManager, }; -use crate::hummock::compactor::compaction_utils::{ - generate_splits_for_task, optimize_by_copy_block, -}; +use crate::hummock::compactor::compaction_utils::calculate_task_parallelism; use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done}; use crate::hummock::iterator::{Forward, HummockIterator}; use crate::hummock::multi_builder::SplitTableOutput; @@ -459,61 +457,58 @@ pub fn start_compactor( let filter_key_extractor_manager = filter_key_extractor_manager.clone(); match event { - ResponseEvent::CompactTask(mut compact_task) => { - // let mut is_task_fail = false; - let optimize_by_copy_block = - optimize_by_copy_block(&compact_task, &context); - if !generate_splits_for_task( - &mut compact_task, - &context, - optimize_by_copy_block, - ) - .await - { - let (compact_task, table_stats) = compact_done( - compact_task, - context.clone(), - vec![], - TaskStatus::ExecuteFailed, - ); - if let Err(e) = - request_sender.send(SubscribeCompactionEventRequest { - event: Some(RequestEvent::ReportTask(ReportTask { - task_id: compact_task.task_id, - task_status: compact_task.task_status, - sorted_output_ssts: compact_task - .sorted_output_ssts - .clone(), - table_stats_change: to_prost_table_stats_map( - table_stats, - ), - })), - create_at: SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .expect("Clock may have gone backwards") - .as_millis() - as u64, - }) - { - let task_id = compact_task.task_id; - tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); - } - - continue 'start_stream; - } + ResponseEvent::CompactTask(compact_task) => { + let parallelism = + calculate_task_parallelism(&compact_task, &context); - let scopeguard = scopeguard::guard( - (compact_task.clone(), context.clone()), - |(compact_task, context)| { - context.running_task_parallelism.fetch_sub( - compact_task.splits.len() as u32, - Ordering::SeqCst, + assert_ne!(parallelism, 0, "splits cannot be empty"); + let release_guard = match context + .acquire_task_quota(parallelism as u32) + { + Some(release_guard) => release_guard, + None => { + tracing::warn!( + "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}", + compact_task.task_id, + parallelism, + context.running_task_parallelism.load(Ordering::Relaxed), + context.max_task_parallelism.load(Ordering::Relaxed), + ); + let (compact_task, table_stats) = compact_done( + compact_task, + context.clone(), + vec![], + TaskStatus::NoAvailCpuResourceCanceled, ); - }, - ); + if let Err(e) = + request_sender.send(SubscribeCompactionEventRequest { + event: Some(RequestEvent::ReportTask(ReportTask { + task_id: compact_task.task_id, + task_status: compact_task.task_status, + sorted_output_ssts: compact_task + .sorted_output_ssts + .clone(), + table_stats_change: to_prost_table_stats_map( + table_stats, + ), + })), + create_at: SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Clock may have gone backwards") + .as_millis() + as u64, + }) + { + let task_id = compact_task.task_id; + tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); + } + + continue 'start_stream; + } + }; executor.spawn(async move { - let _scopeguard = scopeguard; + let _release_guard = release_guard; let (tx, rx) = tokio::sync::oneshot::channel(); let task_id = compact_task.task_id; shutdown.lock().unwrap().insert(task_id, tx); From 9931c74d9dcf63b9bdcc2f341081531eb817c8ec Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 25 Apr 2024 16:47:26 +0800 Subject: [PATCH 4/9] fix(storage): address comments --- src/storage/src/hummock/compactor/mod.rs | 79 ++++++++++-------------- 1 file changed, 34 insertions(+), 45 deletions(-) diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 19abc78331b5..0a2d2efcc15c 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -20,7 +20,7 @@ use risingwave_pb::hummock::report_compaction_task_request::{ Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat, ReportTask as ReportSharedTask, }; -use risingwave_pb::hummock::{ReportFullScanTaskRequest, ReportVacuumTaskRequest}; +use risingwave_pb::hummock::{CompactTask, ReportFullScanTaskRequest, ReportVacuumTaskRequest}; use risingwave_rpc_client::GrpcCompactorProxyClient; use thiserror_ext::AsReport; use tokio::sync::mpsc; @@ -51,7 +51,7 @@ pub use context::{ use futures::{pin_mut, StreamExt}; pub use iterator::{ConcatSstableIterator, SstableStreamIterator}; use more_asserts::assert_ge; -use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; +use risingwave_hummock_sdk::table_stats::{to_prost_table_stats_map, TableStatsMap}; use risingwave_hummock_sdk::{compact_task_to_string, HummockCompactionTaskId, LocalSstableInfo}; use risingwave_pb::hummock::compact_task::TaskStatus; use risingwave_pb::hummock::subscribe_compaction_event_request::{ @@ -434,6 +434,28 @@ pub fn start_compactor( } }; + fn send_report_task_event( + compact_task: &CompactTask, + table_stats: TableStatsMap, + request_sender: &mpsc::UnboundedSender, + ) { + if let Err(e) = request_sender.send(SubscribeCompactionEventRequest { + event: Some(RequestEvent::ReportTask(ReportTask { + task_id: compact_task.task_id, + task_status: compact_task.task_status, + sorted_output_ssts: compact_task.sorted_output_ssts.clone(), + table_stats_change: to_prost_table_stats_map(table_stats), + })), + create_at: SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Clock may have gone backwards") + .as_millis() as u64, + }) { + let task_id = compact_task.task_id; + tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); + } + } + match event { Some(Ok(SubscribeCompactionEventResponse { event, create_at })) => { let event = match event { @@ -481,28 +503,11 @@ pub fn start_compactor( TaskStatus::NoAvailCpuResourceCanceled, ); - if let Err(e) = - request_sender.send(SubscribeCompactionEventRequest { - event: Some(RequestEvent::ReportTask(ReportTask { - task_id: compact_task.task_id, - task_status: compact_task.task_status, - sorted_output_ssts: compact_task - .sorted_output_ssts - .clone(), - table_stats_change: to_prost_table_stats_map( - table_stats, - ), - })), - create_at: SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .expect("Clock may have gone backwards") - .as_millis() - as u64, - }) - { - let task_id = compact_task.task_id; - tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); - } + send_report_task_event( + &compact_task, + table_stats, + &request_sender, + ); continue 'start_stream; } @@ -548,27 +553,11 @@ pub fn start_compactor( }; shutdown.lock().unwrap().remove(&task_id); - if let Err(e) = - request_sender.send(SubscribeCompactionEventRequest { - event: Some(RequestEvent::ReportTask(ReportTask { - task_id: compact_task.task_id, - task_status: compact_task.task_status, - sorted_output_ssts: compact_task - .sorted_output_ssts - .clone(), - table_stats_change: to_prost_table_stats_map( - table_stats, - ), - })), - create_at: SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .expect("Clock may have gone backwards") - .as_millis() - as u64, - }) - { - tracing::warn!(error = %e.as_report(), "Failed to report task {task_id:?}"); - } + send_report_task_event( + &compact_task, + table_stats, + &request_sender, + ); let enable_check_compaction_result = context.storage_opts.check_compaction_result; From f136e6f23b7181e061ca23bf61fc91b00e216082 Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 10 May 2024 15:24:34 +0800 Subject: [PATCH 5/9] fix(compactor): address comments --- .../src/hummock/compactor/compaction_utils.rs | 49 ++++++++++++------- .../src/hummock/compactor/compactor_runner.rs | 2 +- 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index fd2ede833a6b..07a7478fd3b2 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -174,12 +174,11 @@ pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompact multi_filter } -const MAX_FILE_COUNT: usize = 32; fn generate_splits_fast( sstable_infos: &Vec, compaction_size: u64, context: &CompactorContext, -) -> HummockResult> { +) -> Vec { let worker_num = context.compaction_executor.worker_num(); let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; @@ -224,7 +223,8 @@ fn generate_splits_fast( } last_split_key_count += 1; } - Ok(splits) + + splits } pub async fn generate_splits( @@ -232,10 +232,15 @@ pub async fn generate_splits( compaction_size: u64, context: &CompactorContext, ) -> HummockResult> { + const MAX_FILE_COUNT: usize = 32; let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; if compaction_size > parallel_compact_size { if sstable_infos.len() > MAX_FILE_COUNT { - return generate_splits_fast(sstable_infos, compaction_size, context); + return Ok(generate_splits_fast( + sstable_infos, + compaction_size, + context, + )); } let mut indexes = vec![]; // preload the meta and get the smallest key to split sub_compaction @@ -265,18 +270,15 @@ pub async fn generate_splits( let mut splits = vec![]; splits.push(KeyRange_vec::new(vec![], vec![])); - let worker_num = context.compaction_executor.worker_num(); - - let parallelism = std::cmp::min( - worker_num as u64, - std::cmp::min( - indexes.len() as u64, - context.storage_opts.max_sub_compaction as u64, - ), + let parallelism = calculate_task_parallelism_impl( + context.compaction_executor.worker_num(), + parallel_compact_size, + compaction_size, + context.storage_opts.max_sub_compaction, ); + let sub_compaction_data_size = - std::cmp::max(compaction_size / parallelism, parallel_compact_size); - let parallelism = compaction_size / sub_compaction_data_size; + std::cmp::max(compaction_size / parallelism as u64, parallel_compact_size); if parallelism > 1 { let mut last_buffer_size = 0; @@ -651,8 +653,21 @@ pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &Compacto .iter() .map(|table_info| table_info.file_size) .sum::(); + let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; + calculate_task_parallelism_impl( + context.compaction_executor.worker_num(), + parallel_compact_size, + compaction_size, + context.storage_opts.max_sub_compaction, + ) +} - generate_splits_fast(&sstable_infos, compaction_size, context) - .unwrap() - .len() +pub fn calculate_task_parallelism_impl( + worker_num: usize, + parallel_compact_size: u64, + compaction_size: u64, + max_sub_compaction: u32, +) -> usize { + let parallelism = (compaction_size + parallel_compact_size - 1) / parallel_compact_size; + worker_num.min(parallelism.min(max_sub_compaction as u64) as usize) } diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 74338084f41e..dbff563b5a14 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -330,7 +330,7 @@ pub async fn compact( let multi_filter_key_extractor = match build_filter_key_extractor( &compact_task, - filter_key_extractor_manager.clone(), + filter_key_extractor_manager, &compact_table_ids, ) .await From f00091034c361228f198d9602ca82a8f8173e90c Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 10 May 2024 17:18:24 +0800 Subject: [PATCH 6/9] refactor(compactor): address comments --- src/compute/src/server.rs | 9 -- src/storage/compactor/src/server.rs | 13 --- .../hummock_test/src/compactor_tests.rs | 2 - .../src/hummock/compactor/compaction_utils.rs | 15 ++-- src/storage/src/hummock/compactor/context.rs | 90 ------------------- src/storage/src/hummock/compactor/mod.rs | 86 +++++++++--------- .../src/delete_range_runner.rs | 10 --- 7 files changed, 52 insertions(+), 173 deletions(-) diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 07dee5dfa4c8..1d669864c726 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::net::SocketAddr; -use std::sync::atomic::AtomicU32; use std::sync::Arc; use std::time::Duration; @@ -217,12 +216,6 @@ pub async fn compute_node_serve( )); let compaction_executor = Arc::new(CompactionExecutor::new(Some(1))); - let max_task_parallelism = Arc::new(AtomicU32::new( - (compaction_executor.worker_num() as f32 - * storage_opts.compactor_max_task_multiplier) - .ceil() as u32, - )); - let compactor_context = CompactorContext { storage_opts, sstable_store: storage.sstable_store(), @@ -235,8 +228,6 @@ pub async fn compute_node_serve( await_tree_reg: await_tree_config .clone() .map(new_compaction_await_tree_reg_ref), - running_task_parallelism: Arc::new(AtomicU32::new(0)), - max_task_parallelism, }; let (handle, shutdown_sender) = start_compactor( diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 02eaba668203..e11491fe0360 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::net::SocketAddr; -use std::sync::atomic::AtomicU32; use std::sync::Arc; use std::time::Duration; @@ -247,10 +246,6 @@ pub async fn compactor_serve( let compaction_executor = Arc::new(CompactionExecutor::new( opts.compaction_worker_threads_number, )); - let max_task_parallelism = Arc::new(AtomicU32::new( - (compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier) - .ceil() as u32, - )); let compactor_context = CompactorContext { storage_opts, @@ -262,8 +257,6 @@ pub async fn compactor_serve( task_progress_manager: Default::default(), await_tree_reg: await_tree_reg.clone(), - running_task_parallelism: Arc::new(AtomicU32::new(0)), - max_task_parallelism, }; let mut sub_tasks = vec![ MetaClient::start_heartbeat_loop( @@ -383,10 +376,6 @@ pub async fn shared_compactor_serve( let compaction_executor = Arc::new(CompactionExecutor::new( opts.compaction_worker_threads_number, )); - let max_task_parallelism = Arc::new(AtomicU32::new( - (compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier) - .ceil() as u32, - )); let compactor_context = CompactorContext { storage_opts, sstable_store, @@ -396,8 +385,6 @@ pub async fn shared_compactor_serve( memory_limiter, task_progress_manager: Default::default(), await_tree_reg, - running_task_parallelism: Arc::new(AtomicU32::new(0)), - max_task_parallelism, }; let join_handle = tokio::spawn(async move { tonic::transport::Server::builder() diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 917aa53d5132..afd29fcd3503 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -215,8 +215,6 @@ pub(crate) mod tests { memory_limiter: MemoryLimiter::unlimit(), task_progress_manager: Default::default(), await_tree_reg: None, - running_task_parallelism: Arc::new(AtomicU32::new(0)), - max_task_parallelism, } } diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index 07a7478fd3b2..d0e5fe93c62e 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -182,14 +182,11 @@ fn generate_splits_fast( let worker_num = context.compaction_executor.worker_num(); let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20; - let parallelism = (compaction_size + parallel_compact_size - 1) / parallel_compact_size; - - let parallelism = std::cmp::min( + let parallelism = calculate_task_parallelism_impl( worker_num, - std::cmp::min( - parallelism as usize, - context.storage_opts.max_sub_compaction as usize, - ), + parallel_compact_size, + compaction_size, + context.storage_opts.max_sub_compaction, ); let mut indexes = vec![]; for sst in sstable_infos { @@ -211,6 +208,10 @@ fn generate_splits_fast( } indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref())); indexes.dedup(); + if indexes.len() <= parallelism { + return vec![]; + } + let mut splits = vec![]; splits.push(KeyRange_vec::new(vec![], vec![])); let parallel_key_count = indexes.len() / parallelism; diff --git a/src/storage/src/hummock/compactor/context.rs b/src/storage/src/hummock/compactor/context.rs index 22e11e4430a2..e8295effc25e 100644 --- a/src/storage/src/hummock/compactor/context.rs +++ b/src/storage/src/hummock/compactor/context.rs @@ -12,11 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; -use more_asserts::assert_ge; - use super::task_progress::TaskProgressManagerRef; use crate::hummock::compactor::CompactionExecutor; use crate::hummock::sstable_store::SstableStoreRef; @@ -65,10 +62,6 @@ pub struct CompactorContext { pub task_progress_manager: TaskProgressManagerRef, pub await_tree_reg: Option, - - pub running_task_parallelism: Arc, - - pub max_task_parallelism: Arc, } impl CompactorContext { @@ -97,89 +90,6 @@ impl CompactorContext { memory_limiter: MemoryLimiter::unlimit(), task_progress_manager: Default::default(), await_tree_reg, - running_task_parallelism: Arc::new(AtomicU32::new(0)), - max_task_parallelism: Arc::new(AtomicU32::new(u32::MAX)), - } - } - - pub fn acquire_task_quota(&self, parallelism: u32) -> Option { - let mut running_u32 = self.running_task_parallelism.load(Ordering::SeqCst); - let max_u32 = self.max_task_parallelism.load(Ordering::SeqCst); - - while parallelism + running_u32 <= max_u32 { - match self.running_task_parallelism.compare_exchange( - running_u32, - running_u32 + parallelism, - Ordering::SeqCst, - Ordering::SeqCst, - ) { - Ok(_) => { - return Some(ReleaseGuard::new( - self.running_task_parallelism.clone(), - parallelism, - )); - } - Err(old_running_u32) => { - running_u32 = old_running_u32; - } - } - } - - None - } - - pub fn release_task_quota(&self, parallelism: u32) { - let prev = self - .running_task_parallelism - .fetch_sub(parallelism, Ordering::SeqCst); - - assert_ge!( - prev, - parallelism, - "running {} parallelism {}", - prev, - parallelism - ); - } - - pub fn get_free_quota(&self) -> u32 { - let running_u32 = self.running_task_parallelism.load(Ordering::SeqCst); - let max_u32 = self.max_task_parallelism.load(Ordering::SeqCst); - - if max_u32 > running_u32 { - max_u32 - running_u32 - } else { - 0 - } - } -} - -pub struct ReleaseGuard { - running_task_parallelism: Arc, - release_parallelism: u32, -} - -impl ReleaseGuard { - fn new(running_task_parallelism: Arc, release_parallelism: u32) -> Self { - Self { - running_task_parallelism, - release_parallelism, } } } - -impl Drop for ReleaseGuard { - fn drop(&mut self) { - let prev = self - .running_task_parallelism - .fetch_sub(self.release_parallelism, Ordering::SeqCst); - - assert_ge!( - prev, - self.release_parallelism, - "running {} parallelism {}", - prev, - self.release_parallelism - ); - } -} diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 0a2d2efcc15c..824e3cc8d7af 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -35,7 +35,7 @@ pub(super) mod task_progress; use std::collections::{HashMap, VecDeque}; use std::marker::PhantomData; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; @@ -300,14 +300,14 @@ pub fn start_compactor( let stream_retry_interval = Duration::from_secs(30); let task_progress = compactor_context.task_progress_manager.clone(); let periodic_event_update_interval = Duration::from_millis(1000); - let pull_task_ack = Arc::new(AtomicBool::new(true)); + + let max_task_parallelism: u32 = (compactor_context.compaction_executor.worker_num() as f32 + * compactor_context.storage_opts.compactor_max_task_multiplier) + .ceil() as u32; + let running_task_parallelism = Arc::new(AtomicU32::new(0)); + const MAX_PULL_TASK_COUNT: u32 = 4; - let max_pull_task_count = std::cmp::min( - compactor_context - .max_task_parallelism - .load(Ordering::SeqCst), - MAX_PULL_TASK_COUNT, - ); + let max_pull_task_count = std::cmp::min(max_task_parallelism, MAX_PULL_TASK_COUNT); assert_ge!( compactor_context.storage_opts.compactor_max_task_multiplier, @@ -322,7 +322,8 @@ pub fn start_compactor( // This outer loop is to recreate stream. 'start_stream: loop { // reset state - pull_task_ack.store(true, Ordering::SeqCst); + // pull_task_ack.store(true, Ordering::SeqCst); + let mut pull_task_ack = true; tokio::select! { // Wait for interval. _ = min_interval.tick() => {}, @@ -366,7 +367,7 @@ pub fn start_compactor( event_loop_iteration_now = Instant::now(); } - let pull_task_ack = pull_task_ack.clone(); + let running_task_parallelism = running_task_parallelism.clone(); let request_sender = request_sender.clone(); let event: Option> = tokio::select! { _ = periodic_event_interval.tick() => { @@ -390,9 +391,9 @@ pub fn start_compactor( let mut pending_pull_task_count = 0; - if pull_task_ack.load(Ordering::SeqCst) { + if pull_task_ack { // TODO: Compute parallelism on meta side - pending_pull_task_count = compactor_context.get_free_quota().max(max_pull_task_count); + pending_pull_task_count = (max_task_parallelism - running_task_parallelism.load(Ordering::SeqCst)).max(max_pull_task_count); if pending_pull_task_count > 0 { if let Err(e) = request_sender.send(SubscribeCompactionEventRequest { @@ -411,14 +412,14 @@ pub fn start_compactor( // re subscribe stream continue 'start_stream; } else { - pull_task_ack.store(false, Ordering::SeqCst); + pull_task_ack = false; } } } tracing::info!( - running_parallelism_count = %compactor_context.running_task_parallelism.load(Ordering::Relaxed), - pull_task_ack = %pull_task_ack.load(Ordering::Relaxed), + running_parallelism_count = %running_task_parallelism.load(Ordering::SeqCst), + pull_task_ack = %pull_task_ack, pending_pull_task_count = %pending_pull_task_count ); @@ -484,36 +485,36 @@ pub fn start_compactor( calculate_task_parallelism(&compact_task, &context); assert_ne!(parallelism, 0, "splits cannot be empty"); - let release_guard = match context - .acquire_task_quota(parallelism as u32) + + if (max_task_parallelism + - running_task_parallelism.load(Ordering::SeqCst)) + < parallelism as u32 { - Some(release_guard) => release_guard, - None => { - tracing::warn!( - "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}", - compact_task.task_id, - parallelism, - context.running_task_parallelism.load(Ordering::Relaxed), - context.max_task_parallelism.load(Ordering::Relaxed), - ); - let (compact_task, table_stats) = compact_done( - compact_task, - context.clone(), - vec![], - TaskStatus::NoAvailCpuResourceCanceled, - ); + tracing::warn!( + "Not enough core parallelism to serve the task {} task_parallelism {} running_task_parallelism {} max_task_parallelism {}", + compact_task.task_id, + parallelism, + max_task_parallelism, + running_task_parallelism.load(Ordering::Relaxed), + ); + let (compact_task, table_stats) = compact_done( + compact_task, + context.clone(), + vec![], + TaskStatus::NoAvailCpuResourceCanceled, + ); - send_report_task_event( - &compact_task, - table_stats, - &request_sender, - ); + send_report_task_event( + &compact_task, + table_stats, + &request_sender, + ); + + continue 'start_stream; + } - continue 'start_stream; - } - }; executor.spawn(async move { - let _release_guard = release_guard; + running_task_parallelism.fetch_add(parallelism as u32, Ordering::SeqCst); let (tx, rx) = tokio::sync::oneshot::channel(); let task_id = compact_task.task_id; shutdown.lock().unwrap().insert(task_id, tx); @@ -552,6 +553,7 @@ pub fn start_compactor( } }; shutdown.lock().unwrap().remove(&task_id); + running_task_parallelism.fetch_sub(parallelism as u32, Ordering::SeqCst); send_report_task_event( &compact_task, @@ -647,7 +649,7 @@ pub fn start_compactor( ResponseEvent::PullTaskAck(_pull_task_ack) => { // set flag - pull_task_ack.store(true, Ordering::SeqCst); + pull_task_ack = true; } } } diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index f29d7b8b642a..58e9fde74c48 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -15,7 +15,6 @@ use std::future::Future; use std::ops::{Bound, RangeBounds}; use std::pin::{pin, Pin}; -use std::sync::atomic::AtomicU32; use std::sync::Arc; use std::time::{Duration, SystemTime}; @@ -605,24 +604,15 @@ fn run_compactor_thread( ) { let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(filter_key_extractor_manager); - - let compaction_executor = Arc::new(CompactionExecutor::new(Some(1))); - let max_task_parallelism = Arc::new(AtomicU32::new( - (compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier) - .ceil() as u32, - )); let compactor_context = CompactorContext { storage_opts, sstable_store, compactor_metrics, is_share_buffer_compact: false, compaction_executor: Arc::new(CompactionExecutor::new(None)), - memory_limiter: MemoryLimiter::unlimit(), task_progress_manager: Default::default(), await_tree_reg: None, - running_task_parallelism: Arc::new(AtomicU32::new(0)), - max_task_parallelism, }; start_compactor( From 3b4b613547412efd5dc1289739c32df71c55429f Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 10 May 2024 18:05:08 +0800 Subject: [PATCH 7/9] fix(compactor): fix check --- src/storage/hummock_test/src/compactor_tests.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index afd29fcd3503..aafb6ee9c15c 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -200,12 +200,6 @@ pub(crate) mod tests { storage_opts: Arc, sstable_store: SstableStoreRef, ) -> CompactorContext { - let compaction_executor = Arc::new(CompactionExecutor::new(Some(1))); - let max_task_parallelism = Arc::new(AtomicU32::new( - (compaction_executor.worker_num() as f32 * storage_opts.compactor_max_task_multiplier) - .ceil() as u32, - )); - CompactorContext { storage_opts, sstable_store, From c83d81df0750354693441e91aa392525bce715ef Mon Sep 17 00:00:00 2001 From: Li0k Date: Sat, 11 May 2024 13:59:57 +0800 Subject: [PATCH 8/9] fix(compactor): fix check --- src/storage/hummock_test/src/compactor_tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index aafb6ee9c15c..a10b8963a6d7 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -17,7 +17,6 @@ pub(crate) mod tests { use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::ops::Bound; - use std::sync::atomic::AtomicU32; use std::sync::Arc; use bytes::{BufMut, Bytes, BytesMut}; From 9f4b393406032f03bc3d74ce9a311413236ea9d7 Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 15 May 2024 00:29:06 +0800 Subject: [PATCH 9/9] fix(compactor): address comments --- src/storage/src/hummock/compactor/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 824e3cc8d7af..be9a3ffad038 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -513,8 +513,9 @@ pub fn start_compactor( continue 'start_stream; } + running_task_parallelism + .fetch_add(parallelism as u32, Ordering::SeqCst); executor.spawn(async move { - running_task_parallelism.fetch_add(parallelism as u32, Ordering::SeqCst); let (tx, rx) = tokio::sync::oneshot::channel(); let task_id = compact_task.task_id; shutdown.lock().unwrap().insert(task_id, tx); @@ -563,8 +564,7 @@ pub fn start_compactor( let enable_check_compaction_result = context.storage_opts.check_compaction_result; - let need_check_task = !compact_task.sorted_output_ssts.is_empty() - && compact_task.task_status() == TaskStatus::Success; + let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status() == TaskStatus::Success; if enable_check_compaction_result && need_check_task { match check_compaction_result(&compact_task, context.clone())