fix(storage): fix compactor task parallelism race #16387

Merged May 15, 2024 (15 commits)
Changes from 4 commits
181 changes: 174 additions & 7 deletions src/storage/src/hummock/compactor/compaction_utils.rs
@@ -26,8 +26,10 @@ use risingwave_hummock_sdk::key_range::KeyRange;
 use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
 use risingwave_hummock_sdk::table_stats::TableStatsMap;
 use risingwave_hummock_sdk::{can_concat, EpochWithGap, KeyComparator};
+use risingwave_pb::hummock::compact_task::TaskType;
 use risingwave_pb::hummock::{
-    compact_task, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo, TableSchema,
+    compact_task, BloomFilterType, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo,
+    TableSchema,
 };
 use tokio::time::Instant;

@@ -173,11 +175,10 @@ pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompact
 }

 const MAX_FILE_COUNT: usize = 32;

 fn generate_splits_fast(
     sstable_infos: &Vec<SstableInfo>,
     compaction_size: u64,
-    context: CompactorContext,
+    context: &CompactorContext,
 ) -> HummockResult<Vec<KeyRange_vec>> {
Li0k marked this conversation as resolved.
     let worker_num = context.compaction_executor.worker_num();
     let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
@@ -211,9 +212,6 @@ fn generate_splits_fast(
     }
     indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
     indexes.dedup();
-    if indexes.len() <= parallelism {
-        return Ok(vec![]);
-    }
Collaborator:

Removing this line causes a behavior change. Prior to this PR, when the condition is hit, we will only have one split. After this PR, we will have two splits. Is this expected?

Contributor Author:

Tracing back through the code implementation:

  1. generate_splits_fast is only used when sstable_count > 256, so it will not cause the problem above.
  2. Even if the branch is triggered and 2 splits are generated, there is no correctness problem.

@Little-Wallace

Collaborator:

> generate_splits_fast will only be used when sstable_count > 256 and will not cause the above problems.

It is not good practice to rely on the caller's behavior to guarantee that a function's implementation behaves as expected; that is too implicit. What is the problem with directly returning 1 split if indexes.len() <= parallelism?

Contributor Author:

After refactoring the calculation, we can unify the parallelism calculation and eliminate the overuse of generate_splits_fast. We can then revert this code.
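To make the behavior change under discussion concrete, here is a toy model of the split generation, not the real implementation: encoded full keys are replaced by `u64` keys, the loop structure is our simplification, and `pre_pr_guard` toggles the removed early return. With the guard, hitting `indexes.len() <= parallelism` yields a single whole-range split; without it, a boundary can still be emitted, producing two splits.

```rust
/// Hypothetical, simplified sketch of generate_splits_fast's split-count
/// behavior. A split is a (left, right) key pair; u64::MIN/MAX stand in for
/// the unbounded endpoints of the real KeyRange_vec values.
fn generate_splits_sketch(indexes: &[u64], parallelism: usize, pre_pr_guard: bool) -> Vec<(u64, u64)> {
    if pre_pr_guard && indexes.len() <= parallelism {
        // Pre-PR early return: one split covering the whole range.
        return vec![(u64::MIN, u64::MAX)];
    }
    let parallel_key_count = (indexes.len() / parallelism).max(1);
    let mut splits = Vec::new();
    let mut last = u64::MIN;
    // Emit a boundary every `parallel_key_count` index keys.
    for boundary in indexes.iter().step_by(parallel_key_count).skip(1) {
        splits.push((last, *boundary));
        last = *boundary;
    }
    splits.push((last, u64::MAX));
    splits
}
```

With two index keys and parallelism 2, the guard yields one split while the post-PR path yields two, which matches the reviewer's observation.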

     let mut splits = vec![];
     splits.push(KeyRange_vec::new(vec![], vec![]));
     let parallel_key_count = indexes.len() / parallelism;
@@ -232,7 +230,7 @@
 pub async fn generate_splits(
     sstable_infos: &Vec<SstableInfo>,
     compaction_size: u64,
-    context: CompactorContext,
+    context: &CompactorContext,
 ) -> HummockResult<Vec<KeyRange_vec>> {
     let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
     if compaction_size > parallel_compact_size {
@@ -489,3 +487,172 @@ async fn check_result<
     }
     Ok(true)
 }

pub fn optimize_by_copy_block(compact_task: &CompactTask, context: &CompactorContext) -> bool {
    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.file_size)
        .sum::<u64>();

    let all_ssts_are_blocked_filter = sstable_infos
        .iter()
        .all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked);

    let delete_key_count = sstable_infos
        .iter()
        .map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
        .sum::<u64>();
    let total_key_count = sstable_infos
        .iter()
        .map(|table_info| table_info.total_key_count)
        .sum::<u64>();

    let has_tombstone = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .any(|sst| sst.range_tombstone_count > 0);
    let has_ttl = compact_task
        .table_options
        .iter()
        .any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0));

    let compact_table_ids: HashSet<u32> = HashSet::from_iter(
        compact_task
            .input_ssts
            .iter()
            .flat_map(|level| level.table_infos.iter())
            .flat_map(|sst| sst.table_ids.clone()),
    );
    let single_table = compact_table_ids.len() == 1;

    context.storage_opts.enable_fast_compaction
        && all_ssts_are_blocked_filter
        && !has_tombstone
        && !has_ttl
        && single_table
        && compact_task.target_level > 0
        && compact_task.input_ssts.len() == 2
        && compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
        && delete_key_count * 100
            < context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
        && compact_task.task_type() == TaskType::Dynamic
}
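One detail worth noting in the predicate above: the delete-ratio check cross-multiplies instead of dividing, so it stays in integer arithmetic. A standalone sketch of just that condition (the function name and parameter names are ours, not from the PR):

```rust
/// Integer form of "stale/tombstone keys make up less than
/// `max_delete_ratio_pct` percent of all keys", mirroring the
/// `delete_key_count * 100 < ratio * total_key_count` test in the diff.
fn under_delete_ratio(delete_key_count: u64, total_key_count: u64, max_delete_ratio_pct: u64) -> bool {
    // Equivalent to delete_key_count / total_key_count < max_delete_ratio_pct / 100,
    // rewritten to avoid floating-point division and its rounding pitfalls.
    delete_key_count * 100 < max_delete_ratio_pct * total_key_count
}
```

For example, 5 deleted keys out of 100 passes a 10% threshold, while 10 out of 100 does not (the comparison is strict).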

pub async fn generate_splits_for_task(
    compact_task: &mut CompactTask,
    context: &CompactorContext,
    optimize_by_copy_block: bool,
) -> HummockResult<()> {
    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.file_size)
        .sum::<u64>();

    if !optimize_by_copy_block {
        let splits = generate_splits(&sstable_infos, compaction_size, context).await?;
        if !splits.is_empty() {
            compact_task.splits = splits;
        }
        return Ok(());
    }

    Ok(())
}

pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
    let group_label = compact_task.compaction_group_id.to_string();
    let cur_level_label = compact_task.input_ssts[0].level_idx.to_string();
    let select_table_infos = compact_task
        .input_ssts
        .iter()
        .filter(|level| level.level_idx != compact_task.target_level)
        .flat_map(|level| level.table_infos.iter())
        .collect_vec();
    let target_table_infos = compact_task
        .input_ssts
        .iter()
        .filter(|level| level.level_idx == compact_task.target_level)
        .flat_map(|level| level.table_infos.iter())
        .collect_vec();
    let select_size = select_table_infos
        .iter()
        .map(|table| table.file_size)
        .sum::<u64>();
    context
        .compactor_metrics
        .compact_read_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_size);
    context
        .compactor_metrics
        .compact_read_sstn_current_level
        .with_label_values(&[&group_label, &cur_level_label])
        .inc_by(select_table_infos.len() as u64);

    let target_level_read_bytes = target_table_infos.iter().map(|t| t.file_size).sum::<u64>();
    let next_level_label = compact_task.target_level.to_string();
    context
        .compactor_metrics
        .compact_read_next_level
        .with_label_values(&[&group_label, next_level_label.as_str()])
        .inc_by(target_level_read_bytes);
    context
        .compactor_metrics
        .compact_read_sstn_next_level
        .with_label_values(&[&group_label, next_level_label.as_str()])
        .inc_by(target_table_infos.len() as u64);
}

pub fn calculate_task_parallelism(compact_task: &CompactTask, context: &CompactorContext) -> usize {
    let optimize_by_copy_block = optimize_by_copy_block(compact_task, context);

    if optimize_by_copy_block {
        return 1;
    }

    let sstable_infos = compact_task
        .input_ssts
        .iter()
        .flat_map(|level| level.table_infos.iter())
        .filter(|table_info| {
            let table_ids = &table_info.table_ids;
            table_ids
                .iter()
                .any(|table_id| compact_task.existing_table_ids.contains(table_id))
        })
        .cloned()
        .collect_vec();
    let compaction_size = sstable_infos
        .iter()
        .map(|table_info| table_info.file_size)
        .sum::<u64>();

    generate_splits_fast(&sstable_infos, compaction_size, context)
        .unwrap()
        .len()
}

Collaborator:

Using generate_splits_fast here seems to be overkill, because we don't actually need to calculate the index from the sstable infos and generate the splits in order to know the task parallelism. We just need the logic in L183-194 and the input SST count:

    let worker_num = context.compaction_executor.worker_num();
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;

    let mut parallelism = (compaction_size + parallel_compact_size - 1) / parallel_compact_size;

    parallelism = std::cmp::min(
        worker_num,
        std::cmp::min(
            parallelism as usize,
            context.storage_opts.max_sub_compaction as usize,
        ),
    );

    if input_ssts.len() < parallelism {
        parallelism = 1
    }

Also, I think we can pass the parallelism as a parameter to generate_splits and generate_splits_fast, because we already pre-calculate it after this PR.

Collaborator:

Also, I noticed that the parallelism calculation is not exactly the same in generate_splits (L268-L279) and generate_splits_fast (L183-L194). Is this expected? Which one should we follow here?
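The reviewer's suggested calculation can be expressed as a standalone, testable function. This is a sketch of the suggestion above, not the merged code: the parameters stand in for the corresponding `CompactorContext`/`StorageOpts` fields (`worker_num`, `max_sub_compaction`, `parallel_compact_size_mb`) and the input-SST count.

```rust
/// Sketch of the reviewer-suggested parallelism formula: ceiling-divide the
/// task size into parallel-compact-size chunks, cap by worker count and
/// max_sub_compaction, and fall back to 1 when there are too few input SSTs.
fn calculate_parallelism_sketch(
    compaction_size: u64,
    input_sst_count: usize,
    worker_num: usize,
    max_sub_compaction: usize,
    parallel_compact_size_mb: u64,
) -> usize {
    let parallel_compact_size = parallel_compact_size_mb << 20;
    // Ceiling division: number of parallel-compact-size chunks in the task.
    let mut parallelism =
        ((compaction_size + parallel_compact_size - 1) / parallel_compact_size) as usize;
    parallelism = worker_num.min(parallelism.min(max_sub_compaction));
    // Fewer input SSTs than sub-tasks: splitting is pointless, run serially.
    if input_sst_count < parallelism {
        parallelism = 1;
    }
    parallelism
}
```

For a 1 GiB task with a 256 MiB parallel-compact size, 8 workers, and max_sub_compaction of 4, this gives 4; with only 2 input SSTs it collapses to 1.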