diff --git a/src/ctl/src/cmd_impl/table/scan.rs b/src/ctl/src/cmd_impl/table/scan.rs
index 519fe33f61029..05b6519e2fbeb 100644
--- a/src/ctl/src/cmd_impl/table/scan.rs
+++ b/src/ctl/src/cmd_impl/table/scan.rs
@@ -145,7 +145,7 @@ async fn do_scan(table: TableCatalog, hummock: MonitoredStateStore<HummockStorage>
diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs
--- a/src/storage/hummock_sdk/src/key.rs
+++ b/src/storage/hummock_sdk/src/key.rs
@@ impl SetSlice<Bytes> for Bytes {
     }
 }
 
-pub trait CopyFromSlice {
+pub trait CopyFromSlice: Send + 'static {
     fn copy_from_slice(slice: &[u8]) -> Self;
 }
 
@@ -456,6 +456,10 @@ impl CopyFromSlice for Bytes {
     }
 }
 
+impl CopyFromSlice for () {
+    fn copy_from_slice(_: &[u8]) -> Self {}
+}
+
 /// [`TableKey`] is an internal concept in storage. It's a wrapper around the key directly from the
 /// user, to make the code clearer and avoid confusion with encoded [`UserKey`] and [`FullKey`].
 ///
diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs
index c64498cbec9de..2e8511af1c071 100644
--- a/src/storage/src/table/batch_table/storage_table.rs
+++ b/src/storage/src/table/batch_table/storage_table.rs
@@ -12,18 +12,18 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::future::Future;
 use std::ops::Bound::{self, Excluded, Included, Unbounded};
 use std::ops::RangeBounds;
 use std::sync::Arc;
 
-use auto_enums::auto_enum;
 use await_tree::InstrumentAwait;
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
 use foyer::CacheHint;
 use futures::future::try_join_all;
-use futures::{Stream, StreamExt};
+use futures::{Stream, StreamExt, TryStreamExt};
 use futures_async_stream::try_stream;
-use itertools::{Either, Itertools};
+use itertools::Itertools;
 use more_asserts::assert_gt;
 use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk};
 use risingwave_common::bitmap::Bitmap;
@@ -37,7 +37,7 @@ use risingwave_common::util::sort_util::OrderType;
 use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
 use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
 use risingwave_hummock_sdk::key::{
-    end_bound_of_prefix, next_key, prefixed_range_with_vnode, TableKeyRange,
+    end_bound_of_prefix, next_key, prefixed_range_with_vnode, CopyFromSlice, TableKeyRange,
 };
 use risingwave_hummock_sdk::HummockReadEpoch;
 use risingwave_pb::plan_common::StorageTableDesc;
@@ -52,8 +52,8 @@ use crate::store::{
     PrefetchOptions, ReadLogOptions, ReadOptions, StateStoreIter, StateStoreIterExt,
     TryWaitEpochOptions,
 };
-use crate::table::merge_sort::merge_sort;
-use crate::table::{ChangeLogRow, KeyedChangeLogRow, KeyedRow, TableDistribution, TableIter};
+use crate::table::merge_sort::NodePeek;
+use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter};
 use crate::StateStore;
 
 /// [`StorageTableInner`] is the interface accessing relational data in KV(`StateStore`) with
@@ -487,61 +487,178 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
     }
 }
 
-pub trait PkAndRowStream = Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send;
-
 /// The row iterator of the storage table.
-/// The wrapper of stream item `StorageResult<KeyedRow<Bytes>>` if pk is not persisted.
-
-#[async_trait::async_trait]
-impl<S: PkAndRowStream + Unpin> TableIter for S {
+/// The wrapper of stream item `StorageResult<OwnedRow>` if pk is not persisted.
+impl<S: Stream<Item = StorageResult<OwnedRow>> + Send + Unpin> TableIter for S {
     async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>> {
-        self.next()
-            .await
-            .transpose()
-            .map(|r| r.map(|keyed_row| keyed_row.into_owned_row()))
+        self.next().await.transpose()
+    }
+}
+
+mod merge_vnode_stream {
+
+    use bytes::Bytes;
+    use futures::{Stream, StreamExt, TryStreamExt};
+    use risingwave_hummock_sdk::key::TableKey;
+
+    use crate::error::StorageResult;
+    use crate::table::merge_sort::{merge_sort, NodePeek};
+    use crate::table::KeyedRow;
+
+    pub(super) enum VnodeStreamType<RowSt, KeyedRowSt> {
+        Single(RowSt),
+        Unordered(Vec<RowSt>),
+        Ordered(Vec<KeyedRowSt>),
+    }
+
+    pub(super) type MergedVnodeStream<
+        R: Send,
+        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
+        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
+    >
+    where
+        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
+    = impl Stream<Item = StorageResult<R>> + Send;
+
+    pub(super) type SortKeyType = Bytes; // TODO: may use Vec<u8>
+
+    pub(super) fn merge_stream<
+        R: Send,
+        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
+        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
+    >(
+        stream: VnodeStreamType<RowSt, KeyedRowSt>,
+    ) -> MergedVnodeStream<R, RowSt, KeyedRowSt>
+    where
+        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
+    {
+        #[auto_enums::auto_enum(futures03::Stream)]
+        match stream {
+            VnodeStreamType::Single(stream) => stream.map_ok(|(_, row)| row),
+            VnodeStreamType::Unordered(streams) => futures::stream::iter(
+                streams
+                    .into_iter()
+                    .map(|stream| Box::pin(stream.map_ok(|(_, row)| row))),
+            )
+            .flatten_unordered(1024),
+            VnodeStreamType::Ordered(streams) => merge_sort(streams.into_iter().map(|stream| {
+                Box::pin(stream.map_ok(|(key, row)| KeyedRow {
+                    vnode_prefixed_key: TableKey(key),
+                    row,
+                }))
+            }))
+            .map_ok(|keyed_row| keyed_row.row),
+        }
+    }
+}
+
+use merge_vnode_stream::*;
+
+async fn build_vnode_stream<
+    R: Send,
+    RowSt: Stream<Item = StorageResult<((), R)>> + Send,
+    KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
+    RowStFut: Future<Output = StorageResult<RowSt>>,
+    KeyedRowStFut: Future<Output = StorageResult<KeyedRowSt>>,
+>(
+    row_stream_fn: impl Fn(VirtualNode) -> RowStFut,
+    keyed_row_stream_fn: impl Fn(VirtualNode) -> KeyedRowStFut,
+    vnodes: &[VirtualNode],
+    ordered: bool,
+) -> StorageResult<MergedVnodeStream<R, RowSt, KeyedRowSt>>
+where
+    KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
+{
+    let stream = match vnodes {
+        [] => unreachable!(),
+        [vnode] => VnodeStreamType::Single(row_stream_fn(*vnode).await?),
+        // Concat all iterators if not to preserve order.
+        vnodes if !ordered => VnodeStreamType::Unordered(
+            try_join_all(vnodes.iter().map(|vnode| row_stream_fn(*vnode))).await?,
+        ),
+        // Merge all iterators if to preserve order.
+        vnodes => VnodeStreamType::Ordered(
+            try_join_all(vnodes.iter().map(|vnode| keyed_row_stream_fn(*vnode))).await?,
+        ),
+    };
+    Ok(merge_stream(stream))
+}
+
 /// Iterators
 impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
-    /// Get multiple stream item `StorageResult<KeyedRow<Bytes>>` based on the specified vnodes of this table with
+    /// Get multiple stream item `StorageResult<OwnedRow>` based on the specified vnodes of this table with
     /// `vnode_hint`, and merge or concat them by given `ordered`.
     async fn iter_with_encoded_key_range(
         &self,
         prefix_hint: Option<Bytes>,
-        encoded_key_range: (Bound<Bytes>, Bound<Bytes>),
+        (start_bound, end_bound): (Bound<Bytes>, Bound<Bytes>),
         wait_epoch: HummockReadEpoch,
         vnode_hint: Option<VirtualNode>,
         ordered: bool,
         prefetch_options: PrefetchOptions,
-    ) -> StorageResult<impl Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send> {
-        let cache_policy = match (
-            encoded_key_range.start_bound(),
-            encoded_key_range.end_bound(),
-        ) {
+    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static> {
+        let vnodes = match vnode_hint {
+            // If `vnode_hint` is set, we can only access this single vnode.
+            Some(vnode) => {
+                assert!(
+                    self.distribution.vnodes().is_set(vnode.to_index()),
+                    "vnode unset: {:?}, distribution: {:?}",
+                    vnode,
+                    self.distribution
+                );
+                vec![vnode]
+            }
+            // Otherwise, we need to access all vnodes of this table.
+            None => self.distribution.vnodes().iter_vnodes().collect_vec(),
+        };
+
+        build_vnode_stream(
+            |vnode| {
+                self.iter_vnode_with_encoded_key_range(
+                    prefix_hint.clone(),
+                    (start_bound.as_ref(), end_bound.as_ref()),
+                    wait_epoch,
+                    vnode,
+                    prefetch_options,
+                )
+            },
+            |vnode| {
+                self.iter_vnode_with_encoded_key_range(
+                    prefix_hint.clone(),
+                    (start_bound.as_ref(), end_bound.as_ref()),
+                    wait_epoch,
+                    vnode,
+                    prefetch_options,
+                )
+            },
+            &vnodes,
+            ordered,
+        )
+        .await
+    }
+
+    async fn iter_vnode_with_encoded_key_range<K: CopyFromSlice>(
+        &self,
+        prefix_hint: Option<Bytes>,
+        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
+        wait_epoch: HummockReadEpoch,
+        vnode: VirtualNode,
+        prefetch_options: PrefetchOptions,
+    ) -> StorageResult<impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send> {
+        let cache_policy = match &encoded_key_range {
             // To prevent unbounded range scan queries from polluting the block cache, use the
             // low priority fill policy.
             (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(CacheHint::Low),
             _ => CachePolicy::Fill(CacheHint::Normal),
         };
 
-        let table_key_ranges = {
-            // Vnodes that are set and should be accessed.
-            let vnodes = match vnode_hint {
-                // If `vnode_hint` is set, we can only access this single vnode.
-                Some(vnode) => Either::Left(std::iter::once(vnode)),
-                // Otherwise, we need to access all vnodes of this table.
-                None => Either::Right(self.distribution.vnodes().iter_vnodes()),
-            };
-            vnodes.map(|vnode| prefixed_range_with_vnode(encoded_key_range.clone(), vnode))
-        };
+        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
 
-        // For each key range, construct an iterator.
-        let iterators: Vec<_> = try_join_all(table_key_ranges.map(|table_key_range| {
+        {
             let prefix_hint = prefix_hint.clone();
             let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_));
             let read_committed = wait_epoch.is_read_committed();
-            async move {
+            {
                 let read_options = ReadOptions {
                     prefix_hint,
                     retention_seconds: self.table_option.retention_seconds,
@@ -570,27 +687,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
                     wait_epoch,
                 )
                 .await?
-                .into_stream();
-
-                Ok::<_, StorageError>(iter)
-            }
-        }))
-        .await?;
-
-        #[auto_enum(futures03::Stream)]
-        let iter = match iterators.len() {
-            0 => unreachable!(),
-            1 => iterators.into_iter().next().unwrap(),
-            // Concat all iterators if not to preserve order.
-            _ if !ordered => {
-                futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec())
-                    .flatten_unordered(1024)
+                .into_stream::<K>();
+                Ok(iter)
             }
-            // Merge all iterators if to preserve order.
-            _ => merge_sort(iterators.into_iter().map(Box::pin).collect()),
-        };
-
-        Ok(iter)
+        }
     }
 
     // TODO: directly use `prefixed_range`.
@@ -651,7 +751,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
         range_bounds: impl RangeBounds<OwnedRow>,
         ordered: bool,
         prefetch_options: PrefetchOptions,
-    ) -> StorageResult<impl Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send> {
+    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
         let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
         let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
         assert!(pk_prefix.len() <= self.pk_indices.len());
@@ -705,7 +805,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
     // Construct a stream of (columns, row_count) from a row stream
     #[try_stream(ok = (Vec<ArrayRef>, usize), error = StorageError)]
     async fn convert_row_stream_to_array_vec_stream(
-        iter: impl Stream<Item = StorageResult<KeyedRow<Bytes>>>,
+        iter: impl Stream<Item = StorageResult<OwnedRow>>,
         schema: Schema,
         chunk_size: usize,
     ) {
@@ -771,7 +871,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
         ))
     }
 
-    /// Construct a stream item `StorageResult<KeyedRow<Bytes>>` for batch executors.
+    /// Construct a stream item `StorageResult<OwnedRow>` for batch executors.
     /// Differs from the streaming one, this iterator will wait for the epoch before iteration
     pub async fn batch_iter_with_pk_bounds(
         &self,
@@ -780,7 +880,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
         range_bounds: impl RangeBounds<OwnedRow>,
         ordered: bool,
         prefetch_options: PrefetchOptions,
-    ) -> StorageResult<impl Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send> {
+    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
         self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
             .await
     }
@@ -791,70 +891,102 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
         epoch: HummockReadEpoch,
         ordered: bool,
         prefetch_options: PrefetchOptions,
-    ) -> StorageResult<impl Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send> {
+    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
         self.batch_iter_with_pk_bounds(epoch, row::empty(), .., ordered, prefetch_options)
             .await
     }
 
-    pub async fn batch_iter_log_with_pk_bounds(
+    pub async fn batch_iter_vnode(
         &self,
-        start_epoch: u64,
-        end_epoch: HummockReadEpoch,
-        ordered: bool,
-    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static> {
-        let pk_prefix = OwnedRow::default();
-        let start_key = self.serialize_pk_bound(&pk_prefix, Unbounded, true);
-        let end_key = self.serialize_pk_bound(&pk_prefix, Unbounded, false);
-
-        assert!(pk_prefix.len() <= self.pk_indices.len());
-        let table_key_ranges = {
-            // Vnodes that are set and should be accessed.
-            let vnodes = match self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix) {
-                // If `vnode_hint` is set, we can only access this single vnode.
-                Some(vnode) => Either::Left(std::iter::once(vnode)),
-                // Otherwise, we need to access all vnodes of this table.
-                None => Either::Right(self.distribution.vnodes().iter_vnodes()),
-            };
-            vnodes
-                .map(|vnode| prefixed_range_with_vnode((start_key.clone(), end_key.clone()), vnode))
+        epoch: HummockReadEpoch,
+        start_pk: Option<&OwnedRow>,
+        vnode: VirtualNode,
+        prefetch_options: PrefetchOptions,
+    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static> {
+        let start_bound = if let Some(start_pk) = start_pk {
+            let mut bytes = BytesMut::new();
+            self.pk_serializer.serialize(start_pk, &mut bytes);
+            let bytes = bytes.freeze();
+            Included(bytes)
+        } else {
+            Unbounded
         };
-
-        let iterators: Vec<_> = try_join_all(table_key_ranges.map(|table_key_range| async move {
-            let read_options = ReadLogOptions {
-                table_id: self.table_id,
-            };
-            let iter = StorageTableInnerIterLogInner::<S, SD>::new(
-                &self.store,
-                self.mapping.clone(),
-                self.row_serde.clone(),
-                table_key_range,
-                read_options,
-                start_epoch,
-                end_epoch,
+        Ok(self
+            .iter_vnode_with_encoded_key_range::<()>(
+                None,
+                (start_bound.as_ref(), Unbounded),
+                epoch,
+                vnode,
+                prefetch_options,
             )
             .await?
-            .into_stream();
-            Ok::<_, StorageError>(iter)
-        }))
-        .await?;
-
-        #[auto_enum(futures03::Stream)]
-        let iter = match iterators.len() {
-            0 => unreachable!(),
-            1 => iterators.into_iter().next().unwrap(),
-            // Concat all iterators if not to preserve order.
-            _ if !ordered => {
-                futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec())
-                    .flatten_unordered(1024)
-            }
-            // Merge all iterators if to preserve order.
-            _ => merge_sort(iterators.into_iter().map(Box::pin).collect()),
-        }
-        .map(|row| row.map(|key_row| key_row.into_owned_row()));
+            .map_ok(|(_, row)| row))
+    }
+
+    async fn batch_iter_log_inner<K: CopyFromSlice>(
+        &self,
+        start_epoch: u64,
+        end_epoch: HummockReadEpoch,
+        start_pk: Option<&OwnedRow>,
+        vnode: VirtualNode,
+    ) -> StorageResult<impl Stream<Item = StorageResult<(K, ChangeLogRow)>>> {
+        let start_bound = if let Some(start_pk) = start_pk {
+            let mut bytes = BytesMut::new();
+            self.pk_serializer.serialize(start_pk, &mut bytes);
+            let bytes = bytes.freeze();
+            Included(bytes)
+        } else {
+            Unbounded
+        };
+        let table_key_range =
+            prefixed_range_with_vnode::<&Bytes>((start_bound.as_ref(), Unbounded), vnode);
+        let read_options = ReadLogOptions {
+            table_id: self.table_id,
+        };
+        let iter = StorageTableInnerIterLogInner::<S, SD>::new(
+            &self.store,
+            self.mapping.clone(),
+            self.row_serde.clone(),
+            table_key_range,
+            read_options,
+            start_epoch,
+            end_epoch,
+        )
+        .await?
+        .into_stream::<K>();
 
         Ok(iter)
     }
 
+    pub async fn batch_iter_vnode_log(
+        &self,
+        start_epoch: u64,
+        end_epoch: HummockReadEpoch,
+        start_pk: Option<&OwnedRow>,
+        vnode: VirtualNode,
+    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>>> {
+        let stream = self
+            .batch_iter_log_inner::<()>(start_epoch, end_epoch, start_pk, vnode)
+            .await?;
+        Ok(stream.map_ok(|(_, row)| row))
+    }
+
+    pub async fn batch_iter_log_with_pk_bounds(
+        &self,
+        start_epoch: u64,
+        end_epoch: HummockReadEpoch,
+        ordered: bool,
+    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static> {
+        let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec();
+        build_vnode_stream(
+            |vnode| self.batch_iter_log_inner(start_epoch, end_epoch, None, vnode),
+            |vnode| self.batch_iter_log_inner(start_epoch, end_epoch, None, vnode),
+            &vnodes,
+            ordered,
+        )
+        .await
+    }
+
     /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
     /// Returns a stream of `DataChunk` with the provided `chunk_size`
     pub async fn batch_chunk_iter_with_pk_bounds(
@@ -953,8 +1085,8 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
     }
 
     /// Yield a row with its primary key.
-    #[try_stream(ok = KeyedRow<Bytes>, error = StorageError)]
-    async fn into_stream(mut self) {
+    #[try_stream(ok = (K, OwnedRow), error = StorageError)]
+    async fn into_stream<K: CopyFromSlice>(mut self) {
         while let Some((k, v)) = self
             .iter
             .try_next()
@@ -964,7 +1096,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
             let (table_key, value, epoch_with_gap) = (k.user_key.table_key, v, k.epoch_with_gap);
             let row = self.row_deserializer.deserialize(value)?;
             let result_row_in_value = self.mapping.project(OwnedRow::new(row));
-            match &self.key_output_indices {
+            let row = match &self.key_output_indices {
                 Some(key_output_indices) => {
                     let result_row_in_key = match self.pk_serializer.clone() {
                         Some(pk_serializer) => {
@@ -1004,13 +1136,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
                         );
                     }
                 }
-                    let row = OwnedRow::new(result_row_vec);
-
-                    // TODO: may optimize the key clone
-                    yield KeyedRow {
-                        vnode_prefixed_key: table_key.copy_into(),
-                        row,
-                    }
+                    OwnedRow::new(result_row_vec)
                 }
                 None => match &self.epoch_idx {
                     Some(epoch_idx) => {
@@ -1033,20 +1159,12 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
                         );
                     }
                 }
-                        let row = OwnedRow::new(result_row_vec);
-                        yield KeyedRow {
-                            vnode_prefixed_key: table_key.copy_into(),
-                            row,
-                        }
-                    }
-                    None => {
-                        yield KeyedRow {
-                            vnode_prefixed_key: table_key.copy_into(),
-                            row: result_row_in_value.into_owned_row(),
-                        }
+                        OwnedRow::new(result_row_vec)
                     }
+                    None => result_row_in_value.into_owned_row(),
                 },
-            }
+            };
+            yield (K::copy_from_slice(table_key.as_ref()), row);
         }
     }
 }
@@ -1097,7 +1215,7 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterLogInner<S, SD> {
     }
 
     /// Yield a row with its primary key.
-    fn into_stream(self) -> impl Stream<Item = StorageResult<KeyedChangeLogRow<Bytes>>> {
+    fn into_stream<K: CopyFromSlice>(self) -> impl Stream<Item = StorageResult<(K, ChangeLogRow)>> {
         self.iter.into_stream(move |(table_key, value)| {
             value
                 .try_map(|value| {
                         .into_owned_row();
                     Ok(row)
                 })
-                .map(|row| KeyedChangeLogRow {
-                    vnode_prefixed_key: table_key.copy_into(),
-                    row,
-                })
+                .map(|row| (K::copy_from_slice(table_key.as_ref()), row))
         })
     }
 }
diff --git a/src/storage/src/table/merge_sort.rs b/src/storage/src/table/merge_sort.rs
index 44b5e03dd4b0d..b4443134ab89c 100644
--- a/src/storage/src/table/merge_sort.rs
+++ b/src/storage/src/table/merge_sort.rs
@@ -72,7 +72,7 @@ impl<S, R: NodePeek> Ord for Node<S, R> {
 }
 
 #[try_stream(ok=KO, error=E)]
-pub async fn merge_sort<KO, E, S>(streams: Vec<S>)
+pub async fn merge_sort<KO, E, S>(streams: impl IntoIterator<Item = S>)
 where
     KO: NodePeek + Send + Sync,
     E: Error,
diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs
index f6e9885d62be3..f7a1be00b1ce7 100644
--- a/src/storage/src/table/mod.rs
+++ b/src/storage/src/table/mod.rs
@@ -32,8 +32,6 @@ use crate::row_serde::value_serde::ValueRowSerde;
 use crate::store::{ChangeLogValue, StateStoreIterExt, StateStoreReadLogItem};
 use crate::StateStoreIter;
 
-// TODO: GAT-ify this trait or remove this trait
-#[async_trait::async_trait]
 pub trait TableIter: Send {
     async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>>;
 }
diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs
index c39fb37300134..051f0c1c17a62 100644
--- a/src/stream/src/common/table/state_table.rs
+++ b/src/stream/src/common/table/state_table.rs
@@ -39,8 +39,8 @@ use risingwave_common::util::row_serde::OrderedRowSerde;
 use risingwave_common::util::sort_util::OrderType;
 use risingwave_common::util::value_encoding::BasicSerde;
 use risingwave_hummock_sdk::key::{
-    end_bound_of_prefix, prefixed_range_with_vnode, start_bound_of_excluded_prefix, TableKey,
-    TableKeyRange,
+    end_bound_of_prefix, prefixed_range_with_vnode, start_bound_of_excluded_prefix, CopyFromSlice,
+    TableKey, TableKeyRange,
 };
 use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection};
 use risingwave_pb::catalog::Table;
@@ -1030,7 +1030,7 @@
         let mut streams = vec![];
         for vnode in self.vnodes().iter_vnodes() {
             let stream = self
-                .iter_with_vnode(vnode, &range, PrefetchOptions::default())
+                .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
                 .await?;
             streams.push(Box::pin(stream));
         }
@@ -1159,11 +1159,9 @@
     }
 }
 
-pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
-impl<'a, T> KeyedRowStream<'a> for T where
-    T: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a
-{
-}
+pub trait RowStream<'a> = Stream<Item = StreamExecutorResult<OwnedRow>> + 'a;
+pub trait KeyedRowStream<'a> = Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;
+pub trait PkRowStream<'a, K> = Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a;
 
 // Iterator functions
 impl<S, SD, const IS_REPLICATED: bool, W: WatermarkBufferStrategy, const USE_WATERMARK_CACHE: bool>
@@ -1183,12 +1181,27 @@ where
         vnode: VirtualNode,
         pk_range: &(Bound<impl Row>, Bound<impl Row>),
         prefetch_options: PrefetchOptions,
+    ) -> StreamExecutorResult<impl RowStream<'_>> {
+        Ok(deserialize_keyed_row_stream::<'_, ()>(
+            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
+                .await?,
+            &self.row_serde,
+        )
+        .map_ok(|(_, row)| row))
+    }
+
+    pub async fn iter_keyed_row_with_vnode(
+        &self,
+        vnode: VirtualNode,
+        pk_range: &(Bound<impl Row>, Bound<impl Row>),
+        prefetch_options: PrefetchOptions,
     ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
         Ok(deserialize_keyed_row_stream(
             self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                 .await?,
             &self.row_serde,
-        ))
+        )
+        .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
     }
 
@@ -1196,18 +1209,12 @@
     pub async fn iter_with_vnode_and_output_indices(
         &self,
         vnode: VirtualNode,
         pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
-    ) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + '_> {
+    ) -> StreamExecutorResult<impl RowStream<'_>> {
         assert!(IS_REPLICATED);
         let stream = self
             .iter_with_vnode(vnode, pk_range, prefetch_options)
             .await?;
-        Ok(stream.map(|row| {
-            row.map(|keyed_row| {
-                let (vnode_prefixed_key, row) = keyed_row.into_parts();
-                let row = row.project(&self.output_indices).into_owned_row();
-                KeyedRow::new(vnode_prefixed_key, row)
-            })
-        }))
+        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
     }
 
     async fn iter_kv(
@@ -1257,9 +1264,22 @@
         pk_prefix: impl Row,
         sub_range: &(Bound<impl Row>, Bound<impl Row>),
         prefetch_options: PrefetchOptions,
+    ) -> StreamExecutorResult<impl RowStream<'_>> {
+        let stream = self.iter_with_prefix_inner::<false, ()>(pk_prefix, sub_range, prefetch_options)
+            .await?;
+        Ok(stream.map_ok(|(_, row)| row))
+    }
+
+    pub async fn iter_keyed_row_with_prefix(
+        &self,
+        pk_prefix: impl Row,
+        sub_range: &(Bound<impl Row>, Bound<impl Row>),
+        prefetch_options: PrefetchOptions,
     ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
-        self.iter_with_prefix_inner::<false>(pk_prefix, sub_range, prefetch_options)
-            .await
+        Ok(
+            self.iter_with_prefix_inner::<false, Bytes>(pk_prefix, sub_range, prefetch_options)
+                .await?.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)),
+        )
     }
 
     /// This function scans the table just like `iter_with_prefix`, but in reverse order.
@@ -1268,17 +1288,19 @@
         pk_prefix: impl Row,
         sub_range: &(Bound<impl Row>, Bound<impl Row>),
         prefetch_options: PrefetchOptions,
-    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
-        self.iter_with_prefix_inner::<true>(pk_prefix, sub_range, prefetch_options)
-            .await
+    ) -> StreamExecutorResult<impl RowStream<'_>> {
+        Ok(
+            self.iter_with_prefix_inner::<true, ()>(pk_prefix, sub_range, prefetch_options)
+                .await?.map_ok(|(_, row)| row),
+        )
     }
 
-    async fn iter_with_prefix_inner<const REVERSE: bool>(
+    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice>(
         &self,
         pk_prefix: impl Row,
         sub_range: &(Bound<impl Row>, Bound<impl Row>),
         prefetch_options: PrefetchOptions,
-    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
+    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
         let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
         let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
@@ -1398,14 +1420,13 @@
     }
 }
 
-fn deserialize_keyed_row_stream<'a>(
+fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
     iter: impl StateStoreIter + 'a,
     deserializer: &'a impl ValueRowSerde,
-) -> impl KeyedRowStream<'a> {
+) -> impl PkRowStream<'a, K> {
     iter.into_stream(move |(key, value)| {
-        Ok(KeyedRow::new(
-            // TODO: may avoid clone the key when key is not needed
-            key.user_key.table_key.copy_into(),
+        Ok((
+            K::copy_from_slice(key.user_key.table_key.as_ref()),
             deserializer.deserialize(value).map(OwnedRow::new)?,
         ))
     })
diff --git a/src/stream/src/common/table/test_state_table.rs b/src/stream/src/common/table/test_state_table.rs
index 7b8959bbf2c93..721c42b3cd746 100644
--- a/src/stream/src/common/table/test_state_table.rs
+++ b/src/stream/src/common/table/test_state_table.rs
@@ -1069,7 +1069,7 @@ async fn test_state_table_write_chunk() {
         .collect::<Vec<_>>()
         .await
         .into_iter()
-        .map(|row| row.unwrap().into_owned_row())
+        .map(|row| row.unwrap())
         .collect();
 
     assert_eq!(rows.len(), 2);
@@ -1186,7 +1186,7 @@ async fn test_state_table_write_chunk_visibility() {
         .collect::<Vec<_>>()
         .await
         .into_iter()
-        .map(|row| row.unwrap().into_owned_row())
+        .map(|row| row.unwrap())
         .collect();
 
     assert_eq!(rows.len(), 3);
@@ -1301,7 +1301,7 @@ async fn test_state_table_write_chunk_value_indices() {
         .collect::<Vec<_>>()
         .await
         .into_iter()
-        .map(|row| row.unwrap().into_owned_row())
+        .map(|row| row.unwrap())
         .collect();
 
     assert_eq!(rows.len(), 3);
@@ -1386,7 +1386,7 @@ async fn test_state_table_watermark_cache_ignore_null() {
         .collect::<Vec<_>>()
         .await
         .into_iter()
-        .map(|row| row.unwrap().into_owned_row())
+        .map(|row| row.unwrap())
         .collect();
 
     assert_eq!(inserted_rows.len(), 4);
@@ -1688,7 +1688,7 @@ async fn test_state_table_watermark_cache_refill() {
         .collect::<Vec<_>>()
         .await
         .into_iter()
-        .map(|row| row.unwrap().into_owned_row())
+        .map(|row| row.unwrap())
         .collect();
 
     assert_eq!(inserted_rows.len(), 4);
diff --git a/src/stream/src/executor/backfill/arrangement_backfill.rs b/src/stream/src/executor/backfill/arrangement_backfill.rs
index e2d28bce04bc0..fde42fd1716cd 100644
--- a/src/stream/src/executor/backfill/arrangement_backfill.rs
+++ b/src/stream/src/executor/backfill/arrangement_backfill.rs
@@ -30,8 +30,8 @@ use crate::common::table::state_table::ReplicatedStateTable;
 use crate::executor::backfill::utils::METADATA_STATE_LEN;
 use crate::executor::backfill::utils::{
     compute_bounds, create_builder, create_limiter, get_progress_per_vnode, mapping_chunk,
-    mapping_message, mark_chunk_ref_by_vnode, owned_row_iter, persist_state_per_vnode,
-    update_pos_by_vnode, BackfillProgressPerVnode, BackfillRateLimiter, BackfillState,
+    mapping_message, mark_chunk_ref_by_vnode, persist_state_per_vnode, update_pos_by_vnode,
+    BackfillProgressPerVnode, BackfillRateLimiter, BackfillState,
 };
 use crate::executor::prelude::*;
 use crate::task::CreateMviewProgressReporter;
@@ -720,8 +720,6 @@ where
                 )
                 .await?;
 
-                let vnode_row_iter = Box::pin(owned_row_iter(vnode_row_iter));
-
                 let vnode_row_iter = vnode_row_iter.map_ok(move |row| (vnode, row));
 
                 let vnode_row_iter = Box::pin(vnode_row_iter);
diff --git a/src/stream/src/executor/backfill/no_shuffle_backfill.rs b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
index 9d6d95ff7e7cb..8664c573b5354 100644
--- a/src/stream/src/executor/backfill/no_shuffle_backfill.rs
+++ b/src/stream/src/executor/backfill/no_shuffle_backfill.rs
@@ -26,8 +26,7 @@ use risingwave_storage::table::batch_table::storage_table::StorageTable;
 use crate::executor::backfill::utils;
 use crate::executor::backfill::utils::{
     compute_bounds, construct_initial_finished_state, create_builder, create_limiter, get_new_pos,
-    mapping_chunk, mapping_message, mark_chunk, owned_row_iter, BackfillRateLimiter,
-    METADATA_STATE_LEN,
+    mapping_chunk, mapping_message, mark_chunk, BackfillRateLimiter, METADATA_STATE_LEN,
 };
 use crate::executor::prelude::*;
 use crate::task::CreateMviewProgressReporter;
@@ -687,7 +686,7 @@ where
         // We use uncommitted read here, because we have already scheduled the `BackfillExecutor`
         // together with the upstream mv.
-        let iter = upstream_table
+        let row_iter = upstream_table
             .batch_iter_with_pk_bounds(
                 epoch,
                 row::empty(),
@@ -697,7 +696,6 @@ where
                 PrefetchOptions::prefetch_for_small_range_scan(),
             )
             .await?;
-        let row_iter = owned_row_iter(iter);
 
         #[for_await]
         for row in row_iter {
diff --git a/src/stream/src/executor/backfill/utils.rs b/src/stream/src/executor/backfill/utils.rs
index 7ae488bb718a1..ee8e9c46bfd77 100644
--- a/src/stream/src/executor/backfill/utils.rs
+++ b/src/stream/src/executor/backfill/utils.rs
@@ -19,9 +19,8 @@ use std::ops::Bound;
 use std::time::Instant;
 
 use await_tree::InstrumentAwait;
-use bytes::Bytes;
 use futures::future::try_join_all;
-use futures::{pin_mut, Stream, StreamExt};
+use futures::Stream;
 use futures_async_stream::try_stream;
 use governor::clock::MonotonicClock;
 use governor::middleware::NoOpMiddleware;
@@ -42,7 +41,7 @@ use risingwave_common::util::value_encoding::BasicSerde;
 use risingwave_connector::error::ConnectorError;
 use risingwave_connector::source::cdc::external::{CdcOffset, CdcOffsetParseFunc};
 use risingwave_storage::row_serde::value_serde::ValueRowSerde;
-use risingwave_storage::table::{collect_data_chunk_with_builder, KeyedRow};
+use risingwave_storage::table::collect_data_chunk_with_builder;
 use risingwave_storage::StateStore;
 
 use crate::common::table::state_table::{ReplicatedStateTable, StateTableInner};
@@ -666,19 +665,6 @@ pub(crate) fn compute_bounds(
     }
 }
 
-#[try_stream(ok = OwnedRow, error = StreamExecutorError)]
-pub(crate) async fn owned_row_iter<S, E>(storage_iter: S)
-where
-    StreamExecutorError: From<E>,
-    S: Stream<Item = Result<KeyedRow<Bytes>, E>>,
-{
-    pin_mut!(storage_iter);
-    while let Some(row) = storage_iter.next().await {
-        let row = row?;
-        yield row.into_owned_row()
-    }
-}
-
 #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
 pub(crate) async fn iter_chunks<'a, S, E, R>(mut iter: S, builder: &'a mut DataChunkBuilder)
 where
diff --git a/src/stream/src/executor/join/hash_join.rs b/src/stream/src/executor/join/hash_join.rs
index 287931076108e..1ee3694be4d03 100644
--- a/src/stream/src/executor/join/hash_join.rs
+++ b/src/stream/src/executor/join/hash_join.rs
@@ -398,15 +398,17 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
         if self.need_degree_table {
             let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
                 &(Bound::Unbounded, Bound::Unbounded);
-            let table_iter_fut =
-                self.state
-                    .table
-                    .iter_with_prefix(&key, sub_range, PrefetchOptions::default());
+            let table_iter_fut = self.state.table.iter_keyed_row_with_prefix(
+                &key,
+                sub_range,
+                PrefetchOptions::default(),
+            );
             let degree_state = self.degree_state.as_ref().unwrap();
-            let degree_table_iter_fut =
-                degree_state
-                    .table
-                    .iter_with_prefix(&key, sub_range, PrefetchOptions::default());
+            let degree_table_iter_fut = degree_state.table.iter_keyed_row_with_prefix(
+                &key,
+                sub_range,
+                PrefetchOptions::default(),
+            );
             let (table_iter, degree_table_iter) =
                 try_join(table_iter_fut, degree_table_iter_fut).await?;
@@ -538,7 +540,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
         let table_iter = self
             .state
             .table
-            .iter_with_prefix(&key, sub_range, PrefetchOptions::default())
+            .iter_keyed_row_with_prefix(&key, sub_range, PrefetchOptions::default())
             .await?;
 
         #[for_await]
diff --git a/src/stream/src/executor/nested_loop_temporal_join.rs b/src/stream/src/executor/nested_loop_temporal_join.rs
index 55d21b468a777..c4e463daf45a0 100644
--- a/src/stream/src/executor/nested_loop_temporal_join.rs
+++ b/src/stream/src/executor/nested_loop_temporal_join.rs
@@ -72,7 +72,7 @@ async fn phase1_handle_chunk(
     for (op, left_row) in chunk.rows() {
         let mut matched = false;
         #[for_await]
-        for keyed_row in right_table
+        for right_row in right_table
             .source
             .batch_iter(
                 HummockReadEpoch::NoWait(epoch),
@@ -81,8 +81,7 @@
             )
             .await?
         {
-            let keyed_row = keyed_row?;
-            let right_row = keyed_row.row();
+            let right_row = right_row?;
             matched = true;
             if let Some(chunk) = E::append_matched_row(op, &mut builder, left_row, right_row) {
                 yield chunk;
diff --git a/src/stream/src/executor/sort_buffer.rs b/src/stream/src/executor/sort_buffer.rs
index b3d658f3af4a5..04dbce84f81b5 100644
--- a/src/stream/src/executor/sort_buffer.rs
+++ b/src/stream/src/executor/sort_buffer.rs
@@ -213,7 +213,7 @@ impl<S: StateStore> SortBuffer<S> {
         let streams: Vec<_> =
             futures::future::try_join_all(buffer_table.vnodes().iter_vnodes().map(|vnode| {
-                buffer_table.iter_with_vnode(
+                buffer_table.iter_keyed_row_with_vnode(
                     vnode,
                     &pk_range,
                     PrefetchOptions::new(filler.capacity().is_none(), false),
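
Note on the core pattern in this diff: every storage iterator becomes generic over a key type `K: CopyFromSlice`, and the new `impl CopyFromSlice for ()` makes key materialization a no-op for callers that only want rows, while `Bytes` keeps the keyed behavior needed for merge-sorting and keyed lookups. Below is a minimal, self-contained sketch of that idea — the trait matches the diff, but the surrounding names (`emit`) are toy stand-ins, not the PR's actual types:

```rust
use bytes::Bytes;

pub trait CopyFromSlice: Send + 'static {
    fn copy_from_slice(slice: &[u8]) -> Self;
}

impl CopyFromSlice for Vec<u8> {
    fn copy_from_slice(slice: &[u8]) -> Self {
        Vec::from(slice)
    }
}

impl CopyFromSlice for Bytes {
    fn copy_from_slice(slice: &[u8]) -> Self {
        Bytes::copy_from_slice(slice)
    }
}

/// Zero-sized key type: "copying" the key compiles to nothing.
impl CopyFromSlice for () {
    fn copy_from_slice(_: &[u8]) -> Self {}
}

/// Stand-in for the iterator internals: yield a (key, row) pair,
/// materializing the key only as much as K requires.
fn emit<K: CopyFromSlice>(raw_key: &[u8], row: String) -> (K, String) {
    (K::copy_from_slice(raw_key), row)
}

fn main() {
    // Ordered multi-vnode scan: keys are needed for merge-sorting.
    let (key, row): (Bytes, String) = emit(b"vnode0-pk0", "row-0".to_string());
    println!("{:?} => {}", key, row);

    // Plain row scan: K = () skips the key copy entirely.
    let ((), row) = emit::<()>(b"vnode0-pk1", "row-1".to_string());
    println!("(no key) => {}", row);
}
```

Because `K` is monomorphized, the `()` path compiles away; picking the key type at the call site (e.g. `iter_vnode_with_encoded_key_range::<()>`) replaces the old unconditional `table_key.copy_into()` clone that the removed `// TODO: may optimize the key clone` comment pointed at.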
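For the `ordered` path, `merge_sort` — which now accepts `impl IntoIterator<Item = S>` instead of `Vec<S>` — k-way merges the per-vnode streams by their vnode-prefixed keys. Here is a rough synchronous analogue of that merge using a binary heap; the real implementation works over async `Stream`s and peeks heads via `NodePeek`, so this is only an illustration of the merge logic with hypothetical names:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted (key, row) sources into one key-ordered sequence.
fn merge_sorted<I>(sources: impl IntoIterator<Item = I>) -> Vec<(Vec<u8>, String)>
where
    I: Iterator<Item = (Vec<u8>, String)>,
{
    let mut sources: Vec<I> = sources.into_iter().collect();
    let mut heap = BinaryHeap::new();
    // Seed the heap with the head of every source; Reverse turns the
    // max-heap into a min-heap on the (key, row) tuple.
    for (idx, src) in sources.iter_mut().enumerate() {
        if let Some(head) = src.next() {
            heap.push(Reverse((head, idx)));
        }
    }
    let mut merged = Vec::new();
    // Pop the globally smallest head, then refill from the source it came from.
    while let Some(Reverse((item, idx))) = heap.pop() {
        merged.push(item);
        if let Some(next) = sources[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    merged
}

fn main() {
    // Two "vnode streams", each sorted by its vnode-prefixed key.
    let vnode0 = vec![
        (b"0-a".to_vec(), "r1".to_string()),
        (b"0-c".to_vec(), "r3".to_string()),
    ];
    let vnode1 = vec![(b"1-b".to_vec(), "r2".to_string())];
    for (key, row) in merge_sorted([vnode0.into_iter(), vnode1.into_iter()]) {
        println!("{} => {}", String::from_utf8_lossy(&key), row);
    }
}
```

Taking any `IntoIterator` lets callers pass mapped iterators straight through (as `build_vnode_stream` does with its per-vnode stream constructors) without first collecting them into a `Vec`.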