Skip to content

Commit

Permalink
refactor(over window): reduce repeated calculation of projection from…
Browse files Browse the repository at this point in the history
… state key to table sub-pk (#14049)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Dec 19, 2023
1 parent 3ed3078 commit 723e70b
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 76 deletions.
99 changes: 94 additions & 5 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct ExecutorInner<S: StateStore> {
order_key_order_types: Vec<OrderType>,
input_pk_indices: Vec<usize>,
input_schema_len: usize,
state_key_to_table_sub_pk_proj: Vec<usize>,

state_table: StateTable<S>,
watermark_epoch: AtomicU64Ref,
Expand Down Expand Up @@ -180,6 +181,12 @@ impl<S: StateStore> OverWindowExecutor<S> {
.map(|i| input_schema.fields()[*i].data_type.clone())
.collect();

let state_key_to_table_sub_pk_proj = RowConverter::calc_state_key_to_table_sub_pk_proj(
&args.partition_key_indices,
&args.order_key_indices,
&input_info.pk_indices,
);

Self {
input: args.input,
inner: ExecutorInner {
Expand All @@ -192,6 +199,7 @@ impl<S: StateStore> OverWindowExecutor<S> {
order_key_order_types: args.order_key_order_types,
input_pk_indices: input_info.pk_indices,
input_schema_len: input_schema.len(),
state_key_to_table_sub_pk_proj,
state_table: args.state_table,
watermark_epoch: args.watermark_epoch,
metrics: args.metrics,
Expand Down Expand Up @@ -337,11 +345,13 @@ impl<S: StateStore> OverWindowExecutor<S> {
&mut cache,
this.cache_policy,
&this.calls,
&this.partition_key_indices,
&this.order_key_data_types,
&this.order_key_order_types,
&this.order_key_indices,
&this.input_pk_indices,
RowConverter {
state_key_to_table_sub_pk_proj: &this.state_key_to_table_sub_pk_proj,
order_key_indices: &this.order_key_indices,
order_key_data_types: &this.order_key_data_types,
order_key_order_types: &this.order_key_order_types,
input_pk_indices: &this.input_pk_indices,
},
);

// Build changes for current partition.
Expand Down Expand Up @@ -676,3 +686,82 @@ impl<S: StateStore> OverWindowExecutor<S> {
}
}
}

/// A converter that helps convert [`StateKey`] to state table sub-PK and convert executor input/output
/// row to [`StateKey`].
///
/// ## Notes
///
/// - [`StateKey`]: Over window range cache key type, containing order key and input pk.
/// - State table sub-PK: State table PK = PK prefix (partition key) + sub-PK (order key + input pk).
/// - Input/output row: Input schema is the prefix of output schema.
///
/// You can see that the content of [`StateKey`] is very similar to state table sub-PK. There's only
/// one difference: the state table PK and sub-PK don't have duplicated columns, while in [`StateKey`],
/// `order_key` and (input)`pk` may contain duplicated columns.
#[derive(Debug, Clone, Copy)]
pub(super) struct RowConverter<'a> {
state_key_to_table_sub_pk_proj: &'a [usize],
order_key_indices: &'a [usize],
order_key_data_types: &'a [DataType],
order_key_order_types: &'a [OrderType],
input_pk_indices: &'a [usize],
}

impl<'a> RowConverter<'a> {
/// Calculate the indices needed for projection from [`StateKey`] to state table sub-PK (used to do
/// prefixed table scanning). Ideally this function should be called only once by each executor instance.
/// The projection indices vec is the *selected column indices* in [`StateKey`].`order_key.chain(input_pk)`.
pub(super) fn calc_state_key_to_table_sub_pk_proj(
partition_key_indices: &[usize],
order_key_indices: &[usize],
input_pk_indices: &'a [usize],
) -> Vec<usize> {
// This process is corresponding to `StreamOverWindow::infer_state_table`.
let mut projection = Vec::with_capacity(order_key_indices.len() + input_pk_indices.len());
let mut col_dedup: HashSet<usize> = partition_key_indices.iter().copied().collect();
for (proj_idx, key_idx) in order_key_indices
.iter()
.chain(input_pk_indices.iter())
.enumerate()
{
if col_dedup.insert(*key_idx) {
projection.push(proj_idx);
}
}
projection.shrink_to_fit();
projection
}

/// Convert [`StateKey`] to sub-PK (table PK without partition key) as [`OwnedRow`].
pub(super) fn state_key_to_table_sub_pk(
&self,
key: &StateKey,
) -> StreamExecutorResult<OwnedRow> {
Ok(memcmp_encoding::decode_row(
&key.order_key,
self.order_key_data_types,
self.order_key_order_types,
)?
.chain(key.pk.as_inner())
.project(self.state_key_to_table_sub_pk_proj)
.into_owned_row())
}

/// Convert full input/output row to [`StateKey`].
pub(super) fn row_to_state_key(
&self,
full_row: impl Row + Copy,
) -> StreamExecutorResult<StateKey> {
Ok(StateKey {
order_key: memcmp_encoding::encode_row(
full_row.project(self.order_key_indices),
self.order_key_order_types,
)?,
pk: full_row
.project(self.input_pk_indices)
.into_owned_row()
.into(),
})
}
}
101 changes: 30 additions & 71 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,21 @@
//! Types and functions that store or manipulate state/cache inside one single over window
//! partition.
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::collections::{BTreeMap, VecDeque};
use std::marker::PhantomData;
use std::ops::{Bound, RangeInclusive};

use futures_async_stream::for_await;
use risingwave_common::array::stream_record::Record;
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
use risingwave_common::types::DataType;
use risingwave_common::util::memcmp_encoding;
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::window_function::{FrameBounds, StateKey, WindowFuncCall};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;

use super::delta_btree_map::Change;
use super::estimated_btree_map::EstimatedBTreeMap;
use super::general::RowConverter;
use super::sentinel::KeyWithSentinel;
use crate::executor::over_window::delta_btree_map::DeltaBTreeMap;
use crate::executor::test_utils::prelude::StateTable;
Expand Down Expand Up @@ -233,11 +231,7 @@ pub(super) struct OverPartition<'a, S: StateStore> {
cache_policy: CachePolicy,

calls: &'a [WindowFuncCall],
order_key_data_types: &'a [DataType],
order_key_order_types: &'a [OrderType],
order_key_indices: &'a [usize],
input_pk_indices: &'a [usize],
state_key_to_table_sub_pk_proj: Vec<usize>,
row_conv: RowConverter<'a>,

stats: OverPartitionStats,

Expand All @@ -253,37 +247,15 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
cache: &'a mut PartitionCache,
cache_policy: CachePolicy,
calls: &'a [WindowFuncCall],
partition_key_indices: &'a [usize],
order_key_data_types: &'a [DataType],
order_key_order_types: &'a [OrderType],
order_key_indices: &'a [usize],
input_pk_indices: &'a [usize],
row_conv: RowConverter<'a>,
) -> Self {
// TODO(rc): move the calculation to executor?
let mut projection = Vec::with_capacity(order_key_indices.len() + input_pk_indices.len());
let mut col_dedup: HashSet<usize> = partition_key_indices.iter().copied().collect();
for (proj_idx, key_idx) in order_key_indices
.iter()
.chain(input_pk_indices.iter())
.enumerate()
{
if col_dedup.insert(*key_idx) {
projection.push(proj_idx);
}
}
projection.shrink_to_fit();

Self {
this_partition_key,
range_cache: cache,
cache_policy,

calls,
order_key_data_types,
order_key_order_types,
order_key_indices,
input_pk_indices,
state_key_to_table_sub_pk_proj: projection,
row_conv,

stats: Default::default(),

Expand Down Expand Up @@ -481,7 +453,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
#[for_await]
for row in table_iter {
let row: OwnedRow = row?.into_owned_row();
new_cache.insert(self.row_to_state_key(&row)?.into(), row);
new_cache.insert(self.row_conv.row_to_state_key(&row)?.into(), row);
}
*self.range_cache = new_cache;

Expand Down Expand Up @@ -522,8 +494,8 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
if self.cache_real_len() == 0 {
// no normal entry in the cache, just load the given range
let table_sub_range = (
Bound::Included(self.state_key_to_table_sub_pk(range.start())?),
Bound::Included(self.state_key_to_table_sub_pk(range.end())?),
Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.start())?),
Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.end())?),
);
tracing::debug!(
partition=?self.this_partition_key,
Expand All @@ -542,8 +514,11 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
if self.cache_left_is_sentinel() && *range.start() < cache_real_first_key {
// extend leftward only if there's smallest sentinel
let table_sub_range = (
Bound::Included(self.state_key_to_table_sub_pk(range.start())?),
Bound::Excluded(self.state_key_to_table_sub_pk(cache_real_first_key)?),
Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.start())?),
Bound::Excluded(
self.row_conv
.state_key_to_table_sub_pk(cache_real_first_key)?,
),
);
tracing::trace!(
partition=?self.this_partition_key,
Expand All @@ -558,8 +533,11 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
if self.cache_right_is_sentinel() && *range.end() > cache_real_last_key {
// extend rightward only if there's largest sentinel
let table_sub_range = (
Bound::Excluded(self.state_key_to_table_sub_pk(cache_real_last_key)?),
Bound::Included(self.state_key_to_table_sub_pk(range.end())?),
Bound::Excluded(
self.row_conv
.state_key_to_table_sub_pk(cache_real_last_key)?,
),
Bound::Included(self.row_conv.state_key_to_table_sub_pk(range.end())?),
);
tracing::trace!(
partition=?self.this_partition_key,
Expand Down Expand Up @@ -595,7 +573,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
#[for_await]
for row in stream {
let row: OwnedRow = row?.into_owned_row();
let key = self.row_to_state_key(&row)?;
let key = self.row_conv.row_to_state_key(&row)?;
self.range_cache.insert(CacheKey::from(key), row);
}

Expand Down Expand Up @@ -656,7 +634,10 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
{
let sub_range = (
Bound::<OwnedRow>::Unbounded,
Bound::Excluded(self.state_key_to_table_sub_pk(range_to_exclusive)?),
Bound::Excluded(
self.row_conv
.state_key_to_table_sub_pk(range_to_exclusive)?,
),
);
let stream = table
.iter_with_prefix(
Expand All @@ -682,7 +663,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {

let n_extended = to_extend.len();
for row in to_extend {
let key = self.row_to_state_key(&row)?;
let key = self.row_conv.row_to_state_key(&row)?;
self.range_cache.insert(CacheKey::from(key), row);
}
if n_extended < MAGIC_BATCH_SIZE && self.cache_real_len() > 0 {
Expand Down Expand Up @@ -746,7 +727,10 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
let mut n_extended = 0usize;
{
let sub_range = (
Bound::Excluded(self.state_key_to_table_sub_pk(range_from_exclusive)?),
Bound::Excluded(
self.row_conv
.state_key_to_table_sub_pk(range_from_exclusive)?,
),
Bound::<OwnedRow>::Unbounded,
);
let stream = table
Expand All @@ -761,7 +745,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
for row in stream {
let row: OwnedRow = row?.into_owned_row();

let key = self.row_to_state_key(&row)?;
let key = self.row_conv.row_to_state_key(&row)?;
self.range_cache.insert(CacheKey::from(key), row);

n_extended += 1;
Expand All @@ -778,31 +762,6 @@ impl<'a, S: StateStore> OverPartition<'a, S> {

Ok(())
}

/// Convert [`StateKey`] to sub pk (pk without partition key) as [`OwnedRow`].
fn state_key_to_table_sub_pk(&self, key: &StateKey) -> StreamExecutorResult<OwnedRow> {
Ok(memcmp_encoding::decode_row(
&key.order_key,
self.order_key_data_types,
self.order_key_order_types,
)?
.chain(key.pk.as_inner())
.project(&self.state_key_to_table_sub_pk_proj)
.into_owned_row())
}

fn row_to_state_key(&self, full_row: impl Row + Copy) -> StreamExecutorResult<StateKey> {
Ok(StateKey {
order_key: memcmp_encoding::encode_row(
full_row.project(self.order_key_indices),
self.order_key_order_types,
)?,
pk: full_row
.project(self.input_pk_indices)
.into_owned_row()
.into(),
})
}
}

/// Find all affected ranges in the given partition with delta.
Expand Down

0 comments on commit 723e70b

Please sign in to comment.