Skip to content

Commit

Permalink
fixx
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su committed Jan 15, 2024
1 parent 47af81e commit ca099f5
Showing 1 changed file with 38 additions and 6 deletions.
44 changes: 38 additions & 6 deletions src/stream/src/executor/top_n/topn_cache_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,25 @@ impl TopNCacheState {
self.inner.range(range)
}

pub fn extract_if<F>(&mut self, pred: F) -> ExtractIf<'_, CacheKey, CompactedRow, F, Global>
pub fn extract_if<'a, F1>(
&'a mut self,
mut pred: F1,
) -> TopNExtractIf<'a, impl FnMut(&CacheKey, &mut CompactedRow) -> bool>
where
F: FnMut(&CacheKey, &mut CompactedRow) -> bool,
F1: 'a + FnMut(&CacheKey, &CompactedRow) -> bool,
{
self.inner.extract_if(pred)
let pred_immut = move |key: &CacheKey, value: &mut CompactedRow| pred(key, value);
TopNExtractIf {
inner: self.inner.extract_if(pred_immut),
kv_heap_size: &mut self.kv_heap_size,
}
}

pub fn retain<F>(&mut self, f: F)
pub fn retain<F>(&mut self, mut f: F)
where
F: FnMut(&CacheKey, &mut CompactedRow) -> bool,
F: FnMut(&CacheKey, &CompactedRow) -> bool,
{
self.inner.retain(f)
self.extract_if(|k, v| !f(k, v)).for_each(drop);
}
}

Expand Down Expand Up @@ -153,3 +160,28 @@ impl fmt::Debug for TopNCacheState {
self.inner.fmt(f)
}
}

pub struct TopNExtractIf<'a, F>
where
F: FnMut(&CacheKey, &mut CompactedRow) -> bool,
{
inner: ExtractIf<'a, CacheKey, CompactedRow, F, Global>,
kv_heap_size: &'a mut KvSize,
}

impl<'a, F> Iterator for TopNExtractIf<'a, F>
where
F: 'a + FnMut(&CacheKey, &mut CompactedRow) -> bool,
{
type Item = (CacheKey, CompactedRow);

fn next(&mut self) -> Option<Self::Item> {
self.inner
.next()
.inspect(|(k, v)| self.kv_heap_size.sub(k, v))
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

0 comments on commit ca099f5

Please sign in to comment.