Skip to content

Commit

Permalink
Merge pull request #56 from orxfun/reserve-initialized-maximum-capacity
Browse files Browse the repository at this point in the history
reserve initialized maximum capacity
  • Loading branch information
orxfun authored Aug 28, 2024
2 parents 963f38b + 0728260 commit afb81c9
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 21 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "orx-split-vec"
version = "3.6.0"
version = "3.7.0"
edition = "2021"
authors = ["orxfun <[email protected]>"]
description = "An efficient constant access time vector with dynamic capacity and pinned elements."
Expand All @@ -11,7 +11,7 @@ categories = ["data-structures", "rust-patterns"]

[dependencies]
orx-pseudo-default = "1.4"
orx-pinned-vec = "3.6"
orx-pinned-vec = "3.7"

[[bench]]
name = "serial_access"
Expand Down
8 changes: 4 additions & 4 deletions src/algorithms/in_place_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub fn in_place_sort_by<T, F>(fragments: &mut [Fragment<T>], mut compare: F)
where
F: FnMut(&T, &T) -> Ordering,
{
if fragments.len() == 0 {
if fragments.is_empty() {
return;
}

Expand Down Expand Up @@ -60,9 +60,9 @@ where
true => None,
false => {
let mut best = &fragments[r_best][0];
for q in (r_best + 1)..fragments.len() {
if let Less = compare(&fragments[q][0], best) {
(best, r_best) = (&fragments[q][0], q);
for (q, fragment) in fragments.iter().enumerate().skip(r_best + 1) {
if let Less = compare(&fragment[0], best) {
(best, r_best) = (&fragment[0], q);
}
}

Expand Down
46 changes: 32 additions & 14 deletions src/concurrent_pinned_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ struct FragmentData {
pub struct ConcurrentSplitVec<T, G: GrowthWithConstantTimeAccess = Doubling> {
growth: G,
data: Vec<UnsafeCell<*mut T>>,
num_fragments: AtomicUsize,
capacity: AtomicUsize,
maximum_capacity: usize,
max_num_fragments: usize,
Expand All @@ -30,7 +29,7 @@ pub struct ConcurrentSplitVec<T, G: GrowthWithConstantTimeAccess = Doubling> {
impl<T, G: GrowthWithConstantTimeAccess> Drop for ConcurrentSplitVec<T, G> {
fn drop(&mut self) {
let mut take_fragment = |_fragment: Fragment<T>| {};
unsafe { self.into_fragments(self.pinned_vec_len, &mut take_fragment) };
unsafe { self.process_into_fragments(self.pinned_vec_len, &mut take_fragment) };
self.zero();
}
}
Expand All @@ -51,15 +50,15 @@ impl<T, G: GrowthWithConstantTimeAccess> ConcurrentSplitVec<T, G> {
}

fn layout(len: usize) -> std::alloc::Layout {
std::alloc::Layout::array::<T>(len).unwrap()
std::alloc::Layout::array::<T>(len).expect("len must not overflow")
}

unsafe fn to_fragment(&self, data: FragmentData) -> Fragment<T> {
let ptr = *self.data[data.f].get();
fragment_from_raw(ptr, data.len, data.capacity)
}

unsafe fn into_fragments<F>(&mut self, len: usize, take_fragment: &mut F)
unsafe fn process_into_fragments<F>(&mut self, len: usize, take_fragment: &mut F)
where
F: FnMut(Fragment<T>),
{
Expand Down Expand Up @@ -116,12 +115,23 @@ impl<T, G: GrowthWithConstantTimeAccess> ConcurrentSplitVec<T, G> {
}

fn zero(&mut self) {
self.num_fragments = 0.into();
self.capacity = 0.into();
self.maximum_capacity = 0;
self.max_num_fragments = 0;
self.pinned_vec_len = 0;
}

fn num_fragments_for_capacity(&self, capacity: usize) -> usize {
match capacity {
0 => 0,
_ => {
self.growth
.get_fragment_and_inner_indices_unchecked(capacity - 1)
.0
+ 1
}
}
}
}

impl<T, G: GrowthWithConstantTimeAccess> From<SplitVec<T, G>> for ConcurrentSplitVec<T, G> {
Expand All @@ -144,7 +154,7 @@ impl<T, G: GrowthWithConstantTimeAccess> From<SplitVec<T, G>> for ConcurrentSpli
total_len += len;
maximum_capacity += cap;

data.push(UnsafeCell::new(p as *mut T));
data.push(UnsafeCell::new(p));
}
assert_eq!(total_len, pinned_vec_len);
let capacity = maximum_capacity;
Expand All @@ -159,7 +169,6 @@ impl<T, G: GrowthWithConstantTimeAccess> From<SplitVec<T, G>> for ConcurrentSpli
Self {
growth,
data,
num_fragments: num_fragments.into(),
capacity: capacity.into(),
maximum_capacity,
max_num_fragments,
Expand All @@ -174,7 +183,7 @@ impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSp
unsafe fn into_inner(mut self, len: usize) -> Self::P {
let mut fragments = Vec::with_capacity(self.max_num_fragments);
let mut take_fragment = |fragment| fragments.push(fragment);
self.into_fragments(len, &mut take_fragment);
self.process_into_fragments(len, &mut take_fragment);

self.zero();
SplitVec::from_raw_parts(len, fragments, self.growth.clone())
Expand Down Expand Up @@ -355,8 +364,7 @@ impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSp
match new_capacity <= capacity {
true => Ok(capacity),
false => {
let mut f = self.num_fragments.load(Ordering::Relaxed);

let mut f = self.num_fragments_for_capacity(capacity);
let mut current_capacity = capacity;

while new_capacity > current_capacity {
Expand All @@ -369,7 +377,6 @@ impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSp
current_capacity += new_fragment_capacity;
}

self.num_fragments.store(f, Ordering::SeqCst);
self.capacity.store(current_capacity, Ordering::Release);

Ok(current_capacity)
Expand All @@ -389,7 +396,7 @@ impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSp
match new_capacity <= capacity {
true => Ok(capacity),
false => {
let mut f = self.num_fragments.load(Ordering::Relaxed);
let mut f = self.num_fragments_for_capacity(capacity);

let mut current_capacity = capacity;

Expand All @@ -408,7 +415,6 @@ impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSp
current_capacity += new_fragment_capacity;
}

self.num_fragments.store(f, Ordering::SeqCst);
self.capacity.store(current_capacity, Ordering::Release);

Ok(current_capacity)
Expand Down Expand Up @@ -468,13 +474,25 @@ impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSp
self.maximum_capacity
}

unsafe fn reserve_maximum_concurrent_capacity_fill_with<F>(
&mut self,
current_len: usize,
new_maximum_capacity: usize,
_fill_with: F,
) -> usize
where
F: Fn() -> T,
{
self.reserve_maximum_concurrent_capacity(current_len, new_maximum_capacity)
}

unsafe fn set_pinned_vec_len(&mut self, len: usize) {
self.pinned_vec_len = len;
}

unsafe fn clear(&mut self, len: usize) {
let mut take_fragment = |_fragment: Fragment<T>| {};
unsafe { self.into_fragments(len, &mut take_fragment) };
unsafe { self.process_into_fragments(len, &mut take_fragment) };
self.zero();

let max_num_fragments = self.data.len();
Expand Down
1 change: 0 additions & 1 deletion src/fragment/fragment_struct.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#[derive(Default)]
#[allow(clippy::include)]
/// A contagious fragment of the split vector.
///
/// Suppose a split vector contains 10 integers from 0 to 9.
Expand Down
96 changes: 96 additions & 0 deletions tests/con_pinned_vec_grow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,99 @@ fn con_pin_vec_grow_filled() {
test(SplitVec::with_doubling_growth_and_fragments_capacity(32));
test(SplitVec::with_linear_growth_and_fragments_capacity(10, 32));
}

#[test]
fn reserve() {
fn test<G: GrowthWithConstantTimeAccess>(vec: SplitVec<String, G>) {
let initial_capacity = vec.capacity();

let mut con_vec = vec.into_concurrent();
let max_cap = con_vec.max_capacity();

unsafe { con_vec.get_ptr_mut(0).write("first".to_string()) };

assert_eq!(con_vec.capacity(), initial_capacity);

unsafe { con_vec.reserve_maximum_concurrent_capacity(0, max_cap + 1) };
let new_capacity = con_vec.capacity();
assert_eq!(new_capacity, initial_capacity);
assert!(con_vec.max_capacity() >= max_cap + 1);

let vec = unsafe { con_vec.into_inner(1) };

assert_eq!(vec.len(), 1);
assert_eq!(vec.capacity(), initial_capacity);
assert_eq!(&vec[0], &"first".to_string());
}

test(SplitVec::with_doubling_growth_and_fragments_capacity(16));
test(SplitVec::with_linear_growth_and_fragments_capacity(10, 32));
}

#[test]
fn into_concurrent_fill_with() {
fn test<G: GrowthWithConstantTimeAccess>(vec: SplitVec<String, G>) {
let initial_capacity = vec.capacity();
let vec2 = vec.clone();

let con_vec = vec.into_concurrent_filled_with(|| "x".to_string());
let vec = unsafe { con_vec.into_inner(initial_capacity) };
assert_eq!(
vec,
(0..initial_capacity)
.map(|_| "x".to_string())
.collect::<Vec<_>>()
);

let mut vec = vec2;
vec.push("y".to_string());
let con_vec = vec.into_concurrent_filled_with(|| "x".to_string());
let vec = unsafe { con_vec.into_inner(initial_capacity) };
assert_eq!(&vec[0], &"y".to_string());
assert_eq!(
vec.iter().skip(1).cloned().collect::<Vec<_>>(),
(1..initial_capacity)
.map(|_| "x".to_string())
.collect::<Vec<_>>()
);
}
test(SplitVec::with_doubling_growth_and_fragments_capacity(32));
test(SplitVec::with_linear_growth_and_fragments_capacity(10, 32));
}

#[test]
fn reserve_fill_with() {
fn test<G: GrowthWithConstantTimeAccess>(vec: SplitVec<String, G>) {
let initial_capacity = vec.capacity();

let mut con_vec = vec.into_concurrent_filled_with(|| "x".to_string());
let max_cap = con_vec.max_capacity();

assert_eq!(con_vec.capacity(), initial_capacity);

unsafe {
con_vec.reserve_maximum_concurrent_capacity_fill_with(
initial_capacity,
max_cap + 1,
|| "y".to_string(),
)
};
let new_capacity = con_vec.capacity();
assert_eq!(new_capacity, initial_capacity);
assert!(con_vec.max_capacity() >= max_cap + 1);

let vec = unsafe { con_vec.into_inner(initial_capacity) };

assert_eq!(vec.len(), initial_capacity);
assert_eq!(vec.capacity(), initial_capacity);
assert_eq!(
vec,
(0..initial_capacity)
.map(|_| "x".to_string())
.collect::<Vec<_>>()
);
}

test(SplitVec::with_doubling_growth_and_fragments_capacity(16));
test(SplitVec::with_linear_growth_and_fragments_capacity(10, 32));
}

0 comments on commit afb81c9

Please sign in to comment.