From 9442b6d770e9591b70552807830efc8601a06081 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 22 Feb 2023 14:22:56 +0800 Subject: [PATCH] clear dedup cache when needed Signed-off-by: Richard Chien --- src/stream/src/executor/hash_agg.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/stream/src/executor/hash_agg.rs b/src/stream/src/executor/hash_agg.rs index 413c1f2def4fa..e98e09637e2ba 100644 --- a/src/stream/src/executor/hash_agg.rs +++ b/src/stream/src/executor/hash_agg.rs @@ -593,15 +593,20 @@ impl HashAggExecutor { // Update the vnode bitmap for state tables of all agg calls if asked. if let Some(vnode_bitmap) = barrier.as_update_vnode_bitmap(this.actor_ctx.id) { - iter_table_storage(&mut this.storages).for_each(|state_table| { - let _ = state_table.update_vnode_bitmap(vnode_bitmap.clone()); - }); + iter_table_storage(&mut this.storages) + .chain(this.distinct_dedup_tables.values_mut()) + .for_each(|state_table| { + let _ = state_table.update_vnode_bitmap(vnode_bitmap.clone()); + }); let previous_vnode_bitmap = this.result_table.update_vnode_bitmap(vnode_bitmap.clone()); // Manipulate the cache if necessary. if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) { vars.agg_group_cache.clear(); + vars.distinct_dedup.get_dedup_caches().for_each(|cache| { + cache.clear(); + }); } }