From 57b46414e673219aecf87175e5f04bf74b83bb7b Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 17 Oct 2023 10:25:47 -0500 Subject: [PATCH] refactor(over window): use new `table.iter_with_prefix` in streaming OverWindow (#12872) Signed-off-by: Richard Chien --- .../src/executor/over_window/general.rs | 1 - .../executor/over_window/over_partition.rs | 147 +++++++----------- 2 files changed, 59 insertions(+), 89 deletions(-) diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index 5a01fabd25149..9e66835b54b05 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -345,7 +345,6 @@ impl OverWindowExecutor { &mut cache, this.cache_policy, &this.calls, - &this.partition_key_indices, &this.order_key_data_types, &this.order_key_order_types, &this.order_key_indices, diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index b89c51478dbd2..42529a1c80587 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -19,11 +19,8 @@ use std::collections::{BTreeMap, HashSet, VecDeque}; use std::marker::PhantomData; use std::ops::{Bound, RangeInclusive}; -use futures::stream::select_all; -use futures::{stream, StreamExt, TryStreamExt}; use futures_async_stream::for_await; use risingwave_common::array::stream_record::Record; -use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy; use risingwave_common::types::DataType; @@ -31,7 +28,6 @@ use risingwave_common::util::memcmp_encoding; use risingwave_common::util::sort_util::OrderType; use risingwave_expr::window_function::{FrameBounds, StateKey, WindowFuncCall}; use risingwave_storage::store::PrefetchOptions; -use risingwave_storage::table::merge_sort::merge_sort; use risingwave_storage::StateStore; use super::delta_btree_map::Change; @@ -230,12 +226,11 @@ pub(super) struct OverPartition<'a, S: StateStore> { cache_policy: CachePolicy, calls: &'a [WindowFuncCall], - partition_key_indices: &'a [usize], order_key_data_types: &'a [DataType], order_key_order_types: &'a [OrderType], order_key_indices: &'a [usize], input_pk_indices: &'a [usize], - state_key_to_table_pk_proj: Vec, + state_key_to_table_sub_pk_proj: Vec, _phantom: PhantomData, } @@ -248,20 +243,16 @@ impl<'a, S: StateStore> OverPartition<'a, S> { cache: &'a mut PartitionCache, cache_policy: CachePolicy, calls: &'a [WindowFuncCall], - partition_key_indices: &'a [usize], order_key_data_types: &'a [DataType], order_key_order_types: &'a [OrderType], order_key_indices: &'a [usize], input_pk_indices: &'a [usize], ) -> Self { // TODO(rc): move the calculation to executor? - let mut projection = Vec::with_capacity( - partition_key_indices.len() + order_key_indices.len() + input_pk_indices.len(), - ); + let mut projection = Vec::with_capacity(order_key_indices.len() + input_pk_indices.len()); let mut col_dedup = HashSet::new(); - for (proj_idx, key_idx) in partition_key_indices + for (proj_idx, key_idx) in order_key_indices .iter() - .chain(order_key_indices.iter()) .chain(input_pk_indices.iter()) .enumerate() { @@ -277,12 +268,11 @@ impl<'a, S: StateStore> OverPartition<'a, S> { cache_policy, calls, - partition_key_indices, order_key_data_types, order_key_order_types, order_key_indices, input_pk_indices, - state_key_to_table_pk_proj: projection, + state_key_to_table_sub_pk_proj: projection, _phantom: PhantomData, } } @@ -508,17 +498,17 @@ impl<'a, S: StateStore> OverPartition<'a, S> { if self.cache_real_len() == 0 { // no normal entry in the cache, just load the given range - let table_pk_range = ( - Bound::Included(self.state_key_to_table_pk(range.start())?), - Bound::Included(self.state_key_to_table_pk(range.end())?), + let table_sub_range = ( + Bound::Included(self.state_key_to_table_sub_pk(range.start())?), + Bound::Included(self.state_key_to_table_sub_pk(range.end())?), ); tracing::debug!( partition=?self.this_partition_key, - table_pk_range=?table_pk_range, + table_sub_range=?table_sub_range, "cache is empty, just loading the given range" ); return self - .extend_cache_by_range_inner(table, table_pk_range) + .extend_cache_by_range_inner(table, table_sub_range) .await; } @@ -528,33 +518,33 @@ impl<'a, S: StateStore> OverPartition<'a, S> { if self.cache_left_is_sentinel() && *range.start() < cache_real_first_key { // extend leftward only if there's smallest sentinel - let table_pk_range = ( - Bound::Included(self.state_key_to_table_pk(range.start())?), - Bound::Excluded(self.state_key_to_table_pk(cache_real_first_key)?), + let table_sub_range = ( + Bound::Included(self.state_key_to_table_sub_pk(range.start())?), + Bound::Excluded(self.state_key_to_table_sub_pk(cache_real_first_key)?), ); tracing::trace!( partition=?self.this_partition_key, - table_pk_range=?table_pk_range, + table_sub_range=?table_sub_range, "loading the left half of given range" ); return self - .extend_cache_by_range_inner(table, table_pk_range) + .extend_cache_by_range_inner(table, table_sub_range) .await; } if self.cache_right_is_sentinel() && *range.end() > cache_real_last_key { // extend rightward only if there's largest sentinel - let table_pk_range = ( - Bound::Excluded(self.state_key_to_table_pk(cache_real_last_key)?), - Bound::Included(self.state_key_to_table_pk(range.end())?), + let table_sub_range = ( + Bound::Excluded(self.state_key_to_table_sub_pk(cache_real_last_key)?), + Bound::Included(self.state_key_to_table_sub_pk(range.end())?), ); tracing::trace!( partition=?self.this_partition_key, - table_pk_range=?table_pk_range, + table_sub_range=?table_sub_range, "loading the right half of given range" ); return self - .extend_cache_by_range_inner(table, table_pk_range) + .extend_cache_by_range_inner(table, table_sub_range) .await; } @@ -569,24 +559,18 @@ impl<'a, S: StateStore> OverPartition<'a, S> { async fn extend_cache_by_range_inner( &mut self, table: &StateTable, - table_pk_range: (Bound, Bound), + table_sub_range: (Bound, Bound), ) -> StreamExecutorResult<()> { - let streams = stream::iter(table.vnode_bitmap().iter_vnodes()) - .map(|vnode| { - table.iter_with_vnode( - vnode, - &table_pk_range, - PrefetchOptions::new_for_exhaust_iter(), - ) - }) - .buffer_unordered(10) - .try_collect::>() - .await? - .into_iter() - .map(Box::pin); + let stream = table + .iter_with_prefix( + self.this_partition_key, + &table_sub_range, + PrefetchOptions::new_for_exhaust_iter(), + ) + .await?; #[for_await] - for row in select_all(streams) { + for row in stream { let row: OwnedRow = row?.into_owned_row(); let key = self.row_to_state_key(&row)?; self.range_cache.insert(CacheKey::from(key), row); @@ -647,21 +631,20 @@ impl<'a, S: StateStore> OverPartition<'a, S> { ) -> StreamExecutorResult<()> { let mut to_extend: VecDeque = VecDeque::with_capacity(MAGIC_BATCH_SIZE); { - let pk_range = ( - Bound::Included(self.this_partition_key.into_owned_row()), - Bound::Excluded(self.state_key_to_table_pk(range_to_exclusive)?), + let sub_range = ( + Bound::::Unbounded, + Bound::Excluded(self.state_key_to_table_sub_pk(range_to_exclusive)?), ); - let streams: Vec<_> = - futures::future::try_join_all(table.vnode_bitmap().iter_vnodes().map(|vnode| { - table.iter_with_vnode(vnode, &pk_range, PrefetchOptions::new_for_exhaust_iter()) - })) - .await? - .into_iter() - .map(Box::pin) - .collect(); + let stream = table + .iter_with_prefix( + self.this_partition_key, + &sub_range, + PrefetchOptions::new_for_exhaust_iter(), + ) + .await?; #[for_await] - for row in merge_sort(streams) { + for row in stream { let row: OwnedRow = row?.into_owned_row(); // For leftward extension, we now must iterate the table in order from the beginning @@ -739,33 +722,22 @@ impl<'a, S: StateStore> OverPartition<'a, S> { ) -> StreamExecutorResult<()> { let mut n_extended = 0usize; { - let pk_range = ( - Bound::Excluded(self.state_key_to_table_pk(range_from_exclusive)?), - // currently we can't get the first possible key after this partition, so use - // `Unbounded` plus manual check for workaround + let sub_range = ( + Bound::Excluded(self.state_key_to_table_sub_pk(range_from_exclusive)?), Bound::::Unbounded, ); - let streams: Vec<_> = - futures::future::try_join_all(table.vnode_bitmap().iter_vnodes().map(|vnode| { - table.iter_with_vnode(vnode, &pk_range, PrefetchOptions::default()) - })) - .await? - .into_iter() - .map(Box::pin) - .collect(); + let stream = table + .iter_with_prefix( + self.this_partition_key, + &sub_range, + PrefetchOptions::default(), + ) + .await?; #[for_await] - for row in merge_sort(streams) { + for row in stream { let row: OwnedRow = row?.into_owned_row(); - if !Row::eq( - self.this_partition_key, - (&row).project(self.partition_key_indices), - ) { - // we've reached the end of this partition - break; - } - let key = self.row_to_state_key(&row)?; self.range_cache.insert(CacheKey::from(key), row); @@ -784,17 +756,16 @@ impl<'a, S: StateStore> OverPartition<'a, S> { Ok(()) } - fn state_key_to_table_pk(&self, key: &StateKey) -> StreamExecutorResult { - Ok(self - .this_partition_key - .chain(memcmp_encoding::decode_row( - &key.order_key, - self.order_key_data_types, - self.order_key_order_types, - )?) - .chain(key.pk.as_inner()) - .project(&self.state_key_to_table_pk_proj) - .into_owned_row()) + /// Convert [`StateKey`] to sub pk (pk without partition key) as [`OwnedRow`]. + fn state_key_to_table_sub_pk(&self, key: &StateKey) -> StreamExecutorResult { + Ok(memcmp_encoding::decode_row( + &key.order_key, + self.order_key_data_types, + self.order_key_order_types, + )? + .chain(key.pk.as_inner()) + .project(&self.state_key_to_table_sub_pk_proj) + .into_owned_row()) } fn row_to_state_key(&self, full_row: impl Row + Copy) -> StreamExecutorResult {