
fix: cherry-pick #13271 and #13832 to release-1.5 (#14179)
Signed-off-by: Richard Chien <[email protected]>
Co-authored-by: Richard Chien <[email protected]>
wenym1 and stdrc authored Dec 25, 2023
1 parent 8fd89c4 commit 86b81ea
Showing 7 changed files with 168 additions and 69 deletions.
86 changes: 51 additions & 35 deletions src/stream/src/executor/now.rs
@@ -23,6 +23,7 @@ use risingwave_common::row::{self, OwnedRow};
 use risingwave_common::types::{DataType, Datum};
 use risingwave_storage::StateStore;
 use tokio::sync::mpsc::UnboundedReceiver;
+use tokio_stream::wrappers::UnboundedReceiverStream;
 
 use super::{
     Barrier, BoxedMessageStream, Executor, ExecutorInfo, Message, Mutation, PkIndicesRef,
@@ -55,7 +56,7 @@ impl<S: StateStore> NowExecutor<S> {
     #[try_stream(ok = Message, error = StreamExecutorError)]
     async fn into_stream(self) {
         let Self {
-            mut barrier_receiver,
+            barrier_receiver,
             mut state_table,
             info,
             ..
@@ -68,45 +69,60 @@ impl<S: StateStore> NowExecutor<S> {
         // Whether the first barrier is handled and `last_timestamp` is initialized.
         let mut initialized = false;
 
-        while let Some(barrier) = barrier_receiver.recv().await {
-            if !initialized {
-                // Handle the first barrier.
-                state_table.init_epoch(barrier.epoch);
-                let state_row = {
-                    let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
-                    let data_iter = state_table
-                        .iter_with_prefix(row::empty(), sub_range, Default::default())
-                        .await?;
-                    pin_mut!(data_iter);
-                    if let Some(keyed_row) = data_iter.next().await {
-                        Some(keyed_row?)
-                    } else {
-                        None
-                    }
-                };
-                last_timestamp = state_row.and_then(|row| row[0].clone());
-                paused = barrier.is_pause_on_startup();
-                initialized = true;
-            } else if paused {
-                // Assert that no data is updated.
-                state_table.commit_no_data_expected(barrier.epoch);
-            } else {
-                state_table.commit(barrier.epoch).await?;
-            }
-
-            // Extract timestamp from the current epoch.
-            let timestamp = Some(barrier.get_curr_epoch().as_scalar());
-
-            // Update paused state.
-            if let Some(mutation) = barrier.mutation.as_deref() {
-                match mutation {
-                    Mutation::Pause => paused = true,
-                    Mutation::Resume => paused = false,
-                    _ => {}
-                }
-            }
-
-            yield Message::Barrier(barrier.clone());
+        const MAX_MERGE_BARRIER_SIZE: usize = 64;
+
+        #[for_await]
+        for barriers in
+            UnboundedReceiverStream::new(barrier_receiver).ready_chunks(MAX_MERGE_BARRIER_SIZE)
+        {
+            let mut timestamp = None;
+            if barriers.len() > 1 {
+                warn!(
+                    "handle multiple barriers at once in now executor: {}",
+                    barriers.len()
+                );
+            }
+            for barrier in barriers {
+                if !initialized {
+                    // Handle the first barrier.
+                    state_table.init_epoch(barrier.epoch);
+                    let state_row = {
+                        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
+                            &(Unbounded, Unbounded);
+                        let data_iter = state_table
+                            .iter_with_prefix(row::empty(), sub_range, Default::default())
+                            .await?;
+                        pin_mut!(data_iter);
+                        if let Some(keyed_row) = data_iter.next().await {
+                            Some(keyed_row?)
+                        } else {
+                            None
+                        }
+                    };
+                    last_timestamp = state_row.and_then(|row| row[0].clone());
+                    paused = barrier.is_pause_on_startup();
+                    initialized = true;
+                } else if paused {
+                    // Assert that no data is updated.
+                    state_table.commit_no_data_expected(barrier.epoch);
+                } else {
+                    state_table.commit(barrier.epoch).await?;
+                }
+
+                // Extract timestamp from the current epoch.
+                timestamp = Some(barrier.get_curr_epoch().as_scalar());
+
+                // Update paused state.
+                if let Some(mutation) = barrier.mutation.as_deref() {
+                    match mutation {
+                        Mutation::Pause => paused = true,
+                        Mutation::Resume => paused = false,
+                        _ => {}
+                    }
+                }
+
+                yield Message::Barrier(barrier);
+            }
 
             // Do not yield any messages if paused.
             if paused {
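
Note on the now.rs change above: the executor used to handle one barrier per `recv()` call; it now wraps the receiver in a stream and drains everything already queued, up to `MAX_MERGE_BARRIER_SIZE`, so a backlogged `now()` executor catches up in batches. Each barrier in a batch is still committed and yielded individually, but `timestamp` ends up holding only the newest barrier's epoch, so the downstream update is computed once per batch. Below is a minimal, self-contained sketch of the batching pattern (plain tokio/tokio-stream/futures code, not RisingWave's executor; names like `MAX_BATCH` are ours):

use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

const MAX_BATCH: usize = 64;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel::<u64>();
    // Simulate a backlog of queued "barriers" (here, just epoch numbers).
    for epoch in 1..=10 {
        tx.send(epoch).unwrap();
    }
    drop(tx); // close the channel so the stream terminates

    // `ready_chunks(n)` yields whatever is already buffered, up to n items,
    // without waiting for a full batch.
    let mut batches = UnboundedReceiverStream::new(rx).ready_chunks(MAX_BATCH);
    while let Some(batch) = batches.next().await {
        // Per-batch work is driven by the newest element only.
        let newest = *batch.last().unwrap();
        println!("merged {} barriers; emit update for epoch {}", batch.len(), newest);
    }
}
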
11 changes: 3 additions & 8 deletions src/stream/src/executor/top_n/group_top_n.rs
@@ -165,7 +165,7 @@ where
         let mut res_ops = Vec::with_capacity(self.limit);
         let mut res_rows = Vec::with_capacity(self.limit);
         let keys = K::build(&self.group_by, chunk.data_chunk())?;
-        let table_id_str = self.managed_state.state_table.table_id().to_string();
+        let table_id_str = self.managed_state.table().table_id().to_string();
         let actor_id_str = self.ctx.id.to_string();
         let fragment_id_str = self.ctx.fragment_id.to_string();
         for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
@@ -243,11 +243,7 @@ where
     }
 
     fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
-        let (_previous_vnode_bitmap, cache_may_stale) = self
-            .managed_state
-            .state_table
-            .update_vnode_bitmap(vnode_bitmap);
-
+        let cache_may_stale = self.managed_state.update_vnode_bitmap(vnode_bitmap);
         if cache_may_stale {
             self.caches.clear();
         }
@@ -258,14 +254,13 @@ where
     }
 
     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         Ok(())
     }
 
     async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
         if watermark.col_idx == self.group_by[0] {
             self.managed_state
-                .state_table
                 .update_watermark(watermark.val.clone(), false);
             Some(watermark)
         } else {
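
Note on the four top-n files: executors no longer reach into `managed_state.state_table`; access is routed through `ManagedTopNState` itself (`table()`, `init_epoch()`, `update_vnode_bitmap()`, `update_watermark()`), and the same `init_epoch` delegation shows up again in group_top_n_appendonly.rs, top_n_appendonly.rs, and top_n_plain.rs below. A simplified sketch of the delegation pattern, using stand-in types rather than RisingWave's actual definitions:

struct StateTable;

impl StateTable {
    fn init_epoch(&mut self, _epoch: u64) {}
    fn table_id(&self) -> u32 {
        42
    }
    /// Returns `(previous_bitmap, cache_may_stale)`.
    fn update_vnode_bitmap(&mut self, bitmap: Vec<bool>) -> (Vec<bool>, bool) {
        (bitmap, false)
    }
}

struct ManagedTopNState {
    state_table: StateTable, // executors no longer touch this field directly
}

impl ManagedTopNState {
    /// Read-only access for callers that only need metadata such as `table_id`.
    fn table(&self) -> &StateTable {
        &self.state_table
    }

    fn init_epoch(&mut self, epoch: u64) {
        self.state_table.init_epoch(epoch)
    }

    /// Forwards the bitmap update, keeping only the flag executors care about.
    fn update_vnode_bitmap(&mut self, bitmap: Vec<bool>) -> bool {
        let (_previous_bitmap, cache_may_stale) = self.state_table.update_vnode_bitmap(bitmap);
        cache_may_stale
    }
}

fn main() {
    let mut state = ManagedTopNState { state_table: StateTable };
    state.init_epoch(1);
    assert_eq!(state.table().table_id(), 42);
    assert!(!state.update_vnode_bitmap(vec![true, false]));
}

The point of the refactor is that decisions like cache invalidation live in one place instead of being repeated across executors.
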
11 changes: 3 additions & 8 deletions src/stream/src/executor/top_n/group_top_n_appendonly.rs
@@ -163,7 +163,7 @@ where
 
         let data_types = self.info().schema.data_types();
         let row_deserializer = RowDeserializer::new(data_types.clone());
-        let table_id_str = self.managed_state.state_table.table_id().to_string();
+        let table_id_str = self.managed_state.table().table_id().to_string();
         let actor_id_str = self.ctx.id.to_string();
         let fragment_id_str = self.ctx.fragment_id.to_string();
         for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
@@ -223,11 +223,7 @@ where
     }
 
     fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) {
-        let (_previous_vnode_bitmap, cache_may_stale) = self
-            .managed_state
-            .state_table
-            .update_vnode_bitmap(vnode_bitmap);
-
+        let cache_may_stale = self.managed_state.update_vnode_bitmap(vnode_bitmap);
         if cache_may_stale {
             self.caches.clear();
         }
@@ -242,14 +238,13 @@ where
     }
 
     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         Ok(())
     }
 
     async fn handle_watermark(&mut self, watermark: Watermark) -> Option<Watermark> {
         if watermark.col_idx == self.group_by[0] {
             self.managed_state
-                .state_table
                 .update_watermark(watermark.val.clone(), false);
             Some(watermark)
         } else {
2 changes: 1 addition & 1 deletion src/stream/src/executor/top_n/top_n_appendonly.rs
@@ -140,7 +140,7 @@ where
     }
 
     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
         self.managed_state
             .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
             .await
71 changes: 57 additions & 14 deletions src/stream/src/executor/top_n/top_n_cache.rs
@@ -28,6 +28,7 @@ use super::{CacheKey, GroupKey, ManagedTopNState};
 use crate::executor::error::StreamExecutorResult;
 
 const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
+const TOPN_CACHE_MIN_CAPACITY: usize = 10;
 
 /// Cache for [`ManagedTopNState`].
 ///
@@ -58,6 +59,11 @@ pub struct TopNCache<const WITH_TIES: bool> {
     /// Assumption: `limit != 0`
     pub limit: usize,
 
+    /// Number of rows corresponding to the current group.
+    /// This is nice-to-have information. `None` means we don't know the row count,
+    /// but it doesn't prevent us from working correctly.
+    table_row_count: Option<usize>,
+
     /// Data types for the full row.
     ///
     /// For debug formatting only.
@@ -166,9 +172,11 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
             high_capacity: offset
                 .checked_add(limit)
                 .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
-                .unwrap_or(usize::MAX),
+                .unwrap_or(usize::MAX)
+                .max(TOPN_CACHE_MIN_CAPACITY),
             offset,
             limit,
+            table_row_count: None,
             data_types,
         }
     }
@@ -181,6 +189,21 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
         self.high.clear();
     }
 
+    /// Get the total count of entries in the cache.
+    pub fn len(&self) -> usize {
+        self.low.len() + self.middle.len() + self.high.len()
+    }
+
+    pub(super) fn update_table_row_count(&mut self, table_row_count: usize) {
+        self.table_row_count = Some(table_row_count)
+    }
+
+    fn table_row_count_matched(&self) -> bool {
+        self.table_row_count
+            .map(|n| n == self.len())
+            .unwrap_or(false)
+    }
+
     pub fn is_low_cache_full(&self) -> bool {
         assert!(self.low.len() <= self.offset);
         let full = self.low.len() == self.offset;
@@ -219,6 +242,11 @@ impl<const WITH_TIES: bool> TopNCache<WITH_TIES> {
         self.high.len() >= self.high_capacity
     }
 
+    fn last_cache_key_before_high(&self) -> Option<&CacheKey> {
+        let middle_last_key = self.middle.last_key_value().map(|(k, _)| k);
+        middle_last_key.or_else(|| self.low.last_key_value().map(|(k, _)| k))
+    }
+
     /// Use this method instead of `self.high.insert` directly when possible.
     ///
     /// It only inserts into high cache if the key is smaller than the largest key in the high
@@ -256,6 +284,10 @@ impl TopNCacheTrait for TopNCache<false> {
         res_ops: &mut Vec<Op>,
         res_rows: &mut Vec<CompactedRow>,
     ) {
+        if let Some(row_count) = self.table_row_count.as_mut() {
+            *row_count += 1;
+        }
+
         if !self.is_low_cache_full() {
             self.low.insert(cache_key, (&row).into());
             return;
@@ -318,36 +350,42 @@ impl TopNCacheTrait for TopNCache<false> {
         res_ops: &mut Vec<Op>,
         res_rows: &mut Vec<CompactedRow>,
     ) -> StreamExecutorResult<()> {
+        if let Some(row_count) = self.table_row_count.as_mut() {
+            *row_count -= 1;
+        }
+
         if self.is_middle_cache_full() && cache_key > *self.middle.last_key_value().unwrap().0 {
             // The row is in high
             self.high.remove(&cache_key);
         } else if self.is_low_cache_full()
             && (self.offset == 0 || cache_key > *self.low.last_key_value().unwrap().0)
         {
             // The row is in mid
-            self.middle.remove(&cache_key);
-            res_ops.push(Op::Delete);
-            res_rows.push((&row).into());
-
             // Try to fill the high cache if it is empty
-            if self.high.is_empty() {
+            if self.high.is_empty() && !self.table_row_count_matched() {
                 managed_state
                     .fill_high_cache(
                         group_key,
                         self,
-                        Some(self.middle.last_key_value().unwrap().0.clone()),
+                        self.last_cache_key_before_high().cloned(),
                         self.high_capacity,
                     )
                     .await?;
             }
 
+            self.middle.remove(&cache_key);
+            res_ops.push(Op::Delete);
+            res_rows.push((&row).into());
+
             // Bring one element, if any, from high cache to middle cache
             if !self.high.is_empty() {
                 let high_first = self.high.pop_first().unwrap();
                 res_ops.push(Op::Insert);
                 res_rows.push(high_first.1.clone());
                 self.middle.insert(high_first.0, high_first.1);
             }
+
+            assert!(self.high.is_empty() || self.middle.len() == self.limit);
         } else {
             // The row is in low
             self.low.remove(&cache_key);
@@ -360,12 +398,12 @@ impl TopNCacheTrait for TopNCache<false> {
             self.low.insert(middle_first.0, middle_first.1);
 
             // Try to fill the high cache if it is empty
-            if self.high.is_empty() {
+            if self.high.is_empty() && !self.table_row_count_matched() {
                 managed_state
                     .fill_high_cache(
                         group_key,
                         self,
-                        Some(self.middle.last_key_value().unwrap().0.clone()),
+                        self.last_cache_key_before_high().cloned(),
                         self.high_capacity,
                     )
                     .await?;
@@ -393,6 +431,10 @@ impl TopNCacheTrait for TopNCache<true> {
         res_ops: &mut Vec<Op>,
         res_rows: &mut Vec<CompactedRow>,
     ) {
+        if let Some(row_count) = self.table_row_count.as_mut() {
+            *row_count += 1;
+        }
+
         assert!(
             self.low.is_empty(),
             "Offset is not supported yet for WITH TIES, so low cache should be empty"
@@ -482,8 +524,11 @@ impl TopNCacheTrait for TopNCache<true> {
         res_ops: &mut Vec<Op>,
         res_rows: &mut Vec<CompactedRow>,
     ) -> StreamExecutorResult<()> {
-        // Since low cache is always empty for WITH_TIES, this unwrap is safe.
+        if let Some(row_count) = self.table_row_count.as_mut() {
+            *row_count -= 1;
+        }
+
+        // Since low cache is always empty for WITH_TIES, this unwrap is safe.
         let middle_last = self.middle.last_key_value().unwrap();
         let middle_last_order_by = middle_last.0 .0.clone();
 
@@ -502,14 +547,12 @@ impl TopNCacheTrait for TopNCache<true> {
         }
 
         // Try to fill the high cache if it is empty
-        if self.high.is_empty() {
+        if self.high.is_empty() && !self.table_row_count_matched() {
             managed_state
                 .fill_high_cache(
                     group_key,
                     self,
-                    self.middle
-                        .last_key_value()
-                        .map(|(key, _value)| key.clone()),
+                    self.last_cache_key_before_high().cloned(),
                     self.high_capacity,
                 )
                 .await?;
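
Two notes on the top_n_cache.rs changes above. First, the cache now tracks the group's total row count when it is known: if `low + middle + high` already hold every row of the group, an empty `high` cache means there is nothing left in the table, so `fill_high_cache` (a storage scan) is skipped; relatedly, the scan now runs before the row is removed from `middle`, with `last_cache_key_before_high` falling back to the last key of `low`, so a valid start key exists even when `middle` is about to become empty. Second, `high_capacity` is clamped to a floor of `TOPN_CACHE_MIN_CAPACITY` so tiny `offset + limit` values no longer produce a uselessly small cache. A plain-Rust sketch of both ideas, with stand-in types rather than the actual `TopNCache` API:

use std::collections::BTreeMap;

const TOPN_CACHE_HIGH_CAPACITY_FACTOR: usize = 2;
const TOPN_CACHE_MIN_CAPACITY: usize = 10;

/// Saturate on overflow, then clamp to the minimum capacity.
fn high_capacity(offset: usize, limit: usize) -> usize {
    offset
        .checked_add(limit)
        .and_then(|v| v.checked_mul(TOPN_CACHE_HIGH_CAPACITY_FACTOR))
        .unwrap_or(usize::MAX)
        .max(TOPN_CACHE_MIN_CAPACITY)
}

struct Cache {
    low: BTreeMap<u64, String>,
    middle: BTreeMap<u64, String>,
    high: BTreeMap<u64, String>,
    /// `None` means the group's total row count is unknown.
    table_row_count: Option<usize>,
}

impl Cache {
    fn len(&self) -> usize {
        self.low.len() + self.middle.len() + self.high.len()
    }

    fn table_row_count_matched(&self) -> bool {
        self.table_row_count.map(|n| n == self.len()).unwrap_or(false)
    }

    /// A storage scan is only worth paying for when `high` is empty
    /// *and* the table may still hold rows the cache has not seen.
    fn needs_high_refill(&self) -> bool {
        self.high.is_empty() && !self.table_row_count_matched()
    }
}

fn main() {
    assert_eq!(high_capacity(0, 3), 10); // clamped up to the minimum
    assert_eq!(high_capacity(10, 100), 220); // the usual 2 * (offset + limit)
    assert_eq!(high_capacity(usize::MAX, 1), usize::MAX); // saturates on overflow

    let cache = Cache {
        low: BTreeMap::new(),
        middle: BTreeMap::from([(1, "a".into()), (2, "b".into())]),
        high: BTreeMap::new(),
        table_row_count: Some(2),
    };
    // Both rows of the group are cached, so refilling `high` is pointless.
    assert!(!cache.needs_high_refill());
}
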
2 changes: 1 addition & 1 deletion src/stream/src/executor/top_n/top_n_plain.rs
@@ -174,7 +174,7 @@ where
     }
 
     async fn init(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
-        self.managed_state.state_table.init_epoch(epoch);
+        self.managed_state.init_epoch(epoch);
        self.managed_state
             .init_topn_cache(NO_GROUP_KEY, &mut self.cache)
             .await
[diff for the seventh changed file did not load and is omitted]
