refactor(over window): use new table.iter_with_prefix in streaming OverWindow (#12872)

Signed-off-by: Richard Chien <[email protected]>
stdrc authored Oct 17, 2023
1 parent 203cac8 commit 57b4641
Showing 2 changed files with 59 additions and 89 deletions.
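
The idea behind the refactor: the over-window state table's primary key is laid out as (partition key, order key, input pk), so within one partition the rows are contiguous and already sorted by (order key, input pk). Previously the executor opened one iterator per vnode and combined them; with `iter_with_prefix` a single scan using the partition key as the prefix suffices. The following standalone sketch uses a toy `BTreeMap` (not the RisingWave `StateTable` API) purely to illustrate that key-ordering argument.

use std::collections::BTreeMap;
use std::ops::Bound;

fn main() {
    // Toy stand-in for the over-window state table: rows keyed by
    // (partition_key, order_key, input_pk), mirroring the table pk layout the
    // executor relies on. Values are opaque row payloads.
    let mut table: BTreeMap<(u32, i64, i64), &str> = BTreeMap::new();
    table.insert((1, 10, 100), "p1-a");
    table.insert((1, 20, 101), "p1-b");
    table.insert((2, 5, 102), "p2-a");
    table.insert((2, 15, 103), "p2-b");

    // A "prefix scan" for partition 2: fix the first key component and range
    // over the rest. Because the map is ordered on the full key, the rows come
    // back already sorted by (order_key, input_pk), so nothing needs to be
    // merged afterwards.
    let partition = 2u32;
    let range = (
        Bound::Included((partition, i64::MIN, i64::MIN)),
        Bound::Included((partition, i64::MAX, i64::MAX)),
    );
    let mut count = 0;
    for (key, payload) in table.range(range) {
        count += 1;
        println!("{:?} -> {}", key, payload);
    }
    assert_eq!(count, 2); // only partition 2's rows, already in order
}
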
1 change: 0 additions & 1 deletion src/stream/src/executor/over_window/general.rs
@@ -345,7 +345,6 @@ impl<S: StateStore> OverWindowExecutor<S> {
&mut cache,
this.cache_policy,
&this.calls,
&this.partition_key_indices,
&this.order_key_data_types,
&this.order_key_order_types,
&this.order_key_indices,
147 changes: 59 additions & 88 deletions src/stream/src/executor/over_window/over_partition.rs
@@ -19,19 +19,15 @@ use std::collections::{BTreeMap, HashSet, VecDeque};
use std::marker::PhantomData;
use std::ops::{Bound, RangeInclusive};

use futures::stream::select_all;
use futures::{stream, StreamExt, TryStreamExt};
use futures_async_stream::for_await;
use risingwave_common::array::stream_record::Record;
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
use risingwave_common::types::DataType;
use risingwave_common::util::memcmp_encoding;
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::window_function::{FrameBounds, StateKey, WindowFuncCall};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::StateStore;

use super::delta_btree_map::Change;
@@ -230,12 +226,11 @@ pub(super) struct OverPartition<'a, S: StateStore> {
cache_policy: CachePolicy,

calls: &'a [WindowFuncCall],
partition_key_indices: &'a [usize],
order_key_data_types: &'a [DataType],
order_key_order_types: &'a [OrderType],
order_key_indices: &'a [usize],
input_pk_indices: &'a [usize],
state_key_to_table_pk_proj: Vec<usize>,
state_key_to_table_sub_pk_proj: Vec<usize>,
_phantom: PhantomData<S>,
}

@@ -248,20 +243,16 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
cache: &'a mut PartitionCache,
cache_policy: CachePolicy,
calls: &'a [WindowFuncCall],
partition_key_indices: &'a [usize],
order_key_data_types: &'a [DataType],
order_key_order_types: &'a [OrderType],
order_key_indices: &'a [usize],
input_pk_indices: &'a [usize],
) -> Self {
// TODO(rc): move the calculation to executor?
let mut projection = Vec::with_capacity(
partition_key_indices.len() + order_key_indices.len() + input_pk_indices.len(),
);
let mut projection = Vec::with_capacity(order_key_indices.len() + input_pk_indices.len());
let mut col_dedup = HashSet::new();
for (proj_idx, key_idx) in partition_key_indices
for (proj_idx, key_idx) in order_key_indices
.iter()
.chain(order_key_indices.iter())
.chain(input_pk_indices.iter())
.enumerate()
{
@@ -277,12 +268,11 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
cache_policy,

calls,
partition_key_indices,
order_key_data_types,
order_key_order_types,
order_key_indices,
input_pk_indices,
state_key_to_table_pk_proj: projection,
state_key_to_table_sub_pk_proj: projection,
_phantom: PhantomData,
}
}
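
For reference, a standalone restatement of the projection built in `new` above, with the partition key no longer taking part: the projection now maps the chained (order key ++ input pk) columns onto the table's sub-pk, keeping only the first occurrence of each column index. The hidden loop body is assumed to do exactly that dedup-and-push; `build_sub_pk_projection` is a name invented for this sketch.

use std::collections::HashSet;

/// Build a projection from the (order_key ++ input_pk) column list onto the
/// table's sub-pk columns, skipping duplicated column indices. This is a
/// standalone re-statement for illustration, not the actual executor code.
fn build_sub_pk_projection(order_key_indices: &[usize], input_pk_indices: &[usize]) -> Vec<usize> {
    let mut projection = Vec::with_capacity(order_key_indices.len() + input_pk_indices.len());
    let mut col_dedup = HashSet::new();
    for (proj_idx, key_idx) in order_key_indices
        .iter()
        .chain(input_pk_indices.iter())
        .enumerate()
    {
        // Keep only the first position at which each input column appears.
        if col_dedup.insert(*key_idx) {
            projection.push(proj_idx);
        }
    }
    projection
}

fn main() {
    // Order key on columns [3, 1], input pk on [1, 0]: column 1 appears twice,
    // so only its first occurrence (chained position 1) is kept.
    assert_eq!(build_sub_pk_projection(&[3, 1], &[1, 0]), vec![0, 1, 3]);
    println!("projection built");
}
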
@@ -508,17 +498,17 @@ impl<'a, S: StateStore> OverPartition<'a, S> {

if self.cache_real_len() == 0 {
// no normal entry in the cache, just load the given range
let table_pk_range = (
Bound::Included(self.state_key_to_table_pk(range.start())?),
Bound::Included(self.state_key_to_table_pk(range.end())?),
let table_sub_range = (
Bound::Included(self.state_key_to_table_sub_pk(range.start())?),
Bound::Included(self.state_key_to_table_sub_pk(range.end())?),
);
tracing::debug!(
partition=?self.this_partition_key,
table_pk_range=?table_pk_range,
table_sub_range=?table_sub_range,
"cache is empty, just loading the given range"
);
return self
.extend_cache_by_range_inner(table, table_pk_range)
.extend_cache_by_range_inner(table, table_sub_range)
.await;
}

@@ -528,33 +518,33 @@ impl<'a, S: StateStore> OverPartition<'a, S> {

if self.cache_left_is_sentinel() && *range.start() < cache_real_first_key {
// extend leftward only if there's smallest sentinel
let table_pk_range = (
Bound::Included(self.state_key_to_table_pk(range.start())?),
Bound::Excluded(self.state_key_to_table_pk(cache_real_first_key)?),
let table_sub_range = (
Bound::Included(self.state_key_to_table_sub_pk(range.start())?),
Bound::Excluded(self.state_key_to_table_sub_pk(cache_real_first_key)?),
);
tracing::trace!(
partition=?self.this_partition_key,
table_pk_range=?table_pk_range,
table_sub_range=?table_sub_range,
"loading the left half of given range"
);
return self
.extend_cache_by_range_inner(table, table_pk_range)
.extend_cache_by_range_inner(table, table_sub_range)
.await;
}

if self.cache_right_is_sentinel() && *range.end() > cache_real_last_key {
// extend rightward only if there's largest sentinel
let table_pk_range = (
Bound::Excluded(self.state_key_to_table_pk(cache_real_last_key)?),
Bound::Included(self.state_key_to_table_pk(range.end())?),
let table_sub_range = (
Bound::Excluded(self.state_key_to_table_sub_pk(cache_real_last_key)?),
Bound::Included(self.state_key_to_table_sub_pk(range.end())?),
);
tracing::trace!(
partition=?self.this_partition_key,
table_pk_range=?table_pk_range,
table_sub_range=?table_sub_range,
"loading the right half of given range"
);
return self
.extend_cache_by_range_inner(table, table_pk_range)
.extend_cache_by_range_inner(table, table_sub_range)
.await;
}

@@ -569,24 +559,18 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
async fn extend_cache_by_range_inner(
&mut self,
table: &StateTable<S>,
table_pk_range: (Bound<impl Row>, Bound<impl Row>),
table_sub_range: (Bound<impl Row>, Bound<impl Row>),
) -> StreamExecutorResult<()> {
let streams = stream::iter(table.vnode_bitmap().iter_vnodes())
.map(|vnode| {
table.iter_with_vnode(
vnode,
&table_pk_range,
PrefetchOptions::new_for_exhaust_iter(),
)
})
.buffer_unordered(10)
.try_collect::<Vec<_>>()
.await?
.into_iter()
.map(Box::pin);
let stream = table
.iter_with_prefix(
self.this_partition_key,
&table_sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await?;

#[for_await]
for row in select_all(streams) {
for row in stream {
let row: OwnedRow = row?.into_owned_row();
let key = self.row_to_state_key(&row)?;
self.range_cache.insert(CacheKey::from(key), row);
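
The shape of the change in `extend_cache_by_range_inner`: instead of building up to ten per-vnode iterators and unioning them with `select_all`, a single prefix-bounded stream is opened and drained. Below is a minimal async sketch using only the `futures` crate; `mock_iter_with_prefix` is a made-up stand-in for `StateTable::iter_with_prefix`, not the real API.

use futures::stream::{self, Stream, StreamExt};
use std::collections::BTreeMap;

// Opening the iterator may fail, and each yielded row may fail, hence the two
// levels of `Result`.
async fn mock_iter_with_prefix(
    rows: Vec<(i64, String)>,
) -> Result<impl Stream<Item = Result<(i64, String), String>>, String> {
    Ok(stream::iter(rows.into_iter().map(Ok::<_, String>)))
}

async fn extend_cache(cache: &mut BTreeMap<i64, String>) -> Result<(), String> {
    // One stream for the whole partition, already in (order_key, pk) order.
    let rows = vec![(10, "a".to_string()), (20, "b".to_string())];
    let stream = mock_iter_with_prefix(rows).await?;
    futures::pin_mut!(stream);
    while let Some(row) = stream.next().await {
        let (key, payload) = row?;
        cache.insert(key, payload);
    }
    Ok(())
}

fn main() {
    let mut cache = BTreeMap::new();
    futures::executor::block_on(extend_cache(&mut cache)).expect("load failed");
    assert_eq!(cache.len(), 2);
    println!("{:?}", cache);
}
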
@@ -647,21 +631,20 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
) -> StreamExecutorResult<()> {
let mut to_extend: VecDeque<OwnedRow> = VecDeque::with_capacity(MAGIC_BATCH_SIZE);
{
let pk_range = (
Bound::Included(self.this_partition_key.into_owned_row()),
Bound::Excluded(self.state_key_to_table_pk(range_to_exclusive)?),
let sub_range = (
Bound::<OwnedRow>::Unbounded,
Bound::Excluded(self.state_key_to_table_sub_pk(range_to_exclusive)?),
);
let streams: Vec<_> =
futures::future::try_join_all(table.vnode_bitmap().iter_vnodes().map(|vnode| {
table.iter_with_vnode(vnode, &pk_range, PrefetchOptions::new_for_exhaust_iter())
}))
.await?
.into_iter()
.map(Box::pin)
.collect();
let stream = table
.iter_with_prefix(
self.this_partition_key,
&sub_range,
PrefetchOptions::new_for_exhaust_iter(),
)
.await?;

#[for_await]
for row in merge_sort(streams) {
for row in stream {
let row: OwnedRow = row?.into_owned_row();

// For leftward extension, we now must iterate the table in order from the beginning
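
Leftward extension cannot scan backwards: the storage iterator only moves forward within the partition, so the code scans from the start of the partition up to the exclusive bound and keeps only the most recent rows in a bounded `VecDeque`. A simplified synchronous sketch of that keep-the-last-N pattern, assuming the hidden loop body pops from the front once the deque is full (`last_n_in_order` is an invented name):

use std::collections::VecDeque;

/// Keep only the trailing `n` items of a forward scan.
fn last_n_in_order<T>(rows: impl IntoIterator<Item = T>, n: usize) -> VecDeque<T> {
    let mut to_extend = VecDeque::with_capacity(n);
    for row in rows {
        if to_extend.len() == n {
            // Drop the oldest row so that only the `n` rows closest to the
            // exclusive right bound survive the scan.
            to_extend.pop_front();
        }
        to_extend.push_back(row);
    }
    to_extend
}

fn main() {
    let kept = last_n_in_order(1..=10, 3);
    assert_eq!(kept, VecDeque::from([8, 9, 10]));
    println!("kept {:?}", kept);
}
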
@@ -739,33 +722,22 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
) -> StreamExecutorResult<()> {
let mut n_extended = 0usize;
{
let pk_range = (
Bound::Excluded(self.state_key_to_table_pk(range_from_exclusive)?),
// currently we can't get the first possible key after this partition, so use
// `Unbounded` plus manual check for workaround
let sub_range = (
Bound::Excluded(self.state_key_to_table_sub_pk(range_from_exclusive)?),
Bound::<OwnedRow>::Unbounded,
);
let streams: Vec<_> =
futures::future::try_join_all(table.vnode_bitmap().iter_vnodes().map(|vnode| {
table.iter_with_vnode(vnode, &pk_range, PrefetchOptions::default())
}))
.await?
.into_iter()
.map(Box::pin)
.collect();
let stream = table
.iter_with_prefix(
self.this_partition_key,
&sub_range,
PrefetchOptions::default(),
)
.await?;

#[for_await]
for row in merge_sort(streams) {
for row in stream {
let row: OwnedRow = row?.into_owned_row();

if !Row::eq(
self.this_partition_key,
(&row).project(self.partition_key_indices),
) {
// we've reached the end of this partition
break;
}

let key = self.row_to_state_key(&row)?;
self.range_cache.insert(CacheKey::from(key), row);
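
The block removed above was a workaround: without a way to bound the scan at the end of the partition, the old code scanned with an unbounded upper bound and broke manually once a row with a different partition key appeared. With the partition key acting as a prefix of the scan, the iterator stops at the partition boundary by itself. A toy comparison of the two shapes on a `BTreeMap` stand-in (not the real state table):

use std::collections::BTreeMap;
use std::ops::Bound;

fn main() {
    // Keys are (partition_key, order_key); values are payloads.
    let mut table: BTreeMap<(i64, i64), &str> = BTreeMap::new();
    table.insert((1, 10), "p1-a");
    table.insert((1, 20), "p1-b");
    table.insert((2, 5), "p2-a");

    // Old shape: exclusive lower bound, unbounded upper bound, manual break
    // once a row from the next partition shows up.
    let mut old_style = Vec::new();
    for (&(p, o), &v) in table.range((Bound::Excluded((1i64, 10i64)), Bound::Unbounded)) {
        if p != 1 {
            break; // reached the end of this partition
        }
        old_style.push((o, v));
    }

    // New shape: the partition key bounds the scan itself (approximated here
    // with an inclusive maximal upper key), so the iterator simply ends at the
    // partition boundary and no manual check is needed.
    let mut new_style = Vec::new();
    for (&(_, o), &v) in table.range((Bound::Excluded((1i64, 10i64)), Bound::Included((1i64, i64::MAX)))) {
        new_style.push((o, v));
    }

    assert_eq!(old_style, new_style);
    println!("{:?}", new_style);
}
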

@@ -784,17 +756,16 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
Ok(())
}

fn state_key_to_table_pk(&self, key: &StateKey) -> StreamExecutorResult<OwnedRow> {
Ok(self
.this_partition_key
.chain(memcmp_encoding::decode_row(
&key.order_key,
self.order_key_data_types,
self.order_key_order_types,
)?)
.chain(key.pk.as_inner())
.project(&self.state_key_to_table_pk_proj)
.into_owned_row())
/// Convert [`StateKey`] to sub pk (pk without partition key) as [`OwnedRow`].
fn state_key_to_table_sub_pk(&self, key: &StateKey) -> StreamExecutorResult<OwnedRow> {
Ok(memcmp_encoding::decode_row(
&key.order_key,
self.order_key_data_types,
self.order_key_order_types,
)?
.chain(key.pk.as_inner())
.project(&self.state_key_to_table_sub_pk_proj)
.into_owned_row())
}

fn row_to_state_key(&self, full_row: impl Row + Copy) -> StreamExecutorResult<StateKey> {
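
The helper above, renamed from `state_key_to_table_pk`, now produces the sub-pk (order key plus input pk, without the partition key), which is exactly the range key that `iter_with_prefix` expects within a partition. A toy restatement of the chain-then-project step, with `Vec<i64>` standing in for `OwnedRow` and the memcmp decoding elided (`state_key_to_sub_pk` is an invented name):

/// Chain the decoded order key with the input pk, then keep only the
/// deduplicated sub-pk positions computed once in the constructor.
fn state_key_to_sub_pk(order_key: &[i64], input_pk: &[i64], proj: &[usize]) -> Vec<i64> {
    let chained: Vec<i64> = order_key.iter().chain(input_pk.iter()).copied().collect();
    proj.iter().map(|&i| chained[i]).collect()
}

fn main() {
    // Order key occupies chained positions [0, 1]; the input pk repeats one of
    // those columns, so the projection [0, 1, 3] skips position 2.
    let sub_pk = state_key_to_sub_pk(&[7, 42], &[42, 9], &[0, 1, 3]);
    assert_eq!(sub_pk, vec![7, 42, 9]);
    println!("sub pk: {:?}", sub_pk);
}
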
