chore(storage): adjust compaction task priority and fix memory estimation (#13914)
Li0k committed Feb 21, 2024
1 parent d07384e commit 06fb7be
Showing 4 changed files with 66 additions and 59 deletions.
11 changes: 6 additions & 5 deletions src/meta/src/hummock/manager/mod.rs
@@ -942,12 +942,13 @@ impl HummockManager {
             .await?;
 
         tracing::debug!(
-            "TrivialMove for compaction group {}: pick up {} sstables in level {} to compact to target_level {} cost time: {:?}",
+            "TrivialMove for compaction group {}: pick up {} sstables in level {} to compact to target_level {} cost time: {:?} input {:?}",
             compaction_group_id,
             compact_task.input_ssts[0].table_infos.len(),
             compact_task.input_ssts[0].level_idx,
             compact_task.target_level,
-            start_time.elapsed()
+            start_time.elapsed(),
+            compact_task.input_ssts
         );
     } else {
         compact_task.table_options = table_id_to_option
@@ -3046,14 +3047,14 @@ impl CompactionState {
 
     pub fn auto_pick_type(&self, group: CompactionGroupId) -> Option<TaskType> {
         let guard = self.scheduled.lock();
-        if guard.contains(&(group, compact_task::TaskType::SpaceReclaim)) {
+        if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
+            Some(compact_task::TaskType::Dynamic)
+        } else if guard.contains(&(group, compact_task::TaskType::SpaceReclaim)) {
             Some(compact_task::TaskType::SpaceReclaim)
         } else if guard.contains(&(group, compact_task::TaskType::Ttl)) {
             Some(compact_task::TaskType::Ttl)
         } else if guard.contains(&(group, compact_task::TaskType::Tombstone)) {
             Some(compact_task::TaskType::Tombstone)
-        } else if guard.contains(&(group, compact_task::TaskType::Dynamic)) {
-            Some(compact_task::TaskType::Dynamic)
         } else {
             None
         }
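Editor's note: with the reordering above, auto_pick_type now prefers Dynamic over SpaceReclaim, Ttl, and Tombstone when several task types are pending for the same group, which is the priority adjustment named in the commit title. A minimal standalone sketch of the new precedence, not code from the commit, with task types modeled as strings:

use std::collections::HashSet;

// Hypothetical stand-in for CompactionState::auto_pick_type: returns the
// highest-priority task type that is currently scheduled.
fn auto_pick(scheduled: &HashSet<&str>) -> Option<&'static str> {
    ["Dynamic", "SpaceReclaim", "Ttl", "Tombstone"]
        .into_iter()
        .find(|ty| scheduled.contains(ty))
}

fn main() {
    let scheduled = HashSet::from(["SpaceReclaim", "Dynamic"]);
    // Before this commit the same state would have picked SpaceReclaim.
    assert_eq!(auto_pick(&scheduled), Some("Dynamic"));
}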
96 changes: 57 additions & 39 deletions src/storage/hummock_sdk/src/compact.rs
@@ -52,7 +52,25 @@ pub fn compact_task_to_string(compact_task: &CompactTask) -> String {
         let tables: Vec<String> = level_entry
             .table_infos
             .iter()
-            .map(|table| format!("[id: {}, {}KB]", table.get_sst_id(), table.file_size / 1024))
+            .map(|table| {
+                if table.total_key_count != 0 {
+                    format!(
+                        "[id: {}, obj_id: {} {}KB stale_ratio {} delete_range_ratio {}]",
+                        table.get_sst_id(),
+                        table.object_id,
+                        table.file_size / 1024,
+                        (table.stale_key_count * 100 / table.total_key_count),
+                        (table.range_tombstone_count * 100 / table.total_key_count),
+                    )
+                } else {
+                    format!(
+                        "[id: {}, obj_id: {} {}KB]",
+                        table.get_sst_id(),
+                        table.object_id,
+                        table.file_size / 1024,
+                    )
+                }
+            })
             .collect();
         writeln!(s, "Level {:?} {:?} ", level_entry.level_idx, tables).unwrap();
     }
@@ -78,33 +96,27 @@ pub fn append_sstable_info_to_string(s: &mut String, sstable_info: &SstableInfo)
         hex::encode(key_range.right.as_slice())
     };
 
-    if sstable_info.stale_key_count > 0 {
-        let ratio = sstable_info.stale_key_count * 100 / sstable_info.total_key_count;
-        writeln!(
-            s,
-            "SstableInfo: object id={:?}, SST id={:?}, KeyRange=[{:?},{:?}], table_ids: {:?}, size={:?}KB, delete_ratio={:?}%",
-            sstable_info.get_object_id(),
-            sstable_info.get_sst_id(),
-            left_str,
-            right_str,
-            sstable_info.table_ids,
-            sstable_info.file_size / 1024,
-            ratio,
-        )
-        .unwrap();
-    } else {
-        writeln!(
-            s,
-            "SstableInfo: object id={:?}, SST id={:?}, KeyRange=[{:?},{:?}], table_ids: {:?}, size={:?}KB",
-            sstable_info.get_object_id(),
-            sstable_info.get_sst_id(),
-            left_str,
-            right_str,
-            sstable_info.table_ids,
-            sstable_info.file_size / 1024,
-        )
-        .unwrap();
-    }
+    let stale_ratio = (sstable_info.stale_key_count * 100)
+        .checked_div(sstable_info.total_key_count)
+        .unwrap_or(0);
+    let range_tombstone_ratio = (sstable_info.range_tombstone_count * 100)
+        .checked_div(sstable_info.total_key_count)
+        .unwrap_or(0);
+    writeln!(
+        s,
+        "SstableInfo: object id={}, SST id={}, KeyRange=[{:?},{:?}], table_ids: {:?}, size={}KB, stale_ratio={}%, range_tombstone_count={} range_tombstone_ratio={}% bloom_filter_kind {:?}",
+        sstable_info.get_object_id(),
+        sstable_info.get_sst_id(),
+        left_str,
+        right_str,
+        sstable_info.table_ids,
+        sstable_info.file_size / 1024,
+        stale_ratio,
+        sstable_info.range_tombstone_count,
+        range_tombstone_ratio,
+        sstable_info.bloom_filter_kind,
+    )
+    .unwrap();
 }
 
 pub fn statistics_compact_task(task: &CompactTask) -> CompactTaskStatistics {
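Editor's note: the key change in this hunk is replacing the stale_key_count > 0 branch with u64::checked_div, which returns None on a zero divisor, so an SST with total_key_count == 0 logs 0% instead of panicking on division by zero. A standalone sketch of the pattern, with invented counts:

fn main() {
    let stale_key_count: u64 = 250;
    let range_tombstone_count: u64 = 40;

    for total_key_count in [1000u64, 0] {
        // checked_div yields None on a zero divisor; unwrap_or(0) turns an
        // empty SST into a 0% ratio rather than a panic.
        let stale_ratio = (stale_key_count * 100)
            .checked_div(total_key_count)
            .unwrap_or(0);
        let tombstone_ratio = (range_tombstone_count * 100)
            .checked_div(total_key_count)
            .unwrap_or(0);
        println!("total={total_key_count}: stale_ratio={stale_ratio}% range_tombstone_ratio={tombstone_ratio}%");
    }
}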
@@ -150,33 +162,39 @@ pub fn estimate_memory_for_compact_task(
     // When building the SstableStreamIterator, sstable_syncable will fetch the SstableMeta and seek
     // to the specified block and build the iterator. Since this operation is concurrent, the memory
     // usage will need to take into account the size of the SstableMeta.
-    // The common size of SstableMeta in tests is no more than 1m (mainly from xor filters). Even
-    // though SstableMeta is used for a shorter period of time, it is safe to use 3m for the
-    // calculation.
-    // TODO: Note that this algorithm may fail when SstableMeta is occupied by a large number of
-    // range tombstones
-    const ESTIMATED_META_SIZE: u64 = 3 * 1048576;
+    // The common size of SstableMeta in tests is no more than 1m (mainly from xor filters).
+    let mut max_meta_ratio = 0;
 
     // The memory usage of the SstableStreamIterator comes from SstableInfo with some state
     // information (use ESTIMATED_META_SIZE to estimate it), the BlockStream being read (one block),
     // and tcp recv_buffer_size.
-    let max_input_stream_estimated_memory = ESTIMATED_META_SIZE + block_size + recv_buffer_size;
+    let max_input_stream_estimated_memory = block_size + recv_buffer_size;
 
     // input
     for level in &task.input_ssts {
         if level.level_type() == LevelType::Nonoverlapping {
-            result += max_input_stream_estimated_memory;
+            let mut meta_size = 0;
+            for sst in &level.table_infos {
+                meta_size = std::cmp::max(meta_size, sst.file_size - sst.meta_offset);
+                max_meta_ratio = std::cmp::max(max_meta_ratio, meta_size * 100 / sst.file_size);
+            }
+            result += max_input_stream_estimated_memory + meta_size;
         } else {
-            result += max_input_stream_estimated_memory * level.table_infos.len() as u64;
+            for sst in &level.table_infos {
+                let meta_size = sst.file_size - sst.meta_offset;
+                result += max_input_stream_estimated_memory + meta_size;
+                max_meta_ratio = std::cmp::max(max_meta_ratio, meta_size * 100 / sst.file_size);
+            }
         }
     }
 
     // output
     // builder will maintain SstableInfo + block_builder(block) + writer (block to vec)
+    let estimated_meta_size = sst_capacity * max_meta_ratio / 100;
     if support_streaming_upload {
-        result += ESTIMATED_META_SIZE + 2 * block_size
+        result += estimated_meta_size + 2 * block_size
     } else {
-        result += ESTIMATED_META_SIZE + sst_capacity; // Use sst_capacity to avoid BatchUploader
+        result += estimated_meta_size + sst_capacity; // Use sst_capacity to avoid BatchUploader
                                                       // memory bursts.
     }
 
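Editor's note: this is the "fix memory estimation" half of the commit. The fixed 3 MB ESTIMATED_META_SIZE constant is replaced with per-task numbers: each input SST's meta footprint is taken as file_size - meta_offset, and the output SST's meta is projected as sst_capacity scaled by the worst observed meta ratio. A simplified, self-contained sketch with invented sizes; the SstStub struct and all values here are hypothetical, and it covers only the non-overlapping-level branch:

// Hypothetical stand-in for SstableInfo; only the fields the estimate needs.
struct SstStub {
    file_size: u64,
    meta_offset: u64,
}

fn main() {
    let block_size: u64 = 64 * 1024;
    let recv_buffer_size: u64 = 2 * 1024 * 1024;
    let sst_capacity: u64 = 256 * 1024 * 1024;

    // Per-stream cost no longer includes a flat 3MB meta constant.
    let max_input_stream_estimated_memory = block_size + recv_buffer_size;

    // One non-overlapping input level: SSTs are streamed one at a time, so
    // only the largest meta footprint in the level is charged once.
    let level = [
        SstStub { file_size: 64 << 20, meta_offset: 62 << 20 }, // ~2MB meta
        SstStub { file_size: 32 << 20, meta_offset: 31 << 20 }, // ~1MB meta
    ];
    let mut max_meta_ratio = 0;
    let mut meta_size = 0;
    for sst in &level {
        meta_size = meta_size.max(sst.file_size - sst.meta_offset);
        max_meta_ratio = max_meta_ratio.max(meta_size * 100 / sst.file_size);
    }
    let mut result = max_input_stream_estimated_memory + meta_size;

    // Output side (streaming upload): project the output meta from the worst
    // input meta ratio instead of the old ESTIMATED_META_SIZE constant.
    let estimated_meta_size = sst_capacity * max_meta_ratio / 100;
    result += estimated_meta_size + 2 * block_size;

    println!("estimated compaction memory: {} bytes", result);
}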
5 changes: 3 additions & 2 deletions src/storage/src/hummock/compactor/compactor_runner.rs
@@ -434,7 +434,7 @@ pub async fn compact(
     ) * compact_task.splits.len() as u64;
 
     tracing::info!(
-        "Ready to handle compaction group {} task: {} compact_task_statistics {:?} target_level {} compression_algorithm {:?} table_ids {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {}",
+        "Ready to handle compaction group {} task: {} compact_task_statistics {:?} target_level {} compression_algorithm {:?} table_ids {:?} parallelism {} task_memory_capacity_with_parallelism {}, enable fast runner: {} input: {:?}",
         compact_task.compaction_group_id,
         compact_task.task_id,
         compact_task_statistics,
@@ -443,7 +443,8 @@
         compact_task.existing_table_ids,
         parallelism,
         task_memory_capacity_with_parallelism,
-        optimize_by_copy_block
+        optimize_by_copy_block,
+        compact_task_to_string(&compact_task),
     );
 
     // If the task does not have enough memory, it should cancel the task and let the meta
13 changes: 0 additions & 13 deletions src/storage/src/hummock/compactor/fast_compactor_runner.rs
@@ -212,19 +212,6 @@ impl ConcatSstableIterator {
         self.sstable_iter.as_mut().unwrap()
     }
 
-    pub fn estimate_key_count(&self, uncompressed_block_size: u64) -> (u64, u64) {
-        let total_size = self.sstables[self.cur_idx].uncompressed_file_size;
-        if total_size == 0 {
-            return (0, 0);
-        }
-        // use ratio to avoid multiply overflow
-        let ratio = uncompressed_block_size * 10000 / total_size;
-        (
-            self.sstables[self.cur_idx].stale_key_count * ratio / 10000,
-            self.sstables[self.cur_idx].total_key_count * ratio / 10000,
-        )
-    }
-
     pub async fn init_block_iter(&mut self) -> HummockResult<()> {
        if let Some(sstable) = self.sstable_iter.as_mut() {
            if sstable.iter.is_some() {
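Editor's note on the deleted helper: it estimated per-block key counts proportionally, and its 10000-scaled ratio performed the division first so that the subsequent multiplication stays small. A tiny illustration of that fixed-point trick, with invented numbers:

fn main() {
    let total_key_count: u64 = 40_000_000;
    let stale_key_count: u64 = 4_000_000;
    let uncompressed_block_size: u64 = 1 << 20; // 1MB block
    let total_size: u64 = 1 << 30; // 1GB SST

    // Dividing first through a fixed-point ratio (scaled by 10000) keeps the
    // intermediates small; key_count * block_size could overflow u64 for
    // very large inputs if multiplied directly.
    let ratio = uncompressed_block_size * 10000 / total_size;
    let est_stale = stale_key_count * ratio / 10000;
    let est_total = total_key_count * ratio / 10000;
    println!("block holds ~{est_stale} stale of ~{est_total} keys");
}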
