refactor(storage-table): ensure UpdateInsert is emitted next to UpdateDelete
wenym1 committed Jul 22, 2024
1 parent 9335967 commit 1a29aef
Showing 9 changed files with 111 additions and 154 deletions.
62 changes: 21 additions & 41 deletions src/batch/src/executor/log_row_seq_scan.rs
@@ -12,24 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::{Bound, Deref};
use std::ops::Deref;
use std::sync::Arc;

use futures::prelude::stream::StreamExt;
use futures_async_stream::try_stream;
use futures_util::pin_mut;
use itertools::Itertools;
use prometheus::Histogram;
use risingwave_common::array::DataChunk;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::row::{Row, RowExt};
use risingwave_common::types::ScalarImpl;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, KeyedRow, TableDistribution};
use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::{dispatch_state_store, StateStore};

use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
@@ -188,45 +187,26 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
histogram: Option<impl Deref<Target = Histogram>>,
schema: Arc<Schema>,
) {
let pk_prefix = OwnedRow::default();

let order_type = table.pk_serializer().get_order_types()[pk_prefix.len()];
// Range Scan.
let iter = table
.batch_iter_log_with_pk_bounds(
old_epoch.into(),
new_epoch.into(),
&pk_prefix,
(
if order_type.nulls_are_first() {
// `NULL`s are at the start bound side, we should exclude them to meet SQL semantics.
Bound::Excluded(OwnedRow::new(vec![None]))
} else {
// Both start and end are unbounded, so we need to select all rows.
Bound::Unbounded
},
if order_type.nulls_are_last() {
// `NULL`s are at the end bound side, we should exclude them to meet SQL semantics.
Bound::Excluded(OwnedRow::new(vec![None]))
} else {
// Both start and end are unbounded, so we need to select all rows.
Bound::Unbounded
},
),
)
.batch_iter_log_with_pk_bounds(old_epoch.into(), new_epoch.into())
.await?
.map(|r| match r {
Ok((op, value)) => {
let (k, row) = value.into_owned_row_key();
// TODO: avoid creating a full row.
let full_row = row
.into_iter()
.chain(vec![Some(ScalarImpl::Int16(op.to_i16()))])
.collect_vec();
let row = OwnedRow::new(full_row);
Ok(KeyedRow::<_>::new(k, row))
}
Err(e) => Err(e),
.flat_map(|r| {
futures::stream::iter(std::iter::from_coroutine(move || {
match r {
Ok(change_log_row) => {
fn with_op(op: Op, row: impl Row) -> impl Row {
row.chain([Some(ScalarImpl::Int16(op.to_i16()))])
}
for (op, row) in change_log_row.into_op_value_iter() {
yield Ok(with_op(op, row));
}
}
Err(e) => {
yield Err(e);
}
};
}))
});

pin_mut!(iter);
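
The rewrite above drops the hand-built pk bounds (the iterator now always scans the full key range) and replaces the per-row `map` with a `flat_map` over a coroutine-backed iterator, so an `Update` entry always expands into adjacent `UpdateDelete` and `UpdateInsert` items in the same stream. Below is a minimal stable-Rust sketch of that flattening pattern, with local stand-in types and a `Vec` in place of the coroutine; only the `futures` crate is assumed.

```rust
use futures::stream::{self, Stream, StreamExt};

#[derive(Debug, PartialEq)]
enum Op {
    Insert,
    UpdateDelete,
    UpdateInsert,
}

// Local stand-in for `ChangeLogValue<OwnedRow>`; `i64` replaces the row type.
enum ChangeLogRow {
    Insert(i64),
    Update { old_value: i64, new_value: i64 },
}

// Expand each change-log entry into one or two `(Op, value)` items; errors
// pass through unchanged as single items.
fn expand(
    input: impl Stream<Item = Result<ChangeLogRow, String>>,
) -> impl Stream<Item = Result<(Op, i64), String>> {
    input.flat_map(|r| {
        stream::iter(match r {
            Ok(ChangeLogRow::Insert(v)) => vec![Ok((Op::Insert, v))],
            Ok(ChangeLogRow::Update { old_value, new_value }) => vec![
                Ok((Op::UpdateDelete, old_value)),
                Ok((Op::UpdateInsert, new_value)),
            ],
            Err(e) => vec![Err(e)],
        })
    })
}

fn main() {
    let input = stream::iter(vec![
        Ok(ChangeLogRow::Insert(0)),
        Ok(ChangeLogRow::Update { old_value: 1, new_value: 2 }),
    ]);
    let out = futures::executor::block_on(expand(input).collect::<Vec<_>>());
    // The UpdateDelete/UpdateInsert pair is adjacent in the output stream.
    assert_eq!(
        out,
        vec![
            Ok((Op::Insert, 0)),
            Ok((Op::UpdateDelete, 1)),
            Ok((Op::UpdateInsert, 2)),
        ]
    );
}
```
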
1 change: 1 addition & 0 deletions src/batch/src/lib.rs
@@ -31,6 +31,7 @@
#![feature(lazy_cell)]
#![feature(error_generic_member_access)]
#![feature(map_try_insert)]
#![feature(iter_from_coroutine)]

pub mod error;
pub mod exchange_source;
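
`std::iter::from_coroutine` is nightly-only, which is why this feature gate (and the matching one in `src/storage/src/lib.rs` below) is added. A minimal sketch of what the feature provides, written attribute-free to match the style of the code in this commit; newer nightlies additionally require a `#[coroutine]` attribute on the closure:

```rust
#![feature(coroutines, iter_from_coroutine)]

fn main() {
    // `from_coroutine` adapts a coroutine that `yield`s values into an
    // `Iterator`; each `yield` becomes one item, in order.
    let iter = std::iter::from_coroutine(|| {
        yield 1u32;
        yield 2;
        yield 3;
    });
    assert_eq!(iter.collect::<Vec<_>>(), vec![1, 2, 3]);
}
```
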
26 changes: 26 additions & 0 deletions src/common/src/row/mod.rs
@@ -201,6 +201,32 @@ pub trait RowExt: Row {
fn is_null_at(&self, index: usize) -> bool {
self.datum_at(index).is_none()
}

fn exact_size_iter(&self) -> impl ExactSizeIterator<Item = DatumRef<'_>> {
ExactSizeRowIter {
inner: self.iter(),
size: self.len(),
}
}
}

pub struct ExactSizeRowIter<'a, I: Iterator<Item = DatumRef<'a>> + 'a> {
inner: I,
size: usize,
}

impl<'a, I: Iterator<Item = DatumRef<'a>> + 'a> Iterator for ExactSizeRowIter<'a, I> {
type Item = DatumRef<'a>;

fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}
}

impl<'a, I: Iterator<Item = DatumRef<'a>> + 'a> ExactSizeIterator for ExactSizeRowIter<'a, I> {
fn len(&self) -> usize {
self.size
}
}

impl<R: Row> RowExt for R {}
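
The new `exact_size_iter` adapter exists because a generic `Row`'s datum iterator, for example one built with `chain`, is not an `ExactSizeIterator` even when the row's `len()` is known up front; the wrapper reattaches that length so length-checked zips keep working. A standalone sketch of the same idea (names here are illustrative, not the crate's):

```rust
// Wraps any iterator whose exact length is known out of band.
struct ExactSizeIter<I> {
    inner: I,
    remaining: usize,
}

impl<I: Iterator> Iterator for ExactSizeIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.next();
        if item.is_some() {
            // Keep the externally supplied length in sync as we advance.
            self.remaining -= 1;
        }
        item
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Report the exact remaining length, as ExactSizeIterator requires.
        (self.remaining, Some(self.remaining))
    }
}

impl<I: Iterator> ExactSizeIterator for ExactSizeIter<I> {}

fn main() {
    // `Chain` is not an ExactSizeIterator (the combined length could
    // overflow `usize`), but here the caller knows the true length: 5.
    let chained = (0..2).chain(5..8);
    let exact = ExactSizeIter { inner: chained, remaining: 5 };
    assert_eq!(exact.len(), 5);
    assert_eq!(exact.collect::<Vec<_>>(), vec![0, 1, 5, 6, 7]);
}
```
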
1 change: 1 addition & 0 deletions src/storage/src/lib.rs
@@ -40,6 +40,7 @@
#![feature(impl_trait_in_assoc_type)]
#![feature(maybe_uninit_uninit_array)]
#![feature(maybe_uninit_array_assume_init)]
#![feature(iter_from_coroutine)]

pub mod hummock;
pub mod memory;
19 changes: 19 additions & 0 deletions src/storage/src/store.rs
@@ -25,6 +25,7 @@ use bytes::Bytes;
use futures::{Stream, TryStreamExt};
use futures_async_stream::try_stream;
use prost::Message;
use risingwave_common::array::Op;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
@@ -189,6 +190,24 @@ impl<T> ChangeLogValue<T> {
ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
})
}

pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
std::iter::from_coroutine(move || match self {
Self::Insert(row) => {
yield (Op::Insert, row);
}
Self::Delete(row) => {
yield (Op::Delete, row);
}
Self::Update {
old_value,
new_value,
} => {
yield (Op::UpdateDelete, old_value);
yield (Op::UpdateInsert, new_value);
}
})
}
}

impl<T: AsRef<[u8]>> ChangeLogValue<T> {
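
`into_op_value_iter` encodes the guarantee named in the commit title: an `Update` value always yields `UpdateDelete` immediately followed by `UpdateInsert`, so consumers can never observe the pair split apart. A stable-Rust sketch of the same contract, with simplified local types and a `Vec` standing in for the coroutine:

```rust
#[derive(Debug, PartialEq, Clone, Copy)]
enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

// Simplified local stand-in for the storage crate's change-log value.
enum ChangeLogValue<T> {
    Insert(T),
    Delete(T),
    Update { old_value: T, new_value: T },
}

impl<T> ChangeLogValue<T> {
    // Same contract as the new method: an Update always yields its two ops
    // adjacently, so downstream consumers cannot see the pair split apart.
    fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
        match self {
            Self::Insert(v) => vec![(Op::Insert, v)],
            Self::Delete(v) => vec![(Op::Delete, v)],
            Self::Update { old_value, new_value } => vec![
                (Op::UpdateDelete, old_value),
                (Op::UpdateInsert, new_value),
            ],
        }
        .into_iter()
    }
}

fn main() {
    let ops: Vec<_> = ChangeLogValue::Update { old_value: "v1", new_value: "v2" }
        .into_op_value_iter()
        .collect();
    assert_eq!(ops, vec![(Op::UpdateDelete, "v1"), (Op::UpdateInsert, "v2")]);
}
```
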
100 changes: 20 additions & 80 deletions src/storage/src/table/batch_table/storage_table.rs
@@ -25,7 +25,7 @@ use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use itertools::{Either, Itertools};
use more_asserts::assert_gt;
use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk, Op};
use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
@@ -47,9 +47,11 @@ use crate::hummock::CachePolicy;
use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
use crate::row_serde::{find_columns_by_ids, ColumnMapping};
use crate::store::{ChangeLogValue, PrefetchOptions, ReadLogOptions, ReadOptions, StateStoreIter};
use crate::store::{
PrefetchOptions, ReadLogOptions, ReadOptions, StateStoreIter, StateStoreIterExt,
};
use crate::table::merge_sort::merge_sort;
use crate::table::{KeyedRow, TableDistribution, TableIter};
use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter};
use crate::StateStore;

/// [`StorageTableInner`] is the interface accessing relational data in KV(`StateStore`) with
@@ -749,11 +751,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
&self,
start_epoch: HummockReadEpoch,
end_epoch: HummockReadEpoch,
pk_prefix: impl Row,
range_bounds: impl RangeBounds<OwnedRow>,
) -> StorageResult<impl Stream<Item = StorageResult<(Op, KeyedRow<Bytes>)>> + Send> {
let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send> {
let pk_prefix = OwnedRow::default();
let start_key = self.serialize_pk_bound(&pk_prefix, Unbounded, true);
let end_key = self.serialize_pk_bound(&pk_prefix, Unbounded, false);

assert!(pk_prefix.len() <= self.pk_indices.len());
let table_key_ranges = {
@@ -995,77 +996,16 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterLogInner<S, SD> {
}

/// Yield change log rows (without their primary keys).
#[try_stream(ok = (Op, KeyedRow<Bytes>), error = StorageError)]
async fn into_stream(mut self) {
while let Some((k, v)) = self
.iter
.try_next()
.verbose_instrument_await("storage_table_iter_next")
.await?
{
match v {
ChangeLogValue::Insert(value) => {
let full_row = self.row_deserializer.deserialize(value)?;
let row = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();
// TODO: may optimize the key clone
yield (
Op::Insert,
KeyedRow::<Bytes> {
vnode_prefixed_key: k.copy_into(),
row,
},
);
}
ChangeLogValue::Update {
new_value,
old_value,
} => {
let full_row = self.row_deserializer.deserialize(old_value)?;
let row = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();
// TODO: may optimize the key clone
yield (
Op::UpdateDelete,
KeyedRow::<Bytes> {
vnode_prefixed_key: k.copy_into(),
row,
},
);
let full_row = self.row_deserializer.deserialize(new_value)?;
let row = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();
// TODO: may optimize the key clone
yield (
Op::UpdateInsert,
KeyedRow::<Bytes> {
vnode_prefixed_key: k.copy_into(),
row,
},
);
}
ChangeLogValue::Delete(value) => {
let full_row = self.row_deserializer.deserialize(value)?;
let row = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();
// TODO: may optimize the key clone
yield (
Op::Delete,
KeyedRow::<Bytes> {
vnode_prefixed_key: k.copy_into(),
row,
},
);
}
}
}
fn into_stream(self) -> impl Stream<Item = StorageResult<ChangeLogRow>> {
self.iter.into_stream(move |(_key, value)| {
value.try_map(|value| {
let full_row = self.row_deserializer.deserialize(value)?;
let row = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();
Ok(row)
})
})
}
}
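
The rewritten `into_stream` above pushes all per-variant handling into `ChangeLogValue::try_map`, which applies one fallible deserialization closure to every value an entry carries, whatever its variant. A simplified standalone sketch of that helper; the real method is specialized to `StorageResult`:

```rust
// Simplified local stand-in for the storage crate's change-log value.
enum ChangeLogValue<T> {
    Insert(T),
    Delete(T),
    Update { old_value: T, new_value: T },
}

impl<T> ChangeLogValue<T> {
    // Apply a fallible transform to every carried value, preserving the
    // variant; the first error aborts the whole conversion.
    fn try_map<O, E>(
        self,
        mut f: impl FnMut(T) -> Result<O, E>,
    ) -> Result<ChangeLogValue<O>, E> {
        Ok(match self {
            Self::Insert(v) => ChangeLogValue::Insert(f(v)?),
            Self::Delete(v) => ChangeLogValue::Delete(f(v)?),
            Self::Update { old_value, new_value } => ChangeLogValue::Update {
                old_value: f(old_value)?,
                new_value: f(new_value)?,
            },
        })
    }
}

fn main() {
    // Deserialize both sides of an update in one shot.
    let parsed = ChangeLogValue::Update { old_value: "1", new_value: "2" }
        .try_map(|s| s.parse::<i64>())
        .unwrap();
    assert!(matches!(
        parsed,
        ChangeLogValue::Update { old_value: 1, new_value: 2 }
    ));
}
```
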
50 changes: 19 additions & 31 deletions src/storage/src/table/mod.rs
@@ -17,19 +17,17 @@ pub mod merge_sort;

use std::ops::Deref;

use bytes::Bytes;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::{DataChunk, Op};
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
pub use risingwave_common::hash::table_distribution::*;
use risingwave_common::hash::VirtualNode;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_hummock_sdk::key::TableKey;

use crate::error::{StorageError, StorageResult};
use crate::error::StorageResult;
use crate::row_serde::value_serde::ValueRowSerde;
use crate::store::{ChangeLogValue, StateStoreIterExt, StateStoreReadLogItem};
use crate::StateStoreIter;
@@ -40,20 +38,21 @@ pub trait TableIter: Send {
async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>>;
}

pub async fn collect_data_chunk<E, S>(
pub async fn collect_data_chunk<E, S, R>(
stream: &mut S,
schema: &Schema,
chunk_size: Option<usize>,
) -> Result<Option<DataChunk>, E>
where
S: Stream<Item = Result<KeyedRow<Bytes>, E>> + Unpin,
S: Stream<Item = Result<R, E>> + Unpin,
R: Row,
{
let mut builders = schema.create_array_builders(chunk_size.unwrap_or(0));
let mut row_count = 0;
for _ in 0..chunk_size.unwrap_or(usize::MAX) {
match stream.next().await.transpose()? {
Some(row) => {
for (datum, builder) in row.iter().zip_eq_fast(builders.iter_mut()) {
for (datum, builder) in row.exact_size_iter().zip_eq_fast(builders.iter_mut()) {
builder.append(datum);
}
}
@@ -151,30 +150,19 @@ impl<T: AsRef<[u8]>> Deref for KeyedRow<T> {
}
}

#[try_stream(ok = (Op, OwnedRow), error = StorageError)]
pub async fn deserialize_log_stream<'a>(
impl<T: AsRef<[u8]>> From<KeyedRow<T>> for OwnedRow {
fn from(value: KeyedRow<T>) -> Self {
value.row
}
}

pub type ChangeLogRow = ChangeLogValue<OwnedRow>;

pub fn deserialize_log_stream<'a>(
iter: impl StateStoreIter<StateStoreReadLogItem> + 'a,
deserializer: &'a impl ValueRowSerde,
) {
let stream = iter.into_stream(|(_key, log_value)| {
) -> impl Stream<Item = StorageResult<ChangeLogRow>> + 'a {
iter.into_stream(|(_key, log_value)| {
log_value.try_map(|slice| Ok(OwnedRow::new(deserializer.deserialize(slice)?)))
});
#[for_await]
for log_value in stream {
match log_value? {
ChangeLogValue::Insert(row) => {
yield (Op::Insert, row);
}
ChangeLogValue::Delete(row) => {
yield (Op::Delete, row);
}
ChangeLogValue::Update {
new_value,
old_value,
} => {
yield (Op::UpdateDelete, old_value);
yield (Op::UpdateInsert, new_value);
}
}
}
})
}
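
With `collect_data_chunk` now generic over any `R: Row`, the same collector serves both keyed rows and the plain rows produced by the log scan. A simplified sketch of the shape of that loop, with plain `Vec`s standing in for array builders and local stand-in types:

```rust
use futures::stream::{self, Stream, StreamExt};

// Local stand-in for the `Row` trait: anything that can expose its datums.
trait Row {
    fn datums(&self) -> Vec<Option<i64>>;
}

struct OwnedRow(Vec<Option<i64>>);

impl Row for OwnedRow {
    fn datums(&self) -> Vec<Option<i64>> {
        self.0.clone()
    }
}

// Collect up to `chunk_size` rows into per-column vectors; returns Ok(None)
// when the stream is already exhausted, mirroring the real collector.
async fn collect_chunk<R: Row, E>(
    stream: &mut (impl Stream<Item = Result<R, E>> + Unpin),
    num_columns: usize,
    chunk_size: usize,
) -> Result<Option<Vec<Vec<Option<i64>>>>, E> {
    let mut columns = vec![Vec::new(); num_columns];
    let mut rows = 0;
    while rows < chunk_size {
        match stream.next().await.transpose()? {
            Some(row) => {
                for (col, datum) in columns.iter_mut().zip(row.datums()) {
                    col.push(datum);
                }
                rows += 1;
            }
            None => break,
        }
    }
    Ok(if rows == 0 { None } else { Some(columns) })
}

fn main() {
    let mut s = stream::iter(vec![
        Ok::<_, ()>(OwnedRow(vec![Some(1), None])),
        Ok(OwnedRow(vec![Some(2), Some(3)])),
    ]);
    let chunk = futures::executor::block_on(collect_chunk(&mut s, 2, 8))
        .unwrap()
        .unwrap();
    assert_eq!(chunk[0], vec![Some(1), Some(2)]);
    assert_eq!(chunk[1], vec![None, Some(3)]);
}
```
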