Skip to content

Commit

Permalink
clear dedup cache when needed
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Feb 22, 2023
1 parent 12c5419 commit 9442b6d
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,15 +593,20 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {

// Update the vnode bitmap for state tables of all agg calls if asked.
if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(this.actor_ctx.id) {
iter_table_storage(&mut this.storages).for_each(|state_table| {
let _ = state_table.update_vnode_bitmap(vnode_bitmap.clone());
});
iter_table_storage(&mut this.storages)
.chain(this.distinct_dedup_tables.values_mut())
.for_each(|state_table| {
let _ = state_table.update_vnode_bitmap(vnode_bitmap.clone());
});
let previous_vnode_bitmap =
this.result_table.update_vnode_bitmap(vnode_bitmap.clone());

// Manipulate the cache if necessary.
if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
vars.agg_group_cache.clear();
vars.distinct_dedup.get_dedup_caches().for_each(|cache| {
cache.clear();
});
}
}

Expand Down

0 comments on commit 9442b6d

Please sign in to comment.