diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index fd8fc19b2be43..81ae96c1b415b 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -473,7 +473,23 @@ pub fn start_compactor( sstable_object_id_manager.remove_watermark_object_id(tracker_id); }, ); - compactor_runner::compact(context, compact_task, rx, Box::new(sstable_object_id_manager.clone()), filter_key_extractor_manager.clone()).await + let enable_check_compaction_result = context.storage_opts.check_compaction_result; + let compact_result = compactor_runner::compact(context.clone(), compact_task, rx, Box::new(sstable_object_id_manager.clone()), filter_key_extractor_manager.clone()).await; + let need_check_task = !compact_result.0.sorted_output_ssts.is_empty() && compact_result.0.task_status() == TaskStatus::Success; + + if enable_check_compaction_result && need_check_task { + match check_compaction_result(&compact_result.0, context.clone()).await { + Err(e) => { + tracing::warn!("Failed to check compaction task {} because: {:?}",compact_result.0.task_id, e); + }, + Ok(true) => (), + Ok(false) => { + panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_result.0)); + } + } + } + + compact_result }, Err(err) => { tracing::warn!("Failed to track pending SST object id. {:#?}", err); @@ -677,11 +693,9 @@ pub fn start_shared_compactor( { Ok(_) => { // TODO: remove this method after we have running risingwave cluster with fast compact algorithm stably for a long time. - - if context.storage_opts.check_compaction_result - && !compact_task.sorted_output_ssts.is_empty() - && compact_task.task_status() == TaskStatus::Success - { + let enable_check_compaction_result = context.storage_opts.check_compaction_result; + let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status() == TaskStatus::Success; + if enable_check_compaction_result && need_check_task { match check_compaction_result(&compact_task, context.clone()).await { Err(e) => { tracing::warn!("Failed to check compaction task {} because: {:?}",compact_task.task_id, e);