From ee5cdfe1d08e1fd1447b431f8512c6f1a26909df Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 6 Dec 2023 14:56:51 +0800 Subject: [PATCH 01/11] refactor: hide state table from being mutate from outside ManagedTopNState Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/group_top_n.rs | 11 +++----- .../executor/top_n/group_top_n_appendonly.rs | 11 +++----- .../src/executor/top_n/top_n_appendonly.rs | 2 +- src/stream/src/executor/top_n/top_n_plain.rs | 2 +- src/stream/src/executor/top_n/top_n_state.rs | 25 ++++++++++++++++++- 5 files changed, 32 insertions(+), 19 deletions(-) diff --git a/src/stream/src/executor/top_n/group_top_n.rs b/src/stream/src/executor/top_n/group_top_n.rs index 204d93a2558ae..61a0de5c0a7f6 100644 --- a/src/stream/src/executor/top_n/group_top_n.rs +++ b/src/stream/src/executor/top_n/group_top_n.rs @@ -165,7 +165,7 @@ where let mut res_ops = Vec::with_capacity(self.limit); let mut res_rows = Vec::with_capacity(self.limit); let keys = K::build(&self.group_by, chunk.data_chunk())?; - let table_id_str = self.managed_state.state_table.table_id().to_string(); + let table_id_str = self.managed_state.table().table_id().to_string(); let actor_id_str = self.ctx.id.to_string(); let fragment_id_str = self.ctx.fragment_id.to_string(); for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) { @@ -243,11 +243,7 @@ where } fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc) { - let (_previous_vnode_bitmap, cache_may_stale) = self - .managed_state - .state_table - .update_vnode_bitmap(vnode_bitmap); - + let cache_may_stale = self.managed_state.update_vnode_bitmap(vnode_bitmap); if cache_may_stale { self.caches.clear(); } @@ -258,14 +254,13 @@ where } async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { - self.managed_state.state_table.init_epoch(epoch); + self.managed_state.init_epoch(epoch); Ok(()) } async fn handle_watermark(&mut self, watermark: Watermark) -> Option { if watermark.col_idx == self.group_by[0] { self.managed_state - .state_table .update_watermark(watermark.val.clone(), false); Some(watermark) } else { diff --git a/src/stream/src/executor/top_n/group_top_n_appendonly.rs b/src/stream/src/executor/top_n/group_top_n_appendonly.rs index bf8cbdb0a6134..3f185a581ef53 100644 --- a/src/stream/src/executor/top_n/group_top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/group_top_n_appendonly.rs @@ -163,7 +163,7 @@ where let data_types = self.info().schema.data_types(); let row_deserializer = RowDeserializer::new(data_types.clone()); - let table_id_str = self.managed_state.state_table.table_id().to_string(); + let table_id_str = self.managed_state.table().table_id().to_string(); let actor_id_str = self.ctx.id.to_string(); let fragment_id_str = self.ctx.fragment_id.to_string(); for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) { @@ -223,11 +223,7 @@ where } fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc) { - let (_previous_vnode_bitmap, cache_may_stale) = self - .managed_state - .state_table - .update_vnode_bitmap(vnode_bitmap); - + let cache_may_stale = self.managed_state.update_vnode_bitmap(vnode_bitmap); if cache_may_stale { self.caches.clear(); } @@ -242,14 +238,13 @@ where } async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { - self.managed_state.state_table.init_epoch(epoch); + self.managed_state.init_epoch(epoch); Ok(()) } async fn handle_watermark(&mut self, watermark: Watermark) -> Option { if watermark.col_idx == self.group_by[0] { self.managed_state - .state_table .update_watermark(watermark.val.clone(), false); Some(watermark) } else { diff --git a/src/stream/src/executor/top_n/top_n_appendonly.rs b/src/stream/src/executor/top_n/top_n_appendonly.rs index 6392b0ac491fe..4aff28bb30bd2 100644 --- a/src/stream/src/executor/top_n/top_n_appendonly.rs +++ b/src/stream/src/executor/top_n/top_n_appendonly.rs @@ -140,7 +140,7 @@ where } async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { - self.managed_state.state_table.init_epoch(epoch); + self.managed_state.init_epoch(epoch); self.managed_state .init_topn_cache(NO_GROUP_KEY, &mut self.cache) .await diff --git a/src/stream/src/executor/top_n/top_n_plain.rs b/src/stream/src/executor/top_n/top_n_plain.rs index fe1a078e2d5a1..01fa6722c02df 100644 --- a/src/stream/src/executor/top_n/top_n_plain.rs +++ b/src/stream/src/executor/top_n/top_n_plain.rs @@ -174,7 +174,7 @@ where } async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> { - self.managed_state.state_table.init_epoch(epoch); + self.managed_state.init_epoch(epoch); self.managed_state .init_topn_cache(NO_GROUP_KEY, &mut self.cache) .await diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index 6885701e39179..e3c206f7ac8c7 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -13,9 +13,12 @@ // limitations under the License. use std::ops::Bound; +use std::sync::Arc; use futures::{pin_mut, StreamExt}; +use risingwave_common::buffer::Bitmap; use risingwave_common::row::{OwnedRow, Row, RowExt}; +use risingwave_common::types::ScalarImpl; use risingwave_common::util::epoch::EpochPair; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::StateStore; @@ -31,7 +34,7 @@ use crate::executor::error::StreamExecutorResult; /// `group_key` is not included. pub struct ManagedTopNState { /// Relational table. - pub(crate) state_table: StateTable, + state_table: StateTable, /// Used for serializing pk into CacheKey. cache_key_serde: CacheKeySerde, @@ -57,6 +60,26 @@ impl ManagedTopNState { } } + /// Get the immutable reference of managed state table. + pub fn table(&self) -> &StateTable { + &self.state_table + } + + /// Init epoch for the managed state table. + pub fn init_epoch(&mut self, epoch: EpochPair) { + self.state_table.init_epoch(epoch) + } + + /// Update vnode bitmap of state table, returning `cache_may_stale`. + pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc) -> bool { + self.state_table.update_vnode_bitmap(new_vnodes).1 + } + + /// Update watermark for the managed state table. + pub fn update_watermark(&mut self, watermark: ScalarImpl, eager_cleaning: bool) { + self.state_table.update_watermark(watermark, eager_cleaning) + } + pub fn insert(&mut self, value: impl Row) { self.state_table.insert(value); } From 88c5acd2a5a31dbd867c4049b5a1d318eb2d536f Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 6 Dec 2023 15:43:50 +0800 Subject: [PATCH 02/11] perf: only iterator over table when not all rows are cached Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/top_n_cache.rs | 5 ++++ src/stream/src/executor/top_n/top_n_state.rs | 29 ++++++++++++++++++-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/src/stream/src/executor/top_n/top_n_cache.rs b/src/stream/src/executor/top_n/top_n_cache.rs index aed23760c332f..2cc60417b52a2 100644 --- a/src/stream/src/executor/top_n/top_n_cache.rs +++ b/src/stream/src/executor/top_n/top_n_cache.rs @@ -181,6 +181,11 @@ impl TopNCache { self.high.clear(); } + /// Get total count of entries in the cahce. + pub fn len(&self) -> usize { + self.low.len() + self.middle.len() + self.high.len() + } + pub fn is_low_cache_full(&self) -> bool { assert!(self.low.len() <= self.offset); let full = self.low.len() == self.offset; diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index e3c206f7ac8c7..65908ca50db04 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -36,6 +36,9 @@ pub struct ManagedTopNState { /// Relational table. state_table: StateTable, + /// Number of rows in the state table. + row_count: Option, + /// Used for serializing pk into CacheKey. cache_key_serde: CacheKeySerde, } @@ -56,6 +59,7 @@ impl ManagedTopNState { pub fn new(state_table: StateTable, cache_key_serde: CacheKeySerde) -> Self { Self { state_table, + row_count: None, cache_key_serde, } } @@ -82,10 +86,16 @@ impl ManagedTopNState { pub fn insert(&mut self, value: impl Row) { self.state_table.insert(value); + if let Some(row_count) = self.row_count.as_mut() { + *row_count += 1; + } } pub fn delete(&mut self, value: impl Row) { self.state_table.delete(value); + if let Some(row_count) = self.row_count.as_mut() { + *row_count -= 1; + } } fn get_topn_row(&self, row: OwnedRow, group_key_len: usize) -> TopNStateRow { @@ -137,13 +147,19 @@ impl ManagedTopNState { /// * `start_key` - The start point of the key to scan. It should be the last key of the middle /// cache. It doesn't contain the group key. pub async fn fill_high_cache( - &self, + &mut self, group_key: Option, topn_cache: &mut TopNCache, start_key: Option, cache_size_limit: usize, ) -> StreamExecutorResult<()> { + if let Some(row_count) = self.row_count && row_count == topn_cache.len() { + // already cached all rows + return Ok(()) + } + let cache = &mut topn_cache.high; + let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); let state_table_iter = self .state_table @@ -156,7 +172,11 @@ impl ManagedTopNState { ) .await?; pin_mut!(state_table_iter); + + let mut table_row_count = Some(0); while let Some(item) = state_table_iter.next().await { + *table_row_count.as_mut().unwrap() += 1; // try count rows in the state table + // Note(bugen): should first compare with start key before constructing TopNStateRow. let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len()); if let Some(start_key) = start_key.as_ref() @@ -164,12 +184,17 @@ impl ManagedTopNState { { continue; } - // let row= &topn_row.row; cache.insert(topn_row.cache_key, (&topn_row.row).into()); if cache.len() == cache_size_limit { + table_row_count = None; // cache becomes full, we cannot get precise table row count this time break; } } + + if let Some(row_count) = table_row_count { + self.row_count = Some(row_count); + } + if WITH_TIES && topn_cache.is_high_cache_full() { let high_last_sort_key = topn_cache.high.last_key_value().unwrap().0 .0.clone(); while let Some(item) = state_table_iter.next().await { From 9efd4bc061995f194ad56487a50532cb51c55e94 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 6 Dec 2023 16:01:39 +0800 Subject: [PATCH 03/11] fix: typo Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/top_n_cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/src/executor/top_n/top_n_cache.rs b/src/stream/src/executor/top_n/top_n_cache.rs index 2cc60417b52a2..a0a6728569d41 100644 --- a/src/stream/src/executor/top_n/top_n_cache.rs +++ b/src/stream/src/executor/top_n/top_n_cache.rs @@ -181,7 +181,7 @@ impl TopNCache { self.high.clear(); } - /// Get total count of entries in the cahce. + /// Get total count of entries in the cache. pub fn len(&self) -> usize { self.low.len() + self.middle.len() + self.high.len() } From 2e727287351e69b59a5f9bb692482555612218d0 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 7 Dec 2023 15:40:00 +0800 Subject: [PATCH 04/11] fix: maintain row count for each group Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/top_n_cache.rs | 38 +++++++++++++++----- src/stream/src/executor/top_n/top_n_state.rs | 25 +++---------- 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/src/stream/src/executor/top_n/top_n_cache.rs b/src/stream/src/executor/top_n/top_n_cache.rs index a0a6728569d41..3a87c9530ed39 100644 --- a/src/stream/src/executor/top_n/top_n_cache.rs +++ b/src/stream/src/executor/top_n/top_n_cache.rs @@ -58,6 +58,11 @@ pub struct TopNCache { /// Assumption: `limit != 0` pub limit: usize, + /// Number of rows corresponding to the current group. + /// This is a nice-to-have information. `None` means we don't know the row count, + /// but it doesn't prevent us from working correctly. + table_row_count: Option, + /// Data types for the full row. /// /// For debug formatting only. @@ -169,6 +174,7 @@ impl TopNCache { .unwrap_or(usize::MAX), offset, limit, + table_row_count: None, data_types, } } @@ -186,6 +192,16 @@ impl TopNCache { self.low.len() + self.middle.len() + self.high.len() } + pub(super) fn update_table_row_count(&mut self, table_row_count: usize) { + self.table_row_count = Some(table_row_count) + } + + fn table_row_count_matched(&self) -> bool { + self.table_row_count + .map(|n| n == self.len()) + .unwrap_or(false) + } + pub fn is_low_cache_full(&self) -> bool { assert!(self.low.len() <= self.offset); let full = self.low.len() == self.offset; @@ -224,6 +240,11 @@ impl TopNCache { self.high.len() >= self.high_capacity } + fn last_cache_key_before_high(&self) -> Option<&CacheKey> { + let middle_last_key = self.middle.last_key_value().map(|(k, _)| k); + middle_last_key.or_else(|| self.low.last_key_value().map(|(k, _)| k)) + } + /// Use this method instead of `self.high.insert` directly when possible. /// /// It only inserts into high cache if the key is smaller than the largest key in the high @@ -330,19 +351,20 @@ impl TopNCacheTrait for TopNCache { && (self.offset == 0 || cache_key > *self.low.last_key_value().unwrap().0) { // The row is in mid + self.middle.remove(&cache_key); + // Try to fill the high cache if it is empty - if self.high.is_empty() { + if self.high.is_empty() && !self.table_row_count_matched() { managed_state .fill_high_cache( group_key, self, - Some(self.middle.last_key_value().unwrap().0.clone()), + self.last_cache_key_before_high().cloned(), self.high_capacity, ) .await?; } - self.middle.remove(&cache_key); res_ops.push(Op::Delete); res_rows.push((&row).into()); @@ -365,12 +387,12 @@ impl TopNCacheTrait for TopNCache { self.low.insert(middle_first.0, middle_first.1); // Try to fill the high cache if it is empty - if self.high.is_empty() { + if self.high.is_empty() && !self.table_row_count_matched() { managed_state .fill_high_cache( group_key, self, - Some(self.middle.last_key_value().unwrap().0.clone()), + self.last_cache_key_before_high().cloned(), self.high_capacity, ) .await?; @@ -507,14 +529,12 @@ impl TopNCacheTrait for TopNCache { } // Try to fill the high cache if it is empty - if self.high.is_empty() { + if self.high.is_empty() && !self.table_row_count_matched() { managed_state .fill_high_cache( group_key, self, - self.middle - .last_key_value() - .map(|(key, _value)| key.clone()), + self.last_cache_key_before_high().cloned(), self.high_capacity, ) .await?; diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index 65908ca50db04..03b6df871e937 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -36,9 +36,6 @@ pub struct ManagedTopNState { /// Relational table. state_table: StateTable, - /// Number of rows in the state table. - row_count: Option, - /// Used for serializing pk into CacheKey. cache_key_serde: CacheKeySerde, } @@ -59,7 +56,6 @@ impl ManagedTopNState { pub fn new(state_table: StateTable, cache_key_serde: CacheKeySerde) -> Self { Self { state_table, - row_count: None, cache_key_serde, } } @@ -86,16 +82,10 @@ impl ManagedTopNState { pub fn insert(&mut self, value: impl Row) { self.state_table.insert(value); - if let Some(row_count) = self.row_count.as_mut() { - *row_count += 1; - } } pub fn delete(&mut self, value: impl Row) { self.state_table.delete(value); - if let Some(row_count) = self.row_count.as_mut() { - *row_count -= 1; - } } fn get_topn_row(&self, row: OwnedRow, group_key_len: usize) -> TopNStateRow { @@ -153,11 +143,6 @@ impl ManagedTopNState { start_key: Option, cache_size_limit: usize, ) -> StreamExecutorResult<()> { - if let Some(row_count) = self.row_count && row_count == topn_cache.len() { - // already cached all rows - return Ok(()) - } - let cache = &mut topn_cache.high; let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); @@ -173,9 +158,9 @@ impl ManagedTopNState { .await?; pin_mut!(state_table_iter); - let mut table_row_count = Some(0); + let mut group_row_count = Some(0); while let Some(item) = state_table_iter.next().await { - *table_row_count.as_mut().unwrap() += 1; // try count rows in the state table + *group_row_count.as_mut().unwrap() += 1; // try count rows of the group in the state table // Note(bugen): should first compare with start key before constructing TopNStateRow. let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len()); @@ -186,13 +171,13 @@ impl ManagedTopNState { } cache.insert(topn_row.cache_key, (&topn_row.row).into()); if cache.len() == cache_size_limit { - table_row_count = None; // cache becomes full, we cannot get precise table row count this time + group_row_count = None; // cache becomes full, we cannot get precise table row count this time break; } } - if let Some(row_count) = table_row_count { - self.row_count = Some(row_count); + if let Some(row_count) = group_row_count { + topn_cache.update_table_row_count(row_count); } if WITH_TIES && topn_cache.is_high_cache_full() { From 3fd3ca82f1965ee6e565400a140a6dbeb4efd398 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 8 Dec 2023 16:24:24 +0800 Subject: [PATCH 05/11] fix: fix an EXTREMELY STUPID bug Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/top_n_cache.rs | 24 ++++++++++++++++---- src/stream/src/executor/top_n/top_n_state.rs | 2 +- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/stream/src/executor/top_n/top_n_cache.rs b/src/stream/src/executor/top_n/top_n_cache.rs index 3a87c9530ed39..27a0096f2c796 100644 --- a/src/stream/src/executor/top_n/top_n_cache.rs +++ b/src/stream/src/executor/top_n/top_n_cache.rs @@ -282,6 +282,10 @@ impl TopNCacheTrait for TopNCache { res_ops: &mut Vec, res_rows: &mut Vec, ) { + if let Some(row_count) = self.table_row_count.as_mut() { + *row_count += 1; + } + if !self.is_low_cache_full() { self.low.insert(cache_key, (&row).into()); return; @@ -344,6 +348,10 @@ impl TopNCacheTrait for TopNCache { res_ops: &mut Vec, res_rows: &mut Vec, ) -> StreamExecutorResult<()> { + if let Some(row_count) = self.table_row_count.as_mut() { + *row_count -= 1; + } + if self.is_middle_cache_full() && cache_key > *self.middle.last_key_value().unwrap().0 { // The row is in high self.high.remove(&cache_key); @@ -352,6 +360,8 @@ impl TopNCacheTrait for TopNCache { { // The row is in mid self.middle.remove(&cache_key); + res_ops.push(Op::Delete); + res_rows.push((&row).into()); // Try to fill the high cache if it is empty if self.high.is_empty() && !self.table_row_count_matched() { @@ -365,9 +375,6 @@ impl TopNCacheTrait for TopNCache { .await?; } - res_ops.push(Op::Delete); - res_rows.push((&row).into()); - // Bring one element, if any, from high cache to middle cache if !self.high.is_empty() { let high_first = self.high.pop_first().unwrap(); @@ -375,6 +382,8 @@ impl TopNCacheTrait for TopNCache { res_rows.push(high_first.1.clone()); self.middle.insert(high_first.0, high_first.1); } + + assert!(self.high.is_empty() || self.middle.len() == self.limit); } else { // The row is in low self.low.remove(&cache_key); @@ -420,6 +429,10 @@ impl TopNCacheTrait for TopNCache { res_ops: &mut Vec, res_rows: &mut Vec, ) { + if let Some(row_count) = self.table_row_count.as_mut() { + *row_count += 1; + } + assert!( self.low.is_empty(), "Offset is not supported yet for WITH TIES, so low cache should be empty" @@ -509,8 +522,11 @@ impl TopNCacheTrait for TopNCache { res_ops: &mut Vec, res_rows: &mut Vec, ) -> StreamExecutorResult<()> { - // Since low cache is always empty for WITH_TIES, this unwrap is safe. + if let Some(row_count) = self.table_row_count.as_mut() { + *row_count -= 1; + } + // Since low cache is always empty for WITH_TIES, this unwrap is safe. let middle_last = self.middle.last_key_value().unwrap(); let middle_last_order_by = middle_last.0 .0.clone(); diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index 03b6df871e937..427da358cc195 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -137,7 +137,7 @@ impl ManagedTopNState { /// * `start_key` - The start point of the key to scan. It should be the last key of the middle /// cache. It doesn't contain the group key. pub async fn fill_high_cache( - &mut self, + &self, group_key: Option, topn_cache: &mut TopNCache, start_key: Option, From 8be1ebfb744d7cd4229c82295c67a6d41479efe7 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 8 Dec 2023 16:28:00 +0800 Subject: [PATCH 06/11] perf: init table_row_count at startup if possible Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/top_n_state.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index 427da358cc195..232e68d75ca35 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -282,6 +282,12 @@ impl ManagedTopNState { } } + if topn_cache.len() == 0 { + // after trying to initially fill in the cache, the length is still 0, + // meaning the table is empty + topn_cache.update_table_row_count(0); + } + Ok(()) } From a1e7f918e4e0bd848f80568004be4e174263a7ce Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 12 Dec 2023 20:13:30 +0800 Subject: [PATCH 07/11] fix: fix row count maintanence Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/top_n_state.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index b9567a09ca028..688b8bdc7dbf1 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -159,9 +159,10 @@ impl ManagedTopNState { .await?; pin_mut!(state_table_iter); - let mut group_row_count = Some(0); + let mut group_row_count = 0; + while let Some(item) = state_table_iter.next().await { - *group_row_count.as_mut().unwrap() += 1; // try count rows of the group in the state table + group_row_count += 1; // Note(bugen): should first compare with start key before constructing TopNStateRow. let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len()); @@ -172,18 +173,15 @@ impl ManagedTopNState { } cache.insert(topn_row.cache_key, (&topn_row.row).into()); if cache.len() == cache_size_limit { - group_row_count = None; // cache becomes full, we cannot get precise table row count this time break; } } - if let Some(row_count) = group_row_count { - topn_cache.update_table_row_count(row_count); - } - if WITH_TIES && topn_cache.is_high_cache_full() { let high_last_sort_key = topn_cache.high.last_key_value().unwrap().0 .0.clone(); while let Some(item) = state_table_iter.next().await { + group_row_count += 1; + let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len()); if topn_row.cache_key.0 == high_last_sort_key { topn_cache @@ -195,6 +193,11 @@ impl ManagedTopNState { } } + if state_table_iter.next().await.is_none() { + // We can only update the row count when we have seen all rows of the group in the table. + topn_cache.update_table_row_count(group_row_count); + } + Ok(()) } From 4a2570b230595cf7a329b60cf79aeebb45f980f1 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 12 Dec 2023 23:55:29 +0800 Subject: [PATCH 08/11] perf: init topn cache table_row_count Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/top_n_state.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index 688b8bdc7dbf1..b1ee59e8cc481 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -222,8 +222,12 @@ impl ManagedTopNState { ) .await?; pin_mut!(state_table_iter); + + let mut group_row_count = 0; + if topn_cache.offset > 0 { while let Some(item) = state_table_iter.next().await { + group_row_count += 1; let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len()); topn_cache .low @@ -236,6 +240,7 @@ impl ManagedTopNState { assert!(topn_cache.limit > 0, "topn cache limit should always > 0"); while let Some(item) = state_table_iter.next().await { + group_row_count += 1; let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len()); topn_cache .middle @@ -247,6 +252,7 @@ impl ManagedTopNState { if WITH_TIES && topn_cache.is_middle_cache_full() { let middle_last_sort_key = topn_cache.middle.last_key_value().unwrap().0 .0.clone(); while let Some(item) = state_table_iter.next().await { + group_row_count += 1; let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len()); if topn_row.cache_key.0 == middle_last_sort_key { topn_cache @@ -268,6 +274,7 @@ impl ManagedTopNState { while !topn_cache.is_high_cache_full() && let Some(item) = state_table_iter.next().await { + group_row_count += 1; let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len()); topn_cache .high @@ -276,6 +283,7 @@ impl ManagedTopNState { if WITH_TIES && topn_cache.is_high_cache_full() { let high_last_sort_key = topn_cache.high.last_key_value().unwrap().0 .0.clone(); while let Some(item) = state_table_iter.next().await { + group_row_count += 1; let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len()); if topn_row.cache_key.0 == high_last_sort_key { topn_cache @@ -287,10 +295,10 @@ impl ManagedTopNState { } } - if topn_cache.len() == 0 { - // after trying to initially fill in the cache, the length is still 0, - // meaning the table is empty - topn_cache.update_table_row_count(0); + if state_table_iter.next().await.is_none() { + // After trying to initially fill in the cache, all table entries are in the cache, + // we then get the precise table row count. + topn_cache.update_table_row_count(group_row_count); } Ok(()) From 68e689d83c916308fc9f1dca0160617cf587dfb7 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 12 Dec 2023 23:57:42 +0800 Subject: [PATCH 09/11] add a tmp log Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/top_n_state.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index b1ee59e8cc481..e8f9dc42f061a 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -143,6 +143,8 @@ impl ManagedTopNState { start_key: Option, cache_size_limit: usize, ) -> StreamExecutorResult<()> { + tracing::warn!("[rc] this should be triggered seldomly!!!"); + let cache = &mut topn_cache.high; let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); From e5df4a730998d777b5acc40fc9fff800af7eca79 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 13 Dec 2023 16:23:41 +0800 Subject: [PATCH 10/11] add min topn cache capacity Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/top_n_cache.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/stream/src/executor/top_n/top_n_cache.rs b/src/stream/src/executor/top_n/top_n_cache.rs index 27a0096f2c796..01253469ece34 100644 --- a/src/stream/src/executor/top_n/top_n_cache.rs +++ b/src/stream/src/executor/top_n/top_n_cache.rs @@ -28,6 +28,7 @@ use super::{CacheKey, GroupKey, ManagedTopNState}; use crate::executor::error::StreamExecutorResult; const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2; +const TOPN_CACHE_MIN_CAPACITY: usize = 10; /// Cache for [`ManagedTopNState`]. /// @@ -171,7 +172,8 @@ impl TopNCache { high_capacity: offset .checked_add(limit) .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR)) - .unwrap_or(usize::MAX), + .unwrap_or(usize::MAX) + .max(TOPN_CACHE_MIN_CAPACITY), offset, limit, table_row_count: None, From 30e76b73970a18977c1e2978af8651d52694cfda Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 13 Dec 2023 16:24:17 +0800 Subject: [PATCH 11/11] remove tmp log Signed-off-by: Richard Chien --- src/stream/src/executor/top_n/top_n_state.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/stream/src/executor/top_n/top_n_state.rs b/src/stream/src/executor/top_n/top_n_state.rs index e8f9dc42f061a..b1ee59e8cc481 100644 --- a/src/stream/src/executor/top_n/top_n_state.rs +++ b/src/stream/src/executor/top_n/top_n_state.rs @@ -143,8 +143,6 @@ impl ManagedTopNState { start_key: Option, cache_size_limit: usize, ) -> StreamExecutorResult<()> { - tracing::warn!("[rc] this should be triggered seldomly!!!"); - let cache = &mut topn_cache.high; let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded);