From 364c341faf3607816c105cb75747b9e10bfe56df Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 27 Feb 2024 19:02:23 +0800 Subject: [PATCH 1/5] feat(storage): separate key and values in imm to reduce allocation --- src/storage/hummock_sdk/src/key.rs | 48 +++--- .../compactor/shared_buffer_compact.rs | 55 +++---- .../src/hummock/iterator/forward_user.rs | 8 +- .../src/hummock/iterator/merge_inner.rs | 4 +- .../shared_buffer/shared_buffer_batch.rs | 144 ++++++++++-------- 5 files changed, 136 insertions(+), 123 deletions(-) diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs index 0395c58fd10a4..0a6ea9d993782 100644 --- a/src/storage/hummock_sdk/src/key.rs +++ b/src/storage/hummock_sdk/src/key.rs @@ -932,24 +932,18 @@ pub fn bound_table_key_range + EmptySliceRef>( (start, end) } -pub struct FullKeyTracker + Ord + Eq> { +/// TODO: Temporary bypass full key check. Remove this field after #15099 is resolved. +pub struct FullKeyTracker + Ord + Eq, const SKIP_DEDUP: bool = false> { pub latest_full_key: FullKey, last_observed_epoch_with_gap: EpochWithGap, - /// TODO: Temporary bypass full key check. Remove this field after #15099 is resolved. 
- allow_same_full_key: bool, } -impl + Ord + Eq> FullKeyTracker { +impl + Ord + Eq, const SKIP_DEDUP: bool> FullKeyTracker { pub fn new(init_full_key: FullKey) -> Self { - Self::with_config(init_full_key, false) - } - - pub fn with_config(init_full_key: FullKey, allow_same_full_key: bool) -> Self { let epoch_with_gap = init_full_key.epoch_with_gap; Self { latest_full_key: init_full_key, last_observed_epoch_with_gap: epoch_with_gap, - allow_same_full_key, } } @@ -965,7 +959,7 @@ impl + Ord + Eq> FullKeyTracker { /// /// let table_id = TableId { table_id: 1 }; /// let full_key1 = FullKey::new(table_id, TableKey(Bytes::from("c")), 5 << EPOCH_AVAILABLE_BITS); - /// let mut a = FullKeyTracker::::new(full_key1.clone()); + /// let mut a: FullKeyTracker<_> = FullKeyTracker::::new(full_key1.clone()); /// /// // Panic on non-decreasing epoch observed for the same user key. /// // let full_key_with_larger_epoch = FullKey::new(table_id, TableKey(Bytes::from("c")), 6 << EPOCH_AVAILABLE_BITS); @@ -976,16 +970,18 @@ impl + Ord + Eq> FullKeyTracker { /// // a.observe(full_key_with_smaller_user_key); /// /// let full_key2 = FullKey::new(table_id, TableKey(Bytes::from("c")), 3 << EPOCH_AVAILABLE_BITS); - /// assert_eq!(a.observe(full_key2), None); + /// assert_eq!(a.observe(full_key2.clone()), None); + /// assert_eq!(a.latest_user_key(), &full_key2.user_key); /// /// let full_key3 = FullKey::new(table_id, TableKey(Bytes::from("f")), 4 << EPOCH_AVAILABLE_BITS); - /// assert_eq!(a.observe(full_key3), Some(full_key1)); + /// assert_eq!(a.observe(full_key3.clone()), Some(full_key1.user_key)); + /// assert_eq!(a.latest_user_key(), &full_key3.user_key); /// ``` /// /// Return: /// - If the provided `key` contains a new user key, return the latest full key observed for the previous user key. 
/// - Otherwise: return None - pub fn observe(&mut self, key: FullKey) -> Option> + pub fn observe(&mut self, key: FullKey) -> Option> where UserKey: Into>, F: AsRef<[u8]>, @@ -998,7 +994,7 @@ impl + Ord + Eq> FullKeyTracker { &mut self, user_key: UserKey, mut epochs: impl Iterator, - ) -> Option> + ) -> Option> where UserKey: Into>, F: AsRef<[u8]>, @@ -1030,18 +1026,20 @@ impl + Ord + Eq> FullKeyTracker { self.last_observed_epoch_with_gap = min_epoch_with_gap; // Take the previous key and set latest key - Some(std::mem::replace( - &mut self.latest_full_key, - FullKey { - user_key: user_key.into(), - epoch_with_gap: min_epoch_with_gap, - }, - )) + Some( + std::mem::replace( + &mut self.latest_full_key, + FullKey { + user_key: user_key.into(), + epoch_with_gap: min_epoch_with_gap, + }, + ) + .user_key, + ) } Ordering::Equal => { if max_epoch_with_gap > self.last_observed_epoch_with_gap - || (!self.allow_same_full_key - && max_epoch_with_gap == self.last_observed_epoch_with_gap) + || (!SKIP_DEDUP && max_epoch_with_gap == self.last_observed_epoch_with_gap) { // Epoch from the same user key should be monotonically decreasing panic!( @@ -1065,6 +1063,10 @@ impl + Ord + Eq> FullKeyTracker { } } } + + pub fn latest_user_key(&self) -> &UserKey { + &self.latest_full_key.user_key + } } #[cfg(test)] diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index a06a1eaca6761..19d83bd160783 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -41,10 +41,9 @@ use crate::hummock::iterator::{ Forward, ForwardMergeRangeIterator, HummockIterator, MergeIterator, UserIterator, }; use crate::hummock::shared_buffer::shared_buffer_batch::{ - SharedBufferBatch, SharedBufferBatchInner, SharedBufferVersionedEntry, + SharedBufferBatch, SharedBufferBatchInner, SharedBufferKeyEntry, VersionedSharedBufferValue, }; use 
crate::hummock::utils::MemoryTracker; -use crate::hummock::value::HummockValue; use crate::hummock::{ BlockedXor16FilterBuilder, CachePolicy, CompactionDeleteRangeIterator, GetObjectId, HummockError, HummockResult, SstableBuilderOptions, SstableObjectIdManagerRef, @@ -316,7 +315,8 @@ pub async fn merge_imms_in_memory( let first_item_key = mi.current_key_entry().key.clone(); - let mut merged_payload: Vec = Vec::new(); + let mut merged_entries: Vec = Vec::new(); + let mut values: Vec = Vec::new(); // Use first key, max epoch to initialize the tracker to ensure that the check first call to full_key_tracker.observe will succeed let mut full_key_tracker = FullKeyTracker::::new(FullKey::new_with_gap_epoch( @@ -324,7 +324,6 @@ pub async fn merge_imms_in_memory( first_item_key, EpochWithGap::new_max_epoch(), )); - let mut table_key_versions: Vec<(EpochWithGap, HummockValue)> = Vec::new(); while mi.is_valid() { let key_entry = mi.current_key_entry(); @@ -332,30 +331,25 @@ pub async fn merge_imms_in_memory( table_id, table_key: key_entry.key.clone(), }; - if let Some(last_full_key) = full_key_tracker.observe_multi_version( - user_key, - key_entry - .new_values - .iter() - .map(|(epoch_with_gap, _)| *epoch_with_gap), - ) { - let last_user_key = last_full_key.user_key; - // `epoch_with_gap` of the `last_full_key` may not reflect the real epoch in the items - // and should not be used because we use max epoch to initialize the tracker - let _epoch_with_gap = last_full_key.epoch_with_gap; - + if full_key_tracker + .observe_multi_version( + user_key, + key_entry + .values + .iter() + .map(|(epoch_with_gap, _)| *epoch_with_gap), + ) + .is_some() + { // Record kv entries - merged_payload.push(SharedBufferVersionedEntry::new( - last_user_key.table_key, - table_key_versions, - )); - - // Reset state before moving onto the new table key - table_key_versions = vec![]; + merged_entries.push(SharedBufferKeyEntry { + key: full_key_tracker.latest_user_key().table_key.clone(), + 
value_offset: values.len(), + }); } - table_key_versions.extend( + values.extend( key_entry - .new_values + .values .iter() .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())), ); @@ -365,18 +359,11 @@ pub async fn merge_imms_in_memory( tokio::task::consume_budget().await; } - // process the last key - if !table_key_versions.is_empty() { - merged_payload.push(SharedBufferVersionedEntry::new( - full_key_tracker.latest_full_key.user_key.table_key, - table_key_versions, - )); - } - SharedBufferBatch { inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches( epochs, - merged_payload, + merged_entries, + values, kv_count, merged_size, max_imm_id, diff --git a/src/storage/src/hummock/iterator/forward_user.rs b/src/storage/src/hummock/iterator/forward_user.rs index f006d041d5540..99a3fb14807c9 100644 --- a/src/storage/src/hummock/iterator/forward_user.rs +++ b/src/storage/src/hummock/iterator/forward_user.rs @@ -32,7 +32,7 @@ pub struct UserIterator> { iterator: I, // Track the last seen full key - full_key_tracker: FullKeyTracker, + full_key_tracker: FullKeyTracker, /// Last user value latest_val: Bytes, @@ -78,7 +78,7 @@ impl> UserIterator { delete_range_iter, _version: version, is_current_pos_valid: false, - full_key_tracker: FullKeyTracker::with_config(FullKey::default(), true), + full_key_tracker: FullKeyTracker::new(FullKey::default()), } } @@ -136,7 +136,7 @@ impl> UserIterator { pub async fn rewind(&mut self) -> HummockResult<()> { // Reset self.is_current_pos_valid = false; - self.full_key_tracker = FullKeyTracker::with_config(FullKey::default(), true); + self.full_key_tracker = FullKeyTracker::new(FullKey::default()); // Handle range scan match &self.key_range.0 { @@ -180,7 +180,7 @@ impl> UserIterator { pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> { // Reset self.is_current_pos_valid = false; - self.full_key_tracker = FullKeyTracker::with_config(FullKey::default(), true); + self.full_key_tracker = 
FullKeyTracker::new(FullKey::default()); // Handle range scan when key < begin_key let user_key = match &self.key_range.0 { diff --git a/src/storage/src/hummock/iterator/merge_inner.rs b/src/storage/src/hummock/iterator/merge_inner.rs index 669db4cd3aab2..00c789a386bd7 100644 --- a/src/storage/src/hummock/iterator/merge_inner.rs +++ b/src/storage/src/hummock/iterator/merge_inner.rs @@ -22,7 +22,7 @@ use risingwave_hummock_sdk::key::FullKey; use super::Forward; use crate::hummock::iterator::{DirectionEnum, HummockIterator, HummockIteratorDirection}; use crate::hummock::shared_buffer::shared_buffer_batch::{ - SharedBufferBatchIterator, SharedBufferVersionedEntry, + SharedBufferBatchIterator, SharedBufferVersionedEntryRef, }; use crate::hummock::value::HummockValue; use crate::hummock::HummockResult; @@ -100,7 +100,7 @@ impl MergeIterator { impl MergeIterator> { /// Used in `merge_imms_in_memory` to merge immutable memtables. - pub fn current_key_entry(&self) -> &SharedBufferVersionedEntry { + pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> { self.heap .peek() .expect("no inner iter for imm merge") diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index f4c8fce2dea9f..6fde7d9fe12fa 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -17,7 +17,7 @@ use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; use std::ops::Bound::Included; -use std::ops::{Bound, Deref, RangeBounds}; +use std::ops::{Bound, RangeBounds}; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering::Relaxed; use std::sync::{Arc, LazyLock}; @@ -45,24 +45,39 @@ use crate::store::ReadOptions; pub(crate) type SharedBufferItem = (TableKey, HummockValue); pub type SharedBufferBatchId = u64; -/// A shared buffer may contain data from multiple epochs, -/// there are multiple 
versions for a given key (`table_key`), we put those versions into a vector -/// and sort them in descending order, aka newest to oldest. -#[derive(PartialEq, Debug)] -pub struct SharedBufferVersionedEntry { - pub(crate) key: TableKey, - pub(crate) new_values: Vec<(EpochWithGap, HummockValue)>, +pub(crate) type VersionedSharedBufferValue = (EpochWithGap, HummockValue); + +pub(crate) struct SharedBufferVersionedEntryRef<'a> { + pub(crate) key: &'a TableKey, + pub(crate) values: &'a [VersionedSharedBufferValue], } -impl SharedBufferVersionedEntry { - pub fn new(key: TableKey, new_values: Vec<(EpochWithGap, HummockValue)>) -> Self { - Self { key, new_values } - } +fn values<'a>( + i: usize, + entries: &'a [SharedBufferKeyEntry], + values: &'a [VersionedSharedBufferValue], +) -> &'a [VersionedSharedBufferValue] { + &values[entries[i].value_offset + ..entries + .get(i) + .map(|entry| entry.value_offset) + .unwrap_or(values.len())] +} + +#[derive(PartialEq, Debug)] +pub(crate) struct SharedBufferKeyEntry { + pub(crate) key: TableKey, + /// A shared buffer may contain data from multiple epochs for a specific key. + /// The values of all keys are stored together in the field `new_values` of `SharedBufferBatchInner` + /// as a single vector. `value_offset` is the starting offset of values of the current `key` in the `new_values` vector. + /// The end offset is the `value_offset` of the next entry, or the end of the vector if the current entry is the last one. 
+ pub(crate) value_offset: usize, } #[derive(Debug)] pub(crate) struct SharedBufferBatchInner { - payload: Vec, + entries: Vec, + new_values: Vec, /// The epochs of the data in batch, sorted in ascending order (old to new) epochs: Vec, kv_count: usize, @@ -87,14 +102,20 @@ impl SharedBufferBatchInner { let kv_count = payload.len(); let epoch_with_gap = EpochWithGap::new(epoch, spill_offset); - let items = payload - .into_iter() - .map(|(k, v)| SharedBufferVersionedEntry::new(k, vec![(epoch_with_gap, v)])) - .collect_vec(); + let mut entries = Vec::with_capacity(payload.len()); + let mut new_values = Vec::with_capacity(payload.len()); + for (i, (key, value)) in payload.into_iter().enumerate() { + entries.push(SharedBufferKeyEntry { + key, + value_offset: i, + }); + new_values.push((epoch_with_gap, value)); + } let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed); SharedBufferBatchInner { - payload: items, + entries, + new_values, epochs: vec![epoch], kv_count, size, @@ -103,19 +124,25 @@ impl SharedBufferBatchInner { } } + pub fn values(&self, i: usize) -> &[VersionedSharedBufferValue] { + values(i, &self.entries, &self.new_values) + } + #[allow(clippy::too_many_arguments)] pub(crate) fn new_with_multi_epoch_batches( epochs: Vec, - payload: Vec, + entries: Vec, + new_values: Vec, num_items: usize, size: usize, imm_id: ImmId, tracker: Option, ) -> Self { - assert!(!payload.is_empty()); - debug_assert!(payload.iter().is_sorted_by_key(|entry| &entry.key)); - debug_assert!(payload.iter().all(|entry| entry - .new_values + assert!(new_values.len() >= entries.len()); + assert!(!entries.is_empty()); + debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.key)); + debug_assert!(entries.iter().is_sorted_by_key(|entry| &entry.value_offset)); + debug_assert!((0..entries.len()).all(|i| values(i, &entries, &new_values) .iter() .rev() .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap))); @@ -123,7 +150,8 @@ impl SharedBufferBatchInner { 
debug_assert!(epochs.is_sorted()); Self { - payload, + entries, + new_values, epochs, kv_count: num_items, size, @@ -141,13 +169,13 @@ impl SharedBufferBatchInner { ) -> Option<(HummockValue, EpochWithGap)> { // Perform binary search on table key to find the corresponding entry if let Ok(i) = self - .payload + .entries .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key)) { - let item = &self.payload[i]; - assert_eq!(item.key.as_ref(), *table_key); + let entry = &self.entries[i]; + assert_eq!(entry.key.as_ref(), *table_key); // Scan to find the first version <= epoch - for (e, v) in &item.new_values { + for (e, v) in self.values(i) { // skip invisible versions if read_epoch < e.pure_epoch() { continue; @@ -161,17 +189,9 @@ impl SharedBufferBatchInner { } } -impl Deref for SharedBufferBatchInner { - type Target = [SharedBufferVersionedEntry]; - - fn deref(&self) -> &Self::Target { - self.payload.as_slice() - } -} - impl PartialEq for SharedBufferBatchInner { fn eq(&self, other: &Self) -> bool { - self.payload == other.payload + self.entries == other.entries && self.new_values == other.new_values } } @@ -288,6 +308,7 @@ impl SharedBufferBatch { pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool { self.inner + .entries .binary_search_by(|m| { let key = &m.key; let too_left = match &table_key_range.0 { @@ -325,23 +346,19 @@ impl SharedBufferBatch { self.into_directed_iter() } - pub fn get_payload(&self) -> &[SharedBufferVersionedEntry] { - &self.inner - } - #[inline(always)] pub fn start_table_key(&self) -> TableKey<&[u8]> { - TableKey(self.inner.payload.first().expect("non-empty").key.as_ref()) + TableKey(self.inner.entries.first().expect("non-empty").key.as_ref()) } #[inline(always)] pub fn end_table_key(&self) -> TableKey<&[u8]> { - TableKey(self.inner.payload.last().expect("non-empty").key.as_ref()) + TableKey(self.inner.entries.last().expect("non-empty").key.as_ref()) } #[inline(always)] pub fn raw_largest_key(&self) -> &TableKey { - 
&self.inner.payload.last().expect("non-empty").key + &self.inner.entries.last().expect("non-empty").key } /// return inclusive left endpoint, which means that all data in this batch should be larger or @@ -399,16 +416,16 @@ impl SharedBufferBatch { ); let idx = match self .inner - .payload + .entries .binary_search_by(|m| (m.key.as_ref()).cmp(seek_key.as_slice())) { Ok(idx) => idx, Err(idx) => idx, }; - if idx >= self.inner.payload.len() { + if idx >= self.inner.entries.len() { break; } - let item = &self.inner.payload[idx]; + let item = &self.inner.entries[idx]; if item.key.len() <= VirtualNode::SIZE { break; } @@ -459,17 +476,17 @@ impl SharedBufferBatchIterator { } /// Return all values of the current key - pub(crate) fn current_values(&self) -> &Vec<(EpochWithGap, HummockValue)> { - debug_assert!(self.current_idx < self.inner.len()); + pub(crate) fn current_values(&self) -> &[VersionedSharedBufferValue] { + debug_assert!(self.current_idx < self.inner.entries.len()); let idx = match D::direction() { DirectionEnum::Forward => self.current_idx, - DirectionEnum::Backward => self.inner.len() - self.current_idx - 1, + DirectionEnum::Backward => self.inner.entries.len() - self.current_idx - 1, }; - &self.inner[idx].new_values + self.inner.values(idx) } fn current_values_len(&self) -> i32 { - if self.current_idx < self.inner.len() { + if self.current_idx < self.inner.entries.len() { self.current_values().len() as i32 } else { 0 @@ -480,12 +497,15 @@ impl SharedBufferBatchIterator { let (idx, value_idx) = match D::direction() { DirectionEnum::Forward => (self.current_idx, self.current_value_idx), DirectionEnum::Backward => ( - self.inner.len() - self.current_idx - 1, + self.inner.entries.len() - self.current_idx - 1, self.current_value_idx, ), }; - let cur_entry = &self.inner[idx]; - (&cur_entry.key, &cur_entry.new_values[value_idx as usize]) + let cur_entry = &self.inner.entries[idx]; + ( + &cur_entry.key, + &self.inner.new_values[cur_entry.value_offset + value_idx as 
usize], + ) } } @@ -495,10 +515,13 @@ impl SharedBufferBatchIterator { self.current_idx += 1; } - pub(crate) fn current_key_entry(&self) -> &SharedBufferVersionedEntry { + pub(crate) fn current_key_entry(&self) -> SharedBufferVersionedEntryRef<'_> { assert!(self.is_valid(), "iterator is not valid"); assert_eq!(self.current_value_idx, 0); - &self.inner.payload[self.current_idx] + SharedBufferVersionedEntryRef { + key: &self.inner.entries[self.current_idx].key, + values: self.inner.values(self.current_idx), + } } } @@ -540,7 +563,7 @@ impl HummockIterator for SharedBufferBatchIterator< } fn is_valid(&self) -> bool { - if self.current_idx >= self.inner.len() { + if self.current_idx >= self.inner.entries.len() { return false; } self.current_value_idx >= 0 && self.current_value_idx < self.current_values().len() as i32 @@ -566,6 +589,7 @@ impl HummockIterator for SharedBufferBatchIterator< // by table key. let partition_point = self .inner + .entries .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key)); let seek_key_epoch = key.epoch_with_gap; match D::direction() { @@ -598,7 +622,7 @@ impl HummockIterator for SharedBufferBatchIterator< DirectionEnum::Backward => { match partition_point { Ok(i) => { - self.current_idx = self.inner.len() - i - 1; + self.current_idx = self.inner.entries.len() - i - 1; // seek from back to the first version that is >= seek_key_epoch let values = self.current_values(); let mut idx: i32 = (values.len() - 1) as i32; @@ -620,7 +644,7 @@ impl HummockIterator for SharedBufferBatchIterator< // If i == 0, the iterator will be invalidated with self.current_idx == // self.inner.len(). 
Err(i) => { - self.current_idx = self.inner.len() - i; + self.current_idx = self.inner.entries.len() - i; self.current_value_idx = self.current_values_len() - 1; } } From f594c938a5c619e4182588f2012977c5a00f2409 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 27 Feb 2024 20:00:33 +0800 Subject: [PATCH 2/5] fix --- .../src/hummock/compactor/shared_buffer_compact.rs | 7 ++----- .../hummock/shared_buffer/shared_buffer_batch.rs | 13 ++++--------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 19d83bd160783..b5237747f501e 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -285,7 +285,6 @@ pub async fn merge_imms_in_memory( imms: Vec, memory_tracker: Option, ) -> ImmutableMemtable { - let mut kv_count = 0; let mut epochs = vec![]; let mut merged_size = 0; assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted()); @@ -301,7 +300,6 @@ pub async fn merge_imms_in_memory( ); epochs.push(imm.min_epoch()); - kv_count += imm.kv_count(); merged_size += imm.size(); imm_iters.push(imm.into_forward_iter()); @@ -335,7 +333,7 @@ pub async fn merge_imms_in_memory( .observe_multi_version( user_key, key_entry - .values + .new_values .iter() .map(|(epoch_with_gap, _)| *epoch_with_gap), ) @@ -349,7 +347,7 @@ pub async fn merge_imms_in_memory( } values.extend( key_entry - .values + .new_values .iter() .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())), ); @@ -364,7 +362,6 @@ pub async fn merge_imms_in_memory( epochs, merged_entries, values, - kv_count, merged_size, max_imm_id, memory_tracker, diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 6fde7d9fe12fa..021d0ae9c0086 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ 
b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -49,7 +49,7 @@ pub(crate) type VersionedSharedBufferValue = (EpochWithGap, HummockValue) pub(crate) struct SharedBufferVersionedEntryRef<'a> { pub(crate) key: &'a TableKey, - pub(crate) values: &'a [VersionedSharedBufferValue], + pub(crate) new_values: &'a [VersionedSharedBufferValue], } fn values<'a>( @@ -59,7 +59,7 @@ fn values<'a>( ) -> &'a [VersionedSharedBufferValue] { &values[entries[i].value_offset ..entries - .get(i) + .get(i + 1) .map(|entry| entry.value_offset) .unwrap_or(values.len())] } @@ -80,7 +80,6 @@ pub(crate) struct SharedBufferBatchInner { new_values: Vec, /// The epochs of the data in batch, sorted in ascending order (old to new) epochs: Vec, - kv_count: usize, /// Total size of all key-value items (excluding the `epoch` of value versions) size: usize, _tracker: Option, @@ -100,7 +99,6 @@ impl SharedBufferBatchInner { assert!(!payload.is_empty()); debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key)); - let kv_count = payload.len(); let epoch_with_gap = EpochWithGap::new(epoch, spill_offset); let mut entries = Vec::with_capacity(payload.len()); let mut new_values = Vec::with_capacity(payload.len()); @@ -117,7 +115,6 @@ impl SharedBufferBatchInner { entries, new_values, epochs: vec![epoch], - kv_count, size, _tracker, batch_id, @@ -133,7 +130,6 @@ impl SharedBufferBatchInner { epochs: Vec, entries: Vec, new_values: Vec, - num_items: usize, size: usize, imm_id: ImmId, tracker: Option, @@ -153,7 +149,6 @@ impl SharedBufferBatchInner { entries, new_values, epochs, - kv_count: num_items, size, _tracker: tracker, batch_id: imm_id, @@ -294,7 +289,7 @@ impl SharedBufferBatch { } pub fn kv_count(&self) -> usize { - self.inner.kv_count + self.inner.new_values.len() } pub fn get( @@ -520,7 +515,7 @@ impl SharedBufferBatchIterator { assert_eq!(self.current_value_idx, 0); SharedBufferVersionedEntryRef { key: &self.inner.entries[self.current_idx].key, - values: 
self.inner.values(self.current_idx), + new_values: self.inner.values(self.current_idx), } } } From e36cf6181d8f159c6f7b850e0692ca6f92586958 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 28 Feb 2024 15:20:38 +0800 Subject: [PATCH 3/5] fix --- .../compactor/shared_buffer_compact.rs | 19 +++++++++++++------ .../src/hummock/event_handler/uploader.rs | 12 +++++++----- .../shared_buffer/shared_buffer_batch.rs | 6 +++++- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index b5237747f501e..8a370be28e724 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -24,7 +24,7 @@ use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; -use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey}; +use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, EPOCH_LEN}; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::{CompactionGroupId, EpochWithGap, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::compact_task; @@ -146,7 +146,7 @@ async fn compact_shared_buffer( ret }); - let total_key_count = payload.iter().map(|imm| imm.kv_count()).sum::(); + let total_key_count = payload.iter().map(|imm| imm.key_count()).sum::(); let (splits, sub_compaction_sstable_size, split_weight_by_vnode) = generate_splits(&payload, &existing_table_ids, context.storage_opts.as_ref()); let parallelism = splits.len(); @@ -291,8 +291,10 @@ pub async fn merge_imms_in_memory( let max_imm_id = imms[0].batch_id(); let mut imm_iters = Vec::with_capacity(imms.len()); + let key_count = imms.iter().map(|imm| imm.key_count()).sum(); + let value_count = imms.iter().map(|imm| imm.value_count()).sum(); 
for imm in imms { - assert!(imm.kv_count() > 0, "imm should not be empty"); + assert!(imm.key_count() > 0, "imm should not be empty"); assert_eq!( table_id, imm.table_id(), @@ -313,8 +315,13 @@ pub async fn merge_imms_in_memory( let first_item_key = mi.current_key_entry().key.clone(); - let mut merged_entries: Vec = Vec::new(); - let mut values: Vec = Vec::new(); + let mut merged_entries: Vec = Vec::with_capacity(key_count); + let mut values: Vec = Vec::with_capacity(value_count); + + merged_entries.push(SharedBufferKeyEntry { + key: first_item_key.clone(), + value_offset: 0, + }); // Use first key, max epoch to initialize the tracker to ensure that the check first call to full_key_tracker.observe will succeed let mut full_key_tracker = FullKeyTracker::::new(FullKey::new_with_gap_epoch( @@ -382,7 +389,7 @@ fn generate_splits( for imm in payload { let data_size = { // calculate encoded bytes of key var length - (imm.kv_count() * 8 + imm.size()) as u64 + (imm.value_count() * EPOCH_LEN + imm.size()) as u64 }; compact_data_size += data_size; size_and_start_user_keys.push((data_size, imm.start_user_key())); diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 7ffa3956c2349..1005b1a8730db 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -868,18 +868,18 @@ impl HummockUploader { .filter(|(_, imms)| imms.len() >= self.context.imm_merge_threshold) { let imms_to_merge = imms.drain(..).collect_vec(); - let mut kv_count = 0; + let mut value_count = 0; let mut imm_size = 0; imms_to_merge.iter().for_each(|imm| { // ensure imms are sealed assert_le!(imm.max_epoch(), sealed_epoch); - kv_count += imm.kv_count(); + value_count += imm.value_count(); imm_size += imm.size(); }); // acquire memory before generate merge task // if acquire memory failed, the task will not be generated - let memory_sz = (imm_size + kv_count * EPOCH_LEN) as u64; + let 
memory_sz = (imm_size + value_count * EPOCH_LEN) as u64; if let Some(tracker) = memory_limiter.try_require_memory(memory_sz) { self.sealed_data .merging_tasks @@ -1125,7 +1125,9 @@ impl HummockUploader { .stats .merge_imm_batch_memory_sz .with_label_values(&[table_id_label.as_str()]) - .inc_by((output.merged_imm.size() + output.merged_imm.kv_count() * EPOCH_LEN) as _); + .inc_by( + (output.merged_imm.size() + output.merged_imm.value_count() * EPOCH_LEN) as _, + ); } poll_ret } @@ -1667,7 +1669,7 @@ mod tests { imm = &mut task => { println!("merging task success"); assert_eq!(table_id, imm.table_id); - assert_eq!(9, imm.kv_count()); + assert_eq!(9, imm.value_count()); } } task.join_handle.abort(); diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 021d0ae9c0086..b676298b64859 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -288,7 +288,11 @@ impl SharedBufferBatch { *self.inner.epochs.last().unwrap() } - pub fn kv_count(&self) -> usize { + pub fn key_count(&self) -> usize { + self.inner.entries.len() + } + + pub fn value_count(&self) -> usize { self.inner.new_values.len() } From c26b6b27eef8f961bee4a2350330311ace971af6 Mon Sep 17 00:00:00 2001 From: William Wen Date: Sun, 3 Mar 2024 16:55:10 +0800 Subject: [PATCH 4/5] add benches --- src/storage/Cargo.toml | 4 + src/storage/benches/bench_imm_compact.rs | 83 +++++++++++++++++++ .../shared_buffer/shared_buffer_batch.rs | 2 +- 3 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 src/storage/benches/bench_imm_compact.rs diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 16f7156470788..d1faa33d9624a 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -139,5 +139,9 @@ harness = false name = "bench_row" harness = false +[[bench]] +name = "bench_imm_compact" +harness = false + [lints] workspace 
= true diff --git a/src/storage/benches/bench_imm_compact.rs b/src/storage/benches/bench_imm_compact.rs new file mode 100644 index 0000000000000..dbc5eb8c2b591 --- /dev/null +++ b/src/storage/benches/bench_imm_compact.rs @@ -0,0 +1,83 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use bytes::Bytes; +use criterion::async_executor::FuturesExecutor; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use risingwave_common::catalog::TableId; +use risingwave_hummock_sdk::key::TableKey; +use risingwave_storage::hummock::compactor::merge_imms_in_memory; +use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; +use risingwave_storage::hummock::value::HummockValue; + +fn gen_interleave_shared_buffer_batch( + batch_size: usize, + batch_count: usize, + epoch: u64, +) -> Vec { + let mut batches = Vec::new(); + for i in 0..batch_count { + let mut batch_data = vec![]; + for j in 0..batch_size { + batch_data.push(( + TableKey(Bytes::copy_from_slice( + format!("test_key_{:08}", j * batch_count + i).as_bytes(), + )), + HummockValue::put(Bytes::copy_from_slice("value".as_bytes())), + )); + } + let batch = SharedBufferBatch::for_test(batch_data, epoch, Default::default()); + batches.push(batch); + } + batches.reverse(); + batches +} + +fn criterion_benchmark(c: &mut Criterion) { + let batches = gen_interleave_shared_buffer_batch(10000, 100, 100); + c.bench_with_input( + 
BenchmarkId::new("bench-imm-merge", "single-epoch"), + &batches, + |b, batches| { + b.to_async(FuturesExecutor).iter(|| async { + let imm = merge_imms_in_memory(TableId::default(), 0, batches.clone(), None).await; + assert_eq!(imm.key_count(), 10000 * 100); + assert_eq!(imm.value_count(), 10000 * 100); + }) + }, + ); + + let mut later_batches = gen_interleave_shared_buffer_batch(2000, 100, 600); + + for i in 1..5 { + let mut batches = gen_interleave_shared_buffer_batch(2000, 100, 600 - i * 100); + batches.extend(later_batches); + later_batches = batches; + } + + c.bench_with_input( + BenchmarkId::new("bench-imm-merge", "multi-epoch"), + &later_batches, + |b, batches| { + b.to_async(FuturesExecutor).iter(|| async { + let imm = merge_imms_in_memory(TableId::default(), 0, batches.clone(), None).await; + assert_eq!(imm.key_count(), 2000 * 100); + assert_eq!(imm.value_count(), 2000 * 100 * 5); + }) + }, + ); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index b676298b64859..3b3ddd4ee34f4 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -218,7 +218,7 @@ impl SharedBufferBatch { None, )), table_id, - instance_id: LocalInstanceId::default(), + instance_id: SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed), } } From 131e2ba98c9078dde2d8e1f723f6a1f68bbdb38f Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 4 Mar 2024 13:15:39 +0800 Subject: [PATCH 5/5] skip empty key --- .../hummock/compactor/shared_buffer_compact.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 8a370be28e724..3e7e58938caf9 100644 --- 
a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -29,7 +29,7 @@ use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::{CompactionGroupId, EpochWithGap, HummockEpoch, LocalSstableInfo}; use risingwave_pb::hummock::compact_task; use thiserror_ext::AsReport; -use tracing::error; +use tracing::{error, warn}; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManager}; use crate::hummock::compactor::compaction_filter::DummyCompactionFilter; @@ -346,11 +346,17 @@ pub async fn merge_imms_in_memory( ) .is_some() { - // Record kv entries - merged_entries.push(SharedBufferKeyEntry { - key: full_key_tracker.latest_user_key().table_key.clone(), - value_offset: values.len(), - }); + let last_entry = merged_entries.last_mut().expect("non-empty"); + if last_entry.value_offset == values.len() { + warn!(key = ?last_entry.key, "key has no value in imm compact. skipped"); + last_entry.key = full_key_tracker.latest_user_key().table_key.clone(); + } else { + // Record kv entries + merged_entries.push(SharedBufferKeyEntry { + key: full_key_tracker.latest_user_key().table_key.clone(), + value_offset: values.len(), + }); + } } values.extend( key_entry