feat(storage): separate key and values in imm to reduce allocation (#…
wenym1 authored Mar 4, 2024
1 parent 5163877 commit 940d6c7
Showing 8 changed files with 255 additions and 144 deletions.
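The core change: previously each entry of a merged immutable memtable (`SharedBufferVersionedEntry`) owned its key together with a per-key `Vec` of versioned values, costing one vector allocation per distinct key. After this commit, keys and values live in two flat vectors, with each key entry carrying an offset into the shared value vector. A minimal sketch of the new layout, using simplified stand-in types (`KeyEntry`, `MergedImm`, and `values_of` are illustrative names, not the actual RisingWave API):

```rust
// Sketch of the separated key/value layout, with toy types in place of
// SharedBufferKeyEntry / VersionedSharedBufferValue.
struct KeyEntry {
    key: Vec<u8>,
    // Index into `values` where this key's versions begin.
    value_offset: usize,
}

struct MergedImm {
    entries: Vec<KeyEntry>,      // one entry per distinct key
    values: Vec<(u64, Vec<u8>)>, // (epoch, value) pairs for all keys, flattened
}

impl MergedImm {
    /// The versions of the i-th key span from its own offset up to the
    /// next entry's offset (or to the end of `values` for the last key).
    fn values_of(&self, i: usize) -> &[(u64, Vec<u8>)] {
        let start = self.entries[i].value_offset;
        let end = self
            .entries
            .get(i + 1)
            .map_or(self.values.len(), |e| e.value_offset);
        &self.values[start..end]
    }
}
```

With this layout the merge path performs two large allocations sized up front (`with_capacity(key_count)` / `with_capacity(value_count)`, as seen in `merge_imms_in_memory` below) instead of one small `Vec` per key.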
4 changes: 4 additions & 0 deletions src/storage/Cargo.toml
@@ -139,5 +139,9 @@ harness = false
 name = "bench_row"
 harness = false
 
+[[bench]]
+name = "bench_imm_compact"
+harness = false
+
 [lints]
 workspace = true
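With this entry registered, the new benchmark should be runnable with something like `cargo bench -p risingwave_storage --bench bench_imm_compact` (the package name here is inferred from the bench's import paths, not confirmed by the diff).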
83 changes: 83 additions & 0 deletions src/storage/benches/bench_imm_compact.rs
@@ -0,0 +1,83 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use bytes::Bytes;
use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::TableKey;
use risingwave_storage::hummock::compactor::merge_imms_in_memory;
use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;
use risingwave_storage::hummock::value::HummockValue;

fn gen_interleave_shared_buffer_batch(
    batch_size: usize,
    batch_count: usize,
    epoch: u64,
) -> Vec<SharedBufferBatch> {
    let mut batches = Vec::new();
    for i in 0..batch_count {
        let mut batch_data = vec![];
        for j in 0..batch_size {
            batch_data.push((
                TableKey(Bytes::copy_from_slice(
                    format!("test_key_{:08}", j * batch_count + i).as_bytes(),
                )),
                HummockValue::put(Bytes::copy_from_slice("value".as_bytes())),
            ));
        }
        let batch = SharedBufferBatch::for_test(batch_data, epoch, Default::default());
        batches.push(batch);
    }
    batches.reverse();
    batches
}

fn criterion_benchmark(c: &mut Criterion) {
    let batches = gen_interleave_shared_buffer_batch(10000, 100, 100);
    c.bench_with_input(
        BenchmarkId::new("bench-imm-merge", "single-epoch"),
        &batches,
        |b, batches| {
            b.to_async(FuturesExecutor).iter(|| async {
                let imm = merge_imms_in_memory(TableId::default(), 0, batches.clone(), None).await;
                assert_eq!(imm.key_count(), 10000 * 100);
                assert_eq!(imm.value_count(), 10000 * 100);
            })
        },
    );

    let mut later_batches = gen_interleave_shared_buffer_batch(2000, 100, 600);

    for i in 1..5 {
        let mut batches = gen_interleave_shared_buffer_batch(2000, 100, 600 - i * 100);
        batches.extend(later_batches);
        later_batches = batches;
    }

    c.bench_with_input(
        BenchmarkId::new("bench-imm-merge", "multi-epoch"),
        &later_batches,
        |b, batches| {
            b.to_async(FuturesExecutor).iter(|| async {
                let imm = merge_imms_in_memory(TableId::default(), 0, batches.clone(), None).await;
                assert_eq!(imm.key_count(), 2000 * 100);
                assert_eq!(imm.value_count(), 2000 * 100 * 5);
            })
        },
    );
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
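Note how the generator interleaves keys across batches (key index `j * batch_count + i` means batch `i` holds every key congruent to `i` modulo `batch_count`), so the merge iterator must alternate among all 100 input imms rather than draining them one at a time; reversing the batches puts the newest batch first, matching the sorted-batch-id assertion in `merge_imms_in_memory`. The multi-epoch case writes the same 2000 × 100 keys at five epochs (600 down to 200), which is why `key_count` stays at `2000 * 100` while `value_count` is five times larger.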
48 changes: 25 additions & 23 deletions src/storage/hummock_sdk/src/key.rs
@@ -932,24 +932,18 @@ pub fn bound_table_key_range<T: AsRef<[u8]> + EmptySliceRef>(
     (start, end)
 }
 
-pub struct FullKeyTracker<T: AsRef<[u8]> + Ord + Eq> {
+/// TODO: Temporary bypass full key check. Remove this field after #15099 is resolved.
+pub struct FullKeyTracker<T: AsRef<[u8]> + Ord + Eq, const SKIP_DEDUP: bool = false> {
     pub latest_full_key: FullKey<T>,
     last_observed_epoch_with_gap: EpochWithGap,
-    /// TODO: Temporary bypass full key check. Remove this field after #15099 is resolved.
-    allow_same_full_key: bool,
 }
 
-impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
+impl<T: AsRef<[u8]> + Ord + Eq, const SKIP_DEDUP: bool> FullKeyTracker<T, SKIP_DEDUP> {
     pub fn new(init_full_key: FullKey<T>) -> Self {
-        Self::with_config(init_full_key, false)
-    }
-
-    pub fn with_config(init_full_key: FullKey<T>, allow_same_full_key: bool) -> Self {
         let epoch_with_gap = init_full_key.epoch_with_gap;
         Self {
             latest_full_key: init_full_key,
             last_observed_epoch_with_gap: epoch_with_gap,
-            allow_same_full_key,
         }
     }
 
@@ -965,7 +959,7 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
     ///
     /// let table_id = TableId { table_id: 1 };
     /// let full_key1 = FullKey::new(table_id, TableKey(Bytes::from("c")), 5 << EPOCH_AVAILABLE_BITS);
-    /// let mut a = FullKeyTracker::<Bytes>::new(full_key1.clone());
+    /// let mut a: FullKeyTracker<_> = FullKeyTracker::<Bytes>::new(full_key1.clone());
     ///
     /// // Panic on non-decreasing epoch observed for the same user key.
     /// // let full_key_with_larger_epoch = FullKey::new(table_id, TableKey(Bytes::from("c")), 6 << EPOCH_AVAILABLE_BITS);
@@ -976,16 +970,18 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
     /// // a.observe(full_key_with_smaller_user_key);
     ///
     /// let full_key2 = FullKey::new(table_id, TableKey(Bytes::from("c")), 3 << EPOCH_AVAILABLE_BITS);
-    /// assert_eq!(a.observe(full_key2), None);
+    /// assert_eq!(a.observe(full_key2.clone()), None);
+    /// assert_eq!(a.latest_user_key(), &full_key2.user_key);
     ///
     /// let full_key3 = FullKey::new(table_id, TableKey(Bytes::from("f")), 4 << EPOCH_AVAILABLE_BITS);
-    /// assert_eq!(a.observe(full_key3), Some(full_key1));
+    /// assert_eq!(a.observe(full_key3.clone()), Some(full_key1.user_key));
+    /// assert_eq!(a.latest_user_key(), &full_key3.user_key);
     /// ```
     ///
     /// Return:
     /// - If the provided `key` contains a new user key, return the latest full key observed for the previous user key.
     /// - Otherwise: return None
-    pub fn observe<F>(&mut self, key: FullKey<F>) -> Option<FullKey<T>>
+    pub fn observe<F>(&mut self, key: FullKey<F>) -> Option<UserKey<T>>
     where
         UserKey<F>: Into<UserKey<T>>,
         F: AsRef<[u8]>,
@@ -998,7 +994,7 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
         &mut self,
         user_key: UserKey<F>,
         mut epochs: impl Iterator<Item = EpochWithGap>,
-    ) -> Option<FullKey<T>>
+    ) -> Option<UserKey<T>>
     where
         UserKey<F>: Into<UserKey<T>>,
         F: AsRef<[u8]>,
@@ -1030,18 +1026,20 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
                 self.last_observed_epoch_with_gap = min_epoch_with_gap;
 
                 // Take the previous key and set latest key
-                Some(std::mem::replace(
-                    &mut self.latest_full_key,
-                    FullKey {
-                        user_key: user_key.into(),
-                        epoch_with_gap: min_epoch_with_gap,
-                    },
-                ))
+                Some(
+                    std::mem::replace(
+                        &mut self.latest_full_key,
+                        FullKey {
+                            user_key: user_key.into(),
+                            epoch_with_gap: min_epoch_with_gap,
+                        },
+                    )
+                    .user_key,
+                )
             }
             Ordering::Equal => {
                 if max_epoch_with_gap > self.last_observed_epoch_with_gap
-                    || (!self.allow_same_full_key
-                        && max_epoch_with_gap == self.last_observed_epoch_with_gap)
+                    || (!SKIP_DEDUP && max_epoch_with_gap == self.last_observed_epoch_with_gap)
                 {
                     // Epoch from the same user key should be monotonically decreasing
                     panic!(
@@ -1065,6 +1063,10 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
             }
         }
     }
+
+    pub fn latest_user_key(&self) -> &UserKey<T> {
+        &self.latest_full_key.user_key
+    }
 }
 
 #[cfg(test)]
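The `allow_same_full_key` runtime flag becomes a `const SKIP_DEDUP: bool` type parameter (defaulting to `false`), so the dedup check resolves at compile time and the tracker no longer carries a per-instance bool. A self-contained toy reduction of the pattern (not the real `FullKeyTracker`):

```rust
// Toy reduction of the const-generic flag pattern. `Tracker<false>`
// and `Tracker<true>` are distinct types, so the branch on SKIP_DEDUP
// is resolved at compile time.
struct Tracker<const SKIP_DEDUP: bool = false> {
    last_epoch: u64,
}

impl<const SKIP_DEDUP: bool> Tracker<SKIP_DEDUP> {
    fn new(init_epoch: u64) -> Self {
        Self { last_epoch: init_epoch }
    }

    /// Epochs must strictly decrease; a repeated epoch is tolerated
    /// only when `SKIP_DEDUP` is true, mirroring the panic condition
    /// in `observe_multi_version` above.
    fn observe(&mut self, epoch: u64) {
        if epoch > self.last_epoch || (!SKIP_DEDUP && epoch == self.last_epoch) {
            panic!("non-decreasing epoch {epoch} after {}", self.last_epoch);
        }
        self.last_epoch = epoch;
    }
}

fn main() {
    // The default parameter keeps existing call sites unchanged:
    // `Tracker` here means `Tracker<false>`.
    let mut strict: Tracker = Tracker::new(10);
    strict.observe(5); // ok: strictly decreasing

    let mut lenient: Tracker<true> = Tracker::new(10);
    lenient.observe(10); // ok: duplicate epoch allowed when SKIP_DEDUP = true
    lenient.observe(5);
}
```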
81 changes: 39 additions & 42 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
@@ -24,12 +24,12 @@ use risingwave_common::cache::CachePriority;
 use risingwave_common::catalog::TableId;
 use risingwave_common::hash::VirtualNode;
 use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
-use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey};
+use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, EPOCH_LEN};
 use risingwave_hummock_sdk::key_range::KeyRange;
 use risingwave_hummock_sdk::{CompactionGroupId, EpochWithGap, HummockEpoch, LocalSstableInfo};
 use risingwave_pb::hummock::compact_task;
 use thiserror_ext::AsReport;
-use tracing::error;
+use tracing::{error, warn};
 
 use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager};
 use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
@@ -41,10 +41,9 @@ use crate::hummock::iterator::{
     Forward, ForwardMergeRangeIterator, HummockIterator, MergeIterator, UserIterator,
 };
 use crate::hummock::shared_buffer::shared_buffer_batch::{
-    SharedBufferBatch, SharedBufferBatchInner, SharedBufferVersionedEntry,
+    SharedBufferBatch, SharedBufferBatchInner, SharedBufferKeyEntry, VersionedSharedBufferValue,
 };
 use crate::hummock::utils::MemoryTracker;
-use crate::hummock::value::HummockValue;
 use crate::hummock::{
     BlockedXor16FilterBuilder, CachePolicy, CompactionDeleteRangeIterator, GetObjectId,
     HummockError, HummockResult, SstableBuilderOptions, SstableObjectIdManagerRef,
@@ -147,7 +146,7 @@ async fn compact_shared_buffer(
         ret
     });
 
-    let total_key_count = payload.iter().map(|imm| imm.kv_count()).sum::<usize>();
+    let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
     let (splits, sub_compaction_sstable_size, split_weight_by_vnode) =
         generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
     let parallelism = splits.len();
@@ -286,23 +285,23 @@ pub async fn merge_imms_in_memory(
     imms: Vec<ImmutableMemtable>,
     memory_tracker: Option<MemoryTracker>,
 ) -> ImmutableMemtable {
-    let mut kv_count = 0;
     let mut epochs = vec![];
     let mut merged_size = 0;
     assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
     let max_imm_id = imms[0].batch_id();
 
     let mut imm_iters = Vec::with_capacity(imms.len());
+    let key_count = imms.iter().map(|imm| imm.key_count()).sum();
+    let value_count = imms.iter().map(|imm| imm.value_count()).sum();
     for imm in imms {
-        assert!(imm.kv_count() > 0, "imm should not be empty");
+        assert!(imm.key_count() > 0, "imm should not be empty");
         assert_eq!(
            table_id,
            imm.table_id(),
            "should only merge data belonging to the same table"
         );
 
         epochs.push(imm.min_epoch());
-        kv_count += imm.kv_count();
         merged_size += imm.size();
 
         imm_iters.push(imm.into_forward_iter());
@@ -316,44 +315,50 @@ pub async fn merge_imms_in_memory(
 
     let first_item_key = mi.current_key_entry().key.clone();
 
-    let mut merged_payload: Vec<SharedBufferVersionedEntry> = Vec::new();
+    let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
+    let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);
 
+    merged_entries.push(SharedBufferKeyEntry {
+        key: first_item_key.clone(),
+        value_offset: 0,
+    });
+
     // Use first key, max epoch to initialize the tracker to ensure that the check first call to full_key_tracker.observe will succeed
     let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
         table_id,
         first_item_key,
         EpochWithGap::new_max_epoch(),
     ));
-    let mut table_key_versions: Vec<(EpochWithGap, HummockValue<Bytes>)> = Vec::new();
 
     while mi.is_valid() {
         let key_entry = mi.current_key_entry();
         let user_key = UserKey {
             table_id,
             table_key: key_entry.key.clone(),
         };
-        if let Some(last_full_key) = full_key_tracker.observe_multi_version(
-            user_key,
-            key_entry
-                .new_values
-                .iter()
-                .map(|(epoch_with_gap, _)| *epoch_with_gap),
-        ) {
-            let last_user_key = last_full_key.user_key;
-            // `epoch_with_gap` of the `last_full_key` may not reflect the real epoch in the items
-            // and should not be used because we use max epoch to initialize the tracker
-            let _epoch_with_gap = last_full_key.epoch_with_gap;
-
-            // Record kv entries
-            merged_payload.push(SharedBufferVersionedEntry::new(
-                last_user_key.table_key,
-                table_key_versions,
-            ));
-
-            // Reset state before moving onto the new table key
-            table_key_versions = vec![];
+        if full_key_tracker
+            .observe_multi_version(
+                user_key,
+                key_entry
+                    .new_values
+                    .iter()
+                    .map(|(epoch_with_gap, _)| *epoch_with_gap),
+            )
+            .is_some()
+        {
+            let last_entry = merged_entries.last_mut().expect("non-empty");
+            if last_entry.value_offset == values.len() {
+                warn!(key = ?last_entry.key, "key has no value in imm compact. skipped");
+                last_entry.key = full_key_tracker.latest_user_key().table_key.clone();
+            } else {
+                // Record kv entries
+                merged_entries.push(SharedBufferKeyEntry {
+                    key: full_key_tracker.latest_user_key().table_key.clone(),
+                    value_offset: values.len(),
+                });
+            }
         }
-        table_key_versions.extend(
+        values.extend(
             key_entry
                 .new_values
                 .iter()
@@ -365,19 +370,11 @@ pub async fn merge_imms_in_memory(
         tokio::task::consume_budget().await;
     }
 
-    // process the last key
-    if !table_key_versions.is_empty() {
-        merged_payload.push(SharedBufferVersionedEntry::new(
-            full_key_tracker.latest_full_key.user_key.table_key,
-            table_key_versions,
-        ));
-    }
-
     SharedBufferBatch {
         inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
             epochs,
-            merged_payload,
-            kv_count,
+            merged_entries,
+            values,
             merged_size,
             max_imm_id,
             memory_tracker,
Expand All @@ -398,7 +395,7 @@ fn generate_splits(
for imm in payload {
let data_size = {
// calculate encoded bytes of key var length
(imm.kv_count() * 8 + imm.size()) as u64
(imm.value_count() * EPOCH_LEN + imm.size()) as u64
};
compact_data_size += data_size;
size_and_start_user_keys.push((data_size, imm.start_user_key()));
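Two details of the rewritten merge worth calling out. First, when `observe_multi_version` signals a new user key but the previous entry accumulated no values (`last_entry.value_offset == values.len()`), the previous key is overwritten in place with a `warn!` rather than pushed, so zero-value entries never survive into the merged payload. Second, `generate_splits` now estimates encoded size as `value_count() * EPOCH_LEN` instead of `kv_count() * 8`: each versioned value, not each key, carries an encoded epoch, and `EPOCH_LEN` is presumably the same 8-byte width of a `HummockEpoch` that the old literal hard-coded.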
(Diffs for the remaining 4 changed files are not loaded.)
