From 2abc32f2a3aa4bff0e53ed64b6d45c664caea081 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Mon, 29 Jan 2024 00:37:08 +0800
Subject: [PATCH] refactor(storage): do not track imm ids in imm

---
 src/storage/hummock_sdk/src/key.rs            | 54 +++++++++++++++----
 .../compactor/shared_buffer_compact.rs        | 37 ++++++++-----
 .../event_handler/hummock_event_handler.rs    | 25 +++------
 .../src/hummock/event_handler/uploader.rs     | 49 +++++++++--------
 .../src/hummock/iterator/merge_inner.rs       | 22 +++++++-
 .../shared_buffer/shared_buffer_batch.rs      | 44 +++++++--------
 src/storage/src/hummock/store/version.rs      | 24 ++++-----
 7 files changed, 153 insertions(+), 102 deletions(-)

diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs
index b65bb2a05ddd5..e46cc8b65cfc9 100644
--- a/src/storage/hummock_sdk/src/key.rs
+++ b/src/storage/hummock_sdk/src/key.rs
@@ -15,6 +15,7 @@
 use std::borrow::Borrow;
 use std::cmp::Ordering;
 use std::fmt::Debug;
+use std::iter::once;
 use std::ops::Bound::*;
 use std::ops::{Bound, Deref, DerefMut, RangeBounds};
 use std::ptr;
@@ -827,11 +828,11 @@ impl<T: AsRef<[u8]> + Ord + Eq> PartialOrd for FullKey<T> {
     }
 }
 
-impl<'a, T> From<FullKey<&'a [u8]>> for FullKey<T>
+impl<'a, T> From<UserKey<&'a [u8]>> for UserKey<T>
 where
     T: AsRef<[u8]> + CopyFromSlice,
 {
-    fn from(value: FullKey<&'a [u8]>) -> Self {
+    fn from(value: UserKey<&'a [u8]>) -> Self {
         value.copy_into()
     }
 }
@@ -979,40 +980,73 @@ impl<T: AsRef<[u8]> + Ord + Eq> FullKeyTracker<T> {
     /// - Otherwise: return None
     pub fn observe<F>(&mut self, key: FullKey<F>) -> Option<FullKey<T>>
     where
-        FullKey<F>: Into<FullKey<T>>,
+        UserKey<F>: Into<UserKey<T>>,
         F: AsRef<[u8]>,
     {
+        self.observe_multi_version(key.user_key, once(key.epoch_with_gap))
+    }
+
+    /// `epochs` comes from greater to smaller
+    pub fn observe_multi_version<F>(
+        &mut self,
+        user_key: UserKey<F>,
+        mut epochs: impl Iterator<Item = EpochWithGap>,
+    ) -> Option<FullKey<T>>
+    where
+        UserKey<F>: Into<UserKey<T>>,
+        F: AsRef<[u8]>,
+    {
+        let max_epoch_with_gap = epochs.next().expect("non-empty");
+        let min_epoch_with_gap = epochs.fold(
+            max_epoch_with_gap,
+            |prev_epoch_with_gap, curr_epoch_with_gap| {
+                assert!(
+                    prev_epoch_with_gap > curr_epoch_with_gap,
+                    "epoch list not sorted. prev: {:?}, curr: {:?}, user_key: {:?}",
+                    prev_epoch_with_gap,
+                    curr_epoch_with_gap,
+                    user_key
+                );
+                curr_epoch_with_gap
+            },
+        );
         match self
             .latest_full_key
             .user_key
             .as_ref()
-            .cmp(&key.user_key.as_ref())
+            .cmp(&user_key.as_ref())
         {
             Ordering::Less => {
                 // Observe a new user key
 
                 // Reset epochs
-                self.last_observed_epoch_with_gap = key.epoch_with_gap;
+                self.last_observed_epoch_with_gap = min_epoch_with_gap;
 
                 // Take the previous key and set latest key
-                Some(std::mem::replace(&mut self.latest_full_key, key.into()))
+                Some(std::mem::replace(
+                    &mut self.latest_full_key,
+                    FullKey {
+                        user_key: user_key.into(),
+                        epoch_with_gap: min_epoch_with_gap,
+                    },
+                ))
             }
             Ordering::Equal => {
-                if key.epoch_with_gap >= self.last_observed_epoch_with_gap {
+                if max_epoch_with_gap >= self.last_observed_epoch_with_gap {
                     // Epoch from the same user key should be monotonically decreasing
                     panic!(
                         "key {:?} epoch {:?} >= prev epoch {:?}",
-                        key.user_key, key.epoch_with_gap, self.last_observed_epoch_with_gap
+                        user_key, max_epoch_with_gap, self.last_observed_epoch_with_gap
                     );
                 }
-                self.last_observed_epoch_with_gap = key.epoch_with_gap;
+                self.last_observed_epoch_with_gap = min_epoch_with_gap;
                 None
             }
             Ordering::Greater => {
                 // User key should be monotonically increasing
                 panic!(
                     "key {:?} <= prev key {:?}",
-                    key,
+                    user_key,
                     FullKey {
                         user_key: self.latest_full_key.user_key.as_ref(),
                         epoch_with_gap: self.last_observed_epoch_with_gap
diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs
index 2f11db3405619..9c3d30a31f633 100644
--- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs
+++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs
@@ -285,11 +285,12 @@ pub async fn merge_imms_in_memory(
     instance_id: LocalInstanceId,
     imms: Vec<ImmutableMemtable>,
     memory_tracker: Option<MemoryTracker>,
-) -> HummockResult<ImmutableMemtable> {
+) -> ImmutableMemtable {
     let mut kv_count = 0;
     let mut epochs = vec![];
     let mut merged_size = 0;
-    let mut merged_imm_ids = Vec::with_capacity(imms.len());
+    assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
+    let max_imm_id = imms[0].batch_id();
 
     let mut imm_iters = Vec::with_capacity(imms.len());
     for imm in imms {
@@ -300,7 +301,6 @@ pub async fn merge_imms_in_memory(
             "should only merge data belonging to the same table"
         );
 
-        merged_imm_ids.push(imm.batch_id());
         epochs.push(imm.min_epoch());
         kv_count += imm.kv_count();
         merged_size += imm.size();
@@ -311,10 +311,10 @@ pub async fn merge_imms_in_memory(
 
     // use merge iterator to merge input imms
     let mut mi = MergeIterator::new(imm_iters);
-    mi.rewind().await?;
+    mi.rewind_no_await();
     assert!(mi.is_valid());
-    let first_item_key = mi.current_item().0.clone();
+    let first_item_key = mi.current_key_items().0.clone();
 
     let mut merged_payload: Vec<SharedBufferVersionedEntry> = Vec::new();
 
@@ -327,9 +327,15 @@ pub async fn merge_imms_in_memory(
     let mut table_key_versions: Vec<(EpochWithGap, HummockValue<Bytes>)> = Vec::new();
 
     while mi.is_valid() {
-        let (key, (epoch_with_gap, value)) = mi.current_item();
-        let full_key = FullKey::new_with_gap_epoch(table_id, key.clone(), *epoch_with_gap);
-        if let Some(last_full_key) = full_key_tracker.observe(full_key) {
+        let (key, values) = mi.current_key_items();
+        let user_key = UserKey {
+            table_id,
+            table_key: key.clone(),
+        };
+        if let Some(last_full_key) = full_key_tracker.observe_multi_version(
+            user_key,
+            values.iter().map(|(epoch_with_gap, _)| *epoch_with_gap),
+        ) {
             let last_user_key = last_full_key.user_key;
             // `epoch_with_gap` of the `last_full_key` may not reflect the real epoch in the items
             // and should not be used because we use max epoch to initialize the tracker
@@ -341,8 +347,13 @@ pub async fn merge_imms_in_memory(
             // Reset state before moving onto the new table key
             table_key_versions = vec![];
         }
-        table_key_versions.push((*epoch_with_gap, value.clone()));
-        mi.next().await?;
+        table_key_versions.extend(
+            values
+                .iter()
+                .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
+        );
+        mi.advance_peek_to_next_key();
+        tokio::task::consume_budget().await;
     }
 
     // process the last key
@@ -353,18 +364,18 @@ pub async fn merge_imms_in_memory(
         ));
     }
 
-    Ok(SharedBufferBatch {
+    SharedBufferBatch {
         inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
             epochs,
             merged_payload,
             kv_count,
-            merged_imm_ids,
             merged_size,
+            max_imm_id,
             memory_tracker,
         )),
         table_id,
         instance_id,
-    })
+    }
 }
 
 fn generate_splits(
diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs
index cccbb8da242e3..c4effdbf4f459 100644
--- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs
+++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs
@@ -555,7 +555,10 @@ impl HummockEventHandler {
                 },
                 |read_version| {
                     read_version.write().update(VersionUpdate::Staging(
-                        StagingData::MergedImmMem(merge_output.merged_imm),
+                        StagingData::MergedImmMem(
+                            merge_output.merged_imm,
+                            merge_output.imm_ids,
+                        ),
                     ));
                 },
             )
@@ -768,9 +771,7 @@ mod tests {
     use crate::hummock::event_handler::{HummockEvent, HummockEventHandler};
     use crate::hummock::iterator::test_utils::mock_sstable_store;
     use crate::hummock::local_version::pinned_version::PinnedVersion;
-    use crate::hummock::shared_buffer::shared_buffer_batch::{
-        SharedBufferBatch, SharedBufferBatchInner,
-    };
+    use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;
     use crate::hummock::store::version::{StagingData, VersionUpdate};
     use crate::hummock::test_utils::default_opts_for_test;
     use crate::hummock::value::HummockValue;
@@ -813,26 +814,14 @@ mod tests {
                 Err(HummockError::other("".to_string()))
             })
         }),
-        Arc::new(move |table_id, instance_id, imms, _| {
+        Arc::new(move |_, _, imms, _| {
            let (tx, rx) = oneshot::channel::<()>();
            let (finish_tx, finish_rx) = oneshot::channel::<()>();
            spawn_merging_task_tx.send((tx, finish_rx)).unwrap();
            spawn(async move {
                rx.await.unwrap();
                finish_tx.send(()).unwrap();
-                let first_imm = &imms[0];
-                Ok(SharedBufferBatch {
-                    inner: Arc::new(SharedBufferBatchInner::new_with_multi_epoch_batches(
-                        first_imm.epochs().clone(),
-                        first_imm.get_payload().iter().cloned().collect_vec(),
-                        100,
-                        imms.iter().map(|imm| imm.batch_id()).collect_vec(),
-                        100,
-                        None,
-                    )),
-                    table_id,
-                    instance_id,
-                })
+                imms[0].clone()
            })
        }),
    );
diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs
index 1aeda293c84be..4c24775c6a57c 100644
--- a/src/storage/src/hummock/event_handler/uploader.rs
+++ b/src/storage/src/hummock/event_handler/uploader.rs
@@ -66,7 +66,7 @@ pub type SpawnMergingTask = Arc<
             LocalInstanceId,
             Vec<ImmutableMemtable>,
             Option<MemoryTracker>,
-        ) -> JoinHandle<HummockResult<ImmutableMemtable>>
+        ) -> JoinHandle<ImmutableMemtable>
         + Send
         + Sync
         + 'static,
@@ -119,6 +119,8 @@ struct UploadingTask {
 }
 
 pub struct MergeImmTaskOutput {
+    /// Input imm ids of the merging task. Larger imm ids at the front.
+    pub imm_ids: Vec<ImmId>,
     pub table_id: TableId,
     pub instance_id: LocalInstanceId,
     pub merged_imm: ImmutableMemtable,
@@ -129,7 +131,17 @@ struct MergingImmTask {
     table_id: TableId,
     instance_id: LocalInstanceId,
     input_imms: Vec<ImmutableMemtable>,
-    join_handle: JoinHandle<HummockResult<ImmutableMemtable>>,
+    join_handle: JoinHandle<ImmutableMemtable>,
+}
+
+impl Debug for MergingImmTask {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("MergingImmTask")
+            .field("table_id", &self.table_id)
+            .field("instance_id", &self.instance_id)
+            .field("input_imms", &self.input_imms)
+            .finish()
+    }
 }
 
 impl MergingImmTask {
@@ -140,6 +152,7 @@ impl MergingImmTask {
         memory_tracker: Option<MemoryTracker>,
         context: &UploaderContext,
     ) -> Self {
+        assert!(imms.iter().rev().map(|imm| imm.batch_id()).is_sorted());
         let input_imms = imms.clone();
         let join_handle =
             (context.spawn_merging_task)(table_id, instance_id, imms, memory_tracker);
@@ -152,19 +165,18 @@ impl MergingImmTask {
     }
 
     /// Poll the result of the merge task
-    fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll<HummockResult<ImmutableMemtable>> {
+    fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll<ImmutableMemtable> {
         Poll::Ready(match ready!(self.join_handle.poll_unpin(cx)) {
             Ok(task_result) => task_result,
-            Err(err) => Err(HummockError::other(format!(
-                "fail to join imm merge join handle: {}",
-                err.as_report()
-            ))),
+            Err(err) => {
+                panic!("failed to join merging task: {:?} {:?}", err, self);
+            }
         })
     }
 }
 
 impl Future for MergingImmTask {
-    type Output = HummockResult<ImmutableMemtable>;
+    type Output = ImmutableMemtable;
 
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         self.poll_result(cx)
@@ -510,7 +522,6 @@ impl SealedData {
     }
 
     fn add_merged_imm(&mut self, merged_imm: &ImmutableMemtable) {
-        debug_assert!(merged_imm.is_merged_imm());
         // add merged_imm to merged_imms
         self.merged_imms.push_front(merged_imm.clone());
     }
@@ -573,15 +584,16 @@ impl SealedData {
     fn poll_success_merge_imm(&mut self, cx: &mut Context<'_>) -> Poll<Option<MergeImmTaskOutput>> {
         // only poll the oldest merge task if there is any
         if let Some(task) = self.merging_tasks.back_mut() {
-            let merge_result = ready!(task.poll_unpin(cx));
+            let merged_imm = ready!(task.poll_unpin(cx));
 
             // pop the finished task
             let task = self.merging_tasks.pop_back().expect("must exist");
 
             Poll::Ready(Some(MergeImmTaskOutput {
+                imm_ids: task.input_imms.iter().map(|imm| imm.batch_id()).collect(),
                 table_id: task.table_id,
                 instance_id: task.instance_id,
-                merged_imm: merge_result.unwrap(),
+                merged_imm,
             }))
         } else {
            Poll::Ready(None)
@@ -1655,17 +1667,10 @@ mod tests {
            _ = sleep => {
                println!("sleep timeout")
            }
-            res = &mut task => {
-                match res {
-                    Ok(imm) => {
-                        println!("merging task success");
-                        assert_eq!(table_id, imm.table_id);
-                        assert_eq!(9, imm.kv_count());
-                    }
-                    Err(err) => {
-                        println!("merging task failed: {:?}", err);
-                    }
-                }
+            imm = &mut task => {
+                println!("merging task success");
+                assert_eq!(table_id, imm.table_id);
+                assert_eq!(9, imm.kv_count());
            }
        }
        task.join_handle.abort();
diff --git a/src/storage/src/hummock/iterator/merge_inner.rs b/src/storage/src/hummock/iterator/merge_inner.rs
index eaaac328ce8fa..1427005a3b7e4 100644
--- a/src/storage/src/hummock/iterator/merge_inner.rs
+++ b/src/storage/src/hummock/iterator/merge_inner.rs
@@ -17,6 +17,7 @@
 use std::collections::{BinaryHeap, LinkedList};
 use std::ops::{Deref, DerefMut};
 
 use bytes::Bytes;
+use futures::FutureExt;
 use risingwave_hummock_sdk::key::{FullKey, TableKey};
 use risingwave_hummock_sdk::EpochWithGap;
@@ -99,12 +100,12 @@ impl<I: HummockIterator> MergeIterator<I> {
 
 impl MergeIterator<SharedBufferBatchIterator<Forward>> {
     /// Used in `merge_imms_in_memory` to merge immutable memtables.
-    pub fn current_item(&self) -> (&TableKey<Bytes>, &(EpochWithGap, HummockValue<Bytes>)) {
+    pub fn current_key_items(&self) -> (&TableKey<Bytes>, &[(EpochWithGap, HummockValue<Bytes>)]) {
         self.heap
             .peek()
             .expect("no inner iter for imm merge")
             .iter
-            .current_item()
+            .current_key_items()
     }
 }
 
@@ -197,6 +198,23 @@ impl<'a, T: Ord> Drop for PeekMutGuard<'a, T> {
     }
 }
 
+impl MergeIterator<SharedBufferBatchIterator<Forward>> {
+    pub(crate) fn advance_peek_to_next_key(&mut self) {
+        self.heap
+            .peek_mut()
+            .expect("should exist")
+            .iter
+            .advance_to_next_key();
+    }
+
+    pub(crate) fn rewind_no_await(&mut self) {
+        self.rewind()
+            .now_or_never()
+            .expect("should not pending")
+            .expect("should not err")
+    }
+}
+
 impl<Node> HummockIterator for MergeIterator<Node>
 where
     Node: Ord,
diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
index 1fe231b99de21..c1c627e8bcf12 100644
--- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
+++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
@@ -53,11 +53,6 @@ pub type SharedBufferVersionedEntry = (TableKey<Bytes>, Vec<(EpochWithGap, Hummo
 #[derive(Debug)]
 pub(crate) struct SharedBufferBatchInner {
     payload: Vec<SharedBufferVersionedEntry>,
-    /// The list of imm ids that are merged into this batch
-    /// This field is immutable.
-    ///
-    /// Larger imm id at the front.
-    imm_ids: Vec<ImmId>,
     /// The epochs of the data in batch, sorted in ascending order (old to new)
     epochs: Vec<HummockEpoch>,
     kv_count: usize,
@@ -90,7 +85,6 @@ impl SharedBufferBatchInner {
         let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);
         SharedBufferBatchInner {
             payload: items,
-            imm_ids: vec![batch_id],
             epochs: vec![epoch],
             kv_count,
             size,
@@ -104,8 +98,8 @@ impl SharedBufferBatchInner {
         epochs: Vec<HummockEpoch>,
         payload: Vec<SharedBufferVersionedEntry>,
         num_items: usize,
-        imm_ids: Vec<ImmId>,
         size: usize,
+        imm_id: ImmId,
         tracker: Option<MemoryTracker>,
     ) -> Self {
         assert!(!payload.is_empty());
@@ -114,21 +108,16 @@ impl SharedBufferBatchInner {
             .iter()
             .rev()
             .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap)));
-        debug_assert!(!imm_ids.is_empty());
-        debug_assert!(imm_ids.iter().rev().is_sorted());
         debug_assert!(!epochs.is_empty());
         debug_assert!(epochs.is_sorted());
 
-        let max_imm_id = *imm_ids.iter().max().unwrap();
-
         Self {
             payload,
             epochs,
-            imm_ids,
             kv_count: num_items,
             size,
             _tracker: tracker,
-            batch_id: max_imm_id,
+            batch_id: imm_id,
         }
     }
 
@@ -262,10 +251,6 @@ impl SharedBufferBatch {
         self.table_id
     }
 
-    pub fn is_merged_imm(&self) -> bool {
-        !self.inner.imm_ids.is_empty()
-    }
-
     pub fn min_epoch(&self) -> HummockEpoch {
         *self.inner.epochs.first().unwrap()
     }
@@ -274,11 +259,6 @@ impl SharedBufferBatch {
         *self.inner.epochs.last().unwrap()
     }
 
-    pub fn get_imm_ids(&self) -> &Vec<ImmId> {
-        debug_assert!(!self.inner.imm_ids.is_empty());
-        &self.inner.imm_ids
-    }
-
     pub fn kv_count(&self) -> usize {
         self.inner.kv_count
     }
@@ -502,6 +482,22 @@ impl<D: HummockIteratorDirection> SharedBufferBatchIterator<D> {
     }
 }
 
+impl SharedBufferBatchIterator<Forward> {
+    pub(crate) fn advance_to_next_key(&mut self) {
+        assert_eq!(self.current_version_idx, 0);
+        self.current_idx += 1;
+    }
+
+    pub(crate) fn current_key_items(
+        &self,
+    ) -> (&TableKey<Bytes>, &[(EpochWithGap, HummockValue<Bytes>)]) {
+        assert!(self.is_valid(), "iterator is not valid");
+        assert_eq!(self.current_version_idx, 0);
+        let (key, values) = &self.inner.payload[self.current_idx];
+        (key, &values)
+    }
+}
+
 impl<D: HummockIteratorDirection> HummockIterator for SharedBufferBatchIterator<D> {
     type Direction = D;
 
@@ -1116,9 +1112,7 @@ mod tests {
         ];
         // newer data comes first
         let imms = vec![imm3, imm2, imm1];
-        let merged_imm = merge_imms_in_memory(table_id, 0, imms.clone(), None)
-            .await
-            .unwrap();
+        let merged_imm = merge_imms_in_memory(table_id, 0, imms.clone(), None).await;
 
         // Point lookup
         for (i, items) in batch_items.iter().enumerate() {
diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs
index 3f95e8fe0e248..a9abac35ede13 100644
--- a/src/storage/src/hummock/store/version.rs
+++ b/src/storage/src/hummock/store/version.rs
@@ -117,7 +117,7 @@ impl StagingSstableInfo {
 #[derive(Clone)]
 pub enum StagingData {
     ImmMem(ImmutableMemtable),
-    MergedImmMem(ImmutableMemtable),
+    MergedImmMem(ImmutableMemtable, Vec<ImmId>),
     Sst(StagingSstableInfo),
 }
 
@@ -266,8 +266,8 @@ impl HummockReadVersion {
                 self.staging.imm.push_front(imm)
             }
-            StagingData::MergedImmMem(merged_imm) => {
-                self.add_merged_imm(merged_imm);
+            StagingData::MergedImmMem(merged_imm, imm_ids) => {
+                self.add_merged_imm(merged_imm, imm_ids);
             }
             StagingData::Sst(staging_sst) => {
                 // The following properties must be ensured:
@@ -415,9 +415,11 @@ impl HummockReadVersion {
             .cloned()
     }
 
-    pub fn add_merged_imm(&mut self, merged_imm: ImmutableMemtable) {
-        assert!(merged_imm.get_imm_ids().iter().rev().is_sorted());
-        let min_imm_id = *merged_imm.get_imm_ids().last().expect("non-empty");
+    /// `imm_ids` is the list of imm ids that are merged into this batch
+    /// This field is immutable. Larger imm id at the front.
+    pub fn add_merged_imm(&mut self, merged_imm: ImmutableMemtable, imm_ids: Vec<ImmId>) {
+        assert!(imm_ids.iter().rev().is_sorted());
+        let min_imm_id = *imm_ids.last().expect("non-empty");
 
         let back = self.staging.imm.back().expect("should not be empty");
 
@@ -451,9 +453,7 @@ impl HummockReadVersion {
 
                     unreachable!(
                         "must have break in equal: {:?} {:?} {:?}",
-                        remaining_staging_imm_ids,
-                        earlier_imm_ids,
-                        merged_imm.get_imm_ids()
+                        remaining_staging_imm_ids, earlier_imm_ids, imm_ids
                     )
                 }
             }
@@ -471,13 +471,13 @@ impl HummockReadVersion {
                         .map(|imm| imm.batch_id())
                         .collect_vec()
                 },
-                merged_imm.get_imm_ids()
+                imm_ids
             );
             None
         };
 
         // iter from smaller imm and take the older imm at the back.
-        for imm_id in merged_imm.get_imm_ids().iter().rev() {
+        for imm_id in imm_ids.iter().rev() {
             let imm = self.staging.imm.pop_back().expect("should exist");
             assert_eq!(
                 imm.batch_id(),
@@ -490,7 +490,7 @@ impl HummockReadVersion {
                         .map(|imm| imm.batch_id())
                         .collect_vec()
                },
-                merged_imm.get_imm_ids(),
+                imm_ids,
                imm_id,
            );
        }