diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index be0ebe204d745..2caacd6912a2c 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -44,7 +44,7 @@ pub(crate) mod tests { }; use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient}; use risingwave_pb::common::{HostAddress, WorkerType}; - use risingwave_pb::hummock::{CompactTask, InputLevel, KeyRange, TableOption}; + use risingwave_pb::hummock::{CompactTask, InputLevel, KeyRange, SstableInfo, TableOption}; use risingwave_pb::meta::add_worker_node_request::Property; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::filter_key_extractor::{ @@ -62,9 +62,10 @@ pub(crate) mod tests { use risingwave_storage::hummock::test_utils::gen_test_sstable_info; use risingwave_storage::hummock::value::HummockValue; use risingwave_storage::hummock::{ - CachePolicy, CompressionAlgorithm, HummockStorage as GlobalHummockStorage, HummockStorage, - MemoryLimiter, SharedComapctorObjectIdManager, Sstable, SstableBuilderOptions, - SstableIteratorReadOptions, SstableObjectIdManager, + BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, FilterBuilder, + HummockStorage as GlobalHummockStorage, HummockStorage, MemoryLimiter, + SharedComapctorObjectIdManager, Sstable, SstableBuilder, SstableBuilderOptions, + SstableIteratorReadOptions, SstableObjectIdManager, SstableWriterOptions, }; use risingwave_storage::monitor::{CompactorMetrics, StoreLocalStatistic}; use risingwave_storage::opts::StorageOpts; @@ -1357,6 +1358,82 @@ pub(crate) mod tests { } type KeyValue = (FullKey>, HummockValue>); + async fn check_compaction_result( + sstable_store: SstableStoreRef, + ret: Vec, + fast_ret: Vec, + capacity: u64, + ) { + let mut fast_tables = Vec::with_capacity(fast_ret.len()); + let mut normal_tables = Vec::with_capacity(ret.len()); + let mut stats = StoreLocalStatistic::default(); + for sst_info in &fast_ret { + fast_tables.push(sstable_store.sstable(sst_info, &mut stats).await.unwrap()); + } + + for sst_info in &ret { + normal_tables.push(sstable_store.sstable(sst_info, &mut stats).await.unwrap()); + } + assert!(fast_ret.iter().all(|f| f.file_size < capacity * 6 / 5)); + println!( + "fast sstables file size: {:?}", + fast_ret.iter().map(|f| f.file_size).collect_vec(), + ); + assert!(can_concat(&ret)); + assert!(can_concat(&fast_ret)); + let read_options = Arc::new(SstableIteratorReadOptions::default()); + + let mut normal_iter = UserIterator::for_test( + ConcatIterator::new(ret, sstable_store.clone(), read_options.clone()), + (Bound::Unbounded, Bound::Unbounded), + ); + let mut fast_iter = UserIterator::for_test( + ConcatIterator::new(fast_ret, sstable_store.clone(), read_options.clone()), + (Bound::Unbounded, Bound::Unbounded), + ); + + normal_iter.rewind().await.unwrap(); + fast_iter.rewind().await.unwrap(); + let mut count = 0; + while normal_iter.is_valid() { + assert_eq!( + normal_iter.key(), + fast_iter.key(), + "not equal in {}, len: {} {} vs {}", + count, + normal_iter.key().user_key.table_key.as_ref().len(), + u64::from_be_bytes( + normal_iter.key().user_key.table_key.as_ref()[0..8] + .try_into() + .unwrap() + ), + u64::from_be_bytes( + fast_iter.key().user_key.table_key.as_ref()[0..8] + .try_into() + .unwrap() + ), + ); + let hash = Sstable::hash_for_bloom_filter( + fast_iter.key().user_key.encode().as_slice(), + fast_iter.key().user_key.table_id.table_id, + ); + assert_eq!(normal_iter.value(), fast_iter.value()); + let key_ref = fast_iter.key().user_key.as_ref(); + assert!(normal_tables.iter().any(|table| { + table + .value() + .may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash) + })); + assert!(fast_tables.iter().any(|table| { + table + .value() + .may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash) + })); + normal_iter.next().await.unwrap(); + fast_iter.next().await.unwrap(); + count += 1; + } + } async fn test_fast_compact_impl(data: Vec>) { let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = @@ -1399,7 +1476,6 @@ pub(crate) mod tests { println!("generate ssts size: {}", sst.file_size); ssts.push(sst); } - let read_options = Arc::new(SstableIteratorReadOptions::default()); let select_file_count = ssts.len() / 2; let task = CompactTask { @@ -1457,91 +1533,7 @@ pub(crate) mod tests { let ret = ret1.into_iter().map(|sst| sst.sst_info).collect_vec(); let (ssts, _) = fast_compact_runner.run().await.unwrap(); let fast_ret = ssts.into_iter().map(|sst| sst.sst_info).collect_vec(); - println!("ssts: {} vs {}", fast_ret.len(), ret.len()); - let mut fast_tables = Vec::with_capacity(fast_ret.len()); - let mut normal_tables = Vec::with_capacity(ret.len()); - let mut stats = StoreLocalStatistic::default(); - for sst_info in &fast_ret { - fast_tables.push( - compact_ctx - .sstable_store - .sstable(sst_info, &mut stats) - .await - .unwrap(), - ); - } - - for sst_info in &ret { - normal_tables.push( - compact_ctx - .sstable_store - .sstable(sst_info, &mut stats) - .await - .unwrap(), - ); - } - assert!(fast_ret.iter().all(|f| f.file_size < capacity * 6 / 5)); - println!( - "fast sstables file size: {:?}", - fast_ret.iter().map(|f| f.file_size).collect_vec(), - ); - assert!(can_concat(&ret)); - assert!(can_concat(&fast_ret)); - - let mut normal_iter = UserIterator::for_test( - ConcatIterator::new(ret, compact_ctx.sstable_store.clone(), read_options.clone()), - (Bound::Unbounded, Bound::Unbounded), - ); - let mut fast_iter = UserIterator::for_test( - ConcatIterator::new( - fast_ret, - compact_ctx.sstable_store.clone(), - read_options.clone(), - ), - (Bound::Unbounded, Bound::Unbounded), - ); - - normal_iter.rewind().await.unwrap(); - fast_iter.rewind().await.unwrap(); - let mut count = 0; - while normal_iter.is_valid() { - assert_eq!( - normal_iter.key(), - fast_iter.key(), - "not equal in {}, len: {} {} vs {}", - count, - normal_iter.key().user_key.table_key.as_ref().len(), - u64::from_be_bytes( - normal_iter.key().user_key.table_key.as_ref()[0..8] - .try_into() - .unwrap() - ), - u64::from_be_bytes( - fast_iter.key().user_key.table_key.as_ref()[0..8] - .try_into() - .unwrap() - ), - ); - let hash = Sstable::hash_for_bloom_filter( - fast_iter.key().user_key.encode().as_slice(), - fast_iter.key().user_key.table_id.table_id, - ); - assert_eq!(normal_iter.value(), fast_iter.value()); - let key_ref = fast_iter.key().user_key.as_ref(); - assert!(normal_tables.iter().any(|table| { - table - .value() - .may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash) - })); - assert!(fast_tables.iter().any(|table| { - table - .value() - .may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash) - })); - normal_iter.next().await.unwrap(); - fast_iter.next().await.unwrap(); - count += 1; - } + check_compaction_result(compact_ctx.sstable_store, ret, fast_ret, capacity).await; } #[tokio::test] @@ -1645,4 +1637,151 @@ pub(crate) mod tests { } test_fast_compact_impl(vec![data1, data2]).await; } + + #[tokio::test] + async fn test_tombstone_recycle() { + let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = + setup_compute_env(8080).await; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager_ref.clone(), + worker_node.id, + )); + let existing_table_id: u32 = 1; + let storage = get_hummock_storage( + hummock_meta_client.clone(), + get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()), + &hummock_manager_ref, + TableId::from(existing_table_id), + ) + .await; + hummock_manager_ref.get_new_sst_ids(10).await.unwrap(); + let (compact_ctx, _) = prepare_compactor_and_filter(&storage, existing_table_id); + + let sstable_store = compact_ctx.sstable_store.clone(); + let capacity = 256 * 1024; + let opts = SstableBuilderOptions { + capacity, + block_capacity: 2048, + restart_interval: 16, + bloom_false_positive: 0.1, + compression_algorithm: CompressionAlgorithm::Lz4, + ..Default::default() + }; + + const KEY_COUNT: usize = 20000; + let mut rng = rand::rngs::StdRng::seed_from_u64( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + ); + let mut sst_infos = vec![]; + let mut max_sst_file_size = 0; + + for object_id in 1..3 { + let mut builder = SstableBuilder::<_, BlockedXor16FilterBuilder>::new( + object_id, + sstable_store + .clone() + .create_sst_writer(object_id, SstableWriterOptions::default()), + BlockedXor16FilterBuilder::create(opts.bloom_false_positive, opts.capacity / 16), + opts.clone(), + Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)), + None, + ); + let mut last_k: u64 = 1; + let init_epoch = 100 * object_id; + let mut last_epoch = init_epoch; + for idx in 0..KEY_COUNT { + let rand_v = rng.next_u32() % 10; + let (k, epoch) = if rand_v == 0 { + (last_k + 1000 * object_id, init_epoch) + } else if rand_v < 5 { + (last_k, last_epoch - 1) + } else { + (last_k + 1, init_epoch) + }; + let key = k.to_be_bytes().to_vec(); + let key = FullKey::new(TableId::new(1), TableKey(key.as_slice()), epoch); + let rand_v = rng.next_u32() % 10; + let v = if (5..7).contains(&rand_v) { + HummockValue::delete() + } else { + HummockValue::put(format!("{}-{}", idx, epoch).into_bytes()) + }; + if rand_v < 5 && builder.current_block_size() > opts.block_capacity / 2 { + // cut block when the key is same with the last key. + builder.build_block().await.unwrap(); + } + builder.add(key, v.as_slice()).await.unwrap(); + last_k = k; + last_epoch = epoch; + } + + let output = builder.finish().await.unwrap(); + output.writer_output.await.unwrap().unwrap(); + let sst_info = output.sst_info.sst_info; + max_sst_file_size = std::cmp::max(max_sst_file_size, sst_info.file_size); + sst_infos.push(sst_info); + } + + let target_file_size = max_sst_file_size / 4; + + let task = CompactTask { + input_ssts: vec![ + InputLevel { + level_idx: 5, + level_type: 1, + table_infos: sst_infos.drain(..1).collect_vec(), + }, + InputLevel { + level_idx: 6, + level_type: 1, + table_infos: sst_infos, + }, + ], + existing_table_ids: vec![1], + task_id: 1, + watermark: 1000, + splits: vec![KeyRange::inf()], + target_level: 6, + base_level: 4, + target_file_size, + compression_algorithm: 1, + gc_delete_keys: true, + ..Default::default() + }; + let multi_filter_key_extractor = + Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)); + let compaction_filter = DummyCompactionFilter {}; + let slow_compact_runner = CompactorRunner::new( + 0, + compact_ctx.clone(), + task.clone(), + Box::new(SharedComapctorObjectIdManager::for_test( + VecDeque::from_iter([5, 6, 7, 8, 9, 10, 11, 12, 13]), + )), + ); + let fast_compact_runner = FastCompactorRunner::new( + compact_ctx.clone(), + task.clone(), + multi_filter_key_extractor.clone(), + Box::new(SharedComapctorObjectIdManager::for_test( + VecDeque::from_iter([22, 23, 24, 25, 26, 27, 28, 29]), + )), + Arc::new(TaskProgress::default()), + ); + let (_, ret1, _) = slow_compact_runner + .run( + compaction_filter, + multi_filter_key_extractor, + Arc::new(TaskProgress::default()), + ) + .await + .unwrap(); + let ret = ret1.into_iter().map(|sst| sst.sst_info).collect_vec(); + let (ssts, _) = fast_compact_runner.run().await.unwrap(); + let fast_ret = ssts.into_iter().map(|sst| sst.sst_info).collect_vec(); + check_compaction_result(compact_ctx.sstable_store, ret, fast_ret, target_file_size).await; + } } diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index ff5b6f0e6e66e..205e222617fb3 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -466,7 +466,10 @@ impl CompactorRunner { let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec(); let (block, filter_data, block_meta) = sstable_iter.download_next_block().await?.unwrap(); - if self.executor.builder.need_flush() { + // If the last key is tombstone and it was deleted, the first key of this block must be deleted. So we can not move this block directly. + let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key) + && self.executor.last_key_is_delete; + if self.executor.builder.need_flush() || need_deleted { let largest_key = sstable_iter.sstable.value().meta.largest_key.clone(); let target_key = FullKey::decode(&largest_key); sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?; diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index bc946488a7208..b8a8ec4bebf7d 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -239,6 +239,11 @@ impl SstableBuilder { self.add(full_key, value).await } + /// only for test + pub fn current_block_size(&self) -> usize { + self.block_builder.approximate_len() + } + /// Add raw data of block to sstable. return false means fallback pub async fn add_raw_block( &mut self, @@ -666,7 +671,7 @@ impl SstableBuilder { + self.range_tombstone_size } - async fn build_block(&mut self) -> HummockResult<()> { + pub async fn build_block(&mut self) -> HummockResult<()> { // Skip empty block. if self.block_builder.is_empty() { return Ok(());