Skip to content

Commit

Permalink
fix(storage): remove directly convert of tier compaction picker (#12280)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Sep 14, 2023
1 parent f25c625 commit c7fb909
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 82 deletions.
2 changes: 1 addition & 1 deletion src/meta/src/hummock/compaction/picker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::hummock::level_handler::LevelHandler;

pub const MAX_COMPACT_LEVEL_COUNT: usize = 42;

#[derive(Default)]
#[derive(Default, Debug)]
pub struct LocalPickerStatistic {
pub skip_by_write_amp_limit: u64,
pub skip_by_count_limit: u64,
Expand Down
34 changes: 3 additions & 31 deletions src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

use std::sync::Arc;

use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{CompactionConfig, InputLevel, LevelType, OverlappingLevel};

Expand Down Expand Up @@ -69,33 +67,11 @@ impl TierCompactionPicker {
continue;
}

let mut input_level = InputLevel {
let input_level = InputLevel {
level_idx: 0,
level_type: level.level_type,
table_infos: level.table_infos.clone(),
};
// Since the level is overlapping, we can change the order of origin sstable infos in
// task.
input_level.table_infos.sort_by(|sst1, sst2| {
let a = sst1.key_range.as_ref().unwrap();
let b = sst2.key_range.as_ref().unwrap();
a.compare(b)
});

if can_concat(&input_level.table_infos) {
return Some(CompactionInput {
select_input_size: input_level
.table_infos
.iter()
.map(|sst| sst.file_size)
.sum(),
total_file_count: input_level.table_infos.len() as u64,
input_levels: vec![input_level],
target_level: 0,
target_sub_level_id: level.sub_level_id,
..Default::default()
});
}

let mut select_level_inputs = vec![input_level];

Expand Down Expand Up @@ -182,7 +158,6 @@ impl CompactionPicker for TierCompactionPicker {
pub mod tests {
use std::sync::Arc;

use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::new_sub_level;
use risingwave_pb::hummock::hummock_version::Levels;
use risingwave_pb::hummock::{LevelType, OverlappingLevel};
Expand Down Expand Up @@ -281,11 +256,8 @@ pub mod tests {
// sub-level 0 is excluded because it's nonoverlapping and violating
// sub_level_max_compaction_bytes.
let mut picker = TierCompactionPicker::new(config);
let ret = picker
.pick_compaction(&levels, &levels_handler, &mut local_stats)
.unwrap();
assert_eq!(ret.input_levels.len(), 1);
assert!(can_concat(&ret.input_levels[0].table_infos));
let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats);
assert!(ret.is_none())
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/rpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ impl MetaMetrics {
let opts = histogram_opts!(
"storage_compact_task_size",
"Total size of compact that have been issued to state store",
exponential_buckets(4096.0, 1.6, 28).unwrap()
exponential_buckets(1048576.0, 2.0, 16).unwrap()
);

let compact_task_size =
Expand Down
85 changes: 36 additions & 49 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,67 +422,54 @@ pub(crate) mod tests {
.await;

// 2. get compact task
let mut compact_task = hummock_manager_ref

// 3. compact
while let Some(compact_task) = hummock_manager_ref
.get_compact_task(
StaticCompactionGroupId::StateDefault.into(),
&mut default_level_selector(),
)
.await
.unwrap()
.unwrap();
let compaction_filter_flag = CompactionFilterFlag::NONE;
compact_task.compaction_filter_mask = compaction_filter_flag.bits();
compact_task.current_epoch_time = 0;

// assert compact_task
assert_eq!(
compact_task
.input_ssts
.iter()
.map(|level| level.table_infos.len())
.sum::<usize>(),
SST_COUNT as usize / 2 + 1,
);
compact_task.target_level = 6;

// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
compact_ctx,
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
)
.await;
{
// 3. compact
let (_tx, rx) = tokio::sync::oneshot::channel();
let (mut result_task, task_stats) = compact(
compact_ctx.clone(),
compact_task.clone(),
rx,
Box::new(sstable_object_id_manager.clone()),
)
.await;

hummock_manager_ref
.report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats)))
.await
.unwrap();
hummock_manager_ref
.report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats)))
.await
.unwrap();
}

// 4. get the latest version and check
let version = hummock_manager_ref.get_current_version().await;
let output_table = version
let output_tables = version
.get_compaction_group_levels(StaticCompactionGroupId::StateDefault.into())
.levels
.last()
.unwrap()
.table_infos
.first()
.unwrap();
let table = storage
.sstable_store()
.sstable(output_table, &mut StoreLocalStatistic::default())
.await
.unwrap();
let target_table_size = storage.storage_opts().sstable_size_mb * (1 << 20);

assert!(
table.value().meta.estimated_size > target_table_size,
"table.meta.estimated_size {} <= target_table_size {}",
table.value().meta.estimated_size,
target_table_size
);
.iter()
.flat_map(|level| level.table_infos.clone())
.collect_vec();
for output_table in &output_tables {
let table = storage
.sstable_store()
.sstable(output_table, &mut StoreLocalStatistic::default())
.await
.unwrap();
let target_table_size = storage.storage_opts().sstable_size_mb * (1 << 20);
assert!(
table.value().meta.estimated_size > target_table_size,
"table.meta.estimated_size {} <= target_table_size {}",
table.value().meta.estimated_size,
target_table_size
);
}

// 5. storage get back the correct kv after compaction
storage.wait_version(version).await;
Expand Down

0 comments on commit c7fb909

Please sign in to comment.