Skip to content

Commit

Permalink
feat(storage): filter data by watermark in fast compact algorithm (#1…
Browse files Browse the repository at this point in the history
…4320)

Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace authored Mar 6, 2024
1 parent 8f9c063 commit ecf95c5
Show file tree
Hide file tree
Showing 3 changed files with 456 additions and 229 deletions.
326 changes: 262 additions & 64 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,23 @@ pub(crate) mod tests {
use bytes::{BufMut, Bytes, BytesMut};
use itertools::Itertools;
use rand::{Rng, RngCore, SeedableRng};
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
use risingwave_common::constants::hummock::CompactionFilterFlag;
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::Epoch;
use risingwave_common_service::observer_manager::NotificationClient;
use risingwave_hummock_sdk::can_concat;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::key::{next_key, FullKey, TableKey, TABLE_PREFIX_LEN};
use risingwave_hummock_sdk::key::{
next_key, prefix_slice_with_vnode, FullKey, TableKey, TABLE_PREFIX_LEN,
};
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map;
use risingwave_hummock_sdk::table_watermark::{
ReadTableWatermark, VnodeWatermark, WatermarkDirection,
};
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder;
use risingwave_meta::hummock::compaction::selector::{
Expand All @@ -44,7 +51,10 @@ pub(crate) mod tests {
};
use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient};
use risingwave_pb::common::{HostAddress, WorkerType};
use risingwave_pb::hummock::{CompactTask, InputLevel, KeyRange, SstableInfo, TableOption};
use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
use risingwave_pb::hummock::{
CompactTask, InputLevel, KeyRange, SstableInfo, TableOption, TableWatermarks,
};
use risingwave_pb::meta::add_worker_node_request::Property;
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::filter_key_extractor::{
Expand All @@ -57,7 +67,9 @@ pub(crate) mod tests {
CompactionExecutor, CompactorContext, DummyCompactionFilter, TaskProgress,
};
use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
use risingwave_storage::hummock::iterator::{ConcatIterator, UserIterator};
use risingwave_storage::hummock::iterator::{
ConcatIterator, SkipWatermarkIterator, UserIterator,
};
use risingwave_storage::hummock::sstable_store::SstableStoreRef;
use risingwave_storage::hummock::test_utils::gen_test_sstable_info;
use risingwave_storage::hummock::value::HummockValue;
Expand Down Expand Up @@ -1440,6 +1452,44 @@ pub(crate) mod tests {
}
}

async fn run_fast_and_normal_runner(
compact_ctx: CompactorContext,
task: CompactTask,
) -> (Vec<SstableInfo>, Vec<SstableInfo>) {
let multi_filter_key_extractor =
Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor));
let compaction_filter = DummyCompactionFilter {};
let slow_compact_runner = CompactorRunner::new(
0,
compact_ctx.clone(),
task.clone(),
Box::new(SharedComapctorObjectIdManager::for_test(
VecDeque::from_iter([5, 6, 7, 8, 9, 10, 11, 12, 13]),
)),
);
let fast_compact_runner = FastCompactorRunner::new(
compact_ctx.clone(),
task.clone(),
multi_filter_key_extractor.clone(),
Box::new(SharedComapctorObjectIdManager::for_test(
VecDeque::from_iter([22, 23, 24, 25, 26, 27, 28, 29]),
)),
Arc::new(TaskProgress::default()),
);
let (_, ret1, _) = slow_compact_runner
.run(
compaction_filter,
multi_filter_key_extractor,
Arc::new(TaskProgress::default()),
)
.await
.unwrap();
let ret = ret1.into_iter().map(|sst| sst.sst_info).collect_vec();
let (ssts, _) = fast_compact_runner.run().await.unwrap();
let fast_ret = ssts.into_iter().map(|sst| sst.sst_info).collect_vec();
(ret, fast_ret)
}

async fn test_fast_compact_impl(data: Vec<Vec<KeyValue>>) {
let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) =
setup_compute_env(8080).await;
Expand Down Expand Up @@ -1507,37 +1557,7 @@ pub(crate) mod tests {
gc_delete_keys: true,
..Default::default()
};
let multi_filter_key_extractor =
Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor));
let compaction_filter = DummyCompactionFilter {};
let slow_compact_runner = CompactorRunner::new(
0,
compact_ctx.clone(),
task.clone(),
Box::new(SharedComapctorObjectIdManager::for_test(
VecDeque::from_iter([5, 6, 7, 8, 9, 10, 11, 12, 13]),
)),
);
let fast_compact_runner = FastCompactorRunner::new(
compact_ctx.clone(),
task.clone(),
multi_filter_key_extractor.clone(),
Box::new(SharedComapctorObjectIdManager::for_test(
VecDeque::from_iter([22, 23, 24, 25, 26, 27, 28, 29]),
)),
Arc::new(TaskProgress::default()),
);
let (_, ret1, _) = slow_compact_runner
.run(
compaction_filter,
multi_filter_key_extractor,
Arc::new(TaskProgress::default()),
)
.await
.unwrap();
let ret = ret1.into_iter().map(|sst| sst.sst_info).collect_vec();
let (ssts, _) = fast_compact_runner.run().await.unwrap();
let fast_ret = ssts.into_iter().map(|sst| sst.sst_info).collect_vec();
let (ret, fast_ret) = run_fast_and_normal_runner(compact_ctx.clone(), task).await;
check_compaction_result(compact_ctx.sstable_store, ret, fast_ret, capacity).await;
}

Expand Down Expand Up @@ -1756,37 +1776,215 @@ pub(crate) mod tests {
gc_delete_keys: true,
..Default::default()
};
let multi_filter_key_extractor =
Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor));
let compaction_filter = DummyCompactionFilter {};
let slow_compact_runner = CompactorRunner::new(
0,
compact_ctx.clone(),
task.clone(),
Box::new(SharedComapctorObjectIdManager::for_test(
VecDeque::from_iter([5, 6, 7, 8, 9, 10, 11, 12, 13]),
)),
let (ret, fast_ret) = run_fast_and_normal_runner(compact_ctx.clone(), task).await;
check_compaction_result(compact_ctx.sstable_store, ret, fast_ret, target_file_size).await;
}

#[tokio::test]
async fn test_skip_watermark() {
let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) =
setup_compute_env(8080).await;
let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(MockHummockMetaClient::new(
hummock_manager_ref.clone(),
worker_node.id,
));
let existing_table_id: u32 = 1;
let storage = get_hummock_storage(
hummock_meta_client.clone(),
get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()),
&hummock_manager_ref,
TableId::from(existing_table_id),
)
.await;
hummock_manager_ref.get_new_sst_ids(10).await.unwrap();
let (compact_ctx, _) = prepare_compactor_and_filter(&storage, existing_table_id);

let sstable_store = compact_ctx.sstable_store.clone();
let capacity = 256 * 1024;
let opts = SstableBuilderOptions {
capacity,
block_capacity: 2048,
restart_interval: 16,
bloom_false_positive: 0.1,
compression_algorithm: CompressionAlgorithm::Lz4,
..Default::default()
};

const KEY_COUNT: usize = 20000;
let mut rng = rand::rngs::StdRng::seed_from_u64(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
);
let fast_compact_runner = FastCompactorRunner::new(
compact_ctx.clone(),
task.clone(),
multi_filter_key_extractor.clone(),
Box::new(SharedComapctorObjectIdManager::for_test(
VecDeque::from_iter([22, 23, 24, 25, 26, 27, 28, 29]),
)),
Arc::new(TaskProgress::default()),
let mut sst_infos = vec![];
let mut max_sst_file_size = 0;

for object_id in 1..3 {
let mut builder = SstableBuilder::<_, BlockedXor16FilterBuilder>::new(
object_id,
sstable_store
.clone()
.create_sst_writer(object_id, SstableWriterOptions::default()),
BlockedXor16FilterBuilder::create(opts.bloom_false_positive, opts.capacity / 16),
opts.clone(),
Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)),
None,
);
let key_count = KEY_COUNT / VirtualNode::COUNT * 2;
for vnode_id in 0..VirtualNode::COUNT / 2 {
let mut last_k: u64 = 1;
let init_epoch = 100 * object_id;
let mut last_epoch = init_epoch;
for idx in 0..key_count {
let rand_v = rng.next_u32() % 10;
let (k, epoch) = if rand_v == 0 {
(last_k + 1000 * object_id, init_epoch)
} else if rand_v < 5 {
(last_k, last_epoch - 1)
} else {
(last_k + 1, init_epoch)
};
let key = prefix_slice_with_vnode(
VirtualNode::from_index(vnode_id),
k.to_be_bytes().as_slice(),
);
let key = FullKey::new(TableId::new(1), TableKey(key), epoch);
let rand_v = rng.next_u32() % 10;
let v = if (5..7).contains(&rand_v) {
HummockValue::delete()
} else {
HummockValue::put(format!("{}-{}", idx, epoch).into_bytes())
};
if rand_v < 5 && builder.current_block_size() > opts.block_capacity / 2 {
// cut block when the key is same with the last key.
builder.build_block().await.unwrap();
}
builder.add(key.to_ref(), v.as_slice()).await.unwrap();
last_k = k;
last_epoch = epoch;
}
}

let output = builder.finish().await.unwrap();
output.writer_output.await.unwrap().unwrap();
let sst_info = output.sst_info.sst_info;
max_sst_file_size = std::cmp::max(max_sst_file_size, sst_info.file_size);
sst_infos.push(sst_info);
}
println!(
"input data: {}",
sst_infos.iter().map(|sst| sst.file_size).sum::<u64>(),
);
let (_, ret1, _) = slow_compact_runner
.run(
compaction_filter,
multi_filter_key_extractor,
Arc::new(TaskProgress::default()),
)
.await
.unwrap();
let ret = ret1.into_iter().map(|sst| sst.sst_info).collect_vec();
let (ssts, _) = fast_compact_runner.run().await.unwrap();
let fast_ret = ssts.into_iter().map(|sst| sst.sst_info).collect_vec();
check_compaction_result(compact_ctx.sstable_store, ret, fast_ret, target_file_size).await;

let target_file_size = max_sst_file_size / 4;
let mut table_watermarks = BTreeMap::default();
let key_count = KEY_COUNT / VirtualNode::COUNT * 2;
let mut vnode_builder = BitmapBuilder::zeroed(VirtualNode::COUNT);
for i in 0..VirtualNode::COUNT / 2 {
if i % 2 == 0 {
vnode_builder.set(i, true);
} else {
vnode_builder.set(i, false);
}
}
let bitmap = Arc::new(vnode_builder.finish());
let watermark_idx = key_count * 20;
let watermark_key = Bytes::from(watermark_idx.to_be_bytes().to_vec());
table_watermarks.insert(
1,
TableWatermarks {
epoch_watermarks: vec![PbEpochNewWatermarks {
watermarks: vec![
VnodeWatermark::new(bitmap.clone(), watermark_key.clone()).to_protobuf()
],
epoch: 500,
}],
is_ascending: true,
},
);

let task = CompactTask {
input_ssts: vec![
InputLevel {
level_idx: 5,
level_type: 1,
table_infos: sst_infos.drain(..1).collect_vec(),
},
InputLevel {
level_idx: 6,
level_type: 1,
table_infos: sst_infos,
},
],
existing_table_ids: vec![1],
task_id: 1,
watermark: 1000,
splits: vec![KeyRange::inf()],
target_level: 6,
base_level: 4,
target_file_size,
compression_algorithm: 1,
gc_delete_keys: true,
table_watermarks,
..Default::default()
};
let (ret, fast_ret) = run_fast_and_normal_runner(compact_ctx.clone(), task).await;
println!(
"normal compact result data: {}, fast compact result data: {}",
ret.iter().map(|sst| sst.file_size).sum::<u64>(),
fast_ret.iter().map(|sst| sst.file_size).sum::<u64>(),
);
// check_compaction_result(compact_ctx.sstable_store, ret.clone(), fast_ret, target_file_size).await;
let mut fast_tables = Vec::with_capacity(fast_ret.len());
let mut normal_tables = Vec::with_capacity(ret.len());
let mut stats = StoreLocalStatistic::default();
for sst_info in &fast_ret {
fast_tables.push(sstable_store.sstable(sst_info, &mut stats).await.unwrap());
}

for sst_info in &ret {
normal_tables.push(sstable_store.sstable(sst_info, &mut stats).await.unwrap());
}
assert!(can_concat(&ret));
assert!(can_concat(&fast_ret));
let read_options = Arc::new(SstableIteratorReadOptions::default());

let mut watermark = ReadTableWatermark {
direction: WatermarkDirection::Ascending,
vnode_watermarks: BTreeMap::default(),
};
for i in 0..VirtualNode::COUNT {
if i % 2 == 0 {
watermark
.vnode_watermarks
.insert(VirtualNode::from_index(i), watermark_key.clone());
}
}
let watermark = BTreeMap::from_iter([(TableId::new(1), watermark)]);

let mut normal_iter = UserIterator::for_test(
SkipWatermarkIterator::new(
ConcatIterator::new(ret, sstable_store.clone(), read_options.clone()),
watermark.clone(),
),
(Bound::Unbounded, Bound::Unbounded),
);
let mut fast_iter = UserIterator::for_test(
SkipWatermarkIterator::new(
ConcatIterator::new(fast_ret, sstable_store, read_options),
watermark,
),
(Bound::Unbounded, Bound::Unbounded),
);
normal_iter.rewind().await.unwrap();
fast_iter.rewind().await.unwrap();
let mut count = 0;
while normal_iter.is_valid() {
assert_eq!(normal_iter.key(), fast_iter.key(), "not equal in {}", count,);
normal_iter.next().await.unwrap();
fast_iter.next().await.unwrap();
count += 1;
}
}
}
Loading

0 comments on commit ecf95c5

Please sign in to comment.