fix(compactor): fix put key miss tombstone (#14233)
Signed-off-by: Little-Wallace <[email protected]>
Little-Wallace authored Dec 28, 2023
1 parent afd1fee commit 2a26d0e
Showing 3 changed files with 239 additions and 92 deletions.
319 changes: 229 additions & 90 deletions src/storage/hummock_test/src/compactor_tests.rs
@@ -44,7 +44,7 @@ pub(crate) mod tests {
};
use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient};
use risingwave_pb::common::{HostAddress, WorkerType};
use risingwave_pb::hummock::{CompactTask, InputLevel, KeyRange, TableOption};
use risingwave_pb::hummock::{CompactTask, InputLevel, KeyRange, SstableInfo, TableOption};
use risingwave_pb::meta::add_worker_node_request::Property;
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::filter_key_extractor::{
@@ -62,9 +62,10 @@ pub(crate) mod tests {
use risingwave_storage::hummock::test_utils::gen_test_sstable_info;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
CachePolicy, CompressionAlgorithm, HummockStorage as GlobalHummockStorage, HummockStorage,
MemoryLimiter, SharedComapctorObjectIdManager, Sstable, SstableBuilderOptions,
SstableIteratorReadOptions, SstableObjectIdManager,
BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, FilterBuilder,
HummockStorage as GlobalHummockStorage, HummockStorage, MemoryLimiter,
SharedComapctorObjectIdManager, Sstable, SstableBuilder, SstableBuilderOptions,
SstableIteratorReadOptions, SstableObjectIdManager, SstableWriterOptions,
};
use risingwave_storage::monitor::{CompactorMetrics, StoreLocalStatistic};
use risingwave_storage::opts::StorageOpts;
@@ -1357,6 +1358,82 @@ pub(crate) mod tests {
}

type KeyValue = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>);
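/// Asserts that the normal and fast compaction paths produce equivalent
/// results: identical key/value streams, fast-compactor output files within
/// the size bound, and filters that cover every surviving key in both outputs.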
async fn check_compaction_result(
sstable_store: SstableStoreRef,
ret: Vec<SstableInfo>,
fast_ret: Vec<SstableInfo>,
capacity: u64,
) {
let mut fast_tables = Vec::with_capacity(fast_ret.len());
let mut normal_tables = Vec::with_capacity(ret.len());
let mut stats = StoreLocalStatistic::default();
for sst_info in &fast_ret {
fast_tables.push(sstable_store.sstable(sst_info, &mut stats).await.unwrap());
}

for sst_info in &ret {
normal_tables.push(sstable_store.sstable(sst_info, &mut stats).await.unwrap());
}
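// Every fast-compactor output should stay within ~20% of the target capacity.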
assert!(fast_ret.iter().all(|f| f.file_size < capacity * 6 / 5));
println!(
"fast sstables file size: {:?}",
fast_ret.iter().map(|f| f.file_size).collect_vec(),
);
assert!(can_concat(&ret));
assert!(can_concat(&fast_ret));
let read_options = Arc::new(SstableIteratorReadOptions::default());

let mut normal_iter = UserIterator::for_test(
ConcatIterator::new(ret, sstable_store.clone(), read_options.clone()),
(Bound::Unbounded, Bound::Unbounded),
);
let mut fast_iter = UserIterator::for_test(
ConcatIterator::new(fast_ret, sstable_store.clone(), read_options.clone()),
(Bound::Unbounded, Bound::Unbounded),
);

normal_iter.rewind().await.unwrap();
fast_iter.rewind().await.unwrap();
let mut count = 0;
while normal_iter.is_valid() {
assert_eq!(
normal_iter.key(),
fast_iter.key(),
"not equal in {}, len: {} {} vs {}",
count,
normal_iter.key().user_key.table_key.as_ref().len(),
u64::from_be_bytes(
normal_iter.key().user_key.table_key.as_ref()[0..8]
.try_into()
.unwrap()
),
u64::from_be_bytes(
fast_iter.key().user_key.table_key.as_ref()[0..8]
.try_into()
.unwrap()
),
);
let hash = Sstable::hash_for_bloom_filter(
fast_iter.key().user_key.encode().as_slice(),
fast_iter.key().user_key.table_id.table_id,
);
assert_eq!(normal_iter.value(), fast_iter.value());
let key_ref = fast_iter.key().user_key.as_ref();
assert!(normal_tables.iter().any(|table| {
table
.value()
.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
}));
assert!(fast_tables.iter().any(|table| {
table
.value()
.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
}));
normal_iter.next().await.unwrap();
fast_iter.next().await.unwrap();
count += 1;
}
}

async fn test_fast_compact_impl(data: Vec<Vec<KeyValue>>) {
let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) =
@@ -1399,7 +1476,6 @@ pub(crate) mod tests {
println!("generate ssts size: {}", sst.file_size);
ssts.push(sst);
}
let read_options = Arc::new(SstableIteratorReadOptions::default());
let select_file_count = ssts.len() / 2;

let task = CompactTask {
@@ -1457,91 +1533,7 @@ pub(crate) mod tests {
let ret = ret1.into_iter().map(|sst| sst.sst_info).collect_vec();
let (ssts, _) = fast_compact_runner.run().await.unwrap();
let fast_ret = ssts.into_iter().map(|sst| sst.sst_info).collect_vec();
println!("ssts: {} vs {}", fast_ret.len(), ret.len());
let mut fast_tables = Vec::with_capacity(fast_ret.len());
let mut normal_tables = Vec::with_capacity(ret.len());
let mut stats = StoreLocalStatistic::default();
for sst_info in &fast_ret {
fast_tables.push(
compact_ctx
.sstable_store
.sstable(sst_info, &mut stats)
.await
.unwrap(),
);
}

for sst_info in &ret {
normal_tables.push(
compact_ctx
.sstable_store
.sstable(sst_info, &mut stats)
.await
.unwrap(),
);
}
assert!(fast_ret.iter().all(|f| f.file_size < capacity * 6 / 5));
println!(
"fast sstables file size: {:?}",
fast_ret.iter().map(|f| f.file_size).collect_vec(),
);
assert!(can_concat(&ret));
assert!(can_concat(&fast_ret));

let mut normal_iter = UserIterator::for_test(
ConcatIterator::new(ret, compact_ctx.sstable_store.clone(), read_options.clone()),
(Bound::Unbounded, Bound::Unbounded),
);
let mut fast_iter = UserIterator::for_test(
ConcatIterator::new(
fast_ret,
compact_ctx.sstable_store.clone(),
read_options.clone(),
),
(Bound::Unbounded, Bound::Unbounded),
);

normal_iter.rewind().await.unwrap();
fast_iter.rewind().await.unwrap();
let mut count = 0;
while normal_iter.is_valid() {
assert_eq!(
normal_iter.key(),
fast_iter.key(),
"not equal in {}, len: {} {} vs {}",
count,
normal_iter.key().user_key.table_key.as_ref().len(),
u64::from_be_bytes(
normal_iter.key().user_key.table_key.as_ref()[0..8]
.try_into()
.unwrap()
),
u64::from_be_bytes(
fast_iter.key().user_key.table_key.as_ref()[0..8]
.try_into()
.unwrap()
),
);
let hash = Sstable::hash_for_bloom_filter(
fast_iter.key().user_key.encode().as_slice(),
fast_iter.key().user_key.table_id.table_id,
);
assert_eq!(normal_iter.value(), fast_iter.value());
let key_ref = fast_iter.key().user_key.as_ref();
assert!(normal_tables.iter().any(|table| {
table
.value()
.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
}));
assert!(fast_tables.iter().any(|table| {
table
.value()
.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
}));
normal_iter.next().await.unwrap();
fast_iter.next().await.unwrap();
count += 1;
}
check_compaction_result(compact_ctx.sstable_store, ret, fast_ret, capacity).await;
}

#[tokio::test]
@@ -1645,4 +1637,151 @@ pub(crate) mod tests {
}
test_fast_compact_impl(vec![data1, data2]).await;
}

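// Regression test: when the fast compactor moves a raw block directly, a put
// key whose newer tombstone was recycled must not resurface (see the
// `need_deleted` guard added in fast_compactor_runner.rs below).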
#[tokio::test]
async fn test_tombstone_recycle() {
let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) =
setup_compute_env(8080).await;
let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(MockHummockMetaClient::new(
hummock_manager_ref.clone(),
worker_node.id,
));
let existing_table_id: u32 = 1;
let storage = get_hummock_storage(
hummock_meta_client.clone(),
get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()),
&hummock_manager_ref,
TableId::from(existing_table_id),
)
.await;
hummock_manager_ref.get_new_sst_ids(10).await.unwrap();
let (compact_ctx, _) = prepare_compactor_and_filter(&storage, existing_table_id);

let sstable_store = compact_ctx.sstable_store.clone();
let capacity = 256 * 1024;
let opts = SstableBuilderOptions {
capacity,
block_capacity: 2048,
restart_interval: 16,
bloom_false_positive: 0.1,
compression_algorithm: CompressionAlgorithm::Lz4,
..Default::default()
};

const KEY_COUNT: usize = 20000;
let mut rng = rand::rngs::StdRng::seed_from_u64(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
);
let mut sst_infos = vec![];
let mut max_sst_file_size = 0;

for object_id in 1..3 {
let mut builder = SstableBuilder::<_, BlockedXor16FilterBuilder>::new(
object_id,
sstable_store
.clone()
.create_sst_writer(object_id, SstableWriterOptions::default()),
BlockedXor16FilterBuilder::create(opts.bloom_false_positive, opts.capacity / 16),
opts.clone(),
Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)),
None,
);
let mut last_k: u64 = 1;
let init_epoch = 100 * object_id;
let mut last_epoch = init_epoch;
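// Generate keys so that versions of one user key can straddle block
// boundaries: ~10% of iterations jump far ahead, ~40% re-emit the last key
// at an older epoch, and the rest advance to the next key.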
for idx in 0..KEY_COUNT {
let rand_v = rng.next_u32() % 10;
let (k, epoch) = if rand_v == 0 {
(last_k + 1000 * object_id, init_epoch)
} else if rand_v < 5 {
(last_k, last_epoch - 1)
} else {
(last_k + 1, init_epoch)
};
let key = k.to_be_bytes().to_vec();
let key = FullKey::new(TableId::new(1), TableKey(key.as_slice()), epoch);
let rand_v = rng.next_u32() % 10;
let v = if (5..7).contains(&rand_v) {
HummockValue::delete()
} else {
HummockValue::put(format!("{}-{}", idx, epoch).into_bytes())
};
if rand_v < 5 && builder.current_block_size() > opts.block_capacity / 2 {
// Cut the block while the current key shares the user key of the last key,
// so the next block starts with an older version of that key.
builder.build_block().await.unwrap();
}
builder.add(key, v.as_slice()).await.unwrap();
last_k = k;
last_epoch = epoch;
}

let output = builder.finish().await.unwrap();
output.writer_output.await.unwrap().unwrap();
let sst_info = output.sst_info.sst_info;
max_sst_file_size = std::cmp::max(max_sst_file_size, sst_info.file_size);
sst_infos.push(sst_info);
}

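// Target a quarter of the largest input file so the compaction must cut
// multiple output SSTs.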
let target_file_size = max_sst_file_size / 4;

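// With `gc_delete_keys: true` at the bottom level, tombstones are recycled
// during this compaction, which is the path guarded by `need_deleted` in the
// fast compactor.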
let task = CompactTask {
input_ssts: vec![
InputLevel {
level_idx: 5,
level_type: 1,
table_infos: sst_infos.drain(..1).collect_vec(),
},
InputLevel {
level_idx: 6,
level_type: 1,
table_infos: sst_infos,
},
],
existing_table_ids: vec![1],
task_id: 1,
watermark: 1000,
splits: vec![KeyRange::inf()],
target_level: 6,
base_level: 4,
target_file_size,
compression_algorithm: 1,
gc_delete_keys: true,
..Default::default()
};
let multi_filter_key_extractor =
Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor));
let compaction_filter = DummyCompactionFilter {};
let slow_compact_runner = CompactorRunner::new(
0,
compact_ctx.clone(),
task.clone(),
Box::new(SharedComapctorObjectIdManager::for_test(
VecDeque::from_iter([5, 6, 7, 8, 9, 10, 11, 12, 13]),
)),
);
let fast_compact_runner = FastCompactorRunner::new(
compact_ctx.clone(),
task.clone(),
multi_filter_key_extractor.clone(),
Box::new(SharedComapctorObjectIdManager::for_test(
VecDeque::from_iter([22, 23, 24, 25, 26, 27, 28, 29]),
)),
Arc::new(TaskProgress::default()),
);
let (_, ret1, _) = slow_compact_runner
.run(
compaction_filter,
multi_filter_key_extractor,
Arc::new(TaskProgress::default()),
)
.await
.unwrap();
let ret = ret1.into_iter().map(|sst| sst.sst_info).collect_vec();
let (ssts, _) = fast_compact_runner.run().await.unwrap();
let fast_ret = ssts.into_iter().map(|sst| sst.sst_info).collect_vec();
check_compaction_result(compact_ctx.sstable_store, ret, fast_ret, target_file_size).await;
}
}
5 changes: 4 additions & 1 deletion src/storage/src/hummock/compactor/fast_compactor_runner.rs
@@ -466,7 +466,10 @@ impl CompactorRunner {
let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
let (block, filter_data, block_meta) =
sstable_iter.download_next_block().await?.unwrap();
if self.executor.builder.need_flush() {
// If the last key is a tombstone that has been dropped, the first key of this block must be dropped as well, so we cannot move this block directly.
let need_deleted = self.executor.last_key.user_key.eq(&smallest_key.user_key)
&& self.executor.last_key_is_delete;
if self.executor.builder.need_flush() || need_deleted {
let largest_key = sstable_iter.sstable.value().meta.largest_key.clone();
let target_key = FullKey::decode(&largest_key);
sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
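For context outside the diff, the guard above can be read as the following minimal sketch. The function name and parameters are illustrative rather than the runner's actual API; `last_user_key` and `last_key_is_delete` stand in for the executor state referenced in the real code.

// Sketch only: decide whether a raw block can be moved into the output
// without rewriting, under a simplified view of the fast compactor's state.
fn can_move_block_directly(
    builder_needs_flush: bool,
    last_user_key: &[u8],
    last_key_is_delete: bool,
    block_smallest_user_key: &[u8],
) -> bool {
    // If the previously emitted user key was dropped as a tombstone and this
    // block starts with another version of the same user key, the block's
    // first key must be dropped too, so the raw block cannot be reused as-is.
    let need_deleted = last_key_is_delete && last_user_key == block_smallest_user_key;
    !(builder_needs_flush || need_deleted)
}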
7 changes: 6 additions & 1 deletion src/storage/src/hummock/sstable/builder.rs
@@ -239,6 +239,11 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
self.add(full_key, value).await
}

/// Only for tests: returns the approximate size of the block under construction.
pub fn current_block_size(&self) -> usize {
self.block_builder.approximate_len()
}

/// Add raw data of block to sstable. return false means fallback
pub async fn add_raw_block(
&mut self,
@@ -666,7 +671,7 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
+ self.range_tombstone_size
}

async fn build_block(&mut self) -> HummockResult<()> {
pub async fn build_block(&mut self) -> HummockResult<()> {
// Skip empty block.
if self.block_builder.is_empty() {
return Ok(());
