diff --git a/src/batch/src/executor/row_seq_scan.rs b/src/batch/src/executor/row_seq_scan.rs
index 988122218479f..505702ffa021b 100644
--- a/src/batch/src/executor/row_seq_scan.rs
+++ b/src/batch/src/executor/row_seq_scan.rs
@@ -31,7 +31,7 @@ use risingwave_pb::common::BatchQueryEpoch;
 use risingwave_pb::plan_common::StorageTableDesc;
 use risingwave_storage::store::PrefetchOptions;
 use risingwave_storage::table::batch_table::storage_table::StorageTable;
-use risingwave_storage::table::{collect_data_chunk, TableDistribution};
+use risingwave_storage::table::TableDistribution;
 use risingwave_storage::{dispatch_state_store, StateStore};
 
 use crate::error::{BatchError, Result};
@@ -387,7 +387,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
                 // Range Scan.
                 assert!(pk_prefix.len() < table.pk_indices().len());
                 let iter = table
-                    .batch_iter_with_pk_bounds(
+                    .batch_chunk_iter_with_pk_bounds(
                         epoch.into(),
                         &pk_prefix,
                         (
@@ -419,6 +419,7 @@
                             },
                         ),
                         ordered,
+                        chunk_size,
                         PrefetchOptions::new(limit.is_none(), true),
                     )
                     .await?;
@@ -427,9 +428,7 @@
         loop {
             let timer = histogram.as_ref().map(|histogram| histogram.start_timer());
 
-            let chunk = collect_data_chunk(&mut iter, table.schema(), Some(chunk_size))
-                .await
-                .map_err(BatchError::from)?;
+            let chunk = iter.next().await.transpose().map_err(BatchError::from)?;
 
             if let Some(timer) = timer {
                 timer.observe_duration()
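With this change the storage layer hands the executor ready-made chunks, so the executor loop reduces to pulling the next stream item. A caller-side sketch of the new pattern, mirroring the hunks above (assumptions: `table`, `epoch`, `ordered`, `chunk_size`, and `pk_prefix` are in scope as in the executor; the pk bounds are elided to a full range):

```rust
// Sketch only, not part of the patch. Mirrors the executor's usage above.
let iter = table
    .batch_chunk_iter_with_pk_bounds(
        epoch.into(),
        &pk_prefix,
        .., // bounds elided; the executor passes real pk bounds here
        ordered,
        chunk_size,
        PrefetchOptions::default(),
    )
    .await?;
pin_mut!(iter);
// `next()` yields `Option<StorageResult<DataChunk>>`; `transpose()` turns it
// into `StorageResult<Option<DataChunk>>` so `?` can propagate storage errors.
while let Some(chunk) = iter.next().await.transpose()? {
    // Each chunk holds at most `chunk_size` rows.
    assert!(chunk.cardinality() <= chunk_size);
}
```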
diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs
index 795668dfe8b1b..574250bb74ca8 100644
--- a/src/storage/src/table/batch_table/storage_table.rs
+++ b/src/storage/src/table/batch_table/storage_table.rs
@@ -21,10 +21,10 @@ use await_tree::InstrumentAwait;
 use bytes::Bytes;
 use foyer::CacheContext;
 use futures::future::try_join_all;
-use futures::{Stream, StreamExt};
+use futures::{stream, Stream, StreamExt};
 use futures_async_stream::try_stream;
 use itertools::{Either, Itertools};
-use risingwave_common::array::Op;
+use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk, Op};
 use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
 use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
@@ -626,6 +626,90 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
         .await
     }
 
+    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
+    /// Returns a stream of chunks of columns with the provided `chunk_size`.
+    async fn chunk_iter_with_pk_bounds(
+        &self,
+        epoch: HummockReadEpoch,
+        pk_prefix: impl Row,
+        range_bounds: impl RangeBounds<OwnedRow>,
+        ordered: bool,
+        chunk_size: usize,
+        prefetch_options: PrefetchOptions,
+    ) -> StorageResult<impl Stream<Item = StorageResult<(Vec<ArrayRef>, usize)>> + Send> {
+        use risingwave_common::util::iter_util::ZipEqFast;
+
+        let iter = self
+            .iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
+            .await?;
+
+        // Uses `ArrayBuilderImpl` instead of `DataChunkBuilder` here to demonstrate how to build a chunk in a columnar manner.
+        let builders = self.schema.create_array_builders(chunk_size);
+        let row_count = 0;
+        Ok(stream::unfold(
+            Some((Box::pin(iter), builders, row_count, self.schema.clone())),
+            move |state| async move {
+                if state.is_none() {
+                    // Already reached the end or met an error.
+                    // We only reach here after condition 2 or 3 below has been met.
+                    return None;
+                }
+                let (mut iter, mut builders, mut row_count, schema) = state.unwrap();
+                match iter.next().await {
+                    Some(Ok(row)) => {
+                        // 1. The row stream returns a valid row.
+                        row_count += 1;
+                        for (datum, builder) in row.iter().zip_eq_fast(builders.iter_mut()) {
+                            builder.append(datum);
+                        }
+                        if row_count == chunk_size {
+                            // 1.a. Yield a new chunk and reset the builders.
+                            let columns: Vec<_> = builders
+                                .into_iter()
+                                .map(|builder| builder.finish().into())
+                                .collect();
+                            let builders: Vec<ArrayBuilderImpl> =
+                                schema.create_array_builders(chunk_size);
+                            Some((
+                                // Chunk to yield.
+                                Some(Ok((columns, chunk_size))),
+                                // The new state (`row_count == 0`).
+                                Some((iter, builders, 0, schema)),
+                            ))
+                        } else {
+                            // 1.b. Do not yield because the chunk is not full yet.
+                            Some((
+                                // `None` indicates there is no chunk to yield. It will be filtered out by the `filter_map` below.
+                                None,
+                                Some((iter, builders, row_count, schema)),
+                            ))
+                        }
+                    }
+                    Some(Err(e)) => {
+                        // 2. The row stream returns an error.
+                        // Yield the error directly and stop the iteration by setting the state to `None`.
+                        Some((Some(Err(e)), None))
+                    }
+                    None => {
+                        // 3. The row stream has reached the end.
+                        if row_count > 0 {
+                            // 3.a. Yield the last chunk, if any.
+                            let columns: Vec<_> = builders
+                                .into_iter()
+                                .map(|builder| builder.finish().into())
+                                .collect();
+                            Some((Some(Ok((columns, row_count))), None))
+                        } else {
+                            // 3.b. No need to yield if the last chunk is empty.
+                            None
+                        }
+                    }
+                }
+            },
+        )
+        .filter_map(|x| async { x }))
+    }
+
     /// Construct a stream item `StorageResult<KeyedRow<Bytes>>` for batch executors.
     /// Differs from the streaming one, this iterator will wait for the epoch before iteration
     pub async fn batch_iter_with_pk_bounds(
@@ -704,6 +788,34 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
 
         Ok(iter)
     }
+
+    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
+    /// Returns a stream of `DataChunk` with the provided `chunk_size`.
+    pub async fn batch_chunk_iter_with_pk_bounds(
+        &self,
+        epoch: HummockReadEpoch,
+        pk_prefix: impl Row,
+        range_bounds: impl RangeBounds<OwnedRow>,
+        ordered: bool,
+        chunk_size: usize,
+        prefetch_options: PrefetchOptions,
+    ) -> StorageResult<impl Stream<Item = StorageResult<DataChunk>> + Send> {
+        let iter = self
+            .chunk_iter_with_pk_bounds(
+                epoch,
+                pk_prefix,
+                range_bounds,
+                ordered,
+                chunk_size,
+                prefetch_options,
+            )
+            .await?;
+
+        Ok(iter.map(|item| {
+            let (columns, row_count) = item?;
+            Ok(DataChunk::new(columns, row_count))
+        }))
+    }
 }
 
 /// [`StorageTableInnerIterInner`] iterates on the storage table.
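The `stream::unfold` + `filter_map` trick above is worth seeing in isolation. Below is a minimal, self-contained sketch of the same pattern over plain `Vec<i32>` buffers, using only the `futures` crate; all names here (`chunk_rows`) are illustrative and not part of this patch:

```rust
use futures::stream::{self, Stream, StreamExt};

/// Re-chunks a fallible row stream into fallible `chunk_size`-row batches:
/// per-row steps emit `None` (dropped by `filter_map`), a full chunk emits
/// `Some(Ok(..))`, an error is emitted once and ends the stream, and a
/// trailing partial chunk is flushed when the input is exhausted.
fn chunk_rows<S>(input: S, chunk_size: usize) -> impl Stream<Item = Result<Vec<i32>, String>>
where
    S: Stream<Item = Result<i32, String>> + Unpin,
{
    stream::unfold(Some((input, Vec::new())), move |state| async move {
        // A `None` state means the stream already finished (error or end).
        let (mut input, mut buf) = state?;
        match input.next().await {
            Some(Ok(row)) => {
                buf.push(row);
                if buf.len() == chunk_size {
                    // Full chunk: emit it and continue with a fresh buffer.
                    Some((Some(Ok(buf)), Some((input, Vec::with_capacity(chunk_size)))))
                } else {
                    // Not full yet: emit `None`, dropped by `filter_map` below.
                    Some((None, Some((input, buf))))
                }
            }
            // Emit the error, then stop by dropping the state.
            Some(Err(e)) => Some((Some(Err(e)), None)),
            // Upstream exhausted: flush the partial chunk, if any.
            None => (!buf.is_empty()).then(|| (Some(Ok(buf)), None)),
        }
    })
    .filter_map(|x| async { x })
}

fn main() {
    futures::executor::block_on(async {
        let input = stream::iter((0..5).map(Ok::<_, String>));
        let chunks: Vec<_> = chunk_rows(input, 2).collect().await;
        assert_eq!(chunks, vec![Ok(vec![0, 1]), Ok(vec![2, 3]), Ok(vec![4])]);
    });
}
```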
diff --git a/src/stream/src/common/table/test_storage_table.rs b/src/stream/src/common/table/test_storage_table.rs
index cd914340fdb2d..e87931f207384 100644
--- a/src/stream/src/common/table/test_storage_table.rs
+++ b/src/stream/src/common/table/test_storage_table.rs
@@ -12,20 +12,22 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use futures::pin_mut;
+use futures::{pin_mut, StreamExt};
 use itertools::Itertools;
+use risingwave_common::array::DataChunk;
 use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
-use risingwave_common::row::OwnedRow;
+use risingwave_common::row::{self, OwnedRow, RowExt};
 use risingwave_common::types::DataType;
+use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
 use risingwave_common::util::epoch::{test_epoch, EpochPair};
 use risingwave_common::util::sort_util::OrderType;
 use risingwave_hummock_sdk::HummockReadEpoch;
 use risingwave_hummock_test::test_utils::prepare_hummock_test_env;
 use risingwave_storage::table::batch_table::storage_table::StorageTable;
 use risingwave_storage::table::TableIter;
 
 use crate::common::table::state_table::StateTable;
 use crate::common::table::test_utils::{gen_prost_table, gen_prost_table_with_value_indices};
 
 /// There are three struct in relational layer, StateTable, MemTable and StorageTable.
 /// `StateTable` provides read/write interfaces to the upper layer streaming operator.
@@ -460,3 +462,123 @@ async fn test_batch_scan_with_value_indices() {
     let res = iter.next_row().await.unwrap();
     assert!(res.is_none());
 }
+
+#[tokio::test]
+async fn test_batch_scan_chunk_with_value_indices() {
+    const TEST_TABLE_ID: TableId = TableId { table_id: 233 };
+    let test_env = prepare_hummock_test_env().await;
+
+    let order_types = vec![OrderType::ascending(), OrderType::descending()];
+    let column_ids = [
+        ColumnId::from(0),
+        ColumnId::from(1),
+        ColumnId::from(2),
+        ColumnId::from(3),
+    ];
+    let column_descs = vec![
+        ColumnDesc::unnamed(column_ids[0], DataType::Int32),
+        ColumnDesc::unnamed(column_ids[1], DataType::Int32),
+        ColumnDesc::unnamed(column_ids[2], DataType::Int32),
+        ColumnDesc::unnamed(column_ids[3], DataType::Int32),
+    ];
+    let pk_indices = vec![0_usize, 2_usize];
+    let value_indices: Vec<usize> = vec![1, 3];
+    let read_prefix_len_hint = 0;
+    let table = gen_prost_table_with_value_indices(
+        TEST_TABLE_ID,
+        column_descs.clone(),
+        order_types.clone(),
+        pk_indices.clone(),
+        read_prefix_len_hint,
+        value_indices.iter().map(|v| *v as i32).collect_vec(),
+    );
+
+    test_env.register_table(table.clone()).await;
+    let mut state =
+        StateTable::from_table_catalog_inconsistent_op(&table, test_env.storage.clone(), None)
+            .await;
+
+    let output_column_idx: Vec<usize> = vec![1, 2];
+    let column_ids_partial = output_column_idx
+        .iter()
+        .map(|i| ColumnId::from(*i as i32))
+        .collect_vec();
+
+    let table = StorageTable::for_test_with_partial_columns(
+        test_env.storage.clone(),
+        TEST_TABLE_ID,
+        column_descs.clone(),
+        column_ids_partial,
+        order_types.clone(),
+        pk_indices,
+        value_indices.clone(),
+    );
+    let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
+    state.init_epoch(epoch);
+
+    let gen_row = |i: i32, is_update: bool| {
+        let scale = if is_update { 10 } else { 1 };
+        OwnedRow::new(vec![
+            Some(i.into()),
+            Some((i * 10 * scale).into()),
+            Some((i * 100).into()),
+            Some((i * 1000 * scale).into()),
+        ])
+    };
+
+    let mut rows = vec![];
+    let insert_row_idx = (0..20).collect_vec();
+    let delete_row_idx = (0..5).map(|i| i * 2).collect_vec();
+    let updated_row_idx = (0..5).map(|i| i * 2 + 1).collect_vec();
+    for i in &insert_row_idx {
+        let row = gen_row(*i, false);
+        state.insert(row.clone());
+        rows.push(row);
+    }
+
+    for i in &updated_row_idx {
+        let row = gen_row(*i, true);
+        state.update(rows[*i as usize].clone(), row.clone());
+        rows[*i as usize] = row;
+    }
+
+    for i in &delete_row_idx {
+        let row = gen_row(*i, false);
+        state.delete(row);
+    }
+
+    let mut rows = rows
+        .into_iter()
+        .enumerate()
+        .filter(|(idx, _)| !delete_row_idx.contains(&(*idx as i32)))
+        .map(|(_, row)| row)
+        .collect_vec();
+
+    epoch.inc_for_test();
+    state.commit(epoch).await.unwrap();
+    test_env.commit_epoch(epoch.prev).await;
+
+    let chunk_size = 2;
+    let iter = table
+        .batch_chunk_iter_with_pk_bounds(
+            HummockReadEpoch::Committed(epoch.prev),
+            row::empty(),
+            ..,
+            false,
+            chunk_size,
+            Default::default(),
+        )
+        .await
+        .unwrap();
+    pin_mut!(iter);
+
+    let chunks: Vec<_> = iter.collect().await;
+    for (chunk, expected_rows) in chunks.into_iter().zip_eq(rows.chunks_mut(chunk_size)) {
+        let mut builder =
+            DataChunkBuilder::new(vec![DataType::Int32, DataType::Int32], 2 * chunk_size);
+        for row in expected_rows {
+            let _ = builder.append_one_row(row.clone().project(&output_column_idx));
+        }
+        assert_eq!(builder.consume_all().unwrap(), chunk.unwrap());
+    }
+}
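As a sanity check on the test's parameters: 20 inserted rows minus the 5 even-keyed deletions leave 15 rows, so with `chunk_size = 2` the stream should yield seven full chunks plus a final one-row chunk. A standalone sketch of that arithmetic (the helper name is illustrative, not part of the patch):

```rust
/// Expected per-chunk row counts when `total_rows` rows are split into
/// `chunk_size`-row chunks, with a smaller trailing chunk if needed.
fn expected_chunk_sizes(total_rows: usize, chunk_size: usize) -> Vec<usize> {
    let mut sizes = vec![chunk_size; total_rows / chunk_size];
    if total_rows % chunk_size != 0 {
        sizes.push(total_rows % chunk_size);
    }
    sizes
}

fn main() {
    // 20 inserts - 5 deletes = 15 surviving rows, scanned with chunk_size = 2.
    assert_eq!(expected_chunk_sizes(15, 2), vec![2, 2, 2, 2, 2, 2, 2, 1]);
}
```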