Skip to content

Commit

Permalink
chore: revert "perf(over window): optimize range cache with state table reverse iterator" (#17158)
Browse files Browse the repository at this point in the history
  • Loading branch information
stdrc authored Jun 7, 2024
1 parent 845748c commit f52d177
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 145 deletions.
90 changes: 16 additions & 74 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1359,12 +1359,6 @@ where
}
}

/// Stream of rows carrying their serialized keys, as yielded by the state
/// table iterator functions.
///
/// This is the trait-alias pattern: the empty trait plus the blanket impl
/// below let callers write `impl KeyedRowStream<'_>` instead of spelling out
/// the full `Stream` bound.
pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
// Blanket impl: any stream with the right item type is a `KeyedRowStream`.
impl<'a, T> KeyedRowStream<'a> for T where
    T: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a
{
}

// Iterator functions
impl<
S,
Expand All @@ -1388,7 +1382,7 @@ where
vnode: VirtualNode,
pk_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
Ok(deserialize_keyed_row_stream(
self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
.await?,
Expand Down Expand Up @@ -1433,27 +1427,6 @@ where
Ok(self.local_store.iter(table_key_range, read_options).await?)
}

/// Opens a reverse iterator over raw key-value pairs in `table_key_range`
/// on the local state store (counterpart of the forward `iter_kv`).
///
/// `prefix_hint` is passed through to the storage layer; retention and
/// table id are taken from this table's own configuration. The cache policy
/// fills the block cache with the default cache context.
async fn rev_iter_kv(
    &self,
    table_key_range: TableKeyRange,
    prefix_hint: Option<Bytes>,
    prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<<S::Local as LocalStateStore>::RevIter<'_>> {
    let read_options = ReadOptions {
        prefix_hint,
        retention_seconds: self.table_option.retention_seconds,
        table_id: self.table_id,
        prefetch_options,
        cache_policy: CachePolicy::Fill(CacheContext::Default),
        ..Default::default()
    };

    Ok(self
        .local_store
        .rev_iter(table_key_range, read_options)
        .await?)
}

/// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
/// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`.
/// `pk_prefix` is used to identify the exact vnode the scan should perform on.
Expand All @@ -1462,28 +1435,7 @@ where
pk_prefix: impl Row,
sub_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
self.iter_with_prefix_inner::</* REVERSE */ false>(pk_prefix, sub_range, prefetch_options)
.await
}

/// This function scans the table just like `iter_with_prefix`, but in reverse order.
pub async fn rev_iter_with_prefix(
    &self,
    pk_prefix: impl Row,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
    // Delegate to the shared implementation with the REVERSE const flag set.
    self.iter_with_prefix_inner::</* REVERSE */ true>(pk_prefix, sub_range, prefetch_options)
        .await
}

async fn iter_with_prefix_inner<const REVERSE: bool>(
&self,
pk_prefix: impl Row,
sub_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
) -> StreamExecutorResult<KeyedRowStream<'_, S, SD>> {
let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

Expand Down Expand Up @@ -1514,8 +1466,7 @@ where
trace!(
table_id = %self.table_id(),
?prefix_hint, ?pk_prefix,
?pk_prefix_indices,
iter_direction = if REVERSE { "reverse" } else { "forward" },
?pk_prefix_indices,
"storage_iter_with_prefix"
);

Expand All @@ -1524,27 +1475,15 @@ where

let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

Ok(if REVERSE {
futures::future::Either::Left(deserialize_keyed_row_stream(
self.rev_iter_kv(
memcomparable_range_with_vnode,
prefix_hint,
prefetch_options,
)
.await?,
&self.row_serde,
))
} else {
futures::future::Either::Right(deserialize_keyed_row_stream(
self.iter_kv(
memcomparable_range_with_vnode,
prefix_hint,
prefetch_options,
)
.await?,
&self.row_serde,
))
})
Ok(deserialize_keyed_row_stream(
self.iter_kv(
memcomparable_range_with_vnode,
prefix_hint,
prefetch_options,
)
.await?,
&self.row_serde,
))
}

/// This function scans raw key-values from the relational table with specific `pk_range` under
Expand Down Expand Up @@ -1653,10 +1592,13 @@ where
}
}

/// Opaque stream type of deserialized rows paired with their serialized keys,
/// returned by the state table iterator functions.
// NOTE(review): this is a `type_alias_impl_trait` alias; its concrete type is
// inferred from `deserialize_keyed_row_stream` below.
pub type KeyedRowStream<'a, S: StateStore, SD: ValueRowSerde + 'a> =
    impl Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;

fn deserialize_keyed_row_stream<'a>(
iter: impl StateStoreIter + 'a,
deserializer: &'a impl ValueRowSerde,
) -> impl KeyedRowStream<'a> {
) -> impl Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {
iter.into_stream(move |(key, value)| {
Ok(KeyedRow::new(
// TODO: may avoid clone the key when key is not needed
Expand Down
147 changes: 76 additions & 71 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! Types and functions that store or manipulate state/cache inside one single over window
//! partition.
use std::collections::BTreeMap;
use std::collections::{BTreeMap, VecDeque};
use std::marker::PhantomData;
use std::ops::{Bound, RangeInclusive};

Expand Down Expand Up @@ -817,14 +817,37 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
.await?;
}

// prefetch rows before the start of the range
self.extend_cache_leftward_by_n(table, range.start())
.await?;
// TODO(rc): Uncomment the following to enable prefetching rows before the start of the
// range once we have STATE TABLE REVERSE ITERATOR.
// self.extend_cache_leftward_by_n(table, range.start()).await?;

// prefetch rows after the end of the range
self.extend_cache_rightward_by_n(table, range.end()).await
}

/// Scans this partition's rows that fall inside `table_sub_range` from the
/// state table and inserts each of them into the range cache.
async fn extend_cache_by_range_inner(
    &mut self,
    table: &StateTable<S>,
    table_sub_range: (Bound<impl Row>, Bound<impl Row>),
) -> StreamExecutorResult<()> {
    // Forward scan over this partition, restricted to the given sub-range.
    let row_stream = table
        .iter_with_prefix(
            self.this_partition_key,
            &table_sub_range,
            PrefetchOptions::default(),
        )
        .await?;

    #[for_await]
    for keyed_row in row_stream {
        let owned_row: OwnedRow = keyed_row?.into_owned_row();
        let state_key = self.row_conv.row_to_state_key(&owned_row)?;
        self.range_cache
            .insert(CacheKey::from(state_key), owned_row);
    }

    Ok(())
}

async fn extend_cache_leftward_by_n(
&mut self,
table: &StateTable<S>,
Expand Down Expand Up @@ -870,6 +893,55 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
Ok(())
}

/// Extends the range cache leftward by up to `MAGIC_BATCH_SIZE` rows whose
/// keys are strictly less than `range_to_exclusive`.
///
/// Because the state table has no reverse iterator here, this scans the
/// partition forward from the beginning and keeps only the last
/// `MAGIC_BATCH_SIZE` rows seen before the exclusive bound (a sliding window
/// over a `VecDeque`).
async fn extend_cache_leftward_by_n_inner(
    &mut self,
    table: &StateTable<S>,
    range_to_exclusive: &StateKey,
) -> StreamExecutorResult<()> {
    // Window of the most recent rows; at most MAGIC_BATCH_SIZE are retained.
    let mut to_extend: VecDeque<OwnedRow> = VecDeque::with_capacity(MAGIC_BATCH_SIZE);
    {
        // Scan everything in this partition strictly before `range_to_exclusive`.
        let sub_range = (
            Bound::<OwnedRow>::Unbounded,
            Bound::Excluded(
                self.row_conv
                    .state_key_to_table_sub_pk(range_to_exclusive)?,
            ),
        );
        let stream = table
            .iter_with_prefix(
                self.this_partition_key,
                &sub_range,
                PrefetchOptions::default(),
            )
            .await?;

        #[for_await]
        for row in stream {
            let row: OwnedRow = row?.into_owned_row();

            // For leftward extension, we now must iterate the table in order from the beginning
            // of this partition and fill only the last n rows to the cache.
            // TODO(rc): WE NEED STATE TABLE REVERSE ITERATOR!!
            if to_extend.len() == MAGIC_BATCH_SIZE {
                // Window full: drop the oldest row so only the last n survive.
                to_extend.pop_front();
            }
            to_extend.push_back(row);
        }
    }

    // Move the surviving window into the range cache.
    let n_extended = to_extend.len();
    for row in to_extend {
        let key = self.row_conv.row_to_state_key(&row)?;
        self.range_cache.insert(CacheKey::from(key), row);
    }
    if n_extended < MAGIC_BATCH_SIZE && self.cache_real_len() > 0 {
        // we reached the beginning of this partition in the table
        self.range_cache.remove(&CacheKey::Smallest);
    }

    Ok(())
}

async fn extend_cache_rightward_by_n(
&mut self,
table: &StateTable<S>,
Expand Down Expand Up @@ -915,73 +987,6 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
Ok(())
}

/// Loads all rows of this partition within `table_sub_range` from the state
/// table and inserts them into the range cache.
async fn extend_cache_by_range_inner(
    &mut self,
    table: &StateTable<S>,
    table_sub_range: (Bound<impl Row>, Bound<impl Row>),
) -> StreamExecutorResult<()> {
    // Forward scan of this partition restricted to the sub-range.
    let stream = table
        .iter_with_prefix(
            self.this_partition_key,
            &table_sub_range,
            PrefetchOptions::default(),
        )
        .await?;

    #[for_await]
    for row in stream {
        let row: OwnedRow = row?.into_owned_row();
        // Derive the cache key from the row itself and cache the row.
        let key = self.row_conv.row_to_state_key(&row)?;
        self.range_cache.insert(CacheKey::from(key), row);
    }

    Ok(())
}

/// Extends the range cache leftward by up to `MAGIC_BATCH_SIZE` rows whose
/// keys are strictly less than `range_to_exclusive`, using the state table's
/// reverse iterator so only the needed rows are read.
async fn extend_cache_leftward_by_n_inner(
    &mut self,
    table: &StateTable<S>,
    range_to_exclusive: &StateKey,
) -> StreamExecutorResult<()> {
    // Counts how many rows were actually inserted during this extension.
    let mut n_extended = 0usize;
    {
        // Scan everything in this partition strictly before `range_to_exclusive`,
        // in reverse order, so the first rows yielded are the closest ones.
        let sub_range = (
            Bound::<OwnedRow>::Unbounded,
            Bound::Excluded(
                self.row_conv
                    .state_key_to_table_sub_pk(range_to_exclusive)?,
            ),
        );
        let rev_stream = table
            .rev_iter_with_prefix(
                self.this_partition_key,
                &sub_range,
                PrefetchOptions::default(),
            )
            .await?;

        #[for_await]
        for row in rev_stream {
            let row: OwnedRow = row?.into_owned_row();

            let key = self.row_conv.row_to_state_key(&row)?;
            self.range_cache.insert(CacheKey::from(key), row);

            n_extended += 1;
            if n_extended == MAGIC_BATCH_SIZE {
                // Got a full batch; stop reading further leftward.
                break;
            }
        }
    }

    if n_extended < MAGIC_BATCH_SIZE && self.cache_real_len() > 0 {
        // we reached the beginning of this partition in the table
        self.range_cache.remove(&CacheKey::Smallest);
    }

    Ok(())
}

async fn extend_cache_rightward_by_n_inner(
&mut self,
table: &StateTable<S>,
Expand Down

0 comments on commit f52d177

Please sign in to comment.