Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick perf(topn): a series of topn refactoring and optimization to release-1.10 #19526

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ sqllogictest -p 4566 -d dev './e2e_test/udf/external_udf.slt'
pkill java

echo "--- e2e, $mode, embedded udf"
python3 -m pip install --break-system-packages flask waitress
sqllogictest -p 4566 -d dev './e2e_test/udf/wasm_udf.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/rust_udf.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/js_udf.slt'
Expand Down
167 changes: 154 additions & 13 deletions src/common/estimate_size/src/collections/btreemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::fmt;
use std::collections::BTreeMap;
use std::ops::{Bound, RangeInclusive};

use crate::{EstimateSize, KvSize};

#[derive(Clone)]
pub struct EstimatedBTreeMap<K, V> {
inner: BTreeMap<K, V>,
heap_size: KvSize,
Expand All @@ -41,47 +43,107 @@ impl<K, V> EstimatedBTreeMap<K, V> {
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}

impl<K, V> Default for EstimatedBTreeMap<K, V> {
fn default() -> Self {
Self::new()
pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
self.inner.iter()
}

pub fn range<R>(&self, range: R) -> std::collections::btree_map::Range<'_, K, V>
where
K: Ord,
R: std::ops::RangeBounds<K>,
{
self.inner.range(range)
}

pub fn values(&self) -> impl Iterator<Item = &V> {
self.inner.values()
}
}

impl<K, V> EstimatedBTreeMap<K, V>
where
K: EstimateSize + Ord,
V: EstimateSize,
K: Ord,
{
pub fn first_key_value(&self) -> Option<(&K, &V)> {
self.inner.first_key_value()
}

pub fn first_key(&self) -> Option<&K> {
self.first_key_value().map(|(k, _)| k)
}

pub fn first_value(&self) -> Option<&V> {
self.first_key_value().map(|(_, v)| v)
}

pub fn last_key_value(&self) -> Option<(&K, &V)> {
self.inner.last_key_value()
}

pub fn insert(&mut self, key: K, row: V) {
pub fn last_key(&self) -> Option<&K> {
self.last_key_value().map(|(k, _)| k)
}

pub fn last_value(&self) -> Option<&V> {
self.last_key_value().map(|(_, v)| v)
}
}

impl<K, V> Default for EstimatedBTreeMap<K, V> {
fn default() -> Self {
Self::new()
}
}

impl<K, V> EstimatedBTreeMap<K, V>
where
K: EstimateSize + Ord,
V: EstimateSize,
{
pub fn insert(&mut self, key: K, value: V) -> Option<V> {
let key_size = self.heap_size.add_val(&key);
self.heap_size.add_val(&row);
if let Some(old_row) = self.inner.insert(key, row) {
self.heap_size.add_val(&value);
let old_value = self.inner.insert(key, value);
if let Some(old_value) = &old_value {
self.heap_size.sub_size(key_size);
self.heap_size.sub_val(&old_row);
self.heap_size.sub_val(old_value);
}
old_value
}

pub fn remove(&mut self, key: &K) {
if let Some(row) = self.inner.remove(key) {
self.heap_size.sub(key, &row);
pub fn remove(&mut self, key: &K) -> Option<V> {
let old_value = self.inner.remove(key);
if let Some(old_value) = &old_value {
self.heap_size.sub(key, old_value);
}
old_value
}

pub fn clear(&mut self) {
self.inner.clear();
self.heap_size.set(0);
}

pub fn pop_first(&mut self) -> Option<(K, V)> {
let (key, value) = self.inner.pop_first()?;
self.heap_size.sub(&key, &value);
Some((key, value))
}

pub fn pop_last(&mut self) -> Option<(K, V)> {
let (key, value) = self.inner.pop_last()?;
self.heap_size.sub(&key, &value);
Some((key, value))
}

pub fn last_entry(&mut self) -> Option<OccupiedEntry<'_, K, V>> {
self.inner.last_entry().map(|inner| OccupiedEntry {
inner,
heap_size: &mut self.heap_size,
})
}

/// Retain the given range of entries in the map, removing others.
pub fn retain_range(&mut self, range: RangeInclusive<&K>) -> (BTreeMap<K, V>, BTreeMap<K, V>)
where
Expand Down Expand Up @@ -114,6 +176,27 @@ where

(left, right)
}

pub fn extract_if<'a, F>(
&'a mut self,
mut pred: F,
) -> ExtractIf<'a, K, V, impl FnMut(&K, &mut V) -> bool>
where
F: 'a + FnMut(&K, &V) -> bool,
{
let pred_immut = move |key: &K, value: &mut V| pred(key, value);
ExtractIf {
inner: self.inner.extract_if(pred_immut),
heap_size: &mut self.heap_size,
}
}

pub fn retain<F>(&mut self, mut f: F)
where
F: FnMut(&K, &V) -> bool,
{
self.extract_if(|k, v| !f(k, v)).for_each(drop);
}
}

impl<K, V> EstimateSize for EstimatedBTreeMap<K, V>
Expand All @@ -126,6 +209,64 @@ where
}
}

impl<K, V> fmt::Debug for EstimatedBTreeMap<K, V>
where
K: fmt::Debug,
V: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}

pub struct OccupiedEntry<'a, K, V> {
inner: std::collections::btree_map::OccupiedEntry<'a, K, V>,
heap_size: &'a mut KvSize,
}

impl<'a, K, V> OccupiedEntry<'a, K, V>
where
K: EstimateSize + Ord,
V: EstimateSize,
{
pub fn key(&self) -> &K {
self.inner.key()
}

pub fn remove_entry(self) -> (K, V) {
let (key, value) = self.inner.remove_entry();
self.heap_size.sub(&key, &value);
(key, value)
}
}

pub struct ExtractIf<'a, K, V, F>
where
F: FnMut(&K, &mut V) -> bool,
{
inner: std::collections::btree_map::ExtractIf<'a, K, V, F>,
heap_size: &'a mut KvSize,
}

impl<'a, K, V, F> Iterator for ExtractIf<'a, K, V, F>
where
K: EstimateSize,
V: EstimateSize,
F: FnMut(&K, &mut V) -> bool,
{
type Item = (K, V);

fn next(&mut self) -> Option<Self::Item> {
let (key, value) = self.inner.next()?;
self.heap_size.sub(&key, &value);
Some((key, value))
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

#[cfg(test)]
mod tests {
use super::EstimatedBTreeMap;
Expand Down
1 change: 1 addition & 0 deletions src/common/estimate_size/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#![feature(allocator_api)]
#![feature(btree_cursors)]
#![feature(btree_extract_if)]

pub mod collections;

Expand Down
19 changes: 0 additions & 19 deletions src/stream/src/common/cache/mod.rs

This file was deleted.

Loading
Loading