feat(storage): separate key and values in imm to reduce allocation #15300

Merged · 6 commits · Mar 4, 2024
Changes from 4 commits
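
The change at a glance: instead of the imm storing one `Vec` of versioned values per key, keys and values now live in two flat vectors, and each key entry records the offset of its first value. Below is a minimal standalone sketch of that layout; the types are simplified stand-ins for illustration, not the actual `SharedBufferKeyEntry` / `VersionedSharedBufferValue` definitions shown in the diff.

```rust
/// Simplified stand-in for `SharedBufferKeyEntry` (illustrative, not the real definition).
struct KeyEntry {
    key: Vec<u8>,
    /// Index of this key's first version in the shared `values` vector.
    value_offset: usize,
}

/// Simplified imm payload: one allocation for key entries and one for all versioned
/// values, instead of a nested `Vec<(key, Vec<(epoch, value)>)>` with a Vec per key.
struct ImmPayload {
    entries: Vec<KeyEntry>,
    values: Vec<(u64, Vec<u8>)>, // (epoch, value), grouped by key, newest epoch first
}

fn main() {
    // Two versions of key "a", one version of key "c".
    let payload = ImmPayload {
        entries: vec![
            KeyEntry { key: b"a".to_vec(), value_offset: 0 },
            KeyEntry { key: b"c".to_vec(), value_offset: 2 },
        ],
        values: vec![
            (7, b"a-v1".to_vec()),
            (5, b"a-v0".to_vec()),
            (6, b"c-v0".to_vec()),
        ],
    };
    assert_eq!(payload.entries.len(), 2); // corresponds to key_count
    assert_eq!(payload.values.len(), 3); // corresponds to value_count
}
```

This is also why the diff distinguishes `key_count()` from `value_count()`: the two lengths no longer coincide.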
48 changes: 25 additions & 23 deletions src/storage/hummock_sdk/src/key.rs
@@ -932,24 +932,18 @@ pub fn bound_table_key_range<T: AsRef<[u8]> + EmptySliceRef>(
(start, end)
}

pub struct FullKeyTracker<T: AsRef<[u8]> + Ord + Eq> {
/// TODO: Temporary bypass full key check. Remove this field after #15099 is resolved.
pub struct FullKeyTracker<T: AsRef<[u8]> + Ord + Eq, const SKIP_DEDUP: bool = false> {
pub latest_full_key: FullKey<T>,
last_observed_epoch_with_gap: EpochWithGap,
/// TODO: Temporary bypass full key check. Remove this field after #15099 is resolved.
allow_same_full_key: bool,
}

impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
impl<T: AsRef<[u8]> + Ord + Eq, const SKIP_DEDUP: bool> FullKeyTracker<T, SKIP_DEDUP> {
pub fn new(init_full_key: FullKey<T>) -> Self {
Self::with_config(init_full_key, false)
}

pub fn with_config(init_full_key: FullKey<T>, allow_same_full_key: bool) -> Self {
let epoch_with_gap = init_full_key.epoch_with_gap;
Self {
latest_full_key: init_full_key,
last_observed_epoch_with_gap: epoch_with_gap,
allow_same_full_key,
}
}

@@ -965,7 +959,7 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
///
/// let table_id = TableId { table_id: 1 };
/// let full_key1 = FullKey::new(table_id, TableKey(Bytes::from("c")), 5 << EPOCH_AVAILABLE_BITS);
/// let mut a = FullKeyTracker::<Bytes>::new(full_key1.clone());
/// let mut a: FullKeyTracker<_> = FullKeyTracker::<Bytes>::new(full_key1.clone());
///
/// // Panic on non-decreasing epoch observed for the same user key.
/// // let full_key_with_larger_epoch = FullKey::new(table_id, TableKey(Bytes::from("c")), 6 << EPOCH_AVAILABLE_BITS);
@@ -976,16 +970,18 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
/// // a.observe(full_key_with_smaller_user_key);
///
/// let full_key2 = FullKey::new(table_id, TableKey(Bytes::from("c")), 3 << EPOCH_AVAILABLE_BITS);
/// assert_eq!(a.observe(full_key2), None);
/// assert_eq!(a.observe(full_key2.clone()), None);
/// assert_eq!(a.latest_user_key(), &full_key2.user_key);
///
/// let full_key3 = FullKey::new(table_id, TableKey(Bytes::from("f")), 4 << EPOCH_AVAILABLE_BITS);
/// assert_eq!(a.observe(full_key3), Some(full_key1));
/// assert_eq!(a.observe(full_key3.clone()), Some(full_key1.user_key));
/// assert_eq!(a.latest_user_key(), &full_key3.user_key);
/// ```
///
/// Return:
/// - If the provided `key` contains a new user key, return the latest full key observed for the previous user key.
/// - Otherwise: return None
pub fn observe<F>(&mut self, key: FullKey<F>) -> Option<FullKey<T>>
pub fn observe<F>(&mut self, key: FullKey<F>) -> Option<UserKey<T>>
where
UserKey<F>: Into<UserKey<T>>,
F: AsRef<[u8]>,
@@ -998,7 +994,7 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
&mut self,
user_key: UserKey<F>,
mut epochs: impl Iterator<Item = EpochWithGap>,
) -> Option<FullKey<T>>
) -> Option<UserKey<T>>
where
UserKey<F>: Into<UserKey<T>>,
F: AsRef<[u8]>,
@@ -1030,18 +1026,20 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
self.last_observed_epoch_with_gap = min_epoch_with_gap;

// Take the previous key and set latest key
Some(std::mem::replace(
&mut self.latest_full_key,
FullKey {
user_key: user_key.into(),
epoch_with_gap: min_epoch_with_gap,
},
))
Some(
std::mem::replace(
&mut self.latest_full_key,
FullKey {
user_key: user_key.into(),
epoch_with_gap: min_epoch_with_gap,
},
)
.user_key,
)
}
Ordering::Equal => {
if max_epoch_with_gap > self.last_observed_epoch_with_gap
|| (!self.allow_same_full_key
&& max_epoch_with_gap == self.last_observed_epoch_with_gap)
|| (!SKIP_DEDUP && max_epoch_with_gap == self.last_observed_epoch_with_gap)
{
// Epoch from the same user key should be monotonically decreasing
panic!(
@@ -1065,6 +1063,10 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
}
}
}

pub fn latest_user_key(&self) -> &UserKey<T> {
&self.latest_full_key.user_key
}
}

#[cfg(test)]
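
A self-contained sketch of the two `FullKeyTracker` changes above, using simplified stand-in types rather than the real ones: the runtime `allow_same_full_key` flag becomes a `const SKIP_DEDUP: bool` parameter defaulting to `false`, so plain `new(..)` call sites keep compiling, and `observe` returns only the previous user key instead of the previous full key.

```rust
use std::cmp::Ordering;

/// Simplified stand-in for `FullKeyTracker` (illustrative, not the real definition).
struct Tracker<K: Ord, const SKIP_DEDUP: bool = false> {
    latest_key: K,
    latest_epoch: u64,
}

impl<K: Ord, const SKIP_DEDUP: bool> Tracker<K, SKIP_DEDUP> {
    fn new(init_key: K, init_epoch: u64) -> Self {
        Self { latest_key: init_key, latest_epoch: init_epoch }
    }

    /// Returns the previous key (not the previous full (key, epoch) pair) when a
    /// larger key is observed; epochs within one key must be strictly decreasing
    /// unless SKIP_DEDUP tolerates re-observing the same (key, epoch).
    fn observe(&mut self, key: K, epoch: u64) -> Option<K> {
        match self.latest_key.cmp(&key) {
            Ordering::Less => {
                self.latest_epoch = epoch;
                Some(std::mem::replace(&mut self.latest_key, key))
            }
            Ordering::Equal => {
                if epoch > self.latest_epoch || (!SKIP_DEDUP && epoch == self.latest_epoch) {
                    panic!("non-decreasing epoch observed for the same key");
                }
                self.latest_epoch = epoch;
                None
            }
            Ordering::Greater => panic!("observed a smaller key"),
        }
    }
}

fn main() {
    // SKIP_DEDUP defaults to false, so existing `new` call sites stay unchanged.
    let mut t: Tracker<&str> = Tracker::new("c", 5);
    assert_eq!(t.observe("c", 3), None);
    assert_eq!(t.observe("f", 4), Some("c"));

    // Compile-time opt-in to tolerate duplicates, as the user iterator does.
    let mut dedup_tolerant: Tracker<&str, true> = Tracker::new("c", 5);
    assert_eq!(dedup_tolerant.observe("c", 5), None);
}
```

Returning just the previous user key is sufficient for callers such as `merge_imms_in_memory`, which (per the removed comment in the next file) could not rely on the returned `epoch_with_gap` anyway.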
71 changes: 31 additions & 40 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
@@ -24,7 +24,7 @@ use risingwave_common::cache::CachePriority;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey};
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, EPOCH_LEN};
use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_hummock_sdk::{CompactionGroupId, EpochWithGap, HummockEpoch, LocalSstableInfo};
use risingwave_pb::hummock::compact_task;
@@ -41,10 +41,9 @@ use crate::hummock::iterator::{
Forward, ForwardMergeRangeIterator, HummockIterator, MergeIterator, UserIterator,
};
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchInner, SharedBufferVersionedEntry,
SharedBufferBatch, SharedBufferBatchInner, SharedBufferKeyEntry, VersionedSharedBufferValue,
};
use crate::hummock::utils::MemoryTracker;
use crate::hummock::value::HummockValue;
use crate::hummock::{
BlockedXor16FilterBuilder, CachePolicy, CompactionDeleteRangeIterator, GetObjectId,
HummockError, HummockResult, SstableBuilderOptions, SstableObjectIdManagerRef,
@@ -147,7 +146,7 @@ async fn compact_shared_buffer(
ret
});

let total_key_count = payload.iter().map(|imm| imm.kv_count()).sum::<usize>();
let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::<usize>();
let (splits, sub_compaction_sstable_size, split_weight_by_vnode) =
generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref());
let parallelism = splits.len();
@@ -286,23 +285,23 @@ pub async fn merge_imms_in_memory(
imms: Vec<ImmutableMemtable>,
memory_tracker: Option<MemoryTracker>,
) -> ImmutableMemtable {
let mut kv_count = 0;
let mut epochs = vec![];
let mut merged_size = 0;
assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
let max_imm_id = imms[0].batch_id();

let mut imm_iters = Vec::with_capacity(imms.len());
let key_count = imms.iter().map(|imm| imm.key_count()).sum();
let value_count = imms.iter().map(|imm| imm.value_count()).sum();
for imm in imms {
assert!(imm.kv_count() > 0, "imm should not be empty");
assert!(imm.key_count() > 0, "imm should not be empty");
assert_eq!(
table_id,
imm.table_id(),
"should only merge data belonging to the same table"
);

epochs.push(imm.min_epoch());
kv_count += imm.kv_count();
merged_size += imm.size();

imm_iters.push(imm.into_forward_iter());
@@ -316,44 +315,44 @@

let first_item_key = mi.current_key_entry().key.clone();

let mut merged_payload: Vec<SharedBufferVersionedEntry> = Vec::new();
let mut merged_entries: Vec<SharedBufferKeyEntry> = Vec::with_capacity(key_count);
let mut values: Vec<VersionedSharedBufferValue> = Vec::with_capacity(value_count);

merged_entries.push(SharedBufferKeyEntry {
key: first_item_key.clone(),
value_offset: 0,
});

// Use the first key and max epoch to initialize the tracker to ensure that the check in the first call to full_key_tracker.observe will succeed
let mut full_key_tracker = FullKeyTracker::<Bytes>::new(FullKey::new_with_gap_epoch(
table_id,
first_item_key,
EpochWithGap::new_max_epoch(),
));
let mut table_key_versions: Vec<(EpochWithGap, HummockValue<Bytes>)> = Vec::new();

while mi.is_valid() {
let key_entry = mi.current_key_entry();
let user_key = UserKey {
table_id,
table_key: key_entry.key.clone(),
};
if let Some(last_full_key) = full_key_tracker.observe_multi_version(
user_key,
key_entry
.new_values
.iter()
.map(|(epoch_with_gap, _)| *epoch_with_gap),
) {
let last_user_key = last_full_key.user_key;
// `epoch_with_gap` of the `last_full_key` may not reflect the real epoch in the items
// and should not be used because we use max epoch to initialize the tracker
let _epoch_with_gap = last_full_key.epoch_with_gap;

if full_key_tracker
.observe_multi_version(
user_key,
key_entry
.new_values
.iter()
.map(|(epoch_with_gap, _)| *epoch_with_gap),
)
.is_some()
{
// Record kv entries
merged_payload.push(SharedBufferVersionedEntry::new(
last_user_key.table_key,
table_key_versions,
));

// Reset state before moving onto the new table key
table_key_versions = vec![];
merged_entries.push(SharedBufferKeyEntry {
key: full_key_tracker.latest_user_key().table_key.clone(),
value_offset: values.len(),
});
}
table_key_versions.extend(
values.extend(
key_entry
.new_values
.iter()
Expand All @@ -365,19 +364,11 @@ pub async fn merge_imms_in_memory(
tokio::task::consume_budget().await;
}

// process the last key
if !table_key_versions.is_empty() {
merged_payload.push(SharedBufferVersionedEntry::new(
full_key_tracker.latest_full_key.user_key.table_key,
table_key_versions,
));
}

SharedBufferBatch {
inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
epochs,
merged_payload,
kv_count,
merged_entries,
values,
merged_size,
max_imm_id,
memory_tracker,
Expand All @@ -398,7 +389,7 @@ fn generate_splits(
for imm in payload {
let data_size = {
// calculate encoded bytes of key var length
(imm.kv_count() * 8 + imm.size()) as u64
(imm.value_count() * EPOCH_LEN + imm.size()) as u64
};
compact_data_size += data_size;
size_and_start_user_keys.push((data_size, imm.start_user_key()));
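
A sketch of the new shape of the merge loop (simplified types assumed; the real code drives a `MergeIterator` and a `FullKeyTracker`): because all values go into one flat vector, the loop only pushes a key entry, whose `value_offset` is simply `values.len()`, when the user key changes, and the old "process the last key" step after the loop is no longer needed.

```rust
/// Build the separated (entries, values) layout from a stream of
/// (key, epoch, value) items sorted by key ascending, epoch descending.
fn build(
    stream: impl IntoIterator<Item = (Vec<u8>, u64, Vec<u8>)>,
) -> (Vec<(Vec<u8>, usize)>, Vec<(u64, Vec<u8>)>) {
    let mut entries: Vec<(Vec<u8>, usize)> = Vec::new(); // (key, value_offset)
    let mut values: Vec<(u64, Vec<u8>)> = Vec::new();
    for (key, epoch, value) in stream {
        // Push a new key entry only when the user key changes; its offset is the
        // current number of values. No per-key Vec, no trailing flush needed.
        if entries.last().map(|(k, _)| k.as_slice()) != Some(key.as_slice()) {
            entries.push((key, values.len()));
        }
        values.push((epoch, value));
    }
    (entries, values)
}

fn main() {
    let (entries, values) = build(vec![
        (b"a".to_vec(), 7, b"v1".to_vec()),
        (b"a".to_vec(), 5, b"v0".to_vec()),
        (b"c".to_vec(), 6, b"w0".to_vec()),
    ]);
    assert_eq!(entries, vec![(b"a".to_vec(), 0), (b"c".to_vec(), 2)]);
    assert_eq!(values.len(), 3);
}
```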
12 changes: 7 additions & 5 deletions src/storage/src/hummock/event_handler/uploader.rs
@@ -868,18 +868,18 @@ impl HummockUploader {
.filter(|(_, imms)| imms.len() >= self.context.imm_merge_threshold)
{
let imms_to_merge = imms.drain(..).collect_vec();
let mut kv_count = 0;
let mut value_count = 0;
let mut imm_size = 0;
imms_to_merge.iter().for_each(|imm| {
// ensure imms are sealed
assert_le!(imm.max_epoch(), sealed_epoch);
kv_count += imm.kv_count();
value_count += imm.value_count();
imm_size += imm.size();
});

// acquire memory before generate merge task
// if acquire memory failed, the task will not be generated
let memory_sz = (imm_size + kv_count * EPOCH_LEN) as u64;
let memory_sz = (imm_size + value_count * EPOCH_LEN) as u64;
if let Some(tracker) = memory_limiter.try_require_memory(memory_sz) {
self.sealed_data
.merging_tasks
@@ -1125,7 +1125,9 @@ impl HummockUploader {
.stats
.merge_imm_batch_memory_sz
.with_label_values(&[table_id_label.as_str()])
.inc_by((output.merged_imm.size() + output.merged_imm.kv_count() * EPOCH_LEN) as _);
.inc_by(
(output.merged_imm.size() + output.merged_imm.value_count() * EPOCH_LEN) as _,
);
}
poll_ret
}
@@ -1667,7 +1669,7 @@ mod tests {
imm = &mut task => {
println!("merging task success");
assert_eq!(table_id, imm.table_id);
assert_eq!(9, imm.kv_count());
assert_eq!(9, imm.value_count());
}
}
task.join_handle.abort();
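
A small sketch of the accounting change above, under the assumption that `EPOCH_LEN` is the byte width of an encoded epoch (a `u64` here): the memory reserved for a merge task scales with the number of values, since each value carries one encoded epoch, rather than with a combined kv count.

```rust
// Assumption for this sketch: epochs encode as a fixed-width u64.
const EPOCH_LEN: usize = std::mem::size_of::<u64>();

/// Estimated memory for merging a group of imms, given (size, value_count) per imm:
/// their payload bytes plus one encoded epoch per value.
fn merge_task_memory(imms: &[(usize, usize)]) -> u64 {
    let imm_size: usize = imms.iter().map(|(size, _)| size).sum();
    let value_count: usize = imms.iter().map(|(_, values)| values).sum();
    (imm_size + value_count * EPOCH_LEN) as u64
}

fn main() {
    // Two imms: 1 KiB with 10 values, 2 KiB with 25 values.
    assert_eq!(merge_task_memory(&[(1024, 10), (2048, 25)]), (3072 + 35 * 8) as u64);
}
```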
8 changes: 4 additions & 4 deletions src/storage/src/hummock/iterator/forward_user.rs
@@ -32,7 +32,7 @@ pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
iterator: I,

// Track the last seen full key
full_key_tracker: FullKeyTracker<Bytes>,
full_key_tracker: FullKeyTracker<Bytes, true>,

/// Last user value
latest_val: Bytes,
@@ -78,7 +78,7 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
delete_range_iter,
_version: version,
is_current_pos_valid: false,
full_key_tracker: FullKeyTracker::with_config(FullKey::default(), true),
full_key_tracker: FullKeyTracker::new(FullKey::default()),
}
}

@@ -136,7 +136,7 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
pub async fn rewind(&mut self) -> HummockResult<()> {
// Reset
self.is_current_pos_valid = false;
self.full_key_tracker = FullKeyTracker::with_config(FullKey::default(), true);
self.full_key_tracker = FullKeyTracker::new(FullKey::default());

// Handle range scan
match &self.key_range.0 {
@@ -180,7 +180,7 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> {
// Reset
self.is_current_pos_valid = false;
self.full_key_tracker = FullKeyTracker::with_config(FullKey::default(), true);
self.full_key_tracker = FullKeyTracker::new(FullKey::default());

// Handle range scan when key < begin_key
let user_key = match &self.key_range.0 {
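
A brief sketch (simplified stand-in types) of the design choice visible above: with the tolerance expressed as `FullKeyTracker<Bytes, true>` in the field's type, the reset sites in `rewind` and `seek` can all call plain `new(..)` and cannot accidentally drop the flag that `with_config(.., true)` used to carry.

```rust
/// Simplified stand-in tracker; only the const parameter matters for this sketch.
struct Tracker<const SKIP_DEDUP: bool = false> {
    latest_key: Vec<u8>,
}

impl<const SKIP_DEDUP: bool> Tracker<SKIP_DEDUP> {
    fn new(init_key: Vec<u8>) -> Self {
        Self { latest_key: init_key }
    }
}

/// The iterator spells the tolerance in the field's type, so every reset site
/// is just `Tracker::new(..)` with no boolean argument to remember.
struct UserIter {
    tracker: Tracker<true>,
}

impl UserIter {
    fn rewind(&mut self) {
        self.tracker = Tracker::new(Vec::new());
    }
}

fn main() {
    let mut it = UserIter { tracker: Tracker::new(Vec::new()) };
    it.rewind();
    assert!(it.tracker.latest_key.is_empty());
}
```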
4 changes: 2 additions & 2 deletions src/storage/src/hummock/iterator/merge_inner.rs
@@ -22,7 +22,7 @@ use risingwave_hummock_sdk::key::FullKey;
use super::Forward;
use crate::hummock::iterator::{DirectionEnum, HummockIterator, HummockIteratorDirection};
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatchIterator, SharedBufferVersionedEntry,
SharedBufferBatchIterator, SharedBufferVersionedEntryRef,
};
use crate::hummock::value::HummockValue;
use crate::hummock::HummockResult;
@@ -100,7 +100,7 @@

impl MergeIterator<SharedBufferBatchIterator<Forward>> {
/// Used in `merge_imms_in_memory` to merge immutable memtables.
pub fn current_key_entry(&self) -> &SharedBufferVersionedEntry {
pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> {
self.heap
.peek()
.expect("no inner iter for imm merge")
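
A sketch of why `current_key_entry` now returns a borrowed-view struct by value (simplified stand-ins, not the real `SharedBufferVersionedEntryRef`): after the key/value split there is no owned per-key entry stored in the imm to hand out a `&`-reference to, so the iterator assembles a cheap struct of borrows over the two flat vectors.

```rust
/// Simplified stand-in for `SharedBufferVersionedEntryRef<'_>`: a by-value struct
/// of borrows rather than a reference to an owned per-key entry.
struct EntryRef<'a> {
    key: &'a [u8],
    new_values: &'a [(u64, Vec<u8>)],
}

struct ImmPayload {
    entries: Vec<(Vec<u8>, usize)>, // (key, value_offset)
    values: Vec<(u64, Vec<u8>)>,
}

impl ImmPayload {
    /// The i-th key's versions run from its value_offset to the next entry's offset
    /// (or to the end of `values` for the last key).
    fn key_entry(&self, i: usize) -> EntryRef<'_> {
        let start = self.entries[i].1;
        let end = self.entries.get(i + 1).map_or(self.values.len(), |e| e.1);
        EntryRef {
            key: self.entries[i].0.as_slice(),
            new_values: &self.values[start..end],
        }
    }
}

fn main() {
    let imm = ImmPayload {
        entries: vec![(b"a".to_vec(), 0), (b"c".to_vec(), 2)],
        values: vec![(7, b"v1".to_vec()), (5, b"v0".to_vec()), (6, b"w0".to_vec())],
    };
    let entry = imm.key_entry(0);
    assert_eq!(entry.key, b"a");
    assert_eq!(entry.new_values.len(), 2);
}
```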