Skip to content

Commit

Permalink
fix(storage): fix tombstone picker (#12776)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Oct 12, 2023
1 parent 9ad7857 commit 7d54cdd
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 9 deletions.
1 change: 1 addition & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ message RiseCtlUpdateCompactionConfigRequest {
uint64 max_space_reclaim_bytes = 13;
uint64 level0_max_compact_file_number = 14;
bool enable_emergency_picker = 15;
uint32 tombstone_reclaim_ratio = 16;
}
}
repeated uint64 compaction_group_ids = 1;
Expand Down
4 changes: 4 additions & 0 deletions src/ctl/src/cmd_impl/hummock/compaction_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub fn build_compaction_config_vec(
level0_max_compact_file_number: Option<u64>,
level0_overlapping_sub_level_compact_level_count: Option<u32>,
enable_emergency_picker: Option<bool>,
tombstone_reclaim_ratio: Option<u32>,
) -> Vec<MutableConfig> {
let mut configs = vec![];
if let Some(c) = max_bytes_for_level_base {
Expand Down Expand Up @@ -105,6 +106,9 @@ pub fn build_compaction_config_vec(
if let Some(c) = enable_emergency_picker {
configs.push(MutableConfig::EnableEmergencyPicker(c))
}
if let Some(c) = tombstone_reclaim_ratio {
configs.push(MutableConfig::TombstoneReclaimRatio(c))
}

configs
}
Expand Down
4 changes: 4 additions & 0 deletions src/ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ enum HummockCommands {
level0_overlapping_sub_level_compact_level_count: Option<u32>,
#[clap(long)]
enable_emergency_picker: Option<bool>,
#[clap(long)]
tombstone_reclaim_ratio: Option<u32>,
},
/// Split given compaction group into two. Moves the given tables to the new group.
SplitCompactionGroup {
Expand Down Expand Up @@ -559,6 +561,7 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
level0_max_compact_file_number,
level0_overlapping_sub_level_compact_level_count,
enable_emergency_picker,
tombstone_reclaim_ratio,
}) => {
cmd_impl::hummock::update_compaction_config(
context,
Expand All @@ -578,6 +581,7 @@ pub async fn start_impl(opts: CliOpts, context: &CtlContext) -> Result<()> {
level0_max_compact_file_number,
level0_overlapping_sub_level_compact_level_count,
enable_emergency_picker,
tombstone_reclaim_ratio,
),
)
.await?
Expand Down
6 changes: 5 additions & 1 deletion src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub mod compaction_config;
mod overlap_strategy;
use risingwave_common::catalog::TableOption;
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_pb::hummock::compact_task::{self, TaskStatus};
use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType};

mod picker;
pub mod selector;
Expand Down Expand Up @@ -156,6 +156,10 @@ impl CompactStatus {
}

pub fn is_trivial_move_task(task: &CompactTask) -> bool {
if task.task_type() != TaskType::Dynamic && task.task_type() != TaskType::Emergency {
return false;
}

if task.input_ssts.len() == 1 {
return task.input_ssts[0].level_idx == 0
&& can_concat(&task.input_ssts[0].table_infos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ impl TombstoneReclaimCompactionPicker {
state: &mut TombstoneReclaimPickerState,
) -> Option<CompactionInput> {
assert!(!levels.levels.is_empty());
let mut select_input_ssts = vec![];
if state.last_level == 0 {
state.last_level = 1;
}

while state.last_level <= levels.levels.len() {
let mut select_input_ssts = vec![];
let mut select_file_size = 0;
for sst in &levels.levels[state.last_level - 1].table_infos {
let need_reclaim = (sst.range_tombstone_count * 100
Expand Down Expand Up @@ -108,6 +108,7 @@ impl TombstoneReclaimCompactionPicker {
}
}
if pending_compact {
state.last_level += 1;
continue;
}
InputLevel {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,16 @@ impl CompactionSelector for TombstoneCompactionSelector {
_selector_stats: &mut LocalSelectorStatistic,
_table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask> {
if group.compaction_config.tombstone_reclaim_ratio == 0 {
// it might cause full-compaction when tombstone_reclaim_ratio == 0
return None;
}

let dynamic_level_core = DynamicLevelSelectorCore::new(group.compaction_config.clone());
let ctx = dynamic_level_core.calculate_level_base_size(levels);
let picker = TombstoneReclaimCompactionPicker::new(
create_overlap_strategy(group.compaction_config.compaction_mode()),
group.compaction_config.max_compaction_bytes,
group.compaction_config.max_space_reclaim_bytes,
group.compaction_config.tombstone_reclaim_ratio as u64,
group.compaction_config.tombstone_reclaim_ratio as u64 / 2,
);
Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,9 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi
MutableConfig::EnableEmergencyPicker(c) => {
target.enable_emergency_picker = *c;
}
MutableConfig::TombstoneReclaimRatio(c) => {
target.tombstone_reclaim_ratio = *c;
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3045,6 +3045,8 @@ impl CompactionState {
Some(compact_task::TaskType::SpaceReclaim)
} else if guard.contains(&(group, compact_task::TaskType::Ttl)) {
Some(compact_task::TaskType::Ttl)
} else if guard.contains(&(group, compact_task::TaskType::Tombstone)) {
Some(compact_task::TaskType::Tombstone)
} else if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
Some(compact_task::TaskType::Dynamic)
} else {
Expand Down
23 changes: 17 additions & 6 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,9 @@ where
progress_key_num += 1;

if let Some(task_progress) = task_progress.as_ref() && progress_key_num >= PROGRESS_KEY_INTERVAL {
task_progress.inc_progress_key(progress_key_num);
progress_key_num = 0;
}
task_progress.inc_progress_key(progress_key_num);
progress_key_num = 0;
}

let mut iter_key = iter.key();
compaction_statistics.iter_total_key_counts += 1;
Expand Down Expand Up @@ -750,7 +750,13 @@ where
.await?;
}
del_iter.next();
progress_key_num += 1;
if let Some(task_progress) = task_progress.as_ref() && progress_key_num >= PROGRESS_KEY_INTERVAL {
task_progress.inc_progress_key(progress_key_num);
progress_key_num = 0;
}
}

let earliest_range_delete_which_can_see_iter_key = del_iter.earliest_delete_since(epoch);

// Among keys with same user key, only retain keys which satisfy `epoch` >= `watermark`.
Expand Down Expand Up @@ -851,13 +857,18 @@ where
})
.await?;
del_iter.next();
progress_key_num += 1;
if let Some(task_progress) = task_progress.as_ref() && progress_key_num >= PROGRESS_KEY_INTERVAL {
task_progress.inc_progress_key(progress_key_num);
progress_key_num = 0;
}
}
}

if let Some(task_progress) = task_progress.as_ref() && progress_key_num > 0 {
// Avoid losing the progress_key_num in the last Interval
task_progress.inc_progress_key(progress_key_num);
}
// Avoid losing the progress_key_num in the last Interval
task_progress.inc_progress_key(progress_key_num);
}

if let Some(last_table_id) = last_table_id.take() {
table_stats_drop.insert(last_table_id, std::mem::take(&mut last_table_stats));
Expand Down

0 comments on commit 7d54cdd

Please sign in to comment.