Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): support reverse scan #12570

Merged
merged 31 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
da97cfe
tmp commit
Little-Wallace Mar 20, 2024
b139383
refactor code
Little-Wallace Mar 20, 2024
f4b5eca
rev iterator
Little-Wallace Mar 20, 2024
af91a74
fix check
Little-Wallace Apr 15, 2024
27c4bb9
add test
Little-Wallace Apr 15, 2024
40d7711
fix test
Little-Wallace Apr 15, 2024
8367c90
remove core
Little-Wallace Apr 15, 2024
6c88003
Merge branch 'main' into reverse-iter
Little-Wallace May 9, 2024
11f48f9
fix conflict
Little-Wallace May 9, 2024
d8360cc
revert user key refactor
Little-Wallace May 9, 2024
b89d706
add micro benchmark
Little-Wallace May 10, 2024
c94b648
add ut
Little-Wallace May 10, 2024
02d8825
fix format
Little-Wallace May 10, 2024
a8f563a
refactor iterator builder
Little-Wallace May 14, 2024
849da4b
refactor
Little-Wallace May 16, 2024
3c82812
refactor interface
Little-Wallace May 16, 2024
3ccc35c
Merge branch 'main' into reverse-iter
Little-Wallace May 16, 2024
059e527
fix conflict
Little-Wallace May 16, 2024
c0343b4
fix format
Little-Wallace May 16, 2024
c7d7adf
add ut
Little-Wallace May 17, 2024
4289518
fix check
Little-Wallace May 17, 2024
349b617
fix corner case
Little-Wallace May 17, 2024
5632104
fix scan bound
Little-Wallace May 17, 2024
9615962
Merge branch 'main' into reverse-iter
Little-Wallace May 20, 2024
7c6217b
fix fmt
Little-Wallace May 20, 2024
16b1719
fix warn
Little-Wallace May 20, 2024
c2c0164
Merge branch 'main' into reverse-iter
Little-Wallace May 20, 2024
2420d15
address comment
Little-Wallace May 21, 2024
d0c0fd4
Merge branch 'main' into reverse-iter
Little-Wallace May 21, 2024
fa29061
fix conflict
Little-Wallace May 21, 2024
4dd8d3f
fix check
Little-Wallace May 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 165 additions & 34 deletions src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,22 @@ use criterion::{criterion_group, criterion_main, Criterion};
use foyer::HybridCacheBuilder;
use futures::future::try_join_all;
use itertools::Itertools;
use rand::random;
use risingwave_common::catalog::TableId;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use risingwave_object_store::object::{ObjectStore, ObjectStoreImpl, S3ObjectStore};
use risingwave_object_store::object::{
InMemObjectStore, ObjectStore, ObjectStoreImpl, S3ObjectStore,
};
use risingwave_pb::hummock::SstableInfo;
use risingwave_storage::hummock::iterator::{ConcatIterator, ConcatIteratorInner, HummockIterator};
use risingwave_storage::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
BatchSstableWriterFactory, CachePolicy, HummockResult, MemoryLimiter, SstableBuilder,
SstableBuilderOptions, SstableStore, SstableStoreConfig, SstableWriterFactory,
SstableWriterOptions, StreamingSstableWriterFactory, Xor16FilterBuilder,
BackwardSstableIterator, BatchSstableWriterFactory, CachePolicy, HummockResult, MemoryLimiter,
SstableBuilder, SstableBuilderOptions, SstableIteratorReadOptions, SstableStore,
SstableStoreConfig, SstableWriterFactory, SstableWriterOptions, StreamingSstableWriterFactory,
Xor16FilterBuilder,
};
use risingwave_storage::monitor::{global_hummock_state_store_metrics, ObjectStoreMetrics};

Expand Down Expand Up @@ -101,7 +107,7 @@ fn test_user_key_of(idx: u64) -> UserKey<Vec<u8>> {

async fn build_tables<F: SstableWriterFactory>(
mut builder: CapacitySplitTableBuilder<LocalTableBuilderFactory<F>>,
) {
) -> Vec<SstableInfo> {
for i in RANGE {
builder
.add_full_key_for_test(
Expand All @@ -113,11 +119,43 @@ async fn build_tables<F: SstableWriterFactory>(
.unwrap();
}
let split_table_outputs = builder.finish().await.unwrap();
let ssts = split_table_outputs
.iter()
.map(|handle| handle.sst_info.sst_info.clone())
.collect_vec();
let join_handles = split_table_outputs
.into_iter()
.map(|o| o.upload_join_handle)
.collect_vec();
try_join_all(join_handles).await.unwrap();
ssts
}

/// Builds the [`SstableStore`] shared by the benchmarks in this file on top of
/// the given `object_store`.
///
/// The meta cache and the block cache are each created through
/// `HybridCacheBuilder` with two shards and a memory capacity of `64 << 20` and
/// `128 << 20` respectively. Metrics are created with `MetricLevel::Disabled`.
///
/// # Panics
///
/// Panics if either cache fails to build.
async fn generate_sstable_store(object_store: Arc<ObjectStoreImpl>) -> Arc<SstableStore> {
    // Cache for SST metadata.
    let meta_cache = HybridCacheBuilder::new()
        .memory(64 << 20)
        .with_shards(2)
        .storage()
        .build()
        .await
        .unwrap();

    // Cache for data blocks; configured with twice the capacity of the meta cache.
    let block_cache = HybridCacheBuilder::new()
        .memory(128 << 20)
        .with_shards(2)
        .storage()
        .build()
        .await
        .unwrap();

    let metrics = global_hummock_state_store_metrics(MetricLevel::Disabled);

    let config = SstableStoreConfig {
        store: object_store,
        path: String::from("test"),
        prefetch_buffer_capacity: 64 << 20,
        max_prefetch_block_number: 16,
        recent_filter: None,
        state_store_metrics: Arc::new(metrics),
        meta_cache_v2: meta_cache,
        block_cache_v2: block_cache,
    };

    Arc::new(SstableStore::new(config))
}

fn bench_builder(
Expand All @@ -141,34 +179,7 @@ fn bench_builder(
});
let object_store = Arc::new(ObjectStoreImpl::S3(object_store));

let sstable_store = runtime.block_on(async {
let meta_cache_v2 = HybridCacheBuilder::new()
.memory(64 << 20)
.with_shards(2)
.storage()
.build()
.await
.unwrap();
let block_cache_v2 = HybridCacheBuilder::new()
.memory(128 << 20)
.with_shards(2)
.storage()
.build()
.await
.unwrap();
Arc::new(SstableStore::new(SstableStoreConfig {
store: object_store,
path: "test".to_string(),
prefetch_buffer_capacity: 64 << 20,
max_prefetch_block_number: 16,
recent_filter: None,
state_store_metrics: Arc::new(global_hummock_state_store_metrics(
MetricLevel::Disabled,
)),
meta_cache_v2,
block_cache_v2,
}))
});
let sstable_store = runtime.block_on(async { generate_sstable_store(object_store).await });

let mut group = c.benchmark_group("bench_multi_builder");
group
Expand Down Expand Up @@ -212,5 +223,125 @@ fn bench_multi_builder(c: &mut Criterion) {
}
}

criterion_group!(benches, bench_multi_builder);
/// Benchmarks forward and backward iteration over SSTs stored in an in-memory
/// object store.
///
/// The SSTs are written once, up front, through `build_tables`, and the
/// resulting `SstableInfo` list is reused by four benchmarks:
///
/// * `bench_table_scan` - rewinds a `ConcatIterator` and walks every entry.
/// * `bench_table_reverse_scan` - the same walk with
///   `ConcatIteratorInner::<BackwardSstableIterator>` over the reversed SST list.
/// * `bench_point_scan` - seeks a `ConcatIterator` to a randomly chosen key and
///   advances once if the iterator is valid.
/// * `bench_point_reverse_scan` - the same seek with the backward iterator over
///   the reversed SST list.
///
/// Each sample needs its own owned copy of the SST list, the store handle and the
/// read options because the iterator constructors take them by value. They are
/// cloned exactly once per sample and then moved into the iterator, so the timed
/// region does not pay for a second, redundant copy.
///
/// # Panics
///
/// Panics if the runtime cannot be built, if any iterator operation fails, or if
/// a full scan does not visit exactly `RANGE.end - RANGE.start` entries.
fn bench_table_scan(c: &mut Criterion) {
    let capacity_mb: usize = 32;
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    let store = InMemObjectStore::new().monitored(
        Arc::new(ObjectStoreMetrics::unused()),
        Arc::new(ObjectStoreConfig::default()),
    );
    let object_store = Arc::new(ObjectStoreImpl::InMem(store));
    let sstable_store = runtime.block_on(async { generate_sstable_store(object_store).await });

    // Write the data set once; every benchmark below only reads it.
    let ssts = runtime.block_on(async {
        build_tables(CapacitySplitTableBuilder::for_test(
            LocalTableBuilderFactory::new(
                1,
                BatchSstableWriterFactory::new(sstable_store.clone()),
                get_builder_options(capacity_mb),
            ),
        ))
        .await
    });
    println!("sst count: {}", ssts.len());

    // NOTE(review): this group name is the same string `bench_builder` uses for its
    // group. It is kept unchanged so existing benchmark IDs stay stable.
    let mut group = c.benchmark_group("bench_multi_builder");
    group
        .sample_size(SAMPLE_COUNT)
        .measurement_time(ESTIMATED_MEASUREMENT_TIME);
    let read_options = Arc::new(SstableIteratorReadOptions::default());

    group.bench_function("bench_table_scan", |b| {
        let sstable_ssts = ssts.clone();
        b.to_async(&runtime).iter(|| {
            // One owned copy per sample, moved straight into the iterator.
            let sstable_ssts = sstable_ssts.clone();
            let sstable_store = sstable_store.clone();
            let read_options = read_options.clone();
            async move {
                let mut iter = ConcatIterator::new(sstable_ssts, sstable_store, read_options);
                iter.rewind().await.unwrap();
                let mut count = 0;
                while iter.is_valid() {
                    count += 1;
                    iter.next().await.unwrap();
                }
                assert_eq!(count, RANGE.end - RANGE.start);
            }
        });
    });

    group.bench_function("bench_table_reverse_scan", |b| {
        // The backward iterator is handed the SST list in reversed order.
        let mut sstable_ssts = ssts.clone();
        sstable_ssts.reverse();
        b.to_async(&runtime).iter(|| {
            let sstable_ssts = sstable_ssts.clone();
            let sstable_store = sstable_store.clone();
            let read_options = read_options.clone();
            async move {
                let mut iter = ConcatIteratorInner::<BackwardSstableIterator>::new(
                    sstable_ssts,
                    sstable_store,
                    read_options,
                );
                iter.rewind().await.unwrap();
                let mut count = 0;
                while iter.is_valid() {
                    count += 1;
                    iter.next().await.unwrap();
                }
                assert_eq!(count, RANGE.end - RANGE.start);
            }
        });
    });

    group.bench_function("bench_point_scan", |b| {
        let sstable_ssts = ssts.clone();
        b.to_async(&runtime).iter(|| {
            let sstable_ssts = sstable_ssts.clone();
            let sstable_store = sstable_store.clone();
            let read_options = read_options.clone();
            // A fresh random index is drawn for every sample.
            let idx = random::<u64>() % (RANGE.end - RANGE.start);
            let key = FullKey::from_user_key(test_user_key_of(idx), 1);
            async move {
                let mut iter = ConcatIterator::new(sstable_ssts, sstable_store, read_options);
                iter.seek(key.to_ref()).await.unwrap();
                if iter.is_valid() {
                    iter.next().await.unwrap();
                }
            }
        });
    });

    group.bench_function("bench_point_reverse_scan", |b| {
        let mut sstable_ssts = ssts.clone();
        sstable_ssts.reverse();
        b.to_async(&runtime).iter(|| {
            let sstable_ssts = sstable_ssts.clone();
            let sstable_store = sstable_store.clone();
            let read_options = read_options.clone();
            let idx = random::<u64>() % (RANGE.end - RANGE.start);
            let key = FullKey::from_user_key(test_user_key_of(idx), 1);
            async move {
                let mut iter = ConcatIteratorInner::<BackwardSstableIterator>::new(
                    sstable_ssts,
                    sstable_store,
                    read_options,
                );
                iter.seek(key.to_ref()).await.unwrap();
                if iter.is_valid() {
                    iter.next().await.unwrap();
                }
            }
        });
    });
}

// Register both benchmark entry points under the `benches` group and let
// criterion generate the benchmark binary's `main` from that group.
criterion_group!(benches, bench_multi_builder, bench_table_scan);
criterion_main!(benches);
53 changes: 53 additions & 0 deletions src/storage/hummock_test/src/hummock_storage_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,12 +635,36 @@ async fn test_state_store_sync() {
.into_stream(to_owned_item);
futures::pin_mut!(iter);

let rev_iter = test_env
.storage
.rev_iter(
(
Unbounded,
Included(gen_key_from_str(VirtualNode::ZERO, "eeee")),
),
epoch1,
ReadOptions {
table_id: TEST_TABLE_ID,
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
.await
.unwrap()
.into_stream(to_owned_item);
futures::pin_mut!(rev_iter);
let mut rev_results = vec![];
while let Some(result) = rev_iter.try_next().await.unwrap() {
rev_results.push(result);
}
let kv_map_batch_1 = [
(gen_key_from_str(VirtualNode::ZERO, "aaaa"), "1111", epoch1),
(gen_key_from_str(VirtualNode::ZERO, "bbbb"), "2222", epoch1),
];
for (k, v, e) in kv_map_batch_1 {
let result = iter.try_next().await.unwrap();
let rev_result = rev_results.pop();
assert_eq!(result, rev_result);
assert_eq!(
result,
Some((
Expand All @@ -657,6 +681,8 @@ async fn test_state_store_sync() {

for (k, v, e) in kv_map_batch_2 {
let result = iter.try_next().await.unwrap();
let rev_result = rev_results.pop();
assert_eq!(result, rev_result);
assert_eq!(
result,
Some((
Expand Down Expand Up @@ -690,12 +716,37 @@ async fn test_state_store_sync() {

futures::pin_mut!(iter);

let rev_iter = test_env
.storage
.rev_iter(
(
Unbounded,
Included(gen_key_from_str(VirtualNode::ZERO, "eeee")),
),
epoch2,
ReadOptions {
table_id: TEST_TABLE_ID,
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
},
)
.await
.unwrap()
.into_stream(to_owned_item);
futures::pin_mut!(rev_iter);
let mut rev_results = vec![];
while let Some(result) = rev_iter.try_next().await.unwrap() {
rev_results.push(result);
}

let kv_map_batch_1 = [("aaaa", "1111", epoch1), ("bbbb", "2222", epoch1)];

let kv_map_batch_2 = [("cccc", "3333", epoch1), ("dddd", "4444", epoch1)];
let kv_map_batch_3 = [("eeee", "6666", epoch2)];
for (k, v, e) in kv_map_batch_1 {
let result = iter.try_next().await.unwrap();
let rev_result = rev_results.pop();
assert_eq!(result, rev_result);
assert_eq!(
result,
Some((
Expand All @@ -707,6 +758,8 @@ async fn test_state_store_sync() {

for (k, v, e) in kv_map_batch_2 {
let result = iter.try_next().await.unwrap();
let rev_result = rev_results.pop();
assert_eq!(result, rev_result);
assert_eq!(
result,
Some((
Expand Down
Loading
Loading