refactor(common): eliminate unsafe for iteration with estimated sizes by atomics (#12565)

Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao authored Oct 10, 2023
1 parent 95e0136 commit 7113abd
Showing 5 changed files with 114 additions and 72 deletions.
7 changes: 4 additions & 3 deletions src/common/src/estimate_size/collections/hashmap.rs
@@ -15,7 +15,7 @@
 use std::collections::HashMap;
 use std::ops::Deref;
 
-use super::{MutGuard, UnsafeMutGuard};
+use super::{AtomicMutGuard, MutGuard};
 use crate::estimate_size::{EstimateSize, KvSize};
 
 pub struct EstimatedHashMap<K, V> {
@@ -63,10 +63,11 @@ where
             .map(|v| MutGuard::new(v, &mut self.heap_size))
     }
 
-    pub fn values_mut(&mut self) -> impl Iterator<Item = UnsafeMutGuard<V>> + '_ {
+    pub fn values_mut(&mut self) -> impl Iterator<Item = AtomicMutGuard<'_, V>> + '_ {
+        let heap_size = &self.heap_size;
         self.inner
             .values_mut()
-            .map(|v| UnsafeMutGuard::new(v, &mut self.heap_size))
+            .map(move |v| AtomicMutGuard::new(v, heap_size))
     }
 
     pub fn drain(&mut self) -> impl Iterator<Item = (K, V)> + '_ {
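The previous `values_mut` had to smuggle raw pointers through `UnsafeMutGuard` because a closure capturing `&mut self.heap_size` can hand out that exclusive borrow at most once, while the iterator needs one guard per value. Capturing a shared reference once (hence the hoisted `let heap_size = &self.heap_size;` and the `move` closure) sidesteps this. A standalone sketch of the pattern, with simplified stand-in types rather than the RisingWave definitions:

use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;

// A simplified stand-in for `AtomicMutGuard`: it needs only a *shared*
// reference to the size counter, so any number of guards may coexist.
struct Guard<'a, V> {
    value: &'a mut V,
    size: &'a AtomicUsize,
}

fn values_mut<'a, K, V>(
    map: &'a mut HashMap<K, V>,
    heap_size: &'a AtomicUsize,
) -> impl Iterator<Item = Guard<'a, V>> + 'a {
    // The shared reference is `Copy`, so the closure can hand a copy to every
    // yielded guard; a captured `&mut` could be given out at most once, which
    // is why the old code resorted to raw pointers.
    map.values_mut()
        .map(move |value| Guard { value, size: heap_size })
}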
17 changes: 11 additions & 6 deletions src/common/src/estimate_size/collections/lru.rs
@@ -18,7 +18,7 @@ use std::hash::{BuildHasher, Hash};
 
 use lru::{DefaultHasher, KeyRef, LruCache};
 
-use super::{MutGuard, UnsafeMutGuard};
+use super::{AtomicMutGuard, MutGuard};
 use crate::estimate_size::{EstimateSize, KvSize};
 
 /// The managed cache is a lru cache that bounds the memory usage by epoch.
@@ -64,11 +64,6 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
         v.map(|inner| MutGuard::new(inner, &mut self.kv_heap_size))
     }
 
-    pub fn get_mut_unsafe(&mut self, k: &K) -> Option<UnsafeMutGuard<V>> {
-        let v = self.inner.get_mut(k);
-        v.map(|inner| UnsafeMutGuard::new(inner, &mut self.kv_heap_size))
-    }
-
     pub fn get<Q>(&mut self, k: &Q) -> Option<&V>
     where
         KeyRef<K>: Borrow<Q>,
@@ -77,6 +72,16 @@ impl<K: Hash + Eq + EstimateSize, V: EstimateSize, S: BuildHasher, A: Clone + Al
         self.inner.get(k)
     }
 
+    pub fn iter_mut(
+        &mut self,
+    ) -> impl ExactSizeIterator<Item = (&'_ K, AtomicMutGuard<'_, V>)> + '_ {
+        let kv_heap_size = &self.kv_heap_size;
+        self.inner.iter_mut().map(move |(k, v)| {
+            let guard = AtomicMutGuard::new(v, kv_heap_size);
+            (k, guard)
+        })
+    }
+
     pub fn peek_mut(&mut self, k: &K) -> Option<MutGuard<'_, V>> {
         let v = self.inner.peek_mut(k);
         v.map(|inner| MutGuard::new(inner, &mut self.kv_heap_size))
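For illustration only, a caller of the new `iter_mut` might look like the sketch below; `cache` stands for an already-constructed instance of this wrapper, and `touch` is a placeholder taking `(&K, &mut V)`, neither of which is part of this commit:

// Hypothetical usage: every entry is yielded behind an `AtomicMutGuard`,
// so several guards may be alive at once and no `unsafe` is needed here.
let remaining = cache.iter_mut().len(); // `ExactSizeIterator` is preserved
for (key, mut value) in cache.iter_mut() {
    touch(key, &mut value); // mutates through the guard's `DerefMut`
} // each guard drop folds its size delta into `kv_heap_size` atomically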
101 changes: 60 additions & 41 deletions src/common/src/estimate_size/collections/mod.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::ops::{Deref, DerefMut};
-use std::ptr::NonNull;
 
 use super::{EstimateSize, KvSize};
 
@@ -25,16 +24,55 @@ pub use vecdeque::EstimatedVecDeque as VecDeque;
 pub mod hashmap;
 pub use hashmap::EstimatedHashMap as HashMap;
 
-pub struct MutGuard<'a, V: EstimateSize> {
+mod private {
+    use super::*;
+
+    /// A trait that dispatches the size update method regarding the mutability of the reference
+    /// to the [`KvSize`].
+    pub trait GenericKvSize {
+        fn update_size(&mut self, from: usize, to: usize);
+    }
+
+    /// For mutable references, the size can be directly updated.
+    impl GenericKvSize for &'_ mut KvSize {
+        fn update_size(&mut self, from: usize, to: usize) {
+            self.add_size(to);
+            self.sub_size(from);
+        }
+    }
+
+    /// For immutable references, the size is updated atomically.
+    impl GenericKvSize for &'_ KvSize {
+        fn update_size(&mut self, from: usize, to: usize) {
+            self.update_size_atomic(from, to)
+        }
+    }
+}
+
+/// A guard holding a mutable reference to a value in a collection. When dropped, the size of the
+/// collection will be updated.
+pub struct MutGuard<'a, V, S = &'a mut KvSize>
+where
+    V: EstimateSize,
+    S: private::GenericKvSize,
+{
     inner: &'a mut V,
     // The size of the original value
     original_val_size: usize,
     // The total size of a collection
-    total_size: &'a mut KvSize,
+    total_size: S,
 }
 
-impl<'a, V: EstimateSize> MutGuard<'a, V> {
-    pub fn new(inner: &'a mut V, total_size: &'a mut KvSize) -> Self {
+/// Similar to [`MutGuard`], but the size is updated atomically. Useful for creating shared
+/// references to the entries in a collection.
+pub type AtomicMutGuard<'a, V> = MutGuard<'a, V, &'a KvSize>;
+
+impl<'a, V, S> MutGuard<'a, V, S>
+where
+    V: EstimateSize,
+    S: private::GenericKvSize,
+{
+    pub fn new(inner: &'a mut V, total_size: S) -> Self {
         let original_val_size = inner.estimated_size();
         Self {
             inner,
@@ -44,54 +82,35 @@ impl<'a, V: EstimateSize> MutGuard<'a, V> {
     }
 }
 
-impl<'a, V: EstimateSize> Drop for MutGuard<'a, V> {
+impl<'a, V, S> Drop for MutGuard<'a, V, S>
+where
+    V: EstimateSize,
+    S: private::GenericKvSize,
+{
     fn drop(&mut self) {
-        self.total_size.add_size(self.inner.estimated_size());
-        self.total_size.sub_size(self.original_val_size);
+        self.total_size
+            .update_size(self.original_val_size, self.inner.estimated_size());
     }
 }
 
-impl<'a, V: EstimateSize> Deref for MutGuard<'a, V> {
+impl<'a, V, S> Deref for MutGuard<'a, V, S>
+where
+    V: EstimateSize,
+    S: private::GenericKvSize,
+{
     type Target = V;
 
     fn deref(&self) -> &Self::Target {
         self.inner
     }
 }
 
-impl<'a, V: EstimateSize> DerefMut for MutGuard<'a, V> {
+impl<'a, V, S> DerefMut for MutGuard<'a, V, S>
+where
+    V: EstimateSize,
+    S: private::GenericKvSize,
+{
    fn deref_mut(&mut self) -> &mut Self::Target {
         self.inner
     }
 }
-
-pub struct UnsafeMutGuard<V: EstimateSize> {
-    inner: NonNull<V>,
-    // The size of the original value
-    original_val_size: usize,
-    // The total size of a collection
-    total_size: NonNull<KvSize>,
-}
-
-impl<V: EstimateSize> UnsafeMutGuard<V> {
-    pub fn new(inner: &mut V, total_size: &mut KvSize) -> Self {
-        let original_val_size = inner.estimated_size();
-        Self {
-            inner: inner.into(),
-            original_val_size,
-            total_size: total_size.into(),
-        }
-    }
-
-    /// # Safety
-    ///
-    /// 1. Only 1 `MutGuard` should be held for each value.
-    /// 2. The returned `MutGuard` should not be moved to other threads.
-    pub unsafe fn as_mut_guard<'a>(&mut self) -> MutGuard<'a, V> {
-        MutGuard {
-            inner: self.inner.as_mut(),
-            original_val_size: self.original_val_size,
-            total_size: self.total_size.as_mut(),
-        }
-    }
-}
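The heart of the refactor is `GenericKvSize`: static dispatch on the mutability of the reference to the size counter, so a single `MutGuard` type serves both the exclusive and the shared case. A self-contained reduction that compiles on its own, using deliberately minimal types of my own rather than the RisingWave ones:

use std::sync::atomic::{AtomicUsize, Ordering};

struct Counter(AtomicUsize);

trait GenericUpdate {
    fn update(&mut self, from: usize, to: usize);
}

// Exclusive reference: `get_mut` gives plain, non-atomic access.
impl GenericUpdate for &'_ mut Counter {
    fn update(&mut self, from: usize, to: usize) {
        let v = self.0.get_mut();
        *v = v.saturating_add(to).saturating_sub(from);
    }
}

// Shared reference: fall back to an atomic read-modify-write.
impl GenericUpdate for &'_ Counter {
    fn update(&mut self, from: usize, to: usize) {
        let _ = self.0.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
            Some(v.saturating_add(to).saturating_sub(from))
        });
    }
}

fn main() {
    let mut c = Counter(AtomicUsize::new(8));

    // Shared path, as `AtomicMutGuard` uses from inside iterators:
    let mut shared = &c;
    shared.update(8, 20); // 8 + 20 - 8 = 20

    // Exclusive path, the common single-owner case:
    let mut exclusive = &mut c;
    exclusive.update(20, 5); // 20 + 5 - 20 = 5

    assert_eq!(c.0.into_inner(), 5);
}

In the real code the trait lives in a non-public `private` module, which keeps it sealed to exactly these two implementations while still being usable as a bound on the public `MutGuard`.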
54 changes: 38 additions & 16 deletions src/common/src/estimate_size/mod.rs
@@ -15,6 +15,7 @@
 pub mod collections;
 
 use std::marker::PhantomData;
+use std::sync::atomic::{AtomicUsize, Ordering};
 
 use bytes::Bytes;
 use fixedbitset::FixedBitSet;
@@ -186,54 +187,75 @@ impl<T: EstimateSize> IntoIterator for VecWithKvSize<T> {
     }
 }
 
-#[derive(Default, Clone)]
-pub struct KvSize(usize);
+/// The size of the collection.
+///
+/// We use an atomic value here to enable operating on the size without a mutable reference.
+/// See [`collections::AtomicMutGuard`] for more details.
+///
+/// In most cases, we have a mutable reference to this struct, so we can directly
+/// operate on the underlying value.
+#[derive(Default)]
+pub struct KvSize(AtomicUsize);
+
+/// Cloning the [`KvSize`] duplicates the underlying value.
+impl Clone for KvSize {
+    fn clone(&self) -> Self {
+        Self(self.size().into())
+    }
+}
 
 impl KvSize {
     pub fn new() -> Self {
-        Self(0)
+        Self(0.into())
     }
 
     pub fn with_size(size: usize) -> Self {
-        Self(size)
+        Self(size.into())
    }
 
     pub fn add<K: EstimateSize, V: EstimateSize>(&mut self, key: &K, val: &V) {
-        self.0 = self
-            .0
-            .saturating_add(key.estimated_size() + val.estimated_size());
+        self.add_size(key.estimated_size());
+        self.add_size(val.estimated_size());
    }
 
     pub fn sub<K: EstimateSize, V: EstimateSize>(&mut self, key: &K, val: &V) {
-        self.0 = self
-            .0
-            .saturating_sub(key.estimated_size() + val.estimated_size());
+        self.sub_size(key.estimated_size());
+        self.sub_size(val.estimated_size());
    }
 
+    /// Add the size of `val` and return it.
     pub fn add_val<V: EstimateSize>(&mut self, val: &V) -> usize {
         let size = val.estimated_size();
-        self.0 = self.0.saturating_add(size);
+        self.add_size(size);
         size
     }
 
     pub fn sub_val<V: EstimateSize>(&mut self, val: &V) {
-        self.0 = self.0.saturating_sub(val.estimated_size());
+        self.sub_size(val.estimated_size());
    }
 
     pub fn add_size(&mut self, size: usize) {
-        self.0 += size;
+        let this = self.0.get_mut(); // get the underlying value since we have a mutable reference
+        *this = this.saturating_add(size);
    }
 
     pub fn sub_size(&mut self, size: usize) {
-        self.0 -= size;
+        let this = self.0.get_mut(); // get the underlying value since we have a mutable reference
+        *this = this.saturating_sub(size);
    }
 
+    /// Update the size of the collection by `to - from` atomically, i.e., without a mutable reference.
+    pub fn update_size_atomic(&self, from: usize, to: usize) {
+        let _ = (self.0).fetch_update(Ordering::Relaxed, Ordering::Relaxed, |this| {
+            Some(this.saturating_add(to).saturating_sub(from))
+        });
+    }
+
     pub fn set(&mut self, size: usize) {
-        self.0 = size;
+        self.0 = size.into();
    }
 
     pub fn size(&self) -> usize {
-        self.0
+        self.0.load(Ordering::Relaxed)
    }
 }
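Two details of the new `KvSize` are worth noting: `add_size` and `sub_size` go through `AtomicUsize::get_mut`, keeping the mutable-reference path free of atomic operations, while `update_size_atomic` applies the whole delta in one `fetch_update` loop. The `Relaxed` ordering appears sufficient because the counter is an accounting statistic rather than a synchronization point. A minimal worked example of the delta arithmetic on a bare `AtomicUsize`:

use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    // A collection currently accounted at 16 bytes shrinks one entry
    // from 10 bytes (`from`) to 4 bytes (`to`).
    let size = AtomicUsize::new(16);
    let (from, to) = (10, 4);
    let _ = size.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
        Some(v.saturating_add(to).saturating_sub(from))
    });
    assert_eq!(size.load(Ordering::Relaxed), 10); // 16 + 4 - 10

    // Saturation guards against transient underflow: if the counter lagged
    // at 3, the result clamps to 0 instead of wrapping around.
    let lagging = AtomicUsize::new(3);
    let _ = lagging.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
        Some(v.saturating_add(to).saturating_sub(from))
    });
    assert_eq!(lagging.load(Ordering::Relaxed), 0); // 3 + 4 = 7, then -10 clamps
}

Adding `to` before subtracting `from` also loses less to saturation when the counter runs low, as the second assertion shows; the `&mut KvSize` path in `GenericKvSize::update_size` uses the same add-then-sub order.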
7 changes: 1 addition & 6 deletions src/stream/src/executor/hash_agg.rs
@@ -493,12 +493,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
         } else {
             // emit on update
             // TODO(wrj,rc): we may need to parallelize it and set a reasonable concurrency limit.
-            for mut agg_group in vars
-                .dirty_groups
-                .values_mut()
-                // SAFETY: we access the values in `dirty_groups` one by one, so it's safe to deref the `UnsafeMutGuard`
-                .map(|mut g| unsafe { g.as_mut_guard() })
-            {
+            for mut agg_group in vars.dirty_groups.values_mut() {
                 let agg_group = agg_group.as_mut();
                 let change = agg_group
                     .build_change(&this.storages, &this.agg_funcs)
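At the call site, the borrow checker now enforces what the deleted SAFETY comment had to assert by hand. A standalone sketch with simplified types showing that guards carrying only a shared `&AtomicUsize` can even be collected and held simultaneously, a shape the old hand-written aliasing discipline could not permit:

use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;

// Each pair plays the role of a guard: an exclusive borrow of one value plus
// a shared borrow of the size counter. Holding all of them at once
// type-checks, with no SAFETY comment required.
fn all_guards<'a, V>(
    map: &'a mut HashMap<u32, V>,
    heap_size: &'a AtomicUsize,
) -> Vec<(&'a mut V, &'a AtomicUsize)> {
    map.values_mut().map(|v| (v, heap_size)).collect()
}

This also retires the old caveat that the guard "should not be moved to other threads", which reads like a prerequisite for the parallelization mentioned in the TODO above.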
