diff --git a/src/ctl/src/cmd_impl/hummock/list_kv.rs b/src/ctl/src/cmd_impl/hummock/list_kv.rs index 676c0b013163e..2eb54362b413c 100644 --- a/src/ctl/src/cmd_impl/hummock/list_kv.rs +++ b/src/ctl/src/cmd_impl/hummock/list_kv.rs @@ -14,11 +14,10 @@ use core::ops::Bound::Unbounded; -use futures::StreamExt; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::is_max_epoch; use risingwave_storage::hummock::CachePolicy; -use risingwave_storage::store::{PrefetchOptions, ReadOptions, StateStoreRead}; +use risingwave_storage::store::{PrefetchOptions, ReadOptions, StateStoreIter, StateStoreRead}; use crate::common::HummockServiceOpts; use crate::CtlContext; @@ -36,22 +35,20 @@ pub async fn list_kv( tracing::info!("using MAX EPOCH as epoch"); } let range = (Unbounded, Unbounded); - let mut scan_result = Box::pin( - hummock - .iter( - range, - epoch, - ReadOptions { - table_id: TableId { table_id }, - prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(), - cache_policy: CachePolicy::NotFill, - ..Default::default() - }, - ) - .await?, - ); - while let Some(item) = scan_result.next().await { - let (k, v) = item?; + let mut scan_result = hummock + .iter( + range, + epoch, + ReadOptions { + table_id: TableId { table_id }, + prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(), + cache_policy: CachePolicy::NotFill, + ..Default::default() + }, + ) + .await?; + while let Some(item) = scan_result.try_next().await? { + let (k, v) = item; let print_string = format!("[t{}]", k.user_key.table_id.table_id()); println!("{} {:?} => {:?}", print_string, k, v) } diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index ee2084b6ecf81..69845ff0f459e 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use bytes::Bytes; -use futures::TryStreamExt; +use futures::{Stream, TryStreamExt}; use risingwave_common::catalog::ColumnDesc; use risingwave_common::config::{MetricLevel, ObjectStoreConfig}; use risingwave_common::hash::VirtualNode; @@ -37,20 +37,31 @@ use risingwave_storage::hummock::{ }; use risingwave_storage::monitor::{global_hummock_state_store_metrics, HummockStateStoreMetrics}; use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew; -use risingwave_storage::store::{ReadOptions, StateStoreReadIterStream, StreamTypeOfIter}; +use risingwave_storage::store::{ReadOptions, StateStoreIterExt}; +use risingwave_storage::table::KeyedRow; use rw_futures_util::select_all; use tokio::sync::mpsc::unbounded_channel; -type SelectAllIterStream = impl StateStoreReadIterStream + Unpin; +type SelectAllIterStream = impl Stream>> + Unpin; +type SingleIterStream = impl Stream>>; -fn select_all_vnode_stream( - streams: Vec>, -) -> SelectAllIterStream { +fn select_all_vnode_stream(streams: Vec) -> SelectAllIterStream { select_all(streams.into_iter().map(Box::pin)) } -pub struct HummockJavaBindingIterator { +fn to_deserialized_stream( + iter: HummockStorageIterator, row_serde: EitherSerde, +) -> SingleIterStream { + iter.into_stream(move |(key, value)| { + Ok(KeyedRow::new( + key.user_key.table_key.copy_into(), + row_serde.deserialize(value).map(OwnedRow::new)?, + )) + }) +} + +pub struct HummockJavaBindingIterator { stream: SelectAllIterStream, } @@ -87,6 +98,28 @@ impl HummockJavaBindingIterator { 0, ); + let table = read_plan.table_catalog.unwrap(); + let versioned = table.version.is_some(); + let table_columns = table + .columns + .into_iter() + .map(|c| 
ColumnDesc::from(c.column_desc.unwrap())); + + // Decide which serializer to use based on whether the table is versioned or not. + let row_serde: EitherSerde = if versioned { + ColumnAwareSerde::new( + Arc::from_iter(0..table_columns.len()), + Arc::from_iter(table_columns), + ) + .into() + } else { + BasicSerde::new( + Arc::from_iter(0..table_columns.len()), + Arc::from_iter(table_columns), + ) + .into() + }; + let mut streams = Vec::with_capacity(read_plan.vnode_ids.len()); let key_range = read_plan.key_range.unwrap(); let pin_version = PinnedVersion::new( @@ -104,7 +137,7 @@ impl HummockJavaBindingIterator { key_range, read_plan.epoch, ); - let stream = reader + let iter = reader .iter( key_range, read_plan.epoch, @@ -116,45 +149,16 @@ impl HummockJavaBindingIterator { read_version_tuple, ) .await?; - streams.push(stream); + streams.push(to_deserialized_stream(iter, row_serde.clone())); } let stream = select_all_vnode_stream(streams); - let table = read_plan.table_catalog.unwrap(); - let versioned = table.version.is_some(); - let table_columns = table - .columns - .into_iter() - .map(|c| ColumnDesc::from(c.column_desc.unwrap())); - - // Decide which serializer to use based on whether the table is versioned or not. - let row_serde = if versioned { - ColumnAwareSerde::new( - Arc::from_iter(0..table_columns.len()), - Arc::from_iter(table_columns), - ) - .into() - } else { - BasicSerde::new( - Arc::from_iter(0..table_columns.len()), - Arc::from_iter(table_columns), - ) - .into() - }; - - Ok(Self { row_serde, stream }) + Ok(Self { stream }) } - pub async fn next(&mut self) -> StorageResult> { - let item = self.stream.try_next().await?; - Ok(match item { - Some((key, value)) => Some(( - key.user_key.table_key.0, - OwnedRow::new(self.row_serde.deserialize(&value)?), - )), - None => None, - }) + pub async fn next(&mut self) -> StorageResult>> { + self.stream.try_next().await } } diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index 3b877261dfe7b..ac5192700fae5 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -360,10 +360,11 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNext<'a>( iter.cursor = None; Ok(JNI_FALSE) } - Some((key, row)) => { + Some(keyed_row) => { + let (key, row) = keyed_row.into_parts(); iter.cursor = Some(RowCursor { row, - extra: RowExtra::Key(key), + extra: RowExtra::Key(key.0), }); Ok(JNI_TRUE) } diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs index c4a4761c58cb7..a12783a19b415 100644 --- a/src/storage/hummock_sdk/src/key.rs +++ b/src/storage/hummock_sdk/src/key.rs @@ -402,6 +402,23 @@ pub fn prefixed_range_with_vnode>( map_table_key_range((start, end)) } +pub trait SetSlice + ?Sized> { + fn set(&mut self, value: &S); +} + +impl + ?Sized> SetSlice for Vec { + fn set(&mut self, value: &S) { + self.clear(); + self.extend_from_slice(value.as_ref()); + } +} + +impl SetSlice for Bytes { + fn set(&mut self, value: &Bytes) { + *self = value.clone() + } +} + pub trait CopyFromSlice { fn copy_from_slice(slice: &[u8]) -> Self; } @@ -484,6 +501,12 @@ impl EstimateSize for TableKey { } } +impl<'a> TableKey<&'a [u8]> { + pub fn copy_into>(&self) -> TableKey { + TableKey(T::copy_from_slice(self.as_ref())) + } +} + #[inline] pub fn map_table_key_range(range: (Bound, Bound)) -> TableKeyRange { (range.0.map(TableKey), range.1.map(TableKey)) @@ -624,21 +647,22 @@ impl UserKey> { buf.advance(len); UserKey::new(TableId::new(table_id), TableKey(data)) } +} - pub fn extend_from_other(&mut self, other: 
&UserKey<&[u8]>) { - self.table_id = other.table_id; - self.table_key.0.clear(); - self.table_key.0.extend_from_slice(other.table_key.as_ref()); - } - +impl> UserKey { /// Use this method to override an old `UserKey>` with a `UserKey<&[u8]>` to own the /// table key without reallocating a new `UserKey` object. - pub fn set(&mut self, other: UserKey<&[u8]>) { + pub fn set(&mut self, other: UserKey) + where + T: SetSlice, + F: AsRef<[u8]>, + { self.table_id = other.table_id; - self.table_key.clear(); - self.table_key.extend_from_slice(other.table_key.as_ref()); + self.table_key.0.set(&other.table_key.0); } +} +impl UserKey> { pub fn into_bytes(self) -> UserKey { UserKey { table_id: self.table_id, @@ -811,10 +835,14 @@ impl> FullKey { } } -impl FullKey> { +impl> FullKey { /// Use this method to override an old `FullKey>` with a `FullKey<&[u8]>` to own the /// table key without reallocating a new `FullKey` object. - pub fn set(&mut self, other: FullKey<&[u8]>) { + pub fn set(&mut self, other: FullKey) + where + T: SetSlice, + F: AsRef<[u8]>, + { self.user_key.set(other.user_key); self.epoch_with_gap = other.epoch_with_gap; } @@ -835,15 +863,6 @@ impl + Ord + Eq> PartialOrd for FullKey { } } -impl<'a, T> From> for UserKey -where - T: AsRef<[u8]> + CopyFromSlice, -{ - fn from(value: UserKey<&'a [u8]>) -> Self { - value.copy_into() - } -} - #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub struct PointRange> { // When comparing `PointRange`, we first compare `left_user_key`, then @@ -977,20 +996,20 @@ impl + Ord + Eq, const SKIP_DEDUP: bool> FullKeyTracker(&mut self, key: FullKey) -> Option> + /// - If the provided `key` contains a new user key, return true. + /// - Otherwise: return false + pub fn observe(&mut self, key: FullKey) -> bool where - UserKey: Into>, + T: SetSlice, F: AsRef<[u8]>, { self.observe_multi_version(key.user_key, once(key.epoch_with_gap)) @@ -1001,9 +1020,9 @@ impl + Ord + Eq, const SKIP_DEDUP: bool> FullKeyTracker, mut epochs: impl Iterator, - ) -> Option> + ) -> bool where - UserKey: Into>, + T: SetSlice, F: AsRef<[u8]>, { let max_epoch_with_gap = epochs.next().expect("non-empty"); @@ -1033,16 +1052,11 @@ impl + Ord + Eq, const SKIP_DEDUP: bool> FullKeyTracker { if max_epoch_with_gap > self.last_observed_epoch_with_gap @@ -1055,7 +1069,7 @@ impl + Ord + Eq, const SKIP_DEDUP: bool> FullKeyTracker { // User key should be monotonically increasing diff --git a/src/storage/hummock_test/benches/bench_hummock_iter.rs b/src/storage/hummock_test/benches/bench_hummock_iter.rs index 7b1cd9d260b85..c0d597ac800d9 100644 --- a/src/storage/hummock_test/benches/bench_hummock_iter.rs +++ b/src/storage/hummock_test/benches/bench_hummock_iter.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use bytes::Bytes; use criterion::{criterion_group, criterion_main, Criterion}; -use futures::{pin_mut, TryStreamExt}; +use futures::pin_mut; use risingwave_common::cache::CachePriority; use risingwave_common::util::epoch::test_epoch; use risingwave_hummock_sdk::key::TableKey; diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index c3dedd6dbae46..43899fa7e256c 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -33,28 +33,28 @@ use risingwave_pb::meta::{SubscribeResponse, SubscribeType}; use risingwave_storage::hummock::store::LocalHummockStorage; use risingwave_storage::hummock::HummockStorage; use risingwave_storage::store::{ - LocalStateStore, 
StateStoreIterItemStream, StateStoreRead, SyncResult, + to_owned_item, LocalStateStore, StateStoreIterExt, StateStoreRead, SyncResult, }; -use risingwave_storage::{StateStore, StateStoreReadIterStream}; +use risingwave_storage::{StateStore, StateStoreIter, StateStoreReadIter}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; pub(crate) struct GlobalReplayIter where - S: StateStoreReadIterStream, + S: StateStoreReadIter, { inner: S, } impl GlobalReplayIter where - S: StateStoreReadIterStream, + S: StateStoreReadIter, { pub(crate) fn new(inner: S) -> Self { Self { inner } } pub(crate) fn into_stream(self) -> impl Stream> { - self.inner.map(|item_res| { + self.inner.into_stream(to_owned_item).map(|item_res| { item_res .map(|(key, value)| (key.user_key.table_key.0.into(), value.into())) .map_err(|_| TraceError::IterFailed("iter failed to retrieve item".to_string())) @@ -67,8 +67,9 @@ pub(crate) struct LocalReplayIter { } impl LocalReplayIter { - pub(crate) async fn new(stream: impl StateStoreIterItemStream) -> Self { - let inner = stream + pub(crate) async fn new(iter: impl StateStoreIter) -> Self { + let inner = iter + .into_stream(to_owned_item) .map_ok(|value| (value.0.user_key.table_key.0.into(), value.1.into())) .try_collect::>() .await @@ -115,7 +116,6 @@ impl ReplayRead for GlobalReplayImpl { .iter(key_range, epoch, read_options.into()) .await .unwrap(); - let iter = iter.boxed(); let stream = GlobalReplayIter::new(iter).into_stream().boxed(); Ok(stream) } @@ -241,7 +241,6 @@ impl LocalReplayRead for LocalReplayImpl { .await .unwrap(); - let iter = iter.boxed(); let stream = LocalReplayIter::new(iter).await.into_stream().boxed(); Ok(stream) } diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 489d513db888e..9afe31fc59d34 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -1457,7 +1457,8 @@ pub(crate) mod tests { fast_iter.key().user_key.table_id.table_id, ); assert_eq!(normal_iter.value(), fast_iter.value()); - let key_ref = fast_iter.key().user_key.as_ref(); + let key = fast_iter.key(); + let key_ref = key.user_key.as_ref(); assert!(normal_tables.iter().any(|table| { table.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash) })); diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index 9413079bd15ef..6d11d9c6bf83a 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -258,7 +258,8 @@ async fn test_storage_basic() { }, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); futures::pin_mut!(iter); assert_eq!( Some(( @@ -335,7 +336,8 @@ async fn test_storage_basic() { }, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); futures::pin_mut!(iter); assert_eq!( Some(( @@ -388,7 +390,8 @@ async fn test_storage_basic() { }, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); futures::pin_mut!(iter); assert_eq!( Some(( @@ -627,7 +630,8 @@ async fn test_state_store_sync() { }, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); futures::pin_mut!(iter); let kv_map_batch_1 = [ @@ -680,7 +684,8 @@ async fn test_state_store_sync() { }, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); futures::pin_mut!(iter); @@ -1039,7 +1044,8 @@ async fn test_iter_with_min_epoch() { }, ) .await - .unwrap(); + .unwrap() + 
.into_stream(to_owned_item); futures::pin_mut!(iter); @@ -1064,7 +1070,8 @@ async fn test_iter_with_min_epoch() { }, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); let result: Vec<_> = iter.try_collect().await.unwrap(); assert_eq!(20, result.len()); @@ -1088,7 +1095,8 @@ async fn test_iter_with_min_epoch() { }, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); futures::pin_mut!(iter); @@ -1131,7 +1139,8 @@ async fn test_iter_with_min_epoch() { }, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); futures::pin_mut!(iter); @@ -1156,7 +1165,8 @@ async fn test_iter_with_min_epoch() { }, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); futures::pin_mut!(iter); @@ -1182,7 +1192,8 @@ async fn test_iter_with_min_epoch() { }, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); futures::pin_mut!(iter); @@ -1294,7 +1305,8 @@ async fn test_hummock_version_reader() { read_snapshot, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); let result: Vec<_> = iter.try_collect().await.unwrap(); assert_eq!(10, result.len()); @@ -1328,7 +1340,8 @@ async fn test_hummock_version_reader() { read_snapshot, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); let result: Vec<_> = iter.try_collect().await.unwrap(); assert_eq!(20, result.len()); @@ -1363,7 +1376,8 @@ async fn test_hummock_version_reader() { read_snapshot, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); let result: Vec<_> = iter.try_collect().await.unwrap(); assert_eq!(10, result.len()); @@ -1422,7 +1436,8 @@ async fn test_hummock_version_reader() { read_snapshot, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); let result: Vec<_> = iter.try_collect().await.unwrap(); assert_eq!(10, result.len()); @@ -1465,7 +1480,8 @@ async fn test_hummock_version_reader() { read_snapshot, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); let result: Vec<_> = iter.try_collect().await.unwrap(); assert_eq!(20, result.len()); @@ -1500,7 +1516,8 @@ async fn test_hummock_version_reader() { read_snapshot, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); let result: Vec<_> = iter.try_collect().await.unwrap(); assert_eq!(10, result.len()); @@ -1534,7 +1551,8 @@ async fn test_hummock_version_reader() { read_snapshot, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); let result: Vec<_> = iter.try_collect().await.unwrap(); assert_eq!(30, result.len()); @@ -1571,7 +1589,8 @@ async fn test_hummock_version_reader() { read_snapshot, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); let result: Vec<_> = iter.try_collect().await.unwrap(); assert_eq!(8, result.len()); @@ -1602,7 +1621,8 @@ async fn test_hummock_version_reader() { read_snapshot, ) .await - .unwrap(); + .unwrap() + .into_stream(to_owned_item); let result: Vec<_> = iter.try_collect().await.unwrap(); assert_eq!(18, result.len()); @@ -1948,6 +1968,7 @@ async fn test_table_watermark() { ) .await .unwrap() + .into_stream(to_owned_item) .map_ok(|(full_key, value)| (full_key.user_key, value)) .try_collect::>() .await @@ -2017,6 +2038,7 @@ async fn test_table_watermark() { ) .await .unwrap() + .into_stream(to_owned_item) .map_ok(|(full_key, value)| (full_key.user_key, value)) .try_collect::>() .await @@ -2048,6 +2070,7 @@ async fn test_table_watermark() { ) .await .unwrap() + .into_stream(to_owned_item) .try_collect::>() .await .unwrap(); @@ -2116,6 +2139,7 @@ async fn test_table_watermark() { ) .await .unwrap() + 
.into_stream(to_owned_item) .map_ok(|(full_key, value)| (full_key.user_key, value)) .try_collect::>() .await @@ -2147,6 +2171,7 @@ async fn test_table_watermark() { ) .await .unwrap() + .into_stream(to_owned_item) .try_collect::>() .await .unwrap(); @@ -2218,6 +2243,7 @@ async fn test_table_watermark() { ) .await .unwrap() + .into_stream(to_owned_item) .map_ok(|(full_key, value)| (full_key.user_key, value)) .try_collect::>() .await @@ -2250,6 +2276,7 @@ async fn test_table_watermark() { ) .await .unwrap() + .into_stream(to_owned_item) .try_collect::>() .await .unwrap(); diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index ab2d24b5a2c83..7dda089d897a8 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ b/src/storage/hummock_test/src/snapshot_tests.rs @@ -15,7 +15,6 @@ use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; -use futures::TryStreamExt; use risingwave_common::cache::CachePriority; use risingwave_common::hash::VirtualNode; use risingwave_common::util::epoch::{test_epoch, EpochExt}; @@ -38,6 +37,8 @@ use crate::test_utils::{ macro_rules! assert_count_range_scan { ($storage:expr, $vnode:expr, $range:expr, $expect_count:expr, $epoch:expr) => {{ use std::ops::RangeBounds; + + use risingwave_storage::StateStoreIter; let range = $range; let bounds: (Bound, Bound) = ( range.start_bound().map(|x: &Bytes| x.clone()), @@ -45,7 +46,7 @@ macro_rules! assert_count_range_scan { ); let vnode = $vnode; let table_key_range = prefixed_range_with_vnode(bounds, vnode); - let it = $storage + let mut it = $storage .iter( table_key_range, $epoch, @@ -57,7 +58,6 @@ macro_rules! assert_count_range_scan { ) .await .unwrap(); - futures::pin_mut!(it); let mut count = 0; loop { match it.try_next().await.unwrap() { diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index b69541eb777ee..5b97dde202029 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use bytes::Bytes; use expect_test::expect; -use futures::{pin_mut, StreamExt, TryStreamExt}; +use futures::{pin_mut, StreamExt}; use risingwave_common::buffer::Bitmap; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{TableId, TableOption}; @@ -1442,6 +1442,7 @@ async fn test_replicated_local_hummock_storage() { ) .await .unwrap() + .into_stream(to_owned_item) .collect::>() .await; @@ -1509,6 +1510,7 @@ async fn test_replicated_local_hummock_storage() { ) .await .unwrap() + .into_stream(to_owned_item) .collect::>() .await; @@ -1544,6 +1546,7 @@ async fn test_replicated_local_hummock_storage() { ) .await .unwrap() + .into_stream(to_owned_item) .collect::>() .await; diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 9566d57cc691e..38c3cea98136b 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -778,7 +778,7 @@ where let mut iter_key = iter.key(); compaction_statistics.iter_total_key_counts += 1; - let mut is_new_user_key = full_key_tracker.observe(iter.key()).is_some(); + let mut is_new_user_key = full_key_tracker.observe(iter.key()); let mut drop = false; // CRITICAL WARN: Because of memtable spill, there may be several versions of the same user-key share the same `pure_epoch`. Do not change this code unless necessary. 
diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 2c1e5cb9b156f..cdc88ae68c439 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -336,16 +336,13 @@ pub async fn merge_imms_in_memory( table_id, table_key: key_entry.key.clone(), }; - if full_key_tracker - .observe_multi_version( - user_key, - key_entry - .new_values - .iter() - .map(|(epoch_with_gap, _)| *epoch_with_gap), - ) - .is_some() - { + if full_key_tracker.observe_multi_version( + user_key, + key_entry + .new_values + .iter() + .map(|(epoch_with_gap, _)| *epoch_with_gap), + ) { let last_entry = merged_entries.last_mut().expect("non-empty"); if last_entry.value_offset == values.len() { warn!(key = ?last_entry.key, "key has no value in imm compact. skipped"); @@ -423,7 +420,7 @@ fn generate_splits( if existing_table_ids.len() > 1 { if parallelism > 1 && compact_data_size > sstable_size { let mut last_buffer_size = 0; - let mut last_user_key = UserKey::default(); + let mut last_user_key: UserKey> = UserKey::default(); for (data_size, user_key) in size_and_start_user_keys { if last_buffer_size >= sub_compaction_data_size && last_user_key.as_ref() != user_key diff --git a/src/storage/src/hummock/iterator/forward_user.rs b/src/storage/src/hummock/iterator/forward_user.rs index 079ed59c5a8da..c3f94695d72c7 100644 --- a/src/storage/src/hummock/iterator/forward_user.rs +++ b/src/storage/src/hummock/iterator/forward_user.rs @@ -14,7 +14,7 @@ use std::ops::Bound::*; -use bytes::Bytes; +use risingwave_common::must_match; use risingwave_common::util::epoch::MAX_SPILL_TIMES; use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, UserKeyRange}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; @@ -32,10 +32,7 @@ pub struct UserIterator> { iterator: I, // Track the last seen full key - full_key_tracker: FullKeyTracker, - - /// Last user value - latest_val: Bytes, + full_key_tracker: FullKeyTracker, true>, /// Start and end bounds of user key. key_range: UserKeyRange, @@ -71,7 +68,6 @@ impl> UserIterator { Self { iterator, key_range, - latest_val: Bytes::new(), read_epoch, min_epoch, stats: StoreLocalStatistic::default(), @@ -119,17 +115,17 @@ impl> UserIterator { /// `rewind` or `seek` methods are called. /// /// Note: before call the function you need to ensure that the iterator is valid. - pub fn key(&self) -> &FullKey { + pub fn key(&self) -> FullKey<&[u8]> { assert!(self.is_valid()); - &self.full_key_tracker.latest_full_key + self.full_key_tracker.latest_full_key.to_ref() } /// The returned value is in the form of user value. /// /// Note: before call the function you need to ensure that the iterator is valid. - pub fn value(&self) -> &Bytes { + pub fn value(&self) -> &[u8] { assert!(self.is_valid()); - &self.latest_val + must_match!(self.iterator.value(), HummockValue::Put(val) => val) } /// Resets the iterating position to the beginning. 
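// --- Illustration (not part of the patch) ---------------------------------------
// The `UserIterator` hunks above switch `key()` from `&FullKey<Bytes>` to a borrowed
// `FullKey<&[u8]>`, so the updated tests compare against owned expected keys via
// `.to_ref()` instead of taking a reference to the owned key. The sketch below uses a
// hypothetical `Key` wrapper (not the real `FullKey` from `risingwave_hummock_sdk`) to
// show that owned-vs-borrowed comparison pattern in isolation.
#[derive(PartialEq, Debug)]
struct Key<T: AsRef<[u8]>>(T);

impl<T: AsRef<[u8]>> Key<T> {
    /// Project an owned key to its borrowed form, mirroring `FullKey::to_ref`.
    fn to_ref(&self) -> Key<&[u8]> {
        Key(self.0.as_ref())
    }
}

fn main() {
    let owned: Key<Vec<u8>> = Key(b"user-key-1".to_vec());
    // The iterator now hands out a borrowed key...
    let borrowed: Key<&[u8]> = owned.to_ref();
    // ...so assertions compare two borrowed keys rather than a reference to the owned one.
    assert_eq!(borrowed, owned.to_ref());
}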
@@ -245,7 +241,7 @@ impl> UserIterator { } // Skip older version entry for the same user key - if self.full_key_tracker.observe(full_key).is_none() { + if !self.full_key_tracker.observe(full_key) { self.stats.skip_multi_version_key_count += 1; self.iterator.next().await?; continue; @@ -265,12 +261,11 @@ impl> UserIterator { // Handle delete operation match self.iterator.value() { - HummockValue::Put(val) => { + HummockValue::Put(_val) => { self.delete_range_iter.next_until(full_key.user_key).await?; if self.delete_range_iter.current_epoch() >= epoch { self.stats.skip_delete_key_count += 1; } else { - self.latest_val = Bytes::copy_from_slice(val); self.stats.processed_key_count += 1; self.is_current_pos_valid = true; return Ok(()); @@ -325,6 +320,7 @@ mod tests { use std::ops::Bound::*; use std::sync::Arc; + use bytes::Bytes; use risingwave_common::util::epoch::test_epoch; use super::*; @@ -385,7 +381,7 @@ mod tests { while ui.is_valid() { let key = ui.key(); let val = ui.value(); - assert_eq!(key, &iterator_test_bytes_key_of(i)); + assert_eq!(key, iterator_test_bytes_key_of(i).to_ref()); assert_eq!(val, iterator_test_value_of(i).as_slice()); i += 1; ui.next().await.unwrap(); @@ -447,7 +443,7 @@ mod tests { let k = ui.key(); let v = ui.value(); assert_eq!(v, iterator_test_value_of(TEST_KEYS_COUNT + 5).as_slice()); - assert_eq!(k, &iterator_test_bytes_key_of(TEST_KEYS_COUNT + 5)); + assert_eq!(k, iterator_test_bytes_key_of(TEST_KEYS_COUNT + 5).to_ref()); ui.seek(iterator_test_bytes_user_key_of(2 * TEST_KEYS_COUNT + 5).as_ref()) .await .unwrap(); @@ -457,7 +453,10 @@ mod tests { v, iterator_test_value_of(2 * TEST_KEYS_COUNT + 5).as_slice() ); - assert_eq!(k, &iterator_test_bytes_key_of(2 * TEST_KEYS_COUNT + 5)); + assert_eq!( + k, + iterator_test_bytes_key_of(2 * TEST_KEYS_COUNT + 5).to_ref() + ); // left edge case ui.seek(iterator_test_bytes_user_key_of(0).as_ref()) @@ -466,7 +465,7 @@ mod tests { let k = ui.key(); let v = ui.value(); assert_eq!(v, iterator_test_value_of(0).as_slice()); - assert_eq!(k, &iterator_test_bytes_key_of(0)); + assert_eq!(k, iterator_test_bytes_key_of(0).to_ref()); } #[tokio::test] @@ -501,7 +500,7 @@ mod tests { // verify let k = ui.key(); let v = ui.value(); - assert_eq!(k, &iterator_test_bytes_key_of_epoch(2, 400)); + assert_eq!(k, iterator_test_bytes_key_of_epoch(2, 400).to_ref()); assert_eq!(v, &Bytes::from(iterator_test_value_of(2))); // only one valid kv pair @@ -559,11 +558,11 @@ mod tests { // ----- basic iterate ----- ui.rewind().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(2, 300)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(3, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(6, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -571,11 +570,11 @@ mod tests { ui.seek(iterator_test_bytes_user_key_of(1).as_ref()) .await .unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(2, 300)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(3, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(6, 
100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -583,11 +582,11 @@ mod tests { ui.seek(iterator_test_bytes_user_key_of(2).as_ref()) .await .unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(2, 300)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(3, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(6, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -637,11 +636,11 @@ mod tests { // ----- basic iterate ----- ui.rewind().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(2, 300)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(3, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(6, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -649,11 +648,11 @@ mod tests { ui.seek(iterator_test_bytes_user_key_of(1).as_ref()) .await .unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(2, 300)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(3, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(6, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -661,11 +660,11 @@ mod tests { ui.seek(iterator_test_bytes_user_key_of(2).as_ref()) .await .unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(2, 300)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(3, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(6, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -698,13 +697,13 @@ mod tests { // ----- basic iterate ----- ui.rewind().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(1, 200)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(1, 200).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(2, 300)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(3, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(6, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -712,13 +711,13 @@ mod tests { ui.seek(iterator_test_bytes_user_key_of(0).as_ref()) .await .unwrap(); - 
assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(1, 200)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(1, 200).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(2, 300)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(3, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(6, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -726,11 +725,11 @@ mod tests { ui.seek(iterator_test_bytes_user_key_of(2).as_ref()) .await .unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(2, 300)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(3, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(6, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -762,13 +761,13 @@ mod tests { // ----- basic iterate ----- ui.rewind().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(2, 300)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(3, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(6, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(8, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref()); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -776,13 +775,13 @@ mod tests { ui.seek(iterator_test_bytes_user_key_of(1).as_ref()) .await .unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(2, 300)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(3, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(6, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(8, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref()); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -790,13 +789,13 @@ mod tests { ui.seek(iterator_test_bytes_user_key_of(2).as_ref()) .await .unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(2, 300)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(2, 300).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(3, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(3, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(6, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(6, 100).to_ref()); ui.next().await.unwrap(); - assert_eq!(ui.key(), 
&iterator_test_bytes_key_of_epoch(8, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref()); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -804,7 +803,7 @@ mod tests { ui.seek(iterator_test_bytes_user_key_of(8).as_ref()) .await .unwrap(); - assert_eq!(ui.key(), &iterator_test_bytes_key_of_epoch(8, 100)); + assert_eq!(ui.key(), iterator_test_bytes_key_of_epoch(8, 100).to_ref()); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -886,9 +885,15 @@ mod tests { // ----- basic iterate ----- ui.rewind().await.unwrap(); assert!(ui.is_valid()); - assert_eq!(ui.key().user_key, iterator_test_bytes_user_key_of(0)); + assert_eq!( + ui.key().user_key, + iterator_test_bytes_user_key_of(0).as_ref() + ); ui.next().await.unwrap(); - assert_eq!(ui.key().user_key, iterator_test_bytes_user_key_of(8)); + assert_eq!( + ui.key().user_key, + iterator_test_bytes_user_key_of(8).as_ref() + ); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -896,7 +901,10 @@ mod tests { ui.seek(iterator_test_bytes_user_key_of(1).as_ref()) .await .unwrap(); - assert_eq!(ui.key().user_key, iterator_test_bytes_user_key_of(8)); + assert_eq!( + ui.key().user_key, + iterator_test_bytes_user_key_of(8).as_ref() + ); ui.next().await.unwrap(); assert!(!ui.is_valid()); @@ -919,9 +927,15 @@ mod tests { ); ui.rewind().await.unwrap(); assert!(ui.is_valid()); - assert_eq!(ui.key().user_key, iterator_test_bytes_user_key_of(2)); + assert_eq!( + ui.key().user_key, + iterator_test_bytes_user_key_of(2).as_ref() + ); ui.next().await.unwrap(); - assert_eq!(ui.key().user_key, iterator_test_bytes_user_key_of(8)); + assert_eq!( + ui.key().user_key, + iterator_test_bytes_user_key_of(8).as_ref() + ); ui.next().await.unwrap(); assert!(!ui.is_valid()); } diff --git a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs index 94d1bad371a68..5857c5d2f8bd2 100644 --- a/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs +++ b/src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs @@ -23,7 +23,6 @@ use std::sync::atomic::Ordering::Relaxed; use std::sync::{Arc, LazyLock}; use bytes::Bytes; -use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, TableKeyRange, UserKey}; @@ -674,6 +673,7 @@ impl SharedBufferDeleteRangeIterator { table_id: TableId, delete_ranges: Vec<(Bound, Bound)>, ) -> Self { + use itertools::Itertools; let point_range_pairs = delete_ranges .into_iter() .map(|(left_bound, right_bound)| { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 88c123ca5d5bc..46c6ba993e9be 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -260,7 +260,7 @@ impl HummockStorage { key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> StorageResult> { + ) -> StorageResult { let (key_range, read_version_tuple) = if read_options.read_version_from_backup { self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range) .await? 
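// --- Illustration (not part of the patch) ---------------------------------------
// Core shape of the migration in the storage hunks: `iter` now returns a
// `StateStoreIter`-style object whose `try_next` lends a borrowed (key, value) pair
// valid only until the next call, and owned items are produced lazily through a
// conversion closure such as `into_stream(to_owned_item)`. The types below are a
// simplified, synchronous stand-in (the real trait is async, yields
// `(FullKey<&[u8]>, &[u8])`, and returns `StorageResult`); they only demonstrate the
// lend-then-copy pattern.
struct VecIter {
    data: Vec<(Vec<u8>, Vec<u8>)>,
    pos: usize,
}

impl VecIter {
    /// Lend the next entry as borrowed slices; `None` once exhausted.
    fn try_next(&mut self) -> Result<Option<(&[u8], &[u8])>, String> {
        let item = self
            .data
            .get(self.pos)
            .map(|(k, v)| (k.as_slice(), v.as_slice()));
        if item.is_some() {
            self.pos += 1;
        }
        Ok(item)
    }
}

/// Simplified counterpart of the `to_owned_item` helper introduced by the patch:
/// copy the borrowed pair into owned buffers only when the caller needs ownership.
fn to_owned_item((k, v): (&[u8], &[u8])) -> Result<(Vec<u8>, Vec<u8>), String> {
    Ok((k.to_vec(), v.to_vec()))
}

fn main() -> Result<(), String> {
    let mut iter = VecIter {
        data: vec![
            (b"k1".to_vec(), b"v1".to_vec()),
            (b"k2".to_vec(), b"v2".to_vec()),
        ],
        pos: 0,
    };
    let mut owned = Vec::new();
    while let Some(item) = iter.try_next()? {
        owned.push(to_owned_item(item)?);
    }
    assert_eq!(owned.len(), 2);
    Ok(())
}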
@@ -442,7 +442,7 @@ impl HummockStorage { } impl StateStoreRead for HummockStorage { - type IterStream = StreamTypeOfIter; + type Iter = HummockStorageIterator; fn get( &self, @@ -458,7 +458,7 @@ impl StateStoreRead for HummockStorage { key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> impl Future> + '_ { + ) -> impl Future> + '_ { let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range); assert_eq!( r_vnode_exclusive - l_vnode_inclusive, diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 604a242c66754..38a653cfda375 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -135,7 +135,7 @@ impl LocalHummockStorage { table_key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> StorageResult> { + ) -> StorageResult { let (table_key_range, read_snapshot) = read_filter_for_version( epoch, read_options.table_id, @@ -163,7 +163,7 @@ impl LocalHummockStorage { table_key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> StorageResult>> { + ) -> StorageResult> { let (table_key_range, read_snapshot) = read_filter_for_version( epoch, read_options.table_id, @@ -205,7 +205,7 @@ impl LocalHummockStorage { } impl StateStoreRead for LocalHummockStorage { - type IterStream = StreamTypeOfIter; + type Iter = HummockStorageIterator; fn get( &self, @@ -222,7 +222,7 @@ impl StateStoreRead for LocalHummockStorage { key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> impl Future> + '_ { + ) -> impl Future> + '_ { assert!(epoch <= self.epoch()); self.iter_flushed(key_range, epoch, read_options) .instrument(tracing::trace_span!("hummock_iter")) @@ -230,7 +230,7 @@ impl StateStoreRead for LocalHummockStorage { } impl LocalStateStore for LocalHummockStorage { - type IterStream<'a> = StreamTypeOfIter>; + type Iter<'a> = LocalHummockStorageIterator<'a>; fn may_exist( &self, @@ -258,7 +258,7 @@ impl LocalStateStore for LocalHummockStorage { &self, key_range: TableKeyRange, read_options: ReadOptions, - ) -> StorageResult> { + ) -> StorageResult> { let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range); assert_eq!( r_vnode_exclusive - l_vnode_inclusive, @@ -593,19 +593,21 @@ pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>; pub struct HummockStorageIteratorInner<'a> { inner: UserIterator>, + initial_read: bool, stats_guard: IterLocalMetricsGuard, } impl<'a> StateStoreIter for HummockStorageIteratorInner<'a> { - type Item = StateStoreIterItem; - - async fn next(&mut self) -> StorageResult> { + async fn try_next<'b>(&'b mut self) -> StorageResult>> { let iter = &mut self.inner; + if !self.initial_read { + self.initial_read = true; + } else { + iter.next().await?; + } if iter.is_valid() { - let kv = (iter.key().clone(), iter.value().clone()); - iter.next().await?; - Ok(Some(kv)) + Ok(Some((iter.key(), iter.value()))) } else { Ok(None) } @@ -621,6 +623,7 @@ impl<'a> HummockStorageIteratorInner<'a> { ) -> Self { Self { inner, + initial_read: false, stats_guard: IterLocalMetricsGuard::new(metrics, table_id, local_stats), } } diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 80e0e94130d10..c32892613bbb9 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -60,7 +60,7 @@ use crate::mem_table::{ImmId, ImmutableMemtable, MemTableHummockIterator}; use 
crate::monitor::{ GetLocalMetricsGuard, HummockStateStoreMetrics, MayExistLocalMetricsGuard, StoreLocalStatistic, }; -use crate::store::{gen_min_epoch, ReadOptions, StateStoreIterExt, StreamTypeOfIter}; +use crate::store::{gen_min_epoch, ReadOptions}; pub type CommittedVersion = PinnedVersion; @@ -739,7 +739,7 @@ impl HummockVersionReader { epoch: u64, read_options: ReadOptions, read_version_tuple: ReadVersionTuple, - ) -> StorageResult> { + ) -> StorageResult { self.iter_inner( table_key_range, epoch, @@ -757,7 +757,7 @@ impl HummockVersionReader { read_options: ReadOptions, read_version_tuple: (Vec, Vec, CommittedVersion), memtable_iter: MemTableHummockIterator<'a>, - ) -> StorageResult>> { + ) -> StorageResult> { self.iter_inner( table_key_range, epoch, @@ -775,7 +775,7 @@ impl HummockVersionReader { read_options: ReadOptions, read_version_tuple: ReadVersionTuple, mem_table: Option>, - ) -> StorageResult>> { + ) -> StorageResult> { let (imms, uncommitted_ssts, committed) = read_version_tuple; let mut local_stats = StoreLocalStatistic::default(); @@ -1009,8 +1009,7 @@ impl HummockVersionReader { self.state_store_metrics.clone(), read_options.table_id, local_stats, - ) - .into_stream()) + )) } // Note: this method will not check the kv tomestones and delete range tomestones diff --git a/src/storage/src/hummock/test_utils.rs b/src/storage/src/hummock/test_utils.rs index 3bef02f7d7298..2cd7104cf74ef 100644 --- a/src/storage/src/hummock/test_utils.rs +++ b/src/storage/src/hummock/test_utils.rs @@ -17,7 +17,6 @@ use std::ops::Bound; use std::sync::Arc; use bytes::Bytes; -use futures::{Stream, TryStreamExt}; use itertools::Itertools; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; @@ -32,7 +31,6 @@ use super::{ HummockResult, InMemWriter, MonotonicDeleteEvent, SstableMeta, SstableWriterOptions, DEFAULT_RESTART_INTERVAL, }; -use crate::error::StorageResult; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FullKeyFilterKeyExtractor}; use crate::hummock::iterator::ForwardMergeRangeIterator; use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; @@ -45,6 +43,7 @@ use crate::hummock::{ use crate::monitor::StoreLocalStatistic; use crate::opts::StorageOpts; use crate::storage_value::StorageValue; +use crate::StateStoreIter; pub fn default_opts_for_test() -> StorageOpts { StorageOpts { @@ -376,10 +375,9 @@ pub async fn gen_default_test_sstable( .await } -pub async fn count_stream(s: impl Stream> + Send) -> usize { - futures::pin_mut!(s); +pub async fn count_stream(mut i: impl StateStoreIter + Send) -> usize { let mut c: usize = 0; - while s.try_next().await.unwrap().is_some() { + while i.try_next().await.unwrap().is_some() { c += 1 } c diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 8e6efa63e3545..505eec276fbf4 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -62,10 +62,5 @@ pub mod mem_table; #[cfg(feature = "failpoints")] mod storage_failpoints; -pub use store::{StateStore, StateStoreIter, StateStoreReadIterStream}; +pub use store::{StateStore, StateStoreIter, StateStoreReadIter}; pub use store_impl::StateStoreImpl; - -pub enum TableScanOptions { - SequentialScan, - SparseIndexScan, -} diff --git a/src/storage/src/mem_table.rs b/src/storage/src/mem_table.rs index b03f24b901580..99f02758623e2 100644 --- a/src/storage/src/mem_table.rs +++ b/src/storage/src/mem_table.rs @@ -21,7 +21,7 @@ use std::ops::RangeBounds; use std::sync::Arc; use bytes::Bytes; -use futures::{pin_mut, StreamExt}; +use 
futures::{pin_mut, Stream, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::buffer::Bitmap; @@ -350,7 +350,7 @@ impl KeyOp { #[try_stream(ok = StateStoreIterItem, error = StorageError)] pub(crate) async fn merge_stream<'a>( mem_table_iter: impl Iterator, &'a KeyOp)> + 'a, - inner_stream: impl StateStoreReadIterStream, + inner_stream: impl Stream> + 'static, table_id: TableId, epoch: u64, ) { @@ -458,7 +458,7 @@ impl MemtableLocalStateStore { } impl LocalStateStore for MemtableLocalStateStore { - type IterStream<'a> = impl StateStoreIterItemStream + 'a; + type Iter<'a> = impl StateStoreIter + 'a; #[allow(clippy::unused_async)] async fn may_exist( @@ -488,18 +488,18 @@ impl LocalStateStore for MemtableLocalState &self, key_range: TableKeyRange, read_options: ReadOptions, - ) -> impl Future>> + Send + '_ { + ) -> impl Future>> + Send + '_ { async move { - let stream = self + let iter = self .inner .iter(key_range.clone(), self.epoch(), read_options) .await?; - Ok(merge_stream( + Ok(FromStreamStateStoreIter::new(Box::pin(merge_stream( self.mem_table.iter(key_range), - stream, + iter.into_stream(to_owned_item), self.table_id, self.epoch(), - )) + )))) } } diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 566c63fba4066..096020ff569f6 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -537,7 +537,7 @@ impl RangeKvStateStore { } impl StateStoreRead for RangeKvStateStore { - type IterStream = StreamTypeOfIter>; + type Iter = RangeKvStateStoreIter; #[allow(clippy::unused_async)] async fn get( @@ -563,15 +563,14 @@ impl StateStoreRead for RangeKvStateStore { key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> StorageResult { + ) -> StorageResult { Ok(RangeKvStateStoreIter::new( batched_iter::Iter::new( self.inner.clone(), to_full_key_range(read_options.table_id, key_range), ), epoch, - ) - .into_stream()) + )) } } @@ -659,8 +658,7 @@ pub struct RangeKvStateStoreIter { last_key: Option>, - /// For supporting semantic of `Fuse` - stopped: bool, + item_buffer: Option, } impl RangeKvStateStoreIter { @@ -669,29 +667,21 @@ impl RangeKvStateStoreIter { inner, epoch, last_key: None, - stopped: false, + item_buffer: None, } } } impl StateStoreIter for RangeKvStateStoreIter { - type Item = StateStoreIterItem; - #[allow(clippy::unused_async)] - async fn next(&mut self) -> StorageResult> { - if self.stopped { - Ok(None) - } else { - let ret = self.next_inner(); - match &ret { - Err(_) | Ok(None) => { - self.stopped = true; - } - _ => {} - } - - ret - } + async fn try_next(&mut self) -> StorageResult>> { + let ret = self.next_inner(); + let item = ret?; + self.item_buffer = item; + Ok(self + .item_buffer + .as_ref() + .map(|(key, value)| (key.to_ref(), value.as_ref()))) } } diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 239f2ce9df7a0..ad8d831aa84ee 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -16,20 +16,19 @@ use std::sync::Arc; use await_tree::InstrumentAwait; use bytes::Bytes; -use futures::{Future, TryFutureExt, TryStreamExt}; -use futures_async_stream::try_stream; +use futures::{Future, TryFutureExt}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; use thiserror_ext::AsReport; use tokio::time::Instant; -use tracing::error; +use 
tracing::{error, Instrument}; #[cfg(all(not(madsim), feature = "hm-trace"))] use super::traced_store::TracedStateStore; use super::{MonitoredStateStoreGetStats, MonitoredStateStoreIterStats, MonitoredStorageMetrics}; -use crate::error::{StorageError, StorageResult}; +use crate::error::StorageResult; use crate::hummock::sstable_store::SstableStoreRef; use crate::hummock::{HummockStorage, SstableObjectIdManagerRef}; use crate::store::*; @@ -76,22 +75,20 @@ impl MonitoredStateStore { } /// A util function to break the type connection between two opaque return types defined by `impl`. -pub(crate) fn identity(input: impl StateStoreIterItemStream) -> impl StateStoreIterItemStream { +pub(crate) fn identity(input: impl StateStoreIter) -> impl StateStoreIter { input } -pub type MonitoredStateStoreIterStream = impl StateStoreIterItemStream; - -// Note: it is important to define the `MonitoredStateStoreIterStream` type alias, as it marks that +// Note: it is important to define the `MonitoredStateStoreIter` type alias, as it marks that // the return type of `monitored_iter` only captures the lifetime `'s` and has nothing to do with -// `'a`. If we simply use `impl StateStoreIterItemStream + 's`, the rust compiler will also capture +// `'a`. If we simply use `impl StateStoreIter + 's`, the rust compiler will also capture // the lifetime `'a` in the scope defined in the scope. impl MonitoredStateStore { - async fn monitored_iter<'a, St: StateStoreIterItemStream + 'a>( + async fn monitored_iter<'a, St: StateStoreIter + 'a>( &'a self, table_id: TableId, iter_stream_future: impl Future> + 'a, - ) -> StorageResult> { + ) -> StorageResult> { // start time takes iterator build time into account // wait for iterator creation (e.g. seek) let start_time = Instant::now(); @@ -109,7 +106,7 @@ impl MonitoredStateStore { self.storage_metrics.clone(), ), }; - Ok(monitored.into_stream()) + Ok(monitored) } pub fn inner(&self) -> &S { @@ -127,8 +124,6 @@ impl MonitoredStateStore { table_id: TableId, key_len: usize, ) -> StorageResult> { - use tracing::Instrument; - let mut stats = MonitoredStateStoreGetStats::new(table_id.table_id, self.storage_metrics.clone()); @@ -149,7 +144,7 @@ impl MonitoredStateStore { } impl StateStoreRead for MonitoredStateStore { - type IterStream = impl StateStoreReadIterStream; + type Iter = impl StateStoreReadIter; fn get( &self, @@ -167,7 +162,7 @@ impl StateStoreRead for MonitoredStateStore { key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> impl Future> + '_ { + ) -> impl Future> + '_ { self.monitored_iter( read_options.table_id, self.inner.iter(key_range, epoch, read_options), @@ -177,7 +172,7 @@ impl StateStoreRead for MonitoredStateStore { } impl LocalStateStore for MonitoredStateStore { - type IterStream<'a> = impl StateStoreIterItemStream + 'a; + type Iter<'a> = impl StateStoreIter + 'a; async fn may_exist( &self, @@ -214,7 +209,7 @@ impl LocalStateStore for MonitoredStateStore { &self, key_range: TableKeyRange, read_options: ReadOptions, - ) -> impl Future>> + Send + '_ { + ) -> impl Future>> + Send + '_ { let table_id = read_options.table_id; // TODO: may collect the metrics as local self.monitored_iter(table_id, self.inner.iter(key_range, read_options)) @@ -348,26 +343,20 @@ pub struct MonitoredStateStoreIter { stats: MonitoredStateStoreIterStats, } -impl MonitoredStateStoreIter { - #[try_stream(ok = StateStoreIterItem, error = StorageError)] - async fn into_stream_inner(self) { - let inner = self.inner; - - let mut stats = self.stats; - 
futures::pin_mut!(inner); - while let Some((key, value)) = inner +impl StateStoreIter for MonitoredStateStoreIter { + async fn try_next(&mut self) -> StorageResult>> { + if let Some((key, value)) = self + .inner .try_next() + .instrument(tracing::trace_span!("store_iter_try_next")) .await .inspect_err(|e| error!(error = %e.as_report(), "Failed in next"))? { - stats.total_items += 1; - stats.total_size += key.encoded_len() + value.len(); - yield (key, value); + self.stats.total_items += 1; + self.stats.total_size += key.encoded_len() + value.len(); + Ok(Some((key, value))) + } else { + Ok(None) } - drop(stats); - } - - fn into_stream(self) -> MonitoredStateStoreIterStream { - Self::into_stream_inner(self) } } diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index de55143dd7d73..8fe9a6705cc09 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -14,8 +14,7 @@ use std::sync::Arc; use bytes::Bytes; -use futures::{Future, TryFutureExt, TryStreamExt}; -use futures_async_stream::try_stream; +use futures::{Future, TryFutureExt}; use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; @@ -26,7 +25,7 @@ use risingwave_hummock_trace::{ use thiserror_ext::AsReport; use super::identity; -use crate::error::{StorageError, StorageResult}; +use crate::error::StorageResult; use crate::hummock::sstable_store::SstableStoreRef; use crate::hummock::{HummockStorage, SstableObjectIdManagerRef}; use crate::store::*; @@ -67,11 +66,11 @@ impl TracedStateStore { } } - async fn traced_iter<'a, St: StateStoreIterItemStream>( + async fn traced_iter<'a, St: StateStoreIter>( &'a self, iter_stream_future: impl Future> + 'a, span: MayTraceSpan, - ) -> StorageResult> { + ) -> StorageResult> { let res = iter_stream_future.await; if res.is_ok() { span.may_send_result(OperationResult::Iter(TraceResult::Ok(()))); @@ -79,7 +78,7 @@ impl TracedStateStore { span.may_send_result(OperationResult::Iter(TraceResult::Err)); } let traced = TracedStateStoreIter::new(res?, span); - Ok(traced.into_stream()) + Ok(traced) } async fn traced_get( @@ -106,10 +105,8 @@ impl TracedStateStore { } } -type TracedStateStoreIterStream = impl StateStoreIterItemStream; - impl LocalStateStore for TracedStateStore { - type IterStream<'a> = impl StateStoreIterItemStream + 'a; + type Iter<'a> = impl StateStoreIter + 'a; fn may_exist( &self, @@ -136,7 +133,7 @@ impl LocalStateStore for TracedStateStore { &self, key_range: TableKeyRange, read_options: ReadOptions, - ) -> impl Future>> + Send + '_ { + ) -> impl Future>> + Send + '_ { let (l, r) = key_range.clone(); let bytes_key_range = (l.map(|l| l.0), r.map(|r| r.0)); let span = TraceSpan::new_iter_span( @@ -277,7 +274,7 @@ impl StateStore for TracedStateStore { } impl StateStoreRead for TracedStateStore { - type IterStream = impl StateStoreReadIterStream; + type Iter = impl StateStoreReadIter; fn get( &self, @@ -298,7 +295,7 @@ impl StateStoreRead for TracedStateStore { key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> impl Future> + '_ { + ) -> impl Future> + '_ { let (l, r) = key_range.clone(); let bytes_key_range = (l.map(|l| l.0), r.map(|r| r.0)); let span = TraceSpan::new_iter_span( @@ -347,13 +344,10 @@ impl TracedStateStoreIter { } } -impl TracedStateStoreIter { - #[try_stream(ok = StateStoreIterItem, error = StorageError)] - async fn into_stream_inner(self) { - let inner = self.inner; - 
futures::pin_mut!(inner); - - while let Some((key, value)) = inner +impl StateStoreIter for TracedStateStoreIter { + async fn try_next(&mut self) -> StorageResult>> { + if let Some((key, value)) = self + .inner .try_next() .await .inspect_err(|e| tracing::error!(error = %e.as_report(), "Failed in next"))? @@ -362,15 +356,13 @@ impl TracedStateStoreIter { self.span .may_send_result(OperationResult::IterNext(TraceResult::Ok(Some(( TracedBytes::from(key.user_key.table_key.to_vec()), - TracedBytes::from(value.clone()), + TracedBytes::from(Bytes::copy_from_slice(value)), ))))); - yield (key, value); + Ok(Some((key, value))) + } else { + Ok(None) } } - - fn into_stream(self) -> TracedStateStoreIterStream { - Self::into_stream_inner(self) - } } pub fn get_concurrent_id() -> ConcurrentId { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 7e5985bb6aefe..6002acd9e1057 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -13,12 +13,9 @@ // limitations under the License. use std::ops::Bound; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; use bytes::Bytes; -use futures::Stream; use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; @@ -33,7 +30,7 @@ use crate::store::*; pub struct PanicStateStore; impl StateStoreRead for PanicStateStore { - type IterStream = PanicStateStoreStream; + type Iter = PanicStateStoreStream; #[allow(clippy::unused_async)] async fn get( @@ -51,7 +48,7 @@ impl StateStoreRead for PanicStateStore { _key_range: TableKeyRange, _epoch: u64, _read_options: ReadOptions, - ) -> StorageResult { + ) -> StorageResult { panic!("should not read from the state store!"); } } @@ -68,7 +65,7 @@ impl StateStoreWrite for PanicStateStore { } impl LocalStateStore for PanicStateStore { - type IterStream<'a> = PanicStateStoreStream; + type Iter<'a> = PanicStateStoreStream; #[allow(clippy::unused_async)] async fn may_exist( @@ -93,7 +90,7 @@ impl LocalStateStore for PanicStateStore { &self, _key_range: TableKeyRange, _read_options: ReadOptions, - ) -> StorageResult> { + ) -> StorageResult> { panic!("should not operate on the panic state store!"); } @@ -174,12 +171,10 @@ impl StateStore for PanicStateStore { } } -pub struct PanicStateStoreStream {} +pub struct PanicStateStoreStream; -impl Stream for PanicStateStoreStream { - type Item = StorageResult; - - fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { +impl StateStoreIter for PanicStateStoreStream { + async fn try_next(&mut self) -> StorageResult>> { panic!("should not call next on panic state store stream") } } diff --git a/src/storage/src/storage_failpoints/test_iterator.rs b/src/storage/src/storage_failpoints/test_iterator.rs index 7b1aa31c808cd..463c20ed469de 100644 --- a/src/storage/src/storage_failpoints/test_iterator.rs +++ b/src/storage/src/storage_failpoints/test_iterator.rs @@ -288,7 +288,7 @@ async fn test_failpoints_user_read_err() { while ui.is_valid() { let key = ui.key(); let val = ui.value(); - assert_eq!(key, &iterator_test_bytes_key_of(i)); + assert_eq!(key, iterator_test_bytes_key_of(i).to_ref()); assert_eq!(val, iterator_test_value_of(i).as_slice()); i += 1; let result = ui.next().await; diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 96838e1ef25d1..2a70002c42af8 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -12,15 +12,17 @@ // See the License for the specific language governing 
permissions and // limitations under the License. +use std::cmp::min; use std::collections::HashMap; use std::default::Default; use std::fmt::{Debug, Formatter}; use std::future::Future; +use std::marker::PhantomData; use std::ops::Bound; use std::sync::{Arc, LazyLock}; use bytes::Bytes; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures::{Stream, TryStreamExt}; use futures_async_stream::try_stream; use prost::Message; use risingwave_common::buffer::Bitmap; @@ -44,40 +46,126 @@ use crate::storage_value::StorageValue; pub trait StaticSendSync = Send + Sync + 'static; -pub trait StateStoreIter: Send + Sync { - type Item: Send; +pub trait IterItem: Send + 'static { + type ItemRef<'a>: Send + 'a; +} + +impl IterItem for StateStoreIterItem { + type ItemRef<'a> = StateStoreIterItemRef<'a>; +} + +pub trait StateStoreIter: Send { + fn try_next( + &mut self, + ) -> impl Future>>> + Send + '_; +} - fn next(&mut self) -> impl Future>> + Send + '_; +pub fn to_owned_item((key, value): StateStoreIterItemRef<'_>) -> StorageResult { + Ok((key.copy_into(), Bytes::copy_from_slice(value))) } -pub trait StateStoreIterExt: StateStoreIter { - type ItemStream: Stream::Item>> + Send; +pub trait StateStoreIterExt: StateStoreIter + Sized { + type ItemStream: Stream> + Send; - fn into_stream(self) -> Self::ItemStream; + fn into_stream Fn(T::ItemRef<'a>) -> StorageResult + Send>( + self, + f: F, + ) -> Self::ItemStream; + + fn fused(self) -> FusedStateStoreIter { + FusedStateStoreIter::new(self) + } } -#[try_stream(ok = I::Item, error = StorageError)] -async fn into_stream_inner(mut iter: I) { - while let Some(item) = iter.next().await? { - yield item; +#[try_stream(ok = O, error = StorageError)] +async fn into_stream_inner< + T: IterItem, + I: StateStoreIter, + O: Send, + F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult + Send, +>( + iter: I, + f: F, +) { + let mut iter = iter.fused(); + while let Some(item) = iter.try_next().await? 
{ + yield f(item)?; } } -pub type StreamTypeOfIter = ::ItemStream; -impl StateStoreIterExt for I { - type ItemStream = impl Stream::Item>>; +pub struct FromStreamStateStoreIter { + inner: S, + item_buffer: Option, +} + +impl FromStreamStateStoreIter { + pub fn new(inner: S) -> Self { + Self { + inner, + item_buffer: None, + } + } +} - fn into_stream(self) -> Self::ItemStream { - into_stream_inner(self) +impl> + Unpin + Send> StateStoreIter + for FromStreamStateStoreIter +{ + async fn try_next(&mut self) -> StorageResult>> { + self.item_buffer = self.inner.try_next().await?; + Ok(self + .item_buffer + .as_ref() + .map(|(key, value)| (key.to_ref(), value.as_ref()))) } } +pub struct FusedStateStoreIter { + inner: I, + finished: bool, + _phantom: PhantomData, +} + +impl FusedStateStoreIter { + fn new(inner: I) -> Self { + Self { + inner, + finished: false, + _phantom: PhantomData, + } + } +} + +impl> FusedStateStoreIter { + async fn try_next(&mut self) -> StorageResult>> { + assert!(!self.finished, "call try_next after finish"); + let result = self.inner.try_next().await; + match &result { + Ok(Some(_)) => {} + Ok(None) | Err(_) => { + self.finished = true; + } + } + result + } +} + +impl> StateStoreIterExt for I { + type ItemStream = impl Stream> + Send; + + fn into_stream Fn(T::ItemRef<'a>) -> StorageResult + Send>( + self, + f: F, + ) -> Self::ItemStream { + into_stream_inner(self, f) + } +} + +pub type StateStoreIterItemRef<'a> = (FullKey<&'a [u8]>, &'a [u8]); pub type StateStoreIterItem = (FullKey, Bytes); -pub trait StateStoreIterItemStream = Stream> + Send; -pub trait StateStoreReadIterStream = StateStoreIterItemStream + 'static; +pub trait StateStoreReadIter = StateStoreIter + 'static; pub trait StateStoreRead: StaticSendSync { - type IterStream: StateStoreReadIterStream; + type Iter: StateStoreReadIter; /// Point gets a value from the state store. /// The result is based on a snapshot corresponding to the given `epoch`. @@ -98,7 +186,7 @@ pub trait StateStoreRead: StaticSendSync { key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> impl Future> + Send + '_; + ) -> impl Future> + Send + '_; } pub trait StateStoreReadExt: StaticSendSync { @@ -129,12 +217,14 @@ impl StateStoreReadExt for S { if limit.is_some() { read_options.prefetch_options.prefetch = false; } + const MAX_INITIAL_CAP: usize = 1024; let limit = limit.unwrap_or(usize::MAX); - self.iter(key_range, epoch, read_options) - .await? - .take(limit) - .try_collect() - .await + let mut ret = Vec::with_capacity(min(limit, MAX_INITIAL_CAP)); + let mut iter = self.iter(key_range, epoch, read_options).await?; + while let Some((key, value)) = iter.try_next().await? { + ret.push((key.copy_into(), Bytes::copy_from_slice(value))) + } + Ok(ret) } } @@ -205,7 +295,7 @@ pub trait StateStore: StateStoreRead + StaticSendSync + Clone { /// written by itself. Each local state store is not `Clone`, and is owned by a streaming state /// table. pub trait LocalStateStore: StaticSendSync { - type IterStream<'a>: StateStoreIterItemStream + 'a; + type Iter<'a>: StateStoreIter + 'a; /// Point gets a value from the state store. /// The result is based on the latest written snapshot. @@ -224,7 +314,7 @@ pub trait LocalStateStore: StaticSendSync { &self, key_range: TableKeyRange, read_options: ReadOptions, - ) -> impl Future>> + Send + '_; + ) -> impl Future>> + Send + '_; /// Inserts a key-value entry associated with a given `epoch` into the state store. 
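// ---- Editor's sketch (illustrative only, not part of the patch) ----
// How a caller drives the new lending-style `StateStoreIter`, and how to get
// back a `Stream` of owned items when one is still needed. `dump_range` and
// `as_owned_stream` are hypothetical helpers; exact trait signatures and
// bounds are approximated.
use futures::Stream;
use risingwave_hummock_sdk::key::TableKeyRange;
use risingwave_storage::error::StorageResult;
use risingwave_storage::store::{
    to_owned_item, ReadOptions, StateStoreIter, StateStoreIterExt, StateStoreIterItem,
    StateStoreRead, StateStoreReadIter,
};

async fn dump_range(
    store: &impl StateStoreRead,
    key_range: TableKeyRange,
    epoch: u64,
    read_options: ReadOptions,
) -> StorageResult<()> {
    let mut iter = store.iter(key_range, epoch, read_options).await?;
    // Each item is a borrowed `(FullKey<&[u8]>, &[u8])` that is only valid
    // until the next `try_next` call; copy it out if it must be retained.
    while let Some((key, value)) = iter.try_next().await? {
        println!("{:?} => {:?}", key, value);
    }
    Ok(())
}

// Bridging back to a `Stream` of owned `(FullKey<Bytes>, Bytes)` items, e.g.
// for merging with `select_all`, using the provided `to_owned_item` mapper.
fn as_owned_stream(
    iter: impl StateStoreReadIter,
) -> impl Stream<Item = StorageResult<StateStoreIterItem>> + Send {
    iter.into_stream(to_owned_item)
}
// ---- end of sketch ----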
fn insert( diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 50fe81d53ed54..5d57a8c4ba955 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -206,14 +206,12 @@ pub mod verify { use std::sync::Arc; use bytes::Bytes; - use futures::{pin_mut, TryStreamExt}; - use futures_async_stream::try_stream; use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; use tracing::log::warn; - use crate::error::{StorageError, StorageResult}; + use crate::error::StorageResult; use crate::hummock::HummockStorage; use crate::storage_value::StorageValue; use crate::store::*; @@ -251,7 +249,7 @@ pub mod verify { } impl StateStoreRead for VerifyStateStore { - type IterStream = impl StateStoreReadIterStream; + type Iter = impl StateStoreReadIter; async fn get( &self, @@ -278,7 +276,7 @@ pub mod verify { key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> impl Future> + '_ { + ) -> impl Future> + '_ { async move { let actual = self .actual @@ -290,34 +288,29 @@ pub mod verify { None }; - Ok(verify_stream(actual, expected)) + Ok(verify_iter(actual, expected)) } } } - #[try_stream(ok = StateStoreIterItem, error = StorageError)] - async fn verify_stream( - actual: impl StateStoreIterItemStream, - expected: Option, - ) { - pin_mut!(actual); - pin_mut!(expected); - let mut expected = expected.as_pin_mut(); - - loop { - let actual = actual.try_next().await?; - if let Some(expected) = expected.as_mut() { + impl StateStoreIter for VerifyStateStore { + async fn try_next(&mut self) -> StorageResult>> { + let actual = self.actual.try_next().await?; + if let Some(expected) = self.expected.as_mut() { let expected = expected.try_next().await?; assert_eq!(actual, expected); } - if let Some(actual) = actual { - yield actual; - } else { - break; - } + Ok(actual) } } + fn verify_iter( + actual: impl StateStoreIter, + expected: Option, + ) -> impl StateStoreIter { + VerifyStateStore { actual, expected } + } + impl StateStoreWrite for VerifyStateStore { fn ingest_batch( &self, @@ -348,7 +341,7 @@ pub mod verify { } impl LocalStateStore for VerifyStateStore { - type IterStream<'a> = impl StateStoreIterItemStream + 'a; + type Iter<'a> = impl StateStoreIter + 'a; // We don't verify `may_exist` across different state stores because // the return value of `may_exist` is implementation specific and may not @@ -379,7 +372,7 @@ pub mod verify { &self, key_range: TableKeyRange, read_options: ReadOptions, - ) -> impl Future>> + Send + '_ { + ) -> impl Future>> + Send + '_ { async move { let actual = self .actual @@ -391,7 +384,7 @@ pub mod verify { None }; - Ok(verify_stream(actual, expected)) + Ok(verify_iter(actual, expected)) } } @@ -697,8 +690,6 @@ pub mod boxed_state_store { use bytes::Bytes; use dyn_clone::{clone_trait_object, DynClone}; - use futures::stream::BoxStream; - use futures::StreamExt; use risingwave_common::buffer::Bitmap; use risingwave_hummock_sdk::key::{TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; @@ -709,9 +700,31 @@ pub mod boxed_state_store { use crate::store_impl::AsHummock; use crate::StateStore; + #[async_trait::async_trait] + pub trait DynamicDispatchedStateStoreIter: Send { + async fn try_next(&mut self) -> StorageResult>>; + } + + #[async_trait::async_trait] + impl DynamicDispatchedStateStoreIter for I { + async fn try_next(&mut self) -> StorageResult>> { + self.try_next().await + } + } + + pub type BoxStateStoreIter<'a> = 
Box; + impl<'a> StateStoreIter for BoxStateStoreIter<'a> { + fn try_next( + &mut self, + ) -> impl Future>>> + Send + '_ + { + self.deref_mut().try_next() + } + } + // For StateStoreRead - pub type BoxStateStoreReadIterStream = BoxStream<'static, StorageResult>; + pub type BoxStateStoreReadIter = BoxStateStoreIter<'static>; #[async_trait::async_trait] pub trait DynamicDispatchedStateStoreRead: StaticSendSync { @@ -727,7 +740,7 @@ pub mod boxed_state_store { key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> StorageResult; + ) -> StorageResult; } #[async_trait::async_trait] @@ -746,13 +759,13 @@ pub mod boxed_state_store { key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> StorageResult { - Ok(self.iter(key_range, epoch, read_options).await?.boxed()) + ) -> StorageResult { + Ok(Box::new(self.iter(key_range, epoch, read_options).await?)) } } // For LocalStateStore - pub type BoxLocalStateStoreIterStream<'a> = BoxStream<'a, StorageResult>; + pub type BoxLocalStateStoreIterStream<'a> = BoxStateStoreIter<'a>; #[async_trait::async_trait] pub trait DynamicDispatchedLocalStateStore: StaticSendSync { async fn may_exist( @@ -820,7 +833,7 @@ pub mod boxed_state_store { key_range: TableKeyRange, read_options: ReadOptions, ) -> StorageResult> { - Ok(self.iter(key_range, read_options).await?.boxed()) + Ok(Box::new(self.iter(key_range, read_options).await?)) } fn insert( @@ -868,7 +881,7 @@ pub mod boxed_state_store { pub type BoxDynamicDispatchedLocalStateStore = Box; impl LocalStateStore for BoxDynamicDispatchedLocalStateStore { - type IterStream<'a> = BoxLocalStateStoreIterStream<'a>; + type Iter<'a> = BoxLocalStateStoreIterStream<'a>; fn may_exist( &self, @@ -890,7 +903,7 @@ pub mod boxed_state_store { &self, key_range: TableKeyRange, read_options: ReadOptions, - ) -> impl Future>> + Send + '_ { + ) -> impl Future>> + Send + '_ { self.deref().iter(key_range, read_options) } @@ -986,7 +999,7 @@ pub mod boxed_state_store { pub type BoxDynamicDispatchedStateStore = Box; impl StateStoreRead for BoxDynamicDispatchedStateStore { - type IterStream = BoxStateStoreReadIterStream; + type Iter = BoxStateStoreReadIter; fn get( &self, @@ -1002,7 +1015,7 @@ pub mod boxed_state_store { key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - ) -> impl Future> + '_ { + ) -> impl Future> + '_ { self.deref().iter(key_range, epoch, read_options) } } diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 031dcbbf1c42b..310d0a842463e 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -45,7 +45,7 @@ use crate::hummock::CachePolicy; use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode}; use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew}; use crate::row_serde::{find_columns_by_ids, ColumnMapping}; -use crate::store::{PrefetchOptions, ReadOptions}; +use crate::store::{PrefetchOptions, ReadOptions, StateStoreIter}; use crate::table::merge_sort::merge_sort; use crate::table::{KeyedRow, TableDistribution, TableIter}; use crate::StateStore; @@ -665,7 +665,7 @@ impl StorageTableInner { /// [`StorageTableInnerIterInner`] iterates on the storage table. struct StorageTableInnerIterInner { /// An iterator that returns raw bytes from storage. - iter: S::IterStream, + iter: S::Iter, mapping: Arc, @@ -725,18 +725,15 @@ impl StorageTableInnerIterInner { /// Yield a row with its primary key. 
#[try_stream(ok = KeyedRow, error = StorageError)] - async fn into_stream(self) { - use futures::TryStreamExt; - - // No need for table id and epoch. - let iter = self.iter.map_ok(|(k, v)| (k.user_key.table_key, v)); - futures::pin_mut!(iter); - while let Some((table_key, value)) = iter + async fn into_stream(mut self) { + while let Some((k, v)) = self + .iter .try_next() .verbose_instrument_await("storage_table_iter_next") .await? { - let full_row = self.row_deserializer.deserialize(&value)?; + let (table_key, value) = (k.user_key.table_key, v); + let full_row = self.row_deserializer.deserialize(value)?; let result_row_in_value = self .mapping .project(OwnedRow::new(full_row)) @@ -774,14 +771,15 @@ impl StorageTableInnerIterInner { } let row = OwnedRow::new(result_row_vec); + // TODO: may optimize the key clone yield KeyedRow { - vnode_prefixed_key: table_key, + vnode_prefixed_key: table_key.copy_into(), row, } } None => { yield KeyedRow { - vnode_prefixed_key: table_key, + vnode_prefixed_key: table_key.copy_into(), row: result_row_in_value, } } diff --git a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs index 579d403e98979..756df5fff48ce 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/reader.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/reader.rs @@ -19,7 +19,6 @@ use std::time::Duration; use anyhow::anyhow; use futures::future::{try_join_all, BoxFuture}; -use futures::stream::select_all; use futures::{FutureExt, TryFutureExt}; use risingwave_common::array::StreamChunk; use risingwave_common::buffer::Bitmap; @@ -113,7 +112,7 @@ pub struct KvLogStoreReader { first_write_epoch: Option, /// `Some` means consuming historical log data - state_store_stream: Option>>>, + state_store_stream: Option>>>, /// Store the future that attempts to read a flushed stream chunk. /// This is for cancellation safety. 
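// ---- Editor's sketch (illustrative only, not part of the patch) ----
// `try_next` hands out borrowed keys and values, so anything that escapes the
// loop body must be copied, as the rewritten `into_stream` above does with
// `copy_into`. `collect_keyed_rows` is a hypothetical helper showing the same
// pattern in isolation; it assumes the value-encoding error converts into
// `StorageError`, as it does in the surrounding code.
use bytes::Bytes;
use risingwave_common::row::OwnedRow;
use risingwave_storage::error::StorageResult;
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::StateStoreIter;
use risingwave_storage::table::KeyedRow;

async fn collect_keyed_rows(
    mut iter: impl StateStoreIter,
    deserializer: &impl ValueRowSerde,
) -> StorageResult<Vec<KeyedRow<Bytes>>> {
    let mut rows = Vec::new();
    while let Some((full_key, value)) = iter.try_next().await? {
        rows.push(KeyedRow::new(
            // `copy_into` turns the borrowed `TableKey<&[u8]>` into an owned
            // `TableKey<Bytes>` so the row can outlive the next `try_next`.
            full_key.user_key.table_key.copy_into(),
            OwnedRow::new(deserializer.deserialize(value)?),
        ));
    }
    Ok(rows)
}
// ---- end of sketch ----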
Since it is possible that the future of `next_item` @@ -184,7 +183,7 @@ impl KvLogStoreReader { fn read_persisted_log_store( &self, last_persisted_epoch: Option, - ) -> impl Future>>>> + Send + ) -> impl Future>>>> + Send { let range_start = if let Some(last_persisted_epoch) = last_persisted_epoch { // start from the next epoch of last_persisted_epoch @@ -341,7 +340,7 @@ impl LogReader for KvLogStoreReader { let table_id = self.table_id; let read_metrics = self.metrics.flushed_buffer_read_metrics.clone(); async move { - let streams = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| { + let iters = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| { let range_start = serde.serialize_log_store_pk(vnode, item_epoch, Some(start_seq_id)); let range_end = @@ -351,7 +350,7 @@ impl LogReader for KvLogStoreReader { // Use MAX EPOCH here because the epoch to consume may be below the safe // epoch async move { - Ok::<_, anyhow::Error>(Box::pin( + Ok::<_, anyhow::Error>( state_store .iter( (Included(range_start), Included(range_end)), @@ -365,15 +364,14 @@ impl LogReader for KvLogStoreReader { }, ) .await?, - )) + ) } })) .await?; - let combined_stream = select_all(streams); let chunk = serde .deserialize_stream_chunk( - combined_stream, + iters, start_seq_id, end_seq_id, item_epoch, diff --git a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs index 38bb51c79b75c..67167f466a50b 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/serde.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/serde.rs @@ -25,12 +25,12 @@ use itertools::Itertools; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::ColumnDesc; -use risingwave_common::estimate_size::EstimateSize; use risingwave_common::hash::VirtualNode; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, ScalarImpl}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::row_serde::OrderedRowSerde; +use risingwave_common::util::value_encoding; use risingwave_common::util::value_encoding::{ BasicSerde, ValueRowDeserializer, ValueRowSerializer, }; @@ -38,11 +38,12 @@ use risingwave_connector::sink::log_store::LogStoreResult; use risingwave_hummock_sdk::key::{next_key, TableKey}; use risingwave_hummock_sdk::HummockEpoch; use risingwave_pb::catalog::Table; -use risingwave_storage::error::StorageError; +use risingwave_storage::error::StorageResult; use risingwave_storage::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode}; use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew; -use risingwave_storage::store::StateStoreReadIterStream; +use risingwave_storage::store::{StateStoreIterExt, StateStoreReadIter}; use risingwave_storage::table::{compute_vnode, TableDistribution, SINGLETON_VNODE}; +use rw_futures_util::select_all; use crate::common::log_store_impl::kv_log_store::{ KvLogStorePkInfo, KvLogStoreReadMetrics, ReaderTruncationOffsetType, RowOpCodeType, SeqIdType, @@ -305,8 +306,8 @@ impl LogStoreRowSerde { } impl LogStoreRowSerde { - fn deserialize(&self, value_bytes: Bytes) -> LogStoreResult<(u64, LogStoreRowOp)> { - let row_data = self.row_serde.deserialize(&value_bytes)?; + fn deserialize(&self, value_bytes: &[u8]) -> value_encoding::Result<(u64, LogStoreRowOp)> { + let row_data = self.row_serde.deserialize(value_bytes)?; let payload_row = 
OwnedRow::new(row_data[self.pk_info.predefined_column_len()..].to_vec()); let epoch = Self::decode_epoch( @@ -354,24 +355,31 @@ impl LogStoreRowSerde { Ok((epoch, op)) } - pub(crate) async fn deserialize_stream_chunk( + pub(crate) async fn deserialize_stream_chunk( &self, - stream: impl StateStoreReadIterStream, + iters: impl IntoIterator, start_seq_id: SeqIdType, end_seq_id: SeqIdType, expected_epoch: u64, metrics: &KvLogStoreReadMetrics, ) -> LogStoreResult { - pin_mut!(stream); let size_bound = (end_seq_id - start_seq_id + 1) as usize; let mut data_chunk_builder = DataChunkBuilder::new(self.payload_schema.clone(), size_bound + 1); let mut ops = Vec::with_capacity(size_bound); let mut read_info = ReadInfo::new(); - while let Some((key, value)) = stream.try_next().await? { - read_info - .read_one_row(key.user_key.table_key.estimated_size() + value.estimated_size()); - match self.deserialize(value)? { + let stream = select_all(iters.into_iter().map(|iter| { + iter.into_stream(move |(key, value)| { + let row_size = key.user_key.table_key.len() + value.len(); + let output = self.deserialize(value)?; + Ok((row_size, output)) + }) + .boxed() + })); + pin_mut!(stream); + while let Some((row_size, output)) = stream.try_next().await? { + read_info.read_one_row(row_size); + match output { (epoch, LogStoreRowOp::Row { op, row }) => { if epoch != expected_epoch { return Err(anyhow!( @@ -435,7 +443,7 @@ pub(crate) enum KvLogStoreItem { type BoxPeekableLogStoreItemStream = Pin>>>; -struct LogStoreRowOpStream { +struct LogStoreRowOpStream { serde: LogStoreRowSerde, /// Streams that have not reached a barrier @@ -451,16 +459,16 @@ struct LogStoreRowOpStream { metrics: KvLogStoreReadMetrics, } -impl LogStoreRowOpStream { +impl LogStoreRowOpStream { pub(crate) fn new( - streams: Vec, + iters: Vec, serde: LogStoreRowSerde, metrics: KvLogStoreReadMetrics, ) -> Self { - assert!(!streams.is_empty()); + assert!(!iters.is_empty()); Self { serde: serde.clone(), - barrier_streams: streams + barrier_streams: iters .into_iter() .map(|s| Box::pin(deserialize_stream(s, serde.clone()).peekable())) .collect(), @@ -538,37 +546,35 @@ impl LogStoreRowOpStream { pub(crate) type LogStoreItemMergeStream = impl Stream>; -pub(crate) fn merge_log_store_item_stream( - streams: Vec, +pub(crate) fn merge_log_store_item_stream( + iters: Vec, serde: LogStoreRowSerde, chunk_size: usize, metrics: KvLogStoreReadMetrics, ) -> LogStoreItemMergeStream { - LogStoreRowOpStream::new(streams, serde, metrics).into_log_store_item_stream(chunk_size) + LogStoreRowOpStream::new(iters, serde, metrics).into_log_store_item_stream(chunk_size) } -type LogStoreItemStream = +type LogStoreItemStream = impl Stream> + Send; -fn deserialize_stream( - stream: S, +fn deserialize_stream( + iter: S, serde: LogStoreRowSerde, ) -> LogStoreItemStream { - stream.map( - move |result: Result<_, StorageError>| -> LogStoreResult<(u64, LogStoreRowOp, usize)> { - match result { - Ok((key, value)) => { - let read_size = - key.user_key.table_key.estimated_size() + value.estimated_size(); - let (epoch, op) = serde.deserialize(value)?; - Ok((epoch, op, read_size)) - } - Err(e) => Err(e.into()), - } + iter.into_stream( + move |(key, value)| -> StorageResult<(u64, LogStoreRowOp, usize)> { + let read_size = key.user_key.table_key.len() + value.len(); + let (epoch, op) = serde.deserialize(value)?; + Ok((epoch, op, read_size)) }, ) + .map_err(Into::into) + .boxed() + // The `boxed` call was unnecessary in usual build. 
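// ---- Editor's sketch (illustrative only, not part of the patch) ----
// The merge pattern used by `deserialize_stream_chunk` above, in isolation:
// each per-vnode iterator becomes a stream of owned items, and the streams
// are merged into one. `merge_vnode_iters` is a hypothetical helper and the
// return-type bounds are approximate.
use futures::{Stream, StreamExt};
use risingwave_storage::error::StorageResult;
use risingwave_storage::store::{
    to_owned_item, StateStoreIterExt, StateStoreIterItem, StateStoreReadIter,
};
use rw_futures_util::select_all;

fn merge_vnode_iters(
    iters: impl IntoIterator<Item = impl StateStoreReadIter>,
) -> impl Stream<Item = StorageResult<StateStoreIterItem>> + Send {
    select_all(
        iters
            .into_iter()
            // Boxing keeps each per-vnode stream type-erased and `Unpin`.
            .map(|iter| iter.into_stream(to_owned_item).boxed()),
    )
}
// ---- end of sketch ----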
But when doing cargo doc, + // rustc will panic in auto_trait.rs. May remove it when using future version of tool chain. } -impl LogStoreRowOpStream { +impl LogStoreRowOpStream { // Return Ok(false) means all streams have reach the end. async fn init(&mut self) -> LogStoreResult { match &self.stream_state { @@ -753,12 +759,13 @@ impl LogStoreRowOpStream { mod tests { use std::cmp::min; use std::future::poll_fn; + use std::iter::once; use std::sync::Arc; use std::task::Poll; use bytes::Bytes; use futures::stream::empty; - use futures::{pin_mut, stream, StreamExt, TryStreamExt}; + use futures::{pin_mut, stream, Stream, StreamExt, TryStreamExt}; use itertools::Itertools; use rand::prelude::SliceRandom; use rand::thread_rng; @@ -770,7 +777,10 @@ mod tests { use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::epoch::{test_epoch, EpochExt}; use risingwave_hummock_sdk::key::FullKey; - use risingwave_storage::store::StateStoreReadIterStream; + use risingwave_storage::error::StorageResult; + use risingwave_storage::store::{ + FromStreamStateStoreIter, StateStoreIterItem, StateStoreReadIter, + }; use risingwave_storage::table::DEFAULT_VNODE; use tokio::sync::oneshot; use tokio::sync::oneshot::Sender; @@ -834,7 +844,7 @@ mod tests { let key = remove_vnode_prefix(&key.0); assert!(key < delete_range_right1); serialized_keys.push(key); - let (decoded_epoch, row_op) = serde.deserialize(value).unwrap(); + let (decoded_epoch, row_op) = serde.deserialize(&value).unwrap(); assert_eq!(decoded_epoch, epoch); match row_op { LogStoreRowOp::Row { @@ -851,7 +861,7 @@ mod tests { let (key, encoded_barrier) = serde.serialize_barrier(epoch, DEFAULT_VNODE, false); let key = remove_vnode_prefix(&key.0); - match serde.deserialize(encoded_barrier).unwrap() { + match serde.deserialize(&encoded_barrier).unwrap() { (decoded_epoch, LogStoreRowOp::Barrier { is_checkpoint }) => { assert!(!is_checkpoint); assert_eq!(decoded_epoch, epoch); @@ -872,7 +882,7 @@ mod tests { assert!(key >= delete_range_right1); assert!(key < delete_range_right2); serialized_keys.push(key); - let (decoded_epoch, row_op) = serde.deserialize(value).unwrap(); + let (decoded_epoch, row_op) = serde.deserialize(&value).unwrap(); assert_eq!(decoded_epoch, epoch); match row_op { LogStoreRowOp::Row { @@ -889,7 +899,7 @@ mod tests { let (key, encoded_checkpoint_barrier) = serde.serialize_barrier(epoch, DEFAULT_VNODE, true); let key = remove_vnode_prefix(&key.0); - match serde.deserialize(encoded_checkpoint_barrier).unwrap() { + match serde.deserialize(&encoded_checkpoint_barrier).unwrap() { (decoded_epoch, LogStoreRowOp::Barrier { is_checkpoint }) => { assert_eq!(decoded_epoch, epoch); assert!(is_checkpoint); @@ -968,7 +978,7 @@ mod tests { tx.send(()).unwrap(); let chunk = serde .deserialize_stream_chunk( - stream, + once(FromStreamStateStoreIter::new(stream.boxed())), start_seq_id, end_seq_id, EPOCH1, @@ -988,7 +998,10 @@ mod tests { rows: Vec, epoch: u64, seq_id: &mut SeqIdType, - ) -> (impl StateStoreReadIterStream, Sender<()>) { + ) -> ( + impl Stream>, + Sender<()>, + ) { let (tx, rx) = oneshot::channel(); let row_data = ops .into_iter() @@ -1014,7 +1027,7 @@ mod tests { seq_id: &mut SeqIdType, base: i64, ) -> ( - impl StateStoreReadIterStream, + impl Stream>, oneshot::Sender<()>, oneshot::Sender<()>, Vec, @@ -1052,7 +1065,7 @@ mod tests { serde: LogStoreRowSerde, size: usize, ) -> ( - LogStoreRowOpStream, + LogStoreRowOpStream, Vec>>, Vec>>, Vec>, @@ -1067,6 +1080,7 @@ mod tests { for i in 0..size { let (s, t1, 
t2, op_list, row_list) = gen_single_test_stream(serde.clone(), &mut seq_id, (100 * i) as _); + let s = FromStreamStateStoreIter::new(s.boxed()); streams.push(s); tx1.push(Some(t1)); tx2.push(Some(t2)); @@ -1219,6 +1233,7 @@ mod tests { let mut seq_id = 1; let (stream, tx1, tx2, ops, rows) = gen_single_test_stream(serde.clone(), &mut seq_id, 0); + let stream = FromStreamStateStoreIter::new(stream.boxed()); const CHUNK_SIZE: usize = 3; @@ -1329,7 +1344,10 @@ mod tests { const CHUNK_SIZE: usize = 3; let stream = merge_log_store_item_stream( - vec![empty(), empty()], + vec![ + FromStreamStateStoreIter::new(empty()), + FromStreamStateStoreIter::new(empty()), + ], serde, CHUNK_SIZE, KvLogStoreReadMetrics::for_test(), diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index dba45d52f1e3d..4ed6a1ebd6593 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use bytes::{BufMut, Bytes, BytesMut}; use either::Either; -use futures::{pin_mut, FutureExt, Stream, StreamExt}; +use futures::{pin_mut, FutureExt, Stream, StreamExt, TryStreamExt}; use futures_async_stream::for_await; use itertools::{izip, Itertools}; use risingwave_common::array::stream_record::Record; @@ -55,7 +55,7 @@ use risingwave_storage::row_serde::row_serde_util::{ use risingwave_storage::row_serde::value_serde::ValueRowSerde; use risingwave_storage::store::{ InitOptions, LocalStateStore, NewLocalOptions, OpConsistencyLevel, PrefetchOptions, - ReadOptions, SealCurrentEpochOptions, StateStoreIterItemStream, + ReadOptions, SealCurrentEpochOptions, StateStoreIter, StateStoreIterExt, }; use risingwave_storage::table::merge_sort::merge_sort; use risingwave_storage::table::{KeyedRow, TableDistribution}; @@ -1309,7 +1309,7 @@ where table_key_range: TableKeyRange, prefix_hint: Option, prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult<::IterStream<'_>> { + ) -> StreamExecutorResult<::Iter<'_>> { let read_options = ReadOptions { prefix_hint, retention_seconds: self.table_option.retention_seconds, @@ -1394,7 +1394,7 @@ where // iterate over each vnode that the `StateTableInner` owns. 
vnode: VirtualNode, prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult<::IterStream<'_>> { + ) -> StreamExecutorResult<::Iter<'_>> { let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range); let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode); @@ -1459,19 +1459,17 @@ pub type KeyedRowStream<'a, S: StateStore, SD: ValueRowSerde + 'a> = impl Stream>> + 'a; fn deserialize_keyed_row_stream<'a>( - stream: impl StateStoreIterItemStream + 'a, + iter: impl StateStoreIter + 'a, deserializer: &'a impl ValueRowSerde, ) -> impl Stream>> + 'a { - stream.map(move |result| { - result - .map_err(StreamExecutorError::from) - .and_then(|(key, value)| { - Ok(KeyedRow::new( - key.user_key.table_key, - deserializer.deserialize(&value).map(OwnedRow::new)?, - )) - }) + iter.into_stream(move |(key, value)| { + Ok(KeyedRow::new( + // TODO: may avoid clone the key when key is not needed + key.user_key.table_key.copy_into(), + deserializer.deserialize(value).map(OwnedRow::new)?, + )) }) + .map_err(Into::into) } pub fn prefix_range_to_memcomparable( diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index d1606d4e85907..948678572e3b4 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -23,7 +23,6 @@ use std::time::Duration; use anyhow::anyhow; use bytes::{BufMut, Bytes, BytesMut}; use clap::Parser; -use futures::TryStreamExt; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::TableId; use risingwave_common::config::{ @@ -44,7 +43,7 @@ use risingwave_storage::monitor::{ }; use risingwave_storage::opts::StorageOpts; use risingwave_storage::store::{ReadOptions, StateStoreRead}; -use risingwave_storage::{StateStore, StateStoreImpl}; +use risingwave_storage::{StateStore, StateStoreImpl, StateStoreIter}; const SST_ID_SHIFT_COUNT: u32 = 1000000; const CHECKPOINT_FREQ_FOR_REPLAY: u64 = 99999999; @@ -603,8 +602,7 @@ async fn poll_compaction_tasks_status( (compaction_ok, cur_version) } -type StateStoreIterType = - Pin as StateStoreRead>::IterStream>>; +type StateStoreIterType = Pin as StateStoreRead>::Iter>>; async fn open_hummock_iters( hummock: &MonitoredStateStore, @@ -661,8 +659,6 @@ pub async fn check_compaction_results( let mut expect_cnt = 0; let mut actual_cnt = 0; - futures::pin_mut!(expect_iter); - futures::pin_mut!(actual_iter); while let Some(kv_expect) = expect_iter.try_next().await? 
{ expect_cnt += 1; let ret = actual_iter.try_next().await?; diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 7918b9289449d..d0a75c570eff4 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use std::time::{Duration, SystemTime}; use bytes::Bytes; -use futures::StreamExt; use rand::rngs::StdRng; use rand::{RngCore, SeedableRng}; use risingwave_common::cache::CachePriority; @@ -61,7 +60,7 @@ use risingwave_storage::opts::StorageOpts; use risingwave_storage::store::{ LocalStateStore, NewLocalOptions, PrefetchOptions, ReadOptions, SealCurrentEpochOptions, }; -use risingwave_storage::StateStore; +use risingwave_storage::{StateStore, StateStoreIter}; use crate::CompactionTestOpts; pub fn start_delete_range(opts: CompactionTestOpts) -> Pin + Send>> { @@ -473,10 +472,10 @@ impl NormalState { .await .unwrap(),); let mut ret = vec![]; - while let Some(item) = iter.next().await { - let (full_key, val) = item.unwrap(); - let tkey = full_key.user_key.table_key.0.clone(); - ret.push((tkey, val)); + while let Some(item) = iter.try_next().await.unwrap() { + let (full_key, val) = item; + let tkey = Bytes::copy_from_slice(full_key.user_key.table_key.0); + ret.push((tkey, Bytes::copy_from_slice(val))); } ret } @@ -485,29 +484,31 @@ impl NormalState { #[async_trait::async_trait] impl CheckState for NormalState { async fn delete_range(&mut self, left: &[u8], right: &[u8]) { - let mut iter = Box::pin( - self.storage - .iter( - ( - Bound::Included(Bytes::copy_from_slice(left)).map(TableKey), - Bound::Excluded(Bytes::copy_from_slice(right)).map(TableKey), - ), - ReadOptions { - ignore_range_tombstone: true, - table_id: self.table_id, - read_version_from_backup: false, - prefetch_options: PrefetchOptions::default(), - cache_policy: CachePolicy::Fill(CachePriority::High), - ..Default::default() - }, - ) - .await - .unwrap(), - ); + let mut iter = self + .storage + .iter( + ( + Bound::Included(Bytes::copy_from_slice(left)).map(TableKey), + Bound::Excluded(Bytes::copy_from_slice(right)).map(TableKey), + ), + ReadOptions { + ignore_range_tombstone: true, + table_id: self.table_id, + read_version_from_backup: false, + prefetch_options: PrefetchOptions::default(), + cache_policy: CachePolicy::Fill(CachePriority::High), + ..Default::default() + }, + ) + .await + .unwrap(); let mut delete_item = Vec::new(); - while let Some(item) = iter.next().await { - let (full_key, value) = item.unwrap(); - delete_item.push((full_key.user_key.table_key, value)); + while let Some(item) = iter.try_next().await.unwrap() { + let (full_key, value) = item; + delete_item.push(( + full_key.user_key.table_key.copy_into(), + Bytes::copy_from_slice(value), + )); } drop(iter); for (key, value) in delete_item {