Skip to content

Commit

Permalink
fxi
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su committed Nov 2, 2023
1 parent 6157f54 commit ae5abca
Showing 1 changed file with 16 additions and 18 deletions.
34 changes: 16 additions & 18 deletions src/stream/src/executor/managed_state/join/join_row_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use std::mem;
use auto_enums::auto_enum;
use enum_as_inner::EnumAsInner;

const MAX_VEC_SIZE: usize = 4;

#[derive(Debug, EnumAsInner)]
pub enum JoinRowSet<K, V> {
BTree(BTreeMap<K, V>),
Expand All @@ -28,26 +30,18 @@ pub enum JoinRowSet<K, V> {

impl<K, V> Default for JoinRowSet<K, V> {
fn default() -> Self {
Self::Vec(vec![])
Self::Vec(Vec::new())
}
}

#[derive(Debug)]
#[allow(dead_code)]
pub struct VecOccupiedError<'a, K, V> {
key: &'a K,
old_value: &'a V,
new_value: V,
}

impl<'a, K: Debug, V: Debug> Debug for VecOccupiedError<'a, K, V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("VecOccupiedError")
.field("key", &self.key)
.field("old_value", &self.old_value)
.field("new_value", &self.new_value)
.finish()
}
}

#[derive(Debug)]
pub enum JoinRowSetOccupiedError<'a, K: Ord, V> {
BTree(BTreeMapOccupiedError<'a, K, V>),
Expand All @@ -60,8 +54,6 @@ impl<K: Ord + Debug, V: Debug> JoinRowSet<K, V> {
key: K,
value: V,
) -> Result<&'_ mut V, JoinRowSetOccupiedError<'_, K, V>> {
const MAX_VEC_SIZE: usize = 4;

if let Self::Vec(inner) = self && inner.len() >= MAX_VEC_SIZE{
let btree = BTreeMap::from_iter(inner.drain(..));
mem::swap(self, &mut Self::BTree(btree));
Expand All @@ -73,27 +65,33 @@ impl<K: Ord + Debug, V: Debug> JoinRowSet<K, V> {
.map_err(JoinRowSetOccupiedError::BTree),
Self::Vec(inner) => {
if let Some(pos) = inner.iter().position(|elem| elem.0 == key) {
return Err(JoinRowSetOccupiedError::Vec(VecOccupiedError {
Err(JoinRowSetOccupiedError::Vec(VecOccupiedError {
key: &inner[pos].0,
old_value: &inner[pos].1,
new_value: value,
}));
}))
} else {
inner.push((key, value));
return Ok(&mut inner.last_mut().unwrap().1);
Ok(&mut inner.last_mut().unwrap().1)
}
}
}
}

pub fn remove(&mut self, key: &K) -> Option<V> {
match self {
let ret = match self {
Self::BTree(inner) => inner.remove(key),
Self::Vec(inner) => inner
.iter()
.position(|elem| &elem.0 == key)
.map(|pos| inner.remove(pos).1),
.map(|pos| inner.swap_remove(pos).1),
};
if let Self::BTree(inner) = self && inner.len() <= MAX_VEC_SIZE / 2 {
let btree = mem::take(inner);
let vec = Vec::from_iter(btree);
mem::swap(self, &mut Self::Vec(vec));
}
ret
}

pub fn len(&self) -> usize {
Expand Down

0 comments on commit ae5abca

Please sign in to comment.