Skip to content

Commit

Permalink
better naming
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Feb 22, 2023
1 parent 9442b6d commit 56ec2bf
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/stream/src/executor/aggregation/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl<S: StateStore> DistinctDeduplicater<S> {
Self { deduplicaters }
}

pub fn get_dedup_caches(&mut self) -> impl Iterator<Item = &mut DedupCache> {
pub fn dedup_caches_mut(&mut self) -> impl Iterator<Item = &mut DedupCache> {
self.deduplicaters
.values_mut()
.map(|(_, deduplicater)| &mut deduplicater.cache)
Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/global_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
state_changed: false,
};

vars.distinct_dedup.get_dedup_caches().for_each(|cache| {
vars.distinct_dedup.dedup_caches_mut().for_each(|cache| {
cache.update_epoch(barrier.epoch.curr);
});

Expand Down Expand Up @@ -340,7 +340,7 @@ impl<S: StateStore> GlobalSimpleAggExecutor<S> {
{
yield Message::Chunk(chunk);
}
vars.distinct_dedup.get_dedup_caches().for_each(|cache| {
vars.distinct_dedup.dedup_caches_mut().for_each(|cache| {
cache.update_epoch(barrier.epoch.curr);
});
yield Message::Barrier(barrier);
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
});
this.result_table.init_epoch(barrier.epoch);
vars.agg_group_cache.update_epoch(barrier.epoch.curr);
vars.distinct_dedup.get_dedup_caches().for_each(|cache| {
vars.distinct_dedup.dedup_caches_mut().for_each(|cache| {
cache.update_epoch(barrier.epoch.curr);
});

Expand Down Expand Up @@ -604,15 +604,15 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
// Manipulate the cache if necessary.
if cache_may_stale(&previous_vnode_bitmap, &vnode_bitmap) {
vars.agg_group_cache.clear();
vars.distinct_dedup.get_dedup_caches().for_each(|cache| {
vars.distinct_dedup.dedup_caches_mut().for_each(|cache| {
cache.clear();
});
}
}

// Update the current epoch.
vars.agg_group_cache.update_epoch(barrier.epoch.curr);
vars.distinct_dedup.get_dedup_caches().for_each(|cache| {
vars.distinct_dedup.dedup_caches_mut().for_each(|cache| {
cache.update_epoch(barrier.epoch.curr);
});

Expand Down

0 comments on commit 56ec2bf

Please sign in to comment.