Skip to content

Commit

Permalink
feat(storage): support data chunk iter in storage table
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Jun 11, 2024
1 parent daa1c42 commit 109e35d
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 9 deletions.
9 changes: 4 additions & 5 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::table::TableDistribution;
use risingwave_storage::{dispatch_state_store, StateStore};

use crate::error::{BatchError, Result};
Expand Down Expand Up @@ -387,7 +387,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
// Range Scan.
assert!(pk_prefix.len() < table.pk_indices().len());
let iter = table
.batch_iter_with_pk_bounds(
.batch_chunk_iter_with_pk_bounds(
epoch.into(),
&pk_prefix,
(
Expand Down Expand Up @@ -419,6 +419,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
},
),
ordered,
chunk_size,
PrefetchOptions::new(limit.is_none(), true),
)
.await?;
Expand All @@ -427,9 +428,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
loop {
let timer = histogram.as_ref().map(|histogram| histogram.start_timer());

let chunk = collect_data_chunk(&mut iter, table.schema(), Some(chunk_size))
.await
.map_err(BatchError::from)?;
let chunk = iter.next().await.transpose().map_err(BatchError::from)?;

if let Some(timer) = timer {
timer.observe_duration()
Expand Down
116 changes: 114 additions & 2 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use await_tree::InstrumentAwait;
use bytes::Bytes;
use foyer::CacheContext;
use futures::future::try_join_all;
use futures::{Stream, StreamExt};
use futures::{stream, Stream, StreamExt};
use futures_async_stream::try_stream;
use itertools::{Either, Itertools};
use risingwave_common::array::Op;
use risingwave_common::array::{ArrayRef, DataChunk, Op};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
Expand Down Expand Up @@ -626,6 +626,90 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
.await
}

/// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
/// Returns a stream of chunks of columns with the provided `chunk_size`
async fn chunk_iter_with_pk_bounds(
&self,
epoch: HummockReadEpoch,
pk_prefix: impl Row,
range_bounds: impl RangeBounds<OwnedRow>,
ordered: bool,
chunk_size: usize,
prefetch_options: PrefetchOptions,
) -> StorageResult<impl Stream<Item = StorageResult<(Vec<ArrayRef>, usize)>> + Send> {
use risingwave_common::util::iter_util::ZipEqFast;

let iter = self
.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
.await?;

// Uses ArraryBuilderImpl instead of DataChunkBuilder here to demonstrate how to build chunk in a columnar manner

Check warning on line 646 in src/storage/src/table/batch_table/storage_table.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"Arrary" should be "Array".
let builders = self.schema.create_array_builders(chunk_size);
let row_count = 0;
Ok(stream::unfold(
Some((Box::pin(iter), builders, row_count, self.schema.clone())),
move |state| async move {
if state.is_none() {
// Already reached end or met error
// We will only reach here after condition 2 or 3 below is met
return None;
}
let (mut iter, mut builders, mut row_count, schema) = state.unwrap();
match iter.next().await {
Some(Ok(row)) => {
// 1. the row stream returns a valid row
row_count += 1;
for (datum, builder) in row.iter().zip_eq_fast(builders.iter_mut()) {
builder.append(datum);
}
if row_count == chunk_size {
// 1.a. yield a new chunk and reset the builder
let columns: Vec<_> = builders
.into_iter()
.map(|builder| builder.finish().into())
.collect();
let builders: Vec<risingwave_common::array::ArrayBuilderImpl> =
schema.create_array_builders(chunk_size);
Some((
// Chunk to yield
Some(Ok((columns, chunk_size))),
// The new state (row_count == 0)
Some((iter, builders, 0, schema)),
))
} else {
// 1.b. do not yield because the chunk is not full yet
Some((
// None indicates no chunk to yield. It will be filter out by the filter_map
None,
Some((iter, builders, row_count, schema)),
))
}
}
Some(Err(e)) => {
// 2. the row stream returns an error.
// yield the error directly and stop the iteration by setting the state to None
Some((Some(Err(e)), None))
}
None => {
// 3. the row stream has reached the end
if row_count > 0 {
// 3.a. yield the last chunk if any
let columns: Vec<_> = builders
.into_iter()
.map(|builder| builder.finish().into())
.collect();
Some((Some(Ok((columns, row_count))), None))
} else {
// 3.b. No need to yield if the last chunk is empty
None
}
}
}
},
)
.filter_map(|x| async { x }))
}

/// Construct a stream item `StorageResult<KeyedRow<Bytes>>` for batch executors.
/// Unlike the streaming one, this iterator will wait for the epoch before iteration.
pub async fn batch_iter_with_pk_bounds(
Expand Down Expand Up @@ -704,6 +788,34 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {

Ok(iter)
}

/// Scans the table over the given `pk_prefix` and `range_bounds` and yields the result as a
/// stream of `DataChunk`s.
///
/// Delegates to `chunk_iter_with_pk_bounds` with the same arguments and wraps each yielded
/// `(columns, row_count)` pair into a `DataChunk`; errors are forwarded unchanged.
pub async fn batch_chunk_iter_with_pk_bounds(
    &self,
    epoch: HummockReadEpoch,
    pk_prefix: impl Row,
    range_bounds: impl RangeBounds<OwnedRow>,
    ordered: bool,
    chunk_size: usize,
    prefetch_options: PrefetchOptions,
) -> StorageResult<impl Stream<Item = StorageResult<DataChunk>> + Send> {
    let column_stream = self
        .chunk_iter_with_pk_bounds(
            epoch,
            pk_prefix,
            range_bounds,
            ordered,
            chunk_size,
            prefetch_options,
        )
        .await?;

    let chunk_stream = column_stream.map(|item| {
        item.map(|(columns, row_count)| DataChunk::new(columns, row_count))
    });
    Ok(chunk_stream)
}
}

/// [`StorageTableInnerIterInner`] iterates on the storage table.
Expand Down
128 changes: 126 additions & 2 deletions src/stream/src/common/table/test_storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::pin_mut;
use futures::{pin_mut, StreamExt};
use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId};
use risingwave_common::row::OwnedRow;
use risingwave_common::row::{self, OwnedRow, RowExt};
use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::epoch::{test_epoch, EpochPair};
use risingwave_common::util::sort_util::OrderType;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_test::test_utils::prepare_hummock_test_env;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::TableIter;
use serde::de;

use crate::common::table::state_table::StateTable;
use crate::common::table::test_utils::{gen_prost_table, gen_prost_table_with_value_indices};
use crate::config::chunk_size;

/// There are three structs in the relational layer: `StateTable`, `MemTable` and `StorageTable`.
/// `StateTable` provides read/write interfaces to the upper layer streaming operator.
Expand Down Expand Up @@ -460,3 +464,123 @@ async fn test_batch_scan_with_value_indices() {
let res = iter.next_row().await.unwrap();
assert!(res.is_none());
}

/// Checks that `batch_chunk_iter_with_pk_bounds` returns the committed rows of a table with
/// value indices, projected to the requested output columns and grouped into chunks of
/// `chunk_size` rows.
///
/// The test inserts 20 rows, updates rows 1, 3, 5, 7, 9 and deletes rows 0, 2, 4, 6, 8, so 15
/// rows remain; with `chunk_size == 2` the last chunk holds a single row.
#[tokio::test]
async fn test_batch_scan_chunk_with_value_indices() {
    const TEST_TABLE_ID: TableId = TableId { table_id: 233 };
    let test_env = prepare_hummock_test_env().await;

    let order_types = vec![OrderType::ascending(), OrderType::descending()];
    let column_ids = [
        ColumnId::from(0),
        ColumnId::from(1),
        ColumnId::from(2),
        ColumnId::from(3),
    ];
    let column_descs = vec![
        ColumnDesc::unnamed(column_ids[0], DataType::Int32),
        ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ColumnDesc::unnamed(column_ids[2], DataType::Int32),
        ColumnDesc::unnamed(column_ids[3], DataType::Int32),
    ];
    let pk_indices = vec![0_usize, 2_usize];
    let value_indices: Vec<usize> = vec![1, 3];
    let read_prefix_len_hint = 0;
    let table = gen_prost_table_with_value_indices(
        TEST_TABLE_ID,
        column_descs.clone(),
        order_types.clone(),
        pk_indices.clone(),
        read_prefix_len_hint,
        value_indices.iter().map(|v| *v as i32).collect_vec(),
    );

    test_env.register_table(table.clone()).await;
    let mut state =
        StateTable::from_table_catalog_inconsistent_op(&table, test_env.storage.clone(), None)
            .await;

    // Only columns 1 and 2 are read back through the storage table.
    let output_column_idx: Vec<usize> = vec![1, 2];
    let column_ids_partial = output_column_idx
        .iter()
        .map(|i| ColumnId::from(*i as i32))
        .collect_vec();

    let table = StorageTable::for_test_with_partial_columns(
        test_env.storage.clone(),
        TEST_TABLE_ID,
        column_descs.clone(),
        column_ids_partial,
        order_types.clone(),
        pk_indices,
        value_indices.clone(),
    );
    let mut epoch = EpochPair::new_test_epoch(test_epoch(1));
    state.init_epoch(epoch);

    // Row `i` is (i, i * 10, i * 100, i * 1000); an update scales the two value columns
    // (1 and 3) by 10 and leaves the pk columns (0 and 2) untouched.
    let gen_row = |i: i32, is_update: bool| {
        let scale = if is_update { 10 } else { 1 };
        OwnedRow::new(vec![
            Some(i.into()),
            Some((i * 10 * scale).into()),
            Some((i * 100).into()),
            Some((i * 1000 * scale).into()),
        ])
    };

    let mut rows = vec![];
    let insert_row_idx = (0..20).collect_vec();
    let delete_row_idx = (0..5).map(|i| i * 2).collect_vec();
    let updated_row_idx = (0..5).map(|i| i * 2 + 1).collect_vec();
    for i in &insert_row_idx {
        let row = gen_row(*i, false);
        state.insert(row.clone());
        rows.push(row);
    }

    for i in &updated_row_idx {
        let row = gen_row(*i, true);
        state.update(rows[*i as usize].clone(), row.clone());
        rows[*i as usize] = row;
    }

    for i in &delete_row_idx {
        let row = gen_row(*i, false);
        state.delete(row);
    }

    // Expected table content: everything that was inserted/updated and not deleted.
    let rows = rows
        .into_iter()
        .enumerate()
        .filter(|(idx, _)| !delete_row_idx.contains(&(*idx as i32)))
        .map(|(_, row)| row)
        .collect_vec();

    epoch.inc_for_test();
    state.commit(epoch).await.unwrap();
    test_env.commit_epoch(epoch.prev).await;

    let chunk_size = 2;
    // NOTE(review): the expected rows below are compared in insertion (pk) order although
    // `ordered` is `false` — presumably fine for this test table; confirm.
    let iter = table
        .batch_chunk_iter_with_pk_bounds(
            HummockReadEpoch::Committed(epoch.prev),
            row::empty(),
            ..,
            false,
            chunk_size,
            Default::default(),
        )
        .await
        .unwrap();
    pin_mut!(iter);

    let chunks: Vec<_> = iter.collect().await;
    // `zip_eq` also asserts that the number of returned chunks matches the expectation.
    for (chunk, expected_rows) in chunks.into_iter().zip_eq(rows.chunks(chunk_size)) {
        // The builder capacity is larger than `chunk_size` so that appending the expected
        // rows never flushes a chunk early; everything is taken out by `consume_all`.
        let mut builder =
            DataChunkBuilder::new(vec![DataType::Int32, DataType::Int32], 2 * chunk_size);
        for row in expected_rows {
            let flushed = builder.append_one_row(row.clone().project(&output_column_idx));
            assert!(
                flushed.is_none(),
                "expected-chunk builder must not flush before consume_all"
            );
        }
        assert_eq!(builder.consume_all().unwrap(), chunk.unwrap());
    }
}

0 comments on commit 109e35d

Please sign in to comment.