Skip to content

Commit

Permalink
refactor(storage): refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Apr 19, 2024
1 parent accec49 commit de6db46
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 110 deletions.
203 changes: 202 additions & 1 deletion src/storage/src/hummock/compactor/compaction_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_hummock_sdk::table_stats::TableStatsMap;
use risingwave_hummock_sdk::{can_concat, EpochWithGap, KeyComparator};
use risingwave_pb::hummock::compact_task::TaskType;
use risingwave_pb::hummock::{
compact_task, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo, TableSchema,
compact_task, BloomFilterType, CompactTask, KeyRange as KeyRange_vec, LevelType, SstableInfo,
TableSchema,
};
use thiserror_ext::AsReport;
use tokio::time::Instant;

pub use super::context::CompactorContext;
Expand Down Expand Up @@ -489,3 +492,201 @@ async fn check_result<
}
Ok(true)
}

pub fn optimize_by_copy_block(compact_task: &CompactTask, context: CompactorContext) -> bool {
let sstable_infos = compact_task
.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.filter(|table_info| {
let table_ids = &table_info.table_ids;
table_ids
.iter()
.any(|table_id| compact_task.existing_table_ids.contains(table_id))
})
.cloned()
.collect_vec();
let compaction_size = sstable_infos
.iter()
.map(|table_info| table_info.file_size)
.sum::<u64>();

let all_ssts_are_blocked_filter = sstable_infos
.iter()
.all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked);

let delete_key_count = sstable_infos
.iter()
.map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
.sum::<u64>();
let total_key_count = sstable_infos
.iter()
.map(|table_info| table_info.total_key_count)
.sum::<u64>();

let has_tombstone = compact_task
.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.any(|sst| sst.range_tombstone_count > 0);
let has_ttl = compact_task
.table_options
.iter()
.any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0));

let mut compact_table_ids: HashSet<u32> = HashSet::from_iter(
compact_task
.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.flat_map(|sst| sst.table_ids.clone()),
);
let single_table = compact_table_ids.len() == 1;

context.storage_opts.enable_fast_compaction
&& all_ssts_are_blocked_filter
&& !has_tombstone
&& !has_ttl
&& single_table
&& compact_task.target_level > 0
&& compact_task.input_ssts.len() == 2
&& compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
&& delete_key_count * 100
< context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
&& compact_task.task_type() == TaskType::Dynamic
}

pub async fn generate_splits_for_task(
compact_task: &mut CompactTask,
context: CompactorContext,
) -> bool {
let sstable_infos = compact_task
.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.filter(|table_info| {
let table_ids = &table_info.table_ids;
table_ids
.iter()
.any(|table_id| compact_task.existing_table_ids.contains(table_id))
})
.cloned()
.collect_vec();
let compaction_size = sstable_infos
.iter()
.map(|table_info| table_info.file_size)
.sum::<u64>();

let all_ssts_are_blocked_filter = sstable_infos
.iter()
.all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked);

let delete_key_count = sstable_infos
.iter()
.map(|table_info| table_info.stale_key_count + table_info.range_tombstone_count)
.sum::<u64>();
let total_key_count = sstable_infos
.iter()
.map(|table_info| table_info.total_key_count)
.sum::<u64>();

let has_tombstone = compact_task
.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.any(|sst| sst.range_tombstone_count > 0);
let has_ttl = compact_task
.table_options
.iter()
.any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0));

let mut compact_table_ids = compact_task
.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.flat_map(|sst| sst.table_ids.clone())
.collect_vec();
compact_table_ids.sort();
compact_table_ids.dedup();
let single_table = compact_table_ids.len() == 1;

let optimize_by_copy_block = context.storage_opts.enable_fast_compaction
&& all_ssts_are_blocked_filter
&& !has_tombstone
&& !has_ttl
&& single_table
&& compact_task.target_level > 0
&& compact_task.input_ssts.len() == 2
&& compaction_size < context.storage_opts.compactor_fast_max_compact_task_size
&& delete_key_count * 100
< context.storage_opts.compactor_fast_max_compact_delete_ratio as u64 * total_key_count
&& compact_task.task_type() == TaskType::Dynamic;

if !optimize_by_copy_block {
match generate_splits(&sstable_infos, compaction_size, context.clone()).await {
Ok(splits) => {
if !splits.is_empty() {
compact_task.splits = splits;
}

return true;
}
Err(e) => {
tracing::warn!(error = %e.as_report(), "Failed to generate_splits");
// task_status = TaskStatus::ExecuteFailed;
// return (
// compact_done(compact_task, context.clone(), vec![], task_status),
// None,
// );

return false;
}
}
}

return true;
}

/// Reports the read-side metrics of `compact_task` to `context.compactor_metrics`.
///
/// Input levels are split in two groups by comparing `level_idx` with
/// `compact_task.target_level`:
/// - levels other than the target level feed `compact_read_current_level` (bytes) and
///   `compact_read_sstn_current_level` (SST count), labelled with the compaction group id
///   and the `level_idx` of the first input level;
/// - levels equal to the target level feed `compact_read_next_level` (bytes) and
///   `compact_read_sstn_next_level` (SST count), labelled with the compaction group id and
///   the target level.
///
/// A task without any input level has nothing to report and returns immediately instead
/// of panicking on an out-of-bounds index.
pub fn metrics_report_for_task(compact_task: &CompactTask, context: &CompactorContext) {
    let Some(first_level) = compact_task.input_ssts.first() else {
        return;
    };

    let group_label = compact_task.compaction_group_id.to_string();
    let cur_level_label = first_level.level_idx.to_string();
    let next_level_label = compact_task.target_level.to_string();

    // Accumulate bytes and SST counts per group in one pass over the input levels.
    let (mut select_size, mut select_count) = (0u64, 0u64);
    let (mut target_size, mut target_count) = (0u64, 0u64);
    for level in &compact_task.input_ssts {
        let level_size = level
            .table_infos
            .iter()
            .map(|sst| sst.file_size)
            .sum::<u64>();
        let level_count = level.table_infos.len() as u64;
        if level.level_idx == compact_task.target_level {
            target_size += level_size;
            target_count += level_count;
        } else {
            select_size += level_size;
            select_count += level_count;
        }
    }

    let metrics = &context.compactor_metrics;
    metrics
        .compact_read_current_level
        .with_label_values(&[group_label.as_str(), cur_level_label.as_str()])
        .inc_by(select_size);
    metrics
        .compact_read_sstn_current_level
        .with_label_values(&[group_label.as_str(), cur_level_label.as_str()])
        .inc_by(select_count);
    metrics
        .compact_read_next_level
        .with_label_values(&[group_label.as_str(), next_level_label.as_str()])
        .inc_by(target_size);
    metrics
        .compact_read_sstn_next_level
        .with_label_values(&[group_label.as_str(), next_level_label.as_str()])
        .inc_by(target_count);
}
Loading

0 comments on commit de6db46

Please sign in to comment.