Skip to content

Commit

Permalink
refactor(state_table): unify iterator interfaces of state table (#12364)
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Oct 11, 2023
1 parent 12a5b2e commit 60b3429
Show file tree
Hide file tree
Showing 13 changed files with 151 additions and 125 deletions.
11 changes: 10 additions & 1 deletion src/ctl/src/cmd_impl/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;
use std::ops::Bound::Unbounded;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Instant;
Expand All @@ -20,6 +22,7 @@ use anyhow::Result;
use clap::Subcommand;
use futures::future::try_join_all;
use futures::{pin_mut, Future, StreamExt};
use risingwave_common::row::{self, OwnedRow};
use risingwave_common::util::epoch::EpochPair;
use risingwave_storage::store::PrefetchOptions;
use size::Size;
Expand Down Expand Up @@ -102,8 +105,14 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> {
tb
};
loop {
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
&(Unbounded, Unbounded);
let stream = state_table
.iter_row(PrefetchOptions::new_for_exhaust_iter())
.iter_with_prefix(
row::empty(),
sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await?;
pin_mut!(stream);
iter_cnt.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Expand Down
79 changes: 17 additions & 62 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,7 @@ where
let mut streams = vec![];
for vnode in self.vnodes().iter_vnodes() {
let stream = self
.iter_row_with_pk_range(&range, vnode, PrefetchOptions::default())
.iter_with_vnode(vnode, &range, PrefetchOptions::default())
.await?;
streams.push(Box::pin(stream));
}
Expand Down Expand Up @@ -1094,38 +1094,16 @@ where
S: StateStore,
SD: ValueRowSerde,
{
/// This function scans rows from the relational table.
pub async fn iter_row(
&self,
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
self.iter_row_with_pk_prefix(row::empty(), prefetch_options)
.await
}

/// This function scans rows from the relational table with specific `pk_prefix`.
/// `pk_prefix` is used to identify the exact vnode the scan should perform on.
pub async fn iter_row_with_pk_prefix(
&self,
pk_prefix: impl Row,
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
Ok(deserialize_keyed_row_stream(
self.iter_kv_with_pk_prefix(pk_prefix, prefetch_options)
.await?,
&self.row_serde,
))
}

/// This function scans rows from the relational table with specific `pk_range` under the same
/// `vnode`.
pub async fn iter_row_with_pk_range(
pub async fn iter_with_vnode(
&self,
pk_range: &(Bound<impl Row>, Bound<impl Row>),

// Optional vnode that returns an iterator only over the given range under that vnode.
// For now, we require this parameter, and will panic. In the future, when `None`, we can
// iterate over each vnode that the `StateTableInner` owns.
vnode: VirtualNode,
pk_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
Ok(deserialize_keyed_row_stream(
Expand Down Expand Up @@ -1155,49 +1133,26 @@ where
Ok(self.local_store.iter(table_key_range, read_options).await?)
}

/// This function scans raw key-values from the relational table with specific `pk_prefix`.
/// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
/// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`.
/// `pk_prefix` is used to identify the exact vnode the scan should perform on.
async fn iter_kv_with_pk_prefix(
pub async fn iter_with_prefix(
&self,
pk_prefix: impl Row,
sub_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<<S::Local as LocalStateStore>::IterStream<'_>> {
let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
let encoded_key_range = range_of_prefix(&encoded_prefix);

// We assume that all usages of iterating the state table only access a single vnode.
// If this assertion fails, then something must be wrong with the operator implementation or
// the distribution derivation from the optimizer.
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes();
let encoded_key_range_with_vnode = prefixed_range(encoded_key_range, &vnode);

// Construct prefix hint for prefix bloom filter.
let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
if self.prefix_hint_len != 0 {
debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
}
let prefix_hint = {
if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
None
} else {
let encoded_prefix_len = self
.pk_serde
.deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

Some(Bytes::from(encoded_prefix[..encoded_prefix_len].to_vec()))
}
};

trace!(
table_id = %self.table_id(),
?prefix_hint, ?encoded_key_range_with_vnode, ?pk_prefix,
?pk_prefix_indices,
"storage_iter_with_prefix"
);
let memcomparable_range =
prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

self.iter_kv(encoded_key_range_with_vnode, prefix_hint, prefetch_options)
.await
let memcomparable_range_with_vnode = prefixed_range(memcomparable_range, &vnode);
Ok(deserialize_keyed_row_stream(
self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
.await?,
&self.row_serde,
))
}

/// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same
Expand Down
79 changes: 56 additions & 23 deletions src/stream/src/common/table/test_state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;
use std::ops::Bound::{self, *};

use futures::{pin_mut, StreamExt};
use risingwave_common::array::{Op, StreamChunk};
Expand Down Expand Up @@ -282,8 +282,9 @@ async fn test_state_table_iter_with_prefix() {
]));

let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]);
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
let iter = state_table
.iter_row_with_pk_prefix(&pk_prefix, Default::default())
.iter_with_prefix(&pk_prefix, sub_range, Default::default())
.await
.unwrap();
pin_mut!(iter);
Expand Down Expand Up @@ -414,7 +415,7 @@ async fn test_state_table_iter_with_pk_range() {
std::ops::Bound::Included(OwnedRow::new(vec![Some(4_i32.into())])),
);
let iter = state_table
.iter_row_with_pk_range(&pk_range, DEFAULT_VNODE, Default::default())
.iter_with_vnode(DEFAULT_VNODE, &pk_range, Default::default())
.await
.unwrap();
pin_mut!(iter);
Expand All @@ -439,7 +440,7 @@ async fn test_state_table_iter_with_pk_range() {
std::ops::Bound::<row::Empty>::Unbounded,
);
let iter = state_table
.iter_row_with_pk_range(&pk_range, DEFAULT_VNODE, Default::default())
.iter_with_vnode(DEFAULT_VNODE, &pk_range, Default::default())
.await
.unwrap();
pin_mut!(iter);
Expand Down Expand Up @@ -576,9 +577,12 @@ async fn test_state_table_iter_with_value_indices() {
Some(99_i32.into()),
Some(999_i32.into()),
]));

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
{
let iter = state_table.iter_row(Default::default()).await.unwrap();
let iter = state_table
.iter_with_prefix(row::empty(), sub_range, Default::default())
.await
.unwrap();
pin_mut!(iter);

let res = iter.next().await.unwrap().unwrap();
Expand Down Expand Up @@ -633,7 +637,10 @@ async fn test_state_table_iter_with_value_indices() {
Some(888_i32.into()),
]));

let iter = state_table.iter_row(Default::default()).await.unwrap();
let iter = state_table
.iter_with_prefix(row::empty(), sub_range, Default::default())
.await
.unwrap();
pin_mut!(iter);

let res = iter.next().await.unwrap().unwrap();
Expand Down Expand Up @@ -737,9 +744,12 @@ async fn test_state_table_iter_with_shuffle_value_indices() {
Some(99_i32.into()),
Some(999_i32.into()),
]));

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
{
let iter = state_table.iter_row(Default::default()).await.unwrap();
let iter = state_table
.iter_with_prefix(row::empty(), sub_range, Default::default())
.await
.unwrap();
pin_mut!(iter);

let res = iter.next().await.unwrap().unwrap();
Expand Down Expand Up @@ -815,7 +825,10 @@ async fn test_state_table_iter_with_shuffle_value_indices() {
Some(888_i32.into()),
]));

let iter = state_table.iter_row(Default::default()).await.unwrap();
let iter = state_table
.iter_with_prefix(row::empty(), sub_range, Default::default())
.await
.unwrap();
pin_mut!(iter);

let res = iter.next().await.unwrap().unwrap();
Expand Down Expand Up @@ -1000,9 +1013,13 @@ async fn test_state_table_write_chunk() {
);

state_table.write_chunk(chunk);

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
let rows: Vec<_> = state_table
.iter_row(PrefetchOptions::new_for_exhaust_iter())
.iter_with_prefix(
row::empty(),
sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await
.unwrap()
.collect::<Vec<_>>()
Expand Down Expand Up @@ -1114,9 +1131,13 @@ async fn test_state_table_write_chunk_visibility() {
StreamChunk::with_visibility(ops, columns, Bitmap::from_iter([true, true, true, false]));

state_table.write_chunk(chunk);

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
let rows: Vec<_> = state_table
.iter_row(PrefetchOptions::new_for_exhaust_iter())
.iter_with_prefix(
row::empty(),
sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await
.unwrap()
.collect::<Vec<_>>()
Expand Down Expand Up @@ -1226,9 +1247,13 @@ async fn test_state_table_write_chunk_value_indices() {
);

state_table.write_chunk(chunk);

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
let rows: Vec<_> = state_table
.iter_row(PrefetchOptions::new_for_exhaust_iter())
.iter_with_prefix(
row::empty(),
sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await
.unwrap()
.collect::<Vec<_>>()
Expand Down Expand Up @@ -1508,9 +1533,13 @@ async fn test_state_table_watermark_cache_ignore_null() {
let chunk = StreamChunk::from_rows(&rows, &data_types);

state_table.write_chunk(chunk);

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
let inserted_rows: Vec<_> = state_table
.iter_row(PrefetchOptions::new_for_exhaust_iter())
.iter_with_prefix(
row::empty(),
sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await
.unwrap()
.collect::<Vec<_>>()
Expand Down Expand Up @@ -1795,9 +1824,13 @@ async fn test_state_table_watermark_cache_refill() {
for row in &rows {
state_table.insert(row);
}

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
let inserted_rows: Vec<_> = state_table
.iter_row(PrefetchOptions::new_for_exhaust_iter())
.iter_with_prefix(
row::empty(),
sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await
.unwrap()
.collect::<Vec<_>>()
Expand Down Expand Up @@ -1895,7 +1928,7 @@ async fn test_state_table_iter_prefix_and_sub_range() {
);

let iter = state_table
.iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range1, Default::default())
.iter_with_prefix(pk_prefix, &sub_range1, Default::default())
.await
.unwrap();

Expand Down Expand Up @@ -1933,7 +1966,7 @@ async fn test_state_table_iter_prefix_and_sub_range() {

let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]);
let iter = state_table
.iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range2, Default::default())
.iter_with_prefix(pk_prefix, &sub_range2, Default::default())
.await
.unwrap();

Expand Down Expand Up @@ -1971,7 +2004,7 @@ async fn test_state_table_iter_prefix_and_sub_range() {

let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]);
let iter = state_table
.iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range3, Default::default())
.iter_with_prefix(pk_prefix, &sub_range3, Default::default())
.await
.unwrap();

Expand Down
10 changes: 7 additions & 3 deletions src/stream/src/executor/aggregation/minput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound::{self};

use futures::{pin_mut, StreamExt};
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::row::RowExt;
use risingwave_common::row::{OwnedRow, RowExt};
use risingwave_common::types::Datum;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
Expand Down Expand Up @@ -182,10 +184,12 @@ impl MaterializedInputState {
) -> StreamExecutorResult<Datum> {
if !self.cache.is_synced() {
let mut cache_filler = self.cache.begin_syncing();

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
&(Bound::Unbounded, Bound::Unbounded);
let all_data_iter = state_table
.iter_row_with_pk_prefix(
.iter_with_prefix(
group_key.map(GroupKey::table_pk),
sub_range,
PrefetchOptions {
exhaust_iter: cache_filler.capacity().is_none(),
},
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ where
let range_bounds = range_bounds.unwrap();

let vnode_row_iter = upstream_table
.iter_row_with_pk_range(&range_bounds, vnode, Default::default())
.iter_with_vnode(vnode, &range_bounds, Default::default())
.await?;

// TODO: Is there some way to avoid double-pin here?
Expand Down
Loading

0 comments on commit 60b3429

Please sign in to comment.