diff --git a/src/stream/src/executor/managed_state/join/join_row_set.rs b/src/stream/src/executor/managed_state/join/join_row_set.rs index 1dd20148bc3e9..670b0226ae211 100644 --- a/src/stream/src/executor/managed_state/join/join_row_set.rs +++ b/src/stream/src/executor/managed_state/join/join_row_set.rs @@ -20,6 +20,8 @@ use std::mem; use auto_enums::auto_enum; use enum_as_inner::EnumAsInner; +const MAX_VEC_SIZE: usize = 4; + #[derive(Debug, EnumAsInner)] pub enum JoinRowSet { BTree(BTreeMap), @@ -28,26 +30,18 @@ pub enum JoinRowSet { impl Default for JoinRowSet { fn default() -> Self { - Self::Vec(vec![]) + Self::Vec(Vec::new()) } } +#[derive(Debug)] +#[allow(dead_code)] pub struct VecOccupiedError<'a, K, V> { key: &'a K, old_value: &'a V, new_value: V, } -impl<'a, K: Debug, V: Debug> Debug for VecOccupiedError<'a, K, V> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("VecOccupiedError") - .field("key", &self.key) - .field("old_value", &self.old_value) - .field("new_value", &self.new_value) - .finish() - } -} - #[derive(Debug)] pub enum JoinRowSetOccupiedError<'a, K: Ord, V> { BTree(BTreeMapOccupiedError<'a, K, V>), @@ -60,8 +54,6 @@ impl JoinRowSet { key: K, value: V, ) -> Result<&'_ mut V, JoinRowSetOccupiedError<'_, K, V>> { - const MAX_VEC_SIZE: usize = 4; - if let Self::Vec(inner) = self && inner.len() >= MAX_VEC_SIZE{ let btree = BTreeMap::from_iter(inner.drain(..)); mem::swap(self, &mut Self::BTree(btree)); @@ -73,27 +65,33 @@ impl JoinRowSet { .map_err(JoinRowSetOccupiedError::BTree), Self::Vec(inner) => { if let Some(pos) = inner.iter().position(|elem| elem.0 == key) { - return Err(JoinRowSetOccupiedError::Vec(VecOccupiedError { + Err(JoinRowSetOccupiedError::Vec(VecOccupiedError { key: &inner[pos].0, old_value: &inner[pos].1, new_value: value, - })); + })) } else { inner.push((key, value)); - return Ok(&mut inner.last_mut().unwrap().1); + Ok(&mut inner.last_mut().unwrap().1) } } } } pub fn remove(&mut self, key: &K) -> Option { - match self { + let ret = match self { Self::BTree(inner) => inner.remove(key), Self::Vec(inner) => inner .iter() .position(|elem| &elem.0 == key) - .map(|pos| inner.remove(pos).1), + .map(|pos| inner.swap_remove(pos).1), + }; + if let Self::BTree(inner) = self && inner.len() <= MAX_VEC_SIZE / 2 { + let btree = mem::take(inner); + let vec = Vec::from_iter(btree); + mem::swap(self, &mut Self::Vec(vec)); } + ret } pub fn len(&self) -> usize {