perf(topn): reduce unnecessary table scan in streaming (Group)TopN executors #13832

Merged (16 commits) on Dec 13, 2023.

src/stream/src/executor/top_n/group_top_n.rs (11 changes: 3 additions & 8 deletions)

@@ -165,7 +165,7 @@ where
         let mut res_ops = Vec::with_capacity(self.limit);
         let mut res_rows = Vec::with_capacity(self.limit);
         let keys = K::build(&self.group_by, chunk.data_chunk())?;
-        let table_id_str = self.managed_state.state_table.table_id().to_string();
+        let table_id_str = self.managed_state.table().table_id().to_string();
         let actor_id_str = self.ctx.id.to_string();
         let fragment_id_str = self.ctx.fragment_id.to_string();
         for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
@@ -243,11 +243,7 @@ where
     }

     fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
-        let (_previous_vnode_bitmap, cache_may_stale) = self
-            .managed_state
-            .state_table
-            .update_vnode_bitmap(vnode_bitmap);
-
+        let cache_may_stale = self.managed_state.update_vnode_bitmap(vnode_bitmap);
         if cache_may_stale {
             self.caches.clear();
         }
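
For context, `update_vnode_bitmap` fires when scaling changes the set of virtual nodes this actor owns, and the returned `cache_may_stale` flag tells the executor to drop its per-group caches. Below is a standalone sketch of the pattern with plain `Vec<bool>` bitmaps and a guessed staleness rule (clear when any vnode is gained, since the cache never saw that vnode's rows); the authoritative rule lives inside `StateTable::update_vnode_bitmap`, so treat this as an illustration only.

use std::collections::HashMap;

/// Toy stand-in for the executor's state: a vnode ownership bitmap and
/// per-group TopN caches keyed by group key. Names are invented.
struct Executor {
    owned_vnodes: Vec<bool>,
    caches: HashMap<String, Vec<u64>>,
}

impl Executor {
    /// Mirrors the shape of the code above: update the bitmap, then clear
    /// the caches if they may be stale under the assumed rule.
    fn update_vnode_bitmap(&mut self, new_vnodes: Vec<bool>) {
        let cache_may_stale = new_vnodes
            .iter()
            .zip(self.owned_vnodes.iter())
            .any(|(now, before)| *now && !*before);
        self.owned_vnodes = new_vnodes;
        if cache_may_stale {
            self.caches.clear(); // same move as `self.caches.clear()` above
        }
    }
}

fn main() {
    let mut exec = Executor {
        owned_vnodes: vec![true, false],
        caches: HashMap::from([("g1".to_string(), vec![1, 2])]),
    };
    exec.update_vnode_bitmap(vec![true, true]); // gained vnode 1
    assert!(exec.caches.is_empty());
}
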
@@ -258,14 +254,13 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         Ok(())
     }

     async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
         if watermark.col_idx == self.group_by[0] {
             self.managed_state
-                .state_table
                 .update_watermark(watermark.val.clone(), false);
             Some(watermark)
         } else {

src/stream/src/executor/top_n/group_top_n_appendonly.rs (11 changes: 3 additions & 8 deletions)

@@ -163,7 +163,7 @@ where

         let data_types = self.info().schema.data_types();
         let row_deserializer = RowDeserializer::new(data_types.clone());
-        let table_id_str = self.managed_state.state_table.table_id().to_string();
+        let table_id_str = self.managed_state.table().table_id().to_string();
         let actor_id_str = self.ctx.id.to_string();
         let fragment_id_str = self.ctx.fragment_id.to_string();
         for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
@@ -223,11 +223,7 @@ where
     }

     fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
-        let (_previous_vnode_bitmap, cache_may_stale) = self
-            .managed_state
-            .state_table
-            .update_vnode_bitmap(vnode_bitmap);
-
+        let cache_may_stale = self.managed_state.update_vnode_bitmap(vnode_bitmap);
         if cache_may_stale {
             self.caches.clear();
         }
@@ -242,14 +238,13 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         Ok(())
     }

     async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
         if watermark.col_idx == self.group_by[0] {
             self.managed_state
-                .state_table
                 .update_watermark(watermark.val.clone(), false);
             Some(watermark)
         } else {

src/stream/src/executor/top_n/top_n_appendonly.rs (2 changes: 1 addition & 1 deletion)

@@ -140,7 +140,7 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         self.managed_state
             .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
             .await

src/stream/src/executor/top_n/top_n_cache.rs (43 changes: 34 additions & 9 deletions)

@@ -58,6 +58,11 @@ pub struct TopNCache<const WITH_TIES: bool> {
     /// Assumption: `limit != 0`
     pub limit: usize,

+    /// Number of rows corresponding to the current group.
+    /// This is nice-to-have information. `None` means we don't know the row count,
+    /// but it doesn't prevent us from working correctly.
+    table_row_count: Option<usize>,
+
     /// Data types for the full row.
     ///
     /// For debug formatting only.
@@ -169,6 +174,7 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
                 .unwrap_or(usize::MAX),
             offset,
             limit,
+            table_row_count: None,
             data_types,
         }
     }
@@ -181,6 +187,21 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
         self.high.clear();
     }

+    /// Get the total count of entries in the cache.
+    pub fn len(&self) -> usize {
+        self.low.len() + self.middle.len() + self.high.len()
+    }
+
+    pub(super) fn update_table_row_count(&mut self, table_row_count: usize) {
+        self.table_row_count = Some(table_row_count)
+    }
+
+    fn table_row_count_matched(&self) -> bool {
+        self.table_row_count
+            .map(|n| n == self.len())
+            .unwrap_or(false)
+    }
+
     pub fn is_low_cache_full(&self) -> bool {
         assert!(self.low.len() <= self.offset);
         let full = self.low.len() == self.offset;
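
The core idea of the PR is visible in `table_row_count_matched`: once the number of rows the state table holds for this group is known and equals the number of entries across the three cache levels, the cache provably contains the whole group, so scanning the table to refill the high cache cannot surface anything new. A self-contained sketch of the guard, using integer keys and invented names rather than the executor's real types:

use std::collections::BTreeMap;

/// A stripped-down model of the three-level TopN cache.
struct CacheModel {
    low: BTreeMap<i64, i64>,
    middle: BTreeMap<i64, i64>,
    high: BTreeMap<i64, i64>,
    table_row_count: Option<usize>,
}

impl CacheModel {
    fn len(&self) -> usize {
        self.low.len() + self.middle.len() + self.high.len()
    }

    /// Mirrors `table_row_count_matched`: an unknown count (`None`)
    /// conservatively reports `false`, so a scan would still be issued.
    fn table_row_count_matched(&self) -> bool {
        self.table_row_count.map(|n| n == self.len()).unwrap_or(false)
    }

    /// The guard added by this PR: scan the table only if the high cache is
    /// empty and the cache is not already known to hold the whole group.
    fn needs_table_scan(&self) -> bool {
        self.high.is_empty() && !self.table_row_count_matched()
    }
}

fn main() {
    let cache = CacheModel {
        low: BTreeMap::new(),
        middle: BTreeMap::from([(1, 10), (2, 20)]),
        high: BTreeMap::new(),
        table_row_count: Some(2), // the whole group is already cached
    };
    assert!(!cache.needs_table_scan()); // the scan can be skipped
}
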
@@ -219,6 +240,11 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
         self.high.len() >= self.high_capacity
     }

+    fn last_cache_key_before_high(&self) -> Option<&CacheKey> {
+        let middle_last_key = self.middle.last_key_value().map(|(k, _)| k);
+        middle_last_key.or_else(|| self.low.last_key_value().map(|(k, _)| k))
+    }
+
     /// Use this method instead of `self.high.insert` directly when possible.
     ///
     /// It only inserts into high cache if the key is smaller than the largest key in the high
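
`last_cache_key_before_high` replaces call-site expressions like `Some(self.middle.last_key_value().unwrap().0.clone())`, which would panic if `middle` were ever empty at that point; the helper instead falls back to the last key of `low` and yields `None` only when both levels are empty. A minimal sketch of the fallback, written as a standalone function over integer keys instead of a method:

use std::collections::BTreeMap;

// Prefer the last key of `middle`, fall back to the last key of `low`,
// and only then give up.
fn last_cache_key_before_high(
    low: &BTreeMap<i64, i64>,
    middle: &BTreeMap<i64, i64>,
) -> Option<i64> {
    middle
        .last_key_value()
        .map(|(k, _)| *k)
        .or_else(|| low.last_key_value().map(|(k, _)| *k))
}

fn main() {
    let low = BTreeMap::from([(1, 10)]);
    let middle = BTreeMap::new(); // the old `.unwrap()` would panic here
    assert_eq!(last_cache_key_before_high(&low, &middle), Some(1));
    assert_eq!(last_cache_key_before_high(&BTreeMap::new(), &middle), None);
}
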
@@ -325,19 +351,20 @@ impl TopNCacheTrait for TopNCache<false> {
             && (self.offset == 0 || cache_key > *self.low.last_key_value().unwrap().0)
         {
             // The row is in mid
-            self.middle.remove(&cache_key);
             // Try to fill the high cache if it is empty
-            if self.high.is_empty() {
+            if self.high.is_empty() && !self.table_row_count_matched() {
                 managed_state
                     .fill_high_cache(
                         group_key,
                         self,
-                        Some(self.middle.last_key_value().unwrap().0.clone()),
+                        self.last_cache_key_before_high().cloned(),
                         self.high_capacity,
                     )
                     .await?;
             }

+            self.middle.remove(&cache_key);
             res_ops.push(Op::Delete);
             res_rows.push((&row).into());

Review thread on the moved `self.middle.remove(&cache_key);` line:
  Member: why to move this?
  Member (author): Need to keep the entries in cache consistent with table.
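
Reading the diff together with the author's reply: `fill_high_cache` both derives its scan start key from the cache (via `last_cache_key_before_high`) and records the group's row count for the later `table_row_count_matched` comparison, so the cache must still mirror what the scan expects while the scan runs; the removal is therefore deferred until after the fill. The following toy replay of the delete path illustrates that ordering under stated assumptions (invented names, integer keys, and the table-side delete applied before the cache-side delete, as the executor's apply path suggests):

use std::collections::BTreeMap;

fn main() {
    // Table state for one group, after the executor already deleted key 3.
    let table: BTreeMap<i64, &str> = BTreeMap::from([(1, "a"), (2, "b")]);

    // Cache state while `delete` is still running: key 3 not yet removed.
    let mut middle = BTreeMap::from([(1, "a"), (2, "b"), (3, "c")]);
    let mut high: BTreeMap<i64, &str> = BTreeMap::new();

    // Step 1: fill the high cache. The start key is taken while `middle`
    // still holds the deleted row, and the scan records the table row count.
    let start_key = middle.last_key_value().map(|(k, _)| *k).unwrap(); // 3
    for (k, v) in table.range(start_key + 1..) {
        high.insert(*k, *v);
    }
    let table_row_count = Some(table.len()); // 2 rows left in the table

    // Step 2: only now drop the deleted row from the cache.
    middle.remove(&3);

    // The cache now agrees with the recorded count, so a later delete that
    // finds `high` empty can skip the table scan entirely.
    assert_eq!(table_row_count, Some(middle.len() + high.len()));
}
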
@@ -360,12 +387,12 @@ impl TopNCacheTrait for TopNCache<false> {
                 self.low.insert(middle_first.0, middle_first.1);

                 // Try to fill the high cache if it is empty
-                if self.high.is_empty() {
+                if self.high.is_empty() && !self.table_row_count_matched() {
                     managed_state
                         .fill_high_cache(
                             group_key,
                             self,
-                            Some(self.middle.last_key_value().unwrap().0.clone()),
+                            self.last_cache_key_before_high().cloned(),
                             self.high_capacity,
                         )
                         .await?;
}

// Try to fill the high cache if it is empty
if self.high.is_empty() {
if self.high.is_empty() && !self.table_row_count_matched() {
managed_state
.fill_high_cache(
group_key,
self,
self.middle
.last_key_value()
.map(|(key, _value)| key.clone()),
self.last_cache_key_before_high().cloned(),
self.high_capacity,
)
.await?;
Expand Down

src/stream/src/executor/top_n/top_n_plain.rs (2 changes: 1 addition & 1 deletion)

@@ -174,7 +174,7 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         self.managed_state
             .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
             .await

src/stream/src/executor/top_n/top_n_state.rs (39 changes: 36 additions & 3 deletions)

@@ -13,9 +13,12 @@
 // limitations under the License.

 use std::ops::Bound;
+use std::sync::Arc;

 use futures::{pin_mut, StreamExt};
+use risingwave_common::buffer::Bitmap;
 use risingwave_common::row::{OwnedRow, Row, RowExt};
+use risingwave_common::types::ScalarImpl;
 use risingwave_common::util::epoch::EpochPair;
 use risingwave_storage::store::PrefetchOptions;
 use risingwave_storage::StateStore;
@@ -31,7 +34,7 @@ use crate::executor::error::StreamExecutorResult;
 /// `group_key` is not included.
 pub struct ManagedTopNState<S: StateStore> {
     /// Relational table.
-    pub(crate) state_table: StateTable<S>,
+    state_table: StateTable<S>,

     /// Used for serializing pk into CacheKey.
     cache_key_serde: CacheKeySerde,
@@ -57,6 +60,26 @@ impl<S: StateStore> ManagedTopNState<S> {
         }
     }

+    /// Get an immutable reference to the managed state table.
+    pub fn table(&self) -> &StateTable<S> {
+        &self.state_table
+    }
+
+    /// Init the epoch for the managed state table.
+    pub fn init_epoch(&mut self, epoch: EpochPair) {
+        self.state_table.init_epoch(epoch)
+    }
+
+    /// Update the vnode bitmap of the state table, returning `cache_may_stale`.
+    pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> bool {
+        self.state_table.update_vnode_bitmap(new_vnodes).1
+    }
+
+    /// Update the watermark for the managed state table.
+    pub fn update_watermark(&mut self, watermark: ScalarImpl, eager_cleaning: bool) {
+        self.state_table.update_watermark(watermark, eager_cleaning)
+    }
+
     pub fn insert(&mut self, value: impl Row) {
         self.state_table.insert(value);
     }
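
These wrappers accompany the visibility change above (`state_table` loses its `pub(crate)`): executors now reach the table only through `ManagedTopNState`, so table access sits behind one seam. A minimal sketch of this encapsulation pattern, with invented, simplified types standing in for the real ones:

/// Simplified stand-in for the storage-side table.
struct StateTable {
    epoch: u64,
}

/// The field stays private; callers use the thin delegating methods.
struct ManagedTopNState {
    state_table: StateTable,
}

impl ManagedTopNState {
    fn table(&self) -> &StateTable {
        &self.state_table // read-only access, no mutation leaks out
    }
    fn init_epoch(&mut self, epoch: u64) {
        self.state_table.epoch = epoch;
    }
}

fn main() {
    let mut state = ManagedTopNState { state_table: StateTable { epoch: 0 } };
    state.init_epoch(42);                // was: state.state_table.init_epoch(..)
    assert_eq!(state.table().epoch, 42); // was: direct field access
}
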
@@ -114,13 +137,14 @@
     /// * `start_key` - The start point of the key to scan. It should be the last key of the middle
     ///   cache. It doesn't contain the group key.
     pub async fn fill_high_cache<const WITH_TIES: bool>(
-        &self,
+        &mut self,
         group_key: Option<impl GroupKey>,
         topn_cache: &mut TopNCache<WITH_TIES>,
         start_key: Option<CacheKey>,
         cache_size_limit: usize,
     ) -> StreamExecutorResult<()> {
         let cache = &mut topn_cache.high;
+
         let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
         let state_table_iter = self
             .state_table
@@ -133,20 +157,29 @@
             )
             .await?;
         pin_mut!(state_table_iter);
+
+        let mut group_row_count = Some(0);
         while let Some(item) = state_table_iter.next().await {
+            *group_row_count.as_mut().unwrap() += 1; // try to count the rows of the group in the state table
+
             // Note(bugen): should first compare with start key before constructing TopNStateRow.
             let topn_row = self.get_topn_row(item?.into_owned_row(), group_key.len());
             if let Some(start_key) = start_key.as_ref()
                 && &topn_row.cache_key <= start_key
             {
                 continue;
             }
-            // let row= &topn_row.row;
             cache.insert(topn_row.cache_key, (&topn_row.row).into());
             if cache.len() == cache_size_limit {
+                group_row_count = None; // the cache becomes full, so we cannot get a precise table row count this time
                 break;
             }
         }
+
+        if let Some(row_count) = group_row_count {
+            topn_cache.update_table_row_count(row_count);
+        }
+
         if WITH_TIES && topn_cache.is_high_cache_full() {
             let high_last_sort_key = topn_cache.high.last_key_value().unwrap().0 .0.clone();
             while let Some(item) = state_table_iter.next().await {
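
The counting pattern above is worth isolating: the scan was going to stream the group's rows anyway, so it tallies them as a side effect, and the tally is kept only when the iterator was exhausted, meaning the scan was complete. If the loop breaks because the high cache filled up first, the tally is merely a lower bound and is thrown away. A standalone sketch of that logic, using a plain `Vec` cache and invented names in place of the real iterator and cache types:

// Count rows while filling the cache; return a precise count only if the
// input was exhausted before the cache reached its size limit.
fn fill_and_count(rows: &[i64], cache: &mut Vec<i64>, cache_size_limit: usize) -> Option<usize> {
    let mut group_row_count = Some(0usize);
    for &row in rows {
        *group_row_count.as_mut().unwrap() += 1;
        cache.push(row);
        if cache.len() == cache_size_limit {
            group_row_count = None; // incomplete scan: precise count unknown
            break;
        }
    }
    group_row_count
}

fn main() {
    let mut cache = Vec::new();
    // The input is exhausted before the limit: the count (3) is exact.
    assert_eq!(fill_and_count(&[1, 2, 3], &mut cache, 10), Some(3));

    let mut small = Vec::new();
    // The cache fills first: the count would be partial, so none is reported.
    assert_eq!(fill_and_count(&[1, 2, 3], &mut small, 2), None);
}
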