diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 98c8517cfbbc3..1d1a5e2a6047f 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -1359,12 +1359,6 @@ where } } -pub trait KeyedRowStream<'a>: Stream>> + 'a {} -impl<'a, T> KeyedRowStream<'a> for T where - T: Stream>> + 'a -{ -} - // Iterator functions impl< S, @@ -1388,7 +1382,7 @@ where vnode: VirtualNode, pk_range: &(Bound, Bound), prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult> { + ) -> StreamExecutorResult> { Ok(deserialize_keyed_row_stream( self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options) .await?, @@ -1433,27 +1427,6 @@ where Ok(self.local_store.iter(table_key_range, read_options).await?) } - async fn rev_iter_kv( - &self, - table_key_range: TableKeyRange, - prefix_hint: Option, - prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult<::RevIter<'_>> { - let read_options = ReadOptions { - prefix_hint, - retention_seconds: self.table_option.retention_seconds, - table_id: self.table_id, - prefetch_options, - cache_policy: CachePolicy::Fill(CacheContext::Default), - ..Default::default() - }; - - Ok(self - .local_store - .rev_iter(table_key_range, read_options) - .await?) - } - /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`. /// `pk_prefix` is used to identify the exact vnode the scan should perform on. @@ -1462,28 +1435,7 @@ where pk_prefix: impl Row, sub_range: &(Bound, Bound), prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult> { - self.iter_with_prefix_inner::(pk_prefix, sub_range, prefetch_options) - .await - } - - /// This function scans the table just like `iter_with_prefix`, but in reverse order. - pub async fn rev_iter_with_prefix( - &self, - pk_prefix: impl Row, - sub_range: &(Bound, Bound), - prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult> { - self.iter_with_prefix_inner::(pk_prefix, sub_range, prefetch_options) - .await - } - - async fn iter_with_prefix_inner( - &self, - pk_prefix: impl Row, - sub_range: &(Bound, Bound), - prefetch_options: PrefetchOptions, - ) -> StreamExecutorResult> { + ) -> StreamExecutorResult> { let prefix_serializer = self.pk_serde.prefix(pk_prefix.len()); let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer); @@ -1514,8 +1466,7 @@ where trace!( table_id = %self.table_id(), ?prefix_hint, ?pk_prefix, - ?pk_prefix_indices, - iter_direction = if REVERSE { "reverse" } else { "forward" }, + ?pk_prefix_indices, "storage_iter_with_prefix" ); @@ -1524,27 +1475,15 @@ where let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode); - Ok(if REVERSE { - futures::future::Either::Left(deserialize_keyed_row_stream( - self.rev_iter_kv( - memcomparable_range_with_vnode, - prefix_hint, - prefetch_options, - ) - .await?, - &self.row_serde, - )) - } else { - futures::future::Either::Right(deserialize_keyed_row_stream( - self.iter_kv( - memcomparable_range_with_vnode, - prefix_hint, - prefetch_options, - ) - .await?, - &self.row_serde, - )) - }) + Ok(deserialize_keyed_row_stream( + self.iter_kv( + memcomparable_range_with_vnode, + prefix_hint, + prefetch_options, + ) + .await?, + &self.row_serde, + )) } /// This function scans raw key-values from the relational table with specific `pk_range` under @@ -1653,10 +1592,13 @@ where } } +pub type KeyedRowStream<'a, S: StateStore, SD: ValueRowSerde + 'a> = + impl Stream>> + 'a; + fn deserialize_keyed_row_stream<'a>( iter: impl StateStoreIter + 'a, deserializer: &'a impl ValueRowSerde, -) -> impl KeyedRowStream<'a> { +) -> impl Stream>> + 'a { iter.into_stream(move |(key, value)| { Ok(KeyedRow::new( // TODO: may avoid clone the key when key is not needed diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index 763965fa5227f..baac154593f7a 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -15,7 +15,7 @@ //! Types and functions that store or manipulate state/cache inside one single over window //! partition. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, VecDeque}; use std::marker::PhantomData; use std::ops::{Bound, RangeInclusive}; @@ -817,14 +817,37 @@ impl<'a, S: StateStore> OverPartition<'a, S> { .await?; } - // prefetch rows before the start of the range - self.extend_cache_leftward_by_n(table, range.start()) - .await?; + // TODO(rc): Uncomment the following to enable prefetching rows before the start of the + // range once we have STATE TABLE REVERSE ITERATOR. + // self.extend_cache_leftward_by_n(table, range.start()).await?; // prefetch rows after the end of the range self.extend_cache_rightward_by_n(table, range.end()).await } + async fn extend_cache_by_range_inner( + &mut self, + table: &StateTable, + table_sub_range: (Bound, Bound), + ) -> StreamExecutorResult<()> { + let stream = table + .iter_with_prefix( + self.this_partition_key, + &table_sub_range, + PrefetchOptions::default(), + ) + .await?; + + #[for_await] + for row in stream { + let row: OwnedRow = row?.into_owned_row(); + let key = self.row_conv.row_to_state_key(&row)?; + self.range_cache.insert(CacheKey::from(key), row); + } + + Ok(()) + } + async fn extend_cache_leftward_by_n( &mut self, table: &StateTable, @@ -870,6 +893,55 @@ impl<'a, S: StateStore> OverPartition<'a, S> { Ok(()) } + async fn extend_cache_leftward_by_n_inner( + &mut self, + table: &StateTable, + range_to_exclusive: &StateKey, + ) -> StreamExecutorResult<()> { + let mut to_extend: VecDeque = VecDeque::with_capacity(MAGIC_BATCH_SIZE); + { + let sub_range = ( + Bound::::Unbounded, + Bound::Excluded( + self.row_conv + .state_key_to_table_sub_pk(range_to_exclusive)?, + ), + ); + let stream = table + .iter_with_prefix( + self.this_partition_key, + &sub_range, + PrefetchOptions::default(), + ) + .await?; + + #[for_await] + for row in stream { + let row: OwnedRow = row?.into_owned_row(); + + // For leftward extension, we now must iterate the table in order from the beginning + // of this partition and fill only the last n rows to the cache. + // TODO(rc): WE NEED STATE TABLE REVERSE ITERATOR!! + if to_extend.len() == MAGIC_BATCH_SIZE { + to_extend.pop_front(); + } + to_extend.push_back(row); + } + } + + let n_extended = to_extend.len(); + for row in to_extend { + let key = self.row_conv.row_to_state_key(&row)?; + self.range_cache.insert(CacheKey::from(key), row); + } + if n_extended < MAGIC_BATCH_SIZE && self.cache_real_len() > 0 { + // we reached the beginning of this partition in the table + self.range_cache.remove(&CacheKey::Smallest); + } + + Ok(()) + } + async fn extend_cache_rightward_by_n( &mut self, table: &StateTable, @@ -915,73 +987,6 @@ impl<'a, S: StateStore> OverPartition<'a, S> { Ok(()) } - async fn extend_cache_by_range_inner( - &mut self, - table: &StateTable, - table_sub_range: (Bound, Bound), - ) -> StreamExecutorResult<()> { - let stream = table - .iter_with_prefix( - self.this_partition_key, - &table_sub_range, - PrefetchOptions::default(), - ) - .await?; - - #[for_await] - for row in stream { - let row: OwnedRow = row?.into_owned_row(); - let key = self.row_conv.row_to_state_key(&row)?; - self.range_cache.insert(CacheKey::from(key), row); - } - - Ok(()) - } - - async fn extend_cache_leftward_by_n_inner( - &mut self, - table: &StateTable, - range_to_exclusive: &StateKey, - ) -> StreamExecutorResult<()> { - let mut n_extended = 0usize; - { - let sub_range = ( - Bound::::Unbounded, - Bound::Excluded( - self.row_conv - .state_key_to_table_sub_pk(range_to_exclusive)?, - ), - ); - let rev_stream = table - .rev_iter_with_prefix( - self.this_partition_key, - &sub_range, - PrefetchOptions::default(), - ) - .await?; - - #[for_await] - for row in rev_stream { - let row: OwnedRow = row?.into_owned_row(); - - let key = self.row_conv.row_to_state_key(&row)?; - self.range_cache.insert(CacheKey::from(key), row); - - n_extended += 1; - if n_extended == MAGIC_BATCH_SIZE { - break; - } - } - } - - if n_extended < MAGIC_BATCH_SIZE && self.cache_real_len() > 0 { - // we reached the beginning of this partition in the table - self.range_cache.remove(&CacheKey::Smallest); - } - - Ok(()) - } - async fn extend_cache_rightward_by_n_inner( &mut self, table: &StateTable,