Skip to content

Commit

Permalink
refactor(storage): hide inner value of TableKey
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Nov 27, 2023
1 parent 2348a2b commit f8291c4
Show file tree
Hide file tree
Showing 31 changed files with 311 additions and 222 deletions.
2 changes: 1 addition & 1 deletion src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl HummockJavaBindingIterator {
let item = self.stream.try_next().await?;
Ok(match item {
Some((key, value)) => Some((
key.user_key.table_key.0,
key.user_key.table_key.into_inner(),
OwnedRow::new(
self.row_serde
.deserialize(&value)
Expand Down
6 changes: 5 additions & 1 deletion src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ async fn build_table(
for i in range {
let start = (i % 8) as usize;
let end = start + 8;
full_key.user_key.table_key[table_key_len - 8..].copy_from_slice(&i.to_be_bytes());
full_key
.user_key
.table_key
.range_mut(table_key_len - 8..)
.copy_from_slice(&i.to_be_bytes());
builder
.add_for_test(full_key.to_ref(), HummockValue::put(&value[start..end]))
.await
Expand Down
4 changes: 2 additions & 2 deletions src/storage/benches/bench_merge_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn gen_interleave_shared_buffer_batch_iter(
let mut batch_data = vec![];
for j in 0..batch_size {
batch_data.push((
TableKey(Bytes::copy_from_slice(
TableKey::new(Bytes::copy_from_slice(
format!("test_key_{:08}", j * batch_count + i).as_bytes(),
)),
HummockValue::put(Bytes::copy_from_slice("value".as_bytes())),
Expand Down Expand Up @@ -66,7 +66,7 @@ fn gen_interleave_shared_buffer_batch_enum_iter(
let mut batch_data = vec![];
for j in 0..batch_size {
batch_data.push((
TableKey(Bytes::copy_from_slice(
TableKey::new(Bytes::copy_from_slice(
format!("test_key_{:08}", j * batch_count + i).as_bytes(),
)),
HummockValue::put(Bytes::copy_from_slice("value".as_bytes())),
Expand Down
81 changes: 49 additions & 32 deletions src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use std::borrow::Borrow;
use std::cmp::Ordering;
use std::fmt::Debug;
use std::ops::Bound::*;
use std::ops::{Bound, Deref, DerefMut, RangeBounds};
use std::ops::{Bound, Deref, RangeBounds};
use std::ptr;
use std::slice::SliceIndex;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use risingwave_common::catalog::TableId;
Expand Down Expand Up @@ -371,7 +372,7 @@ impl CopyFromSlice for Bytes {
/// Its name come from the assumption that Hummock is always accessed by a table-like structure
/// identified by a [`TableId`].
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct TableKey<T: AsRef<[u8]>>(pub T);
pub struct TableKey<T: AsRef<[u8]>>(T);

impl<T: AsRef<[u8]>> Debug for TableKey<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -387,19 +388,26 @@ impl<T: AsRef<[u8]>> Deref for TableKey<T> {
}
}

impl<T: AsRef<[u8]>> DerefMut for TableKey<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl<T: AsRef<[u8]>> AsRef<[u8]> for TableKey<T> {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}

impl<T: AsRef<[u8]>> TableKey<T> {
pub fn new(value: T) -> Self {
debug_assert!(
value.as_ref().len() > VirtualNode::SIZE,
"too short table key: {:?}",
value.as_ref()
);
Self(value)
}

pub fn into_inner(self) -> T {
self.0
}

pub fn vnode_part(&self) -> VirtualNode {
VirtualNode::from_be_bytes(
self.0.as_ref()[..VirtualNode::SIZE]
Expand All @@ -413,6 +421,12 @@ impl<T: AsRef<[u8]>> TableKey<T> {
}
}

impl TableKey<Vec<u8>> {
pub fn range_mut(&mut self, range: impl SliceIndex<[u8], Output = [u8]>) -> &mut [u8] {
&mut self.0[range]
}
}

impl<T: AsRef<[u8]>> Borrow<[u8]> for TableKey<T> {
fn borrow(&self) -> &[u8] {
self.0.as_ref()
Expand All @@ -427,7 +441,7 @@ impl EstimateSize for TableKey<Bytes> {

#[inline]
pub fn map_table_key_range(range: (Bound<KeyPayloadType>, Bound<KeyPayloadType>)) -> TableKeyRange {
(range.0.map(TableKey), range.1.map(TableKey))
(range.0.map(TableKey::new), range.1.map(TableKey::new))
}

/// [`UserKey`] is is an internal concept in storage. In the storage interface, user specifies
Expand Down Expand Up @@ -465,7 +479,7 @@ impl<T: AsRef<[u8]>> UserKey<T> {
pub fn for_test(table_id: TableId, table_key: T) -> Self {
Self {
table_id,
table_key: TableKey(table_key),
table_key: TableKey::new(table_key),
}
}

Expand Down Expand Up @@ -514,7 +528,7 @@ impl<'a> UserKey<&'a [u8]> {

Self {
table_id: TableId::new(table_id),
table_key: TableKey(&slice[TABLE_PREFIX_LEN..]),
table_key: TableKey::new(&slice[TABLE_PREFIX_LEN..]),
}
}

Expand All @@ -525,7 +539,7 @@ impl<'a> UserKey<&'a [u8]> {
pub fn copy_into<T: CopyFromSlice + AsRef<[u8]>>(self) -> UserKey<T> {
UserKey {
table_id: self.table_id,
table_key: TableKey(T::copy_from_slice(self.table_key.0)),
table_key: TableKey::new(T::copy_from_slice(self.table_key.0)),
}
}
}
Expand All @@ -534,14 +548,14 @@ impl<'a, T: AsRef<[u8]> + Clone> UserKey<&'a T> {
pub fn cloned(self) -> UserKey<T> {
UserKey {
table_id: self.table_id,
table_key: TableKey(self.table_key.0.clone()),
table_key: TableKey::new(self.table_key.0.clone()),
}
}
}

impl<T: AsRef<[u8]>> UserKey<T> {
pub fn as_ref(&self) -> UserKey<&[u8]> {
UserKey::new(self.table_id, TableKey(self.table_key.as_ref()))
UserKey::new(self.table_id, TableKey::new(self.table_key.as_ref()))
}
}

Expand All @@ -551,7 +565,7 @@ impl UserKey<Vec<u8>> {
let len = buf.get_u32() as usize;
let data = buf[..len].to_vec();
buf.advance(len);
UserKey::new(TableId::new(table_id), TableKey(data))
UserKey::new(TableId::new(table_id), TableKey::new(data))
}

pub fn extend_from_other(&mut self, other: &UserKey<&[u8]>) {
Expand All @@ -564,14 +578,14 @@ impl UserKey<Vec<u8>> {
/// table key without reallocating a new `UserKey` object.
pub fn set(&mut self, other: UserKey<&[u8]>) {
self.table_id = other.table_id;
self.table_key.clear();
self.table_key.extend_from_slice(other.table_key.as_ref());
self.table_key.0.clear();
self.table_key.0.extend_from_slice(other.table_key.as_ref());
}

pub fn into_bytes(self) -> UserKey<Bytes> {
UserKey {
table_id: self.table_id,
table_key: TableKey(Bytes::from(self.table_key.0)),
table_key: TableKey::new(Bytes::from(self.table_key.0)),
}
}
}
Expand Down Expand Up @@ -691,7 +705,10 @@ impl<'a> FullKey<&'a [u8]> {
let epoch = (&slice_without_table_id[epoch_pos..]).get_u64();

Self {
user_key: UserKey::new(table_id, TableKey(&slice_without_table_id[..epoch_pos])),
user_key: UserKey::new(
table_id,
TableKey::new(&slice_without_table_id[..epoch_pos]),
),
epoch_with_gap: EpochWithGap::from_u64(epoch),
}
}
Expand Down Expand Up @@ -835,19 +852,19 @@ pub fn bound_table_key_range<T: AsRef<[u8]> + EmptySliceRef>(
table_key_range: &impl RangeBounds<TableKey<T>>,
) -> (Bound<UserKey<&T>>, Bound<UserKey<&T>>) {
let start = match table_key_range.start_bound() {
Included(b) => Included(UserKey::new(table_id, TableKey(&b.0))),
Excluded(b) => Excluded(UserKey::new(table_id, TableKey(&b.0))),
Unbounded => Included(UserKey::new(table_id, TableKey(T::empty_slice_ref()))),
Included(b) => Included(UserKey::new(table_id, TableKey::new(&b.0))),
Excluded(b) => Excluded(UserKey::new(table_id, TableKey::new(&b.0))),
Unbounded => Included(UserKey::new(table_id, TableKey::new(T::empty_slice_ref()))),
};

let end = match table_key_range.end_bound() {
Included(b) => Included(UserKey::new(table_id, TableKey(&b.0))),
Excluded(b) => Excluded(UserKey::new(table_id, TableKey(&b.0))),
Included(b) => Included(UserKey::new(table_id, TableKey::new(&b.0))),
Excluded(b) => Excluded(UserKey::new(table_id, TableKey::new(&b.0))),
Unbounded => {
if let Some(next_table_id) = table_id.table_id().checked_add(1) {
Excluded(UserKey::new(
next_table_id.into(),
TableKey(T::empty_slice_ref()),
TableKey::new(T::empty_slice_ref()),
))
} else {
Unbounded
Expand Down Expand Up @@ -913,8 +930,8 @@ mod tests {
bound_table_key_range(
TableId::default(),
&(
Included(TableKey(b"a".to_vec())),
Included(TableKey(b"b".to_vec()))
Included(TableKey::new(b"a".to_vec())),
Included(TableKey::new(b"b".to_vec()))
)
),
(
Expand All @@ -925,7 +942,7 @@ mod tests {
assert_eq!(
bound_table_key_range(
TableId::from(1),
&(Included(TableKey(b"a".to_vec())), Unbounded)
&(Included(TableKey::new(b"a".to_vec())), Unbounded)
),
(
Included(UserKey::for_test(TableId::from(1), &b"a".to_vec())),
Expand All @@ -935,7 +952,7 @@ mod tests {
assert_eq!(
bound_table_key_range(
TableId::from(u32::MAX),
&(Included(TableKey(b"a".to_vec())), Unbounded)
&(Included(TableKey::new(b"a".to_vec())), Unbounded)
),
(
Included(UserKey::for_test(TableId::from(u32::MAX), &b"a".to_vec())),
Expand Down Expand Up @@ -994,9 +1011,9 @@ mod tests {

#[test]
fn test_uesr_key_order() {
let a = UserKey::new(TableId::new(1), TableKey(b"aaa".to_vec()));
let b = UserKey::new(TableId::new(2), TableKey(b"aaa".to_vec()));
let c = UserKey::new(TableId::new(2), TableKey(b"bbb".to_vec()));
let a = UserKey::new(TableId::new(1), TableKey::new(b"aaa".to_vec()));
let b = UserKey::new(TableId::new(2), TableKey::new(b"aaa".to_vec()));
let c = UserKey::new(TableId::new(2), TableKey::new(b"bbb".to_vec()));
assert!(a.lt(&b));
assert!(b.lt(&c));
let a = a.encode();
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_test/benches/bench_hummock_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fn gen_interleave_shared_buffer_batch_iter(
let mut batch_data = vec![];
for j in 0..batch_size {
batch_data.push((
TableKey(Bytes::copy_from_slice(
TableKey::new(Bytes::copy_from_slice(
format!("test_key_{:08}", j * batch_count + i).as_bytes(),
)),
StorageValue::new_put(Bytes::copy_from_slice("value".as_bytes())),
Expand Down
23 changes: 13 additions & 10 deletions src/storage/hummock_test/src/bin/replay/replay_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where
pub(crate) fn into_stream(self) -> impl Stream<Item = Result<ReplayItem>> {
self.inner.map(|item_res| {
item_res
.map(|(key, value)| (key.user_key.table_key.0.into(), value.into()))
.map(|(key, value)| (key.user_key.table_key.into_inner().into(), value.into()))
.map_err(|_| TraceError::IterFailed("iter failed to retrieve item".to_string()))
})
}
Expand All @@ -72,7 +72,10 @@ impl LocalReplayIter {
#[for_await]
for value in stream {
let value = value.unwrap();
inner.push((value.0.user_key.table_key.0.into(), value.1.into()));
inner.push((
value.0.user_key.table_key.into_inner().into(),
value.1.into(),
));
}
Self { inner }
}
Expand Down Expand Up @@ -107,8 +110,8 @@ impl ReplayRead for GlobalReplayImpl {
read_options: TracedReadOptions,
) -> Result<BoxStream<'static, Result<ReplayItem>>> {
let key_range = (
key_range.0.map(TracedBytes::into).map(TableKey),
key_range.1.map(TracedBytes::into).map(TableKey),
key_range.0.map(TracedBytes::into).map(TableKey::new),
key_range.1.map(TracedBytes::into).map(TableKey::new),
);

let iter = self
Expand All @@ -129,7 +132,7 @@ impl ReplayRead for GlobalReplayImpl {
) -> Result<Option<TracedBytes>> {
Ok(self
.store
.get(TableKey(key.into()), epoch, read_options.into())
.get(TableKey::new(key.into()), epoch, read_options.into())
.await
.unwrap()
.map(TracedBytes::from))
Expand Down Expand Up @@ -244,8 +247,8 @@ impl LocalReplayRead for LocalReplayImpl {
read_options: TracedReadOptions,
) -> Result<BoxStream<'static, Result<ReplayItem>>> {
let key_range = (
key_range.0.map(|b| TableKey(b.into())),
key_range.1.map(|b| TableKey(b.into())),
key_range.0.map(|b| TableKey::new(b.into())),
key_range.1.map(|b| TableKey::new(b.into())),
);

let iter = LocalStateStore::iter(&self.0, key_range, read_options.into())
Expand All @@ -263,7 +266,7 @@ impl LocalReplayRead for LocalReplayImpl {
read_options: TracedReadOptions,
) -> Result<Option<TracedBytes>> {
Ok(
LocalStateStore::get(&self.0, TableKey(key.into()), read_options.into())
LocalStateStore::get(&self.0, TableKey::new(key.into()), read_options.into())
.await
.unwrap()
.map(TracedBytes::from),
Expand All @@ -281,7 +284,7 @@ impl ReplayWrite for LocalReplayImpl {
) -> Result<()> {
LocalStateStore::insert(
&mut self.0,
TableKey(key.into()),
TableKey::new(key.into()),
new_val.into(),
old_val.map(|b| b.into()),
)
Expand All @@ -290,7 +293,7 @@ impl ReplayWrite for LocalReplayImpl {
}

fn delete(&mut self, key: TracedBytes, old_val: TracedBytes) -> Result<()> {
LocalStateStore::delete(&mut self.0, TableKey(key.into()), old_val.into()).unwrap();
LocalStateStore::delete(&mut self.0, TableKey::new(key.into()), old_val.into()).unwrap();
Ok(())
}
}
Expand Down
Loading

0 comments on commit f8291c4

Please sign in to comment.