Skip to content

Commit

Permalink
Merge pull request #30 from orxfun/support-for-concurrency
Browse files Browse the repository at this point in the history
Support for Concurrency
  • Loading branch information
orxfun authored Mar 24, 2024
2 parents be66b4e + 512b0be commit 14c0f68
Show file tree
Hide file tree
Showing 17 changed files with 944 additions and 84 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "orx-split-vec"
version = "2.6.0"
version = "2.7.0"
edition = "2021"
authors = ["orxfun <[email protected]>"]
description = "An efficient constant access time vector with dynamic capacity and pinned elements."
Expand All @@ -10,7 +10,7 @@ keywords = ["vec", "array", "split", "fragments", "pinned"]
categories = ["data-structures", "rust-patterns"]

[dependencies]
orx-pinned-vec = "2.5"
orx-pinned-vec = "2.6"

[[bench]]
name = "serial_access"
Expand Down
10 changes: 4 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,17 @@ An efficient constant access time vector with dynamic capacity and pinned elemen

There are various situations where pinned elements are necessary.

* It is critical in enabling **efficient, convenient and safe self-referential collections** with thin references, see [`SelfRefCol`](https://crates.io/crates/orx-selfref-col) for details.
* It is essential in allowing an **immutable push** vector; i.e., [`ImpVec`](https://crates.io/crates/orx-imp-vec). This is a very useful operation when the desired collection is a bag or a container of things, rather than having a collective meaning. In such cases, `ImpVec` avoids heap allocations and wide pointers such as `Box` or `Rc` or etc.
* It is important for **async** code; following [blog](https://blog.cloudflare.com/pin-and-unpin-in-rust) could be useful for the interested.

*As explained in [rust-docs](https://doc.rust-lang.org/std/pin/index.html), there exist `Pin` and `Unpin` types for similar purposes. However, the solution is complicated and low level using `PhantomPinned`, `NonNull`, `dangling`, `Box::pin`, pointer accesses, etc.*
* It is critical in enabling **efficient, convenient and safe self-referential collections** with thin references, see [`SelfRefCol`](https://crates.io/crates/orx-selfref-col) for details, and its special cases such as [`LinkedList`](https://crates.io/crates/orx-linked-list).
* It is essential in allowing an **immutable push** vector; i.e., [`ImpVec`](https://crates.io/crates/orx-imp-vec). This is a very useful operation when the desired collection is a bag or a container of things, rather than having a collective meaning. In such cases, `ImpVec` allows avoiding certain borrow checker complexities, heap allocations and wide pointers such as `Box` or `Rc` or etc.
* It is important for **concurrent** programs since it eliminates safety concerns related with elements implicitly carried to different memory locations. This helps reducing and dealing with the complexity of concurrency. [`ConcurrentBag`](https://crates.io/crates/orx-concurrent-bag) is a very simplistic and efficient concurrent data structure built on top of pinned vector guarantees.

## B. Comparison with `FixedVec`

[`FixedVec`](https://crates.io/crates/orx-fixed-vec) is another [`PinnedVec`](https://crates.io/crates/orx-pinned-vec) implementation aiming the same goal but with different features. You may see the comparison in the table below.

| **`FixedVec`** | **`SplitVec`** |
|------------------------------------------------------------------------------|----------------------------------------------------------------------------------|
| Implements `PinnedVec` => can be wrapped by an `ImpVec` or `SelfRefCol`. | Implements `PinnedVec` => can be wrapped by an `ImpVec` or `SelfRefCol`. |
| Implements `PinnedVec` => can be wrapped by an `ImpVec` or `SelfRefCol` or `ConcurrentBag`. | Implements `PinnedVec` => can as well be wrapped by them. |
| Requires exact capacity to be known while creating. | Can be created with any level of prior information about required capacity. |
| Cannot grow beyond capacity; panics when `push` is called at capacity. | Can grow dynamically. Further, it provides control on how it must grow. |
| It is just a wrapper around `std::vec::Vec`; hence, has equivalent performance. | Performance-optimized built-in growth strategies also have `std::vec::Vec` equivalent performance. |
Expand Down
58 changes: 45 additions & 13 deletions src/common_traits/clone.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,56 @@
use crate::{Growth, SplitVec};
use orx_pinned_vec::PinnedVec;

impl<T, G> Clone for SplitVec<T, G>
where
T: Clone,
G: Growth,
{
fn clone(&self) -> Self {
let fragments: Vec<_> = self
.fragments
.iter()
.map(|fragment| {
let mut vec = Vec::with_capacity(fragment.capacity());
vec.extend_from_slice(fragment);
vec.into()
})
.collect();
Self {
fragments,
len: self.len,
growth: self.growth.clone(),
let mut fragments = Vec::with_capacity(self.fragments.capacity());

for fragment in &self.fragments {
let mut vec = Vec::with_capacity(fragment.capacity());
vec.extend_from_slice(fragment);
fragments.push(vec.into());
}

Self::from_raw_parts(self.len(), fragments, self.growth().clone())
}
}

#[cfg(test)]
mod tests {
use crate::*;

#[test]
fn clone() {
fn test<G: Growth>(mut vec: SplitVec<usize, G>) {
for i in 0..168 {
vec.push(i);
}

let clone = vec.clone();

assert_eq!(vec.len(), clone.len());
assert_eq!(vec.fragments().len(), clone.fragments().len());
assert_eq!(vec.capacity(), clone.capacity());
assert_eq!(vec.capacity_state(), clone.capacity_state());
assert_eq!(
vec.maximum_concurrent_capacity(),
clone.maximum_concurrent_capacity()
);

for (a, b) in vec.fragments().iter().zip(clone.fragments().iter()) {
assert_eq!(a.len(), b.len());
assert_eq!(a.capacity(), b.capacity());

for (x, y) in a.iter().zip(b.iter()) {
assert_eq!(x, y);
}
}
}

test_all_growth_types!(test);
}
}
24 changes: 24 additions & 0 deletions src/fragment/fragment_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,28 @@ impl<T> Fragment<T> {
pub fn room(&self) -> usize {
self.data.capacity() - self.data.len()
}

// helpers
pub(crate) fn fragments_with_default_capacity() -> Vec<Fragment<T>> {
Vec::new()
}

pub(crate) fn into_fragments(self) -> Vec<Fragment<T>> {
let mut fragments = Self::fragments_with_default_capacity();
fragments.push(self);
fragments
}

pub(crate) fn fragments_with_capacity(fragments_capacity: usize) -> Vec<Fragment<T>> {
Vec::with_capacity(fragments_capacity)
}

pub(crate) fn into_fragments_with_capacity(
self,
fragments_capacity: usize,
) -> Vec<Fragment<T>> {
let mut fragments = Self::fragments_with_capacity(fragments_capacity);
fragments.push(self);
fragments
}
}
201 changes: 196 additions & 5 deletions src/growth/doubling/doubling_growth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,35 @@ impl Growth for Doubling {
unsafe fn get_ptr_mut<T>(&self, fragments: &mut [Fragment<T>], index: usize) -> Option<*mut T> {
<Self as GrowthWithConstantTimeAccess>::get_ptr_mut(self, fragments, index)
}

fn maximum_concurrent_capacity<T>(
&self,
fragments: &[Fragment<T>],
fragments_capacity: usize,
) -> usize {
assert!(fragments_capacity >= fragments.len());

CUMULATIVE_CAPACITIES[fragments_capacity]
}

/// Returns the number of fragments with this growth strategy in order to be able to reach a capacity of `maximum_capacity` of elements.
///
/// This method is relevant and useful for concurrent programs, which helps in avoiding the fragments to allocate.
///
/// # Panics
///
/// Panics if `maximum_capacity` is greater than sum { 2^f | for f in 2..34 }.
fn required_fragments_len<T>(&self, _: &[Fragment<T>], maximum_capacity: usize) -> usize {
assert!(maximum_capacity <= CUMULATIVE_CAPACITIES[32]);

for (f, capacity) in CUMULATIVE_CAPACITIES.iter().enumerate() {
if maximum_capacity <= *capacity {
return f;
}
}

usize::MAX
}
}

impl GrowthWithConstantTimeAccess for Doubling {
Expand Down Expand Up @@ -149,16 +178,33 @@ impl<T> SplitVec<T, Doubling> {
/// assert_eq!(vec.fragments().last().map(|f| f.len()), Some(1));
/// ```
pub fn with_doubling_growth() -> Self {
Self {
fragments: vec![Fragment::new(FIRST_FRAGMENT_CAPACITY)],
growth: Doubling,
len: 0,
}
let fragments = Fragment::new(FIRST_FRAGMENT_CAPACITY).into_fragments();
Self::from_raw_parts(0, fragments, Doubling)
}

/// Creates a new split vector with `Doubling` growth and initial `fragments_capacity`.
///
/// This method differs from [`SplitVec::with_doubling_growth`] only by the pre-allocation of fragments collection.
/// Note that this (only) important for concurrent programs:
/// * SplitVec already keeps all elements pinned to their locations;
/// * Creating a buffer for storing the meta information is important for keeping the meta information pinned as well.
/// This is relevant and important for concurrent programs.
///
/// # Panics
///
/// Panics if `fragments_capacity == 0`.
pub fn with_doubling_growth_and_fragments_capacity(fragments_capacity: usize) -> Self {
assert!(fragments_capacity > 0);
let fragments =
Fragment::new(FIRST_FRAGMENT_CAPACITY).into_fragments_with_capacity(fragments_capacity);
Self::from_raw_parts(0, fragments, Doubling)
}
}

#[cfg(test)]
mod tests {
use orx_pinned_vec::{PinnedVec, PinnedVecGrowthError};

use super::*;

#[test]
Expand Down Expand Up @@ -216,4 +262,149 @@ mod tests {
assert_eq!(None, get_none(index));
}
}

#[test]
fn maximum_concurrent_capacity() {
fn max_cap<T>(vec: &SplitVec<T, Doubling>) -> usize {
vec.growth()
.maximum_concurrent_capacity(vec.fragments(), vec.fragments.capacity())
}

let mut vec: SplitVec<char, Doubling> = SplitVec::with_doubling_growth();
assert_eq!(max_cap(&vec), 4 + 8 + 16 + 32);

let until = max_cap(&vec);
for _ in 0..until {
vec.push('x');
assert_eq!(max_cap(&vec), 4 + 8 + 16 + 32);
}

// fragments allocate beyond max_cap
vec.push('x');
assert_eq!(max_cap(&vec), 4 + 8 + 16 + 32 + 64 + 128 + 256 + 512);
}

#[test]
fn with_doubling_growth() {
let mut vec: SplitVec<char, _> = SplitVec::with_doubling_growth();

assert_eq!(4, vec.fragments.capacity());

for _ in 0..100_000 {
vec.push('x');
}

assert!(vec.fragments.capacity() > 4);

let mut vec: SplitVec<char, _> = SplitVec::with_doubling_growth();
let result = unsafe { vec.grow_to(100_000) };
assert!(result.is_ok());
assert!(result.expect("is-ok") >= 100_000);
}

#[test]
fn with_doubling_growth_and_fragments_capacity_normal_growth() {
let mut vec: SplitVec<char, _> = SplitVec::with_doubling_growth_and_fragments_capacity(1);

assert_eq!(1, vec.fragments.capacity());

for _ in 0..100_000 {
vec.push('x');
}

assert!(vec.fragments.capacity() > 4);
}

#[test]
fn with_doubling_growth_and_fragments_capacity_concurrent_grow_never() {
let mut vec: SplitVec<char, _> = SplitVec::with_doubling_growth_and_fragments_capacity(1);

assert!(!vec.can_concurrently_add_fragment());

let result = unsafe { vec.concurrently_grow_to(vec.capacity() + 1) };
assert_eq!(
result,
Err(PinnedVecGrowthError::FailedToGrowWhileKeepingElementsPinned)
);
}

#[test]
fn with_doubling_growth_and_fragments_capacity_concurrent_grow_once() {
let mut vec: SplitVec<char, _> = SplitVec::with_doubling_growth_and_fragments_capacity(2);

assert!(vec.can_concurrently_add_fragment());

let next_capacity = vec.capacity() + vec.growth().new_fragment_capacity(vec.fragments());

let result = unsafe { vec.concurrently_grow_to(vec.capacity() + 1) };
assert_eq!(result, Ok(next_capacity));

assert!(!vec.can_concurrently_add_fragment());

let result = unsafe { vec.concurrently_grow_to(vec.capacity() + 1) };
assert_eq!(
result,
Err(PinnedVecGrowthError::FailedToGrowWhileKeepingElementsPinned)
);
}

#[test]
fn with_doubling_growth_and_fragments_capacity_concurrent_grow_twice() {
// when possible
let mut vec: SplitVec<char, _> = SplitVec::with_doubling_growth_and_fragments_capacity(3);

assert!(vec.can_concurrently_add_fragment());

let fragment_2_capacity = vec.growth().new_fragment_capacity(vec.fragments());
let fragment_3_capacity = fragment_2_capacity * 2;
let new_capacity = vec.capacity() + fragment_2_capacity + fragment_3_capacity;

let result = unsafe { vec.concurrently_grow_to(new_capacity - 1) };
assert_eq!(result, Ok(new_capacity));

assert!(!vec.can_concurrently_add_fragment());

let result = unsafe { vec.concurrently_grow_to(vec.capacity() + 1) };
assert_eq!(
result,
Err(PinnedVecGrowthError::FailedToGrowWhileKeepingElementsPinned)
);

// when not possible
let mut vec: SplitVec<char, _> = SplitVec::with_doubling_growth_and_fragments_capacity(2);

assert!(vec.can_concurrently_add_fragment()); // although we can add one fragment

let result = unsafe { vec.concurrently_grow_to(new_capacity - 1) }; // we cannot add two
assert_eq!(
result,
Err(PinnedVecGrowthError::FailedToGrowWhileKeepingElementsPinned)
);
}

#[test]
#[should_panic]
fn with_doubling_growth_and_fragments_capacity_zero() {
let _: SplitVec<char, _> = SplitVec::with_doubling_growth_and_fragments_capacity(0);
}

#[test]
fn required_fragments_len() {
let vec: SplitVec<char, Doubling> = SplitVec::with_doubling_growth();
let num_fragments = |max_cap| {
vec.growth()
.required_fragments_len(vec.fragments(), max_cap)
};

// 4 - 12 - 28 - 60 - 124
assert_eq!(num_fragments(0), 0);
assert_eq!(num_fragments(1), 1);
assert_eq!(num_fragments(4), 1);
assert_eq!(num_fragments(5), 2);
assert_eq!(num_fragments(12), 2);
assert_eq!(num_fragments(13), 3);
assert_eq!(num_fragments(36), 4);
assert_eq!(num_fragments(67), 5);
assert_eq!(num_fragments(136), 6);
}
}
6 changes: 1 addition & 5 deletions src/growth/doubling/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ impl<T: Clone> From<Vec<T>> for SplitVec<T, Doubling> {
curr_f += 1;
}

Self {
fragments,
growth: Doubling,
len: 123,
}
Self::from_raw_parts(len, fragments, Doubling)
}
}
Loading

0 comments on commit 14c0f68

Please sign in to comment.