diff --git a/src/batch/src/executor/log_row_seq_scan.rs b/src/batch/src/executor/log_row_seq_scan.rs index 9ca18a9654f7e..63746fe52c998 100644 --- a/src/batch/src/executor/log_row_seq_scan.rs +++ b/src/batch/src/executor/log_row_seq_scan.rs @@ -12,24 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::{Bound, Deref}; +use std::ops::Deref; use std::sync::Arc; use futures::prelude::stream::StreamExt; use futures_async_stream::try_stream; use futures_util::pin_mut; -use itertools::Itertools; use prometheus::Histogram; -use risingwave_common::array::DataChunk; +use risingwave_common::array::{DataChunk, Op}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnId, Field, Schema}; -use risingwave_common::row::{OwnedRow, Row}; +use risingwave_common::row::{Row, RowExt}; use risingwave_common::types::ScalarImpl; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::common::BatchQueryEpoch; use risingwave_pb::plan_common::StorageTableDesc; use risingwave_storage::table::batch_table::storage_table::StorageTable; -use risingwave_storage::table::{collect_data_chunk, KeyedRow, TableDistribution}; +use risingwave_storage::table::{collect_data_chunk, TableDistribution}; use risingwave_storage::{dispatch_state_store, StateStore}; use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder}; @@ -188,45 +187,26 @@ impl LogRowSeqScanExecutor { histogram: Option>, schema: Arc, ) { - let pk_prefix = OwnedRow::default(); - - let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()]; // Range Scan. let iter = table - .batch_iter_log_with_pk_bounds( - old_epoch.into(), - new_epoch.into(), - &pk_prefix, - ( - if order_type.nulls_are_first() { - // `NULL`s are at the start bound side, we should exclude them to meet SQL semantics. - Bound::Excluded(OwnedRow::new(vec![None])) - } else { - // Both start and end are unbounded, so we need to select all rows. - Bound::Unbounded - }, - if order_type.nulls_are_last() { - // `NULL`s are at the end bound side, we should exclude them to meet SQL semantics. - Bound::Excluded(OwnedRow::new(vec![None])) - } else { - // Both start and end are unbounded, so we need to select all rows. - Bound::Unbounded - }, - ), - ) + .batch_iter_log_with_pk_bounds(old_epoch.into(), new_epoch.into()) .await? - .map(|r| match r { - Ok((op, value)) => { - let (k, row) = value.into_owned_row_key(); - // Todo! To avoid create a full row. - let full_row = row - .into_iter() - .chain(vec![Some(ScalarImpl::Int16(op.to_i16()))]) - .collect_vec(); - let row = OwnedRow::new(full_row); - Ok(KeyedRow::<_>::new(k, row)) - } - Err(e) => Err(e), + .flat_map(|r| { + futures::stream::iter(std::iter::from_coroutine(move || { + match r { + Ok(change_log_row) => { + fn with_op(op: Op, row: impl Row) -> impl Row { + row.chain([Some(ScalarImpl::Int16(op.to_i16()))]) + } + for (op, row) in change_log_row.into_op_value_iter() { + yield Ok(with_op(op, row)); + } + } + Err(e) => { + yield Err(e); + } + }; + })) }); pin_mut!(iter); diff --git a/src/batch/src/lib.rs b/src/batch/src/lib.rs index 3a29e2a90b27e..ce479daa2afc5 100644 --- a/src/batch/src/lib.rs +++ b/src/batch/src/lib.rs @@ -31,6 +31,7 @@ #![feature(lazy_cell)] #![feature(error_generic_member_access)] #![feature(map_try_insert)] +#![feature(iter_from_coroutine)] pub mod error; pub mod exchange_source; diff --git a/src/common/src/row/mod.rs b/src/common/src/row/mod.rs index 8114b96cb37e5..a5ec114f1929e 100644 --- a/src/common/src/row/mod.rs +++ b/src/common/src/row/mod.rs @@ -201,6 +201,32 @@ pub trait RowExt: Row { fn is_null_at(&self, index: usize) -> bool { self.datum_at(index).is_none() } + + fn exact_size_iter(&self) -> impl ExactSizeIterator> { + ExactSizeRowIter { + inner: self.iter(), + size: self.len(), + } + } +} + +pub struct ExactSizeRowIter<'a, I: Iterator> + 'a> { + inner: I, + size: usize, +} + +impl<'a, I: Iterator> + 'a> Iterator for ExactSizeRowIter<'a, I> { + type Item = DatumRef<'a>; + + fn next(&mut self) -> Option { + self.inner.next() + } +} + +impl<'a, I: Iterator> + 'a> ExactSizeIterator for ExactSizeRowIter<'a, I> { + fn len(&self) -> usize { + self.size + } } impl RowExt for R {} diff --git a/src/storage/src/lib.rs b/src/storage/src/lib.rs index 21c0c7f49ae4c..ab69c45093b46 100644 --- a/src/storage/src/lib.rs +++ b/src/storage/src/lib.rs @@ -40,6 +40,7 @@ #![feature(impl_trait_in_assoc_type)] #![feature(maybe_uninit_uninit_array)] #![feature(maybe_uninit_array_assume_init)] +#![feature(iter_from_coroutine)] pub mod hummock; pub mod memory; diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 5653927891bd8..84af79132c766 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -25,6 +25,7 @@ use bytes::Bytes; use futures::{Stream, TryStreamExt}; use futures_async_stream::try_stream; use prost::Message; +use risingwave_common::array::Op; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; @@ -189,6 +190,24 @@ impl ChangeLogValue { ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?), }) } + + pub fn into_op_value_iter(self) -> impl Iterator { + std::iter::from_coroutine(move || match self { + Self::Insert(row) => { + yield (Op::Insert, row); + } + Self::Delete(row) => { + yield (Op::Delete, row); + } + Self::Update { + old_value, + new_value, + } => { + yield (Op::UpdateDelete, old_value); + yield (Op::UpdateInsert, new_value); + } + }) + } } impl> ChangeLogValue { diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index 9a6169ff5838d..5eaa52a0538c5 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -25,7 +25,7 @@ use futures::{Stream, StreamExt}; use futures_async_stream::try_stream; use itertools::{Either, Itertools}; use more_asserts::assert_gt; -use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk, Op}; +use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk}; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption}; use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; @@ -47,9 +47,11 @@ use crate::hummock::CachePolicy; use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode}; use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew}; use crate::row_serde::{find_columns_by_ids, ColumnMapping}; -use crate::store::{ChangeLogValue, PrefetchOptions, ReadLogOptions, ReadOptions, StateStoreIter}; +use crate::store::{ + PrefetchOptions, ReadLogOptions, ReadOptions, StateStoreIter, StateStoreIterExt, +}; use crate::table::merge_sort::merge_sort; -use crate::table::{KeyedRow, TableDistribution, TableIter}; +use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter}; use crate::StateStore; /// [`StorageTableInner`] is the interface accessing relational data in KV(`StateStore`) with @@ -749,11 +751,10 @@ impl StorageTableInner { &self, satrt_epoch: HummockReadEpoch, end_epoch: HummockReadEpoch, - pk_prefix: impl Row, - range_bounds: impl RangeBounds, - ) -> StorageResult)>> + Send> { - let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true); - let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false); + ) -> StorageResult> + Send> { + let pk_prefix = OwnedRow::default(); + let start_key = self.serialize_pk_bound(&pk_prefix, Unbounded, true); + let end_key = self.serialize_pk_bound(&pk_prefix, Unbounded, false); assert!(pk_prefix.len() <= self.pk_indices.len()); let table_key_ranges = { @@ -995,77 +996,16 @@ impl StorageTableInnerIterLogInner { } /// Yield a row with its primary key. - #[try_stream(ok = (Op, KeyedRow), error = StorageError)] - async fn into_stream(mut self) { - while let Some((k, v)) = self - .iter - .try_next() - .verbose_instrument_await("storage_table_iter_next") - .await? - { - match v { - ChangeLogValue::Insert(value) => { - let full_row = self.row_deserializer.deserialize(value)?; - let row = self - .mapping - .project(OwnedRow::new(full_row)) - .into_owned_row(); - // TODO: may optimize the key clone - yield ( - Op::Insert, - KeyedRow:: { - vnode_prefixed_key: k.copy_into(), - row, - }, - ); - } - ChangeLogValue::Update { - new_value, - old_value, - } => { - let full_row = self.row_deserializer.deserialize(old_value)?; - let row = self - .mapping - .project(OwnedRow::new(full_row)) - .into_owned_row(); - // TODO: may optimize the key clone - yield ( - Op::UpdateDelete, - KeyedRow:: { - vnode_prefixed_key: k.copy_into(), - row, - }, - ); - let full_row = self.row_deserializer.deserialize(new_value)?; - let row = self - .mapping - .project(OwnedRow::new(full_row)) - .into_owned_row(); - // TODO: may optimize the key clone - yield ( - Op::UpdateInsert, - KeyedRow:: { - vnode_prefixed_key: k.copy_into(), - row, - }, - ); - } - ChangeLogValue::Delete(value) => { - let full_row = self.row_deserializer.deserialize(value)?; - let row = self - .mapping - .project(OwnedRow::new(full_row)) - .into_owned_row(); - // TODO: may optimize the key clone - yield ( - Op::Delete, - KeyedRow:: { - vnode_prefixed_key: k.copy_into(), - row, - }, - ); - } - } - } + fn into_stream(self) -> impl Stream> { + self.iter.into_stream(move |(_key, value)| { + value.try_map(|value| { + let full_row = self.row_deserializer.deserialize(value)?; + let row = self + .mapping + .project(OwnedRow::new(full_row)) + .into_owned_row(); + Ok(row) + }) + }) } } diff --git a/src/storage/src/table/mod.rs b/src/storage/src/table/mod.rs index d245e4bde3790..b29b0a93f273e 100644 --- a/src/storage/src/table/mod.rs +++ b/src/storage/src/table/mod.rs @@ -17,19 +17,17 @@ pub mod merge_sort; use std::ops::Deref; -use bytes::Bytes; use futures::{Stream, StreamExt}; -use futures_async_stream::try_stream; -use risingwave_common::array::{DataChunk, Op}; +use risingwave_common::array::DataChunk; use risingwave_common::catalog::Schema; pub use risingwave_common::hash::table_distribution::*; use risingwave_common::hash::VirtualNode; -use risingwave_common::row::{OwnedRow, Row}; +use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_hummock_sdk::key::TableKey; -use crate::error::{StorageError, StorageResult}; +use crate::error::StorageResult; use crate::row_serde::value_serde::ValueRowSerde; use crate::store::{ChangeLogValue, StateStoreIterExt, StateStoreReadLogItem}; use crate::StateStoreIter; @@ -40,20 +38,21 @@ pub trait TableIter: Send { async fn next_row(&mut self) -> StorageResult>; } -pub async fn collect_data_chunk( +pub async fn collect_data_chunk( stream: &mut S, schema: &Schema, chunk_size: Option, ) -> Result, E> where - S: Stream, E>> + Unpin, + S: Stream> + Unpin, + R: Row, { let mut builders = schema.create_array_builders(chunk_size.unwrap_or(0)); let mut row_count = 0; for _ in 0..chunk_size.unwrap_or(usize::MAX) { match stream.next().await.transpose()? { Some(row) => { - for (datum, builder) in row.iter().zip_eq_fast(builders.iter_mut()) { + for (datum, builder) in row.exact_size_iter().zip_eq_fast(builders.iter_mut()) { builder.append(datum); } } @@ -151,30 +150,19 @@ impl> Deref for KeyedRow { } } -#[try_stream(ok = (Op, OwnedRow), error = StorageError)] -pub async fn deserialize_log_stream<'a>( +impl> From> for OwnedRow { + fn from(value: KeyedRow) -> Self { + value.row + } +} + +pub type ChangeLogRow = ChangeLogValue; + +pub fn deserialize_log_stream<'a>( iter: impl StateStoreIter + 'a, deserializer: &'a impl ValueRowSerde, -) { - let stream = iter.into_stream(|(_key, log_value)| { +) -> impl Stream> + 'a { + iter.into_stream(|(_key, log_value)| { log_value.try_map(|slice| Ok(OwnedRow::new(deserializer.deserialize(slice)?))) - }); - #[for_await] - for log_value in stream { - match log_value? { - ChangeLogValue::Insert(row) => { - yield (Op::Insert, row); - } - ChangeLogValue::Delete(row) => { - yield (Op::Delete, row); - } - ChangeLogValue::Update { - new_value, - old_value, - } => { - yield (Op::UpdateDelete, old_value); - yield (Op::UpdateInsert, new_value); - } - } - } + }) } diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index babc2d401e7ef..aa5f27ee2eee9 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -57,7 +57,7 @@ use risingwave_storage::store::{ ReadLogOptions, ReadOptions, SealCurrentEpochOptions, StateStoreIter, StateStoreIterExt, }; use risingwave_storage::table::merge_sort::merge_sort; -use risingwave_storage::table::{deserialize_log_stream, KeyedRow, TableDistribution}; +use risingwave_storage::table::{ChangeLogRow, deserialize_log_stream, KeyedRow, TableDistribution}; use risingwave_storage::StateStore; use thiserror_ext::AsReport; use tracing::{trace, Instrument}; @@ -1589,7 +1589,7 @@ where vnode: VirtualNode, epoch_range: (u64, u64), pk_range: &(Bound, Bound), - ) -> StreamExecutorResult> + '_> { + ) -> StreamExecutorResult> + '_> { let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range); let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode); Ok(deserialize_log_stream( diff --git a/src/stream/src/executor/batch_query.rs b/src/stream/src/executor/batch_query.rs index d7c7f38d99504..34e32189bc5e5 100644 --- a/src/stream/src/executor/batch_query.rs +++ b/src/stream/src/executor/batch_query.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use futures::TryStreamExt; use risingwave_common::array::Op; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_storage::store::PrefetchOptions; @@ -52,6 +53,7 @@ where PrefetchOptions::prefetch_for_large_range_scan(), ) .await?; + let iter = iter.map_ok(|keyed_row| keyed_row.into_owned_row()); pin_mut!(iter); while let Some(data_chunk) =