Skip to content

Commit

Permalink
fix(storage): fix task parallelism race
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Apr 19, 2024
1 parent de6db46 commit b8666cd
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 146 deletions.
60 changes: 5 additions & 55 deletions src/storage/src/hummock/compactor/compaction_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ async fn check_result<
Ok(true)
}

pub fn optimize_by_copy_block(compact_task: &CompactTask, context: CompactorContext) -> bool {
pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
let sstable_infos = compact_task
.input_ssts
.iter()
Expand Down Expand Up @@ -534,7 +534,7 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: CompactorCont
.iter()
.any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0));

let mut compact_table_ids: HashSet<u32> = HashSet::from_iter(
let compact_table_ids: HashSet<u32> = HashSet::from_iter(
compact_task
.input_ssts
.iter()
Expand All @@ -558,7 +558,8 @@ pub fn optimize_by_copy_block(compact_task: &CompactTask, context: CompactorCont

pub async fn generate_splits_for_task(
compact_task: &mut CompactTask,
context: CompactorContext,
context: &CompactorContext,
optimize_by_copy_block: bool,
) -> bool {
let sstable_infos = compact_task
.input_ssts
Expand All @@ -577,51 +578,6 @@ pub async fn generate_splits_for_task(
.map(|table_info| table_info.file_size)
.sum::<u64>();

let all_ssts_are_blocked_filter = sstable_infos
.iter()
.all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked);

let delete_key_count = sstable_infos
.iter()
.map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
.sum::<u64>();
let total_key_count = sstable_infos
.iter()
.map(|table_info| table_info.total_key_count)
.sum::<u64>();

let has_tombstone = compact_task
.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.any(|sst| sst.range_tombstone_count > 0);
let has_ttl = compact_task
.table_options
.iter()
.any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0));

let mut compact_table_ids = compact_task
.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.flat_map(|sst| sst.table_ids.clone())
.collect_vec();
compact_table_ids.sort();
compact_table_ids.dedup();
let single_table = compact_table_ids.len() == 1;

let optimize_by_copy_block = context.storage_opts.enable_fast_compaction
&& all_ssts_are_blocked_filter
&& !has_tombstone
&& !has_ttl
&& single_table
&& compact_task.target_level > 0
&& compact_task.input_ssts.len() == 2
&& compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
&& delete_key_count * 100
< context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
&& compact_task.task_type() == TaskType::Dynamic;

if !optimize_by_copy_block {
match generate_splits(&sstable_infos, compaction_size, context.clone()).await {
Ok(splits) => {
Expand All @@ -633,18 +589,12 @@ pub async fn generate_splits_for_task(
}
Err(e) => {
tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
// task_status = TaskStatus::ExecuteFailed;
// return (
// compact_done(compact_task, context.clone(), vec![], task_status),
// None,
// );

return false;
}
}
}

return true;
true
}

pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ pub async fn compact(
.map(|table_info| table_info.file_size)
.sum::<u64>();

let optimize_by_copy_block = optimize_by_copy_block(&compact_task, context.clone());
let optimize_by_copy_block = optimize_by_copy_block(&compact_task, &context);

if !optimize_by_copy_block {
match generate_splits(&sstable_infos, compaction_size, context.clone()).await {
Expand Down Expand Up @@ -618,7 +618,7 @@ pub async fn compact(
}

/// Fills in the compact task and tries to report the task result to meta node.
fn compact_done(
pub(crate) fn compact_done(
mut compact_task: CompactTask,
context: CompactorContext,
output_ssts: Vec<CompactOutput>,
Expand Down
Loading

0 comments on commit b8666cd

Please sign in to comment.