From e8459b474ab061d05fe9d79361fedf79cedbe767 Mon Sep 17 00:00:00 2001
From: William Wen
Date: Mon, 29 Jan 2024 00:57:15 +0800
Subject: [PATCH] replace tuple with struct

---
 .../compactor/shared_buffer_compact.rs        | 21 ++++---
 .../src/hummock/iterator/merge_inner.rs       | 12 ++--
 .../shared_buffer/shared_buffer_batch.rs      | 56 +++++++++++--------
 3 files changed, 53 insertions(+), 36 deletions(-)

diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs
index 9c3d30a31f633..2c72a64e0f8e4 100644
--- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs
+++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs
@@ -314,7 +314,7 @@ pub async fn merge_imms_in_memory(
     mi.rewind_no_await();
     assert!(mi.is_valid());
 
-    let first_item_key = mi.current_key_items().0.clone();
+    let first_item_key = mi.current_key_entry().key.clone();
 
     let mut merged_payload: Vec<SharedBufferVersionedEntry> = Vec::new();
 
@@ -327,14 +327,17 @@ pub async fn merge_imms_in_memory(
     let mut table_key_versions: Vec<(EpochWithGap, HummockValue<Bytes>)> = Vec::new();
 
     while mi.is_valid() {
-        let (key, values) = mi.current_key_items();
+        let key_entry = mi.current_key_entry();
         let user_key = UserKey {
             table_id,
-            table_key: key.clone(),
+            table_key: key_entry.key.clone(),
         };
         if let Some(last_full_key) = full_key_tracker.observe_multi_version(
             user_key,
-            values.iter().map(|(epoch_with_gap, _)| *epoch_with_gap),
+            key_entry
+                .new_values
+                .iter()
+                .map(|(epoch_with_gap, _)| *epoch_with_gap),
         ) {
             let last_user_key = last_full_key.user_key;
             // `epoch_with_gap` of the `last_full_key` may not reflect the real epoch in the items
@@ -342,13 +345,17 @@ pub async fn merge_imms_in_memory(
             let _epoch_with_gap = last_full_key.epoch_with_gap;
 
             // Record kv entries
-            merged_payload.push((last_user_key.table_key, table_key_versions));
+            merged_payload.push(SharedBufferVersionedEntry::new(
+                last_user_key.table_key,
+                table_key_versions,
+            ));
 
             // Reset state before moving onto the new table key
             table_key_versions = vec![];
         }
         table_key_versions.extend(
-            values
+            key_entry
+                .new_values
                 .iter()
                 .map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
         );
@@ -358,7 +365,7 @@ pub async fn merge_imms_in_memory(
 
     // process the last key
     if !table_key_versions.is_empty() {
-        merged_payload.push((
+        merged_payload.push(SharedBufferVersionedEntry::new(
             full_key_tracker.latest_full_key.user_key.table_key,
             table_key_versions,
         ));
diff --git a/src/storage/src/hummock/iterator/merge_inner.rs b/src/storage/src/hummock/iterator/merge_inner.rs
index 1427005a3b7e4..471a1038b7496 100644
--- a/src/storage/src/hummock/iterator/merge_inner.rs
+++ b/src/storage/src/hummock/iterator/merge_inner.rs
@@ -16,14 +16,14 @@ use std::collections::binary_heap::PeekMut;
 use std::collections::{BinaryHeap, LinkedList};
 use std::ops::{Deref, DerefMut};
 
-use bytes::Bytes;
 use futures::FutureExt;
-use risingwave_hummock_sdk::key::{FullKey, TableKey};
-use risingwave_hummock_sdk::EpochWithGap;
+use risingwave_hummock_sdk::key::FullKey;
 
 use super::Forward;
 use crate::hummock::iterator::{DirectionEnum, HummockIterator, HummockIteratorDirection};
-use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchIterator;
+use crate::hummock::shared_buffer::shared_buffer_batch::{
+    SharedBufferBatchIterator, SharedBufferVersionedEntry,
+};
 use crate::hummock::value::HummockValue;
 use crate::hummock::HummockResult;
 use crate::monitor::StoreLocalStatistic;
@@ -100,12 +100,12 @@ impl<I: HummockIterator> MergeIterator<I> {
 
 impl MergeIterator<SharedBufferBatchIterator<Forward>> {
     /// Used in `merge_imms_in_memory` to merge immutable memtables.
-    pub fn current_key_items(&self) -> (&TableKey<Bytes>, &[(EpochWithGap, HummockValue<Bytes>)]) {
+    pub fn current_key_entry(&self) -> &SharedBufferVersionedEntry {
         self.heap
             .peek()
             .expect("no inner iter for imm merge")
             .iter
-            .current_key_items()
+            .current_key_entry()
     }
 }
diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
index c1c627e8bcf12..5f0c150b208fa 100644
--- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
+++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
@@ -48,7 +48,17 @@ pub type SharedBufferBatchId = u64;
 /// A shared buffer may contain data from multiple epochs,
 /// there are multiple versions for a given key (`table_key`), we put those versions into a vector
 /// and sort them in descending order, aka newest to oldest.
-pub type SharedBufferVersionedEntry = (TableKey<Bytes>, Vec<(EpochWithGap, HummockValue<Bytes>)>);
+#[derive(PartialEq, Debug)]
+pub struct SharedBufferVersionedEntry {
+    pub(crate) key: TableKey<Bytes>,
+    pub(crate) new_values: Vec<(EpochWithGap, HummockValue<Bytes>)>,
+}
+
+impl SharedBufferVersionedEntry {
+    pub fn new(key: TableKey<Bytes>, new_values: Vec<(EpochWithGap, HummockValue<Bytes>)>) -> Self {
+        Self { key, new_values }
+    }
+}
 
 #[derive(Debug)]
 pub(crate) struct SharedBufferBatchInner {
@@ -79,7 +89,7 @@ impl SharedBufferBatchInner {
         let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
         let items = payload
             .into_iter()
-            .map(|(k, v)| (k, vec![(epoch_with_gap, v)]))
+            .map(|(k, v)| SharedBufferVersionedEntry::new(k, vec![(epoch_with_gap, v)]))
             .collect_vec();
 
         let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);
@@ -103,8 +113,9 @@ impl SharedBufferBatchInner {
         tracker: Option<MemoryTracker>,
     ) -> Self {
         assert!(!payload.is_empty());
-        debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
-        debug_assert!(payload.iter().all(|(_, values)| values
+        debug_assert!(payload.iter().is_sorted_by_key(|entry| &entry.key));
+        debug_assert!(payload.iter().all(|entry| entry
+            .new_values
             .iter()
             .rev()
             .is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap)));
@@ -129,11 +140,14 @@ impl SharedBufferBatchInner {
         read_epoch: HummockEpoch,
     ) -> Option<(HummockValue<Bytes>, EpochWithGap)> {
         // Perform binary search on table key to find the corresponding entry
-        if let Ok(i) = self.payload.binary_search_by(|m| (m.0[..]).cmp(*table_key)) {
+        if let Ok(i) = self
+            .payload
+            .binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
+        {
             let item = &self.payload[i];
-            assert_eq!(item.0.as_ref(), *table_key);
+            assert_eq!(item.key.as_ref(), *table_key);
             // Scan to find the first version <= epoch
-            for (e, v) in &item.1 {
+            for (e, v) in &item.new_values {
                 // skip invisible versions
                 if read_epoch < e.pure_epoch() {
                     continue;
@@ -275,7 +289,7 @@ impl SharedBufferBatch {
     pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
         self.inner
             .binary_search_by(|m| {
-                let key = &m.0;
+                let key = &m.key;
                 let too_left = match &table_key_range.0 {
                     std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
                     std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
@@ -317,17 +331,17 @@ impl SharedBufferBatch {
 
     #[inline(always)]
     pub fn start_table_key(&self) -> TableKey<&[u8]> {
-        TableKey(self.inner.payload.first().expect("non-empty").0.as_ref())
+        TableKey(self.inner.payload.first().expect("non-empty").key.as_ref())
     }
 
     #[inline(always)]
     pub fn end_table_key(&self) -> TableKey<&[u8]> {
-        TableKey(self.inner.payload.last().expect("non-empty").0.as_ref())
+        TableKey(self.inner.payload.last().expect("non-empty").key.as_ref())
     }
 
     #[inline(always)]
     pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
-        &self.inner.payload.last().expect("non-empty").0
+        &self.inner.payload.last().expect("non-empty").key
     }
 
     /// return inclusive left endpoint, which means that all data in this batch should be larger or
@@ -386,7 +400,7 @@ impl SharedBufferBatch {
             let idx = match self
                 .inner
                 .payload
-                .binary_search_by(|m| (m.0[..]).cmp(seek_key.as_slice()))
+                .binary_search_by(|m| (m.key.as_ref()).cmp(seek_key.as_slice()))
             {
                 Ok(idx) => idx,
                 Err(idx) => idx,
@@ -395,11 +409,11 @@ impl SharedBufferBatch {
                 break;
             }
             let item = &self.inner.payload[idx];
-            if item.0.len() <= VirtualNode::SIZE {
+            if item.key.len() <= VirtualNode::SIZE {
                 break;
             }
             let current_vnode_id = VirtualNode::from_be_bytes(
-                item.0.as_ref()[..VirtualNode::SIZE]
+                item.key.as_ref()[..VirtualNode::SIZE]
                     .try_into()
                     .expect("slice with incorrect length"),
             )
@@ -456,7 +470,7 @@ impl<D: HummockIteratorDirection> SharedBufferBatchIterator<D> {
             DirectionEnum::Forward => self.current_idx,
             DirectionEnum::Backward => self.inner.len() - self.current_idx - 1,
         };
-        &self.inner.get(idx).unwrap().1
+        &self.inner.get(idx).unwrap().new_values
     }
 
     fn current_versions_len(&self) -> i32 {
@@ -477,8 +491,7 @@ impl<D: HummockIteratorDirection> SharedBufferBatchIterator<D> {
             ),
         };
         let cur_entry = self.inner.get(idx).unwrap();
-        let value = &cur_entry.1[version_idx as usize];
-        (&cur_entry.0, value)
+        (&cur_entry.key, &cur_entry.new_values[version_idx as usize])
     }
 }
 
@@ -488,13 +501,10 @@ impl SharedBufferBatchIterator<Forward> {
         self.current_idx += 1;
     }
 
-    pub(crate) fn current_key_items(
-        &self,
-    ) -> (&TableKey<Bytes>, &[(EpochWithGap, HummockValue<Bytes>)]) {
+    pub(crate) fn current_key_entry(&self) -> &SharedBufferVersionedEntry {
         assert!(self.is_valid(), "iterator is not valid");
         assert_eq!(self.current_version_idx, 0);
-        let (key, values) = &self.inner.payload[self.current_idx];
-        (key, &values)
+        &self.inner.payload[self.current_idx]
     }
 }
 
@@ -563,7 +573,7 @@ impl<D: HummockIteratorDirection> HummockIterator for SharedBufferBatchIterator<
         // by table key.
         let partition_point = self
             .inner
-            .binary_search_by(|probe| probe.0[..].cmp(*key.user_key.table_key));
+            .binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
         let seek_key_epoch = key.epoch_with_gap;
         match D::direction() {
             DirectionEnum::Forward => match partition_point {
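
What the patch amounts to, in isolation: a positional tuple becomes a named struct, so call sites read `entry.key` / `entry.new_values` instead of `entry.0` / `entry.1`, while the binary search over the key-sorted payload is unchanged. A minimal standalone sketch of that pattern; all names here (`VersionedEntry`, `Epoch`, `Value`) are simplified stand-ins for the crate's `SharedBufferVersionedEntry`, `EpochWithGap`, `TableKey<Bytes>`, and `HummockValue<Bytes>`, not the real types:

    // Stand-in epoch and value types; None plays the role of a delete tombstone.
    type Epoch = u64;
    type Value = Option<Vec<u8>>;

    // Before: type VersionedEntry = (Vec<u8>, Vec<(Epoch, Value)>);
    #[derive(PartialEq, Debug)]
    struct VersionedEntry {
        key: Vec<u8>,                    // stand-in for TableKey<Bytes>
        new_values: Vec<(Epoch, Value)>, // newest-to-oldest, like the real payload
    }

    impl VersionedEntry {
        fn new(key: Vec<u8>, new_values: Vec<(Epoch, Value)>) -> Self {
            Self { key, new_values }
        }
    }

    fn main() {
        // Payload sorted ascending by key, as SharedBufferBatchInner asserts.
        let payload = vec![
            VersionedEntry::new(b"a".to_vec(), vec![(2, Some(b"a2".to_vec())), (1, Some(b"a1".to_vec()))]),
            VersionedEntry::new(b"b".to_vec(), vec![(3, None)]),
        ];

        // `m.key.as_slice()` takes the place of the tuple-era `m.0[..]` in the comparator.
        let target: &[u8] = b"b";
        let idx = payload
            .binary_search_by(|m| m.key.as_slice().cmp(target))
            .expect("key exists");
        assert_eq!(payload[idx].new_values[0].0, 3);
    }

The derive of `PartialEq` and the `new` constructor mirror what the hunk at @@ -48,7 +48,17 @@ adds; everything else in the diff is the mechanical `.0`/`.1` to `.key`/`.new_values` renaming.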
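
The `get_value` hunk keeps its version scan intact and only changes field access. A sketch of that scan under the same newest-to-oldest ordering, with a plain `u64` standing in for `EpochWithGap::pure_epoch()`:

    type Epoch = u64;
    type Value = Option<Vec<u8>>;

    // Return the first version visible at `read_epoch`, i.e. the newest version
    // whose epoch is <= read_epoch; versions are sorted descending by epoch.
    fn get_visible(new_values: &[(Epoch, Value)], read_epoch: Epoch) -> Option<(&Value, Epoch)> {
        for (e, v) in new_values {
            // skip invisible versions, mirroring `read_epoch < e.pure_epoch()`
            if read_epoch < *e {
                continue;
            }
            return Some((v, *e));
        }
        None
    }

    fn main() {
        let versions: Vec<(Epoch, Value)> =
            vec![(5, Some(b"v5".to_vec())), (3, None), (1, Some(b"v1".to_vec()))];
        // At read_epoch 4 the epoch-5 version is invisible; the tombstone at 3 wins.
        assert_eq!(get_visible(&versions, 4), Some((&None, 3)));
        // Below every version, nothing is visible.
        assert_eq!(get_visible(&versions, 0), None);
    }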
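
The loop in `merge_imms_in_memory` groups the merge iterator's per-key versions into one `SharedBufferVersionedEntry` per table key, flushing on every key change and once more for the final key. A sketch of that grouping under the same simplified types; the real code detects key changes through `FullKeyTracker::observe_multi_version`, for which a plain key-equality check stands in here:

    type Epoch = u64;
    type Value = Option<Vec<u8>>;

    // Input: (key, epoch, value) items, key-ordered, epochs descending per key.
    fn group_versions(items: Vec<(Vec<u8>, Epoch, Value)>) -> Vec<(Vec<u8>, Vec<(Epoch, Value)>)> {
        let mut merged_payload = Vec::new();
        let mut cur_key: Option<Vec<u8>> = None;
        let mut table_key_versions: Vec<(Epoch, Value)> = Vec::new();

        for (key, epoch, value) in items {
            if cur_key.as_ref() != Some(&key) {
                // Key changed: record the kv entry gathered for the previous key.
                if let Some(prev) = cur_key.take() {
                    merged_payload.push((prev, std::mem::take(&mut table_key_versions)));
                }
                cur_key = Some(key);
            }
            table_key_versions.push((epoch, value));
        }
        // Process the last key, like the `!table_key_versions.is_empty()` tail in the patch.
        if let Some(prev) = cur_key {
            merged_payload.push((prev, table_key_versions));
        }
        merged_payload
    }

    fn main() {
        let merged = group_versions(vec![
            (b"a".to_vec(), 2, Some(b"a2".to_vec())),
            (b"a".to_vec(), 1, Some(b"a1".to_vec())),
            (b"b".to_vec(), 2, None),
        ]);
        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].1.len(), 2); // both versions of "a" grouped together
    }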