Skip to content

Commit

Permalink
feat(storage): iter with key and value ref (#15565)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Mar 12, 2024
1 parent 4747e5f commit df496cb
Show file tree
Hide file tree
Showing 33 changed files with 657 additions and 527 deletions.
33 changes: 15 additions & 18 deletions src/ctl/src/cmd_impl/hummock/list_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@

use core::ops::Bound::Unbounded;

use futures::StreamExt;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::is_max_epoch;
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::store::{PrefetchOptions, ReadOptions, StateStoreRead};
use risingwave_storage::store::{PrefetchOptions, ReadOptions, StateStoreIter, StateStoreRead};

use crate::common::HummockServiceOpts;
use crate::CtlContext;
Expand All @@ -36,22 +35,20 @@ pub async fn list_kv(
tracing::info!("using MAX EPOCH as epoch");
}
let range = (Unbounded, Unbounded);
let mut scan_result = Box::pin(
hummock
.iter(
range,
epoch,
ReadOptions {
table_id: TableId { table_id },
prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(),
cache_policy: CachePolicy::NotFill,
..Default::default()
},
)
.await?,
);
while let Some(item) = scan_result.next().await {
let (k, v) = item?;
let mut scan_result = hummock
.iter(
range,
epoch,
ReadOptions {
table_id: TableId { table_id },
prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(),
cache_policy: CachePolicy::NotFill,
..Default::default()
},
)
.await?;
while let Some(item) = scan_result.try_next().await? {
let (k, v) = item;
let print_string = format!("[t{}]", k.user_key.table_id.table_id());
println!("{} {:?} => {:?}", print_string, k, v)
}
Expand Down
86 changes: 45 additions & 41 deletions src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use bytes::Bytes;
use futures::TryStreamExt;
use futures::{Stream, TryStreamExt};
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
Expand All @@ -37,20 +37,31 @@ use risingwave_storage::hummock::{
};
use risingwave_storage::monitor::{global_hummock_state_store_metrics, HummockStateStoreMetrics};
use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
use risingwave_storage::store::{ReadOptions, StateStoreReadIterStream, StreamTypeOfIter};
use risingwave_storage::store::{ReadOptions, StateStoreIterExt};
use risingwave_storage::table::KeyedRow;
use rw_futures_util::select_all;
use tokio::sync::mpsc::unbounded_channel;

type SelectAllIterStream = impl StateStoreReadIterStream + Unpin;
type SelectAllIterStream = impl Stream<Item = StorageResult<KeyedRow<Bytes>>> + Unpin;
type SingleIterStream = impl Stream<Item = StorageResult<KeyedRow<Bytes>>>;

fn select_all_vnode_stream(
streams: Vec<StreamTypeOfIter<HummockStorageIterator>>,
) -> SelectAllIterStream {
fn select_all_vnode_stream(streams: Vec<SingleIterStream>) -> SelectAllIterStream {
select_all(streams.into_iter().map(Box::pin))
}

pub struct HummockJavaBindingIterator {
fn to_deserialized_stream(
iter: HummockStorageIterator,
row_serde: EitherSerde,
) -> SingleIterStream {
iter.into_stream(move |(key, value)| {
Ok(KeyedRow::new(
key.user_key.table_key.copy_into(),
row_serde.deserialize(value).map(OwnedRow::new)?,
))
})
}

pub struct HummockJavaBindingIterator {
stream: SelectAllIterStream,
}

Expand Down Expand Up @@ -87,6 +98,28 @@ impl HummockJavaBindingIterator {
0,
);

let table = read_plan.table_catalog.unwrap();
let versioned = table.version.is_some();
let table_columns = table
.columns
.into_iter()
.map(|c| ColumnDesc::from(c.column_desc.unwrap()));

// Decide which serializer to use based on whether the table is versioned or not.
let row_serde: EitherSerde = if versioned {
ColumnAwareSerde::new(
Arc::from_iter(0..table_columns.len()),
Arc::from_iter(table_columns),
)
.into()
} else {
BasicSerde::new(
Arc::from_iter(0..table_columns.len()),
Arc::from_iter(table_columns),
)
.into()
};

let mut streams = Vec::with_capacity(read_plan.vnode_ids.len());
let key_range = read_plan.key_range.unwrap();
let pin_version = PinnedVersion::new(
Expand All @@ -104,7 +137,7 @@ impl HummockJavaBindingIterator {
key_range,
read_plan.epoch,
);
let stream = reader
let iter = reader
.iter(
key_range,
read_plan.epoch,
Expand All @@ -116,45 +149,16 @@ impl HummockJavaBindingIterator {
read_version_tuple,
)
.await?;
streams.push(stream);
streams.push(to_deserialized_stream(iter, row_serde.clone()));
}

let stream = select_all_vnode_stream(streams);

let table = read_plan.table_catalog.unwrap();
let versioned = table.version.is_some();
let table_columns = table
.columns
.into_iter()
.map(|c| ColumnDesc::from(c.column_desc.unwrap()));

// Decide which serializer to use based on whether the table is versioned or not.
let row_serde = if versioned {
ColumnAwareSerde::new(
Arc::from_iter(0..table_columns.len()),
Arc::from_iter(table_columns),
)
.into()
} else {
BasicSerde::new(
Arc::from_iter(0..table_columns.len()),
Arc::from_iter(table_columns),
)
.into()
};

Ok(Self { row_serde, stream })
Ok(Self { stream })
}

pub async fn next(&mut self) -> StorageResult<Option<(Bytes, OwnedRow)>> {
let item = self.stream.try_next().await?;
Ok(match item {
Some((key, value)) => Some((
key.user_key.table_key.0,
OwnedRow::new(self.row_serde.deserialize(&value)?),
)),
None => None,
})
pub async fn next(&mut self) -> StorageResult<Option<KeyedRow<Bytes>>> {
self.stream.try_next().await
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/jni_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,11 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNext<'a>(
iter.cursor = None;
Ok(JNI_FALSE)
}
Some((key, row)) => {
Some(keyed_row) => {
let (key, row) = keyed_row.into_parts();
iter.cursor = Some(RowCursor {
row,
extra: RowExtra::Key(key),
extra: RowExtra::Key(key.0),
});
Ok(JNI_TRUE)
}
Expand Down
92 changes: 53 additions & 39 deletions src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,23 @@ pub fn prefixed_range_with_vnode<B: AsRef<[u8]>>(
map_table_key_range((start, end))
}

pub trait SetSlice<S: AsRef<[u8]> + ?Sized> {
fn set(&mut self, value: &S);
}

impl<S: AsRef<[u8]> + ?Sized> SetSlice<S> for Vec<u8> {
fn set(&mut self, value: &S) {
self.clear();
self.extend_from_slice(value.as_ref());
}
}

impl SetSlice<Bytes> for Bytes {
fn set(&mut self, value: &Bytes) {
*self = value.clone()
}
}

pub trait CopyFromSlice {
fn copy_from_slice(slice: &[u8]) -> Self;
}
Expand Down Expand Up @@ -484,6 +501,12 @@ impl EstimateSize for TableKey<Bytes> {
}
}

impl<'a> TableKey<&'a [u8]> {
pub fn copy_into<T: CopyFromSlice + AsRef<[u8]>>(&self) -> TableKey<T> {
TableKey(T::copy_from_slice(self.as_ref()))
}
}

#[inline]
pub fn map_table_key_range(range: (Bound<KeyPayloadType>, Bound<KeyPayloadType>)) -> TableKeyRange {
(range.0.map(TableKey), range.1.map(TableKey))
Expand Down Expand Up @@ -624,21 +647,22 @@ impl UserKey<Vec<u8>> {
buf.advance(len);
UserKey::new(TableId::new(table_id), TableKey(data))
}
}

pub fn extend_from_other(&mut self, other: &UserKey<&[u8]>) {
self.table_id = other.table_id;
self.table_key.0.clear();
self.table_key.0.extend_from_slice(other.table_key.as_ref());
}

impl<T: AsRef<[u8]>> UserKey<T> {
/// Use this method to override an old `UserKey<Vec<u8>>` with a `UserKey<&[u8]>` to own the
/// table key without reallocating a new `UserKey` object.
pub fn set(&mut self, other: UserKey<&[u8]>) {
pub fn set<F>(&mut self, other: UserKey<F>)
where
T: SetSlice<F>,
F: AsRef<[u8]>,
{
self.table_id = other.table_id;
self.table_key.clear();
self.table_key.extend_from_slice(other.table_key.as_ref());
self.table_key.0.set(&other.table_key.0);
}
}

impl UserKey<Vec<u8>> {
pub fn into_bytes(self) -> UserKey<Bytes> {
UserKey {
table_id: self.table_id,
Expand Down Expand Up @@ -811,10 +835,14 @@ impl<T: AsRef<[u8]>> FullKey<T> {
}
}

impl FullKey<Vec<u8>> {
impl<T: AsRef<[u8]>> FullKey<T> {
/// Use this method to override an old `FullKey<Vec<u8>>` with a `FullKey<&[u8]>` to own the
/// table key without reallocating a new `FullKey` object.
pub fn set(&mut self, other: FullKey<&[u8]>) {
pub fn set<F>(&mut self, other: FullKey<F>)
where
T: SetSlice<F>,
F: AsRef<[u8]>,
{
self.user_key.set(other.user_key);
self.epoch_with_gap = other.epoch_with_gap;
}
Expand All @@ -835,15 +863,6 @@ impl<T: AsRef<[u8]> + Ord + Eq> PartialOrd for FullKey<T> {
}
}

impl<'a, T> From<UserKey<&'a [u8]>> for UserKey<T>
where
T: AsRef<[u8]> + CopyFromSlice,
{
fn from(value: UserKey<&'a [u8]>) -> Self {
value.copy_into()
}
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct PointRange<T: AsRef<[u8]>> {
// When comparing `PointRange`, we first compare `left_user_key`, then
Expand Down Expand Up @@ -977,20 +996,20 @@ impl<T: AsRef<[u8]> + Ord + Eq, const SKIP_DEDUP: bool> FullKeyTracker<T, SKIP_D
/// // a.observe(full_key_with_smaller_user_key);
///
/// let full_key2 = FullKey::new(table_id, TableKey(Bytes::from("c")), 3 << EPOCH_AVAILABLE_BITS);
/// assert_eq!(a.observe(full_key2.clone()), None);
/// assert_eq!(a.observe(full_key2.clone()), false);
/// assert_eq!(a.latest_user_key(), &full_key2.user_key);
///
/// let full_key3 = FullKey::new(table_id, TableKey(Bytes::from("f")), 4 << EPOCH_AVAILABLE_BITS);
/// assert_eq!(a.observe(full_key3.clone()), Some(full_key1.user_key));
/// assert_eq!(a.observe(full_key3.clone()), true);
/// assert_eq!(a.latest_user_key(), &full_key3.user_key);
/// ```
///
/// Return:
/// - If the provided `key` contains a new user key, return the latest full key observed for the previous user key.
/// - Otherwise: return None
pub fn observe<F>(&mut self, key: FullKey<F>) -> Option<UserKey<T>>
/// - If the provided `key` contains a new user key, return true.
/// - Otherwise: return false
pub fn observe<F>(&mut self, key: FullKey<F>) -> bool
where
UserKey<F>: Into<UserKey<T>>,
T: SetSlice<F>,
F: AsRef<[u8]>,
{
self.observe_multi_version(key.user_key, once(key.epoch_with_gap))
Expand All @@ -1001,9 +1020,9 @@ impl<T: AsRef<[u8]> + Ord + Eq, const SKIP_DEDUP: bool> FullKeyTracker<T, SKIP_D
&mut self,
user_key: UserKey<F>,
mut epochs: impl Iterator<Item = EpochWithGap>,
) -> Option<UserKey<T>>
) -> bool
where
UserKey<F>: Into<UserKey<T>>,
T: SetSlice<F>,
F: AsRef<[u8]>,
{
let max_epoch_with_gap = epochs.next().expect("non-empty");
Expand Down Expand Up @@ -1033,16 +1052,11 @@ impl<T: AsRef<[u8]> + Ord + Eq, const SKIP_DEDUP: bool> FullKeyTracker<T, SKIP_D
self.last_observed_epoch_with_gap = min_epoch_with_gap;

// Take the previous key and set latest key
Some(
std::mem::replace(
&mut self.latest_full_key,
FullKey {
user_key: user_key.into(),
epoch_with_gap: min_epoch_with_gap,
},
)
.user_key,
)
self.latest_full_key.set(FullKey {
user_key,
epoch_with_gap: min_epoch_with_gap,
});
true
}
Ordering::Equal => {
if max_epoch_with_gap > self.last_observed_epoch_with_gap
Expand All @@ -1055,7 +1069,7 @@ impl<T: AsRef<[u8]> + Ord + Eq, const SKIP_DEDUP: bool> FullKeyTracker<T, SKIP_D
);
}
self.last_observed_epoch_with_gap = min_epoch_with_gap;
None
false
}
Ordering::Greater => {
// User key should be monotonically increasing
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_test/benches/bench_hummock_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;

use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion};
use futures::{pin_mut, TryStreamExt};
use futures::pin_mut;
use risingwave_common::cache::CachePriority;
use risingwave_common::util::epoch::test_epoch;
use risingwave_hummock_sdk::key::TableKey;
Expand Down
Loading

0 comments on commit df496cb

Please sign in to comment.