Skip to content

Commit

Permalink
replace tuple with struct
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jan 28, 2024
1 parent 2abc32f commit e8459b4
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 36 deletions.
21 changes: 14 additions & 7 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ pub async fn merge_imms_in_memory(
mi.rewind_no_await();
assert!(mi.is_valid());

let first_item_key = mi.current_key_items().0.clone();
let first_item_key = mi.current_key_entry().key.clone();

let mut merged_payload: Vec<SharedBufferVersionedEntry> = Vec::new();

Expand All @@ -327,28 +327,35 @@ pub async fn merge_imms_in_memory(
let mut table_key_versions: Vec<(EpochWithGap, HummockValue<Bytes>)> = Vec::new();

while mi.is_valid() {
let (key, values) = mi.current_key_items();
let key_entry = mi.current_key_entry();
let user_key = UserKey {
table_id,
table_key: key.clone(),
table_key: key_entry.key.clone(),
};
if let Some(last_full_key) = full_key_tracker.observe_multi_version(
user_key,
values.iter().map(|(epoch_with_gap, _)| *epoch_with_gap),
key_entry
.new_values
.iter()
.map(|(epoch_with_gap, _)| *epoch_with_gap),
) {
let last_user_key = last_full_key.user_key;
// `epoch_with_gap` of the `last_full_key` may not reflect the real epoch in the items
// and should not be used because we use max epoch to initialize the tracker
let _epoch_with_gap = last_full_key.epoch_with_gap;

// Record kv entries
merged_payload.push((last_user_key.table_key, table_key_versions));
merged_payload.push(SharedBufferVersionedEntry::new(
last_user_key.table_key,
table_key_versions,
));

// Reset state before moving onto the new table key
table_key_versions = vec![];
}
table_key_versions.extend(
values
key_entry
.new_values
.iter()
.map(|(epoch_with_gap, value)| (*epoch_with_gap, value.clone())),
);
Expand All @@ -358,7 +365,7 @@ pub async fn merge_imms_in_memory(

// process the last key
if !table_key_versions.is_empty() {
merged_payload.push((
merged_payload.push(SharedBufferVersionedEntry::new(
full_key_tracker.latest_full_key.user_key.table_key,
table_key_versions,
));
Expand Down
12 changes: 6 additions & 6 deletions src/storage/src/hummock/iterator/merge_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ use std::collections::binary_heap::PeekMut;
use std::collections::{BinaryHeap, LinkedList};
use std::ops::{Deref, DerefMut};

use bytes::Bytes;
use futures::FutureExt;
use risingwave_hummock_sdk::key::{FullKey, TableKey};
use risingwave_hummock_sdk::EpochWithGap;
use risingwave_hummock_sdk::key::FullKey;

use super::Forward;
use crate::hummock::iterator::{DirectionEnum, HummockIterator, HummockIteratorDirection};
use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchIterator;
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatchIterator, SharedBufferVersionedEntry,
};
use crate::hummock::value::HummockValue;
use crate::hummock::HummockResult;
use crate::monitor::StoreLocalStatistic;
Expand Down Expand Up @@ -100,12 +100,12 @@ impl<I: HummockIterator> MergeIterator<I> {

impl MergeIterator<SharedBufferBatchIterator<Forward>> {
/// Used in `merge_imms_in_memory` to merge immutable memtables.
pub fn current_key_items(&self) -> (&TableKey<Bytes>, &[(EpochWithGap, HummockValue<Bytes>)]) {
pub fn current_key_entry(&self) -> &SharedBufferVersionedEntry {
self.heap
.peek()
.expect("no inner iter for imm merge")
.iter
.current_key_items()
.current_key_entry()
}
}

Expand Down
56 changes: 33 additions & 23 deletions src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,17 @@ pub type SharedBufferBatchId = u64;
/// A shared buffer may contain data from multiple epochs,
/// there are multiple versions for a given key (`table_key`), we put those versions into a vector
/// and sort them in descending order, aka newest to oldest.
pub type SharedBufferVersionedEntry = (TableKey<Bytes>, Vec<(EpochWithGap, HummockValue<Bytes>)>);
/// One table key together with all of its buffered versioned values.
///
/// Replaces the former `(TableKey<Bytes>, Vec<(EpochWithGap, HummockValue<Bytes>)>)`
/// tuple so the fields have names at use sites.
#[derive(PartialEq, Debug)]
pub struct SharedBufferVersionedEntry {
// The table key this entry stores versions for.
pub(crate) key: TableKey<Bytes>,
// Versions of `key` as (epoch, value) pairs, sorted in descending epoch
// order (newest to oldest) — see the sortedness debug assertions in
// `SharedBufferBatchInner`.
pub(crate) new_values: Vec<(EpochWithGap, HummockValue<Bytes>)>,
}

impl SharedBufferVersionedEntry {
/// Bundles a table key with its (already descending-sorted) version list.
pub fn new(key: TableKey<Bytes>, new_values: Vec<(EpochWithGap, HummockValue<Bytes>)>) -> Self {
Self { key, new_values }
}
}

#[derive(Debug)]
pub(crate) struct SharedBufferBatchInner {
Expand Down Expand Up @@ -79,7 +89,7 @@ impl SharedBufferBatchInner {
let epoch_with_gap = EpochWithGap::new(epoch, spill_offset);
let items = payload
.into_iter()
.map(|(k, v)| (k, vec![(epoch_with_gap, v)]))
.map(|(k, v)| SharedBufferVersionedEntry::new(k, vec![(epoch_with_gap, v)]))
.collect_vec();

let batch_id = SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed);
Expand All @@ -103,8 +113,9 @@ impl SharedBufferBatchInner {
tracker: Option<MemoryTracker>,
) -> Self {
assert!(!payload.is_empty());
debug_assert!(payload.iter().is_sorted_by_key(|(key, _)| key));
debug_assert!(payload.iter().all(|(_, values)| values
debug_assert!(payload.iter().is_sorted_by_key(|entry| &entry.key));
debug_assert!(payload.iter().all(|entry| entry
.new_values
.iter()
.rev()
.is_sorted_by_key(|(epoch_with_gap, _)| epoch_with_gap)));
Expand All @@ -129,11 +140,14 @@ impl SharedBufferBatchInner {
read_epoch: HummockEpoch,
) -> Option<(HummockValue<Bytes>, EpochWithGap)> {
// Perform binary search on table key to find the corresponding entry
if let Ok(i) = self.payload.binary_search_by(|m| (m.0[..]).cmp(*table_key)) {
if let Ok(i) = self
.payload
.binary_search_by(|m| (m.key.as_ref()).cmp(*table_key))
{
let item = &self.payload[i];
assert_eq!(item.0.as_ref(), *table_key);
assert_eq!(item.key.as_ref(), *table_key);
// Scan to find the first version <= epoch
for (e, v) in &item.1 {
for (e, v) in &item.new_values {
// skip invisible versions
if read_epoch < e.pure_epoch() {
continue;
Expand Down Expand Up @@ -275,7 +289,7 @@ impl SharedBufferBatch {
pub fn range_exists(&self, table_key_range: &TableKeyRange) -> bool {
self.inner
.binary_search_by(|m| {
let key = &m.0;
let key = &m.key;
let too_left = match &table_key_range.0 {
std::ops::Bound::Included(range_start) => range_start.as_ref() > key.as_ref(),
std::ops::Bound::Excluded(range_start) => range_start.as_ref() >= key.as_ref(),
Expand Down Expand Up @@ -317,17 +331,17 @@ impl SharedBufferBatch {

#[inline(always)]
pub fn start_table_key(&self) -> TableKey<&[u8]> {
TableKey(self.inner.payload.first().expect("non-empty").0.as_ref())
TableKey(self.inner.payload.first().expect("non-empty").key.as_ref())
}

#[inline(always)]
pub fn end_table_key(&self) -> TableKey<&[u8]> {
TableKey(self.inner.payload.last().expect("non-empty").0.as_ref())
TableKey(self.inner.payload.last().expect("non-empty").key.as_ref())
}

#[inline(always)]
pub fn raw_largest_key(&self) -> &TableKey<Bytes> {
&self.inner.payload.last().expect("non-empty").0
&self.inner.payload.last().expect("non-empty").key
}

/// return inclusive left endpoint, which means that all data in this batch should be larger or
Expand Down Expand Up @@ -386,7 +400,7 @@ impl SharedBufferBatch {
let idx = match self
.inner
.payload
.binary_search_by(|m| (m.0[..]).cmp(seek_key.as_slice()))
.binary_search_by(|m| (m.key.as_ref()).cmp(seek_key.as_slice()))
{
Ok(idx) => idx,
Err(idx) => idx,
Expand All @@ -395,11 +409,11 @@ impl SharedBufferBatch {
break;
}
let item = &self.inner.payload[idx];
if item.0.len() <= VirtualNode::SIZE {
if item.key.len() <= VirtualNode::SIZE {
break;
}
let current_vnode_id = VirtualNode::from_be_bytes(
item.0.as_ref()[..VirtualNode::SIZE]
item.key.as_ref()[..VirtualNode::SIZE]
.try_into()
.expect("slice with incorrect length"),
)
Expand Down Expand Up @@ -456,7 +470,7 @@ impl<D: HummockIteratorDirection> SharedBufferBatchIterator<D> {
DirectionEnum::Forward => self.current_idx,
DirectionEnum::Backward => self.inner.len() - self.current_idx - 1,
};
&self.inner.get(idx).unwrap().1
&self.inner.get(idx).unwrap().new_values
}

fn current_versions_len(&self) -> i32 {
Expand All @@ -477,8 +491,7 @@ impl<D: HummockIteratorDirection> SharedBufferBatchIterator<D> {
),
};
let cur_entry = self.inner.get(idx).unwrap();
let value = &cur_entry.1[version_idx as usize];
(&cur_entry.0, value)
(&cur_entry.key, &cur_entry.new_values[version_idx as usize])
}
}

Expand All @@ -488,13 +501,10 @@ impl SharedBufferBatchIterator<Forward> {
self.current_idx += 1;
}

pub(crate) fn current_key_items(
&self,
) -> (&TableKey<Bytes>, &[(EpochWithGap, HummockValue<Bytes>)]) {
pub(crate) fn current_key_entry(&self) -> &SharedBufferVersionedEntry {
assert!(self.is_valid(), "iterator is not valid");
assert_eq!(self.current_version_idx, 0);
let (key, values) = &self.inner.payload[self.current_idx];
(key, &values)
&self.inner.payload[self.current_idx]
}
}

Expand Down Expand Up @@ -563,7 +573,7 @@ impl<D: HummockIteratorDirection> HummockIterator for SharedBufferBatchIterator<
// by table key.
let partition_point = self
.inner
.binary_search_by(|probe| probe.0[..].cmp(*key.user_key.table_key));
.binary_search_by(|probe| probe.key.as_ref().cmp(*key.user_key.table_key));
let seek_key_epoch = key.epoch_with_gap;
match D::direction() {
DirectionEnum::Forward => match partition_point {
Expand Down

0 comments on commit e8459b4

Please sign in to comment.