perf(topn): reduce unnecessary table scan in streaming (Group)TopN executors #13832

Merged · 16 commits · Dec 13, 2023
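In brief, the changes below:

- track the number of rows the state table holds for the current group (`table_row_count`) in `TopNCache`, so a drained `high` cache is only refilled from the table when the table actually has rows that are not yet cached;
- replace direct `managed_state.state_table` accesses in the executors with `ManagedTopNState` methods (`table()`, `init_epoch`, `update_vnode_bitmap`, `update_watermark`), as sketched after the first file below;
- give the high cache a minimum capacity (`TOPN_CACHE_MIN_CAPACITY = 10`) so a small `OFFSET + LIMIT` no longer produces a tiny, constantly refilled cache;
- compute the refill scan's start key with a new `last_cache_key_before_high()` helper that falls back from the `middle` cache to the `low` cache.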
src/stream/src/executor/top_n/group_top_n.rs: 3 additions & 8 deletions
```diff
@@ -165,7 +165,7 @@ where
         let mut res_ops = Vec::with_capacity(self.limit);
         let mut res_rows = Vec::with_capacity(self.limit);
         let keys = K::build(&self.group_by, chunk.data_chunk())?;
-        let table_id_str = self.managed_state.state_table.table_id().to_string();
+        let table_id_str = self.managed_state.table().table_id().to_string();
         let actor_id_str = self.ctx.id.to_string();
         let fragment_id_str = self.ctx.fragment_id.to_string();
         for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
@@ -247,11 +247,7 @@ where
     }

     fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
-        let (_previous_vnode_bitmap, cache_may_stale) = self
-            .managed_state
-            .state_table
-            .update_vnode_bitmap(vnode_bitmap);
-
+        let cache_may_stale = self.managed_state.update_vnode_bitmap(vnode_bitmap);
         if cache_may_stale {
             self.caches.clear();
         }
@@ -262,14 +258,13 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         Ok(())
     }

     async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
         if watermark.col_idx == self.group_by[0] {
             self.managed_state
-                .state_table
                 .update_watermark(watermark.val.clone(), false);
             Some(watermark)
         } else {
```
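The executor changes in this file (and in its append-only sibling below) are mechanical: every direct `self.managed_state.state_table.*` access becomes a call on `ManagedTopNState` itself. Here is a minimal sketch of the delegation pattern with stand-in types; the real forwarding methods live in `managed_state.rs`, which is not part of this excerpt:

```rust
// Stand-in types: this is not RisingWave's actual API, just the shape of the change.
struct StateTable;

impl StateTable {
    fn init_epoch(&mut self, epoch: u64) {
        // ... prepare storage for the new epoch ...
        let _ = epoch;
    }
}

pub struct ManagedTopNState {
    state_table: StateTable, // no longer reached into by the executors
}

impl ManagedTopNState {
    /// Read-only access for metadata lookups such as `table_id()`.
    pub fn table(&self) -> &StateTable {
        &self.state_table
    }

    /// Mutations are forwarded, so callers never touch `state_table` directly.
    pub fn init_epoch(&mut self, epoch: u64) {
        self.state_table.init_epoch(epoch)
    }
}

fn main() {
    let mut managed_state = ManagedTopNState { state_table: StateTable };
    managed_state.init_epoch(1); // what the executors now call
    let _table = managed_state.table();
}
```

A plausible motivation for the encapsulation: with a single choke point for table access, the managed state can keep the cache's new `table_row_count` bookkeeping (introduced in `top_n_cache.rs` below) in sync as rows are inserted and deleted.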
src/stream/src/executor/top_n/group_top_n_appendonly.rs: 3 additions & 8 deletions
```diff
@@ -163,7 +163,7 @@ where

         let data_types = self.info().schema.data_types();
         let row_deserializer = RowDeserializer::new(data_types.clone());
-        let table_id_str = self.managed_state.state_table.table_id().to_string();
+        let table_id_str = self.managed_state.table().table_id().to_string();
         let actor_id_str = self.ctx.id.to_string();
         let fragment_id_str = self.ctx.fragment_id.to_string();
         for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
@@ -227,11 +227,7 @@ where
     }

     fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
-        let (_previous_vnode_bitmap, cache_may_stale) = self
-            .managed_state
-            .state_table
-            .update_vnode_bitmap(vnode_bitmap);
-
+        let cache_may_stale = self.managed_state.update_vnode_bitmap(vnode_bitmap);
         if cache_may_stale {
             self.caches.clear();
         }
@@ -246,14 +242,13 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         Ok(())
     }

     async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
         if watermark.col_idx == self.group_by[0] {
             self.managed_state
-                .state_table
                 .update_watermark(watermark.val.clone(), false);
             Some(watermark)
         } else {
```
src/stream/src/executor/top_n/top_n_appendonly.rs: 1 addition & 1 deletion
```diff
@@ -144,7 +144,7 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         self.managed_state
             .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
             .await
```
src/stream/src/executor/top_n/top_n_cache.rs: 57 additions & 14 deletions
```diff
@@ -28,6 +28,7 @@ use super::{CacheKey, GroupKey, ManagedTopNState};
 use crate::executor::error::StreamExecutorResult;

 const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
+const TOPN_CACHE_MIN_CAPACITY: usize = 10;

 /// Cache for [`ManagedTopNState`].
 ///
```
```diff
@@ -58,6 +59,11 @@ pub struct TopNCache<const WITH_TIES: bool> {
     /// Assumption: `limit != 0`
     pub limit: usize,

+    /// Number of rows corresponding to the current group.
+    /// This is a nice-to-have information. `None` means we don't know the row count,
+    /// but it doesn't prevent us from working correctly.
+    table_row_count: Option<usize>,
+
     /// Data types for the full row.
     ///
     /// For debug formatting only.
```
```diff
@@ -166,9 +172,11 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
             high_capacity: offset
                 .checked_add(limit)
                 .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
-                .unwrap_or(usize::MAX),
+                .unwrap_or(usize::MAX)
+                .max(TOPN_CACHE_MIN_CAPACITY),
             offset,
             limit,
+            table_row_count: None,
             data_types,
         }
     }
```
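For intuition about the new floor, the helper below restates the constructor's arithmetic using the two constants from this file (the function name is ours, for illustration):

```rust
const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
const TOPN_CACHE_MIN_CAPACITY: usize = 10;

// Restates `high_capacity` as computed in `TopNCache::new` after this PR.
fn high_capacity(offset: usize, limit: usize) -> usize {
    offset
        .checked_add(limit)
        .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
        .unwrap_or(usize::MAX)
        .max(TOPN_CACHE_MIN_CAPACITY)
}

fn main() {
    // A small LIMIT no longer yields a tiny high cache:
    assert_eq!(high_capacity(0, 3), 10); // was 6 before this PR
    // Larger limits are unaffected by the floor:
    assert_eq!(high_capacity(0, 100), 200);
    // Overflow still saturates:
    assert_eq!(high_capacity(usize::MAX, 1), usize::MAX);
    println!("ok");
}
```

Without the floor, a query like `LIMIT 1` gave the high cache a capacity of 2, so it could drain after a couple of deletes and force a refill scan almost immediately.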
```diff
@@ -181,6 +189,21 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
         self.high.clear();
     }

+    /// Get total count of entries in the cache.
+    pub fn len(&self) -> usize {
+        self.low.len() + self.middle.len() + self.high.len()
+    }
+
+    pub(super) fn update_table_row_count(&mut self, table_row_count: usize) {
+        self.table_row_count = Some(table_row_count)
+    }
+
+    fn table_row_count_matched(&self) -> bool {
+        self.table_row_count
+            .map(|n| n == self.len())
+            .unwrap_or(false)
+    }
+
     pub fn is_low_cache_full(&self) -> bool {
         assert!(self.low.len() <= self.offset);
         let full = self.low.len() == self.offset;
```
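This is the heart of the optimization: `table_row_count` counts the rows the state table currently holds for this group, and when it equals the number of cached entries, an empty `high` cache cannot be refilled with anything new, so the scan can be skipped. A toy model of the guard (stand-in struct, not RisingWave's types):

```rust
// Models the skip condition: when every table row of the group is already
// cached, refilling the high cache from the table is pointless.
struct CacheModel {
    low: usize,
    middle: usize,
    high: usize,
    table_row_count: Option<usize>,
}

impl CacheModel {
    fn len(&self) -> usize {
        self.low + self.middle + self.high
    }

    fn table_row_count_matched(&self) -> bool {
        self.table_row_count.map(|n| n == self.len()).unwrap_or(false)
    }

    /// Mirrors the new guard: `self.high.is_empty() && !self.table_row_count_matched()`.
    fn should_fill_high_cache(&self) -> bool {
        self.high == 0 && !self.table_row_count_matched()
    }
}

fn main() {
    // 5 rows in the group, all 5 cached: the refill scan is skipped.
    let all_cached = CacheModel { low: 0, middle: 5, high: 0, table_row_count: Some(5) };
    assert!(!all_cached.should_fill_high_cache());

    // 8 rows in the group but only 5 cached: still need to scan.
    let partial = CacheModel { low: 0, middle: 5, high: 0, table_row_count: Some(8) };
    assert!(partial.should_fill_high_cache());

    // Unknown row count: conservatively keep the old always-scan behavior.
    let unknown = CacheModel { low: 0, middle: 5, high: 0, table_row_count: None };
    assert!(unknown.should_fill_high_cache());
}
```

Note the conservative default: an unknown row count (`None`) never suppresses a scan.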
```diff
@@ -219,6 +242,11 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
         self.high.len() >= self.high_capacity
     }

+    fn last_cache_key_before_high(&self) -> Option<&CacheKey> {
+        let middle_last_key = self.middle.last_key_value().map(|(k, _)| k);
+        middle_last_key.or_else(|| self.low.last_key_value().map(|(k, _)| k))
+    }
+
     /// Use this method instead of `self.high.insert` directly when possible.
     ///
     /// It only inserts into high cache if the key is smaller than the largest key in the high
```
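This helper also removes a fragile assumption at the call sites: previously the scan start key was built with `self.middle.last_key_value().unwrap()`, which panics when `middle` is empty and never considers `low`. `last_cache_key_before_high` instead returns the largest cached key below the high cache, falling back from `middle` to `low`, which matters now that the deleted entry is removed from `middle` before the refill (see the delete path below).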
```diff
@@ -256,6 +284,10 @@ impl TopNCacheTrait for TopNCache<false> {
         res_ops: &mut Vec<Op>,
         res_rows: &mut Vec<CompactedRow>,
     ) {
+        if let Some(row_count) = self.table_row_count.as_mut() {
+            *row_count += 1;
+        }
+
         if !self.is_low_cache_full() {
             self.low.insert(cache_key, (&row).into());
             return;
```
```diff
@@ -318,36 +350,42 @@ impl TopNCacheTrait for TopNCache<false> {
         res_ops: &mut Vec<Op>,
         res_rows: &mut Vec<CompactedRow>,
     ) -> StreamExecutorResult<()> {
+        if let Some(row_count) = self.table_row_count.as_mut() {
+            *row_count -= 1;
+        }
+
         if self.is_middle_cache_full() && cache_key > *self.middle.last_key_value().unwrap().0 {
             // The row is in high
             self.high.remove(&cache_key);
         } else if self.is_low_cache_full()
             && (self.offset == 0 || cache_key > *self.low.last_key_value().unwrap().0)
         {
             // The row is in mid
+            self.middle.remove(&cache_key);
+            res_ops.push(Op::Delete);
+            res_rows.push((&row).into());
+
             // Try to fill the high cache if it is empty
-            if self.high.is_empty() {
+            if self.high.is_empty() && !self.table_row_count_matched() {
                 managed_state
                     .fill_high_cache(
                         group_key,
                         self,
-                        Some(self.middle.last_key_value().unwrap().0.clone()),
+                        self.last_cache_key_before_high().cloned(),
                         self.high_capacity,
                     )
                     .await?;
             }

-            self.middle.remove(&cache_key);
-            res_ops.push(Op::Delete);
-            res_rows.push((&row).into());
-
             // Bring one element, if any, from high cache to middle cache
             if !self.high.is_empty() {
                 let high_first = self.high.pop_first().unwrap();
                 res_ops.push(Op::Insert);
                 res_rows.push(high_first.1.clone());
                 self.middle.insert(high_first.0, high_first.1);
             }
+
+            assert!(self.high.is_empty() || self.middle.len() == self.limit);
         } else {
             // The row is in low
             self.low.remove(&cache_key);
```

A resolved review thread on the moved `self.middle.remove(&cache_key);`:

- Reviewer (Member): Why move this?
- Author (Member): We need to keep the entries in the cache consistent with the table.
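The ordering is what that thread is about: the entry is removed from `middle` (and `table_row_count` decremented) before the refill decision, so `len()` and `table_row_count` describe the same state when `table_row_count_matched()` runs. Concretely, for a group of 5 rows fully cached in `middle`, deleting one leaves `table_row_count == 4`; removing the cache entry first makes `len() == 4` as well, and the scan is skipped. Had the removal stayed after the refill, the check would compare 5 cached entries against a count of 4 and trigger a scan that can return nothing.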
```diff
@@ -360,12 +398,12 @@ impl TopNCacheTrait for TopNCache<false> {
             self.low.insert(middle_first.0, middle_first.1);

             // Try to fill the high cache if it is empty
-            if self.high.is_empty() {
+            if self.high.is_empty() && !self.table_row_count_matched() {
                 managed_state
                     .fill_high_cache(
                         group_key,
                         self,
-                        Some(self.middle.last_key_value().unwrap().0.clone()),
+                        self.last_cache_key_before_high().cloned(),
                         self.high_capacity,
                     )
                     .await?;
```
```diff
@@ -393,6 +431,10 @@ impl TopNCacheTrait for TopNCache<true> {
         res_ops: &mut Vec<Op>,
         res_rows: &mut Vec<CompactedRow>,
     ) {
+        if let Some(row_count) = self.table_row_count.as_mut() {
+            *row_count += 1;
+        }
+
         assert!(
             self.low.is_empty(),
             "Offset is not supported yet for WITH TIES, so low cache should be empty"
```
```diff
@@ -482,8 +524,11 @@ impl TopNCacheTrait for TopNCache<true> {
         res_ops: &mut Vec<Op>,
         res_rows: &mut Vec<CompactedRow>,
     ) -> StreamExecutorResult<()> {
-        // Since low cache is always empty for WITH_TIES, this unwrap is safe.
+        if let Some(row_count) = self.table_row_count.as_mut() {
+            *row_count -= 1;
+        }
+
+        // Since low cache is always empty for WITH_TIES, this unwrap is safe.
         let middle_last = self.middle.last_key_value().unwrap();
         let middle_last_order_by = middle_last.0 .0.clone();
```
```diff
@@ -502,14 +547,12 @@ impl TopNCacheTrait for TopNCache<true> {
         }

         // Try to fill the high cache if it is empty
-        if self.high.is_empty() {
+        if self.high.is_empty() && !self.table_row_count_matched() {
             managed_state
                 .fill_high_cache(
                     group_key,
                     self,
-                    self.middle
-                        .last_key_value()
-                        .map(|(key, _value)| key.clone()),
+                    self.last_cache_key_before_high().cloned(),
                     self.high_capacity,
                 )
                 .await?;
```
src/stream/src/executor/top_n/top_n_plain.rs: 1 addition & 1 deletion
```diff
@@ -178,7 +178,7 @@ where
     }

     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         self.managed_state
             .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
             .await
```