Skip to content

Commit

Permalink
Revert "fix(meta): do not split by vnode for low write throughput (#1…
Browse files Browse the repository at this point in the history
…2534)"

This reverts commit 6c5b56d.
  • Loading branch information
Little-Wallace committed Jan 22, 2024
1 parent 5a8e183 commit a39a83b
Show file tree
Hide file tree
Showing 19 changed files with 182 additions and 379 deletions.
5 changes: 1 addition & 4 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ message Level {
uint64 total_file_size = 4;
uint64 sub_level_id = 5;
uint64 uncompressed_file_size = 6;
uint32 vnode_partition_count = 7;
}

message InputLevel {
Expand All @@ -63,7 +62,6 @@ message IntraLevelDelta {
uint64 l0_sub_level_id = 2;
repeated uint64 removed_table_ids = 3;
repeated SstableInfo inserted_table_infos = 4;
uint32 vnode_partition_count = 5;
}

enum CompatibilityVersion {
Expand Down Expand Up @@ -144,7 +142,6 @@ message HummockVersion {
uint64 group_id = 3;
uint64 parent_group_id = 4;
repeated uint32 member_table_ids = 5;
uint32 vnode_partition_count = 6;
}
uint64 id = 1;
// Levels of each compaction group
Expand Down Expand Up @@ -351,7 +348,7 @@ message CompactTask {
bool split_by_state_table = 21 [deprecated = true];
// Compaction needs to cut the state table every time 1/weight of vnodes in the table have been processed.
// Deprecated: use table_vnode_partition instead.
uint32 split_weight_by_vnode = 22;
uint32 split_weight_by_vnode = 22 [deprecated = true];
map<uint32, uint32> table_vnode_partition = 23;
// The table watermark of any table id. In compaction we only use the table watermarks on safe epoch,
// so we only need to include the table watermarks on safe epoch to reduce the size of metadata.
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ pub mod default {
}

pub fn partition_vnode_count() -> u32 {
16
64
}

pub fn table_write_throughput_threshold() -> u64 {
Expand Down
2 changes: 1 addition & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ move_table_size_limit = 10737418240
split_group_size_limit = 68719476736
cut_table_size_limit = 1073741824
do_not_config_object_storage_lifecycle = false
partition_vnode_count = 16
partition_vnode_count = 64
table_write_throughput_threshold = 16777216
min_table_split_write_throughput = 4194304
compaction_task_max_heartbeat_interval_secs = 30
Expand Down
50 changes: 43 additions & 7 deletions src/meta/src/hummock/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@
pub mod compaction_config;
mod overlap_strategy;
use risingwave_common::catalog::TableOption;
use risingwave_pb::hummock::compact_task::{self, TaskType};
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType};

mod picker;
pub mod selector;

use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use picker::{LevelCompactionPicker, TierCompactionPicker};
use risingwave_hummock_sdk::{can_concat, CompactionGroupId, HummockCompactionTaskId};
use risingwave_hummock_sdk::{
can_concat, CompactionGroupId, HummockCompactionTaskId, HummockEpoch,
};
use risingwave_pb::hummock::compaction_config::CompactionMode;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactTask, CompactionConfig, LevelType};
use risingwave_pb::hummock::{CompactTask, CompactionConfig, KeyRange, LevelType};
pub use selector::CompactionSelector;

use self::selector::LocalSelectorStatistic;
Expand Down Expand Up @@ -104,18 +107,51 @@ impl CompactStatus {
stats: &mut LocalSelectorStatistic,
selector: &mut Box<dyn CompactionSelector>,
table_id_to_options: HashMap<u32, TableOption>,
) -> Option<CompactionTask> {
) -> Option<CompactTask> {
// When we compact files, the result of the compaction must satisfy the following
// condition: for any user key, its epoch in the file residing in the lower
// layer must be larger.
selector.pick_compaction(
let ret = selector.pick_compaction(
task_id,
group,
levels,
&mut self.level_handlers,
stats,
table_id_to_options,
)
)?;
let target_level_id = ret.input.target_level;

let compression_algorithm = match ret.compression_algorithm.as_str() {
"Lz4" => 1,
"Zstd" => 2,
_ => 0,
};

let compact_task = CompactTask {
input_ssts: ret.input.input_levels,
splits: vec![KeyRange::inf()],
watermark: HummockEpoch::MAX,
sorted_output_ssts: vec![],
task_id,
target_level: target_level_id as u32,
// Only gc delete keys in the last level, because older versions of a key may still
// exist in levels further down.
gc_delete_keys: target_level_id == self.level_handlers.len() - 1,
base_level: ret.base_level as u32,
task_status: TaskStatus::Pending as i32,
compaction_group_id: group.group_id,
existing_table_ids: vec![],
compression_algorithm,
target_file_size: ret.target_file_size,
compaction_filter_mask: 0,
table_options: BTreeMap::default(),
current_epoch_time: 0,
target_sub_level_id: ret.input.target_sub_level_id,
task_type: ret.compaction_task_type as i32,
table_vnode_partition: BTreeMap::default(),
..Default::default()
};
Some(compact_task)
}

pub fn is_trivial_move_task(task: &CompactTask) -> bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,19 @@ impl CompactionPicker for LevelCompactionPicker {
return None;
}

if let Some(mut ret) = self.pick_base_trivial_move(
if let Some(ret) = self.pick_base_trivial_move(
l0,
levels.get_level(self.target_level),
level_handlers,
stats,
) {
ret.vnode_partition_count = levels.vnode_partition_count;
return Some(ret);
}

debug_assert!(self.target_level == levels.get_level(self.target_level).level_idx as usize);
if let Some(ret) = self.pick_multi_level_to_base(
l0,
levels.get_level(self.target_level),
levels.vnode_partition_count,
level_handlers,
stats,
) {
Expand Down Expand Up @@ -130,7 +128,6 @@ impl LevelCompactionPicker {
&self,
l0: &OverlappingLevel,
target_level: &Level,
vnode_partition_count: u32,
level_handlers: &[LevelHandler],
stats: &mut LocalPickerStatistic,
) -> Option<CompactionInput> {
Expand All @@ -150,18 +147,8 @@ impl LevelCompactionPicker {
overlap_strategy.clone(),
);

let mut max_vnode_partition_idx = 0;
for (idx, level) in l0.sub_levels.iter().enumerate() {
if level.vnode_partition_count < vnode_partition_count {
break;
}
max_vnode_partition_idx = idx;
}

let l0_select_tables_vec = non_overlap_sub_level_picker.pick_l0_multi_non_overlap_level(
&l0.sub_levels[..=max_vnode_partition_idx],
&level_handlers[0],
);
let l0_select_tables_vec = non_overlap_sub_level_picker
.pick_l0_multi_non_overlap_level(&l0.sub_levels, &level_handlers[0]);
if l0_select_tables_vec.is_empty() {
stats.skip_by_pending_files += 1;
return None;
Expand Down Expand Up @@ -230,7 +217,6 @@ impl LevelCompactionPicker {
select_input_size: input.total_file_size,
target_input_size: target_file_size,
total_file_count: (input.total_file_count + target_file_count) as u64,
vnode_partition_count,
..Default::default()
};

Expand All @@ -239,15 +225,6 @@ impl LevelCompactionPicker {
ValidationRuleType::ToBase,
stats,
) {
if l0.total_file_size > target_level.total_file_size * 8 {
tracing::warn!("skip task with level count: {}, file count: {}, select size: {}, target size: {}, target level size: {}",
result.input_levels.len(),
result.total_file_count,
result.select_input_size,
result.target_input_size,
target_level.total_file_size,
);
}
continue;
}

Expand Down Expand Up @@ -446,7 +423,6 @@ pub mod tests {
total_file_size: 0,
sub_level_id: 0,
uncompressed_file_size: 0,
..Default::default()
}];
let mut levels = Levels {
levels,
Expand Down Expand Up @@ -511,7 +487,6 @@ pub mod tests {
total_file_size: 900,
sub_level_id: 0,
uncompressed_file_size: 900,
..Default::default()
}],
l0: Some(generate_l0_nonoverlapping_sublevels(vec![])),
..Default::default()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ impl CompactionTaskValidationRule for TierCompactionTaskValidationRule {
stats.skip_by_count_limit += 1;
return false;
}

true
}
}
Expand Down
Loading

0 comments on commit a39a83b

Please sign in to comment.