From 2abc32f2a3aa4bff0e53ed64b6d45c664caea081 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 29 Jan 2024 00:37:08 +0800 Subject: [PATCH 1/6] refactor(storage): do not track imm ids in imm --- src/storage/hummock_sdk/src/key.rs | 54 +++++++++++++++---- .../compactor/shared_buffer_compact.rs | 37 ++++++++----- .../event_handler/hummock_event_handler.rs | 25 +++------ .../src/hummock/event_handler/uploader.rs | 49 +++++++++-------- .../src/hummock/iterator/merge_inner.rs | 22 +++++++- .../shared_buffer/shared_buffer_batch.rs | 44 +++++++-------- src/storage/src/hummock/store/version.rs | 24 ++++----- 7 files changed, 153 insertions(+), 102 deletions(-) diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs index b65bb2a05ddd5..e46cc8b65cfc9 100644 --- a/src/storage/hummock_sdk/src/key.rs +++ b/src/storage/hummock_sdk/src/key.rs @@ -15,6 +15,7 @@ use std::borrow::Borrow; use std::cmp::Ordering; use std::fmt::Debug; +use std::iter::once; use std::ops::Bound::*; use std::ops::{Bound, Deref, DerefMut, RangeBounds}; use std::ptr; @@ -827,11 +828,11 @@ impl + Ord + Eq> PartialOrd for FullKey { } } -impl<'a, T> From> for FullKey +impl<'a, T> From> for UserKey where T: AsRef<[u8]> + CopyFromSlice, { - fn from(value: FullKey<&'a [u8]>) -> Self { + fn from(value: UserKey<&'a [u8]>) -> Self { value.copy_into() } } @@ -979,40 +980,73 @@ impl + Ord + Eq> FullKeyTracker { /// - Otherwise: return None pub fn observe(&mut self, key: FullKey) -> Option> where - FullKey: Into>, + UserKey: Into>, F: AsRef<[u8]>, { + self.observe_multi_version(key.user_key, once(key.epoch_with_gap)) + } + + /// `epochs` comes from greater to smaller + pub fn observe_multi_version( + &mut self, + user_key: UserKey, + mut epochs: impl Iterator, + ) -> Option> + where + UserKey: Into>, + F: AsRef<[u8]>, + { + let max_epoch_with_gap = epochs.next().expect("non-empty"); + let min_epoch_with_gap = epochs.fold( + max_epoch_with_gap, + |prev_epoch_with_gap, curr_epoch_with_gap| { + assert!( + prev_epoch_with_gap > curr_epoch_with_gap, + "epoch list not sorted. prev: {:?}, curr: {:?}, user_key: {:?}", + prev_epoch_with_gap, + curr_epoch_with_gap, + user_key + ); + curr_epoch_with_gap + }, + ); match self .latest_full_key .user_key .as_ref() - .cmp(&key.user_key.as_ref()) + .cmp(&user_key.as_ref()) { Ordering::Less => { // Observe a new user key // Reset epochs - self.last_observed_epoch_with_gap = key.epoch_with_gap; + self.last_observed_epoch_with_gap = min_epoch_with_gap; // Take the previous key and set latest key - Some(std::mem::replace(&mut self.latest_full_key, key.into())) + Some(std::mem::replace( + &mut self.latest_full_key, + FullKey { + user_key: user_key.into(), + epoch_with_gap: min_epoch_with_gap, + }, + )) } Ordering::Equal => { - if key.epoch_with_gap >= self.last_observed_epoch_with_gap { + if max_epoch_with_gap >= self.last_observed_epoch_with_gap { // Epoch from the same user key should be monotonically decreasing panic!( "key {:?} epoch {:?} >= prev epoch {:?}", - key.user_key, key.epoch_with_gap, self.last_observed_epoch_with_gap + user_key, max_epoch_with_gap, self.last_observed_epoch_with_gap ); } - self.last_observed_epoch_with_gap = key.epoch_with_gap; + self.last_observed_epoch_with_gap = min_epoch_with_gap; None } Ordering::Greater => { // User key should be monotonically increasing panic!( "key {:?} <= prev key {:?}", - key, + user_key, FullKey { user_key: self.latest_full_key.user_key.as_ref(), epoch_with_gap: self.last_observed_epoch_with_gap diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 2f11db3405619..9c3d30a31f633 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -285,11 +285,12 @@ pub async fn merge_imms_in_memory( instance_id: LocalInstanceId, imms: Vec, memory_tracker: Option, -) -> HummockResult { +) -> ImmutableMemtable { let mut kv_count = 0; let mut epochs = vec![]; let mut merged_size = 0; - let mut merged_imm_ids = Vec::with_capacity(imms.len()); + assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted()); + let max_imm_id = imms[0].batch_id(); let mut imm_iters = Vec::with_capacity(imms.len()); for imm in imms { @@ -300,7 +301,6 @@ pub async fn merge_imms_in_memory( "should only merge data belonging to the same table" ); - merged_imm_ids.push(imm.batch_id()); epochs.push(imm.min_epoch()); kv_count += imm.kv_count(); merged_size += imm.size(); @@ -311,10 +311,10 @@ pub async fn merge_imms_in_memory( // use merge iterator to merge input imms let mut mi = MergeIterator::new(imm_iters); - mi.rewind().await?; + mi.rewind_no_await(); assert!(mi.is_valid()); - let first_item_key = mi.current_item().0.clone(); + let first_item_key = mi.current_key_items().0.clone(); let mut merged_payload: Vec = Vec::new(); @@ -327,9 +327,15 @@ pub async fn merge_imms_in_memory( let mut table_key_versions: Vec<(EpochWithGap, HummockValue)> = Vec::new(); while mi.is_valid() { - let (key, (epoch_with_gap, value)) = mi.current_item(); - let full_key = FullKey::new_with_gap_epoch(table_id, key.clone(), *epoch_with_gap); - if let Some(last_full_key) = full_key_tracker.observe(full_key) { + let (key, values) = mi.current_key_items(); + let user_key = UserKey { + table_id, + table_key: key.clone(), + }; + if let Some(last_full_key) = full_key_tracker.observe_multi_version( + user_key, + values.iter().map(|(epoch_with_gap, _)| *epoch_with_gap), + ) { let last_user_key = last_full_key.user_key; // `epoch_with_gap` of the `last_full_key` may not reflect the real epoch in the items // and should not be used because we use max epoch to initialize the tracker @@ -341,8 +347,13 @@ pub async fn merge_imms_in_memory( // Reset state before moving onto the new table key table_key_versions = vec![]; } - table_key_versions.push((*epoch_with_gap, value.clone())); - mi.next().await?; + table_key_versions.extend( + values + .iter() + .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())), + ); + mi.advance_peek_to_next_key(); + tokio::task::consume_budget().await; } // process the last key @@ -353,18 +364,18 @@ pub async fn merge_imms_in_memory( )); } - Ok(SharedBufferBatch { + SharedBufferBatch { inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches( epochs, merged_payload, kv_count, - merged_imm_ids, merged_size, + max_imm_id, memory_tracker, )), table_id, instance_id, - }) + } } fn generate_splits( diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index cccbb8da242e3..c4effdbf4f459 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -555,7 +555,10 @@ impl HummockEventHandler { }, |read_version| { read_version.write().update(VersionUpdate::Staging( - StagingData::MergedImmMem(merge_output.merged_imm), + StagingData::MergedImmMem( + merge_output.merged_imm, + merge_output.imm_ids, + ), )); }, ) @@ -768,9 +771,7 @@ mod tests { use crate::hummock::event_handler::{HummockEvent, HummockEventHandler}; use crate::hummock::iterator::test_utils::mock_sstable_store; use crate::hummock::local_version::pinned_version::PinnedVersion; - use crate::hummock::shared_buffer::shared_buffer_batch::{ - SharedBufferBatch, SharedBufferBatchInner, - }; + use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; use crate::hummock::store::version::{StagingData, VersionUpdate}; use crate::hummock::test_utils::default_opts_for_test; use crate::hummock::value::HummockValue; @@ -813,26 +814,14 @@ mod tests { Err(HummockError::other("".to_string())) }) }), - Arc::new(move |table_id, instance_id, imms, _| { + Arc::new(move |_, _, imms, _| { let (tx, rx) = oneshot::channel::<()>(); let (finish_tx, finish_rx) = oneshot::channel::<()>(); spawn_merging_task_tx.send((tx, finish_rx)).unwrap(); spawn(async move { rx.await.unwrap(); finish_tx.send(()).unwrap(); - let first_imm = &imms[0]; - Ok(SharedBufferBatch { - inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches( - first_imm.epochs().clone(), - first_imm.get_payload().iter().cloned().collect_vec(), - 100, - imms.iter().map(|imm| imm.batch_id()).collect_vec(), - 100, - None, - )), - table_id, - instance_id, - }) + imms[0].clone() }) }), ); diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 1aeda293c84be..4c24775c6a57c 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -66,7 +66,7 @@ pub type SpawnMergingTask = Arc< LocalInstanceId, Vec, Option, - ) -> JoinHandle> + ) -> JoinHandle + Send + Sync + 'static, @@ -119,6 +119,8 @@ struct UploadingTask { } pub struct MergeImmTaskOutput { + /// Input imm ids of the merging task. Larger imm ids at the front. + pub imm_ids: Vec, pub table_id: TableId, pub instance_id: LocalInstanceId, pub merged_imm: ImmutableMemtable, @@ -129,7 +131,17 @@ struct MergingImmTask { table_id: TableId, instance_id: LocalInstanceId, input_imms: Vec, - join_handle: JoinHandle>, + join_handle: JoinHandle, +} + +impl Debug for MergingImmTask { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MergingImmTask") + .field("table_id", &self.table_id) + .field("instance_id", &self.instance_id) + .field("input_imms", &self.input_imms) + .finish() + } } impl MergingImmTask { @@ -140,6 +152,7 @@ impl MergingImmTask { memory_tracker: Option, context: &UploaderContext, ) -> Self { + assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted()); let input_imms = imms.clone(); let join_handle = (context.spawn_merging_task)(table_id, instance_id, imms, memory_tracker); @@ -152,19 +165,18 @@ impl MergingImmTask { } /// Poll the result of the merge task - fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll { Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) { Ok(task_result) => task_result, - Err(err) => Err(HummockError::other(format!( - "fail to join imm merge join handle: {}", - err.as_report() - ))), + Err(err) => { + panic!("failed to join merging task: {:?} {:?}", err, self); + } }) } } impl Future for MergingImmTask { - type Output = HummockResult; + type Output = ImmutableMemtable; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.poll_result(cx) @@ -510,7 +522,6 @@ impl SealedData { } fn add_merged_imm(&mut self, merged_imm: &ImmutableMemtable) { - debug_assert!(merged_imm.is_merged_imm()); // add merged_imm to merged_imms self.merged_imms.push_front(merged_imm.clone()); } @@ -573,15 +584,16 @@ impl SealedData { fn poll_success_merge_imm(&mut self, cx: &mut Context<'_>) -> Poll> { // only poll the oldest merge task if there is any if let Some(task) = self.merging_tasks.back_mut() { - let merge_result = ready!(task.poll_unpin(cx)); + let merged_imm = ready!(task.poll_unpin(cx)); // pop the finished task let task = self.merging_tasks.pop_back().expect("must exist"); Poll::Ready(Some(MergeImmTaskOutput { + imm_ids: task.input_imms.iter().map(|imm| imm.batch_id()).collect(), table_id: task.table_id, instance_id: task.instance_id, - merged_imm: merge_result.unwrap(), + merged_imm, })) } else { Poll::Ready(None) @@ -1655,17 +1667,10 @@ mod tests { _ = sleep => { println!("sleep timeout") } - res = &mut task => { - match res { - Ok(imm) => { - println!("merging task success"); - assert_eq!(table_id, imm.table_id); - assert_eq!(9, imm.kv_count()); - } - Err(err) => { - println!("merging task failed: {:?}", err); - } - } + imm = &mut task => { + println!("merging task success"); + assert_eq!(table_id, imm.table_id); + assert_eq!(9, imm.kv_count()); } } task.join_handle.abort(); diff --git a/src/storage/src/hummock/iterator/merge_inner.rs b/src/storage/src/hummock/iterator/merge_inner.rs index eaaac328ce8fa..1427005a3b7e4 100644 --- a/src/storage/src/hummock/iterator/merge_inner.rs +++ b/src/storage/src/hummock/iterator/merge_inner.rs @@ -17,6 +17,7 @@ use std::collections::{BinaryHeap, LinkedList}; use std::ops::{Deref, DerefMut}; use bytes::Bytes; +use futures::FutureExt; use risingwave_hummock_sdk::key::{FullKey, TableKey}; use risingwave_hummock_sdk::EpochWithGap; @@ -99,12 +100,12 @@ impl MergeIterator { impl MergeIterator> { /// Used in `merge_imms_in_memory` to merge immutable memtables. - pub fn current_item(&self) -> (&TableKey, &(EpochWithGap, HummockValue)) { + pub fn current_key_items(&self) -> (&TableKey, &[(EpochWithGap, HummockValue)]) { self.heap .peek() .expect("no inner iter for imm merge") .iter - .current_item() + .current_key_items() } } @@ -197,6 +198,23 @@ impl<'a, T: Ord> Drop for PeekMutGuard<'a, T> { } } +impl MergeIterator> { + pub(crate) fn advance_peek_to_next_key(&mut self) { + self.heap + .peek_mut() + .expect("should exist") + .iter + .advance_to_next_key(); + } + + pub(crate) fn rewind_no_await(&mut self) { + self.rewind() + .now_or_never() + .expect("should not pending") + .expect("should not err") + } +} + impl HummockIterator for MergeIterator where Node: Ord, diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 1fe231b99de21..c1c627e8bcf12 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -53,11 +53,6 @@ pub type SharedBufferVersionedEntry = (TableKey, Vec<(EpochWithGap, Hummo #[derive(Debug)] pub(crate) struct SharedBufferBatchInner { payload: Vec, - /// The list of imm ids that are merged into this batch - /// This field is immutable. - /// - /// Larger imm id at the front. - imm_ids: Vec, /// The epochs of the data in batch, sorted in ascending order (old to new) epochs: Vec, kv_count: usize, @@ -90,7 +85,6 @@ impl SharedBufferBatchInner { let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed); SharedBufferBatchInner { payload: items, - imm_ids: vec![batch_id], epochs: vec![epoch], kv_count, size, @@ -104,8 +98,8 @@ impl SharedBufferBatchInner { epochs: Vec, payload: Vec, num_items: usize, - imm_ids: Vec, size: usize, + imm_id: ImmId, tracker: Option, ) -> Self { assert!(!payload.is_empty()); @@ -114,21 +108,16 @@ impl SharedBufferBatchInner { .iter() .rev() .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap))); - debug_assert!(!imm_ids.is_empty()); - debug_assert!(imm_ids.iter().rev().is_sorted()); debug_assert!(!epochs.is_empty()); debug_assert!(epochs.is_sorted()); - let max_imm_id = *imm_ids.iter().max().unwrap(); - Self { payload, epochs, - imm_ids, kv_count: num_items, size, _tracker: tracker, - batch_id: max_imm_id, + batch_id: imm_id, } } @@ -262,10 +251,6 @@ impl SharedBufferBatch { self.table_id } - pub fn is_merged_imm(&self) -> bool { - !self.inner.imm_ids.is_empty() - } - pub fn min_epoch(&self) -> HummockEpoch { *self.inner.epochs.first().unwrap() } @@ -274,11 +259,6 @@ impl SharedBufferBatch { *self.inner.epochs.last().unwrap() } - pub fn get_imm_ids(&self) -> &Vec { - debug_assert!(!self.inner.imm_ids.is_empty()); - &self.inner.imm_ids - } - pub fn kv_count(&self) -> usize { self.inner.kv_count } @@ -502,6 +482,22 @@ impl SharedBufferBatchIterator { } } +impl SharedBufferBatchIterator { + pub(crate) fn advance_to_next_key(&mut self) { + assert_eq!(self.current_version_idx, 0); + self.current_idx += 1; + } + + pub(crate) fn current_key_items( + &self, + ) -> (&TableKey, &[(EpochWithGap, HummockValue)]) { + assert!(self.is_valid(), "iterator is not valid"); + assert_eq!(self.current_version_idx, 0); + let (key, values) = &self.inner.payload[self.current_idx]; + (key, &values) + } +} + impl HummockIterator for SharedBufferBatchIterator { type Direction = D; @@ -1116,9 +1112,7 @@ mod tests { ]; // newer data comes first let imms = vec![imm3, imm2, imm1]; - let merged_imm = merge_imms_in_memory(table_id, 0, imms.clone(), None) - .await - .unwrap(); + let merged_imm = merge_imms_in_memory(table_id, 0, imms.clone(), None).await; // Point lookup for (i, items) in batch_items.iter().enumerate() { diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 3f95e8fe0e248..a9abac35ede13 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -117,7 +117,7 @@ impl StagingSstableInfo { #[derive(Clone)] pub enum StagingData { ImmMem(ImmutableMemtable), - MergedImmMem(ImmutableMemtable), + MergedImmMem(ImmutableMemtable, Vec), Sst(StagingSstableInfo), } @@ -266,8 +266,8 @@ impl HummockReadVersion { self.staging.imm.push_front(imm) } - StagingData::MergedImmMem(merged_imm) => { - self.add_merged_imm(merged_imm); + StagingData::MergedImmMem(merged_imm, imm_ids) => { + self.add_merged_imm(merged_imm, imm_ids); } StagingData::Sst(staging_sst) => { // The following properties must be ensured: @@ -415,9 +415,11 @@ impl HummockReadVersion { .cloned() } - pub fn add_merged_imm(&mut self, merged_imm: ImmutableMemtable) { - assert!(merged_imm.get_imm_ids().iter().rev().is_sorted()); - let min_imm_id = *merged_imm.get_imm_ids().last().expect("non-empty"); + /// `imm_ids` is the list of imm ids that are merged into this batch + /// This field is immutable. Larger imm id at the front. + pub fn add_merged_imm(&mut self, merged_imm: ImmutableMemtable, imm_ids: Vec) { + assert!(imm_ids.iter().rev().is_sorted()); + let min_imm_id = *imm_ids.last().expect("non-empty"); let back = self.staging.imm.back().expect("should not be empty"); @@ -451,9 +453,7 @@ impl HummockReadVersion { unreachable!( "must have break in equal: {:?} {:?} {:?}", - remaining_staging_imm_ids, - earlier_imm_ids, - merged_imm.get_imm_ids() + remaining_staging_imm_ids, earlier_imm_ids, imm_ids ) } } @@ -471,13 +471,13 @@ impl HummockReadVersion { .map(|imm| imm.batch_id()) .collect_vec() }, - merged_imm.get_imm_ids() + imm_ids ); None }; // iter from smaller imm and take the older imm at the back. - for imm_id in merged_imm.get_imm_ids().iter().rev() { + for imm_id in imm_ids.iter().rev() { let imm = self.staging.imm.pop_back().expect("should exist"); assert_eq!( imm.batch_id(), @@ -490,7 +490,7 @@ impl HummockReadVersion { .map(|imm| imm.batch_id()) .collect_vec() }, - merged_imm.get_imm_ids(), + imm_ids, imm_id, ); } From e8459b474ab061d05fe9d79361fedf79cedbe767 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 29 Jan 2024 00:57:15 +0800 Subject: [PATCH 2/6] replace tuple with struct --- .../compactor/shared_buffer_compact.rs | 21 ++++--- .../src/hummock/iterator/merge_inner.rs | 12 ++-- .../shared_buffer/shared_buffer_batch.rs | 56 +++++++++++-------- 3 files changed, 53 insertions(+), 36 deletions(-) diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 9c3d30a31f633..2c72a64e0f8e4 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -314,7 +314,7 @@ pub async fn merge_imms_in_memory( mi.rewind_no_await(); assert!(mi.is_valid()); - let first_item_key = mi.current_key_items().0.clone(); + let first_item_key = mi.current_key_entry().key.clone(); let mut merged_payload: Vec = Vec::new(); @@ -327,14 +327,17 @@ pub async fn merge_imms_in_memory( let mut table_key_versions: Vec<(EpochWithGap, HummockValue)> = Vec::new(); while mi.is_valid() { - let (key, values) = mi.current_key_items(); + let key_entry = mi.current_key_entry(); let user_key = UserKey { table_id, - table_key: key.clone(), + table_key: key_entry.key.clone(), }; if let Some(last_full_key) = full_key_tracker.observe_multi_version( user_key, - values.iter().map(|(epoch_with_gap, _)| *epoch_with_gap), + key_entry + .new_values + .iter() + .map(|(epoch_with_gap, _)| *epoch_with_gap), ) { let last_user_key = last_full_key.user_key; // `epoch_with_gap` of the `last_full_key` may not reflect the real epoch in the items @@ -342,13 +345,17 @@ pub async fn merge_imms_in_memory( let _epoch_with_gap = last_full_key.epoch_with_gap; // Record kv entries - merged_payload.push((last_user_key.table_key, table_key_versions)); + merged_payload.push(SharedBufferVersionedEntry::new( + last_user_key.table_key, + table_key_versions, + )); // Reset state before moving onto the new table key table_key_versions = vec![]; } table_key_versions.extend( - values + key_entry + .new_values .iter() .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())), ); @@ -358,7 +365,7 @@ pub async fn merge_imms_in_memory( // process the last key if !table_key_versions.is_empty() { - merged_payload.push(( + merged_payload.push(SharedBufferVersionedEntry::new( full_key_tracker.latest_full_key.user_key.table_key, table_key_versions, )); diff --git a/src/storage/src/hummock/iterator/merge_inner.rs b/src/storage/src/hummock/iterator/merge_inner.rs index 1427005a3b7e4..471a1038b7496 100644 --- a/src/storage/src/hummock/iterator/merge_inner.rs +++ b/src/storage/src/hummock/iterator/merge_inner.rs @@ -16,14 +16,14 @@ use std::collections::binary_heap::PeekMut; use std::collections::{BinaryHeap, LinkedList}; use std::ops::{Deref, DerefMut}; -use bytes::Bytes; use futures::FutureExt; -use risingwave_hummock_sdk::key::{FullKey, TableKey}; -use risingwave_hummock_sdk::EpochWithGap; +use risingwave_hummock_sdk::key::FullKey; use super::Forward; use crate::hummock::iterator::{DirectionEnum, HummockIterator, HummockIteratorDirection}; -use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchIterator; +use crate::hummock::shared_buffer::shared_buffer_batch::{ + SharedBufferBatchIterator, SharedBufferVersionedEntry, +}; use crate::hummock::value::HummockValue; use crate::hummock::HummockResult; use crate::monitor::StoreLocalStatistic; @@ -100,12 +100,12 @@ impl MergeIterator { impl MergeIterator> { /// Used in `merge_imms_in_memory` to merge immutable memtables. - pub fn current_key_items(&self) -> (&TableKey, &[(EpochWithGap, HummockValue)]) { + pub fn current_key_entry(&self) -> &SharedBufferVersionedEntry { self.heap .peek() .expect("no inner iter for imm merge") .iter - .current_key_items() + .current_key_entry() } } diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index c1c627e8bcf12..5f0c150b208fa 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -48,7 +48,17 @@ pub type SharedBufferBatchId = u64; /// A shared buffer may contain data from multiple epochs, /// there are multiple versions for a given key (`table_key`), we put those versions into a vector /// and sort them in descending order, aka newest to oldest. -pub type SharedBufferVersionedEntry = (TableKey, Vec<(EpochWithGap, HummockValue)>); +#[derive(PartialEq, Debug)] +pub struct SharedBufferVersionedEntry { + pub(crate) key: TableKey, + pub(crate) new_values: Vec<(EpochWithGap, HummockValue)>, +} + +impl SharedBufferVersionedEntry { + pub fn new(key: TableKey, new_values: Vec<(EpochWithGap, HummockValue)>) -> Self { + Self { key, new_values } + } +} #[derive(Debug)] pub(crate) struct SharedBufferBatchInner { @@ -79,7 +89,7 @@ impl SharedBufferBatchInner { let epoch_with_gap = EpochWithGap::new(epoch, spill_offset); let items = payload .into_iter() - .map(|(k, v)| (k, vec![(epoch_with_gap, v)])) + .map(|(k, v)| SharedBufferVersionedEntry::new(k, vec![(epoch_with_gap, v)])) .collect_vec(); let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed); @@ -103,8 +113,9 @@ impl SharedBufferBatchInner { tracker: Option, ) -> Self { assert!(!payload.is_empty()); - debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key)); - debug_assert!(payload.iter().all(|(_, values)| values + debug_assert!(payload.iter().is_sorted_by_key(|entry| &entry.key)); + debug_assert!(payload.iter().all(|entry| entry + .new_values .iter() .rev() .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap))); @@ -129,11 +140,14 @@ impl SharedBufferBatchInner { read_epoch: HummockEpoch, ) -> Option<(HummockValue, EpochWithGap)> { // Perform binary search on table key to find the corresponding entry - if let Ok(i) = self.payload.binary_search_by(|m| (m.0[..]).cmp(*table_key)) { + if let Ok(i) = self + .payload + .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key)) + { let item = &self.payload[i]; - assert_eq!(item.0.as_ref(), *table_key); + assert_eq!(item.key.as_ref(), *table_key); // Scan to find the first version <= epoch - for (e, v) in &item.1 { + for (e, v) in &item.new_values { // skip invisible versions if read_epoch < e.pure_epoch() { continue; @@ -275,7 +289,7 @@ impl SharedBufferBatch { pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool { self.inner .binary_search_by(|m| { - let key = &m.0; + let key = &m.key; let too_left = match &table_key_range.0 { std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(), std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(), @@ -317,17 +331,17 @@ impl SharedBufferBatch { #[inline(always)] pub fn start_table_key(&self) -> TableKey<&[u8]> { - TableKey(self.inner.payload.first().expect("non-empty").0.as_ref()) + TableKey(self.inner.payload.first().expect("non-empty").key.as_ref()) } #[inline(always)] pub fn end_table_key(&self) -> TableKey<&[u8]> { - TableKey(self.inner.payload.last().expect("non-empty").0.as_ref()) + TableKey(self.inner.payload.last().expect("non-empty").key.as_ref()) } #[inline(always)] pub fn raw_largest_key(&self) -> &TableKey { - &self.inner.payload.last().expect("non-empty").0 + &self.inner.payload.last().expect("non-empty").key } /// return inclusive left endpoint, which means that all data in this batch should be larger or @@ -386,7 +400,7 @@ impl SharedBufferBatch { let idx = match self .inner .payload - .binary_search_by(|m| (m.0[..]).cmp(seek_key.as_slice())) + .binary_search_by(|m| (m.key.as_ref()).cmp(seek_key.as_slice())) { Ok(idx) => idx, Err(idx) => idx, @@ -395,11 +409,11 @@ impl SharedBufferBatch { break; } let item = &self.inner.payload[idx]; - if item.0.len() <= VirtualNode::SIZE { + if item.key.len() <= VirtualNode::SIZE { break; } let current_vnode_id = VirtualNode::from_be_bytes( - item.0.as_ref()[..VirtualNode::SIZE] + item.key.as_ref()[..VirtualNode::SIZE] .try_into() .expect("slice with incorrect length"), ) @@ -456,7 +470,7 @@ impl SharedBufferBatchIterator { DirectionEnum::Forward => self.current_idx, DirectionEnum::Backward => self.inner.len() - self.current_idx - 1, }; - &self.inner.get(idx).unwrap().1 + &self.inner.get(idx).unwrap().new_values } fn current_versions_len(&self) -> i32 { @@ -477,8 +491,7 @@ impl SharedBufferBatchIterator { ), }; let cur_entry = self.inner.get(idx).unwrap(); - let value = &cur_entry.1[version_idx as usize]; - (&cur_entry.0, value) + (&cur_entry.key, &cur_entry.new_values[version_idx as usize]) } } @@ -488,13 +501,10 @@ impl SharedBufferBatchIterator { self.current_idx += 1; } - pub(crate) fn current_key_items( - &self, - ) -> (&TableKey, &[(EpochWithGap, HummockValue)]) { + pub(crate) fn current_key_entry(&self) -> &SharedBufferVersionedEntry { assert!(self.is_valid(), "iterator is not valid"); assert_eq!(self.current_version_idx, 0); - let (key, values) = &self.inner.payload[self.current_idx]; - (key, &values) + &self.inner.payload[self.current_idx] } } @@ -563,7 +573,7 @@ impl HummockIterator for SharedBufferBatchIterator< // by table key. let partition_point = self .inner - .binary_search_by(|probe| probe.0[..].cmp(*key.user_key.table_key)); + .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key)); let seek_key_epoch = key.epoch_with_gap; match D::direction() { DirectionEnum::Forward => match partition_point { From 314da859a550d868a4ab2f514d26f2532e1f490e Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 30 Jan 2024 23:30:54 +0800 Subject: [PATCH 3/6] fix ut --- .../src/hummock/iterator/merge_inner.rs | 18 +++++++++++++----- .../shared_buffer/shared_buffer_batch.rs | 3 +-- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/storage/src/hummock/iterator/merge_inner.rs b/src/storage/src/hummock/iterator/merge_inner.rs index 471a1038b7496..669db4cd3aab2 100644 --- a/src/storage/src/hummock/iterator/merge_inner.rs +++ b/src/storage/src/hummock/iterator/merge_inner.rs @@ -200,11 +200,19 @@ impl<'a, T: Ord> Drop for PeekMutGuard<'a, T> { impl MergeIterator> { pub(crate) fn advance_peek_to_next_key(&mut self) { - self.heap - .peek_mut() - .expect("should exist") - .iter - .advance_to_next_key(); + let mut node = + PeekMutGuard::peek_mut(&mut self.heap, &mut self.unused_iters).expect("no inner iter"); + + node.iter.advance_to_next_key(); + + if !node.iter.is_valid() { + // Put back to `unused_iters` + let node = node.pop(); + self.unused_iters.push_back(node); + } else { + // This will update the heap top. + node.used(); + } } pub(crate) fn rewind_no_await(&mut self) { diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index c2727d8d68cd4..e53faf565cc73 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -477,7 +477,6 @@ impl SharedBufferBatchIterator { } pub(crate) fn current_item(&self) -> (&TableKey, &(EpochWithGap, HummockValue)) { - assert!(self.is_valid(), "iterator is not valid"); let (idx, version_idx) = match D::direction() { DirectionEnum::Forward => (self.current_idx, self.current_version_idx), DirectionEnum::Backward => ( @@ -485,7 +484,7 @@ impl SharedBufferBatchIterator { self.current_version_idx, ), }; - let cur_entry = self.inner.get(idx).unwrap(); + let cur_entry = &self.inner[idx]; (&cur_entry.key, &cur_entry.new_values[version_idx as usize]) } } From e33d5f739f071d9499b32e1a10a5e2837289e573 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 30 Jan 2024 23:50:15 +0800 Subject: [PATCH 4/6] make dylint happy --- src/storage/src/hummock/event_handler/uploader.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index 4c24775c6a57c..b8ebc858054a1 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -169,7 +169,11 @@ impl MergingImmTask { Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) { Ok(task_result) => task_result, Err(err) => { - panic!("failed to join merging task: {:?} {:?}", err, self); + panic!( + "failed to join merging task: {:?} {:?}", + err.as_report(), + self + ); } }) } From 99a1ca18e647cf01306c513726ec3659fd178013 Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 2 Feb 2024 01:22:38 +0800 Subject: [PATCH 5/6] rename version to value --- .../shared_buffer/shared_buffer_batch.rs | 61 +++++++++---------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index e53faf565cc73..f4c8fce2dea9f 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -440,7 +440,7 @@ impl SharedBufferBatch { /// If there are multiple versions of a key, the iterator will return all versions pub struct SharedBufferBatchIterator { inner: Arc, - current_version_idx: i32, + current_value_idx: i32, // The index of the current entry in the payload current_idx: usize, table_id: TableId, @@ -452,52 +452,52 @@ impl SharedBufferBatchIterator { Self { inner, current_idx: 0, - current_version_idx: 0, + current_value_idx: 0, table_id, _phantom: Default::default(), } } /// Return all values of the current key - pub(crate) fn current_versions(&self) -> &Vec<(EpochWithGap, HummockValue)> { + pub(crate) fn current_values(&self) -> &Vec<(EpochWithGap, HummockValue)> { debug_assert!(self.current_idx < self.inner.len()); let idx = match D::direction() { DirectionEnum::Forward => self.current_idx, DirectionEnum::Backward => self.inner.len() - self.current_idx - 1, }; - &self.inner.get(idx).unwrap().new_values + &self.inner[idx].new_values } - fn current_versions_len(&self) -> i32 { + fn current_values_len(&self) -> i32 { if self.current_idx < self.inner.len() { - self.current_versions().len() as i32 + self.current_values().len() as i32 } else { 0 } } pub(crate) fn current_item(&self) -> (&TableKey, &(EpochWithGap, HummockValue)) { - let (idx, version_idx) = match D::direction() { - DirectionEnum::Forward => (self.current_idx, self.current_version_idx), + let (idx, value_idx) = match D::direction() { + DirectionEnum::Forward => (self.current_idx, self.current_value_idx), DirectionEnum::Backward => ( self.inner.len() - self.current_idx - 1, - self.current_version_idx, + self.current_value_idx, ), }; let cur_entry = &self.inner[idx]; - (&cur_entry.key, &cur_entry.new_values[version_idx as usize]) + (&cur_entry.key, &cur_entry.new_values[value_idx as usize]) } } impl SharedBufferBatchIterator { pub(crate) fn advance_to_next_key(&mut self) { - assert_eq!(self.current_version_idx, 0); + assert_eq!(self.current_value_idx, 0); self.current_idx += 1; } pub(crate) fn current_key_entry(&self) -> &SharedBufferVersionedEntry { assert!(self.is_valid(), "iterator is not valid"); - assert_eq!(self.current_version_idx, 0); + assert_eq!(self.current_value_idx, 0); &self.inner.payload[self.current_idx] } } @@ -510,19 +510,19 @@ impl HummockIterator for SharedBufferBatchIterator< match D::direction() { DirectionEnum::Forward => { // If the current key has more versions, we need to advance the value index - if self.current_version_idx + 1 < self.current_versions_len() { - self.current_version_idx += 1; + if self.current_value_idx + 1 < self.current_values_len() { + self.current_value_idx += 1; } else { self.current_idx += 1; - self.current_version_idx = 0; + self.current_value_idx = 0; } } DirectionEnum::Backward => { - if self.current_version_idx > 0 { - self.current_version_idx -= 1; + if self.current_value_idx > 0 { + self.current_value_idx -= 1; } else { self.current_idx += 1; - self.current_version_idx = self.current_versions_len() - 1; + self.current_value_idx = self.current_values_len() - 1; } } } @@ -543,8 +543,7 @@ impl HummockIterator for SharedBufferBatchIterator< if self.current_idx >= self.inner.len() { return false; } - self.current_version_idx >= 0 - && self.current_version_idx < self.current_versions().len() as i32 + self.current_value_idx >= 0 && self.current_value_idx < self.current_values().len() as i32 } async fn rewind(&mut self) -> HummockResult<()> { @@ -552,10 +551,10 @@ impl HummockIterator for SharedBufferBatchIterator< match D::direction() { DirectionEnum::Forward => { - self.current_version_idx = 0; + self.current_value_idx = 0; } DirectionEnum::Backward => { - self.current_version_idx = self.current_versions_len() - 1; + self.current_value_idx = self.current_values_len() - 1; } } Ok(()) @@ -575,7 +574,7 @@ impl HummockIterator for SharedBufferBatchIterator< self.current_idx = i; // seek to the first version that is <= the seek key epoch let mut idx: i32 = 0; - for (epoch_with_gap, _) in self.current_versions() { + for (epoch_with_gap, _) in self.current_values() { if epoch_with_gap <= &seek_key_epoch { break; } @@ -584,16 +583,16 @@ impl HummockIterator for SharedBufferBatchIterator< // Move onto the next key for forward iteration if seek key epoch is smaller // than all versions - if idx >= self.current_versions().len() as i32 { + if idx >= self.current_values().len() as i32 { self.current_idx += 1; - self.current_version_idx = 0; + self.current_value_idx = 0; } else { - self.current_version_idx = idx; + self.current_value_idx = idx; } } Err(i) => { self.current_idx = i; - self.current_version_idx = 0; + self.current_value_idx = 0; } }, DirectionEnum::Backward => { @@ -601,7 +600,7 @@ impl HummockIterator for SharedBufferBatchIterator< Ok(i) => { self.current_idx = self.inner.len() - i - 1; // seek from back to the first version that is >= seek_key_epoch - let values = self.current_versions(); + let values = self.current_values(); let mut idx: i32 = (values.len() - 1) as i32; for (epoch_with_gap, _) in values.iter().rev() { if epoch_with_gap >= &seek_key_epoch { @@ -612,9 +611,9 @@ impl HummockIterator for SharedBufferBatchIterator< if idx < 0 { self.current_idx += 1; - self.current_version_idx = self.current_versions_len() - 1; + self.current_value_idx = self.current_values_len() - 1; } else { - self.current_version_idx = idx; + self.current_value_idx = idx; } } // Seek to one item before the seek partition_point: @@ -622,7 +621,7 @@ impl HummockIterator for SharedBufferBatchIterator< // self.inner.len(). Err(i) => { self.current_idx = self.inner.len() - i; - self.current_version_idx = self.current_versions_len() - 1; + self.current_value_idx = self.current_values_len() - 1; } } } From c644d9427b6342cd6a88ca1600d2718c385867be Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 2 Feb 2024 13:02:34 +0800 Subject: [PATCH 6/6] add comment --- src/storage/src/hummock/compactor/shared_buffer_compact.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index ef1e61b44a15f..a06a1eaca6761 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -360,6 +360,8 @@ pub async fn merge_imms_in_memory( .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())), ); mi.advance_peek_to_next_key(); + // Since there is no blocking point in this method, but it is cpu intensive, we call this method + // to do cooperative scheduling tokio::task::consume_budget().await; }