Skip to content

Commit

Permalink
chore(storage): enable check compaction result on dedicated compaction (
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Jan 18, 2024
1 parent 49e43c5 commit de3cb2f
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,23 @@ pub fn start_compactor(
sstable_object_id_manager.remove_watermark_object_id(tracker_id);
},
);
compactor_runner::compact(context, compact_task, rx, Box::new(sstable_object_id_manager.clone()), filter_key_extractor_manager.clone()).await
let enable_check_compaction_result = context.storage_opts.check_compaction_result;
let compact_result = compactor_runner::compact(context.clone(), compact_task, rx, Box::new(sstable_object_id_manager.clone()), filter_key_extractor_manager.clone()).await;
let need_check_task = !compact_result.0.sorted_output_ssts.is_empty() && compact_result.0.task_status() == TaskStatus::Success;

if enable_check_compaction_result && need_check_task {
match check_compaction_result(&compact_result.0, context.clone()).await {
Err(e) => {
tracing::warn!("Failed to check compaction task {} because: {:?}",compact_result.0.task_id, e);
},
Ok(true) => (),
Ok(false) => {
panic!("Failed to pass consistency check for result of compaction task:\n{:?}", compact_task_to_string(&compact_result.0));
}
}
}

compact_result
},
Err(err) => {
tracing::warn!("Failed to track pending SST object id. {:#?}", err);
Expand Down Expand Up @@ -677,11 +693,9 @@ pub fn start_shared_compactor(
{
Ok(_) => {
// TODO: remove this method after we have running risingwave cluster with fast compact algorithm stably for a long time.

if context.storage_opts.check_compaction_result
&& !compact_task.sorted_output_ssts.is_empty()
&& compact_task.task_status() == TaskStatus::Success
{
let enable_check_compaction_result = context.storage_opts.check_compaction_result;
let need_check_task = !compact_task.sorted_output_ssts.is_empty() && compact_task.task_status() == TaskStatus::Success;
if enable_check_compaction_result && need_check_task {
match check_compaction_result(&compact_task, context.clone()).await {
Err(e) => {
tracing::warn!("Failed to check compaction task {} because: {:?}",compact_task.task_id, e);
Expand Down

0 comments on commit de3cb2f

Please sign in to comment.