Skip to content

Commit

Permalink
fix(storage): avoid preload all range-tombstone before compaction job…
Browse files Browse the repository at this point in the history
… start (#12792)

Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace authored Nov 13, 2023
1 parent b10238c commit 0ec7c6e
Show file tree
Hide file tree
Showing 12 changed files with 535 additions and 503 deletions.
8 changes: 5 additions & 3 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::FullKey;
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::{InMemObjectStore, ObjectStore, ObjectStoreImpl};
use risingwave_pb::hummock::{compact_task, SstableInfo};
Expand All @@ -30,7 +31,8 @@ use risingwave_storage::hummock::compactor::{
ConcatSstableIterator, DummyCompactionFilter, TaskConfig, TaskProgress,
};
use risingwave_storage::hummock::iterator::{
ConcatIterator, Forward, HummockIterator, UnorderedMergeIteratorInner,
ConcatIterator, Forward, ForwardMergeRangeIterator, HummockIterator,
UnorderedMergeIteratorInner,
};
use risingwave_storage::hummock::multi_builder::{
CapacitySplitTableBuilder, LocalTableBuilderFactory,
Expand All @@ -39,7 +41,7 @@ use risingwave_storage::hummock::sstable::SstableIteratorReadOptions;
use risingwave_storage::hummock::sstable_store::SstableStoreRef;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
CachePolicy, CompactionDeleteRanges, FileCache, SstableBuilder, SstableBuilderOptions,
CachePolicy, CompactionDeleteRangeIterator, FileCache, SstableBuilder, SstableBuilderOptions,
SstableIterator, SstableStore, SstableWriterOptions, Xor16FilterBuilder,
};
use risingwave_storage::monitor::{CompactorMetrics, StoreLocalStatistic};
Expand Down Expand Up @@ -196,7 +198,7 @@ async fn compact<I: HummockIterator<Direction = Forward>>(iter: I, sstable_store
};
compact_and_build_sst(
&mut builder,
Arc::new(CompactionDeleteRanges::default()),
CompactionDeleteRangeIterator::new(ForwardMergeRangeIterator::new(HummockEpoch::MAX)),
&task_config,
Arc::new(CompactorMetrics::unused()),
iter,
Expand Down
14 changes: 9 additions & 5 deletions src/storage/hummock_sdk/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,21 +153,25 @@ pub fn estimate_memory_for_compact_task(
// The common size of SstableMeta in tests is no more than 1m (mainly from xor filters). Even
// though SstableMeta is used for a shorter period of time, it is safe to use 3m for the
// calculation.
// TODO: Note that this algorithm may fail when SstableMeta is occupied by a large number of
// range tombstones
const ESTIMATED_META_SIZE: u64 = 3 * 1048576;

// The memory usage of the SstableStreamIterator comes from SstableInfo with some state
// information (use ESTIMATED_META_SIZE to estimate it), the BlockStream being read (one block),
// and tcp recv_buffer_size.
let max_input_stream_estimated_memory = ESTIMATED_META_SIZE + block_size + recv_buffer_size;
let max_input_stream_estimated_memory = block_size + recv_buffer_size;

// input
for level in &task.input_ssts {
if level.level_type() == LevelType::Nonoverlapping {
result += max_input_stream_estimated_memory;
let mut meta_size = 0;
for sst in &level.table_infos {
meta_size = std::cmp::max(meta_size, sst.file_size - sst.meta_offset);
}
result += max_input_stream_estimated_memory + meta_size;
} else {
result += max_input_stream_estimated_memory * level.table_infos.len() as u64;
for sst in &level.table_infos {
result += max_input_stream_estimated_memory + sst.file_size - sst.meta_offset;
}
}
}

Expand Down
9 changes: 3 additions & 6 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ pub(crate) mod tests {
use risingwave_storage::hummock::test_utils::gen_test_sstable_info;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
CachePolicy, CompactionDeleteRanges, CompressionAlgorithm,
HummockStorage as GlobalHummockStorage, HummockStorage, MemoryLimiter,
SharedComapctorObjectIdManager, Sstable, SstableBuilderOptions, SstableIteratorReadOptions,
SstableObjectIdManager,
CachePolicy, CompressionAlgorithm, HummockStorage as GlobalHummockStorage, HummockStorage,
MemoryLimiter, SharedComapctorObjectIdManager, Sstable, SstableBuilderOptions,
SstableIteratorReadOptions, SstableObjectIdManager,
};
use risingwave_storage::monitor::{CompactorMetrics, StoreLocalStatistic};
use risingwave_storage::opts::StorageOpts;
Expand Down Expand Up @@ -1421,7 +1420,6 @@ pub(crate) mod tests {
gc_delete_keys: true,
..Default::default()
};
let deg = Arc::new(CompactionDeleteRanges::default());
let multi_filter_key_extractor =
Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor));
let compaction_filter = DummyCompactionFilter {};
Expand All @@ -1446,7 +1444,6 @@ pub(crate) mod tests {
.run(
compaction_filter,
multi_filter_key_extractor,
deg,
Arc::new(TaskProgress::default()),
)
.await
Expand Down
Loading

0 comments on commit 0ec7c6e

Please sign in to comment.