Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(hash agg): don't emit data in HashAgg when stream is paused #19168

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 44 additions & 22 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ struct ExecutionVars<K: HashKey, S: StateStore> {
chunk_builder: StreamChunkBuilder,

buffer: SortBuffer<S>,

/// The stream is paused.
is_paused: bool,
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -539,7 +542,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
Ok(())
}

async fn try_flush_data(this: &mut ExecutorInner<K, S>) -> StreamExecutorResult<()> {
async fn try_flush_table(this: &mut ExecutorInner<K, S>) -> StreamExecutorResult<()> {
futures::future::try_join_all(
this.all_state_tables_mut()
.map(|table| async { table.try_flush().await }),
Expand Down Expand Up @@ -589,6 +592,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
window_watermark: None,
chunk_builder: StreamChunkBuilder::new(this.chunk_size, this.info.schema.data_types()),
buffer: SortBuffer::new(window_col_idx_in_group_key, &this.intermediate_state_table),
is_paused: false,
};

// TODO(rc): use something like a `ColumnMapping` type
Expand All @@ -602,12 +606,12 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {

// First barrier
let mut input = input.execute();
let barrier = expect_first_barrier(&mut input).await?;
let first_barrier = expect_first_barrier(&mut input).await?;
this.all_state_tables_mut().for_each(|table| {
table.init_epoch(barrier.epoch);
table.init_epoch(first_barrier.epoch);
});

yield Message::Barrier(barrier);
vars.is_paused = first_barrier.is_pause_on_startup();
yield Message::Barrier(first_barrier);

#[for_await]
for msg in input {
Expand Down Expand Up @@ -635,31 +639,34 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
}
}

Self::try_flush_data(&mut this).await?;
Self::try_flush_table(&mut this).await?;
}
Message::Barrier(barrier) => {
#[for_await]
for chunk in Self::flush_data(&mut this, &mut vars) {
yield Message::Chunk(chunk?);
}
Self::flush_metrics(&this, &mut vars);
Self::commit_state_tables(&mut this, barrier.epoch).await?;

if this.emit_on_window_close {
// ignore watermarks on other columns
if let Some(watermark) =
vars.buffered_watermarks[window_col_idx_in_group_key].take()
{
yield Message::Watermark(watermark);
if !vars.is_paused {
#[for_await]
for chunk in Self::flush_data(&mut this, &mut vars) {
yield Message::Chunk(chunk?);
}
} else {
for buffered_watermark in &mut vars.buffered_watermarks {
if let Some(watermark) = buffered_watermark.take() {

if this.emit_on_window_close {
// ignore watermarks on other columns
if let Some(watermark) =
vars.buffered_watermarks[window_col_idx_in_group_key].take()
{
yield Message::Watermark(watermark);
}
} else {
for buffered_watermark in &mut vars.buffered_watermarks {
if let Some(watermark) = buffered_watermark.take() {
yield Message::Watermark(watermark);
}
}
}
}

Self::flush_metrics(&this, &mut vars);
Self::commit_state_tables(&mut this, barrier.epoch).await?;

// Update the vnode bitmap for state tables of all agg calls if asked.
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(this.actor_ctx.id) {
let previous_vnode_bitmap = this.intermediate_state_table.vnodes().clone();
Expand All @@ -669,13 +676,28 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {

// Manipulate the cache if necessary.
if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
// 1. If it's not paused, we should already flushed dirty groups in `flush_data`.
// 2. If it's paused, there should be no `Chunk` message to produce new dirty groups.
assert!(vars.dirty_groups.is_empty());
vars.agg_group_cache.clear();
vars.distinct_dedup.dedup_caches_mut().for_each(|cache| {
cache.clear();
});
}
}

if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Pause => {
vars.is_paused = true;
}
Mutation::Resume => {
vars.is_paused = false;
}
_ => (),
}
}

yield Message::Barrier(barrier);
}
}
Expand Down
Loading