perf(topn): reduce unnecessary table scan in streaming (Group)TopN executors (#13832)

Signed-off-by: Richard Chien <[email protected]>
stdrc authored and wenym1 committed Dec 25, 2023
1 parent ecd82b8 commit 224aa3a
Showing 6 changed files with 117 additions and 34 deletions.
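In short, the commit makes `TopNCache` track how many rows the current group has in the state table, so the delete path can tell when refilling the empty high cache from storage cannot possibly return anything new. The sketch below only illustrates that idea with simplified stand-in types (`u64` keys and `String` rows instead of the real `CacheKey`/`CompactedRow`); `should_scan_table` is a hypothetical helper for illustration, not an API introduced by this commit.

use std::collections::BTreeMap;

/// Simplified stand-in for the executor's TopNCache (illustration only).
struct SketchCache {
    low: BTreeMap<u64, String>,
    middle: BTreeMap<u64, String>,
    high: BTreeMap<u64, String>,
    /// Row count of this group in the state table, if known.
    table_row_count: Option<usize>,
}

impl SketchCache {
    /// Total number of entries held across all three cache levels.
    fn len(&self) -> usize {
        self.low.len() + self.middle.len() + self.high.len()
    }

    /// True iff the group's row count is known and the cache already holds
    /// every row, so scanning the state table cannot surface anything new.
    fn table_row_count_matched(&self) -> bool {
        self.table_row_count.map(|n| n == self.len()).unwrap_or(false)
    }

    /// Hypothetical helper: the delete path only refills the high cache from
    /// the state table when it is empty AND the row count does not prove the
    /// cache to be complete (this is the condition the commit adds).
    fn should_scan_table(&self) -> bool {
        self.high.is_empty() && !self.table_row_count_matched()
    }
}

fn main() {
    let mut cache = SketchCache {
        low: BTreeMap::new(),
        middle: BTreeMap::from([(1, "a".into()), (2, "b".into()), (3, "c".into())]),
        high: BTreeMap::new(),
        table_row_count: None,
    };
    // Row count unknown: an empty high cache still forces a table scan.
    assert!(cache.should_scan_table());

    // Once the row count is known (e.g. learned while filling the cache)
    // and matches what is cached, the scan can be skipped entirely.
    cache.table_row_count = Some(3);
    assert!(!cache.should_scan_table());
}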
11 changes: 3 additions & 8 deletions src/stream/src/executor/top_n/group_top_n.rs
@@ -165,7 +165,7 @@ where
         let mut res_ops = Vec::with_capacity(self.limit);
         let mut res_rows = Vec::with_capacity(self.limit);
         let keys = K::build(&self.group_by, chunk.data_chunk())?;
-        let table_id_str = self.managed_state.state_table.table_id().to_string();
+        let table_id_str = self.managed_state.table().table_id().to_string();
         let actor_id_str = self.ctx.id.to_string();
         let fragment_id_str = self.ctx.fragment_id.to_string();
         for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
@@ -243,11 +243,7 @@ where
     }

     fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
-        let (_previous_vnode_bitmap, cache_may_stale) = self
-            .managed_state
-            .state_table
-            .update_vnode_bitmap(vnode_bitmap);
-
+        let cache_may_stale = self.managed_state.update_vnode_bitmap(vnode_bitmap);
         if cache_may_stale {
             self.caches.clear();
         }
@@ -258,14 +254,13 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         Ok(())
     }

     async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
         if watermark.col_idx == self.group_by[0] {
             self.managed_state
-                .state_table
                 .update_watermark(watermark.val.clone(), false);
             Some(watermark)
         } else {
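The executor-side edits above (and the matching ones in the other executors below) follow one pattern: call sites stop reaching into `managed_state.state_table` and instead call methods on `ManagedTopNState` itself (`table()`, `init_epoch`, `update_vnode_bitmap`, `update_watermark`). Those wrappers live in the changed file whose diff is not shown on this page, so what follows is only an assumed sketch of their likely shape, written with stand-in types, not the commit's actual code.

// Assumed sketch only: the real ManagedTopNState changes are in a file not shown here.
// Stand-in types keep the example self-contained.
type Epoch = u64;
type VnodeBitmap = Vec<bool>;

struct StateTableStub;

impl StateTableStub {
    fn init_epoch(&mut self, _epoch: Epoch) {}
    /// Returns (previous bitmap, whether caches built on the old bitmap may be stale).
    fn update_vnode_bitmap(&mut self, new: VnodeBitmap) -> (VnodeBitmap, bool) {
        (new, true)
    }
}

struct ManagedTopNStateSketch {
    state_table: StateTableStub,
}

impl ManagedTopNStateSketch {
    /// Read-only access to the underlying table, e.g. for metrics labels.
    fn table(&self) -> &StateTableStub {
        &self.state_table
    }

    fn init_epoch(&mut self, epoch: Epoch) {
        self.state_table.init_epoch(epoch)
    }

    /// The executors only need `cache_may_stale`, so the wrapper drops the
    /// previous bitmap that the old call sites were already ignoring.
    fn update_vnode_bitmap(&mut self, vnodes: VnodeBitmap) -> bool {
        let (_prev, cache_may_stale) = self.state_table.update_vnode_bitmap(vnodes);
        cache_may_stale
    }
}

fn main() {
    let mut managed = ManagedTopNStateSketch { state_table: StateTableStub };
    let _table = managed.table();
    managed.init_epoch(42);
    if managed.update_vnode_bitmap(vec![true, false]) {
        // e.g. self.caches.clear() in the group TopN executors
    }
}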
11 changes: 3 additions & 8 deletions src/stream/src/executor/top_n/group_top_n_appendonly.rs
@@ -163,7 +163,7 @@ where

         let data_types = self.info().schema.data_types();
         let row_deserializer = RowDeserializer::new(data_types.clone());
-        let table_id_str = self.managed_state.state_table.table_id().to_string();
+        let table_id_str = self.managed_state.table().table_id().to_string();
         let actor_id_str = self.ctx.id.to_string();
         let fragment_id_str = self.ctx.fragment_id.to_string();
         for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
@@ -223,11 +223,7 @@ where
     }

     fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
-        let (_previous_vnode_bitmap, cache_may_stale) = self
-            .managed_state
-            .state_table
-            .update_vnode_bitmap(vnode_bitmap);
-
+        let cache_may_stale = self.managed_state.update_vnode_bitmap(vnode_bitmap);
         if cache_may_stale {
             self.caches.clear();
         }
@@ -242,14 +238,13 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         Ok(())
     }

     async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
         if watermark.col_idx == self.group_by[0] {
             self.managed_state
-                .state_table
                 .update_watermark(watermark.val.clone(), false);
             Some(watermark)
         } else {
2 changes: 1 addition & 1 deletion src/stream/src/executor/top_n/top_n_appendonly.rs
@@ -140,7 +140,7 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         self.managed_state
             .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
             .await
71 changes: 57 additions & 14 deletions src/stream/src/executor/top_n/top_n_cache.rs
@@ -28,6 +28,7 @@ use super::{CacheKey, GroupKey, ManagedTopNState};
 use crate::executor::error::StreamExecutorResult;

 const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
+const TOPN_CACHE_MIN_CAPACITY: usize = 10;

 /// Cache for [`ManagedTopNState`].
 ///
@@ -58,6 +59,11 @@ pub struct TopNCache<const WITH_TIES: bool> {
     /// Assumption: `limit != 0`
     pub limit: usize,

+    /// Number of rows corresponding to the current group.
+    /// This is a nice-to-have information. `None` means we don't know the row count,
+    /// but it doesn't prevent us from working correctly.
+    table_row_count: Option<usize>,
+
     /// Data types for the full row.
     ///
     /// For debug formatting only.
@@ -166,9 +172,11 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
             high_capacity: offset
                 .checked_add(limit)
                 .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
-                .unwrap_or(usize::MAX),
+                .unwrap_or(usize::MAX)
+                .max(TOPN_CACHE_MIN_CAPACITY),
             offset,
             limit,
+            table_row_count: None,
             data_types,
         }
     }
@@ -181,6 +189,21 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
         self.high.clear();
     }

+    /// Get total count of entries in the cache.
+    pub fn len(&self) -> usize {
+        self.low.len() + self.middle.len() + self.high.len()
+    }
+
+    pub(super) fn update_table_row_count(&mut self, table_row_count: usize) {
+        self.table_row_count = Some(table_row_count)
+    }
+
+    fn table_row_count_matched(&self) -> bool {
+        self.table_row_count
+            .map(|n| n == self.len())
+            .unwrap_or(false)
+    }
+
     pub fn is_low_cache_full(&self) -> bool {
         assert!(self.low.len() <= self.offset);
         let full = self.low.len() == self.offset;
@@ -219,6 +242,11 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
         self.high.len() >= self.high_capacity
     }

+    fn last_cache_key_before_high(&self) -> Option<&CacheKey> {
+        let middle_last_key = self.middle.last_key_value().map(|(k, _)| k);
+        middle_last_key.or_else(|| self.low.last_key_value().map(|(k, _)| k))
+    }
+
     /// Use this method instead of `self.high.insert` directly when possible.
     ///
     /// It only inserts into high cache if the key is smaller than the largest key in the high
@@ -256,6 +284,10 @@ impl TopNCacheTrait for TopNCache<false> {
         res_ops: &mut Vec<Op>,
         res_rows: &mut Vec<CompactedRow>,
     ) {
+        if let Some(row_count) = self.table_row_count.as_mut() {
+            *row_count += 1;
+        }
+
         if !self.is_low_cache_full() {
             self.low.insert(cache_key, (&row).into());
             return;
@@ -318,36 +350,42 @@ impl TopNCacheTrait for TopNCache<false> {
         res_ops: &mut Vec<Op>,
         res_rows: &mut Vec<CompactedRow>,
     ) -> StreamExecutorResult<()> {
+        if let Some(row_count) = self.table_row_count.as_mut() {
+            *row_count -= 1;
+        }
+
         if self.is_middle_cache_full() && cache_key > *self.middle.last_key_value().unwrap().0 {
             // The row is in high
             self.high.remove(&cache_key);
         } else if self.is_low_cache_full()
             && (self.offset == 0 || cache_key > *self.low.last_key_value().unwrap().0)
         {
             // The row is in mid
-            self.middle.remove(&cache_key);
-            res_ops.push(Op::Delete);
-            res_rows.push((&row).into());
-
             // Try to fill the high cache if it is empty
-            if self.high.is_empty() {
+            if self.high.is_empty() && !self.table_row_count_matched() {
                 managed_state
                     .fill_high_cache(
                         group_key,
                         self,
-                        Some(self.middle.last_key_value().unwrap().0.clone()),
+                        self.last_cache_key_before_high().cloned(),
                         self.high_capacity,
                     )
                     .await?;
             }

+            self.middle.remove(&cache_key);
+            res_ops.push(Op::Delete);
+            res_rows.push((&row).into());
+
             // Bring one element, if any, from high cache to middle cache
             if !self.high.is_empty() {
                 let high_first = self.high.pop_first().unwrap();
                 res_ops.push(Op::Insert);
                 res_rows.push(high_first.1.clone());
                 self.middle.insert(high_first.0, high_first.1);
             }
+
+            assert!(self.high.is_empty() || self.middle.len() == self.limit);
         } else {
             // The row is in low
             self.low.remove(&cache_key);
@@ -360,12 +398,12 @@ impl TopNCacheTrait for TopNCache<false> {
             self.low.insert(middle_first.0, middle_first.1);

             // Try to fill the high cache if it is empty
-            if self.high.is_empty() {
+            if self.high.is_empty() && !self.table_row_count_matched() {
                 managed_state
                     .fill_high_cache(
                         group_key,
                         self,
-                        Some(self.middle.last_key_value().unwrap().0.clone()),
+                        self.last_cache_key_before_high().cloned(),
                         self.high_capacity,
                     )
                     .await?;
@@ -393,6 +431,10 @@ impl TopNCacheTrait for TopNCache<true> {
         res_ops: &mut Vec<Op>,
         res_rows: &mut Vec<CompactedRow>,
     ) {
+        if let Some(row_count) = self.table_row_count.as_mut() {
+            *row_count += 1;
+        }
+
         assert!(
             self.low.is_empty(),
             "Offset is not supported yet for WITH TIES, so low cache should be empty"
Expand Down Expand Up @@ -482,8 +524,11 @@ impl TopNCacheTrait for TopNCache<true> {
res_ops: &mut Vec<Op>,
res_rows: &mut Vec<CompactedRow>,
) -> StreamExecutorResult<()> {
// Since low cache is always empty for WITH_TIES, this unwrap is safe.
if let Some(row_count) = self.table_row_count.as_mut() {
*row_count -= 1;
}

// Since low cache is always empty for WITH_TIES, this unwrap is safe.
let middle_last = self.middle.last_key_value().unwrap();
let middle_last_order_by = middle_last.0 .0.clone();

@@ -502,14 +547,12 @@ impl TopNCacheTrait for TopNCache<true> {
         }

         // Try to fill the high cache if it is empty
-        if self.high.is_empty() {
+        if self.high.is_empty() && !self.table_row_count_matched() {
             managed_state
                 .fill_high_cache(
                     group_key,
                     self,
-                    self.middle
-                        .last_key_value()
-                        .map(|(key, _value)| key.clone()),
+                    self.last_cache_key_before_high().cloned(),
                     self.high_capacity,
                 )
                 .await?;
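A side note on the constructor change above: `high_capacity` used to be exactly `(offset + limit) * TOPN_CACHE_HIGH_CAPACITY_FACTOR`, so a plain `LIMIT 1` got a high cache of capacity 2 that is exhausted, and then refilled from the table, very quickly; the new `TOPN_CACHE_MIN_CAPACITY` floor of 10 presumably avoids that churn for tiny limits. A quick check of the arithmetic, mirroring the checked/saturating style of `TopNCache::new`:

const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
const TOPN_CACHE_MIN_CAPACITY: usize = 10;

fn high_capacity(offset: usize, limit: usize) -> usize {
    offset
        .checked_add(limit)
        .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
        .unwrap_or(usize::MAX)
        .max(TOPN_CACHE_MIN_CAPACITY)
}

fn main() {
    // LIMIT 1, no offset: previously 2, now floored at 10.
    assert_eq!(high_capacity(0, 1), 10);
    // OFFSET 10 LIMIT 10: unaffected by the floor.
    assert_eq!(high_capacity(10, 10), 40);
    // Overflow still saturates to usize::MAX.
    assert_eq!(high_capacity(usize::MAX, 1), usize::MAX);
}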
2 changes: 1 addition & 1 deletion src/stream/src/executor/top_n/top_n_plain.rs
@@ -174,7 +174,7 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         self.managed_state
             .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
             .await
(The diff for the sixth changed file did not load and is not shown on this page.)
