Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(state_table): unify iterator interfaces of state table #12364

Merged
merged 19 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/ctl/src/cmd_impl/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;
use std::ops::Bound::Unbounded;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Instant;
Expand All @@ -20,6 +22,7 @@ use anyhow::Result;
use clap::Subcommand;
use futures::future::try_join_all;
use futures::{pin_mut, Future, StreamExt};
use risingwave_common::row::{self, OwnedRow};
use risingwave_common::util::epoch::EpochPair;
use risingwave_storage::store::PrefetchOptions;
use size::Size;
Expand Down Expand Up @@ -102,8 +105,14 @@ pub async fn do_bench(context: &CtlContext, cmd: BenchCommands) -> Result<()> {
tb
};
loop {
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
&(Unbounded, Unbounded);
let stream = state_table
.iter_row(PrefetchOptions::new_for_exhaust_iter())
.iter_with_prefix(
row::empty(),
sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await?;
pin_mut!(stream);
iter_cnt.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Expand Down
79 changes: 17 additions & 62 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ where
let mut streams = vec![];
for vnode in self.vnodes().iter_vnodes() {
let stream = self
.iter_row_with_pk_range(&range, vnode, PrefetchOptions::default())
.iter_with_vnode(vnode, &range, PrefetchOptions::default())
.await?;
streams.push(Box::pin(stream));
}
Expand Down Expand Up @@ -1088,38 +1088,16 @@ where
S: StateStore,
SD: ValueRowSerde,
{
/// This function scans rows from the relational table.
pub async fn iter_row(
&self,
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
self.iter_row_with_pk_prefix(row::empty(), prefetch_options)
.await
}

/// This function scans rows from the relational table with specific `pk_prefix`.
/// `pk_prefix` is used to identify the exact vnode the scan should perform on.
pub async fn iter_row_with_pk_prefix(
&self,
pk_prefix: impl Row,
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
Ok(deserialize_keyed_row_stream(
self.iter_kv_with_pk_prefix(pk_prefix, prefetch_options)
.await?,
&self.row_serde,
))
}

/// This function scans rows from the relational table with specific `pk_range` under the same
/// `vnode`.
pub async fn iter_row_with_pk_range(
pub async fn iter_with_vnode(
&self,
pk_range: &(Bound<impl Row>, Bound<impl Row>),

// Optional vnode that returns an iterator only over the given range under that vnode.
// For now, we require this parameter, and will panic. In the future, when `None`, we can
// iterate over each vnode that the `StateTableInner` owns.
vnode: VirtualNode,
pk_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
Ok(deserialize_keyed_row_stream(
Expand Down Expand Up @@ -1149,49 +1127,26 @@ where
Ok(self.local_store.iter(table_key_range, read_options).await?)
}

/// This function scans raw key-values from the relational table with specific `pk_prefix`.
/// This function scans rows from the relational table with a specific `pk_prefix` and `sub_range` under the same
/// `vnode`. If `sub_range` is `(Unbounded, Unbounded)`, it scans every row that matches `pk_prefix`.
/// `pk_prefix` is used to identify the exact vnode the scan should perform on.
async fn iter_kv_with_pk_prefix(
pub async fn iter_with_prefix(
&self,
pk_prefix: impl Row,
sub_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<<S::Local as LocalStateStore>::IterStream<'_>> {
let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
let encoded_key_range = range_of_prefix(&encoded_prefix);

// We assume that all usages of iterating the state table only access a single vnode.
// If this assertion fails, then something must be wrong with the operator implementation or
// the distribution derivation from the optimizer.
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
let vnode = self.compute_prefix_vnode(&pk_prefix).to_be_bytes();
let encoded_key_range_with_vnode = prefixed_range(encoded_key_range, &vnode);

// Construct prefix hint for prefix bloom filter.
let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
if self.prefix_hint_len != 0 {
debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
}
let prefix_hint = {
if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
None
} else {
let encoded_prefix_len = self
.pk_serde
.deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

Some(Bytes::from(encoded_prefix[..encoded_prefix_len].to_vec()))
}
};

trace!(
table_id = %self.table_id(),
?prefix_hint, ?encoded_key_range_with_vnode, ?pk_prefix,
?pk_prefix_indices,
"storage_iter_with_prefix"
);
let memcomparable_range =
prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

self.iter_kv(encoded_key_range_with_vnode, prefix_hint, prefetch_options)
.await
let memcomparable_range_with_vnode = prefixed_range(memcomparable_range, &vnode);
Ok(deserialize_keyed_row_stream(
self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
.await?,
&self.row_serde,
))
Comment on lines -1193 to +1149
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

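`prefix_hint` is lost: the new `iter_with_prefix` passes `None` to `iter_kv`, whereas the old code built a hint from the encoded prefix for the prefix bloom filter.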

}

/// This function scans rows from the relational table with specific `prefix` and `pk_sub_range` under the same
Expand Down
79 changes: 56 additions & 23 deletions src/stream/src/common/table/test_state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;
use std::ops::Bound::{self, *};

use futures::{pin_mut, StreamExt};
use risingwave_common::array::{Op, StreamChunk};
Expand Down Expand Up @@ -282,8 +282,9 @@ async fn test_state_table_iter_with_prefix() {
]));

let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]);
let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
let iter = state_table
.iter_row_with_pk_prefix(&pk_prefix, Default::default())
.iter_with_prefix(&pk_prefix, sub_range, Default::default())
.await
.unwrap();
pin_mut!(iter);
Expand Down Expand Up @@ -414,7 +415,7 @@ async fn test_state_table_iter_with_pk_range() {
std::ops::Bound::Included(OwnedRow::new(vec![Some(4_i32.into())])),
);
let iter = state_table
.iter_row_with_pk_range(&pk_range, DEFAULT_VNODE, Default::default())
.iter_with_vnode(DEFAULT_VNODE, &pk_range, Default::default())
.await
.unwrap();
pin_mut!(iter);
Expand All @@ -439,7 +440,7 @@ async fn test_state_table_iter_with_pk_range() {
std::ops::Bound::<row::Empty>::Unbounded,
);
let iter = state_table
.iter_row_with_pk_range(&pk_range, DEFAULT_VNODE, Default::default())
.iter_with_vnode(DEFAULT_VNODE, &pk_range, Default::default())
.await
.unwrap();
pin_mut!(iter);
Expand Down Expand Up @@ -576,9 +577,12 @@ async fn test_state_table_iter_with_value_indices() {
Some(99_i32.into()),
Some(999_i32.into()),
]));

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
{
let iter = state_table.iter_row(Default::default()).await.unwrap();
let iter = state_table
.iter_with_prefix(row::empty(), sub_range, Default::default())
.await
.unwrap();
pin_mut!(iter);

let res = iter.next().await.unwrap().unwrap();
Expand Down Expand Up @@ -633,7 +637,10 @@ async fn test_state_table_iter_with_value_indices() {
Some(888_i32.into()),
]));

let iter = state_table.iter_row(Default::default()).await.unwrap();
let iter = state_table
.iter_with_prefix(row::empty(), sub_range, Default::default())
.await
.unwrap();
pin_mut!(iter);

let res = iter.next().await.unwrap().unwrap();
Expand Down Expand Up @@ -737,9 +744,12 @@ async fn test_state_table_iter_with_shuffle_value_indices() {
Some(99_i32.into()),
Some(999_i32.into()),
]));

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
{
let iter = state_table.iter_row(Default::default()).await.unwrap();
let iter = state_table
.iter_with_prefix(row::empty(), sub_range, Default::default())
.await
.unwrap();
pin_mut!(iter);

let res = iter.next().await.unwrap().unwrap();
Expand Down Expand Up @@ -815,7 +825,10 @@ async fn test_state_table_iter_with_shuffle_value_indices() {
Some(888_i32.into()),
]));

let iter = state_table.iter_row(Default::default()).await.unwrap();
let iter = state_table
.iter_with_prefix(row::empty(), sub_range, Default::default())
.await
.unwrap();
pin_mut!(iter);

let res = iter.next().await.unwrap().unwrap();
Expand Down Expand Up @@ -1000,9 +1013,13 @@ async fn test_state_table_write_chunk() {
);

state_table.write_chunk(chunk);

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
let rows: Vec<_> = state_table
.iter_row(PrefetchOptions::new_for_exhaust_iter())
.iter_with_prefix(
row::empty(),
sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await
.unwrap()
.collect::<Vec<_>>()
Expand Down Expand Up @@ -1114,9 +1131,13 @@ async fn test_state_table_write_chunk_visibility() {
StreamChunk::with_visibility(ops, columns, Bitmap::from_iter([true, true, true, false]));

state_table.write_chunk(chunk);

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
let rows: Vec<_> = state_table
.iter_row(PrefetchOptions::new_for_exhaust_iter())
.iter_with_prefix(
row::empty(),
sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await
.unwrap()
.collect::<Vec<_>>()
Expand Down Expand Up @@ -1226,9 +1247,13 @@ async fn test_state_table_write_chunk_value_indices() {
);

state_table.write_chunk(chunk);

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
let rows: Vec<_> = state_table
.iter_row(PrefetchOptions::new_for_exhaust_iter())
.iter_with_prefix(
row::empty(),
sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await
.unwrap()
.collect::<Vec<_>>()
Expand Down Expand Up @@ -1508,9 +1533,13 @@ async fn test_state_table_watermark_cache_ignore_null() {
let chunk = StreamChunk::from_rows(&rows, &data_types);

state_table.write_chunk(chunk);

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
let inserted_rows: Vec<_> = state_table
.iter_row(PrefetchOptions::new_for_exhaust_iter())
.iter_with_prefix(
row::empty(),
sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await
.unwrap()
.collect::<Vec<_>>()
Expand Down Expand Up @@ -1795,9 +1824,13 @@ async fn test_state_table_watermark_cache_refill() {
for row in &rows {
state_table.insert(row);
}

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
let inserted_rows: Vec<_> = state_table
.iter_row(PrefetchOptions::new_for_exhaust_iter())
.iter_with_prefix(
row::empty(),
sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await
.unwrap()
.collect::<Vec<_>>()
Expand Down Expand Up @@ -1895,7 +1928,7 @@ async fn test_state_table_iter_prefix_and_sub_range() {
);

let iter = state_table
.iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range1, Default::default())
.iter_with_prefix(pk_prefix, &sub_range1, Default::default())
.await
.unwrap();

Expand Down Expand Up @@ -1933,7 +1966,7 @@ async fn test_state_table_iter_prefix_and_sub_range() {

let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]);
let iter = state_table
.iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range2, Default::default())
.iter_with_prefix(pk_prefix, &sub_range2, Default::default())
.await
.unwrap();

Expand Down Expand Up @@ -1971,7 +2004,7 @@ async fn test_state_table_iter_prefix_and_sub_range() {

let pk_prefix = OwnedRow::new(vec![Some(1_i32.into())]);
let iter = state_table
.iter_row_with_pk_prefix_sub_range(pk_prefix, &sub_range3, Default::default())
.iter_with_prefix(pk_prefix, &sub_range3, Default::default())
.await
.unwrap();

Expand Down
10 changes: 7 additions & 3 deletions src/stream/src/executor/aggregation/minput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound::{self};

use futures::{pin_mut, StreamExt};
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::row::RowExt;
use risingwave_common::row::{OwnedRow, RowExt};
use risingwave_common::types::Datum;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
Expand Down Expand Up @@ -182,10 +184,12 @@ impl MaterializedInputState {
) -> StreamExecutorResult<Datum> {
if !self.cache.is_synced() {
let mut cache_filler = self.cache.begin_syncing();

let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
&(Bound::Unbounded, Bound::Unbounded);
let all_data_iter = state_table
.iter_row_with_pk_prefix(
.iter_with_prefix(
group_key.map(GroupKey::table_pk),
sub_range,
PrefetchOptions {
exhaust_iter: cache_filler.capacity().is_none(),
},
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ where
let range_bounds = range_bounds.unwrap();

let vnode_row_iter = upstream_table
.iter_row_with_pk_range(&range_bounds, vnode, Default::default())
.iter_with_vnode(vnode, &range_bounds, Default::default())
.await?;

// TODO: Is there some way to avoid double-pin here?
Expand Down
Loading
Loading