Skip to content

Commit

Permalink
fix(compaction): fast compact may not cut sst if not meet point key (#…
Browse files Browse the repository at this point in the history
…13690) (#13755)

Signed-off-by: Little-Wallace <[email protected]>
Co-authored-by: Wallace <[email protected]>
  • Loading branch information
github-actions[bot] and Little-Wallace authored Dec 1, 2023
1 parent b7cd67e commit f35a724
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 36 deletions.
85 changes: 63 additions & 22 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1358,12 +1358,7 @@ pub(crate) mod tests {

type KeyValue = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>);

async fn test_fast_compact_impl(
data1: Vec<KeyValue>,
data2: Vec<KeyValue>,
data3: Vec<KeyValue>,
data4: Vec<KeyValue>,
) {
async fn test_fast_compact_impl(data: Vec<Vec<KeyValue>>) {
let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) =
setup_compute_env(8080).await;
let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(MockHummockMetaClient::new(
Expand All @@ -1383,31 +1378,41 @@ pub(crate) mod tests {

let sstable_store = compact_ctx.sstable_store.clone();
let capacity = 256 * 1024;
let mut options = SstableBuilderOptions {
let options = SstableBuilderOptions {
capacity,
block_capacity: 2048,
restart_interval: 16,
bloom_false_positive: 0.1,
compression_algorithm: CompressionAlgorithm::Lz4,
..Default::default()
};
let sst1 = gen_test_sstable_info(options.clone(), 1, data1, sstable_store.clone()).await;
let sst2 = gen_test_sstable_info(options.clone(), 2, data2, sstable_store.clone()).await;
options.compression_algorithm = CompressionAlgorithm::Lz4;
let sst3 = gen_test_sstable_info(options.clone(), 3, data3, sstable_store.clone()).await;
let sst4 = gen_test_sstable_info(options, 4, data4, sstable_store.clone()).await;
let capacity = options.capacity as u64;
let mut ssts = vec![];
for (idx, sst_input) in data.into_iter().enumerate() {
let sst = gen_test_sstable_info(
options.clone(),
(idx + 1) as u64,
sst_input,
sstable_store.clone(),
)
.await;
println!("generate ssts size: {}", sst.file_size);
ssts.push(sst);
}
let read_options = Arc::new(SstableIteratorReadOptions::default());
let select_file_count = ssts.len() / 2;

let task = CompactTask {
input_ssts: vec![
InputLevel {
level_idx: 5,
level_type: 1,
table_infos: vec![sst1, sst2],
table_infos: ssts.drain(..select_file_count).collect_vec(),
},
InputLevel {
level_idx: 6,
level_type: 1,
table_infos: vec![sst3, sst4],
table_infos: ssts,
},
],
existing_table_ids: vec![1],
Expand All @@ -1416,7 +1421,7 @@ pub(crate) mod tests {
splits: vec![KeyRange::inf()],
target_level: 6,
base_level: 4,
target_file_size: capacity as u64,
target_file_size: capacity,
compression_algorithm: 1,
gc_delete_keys: true,
..Default::default()
Expand All @@ -1429,15 +1434,15 @@ pub(crate) mod tests {
compact_ctx.clone(),
task.clone(),
Box::new(SharedComapctorObjectIdManager::for_test(
VecDeque::from_iter([5, 6, 7, 8, 9]),
VecDeque::from_iter([5, 6, 7, 8, 9, 10, 11, 12, 13]),
)),
);
let fast_compact_runner = FastCompactorRunner::new(
compact_ctx.clone(),
task.clone(),
multi_filter_key_extractor.clone(),
Box::new(SharedComapctorObjectIdManager::for_test(
VecDeque::from_iter([10, 11, 12, 13, 14]),
VecDeque::from_iter([22, 23, 24, 25, 26, 27, 28, 29]),
)),
Arc::new(TaskProgress::default()),
);
Expand Down Expand Up @@ -1475,9 +1480,10 @@ pub(crate) mod tests {
.unwrap(),
);
}
assert!(fast_ret.iter().all(|f| f.file_size < capacity * 6 / 5));
println!(
"fast sstables {}.file size={}",
fast_ret[0].object_id, fast_ret[0].file_size,
"fast sstables file size: {:?}",
fast_ret.iter().map(|f| f.file_size).collect_vec(),
);
assert!(can_concat(&ret));
assert!(can_concat(&fast_ret));
Expand Down Expand Up @@ -1554,7 +1560,7 @@ pub(crate) mod tests {
for _ in 0..KEY_COUNT {
let rand_v = rng.next_u32() % 100;
let (k, epoch) = if rand_v == 0 {
(last_k + 3000, 400)
(last_k + 2000, 400)
} else if rand_v < 5 {
(last_k, last_epoch - 1)
} else {
Expand Down Expand Up @@ -1582,7 +1588,7 @@ pub(crate) mod tests {
let max_epoch = std::cmp::min(300, last_epoch - 1);
last_epoch = max_epoch;

for _ in 0..KEY_COUNT * 2 {
for _ in 0..KEY_COUNT * 4 {
let rand_v = rng.next_u32() % 100;
let (k, epoch) = if rand_v == 0 {
(last_k + 1000, max_epoch)
Expand All @@ -1602,6 +1608,41 @@ pub(crate) mod tests {
last_epoch = epoch;
}
let data4 = data;
test_fast_compact_impl(data1, data2, data3, data4).await;
test_fast_compact_impl(vec![data1, data2, data3, data4]).await;
}

/// Exercises fast compaction with one sparse, partly-deleted input and one dense input.
///
/// * `data1` (epoch 400) holds three disjoint key runs starting at `0`, `KEY_COUNT` and
///   `2 * KEY_COUNT`, each `KEY_COUNT / 3` keys long; roughly one key in ten is a delete.
/// * `data2` (epoch 300) holds every key in `0..KEY_COUNT * 4` as a put, so it overlaps all
///   runs of `data1` and also fills the gaps between them.
///
/// Both data sets are handed to `test_fast_compact_impl`, which builds one SST per data set
/// and runs the compaction checks on them.
#[tokio::test]
async fn test_fast_compact_cut_file() {
    const KEY_COUNT: usize = 20000;
    // Shape of the sparse input: number of runs and keys per run.
    const RUN_COUNT: usize = 3;
    const KEYS_PER_RUN: usize = KEY_COUNT / 3;
    // Number of keys in the dense input.
    const DENSE_KEY_COUNT: usize = KEY_COUNT * 4;

    // The seed comes from the wall clock, so print it: without it a failing run cannot be
    // reproduced.
    let seed = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock is set before the UNIX epoch")
        .as_secs();
    println!("test_fast_compact_cut_file rng seed: {}", seed);
    let mut rng = rand::rngs::StdRng::seed_from_u64(seed);

    // Size the buffer for what is actually pushed (RUN_COUNT * KEYS_PER_RUN entries).
    let mut data1 = Vec::with_capacity(RUN_COUNT * KEYS_PER_RUN);
    for start_idx in 0..RUN_COUNT {
        let base = start_idx * KEY_COUNT;
        for k in 0..KEYS_PER_RUN {
            let key = (k + base).to_be_bytes().to_vec();
            let key = FullKey::new(TableId::new(1), TableKey(key), 400);
            // One draw per key; the value 1 (out of 0..10) marks the key as deleted.
            let rand_v = rng.next_u32() % 10;
            let v = if rand_v == 1 {
                HummockValue::delete()
            } else {
                HummockValue::put(b"sst1-400".to_vec())
            };
            data1.push((key, v));
        }
    }

    // Size the buffer for what is actually pushed (DENSE_KEY_COUNT entries).
    let mut data2 = Vec::with_capacity(DENSE_KEY_COUNT);
    for k in 0..DENSE_KEY_COUNT {
        let key = k.to_be_bytes().to_vec();
        let key = FullKey::new(TableId::new(1), TableKey(key), 300);
        let v = HummockValue::put(b"sst2-300".to_vec());
        data2.push((key, v));
    }
    test_fast_compact_impl(vec![data1, data2]).await;
}
}
14 changes: 13 additions & 1 deletion src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ use crate::hummock::{
SstableDeleteRangeIterator, SstableStoreRef,
};
use crate::monitor::{CompactorMetrics, StoreLocalStatistic};

/// Upper bound (in bytes) on the task's `compaction_size` for the copy-block fast compaction
/// path: `compact` only enables it when `compaction_size` is strictly below this value.
const FAST_COMPACT_MAX_COMPACT_SIZE: u64 = 2 * 1024 * 1024 * 1024; // 2GB
/// Upper bound, as a percentage, on the share of stale keys among all keys of the input SSTs
/// for the fast compaction path: `compact` requires
/// `stale_key_count * 100 < FAST_COMPACT_MAX_DELETE_RATIO * total_key_count`.
const FAST_COMPACT_MAX_DELETE_RATIO: u64 = 40; // 40%
pub struct CompactorRunner {
compact_task: CompactTask,
compactor: Compactor,
Expand Down Expand Up @@ -369,13 +370,24 @@ pub async fn compact(
let all_ssts_are_blocked_filter = sstable_infos
.iter()
.all(|table_info| table_info.bloom_filter_kind() == BloomFilterType::Blocked);

let delete_key_count = sstable_infos
.iter()
.map(|table_info| table_info.stale_key_count)
.sum::<u64>();
let total_key_count = sstable_infos
.iter()
.map(|table_info| table_info.total_key_count)
.sum::<u64>();
let optimize_by_copy_block = context.storage_opts.enable_fast_compaction
&& all_ssts_are_blocked_filter
&& !has_tombstone
&& !has_ttl
&& single_table
&& compact_task.target_level > 0
&& compact_task.input_ssts.len() == 2
&& compaction_size < FAST_COMPACT_MAX_COMPACT_SIZE
&& delete_key_count * 100 < FAST_COMPACT_MAX_DELETE_RATIO * total_key_count
&& compact_task.task_type() == TaskType::Dynamic;
if !optimize_by_copy_block {
match generate_splits(&sstable_infos, compaction_size, context.clone()).await {
Expand Down
39 changes: 26 additions & 13 deletions src/storage/src/hummock/compactor/fast_compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ impl BlockStreamIterator {
}
}

/// Returns `true` while `next_block_index` is still `0`.
// NOTE(review): presumably this means no block of the SST has been consumed yet — confirm
// against where `next_block_index` is advanced.
pub fn is_first_block(&self) -> bool {
self.next_block_index == 0
}

/// Wrapper function for `self.block_stream.next()` which allows us to measure the time needed.
async fn download_next_block(&mut self) -> HummockResult<Option<(Bytes, Vec<u8>, BlockMeta)>> {
let (data, meta) = match self.block_stream.next().await? {
Expand Down Expand Up @@ -374,7 +378,7 @@ impl CompactorRunner {
assert!(ret != Ordering::Equal);
if first.current_sstable().iter.is_none() {
let right_key = second.current_sstable().key();
while first.current_sstable().is_valid() {
while first.current_sstable().is_valid() && !self.executor.builder.need_flush() {
let full_key = FullKey::decode(first.current_sstable().next_block_largest());
// the full key may be either Excluded key or Included key, so we do not allow
// they equals.
Expand Down Expand Up @@ -464,19 +468,28 @@ impl CompactorRunner {
let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
let (block, filter_data, block_meta) =
sstable_iter.download_next_block().await?.unwrap();
let largest_key = sstable_iter.current_block_largest();
let block_len = block.len() as u64;
let block_key_count = block_meta.total_key_count;
if self
.executor
.builder
.add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
.await?
{
skip_raw_block_count += 1;
skip_raw_block_size += block_len;
if self.executor.builder.need_flush() {
let largest_key = sstable_iter.sstable.value().meta.largest_key.clone();
let target_key = FullKey::decode(&largest_key);
sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
let mut iter = sstable_iter.iter.take().unwrap();
self.executor.run(&mut iter, target_key).await?;
} else {
let largest_key = sstable_iter.current_block_largest();
let block_len = block.len() as u64;
let block_key_count = block_meta.total_key_count;
if self
.executor
.builder
.add_raw_block(block, filter_data, smallest_key, largest_key, block_meta)
.await?
{
skip_raw_block_count += 1;
skip_raw_block_size += block_len;
}
self.executor.may_report_process_key(block_key_count);
self.executor.clear();
}
self.executor.may_report_process_key(block_key_count);
}
rest_data.next_sstable().await?;
}
Expand Down
7 changes: 7 additions & 0 deletions src/storage/src/hummock/sstable/multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ where
(switch_builder, vnode_changed)
}

/// Reports whether the builder currently being written has reached its capacity.
///
/// Returns `false` when there is no current builder.
pub fn need_flush(&self) -> bool {
    self.current_builder
        .as_ref()
        .is_some_and(|active| active.reach_capacity())
}

/// Add kv pair to sstable.
pub async fn add_monotonic_delete(&mut self, event: MonotonicDeleteEvent) -> HummockResult<()> {
if let Some(builder) = self.current_builder.as_mut()
Expand Down

0 comments on commit f35a724

Please sign in to comment.